nsivabalan commented on a change in pull request #2188:
URL: https://github.com/apache/hudi/pull/2188#discussion_r549017875



##########
File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/hbase/SparkHoodieHBaseIndex.java
##########
@@ -480,6 +486,68 @@ private Integer getNumRegionServersAliveForTable() {
   @Override
   public boolean rollbackCommit(String instantTime) {
     // Rollback in HbaseIndex is managed via method {@link #checkIfValidCommit()}
+    synchronized (SparkHoodieHBaseIndex.class) {

Review comment:
       Can you remove the comment on line 488?

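The hunk above guards connection creation with a class-level lock and re-creates the connection only when it is missing or closed. A minimal self-contained sketch of that synchronized lazy-initialization pattern, using a hypothetical `ConnectionHolder` stand-in for the real HBase `Connection`:

```java
public class LazyConnection {
    // Hypothetical stand-in for org.apache.hadoop.hbase.client.Connection.
    public static class ConnectionHolder {
        private boolean closed = false;
        public boolean isClosed() { return closed; }
        public void close() { closed = true; }
    }

    private static ConnectionHolder connection;

    public static ConnectionHolder getConnection() {
        // Same shape as the PR: take the class-level lock, then (re)create
        // the connection only if it is missing or has been closed.
        synchronized (LazyConnection.class) {
            if (connection == null || connection.isClosed()) {
                connection = new ConnectionHolder();
            }
            return connection;
        }
    }
}
```

A live connection is reused across calls; a closed one is transparently replaced on the next call, which is what makes the rollback path safe to run after a prior failure closed the shared connection.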
##########
File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/hbase/SparkHoodieHBaseIndex.java
##########
@@ -480,6 +486,68 @@ private Integer getNumRegionServersAliveForTable() {
   @Override
   public boolean rollbackCommit(String instantTime) {
     // Rollback in HbaseIndex is managed via method {@link #checkIfValidCommit()}
+    synchronized (SparkHoodieHBaseIndex.class) {
+      if (hbaseConnection == null || hbaseConnection.isClosed()) {
+        hbaseConnection = getHBaseConnection();
+      }
+    }
+    try (HTable hTable = (HTable) hbaseConnection.getTable(TableName.valueOf(tableName));
+         BufferedMutator mutator = hbaseConnection.getBufferedMutator(TableName.valueOf(tableName))) {
+      int multiGetBatchSize = config.getHbaseIndexGetBatchSize();
+      boolean rollbackSync = config.getHBaseIndexRollbackSync();
+
+      Long rollbackTime = HoodieActiveTimeline.COMMIT_FORMATTER.parse(instantTime).getTime();
+      Long currentTime = new Date().getTime();
+      Scan scan = new Scan();
+      scan.addFamily(SYSTEM_COLUMN_FAMILY);
+      scan.setTimeRange(rollbackTime, currentTime);
+      ResultScanner scanner = hTable.getScanner(scan);
+      Iterator<Result> scannerIterator = scanner.iterator();
+
+      List<Get> statements = new ArrayList<>();
+      List<Result> currentVersionResults = new ArrayList<Result>();
+      List<Mutation> mutations = new ArrayList<>();
+      while (scannerIterator.hasNext()) {
+        Result result = scannerIterator.next();
+        currentVersionResults.add(result);
+        statements.add(generateStatement(Bytes.toString(result.getRow()), 0L, rollbackTime - 1));
+
+        if (scannerIterator.hasNext() && statements.size() < multiGetBatchSize) {
+          continue;
+        }
+        Result[] lastVersionResults = hTable.get(statements);
+        for (int i = 0; i < lastVersionResults.length; i++) {
+          Result lastVersionResult = lastVersionResults[i];
+          if (null == lastVersionResult.getRow() && rollbackSync) {
+            Result currentVersionResult = currentVersionResults.get(i);
+            Delete delete = new Delete(currentVersionResult.getRow());
+            // delete.addColumn(SYSTEM_COLUMN_FAMILY, COMMIT_TS_COLUMN, currentTime);

Review comment:
       Why is this code commented out? Please remove it if it is not needed.

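The hunk above buffers `Get` statements until either `multiGetBatchSize` is reached or the scanner is exhausted, then issues one batched lookup. That batching idiom can be sketched generically, independent of HBase (the `flushInBatches` helper name is invented for illustration):

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class BatchFlush {
    // Buffer items from an iterator and flush whenever the buffer reaches
    // batchSize or the input is exhausted; returns the flushed batches.
    public static <T> List<List<T>> flushInBatches(Iterator<T> it, int batchSize) {
        List<List<T>> flushed = new ArrayList<>();
        List<T> buffer = new ArrayList<>();
        while (it.hasNext()) {
            buffer.add(it.next());
            // Same condition shape as the PR: skip the flush while more
            // input remains and the buffer is still under batchSize.
            if (it.hasNext() && buffer.size() < batchSize) {
                continue;
            }
            flushed.add(new ArrayList<>(buffer));
            buffer.clear();
        }
        return flushed;
    }
}
```

The `it.hasNext()` term in the condition is what guarantees the final partial batch is flushed rather than dropped when the scan ends mid-batch.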
##########
File path: hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/hbase/TestHBaseIndex.java
##########
@@ -263,6 +265,66 @@ public void testTagLocationAndDuplicateUpdate() throws Exception {
         && record.getCurrentLocation().getInstantTime().equals(newCommitTime))).distinct().count());
   }
 
+  @Test
+  public void testTagLocationAndPartitionPathUpdateWithRollback() throws Exception {

Review comment:
       Can you confirm that this test fails without the fix?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

