This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 1092a89 [GOBBLIN-993] Support job level hive configuration override
1092a89 is described below
commit 1092a891af7eda5732c948695b92e34540751613
Author: zhchen <[email protected]>
AuthorDate: Wed Dec 4 18:10:11 2019 -0800
[GOBBLIN-993] Support job level hive configuration override
Closes #2839 from zxcware/ho
---
.../gobblin/hive/HiveMetastoreClientPool.java | 30 ++++++++++++++
.../gobblin/hive/HiveMetastoreClientPoolTest.java | 46 ++++++++++++++++++++++
2 files changed, 76 insertions(+)
diff --git a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/HiveMetastoreClientPool.java b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/HiveMetastoreClientPool.java
index 49915ed..a1b4051 100644
--- a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/HiveMetastoreClientPool.java
+++ b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/HiveMetastoreClientPool.java
@@ -24,6 +24,7 @@ import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.StringUtils;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.apache.gobblin.util.PropertiesUtils;
@@ -42,11 +43,13 @@ import org.apache.gobblin.configuration.State;
import org.apache.gobblin.util.AutoReturnableObject;
import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
/**
* A pool of {@link IMetaStoreClient} for querying the Hive metastore.
*/
+@Slf4j
public class HiveMetastoreClientPool {
private final GenericObjectPool<IMetaStoreClient> pool;
@@ -66,6 +69,18 @@ public class HiveMetastoreClientPool {
public static final String POOL_MIN_EVICTABLE_IDLE_TIME_MILLIS =
"pool.min.evictable.idle.time.millis";
+  /**
+   * To provide additional or overriding configuration for a specific Hive metastore:
+   * <p> First, set {@code hive.additionalConfig.targetUri=<the target hive metastore uri>}.
+   * <p> Then, for a pool whose metastore URI equals that target, every job configuration with the
+   * {@value #POOL_HIVE_ADDITIONAL_CONFIG_PREFIX} prefix (other than targetUri) is stripped of the prefix and applied on top.
+   * For example, the job configuration {@code hive.additionalConfig.hive.metastore.sasl.enabled=false}
+   * is applied as {@code hive.metastore.sasl.enabled=false}.
+   */
+ public static final String POOL_HIVE_ADDITIONAL_CONFIG_PREFIX =
"hive.additionalConfig.";
+
+ public static final String POOL_HIVE_ADDITIONAL_CONFIG_TARGET =
POOL_HIVE_ADDITIONAL_CONFIG_PREFIX + "targetUri";
+
public static final long DEFAULT_POOL_MIN_EVICTABLE_IDLE_TIME_MILLIS =
600000L;
public static final String POOL_TIME_BETWEEN_EVICTION_MILLIS =
"pool.time.between eviction.millis";
@@ -139,7 +154,22 @@ public class HiveMetastoreClientPool {
config.setMaxTotal(this.hiveRegProps.getNumThreads());
config.setMaxIdle(this.hiveRegProps.getNumThreads());
+ String extraConfigTarget =
properties.getProperty(POOL_HIVE_ADDITIONAL_CONFIG_TARGET, "");
+
this.factory = new HiveMetaStoreClientFactory(metastoreURI);
+ if (metastoreURI.isPresent() && StringUtils.isNotEmpty(extraConfigTarget)
+ && metastoreURI.get().equals(extraConfigTarget)) {
+ log.info("Setting additional hive config for metastore {}",
extraConfigTarget);
+ properties.forEach((key, value) -> {
+ String configKey = key.toString();
+ if (configKey.startsWith(POOL_HIVE_ADDITIONAL_CONFIG_PREFIX) &&
!configKey.equals(
+ POOL_HIVE_ADDITIONAL_CONFIG_TARGET)) {
+ log.info("Setting additional hive config {}={}",
configKey.substring(POOL_HIVE_ADDITIONAL_CONFIG_PREFIX.length()),
+ value.toString());
+
this.factory.getHiveConf().set(configKey.substring(POOL_HIVE_ADDITIONAL_CONFIG_PREFIX.length()),
value.toString());
+ }
+ });
+ }
this.pool = new GenericObjectPool<>(this.factory, config);
//Set the eviction policy for the client pool
this.pool.setEvictionPolicyClassName(properties.getProperty(POOL_EVICTION_POLICY_CLASS_NAME,
DEFAULT_POOL_EVICTION_POLICY_CLASS_NAME));
diff --git a/gobblin-hive-registration/src/test/java/org/apache/gobblin/hive/HiveMetastoreClientPoolTest.java b/gobblin-hive-registration/src/test/java/org/apache/gobblin/hive/HiveMetastoreClientPoolTest.java
new file mode 100644
index 0000000..43a95a3
--- /dev/null
+++ b/gobblin-hive-registration/src/test/java/org/apache/gobblin/hive/HiveMetastoreClientPoolTest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.gobblin.hive;
+
+import java.io.IOException;
+import java.util.Properties;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.google.common.base.Optional;
+
+
+@Test
+public class HiveMetastoreClientPoolTest {
+  /** Extra hive config must be applied only to the pool whose metastore URI equals hive.additionalConfig.targetUri. */
+  public void testExtraHiveConf() throws IOException {
+    String additionalHiveConf = "myhive.metastore.sasl.enabled";
+    Properties props = new Properties();
+    props.setProperty("hive.additionalConfig.targetUri", "test-target");
+    props.setProperty("hive.additionalConfig." + additionalHiveConf, "false");
+    // Non-matching metastore URI: the extra config must not be applied at all.
+    HiveMetastoreClientPool pool = HiveMetastoreClientPool.get(props, Optional.of("test"));
+    Assert.assertNull(pool.getHiveConf().get(additionalHiveConf));
+
+    // Matching URI: assert the exact value, because Boolean.valueOf(null) is also false and would hide a missing override.
+    pool = HiveMetastoreClientPool.get(props, Optional.of("test-target"));
+    Assert.assertEquals(pool.getHiveConf().get(additionalHiveConf), "false");
+  }
+}