Repository: incubator-gobblin Updated Branches: refs/heads/master 2979d5e68 -> eb2c128dc
[GOBBLIN-562] Added HiveConfKey to reflect the metastoreURI Closes #2424 from autumnust/fixHiveConfKey Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/eb2c128d Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/eb2c128d Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/eb2c128d Branch: refs/heads/master Commit: eb2c128dcf0427f3ddc4a51599afc53b608d319f Parents: 2979d5e Author: Lei Sun <[email protected]> Authored: Wed Aug 15 10:33:26 2018 -0700 Committer: Hung Tran <[email protected]> Committed: Wed Aug 15 10:33:26 2018 -0700 ---------------------------------------------------------------------- .../apache/gobblin/hive/HiveConfFactory.java | 45 ++++++++++++++++--- .../hive/HiveMetaStoreClientFactory.java | 12 ++--- .../apache/gobblin/hive/SharedHiveConfKey.java | 46 ++++++++++++++++++++ .../hive/metastore/HiveMetaStoreUtils.java | 3 +- .../gobblin/hive/HiveConfFactoryTest.java | 33 +++++++++++--- .../hive/HiveMetaStoreClientFactoryTest.java | 7 +++ .../src/test/resources/hive-site.xml | 14 ++++++ 7 files changed, 138 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/eb2c128d/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/HiveConfFactory.java ---------------------------------------------------------------------- diff --git a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/HiveConfFactory.java b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/HiveConfFactory.java index 447fc80..58aba1e 100644 --- a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/HiveConfFactory.java +++ b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/HiveConfFactory.java @@ -17,8 +17,13 @@ package org.apache.gobblin.hive; +import java.io.IOException; + +import 
org.apache.commons.lang.StringUtils; import org.apache.hadoop.hive.conf.HiveConf; +import com.google.common.base.Optional; + import org.apache.gobblin.broker.EmptyKey; import org.apache.gobblin.broker.ResourceInstance; import org.apache.gobblin.broker.iface.ConfigView; @@ -29,13 +34,15 @@ import org.apache.gobblin.broker.iface.SharedResourceFactory; import org.apache.gobblin.broker.iface.SharedResourceFactoryResponse; import org.apache.gobblin.broker.iface.SharedResourcesBroker; +import static org.apache.gobblin.hive.HiveMetaStoreClientFactory.HIVE_METASTORE_TOKEN_SIGNATURE; + /** * The factory that creates a {@link HiveConf} as shared resource. * {@link EmptyKey} is fair since {@link HiveConf} seems to be read-only. */ -public class HiveConfFactory<S extends ScopeType<S>> implements SharedResourceFactory<HiveConf, EmptyKey, S> { - public static final String FACTORY_NAME = "hiveConfFactory"; +public class HiveConfFactory<S extends ScopeType<S>> implements SharedResourceFactory<HiveConf, SharedHiveConfKey, S> { + static final String FACTORY_NAME = "hiveConfFactory"; @Override public String getName() { @@ -44,14 +51,40 @@ public class HiveConfFactory<S extends ScopeType<S>> implements SharedResourceFa @Override public SharedResourceFactoryResponse<HiveConf> createResource(SharedResourcesBroker<S> broker, - ScopedConfigView<S, EmptyKey> config) + ScopedConfigView<S, SharedHiveConfKey> config) throws NotConfiguredException { - // We could extend the constructor of HiveConf to accept arguments in the future. 
- return new ResourceInstance<>(new HiveConf()); + SharedHiveConfKey sharedHiveConfKey = config.getKey(); + HiveConf rawConf = new HiveConf(); + if (!sharedHiveConfKey.hiveConfUri.equals(SharedHiveConfKey.INSTANCE.toConfigurationKey()) && StringUtils + .isNotEmpty(sharedHiveConfKey.hiveConfUri)) { + rawConf.setVar(HiveConf.ConfVars.METASTOREURIS, sharedHiveConfKey.hiveConfUri); + rawConf.set(HIVE_METASTORE_TOKEN_SIGNATURE, sharedHiveConfKey.hiveConfUri); + } + + return new ResourceInstance<>(rawConf); + } + + /** + * + * @param hcatURI User specified hcatURI. + * @param broker A shared resource broker + * @return a {@link HiveConf} with specified hcatURI if any. + * @throws IOException + */ + public static <S extends ScopeType<S>> HiveConf get(Optional<String> hcatURI, SharedResourcesBroker<S> broker) + throws IOException { + try { + SharedHiveConfKey confKey = + hcatURI.isPresent() && StringUtils.isNotBlank(hcatURI.get()) ? new SharedHiveConfKey(hcatURI.get()) + : SharedHiveConfKey.INSTANCE; + return broker.getSharedResource(new HiveConfFactory<>(), confKey); + } catch (NotConfiguredException nce) { + throw new IOException(nce); + } } @Override - public S getAutoScope(SharedResourcesBroker<S> broker, ConfigView<S, EmptyKey> config) { + public S getAutoScope(SharedResourcesBroker<S> broker, ConfigView<S, SharedHiveConfKey> config) { return broker.selfScope().getType().rootScope(); } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/eb2c128d/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/HiveMetaStoreClientFactory.java ---------------------------------------------------------------------- diff --git a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/HiveMetaStoreClientFactory.java b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/HiveMetaStoreClientFactory.java index 9907135..fcbe7af 100644 --- a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/HiveMetaStoreClientFactory.java +++ 
b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/HiveMetaStoreClientFactory.java @@ -17,6 +17,8 @@ package org.apache.gobblin.hive; +import java.io.IOException; + import org.apache.commons.lang.StringUtils; import org.apache.commons.pool2.BasePooledObjectFactory; import org.apache.commons.pool2.PooledObject; @@ -65,14 +67,8 @@ public class HiveMetaStoreClientFactory extends BasePooledObjectFactory<IMetaSto private static HiveConf getHiveConf(Optional<String> hcatURI) { try { - HiveConf hiveConf = SharedResourcesBrokerFactory.getImplicitBroker() - .getSharedResource(new HiveConfFactory<>(), EmptyKey.INSTANCE); - if (hcatURI.isPresent() && StringUtils.isNotBlank(hcatURI.get())) { - hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, hcatURI.get()); - hiveConf.set(HIVE_METASTORE_TOKEN_SIGNATURE, hcatURI.get()); - } - return hiveConf; - } catch (NotConfiguredException nce) { + return HiveConfFactory.get(hcatURI, SharedResourcesBrokerFactory.getImplicitBroker()); + } catch (IOException nce) { throw new RuntimeException("Implicit broker is not correctly configured, failed to fetch a HiveConf object", nce); } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/eb2c128d/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/SharedHiveConfKey.java ---------------------------------------------------------------------- diff --git a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/SharedHiveConfKey.java b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/SharedHiveConfKey.java new file mode 100644 index 0000000..61f1981 --- /dev/null +++ b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/SharedHiveConfKey.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.hive; + +import lombok.EqualsAndHashCode; + +import org.apache.gobblin.broker.iface.SharedResourceKey; + + +/** + * {@link SharedResourceKey} for {@link org.apache.gobblin.hive.HiveConfFactory}. Contains an identifier for + * a cluster's Hive Metastore URI. + */ +@EqualsAndHashCode +public class SharedHiveConfKey implements SharedResourceKey { + public final String hiveConfUri; + + /** + * A singleton instance used with empty hcatURI. 
+ * */ + public static final SharedHiveConfKey INSTANCE = new SharedHiveConfKey(""); + + public SharedHiveConfKey(String hiveConfUri) { + this.hiveConfUri = hiveConfUri; + } + + @Override + public String toConfigurationKey() { + return this.hiveConfUri; + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/eb2c128d/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreUtils.java ---------------------------------------------------------------------- diff --git a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreUtils.java b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreUtils.java index bdf8800..b05f697 100644 --- a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreUtils.java +++ b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreUtils.java @@ -64,6 +64,7 @@ import org.apache.gobblin.hive.HivePartition; import org.apache.gobblin.hive.HiveRegistrationUnit; import org.apache.gobblin.hive.HiveRegistrationUnit.Column; import org.apache.gobblin.hive.HiveTable; +import org.apache.gobblin.hive.SharedHiveConfKey; /** @@ -378,7 +379,7 @@ public class HiveMetaStoreUtils { Deserializer deserializer; try { hiveConf = SharedResourcesBrokerFactory - .getImplicitBroker().getSharedResource(new HiveConfFactory<>(), EmptyKey.INSTANCE); + .getImplicitBroker().getSharedResource(new HiveConfFactory<>(), SharedHiveConfKey.INSTANCE); deserializer = ReflectionUtils.newInstance(hiveConf.getClassByName(serde).asSubclass(Deserializer.class), hiveConf); } catch (ClassNotFoundException e) { http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/eb2c128d/gobblin-hive-registration/src/test/java/org/apache/gobblin/hive/HiveConfFactoryTest.java ---------------------------------------------------------------------- diff --git 
a/gobblin-hive-registration/src/test/java/org/apache/gobblin/hive/HiveConfFactoryTest.java b/gobblin-hive-registration/src/test/java/org/apache/gobblin/hive/HiveConfFactoryTest.java index de3ac6e..c221bc6 100644 --- a/gobblin-hive-registration/src/test/java/org/apache/gobblin/hive/HiveConfFactoryTest.java +++ b/gobblin-hive-registration/src/test/java/org/apache/gobblin/hive/HiveConfFactoryTest.java @@ -21,18 +21,37 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.testng.Assert; import org.testng.annotations.Test; -import org.apache.gobblin.broker.EmptyKey; +import com.google.common.base.Optional; + import org.apache.gobblin.broker.SharedResourcesBrokerFactory; +import static org.apache.gobblin.hive.HiveMetaStoreClientFactory.HIVE_METASTORE_TOKEN_SIGNATURE; +import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREURIS; + public class HiveConfFactoryTest { @Test - public void testSameKey() throws Exception { - HiveConf hiveConf = SharedResourcesBrokerFactory - .getImplicitBroker().getSharedResource(new HiveConfFactory<>(), EmptyKey.INSTANCE); - HiveConf hiveConf1 = SharedResourcesBrokerFactory - .getImplicitBroker().getSharedResource(new HiveConfFactory<>(), EmptyKey.INSTANCE); - + public void testHiveConfFactory() throws Exception { + HiveConf hiveConf = HiveConfFactory.get(Optional.absent(), SharedResourcesBrokerFactory.getImplicitBroker()); + HiveConf hiveConf1 = HiveConfFactory.get(Optional.absent(), SharedResourcesBrokerFactory.getImplicitBroker()); Assert.assertEquals(hiveConf, hiveConf1); + // When there's no hcatURI specified, the default hive-site should be loaded. 
+ Assert.assertTrue(hiveConf.getVar(METASTOREURIS).equals("file:///test")); + Assert.assertTrue(hiveConf.get(HIVE_METASTORE_TOKEN_SIGNATURE).equals("file:///test")); + + HiveConf hiveConf2 = HiveConfFactory.get(Optional.of("hcat1"), SharedResourcesBrokerFactory.getImplicitBroker()); + HiveConf hiveConf3 = HiveConfFactory.get(Optional.of("hcat1"), SharedResourcesBrokerFactory.getImplicitBroker()); + Assert.assertEquals(hiveConf2, hiveConf3); + + HiveConf hiveConf4 = HiveConfFactory.get(Optional.of("hcat11"), SharedResourcesBrokerFactory.getImplicitBroker()); + Assert.assertNotEquals(hiveConf3, hiveConf4); + Assert.assertNotEquals(hiveConf4, hiveConf); + + // The URI should be set correctly. + Assert.assertEquals(hiveConf3.getVar(METASTOREURIS), "hcat1"); + Assert.assertEquals(hiveConf3.get(HIVE_METASTORE_TOKEN_SIGNATURE), "hcat1"); + Assert.assertEquals(hiveConf4.getVar(METASTOREURIS), "hcat11"); + Assert.assertEquals(hiveConf4.get(HIVE_METASTORE_TOKEN_SIGNATURE), "hcat11"); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/eb2c128d/gobblin-hive-registration/src/test/java/org/apache/gobblin/hive/HiveMetaStoreClientFactoryTest.java ---------------------------------------------------------------------- diff --git a/gobblin-hive-registration/src/test/java/org/apache/gobblin/hive/HiveMetaStoreClientFactoryTest.java b/gobblin-hive-registration/src/test/java/org/apache/gobblin/hive/HiveMetaStoreClientFactoryTest.java index 1cff2f6..91483e6 100644 --- a/gobblin-hive-registration/src/test/java/org/apache/gobblin/hive/HiveMetaStoreClientFactoryTest.java +++ b/gobblin-hive-registration/src/test/java/org/apache/gobblin/hive/HiveMetaStoreClientFactoryTest.java @@ -23,12 +23,19 @@ import org.apache.thrift.TException; import org.testng.Assert; import org.testng.annotations.Test; +import static org.apache.gobblin.hive.HiveMetaStoreClientFactory.HIVE_METASTORE_TOKEN_SIGNATURE; + public class HiveMetaStoreClientFactoryTest { @Test public
void testCreate() throws TException { HiveConf hiveConf = new HiveConf(); HiveMetaStoreClientFactory factory = new HiveMetaStoreClientFactory(hiveConf); + + // Since a hive-site.xml is present on the test classpath, we have to clear these values here for the test to proceed. + // Without a local hive-site.xml, the original value would be an empty string. + hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, ""); + hiveConf.set(HIVE_METASTORE_TOKEN_SIGNATURE, ""); IMetaStoreClient msc = factory.create(); String dbName = "test_db"; http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/eb2c128d/gobblin-hive-registration/src/test/resources/hive-site.xml ---------------------------------------------------------------------- diff --git a/gobblin-hive-registration/src/test/resources/hive-site.xml b/gobblin-hive-registration/src/test/resources/hive-site.xml new file mode 100644 index 0000000..3b00cd8 --- /dev/null +++ b/gobblin-hive-registration/src/test/resources/hive-site.xml @@ -0,0 +1,14 @@ +<?xml version="1.0"?> +<?xml-stylesheet type="text/xsl" href="configuration.xsl"?> + +<configuration> + <property> + <name>hive.metastore.uris</name> + <value>file:///test</value> + </property> + + <property> + <name>hive.metastore.token.signature</name> + <value>file:///test</value> + </property> +</configuration>
