Repository: samza Updated Branches: refs/heads/master bd71d609c -> e2c35c000
samza-1156: Improve ApplicationRunner method naming and class structure navina xinyuiscool nickpan47 take a look Author: Jacob Maes <[email protected]> Reviewers: Yi Pan (Data Infrastructure) <[email protected]> Closes #93 from jmakes/app-runner-class-hier Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/e2c35c00 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/e2c35c00 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/e2c35c00 Branch: refs/heads/master Commit: e2c35c00068f071840332a200d9879e07fdb1001 Parents: bd71d60 Author: Jacob Maes <[email protected]> Authored: Wed Mar 22 07:59:08 2017 -0700 Committer: Jacob Maes <[email protected]> Committed: Wed Mar 22 07:59:08 2017 -0700 ---------------------------------------------------------------------- .../apache/samza/runtime/ApplicationRunner.java | 34 +++--- .../apache/samza/operators/StreamGraphImpl.java | 2 +- .../runtime/AbstractApplicationRunner.java | 23 ++-- .../runtime/TestAbstractApplicationRunner.java | 108 +++++++++---------- 4 files changed, 85 insertions(+), 82 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/e2c35c00/samza-api/src/main/java/org/apache/samza/runtime/ApplicationRunner.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/runtime/ApplicationRunner.java b/samza-api/src/main/java/org/apache/samza/runtime/ApplicationRunner.java index 6da1242..d148626 100644 --- a/samza-api/src/main/java/org/apache/samza/runtime/ApplicationRunner.java +++ b/samza-api/src/main/java/org/apache/samza/runtime/ApplicationRunner.java @@ -27,16 +27,15 @@ import org.apache.samza.system.StreamSpec; /** - * Interface to be implemented by physical execution engine to deploy the config and jobs to run the {@link org.apache.samza.operators.StreamGraph} - * - * Implementations of this interface must define a constructor with a single {@link Config} as the argument in order - * to support the {@link ApplicationRunner#fromConfig(Config)} static constructor. + * A physical execution engine to deploy the config and jobs to run the {@link org.apache.samza.operators.StreamGraph} */ @InterfaceStability.Unstable -public interface ApplicationRunner { +public abstract class ApplicationRunner { + + private static final String RUNNER_CONFIG = "app.runner.class"; + private static final String DEFAULT_RUNNER_CLASS = "org.apache.samza.runtime.RemoteApplicationRunner"; - String RUNNER_CONFIG = "app.runner.class"; - String DEFAULT_RUNNER_CLASS = "org.apache.samza.runtime.RemoteApplicationRunner"; + protected final Config config; /** * Static method to create the local {@link ApplicationRunner}. @@ -44,19 +43,17 @@ public interface ApplicationRunner { * @param config configuration passed in to initialize the Samza local process * @return the local {@link ApplicationRunner} to run the user-defined stream applications */ - static ApplicationRunner getLocalRunner(Config config) { + public static ApplicationRunner getLocalRunner(Config config) { return null; } /** * Static method to load the {@link ApplicationRunner} * - * Requires the implementation class to define a constructor with a single {@link Config} as the argument. - * * @param config configuration passed in to initialize the Samza processes * @return the configure-driven {@link ApplicationRunner} to run the user-defined stream applications */ - static ApplicationRunner fromConfig(Config config) { + public static ApplicationRunner fromConfig(Config config) { try { Class<?> runnerClass = Class.forName(config.get(RUNNER_CONFIG, DEFAULT_RUNNER_CLASS)); if (ApplicationRunner.class.isAssignableFrom(runnerClass)) { @@ -68,16 +65,25 @@ public interface ApplicationRunner { RUNNER_CONFIG)), e); } throw new ConfigException(String.format( - "Class %s does not implement interface ApplicationRunner properly", + "Class %s does not extend ApplicationRunner properly", config.get(RUNNER_CONFIG))); } + + public ApplicationRunner(Config config) { + if (config == null) { + throw new NullPointerException("Parameter 'config' cannot be null."); + } + + this.config = config; + } + /** * Method to be invoked to deploy and run the actual Samza jobs to execute {@link StreamApplication} * * @param streamApp the user-defined {@link StreamApplication} object */ - void run(StreamApplication streamApp); + public abstract void run(StreamApplication streamApp); /** * Constructs a {@link StreamSpec} from the configuration for the specified streamId. @@ -98,5 +104,5 @@ public interface ApplicationRunner { * @param streamId The logical identifier for the stream in Samza. * @return The {@link StreamSpec} instance. */ - StreamSpec streamFromConfig(String streamId); + public abstract StreamSpec getStream(String streamId); } http://git-wip-us.apache.org/repos/asf/samza/blob/e2c35c00/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java b/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java index 1b36f76..fe86699 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java +++ b/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java @@ -255,7 +255,7 @@ public class StreamGraphImpl implements StreamGraph { config.get(JobConfig.JOB_NAME()), config.get(JobConfig.JOB_ID(), "1"), opNameWithId); - StreamSpec streamSpec = runner.streamFromConfig(streamId); + StreamSpec streamSpec = runner.getStream(streamId); this.inStreams.putIfAbsent(streamSpec.getId(), new IntermediateStreamImpl(this, streamSpec, null, null, parKeyFn)); IntermediateStreamImpl intStream = (IntermediateStreamImpl) this.inStreams.get(streamSpec.getId()); http://git-wip-us.apache.org/repos/asf/samza/blob/e2c35c00/samza-core/src/main/java/org/apache/samza/runtime/AbstractApplicationRunner.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/runtime/AbstractApplicationRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/AbstractApplicationRunner.java index a5d784a..5f01ca7 100644 --- a/samza-core/src/main/java/org/apache/samza/runtime/AbstractApplicationRunner.java +++ b/samza-core/src/main/java/org/apache/samza/runtime/AbstractApplicationRunner.java @@ -24,23 +24,20 @@ import org.apache.samza.config.StreamConfig; import org.apache.samza.system.StreamSpec; -public abstract class AbstractApplicationRunner implements ApplicationRunner { - - protected final Config config; +/** + * Defines common, core behavior for implementations of the {@link ApplicationRunner} API + */ +public abstract class AbstractApplicationRunner extends ApplicationRunner { public AbstractApplicationRunner(Config config) { - if (config == null) { - throw new NullPointerException("Parameter 'config' cannot be null."); - } - - this.config = config; + super(config); } @Override - public StreamSpec streamFromConfig(String streamId) { + public StreamSpec getStream(String streamId) { StreamConfig streamConfig = new StreamConfig(config); String physicalName = streamConfig.getPhysicalName(streamId); - return streamFromConfig(streamId, physicalName); + return getStream(streamId, physicalName); } /** @@ -61,11 +58,11 @@ public abstract class AbstractApplicationRunner implements ApplicationRunner { * @param physicalName The system-specific name for this stream. It could be a file URN, topic name, or other identifer. * @return The {@link StreamSpec} instance. */ - /*package private*/ StreamSpec streamFromConfig(String streamId, String physicalName) { + /*package private*/ StreamSpec getStream(String streamId, String physicalName) { StreamConfig streamConfig = new StreamConfig(config); String system = streamConfig.getSystem(streamId); - return streamFromConfig(streamId, physicalName, system); + return getStream(streamId, physicalName, system); } /** @@ -79,7 +76,7 @@ public abstract class AbstractApplicationRunner implements ApplicationRunner { * @param system The name of the System on which this stream will be used. * @return The {@link StreamSpec} instance. */ - /*package private*/ StreamSpec streamFromConfig(String streamId, String physicalName, String system) { + /*package private*/ StreamSpec getStream(String streamId, String physicalName, String system) { StreamConfig streamConfig = new StreamConfig(config); Map<String, String> properties = streamConfig.getStreamProperties(streamId); http://git-wip-us.apache.org/repos/asf/samza/blob/e2c35c00/samza-core/src/test/java/org/apache/samza/runtime/TestAbstractApplicationRunner.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/runtime/TestAbstractApplicationRunner.java b/samza-core/src/test/java/org/apache/samza/runtime/TestAbstractApplicationRunner.java index 155a47d..8d7db9f 100644 --- a/samza-core/src/test/java/org/apache/samza/runtime/TestAbstractApplicationRunner.java +++ b/samza-core/src/test/java/org/apache/samza/runtime/TestAbstractApplicationRunner.java @@ -52,13 +52,13 @@ public class TestAbstractApplicationRunner { // The physical name should be pulled from the StreamConfig.PHYSICAL_NAME property value. @Test - public void testStreamFromConfigWithPhysicalNameInConfig() { + public void testgetStreamWithPhysicalNameInConfig() { Config config = buildStreamConfig(STREAM_ID, StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME, StreamConfig.SYSTEM(), TEST_SYSTEM); - AbstractApplicationRunner env = new TestAbstractApplicationRunnerImpl(config); - StreamSpec spec = env.streamFromConfig(STREAM_ID); + AbstractApplicationRunner runner = new TestAbstractApplicationRunnerImpl(config); + StreamSpec spec = runner.getStream(STREAM_ID); assertEquals(TEST_PHYSICAL_NAME, spec.getPhysicalName()); } @@ -66,71 +66,71 @@ public class TestAbstractApplicationRunner { // The streamId should be used as the physicalName when the physical name is not specified. // NOTE: its either this, set to null, or exception. This seems better for backward compatibility and API brevity. @Test - public void testStreamFromConfigWithoutPhysicalNameInConfig() { + public void testgetStreamWithoutPhysicalNameInConfig() { Config config = buildStreamConfig(STREAM_ID, StreamConfig.SYSTEM(), TEST_SYSTEM); - AbstractApplicationRunner env = new TestAbstractApplicationRunnerImpl(config); - StreamSpec spec = env.streamFromConfig(STREAM_ID); + AbstractApplicationRunner runner = new TestAbstractApplicationRunnerImpl(config); + StreamSpec spec = runner.getStream(STREAM_ID); assertEquals(STREAM_ID, spec.getPhysicalName()); } // If the system is specified at the stream scope, use it @Test - public void testStreamFromConfigWithSystemAtStreamScopeInConfig() { + public void testgetStreamWithSystemAtStreamScopeInConfig() { Config config = buildStreamConfig(STREAM_ID, StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME, StreamConfig.SYSTEM(), TEST_SYSTEM); - AbstractApplicationRunner env = new TestAbstractApplicationRunnerImpl(config); - StreamSpec spec = env.streamFromConfig(STREAM_ID); + AbstractApplicationRunner runner = new TestAbstractApplicationRunnerImpl(config); + StreamSpec spec = runner.getStream(STREAM_ID); assertEquals(TEST_SYSTEM, spec.getSystemName()); } // If system isn't specified at stream scope, use the default system @Test - public void testStreamFromConfigWithSystemAtDefaultScopeInConfig() { + public void testgetStreamWithSystemAtDefaultScopeInConfig() { Config config = addConfigs(buildStreamConfig(STREAM_ID, StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME), JobConfig.JOB_DEFAULT_SYSTEM(), TEST_DEFAULT_SYSTEM); - AbstractApplicationRunner env = new TestAbstractApplicationRunnerImpl(config); - StreamSpec spec = env.streamFromConfig(STREAM_ID); + AbstractApplicationRunner runner = new TestAbstractApplicationRunnerImpl(config); + StreamSpec spec = runner.getStream(STREAM_ID); assertEquals(TEST_DEFAULT_SYSTEM, spec.getSystemName()); } // Stream scope should override default scope @Test - public void testStreamFromConfigWithSystemAtBothScopesInConfig() { + public void testgetStreamWithSystemAtBothScopesInConfig() { Config config = addConfigs(buildStreamConfig(STREAM_ID, StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME, StreamConfig.SYSTEM(), TEST_SYSTEM), JobConfig.JOB_DEFAULT_SYSTEM(), TEST_DEFAULT_SYSTEM); - AbstractApplicationRunner env = new TestAbstractApplicationRunnerImpl(config); - StreamSpec spec = env.streamFromConfig(STREAM_ID); + AbstractApplicationRunner runner = new TestAbstractApplicationRunnerImpl(config); + StreamSpec spec = runner.getStream(STREAM_ID); assertEquals(TEST_SYSTEM, spec.getSystemName()); } // System is required. Throw if it cannot be determined. @Test(expected = Exception.class) - public void testStreamFromConfigWithOutSystemInConfig() { + public void testgetStreamWithOutSystemInConfig() { Config config = buildStreamConfig(STREAM_ID, StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME); - AbstractApplicationRunner env = new TestAbstractApplicationRunnerImpl(config); - StreamSpec spec = env.streamFromConfig(STREAM_ID); + AbstractApplicationRunner runner = new TestAbstractApplicationRunnerImpl(config); + StreamSpec spec = runner.getStream(STREAM_ID); assertEquals(TEST_SYSTEM, spec.getSystemName()); } // The properties in the config "streams.{streamId}.*" should be passed through to the spec. @Test - public void testStreamFromConfigPropertiesPassthrough() { + public void testgetStreamPropertiesPassthrough() { Config config = buildStreamConfig(STREAM_ID, StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME, StreamConfig.SYSTEM(), TEST_SYSTEM, @@ -138,8 +138,8 @@ public class TestAbstractApplicationRunner { "systemProperty2", "systemValue2", "systemProperty3", "systemValue3"); - AbstractApplicationRunner env = new TestAbstractApplicationRunnerImpl(config); - StreamSpec spec = env.streamFromConfig(STREAM_ID); + AbstractApplicationRunner runner = new TestAbstractApplicationRunnerImpl(config); + StreamSpec spec = runner.getStream(STREAM_ID); Map<String, String> properties = spec.getConfig(); assertEquals(3, properties.size()); @@ -153,7 +153,7 @@ public class TestAbstractApplicationRunner { // The samza properties (which are invalid for the underlying system) should be filtered out. @Test - public void testStreamFromConfigSamzaPropertiesOmitted() { + public void testgetStreamSamzaPropertiesOmitted() { Config config = buildStreamConfig(STREAM_ID, StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME, StreamConfig.SYSTEM(), TEST_SYSTEM, @@ -161,8 +161,8 @@ public class TestAbstractApplicationRunner { "systemProperty2", "systemValue2", "systemProperty3", "systemValue3"); - AbstractApplicationRunner env = new TestAbstractApplicationRunnerImpl(config); - StreamSpec spec = env.streamFromConfig(STREAM_ID); + AbstractApplicationRunner runner = new TestAbstractApplicationRunnerImpl(config); + StreamSpec spec = runner.getStream(STREAM_ID); Map<String, String> properties = spec.getConfig(); assertEquals(3, properties.size()); @@ -174,13 +174,13 @@ public class TestAbstractApplicationRunner { // When the physicalName argument is passed explicitly it should be used, regardless of whether it is also in the config @Test - public void testStreamFromConfigPhysicalNameArgSimple() { + public void testgetStreamPhysicalNameArgSimple() { Config config = buildStreamConfig(STREAM_ID, StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME2, // This should be ignored because of the explicit arg StreamConfig.SYSTEM(), TEST_SYSTEM); - AbstractApplicationRunner env = new TestAbstractApplicationRunnerImpl(config); - StreamSpec spec = env.streamFromConfig(STREAM_ID, TEST_PHYSICAL_NAME); + AbstractApplicationRunner runner = new TestAbstractApplicationRunnerImpl(config); + StreamSpec spec = runner.getStream(STREAM_ID, TEST_PHYSICAL_NAME); assertEquals(STREAM_ID, spec.getId()); assertEquals(TEST_PHYSICAL_NAME, spec.getPhysicalName()); @@ -189,37 +189,37 @@ public class TestAbstractApplicationRunner { // Special characters are allowed for the physical name @Test - public void testStreamFromConfigPhysicalNameArgSpecialCharacters() { + public void testgetStreamPhysicalNameArgSpecialCharacters() { Config config = buildStreamConfig(STREAM_ID, StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME2, StreamConfig.SYSTEM(), TEST_SYSTEM); - AbstractApplicationRunner env = new TestAbstractApplicationRunnerImpl(config); - StreamSpec spec = env.streamFromConfig(STREAM_ID, TEST_PHYSICAL_NAME_SPECIAL_CHARS); + AbstractApplicationRunner runner = new TestAbstractApplicationRunnerImpl(config); + StreamSpec spec = runner.getStream(STREAM_ID, TEST_PHYSICAL_NAME_SPECIAL_CHARS); assertEquals(TEST_PHYSICAL_NAME_SPECIAL_CHARS, spec.getPhysicalName()); } // Null is allowed for the physical name @Test - public void testStreamFromConfigPhysicalNameArgNull() { + public void testgetStreamPhysicalNameArgNull() { Config config = buildStreamConfig(STREAM_ID, StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME2, StreamConfig.SYSTEM(), TEST_SYSTEM); - AbstractApplicationRunner env = new TestAbstractApplicationRunnerImpl(config); - StreamSpec spec = env.streamFromConfig(STREAM_ID, null); + AbstractApplicationRunner runner = new TestAbstractApplicationRunnerImpl(config); + StreamSpec spec = runner.getStream(STREAM_ID, null); assertNull(spec.getPhysicalName()); } // When the system name is provided explicitly, it should be used, regardless of whether it's also in the config @Test - public void testStreamFromConfigSystemNameArgValid() { + public void testgetStreamSystemNameArgValid() { Config config = buildStreamConfig(STREAM_ID, StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME2, // This should be ignored because of the explicit arg StreamConfig.SYSTEM(), TEST_SYSTEM2); // This too - AbstractApplicationRunner env = new TestAbstractApplicationRunnerImpl(config); - StreamSpec spec = env.streamFromConfig(STREAM_ID, TEST_PHYSICAL_NAME, TEST_SYSTEM); + AbstractApplicationRunner runner = new TestAbstractApplicationRunnerImpl(config); + StreamSpec spec = runner.getStream(STREAM_ID, TEST_PHYSICAL_NAME, TEST_SYSTEM); assertEquals(STREAM_ID, spec.getId()); assertEquals(TEST_PHYSICAL_NAME, spec.getPhysicalName()); @@ -228,65 +228,65 @@ public class TestAbstractApplicationRunner { // Special characters are NOT allowed for system name, because it's used as an identifier in the config. @Test(expected = IllegalArgumentException.class) - public void testStreamFromConfigSystemNameArgInvalid() { + public void testgetStreamSystemNameArgInvalid() { Config config = buildStreamConfig(STREAM_ID, StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME2, StreamConfig.SYSTEM(), TEST_SYSTEM2); - AbstractApplicationRunner env = new TestAbstractApplicationRunnerImpl(config); - env.streamFromConfig(STREAM_ID, TEST_PHYSICAL_NAME, TEST_SYSTEM_INVALID); + AbstractApplicationRunner runner = new TestAbstractApplicationRunnerImpl(config); + runner.getStream(STREAM_ID, TEST_PHYSICAL_NAME, TEST_SYSTEM_INVALID); } // Empty strings are NOT allowed for system name, because it's used as an identifier in the config. @Test(expected = IllegalArgumentException.class) - public void testStreamFromConfigSystemNameArgEmpty() { + public void testgetStreamSystemNameArgEmpty() { Config config = buildStreamConfig(STREAM_ID, StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME2, StreamConfig.SYSTEM(), TEST_SYSTEM2); - AbstractApplicationRunner env = new TestAbstractApplicationRunnerImpl(config); - env.streamFromConfig(STREAM_ID, TEST_PHYSICAL_NAME, ""); + AbstractApplicationRunner runner = new TestAbstractApplicationRunnerImpl(config); + runner.getStream(STREAM_ID, TEST_PHYSICAL_NAME, ""); } // Null is not allowed for system name. @Test(expected = NullPointerException.class) - public void testStreamFromConfigSystemNameArgNull() { + public void testgetStreamSystemNameArgNull() { Config config = buildStreamConfig(STREAM_ID, StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME2, StreamConfig.SYSTEM(), TEST_SYSTEM2); - AbstractApplicationRunner env = new TestAbstractApplicationRunnerImpl(config); - env.streamFromConfig(STREAM_ID, TEST_PHYSICAL_NAME, null); + AbstractApplicationRunner runner = new TestAbstractApplicationRunnerImpl(config); + runner.getStream(STREAM_ID, TEST_PHYSICAL_NAME, null); } // Special characters are NOT allowed for streamId, because it's used as an identifier in the config. @Test(expected = IllegalArgumentException.class) - public void testStreamFromConfigStreamIdInvalid() { + public void testgetStreamStreamIdInvalid() { Config config = buildStreamConfig(STREAM_ID_INVALID, StreamConfig.SYSTEM(), TEST_SYSTEM); - AbstractApplicationRunner env = new TestAbstractApplicationRunnerImpl(config); - env.streamFromConfig(STREAM_ID_INVALID); + AbstractApplicationRunner runner = new TestAbstractApplicationRunnerImpl(config); + runner.getStream(STREAM_ID_INVALID); } // Empty strings are NOT allowed for streamId, because it's used as an identifier in the config. @Test(expected = IllegalArgumentException.class) - public void testStreamFromConfigStreamIdEmpty() { + public void testgetStreamStreamIdEmpty() { Config config = buildStreamConfig("", StreamConfig.SYSTEM(), TEST_SYSTEM); - AbstractApplicationRunner env = new TestAbstractApplicationRunnerImpl(config); - env.streamFromConfig(""); + AbstractApplicationRunner runner = new TestAbstractApplicationRunnerImpl(config); + runner.getStream(""); } // Null is not allowed for streamId. @Test(expected = NullPointerException.class) - public void testStreamFromConfigStreamIdNull() { + public void testgetStreamStreamIdNull() { Config config = buildStreamConfig(null, StreamConfig.SYSTEM(), TEST_SYSTEM); - AbstractApplicationRunner env = new TestAbstractApplicationRunnerImpl(config); - env.streamFromConfig(null); + AbstractApplicationRunner runner = new TestAbstractApplicationRunnerImpl(config); + runner.getStream(null); }
