This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 70937f6 [GOBBLIN-921] Make pull/push mode when registering partition
to be configurable
70937f6 is described below
commit 70937f67a80e68a2a57b5aae3fe72e3d0fa087fd
Author: Zihan Li <[email protected]>
AuthorDate: Wed Oct 23 16:38:47 2019 -0700
[GOBBLIN-921] Make pull/push mode when registering partition to be
configurable
Closes #2777 from ZihanLi58/GOBBLIN-921
---
.../hive/metastore/HiveMetaStoreBasedRegister.java | 71 ++++++++++++++++++++--
1 file changed, 65 insertions(+), 6 deletions(-)
diff --git
a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreBasedRegister.java
b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreBasedRegister.java
index 5e8f999..6dc466b 100644
---
a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreBasedRegister.java
+++
b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreBasedRegister.java
@@ -103,6 +103,9 @@ public class HiveMetaStoreBasedRegister extends
HiveRegister {
public static final String PATH_REGISTER_TIMER =
HIVE_REGISTER_METRICS_PREFIX + "pathRegisterTimer";
public static final String SKIP_PARTITION_DIFF_COMPUTATION =
HIVE_REGISTER_METRICS_PREFIX + "skip.partition.diff.computation";
public static final String FETCH_LATEST_SCHEMA =
HIVE_REGISTER_METRICS_PREFIX + "fetchLatestSchemaFromSchemaRegistry";
+  /**
+   * Config key that selects "pull" mode for partition registration; it is read as a boolean with a
+   * default of {@code false}. When enabled, the existence of a partition is checked in Hive before the
+   * partition is added, to minimize the add_partition calls sent to Hive. When disabled, the push-mode
+   * registration path is used.
+   */
+ public static final String REGISTER_PARTITION_WITH_PULL_MODE =
HIVE_REGISTER_METRICS_PREFIX + "registerPartitionWithPullMode";
/**
* To reduce lock aquisition and RPC to metaStoreClient, we cache the result
of query regarding to
* the existence of databases and tables in {@link
#tableAndDbExistenceCache},
@@ -117,6 +120,7 @@ public class HiveMetaStoreBasedRegister extends
HiveRegister {
private final EventSubmitter eventSubmitter;
private final MetricContext metricContext;
private final boolean shouldUpdateLatestSchema;
+  // Value of REGISTER_PARTITION_WITH_PULL_MODE (default false), read once in the constructor.
+  // addOrAlterPartition uses it to choose between the pull-mode and push-mode paths.
+ private final boolean registerPartitionWithPullMode;
/**
* Local cache that contains records for both databases and tables.
@@ -151,6 +155,7 @@ public class HiveMetaStoreBasedRegister extends
HiveRegister {
this.optimizedChecks =
state.getPropAsBoolean(this.OPTIMIZED_CHECK_ENABLED, true);
this.skipDiffComputation =
state.getPropAsBoolean(this.SKIP_PARTITION_DIFF_COMPUTATION, false);
this.shouldUpdateLatestSchema =
state.getPropAsBoolean(this.FETCH_LATEST_SCHEMA, false);
+ this.registerPartitionWithPullMode =
state.getPropAsBoolean(this.REGISTER_PARTITION_WITH_PULL_MODE, false);
if(state.getPropAsBoolean(this.FETCH_LATEST_SCHEMA, false)) {
this.schemaRegistry =
Optional.of(KafkaSchemaRegistryFactory.getSchemaRegistry(state.getProperties()));
topicName = state.getProp(KafkaSource.TOPIC_NAME);
@@ -201,6 +206,8 @@ public class HiveMetaStoreBasedRegister extends
HiveRegister {
}
}
+
+
/**
* If table existed on Hive side will return false;
* Or will create the table thru. RPC and return retVal from remote
MetaStore.
@@ -492,7 +499,7 @@ public class HiveMetaStoreBasedRegister extends
HiveRegister {
}
}
- private void addOrAlterPartition(IMetaStoreClient client, Table table,
HivePartition partition)
+ private void addOrAlterPartitionWithPushMode(IMetaStoreClient client, Table
table, HivePartition partition)
throws TException, IOException {
Partition nativePartition = HiveMetaStoreUtils.getPartition(partition);
@@ -514,7 +521,7 @@ public class HiveMetaStoreBasedRegister extends
HiveRegister {
if (this.skipDiffComputation) {
onPartitionExistWithoutComputingDiff(table, nativePartition, e);
} else {
- onPartitionExist(client, table, partition, nativePartition);
+ onPartitionExist(client, table, partition, nativePartition, null);
}
} catch (Throwable e2) {
log.error(String.format(
@@ -526,11 +533,60 @@ public class HiveMetaStoreBasedRegister extends
HiveRegister {
}
}
- private void onPartitionExist(IMetaStoreClient client, Table table,
HivePartition partition, Partition nativePartition) throws TException {
+  /**
+   * Registers {@code partition} in {@code table} using the strategy selected by
+   * {@code registerPartitionWithPullMode}. Pull mode looks the partition up before adding it. Otherwise
+   * registration is delegated to {@code addOrAlterPartitionWithPushMode}.
+   */
+  private void addOrAlterPartition(IMetaStoreClient client, Table table, HivePartition partition)
+      throws TException, IOException {
+    if (this.registerPartitionWithPullMode) {
+      addOrAlterPartitionWithPullMode(client, table, partition);
+      return;
+    }
+    addOrAlterPartitionWithPushMode(client, table, partition);
+  }
+
+  /**
+   * Registers {@code partition} in pull mode.
+   *
+   * <p>The partition is first looked up in the metastore, and {@code add_partition} is called only when
+   * the lookup reports it as missing. If it already exists, the fetched partition is handed to
+   * {@code onPartitionExist} so it is not fetched a second time. When {@code skipDiffComputation} is set,
+   * an existing partition is left untouched.
+   *
+   * <p>Only a {@code NoSuchObjectException} from the lookup leads to an add. Other lookup failures, and
+   * failures while updating an existing partition, propagate unchanged. They are not misreported as a
+   * failed add.
+   *
+   * @throws TException if the lookup fails for a reason other than a missing partition, or if adding or
+   *     updating the partition fails
+   */
+  private void addOrAlterPartitionWithPullMode(IMetaStoreClient client, Table table, HivePartition partition)
+      throws TException, IOException {
+    Partition nativePartition = HiveMetaStoreUtils.getPartition(partition);
+
+    Preconditions.checkArgument(table.getPartitionKeysSize() == nativePartition.getValues().size(),
+        String.format("Partition key size is %s but partition value size is %s", table.getPartitionKeys().size(),
+            nativePartition.getValues().size()));
+
+    try (AutoCloseableHiveLock lock =
+        this.locks.getPartitionLock(table.getDbName(), table.getTableName(), nativePartition.getValues())) {
+
+      Partition existedPartition = null;
+      // Time only the metastore lookup, the same way onPartitionExist scopes GET_HIVE_PARTITION.
+      try (Timer.Context context = this.metricContext.timer(GET_HIVE_PARTITION).time()) {
+        existedPartition = client.getPartition(table.getDbName(), table.getTableName(), nativePartition.getValues());
+      } catch (org.apache.hadoop.hive.metastore.api.NoSuchObjectException nsoe) {
+        // Expected for a new partition: leave existedPartition null so it is added below.
+        log.debug(String.format("Partition %s not found in table %s; it will be added",
+            stringifyPartition(nativePartition), table.getTableName()));
+      }
+
+      if (existedPartition == null) {
+        try (Timer.Context context = this.metricContext.timer(ADD_PARTITION_TIMER).time()) {
+          client.add_partition(getPartitionWithCreateTimeNow(nativePartition));
+        } catch (Throwable e2) {
+          // Pass the message as a format argument rather than concatenating it into the pattern.
+          // Otherwise a '%' in the message would break String.format and hide the real failure.
+          log.error(String.format("Unable to add or alter partition %s in table %s with location %s: %s",
+              stringifyPartitionVerbose(nativePartition), table.getTableName(),
+              nativePartition.getSd().getLocation(), e2.getMessage()), e2);
+          throw e2;
+        }
+        log.info(String.format("Added partition %s to table %s with location %s", stringifyPartition(nativePartition),
+            table.getTableName(), nativePartition.getSd().getLocation()));
+      } else if (!this.skipDiffComputation) {
+        // Reuse the partition we already fetched instead of querying the metastore again.
+        onPartitionExist(client, table, partition, nativePartition, existedPartition);
+      }
+      // Otherwise the partition exists and diff computation is skipped, so there is nothing to update.
+    }
+  }
+
+ private void onPartitionExist(IMetaStoreClient client, Table table,
HivePartition partition, Partition nativePartition, Partition existedPartition)
throws TException {
HivePartition existingPartition;
- try (Timer.Context context =
this.metricContext.timer(GET_HIVE_PARTITION).time()) {
- existingPartition = HiveMetaStoreUtils.getHivePartition(
- client.getPartition(table.getDbName(), table.getTableName(),
nativePartition.getValues()));
+ if(existedPartition == null) {
+ try (Timer.Context context =
this.metricContext.timer(GET_HIVE_PARTITION).time()) {
+ existingPartition = HiveMetaStoreUtils.getHivePartition(
+ client.getPartition(table.getDbName(), table.getTableName(),
nativePartition.getValues()));
+ }
+ } else {
+ existingPartition =
HiveMetaStoreUtils.getHivePartition(existedPartition);
}
if (needToUpdatePartition(existingPartition, partition)) {
@@ -550,6 +606,9 @@ public class HiveMetaStoreBasedRegister extends
HiveRegister {
}
private void onPartitionExistWithoutComputingDiff(Table table, Partition
nativePartition, TException e) throws TException {
+ if(e == null) {
+ return;
+ }
if (e instanceof AlreadyExistsException) {
log.debug(String.format("Partition %s in table %s with location %s
already exists and no need to update",
stringifyPartition(nativePartition), table.getTableName(),
nativePartition.getSd().getLocation()));