Repository: phoenix
Updated Branches:
  refs/heads/4.0 fc1862d93 -> 152ce872c
PHOENIX-902 Allow family qualifiers in CSV loader Allow qualifying input column names with the column family. Contributed by James Violette. Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/152ce872 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/152ce872 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/152ce872 Branch: refs/heads/4.0 Commit: 152ce872cc76aa1af10f54d92d2e956fc9c81ae4 Parents: fc1862d Author: Gabriel Reid <[email protected]> Authored: Sat Aug 9 22:37:53 2014 +0200 Committer: Gabriel Reid <[email protected]> Committed: Sat Aug 9 22:39:38 2014 +0200 ---------------------------------------------------------------------- .../apache/phoenix/util/CSVCommonsLoader.java | 83 +++++++++++++++----- 1 file changed, 65 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/152ce872/phoenix-core/src/main/java/org/apache/phoenix/util/CSVCommonsLoader.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/CSVCommonsLoader.java b/phoenix-core/src/main/java/org/apache/phoenix/util/CSVCommonsLoader.java index 494696b..ef4375c 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/CSVCommonsLoader.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/CSVCommonsLoader.java @@ -17,18 +17,10 @@ */ package org.apache.phoenix.util; -import java.io.File; -import java.io.Reader; -import java.sql.Connection; -import java.sql.DatabaseMetaData; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.Map; - +import com.google.common.base.Joiner; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import 
org.apache.commons.csv.CSVFormat; import org.apache.commons.csv.CSVParser; import org.apache.commons.csv.CSVRecord; @@ -40,9 +32,19 @@ import org.apache.phoenix.util.csv.CsvUpsertExecutor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.base.Joiner; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; +import java.io.File; +import java.io.Reader; +import java.sql.Connection; +import java.sql.DatabaseMetaData; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeSet; /*** * Upserts CSV data using Phoenix JDBC connection @@ -119,7 +121,7 @@ public class CSVCommonsLoader { /** * default settings - * delimiter = ',' + * delimiter = ',' * quoteChar = '"', * escape = null * recordSeparator = CRLF, CR, or LF @@ -281,6 +283,8 @@ public class CSVCommonsLoader { String tableName, List<String> columns, boolean strict) throws SQLException { Map<String, Integer> columnNameToTypeMap = Maps.newLinkedHashMap(); + Set<String> ambiguousColumnNames = new HashSet<String>(); + Map<String, Integer> fullColumnNameToTypeMap = Maps.newLinkedHashMap(); DatabaseMetaData dbmd = conn.getMetaData(); int unfoundColumnCount = 0; // TODO: escape wildcard characters here because we don't want that @@ -294,9 +298,22 @@ public class CSVCommonsLoader { (schemaAndTable.length == 1 ? escapedTableName : schemaAndTable[1]), null); while (rs.next()) { + String colName = rs.getString(QueryUtil.COLUMN_NAME_POSITION); + String colFam = rs.getString(QueryUtil.COLUMN_FAMILY_POSITION); + + // use family qualifier, if available, otherwise, use column name + String fullColumn = (colFam==null?colName:String.format("%s.%s",colFam,colName)); String sqlTypeName = rs.getString(QueryUtil.DATA_TYPE_NAME_POSITION); + + // allow for both bare and family qualified names. 
+ if (columnNameToTypeMap.keySet().contains(colName)) { + ambiguousColumnNames.add(colName); + } columnNameToTypeMap.put( - rs.getString(QueryUtil.COLUMN_NAME_POSITION), + colName, + PDataType.fromSqlTypeName(sqlTypeName).getSqlType()); + fullColumnNameToTypeMap.put( + fullColumn, PDataType.fromSqlTypeName(sqlTypeName).getSqlType()); } if (columnNameToTypeMap.isEmpty()) { @@ -308,8 +325,10 @@ public class CSVCommonsLoader { } } List<ColumnInfo> columnInfoList = Lists.newArrayList(); + Set<String> unresolvedColumnNames = new TreeSet<String>(); if (columns == null) { - for (Map.Entry<String, Integer> entry : columnNameToTypeMap + // use family qualified names by default, if no columns are specified. + for (Map.Entry<String, Integer> entry : fullColumnNameToTypeMap .entrySet()) { columnInfoList.add(new ColumnInfo(entry.getKey(), entry.getValue())); } @@ -317,7 +336,35 @@ public class CSVCommonsLoader { // Leave "null" as indication to skip b/c it doesn't exist for (int i = 0; i < columns.size(); i++) { String columnName = columns.get(i).trim(); - Integer sqlType = columnNameToTypeMap.get(columnName); + Integer sqlType = null; + if (fullColumnNameToTypeMap.containsKey(columnName)) { + sqlType = fullColumnNameToTypeMap.get(columnName); + } else if (columnNameToTypeMap.containsKey(columnName)) { + if (ambiguousColumnNames.contains(columnName)) { + unresolvedColumnNames.add(columnName); + } + // fall back to bare column name. 
+ sqlType = columnNameToTypeMap.get(columnName); + } + if (unresolvedColumnNames.size()>0) { + StringBuilder exceptionMessage = new StringBuilder(); + boolean first = true; + exceptionMessage.append("Unable to resolve these column names to a single column family:\n"); + for (String col : unresolvedColumnNames) { + if (first) first = false; + else exceptionMessage.append(","); + exceptionMessage.append(col); + } + exceptionMessage.append("\nAvailable columns with column families:\n"); + first = true; + for (String col : fullColumnNameToTypeMap.keySet()) { + if (first) first = false; + else exceptionMessage.append(","); + exceptionMessage.append(col); + } + throw new SQLException(exceptionMessage.toString()); + } + if (sqlType == null) { if (strict) { throw new SQLExceptionInfo.Builder(
