This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 70937f6  [GOBBLIN-921] Make pull/push mode when registering partition 
to be configurable
70937f6 is described below

commit 70937f67a80e68a2a57b5aae3fe72e3d0fa087fd
Author: Zihan Li <[email protected]>
AuthorDate: Wed Oct 23 16:38:47 2019 -0700

    [GOBBLIN-921] Make pull/push mode when registering partition to be 
configurable
    
    Closes #2777 from ZihanLi58/GOBBLIN-921
---
 .../hive/metastore/HiveMetaStoreBasedRegister.java | 71 ++++++++++++++++++++--
 1 file changed, 65 insertions(+), 6 deletions(-)

diff --git 
a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreBasedRegister.java
 
b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreBasedRegister.java
index 5e8f999..6dc466b 100644
--- 
a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreBasedRegister.java
+++ 
b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreBasedRegister.java
@@ -103,6 +103,9 @@ public class HiveMetaStoreBasedRegister extends 
HiveRegister {
   public static final String PATH_REGISTER_TIMER = 
HIVE_REGISTER_METRICS_PREFIX + "pathRegisterTimer";
   public static final String SKIP_PARTITION_DIFF_COMPUTATION = 
HIVE_REGISTER_METRICS_PREFIX + "skip.partition.diff.computation";
   public static final String FETCH_LATEST_SCHEMA = 
HIVE_REGISTER_METRICS_PREFIX + "fetchLatestSchemaFromSchemaRegistry";
+  /**
+   * Config key: when enabled, partitions are registered in "pull mode", i.e. the existence of a partition
+   * is checked in Hive before adding the partition. This is done to minimize the add_partition calls sent
+   * to Hive. Defaults to false (push mode).
+   */
+  public static final String REGISTER_PARTITION_WITH_PULL_MODE = 
HIVE_REGISTER_METRICS_PREFIX + "registerPartitionWithPullMode";
   /**
    * To reduce lock aquisition and RPC to metaStoreClient, we cache the result 
of query regarding to
    * the existence of databases and tables in {@link 
#tableAndDbExistenceCache},
@@ -117,6 +120,7 @@ public class HiveMetaStoreBasedRegister extends 
HiveRegister {
   private final EventSubmitter eventSubmitter;
   private final MetricContext metricContext;
   private final boolean shouldUpdateLatestSchema;
+  // Read from REGISTER_PARTITION_WITH_PULL_MODE (default false). When true, addOrAlterPartition uses
+  // addOrAlterPartitionWithPullMode; otherwise it uses addOrAlterPartitionWithPushMode.
+  private final boolean registerPartitionWithPullMode;
 
   /**
    * Local cache that contains records for both databases and tables.
@@ -151,6 +155,7 @@ public class HiveMetaStoreBasedRegister extends 
HiveRegister {
     this.optimizedChecks = 
state.getPropAsBoolean(this.OPTIMIZED_CHECK_ENABLED, true);
     this.skipDiffComputation = 
state.getPropAsBoolean(this.SKIP_PARTITION_DIFF_COMPUTATION, false);
     this.shouldUpdateLatestSchema = 
state.getPropAsBoolean(this.FETCH_LATEST_SCHEMA, false);
+    this.registerPartitionWithPullMode = 
state.getPropAsBoolean(this.REGISTER_PARTITION_WITH_PULL_MODE, false);
     if(state.getPropAsBoolean(this.FETCH_LATEST_SCHEMA, false)) {
       this.schemaRegistry = 
Optional.of(KafkaSchemaRegistryFactory.getSchemaRegistry(state.getProperties()));
       topicName = state.getProp(KafkaSource.TOPIC_NAME);
@@ -201,6 +206,8 @@ public class HiveMetaStoreBasedRegister extends 
HiveRegister {
     }
   }
 
+
+
   /**
    * If table existed on Hive side will return false;
    * Or will create the table thru. RPC and return retVal from remote 
MetaStore.
@@ -492,7 +499,7 @@ public class HiveMetaStoreBasedRegister extends 
HiveRegister {
     }
   }
 
-  private void addOrAlterPartition(IMetaStoreClient client, Table table, 
HivePartition partition)
+  private void addOrAlterPartitionWithPushMode(IMetaStoreClient client, Table 
table, HivePartition partition)
       throws TException, IOException {
     Partition nativePartition = HiveMetaStoreUtils.getPartition(partition);
 
@@ -514,7 +521,7 @@ public class HiveMetaStoreBasedRegister extends 
HiveRegister {
           if (this.skipDiffComputation) {
             onPartitionExistWithoutComputingDiff(table, nativePartition, e);
           } else {
-            onPartitionExist(client, table, partition, nativePartition);
+            onPartitionExist(client, table, partition, nativePartition, null);
           }
         } catch (Throwable e2) {
           log.error(String.format(
@@ -526,11 +533,60 @@ public class HiveMetaStoreBasedRegister extends 
HiveRegister {
     }
   }
 
-  private void onPartitionExist(IMetaStoreClient client, Table table, 
HivePartition partition, Partition nativePartition) throws TException {
+  /**
+   * Adds {@code partition} to {@code table}, or alters it when it is already registered. The strategy is
+   * selected by {@link #REGISTER_PARTITION_WITH_PULL_MODE}: pull mode looks the partition up before deciding
+   * whether to add it, push mode is used otherwise.
+   */
+  private void addOrAlterPartition(IMetaStoreClient client, Table table, HivePartition partition)
+      throws TException, IOException {
+    if (this.registerPartitionWithPullMode) {
+      addOrAlterPartitionWithPullMode(client, table, partition);
+    } else {
+      addOrAlterPartitionWithPushMode(client, table, partition);
+    }
+  }
+
+  /**
+   * Registers {@code partition} in pull mode. The partition is first fetched from the metastore, and
+   * {@code add_partition} is only sent when the metastore reports that the partition does not exist.
+   *
+   * <p>If the partition already exists and diff computation is enabled, {@code onPartitionExist} decides
+   * whether it has to be altered. If diff computation is skipped, an existing partition is left untouched.
+   *
+   * @throws TException if the lookup fails for any reason other than the partition being absent, or if
+   *     adding/altering the partition fails
+   */
+  private void addOrAlterPartitionWithPullMode(IMetaStoreClient client, Table table, HivePartition partition)
+      throws TException, IOException {
+    Partition nativePartition = HiveMetaStoreUtils.getPartition(partition);
+
+    // Template form: the message is only formatted when the check fails.
+    Preconditions.checkArgument(table.getPartitionKeysSize() == nativePartition.getValues().size(),
+        "Partition key size is %s but partition value size is %s", table.getPartitionKeysSize(),
+        nativePartition.getValues().size());
+
+    try (AutoCloseableHiveLock lock =
+        this.locks.getPartitionLock(table.getDbName(), table.getTableName(), nativePartition.getValues())) {
+
+      Partition existedPartition;
+      // Only the lookup itself is timed and guarded. Failures of the later alter path must not be
+      // mistaken for a missing partition.
+      try (Timer.Context context = this.metricContext.timer(GET_HIVE_PARTITION).time()) {
+        existedPartition = client.getPartition(table.getDbName(), table.getTableName(), nativePartition.getValues());
+      } catch (org.apache.hadoop.hive.metastore.api.NoSuchObjectException e) {
+        // The partition is not registered yet, so add it. Any other lookup failure (MetaException,
+        // transport errors) propagates instead of triggering a blind add_partition.
+        try (Timer.Context context = this.metricContext.timer(ADD_PARTITION_TIMER).time()) {
+          client.add_partition(getPartitionWithCreateTimeNow(nativePartition));
+        } catch (Throwable e2) {
+          // The error message is passed as an argument, not concatenated into the pattern, so a '%'
+          // in it cannot break String.format.
+          log.error(String.format("Unable to add or alter partition %s in table %s with location %s: %s",
+              stringifyPartitionVerbose(nativePartition), table.getTableName(),
+              nativePartition.getSd().getLocation(), e2.getMessage()), e2);
+          throw e2;
+        }
+        log.info(String.format("Added partition %s to table %s with location %s", stringifyPartition(nativePartition),
+            table.getTableName(), nativePartition.getSd().getLocation()));
+        return;
+      }
+
+      // The partition already exists. Without diff computation there is nothing to do.
+      if (!this.skipDiffComputation) {
+        onPartitionExist(client, table, partition, nativePartition, existedPartition);
+      }
+    }
+  }
+
+  private void onPartitionExist(IMetaStoreClient client, Table table, 
HivePartition partition, Partition nativePartition, Partition existedPartition) 
throws TException {
     HivePartition existingPartition;
-    try (Timer.Context context = 
this.metricContext.timer(GET_HIVE_PARTITION).time()) {
-      existingPartition = HiveMetaStoreUtils.getHivePartition(
-          client.getPartition(table.getDbName(), table.getTableName(), 
nativePartition.getValues()));
+    if(existedPartition == null) {
+      try (Timer.Context context = 
this.metricContext.timer(GET_HIVE_PARTITION).time()) {
+        existingPartition = HiveMetaStoreUtils.getHivePartition(
+            client.getPartition(table.getDbName(), table.getTableName(), 
nativePartition.getValues()));
+      }
+    } else {
+      existingPartition = 
HiveMetaStoreUtils.getHivePartition(existedPartition);
     }
 
     if (needToUpdatePartition(existingPartition, partition)) {
@@ -550,6 +606,9 @@ public class HiveMetaStoreBasedRegister extends 
HiveRegister {
   }
 
   private void onPartitionExistWithoutComputingDiff(Table table, Partition 
nativePartition, TException e) throws TException {
+    if(e == null) {
+      return;
+    }
     if (e instanceof AlreadyExistsException) {
       log.debug(String.format("Partition %s in table %s with location %s 
already exists and no need to update",
           stringifyPartition(nativePartition), table.getTableName(), 
nativePartition.getSd().getLocation()));

Reply via email to