Repository: incubator-gobblin Updated Branches: refs/heads/master 28e3aece7 -> 25530f075
[GOBBLIN-557] Reuse HiveConf object by resource broker Closes #2418 from autumnust/fixHiveConfReinstantiate Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/25530f07 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/25530f07 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/25530f07 Branch: refs/heads/master Commit: 25530f0755a336ebc86f482f0496459a8cc033fc Parents: 28e3aec Author: Lei Sun <[email protected]> Authored: Mon Aug 6 11:46:21 2018 -0700 Committer: Hung Tran <[email protected]> Committed: Mon Aug 6 11:46:21 2018 -0700 ---------------------------------------------------------------------- .../apache/gobblin/hive/HiveConfFactory.java | 57 ++++++++++++++++++++ .../hive/HiveMetaStoreClientFactory.java | 29 ++++++---- .../hive/metastore/HiveMetaStoreUtils.java | 14 +++-- .../gobblin/hive/HiveConfFactoryTest.java | 38 +++++++++++++ 4 files changed, 126 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/25530f07/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/HiveConfFactory.java ---------------------------------------------------------------------- diff --git a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/HiveConfFactory.java b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/HiveConfFactory.java new file mode 100644 index 0000000..447fc80 --- /dev/null +++ b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/HiveConfFactory.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.hive; + +import org.apache.hadoop.hive.conf.HiveConf; + +import org.apache.gobblin.broker.EmptyKey; +import org.apache.gobblin.broker.ResourceInstance; +import org.apache.gobblin.broker.iface.ConfigView; +import org.apache.gobblin.broker.iface.NotConfiguredException; +import org.apache.gobblin.broker.iface.ScopeType; +import org.apache.gobblin.broker.iface.ScopedConfigView; +import org.apache.gobblin.broker.iface.SharedResourceFactory; +import org.apache.gobblin.broker.iface.SharedResourceFactoryResponse; +import org.apache.gobblin.broker.iface.SharedResourcesBroker; + + +/** + * The factory that creates a {@link HiveConf} as shared resource. + * {@link EmptyKey} is fair since {@link HiveConf} seems to be read-only. + */ +public class HiveConfFactory<S extends ScopeType<S>> implements SharedResourceFactory<HiveConf, EmptyKey, S> { + public static final String FACTORY_NAME = "hiveConfFactory"; + + @Override + public String getName() { + return FACTORY_NAME; + } + + @Override + public SharedResourceFactoryResponse<HiveConf> createResource(SharedResourcesBroker<S> broker, + ScopedConfigView<S, EmptyKey> config) + throws NotConfiguredException { + // We could extend the constructor of HiveConf to accept arguments in the future. + return new ResourceInstance<>(new HiveConf()); + } + + @Override + public S getAutoScope(SharedResourcesBroker<S> broker, ConfigView<S, EmptyKey> config) { + return broker.selfScope().getType().rootScope(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/25530f07/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/HiveMetaStoreClientFactory.java ---------------------------------------------------------------------- diff --git a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/HiveMetaStoreClientFactory.java b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/HiveMetaStoreClientFactory.java index 772b5d0..9907135 100644 --- a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/HiveMetaStoreClientFactory.java +++ b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/HiveMetaStoreClientFactory.java @@ -17,8 +17,6 @@ package org.apache.gobblin.hive; -import lombok.Getter; - import org.apache.commons.lang.StringUtils; import org.apache.commons.pool2.BasePooledObjectFactory; import org.apache.commons.pool2.PooledObject; @@ -30,8 +28,6 @@ import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient; import org.apache.hadoop.hive.metastore.api.MetaException; - -import com.google.common.base.Optional; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler; @@ -39,12 +35,22 @@ import org.apache.hadoop.hive.ql.metadata.HiveUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.base.Optional; + +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; + +import org.apache.gobblin.broker.EmptyKey; +import org.apache.gobblin.broker.SharedResourcesBrokerFactory; +import org.apache.gobblin.broker.iface.NotConfiguredException; + import static org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_STORAGE; /** * An implementation of {@link BasePooledObjectFactory} for {@link IMetaStoreClient}. */ +@Slf4j public class HiveMetaStoreClientFactory extends BasePooledObjectFactory<IMetaStoreClient> { private static final Logger LOG = LoggerFactory.getLogger(HiveMetaStoreClientFactory.class); @@ -58,12 +64,17 @@ public class HiveMetaStoreClientFactory extends BasePooledObjectFactory<IMetaSto } private static HiveConf getHiveConf(Optional<String> hcatURI) { - HiveConf hiveConf = new HiveConf(); - if (hcatURI.isPresent() && StringUtils.isNotBlank(hcatURI.get())) { - hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, hcatURI.get()); - hiveConf.set(HIVE_METASTORE_TOKEN_SIGNATURE, hcatURI.get()); + try { + HiveConf hiveConf = SharedResourcesBrokerFactory.getImplicitBroker() + .getSharedResource(new HiveConfFactory<>(), EmptyKey.INSTANCE); + if (hcatURI.isPresent() && StringUtils.isNotBlank(hcatURI.get())) { + hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, hcatURI.get()); + hiveConf.set(HIVE_METASTORE_TOKEN_SIGNATURE, hcatURI.get()); + } + return hiveConf; + } catch (NotConfiguredException nce) { + throw new RuntimeException("Implicit broker is not correctly configured, failed to fetch a HiveConf object", nce); } - return hiveConf; } public HiveMetaStoreClientFactory(HiveConf hiveConf) { http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/25530f07/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreUtils.java ---------------------------------------------------------------------- diff --git a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreUtils.java b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreUtils.java index 61d3f5f..bdf8800 100644 --- a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreUtils.java +++ b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreUtils.java @@ -17,7 +17,6 @@ package org.apache.gobblin.hive.metastore; -import com.google.common.base.Splitter; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.util.ArrayList; @@ -49,12 +48,17 @@ import org.slf4j.LoggerFactory; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; import com.google.common.base.Preconditions; +import com.google.common.base.Splitter; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.primitives.Ints; import org.apache.gobblin.annotation.Alpha; +import org.apache.gobblin.broker.EmptyKey; +import org.apache.gobblin.broker.SharedResourcesBrokerFactory; +import org.apache.gobblin.broker.iface.NotConfiguredException; import org.apache.gobblin.configuration.State; +import org.apache.gobblin.hive.HiveConfFactory; import org.apache.gobblin.hive.HiveConstants; import org.apache.gobblin.hive.HivePartition; import org.apache.gobblin.hive.HiveRegistrationUnit; @@ -370,15 +374,19 @@ public class HiveMetaStoreUtils { } String serde = serdeClass.get(); - HiveConf hiveConf = new HiveConf(); - + HiveConf hiveConf; Deserializer deserializer; try { + hiveConf = SharedResourcesBrokerFactory + .getImplicitBroker().getSharedResource(new HiveConfFactory<>(), EmptyKey.INSTANCE); deserializer = ReflectionUtils.newInstance(hiveConf.getClassByName(serde).asSubclass(Deserializer.class), hiveConf); } catch (ClassNotFoundException e) { LOG.warn("Serde class " + serde + " not found!", e); return null; + } catch (NotConfiguredException nce) { + LOG.error("Implicit broker is not configured properly", nce); + return null; } Properties props = new Properties(); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/25530f07/gobblin-hive-registration/src/test/java/org/apache/gobblin/hive/HiveConfFactoryTest.java ---------------------------------------------------------------------- diff --git a/gobblin-hive-registration/src/test/java/org/apache/gobblin/hive/HiveConfFactoryTest.java b/gobblin-hive-registration/src/test/java/org/apache/gobblin/hive/HiveConfFactoryTest.java new file mode 100644 index 0000000..de3ac6e --- /dev/null +++ b/gobblin-hive-registration/src/test/java/org/apache/gobblin/hive/HiveConfFactoryTest.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.hive; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.testng.Assert; +import org.testng.annotations.Test; + +import org.apache.gobblin.broker.EmptyKey; +import org.apache.gobblin.broker.SharedResourcesBrokerFactory; + + +public class HiveConfFactoryTest { + @Test + public void testSameKey() throws Exception { + HiveConf hiveConf = SharedResourcesBrokerFactory + .getImplicitBroker().getSharedResource(new HiveConfFactory<>(), EmptyKey.INSTANCE); + HiveConf hiveConf1 = SharedResourcesBrokerFactory + .getImplicitBroker().getSharedResource(new HiveConfFactory<>(), EmptyKey.INSTANCE); + + Assert.assertEquals(hiveConf, hiveConf1); + } +} \ No newline at end of file
