stoty commented on a change in pull request #1144:
URL: https://github.com/apache/phoenix/pull/1144#discussion_r579992339
##########
File path:
phoenix-core/src/it/java/org/apache/phoenix/end2end/CsvBulkLoadToolIT.java
##########
@@ -36,16 +39,30 @@
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Result;
Review comment:
OK
##########
File path:
phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java
##########
@@ -411,8 +421,46 @@ public void errorOnRecord(T record, Throwable throwable) {
ImportPreUpsertKeyValueProcessor {
@Override
- public List<Cell> preUpsert(byte[] rowKey, List<Cell> keyValues) {
+ public List<Cell> preUpsert(byte[] tableName, List<Cell> keyValues) {
return keyValues;
}
}
+
+ /**
+ * Updates the Empty cell value to VERIFIED for global index table rows
+ */
+ private static class IndexStatusUpdater {
+
+ private byte[] emptyKeyValueCF;
+ private int emptyKeyValueCFLength;
+ private byte[] emptyKeyValueQualifier;
+ private int emptyKeyValueQualifierLength;
Review comment:
OK
##########
File path:
phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java
##########
@@ -411,8 +421,46 @@ public void errorOnRecord(T record, Throwable throwable) {
ImportPreUpsertKeyValueProcessor {
@Override
- public List<Cell> preUpsert(byte[] rowKey, List<Cell> keyValues) {
+ public List<Cell> preUpsert(byte[] tableName, List<Cell> keyValues) {
return keyValues;
}
}
+
+ /**
+ * Updates the Empty cell value to VERIFIED for global index table rows
+ */
+ private static class IndexStatusUpdater {
+
+ private byte[] emptyKeyValueCF;
+ private int emptyKeyValueCFLength;
+ private byte[] emptyKeyValueQualifier;
+ private int emptyKeyValueQualifierLength;
+
+ public IndexStatusUpdater(byte[] emptyKeyValueCF, byte[]
emptyKeyValueQualifier) {
+ this.emptyKeyValueCF = emptyKeyValueCF;
+ this.emptyKeyValueQualifier = emptyKeyValueQualifier;
+ emptyKeyValueCFLength = emptyKeyValueCF.length;
+ emptyKeyValueQualifierLength = emptyKeyValueQualifier.length;
+ }
+
+ /**
+ * Update the Empty cell values to VERIFIED in the passed keyValues
list
+ *
+ * @param keyValues will be modified
+ */
+ public void setVerfied(List<Cell> keyValues) {
+ for(int i=0; i < keyValues.size() ; i++) {
+ Cell kv = keyValues.get(i);
Review comment:
I purposefully chose this construct, as we're processing a lot of
objects, and an indexed loop avoids allocating an Iterator for every list, which is reportedly faster and easier on memory / GC.
##########
File path:
phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java
##########
@@ -176,17 +179,20 @@ protected void map(LongWritable key, Text value, Context
context) throws IOExcep
while (uncommittedDataIterator.hasNext()) {
Pair<byte[], List<Cell>> kvPair =
uncommittedDataIterator.next();
List<Cell> keyValueList = kvPair.getSecond();
- keyValueList = preUpdateProcessor.preUpsert(kvPair.getFirst(),
keyValueList);
- byte[] first = kvPair.getFirst();
+ byte[] tableName = kvPair.getFirst();
+ keyValueList = preUpdateProcessor.preUpsert(tableName,
keyValueList);
// Create a list of KV for each table
for (int i = 0; i < tableNames.size(); i++) {
- if (Bytes.compareTo(Bytes.toBytes(tableNames.get(i)),
first) == 0) {
+ if (Bytes.compareTo(Bytes.toBytes(tableNames.get(i)),
tableName) == 0) {
if (!map.containsKey(i)) {
map.put(i, new ArrayList<Cell>());
}
- List<Cell> list = map.get(i);
+ List<Cell> cellsForTable = map.get(i);
+ if(indexStatusUpdaters[i] != null) {
+ indexStatusUpdaters[i].setVerfied(keyValueList);
+ }
for (Cell kv : keyValueList) {
- list.add(kv);
+ cellsForTable.add(kv);
}
Review comment:
OK
##########
File path:
phoenix-core/src/it/java/org/apache/phoenix/end2end/CsvBulkLoadToolIT.java
##########
@@ -318,6 +335,27 @@ public void testImportWithIndex() throws Exception {
rs.close();
stmt.close();
+
+ checkIndexTableIsVerfied("TABLE3_IDX");
+ }
+
+ private void checkIndexTableIsVerfied(String indexTableName) throws
SQLException, IOException {
Review comment:
Thanks
##########
File path:
phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java
##########
@@ -176,17 +179,20 @@ protected void map(LongWritable key, Text value, Context
context) throws IOExcep
while (uncommittedDataIterator.hasNext()) {
Pair<byte[], List<Cell>> kvPair =
uncommittedDataIterator.next();
List<Cell> keyValueList = kvPair.getSecond();
- keyValueList = preUpdateProcessor.preUpsert(kvPair.getFirst(),
keyValueList);
- byte[] first = kvPair.getFirst();
+ byte[] tableName = kvPair.getFirst();
+ keyValueList = preUpdateProcessor.preUpsert(tableName,
keyValueList);
// Create a list of KV for each table
for (int i = 0; i < tableNames.size(); i++) {
- if (Bytes.compareTo(Bytes.toBytes(tableNames.get(i)),
first) == 0) {
+ if (Bytes.compareTo(Bytes.toBytes(tableNames.get(i)),
tableName) == 0) {
if (!map.containsKey(i)) {
map.put(i, new ArrayList<Cell>());
}
- List<Cell> list = map.get(i);
+ List<Cell> cellsForTable = map.get(i);
+ if(indexStatusUpdaters[i] != null) {
+ indexStatusUpdaters[i].setVerfied(keyValueList);
+ }
Review comment:
Strictly speaking, we are not verifying them, as we're generating the
records in bulk, and we assume that they are correct (we assume that the bulk
load job runs to completion).
We're just setting the status to verified to avoid repairing the freshly
generated index rows.
##########
File path:
phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java
##########
@@ -411,8 +421,46 @@ public void errorOnRecord(T record, Throwable throwable) {
ImportPreUpsertKeyValueProcessor {
@Override
- public List<Cell> preUpsert(byte[] rowKey, List<Cell> keyValues) {
+ public List<Cell> preUpsert(byte[] tableName, List<Cell> keyValues) {
return keyValues;
}
}
+
+ /**
+ * Updates the Empty cell value to VERIFIED for global index table rows
+ */
+ private static class IndexStatusUpdater {
+
+ private byte[] emptyKeyValueCF;
+ private int emptyKeyValueCFLength;
+ private byte[] emptyKeyValueQualifier;
+ private int emptyKeyValueQualifierLength;
+
+ public IndexStatusUpdater(byte[] emptyKeyValueCF, byte[]
emptyKeyValueQualifier) {
+ this.emptyKeyValueCF = emptyKeyValueCF;
+ this.emptyKeyValueQualifier = emptyKeyValueQualifier;
+ emptyKeyValueCFLength = emptyKeyValueCF.length;
+ emptyKeyValueQualifierLength = emptyKeyValueQualifier.length;
+ }
+
+ /**
+ * Update the Empty cell values to VERIFIED in the passed keyValues
list
+ *
+ * @param keyValues will be modified
+ */
+ public void setVerfied(List<Cell> keyValues) {
Review comment:
Thanks.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org