Repository: samza Updated Branches: refs/heads/samza-fluent-api-v1 dde754246 -> a9b213c13
SAMZA-1093: instantiating StreamOperatorTask in SamzaContainer (WIP) SAMZA-1093: Instantiating StreamOperatorTask in SamzaContainer SAMZA-1093: fix import SAMZA-1093: code ready, more unit tests pending Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/82b839f9 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/82b839f9 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/82b839f9 Branch: refs/heads/samza-fluent-api-v1 Commit: 82b839f99de832aed9f2f4dd935d9f6bbf6932ec Parents: 7e7747f Author: Yi Pan (Data Infrastructure) <[email protected]> Authored: Tue Feb 21 16:18:25 2017 -0800 Committer: Yi Pan (Data Infrastructure) <[email protected]> Committed: Fri Feb 24 23:33:26 2017 -0800 ---------------------------------------------------------------------- .../samza/operators/StreamGraphBuilder.java | 2 + .../apache/samza/container/RunLoopFactory.java | 16 +- .../org/apache/samza/task/TaskFactories.java | 95 +++++++ .../org/apache/samza/util/ScalaToJavaUtils.java | 41 +++ .../apache/samza/container/SamzaContainer.scala | 81 +++--- .../apache/samza/task/TestTaskFactories.java | 256 +++++++++++++++++++ .../testUtils/InvalidStreamGraphBuilder.java | 25 ++ .../samza/testUtils/TestAsyncStreamTask.java | 35 +++ .../samza/testUtils/TestStreamGraphBuilder.java | 33 +++ .../apache/samza/testUtils/TestStreamTask.java | 34 +++ .../apache/samza/zk/TestZkLeaderElector.java | 2 +- .../java/org/apache/samza/zk/TestZkUtils.java | 2 +- .../test/integration/StreamTaskTestUtil.scala | 3 +- 13 files changed, 564 insertions(+), 61 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/82b839f9/samza-api/src/main/java/org/apache/samza/operators/StreamGraphBuilder.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/StreamGraphBuilder.java b/samza-api/src/main/java/org/apache/samza/operators/StreamGraphBuilder.java index b415cf8..23e8625 100644 --- a/samza-api/src/main/java/org/apache/samza/operators/StreamGraphBuilder.java +++ b/samza-api/src/main/java/org/apache/samza/operators/StreamGraphBuilder.java @@ -27,6 +27,8 @@ import org.apache.samza.config.Config; */ @InterfaceStability.Unstable public interface StreamGraphBuilder { + static final String BUILDER_CLASS_CONFIG = "job.graph.builder.class"; + /** * Users are required to implement this abstract method to initialize the processing logic of the application, in terms * of a DAG of {@link org.apache.samza.operators.MessageStream}s and operators http://git-wip-us.apache.org/repos/asf/samza/blob/82b839f9/samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java b/samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java index 32ab47a..f81266b 100644 --- a/samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java +++ b/samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java @@ -27,12 +27,12 @@ import org.apache.samza.util.HighResolutionClock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.collection.JavaConversions; -import scala.runtime.AbstractFunction0; import scala.runtime.AbstractFunction1; import java.util.concurrent.ExecutorService; import static org.apache.samza.util.Util.asScalaClock; +import static org.apache.samza.util.ScalaToJavaUtils.defaultValue; /** * Factory class to create runloop for a Samza task, based on the type @@ -109,18 +109,4 @@ public class RunLoopFactory { } } - /** - * Returns a default value object for scala option.getOrDefault() to use - * @param value default value - * @param <T> value type - * @return object containing default value - */ - public static <T> AbstractFunction0<T> defaultValue(final T value) { - return new AbstractFunction0<T>() { - @Override - public T apply() { - return value; - } - }; - } } http://git-wip-us.apache.org/repos/asf/samza/blob/82b839f9/samza-core/src/main/java/org/apache/samza/task/TaskFactories.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/task/TaskFactories.java b/samza-core/src/main/java/org/apache/samza/task/TaskFactories.java new file mode 100644 index 0000000..0bfa8c4 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/task/TaskFactories.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.task; + +import org.apache.samza.config.Config; +import org.apache.samza.config.ConfigException; +import org.apache.samza.operators.StreamGraphBuilder; +import org.apache.samza.config.TaskConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.samza.util.ScalaToJavaUtils.defaultValue; + +public class TaskFactories { + private static final Logger log = LoggerFactory.getLogger(TaskFactories.class); + + public static Object fromTaskClassConfig(Config config) throws ClassNotFoundException { + + String taskClassName; + + // if there is configuration to set the job w/ a specific type of task, instantiate the corresponding task factory + if (isStreamOperatorTask(config)) { + taskClassName = StreamOperatorTask.class.getName(); + } else { + taskClassName = new TaskConfig(config).getTaskClass().getOrElse(defaultValue(null)); + } + + if (taskClassName == null) { + throw new ConfigException("There is no task class defined in the configuration. Failed to create a valid TaskFactory"); + } + log.info("Got task class name: {}", taskClassName); + + boolean isAsyncTaskClass = AsyncStreamTask.class.isAssignableFrom(Class.forName(taskClassName)); + if (isAsyncTaskClass) { + return new AsyncStreamTaskFactory() { + @Override + public AsyncStreamTask createInstance() { + try { + return (AsyncStreamTask) Class.forName(taskClassName).newInstance(); + } catch (Exception e) { + log.error("Error loading AsyncStreamTask class: {}. error: {}", taskClassName, e); + throw new RuntimeException(e); + } + } + }; + } + + return new StreamTaskFactory() { + @Override + public StreamTask createInstance() { + try { + return taskClassName == StreamOperatorTask.class.getName() ? createStreamOperatorTask(config) : + (StreamTask) Class.forName(taskClassName).newInstance(); + } catch (Exception e) { + log.error("Error loading StreamTask class: {}. error: {}", taskClassName, e); + throw new RuntimeException(e); + } + } + }; + } + + private static StreamTask createStreamOperatorTask(Config config) throws Exception { + StreamGraphBuilder graphBuilder = (StreamGraphBuilder) Class.forName(config.get(StreamGraphBuilder.BUILDER_CLASS_CONFIG)).newInstance(); + return new StreamOperatorTask(graphBuilder); + } + + private static boolean isStreamOperatorTask(Config config) { + try { + if (config.get(StreamGraphBuilder.BUILDER_CLASS_CONFIG) != null && config.get(StreamGraphBuilder.BUILDER_CLASS_CONFIG) != "") { + return StreamGraphBuilder.class.isAssignableFrom(Class.forName(config.get(StreamGraphBuilder.BUILDER_CLASS_CONFIG))); + } + return false; + } catch (Exception e) { + log.error("Failed to validate StreamGraphBuilder class from the config. {}={}", + StreamGraphBuilder.BUILDER_CLASS_CONFIG, config.get(StreamGraphBuilder.BUILDER_CLASS_CONFIG)); + return false; + } + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/82b839f9/samza-core/src/main/java/org/apache/samza/util/ScalaToJavaUtils.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/util/ScalaToJavaUtils.java b/samza-core/src/main/java/org/apache/samza/util/ScalaToJavaUtils.java new file mode 100644 index 0000000..6c7fc2d --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/util/ScalaToJavaUtils.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.util; + +import scala.runtime.AbstractFunction0; + +/** + * Common utils methods that helps to convert or use Scala objects in Java code + */ +public class ScalaToJavaUtils { + /** + * Returns a default value object for scala option.getOrDefault() to use + * @param value default value + * @param <T> value type + * @return object containing default value + */ + public static <T> AbstractFunction0<T> defaultValue(final T value) { + return new AbstractFunction0<T>() { + @Override + public T apply() { + return value; + } + }; + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/82b839f9/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala index 89522dc..5476fb5 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala @@ -25,6 +25,8 @@ import java.util import java.util.concurrent.{CountDownLatch, ExecutorService, Executors, TimeUnit} import java.lang.Thread.UncaughtExceptionHandler import java.net.{URL, UnknownHostException} +import javax.servlet.SingleThreadModel + import org.apache.samza.SamzaException import org.apache.samza.checkpoint.{CheckpointListener, CheckpointManagerFactory, OffsetManager, OffsetManagerMetrics} import org.apache.samza.config.JobConfig.Config2Job @@ -64,13 +66,7 @@ import org.apache.samza.system.SystemStreamPartition import org.apache.samza.system.chooser.DefaultChooser import org.apache.samza.system.chooser.MessageChooserFactory import org.apache.samza.system.chooser.RoundRobinChooserFactory -import org.apache.samza.task.AsyncRunLoop -import org.apache.samza.task.AsyncStreamTask -import org.apache.samza.task.AsyncStreamTaskAdapter -import org.apache.samza.task.AsyncStreamTaskFactory -import org.apache.samza.task.StreamTask -import org.apache.samza.task.StreamTaskFactory -import org.apache.samza.task.TaskInstanceCollector +import org.apache.samza.task._ import org.apache.samza.util.HighResolutionClock import org.apache.samza.util.ExponentialSleepStrategy import org.apache.samza.util.Logging @@ -432,28 +428,6 @@ object SamzaContainer extends Logging { val singleThreadMode = config.getSingleThreadMode info("Got single thread mode: " + singleThreadMode) - val taskClassName = config.getTaskClass.orNull - info("Got task class name: %s" format taskClassName) - - if (taskClassName == null && taskFactory == null) { - throw new SamzaException("Either the task class name or the task factory instance is required.") - } - - val isAsyncTask: Boolean = - if (taskFactory != null) { - taskFactory.isInstanceOf[AsyncStreamTaskFactory] - } else { - classOf[AsyncStreamTask].isAssignableFrom(Class.forName(taskClassName)) - } - - if (isAsyncTask) { - info("Got an AsyncStreamTask implementation.") - } - - if(singleThreadMode && isAsyncTask) { - throw new SamzaException("AsyncStreamTask cannot run on single thread mode.") - } - val threadPoolSize = config.getThreadPoolSize info("Got thread pool size: " + threadPoolSize) @@ -462,6 +436,39 @@ object SamzaContainer extends Logging { else null + def maybeAsyncStreamTaskFactory : Object = { + + val taskClassFactory = taskFactory match { + case null => TaskFactories.fromTaskClassConfig(config) + case _ => taskFactory + } + + if (taskClassFactory == null) { + throw new SamzaException("Either the task class name or the task factory instance is required.") + } + + val isAsyncTaskClass: Boolean = taskClassFactory.isInstanceOf[AsyncStreamTaskFactory] + + if (isAsyncTaskClass) { + info("Got an AsyncStreamTask implementation.") + } + + if (singleThreadMode && isAsyncTaskClass) { + throw new SamzaException("AsyncStreamTask cannot run on single thread mode.") + } + + if (!singleThreadMode && !isAsyncTaskClass) { + info("Converting StreamTask to AsyncStreamTaskAdapter when running StreamTask w/ multiple threads") + new AsyncStreamTaskFactory { + override def createInstance() = new AsyncStreamTaskAdapter(taskClassFactory.asInstanceOf[StreamTaskFactory].createInstance(), taskThreadPool) + } + } else { + taskClassFactory + } + } + + val finalTaskFactory = maybeAsyncStreamTaskFactory + // Wire up all task-instance-level (unshared) objects. val taskNames = containerModel .getTasks @@ -483,24 +490,14 @@ object SamzaContainer extends Logging { val taskName = taskModel.getTaskName - val taskObj = if (taskFactory != null) { - debug("Using task factory to create task instance") - taskFactory match { + debug("Using task factory to create task instance") + val task = + finalTaskFactory match { case tf: AsyncStreamTaskFactory => tf.createInstance() case tf: StreamTaskFactory => tf.createInstance() case _ => throw new SamzaException("taskFactory must be an instance of StreamTaskFactory or AsyncStreamTaskFactory") } - } else { - debug("Using task class name: %s to create instance" format taskClassName) - Class.forName(taskClassName).newInstance - } - - val task = if (!singleThreadMode && !isAsyncTask) - // Wrap the StreamTask into a AsyncStreamTask with the build-in thread pool - new AsyncStreamTaskAdapter(taskObj.asInstanceOf[StreamTask], taskThreadPool) - else - taskObj val taskInstanceMetrics = new TaskInstanceMetrics("TaskName-%s" format taskName) http://git-wip-us.apache.org/repos/asf/samza/blob/82b839f9/samza-core/src/test/java/org/apache/samza/task/TestTaskFactories.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/task/TestTaskFactories.java b/samza-core/src/test/java/org/apache/samza/task/TestTaskFactories.java new file mode 100644 index 0000000..29741a4 --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/task/TestTaskFactories.java @@ -0,0 +1,256 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.task; + +import org.apache.samza.config.Config; +import org.apache.samza.config.ConfigException; +import org.apache.samza.config.MapConfig; +import org.apache.samza.operators.StreamGraphBuilder; +import org.apache.samza.testUtils.TestAsyncStreamTask; +import org.apache.samza.testUtils.TestStreamTask; +import org.junit.Test; + +import java.util.HashMap; + +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** + * Test methods to create {@link StreamTaskFactory} or {@link AsyncStreamTaskFactory} based on task class configuration + */ +public class TestTaskFactories { + + @Test + public void testStreamTaskClass() throws ClassNotFoundException { + Config config = new MapConfig(new HashMap<String, String>() { + { + this.put("task.class", "org.apache.samza.testUtils.TestStreamTask"); + } + }); + Object retFactory = TaskFactories.fromTaskClassConfig(config); + assertTrue(retFactory instanceof StreamTaskFactory); + assertTrue(((StreamTaskFactory) retFactory).createInstance() instanceof TestStreamTask); + + config = new MapConfig(new HashMap<String, String>() { + { + this.put("task.class", "no.such.class"); + } + }); + try { + TaskFactories.fromTaskClassConfig(config); + fail("Should have failed w/ no.such.class"); + } catch (ClassNotFoundException cfe) { + // expected + } + } + + @Test + public void testStreamOperatorTaskClass() throws ClassNotFoundException { + Config config = new MapConfig(new HashMap<String, String>() { + { + this.put(StreamGraphBuilder.BUILDER_CLASS_CONFIG, "org.apache.samza.testUtils.TestStreamGraphBuilder"); + } + }); + Object retFactory = TaskFactories.fromTaskClassConfig(config); + assertTrue(retFactory instanceof StreamTaskFactory); + assertTrue(((StreamTaskFactory) retFactory).createInstance() instanceof StreamOperatorTask); + + config = new MapConfig(new HashMap<String, String>() { + { + this.put(StreamGraphBuilder.BUILDER_CLASS_CONFIG, "org.apache.samza.testUtils.InvalidStreamGraphBuilder"); + } + }); + try { + TaskFactories.fromTaskClassConfig(config); + fail("Should have failed w/ no.such.class"); + } catch (ConfigException ce) { + // expected + } + + config = new MapConfig(new HashMap<String, String>() { + { + this.put(StreamGraphBuilder.BUILDER_CLASS_CONFIG, "no.such.class"); + } + }); + try { + TaskFactories.fromTaskClassConfig(config); + fail("Should have failed w/ no.such.class"); + } catch (ConfigException ce) { + // expected + } + + config = new MapConfig(new HashMap<String, String>() { + { + this.put(StreamGraphBuilder.BUILDER_CLASS_CONFIG, ""); + } + }); + try { + TaskFactories.fromTaskClassConfig(config); + fail("Should have failed w/ empty class name for StreamGraphBuilder"); + } catch (ConfigException ce) { + // expected + } + + config = new MapConfig(new HashMap<String, String>()); + try { + TaskFactories.fromTaskClassConfig(config); + fail(String.format("Should have failed w/ non-existing entry for %s", StreamGraphBuilder.BUILDER_CLASS_CONFIG)); + } catch (ConfigException ce) { + // expected + } + } + + @Test + public void testStreamOperatorTaskClassWithTaskClass() throws ClassNotFoundException { + Config config = new MapConfig(new HashMap<String, String>() { + { + this.put("task.class", "org.apache.samza.testUtils.TestStreamTask"); + this.put(StreamGraphBuilder.BUILDER_CLASS_CONFIG, "org.apache.samza.testUtils.TestStreamGraphBuilder"); + } + }); + Object retFactory = TaskFactories.fromTaskClassConfig(config); + assertTrue(retFactory instanceof StreamTaskFactory); + assertTrue(((StreamTaskFactory) retFactory).createInstance() instanceof StreamOperatorTask); + + config = new MapConfig(new HashMap<String, String>() { + { + this.put("task.class", "org.apache.samza.testUtils.TestAsyncStreamTask"); + this.put(StreamGraphBuilder.BUILDER_CLASS_CONFIG, "org.apache.samza.testUtils.TestStreamGraphBuilder"); + } + }); + retFactory = TaskFactories.fromTaskClassConfig(config); + assertTrue(retFactory instanceof StreamTaskFactory); + assertTrue(((StreamTaskFactory) retFactory).createInstance() instanceof StreamOperatorTask); + + config = new MapConfig(new HashMap<String, String>() { + { + this.put("task.class", "no.such.class"); + this.put(StreamGraphBuilder.BUILDER_CLASS_CONFIG, "org.apache.samza.testUtils.TestStreamGraphBuilder"); + } + }); + retFactory = TaskFactories.fromTaskClassConfig(config); + assertTrue(retFactory instanceof StreamTaskFactory); + assertTrue(((StreamTaskFactory) retFactory).createInstance() instanceof StreamOperatorTask); + } + + @Test + public void testStreamTaskClassWithInvalidStreamGraphBuilder() throws ClassNotFoundException { + + Config config = new MapConfig(new HashMap<String, String>() { + { + this.put("task.class", "org.apache.samza.testUtils.TestStreamTask"); + this.put(StreamGraphBuilder.BUILDER_CLASS_CONFIG, "org.apache.samza.testUtils.InvalidStreamGraphBuilder"); + } + }); + Object retFactory = TaskFactories.fromTaskClassConfig(config); + assertTrue(retFactory instanceof StreamTaskFactory); + assertTrue(((StreamTaskFactory) retFactory).createInstance() instanceof TestStreamTask); + + config = new MapConfig(new HashMap<String, String>() { + { + this.put("task.class", "org.apache.samza.testUtils.TestStreamTask"); + this.put(StreamGraphBuilder.BUILDER_CLASS_CONFIG, ""); + } + }); + retFactory = TaskFactories.fromTaskClassConfig(config); + assertTrue(retFactory instanceof StreamTaskFactory); + assertTrue(((StreamTaskFactory) retFactory).createInstance() instanceof TestStreamTask); + + config = new MapConfig(new HashMap<String, String>() { + { + this.put("task.class", "org.apache.samza.testUtils.TestStreamTask"); + this.put(StreamGraphBuilder.BUILDER_CLASS_CONFIG, null); + } + }); + retFactory = TaskFactories.fromTaskClassConfig(config); + assertTrue(retFactory instanceof StreamTaskFactory); + assertTrue(((StreamTaskFactory) retFactory).createInstance() instanceof TestStreamTask); + + config = new MapConfig(new HashMap<String, String>() { + { + this.put("task.class", ""); + this.put(StreamGraphBuilder.BUILDER_CLASS_CONFIG, "org.apache.samza.testUtils.InvalidStreamGraphBuilder"); + } + }); + try { + TaskFactories.fromTaskClassConfig(config); + fail("Should have failed w/ no class not found"); + } catch (ClassNotFoundException cne) { + // expected + } + } + + @Test + public void testAsyncStreamTask() throws ClassNotFoundException { + Config config = new MapConfig(new HashMap<String, String>() { + { + this.put("task.class", "org.apache.samza.testUtils.TestAsyncStreamTask"); + } + }); + Object retFactory = TaskFactories.fromTaskClassConfig(config); + assertTrue(retFactory instanceof AsyncStreamTaskFactory); + assertTrue(((AsyncStreamTaskFactory) retFactory).createInstance() instanceof TestAsyncStreamTask); + + config = new MapConfig(new HashMap<String, String>() { + { + this.put("task.class", "no.such.class"); + } + }); + try { + TaskFactories.fromTaskClassConfig(config); + fail("Should have failed w/ no.such.class"); + } catch (ClassNotFoundException cfe) { + // expected + } + } + + @Test + public void testAsyncStreamTaskWithInvalidStreamGraphBuilder() throws ClassNotFoundException { + + Config config = new MapConfig(new HashMap<String, String>() { + { + this.put("task.class", "org.apache.samza.testUtils.TestAsyncStreamTask"); + this.put(StreamGraphBuilder.BUILDER_CLASS_CONFIG, "org.apache.samza.testUtils.InvalidStreamGraphBuilder"); + } + }); + Object retFactory = TaskFactories.fromTaskClassConfig(config); + assertTrue(retFactory instanceof AsyncStreamTaskFactory); + assertTrue(((AsyncStreamTaskFactory) retFactory).createInstance() instanceof TestAsyncStreamTask); + + config = new MapConfig(new HashMap<String, String>() { + { + this.put("task.class", "org.apache.samza.testUtils.TestAsyncStreamTask"); + this.put(StreamGraphBuilder.BUILDER_CLASS_CONFIG, ""); + } + }); + retFactory = TaskFactories.fromTaskClassConfig(config); + assertTrue(retFactory instanceof AsyncStreamTaskFactory); + assertTrue(((AsyncStreamTaskFactory) retFactory).createInstance() instanceof TestAsyncStreamTask); + + config = new MapConfig(new HashMap<String, String>() { + { + this.put("task.class", "org.apache.samza.testUtils.TestAsyncStreamTask"); + this.put(StreamGraphBuilder.BUILDER_CLASS_CONFIG, null); + } + }); + retFactory = TaskFactories.fromTaskClassConfig(config); + assertTrue(retFactory instanceof AsyncStreamTaskFactory); + assertTrue(((AsyncStreamTaskFactory) retFactory).createInstance() instanceof TestAsyncStreamTask); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/82b839f9/samza-core/src/test/java/org/apache/samza/testUtils/InvalidStreamGraphBuilder.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/testUtils/InvalidStreamGraphBuilder.java b/samza-core/src/test/java/org/apache/samza/testUtils/InvalidStreamGraphBuilder.java new file mode 100644 index 0000000..2fe6946 --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/testUtils/InvalidStreamGraphBuilder.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.testUtils; + +/** + * Test class. Invalid class to implement {@link org.apache.samza.operators.StreamGraphBuilder} + */ +public class InvalidStreamGraphBuilder { +} http://git-wip-us.apache.org/repos/asf/samza/blob/82b839f9/samza-core/src/test/java/org/apache/samza/testUtils/TestAsyncStreamTask.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/testUtils/TestAsyncStreamTask.java b/samza-core/src/test/java/org/apache/samza/testUtils/TestAsyncStreamTask.java new file mode 100644 index 0000000..81f3fd4 --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/testUtils/TestAsyncStreamTask.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.testUtils; + +import org.apache.samza.system.IncomingMessageEnvelope; +import org.apache.samza.task.AsyncStreamTask; +import org.apache.samza.task.MessageCollector; +import org.apache.samza.task.TaskCallback; +import org.apache.samza.task.TaskCoordinator; + +/** + * Test implementation class for {@link AsyncStreamTask} + */ +public class TestAsyncStreamTask implements AsyncStreamTask { + @Override + public void processAsync(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator, TaskCallback callback) { + + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/82b839f9/samza-core/src/test/java/org/apache/samza/testUtils/TestStreamGraphBuilder.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/testUtils/TestStreamGraphBuilder.java b/samza-core/src/test/java/org/apache/samza/testUtils/TestStreamGraphBuilder.java new file mode 100644 index 0000000..10eb02f --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/testUtils/TestStreamGraphBuilder.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.testUtils; + +import org.apache.samza.config.Config; +import org.apache.samza.operators.StreamGraph; +import org.apache.samza.operators.StreamGraphBuilder; + +/** + * Test implementation class for {@link StreamGraphBuilder} + */ +public class TestStreamGraphBuilder implements StreamGraphBuilder { + @Override + public void init(StreamGraph graph, Config config) { + + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/82b839f9/samza-core/src/test/java/org/apache/samza/testUtils/TestStreamTask.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/testUtils/TestStreamTask.java b/samza-core/src/test/java/org/apache/samza/testUtils/TestStreamTask.java new file mode 100644 index 0000000..ce0980a --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/testUtils/TestStreamTask.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.testUtils; + +import org.apache.samza.system.IncomingMessageEnvelope; +import org.apache.samza.task.MessageCollector; +import org.apache.samza.task.StreamTask; +import org.apache.samza.task.TaskCoordinator; + +/** + * Test implementation class for {@link StreamTask} + */ +public class TestStreamTask implements StreamTask { + @Override + public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) throws Exception { + + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/82b839f9/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java index b999ec5..9b5033b 100644 --- a/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java +++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java @@ -57,7 +57,7 @@ public class TestZkLeaderElector { @Before public void testSetup() { - testZkConnectionString = "localhost:" + zkServer.getPort(); + testZkConnectionString = "127.0.0.1:" + zkServer.getPort(); try { testZkUtils = getZkUtilsWithNewClient(); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/samza/blob/82b839f9/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java index 855d29d..1cf901e 100644 --- a/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java +++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java @@ -47,7 +47,7 @@ public class TestZkUtils { public void testSetup() { try { zkClient = new ZkClient( - new ZkConnection("localhost:" + zkServer.getPort(), SESSION_TIMEOUT_MS), + new ZkConnection("127.0.0.1:" + zkServer.getPort(), SESSION_TIMEOUT_MS), CONNECTION_TIMEOUT_MS); } catch (Exception e) { Assert.fail("Client connection setup failed. Aborting tests.."); http://git-wip-us.apache.org/repos/asf/samza/blob/82b839f9/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala ---------------------------------------------------------------------- diff --git a/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala b/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala index 5d82b92..b803dfe 100644 --- a/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala +++ b/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala @@ -89,8 +89,6 @@ object StreamTaskTestUtil { "systems.kafka.samza.offset.default" -> "oldest", // applies to a nonempty topic "systems.kafka.consumer.auto.offset.reset" -> "smallest", // applies to an empty topic "systems.kafka.samza.msg.serde" -> "string", - "systems.kafka.consumer.zookeeper.connect" -> "localhost:2181", - "systems.kafka.producer.bootstrap.servers" -> "localhost:9092", // Since using state, need a checkpoint manager "task.checkpoint.factory" -> "org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory", "task.checkpoint.system" -> "kafka", @@ -122,6 +120,7 @@ object StreamTaskTestUtil { val brokerList = TestUtils.getBrokerListStrFromServers(servers, SecurityProtocol.PLAINTEXT) brokers = brokerList.split(",").map(p => "localhost" + p).mkString(",") + // setup the zookeeper and bootstrap servers for local kafka cluster jobConfig ++= Map("systems.kafka.consumer.zookeeper.connect" -> zkConnect, "systems.kafka.producer.bootstrap.servers" -> brokers)
