Repository: incubator-gobblin Updated Branches: refs/heads/master d98f77c14 -> 261819c63
[GOBBLIN-489] Implement PusherFactory Closes #2359 from zxcware/broker Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/261819c6 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/261819c6 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/261819c6 Branch: refs/heads/master Commit: 261819c634710031466758d88b549b1f5aa3bd73 Parents: d98f77c Author: zhchen <[email protected]> Authored: Tue Jun 12 16:05:37 2018 -0700 Committer: Hung Tran <[email protected]> Committed: Tue Jun 12 16:05:45 2018 -0700 ---------------------------------------------------------------------- .../apache/gobblin/broker/BrokerConstants.java | 2 + .../kafka/GobblinScopePusherFactory.java | 20 ++++ .../gobblin/metrics/kafka/PusherFactory.java | 55 ++++++++++ .../metrics/kafka/PusherFactoryTest.java | 102 +++++++++++++++++++ .../broker/SharedResourcesBrokerFactory.java | 26 ++++- .../SharedResourcesBrokerFactoryTest.java | 31 ++++++ 6 files changed, 235 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/261819c6/gobblin-api/src/main/java/org/apache/gobblin/broker/BrokerConstants.java ---------------------------------------------------------------------- diff --git a/gobblin-api/src/main/java/org/apache/gobblin/broker/BrokerConstants.java b/gobblin-api/src/main/java/org/apache/gobblin/broker/BrokerConstants.java index c29a162..14da49f 100644 --- a/gobblin-api/src/main/java/org/apache/gobblin/broker/BrokerConstants.java +++ b/gobblin-api/src/main/java/org/apache/gobblin/broker/BrokerConstants.java @@ -22,4 +22,6 @@ package org.apache.gobblin.broker; */ public class BrokerConstants { public static final String GOBBLIN_BROKER_CONFIG_PREFIX = "gobblin.broker"; + + public static final String GOBBLIN_BROKER_CONFIG_NAMESPACES = "gobblin.brokerNamespaces"; } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/261819c6/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/GobblinScopePusherFactory.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/GobblinScopePusherFactory.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/GobblinScopePusherFactory.java new file mode 100644 index 0000000..a228394 --- /dev/null +++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/GobblinScopePusherFactory.java @@ -0,0 +1,20 @@ +package org.apache.gobblin.metrics.kafka; + +import org.apache.gobblin.broker.StringNameSharedResourceKey; +import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes; +import org.apache.gobblin.broker.iface.ConfigView; +import org.apache.gobblin.broker.iface.SharedResourcesBroker; + + +/** + * An {@link PusherFactory} to create a shared {@link Pusher} instance + * in {@link GobblinScopeTypes} + */ +public class GobblinScopePusherFactory<T> extends PusherFactory<T, GobblinScopeTypes> { + @Override + public GobblinScopeTypes getAutoScope(SharedResourcesBroker<GobblinScopeTypes> broker, + ConfigView<GobblinScopeTypes, StringNameSharedResourceKey> config) { + // By default, a job level resource + return GobblinScopeTypes.JOB; + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/261819c6/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/PusherFactory.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/PusherFactory.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/PusherFactory.java new file mode 100644 index 0000000..08cbc7e --- /dev/null +++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/PusherFactory.java @@ -0,0 +1,55 @@ +package org.apache.gobblin.metrics.kafka; + +import org.apache.commons.lang3.reflect.ConstructorUtils; + +import com.google.common.collect.ImmutableMap; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; + +import lombok.extern.slf4j.Slf4j; + +import org.apache.gobblin.broker.ResourceInstance; +import org.apache.gobblin.broker.StringNameSharedResourceKey; +import org.apache.gobblin.broker.iface.NotConfiguredException; +import org.apache.gobblin.broker.iface.ScopeType; +import org.apache.gobblin.broker.iface.ScopedConfigView; +import org.apache.gobblin.broker.iface.SharedResourceFactory; +import org.apache.gobblin.broker.iface.SharedResourceFactoryResponse; +import org.apache.gobblin.broker.iface.SharedResourcesBroker; + + +/** + * Basic resource factory to create shared {@link Pusher} instance + */ +@Slf4j +public abstract class PusherFactory<T, S extends ScopeType<S>> implements SharedResourceFactory<Pusher<T>, StringNameSharedResourceKey, S> { + private static final String FACTORY_NAME = "pusher"; + private static final String PUSHER_CLASS = "class"; + + private static final Config FALLBACK = ConfigFactory.parseMap( + ImmutableMap.<String, Object>builder() + .put(PUSHER_CLASS, LoggingPusher.class.getName()) + .build()); + + @Override + public String getName() { + return FACTORY_NAME; + } + + @Override + public SharedResourceFactoryResponse<Pusher<T>> createResource(SharedResourcesBroker<S> broker, + ScopedConfigView<S, StringNameSharedResourceKey> config) + throws NotConfiguredException { + Config pusherConfig = config.getConfig().withFallback(FALLBACK); + String pusherClass = pusherConfig.getString(PUSHER_CLASS); + + Pusher<T> pusher; + try { + pusher = (Pusher) ConstructorUtils.invokeConstructor(Class.forName(pusherClass), pusherConfig); + } catch (ReflectiveOperationException e) { + log.warn("Unable to construct a pusher with class {}. LoggingPusher will be used", pusherClass, e); + pusher = new LoggingPusher<>(); + } + return new ResourceInstance<>(pusher); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/261819c6/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/kafka/PusherFactoryTest.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/kafka/PusherFactoryTest.java b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/kafka/PusherFactoryTest.java new file mode 100644 index 0000000..ad48a1e --- /dev/null +++ b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/kafka/PusherFactoryTest.java @@ -0,0 +1,102 @@ +package org.apache.gobblin.metrics.kafka; + +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.testng.Assert; +import org.testng.annotations.Test; + +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; + +import org.apache.gobblin.broker.SharedResourcesBrokerFactory; +import org.apache.gobblin.broker.StringNameSharedResourceKey; +import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes; +import org.apache.gobblin.broker.gobblin_scopes.JobScopeInstance; +import org.apache.gobblin.broker.iface.NotConfiguredException; +import org.apache.gobblin.broker.iface.SharedResourcesBroker; + + +/** + * Test {@link PusherFactory}s + */ +public class PusherFactoryTest { + + @Test + private void testCreateGobblinScopedDefaultPusher() + throws NotConfiguredException { + SharedResourcesBroker<GobblinScopeTypes> instanceBroker = SharedResourcesBrokerFactory + .createDefaultTopLevelBroker(ConfigFactory.empty(), GobblinScopeTypes.GLOBAL.defaultScopeInstance()); + SharedResourcesBroker<GobblinScopeTypes> jobBroker = instanceBroker.newSubscopedBuilder( + new JobScopeInstance("PusherFactoryTest", String.valueOf(System.currentTimeMillis()))).build(); + + StringNameSharedResourceKey key = new StringNameSharedResourceKey("test"); + + Pusher<Object> pusher = jobBroker.getSharedResource(new GobblinScopePusherFactory<>(), key); + Assert.assertEquals(pusher.getClass(), LoggingPusher.class); + + try { + jobBroker.close(); + instanceBroker.close(); + } catch (IOException e) { + e.printStackTrace(); + } + } + + @Test + private void testCreateGobblinScopedCustomPusher() + throws NotConfiguredException { + Map<String, String> configAsMap = new HashMap<>(); + configAsMap.put("gobblin.broker.pusher.class", TestPusher.class.getName()); + configAsMap.put("gobblin.broker.pusher.id", "sharedId"); + configAsMap.put("gobblin.broker.pusher.testPusher.id", "testPusherId"); + configAsMap.put("gobblin.broker.pusher.testPusher.name", "testPusherName"); + + SharedResourcesBroker<GobblinScopeTypes> instanceBroker = SharedResourcesBrokerFactory + .createDefaultTopLevelBroker(ConfigFactory.parseMap(configAsMap), GobblinScopeTypes.GLOBAL.defaultScopeInstance()); + SharedResourcesBroker<GobblinScopeTypes> jobBroker = instanceBroker.newSubscopedBuilder( + new JobScopeInstance("PusherFactoryTest", String.valueOf(System.currentTimeMillis()))).build(); + + StringNameSharedResourceKey key = new StringNameSharedResourceKey("testPusher"); + + Pusher<String> pusher = jobBroker.getSharedResource(new GobblinScopePusherFactory<>(), key); + + Assert.assertEquals(pusher.getClass(), TestPusher.class); + TestPusher testPusher = (TestPusher) pusher; + Assert.assertTrue(!testPusher.isClosed); + Assert.assertEquals(testPusher.id, "testPusherId"); + Assert.assertEquals(testPusher.name, "testPusherName"); + + try { + jobBroker.close(); + instanceBroker.close(); + } catch (IOException e) { + e.printStackTrace(); + } + + Assert.assertTrue(testPusher.isClosed); + } + + public static class TestPusher implements Pusher<String> { + private boolean isClosed = false; + private final String id; + private final String name; + + public TestPusher(Config config) { + id = config.getString("id"); + name = config.getString("name"); + } + + @Override + public void pushMessages(List<String> messages) { + } + + @Override + public void close() + throws IOException { + isClosed = true; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/261819c6/gobblin-utility/src/main/java/org/apache/gobblin/broker/SharedResourcesBrokerFactory.java ---------------------------------------------------------------------- diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/broker/SharedResourcesBrokerFactory.java b/gobblin-utility/src/main/java/org/apache/gobblin/broker/SharedResourcesBrokerFactory.java index 06d705a..bdb85ac 100644 --- a/gobblin-utility/src/main/java/org/apache/gobblin/broker/SharedResourcesBrokerFactory.java +++ b/gobblin-utility/src/main/java/org/apache/gobblin/broker/SharedResourcesBrokerFactory.java @@ -21,6 +21,8 @@ import java.util.Map; import org.apache.hadoop.conf.Configuration; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Splitter; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -54,6 +56,10 @@ public class SharedResourcesBrokerFactory { public static final String BROKER_CONF_FILE_KEY = BrokerConstants.GOBBLIN_BROKER_CONFIG_PREFIX + ".configuration"; public static final String DEFAULT_BROKER_CONF_FILE = "gobblinBroker.conf"; + private static final Splitter LIST_SPLITTER = Splitter.on(",").trimResults().omitEmptyStrings(); + private static final Config BROKER_NAMESPACES_FALLBACK = ConfigFactory.parseMap(ImmutableMap.<String, Object>builder() + .put(BrokerConstants.GOBBLIN_BROKER_CONFIG_NAMESPACES, "").build()); + /** * Create a root {@link SharedResourcesBroker}. Subscoped brokers should be built using * {@link SharedResourcesBroker#newSubscopedBuilder(ScopeInstance)}. @@ -78,7 +84,7 @@ public class SharedResourcesBrokerFactory { return new SharedResourcesBrokerImpl<>(new DefaultBrokerCache<S>(), scopeWrapper, Lists.newArrayList(new SharedResourcesBrokerImpl.ScopedConfig<>(globalScope.getType(), - ConfigUtils.getConfigOrEmpty(addSystemConfigurationToConfig(config), BrokerConstants.GOBBLIN_BROKER_CONFIG_PREFIX))), + getBrokerConfig(addSystemConfigurationToConfig(config)))), ImmutableMap.of(globalScope.getType(), scopeWrapper)); } @@ -86,6 +92,24 @@ public class SharedResourcesBrokerFactory { private static SharedResourcesBroker<SimpleScopeType> SINGLETON; /** + * Get all broker configurations from the given {@code srcConfig}. Configurations from + * {@value BrokerConstants#GOBBLIN_BROKER_CONFIG_PREFIX} is always loaded first, then in-order from namespaces, + * which is encoded as a comma separated string keyed by {@value BrokerConstants#GOBBLIN_BROKER_CONFIG_NAMESPACES}. + */ + @VisibleForTesting + static Config getBrokerConfig(Config srcConfig) { + Config allSrcConfig = srcConfig.withFallback(BROKER_NAMESPACES_FALLBACK); + String namespaces = allSrcConfig.getString(BrokerConstants.GOBBLIN_BROKER_CONFIG_NAMESPACES); + Config brokerConfig = ConfigUtils.getConfigOrEmpty(allSrcConfig, BrokerConstants.GOBBLIN_BROKER_CONFIG_PREFIX); + + for (String namespace : LIST_SPLITTER.splitToList(namespaces)) { + brokerConfig = brokerConfig.withFallback(ConfigUtils.getConfigOrEmpty(allSrcConfig, namespace)); + } + + return brokerConfig; + } + + /** * Get the implicit {@link SharedResourcesBroker} in the callers thread. This is either a singleton broker configured * from environment variables, java options, and classpath configuration options, or a specific broker injected * elsewhere in the application. http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/261819c6/gobblin-utility/src/test/java/org/apache/gobblin/broker/SharedResourcesBrokerFactoryTest.java ---------------------------------------------------------------------- diff --git a/gobblin-utility/src/test/java/org/apache/gobblin/broker/SharedResourcesBrokerFactoryTest.java b/gobblin-utility/src/test/java/org/apache/gobblin/broker/SharedResourcesBrokerFactoryTest.java index 1b1c3e8..2df48a9 100644 --- a/gobblin-utility/src/test/java/org/apache/gobblin/broker/SharedResourcesBrokerFactoryTest.java +++ b/gobblin-utility/src/test/java/org/apache/gobblin/broker/SharedResourcesBrokerFactoryTest.java @@ -17,6 +17,8 @@ package org.apache.gobblin.broker; +import java.util.HashMap; +import java.util.Map; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -67,6 +69,35 @@ public class SharedResourcesBrokerFactoryTest { Assert.assertEquals(configView.getConfig().getString("testKey"), "testValue"); } + @Test + public void testGetBrokerConfig() { + Map<String, String> srcConfigMap = new HashMap<>(); + srcConfigMap.put("gobblin.broker.key1", "value1"); + + // Test global namespace, "gobblin.broker" + Config brokerConfig = SharedResourcesBrokerFactory.getBrokerConfig(ConfigFactory.parseMap(srcConfigMap)); + Config expectedConfig = ConfigFactory.parseMap(ImmutableMap.of("key1", "value1")); + Assert.assertEquals(brokerConfig, expectedConfig); + + // Test extra namespace, "gobblin.shared" + srcConfigMap.put("gobblin.shared.key2", "value2"); + srcConfigMap.put("gobblin.brokerNamespaces", "gobblin.shared"); + brokerConfig = SharedResourcesBrokerFactory.getBrokerConfig(ConfigFactory.parseMap(srcConfigMap)); + expectedConfig = ConfigFactory.parseMap(ImmutableMap.of("key1", "value1","key2", "value2")); + Assert.assertEquals(brokerConfig, expectedConfig); + + // Test a list of extra namespaces, configurations are respected in order + srcConfigMap.put("gobblin.shared.key2", "value2"); + srcConfigMap.put("gobblin.shared.key3", "value3"); + srcConfigMap.put("gobblin.shared2.key3", "value3x"); + srcConfigMap.put("gobblin.shared2.key4", "value4"); + srcConfigMap.put("gobblin.brokerNamespaces", "gobblin.shared, gobblin.shared2"); + brokerConfig = SharedResourcesBrokerFactory.getBrokerConfig(ConfigFactory.parseMap(srcConfigMap)); + expectedConfig = ConfigFactory.parseMap(ImmutableMap.of("key1", "value1", "key2", "value2", + "key3", "value3", "key4", "value4")); + Assert.assertEquals(brokerConfig, expectedConfig); + } + public static class ImplicitBrokerTest implements Runnable { @Override
