This is an automated email from the ASF dual-hosted git repository.
nagarwal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git
The following commit(s) were added to refs/heads/master by this push:
new aba8387 Update deprecated HBase API
aba8387 is described below
commit aba83876e7c20e8c2402c74c2465ef3ebc93ef85
Author: Wenning Ding <[email protected]>
AuthorDate: Fri Jan 3 12:36:14 2020 -0800
Update deprecated HBase API
---
.../org/apache/hudi/index/hbase/HBaseIndex.java | 44 ++++++++--------------
1 file changed, 16 insertions(+), 28 deletions(-)
diff --git a/hudi-client/src/main/java/org/apache/hudi/index/hbase/HBaseIndex.java b/hudi-client/src/main/java/org/apache/hudi/index/hbase/HBaseIndex.java
index 3789bff..3f79096 100644
--- a/hudi-client/src/main/java/org/apache/hudi/index/hbase/HBaseIndex.java
+++ b/hudi-client/src/main/java/org/apache/hudi/index/hbase/HBaseIndex.java
@@ -41,11 +41,13 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
@@ -284,13 +286,10 @@ public class HBaseIndex<T extends HoodieRecordPayload> extends HoodieIndex<T> {
hbaseConnection = getHBaseConnection();
}
}
- HTable hTable = null;
- try {
- hTable = (HTable) hbaseConnection.getTable(TableName.valueOf(tableName));
+ try (BufferedMutator mutator = hbaseConnection.getBufferedMutator(TableName.valueOf(tableName))) {
while (statusIterator.hasNext()) {
WriteStatus writeStatus = statusIterator.next();
- List<Put> puts = new ArrayList<>();
- List<Delete> deletes = new ArrayList<>();
+ List<Mutation> mutations = new ArrayList<>();
try {
for (HoodieRecord rec : writeStatus.getWrittenRecords()) {
if (!writeStatus.isErrored(rec.getKey())) {
@@ -304,20 +303,20 @@ public class HBaseIndex<T extends HoodieRecordPayload> extends HoodieIndex<T> {
put.addColumn(SYSTEM_COLUMN_FAMILY, COMMIT_TS_COLUMN, Bytes.toBytes(loc.get().getInstantTime()));
put.addColumn(SYSTEM_COLUMN_FAMILY, FILE_NAME_COLUMN, Bytes.toBytes(loc.get().getFileId()));
put.addColumn(SYSTEM_COLUMN_FAMILY, PARTITION_PATH_COLUMN, Bytes.toBytes(rec.getPartitionPath()));
- puts.add(put);
+ mutations.add(put);
} else {
// Delete existing index for a deleted record
Delete delete = new Delete(Bytes.toBytes(rec.getRecordKey()));
- deletes.add(delete);
+ mutations.add(delete);
}
}
- if (puts.size() + deletes.size() < multiPutBatchSize) {
+ if (mutations.size() < multiPutBatchSize) {
continue;
}
- doPutsAndDeletes(hTable, puts, deletes);
+ doMutations(mutator, mutations);
}
// process remaining puts and deletes, if any
- doPutsAndDeletes(hTable, puts, deletes);
+ doMutations(mutator, mutations);
} catch (Exception e) {
Exception we = new Exception("Error updating index for " + writeStatus, e);
LOG.error(we);
@@ -327,32 +326,21 @@ public class HBaseIndex<T extends HoodieRecordPayload> extends HoodieIndex<T> {
}
} catch (IOException e) {
throw new HoodieIndexException("Failed to Update Index locations because of exception with HBase Client", e);
- } finally {
- if (hTable != null) {
- try {
- hTable.close();
- } catch (IOException e) {
- // Ignore
- }
- }
}
return writeStatusList.iterator();
};
}
/**
- * Helper method to facilitate performing puts and deletes in Hbase.
+ * Helper method to facilitate performing mutations (including puts and deletes) in Hbase.
*/
- private void doPutsAndDeletes(HTable hTable, List<Put> puts, List<Delete> deletes) throws IOException {
- if (puts.size() > 0) {
- hTable.put(puts);
- }
- if (deletes.size() > 0) {
- hTable.delete(deletes);
+ private void doMutations(BufferedMutator mutator, List<Mutation> mutations) throws IOException {
+ if (mutations.isEmpty()) {
+ return;
}
- hTable.flushCommits();
- puts.clear();
- deletes.clear();
+ mutator.mutate(mutations);
+ mutator.flush();
+ mutations.clear();
sleepForTime(SLEEP_TIME_MILLISECONDS);
}