Repository: samza Updated Branches: refs/heads/master e6c1eed4f -> 1956dac94
SAMZA-1122; Rename ExecutionEnvironment to ApplicationRunner Some refactoring/cleanup: - rename ExecutionEnvironment to ApplicationRunner, including all the subclasses. - rename the package to be org.apache.samza.runtime - rename the StandaloneApplicationRunner to be LocalApplicationRunner Author: Xinyu Liu <[email protected]> Reviewers: Prateek Maheshwari <[email protected]> Closes #76 from xinyuiscool/SAMZA-1122 and squashes the following commits: cff5206 [Xinyu Liu] Merge branch 'SAMZA-1122' of https://github.com/xinyuiscool/samza into SAMZA-1122 c341d3d [Xinyu Liu] SAMZA-1122: Rename ExecutionEnvironment to ApplicationRunner 6a71205 [Xinyu Liu] SAMZA-1122: Rename ExecutionEnvironment to ApplicationRunner Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/1956dac9 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/1956dac9 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/1956dac9 Branch: refs/heads/master Commit: 1956dac94b2c41e3e99a0e47d1f5704882b10bbd Parents: e6c1eed Author: Xinyu Liu <[email protected]> Authored: Tue Mar 7 15:02:30 2017 -0800 Committer: Xinyu Liu <[email protected]> Committed: Tue Mar 7 15:02:30 2017 -0800 ---------------------------------------------------------------------- .../apache/samza/runtime/ApplicationRunner.java | 103 ++++++ .../samza/system/ExecutionEnvironment.java | 101 ------ .../runtime/AbstractApplicationRunner.java | 89 +++++ .../samza/runtime/LocalApplicationRunner.java | 54 +++ .../samza/runtime/RemoteApplicationRunner.java | 42 +++ .../system/AbstractExecutionEnvironment.java | 88 ----- .../system/RemoteExecutionEnvironment.java | 41 --- .../system/StandaloneExecutionEnvironment.java | 54 --- .../samza/example/KeyValueStoreExample.java | 17 +- .../samza/example/NoContextStreamExample.java | 15 +- .../samza/example/OrderShipmentJoinExample.java | 15 +- .../samza/example/PageViewCounterExample.java | 6 +- 
.../samza/example/RepartitionExample.java | 19 +- .../TestAbstractExecutionEnvironment.java | 333 +++++++++++++++++++ .../TestAbstractExecutionEnvironment.java | 331 ------------------ 15 files changed, 663 insertions(+), 645 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/1956dac9/samza-api/src/main/java/org/apache/samza/runtime/ApplicationRunner.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/runtime/ApplicationRunner.java b/samza-api/src/main/java/org/apache/samza/runtime/ApplicationRunner.java new file mode 100644 index 0000000..ff31eff --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/runtime/ApplicationRunner.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza.runtime; + +import java.lang.reflect.Constructor; +import org.apache.samza.annotation.InterfaceStability; +import org.apache.samza.config.ConfigException; +import org.apache.samza.operators.StreamGraphBuilder; +import org.apache.samza.config.Config; +import org.apache.samza.system.StreamSpec; + + +/** + * Interface to be implemented by physical execution engine to deploy the config and jobs to run the {@link org.apache.samza.operators.StreamGraph} + * + * Implementations of this interface must define a constructor with a single {@link Config} as the argument in order + * to support the {@link ApplicationRunner#fromConfig(Config)} static constructor. + */ [email protected] +public interface ApplicationRunner { + + String RUNNER_CONFIG = "job.runner.class"; + String DEFAULT_RUNNER_CLASS = "org.apache.samza.runtime.RemoteApplicationRunner"; + + /** + * Static method to create the local {@link ApplicationRunner}. + * + * @param config configuration passed in to initialize the Samza local process + * @return the local {@link ApplicationRunner} to run the user-defined stream applications + */ + static ApplicationRunner getLocalRunner(Config config) { + return null; + } + + /** + * Static method to load the {@link ApplicationRunner} + * + * Requires the implementation class to define a constructor with a single {@link Config} as the argument. 
+ * + * @param config configuration passed in to initialize the Samza processes + * @return the configure-driven {@link ApplicationRunner} to run the user-defined stream applications + */ + static ApplicationRunner fromConfig(Config config) { + try { + Class<?> runnerClass = Class.forName(config.get(RUNNER_CONFIG, DEFAULT_RUNNER_CLASS)); + if (ApplicationRunner.class.isAssignableFrom(runnerClass)) { + Constructor<?> constructor = runnerClass.getConstructor(Config.class); // *sigh* + return (ApplicationRunner) constructor.newInstance(config); + } + } catch (Exception e) { + throw new ConfigException(String.format("Problem in loading ApplicationRunner class %s", config.get( + RUNNER_CONFIG)), e); + } + throw new ConfigException(String.format( + "Class %s does not implement interface ApplicationRunner properly", + config.get(RUNNER_CONFIG))); + } + + /** + * Method to be invoked to deploy and run the actual Samza jobs to execute {@link org.apache.samza.operators.StreamGraph} + * + * @param graphBuilder the user-defined {@link StreamGraphBuilder} object + * @param config the {@link Config} object for this job + */ + void run(StreamGraphBuilder graphBuilder, Config config); + + /** + * Constructs a {@link StreamSpec} from the configuration for the specified streamId. + * + * The stream configurations are read from the following properties in the config: + * {@code streams.{$streamId}.*} + * <br> + * All properties matching this pattern are assumed to be system-specific with two exceptions. The following two + * properties are Samza properties which are used to bind the stream to a system and a physical resource on that system. + * + * <ul> + * <li>samza.system - The name of the System on which this stream will be used. If this property isn't defined + * the stream will be associated with the System defined in {@code job.default.system}</li> + * <li>samza.physical.name - The system-specific name for this stream. It could be a file URN, topic name, or other identifer. 
+ * If this property isn't defined the physical.name will be set to the streamId</li> + * </ul> + * + * @param streamId The logical identifier for the stream in Samza. + * @return The {@link StreamSpec} instance. + */ + StreamSpec streamFromConfig(String streamId); +} http://git-wip-us.apache.org/repos/asf/samza/blob/1956dac9/samza-api/src/main/java/org/apache/samza/system/ExecutionEnvironment.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/system/ExecutionEnvironment.java b/samza-api/src/main/java/org/apache/samza/system/ExecutionEnvironment.java deleted file mode 100644 index 8444c91..0000000 --- a/samza-api/src/main/java/org/apache/samza/system/ExecutionEnvironment.java +++ /dev/null @@ -1,101 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.samza.system; - -import java.lang.reflect.Constructor; -import org.apache.samza.annotation.InterfaceStability; -import org.apache.samza.config.ConfigException; -import org.apache.samza.operators.StreamGraphBuilder; -import org.apache.samza.config.Config; - - -/** - * Interface to be implemented by physical execution engine to deploy the config and jobs to run the {@link org.apache.samza.operators.StreamGraph} - * - * Implementations of this interface must define a constructor with a single {@link Config} as the argument in order - * to support the {@link ExecutionEnvironment#fromConfig(Config)} static constructor. - */ [email protected] -public interface ExecutionEnvironment { - - String ENVIRONMENT_CONFIG = "job.execution.environment.class"; - String DEFAULT_ENVIRONMENT_CLASS = "org.apache.samza.system.StandaloneExecutionEnvironment"; - - /** - * Static method to load the local standalone environment - * - * @param config configuration passed in to initialize the Samza standalone process - * @return the standalone {@link ExecutionEnvironment} to run the user-defined stream applications - */ - static ExecutionEnvironment getLocalEnvironment(Config config) { - return null; - } - - /** - * Static method to load the non-standalone environment. - * - * Requires the implementation class to define a constructor with a single {@link Config} as the argument. 
- * - * @param config configuration passed in to initialize the Samza processes - * @return the configure-driven {@link ExecutionEnvironment} to run the user-defined stream applications - */ - static ExecutionEnvironment fromConfig(Config config) { - try { - Class<?> environmentClass = Class.forName(config.get(ENVIRONMENT_CONFIG, DEFAULT_ENVIRONMENT_CLASS)); - if (ExecutionEnvironment.class.isAssignableFrom(environmentClass)) { - Constructor<?> constructor = environmentClass.getConstructor(Config.class); // *sigh* - return (ExecutionEnvironment) constructor.newInstance(config); - } - } catch (Exception e) { - throw new ConfigException(String.format("Problem in loading ExecutionEnvironment class %s", config.get(ENVIRONMENT_CONFIG)), e); - } - throw new ConfigException(String.format( - "Class %s does not implement interface ExecutionEnvironment properly", - config.get(ENVIRONMENT_CONFIG))); - } - - /** - * Method to be invoked to deploy and run the actual Samza jobs to execute {@link org.apache.samza.operators.StreamGraph} - * - * @param graphBuilder the user-defined {@link StreamGraphBuilder} object - * @param config the {@link Config} object for this job - */ - void run(StreamGraphBuilder graphBuilder, Config config); - - /** - * Constructs a {@link StreamSpec} from the configuration for the specified streamId. - * - * The stream configurations are read from the following properties in the config: - * {@code streams.{$streamId}.*} - * <br> - * All properties matching this pattern are assumed to be system-specific with two exceptions. The following two - * properties are Samza properties which are used to bind the stream to a system and a physical resource on that system. - * - * <ul> - * <li>samza.system - The name of the System on which this stream will be used. If this property isn't defined - * the stream will be associated with the System defined in {@code job.default.system}</li> - * <li>samza.physical.name - The system-specific name for this stream. 
It could be a file URN, topic name, or other identifer. - * If this property isn't defined the physical.name will be set to the streamId</li> - * </ul> - * - * @param streamId The logical identifier for the stream in Samza. - * @return The {@link StreamSpec} instance. - */ - StreamSpec streamFromConfig(String streamId); -} http://git-wip-us.apache.org/repos/asf/samza/blob/1956dac9/samza-core/src/main/java/org/apache/samza/runtime/AbstractApplicationRunner.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/runtime/AbstractApplicationRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/AbstractApplicationRunner.java new file mode 100644 index 0000000..8e21ec2 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/runtime/AbstractApplicationRunner.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza.runtime; + +import java.util.Map; +import org.apache.samza.config.Config; +import org.apache.samza.config.StreamConfig; +import org.apache.samza.system.StreamSpec; + + +public abstract class AbstractApplicationRunner implements ApplicationRunner { + + private final Config config; + + public AbstractApplicationRunner(Config config) { + if (config == null) { + throw new NullPointerException("Parameter 'config' cannot be null."); + } + + this.config = config; + } + + @Override + public StreamSpec streamFromConfig(String streamId) { + StreamConfig streamConfig = new StreamConfig(config); + String physicalName = streamConfig.getPhysicalName(streamId, streamId); + + return streamFromConfig(streamId, physicalName); + } + + /** + * Constructs a {@link StreamSpec} from the configuration for the specified streamId. + * + * The stream configurations are read from the following properties in the config: + * {@code streams.{$streamId}.*} + * <br> + * All properties matching this pattern are assumed to be system-specific with one exception. The following + * property is a Samza property which is used to bind the stream to a system. + * + * <ul> + * <li>samza.system - The name of the System on which this stream will be used. If this property isn't defined + * the stream will be associated with the System defined in {@code job.default.system}</li> + * </ul> + * + * @param streamId The logical identifier for the stream in Samza. + * @param physicalName The system-specific name for this stream. It could be a file URN, topic name, or other identifer. + * @return The {@link StreamSpec} instance. + */ + /*package private*/ StreamSpec streamFromConfig(String streamId, String physicalName) { + StreamConfig streamConfig = new StreamConfig(config); + String system = streamConfig.getSystem(streamId); + + return streamFromConfig(streamId, physicalName, system); + } + + /** + * Constructs a {@link StreamSpec} from the configuration for the specified streamId. 
+ * + * The stream configurations are read from the following properties in the config: + * {@code streams.{$streamId}.*} + * + * @param streamId The logical identifier for the stream in Samza. + * @param physicalName The system-specific name for this stream. It could be a file URN, topic name, or other identifer. + * @param system The name of the System on which this stream will be used. + * @return The {@link StreamSpec} instance. + */ + /*package private*/ StreamSpec streamFromConfig(String streamId, String physicalName, String system) { + StreamConfig streamConfig = new StreamConfig(config); + Map<String, String> properties = streamConfig.getStreamProperties(streamId); + + return new StreamSpec(streamId, physicalName, system, properties); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/1956dac9/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java new file mode 100644 index 0000000..c936553 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.runtime; + +import org.apache.samza.operators.StreamGraph; +import org.apache.samza.operators.StreamGraphBuilder; +import org.apache.samza.config.Config; +import org.apache.samza.operators.StreamGraphImpl; + + +/** + * This class implements the {@link ApplicationRunner} that runs the applications in standalone environment + */ +public class LocalApplicationRunner extends AbstractApplicationRunner { + + public LocalApplicationRunner(Config config) { + super(config); + } + + // TODO: may want to move this to a common base class for all {@link ExecutionEnvironment} + StreamGraph createGraph(StreamGraphBuilder app, Config config) { + StreamGraphImpl graph = new StreamGraphImpl(); + app.init(graph, config); + return graph; + } + + @Override public void run(StreamGraphBuilder app, Config config) { + // 1. get logic graph for optimization + // StreamGraph logicGraph = this.createGraph(app, config); + // 2. potential optimization.... + // 3. create new instance of StreamGraphBuilder that would generate the optimized graph + // 4. create all input/output/intermediate topics + // 5. create the configuration for StreamProcessor + // 6. 
start the StreamProcessor w/ optimized instance of StreamGraphBuilder + } + +} http://git-wip-us.apache.org/repos/asf/samza/blob/1956dac9/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java new file mode 100644 index 0000000..6e33fe8 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza.runtime; + +import org.apache.samza.operators.StreamGraphBuilder; +import org.apache.samza.config.Config; + + +/** + * This class implements the {@link ApplicationRunner} that runs the applications in a remote cluster + */ +public class RemoteApplicationRunner extends AbstractApplicationRunner { + + public RemoteApplicationRunner(Config config) { + super(config); + } + + @Override public void run(StreamGraphBuilder app, Config config) { + // TODO: add description of ProcessContext that is going to create a sub-DAG of the {@code graph} + // TODO: actually instantiate the tasks and run the job, i.e. + // 1. create all input/output/intermediate topics + // 2. create the single job configuration + // 3. execute JobRunner to submit the single job for the whole graph + } + +} http://git-wip-us.apache.org/repos/asf/samza/blob/1956dac9/samza-core/src/main/java/org/apache/samza/system/AbstractExecutionEnvironment.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/system/AbstractExecutionEnvironment.java b/samza-core/src/main/java/org/apache/samza/system/AbstractExecutionEnvironment.java deleted file mode 100644 index 64d60b7..0000000 --- a/samza-core/src/main/java/org/apache/samza/system/AbstractExecutionEnvironment.java +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.samza.system; - -import java.util.Map; -import org.apache.samza.config.Config; -import org.apache.samza.config.StreamConfig; - - -public abstract class AbstractExecutionEnvironment implements ExecutionEnvironment { - - private final Config config; - - public AbstractExecutionEnvironment(Config config) { - if (config == null) { - throw new NullPointerException("Parameter 'config' cannot be null."); - } - - this.config = config; - } - - @Override - public StreamSpec streamFromConfig(String streamId) { - StreamConfig streamConfig = new StreamConfig(config); - String physicalName = streamConfig.getPhysicalName(streamId, streamId); - - return streamFromConfig(streamId, physicalName); - } - - /** - * Constructs a {@link StreamSpec} from the configuration for the specified streamId. - * - * The stream configurations are read from the following properties in the config: - * {@code streams.{$streamId}.*} - * <br> - * All properties matching this pattern are assumed to be system-specific with one exception. The following - * property is a Samza property which is used to bind the stream to a system. - * - * <ul> - * <li>samza.system - The name of the System on which this stream will be used. If this property isn't defined - * the stream will be associated with the System defined in {@code job.default.system}</li> - * </ul> - * - * @param streamId The logical identifier for the stream in Samza. - * @param physicalName The system-specific name for this stream. It could be a file URN, topic name, or other identifer. 
- * @return The {@link StreamSpec} instance. - */ - /*package private*/ StreamSpec streamFromConfig(String streamId, String physicalName) { - StreamConfig streamConfig = new StreamConfig(config); - String system = streamConfig.getSystem(streamId); - - return streamFromConfig(streamId, physicalName, system); - } - - /** - * Constructs a {@link StreamSpec} from the configuration for the specified streamId. - * - * The stream configurations are read from the following properties in the config: - * {@code streams.{$streamId}.*} - * - * @param streamId The logical identifier for the stream in Samza. - * @param physicalName The system-specific name for this stream. It could be a file URN, topic name, or other identifer. - * @param system The name of the System on which this stream will be used. - * @return The {@link StreamSpec} instance. - */ - /*package private*/ StreamSpec streamFromConfig(String streamId, String physicalName, String system) { - StreamConfig streamConfig = new StreamConfig(config); - Map<String, String> properties = streamConfig.getStreamProperties(streamId); - - return new StreamSpec(streamId, physicalName, system, properties); - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/1956dac9/samza-core/src/main/java/org/apache/samza/system/RemoteExecutionEnvironment.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/system/RemoteExecutionEnvironment.java b/samza-core/src/main/java/org/apache/samza/system/RemoteExecutionEnvironment.java deleted file mode 100644 index e592e66..0000000 --- a/samza-core/src/main/java/org/apache/samza/system/RemoteExecutionEnvironment.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.samza.system; - -import org.apache.samza.operators.StreamGraphBuilder; -import org.apache.samza.config.Config; - -/** - * This class implements the {@link ExecutionEnvironment} that runs the applications in YARN environment - */ -public class RemoteExecutionEnvironment extends AbstractExecutionEnvironment { - - public RemoteExecutionEnvironment(Config config) { - super(config); - } - - @Override public void run(StreamGraphBuilder app, Config config) { - // TODO: add description of ProcessContext that is going to create a sub-DAG of the {@code graph} - // TODO: actually instantiate the tasks and run the job, i.e. - // 1. create all input/output/intermediate topics - // 2. create the single job configuration - // 3. 
execute JobRunner to submit the single job for the whole graph - } - -} http://git-wip-us.apache.org/repos/asf/samza/blob/1956dac9/samza-core/src/main/java/org/apache/samza/system/StandaloneExecutionEnvironment.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/system/StandaloneExecutionEnvironment.java b/samza-core/src/main/java/org/apache/samza/system/StandaloneExecutionEnvironment.java deleted file mode 100644 index 71d60ef..0000000 --- a/samza-core/src/main/java/org/apache/samza/system/StandaloneExecutionEnvironment.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.samza.system; - -import org.apache.samza.operators.StreamGraph; -import org.apache.samza.operators.StreamGraphBuilder; -import org.apache.samza.config.Config; -import org.apache.samza.operators.StreamGraphImpl; - - -/** - * This class implements the {@link ExecutionEnvironment} that runs the applications in standalone environment - */ -public class StandaloneExecutionEnvironment extends AbstractExecutionEnvironment { - - public StandaloneExecutionEnvironment(Config config) { - super(config); - } - - // TODO: may want to move this to a common base class for all {@link ExecutionEnvironment} - StreamGraph createGraph(StreamGraphBuilder app, Config config) { - StreamGraphImpl graph = new StreamGraphImpl(); - app.init(graph, config); - return graph; - } - - @Override public void run(StreamGraphBuilder app, Config config) { - // 1. get logic graph for optimization - // StreamGraph logicGraph = this.createGraph(app, config); - // 2. potential optimization.... - // 3. create new instance of StreamGraphBuilder that would generate the optimized graph - // 4. create all input/output/intermediate topics - // 5. create the configuration for StreamProcessor - // 6. 
start the StreamProcessor w/ optimized instance of StreamGraphBuilder - } - -} http://git-wip-us.apache.org/repos/asf/samza/blob/1956dac9/samza-core/src/test/java/org/apache/samza/example/KeyValueStoreExample.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/example/KeyValueStoreExample.java b/samza-core/src/test/java/org/apache/samza/example/KeyValueStoreExample.java index 4a0681e..e54d01a 100644 --- a/samza-core/src/test/java/org/apache/samza/example/KeyValueStoreExample.java +++ b/samza-core/src/test/java/org/apache/samza/example/KeyValueStoreExample.java @@ -31,7 +31,7 @@ import org.apache.samza.operators.functions.FlatMapFunction; import org.apache.samza.serializers.JsonSerde; import org.apache.samza.serializers.StringSerde; import org.apache.samza.storage.kv.KeyValueStore; -import org.apache.samza.system.ExecutionEnvironment; +import org.apache.samza.runtime.ApplicationRunner; import org.apache.samza.system.StreamSpec; import org.apache.samza.task.TaskContext; import org.apache.samza.util.CommandLine; @@ -43,15 +43,14 @@ import org.apache.samza.util.CommandLine; public class KeyValueStoreExample implements StreamGraphBuilder { /** - * used by remote execution environment to launch the job in remote program. The remote program should follow the similar - * invoking context as in standalone: + * used by remote application runner to launch the job in remote program. 
The remote program should follow the similar + * invoking context as in local runner: * * public static void main(String args[]) throws Exception { * CommandLine cmdLine = new CommandLine(); * Config config = cmdLine.loadConfig(cmdLine.parser().parse(args)); - * ExecutionEnvironment remoteEnv = ExecutionEnvironment.getRemoteEnvironment(config); - * UserMainExample runnableApp = new UserMainExample(); - * runnableApp.run(remoteEnv, config); + * ApplicationRunner runner = ApplicationRunner.fromConfig(config); + * runner.run(new UserMainExample(), config) * } * */ @@ -67,12 +66,12 @@ public class KeyValueStoreExample implements StreamGraphBuilder { } - // standalone local program model + // local program model public static void main(String[] args) throws Exception { CommandLine cmdLine = new CommandLine(); Config config = cmdLine.loadConfig(cmdLine.parser().parse(args)); - ExecutionEnvironment standaloneEnv = ExecutionEnvironment.getLocalEnvironment(config); - standaloneEnv.run(new KeyValueStoreExample(), config); + ApplicationRunner localRunner = ApplicationRunner.getLocalRunner(config); + localRunner.run(new KeyValueStoreExample(), config); } class MyStatsCounter implements FlatMapFunction<PageViewEvent, StatsOutput> { http://git-wip-us.apache.org/repos/asf/samza/blob/1956dac9/samza-core/src/test/java/org/apache/samza/example/NoContextStreamExample.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/example/NoContextStreamExample.java b/samza-core/src/test/java/org/apache/samza/example/NoContextStreamExample.java index 320680c..ef58c8b 100644 --- a/samza-core/src/test/java/org/apache/samza/example/NoContextStreamExample.java +++ b/samza-core/src/test/java/org/apache/samza/example/NoContextStreamExample.java @@ -27,7 +27,7 @@ import org.apache.samza.operators.data.Offset; import org.apache.samza.operators.functions.JoinFunction; import org.apache.samza.serializers.JsonSerde; import 
org.apache.samza.serializers.StringSerde; -import org.apache.samza.system.ExecutionEnvironment; +import org.apache.samza.runtime.ApplicationRunner; import org.apache.samza.system.StreamSpec; import org.apache.samza.system.SystemStreamPartition; import org.apache.samza.util.CommandLine; @@ -89,15 +89,16 @@ public class NoContextStreamExample implements StreamGraphBuilder { } } + /** - * used by remote execution environment to launch the job in remote program. The remote program should follow the similar - * invoking context as in standalone: + * used by remote application runner to launch the job in remote program. The remote program should follow the similar + * invoking context as in local: * * public static void main(String args[]) throws Exception { * CommandLine cmdLine = new CommandLine(); * Config config = cmdLine.loadConfig(cmdLine.parser().parse(args)); - * ExecutionEnvironment remoteEnv = ExecutionEnvironment.fromConfig(config); - * remoteEnv.run(new NoContextStreamExample(), config); + * ApplicationRunner runner = ApplicationRunner.fromConfig(config); + * runner.run(new NoContextStreamExample(), config); * } * */ @@ -119,8 +120,8 @@ public class NoContextStreamExample implements StreamGraphBuilder { public static void main(String[] args) throws Exception { CommandLine cmdLine = new CommandLine(); Config config = cmdLine.loadConfig(cmdLine.parser().parse(args)); - ExecutionEnvironment standaloneEnv = ExecutionEnvironment.getLocalEnvironment(config); - standaloneEnv.run(new NoContextStreamExample(), config); + ApplicationRunner localRunner = ApplicationRunner.getLocalRunner(config); + localRunner.run(new NoContextStreamExample(), config); } } http://git-wip-us.apache.org/repos/asf/samza/blob/1956dac9/samza-core/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java 
b/samza-core/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java index 30ce7d2..0e60c2f 100644 --- a/samza-core/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java +++ b/samza-core/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java @@ -27,7 +27,7 @@ import org.apache.samza.operators.data.MessageEnvelope; import org.apache.samza.operators.functions.JoinFunction; import org.apache.samza.serializers.JsonSerde; import org.apache.samza.serializers.StringSerde; -import org.apache.samza.system.ExecutionEnvironment; +import org.apache.samza.runtime.ApplicationRunner; import org.apache.samza.system.StreamSpec; import org.apache.samza.util.CommandLine; @@ -38,15 +38,14 @@ import org.apache.samza.util.CommandLine; public class OrderShipmentJoinExample implements StreamGraphBuilder { /** - * used by remote execution environment to launch the job in remote program. The remote program should follow the similar - * invoking context as in standalone: + * used by remote application runner to launch the job in remote program. 
The remote program should follow the similar + * invoking context as in local runner: * * public static void main(String args[]) throws Exception { * CommandLine cmdLine = new CommandLine(); * Config config = cmdLine.loadConfig(cmdLine.parser().parse(args)); - * ExecutionEnvironment remoteEnv = ExecutionEnvironment.getRemoteEnvironment(config); - * UserMainExample runnableApp = new UserMainExample(); - * runnableApp.run(remoteEnv, config); + * ApplicationRunner runner = ApplicationRunner.fromConfig(config); + * runner.run(new UserMainExample(), config); * } * */ @@ -64,8 +63,8 @@ public class OrderShipmentJoinExample implements StreamGraphBuilder { public static void main(String[] args) throws Exception { CommandLine cmdLine = new CommandLine(); Config config = cmdLine.loadConfig(cmdLine.parser().parse(args)); - ExecutionEnvironment standaloneEnv = ExecutionEnvironment.getLocalEnvironment(config); - standaloneEnv.run(new OrderShipmentJoinExample(), config); + ApplicationRunner localRunner = ApplicationRunner.getLocalRunner(config); + localRunner.run(new OrderShipmentJoinExample(), config); } StreamSpec input1 = new StreamSpec("orderStream", "OrderEvent", "kafka"); http://git-wip-us.apache.org/repos/asf/samza/blob/1956dac9/samza-core/src/test/java/org/apache/samza/example/PageViewCounterExample.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/example/PageViewCounterExample.java b/samza-core/src/test/java/org/apache/samza/example/PageViewCounterExample.java index fcf67a7..886bf1c 100644 --- a/samza-core/src/test/java/org/apache/samza/example/PageViewCounterExample.java +++ b/samza-core/src/test/java/org/apache/samza/example/PageViewCounterExample.java @@ -28,7 +28,7 @@ import org.apache.samza.operators.windows.WindowPane; import org.apache.samza.operators.windows.Windows; import org.apache.samza.serializers.JsonSerde; import org.apache.samza.serializers.StringSerde; -import 
org.apache.samza.system.ExecutionEnvironment; +import org.apache.samza.runtime.ApplicationRunner; import org.apache.samza.system.StreamSpec; import org.apache.samza.util.CommandLine; @@ -57,8 +57,8 @@ public class PageViewCounterExample implements StreamGraphBuilder { public static void main(String[] args) { CommandLine cmdLine = new CommandLine(); Config config = cmdLine.loadConfig(cmdLine.parser().parse(args)); - ExecutionEnvironment standaloneEnv = ExecutionEnvironment.getLocalEnvironment(config); - standaloneEnv.run(new PageViewCounterExample(), config); + ApplicationRunner localRunner = ApplicationRunner.getLocalRunner(config); + localRunner.run(new PageViewCounterExample(), config); } StreamSpec input1 = new StreamSpec("pageViewEventStream", "PageViewEvent", "kafka"); http://git-wip-us.apache.org/repos/asf/samza/blob/1956dac9/samza-core/src/test/java/org/apache/samza/example/RepartitionExample.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/example/RepartitionExample.java b/samza-core/src/test/java/org/apache/samza/example/RepartitionExample.java index 228668c..b8abacf 100644 --- a/samza-core/src/test/java/org/apache/samza/example/RepartitionExample.java +++ b/samza-core/src/test/java/org/apache/samza/example/RepartitionExample.java @@ -26,7 +26,7 @@ import org.apache.samza.operators.windows.WindowPane; import org.apache.samza.operators.windows.Windows; import org.apache.samza.serializers.JsonSerde; import org.apache.samza.serializers.StringSerde; -import org.apache.samza.system.ExecutionEnvironment; +import org.apache.samza.runtime.ApplicationRunner; import org.apache.samza.system.StreamSpec; import org.apache.samza.util.CommandLine; @@ -39,6 +39,19 @@ import java.time.Duration; public class RepartitionExample implements StreamGraphBuilder { /** + * used by remote application runner to launch the job in remote program. 
The remote program should follow the similar + * invoking context as in local runner: + * + * public static void main(String args[]) throws Exception { + * CommandLine cmdLine = new CommandLine(); + * Config config = cmdLine.loadConfig(cmdLine.parser().parse(args)); + * ApplicationRunner runner = ApplicationRunner.fromConfig(config); + * runner.run(new UserMainExample(), config); + * } + * + */ + + /** * used by remote execution environment to launch the job in remote program. The remote program should follow the similar * invoking context as in standalone: * @@ -68,8 +81,8 @@ public class RepartitionExample implements StreamGraphBuilder { public static void main(String[] args) throws Exception { CommandLine cmdLine = new CommandLine(); Config config = cmdLine.loadConfig(cmdLine.parser().parse(args)); - ExecutionEnvironment standaloneEnv = ExecutionEnvironment.getLocalEnvironment(config); - standaloneEnv.run(new RepartitionExample(), config); + ApplicationRunner localRunner = ApplicationRunner.getLocalRunner(config); + localRunner.run(new RepartitionExample(), config); } StreamSpec input1 = new StreamSpec("pageViewEventStream", "PageViewEvent", "kafka"); http://git-wip-us.apache.org/repos/asf/samza/blob/1956dac9/samza-core/src/test/java/org/apache/samza/runtime/TestAbstractExecutionEnvironment.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/runtime/TestAbstractExecutionEnvironment.java b/samza-core/src/test/java/org/apache/samza/runtime/TestAbstractExecutionEnvironment.java new file mode 100644 index 0000000..4942ed6 --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/runtime/TestAbstractExecutionEnvironment.java @@ -0,0 +1,333 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.runtime; + +import java.util.HashMap; +import java.util.Map; +import org.apache.samza.config.Config; +import org.apache.samza.config.JobConfig; +import org.apache.samza.config.MapConfig; +import org.apache.samza.config.StreamConfig; +import org.apache.samza.operators.StreamGraphBuilder; +import org.apache.samza.system.StreamSpec; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +public class TestAbstractExecutionEnvironment { + private static final String STREAM_ID = "t3st-Stream_Id"; + private static final String STREAM_ID_INVALID = "test#Str3amId!"; + + private static final String TEST_PHYSICAL_NAME = "t3st-Physical_Name"; + private static final String TEST_PHYSICAL_NAME2 = "testPhysicalName2"; + private static final String TEST_PHYSICAL_NAME_SPECIAL_CHARS = "test://Physical.Name?"; + + private static final String TEST_SYSTEM = "t3st-System_Name"; + private static final String TEST_SYSTEM2 = "testSystemName2"; + private static final String TEST_SYSTEM_INVALID = "test:System!Name@"; + + private static final String TEST_DEFAULT_SYSTEM = "testDefaultSystemName"; + + + @Test(expected = NullPointerException.class) + public void testConfigValidation() { + new TestAbstractApplicationRunnerImpl(null); + } + + // The physical name should be pulled from the StreamConfig.PHYSICAL_NAME property 
value. + @Test + public void testStreamFromConfigWithPhysicalNameInConfig() { + Config config = buildStreamConfig(STREAM_ID, + StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME, + StreamConfig.SYSTEM(), TEST_SYSTEM); + + AbstractApplicationRunner env = new TestAbstractApplicationRunnerImpl(config); + StreamSpec spec = env.streamFromConfig(STREAM_ID); + + assertEquals(TEST_PHYSICAL_NAME, spec.getPhysicalName()); + } + + // The streamId should be used as the physicalName when the physical name is not specified. + // NOTE: it's either this, set to null, or exception. This seems better for backward compatibility and API brevity. + @Test + public void testStreamFromConfigWithoutPhysicalNameInConfig() { + Config config = buildStreamConfig(STREAM_ID, + StreamConfig.SYSTEM(), TEST_SYSTEM); + + AbstractApplicationRunner env = new TestAbstractApplicationRunnerImpl(config); + StreamSpec spec = env.streamFromConfig(STREAM_ID); + + assertEquals(STREAM_ID, spec.getPhysicalName()); + } + + // If the system is specified at the stream scope, use it + @Test + public void testStreamFromConfigWithSystemAtStreamScopeInConfig() { + Config config = buildStreamConfig(STREAM_ID, + StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME, + StreamConfig.SYSTEM(), TEST_SYSTEM); + + AbstractApplicationRunner env = new TestAbstractApplicationRunnerImpl(config); + StreamSpec spec = env.streamFromConfig(STREAM_ID); + + assertEquals(TEST_SYSTEM, spec.getSystemName()); + } + + // If system isn't specified at stream scope, use the default system + @Test + public void testStreamFromConfigWithSystemAtDefaultScopeInConfig() { + Config config = addConfigs(buildStreamConfig(STREAM_ID, + StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME), + JobConfig.JOB_DEFAULT_SYSTEM(), TEST_DEFAULT_SYSTEM); + + AbstractApplicationRunner env = new TestAbstractApplicationRunnerImpl(config); + StreamSpec spec = env.streamFromConfig(STREAM_ID); + + assertEquals(TEST_DEFAULT_SYSTEM, spec.getSystemName()); + } + + // Stream scope
should override default scope + @Test + public void testStreamFromConfigWithSystemAtBothScopesInConfig() { + Config config = addConfigs(buildStreamConfig(STREAM_ID, + StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME, + StreamConfig.SYSTEM(), TEST_SYSTEM), + JobConfig.JOB_DEFAULT_SYSTEM(), TEST_DEFAULT_SYSTEM); + + AbstractApplicationRunner env = new TestAbstractApplicationRunnerImpl(config); + StreamSpec spec = env.streamFromConfig(STREAM_ID); + + assertEquals(TEST_SYSTEM, spec.getSystemName()); + } + + // System is required. Throw if it cannot be determined. + @Test(expected = Exception.class) + public void testStreamFromConfigWithOutSystemInConfig() { + Config config = buildStreamConfig(STREAM_ID, + StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME); + + AbstractApplicationRunner env = new TestAbstractApplicationRunnerImpl(config); + StreamSpec spec = env.streamFromConfig(STREAM_ID); + + assertEquals(TEST_SYSTEM, spec.getSystemName()); + } + + // The properties in the config "streams.{streamId}.*" should be passed through to the spec. 
+ @Test + public void testStreamFromConfigPropertiesPassthrough() { + Config config = buildStreamConfig(STREAM_ID, + StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME, + StreamConfig.SYSTEM(), TEST_SYSTEM, + "systemProperty1", "systemValue1", + "systemProperty2", "systemValue2", + "systemProperty3", "systemValue3"); + + AbstractApplicationRunner env = new TestAbstractApplicationRunnerImpl(config); + StreamSpec spec = env.streamFromConfig(STREAM_ID); + + Map<String, String> properties = spec.getConfig(); + assertEquals(3, properties.size()); + assertEquals("systemValue1", properties.get("systemProperty1")); + assertEquals("systemValue2", properties.get("systemProperty2")); + assertEquals("systemValue3", properties.get("systemProperty3")); + assertEquals("systemValue1", spec.get("systemProperty1")); + assertEquals("systemValue2", spec.get("systemProperty2")); + assertEquals("systemValue3", spec.get("systemProperty3")); + } + + // The samza properties (which are invalid for the underlying system) should be filtered out. 
+ @Test + public void testStreamFromConfigSamzaPropertiesOmitted() { + Config config = buildStreamConfig(STREAM_ID, + StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME, + StreamConfig.SYSTEM(), TEST_SYSTEM, + "systemProperty1", "systemValue1", + "systemProperty2", "systemValue2", + "systemProperty3", "systemValue3"); + + AbstractApplicationRunner env = new TestAbstractApplicationRunnerImpl(config); + StreamSpec spec = env.streamFromConfig(STREAM_ID); + + Map<String, String> properties = spec.getConfig(); + assertEquals(3, properties.size()); + assertNull(properties.get(String.format(StreamConfig.PHYSICAL_NAME_FOR_STREAM_ID(), STREAM_ID))); + assertNull(properties.get(String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), STREAM_ID))); + assertNull(spec.get(String.format(StreamConfig.PHYSICAL_NAME_FOR_STREAM_ID(), STREAM_ID))); + assertNull(spec.get(String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), STREAM_ID))); + } + + // When the physicalName argument is passed explicitly it should be used, regardless of whether it is also in the config + @Test + public void testStreamFromConfigPhysicalNameArgSimple() { + Config config = buildStreamConfig(STREAM_ID, + StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME2, // This should be ignored because of the explicit arg + StreamConfig.SYSTEM(), TEST_SYSTEM); + + AbstractApplicationRunner env = new TestAbstractApplicationRunnerImpl(config); + StreamSpec spec = env.streamFromConfig(STREAM_ID, TEST_PHYSICAL_NAME); + + assertEquals(STREAM_ID, spec.getId()); + assertEquals(TEST_PHYSICAL_NAME, spec.getPhysicalName()); + assertEquals(TEST_SYSTEM, spec.getSystemName()); + } + + // Special characters are allowed for the physical name + @Test + public void testStreamFromConfigPhysicalNameArgSpecialCharacters() { + Config config = buildStreamConfig(STREAM_ID, + StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME2, + StreamConfig.SYSTEM(), TEST_SYSTEM); + + AbstractApplicationRunner env = new TestAbstractApplicationRunnerImpl(config); + StreamSpec 
spec = env.streamFromConfig(STREAM_ID, TEST_PHYSICAL_NAME_SPECIAL_CHARS); + assertEquals(TEST_PHYSICAL_NAME_SPECIAL_CHARS, spec.getPhysicalName()); + } + + // Null is allowed for the physical name + @Test + public void testStreamFromConfigPhysicalNameArgNull() { + Config config = buildStreamConfig(STREAM_ID, + StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME2, + StreamConfig.SYSTEM(), TEST_SYSTEM); + + AbstractApplicationRunner env = new TestAbstractApplicationRunnerImpl(config); + StreamSpec spec = env.streamFromConfig(STREAM_ID, null); + assertNull(spec.getPhysicalName()); + } + + // When the system name is provided explicitly, it should be used, regardless of whether it's also in the config + @Test + public void testStreamFromConfigSystemNameArgValid() { + Config config = buildStreamConfig(STREAM_ID, + StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME2, // This should be ignored because of the explicit arg + StreamConfig.SYSTEM(), TEST_SYSTEM2); // This too + + AbstractApplicationRunner env = new TestAbstractApplicationRunnerImpl(config); + StreamSpec spec = env.streamFromConfig(STREAM_ID, TEST_PHYSICAL_NAME, TEST_SYSTEM); + + assertEquals(STREAM_ID, spec.getId()); + assertEquals(TEST_PHYSICAL_NAME, spec.getPhysicalName()); + assertEquals(TEST_SYSTEM, spec.getSystemName()); + } + + // Special characters are NOT allowed for system name, because it's used as an identifier in the config. + @Test(expected = IllegalArgumentException.class) + public void testStreamFromConfigSystemNameArgInvalid() { + Config config = buildStreamConfig(STREAM_ID, + StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME2, + StreamConfig.SYSTEM(), TEST_SYSTEM2); + + AbstractApplicationRunner env = new TestAbstractApplicationRunnerImpl(config); + env.streamFromConfig(STREAM_ID, TEST_PHYSICAL_NAME, TEST_SYSTEM_INVALID); + } + + // Empty strings are NOT allowed for system name, because it's used as an identifier in the config. 
+ @Test(expected = IllegalArgumentException.class) + public void testStreamFromConfigSystemNameArgEmpty() { + Config config = buildStreamConfig(STREAM_ID, + StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME2, + StreamConfig.SYSTEM(), TEST_SYSTEM2); + + AbstractApplicationRunner env = new TestAbstractApplicationRunnerImpl(config); + env.streamFromConfig(STREAM_ID, TEST_PHYSICAL_NAME, ""); + } + + // Null is not allowed for system name. + @Test(expected = NullPointerException.class) + public void testStreamFromConfigSystemNameArgNull() { + Config config = buildStreamConfig(STREAM_ID, + StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME2, + StreamConfig.SYSTEM(), TEST_SYSTEM2); + + AbstractApplicationRunner env = new TestAbstractApplicationRunnerImpl(config); + env.streamFromConfig(STREAM_ID, TEST_PHYSICAL_NAME, null); + } + + // Special characters are NOT allowed for streamId, because it's used as an identifier in the config. + @Test(expected = IllegalArgumentException.class) + public void testStreamFromConfigStreamIdInvalid() { + Config config = buildStreamConfig(STREAM_ID_INVALID, + StreamConfig.SYSTEM(), TEST_SYSTEM); + + AbstractApplicationRunner env = new TestAbstractApplicationRunnerImpl(config); + env.streamFromConfig(STREAM_ID_INVALID); + } + + // Empty strings are NOT allowed for streamId, because it's used as an identifier in the config. + @Test(expected = IllegalArgumentException.class) + public void testStreamFromConfigStreamIdEmpty() { + Config config = buildStreamConfig("", + StreamConfig.SYSTEM(), TEST_SYSTEM); + + AbstractApplicationRunner env = new TestAbstractApplicationRunnerImpl(config); + env.streamFromConfig(""); + } + + // Null is not allowed for streamId. 
+ @Test(expected = NullPointerException.class) + public void testStreamFromConfigStreamIdNull() { + Config config = buildStreamConfig(null, + StreamConfig.SYSTEM(), TEST_SYSTEM); + + AbstractApplicationRunner env = new TestAbstractApplicationRunnerImpl(config); + env.streamFromConfig(null); + } + + + // Helper methods + + private Config buildStreamConfig(String streamId, String... kvs) { + // inject streams.x. into each key + for (int i = 0; i < kvs.length - 1; i += 2) { + kvs[i] = String.format(StreamConfig.STREAM_ID_PREFIX(), streamId) + kvs[i]; + } + return buildConfig(kvs); + } + + private Config buildConfig(String... kvs) { + if (kvs.length % 2 != 0) { + throw new IllegalArgumentException("There must be parity between the keys and values"); + } + + Map<String, String> configMap = new HashMap<>(); + for (int i = 0; i < kvs.length - 1; i += 2) { + configMap.put(kvs[i], kvs[i + 1]); + } + return new MapConfig(configMap); + } + + private Config addConfigs(Config original, String... kvs) { + Map<String, String> result = new HashMap<>(); + result.putAll(original); + result.putAll(buildConfig(kvs)); + return new MapConfig(result); + } + + private class TestAbstractApplicationRunnerImpl extends AbstractApplicationRunner { + + public TestAbstractApplicationRunnerImpl(Config config) { + super(config); + } + + @Override + public void run(StreamGraphBuilder graphBuilder, Config config) { + // do nothing + } + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/1956dac9/samza-core/src/test/java/org/apache/samza/system/TestAbstractExecutionEnvironment.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/system/TestAbstractExecutionEnvironment.java b/samza-core/src/test/java/org/apache/samza/system/TestAbstractExecutionEnvironment.java deleted file mode 100644 index 861f049..0000000 --- a/samza-core/src/test/java/org/apache/samza/system/TestAbstractExecutionEnvironment.java +++ /dev/null @@ 
-1,331 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.samza.system; - -import java.util.HashMap; -import java.util.Map; -import org.apache.samza.config.Config; -import org.apache.samza.config.JobConfig; -import org.apache.samza.config.MapConfig; -import org.apache.samza.config.StreamConfig; -import org.apache.samza.operators.StreamGraphBuilder; -import org.junit.Test; - -import static org.junit.Assert.*; -public class TestAbstractExecutionEnvironment { - private static final String STREAM_ID = "t3st-Stream_Id"; - private static final String STREAM_ID_INVALID = "test#Str3amId!"; - - private static final String TEST_PHYSICAL_NAME = "t3st-Physical_Name"; - private static final String TEST_PHYSICAL_NAME2 = "testPhysicalName2"; - private static final String TEST_PHYSICAL_NAME_SPECIAL_CHARS = "test://Physical.Name?"; - - private static final String TEST_SYSTEM = "t3st-System_Name"; - private static final String TEST_SYSTEM2 = "testSystemName2"; - private static final String TEST_SYSTEM_INVALID = "test:System!Name@"; - - private static final String TEST_DEFAULT_SYSTEM = "testDefaultSystemName"; - - - @Test(expected = NullPointerException.class) - public void testConfigValidation() { - 
new TestAbstractExecutionEnvironmentImpl(null); - } - - // The physical name should be pulled from the StreamConfig.PHYSICAL_NAME property value. - @Test - public void testStreamFromConfigWithPhysicalNameInConfig() { - Config config = buildStreamConfig(STREAM_ID, - StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME, - StreamConfig.SYSTEM(), TEST_SYSTEM); - - AbstractExecutionEnvironment env = new TestAbstractExecutionEnvironmentImpl(config); - StreamSpec spec = env.streamFromConfig(STREAM_ID); - - assertEquals(TEST_PHYSICAL_NAME, spec.getPhysicalName()); - } - - // The streamId should be used as the physicalName when the physical name is not specified. - // NOTE: its either this, set to null, or exception. This seems better for backward compatibility and API brevity. - @Test - public void testStreamFromConfigWithoutPhysicalNameInConfig() { - Config config = buildStreamConfig(STREAM_ID, - StreamConfig.SYSTEM(), TEST_SYSTEM); - - AbstractExecutionEnvironment env = new TestAbstractExecutionEnvironmentImpl(config); - StreamSpec spec = env.streamFromConfig(STREAM_ID); - - assertEquals(STREAM_ID, spec.getPhysicalName()); - } - - // If the system is specified at the stream scope, use it - @Test - public void testStreamFromConfigWithSystemAtStreamScopeInConfig() { - Config config = buildStreamConfig(STREAM_ID, - StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME, - StreamConfig.SYSTEM(), TEST_SYSTEM); - - AbstractExecutionEnvironment env = new TestAbstractExecutionEnvironmentImpl(config); - StreamSpec spec = env.streamFromConfig(STREAM_ID); - - assertEquals(TEST_SYSTEM, spec.getSystemName()); - } - - // If system isn't specified at stream scope, use the default system - @Test - public void testStreamFromConfigWithSystemAtDefaultScopeInConfig() { - Config config = addConfigs(buildStreamConfig(STREAM_ID, - StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME), - JobConfig.JOB_DEFAULT_SYSTEM(), TEST_DEFAULT_SYSTEM); - - AbstractExecutionEnvironment env = new 
TestAbstractExecutionEnvironmentImpl(config); - StreamSpec spec = env.streamFromConfig(STREAM_ID); - - assertEquals(TEST_DEFAULT_SYSTEM, spec.getSystemName()); - } - - // Stream scope should override default scope - @Test - public void testStreamFromConfigWithSystemAtBothScopesInConfig() { - Config config = addConfigs(buildStreamConfig(STREAM_ID, - StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME, - StreamConfig.SYSTEM(), TEST_SYSTEM), - JobConfig.JOB_DEFAULT_SYSTEM(), TEST_DEFAULT_SYSTEM); - - AbstractExecutionEnvironment env = new TestAbstractExecutionEnvironmentImpl(config); - StreamSpec spec = env.streamFromConfig(STREAM_ID); - - assertEquals(TEST_SYSTEM, spec.getSystemName()); - } - - // System is required. Throw if it cannot be determined. - @Test(expected = Exception.class) - public void testStreamFromConfigWithOutSystemInConfig() { - Config config = buildStreamConfig(STREAM_ID, - StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME); - - AbstractExecutionEnvironment env = new TestAbstractExecutionEnvironmentImpl(config); - StreamSpec spec = env.streamFromConfig(STREAM_ID); - - assertEquals(TEST_SYSTEM, spec.getSystemName()); - } - - // The properties in the config "streams.{streamId}.*" should be passed through to the spec. 
- @Test - public void testStreamFromConfigPropertiesPassthrough() { - Config config = buildStreamConfig(STREAM_ID, - StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME, - StreamConfig.SYSTEM(), TEST_SYSTEM, - "systemProperty1", "systemValue1", - "systemProperty2", "systemValue2", - "systemProperty3", "systemValue3"); - - AbstractExecutionEnvironment env = new TestAbstractExecutionEnvironmentImpl(config); - StreamSpec spec = env.streamFromConfig(STREAM_ID); - - Map<String, String> properties = spec.getConfig(); - assertEquals(3, properties.size()); - assertEquals("systemValue1", properties.get("systemProperty1")); - assertEquals("systemValue2", properties.get("systemProperty2")); - assertEquals("systemValue3", properties.get("systemProperty3")); - assertEquals("systemValue1", spec.get("systemProperty1")); - assertEquals("systemValue2", spec.get("systemProperty2")); - assertEquals("systemValue3", spec.get("systemProperty3")); - } - - // The samza properties (which are invalid for the underlying system) should be filtered out. 
- @Test - public void testStreamFromConfigSamzaPropertiesOmitted() { - Config config = buildStreamConfig(STREAM_ID, - StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME, - StreamConfig.SYSTEM(), TEST_SYSTEM, - "systemProperty1", "systemValue1", - "systemProperty2", "systemValue2", - "systemProperty3", "systemValue3"); - - AbstractExecutionEnvironment env = new TestAbstractExecutionEnvironmentImpl(config); - StreamSpec spec = env.streamFromConfig(STREAM_ID); - - Map<String, String> properties = spec.getConfig(); - assertEquals(3, properties.size()); - assertNull(properties.get(String.format(StreamConfig.PHYSICAL_NAME_FOR_STREAM_ID(), STREAM_ID))); - assertNull(properties.get(String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), STREAM_ID))); - assertNull(spec.get(String.format(StreamConfig.PHYSICAL_NAME_FOR_STREAM_ID(), STREAM_ID))); - assertNull(spec.get(String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), STREAM_ID))); - } - - // When the physicalName argument is passed explicitly it should be used, regardless of whether it is also in the config - @Test - public void testStreamFromConfigPhysicalNameArgSimple() { - Config config = buildStreamConfig(STREAM_ID, - StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME2, // This should be ignored because of the explicit arg - StreamConfig.SYSTEM(), TEST_SYSTEM); - - AbstractExecutionEnvironment env = new TestAbstractExecutionEnvironmentImpl(config); - StreamSpec spec = env.streamFromConfig(STREAM_ID, TEST_PHYSICAL_NAME); - - assertEquals(STREAM_ID, spec.getId()); - assertEquals(TEST_PHYSICAL_NAME, spec.getPhysicalName()); - assertEquals(TEST_SYSTEM, spec.getSystemName()); - } - - // Special characters are allowed for the physical name - @Test - public void testStreamFromConfigPhysicalNameArgSpecialCharacters() { - Config config = buildStreamConfig(STREAM_ID, - StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME2, - StreamConfig.SYSTEM(), TEST_SYSTEM); - - AbstractExecutionEnvironment env = new 
TestAbstractExecutionEnvironmentImpl(config); - StreamSpec spec = env.streamFromConfig(STREAM_ID, TEST_PHYSICAL_NAME_SPECIAL_CHARS); - assertEquals(TEST_PHYSICAL_NAME_SPECIAL_CHARS, spec.getPhysicalName()); - } - - // Null is allowed for the physical name - @Test - public void testStreamFromConfigPhysicalNameArgNull() { - Config config = buildStreamConfig(STREAM_ID, - StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME2, - StreamConfig.SYSTEM(), TEST_SYSTEM); - - AbstractExecutionEnvironment env = new TestAbstractExecutionEnvironmentImpl(config); - StreamSpec spec = env.streamFromConfig(STREAM_ID, null); - assertNull(spec.getPhysicalName()); - } - - // When the system name is provided explicitly, it should be used, regardless of whether it's also in the config - @Test - public void testStreamFromConfigSystemNameArgValid() { - Config config = buildStreamConfig(STREAM_ID, - StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME2, // This should be ignored because of the explicit arg - StreamConfig.SYSTEM(), TEST_SYSTEM2); // This too - - AbstractExecutionEnvironment env = new TestAbstractExecutionEnvironmentImpl(config); - StreamSpec spec = env.streamFromConfig(STREAM_ID, TEST_PHYSICAL_NAME, TEST_SYSTEM); - - assertEquals(STREAM_ID, spec.getId()); - assertEquals(TEST_PHYSICAL_NAME, spec.getPhysicalName()); - assertEquals(TEST_SYSTEM, spec.getSystemName()); - } - - // Special characters are NOT allowed for system name, because it's used as an identifier in the config. 
- @Test(expected = IllegalArgumentException.class) - public void testStreamFromConfigSystemNameArgInvalid() { - Config config = buildStreamConfig(STREAM_ID, - StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME2, - StreamConfig.SYSTEM(), TEST_SYSTEM2); - - AbstractExecutionEnvironment env = new TestAbstractExecutionEnvironmentImpl(config); - env.streamFromConfig(STREAM_ID, TEST_PHYSICAL_NAME, TEST_SYSTEM_INVALID); - } - - // Empty strings are NOT allowed for system name, because it's used as an identifier in the config. - @Test(expected = IllegalArgumentException.class) - public void testStreamFromConfigSystemNameArgEmpty() { - Config config = buildStreamConfig(STREAM_ID, - StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME2, - StreamConfig.SYSTEM(), TEST_SYSTEM2); - - AbstractExecutionEnvironment env = new TestAbstractExecutionEnvironmentImpl(config); - env.streamFromConfig(STREAM_ID, TEST_PHYSICAL_NAME, ""); - } - - // Null is not allowed for system name. - @Test(expected = NullPointerException.class) - public void testStreamFromConfigSystemNameArgNull() { - Config config = buildStreamConfig(STREAM_ID, - StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME2, - StreamConfig.SYSTEM(), TEST_SYSTEM2); - - AbstractExecutionEnvironment env = new TestAbstractExecutionEnvironmentImpl(config); - env.streamFromConfig(STREAM_ID, TEST_PHYSICAL_NAME, null); - } - - // Special characters are NOT allowed for streamId, because it's used as an identifier in the config. - @Test(expected = IllegalArgumentException.class) - public void testStreamFromConfigStreamIdInvalid() { - Config config = buildStreamConfig(STREAM_ID_INVALID, - StreamConfig.SYSTEM(), TEST_SYSTEM); - - AbstractExecutionEnvironment env = new TestAbstractExecutionEnvironmentImpl(config); - env.streamFromConfig(STREAM_ID_INVALID); - } - - // Empty strings are NOT allowed for streamId, because it's used as an identifier in the config. 
- @Test(expected = IllegalArgumentException.class) - public void testStreamFromConfigStreamIdEmpty() { - Config config = buildStreamConfig("", - StreamConfig.SYSTEM(), TEST_SYSTEM); - - AbstractExecutionEnvironment env = new TestAbstractExecutionEnvironmentImpl(config); - env.streamFromConfig(""); - } - - // Null is not allowed for streamId. - @Test(expected = NullPointerException.class) - public void testStreamFromConfigStreamIdNull() { - Config config = buildStreamConfig(null, - StreamConfig.SYSTEM(), TEST_SYSTEM); - - AbstractExecutionEnvironment env = new TestAbstractExecutionEnvironmentImpl(config); - env.streamFromConfig(null); - } - - - // Helper methods - - private Config buildStreamConfig(String streamId, String... kvs) { - // inject streams.x. into each key - for (int i = 0; i < kvs.length - 1; i += 2) { - kvs[i] = String.format(StreamConfig.STREAM_ID_PREFIX(), streamId) + kvs[i]; - } - return buildConfig(kvs); - } - - private Config buildConfig(String... kvs) { - if (kvs.length % 2 != 0) { - throw new IllegalArgumentException("There must be parity between the keys and values"); - } - - Map<String, String> configMap = new HashMap<>(); - for (int i = 0; i < kvs.length - 1; i += 2) { - configMap.put(kvs[i], kvs[i + 1]); - } - return new MapConfig(configMap); - } - - private Config addConfigs(Config original, String... kvs) { - Map<String, String> result = new HashMap<>(); - result.putAll(original); - result.putAll(buildConfig(kvs)); - return new MapConfig(result); - } - - private class TestAbstractExecutionEnvironmentImpl extends AbstractExecutionEnvironment { - - public TestAbstractExecutionEnvironmentImpl(Config config) { - super(config); - } - - @Override - public void run(StreamGraphBuilder graphBuilder, Config config) { - // do nothing - } - } -}
