This is an automated email from the ASF dual-hosted git repository.

nagarwal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new aba8387  Update deprecated HBase API
aba8387 is described below

commit aba83876e7c20e8c2402c74c2465ef3ebc93ef85
Author: Wenning Ding <[email protected]>
AuthorDate: Fri Jan 3 12:36:14 2020 -0800

    Update deprecated HBase API
---
 .../org/apache/hudi/index/hbase/HBaseIndex.java    | 44 ++++++++--------------
 1 file changed, 16 insertions(+), 28 deletions(-)

diff --git a/hudi-client/src/main/java/org/apache/hudi/index/hbase/HBaseIndex.java b/hudi-client/src/main/java/org/apache/hudi/index/hbase/HBaseIndex.java
index 3789bff..3f79096 100644
--- a/hudi-client/src/main/java/org/apache/hudi/index/hbase/HBaseIndex.java
+++ b/hudi-client/src/main/java/org/apache/hudi/index/hbase/HBaseIndex.java
@@ -41,11 +41,13 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.BufferedMutator;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.RegionLocator;
 import org.apache.hadoop.hbase.client.Result;
@@ -284,13 +286,10 @@ public class HBaseIndex<T extends HoodieRecordPayload> extends HoodieIndex<T> {
           hbaseConnection = getHBaseConnection();
         }
       }
-      HTable hTable = null;
-      try {
-        hTable = (HTable) hbaseConnection.getTable(TableName.valueOf(tableName));
+      try (BufferedMutator mutator = hbaseConnection.getBufferedMutator(TableName.valueOf(tableName))) {
         while (statusIterator.hasNext()) {
           WriteStatus writeStatus = statusIterator.next();
-          List<Put> puts = new ArrayList<>();
-          List<Delete> deletes = new ArrayList<>();
+          List<Mutation> mutations = new ArrayList<>();
           try {
             for (HoodieRecord rec : writeStatus.getWrittenRecords()) {
               if (!writeStatus.isErrored(rec.getKey())) {
@@ -304,20 +303,20 @@ public class HBaseIndex<T extends HoodieRecordPayload> extends HoodieIndex<T> {
                   put.addColumn(SYSTEM_COLUMN_FAMILY, COMMIT_TS_COLUMN, Bytes.toBytes(loc.get().getInstantTime()));
                   put.addColumn(SYSTEM_COLUMN_FAMILY, FILE_NAME_COLUMN, Bytes.toBytes(loc.get().getFileId()));
                   put.addColumn(SYSTEM_COLUMN_FAMILY, PARTITION_PATH_COLUMN, Bytes.toBytes(rec.getPartitionPath()));
-                  puts.add(put);
+                  mutations.add(put);
                 } else {
                   // Delete existing index for a deleted record
                   Delete delete = new Delete(Bytes.toBytes(rec.getRecordKey()));
-                  deletes.add(delete);
+                  mutations.add(delete);
                 }
               }
-              if (puts.size() + deletes.size() < multiPutBatchSize) {
+              if (mutations.size() < multiPutBatchSize) {
                 continue;
               }
-              doPutsAndDeletes(hTable, puts, deletes);
+              doMutations(mutator, mutations);
             }
             // process remaining puts and deletes, if any
-            doPutsAndDeletes(hTable, puts, deletes);
+            doMutations(mutator, mutations);
           } catch (Exception e) {
               Exception we = new Exception("Error updating index for " + writeStatus, e);
             LOG.error(we);
@@ -327,32 +326,21 @@ public class HBaseIndex<T extends HoodieRecordPayload> extends HoodieIndex<T> {
         }
       } catch (IOException e) {
         throw new HoodieIndexException("Failed to Update Index locations because of exception with HBase Client", e);
-      } finally {
-        if (hTable != null) {
-          try {
-            hTable.close();
-          } catch (IOException e) {
-            // Ignore
-          }
-        }
       }
       return writeStatusList.iterator();
     };
   }
 
   /**
-   * Helper method to facilitate performing puts and deletes in Hbase.
+   * Helper method to facilitate performing mutations (including puts and deletes) in Hbase.
    */
-  private void doPutsAndDeletes(HTable hTable, List<Put> puts, List<Delete> deletes) throws IOException {
-    if (puts.size() > 0) {
-      hTable.put(puts);
-    }
-    if (deletes.size() > 0) {
-      hTable.delete(deletes);
+  private void doMutations(BufferedMutator mutator, List<Mutation> mutations) throws IOException {
+    if (mutations.isEmpty()) {
+      return;
     }
-    hTable.flushCommits();
-    puts.clear();
-    deletes.clear();
+    mutator.mutate(mutations);
+    mutator.flush();
+    mutations.clear();
     sleepForTime(SLEEP_TIME_MILLISECONDS);
   }
 

Reply via email to