This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 3523dbc  [GOBBLIN-863] Handle race condition issue for hive 
registration
3523dbc is described below

commit 3523dbcdf41d489ece761c37f5849539cfc8948d
Author: Zihan Li <[email protected]>
AuthorDate: Tue Sep 3 08:30:39 2019 -0700

    [GOBBLIN-863] Handle race condition issue for hive registration
    
    Closes #2719 from ZihanLi58/GOBBLIN-863
---
 gobblin-docs/user-guide/Gobblin-on-Yarn.md         |  2 +-
 .../apache/gobblin/hive/AutoCloseableHiveLock.java | 39 ++++++++++++++++
 .../java/org/apache/gobblin/hive/HiveLock.java     | 37 +++++++++++----
 .../org/apache/gobblin/hive/HiveLockFactory.java   | 50 ++++++++++++++++++++
 .../java/org/apache/gobblin/hive/HiveLockImpl.java | 34 ++++++++++++++
 .../java/org/apache/gobblin/hive/HiveRegister.java | 35 +++++++-------
 .../hive/metastore/HiveMetaStoreBasedRegister.java | 53 ++++++++++++++++++----
 .../gobblin/hive/metastore/HiveMetaStoreUtils.java |  2 +-
 .../runtime/locks/DistributedHiveLockFactory.java  | 53 ++++++++++++++++++++++
 .../runtime/locks/ZookeeperBasedJobLock.java       |  7 ++-
 10 files changed, 272 insertions(+), 40 deletions(-)

diff --git a/gobblin-docs/user-guide/Gobblin-on-Yarn.md 
b/gobblin-docs/user-guide/Gobblin-on-Yarn.md
index 548f0f4..e3e2b67 100644
--- a/gobblin-docs/user-guide/Gobblin-on-Yarn.md
+++ b/gobblin-docs/user-guide/Gobblin-on-Yarn.md
@@ -225,7 +225,7 @@ qualitychecker.row.err.file=${gobblin.yarn.work.dir}/err
 
 # Use zookeeper for maintaining the job lock
 job.lock.enabled=true
-job.lock.type=org.apache.gobblin.runtime.locks.ZookeeperBasedJobLock
+job.lock.type=ZookeeperBasedJobLock
 
 # Directory where job locks are stored
 job.lock.dir=${gobblin.yarn.work.dir}/locks
diff --git 
a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/AutoCloseableHiveLock.java
 
b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/AutoCloseableHiveLock.java
new file mode 100644
index 0000000..abf01cb
--- /dev/null
+++ 
b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/AutoCloseableHiveLock.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.hive;
+
+import java.io.IOException;
+
+
+/**
+ * An auto-closeable hive lock. Uses a {@link HiveLockImpl} as the real lock, and 
unlocks it automatically on close
+ */
+public class AutoCloseableHiveLock implements AutoCloseable {
+
+  private final HiveLockImpl lock;
+
+  public AutoCloseableHiveLock(HiveLockImpl lock) throws IOException {
+    this.lock = lock;
+    this.lock.lock();
+  }
+
+  @Override
+  public void close() throws IOException{
+    this.lock.unlock();
+  }
+}
diff --git 
a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/HiveLock.java 
b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/HiveLock.java
index 85df169..653ddcb 100644
--- 
a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/HiveLock.java
+++ 
b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/HiveLock.java
@@ -17,13 +17,15 @@
 
 package org.apache.gobblin.hive;
 
-import java.util.concurrent.locks.Lock;
+import java.io.IOException;
+import java.util.Properties;
 
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
-import com.google.common.util.concurrent.Striped;
 
+import org.apache.commons.lang3.reflect.ConstructorUtils;
+import org.apache.gobblin.hive.metastore.HiveMetaStoreBasedRegister;
 import org.apache.gobblin.util.AutoCloseableLock;
 
 
@@ -37,30 +39,45 @@ import org.apache.gobblin.util.AutoCloseableLock;
  * </p>
  */
 public class HiveLock {
+  private static String HIVE_LOCK_TYPE = 
HiveMetaStoreBasedRegister.HIVE_REGISTER_METRICS_PREFIX + "lock.type";
+  private static String HIVE_LOCK_TYPE_DEFAULT = 
"org.apache.gobblin.hive.HiveLockFactory";
 
-  private static final Joiner JOINER = Joiner.on(' ').skipNulls();
+  private Properties properties;
 
-  private final Striped<Lock> locks = Striped.lazyWeakLock(Integer.MAX_VALUE);
+  private static final Joiner JOINER = Joiner.on('/').skipNulls();
 
-  public AutoCloseableLock getDbLock(String dbName) {
+  private final HiveLockFactory locks;
+
+  public HiveLock(Properties properties) throws IOException {
+    this.properties = properties;
+    try {
+      locks = (HiveLockFactory) ConstructorUtils.invokeConstructor(
+          Class.forName(properties.getProperty(HIVE_LOCK_TYPE, 
HIVE_LOCK_TYPE_DEFAULT)), properties);
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+  }
+
+  public AutoCloseableHiveLock getDbLock(String dbName) throws IOException{
     Preconditions.checkArgument(!Strings.isNullOrEmpty(dbName));
 
-    return new AutoCloseableLock(this.locks.get(dbName));
+    return new AutoCloseableHiveLock(this.locks.get(dbName));
   }
 
-  public AutoCloseableLock getTableLock(String dbName, String tableName) {
+  public AutoCloseableHiveLock getTableLock(String dbName, String tableName) 
throws IOException{
     Preconditions.checkArgument(!Strings.isNullOrEmpty(dbName));
     Preconditions.checkArgument(!Strings.isNullOrEmpty(tableName));
 
-    return new AutoCloseableLock(this.locks.get(JOINER.join(dbName, 
tableName)));
+    return new AutoCloseableHiveLock(this.locks.get(JOINER.join(dbName, 
tableName)));
   }
 
-  public AutoCloseableLock getPartitionLock(String dbName, String tableName, 
Iterable<String> partitionValues) {
+  public AutoCloseableHiveLock getPartitionLock(String dbName, String 
tableName, Iterable<String> partitionValues)
+      throws IOException{
     Preconditions.checkArgument(!Strings.isNullOrEmpty(dbName));
     Preconditions.checkArgument(!Strings.isNullOrEmpty(tableName));
     Preconditions.checkArgument(partitionValues.iterator().hasNext());
 
-    return new AutoCloseableLock(this.locks.get(JOINER.join(dbName, tableName, 
JOINER.join(partitionValues))));
+    return new AutoCloseableHiveLock(this.locks.get(JOINER.join(dbName, 
tableName, JOINER.join(partitionValues))));
   }
 
 }
diff --git 
a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/HiveLockFactory.java
 
b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/HiveLockFactory.java
new file mode 100644
index 0000000..1884afa
--- /dev/null
+++ 
b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/HiveLockFactory.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.hive;
+
+import com.google.common.util.concurrent.Striped;
+import java.io.IOException;
+import java.util.Properties;
+import java.util.concurrent.locks.Lock;
+
+
+/**
+ * A lock factory that provides a get method for a HiveLockImpl for a specific 
object
+ */
+public class HiveLockFactory {
+  protected Properties properties;
+  private final Striped<Lock> locks = Striped.lazyWeakLock(Integer.MAX_VALUE);
+  public HiveLockFactory(Properties _properties) {
+    this.properties = _properties;
+
+  }
+  public HiveLockImpl get(String name) {
+
+    return new HiveLockImpl<Lock>(locks.get(name)) {
+      @Override
+      public void lock() throws IOException {
+        this.lock.lock();
+      }
+
+      @Override
+      public void unlock() throws IOException {
+        this.lock.unlock();
+      }
+    };
+  }
+}
diff --git 
a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/HiveLockImpl.java
 
b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/HiveLockImpl.java
new file mode 100644
index 0000000..8fbaf6c
--- /dev/null
+++ 
b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/HiveLockImpl.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.hive;
+
+import java.io.IOException;
+
+
+/**
+ * A wrapper lock to be used by hive.
+ * @param <T> The class of the real lock
+ */
+public abstract class HiveLockImpl<T> {
+  protected T lock;
+  public HiveLockImpl(T _lock){
+    this.lock = _lock;
+  }
+  public abstract void lock() throws IOException;
+  public abstract void unlock() throws IOException;
+}
diff --git 
a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/HiveRegister.java
 
b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/HiveRegister.java
index 93dffe2..f481522 100644
--- 
a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/HiveRegister.java
+++ 
b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/HiveRegister.java
@@ -94,29 +94,32 @@ public abstract class HiveRegister implements Closeable {
 
       @Override
       public Void call() throws Exception {
+        try {
+          if (spec instanceof HiveSpecWithPredicates && 
!evaluatePredicates((HiveSpecWithPredicates) spec)) {
+            log.info("Skipping " + spec + " since predicates return false");
+            return null;
+          }
 
-        if (spec instanceof HiveSpecWithPredicates && 
!evaluatePredicates((HiveSpecWithPredicates) spec)) {
-          log.info("Skipping " + spec + " since predicates return false");
-          return null;
-        }
-
-        if (spec instanceof HiveSpecWithPreActivities) {
-          for (Activity activity : ((HiveSpecWithPreActivities) 
spec).getPreActivities()) {
-            activity.execute(HiveRegister.this);
+          if (spec instanceof HiveSpecWithPreActivities) {
+            for (Activity activity : ((HiveSpecWithPreActivities) 
spec).getPreActivities()) {
+              activity.execute(HiveRegister.this);
+            }
           }
-        }
 
-        registerPath(spec);
+          registerPath(spec);
 
-        if (spec instanceof HiveSpecWithPostActivities) {
-          for (Activity activity : ((HiveSpecWithPostActivities) 
spec).getPostActivities()) {
-            activity.execute(HiveRegister.this);
+          if (spec instanceof HiveSpecWithPostActivities) {
+            for (Activity activity : ((HiveSpecWithPostActivities) 
spec).getPostActivities()) {
+              activity.execute(HiveRegister.this);
+            }
           }
-        }
 
-        return null;
+          return null;
+        } catch (Exception e) {
+          log.error("Exception during hive registration", e);
+          throw e;
+        }
       }
-
     });
     this.futures.put(getSpecId(spec), future);
     return future;
diff --git 
a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreBasedRegister.java
 
b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreBasedRegister.java
index 9c07337..41ceaf3 100644
--- 
a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreBasedRegister.java
+++ 
b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreBasedRegister.java
@@ -29,6 +29,11 @@ import java.util.concurrent.ExecutionException;
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
+import org.apache.gobblin.hive.AutoCloseableHiveLock;
+import org.apache.gobblin.kafka.schemareg.KafkaSchemaRegistry;
+import org.apache.gobblin.kafka.schemareg.KafkaSchemaRegistryFactory;
+import org.apache.gobblin.kafka.schemareg.SchemaRegistryException;
+import org.apache.gobblin.source.extractor.extract.kafka.KafkaSource;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
@@ -36,6 +41,7 @@ import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils;
 import org.apache.thrift.TException;
 import org.joda.time.DateTime;
 
@@ -59,7 +65,6 @@ import org.apache.gobblin.metrics.GobblinMetrics;
 import org.apache.gobblin.metrics.GobblinMetricsRegistry;
 import org.apache.gobblin.metrics.MetricContext;
 import org.apache.gobblin.metrics.event.EventSubmitter;
-import org.apache.gobblin.util.AutoCloseableLock;
 import org.apache.gobblin.util.AutoReturnableObject;
 
 
@@ -93,9 +98,11 @@ public class HiveMetaStoreBasedRegister extends HiveRegister 
{
   public static final String CREATE_HIVE_DATABASE = 
HIVE_REGISTER_METRICS_PREFIX + "createDatabaseTimer";
   public static final String CREATE_HIVE_TABLE = HIVE_REGISTER_METRICS_PREFIX 
+ "createTableTimer";
   public static final String GET_HIVE_TABLE = HIVE_REGISTER_METRICS_PREFIX + 
"getTableTimer";
+  public static final String GET_AND_SET_LATEST_SCHEMA = 
HIVE_REGISTER_METRICS_PREFIX + "getAndSetLatestSchemaTimer";
   public static final String DROP_TABLE = HIVE_REGISTER_METRICS_PREFIX + 
"dropTableTimer";
   public static final String PATH_REGISTER_TIMER = 
HIVE_REGISTER_METRICS_PREFIX + "pathRegisterTimer";
   public static final String SKIP_PARTITION_DIFF_COMPUTATION = 
HIVE_REGISTER_METRICS_PREFIX + "skip.partition.diff.computation";
+  public static final String FETCH_LATEST_SCHEMA = 
HIVE_REGISTER_METRICS_PREFIX + "fetchLatestSchemaFromSchemaRegistry";
   /**
    * To reduce lock aquisition and RPC to metaStoreClient, we cache the result 
of query regarding to
    * the existence of databases and tables in {@link 
#tableAndDbExistenceCache},
@@ -106,9 +113,10 @@ public class HiveMetaStoreBasedRegister extends 
HiveRegister {
   public static final String OPTIMIZED_CHECK_ENABLED = 
"hiveRegister.cacheDbTableExistence";
 
   private final HiveMetastoreClientPool clientPool;
-  private final HiveLock locks = new HiveLock();
+  private final HiveLock locks;
   private final EventSubmitter eventSubmitter;
   private final MetricContext metricContext;
+  private final boolean shouldUpdateLatestSchema;
 
   /**
    * Local cache that contains records for both databases and tables.
@@ -134,11 +142,19 @@ public class HiveMetaStoreBasedRegister extends 
HiveRegister {
   //for a partition is immutable
   private final boolean skipDiffComputation;
 
+  private Optional<KafkaSchemaRegistry> schemaRegistry = Optional.absent();
+  private String topicName = "";
   public HiveMetaStoreBasedRegister(State state, Optional<String> 
metastoreURI) throws IOException {
     super(state);
+    this.locks = new HiveLock(state.getProperties());
 
     this.optimizedChecks = 
state.getPropAsBoolean(this.OPTIMIZED_CHECK_ENABLED, true);
     this.skipDiffComputation = 
state.getPropAsBoolean(this.SKIP_PARTITION_DIFF_COMPUTATION, false);
+    this.shouldUpdateLatestSchema = 
state.getPropAsBoolean(this.FETCH_LATEST_SCHEMA, false);
+    if(state.getPropAsBoolean(this.FETCH_LATEST_SCHEMA, false)) {
+      this.schemaRegistry = 
Optional.of(KafkaSchemaRegistryFactory.getSchemaRegistry(state.getProperties()));
+      topicName = state.getProp(KafkaSource.TOPIC_NAME);
+    }
 
     GenericObjectPoolConfig config = new GenericObjectPoolConfig();
     config.setMaxTotal(this.props.getNumThreads());
@@ -170,14 +186,28 @@ public class HiveMetaStoreBasedRegister extends 
HiveRegister {
       throw new IOException(e);
     }
   }
+  //TODO: We need to find a better way to get the latest schema
+  private void updateSchema(HiveSpec spec, Table table) throws IOException{
+
+    if (this.schemaRegistry.isPresent()) {
+      try (Timer.Context context = 
this.metricContext.timer(GET_AND_SET_LATEST_SCHEMA).time()) {
+        String latestSchema = 
this.schemaRegistry.get().getLatestSchema(topicName).toString();
+        
spec.getTable().getSerDeProps().setProp(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName(),
 latestSchema);
+        
table.getSd().setSerdeInfo(HiveMetaStoreUtils.getSerDeInfo(spec.getTable()));
+      } catch (SchemaRegistryException | IOException e) {
+        log.error(String.format("Error when fetch latest schema for topic %s", 
topicName), e);
+        throw new IOException(e);
+      }
+    }
+  }
 
   /**
    * If table existed on Hive side will return false;
    * Or will create the table thru. RPC and return retVal from remote 
MetaStore.
    */
   private boolean ensureHiveTableExistenceBeforeAlternation(String tableName, 
String dbName, IMetaStoreClient client,
-      Table table, HiveSpec spec) throws TException{
-    try (AutoCloseableLock lock = this.locks.getTableLock(dbName, tableName)) {
+      Table table, HiveSpec spec) throws TException, IOException{
+    try (AutoCloseableHiveLock lock = this.locks.getTableLock(dbName, 
tableName)) {
       try {
         try (Timer.Context context = 
this.metricContext.timer(CREATE_HIVE_TABLE).time()) {
           client.createTable(getTableWithCreateTimeNow(table));
@@ -197,6 +227,9 @@ public class HiveMetaStoreBasedRegister extends 
HiveRegister {
         try (Timer.Context context = 
this.metricContext.timer(GET_HIVE_TABLE).time()) {
           existingTable = 
HiveMetaStoreUtils.getHiveTable(client.getTable(dbName, tableName));
         }
+        if(shouldUpdateLatestSchema) {
+          updateSchema(spec, table);
+        }
         if (needToUpdateTable(existingTable, spec.getTable())) {
           try (Timer.Context context = 
this.metricContext.timer(ALTER_TABLE).time()) {
             client.alter_table(dbName, tableName, 
getNewTblByMergingExistingTblProps(table, existingTable));
@@ -221,7 +254,7 @@ public class HiveMetaStoreBasedRegister extends 
HiveRegister {
    * @param hiveDbName is the hive databases to be checked for existence
    */
   private boolean ensureHiveDbExistence(String hiveDbName, IMetaStoreClient 
client) throws IOException{
-    try (AutoCloseableLock lock = this.locks.getDbLock(hiveDbName)) {
+    try (AutoCloseableHiveLock lock = this.locks.getDbLock(hiveDbName)) {
       Database db = new Database();
       db.setName(hiveDbName);
 
@@ -297,7 +330,7 @@ public class HiveMetaStoreBasedRegister extends 
HiveRegister {
   @Override
   public boolean createTableIfNotExists(HiveTable table) throws IOException {
     try (AutoReturnableObject<IMetaStoreClient> client = 
this.clientPool.getClient();
-        AutoCloseableLock lock = this.locks.getTableLock(table.getDbName(), 
table.getTableName())) {
+        AutoCloseableHiveLock lock = 
this.locks.getTableLock(table.getDbName(), table.getTableName())) {
       return createTableIfNotExists(client.get(), 
HiveMetaStoreUtils.getTable(table), table);
     }
   }
@@ -309,7 +342,7 @@ public class HiveMetaStoreBasedRegister extends 
HiveRegister {
   @Override
   public boolean addPartitionIfNotExists(HiveTable table, HivePartition 
partition) throws IOException {
     try (AutoReturnableObject<IMetaStoreClient> client = 
this.clientPool.getClient();
-        AutoCloseableLock lock = this.locks.getTableLock(table.getDbName(), 
table.getTableName())) {
+        AutoCloseableHiveLock lock = 
this.locks.getTableLock(table.getDbName(), table.getTableName())) {
       try {
         try (Timer.Context context = 
this.metricContext.timer(GET_HIVE_PARTITION).time()) {
           client.get().getPartition(table.getDbName(), table.getTableName(), 
partition.getValues());
@@ -337,7 +370,7 @@ public class HiveMetaStoreBasedRegister extends 
HiveRegister {
     String dbName = table.getDbName();
     String tableName = table.getTableName();
 
-    try (AutoCloseableLock lock = this.locks.getTableLock(dbName, tableName)) {
+    try (AutoCloseableHiveLock lock = this.locks.getTableLock(dbName, 
tableName)) {
       boolean tableExists;
       try (Timer.Context context = 
this.metricContext.timer(TABLE_EXISTS).time()) {
         tableExists = client.tableExists(table.getDbName(), 
table.getTableName());
@@ -460,14 +493,14 @@ public class HiveMetaStoreBasedRegister extends 
HiveRegister {
   }
 
   private void addOrAlterPartition(IMetaStoreClient client, Table table, 
HivePartition partition)
-      throws TException {
+      throws TException, IOException {
     Partition nativePartition = HiveMetaStoreUtils.getPartition(partition);
 
     Preconditions.checkArgument(table.getPartitionKeysSize() == 
nativePartition.getValues().size(),
         String.format("Partition key size is %s but partition value size is 
%s", table.getPartitionKeys().size(),
             nativePartition.getValues().size()));
 
-    try (AutoCloseableLock lock =
+    try (AutoCloseableHiveLock lock =
         this.locks.getPartitionLock(table.getDbName(), table.getTableName(), 
nativePartition.getValues())) {
 
       try {
diff --git 
a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreUtils.java
 
b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreUtils.java
index 3064c35..efd0b9c 100644
--- 
a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreUtils.java
+++ 
b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreUtils.java
@@ -239,7 +239,7 @@ public class HiveMetaStoreUtils {
     return sd;
   }
 
-  private static SerDeInfo getSerDeInfo(HiveRegistrationUnit unit) {
+  public static SerDeInfo getSerDeInfo(HiveRegistrationUnit unit) {
     State props = unit.getSerDeProps();
     SerDeInfo si = new SerDeInfo();
     si.setParameters(getParameters(props));
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/locks/DistributedHiveLockFactory.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/locks/DistributedHiveLockFactory.java
new file mode 100644
index 0000000..ae463e2
--- /dev/null
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/locks/DistributedHiveLockFactory.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.locks;
+
+import java.io.IOException;
+import java.util.Properties;
+import org.apache.gobblin.hive.HiveLockFactory;
+import org.apache.gobblin.hive.HiveLockImpl;
+
+/**
+ * A lock factory that extends {@link HiveLockFactory} to provide a get method 
for a distributed lock for a specific object
+ */
+public class DistributedHiveLockFactory extends HiveLockFactory {
+  public DistributedHiveLockFactory(Properties properties) {
+    super(properties);
+  }
+  public HiveLockImpl get(String name) {
+    return new HiveLockImpl<ZookeeperBasedJobLock>(new 
ZookeeperBasedJobLock(properties, name)) {
+      @Override
+      public void lock() throws IOException {
+        try {
+          this.lock.lock();
+        } catch (JobLockException e) {
+          throw new IOException(e);
+        }
+      }
+
+      @Override
+      public void unlock() throws IOException {
+        try {
+          this.lock.unlock();
+        } catch (JobLockException e) {
+          throw new IOException(e);
+        }
+      }
+    };
+  }
+}
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/locks/ZookeeperBasedJobLock.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/locks/ZookeeperBasedJobLock.java
index 0b585a8..1604352 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/locks/ZookeeperBasedJobLock.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/locks/ZookeeperBasedJobLock.java
@@ -69,8 +69,11 @@ public class ZookeeperBasedJobLock implements 
ListenableJobLock {
   private long lockAcquireTimeoutMilliseconds;
   private InterProcessLock lock;
 
-  public ZookeeperBasedJobLock(Properties properties) throws JobLockException {
-    String jobName = properties.getProperty(ConfigurationKeys.JOB_NAME_KEY);
+  public ZookeeperBasedJobLock(Properties properties) {
+    this(properties, properties.getProperty(ConfigurationKeys.JOB_NAME_KEY));
+  }
+
+  public ZookeeperBasedJobLock(Properties properties, String jobName)  {
     this.lockAcquireTimeoutMilliseconds =
         getLong(properties, LOCKS_ACQUIRE_TIMEOUT_MILLISECONDS, 
LOCKS_ACQUIRE_TIMEOUT_MILLISECONDS_DEFAULT);
     this.lockPath = Paths.get(LOCKS_ROOT_PATH, jobName).toString();

Reply via email to