This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 3523dbc [GOBBLIN-863] Handle race condition issue for hive
registration
3523dbc is described below
commit 3523dbcdf41d489ece761c37f5849539cfc8948d
Author: Zihan Li <[email protected]>
AuthorDate: Tue Sep 3 08:30:39 2019 -0700
[GOBBLIN-863] Handle race condition issue for hive registration
Closes #2719 from ZihanLi58/GOBBLIN-863
---
gobblin-docs/user-guide/Gobblin-on-Yarn.md | 2 +-
.../apache/gobblin/hive/AutoCloseableHiveLock.java | 39 ++++++++++++++++
.../java/org/apache/gobblin/hive/HiveLock.java | 37 +++++++++++----
.../org/apache/gobblin/hive/HiveLockFactory.java | 50 ++++++++++++++++++++
.../java/org/apache/gobblin/hive/HiveLockImpl.java | 34 ++++++++++++++
.../java/org/apache/gobblin/hive/HiveRegister.java | 35 +++++++-------
.../hive/metastore/HiveMetaStoreBasedRegister.java | 53 ++++++++++++++++++----
.../gobblin/hive/metastore/HiveMetaStoreUtils.java | 2 +-
.../runtime/locks/DistributedHiveLockFactory.java | 53 ++++++++++++++++++++++
.../runtime/locks/ZookeeperBasedJobLock.java | 7 ++-
10 files changed, 272 insertions(+), 40 deletions(-)
diff --git a/gobblin-docs/user-guide/Gobblin-on-Yarn.md
b/gobblin-docs/user-guide/Gobblin-on-Yarn.md
index 548f0f4..e3e2b67 100644
--- a/gobblin-docs/user-guide/Gobblin-on-Yarn.md
+++ b/gobblin-docs/user-guide/Gobblin-on-Yarn.md
@@ -225,7 +225,7 @@ qualitychecker.row.err.file=${gobblin.yarn.work.dir}/err
# Use zookeeper for maintaining the job lock
job.lock.enabled=true
-job.lock.type=org.apache.gobblin.runtime.locks.ZookeeperBasedJobLock
+job.lock.type=ZookeeperBasedJobLock
# Directory where job locks are stored
job.lock.dir=${gobblin.yarn.work.dir}/locks
diff --git
a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/AutoCloseableHiveLock.java
b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/AutoCloseableHiveLock.java
new file mode 100644
index 0000000..abf01cb
--- /dev/null
+++
b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/AutoCloseableHiveLock.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.hive;
+
+import java.io.IOException;
+
+
+/**
+ * An {@link AutoCloseable} handle on a {@link HiveLockImpl}: the wrapped lock is acquired when this
+ * object is constructed and released by {@link #close()}, so it is meant to be used with
+ * try-with-resources.
+ */
+public class AutoCloseableHiveLock implements AutoCloseable {
+
+  private final HiveLockImpl<?> lock;
+
+  /** Whether {@link #close()} has already released the lock; makes repeated close calls no-ops. */
+  private boolean closed;
+
+  /**
+   * Wraps {@code lock} and acquires it before returning.
+   *
+   * @param lock the lock to hold for the lifetime of this object
+   * @throws IOException if acquiring {@code lock} fails
+   */
+  public AutoCloseableHiveLock(HiveLockImpl<?> lock) throws IOException {
+    this.lock = lock;
+    this.lock.lock();
+  }
+
+  /**
+   * Releases the wrapped lock. Calling this more than once releases the lock only once, as the
+   * {@link AutoCloseable#close()} contract recommends.
+   *
+   * @throws IOException if releasing the wrapped lock fails; the lock is then not marked as released
+   */
+  @Override
+  public void close() throws IOException {
+    if (this.closed) {
+      return;
+    }
+    this.lock.unlock();
+    this.closed = true;
+  }
+}
diff --git
a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/HiveLock.java
b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/HiveLock.java
index 85df169..653ddcb 100644
---
a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/HiveLock.java
+++
b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/HiveLock.java
@@ -17,13 +17,15 @@
package org.apache.gobblin.hive;
-import java.util.concurrent.locks.Lock;
+import java.io.IOException;
+import java.util.Properties;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
-import com.google.common.util.concurrent.Striped;
+import org.apache.commons.lang3.reflect.ConstructorUtils;
+import org.apache.gobblin.hive.metastore.HiveMetaStoreBasedRegister;
import org.apache.gobblin.util.AutoCloseableLock;
@@ -37,30 +39,45 @@ import org.apache.gobblin.util.AutoCloseableLock;
* </p>
*/
public class HiveLock {
+  /** Property naming the {@link HiveLockFactory} implementation to instantiate. */
+  private static final String HIVE_LOCK_TYPE =
+      HiveMetaStoreBasedRegister.HIVE_REGISTER_METRICS_PREFIX + "lock.type";
+  /** Factory used when {@link #HIVE_LOCK_TYPE} is not set: the in-JVM {@link HiveLockFactory}. */
+  private static final String HIVE_LOCK_TYPE_DEFAULT = HiveLockFactory.class.getName();
- private static final Joiner JOINER = Joiner.on(' ').skipNulls();
- private final Striped<Lock> locks = Striped.lazyWeakLock(Integer.MAX_VALUE);
+  /** Joins lock-name segments with '/', e.g. "db/table" or "db/table/v1/v2". */
+  private static final Joiner JOINER = Joiner.on('/').skipNulls();
- public AutoCloseableLock getDbLock(String dbName) {
+  private final HiveLockFactory locks;
+
+  /**
+   * Builds the lock factory named by {@code HIVE_LOCK_TYPE} (default {@link HiveLockFactory}).
+   *
+   * @param properties configuration; also passed to the factory's public {@link Properties} constructor
+   * @throws IOException if the factory class cannot be loaded or instantiated
+   */
+  public HiveLock(Properties properties) throws IOException {
+    String lockType = properties.getProperty(HIVE_LOCK_TYPE, HIVE_LOCK_TYPE_DEFAULT);
+    try {
+      this.locks = (HiveLockFactory) ConstructorUtils.invokeConstructor(Class.forName(lockType), properties);
+    } catch (Exception e) {
+      throw new IOException("Failed to create hive lock factory " + lockType, e);
+    }
+  }
+
+  /** Returns the already-acquired lock for database {@code dbName}; close it to release. */
+  public AutoCloseableHiveLock getDbLock(String dbName) throws IOException {
Preconditions.checkArgument(!Strings.isNullOrEmpty(dbName));
- return new AutoCloseableLock(this.locks.get(dbName));
+    return new AutoCloseableHiveLock(this.locks.get(dbName));
}
- public AutoCloseableLock getTableLock(String dbName, String tableName) {
+  /** Returns the already-acquired lock named "dbName/tableName"; close it to release. */
+  public AutoCloseableHiveLock getTableLock(String dbName, String tableName) throws IOException {
Preconditions.checkArgument(!Strings.isNullOrEmpty(dbName));
Preconditions.checkArgument(!Strings.isNullOrEmpty(tableName));
- return new AutoCloseableLock(this.locks.get(JOINER.join(dbName,
tableName)));
+    return new AutoCloseableHiveLock(this.locks.get(JOINER.join(dbName, tableName)));
}
- public AutoCloseableLock getPartitionLock(String dbName, String tableName,
Iterable<String> partitionValues) {
+  /**
+   * Returns the already-acquired lock named "dbName/tableName/v1/.../vn" for the given non-empty
+   * partition values; close it to release.
+   */
+  public AutoCloseableHiveLock getPartitionLock(String dbName, String tableName, Iterable<String> partitionValues)
+      throws IOException {
Preconditions.checkArgument(!Strings.isNullOrEmpty(dbName));
Preconditions.checkArgument(!Strings.isNullOrEmpty(tableName));
Preconditions.checkArgument(partitionValues.iterator().hasNext());
- return new AutoCloseableLock(this.locks.get(JOINER.join(dbName, tableName,
JOINER.join(partitionValues))));
+    return new AutoCloseableHiveLock(this.locks.get(JOINER.join(dbName, tableName, JOINER.join(partitionValues))));
}
}
diff --git
a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/HiveLockFactory.java
b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/HiveLockFactory.java
new file mode 100644
index 0000000..1884afa
--- /dev/null
+++
b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/HiveLockFactory.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.hive;
+
+import com.google.common.util.concurrent.Striped;
+import java.io.IOException;
+import java.util.Properties;
+import java.util.concurrent.locks.Lock;
+
+
+/**
+ * Default lock factory used by {@link HiveLock}. It hands out {@link HiveLockImpl}s backed by
+ * in-process {@link Lock}s from a Guava {@link Striped} set, so it only serializes registrations
+ * within a single JVM.
+ *
+ * <p>The striped locks are lazily created, weakly referenced and (per Guava) reentrant. The same name
+ * maps to the same lock while it is referenced. Distinct names may occasionally share a stripe, which
+ * only adds contention. Subclasses (e.g. a distributed implementation) may override {@link #get(String)}.
+ */
+public class HiveLockFactory {
+  /** Configuration this factory was created with; available to subclasses building their own locks. */
+  protected final Properties properties;
+
+  private final Striped<Lock> locks = Striped.lazyWeakLock(Integer.MAX_VALUE);
+
+  public HiveLockFactory(Properties properties) {
+    this.properties = properties;
+  }
+
+  /**
+   * Returns a lock for {@code name}. The returned lock is not yet acquired.
+   *
+   * @param name lock name, e.g. a database, table or partition identifier built by {@link HiveLock}
+   */
+  public HiveLockImpl<?> get(String name) {
+    return new JvmLock(this.locks.get(name));
+  }
+
+  /** Adapts a {@link Lock} to {@link HiveLockImpl}; static so it holds no reference to the factory. */
+  private static final class JvmLock extends HiveLockImpl<Lock> {
+    JvmLock(Lock lock) {
+      super(lock);
+    }
+
+    @Override
+    public void lock() throws IOException {
+      this.lock.lock();
+    }
+
+    @Override
+    public void unlock() throws IOException {
+      this.lock.unlock();
+    }
+  }
+}
diff --git
a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/HiveLockImpl.java
b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/HiveLockImpl.java
new file mode 100644
index 0000000..8fbaf6c
--- /dev/null
+++
b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/HiveLockImpl.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.hive;
+
+import java.io.IOException;
+
+
+/**
+ * Lock abstraction used by Hive registration, so that different lock implementations (in-process or
+ * distributed) can be plugged in through {@link HiveLockFactory}.
+ *
+ * @param <T> the type of the underlying lock being wrapped
+ */
+public abstract class HiveLockImpl<T> {
+  /** The underlying lock; subclasses delegate {@link #lock()} and {@link #unlock()} to it. */
+  protected final T lock;
+
+  public HiveLockImpl(T lock) {
+    this.lock = lock;
+  }
+
+  /**
+   * Acquires the lock.
+   *
+   * @throws IOException if the underlying lock cannot be acquired
+   */
+  public abstract void lock() throws IOException;
+
+  /**
+   * Releases a lock previously acquired with {@link #lock()}.
+   *
+   * @throws IOException if the underlying lock cannot be released
+   */
+  public abstract void unlock() throws IOException;
+}
diff --git
a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/HiveRegister.java
b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/HiveRegister.java
index 93dffe2..f481522 100644
---
a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/HiveRegister.java
+++
b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/HiveRegister.java
@@ -94,29 +94,32 @@ public abstract class HiveRegister implements Closeable {
@Override
public Void call() throws Exception {
+ // Skip the spec if its predicates fail; otherwise run pre-activities, register the path, then run post-activities.
+ try {
+ if (spec instanceof HiveSpecWithPredicates &&
!evaluatePredicates((HiveSpecWithPredicates) spec)) {
+ log.info("Skipping " + spec + " since predicates return false");
+ return null;
+ }
- if (spec instanceof HiveSpecWithPredicates &&
!evaluatePredicates((HiveSpecWithPredicates) spec)) {
- log.info("Skipping " + spec + " since predicates return false");
- return null;
- }
-
- if (spec instanceof HiveSpecWithPreActivities) {
- for (Activity activity : ((HiveSpecWithPreActivities)
spec).getPreActivities()) {
- activity.execute(HiveRegister.this);
+ if (spec instanceof HiveSpecWithPreActivities) {
+ for (Activity activity : ((HiveSpecWithPreActivities)
spec).getPreActivities()) {
+ activity.execute(HiveRegister.this);
+ }
}
- }
- registerPath(spec);
+ registerPath(spec);
- if (spec instanceof HiveSpecWithPostActivities) {
- for (Activity activity : ((HiveSpecWithPostActivities)
spec).getPostActivities()) {
- activity.execute(HiveRegister.this);
+ if (spec instanceof HiveSpecWithPostActivities) {
+ for (Activity activity : ((HiveSpecWithPostActivities)
spec).getPostActivities()) {
+ activity.execute(HiveRegister.this);
+ }
}
- }
- return null;
+ return null;
+ } catch (Exception e) {
+ // Log before rethrowing, presumably so the failure is recorded even if the stored future is never inspected.
+ log.error("Exception during hive registration", e);
+ throw e;
+ }
}
-
});
this.futures.put(getSpecId(spec), future);
return future;
diff --git
a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreBasedRegister.java
b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreBasedRegister.java
index 9c07337..41ceaf3 100644
---
a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreBasedRegister.java
+++
b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreBasedRegister.java
@@ -29,6 +29,11 @@ import java.util.concurrent.ExecutionException;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
+import org.apache.gobblin.hive.AutoCloseableHiveLock;
+import org.apache.gobblin.kafka.schemareg.KafkaSchemaRegistry;
+import org.apache.gobblin.kafka.schemareg.KafkaSchemaRegistryFactory;
+import org.apache.gobblin.kafka.schemareg.SchemaRegistryException;
+import org.apache.gobblin.source.extractor.extract.kafka.KafkaSource;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
@@ -36,6 +41,7 @@ import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils;
import org.apache.thrift.TException;
import org.joda.time.DateTime;
@@ -59,7 +65,6 @@ import org.apache.gobblin.metrics.GobblinMetrics;
import org.apache.gobblin.metrics.GobblinMetricsRegistry;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.event.EventSubmitter;
-import org.apache.gobblin.util.AutoCloseableLock;
import org.apache.gobblin.util.AutoReturnableObject;
@@ -93,9 +98,11 @@ public class HiveMetaStoreBasedRegister extends HiveRegister
{
public static final String CREATE_HIVE_DATABASE =
HIVE_REGISTER_METRICS_PREFIX + "createDatabaseTimer";
public static final String CREATE_HIVE_TABLE = HIVE_REGISTER_METRICS_PREFIX
+ "createTableTimer";
public static final String GET_HIVE_TABLE = HIVE_REGISTER_METRICS_PREFIX +
"getTableTimer";
+ public static final String GET_AND_SET_LATEST_SCHEMA =
HIVE_REGISTER_METRICS_PREFIX + "getAndSetLatestSchemaTimer";
public static final String DROP_TABLE = HIVE_REGISTER_METRICS_PREFIX +
"dropTableTimer";
public static final String PATH_REGISTER_TIMER =
HIVE_REGISTER_METRICS_PREFIX + "pathRegisterTimer";
public static final String SKIP_PARTITION_DIFF_COMPUTATION =
HIVE_REGISTER_METRICS_PREFIX + "skip.partition.diff.computation";
+ public static final String FETCH_LATEST_SCHEMA =
HIVE_REGISTER_METRICS_PREFIX + "fetchLatestSchemaFromSchemaRegistry";
/**
* To reduce lock aquisition and RPC to metaStoreClient, we cache the result
of query regarding to
* the existence of databases and tables in {@link
#tableAndDbExistenceCache},
@@ -106,9 +113,10 @@ public class HiveMetaStoreBasedRegister extends
HiveRegister {
public static final String OPTIMIZED_CHECK_ENABLED =
"hiveRegister.cacheDbTableExistence";
private final HiveMetastoreClientPool clientPool;
- private final HiveLock locks = new HiveLock();
+ private final HiveLock locks;
private final EventSubmitter eventSubmitter;
private final MetricContext metricContext;
+ private final boolean shouldUpdateLatestSchema;
/**
* Local cache that contains records for both databases and tables.
@@ -134,11 +142,19 @@ public class HiveMetaStoreBasedRegister extends
HiveRegister {
//for a partition is immutable
private final boolean skipDiffComputation;
+ private Optional<KafkaSchemaRegistry> schemaRegistry = Optional.absent();
+ private String topicName = "";
public HiveMetaStoreBasedRegister(State state, Optional<String>
metastoreURI) throws IOException {
super(state);
+ this.locks = new HiveLock(state.getProperties());
this.optimizedChecks =
state.getPropAsBoolean(this.OPTIMIZED_CHECK_ENABLED, true);
this.skipDiffComputation =
state.getPropAsBoolean(this.SKIP_PARTITION_DIFF_COMPUTATION, false);
+ this.shouldUpdateLatestSchema =
state.getPropAsBoolean(this.FETCH_LATEST_SCHEMA, false);
+ if(state.getPropAsBoolean(this.FETCH_LATEST_SCHEMA, false)) {
+ this.schemaRegistry =
Optional.of(KafkaSchemaRegistryFactory.getSchemaRegistry(state.getProperties()));
+ topicName = state.getProp(KafkaSource.TOPIC_NAME);
+ }
GenericObjectPoolConfig config = new GenericObjectPoolConfig();
config.setMaxTotal(this.props.getNumThreads());
@@ -170,14 +186,28 @@ public class HiveMetaStoreBasedRegister extends
HiveRegister {
throw new IOException(e);
}
}
+  /**
+   * Overwrites the Avro schema literal in {@code spec}'s table SerDe properties with the latest schema
+   * registered for the configured topic, then refreshes {@code table}'s SerDe info from it.
+   * Does nothing when no schema registry was configured.
+   *
+   * <p>TODO: find a better way to get the latest schema.
+   *
+   * @param spec the spec whose table SerDe properties are updated in place
+   * @param table the metastore table whose storage descriptor receives the refreshed SerDe info
+   * @throws IOException if the registry lookup fails (wrapped) or SerDe conversion throws (rethrown as-is)
+   */
+  private void updateSchema(HiveSpec spec, Table table) throws IOException {
+    if (!this.schemaRegistry.isPresent()) {
+      return;
+    }
+    try (Timer.Context context = this.metricContext.timer(GET_AND_SET_LATEST_SCHEMA).time()) {
+      String latestSchema = this.schemaRegistry.get().getLatestSchema(this.topicName).toString();
+      spec.getTable().getSerDeProps()
+          .setProp(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName(), latestSchema);
+      table.getSd().setSerdeInfo(HiveMetaStoreUtils.getSerDeInfo(spec.getTable()));
+    } catch (SchemaRegistryException e) {
+      log.error(String.format("Error when fetch latest schema for topic %s", this.topicName), e);
+      throw new IOException(e);
+    } catch (IOException e) {
+      // Already an IOException: rethrow it directly instead of wrapping it in a second IOException.
+      log.error(String.format("Error when fetch latest schema for topic %s", this.topicName), e);
+      throw e;
+    }
+  }
/**
* If table existed on Hive side will return false;
* Or will create the table thru. RPC and return retVal from remote
MetaStore.
*/
private boolean ensureHiveTableExistenceBeforeAlternation(String tableName,
String dbName, IMetaStoreClient client,
- Table table, HiveSpec spec) throws TException{
- try (AutoCloseableLock lock = this.locks.getTableLock(dbName, tableName)) {
+ Table table, HiveSpec spec) throws TException, IOException{
+ try (AutoCloseableHiveLock lock = this.locks.getTableLock(dbName,
tableName)) {
try {
try (Timer.Context context =
this.metricContext.timer(CREATE_HIVE_TABLE).time()) {
client.createTable(getTableWithCreateTimeNow(table));
@@ -197,6 +227,9 @@ public class HiveMetaStoreBasedRegister extends
HiveRegister {
try (Timer.Context context =
this.metricContext.timer(GET_HIVE_TABLE).time()) {
existingTable =
HiveMetaStoreUtils.getHiveTable(client.getTable(dbName, tableName));
}
+ if(shouldUpdateLatestSchema) {
+ updateSchema(spec, table);
+ }
if (needToUpdateTable(existingTable, spec.getTable())) {
try (Timer.Context context =
this.metricContext.timer(ALTER_TABLE).time()) {
client.alter_table(dbName, tableName,
getNewTblByMergingExistingTblProps(table, existingTable));
@@ -221,7 +254,7 @@ public class HiveMetaStoreBasedRegister extends
HiveRegister {
* @param hiveDbName is the hive databases to be checked for existence
*/
private boolean ensureHiveDbExistence(String hiveDbName, IMetaStoreClient
client) throws IOException{
- try (AutoCloseableLock lock = this.locks.getDbLock(hiveDbName)) {
+ try (AutoCloseableHiveLock lock = this.locks.getDbLock(hiveDbName)) {
Database db = new Database();
db.setName(hiveDbName);
@@ -297,7 +330,7 @@ public class HiveMetaStoreBasedRegister extends
HiveRegister {
@Override
public boolean createTableIfNotExists(HiveTable table) throws IOException {
+ // Delegates to the three-argument overload while holding the HiveLock table lock for this table.
try (AutoReturnableObject<IMetaStoreClient> client =
this.clientPool.getClient();
- AutoCloseableLock lock = this.locks.getTableLock(table.getDbName(),
table.getTableName())) {
+ AutoCloseableHiveLock lock =
this.locks.getTableLock(table.getDbName(), table.getTableName())) {
+ // NOTE(review): the overload called below appears to take the same table lock again. That is fine for the
+ // reentrant in-JVM striped locks. With DistributedHiveLockFactory, however, every getTableLock() builds a new
+ // ZookeeperBasedJobLock, so the nested acquire may block until the lock timeout -- confirm.
return createTableIfNotExists(client.get(),
HiveMetaStoreUtils.getTable(table), table);
}
}
@@ -309,7 +342,7 @@ public class HiveMetaStoreBasedRegister extends
HiveRegister {
@Override
public boolean addPartitionIfNotExists(HiveTable table, HivePartition
partition) throws IOException {
try (AutoReturnableObject<IMetaStoreClient> client =
this.clientPool.getClient();
- AutoCloseableLock lock = this.locks.getTableLock(table.getDbName(),
table.getTableName())) {
+ AutoCloseableHiveLock lock =
this.locks.getTableLock(table.getDbName(), table.getTableName())) {
try {
try (Timer.Context context =
this.metricContext.timer(GET_HIVE_PARTITION).time()) {
client.get().getPartition(table.getDbName(), table.getTableName(),
partition.getValues());
@@ -337,7 +370,7 @@ public class HiveMetaStoreBasedRegister extends
HiveRegister {
String dbName = table.getDbName();
String tableName = table.getTableName();
- try (AutoCloseableLock lock = this.locks.getTableLock(dbName, tableName)) {
+ try (AutoCloseableHiveLock lock = this.locks.getTableLock(dbName,
tableName)) {
boolean tableExists;
try (Timer.Context context =
this.metricContext.timer(TABLE_EXISTS).time()) {
tableExists = client.tableExists(table.getDbName(),
table.getTableName());
@@ -460,14 +493,14 @@ public class HiveMetaStoreBasedRegister extends
HiveRegister {
}
private void addOrAlterPartition(IMetaStoreClient client, Table table,
HivePartition partition)
- throws TException {
+ throws TException, IOException {
Partition nativePartition = HiveMetaStoreUtils.getPartition(partition);
Preconditions.checkArgument(table.getPartitionKeysSize() ==
nativePartition.getValues().size(),
String.format("Partition key size is %s but partition value size is
%s", table.getPartitionKeys().size(),
nativePartition.getValues().size()));
- try (AutoCloseableLock lock =
+ try (AutoCloseableHiveLock lock =
this.locks.getPartitionLock(table.getDbName(), table.getTableName(),
nativePartition.getValues())) {
try {
diff --git
a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreUtils.java
b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreUtils.java
index 3064c35..efd0b9c 100644
---
a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreUtils.java
+++
b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreUtils.java
@@ -239,7 +239,7 @@ public class HiveMetaStoreUtils {
return sd;
}
- private static SerDeInfo getSerDeInfo(HiveRegistrationUnit unit) {
+ public static SerDeInfo getSerDeInfo(HiveRegistrationUnit unit) {
State props = unit.getSerDeProps();
SerDeInfo si = new SerDeInfo();
si.setParameters(getParameters(props));
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/locks/DistributedHiveLockFactory.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/locks/DistributedHiveLockFactory.java
new file mode 100644
index 0000000..ae463e2
--- /dev/null
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/locks/DistributedHiveLockFactory.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.locks;
+
+import java.io.IOException;
+import java.util.Properties;
+import org.apache.gobblin.hive.HiveLockFactory;
+import org.apache.gobblin.hive.HiveLockImpl;
+
+/**
+ * A {@link HiveLockFactory} whose locks are {@link ZookeeperBasedJobLock}s, for serializing Hive
+ * registration across processes instead of only within one JVM. Select it by setting the lock-type
+ * property read by {@link org.apache.gobblin.hive.HiveLock} to this class's fully-qualified name.
+ */
+public class DistributedHiveLockFactory extends HiveLockFactory {
+  public DistributedHiveLockFactory(Properties properties) {
+    super(properties);
+  }
+
+  /**
+   * Returns a new, not-yet-acquired ZooKeeper-backed lock for {@code name}. The name is appended to the
+   * lock root path, so '/'-separated names (as built by {@code HiveLock}) become multi-segment paths.
+   *
+   * <p>NOTE(review): a fresh {@link ZookeeperBasedJobLock} is created on every call and is never closed
+   * here. Two instances for the same name may not be reentrant with each other, so confirm that callers
+   * never re-acquire a name they already hold and that no per-instance resource leaks.
+   */
+  @Override
+  public HiveLockImpl<?> get(String name) {
+    return new ZookeeperHiveLock(new ZookeeperBasedJobLock(this.properties, name));
+  }
+
+  /** Adapts a {@link ZookeeperBasedJobLock} to {@link HiveLockImpl}, wrapping {@link JobLockException}s in {@link IOException}s. */
+  private static final class ZookeeperHiveLock extends HiveLockImpl<ZookeeperBasedJobLock> {
+    ZookeeperHiveLock(ZookeeperBasedJobLock lock) {
+      super(lock);
+    }
+
+    @Override
+    public void lock() throws IOException {
+      try {
+        this.lock.lock();
+      } catch (JobLockException e) {
+        throw new IOException(e);
+      }
+    }
+
+    @Override
+    public void unlock() throws IOException {
+      try {
+        this.lock.unlock();
+      } catch (JobLockException e) {
+        throw new IOException(e);
+      }
+    }
+  }
+}
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/locks/ZookeeperBasedJobLock.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/locks/ZookeeperBasedJobLock.java
index 0b585a8..1604352 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/locks/ZookeeperBasedJobLock.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/locks/ZookeeperBasedJobLock.java
@@ -69,8 +69,11 @@ public class ZookeeperBasedJobLock implements
ListenableJobLock {
private long lockAcquireTimeoutMilliseconds;
private InterProcessLock lock;
- public ZookeeperBasedJobLock(Properties properties) throws JobLockException {
- String jobName = properties.getProperty(ConfigurationKeys.JOB_NAME_KEY);
+ /**
+ * Creates a lock named after the job ({@link ConfigurationKeys#JOB_NAME_KEY} in {@code properties}).
+ *
+ * NOTE(review): this constructor no longer declares {@code throws JobLockException}. Callers that wrapped it
+ * in {@code try/catch (JobLockException e)}, with nothing else in the try throwing that exception, will no
+ * longer compile -- confirm no such callers exist.
+ */
+ public ZookeeperBasedJobLock(Properties properties) {
+ this(properties, properties.getProperty(ConfigurationKeys.JOB_NAME_KEY));
+ }
+
+ public ZookeeperBasedJobLock(Properties properties, String jobName) {
this.lockAcquireTimeoutMilliseconds =
getLong(properties, LOCKS_ACQUIRE_TIMEOUT_MILLISECONDS,
LOCKS_ACQUIRE_TIMEOUT_MILLISECONDS_DEFAULT);
this.lockPath = Paths.get(LOCKS_ROOT_PATH, jobName).toString();