hj2016 commented on a change in pull request #2188:
URL: https://github.com/apache/hudi/pull/2188#discussion_r549255511



##########
File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/hbase/SparkHoodieHBaseIndex.java
##########
@@ -480,6 +486,68 @@ private Integer getNumRegionServersAliveForTable() {
   @Override
   public boolean rollbackCommit(String instantTime) {
     // Rollback in HbaseIndex is managed via method {@link #checkIfValidCommit()}
+    synchronized (SparkHoodieHBaseIndex.class) {
+      if (hbaseConnection == null || hbaseConnection.isClosed()) {
+        hbaseConnection = getHBaseConnection();
+      }
+    }
+    try (HTable hTable = (HTable) hbaseConnection.getTable(TableName.valueOf(tableName));
+         BufferedMutator mutator = hbaseConnection.getBufferedMutator(TableName.valueOf(tableName))) {
+      int multiGetBatchSize = config.getHbaseIndexGetBatchSize();
+      boolean rollbackSync = config.getHBaseIndexRollbackSync();
+
+      Long rollbackTime = HoodieActiveTimeline.COMMIT_FORMATTER.parse(instantTime).getTime();
+      Long currentTime = new Date().getTime();
+      Scan scan = new Scan();
+      scan.addFamily(SYSTEM_COLUMN_FAMILY);
+      scan.setTimeRange(rollbackTime, currentTime);
+      ResultScanner scanner = hTable.getScanner(scan);
+      Iterator<Result> scannerIterator = scanner.iterator();
+
+      List<Get> statements = new ArrayList<>();
+      List<Result> currentVersionResults = new ArrayList<Result>();
+      List<Mutation> mutations = new ArrayList<>();
+      while (scannerIterator.hasNext()) {
+        Result result = scannerIterator.next();
+        currentVersionResults.add(result);
+        statements.add(generateStatement(Bytes.toString(result.getRow()), 0L, rollbackTime - 1));
+
+        if (scannerIterator.hasNext() &&  statements.size() < multiGetBatchSize) {
+          continue;
+        }
+        Result[] lastVersionResults = hTable.get(statements);
+        for (int i = 0; i < lastVersionResults.length; i++) {
+          Result lastVersionResult = lastVersionResults[i];
+          if (null == lastVersionResult.getRow() && rollbackSync) {
+            Result currentVersionResult = currentVersionResults.get(i);
+            Delete delete = new Delete(currentVersionResult.getRow());
+            // delete.addColumn(SYSTEM_COLUMN_FAMILY, COMMIT_TS_COLUMN, currentTime);

Review comment:
       I originally thought the column to delete had to be specified, but I
later found that it isn't necessary: a Delete built from just the row key
removes every column of that row. I think I can delete the commented-out code.
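For context, the loop in the hunk batches its per-row Gets, flushing whenever the batch reaches multiGetBatchSize or the scanner runs out, and then deletes rows that have no version older than the rollback instant. The sketch below models only that control flow in plain Java, assuming a hypothetical in-memory TreeMap in place of the HBase table; RollbackBatchSketch, rowsWrittenSince, hadVersionBefore and rollback are illustrative names, not Hudi or HBase APIs. Removing the whole map entry mirrors a Delete built from the row key alone, which is why no column needs to be named.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class RollbackBatchSketch {

    // Hypothetical stand-in for the index table (NOT the HBase API):
    // row key -> (write timestamp -> value).
    static final Map<String, TreeMap<Long, String>> table = new TreeMap<>();

    // Rows with at least one version at or after rollbackTime,
    // playing the role of the time-ranged Scan in the hunk.
    static List<String> rowsWrittenSince(long rollbackTime) {
        List<String> rows = new ArrayList<>();
        for (Map.Entry<String, TreeMap<Long, String>> e : table.entrySet()) {
            if (!e.getValue().tailMap(rollbackTime).isEmpty()) {
                rows.add(e.getKey());
            }
        }
        return rows;
    }

    // True if the row has at least one version strictly older than rollbackTime,
    // playing the role of the per-row Get for earlier versions.
    static boolean hadVersionBefore(String row, long rollbackTime) {
        TreeMap<Long, String> versions = table.get(row);
        return versions != null && !versions.headMap(rollbackTime).isEmpty();
    }

    // Returns the rows removed by the rollback.
    static List<String> rollback(long rollbackTime, int batchSize) {
        List<String> deleted = new ArrayList<>();
        List<String> batch = new ArrayList<>();
        // Iterates over a snapshot list, so removing rows from the table is safe.
        Iterator<String> it = rowsWrittenSince(rollbackTime).iterator();
        while (it.hasNext()) {
            batch.add(it.next());
            // Keep filling until the batch is full or the scan is exhausted.
            if (it.hasNext() && batch.size() < batchSize) {
                continue;
            }
            for (String row : batch) {
                if (!hadVersionBefore(row, rollbackTime)) {
                    // Row-level delete: no column is named, so the whole row goes,
                    // analogous to new Delete(row) without addColumn(...).
                    table.remove(row);
                    deleted.add(row);
                }
            }
            batch.clear(); // start the next batch empty
        }
        return deleted;
    }

    public static void main(String[] args) {
        table.put("a", new TreeMap<>(Map.of(100L, "p1")));            // only written after the rollback instant
        table.put("b", new TreeMap<>(Map.of(50L, "p0", 100L, "p1"))); // also existed before it
        table.put("c", new TreeMap<>(Map.of(120L, "p2")));            // only written after the rollback instant
        table.put("d", new TreeMap<>(Map.of(40L, "p0")));             // untouched by the rolled-back commit
        System.out.println(rollback(90L, 2)); // [a, c]
        System.out.println(table.keySet());   // [b, d]
    }
}
```

Rows that did have an older version (b above) are simply left alone in this sketch, because the hunk is cut off before showing how those are handled. The sketch also clears its batch after each flush; the truncated hunk does not show whether statements and currentVersionResults are reset the same way.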



