PHOENIX-2925 CsvBulkLoadTool not working properly if there are multiple local 
indexes on the same table (after PHOENIX-1973)


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/9569cb52
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/9569cb52
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/9569cb52

Branch: refs/heads/4.x-HBase-0.98
Commit: 9569cb5212da949763742b6106314a1a07b0f771
Parents: 09747fc
Author: Sergey Soldatov <sergey.solda...@gmail.com>
Authored: Mon May 23 02:22:45 2016 -0700
Committer: Sergey Soldatov <sergey.solda...@gmail.com>
Committed: Mon May 23 10:38:39 2016 -0700

----------------------------------------------------------------------
 .../phoenix/end2end/CsvBulkLoadToolIT.java      |   4 +
 .../mapreduce/FormatToBytesWritableMapper.java  | 111 +++++++++----------
 .../mapreduce/FormatToKeyValueReducer.java      |  75 +++++++------
 3 files changed, 100 insertions(+), 90 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/9569cb52/phoenix-core/src/it/java/org/apache/phoenix/end2end/CsvBulkLoadToolIT.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CsvBulkLoadToolIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CsvBulkLoadToolIT.java
index 1e9c1d9..8968555 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CsvBulkLoadToolIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CsvBulkLoadToolIT.java
@@ -277,6 +277,10 @@ public class CsvBulkLoadToolIT extends 
BaseOwnClusterHBaseManagedTimeIT {
         assertEquals(2, rs.getInt(1));
         assertEquals("FirstName 2", rs.getString(2));
 
+        rs = stmt.executeQuery("SELECT LAST_NAME FROM TABLE6  where 
last_name='LastName 1'");
+        assertTrue(rs.next());
+        assertEquals("LastName 1", rs.getString(1));
+
         rs.close();
         stmt.close();
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/9569cb52/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java
index ff21d4f..eb0e3ed 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java
@@ -44,7 +44,6 @@ import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.mapreduce.bulkload.TableRowkeyPair;
 import org.apache.phoenix.mapreduce.bulkload.TargetTableRefFunctions;
 import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
-import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.schema.PColumn;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.util.ColumnInfo;
@@ -108,7 +107,7 @@ public abstract class FormatToBytesWritableMapper<RECORD> 
extends Mapper<LongWri
     /*
     lookup table for column index. Index in the List matches to the index in 
tableNames List
      */
-    protected List<Map<byte[], Map<byte[], Integer>>> columnIndexes;
+    protected Map<byte[], Integer> columnIndexes;
 
     protected abstract UpsertExecutor<RECORD,?> 
buildUpsertExecutor(Configuration conf);
     protected abstract LineParser<RECORD> getLineParser();
@@ -135,7 +134,7 @@ public abstract class FormatToBytesWritableMapper<RECORD> 
extends Mapper<LongWri
             tableNames = 
TargetTableRefFunctions.NAMES_FROM_JSON.apply(tableNamesConf);
             logicalNames = 
TargetTableRefFunctions.NAMES_FROM_JSON.apply(logicalNamesConf);
 
-            columnIndexes = initColumnIndexes();
+            initColumnIndexes();
         } catch (SQLException | ClassNotFoundException e) {
             throw new RuntimeException(e);
         }
@@ -193,7 +192,7 @@ public abstract class FormatToBytesWritableMapper<RECORD> 
extends Mapper<LongWri
                 int tableIndex = rowEntry.getKey();
                 List<KeyValue> lkv = rowEntry.getValue();
                 // All KV values combines to a single byte array
-                writeAggregatedRow(context, tableIndex, lkv);
+                writeAggregatedRow(context, tableNames.get(tableIndex), lkv);
             }
             conn.rollback();
         } catch (Exception e) {
@@ -201,99 +200,99 @@ public abstract class FormatToBytesWritableMapper<RECORD> 
extends Mapper<LongWri
         }
     }
 
-    private List<Map<byte[], Map<byte[], Integer>>> initColumnIndexes() throws 
SQLException {
-        List<Map<byte[], Map<byte[], Integer>>> tableMap = new ArrayList<>();
-        int tableIndex;
-        for (tableIndex = 0; tableIndex < tableNames.size(); tableIndex++) {
-            PTable table = PhoenixRuntime.getTable(conn, 
logicalNames.get(tableIndex));
-            Map<byte[], Map<byte[], Integer>> columnMap = new 
TreeMap<>(Bytes.BYTES_COMPARATOR);
+    /*
+    Map all unique pairs <family, name>  to index. Table name is part of 
TableRowkey, so we do
+    not care about it
+     */
+    private void initColumnIndexes() throws SQLException {
+        columnIndexes = new TreeMap(Bytes.BYTES_COMPARATOR);
+        int columnIndex = 0;
+        for(int index = 0; index < logicalNames.size(); index++) {
+            PTable table = PhoenixRuntime.getTable(conn, 
logicalNames.get(index));
             List<PColumn> cls = table.getColumns();
             for (int i = 0; i < cls.size(); i++) {
                 PColumn c = cls.get(i);
-                if (c.getFamilyName() == null) continue; // Skip PK column
-                byte[] family = c.getFamilyName().getBytes();
+                byte[] family = new byte[0];
+                if (c.getFamilyName() != null)  // PK columns have no family; keep the empty one
+                    family = c.getFamilyName().getBytes();
                 byte[] name = c.getName().getBytes();
-                if (!columnMap.containsKey(family)) {
-                    columnMap.put(family, new TreeMap<byte[], 
Integer>(Bytes.BYTES_COMPARATOR));
+                byte[] cfn = Bytes.add(family,":".getBytes(), name);
+                if (!columnIndexes.containsKey(cfn)) {
+                    columnIndexes.put(cfn, new Integer(columnIndex));
+                    columnIndex++;
                 }
-                Map<byte[], Integer> qualifier = columnMap.get(family);
-                qualifier.put(name, i);
             }
-            tableMap.add(columnMap);
         }
-        return tableMap;
     }
 
     /**
      * Find the column index which will replace the column name in
      * the aggregated array and will be restored in Reducer
      *
-     * @param tableIndex Table index in tableNames list
      * @param cell       KeyValue for the column
      * @return column index for the specified cell or -1 if was not found
      */
-    private int findIndex(int tableIndex, Cell cell) {
-        Map<byte[], Map<byte[], Integer>> columnMap = 
columnIndexes.get(tableIndex);
-        Map<byte[], Integer> qualifiers = 
columnMap.get(Bytes.copy(cell.getFamilyArray(),
-                cell.getFamilyOffset(), cell.getFamilyLength()));
-        if (qualifiers != null) {
-            Integer result = 
qualifiers.get(Bytes.copy(cell.getQualifierArray(),
-                    cell.getQualifierOffset(), cell.getQualifierLength()));
-            if (result != null) {
-                return result;
-            }
+    private int findIndex(Cell cell) {
+        byte[] familyName = Bytes.copy(cell.getFamilyArray(), 
cell.getFamilyOffset(),
+                cell.getFamilyLength());
+        byte[] name = Bytes.copy(cell.getQualifierArray(), 
cell.getQualifierOffset(),
+                cell.getQualifierLength());
+        byte[] cfn = Bytes.add(familyName, ":".getBytes(), name);
+        if(columnIndexes.containsKey(cfn)) {
+            return columnIndexes.get(cfn);
         }
         return -1;
     }
 
     /**
-     * Collect all column values for the same rowKey
+     * Collect all column values for the same Row. RowKey may be different if 
indexes are involved,
+     * so it writes a separate record for each unique RowKey
      *
      * @param context    Current mapper context
-     * @param tableIndex Table index in tableNames list
+     * @param tableName  Name of the target table (an entry of the tableNames list)
      * @param lkv        List of KV values that will be combined in a single 
ImmutableBytesWritable
      * @throws IOException
      * @throws InterruptedException
      */
 
-    private void writeAggregatedRow(Context context, int tableIndex, 
List<KeyValue> lkv)
+    private void writeAggregatedRow(Context context, String tableName, 
List<KeyValue> lkv)
             throws IOException, InterruptedException {
         ByteArrayOutputStream bos = new ByteArrayOutputStream(1024);
         DataOutputStream outputStream = new DataOutputStream(bos);
-        ImmutableBytesWritable outputKey = new ImmutableBytesWritable();
+        ImmutableBytesWritable outputKey =null;
         if (!lkv.isEmpty()) {
-            // All Key Values for the same row are supposed to be the same, so 
init rowKey only once
-            Cell first = lkv.get(0);
-            outputKey.set(first.getRowArray(), first.getRowOffset(), 
first.getRowLength());
             for (KeyValue cell : lkv) {
-                if (isEmptyCell(cell)) {
-                    continue;
-                }
-                int i = findIndex(tableIndex, cell);
-                if (i == -1) {
-                    throw new IOException("No column found for KeyValue");
+                if (outputKey == null || Bytes.compareTo(outputKey.get(), 
outputKey.getOffset(),
+                        outputKey.getLength(), cell.getRowArray(), 
cell.getRowOffset(), cell
+                                .getRowLength()) != 0) {
+                    // This is the first RowKey, or one that differs from the previous
+                    if (outputKey != null) { //It's a different RowKey, so we 
need to write it
+                        ImmutableBytesWritable aggregatedArray =
+                                new ImmutableBytesWritable(bos.toByteArray());
+                        outputStream.close();
+                        context.write(new TableRowkeyPair(tableName, 
outputKey), aggregatedArray);
+                    }
+                    outputKey = new ImmutableBytesWritable(cell.getRowArray(), 
cell.getRowOffset()
+                            , cell.getRowLength());
+                    bos = new ByteArrayOutputStream(1024);
+                    outputStream = new DataOutputStream(bos);
                 }
-                WritableUtils.writeVInt(outputStream, i);
+                /*
+                The order of aggregation: type, index of column, length of 
value, value itself
+                 */
                 outputStream.writeByte(cell.getTypeByte());
+                int i = findIndex(cell);
+                WritableUtils.writeVInt(outputStream, i);
                 WritableUtils.writeVInt(outputStream, cell.getValueLength());
                 outputStream.write(cell.getValueArray(), 
cell.getValueOffset(), cell.getValueLength());
+
             }
+            ImmutableBytesWritable aggregatedArray = new 
ImmutableBytesWritable(bos.toByteArray());
+            outputStream.close();
+            context.write(new TableRowkeyPair(tableName, outputKey), 
aggregatedArray);
         }
-        ImmutableBytesWritable aggregatedArray = new 
ImmutableBytesWritable(bos.toByteArray());
-        outputStream.close();
-        context.write(new TableRowkeyPair(tableNames.get(tableIndex), 
outputKey), aggregatedArray);
     }
 
-    protected boolean isEmptyCell(KeyValue cell) {
-        if (Bytes.compareTo(cell.getQualifierArray(), 
cell.getQualifierOffset(),
-                cell.getQualifierLength(), QueryConstants.EMPTY_COLUMN_BYTES, 
0,
-                QueryConstants.EMPTY_COLUMN_BYTES.length) != 0)
-            return false;
-        else
-            return true;
-    }
-
-
     @Override
     protected void cleanup(Context context) throws IOException, 
InterruptedException {
         try {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/9569cb52/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java
index 799b3dc..aa807c4 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java
@@ -21,16 +21,18 @@ import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
 import java.io.IOException;
 import java.sql.SQLException;
-import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.TreeMap;
 import java.util.TreeSet;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.mapreduce.Reducer;
@@ -42,7 +44,6 @@ import 
org.apache.phoenix.mapreduce.bulkload.TargetTableRefFunctions;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.schema.PColumn;
 import org.apache.phoenix.schema.PTable;
-import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.Closeables;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.QueryUtil;
@@ -63,8 +64,8 @@ public class FormatToKeyValueReducer
     protected List<String> tableNames;
     protected List<String> logicalNames;
     protected KeyValueBuilder builder;
-    List<List<Pair<byte[], byte[]>>> columnIndexes;
-    List<ImmutableBytesPtr> emptyFamilyName;
+    private Map<Integer, Pair<byte[], byte[]>> columnIndexes;
+    private Map<String, ImmutableBytesPtr> emptyFamilyName;
 
 
     @Override
@@ -76,7 +77,6 @@ public class FormatToKeyValueReducer
         for (Map.Entry<String, String> entry : conf) {
             clientInfos.setProperty(entry.getKey(), entry.getValue());
         }
-
         try {
             PhoenixConnection conn = (PhoenixConnection) 
QueryUtil.getConnectionOnServer(clientInfos, conf);
             builder = conn.getKeyValueBuilder();
@@ -84,9 +84,6 @@ public class FormatToKeyValueReducer
             final String logicalNamesConf = 
conf.get(FormatToBytesWritableMapper.LOGICAL_NAMES_CONFKEY);
             tableNames = 
TargetTableRefFunctions.NAMES_FROM_JSON.apply(tableNamesConf);
             logicalNames = 
TargetTableRefFunctions.NAMES_FROM_JSON.apply(logicalNamesConf);
-
-            columnIndexes = new ArrayList<>(tableNames.size());
-            emptyFamilyName = new ArrayList<>();
             initColumnsMap(conn);
         } catch (SQLException | ClassNotFoundException e) {
             throw new RuntimeException(e);
@@ -94,24 +91,30 @@ public class FormatToKeyValueReducer
     }
 
     private void initColumnsMap(PhoenixConnection conn) throws SQLException {
-        for (String tableName : logicalNames) {
-            PTable table = PhoenixRuntime.getTable(conn, tableName);
-            emptyFamilyName.add(SchemaUtil.getEmptyColumnFamilyPtr(table));
+        Map <byte[], Integer> indexMap = new TreeMap(Bytes.BYTES_COMPARATOR);
+        emptyFamilyName = new HashMap<>();
+        columnIndexes = new HashMap<>();
+        int columnIndex = 0;
+        for(int index = 0; index < logicalNames.size(); index++) {
+            PTable table = PhoenixRuntime.getTable(conn, 
logicalNames.get(index));
+            emptyFamilyName.put(tableNames.get(index), 
SchemaUtil.getEmptyColumnFamilyPtr(table));
             List<PColumn> cls = table.getColumns();
-            List<Pair<byte[], byte[]>> list = new ArrayList(cls.size());
             for (int i = 0; i < cls.size(); i++) {
                 PColumn c = cls.get(i);
-                if (c.getFamilyName() == null) {
-                    list.add(null); // Skip PK column
-                    continue;
+                byte[] family = new byte[0];
+                if (c.getFamilyName() != null) {
+                    family = c.getFamilyName().getBytes();
                 }
-                byte[] family = c.getFamilyName().getBytes();
                 byte[] name = c.getName().getBytes();
-                list.add(new Pair(family, name));
+                byte[] cfn = Bytes.add(family,":".getBytes(), name);
+                Pair<byte[], byte[]> pair = new Pair(family, name);
+                if (!indexMap.containsKey(cfn)) {
+                    indexMap.put(cfn, new Integer(columnIndex));
+                    columnIndexes.put(new Integer(columnIndex), pair);
+                    columnIndex++;
+                }
             }
-            columnIndexes.add(list);
         }
-
     }
 
     @Override
@@ -119,15 +122,27 @@ public class FormatToKeyValueReducer
                           Reducer<TableRowkeyPair, ImmutableBytesWritable, 
TableRowkeyPair, KeyValue>.Context context)
             throws IOException, InterruptedException {
         TreeSet<KeyValue> map = new TreeSet<KeyValue>(KeyValue.COMPARATOR);
-        int tableIndex = tableNames.indexOf(key.getTableName());
-        List<Pair<byte[], byte[]>> columns = columnIndexes.get(tableIndex);
+        ImmutableBytesWritable rowKey = key.getRowkey();
         for (ImmutableBytesWritable aggregatedArray : values) {
             DataInputStream input = new DataInputStream(new 
ByteArrayInputStream(aggregatedArray.get()));
             while (input.available() != 0) {
-                int index = WritableUtils.readVInt(input);
-                Pair<byte[], byte[]> pair = columns.get(index);
                 byte type = input.readByte();
-                ImmutableBytesWritable value = null;
+                int index = WritableUtils.readVInt(input);
+                ImmutableBytesWritable family;
+                ImmutableBytesWritable name;
+                ImmutableBytesWritable value = 
QueryConstants.EMPTY_COLUMN_VALUE_BYTES_PTR;
+                if (index == -1) {
+                    family = emptyFamilyName.get(key.getTableName());
+                    name = QueryConstants.EMPTY_COLUMN_BYTES_PTR;
+                } else {
+                    Pair<byte[], byte[]> pair = columnIndexes.get(index);
+                    if(pair.getFirst() != null) {
+                        family = new ImmutableBytesWritable(pair.getFirst());
+                    } else {
+                        family = emptyFamilyName.get(key.getTableName());
+                    }
+                    name = new ImmutableBytesWritable(pair.getSecond());
+                }
                 int len = WritableUtils.readVInt(input);
                 if (len > 0) {
                     byte[] array = new byte[len];
@@ -138,24 +153,16 @@ public class FormatToKeyValueReducer
                 KeyValue.Type kvType = KeyValue.Type.codeToType(type);
                 switch (kvType) {
                     case Put: // not null value
-                        kv = builder.buildPut(key.getRowkey(),
-                                new ImmutableBytesWritable(pair.getFirst()),
-                                new ImmutableBytesWritable(pair.getSecond()), 
value);
+                        kv = builder.buildPut(key.getRowkey(), family, name, 
value);
                         break;
                     case DeleteColumn: // null value
-                        kv = builder.buildDeleteColumns(key.getRowkey(),
-                                new ImmutableBytesWritable(pair.getFirst()),
-                                new ImmutableBytesWritable(pair.getSecond()));
+                        kv = builder.buildDeleteColumns(key.getRowkey(), 
family, name);
                         break;
                     default:
                         throw new IOException("Unsupported KeyValue type " + 
kvType);
                 }
                 map.add(kv);
             }
-            KeyValue empty = builder.buildPut(key.getRowkey(),
-                    emptyFamilyName.get(tableIndex),
-                    QueryConstants.EMPTY_COLUMN_BYTES_PTR, 
ByteUtil.EMPTY_BYTE_ARRAY_PTR);
-            map.add(empty);
             Closeables.closeQuietly(input);
         }
         context.setStatus("Read " + map.getClass());

Reply via email to