PHOENIX-2967 CSV BulkLoad should properly handle empty family for logical tables.
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/b538c1a9 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/b538c1a9 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/b538c1a9 Branch: refs/heads/4.x-HBase-1.0 Commit: b538c1a95c7b8c589aa1e42cab9e7a7967d6f3e2 Parents: 9b8f274 Author: Sergey Soldatov <s...@apache.org> Authored: Mon Jun 6 14:30:04 2016 -0700 Committer: Sergey Soldatov <s...@apache.org> Committed: Mon Jun 6 23:39:54 2016 -0700 ---------------------------------------------------------------------- .../mapreduce/FormatToBytesWritableMapper.java | 15 +++++++--- .../mapreduce/FormatToKeyValueReducer.java | 30 +++++++------------- 2 files changed, 22 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/b538c1a9/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java index eb0e3ed..a736fc4 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java @@ -44,11 +44,13 @@ import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.mapreduce.bulkload.TableRowkeyPair; import org.apache.phoenix.mapreduce.bulkload.TargetTableRefFunctions; import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil; +import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.schema.PColumn; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.util.ColumnInfo; import org.apache.phoenix.util.PhoenixRuntime; import org.apache.phoenix.util.QueryUtil; +import org.apache.phoenix.util.SchemaUtil; import org.apache.phoenix.util.UpsertExecutor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -216,12 +218,17 @@ public abstract class FormatToBytesWritableMapper<RECORD> extends Mapper<LongWri if (c.getFamilyName() != null) // Skip PK column family = c.getFamilyName().getBytes(); byte[] name = c.getName().getBytes(); - byte[] cfn = Bytes.add(family,":".getBytes(), name); + byte[] cfn = Bytes.add(family, QueryConstants.NAMESPACE_SEPARATOR_BYTES, name); if (!columnIndexes.containsKey(cfn)) { columnIndexes.put(cfn, new Integer(columnIndex)); columnIndex++; } } + byte[] emptyColumnFamily = SchemaUtil.getEmptyColumnFamily(table); + byte[] cfn = Bytes.add(emptyColumnFamily, QueryConstants.NAMESPACE_SEPARATOR_BYTES, + QueryConstants.EMPTY_COLUMN_BYTES); + columnIndexes.put(cfn, new Integer(columnIndex)); + columnIndex++; } } @@ -232,16 +239,16 @@ public abstract class FormatToBytesWritableMapper<RECORD> extends Mapper<LongWri * @param cell KeyValue for the column * @return column index for the specified cell or -1 if was not found */ - private int findIndex(Cell cell) { + private int findIndex(Cell cell) throws IOException { byte[] familyName = Bytes.copy(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()); byte[] name = Bytes.copy(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()); - byte[] cfn = Bytes.add(familyName, ":".getBytes(), name); + byte[] cfn = Bytes.add(familyName, QueryConstants.NAMESPACE_SEPARATOR_BYTES, name); if(columnIndexes.containsKey(cfn)) { return columnIndexes.get(cfn); } - return -1; + throw new IOException("Unable to map cell to column index"); } /** http://git-wip-us.apache.org/repos/asf/phoenix/blob/b538c1a9/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java index aa807c4..15d6d2f 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java @@ -36,7 +36,6 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.io.WritableUtils; import org.apache.hadoop.mapreduce.Reducer; -import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.hbase.index.util.KeyValueBuilder; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.mapreduce.bulkload.TableRowkeyPair; @@ -65,7 +64,6 @@ public class FormatToKeyValueReducer protected List<String> logicalNames; protected KeyValueBuilder builder; private Map<Integer, Pair<byte[], byte[]>> columnIndexes; - private Map<String, ImmutableBytesPtr> emptyFamilyName; @Override @@ -91,13 +89,11 @@ public class FormatToKeyValueReducer } private void initColumnsMap(PhoenixConnection conn) throws SQLException { - Map <byte[], Integer> indexMap = new TreeMap(Bytes.BYTES_COMPARATOR); - emptyFamilyName = new HashMap<>(); + Map<byte[], Integer> indexMap = new TreeMap(Bytes.BYTES_COMPARATOR); columnIndexes = new HashMap<>(); int columnIndex = 0; - for(int index = 0; index < logicalNames.size(); index++) { + for (int index = 0; index < logicalNames.size(); index++) { PTable table = PhoenixRuntime.getTable(conn, logicalNames.get(index)); - emptyFamilyName.put(tableNames.get(index), SchemaUtil.getEmptyColumnFamilyPtr(table)); List<PColumn> cls = table.getColumns(); for (int i = 0; i < cls.size(); i++) { PColumn c = cls.get(i); @@ -106,7 +102,7 @@ public class FormatToKeyValueReducer family = c.getFamilyName().getBytes(); } byte[] name = c.getName().getBytes(); - byte[] cfn = Bytes.add(family,":".getBytes(), name); + byte[] cfn = Bytes.add(family, QueryConstants.NAMESPACE_SEPARATOR_BYTES, name); Pair<byte[], byte[]> pair = new Pair(family, name); if (!indexMap.containsKey(cfn)) { indexMap.put(cfn, new Integer(columnIndex)); @@ -114,6 +110,11 @@ public class FormatToKeyValueReducer columnIndex++; } } + byte[] emptyColumnFamily = SchemaUtil.getEmptyColumnFamily(table); + Pair<byte[], byte[]> pair = new Pair(emptyColumnFamily, QueryConstants + .EMPTY_COLUMN_BYTES); + columnIndexes.put(new Integer(columnIndex), pair); + columnIndex++; } } @@ -131,18 +132,9 @@ public class FormatToKeyValueReducer ImmutableBytesWritable family; ImmutableBytesWritable name; ImmutableBytesWritable value = QueryConstants.EMPTY_COLUMN_VALUE_BYTES_PTR; - if (index == -1) { - family = emptyFamilyName.get(key.getTableName()); - name = QueryConstants.EMPTY_COLUMN_BYTES_PTR; - } else { - Pair<byte[], byte[]> pair = columnIndexes.get(index); - if(pair.getFirst() != null) { - family = new ImmutableBytesWritable(pair.getFirst()); - } else { - family = emptyFamilyName.get(key.getTableName()); - } - name = new ImmutableBytesWritable(pair.getSecond()); - } + Pair<byte[], byte[]> pair = columnIndexes.get(index); + family = new ImmutableBytesWritable(pair.getFirst()); + name = new ImmutableBytesWritable(pair.getSecond()); int len = WritableUtils.readVInt(input); if (len > 0) { byte[] array = new byte[len];