virajjasani commented on a change in pull request #1144:
URL: https://github.com/apache/phoenix/pull/1144#discussion_r579813022



##########
File path: 
phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java
##########
@@ -411,8 +421,46 @@ public void errorOnRecord(T record, Throwable throwable) {
             ImportPreUpsertKeyValueProcessor {
 
         @Override
-        public List<Cell> preUpsert(byte[] rowKey, List<Cell> keyValues) {
+        public List<Cell> preUpsert(byte[] tableName, List<Cell> keyValues) {
             return keyValues;
         }
     }
+
+    /**
+     * Updates the Empty cell value to VERIFIED for global index table rows
+     */
+    private static class IndexStatusUpdater {
+
+        private byte[] emptyKeyValueCF;
+        private int emptyKeyValueCFLength;
+        private byte[] emptyKeyValueQualifier;
+        private int emptyKeyValueQualifierLength;
+
+        public IndexStatusUpdater(byte[] emptyKeyValueCF, byte[] 
emptyKeyValueQualifier) {
+            this.emptyKeyValueCF = emptyKeyValueCF;
+            this.emptyKeyValueQualifier = emptyKeyValueQualifier;
+            emptyKeyValueCFLength = emptyKeyValueCF.length;
+            emptyKeyValueQualifierLength = emptyKeyValueQualifier.length;
+        }
+
+        /**
+         * Update the Empty cell values to VERIFIED in the passed keyValues 
list
+         * 
+         * @param keyValues will be modified
+         */
+        public void setVerfied(List<Cell> keyValues) {

Review comment:
       nit: `"s/setVerfied/setVerified"`

##########
File path: 
phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java
##########
@@ -411,8 +421,46 @@ public void errorOnRecord(T record, Throwable throwable) {
             ImportPreUpsertKeyValueProcessor {
 
         @Override
-        public List<Cell> preUpsert(byte[] rowKey, List<Cell> keyValues) {
+        public List<Cell> preUpsert(byte[] tableName, List<Cell> keyValues) {
             return keyValues;
         }
     }
+
+    /**
+     * Updates the Empty cell value to VERIFIED for global index table rows
+     */
+    private static class IndexStatusUpdater {
+
+        private byte[] emptyKeyValueCF;
+        private int emptyKeyValueCFLength;
+        private byte[] emptyKeyValueQualifier;
+        private int emptyKeyValueQualifierLength;

Review comment:
       nit: good to mark all `final`

##########
File path: 
phoenix-core/src/it/java/org/apache/phoenix/end2end/CsvBulkLoadToolIT.java
##########
@@ -318,6 +335,27 @@ public void testImportWithIndex() throws Exception {
 
         rs.close();
         stmt.close();
+
+        checkIndexTableIsVerfied("TABLE3_IDX");
+    }
+
+    private void checkIndexTableIsVerfied(String indexTableName) throws 
SQLException, IOException {

Review comment:
       nit: `"s/checkIndexTableIsVerfied/checkIndexTableIsVerified"`

##########
File path: 
phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java
##########
@@ -411,8 +421,46 @@ public void errorOnRecord(T record, Throwable throwable) {
             ImportPreUpsertKeyValueProcessor {
 
         @Override
-        public List<Cell> preUpsert(byte[] rowKey, List<Cell> keyValues) {
+        public List<Cell> preUpsert(byte[] tableName, List<Cell> keyValues) {
             return keyValues;
         }
     }
+
+    /**
+     * Updates the Empty cell value to VERIFIED for global index table rows
+     */
+    private static class IndexStatusUpdater {
+
+        private byte[] emptyKeyValueCF;
+        private int emptyKeyValueCFLength;
+        private byte[] emptyKeyValueQualifier;
+        private int emptyKeyValueQualifierLength;
+
+        public IndexStatusUpdater(byte[] emptyKeyValueCF, byte[] 
emptyKeyValueQualifier) {
+            this.emptyKeyValueCF = emptyKeyValueCF;
+            this.emptyKeyValueQualifier = emptyKeyValueQualifier;
+            emptyKeyValueCFLength = emptyKeyValueCF.length;
+            emptyKeyValueQualifierLength = emptyKeyValueQualifier.length;
+        }
+
+        /**
+         * Update the Empty cell values to VERIFIED in the passed keyValues 
list
+         * 
+         * @param keyValues will be modified
+         */
+        public void setVerfied(List<Cell> keyValues) {
+            for(int i=0; i < keyValues.size() ; i++) {
+                Cell kv = keyValues.get(i);

Review comment:
       can be replaced by
   ```
   for (Cell kv : keyValues) {
   ```

##########
File path: 
phoenix-core/src/it/java/org/apache/phoenix/end2end/CsvBulkLoadToolIT.java
##########
@@ -36,16 +39,30 @@
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.CompareOperator;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Result;

Review comment:
       Few unused imports, nothing urgent, can be removed while committing 
changes

##########
File path: 
phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java
##########
@@ -176,17 +179,20 @@ protected void map(LongWritable key, Text value, Context 
context) throws IOExcep
             while (uncommittedDataIterator.hasNext()) {
                 Pair<byte[], List<Cell>> kvPair = 
uncommittedDataIterator.next();
                 List<Cell> keyValueList = kvPair.getSecond();
-                keyValueList = preUpdateProcessor.preUpsert(kvPair.getFirst(), 
keyValueList);
-                byte[] first = kvPair.getFirst();
+                byte[] tableName = kvPair.getFirst();
+                keyValueList = preUpdateProcessor.preUpsert(tableName, 
keyValueList);
                 // Create a list of KV for each table
                 for (int i = 0; i < tableNames.size(); i++) {
-                    if (Bytes.compareTo(Bytes.toBytes(tableNames.get(i)), 
first) == 0) {
+                    if (Bytes.compareTo(Bytes.toBytes(tableNames.get(i)), 
tableName) == 0) {
                         if (!map.containsKey(i)) {
                             map.put(i, new ArrayList<Cell>());
                         }
-                        List<Cell> list = map.get(i);
+                        List<Cell> cellsForTable = map.get(i);
+                        if(indexStatusUpdaters[i] != null) {
+                            indexStatusUpdaters[i].setVerfied(keyValueList);
+                        }

Review comment:
       This should solve the problem right?

##########
File path: 
phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java
##########
@@ -176,17 +179,20 @@ protected void map(LongWritable key, Text value, Context 
context) throws IOExcep
             while (uncommittedDataIterator.hasNext()) {
                 Pair<byte[], List<Cell>> kvPair = 
uncommittedDataIterator.next();
                 List<Cell> keyValueList = kvPair.getSecond();
-                keyValueList = preUpdateProcessor.preUpsert(kvPair.getFirst(), 
keyValueList);
-                byte[] first = kvPair.getFirst();
+                byte[] tableName = kvPair.getFirst();
+                keyValueList = preUpdateProcessor.preUpsert(tableName, 
keyValueList);
                 // Create a list of KV for each table
                 for (int i = 0; i < tableNames.size(); i++) {
-                    if (Bytes.compareTo(Bytes.toBytes(tableNames.get(i)), 
first) == 0) {
+                    if (Bytes.compareTo(Bytes.toBytes(tableNames.get(i)), 
tableName) == 0) {
                         if (!map.containsKey(i)) {
                             map.put(i, new ArrayList<Cell>());
                         }
-                        List<Cell> list = map.get(i);
+                        List<Cell> cellsForTable = map.get(i);
+                        if(indexStatusUpdaters[i] != null) {
+                            indexStatusUpdaters[i].setVerfied(keyValueList);
+                        }
                         for (Cell kv : keyValueList) {
-                            list.add(kv);
+                            cellsForTable.add(kv);
                         }

Review comment:
       nit: can be replaced with `cellsForTable.addAll(keyValueList)`




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to