Repository: samza Updated Branches: refs/heads/samza-standalone a47e8819f -> 4918e3ad7
Pulling out from samza-li StandAloneApi Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/7d6332b6 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/7d6332b6 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/7d6332b6 Branch: refs/heads/samza-standalone Commit: 7d6332b69f02b9bdb5fda78983ef64eded79f66b Parents: a47e881 Author: navina <[email protected]> Authored: Fri Dec 23 16:58:14 2016 -0800 Committer: navina <[email protected]> Committed: Fri Dec 23 16:58:14 2016 -0800 ---------------------------------------------------------------------- build.gradle | 1 + gradle/dependency-versions.gradle | 2 +- .../org/apache/samza/config/JavaJobConfig.java | 27 +++ .../samza/config/JobCoordinatorConfig.java | 24 ++ .../org/apache/samza/config/TaskConfigJava.java | 14 ++ .../java/org/apache/samza/config/ZkConfig.java | 30 +++ .../samza/coordinator/JobCoordinator.java | 58 +++++ .../coordinator/JobCoordinatorFactory.java | 30 +++ .../leaderelection/LeaderElector.java | 7 + .../processor/SamzaContainerController.java | 127 ++++++++++ .../apache/samza/processor/StreamProcessor.java | 148 ++++++++++++ .../standalone/StandaloneJobCoordinator.java | 121 ++++++++++ .../StandaloneJobCoordinatorFactory.java | 31 +++ .../samza/zk/BarrierForVersionUpgrade.java | 9 + .../samza/zk/ScheduleAfterDebounceTime.java | 56 +++++ .../samza/zk/ZkBarrierForVersionUpgrade.java | 147 ++++++++++++ .../java/org/apache/samza/zk/ZkController.java | 11 + .../org/apache/samza/zk/ZkControllerImpl.java | 136 +++++++++++ .../org/apache/samza/zk/ZkJobCoordinator.java | 198 ++++++++++++++++ .../samza/zk/ZkJobCoordinatorFactory.java | 37 +++ .../java/org/apache/samza/zk/ZkKeyBuilder.java | 45 ++++ .../org/apache/samza/zk/ZkLeaderElector.java | 110 +++++++++ .../java/org/apache/samza/zk/ZkListener.java | 11 + .../main/java/org/apache/samza/zk/ZkUtils.java | 237 +++++++++++++++++++ .../apache/samza/container/SamzaContainer.scala | 41 
+++- .../samza/job/local/ThreadJobFactory.scala | 11 +- 26 files changed, 1659 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/7d6332b6/build.gradle ---------------------------------------------------------------------- diff --git a/build.gradle b/build.gradle index 5b41c52..0d60970 100644 --- a/build.gradle +++ b/build.gradle @@ -159,6 +159,7 @@ project(":samza-core_$scalaVersion") { compile "net.sf.jopt-simple:jopt-simple:$joptSimpleVersion" compile "org.codehaus.jackson:jackson-mapper-asl:$jacksonVersion" compile "org.eclipse.jetty:jetty-webapp:$jettyVersion" + compile "com.101tec:zkclient:$zkClientVersion" testCompile "junit:junit:$junitVersion" testCompile "org.mockito:mockito-all:$mockitoVersion" testCompile "org.scalatest:scalatest_$scalaVersion:$scalaTestVersion" http://git-wip-us.apache.org/repos/asf/samza/blob/7d6332b6/gradle/dependency-versions.gradle ---------------------------------------------------------------------- diff --git a/gradle/dependency-versions.gradle b/gradle/dependency-versions.gradle index 976a49c..872ae1b 100644 --- a/gradle/dependency-versions.gradle +++ b/gradle/dependency-versions.gradle @@ -26,7 +26,7 @@ mockitoVersion = "1.8.4" scalaTestVersion = "2.2.4" zkClientVersion = "0.8" - zookeeperVersion = "3.3.4" + zookeeperVersion = "3.4.6" metricsVersion = "2.2.0" kafkaVersion = "0.10.0.1" commonsHttpClientVersion = "3.1" http://git-wip-us.apache.org/repos/asf/samza/blob/7d6332b6/samza-core/src/main/java/org/apache/samza/config/JavaJobConfig.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/config/JavaJobConfig.java b/samza-core/src/main/java/org/apache/samza/config/JavaJobConfig.java new file mode 100644 index 0000000..c0747f0 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/config/JavaJobConfig.java @@ -0,0 +1,27 @@ +package 
org.apache.samza.config; + +public class JavaJobConfig extends MapConfig { + private static final String JOB_NAME = "job.name"; // streaming.job_name + private static final String JOB_ID = "job.id"; // streaming.job_id + private static final String DEFAULT_JOB_ID = "1"; + + public JavaJobConfig (Config config) { + super(config); + } + + public String getJobName() { + if (!containsKey(JOB_NAME)) { + throw new ConfigException("Missing " + JOB_NAME + " config!"); + } + return get(JOB_NAME); + } + + public String getJobName(String defaultValue) { + return get(JOB_NAME, defaultValue); + } + + public String getJobId() { + return get(JOB_ID, DEFAULT_JOB_ID); + } + +} http://git-wip-us.apache.org/repos/asf/samza/blob/7d6332b6/samza-core/src/main/java/org/apache/samza/config/JobCoordinatorConfig.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/config/JobCoordinatorConfig.java b/samza-core/src/main/java/org/apache/samza/config/JobCoordinatorConfig.java new file mode 100644 index 0000000..c8e496e --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/config/JobCoordinatorConfig.java @@ -0,0 +1,24 @@ +package org.apache.samza.config; + +import com.google.common.base.Strings; +import org.apache.samza.coordinator.JobCoordinatorFactory; +import org.apache.samza.util.Util; + +public class JobCoordinatorConfig extends MapConfig { + // TODO: Change this to job-coordinator.factory + private static final String JOB_COORDINATOR_FACTORY = "job.coordinator.factory"; + + public JobCoordinatorConfig (Config config) { + super(config); + } + + public String getJobCoordinatorFactoryClassName() { + String jobCoordinatorFactoryClassName = get(JOB_COORDINATOR_FACTORY); + if (Strings.isNullOrEmpty(jobCoordinatorFactoryClassName)) { + throw new ConfigException( + String.format("Missing config - %s. 
Cannot start StreamProcessor!", JOB_COORDINATOR_FACTORY)); + } + + return jobCoordinatorFactoryClassName; + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/7d6332b6/samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java b/samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java index 648fe58..a88e1ec 100644 --- a/samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java +++ b/samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java @@ -35,6 +35,10 @@ import scala.collection.JavaConversions; public class TaskConfigJava extends MapConfig { + // Task Configs + private static final String TASK_SHUTDOWN_MS = "task.shutdown.ms"; + public static final long DEFAULT_TASK_SHUTDOWN_MS = 5000L; + // broadcast streams consumed by all tasks. e.g. kafka.foo#1 public static final String BROADCAST_INPUT_STREAMS = "task.broadcast.inputs"; private static final String BROADCAST_STREAM_PATTERN = "^[\\d]+$"; @@ -117,4 +121,14 @@ public class TaskConfigJava extends MapConfig { return Collections.unmodifiableSet(allInputSS); } + + /** + * Returns a value indicating how long to wait for the tasks to shutdown + * + * @return value indicating how long to wait for the tasks to shutdown + */ + public long getShutdownMs() { + if (get(TASK_SHUTDOWN_MS) == null) return DEFAULT_TASK_SHUTDOWN_MS; + return Long.valueOf(get(TASK_SHUTDOWN_MS)); + } } http://git-wip-us.apache.org/repos/asf/samza/blob/7d6332b6/samza-core/src/main/java/org/apache/samza/config/ZkConfig.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/config/ZkConfig.java b/samza-core/src/main/java/org/apache/samza/config/ZkConfig.java new file mode 100644 index 0000000..973db42 --- /dev/null +++ 
b/samza-core/src/main/java/org/apache/samza/config/ZkConfig.java @@ -0,0 +1,30 @@ +package org.apache.samza.config; + +public class ZkConfig extends MapConfig { + // Connection string for ZK, format: "<hostname>:<port>,..." + public static final String ZK_CONNECT = "coordinator.zk.connect"; + public static final String ZK_SESSION_TIMEOUT_MS = "coordinator.zk.session-timeout-ms"; + public static final String ZK_CONNECTION_TIMEOUT_MS = "coordinator.zk.connection-timeout-ms"; // own key, so it can be set independently of the session timeout + + public static final int DEFAULT_CONNECTION_TIMEOUT_MS = 60000; // used when ZK_CONNECTION_TIMEOUT_MS is not configured + public static final int DEFAULT_SESSION_TIMEOUT_MS = 30000; // used when ZK_SESSION_TIMEOUT_MS is not configured + + public ZkConfig (Config config) { + super(config); + } + + public String getZkConnect() { + if (!containsKey(ZK_CONNECT)) { + throw new ConfigException("Missing " + ZK_CONNECT + " config!"); + } + return get(ZK_CONNECT); + } + + public int getZkSessionTimeoutMs() { + return getInt(ZK_SESSION_TIMEOUT_MS, DEFAULT_SESSION_TIMEOUT_MS); + } + + public int getZkConnectionTimeoutMs() { + return getInt(ZK_CONNECTION_TIMEOUT_MS, DEFAULT_CONNECTION_TIMEOUT_MS); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/7d6332b6/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinator.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinator.java b/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinator.java new file mode 100644 index 0000000..ce0de2b --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinator.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.coordinator; + +import org.apache.samza.job.model.JobModel; + +/** + * A JobCoordinator is a pluggable module in each process that provides the JobModel and the ID to the StreamProcessor. + * In some cases, ID assignment is completely config driven, while in other cases, ID assignment may require + * coordination with JobCoordinators of other StreamProcessors. + * */ +public interface JobCoordinator { + /** + * Starts the JobCoordinator which involves one or more of the following: + * * LeaderElector Module initialization, if any + * * If leader, generate JobModel. Else, read JobModel + */ + void start(); + + /** + * Cleanly shutting down the JobCoordinator involves: + * * Shutting down the LeaderElection module (TBD: details depending on leader or not) + * * TBD + */ + void stop(); + + /** + * Returns the logical ID assigned to the processor + * This may be specified by the user when used as a Library and hence, it is upto the user to ensure that different + * instances of StreamProcessor have unique processor ID. In all other cases, this will be assigned by the leader?? 
(Need to think more) + * @return integer representing the logical processor ID + */ + int getProcessorId(); + + /** + * Returns the current JobModel + * The implementation of the JobCoordinator in the leader needs to know how to read the config and generate JobModel + * In case of a non-leader, the JobCoordinator should simply fetch the jobmodel + * @return instance of JobModel that describes the partition distribution among the processors (and hence, tasks) + */ + JobModel getJobModel(); +} http://git-wip-us.apache.org/repos/asf/samza/blob/7d6332b6/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorFactory.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorFactory.java b/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorFactory.java new file mode 100644 index 0000000..af2aaa7 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorFactory.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza.coordinator; + +import org.apache.samza.config.Config; +import org.apache.samza.processor.SamzaContainerController; + +public interface JobCoordinatorFactory { + /** + * @param config Configs relevant for the JobCoordinator TODO: Separate JC related configs into a "JobCoordinatorConfig" + * @return An instance of IJobCoordinator + */ + JobCoordinator getJobCoordinator(int processorId, Config config, SamzaContainerController containerController); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/7d6332b6/samza-core/src/main/java/org/apache/samza/coordinator/leaderelection/LeaderElector.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/leaderelection/LeaderElector.java b/samza-core/src/main/java/org/apache/samza/coordinator/leaderelection/LeaderElector.java new file mode 100644 index 0000000..fc3cac9 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/coordinator/leaderelection/LeaderElector.java @@ -0,0 +1,7 @@ +package org.apache.samza.coordinator.leaderelection; + +public interface LeaderElector { + boolean tryBecomeLeader(); + void resignLeadership(); + boolean amILeader(); +} http://git-wip-us.apache.org/repos/asf/samza/blob/7d6332b6/samza-core/src/main/java/org/apache/samza/processor/SamzaContainerController.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/processor/SamzaContainerController.java b/samza-core/src/main/java/org/apache/samza/processor/SamzaContainerController.java new file mode 100644 index 0000000..9352f27 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/processor/SamzaContainerController.java @@ -0,0 +1,127 @@ +package org.apache.samza.processor; + +import org.apache.samza.config.ClusterManagerConfig; +import org.apache.samza.config.Config; +import org.apache.samza.config.TaskConfigJava; 
+import org.apache.samza.container.LocalityManager; +import org.apache.samza.container.SamzaContainer; +import org.apache.samza.container.SamzaContainer$; +import org.apache.samza.job.model.ContainerModel; +import org.apache.samza.metrics.JmxServer; +import org.apache.samza.metrics.MetricsReporter; +import org.apache.samza.util.Util; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +public class SamzaContainerController { + private static final Logger log = LoggerFactory.getLogger(SamzaContainerController.class); + + private final ExecutorService executorService; + private volatile SamzaContainer container; + private final Map<String, MetricsReporter> metricsReporterMap; + private final Object taskFactory; + private final long containerShutdownMs; + + // Internal Member Variables + private Future containerFuture; + + /** + * Creates an instance of a controller for instantiating, starting and/or stopping {@link SamzaContainer} + * Requests to execute a container are submitted to the {@link ExecutorService} + * + * @param taskFactory Factory that be used create instances of {@link org.apache.samza.task.StreamTask} or + * {@link org.apache.samza.task.AsyncStreamTask} + * @param containerShutdownMs How long the Samza container should wait for an orderly shutdown of task instances + * @param metricsReporterMap Map of metric reporter name and {@link MetricsReporter} instance + */ + public SamzaContainerController ( + Object taskFactory, + long containerShutdownMs, + Map<String, MetricsReporter> metricsReporterMap) { + this.executorService = Executors.newSingleThreadExecutor(); + this.taskFactory = taskFactory; + this.metricsReporterMap = metricsReporterMap; + if 
(containerShutdownMs == -1) { + this.containerShutdownMs = TaskConfigJava.DEFAULT_TASK_SHUTDOWN_MS; + } else { + this.containerShutdownMs = containerShutdownMs; + } + } + + /** + * Instantiates a container and submits to the executor. This method does not actually wait for the container to + * fully start-up. For such a behavior, see {@link #awaitStart(long)} + * + * <b>Note:</b> <i>This method does not stop a currently running container, if any. It is left up to the caller to + * ensure that the container has been stopped with stopContainer before invoking this method.</i> + * + * @param containerModel {@link ContainerModel} instance to use for the current run of the Container + * @param config Complete configuration map used by the Samza job + * @param maxChangelogStreamPartitions Max number of partitions expected in the changelog streams + * TODO: Try to get rid of maxChangelogStreamPartitions from method arguments + */ + public void startContainer(ContainerModel containerModel, Config config, int maxChangelogStreamPartitions) { + LocalityManager localityManager = null; + if (new ClusterManagerConfig(config).getHostAffinityEnabled()) { + localityManager = SamzaContainer$.MODULE$.getLocalityManager(containerModel.getContainerId(), config); + } + container = SamzaContainer$.MODULE$.apply( + containerModel.getContainerId(), + containerModel, + config, + maxChangelogStreamPartitions, + localityManager, + new JmxServer(), + Util.<String, MetricsReporter>javaMapAsScalaMap(metricsReporterMap), + taskFactory); + containerFuture = executorService.submit(() -> container.run()); + } + + /** + * Method waits for a specified amount of time for the container to fully start-up, which consists of class-loading + * all the components and start message processing + * + * @param timeoutMs Maximum time to wait, in milliseconds + * @return {@code true}, if the container started within the specified wait time and {@code false} if the waiting + * time elapsed + * @throws 
InterruptedException if the current thread is interrupted while waiting for container to start-up + */ + public boolean awaitStart(long timeoutMs) throws InterruptedException { + return container.awaitStart(timeoutMs); + } + + /** + * Stops a running container, if any. Invoking this method multiple times does not have any side-effects. + */ + public void stopContainer() { + if(container == null) { + log.warn("Shutdown before a container was created."); + return; + } + + container.shutdown(); + try { + containerFuture.get(containerShutdownMs, TimeUnit.MILLISECONDS); + } catch (InterruptedException | ExecutionException e) { + log.error("Ran into problems while trying to stop the container in the processor!", e); + } catch (TimeoutException e) { + log.warn("Got Timeout Exception while trying to stop the container in the processor! The processor may not shutdown properly", e); + } + } + + /** + * Shutsdown the controller by first stop any running container and then, shutting down the {@link ExecutorService} + */ + public void shutdown() { + stopContainer(); + executorService.shutdown(); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/7d6332b6/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java new file mode 100644 index 0000000..0f34400 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.processor; + +import org.apache.samza.config.Config; +import org.apache.samza.config.JobCoordinatorConfig; +import org.apache.samza.config.MapConfig; +import org.apache.samza.config.TaskConfigJava; +import org.apache.samza.coordinator.JobCoordinator; +import org.apache.samza.coordinator.JobCoordinatorFactory; +import org.apache.samza.metrics.MetricsReporter; +import org.apache.samza.util.Util; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.Map; + +/** + * StreamProcessor can be embedded in any application or executed in a distributed environment (aka cluster) as + * independent processes <br /> + * <p> + * <b>Usage Example:</b> + * <pre> + * StreamProcessor processor = new StreamProcessor(1, config); <br /> + * processor.start(); + * try { + * boolean status = processor.awaitStart(TIMEOUT_MS); // Optional - blocking call + * if (!status) { + * // Timed out + * } + * ... + * } catch (InterruptedException ie) { + * ... + * } finally { + * processor.stop(); + * } + * </pre> + */ +public class StreamProcessor { + private static final Logger log = LoggerFactory.getLogger(StreamProcessor.class); + /** + * processor.id is equivalent to containerId in samza. It is a logical identifier used by Samza for a processor. 
+ * In a distributed environment, this logical identifier is mapped to a physical identifier of the resource. For + * example, Yarn provides a "containerId" for every resource it allocates. + * In an embedded environment, this identifier is provided by the user by directly using the StreamProcessor API. + * <p> + * <b>Note:</b> This identifier has to be unique across the instances of StreamProcessors. + */ + private static final String PROCESSOR_ID = "processor.id"; + private final int processorId; + private final JobCoordinator jobCoordinator; + private final SamzaContainerController containerController; + + /** + * Create an instance of StreamProcessor that encapsulates a JobCoordinator and Samza Container + * <p> + * JobCoordinator controls how the various StreamProcessor instances belonging to a job coordinate. It is also + * responsible for generating and updating JobModel. + * When StreamProcessor starts, it starts the JobCoordinator and brings up a SamzaContainer based on the JobModel. + * SamzaContainer is executed using an ExecutorService. <br /> + * <p> + * <b>Note:</b> Lifecycle of the ExecutorService is fully managed by the StreamProcessor, and NOT exposed to the user + * + * @param processorId Unique identifier for a processor within the job. 
It has the same semantics as + * "containerId" in Samza + * @param config Instance of config object - contains all configuration required for processing + * @param customMetricsReporters Map of custom MetricReporter instances that are to be injected in the Samza job + */ + public StreamProcessor(int processorId, Config config, Map<String, MetricsReporter> customMetricsReporters) { + this(processorId, config, customMetricsReporters, (Object) null); + } + + private StreamProcessor(int processorId, Config config, Map<String, MetricsReporter> customMetricsReporters, + Object taskFactory) { + this.processorId = processorId; + + Map<String, String> updatedConfigMap = new HashMap<>(); + updatedConfigMap.putAll(config); + updatedConfigMap.put(PROCESSOR_ID, String.valueOf(processorId)); + Config updatedConfig = new MapConfig(updatedConfigMap); + + + this.containerController = new SamzaContainerController( + taskFactory, + new TaskConfigJava(updatedConfig).getShutdownMs(), + customMetricsReporters); + + this.jobCoordinator = Util. 
+ <JobCoordinatorFactory>getObj( + new JobCoordinatorConfig(updatedConfig) + .getJobCoordinatorFactoryClassName()) + .getJobCoordinator(processorId, updatedConfig, this.containerController); + } + + /** + * StreamProcessor Lifecycle: start() + * <ul> + * <li>Starts the JobCoordinator and fetches the JobModel</li> + * <li>Starts the container using ContainerModel based on the processorId </li> + * </ul> + * When start() returns, it only guarantees that the container is initialized and submitted by the controller to + * execute + */ + public void start() { + jobCoordinator.start(); + } + + /** + * Method that allows the user to wait for a specified amount of time for the container to initialize and start + * processing messages + * + * @param timeoutMs Maximum time to wait, in milliseconds + * @return {@code true}, if the container started within the specified wait time and {@code false} if the waiting time + * elapsed + * @throws InterruptedException if the current thread is interrupted while waiting for container to start-up + */ + public boolean awaitStart(long timeoutMs) throws InterruptedException { + return containerController.awaitStart(timeoutMs); // TODO: Should awaitStart be part of the JC interface, instead of directly using container controller + } + + /** + * StreamProcessor Lifecycle: stop() + * <ul> + * <li>Stops the SamzaContainer execution</li> + * <li>Stops the JobCoordinator</li> + * </ul> + */ + public void stop() { + jobCoordinator.stop(); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/7d6332b6/samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinator.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinator.java new file mode 100644 index 0000000..7fe1422 --- /dev/null +++ 
b/samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinator.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.standalone; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.samza.SamzaException; +import org.apache.samza.config.Config; +import org.apache.samza.config.JavaSystemConfig; +import org.apache.samza.coordinator.JobCoordinator; +import org.apache.samza.coordinator.JobModelManager; +import org.apache.samza.coordinator.JobModelManager$; +import org.apache.samza.job.model.JobModel; +import org.apache.samza.processor.SamzaContainerController; +import org.apache.samza.system.StreamMetadataCache; +import org.apache.samza.system.SystemAdmin; +import org.apache.samza.system.SystemFactory; +import org.apache.samza.util.SystemClock; +import org.apache.samza.util.Util; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.Map; + +/** + * Standalone Job Coordinator does not implement any leader elector module or cluster manager + * + * It generates the JobModel using the Config passed into the constructor. 
+ * Since the standalone JobCoordinator does not perform partition management, it allows two kinds of partition + * distribution mechanism - consumer-managed partition distribution and user-defined fixed partition distribution. + * + * */ +public class StandaloneJobCoordinator implements JobCoordinator { + private static final Logger log = LoggerFactory.getLogger(StandaloneJobCoordinator.class); + private final int processorId; + private final Config config; + private final JobModelManager jobModelManager; + private final SamzaContainerController containerController; + + @VisibleForTesting + StandaloneJobCoordinator( + int processorId, + Config config, + SamzaContainerController containerController, + JobModelManager jobModelManager) { + this.processorId = processorId; + this.config = config; + this.containerController = containerController; + this.jobModelManager = jobModelManager; + } + + public StandaloneJobCoordinator(int processorId, Config config, SamzaContainerController containerController) { + this.processorId = processorId; + this.config = config; + this.containerController = containerController; + + JavaSystemConfig systemConfig = new JavaSystemConfig(this.config); + Map<String, SystemAdmin> systemAdmins = new HashMap<>(); + for (String systemName: systemConfig.getSystemNames()) { + String systemFactoryClassName = systemConfig.getSystemFactory(systemName); + if (systemFactoryClassName == null) { + log.error(String.format("A stream uses system %s, which is missing from the configuration.", systemName)); + throw new SamzaException(String.format("A stream uses system %s, which is missing from the configuration.", systemName)); + } + SystemFactory systemFactory = Util.getObj(systemFactoryClassName); + systemAdmins.put(systemName, systemFactory.getAdmin(systemName, this.config)); + } + + StreamMetadataCache streamMetadataCache = new StreamMetadataCache(Util.<String, SystemAdmin>javaMapAsScalaMap(systemAdmins), 5000, SystemClock.instance()); + + /** TODO: + * 
Locality Manager seems to be required in JC for reading locality info and grouping tasks intelligently and also, + * in SamzaContainer for writing locality info to the coordinator stream. This closely couples together + * TaskNameGrouper with the LocalityManager! Hence, groupers should be a property of the jobcoordinator + * (job.coordinator.task.grouper, instead of task.systemstreampartition.grouper) + */ + this.jobModelManager = JobModelManager$.MODULE$.getJobCoordinator(this.config, null, null, streamMetadataCache, null); + } + + @Override + public void start() { + // Fetch the JobModel and start the container for this processorId + JobModel jobModel = getJobModel(); + containerController.startContainer( + jobModel.getContainers().get(processorId), + jobModel.getConfig(), + jobModel.maxChangeLogStreamPartitions); + } + + @Override + public void stop() { + // Delegate to the container controller's shutdown + containerController.shutdown(); + } + + @Override + public int getProcessorId() { + return this.processorId; + } + + @Override + public JobModel getJobModel() { + return jobModelManager.jobModel(); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/7d6332b6/samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinatorFactory.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinatorFactory.java b/samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinatorFactory.java new file mode 100644 index 0000000..7ca85c0 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinatorFactory.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.standalone;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.coordinator.JobCoordinator;
+import org.apache.samza.coordinator.JobCoordinatorFactory;
+import org.apache.samza.processor.SamzaContainerController;
+
+/**
+ * {@link JobCoordinatorFactory} implementation that hands out {@link StandaloneJobCoordinator} instances.
+ */
+public class StandaloneJobCoordinatorFactory implements JobCoordinatorFactory {
+  /**
+   * Creates a new {@link StandaloneJobCoordinator}; all three arguments are forwarded unchanged to its
+   * public constructor.
+   */
+  @Override
+  public JobCoordinator getJobCoordinator(int processorId, Config config, SamzaContainerController containerController) {
+    return new StandaloneJobCoordinator(processorId, config, containerController);
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/7d6332b6/samza-core/src/main/java/org/apache/samza/zk/BarrierForVersionUpgrade.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/BarrierForVersionUpgrade.java b/samza-core/src/main/java/org/apache/samza/zk/BarrierForVersionUpgrade.java
new file mode 100644
index 0000000..691aced
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/zk/BarrierForVersionUpgrade.java
@@ -0,0 +1,9 @@
+package org.apache.samza.zk;
+
+import java.util.List;
+
+
+/**
+ * Two-sided barrier used while switching to a new job model version: one side is driven by the leader,
+ * the other by every participating processor.
+ * NOTE(review): the semantics below are taken from ZkBarrierForVersionUpgrade, the only implementation
+ * in this commit - confirm before relying on them for other implementations.
+ */
+public interface BarrierForVersionUpgrade {
+  /**
+   * Leader side: sets up the barrier for {@code version} and watches for the given processors to join.
+   *
+   * @param version job model version the barrier belongs to
+   * @param processorsNames names of the processors expected to reach the barrier
+   */
+  void leaderStartBarrier(String version, List<String> processorsNames);
+
+  /**
+   * Participant side: announces that {@code processorsName} has reached the barrier for {@code version}
+   * and registers {@code callback} to be run once the barrier is reported as done.
+   *
+   * @param version job model version the barrier belongs to
+   * @param processorsName name under which this processor joins the barrier
+   * @param callback action to run after the barrier has been reached by everyone
+   */
+  void waitForBarrier(String version, String processorsName, Runnable callback);
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/7d6332b6/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java b/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java new file mode 100644 index 0000000..1854de6 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java @@ -0,0 +1,56 @@ +package org.apache.samza.zk; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ScheduleAfterDebounceTime { + public static final Logger LOG = LoggerFactory.getLogger(ScheduleAfterDebounceTime.class); + public static final long timeoutMs = 1000*10; + + public static final String JOB_MODEL_VERSION_CHANGE = "JobModelVersionChange"; + public static final String ON_PROCESSOR_CHANGE = "OnProcessorChange"; + public static final String ON_DATA_CHANGE_ON = "OnDataChanteOn"; + public static final int DEBOUNCE_TIME_MS = 2000; + + + private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1); + private final Map<String, ScheduledFuture> futureHandles = new HashMap<>(); + + public ScheduleAfterDebounceTime () { + + } + + synchronized public void scheduleAfterDebounceTime (String actionName, long debounceTimeMs, Runnable runnable) {//, final ReadyToCreateJobModelListener listener) { + // check if this action has been scheduled already + ScheduledFuture sf = futureHandles.get(actionName); + if(sf != null && !sf.isDone()) { + LOG.info(">>>>>>>>>>>DEBOUNCE: cancel future for " + actionName); + // attempt to cancel + if(! 
sf.cancel(false) ) { + try { + sf.get(timeoutMs, TimeUnit.MILLISECONDS); + } catch (Exception e) { + // we ignore the exception + LOG.warn("cancel for action " + actionName + " failed with ", e); + } + } + futureHandles.remove(actionName); + } + // schedule a new task + sf = scheduledExecutorService.schedule(runnable, debounceTimeMs, TimeUnit.MILLISECONDS); + LOG.info(">>>>>>>>>>>DEBOUNCE: scheduled " + actionName + " in " + debounceTimeMs); + futureHandles.put(actionName, sf); + } + + public void stopScheduler() { + // shutdown executor service + scheduledExecutorService.shutdown(); + } + +} http://git-wip-us.apache.org/repos/asf/samza/blob/7d6332b6/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java b/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java new file mode 100644 index 0000000..60a06da --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java @@ -0,0 +1,147 @@ +package org.apache.samza.zk; + +import java.util.List; +import org.I0Itec.zkclient.IZkChildListener; +import org.I0Itec.zkclient.IZkDataListener; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class ZkBarrierForVersionUpgrade implements BarrierForVersionUpgrade { + private final ZkUtils zkUtils; + private final ZkKeyBuilder keyBuilder; + private final static String BARRIER_DONE = "done"; + private final static Logger LOG = LoggerFactory.getLogger(ZkBarrierForVersionUpgrade.class); + + private final ScheduleAfterDebounceTime debounceTimer; + + final private String barrierPrefix; + + public ZkBarrierForVersionUpgrade( ZkUtils zkUtils, ScheduleAfterDebounceTime debounceTimer) { + this.zkUtils = zkUtils; + keyBuilder = zkUtils.getKeyBuilder(); + + barrierPrefix = keyBuilder.getJobModelVersionBarrierPrefix(); + this.debounceTimer = 
debounceTimer; + } + + @Override + public void leaderStartBarrier(String version, List<String> processorsNames) { + String barrierPath = String.format("%s/barrier_%s", barrierPrefix, version); + String barrierDonePath = String.format("%s/barrier_done", barrierPath); + String barrierProcessors = String.format("%s/barrier_processors", barrierPath); + String barrier = String.format("%s/%s/barrier", barrierPrefix, version); + + // TODO - do we need a check if it exists - it needs to be deleted? + zkUtils.makeSurePersistentPathsExists(new String[]{barrierPrefix, barrierPath, barrierProcessors, barrierDonePath}); + + // callback for when the barrier is reached + Runnable callback = new Runnable() { + @Override + public void run() { + LOG.info("Writing BARRIER DONE to " + barrierDonePath); + zkUtils.getZkClient().writeData(barrierDonePath, BARRIER_DONE); + } + }; + // subscribe for processor's list changes + LOG.info("Subscribing for child changes at " + barrierProcessors); + zkUtils.getZkClient().subscribeChildChanges(barrierProcessors, + new ZkBarrierChangeHandler(callback, processorsNames)); + } + + @Override + public void waitForBarrier(String version, String processorsName, Runnable callback) { + // if participant makes this call it means it has already stopped the old container and got the new job model. 
+ String barrierPath = String.format("%s/barrier_%s", barrierPrefix, version); + String barrierDonePath = String.format("%s/barrier_done", barrierPath); + String barrierProcessors = String.format("%s/barrier_processors", barrierPath); + String barrierProcessorThis = String.format("%s/%s", barrierProcessors, processorsName); + + + // update the barrier for this processor + LOG.info("Creating a child for barrier at " + barrierProcessorThis); + zkUtils.getZkClient().createPersistent(barrierProcessorThis); + + // now subscribe for the barrier + zkUtils.getZkClient().subscribeDataChanges(barrierDonePath, new ZkBarrierReachedHandler(barrierDonePath, debounceTimer, callback)); + } + + /** + * listener for the subscription. + */ + class ZkBarrierChangeHandler implements IZkChildListener { + Runnable callback; + List<String> names; + + public ZkBarrierChangeHandler(Runnable callback, List<String> names) { + this.callback = callback; + this.names = names; + } + + @Override + public void handleChildChange(String parentPath, List<String> currentChildren) throws Exception { + // Find out the event & Log + boolean allIn = true; + + if(currentChildren == null) { + LOG.info("Got handleChildChange with null currentChildren"); + return; + } + // debug + StringBuilder sb = new StringBuilder(); + for (String child : currentChildren) { + sb.append(child).append(","); + } + LOG.info("list of children in the barrier = " + parentPath + ":" + sb.toString()); + sb = new StringBuilder(); + for (String child : names) { + sb.append(child).append(","); + } + LOG.info("list of children to compare against = " + parentPath + ":" + sb.toString()); + + + // check if all the names are in + for(String n : names) { + if(!currentChildren.contains(n)) { + LOG.info("node " + n + " is still not in the list "); + allIn = false; + break; + } + } + if(allIn) { + LOG.info("ALl nodes reached the barrier"); + callback.run(); // all the names have registered + } + } + } + + class ZkBarrierReachedHandler 
implements IZkDataListener { + private final ScheduleAfterDebounceTime debounceTimer; + private final String barrierPathDone; + private final Runnable callback; + public ZkBarrierReachedHandler(String barrierPathDone, ScheduleAfterDebounceTime debounceTimer, Runnable callback) { + this.barrierPathDone = barrierPathDone; + this.callback = callback; + this.debounceTimer = debounceTimer; + } + + @Override + public void handleDataChange(String dataPath, Object data) + throws Exception { + String done = (String) data; + LOG.info("got notification about barrier path=" + barrierPathDone + "; done=" + done); + if (done.equals(BARRIER_DONE)) { + zkUtils.unsubscribeDataChanges(barrierPathDone, this); + debounceTimer.scheduleAfterDebounceTime(ScheduleAfterDebounceTime.JOB_MODEL_VERSION_CHANGE, 0, callback); + } else { + // TODO do we need to resubscribe? + } + } + + @Override + public void handleDataDeleted(String dataPath) + throws Exception { + LOG.warn("barrier done got deleted at " + dataPath); + } + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/7d6332b6/samza-core/src/main/java/org/apache/samza/zk/ZkController.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkController.java b/samza-core/src/main/java/org/apache/samza/zk/ZkController.java new file mode 100644 index 0000000..20e55ab --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkController.java @@ -0,0 +1,11 @@ +package org.apache.samza.zk; + + +public interface ZkController { + void register (); + boolean isLeader(); + void notifyJobModelChange(String version); + void stop(); + void listenToProcessorLiveness(); + String currentJobModelVersion(); +} http://git-wip-us.apache.org/repos/asf/samza/blob/7d6332b6/samza-core/src/main/java/org/apache/samza/zk/ZkControllerImpl.java ---------------------------------------------------------------------- diff --git 
a/samza-core/src/main/java/org/apache/samza/zk/ZkControllerImpl.java b/samza-core/src/main/java/org/apache/samza/zk/ZkControllerImpl.java new file mode 100644 index 0000000..fdd1f02 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkControllerImpl.java @@ -0,0 +1,136 @@ +package org.apache.samza.zk; + +import org.I0Itec.zkclient.IZkChildListener; +import org.I0Itec.zkclient.IZkDataListener; +import org.apache.samza.SamzaException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; + + +public class ZkControllerImpl implements ZkController { + private static final Logger LOG = LoggerFactory.getLogger(ZkControllerImpl.class); + + private String processorIdStr; + private final ZkUtils zkUtils; + private final ZkListener zkListener; + private final ZkLeaderElector leaderElector; + private final ScheduleAfterDebounceTime debounceTimer; + + public ZkControllerImpl (String processorIdStr, ZkUtils zkUtils, ScheduleAfterDebounceTime debounceTimer, ZkListener zkListener) { + this.processorIdStr = processorIdStr; + this.zkUtils = zkUtils; + this.zkListener = zkListener; + this.leaderElector = new ZkLeaderElector(this.processorIdStr, this.zkUtils, this.zkListener); + this.debounceTimer = debounceTimer; + + init(); + } + + @Override + public void register() { + + // TODO - make a loop here with some number of attempts. 
+ // possibly split into two method - becomeLeader() and becomeParticipant() + boolean isLeader = leaderElector.tryBecomeLeader(); + if(isLeader) { + listenToProcessorLiveness(); + + // zkUtils.subscribeToProcessorChange(zkProcessorChangeListener); + debounceTimer.scheduleAfterDebounceTime(ScheduleAfterDebounceTime.ON_PROCESSOR_CHANGE, + ScheduleAfterDebounceTime.DEBOUNCE_TIME_MS, () -> zkListener.onBecomeLeader()); // RECONSIDER MAKING THIS SYNC CALL + + } + + // subscribe to JobModel version updates + zkUtils.subscribeToJobModelVersionChange(new ZkJobModelVersionChangeHandler(debounceTimer)); + } + + private void init() { + ZkKeyBuilder keyBuilder = zkUtils.getKeyBuilder(); + zkUtils.makeSurePersistentPathsExists(new String[] { + keyBuilder.getProcessorsPath(), keyBuilder.getJobModelVersionPath(), keyBuilder.getJobModelPathPrefix()}); + } + + @Override + public boolean isLeader() { + return leaderElector.amILeader(); + } + + @Override + public void notifyJobModelChange(String version) { + zkListener.onNewJobModelAvailable(version); + } + + @Override + public void stop() { + if (isLeader()) { + leaderElector.resignLeadership(); + } + zkUtils.close(); + } + + @Override + public void listenToProcessorLiveness() { + zkUtils.subscribeToProcessorChange(new ZkProcessorChangeHandler(debounceTimer)); + } + + @Override + public String currentJobModelVersion() { + return zkUtils.getJobModelVersion(); + } + + // Only by Leader + class ZkProcessorChangeHandler implements IZkChildListener { + private final ScheduleAfterDebounceTime debounceTimer; + public ZkProcessorChangeHandler(ScheduleAfterDebounceTime debounceTimer) { + this.debounceTimer = debounceTimer; + } + /** + * Called when the children of the given path changed. + * + * @param parentPath The parent path + * @param currentChilds The children or null if the root node (parent path) was deleted. 
+ * @throws Exception + */ + @Override + public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception { + LOG.info( + "ZkControllerImpl::ZkProcessorChangeHandler::handleChildChange - Path: " + parentPath + " Current Children: " + + currentChilds); + debounceTimer.scheduleAfterDebounceTime(ScheduleAfterDebounceTime.ON_PROCESSOR_CHANGE, + ScheduleAfterDebounceTime.DEBOUNCE_TIME_MS, () -> zkListener.onProcessorChange(currentChilds)); + } + } + + class ZkJobModelVersionChangeHandler implements IZkDataListener { + private final ScheduleAfterDebounceTime debounceTimer; + public ZkJobModelVersionChangeHandler(ScheduleAfterDebounceTime debounceTimer) { + this.debounceTimer = debounceTimer; + } + /** + * called when job model version gets updated + * @param dataPath + * @param data + * @throws Exception + */ + @Override + public void handleDataChange(String dataPath, Object data) throws Exception { + LOG.info("pid=" + processorIdStr + ". Got notification on version update change. 
path=" + dataPath + "; data=" + + (String) data); + + debounceTimer + .scheduleAfterDebounceTime(ScheduleAfterDebounceTime.JOB_MODEL_VERSION_CHANGE, 0, () -> notifyJobModelChange((String) data)); + } + @Override + public void handleDataDeleted(String dataPath) throws Exception { + throw new SamzaException("version update path has been deleted!."); + } + } + + public void shutdown() { + if(debounceTimer != null) + debounceTimer.stopScheduler(); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/7d6332b6/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java new file mode 100644 index 0000000..8c36ff2 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java @@ -0,0 +1,198 @@ +package org.apache.samza.zk; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import org.apache.samza.SamzaException; +import org.apache.samza.config.Config; +import org.apache.samza.config.JavaSystemConfig; +import org.apache.samza.config.MapConfig; +import org.apache.samza.coordinator.JobCoordinator; +import org.apache.samza.job.model.ContainerModel; +import org.apache.samza.job.model.JobModel; +import org.apache.samza.processor.SamzaContainerController; +import org.apache.samza.system.StreamMetadataCache; +import org.apache.samza.system.SystemAdmin; +import org.apache.samza.system.SystemFactory; +import org.apache.samza.util.SystemClock; +import org.apache.samza.util.Util; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; + +/** + * JobCoordinator for stand alone processor managed via Zookeeper. 
+ */ +public class ZkJobCoordinator implements JobCoordinator, ZkListener { + private static final Logger log = LoggerFactory.getLogger(ZkJobCoordinator.class); + + private final ZkUtils zkUtils; + private final int processorId; + private final ZkController zkController; + private final SamzaContainerController containerController; + + private final BarrierForVersionUpgrade barrier; + + + ///////////////////////////////////////// + private JobModel newJobModel; + private String newJobModelVersion; // version published in ZK (by the leader) + private Config config; + private ZkKeyBuilder keyBuilder; + private final ScheduleAfterDebounceTime debounceTimer; + //JobModelManager jobModelManager; + + public ZkJobCoordinator(int processorId, Config config, ScheduleAfterDebounceTime debounceTimer, ZkUtils zkUtils, SamzaContainerController containerController) { + this.zkUtils = zkUtils; + this.keyBuilder = zkUtils.getKeyBuilder(); + this.debounceTimer = debounceTimer; + this.processorId = processorId; + this.containerController = containerController; + this.zkController = new ZkControllerImpl(String.valueOf(processorId), zkUtils, debounceTimer, this); + this.config = config; + + + barrier = new ZkBarrierForVersionUpgrade(zkUtils, debounceTimer); //should not have any state in it + + + + // TEMP for model generation + //////////////////////////////// NEEDS TO BE REPLACED ////////////////////////////////////// + JavaSystemConfig systemConfig = new JavaSystemConfig(this.config); + Map<String, SystemAdmin> systemAdmins = new HashMap<>(); + for (String systemName: systemConfig.getSystemNames()) { + String systemFactoryClassName = systemConfig.getSystemFactory(systemName); + if (systemFactoryClassName == null) { + log.error(String.format("A stream uses system %s, which is missing from the configuration.", systemName)); + throw new SamzaException(String.format("A stream uses system %s, which is missing from the configuration.", systemName)); + } + SystemFactory systemFactory = 
Util.getObj(systemFactoryClassName); + systemAdmins.put(systemName, systemFactory.getAdmin(systemName, this.config)); + } + + StreamMetadataCache + streamMetadataCache = new StreamMetadataCache(Util.<String, SystemAdmin>javaMapAsScalaMap(systemAdmins), 5000, SystemClock + .instance()); + + //jobModelManager = //JobModelManager$.MODULE$.getJobCoordinator(this.config, null, null, streamMetadataCache, null); + + //////////////////////////////////////////////////////////////////////////////////////////// + } + + @Override + public void start() { + zkController.register(); + } + + public void cleanupZk() { + zkUtils.deleteRoot(); + } + + @Override + public void stop() { + zkController.stop(); + } + + @Override + public int getProcessorId() + { + return processorId; + } + + @Override + public JobModel getJobModel() { + return newJobModel; + } + + //////////////////////////////////////////////// LEADER stuff /////////////////////////// + @Override + public void onBecomeLeader() { + log.info("ZkJobCoordinator::onBecomeLeader - I become the leader!"); + //zkController.listenToProcessorLiveness(); + // Reset debounce Timer + + // generate JobProcess + generateNewJobModel(); + } + + private void generateNewJobModel() { + // get the current list of processors + List<String> currentProcessors = zkUtils.getActiveProcessors(); + + // get the current version + String currentJMVersion = zkUtils.getJobModelVersion(); + String nextJMVersion; + if(currentJMVersion == null) + nextJMVersion = "1"; + else + nextJMVersion = Integer.toString(Integer.valueOf(currentJMVersion) + 1); + log.info("pid=" + processorId + "generating new model. 
Version = " + nextJMVersion); + + Map<String, String> configMap = new HashMap<>(); + Map<Integer, ContainerModel> containers = new HashMap<>(); + MapConfig config = new MapConfig(configMap); + JobModel jobModel = new JobModel(config, containers); + + log.info("pid=" + processorId + "Generated jobModel: " + jobModel); + + // publish the new version + zkUtils.publishNewJobModel(nextJMVersion, jobModel); + log.info("pid=" + processorId + "published new JobModel ver=" + nextJMVersion + ";jm=" + jobModel); + + // start the barrier for the job model update + barrier.leaderStartBarrier(nextJMVersion, currentProcessors); + + // publish new JobModel version + zkUtils.publishNewJobModelVersion(currentJMVersion, nextJMVersion); + log.info("pid=" + processorId + "published new JobModel ver=" + nextJMVersion); + } + + ////////////////////////////////////////////////////////////////////////////////////////////// + @Override + public void onProcessorChange(List<String> processorIds) { + // Reset debounce Timer + log.info("ZkJobCoordinator::onProcessorChange - Processors changed! List: " + Arrays.toString(processorIds.toArray())); + generateNewJobModel(); + } + + @Override + public void onNewJobModelAvailable(final String version) { + newJobModelVersion = version; + log.info("pid=" + processorId + "new JobModel available"); + // stop current work + containerController.stopContainer(); + log.info("pid=" + processorId + "new JobModel available.Container stopped."); + // get the new job model + newJobModel = zkUtils.getJobModel(version); + log.info("pid=" + processorId + "new JobModel available. 
ver=" + version + "; jm = " + newJobModel); + + + String currentPath = zkUtils.getEphemeralPath(); + + String zkProcessorId = keyBuilder.parseIdFromPath(currentPath); + + // update ZK and wait for all the processors to get this new version + barrier.waitForBarrier(version, String.valueOf(zkProcessorId), new Runnable() { + @Override + public void run() { + onNewJobModelConfirmed(version); + } + }); + } + + @Override + public void onNewJobModelConfirmed(String version) { + log.info("pid=" + processorId + "new version " + version + " of the job model got confirmed"); + // get the new Model + // ????? + JobModel jobModel = getJobModel(); + log.info("pid=" + processorId + "got the new job model =" + jobModel); + /* + containerController.startContainer( + jobModel.getContainers().get(processorId), + jobModel.getConfig(), + jobModel.maxChangeLogStreamPartitions); + */ + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/7d6332b6/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java new file mode 100644 index 0000000..90b0097 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java @@ -0,0 +1,37 @@ +package org.apache.samza.zk; + +import org.apache.samza.config.Config; +import org.apache.samza.config.JavaJobConfig; +import org.apache.samza.config.ZkConfig; +import org.apache.samza.coordinator.JobCoordinator; +import org.apache.samza.coordinator.JobCoordinatorFactory; +import org.apache.samza.processor.SamzaContainerController; + +public class ZkJobCoordinatorFactory implements JobCoordinatorFactory { + /** + * Method to instantiate an implementation of JobCoordinator + * + * @param processorId Indicates the StreamProcessor's id to which this Job Coordinator is associated with + * 
@param config Configs relevant for the JobCoordinator TODO: Separate JC related configs into a "JobCoordinatorConfig" + * @return An instance of IJobCoordinator + */ + @Override + public JobCoordinator getJobCoordinator(int processorId, Config config, SamzaContainerController containerController) { + JavaJobConfig jobConfig = new JavaJobConfig(config); + String groupName = String.format("%s-%s", jobConfig.getJobName(), jobConfig.getJobId()); + ZkConfig zkConfig = new ZkConfig(config); + ScheduleAfterDebounceTime debounceTimer = new ScheduleAfterDebounceTime(); + return new ZkJobCoordinator( + processorId, + config, + debounceTimer, + new ZkUtils( + new ZkKeyBuilder(groupName), + zkConfig.getZkConnect(), + debounceTimer, + String.valueOf(processorId), + zkConfig.getZkSessionTimeoutMs(), + zkConfig.getZkConnectionTimeoutMs()), + containerController); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/7d6332b6/samza-core/src/main/java/org/apache/samza/zk/ZkKeyBuilder.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkKeyBuilder.java b/samza-core/src/main/java/org/apache/samza/zk/ZkKeyBuilder.java new file mode 100644 index 0000000..7ad62be --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkKeyBuilder.java @@ -0,0 +1,45 @@ +package org.apache.samza.zk; + +public class ZkKeyBuilder { + private final String pathPrefix; + public static final String PROCESSORS_PATH = "processors"; + + public static final String JOBMODEL_VERSION_PATH = "jobModelVersion"; + + public ZkKeyBuilder () { + this(""); + } + public ZkKeyBuilder (String pathPrefix) { + this.pathPrefix = pathPrefix; + } + + public String getProcessorsPath() { + return String.format("/%s/%s", pathPrefix, PROCESSORS_PATH); + } + + public static String parseIdFromPath(String path) { + if (path != null) + return path.substring(path.indexOf("processor-")); + return null; + } + + public String getJobModelVersionPath() 
{ + return String.format("/%s/%s", pathPrefix, JOBMODEL_VERSION_PATH); + } + + public String getJobModelPathPrefix() { + return String.format("/%s/jobModels", pathPrefix); + } + + public String getJobModelPath(String jobModelVersion) { + return String.format("%s/%s", getJobModelPathPrefix(), jobModelVersion); + } + + public String getJobModelVersionBarrierPrefix() { + return String.format("/%s/versionBarriers", pathPrefix); + } + + public String getRootPath() { + return "/" + pathPrefix; + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/7d6332b6/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java b/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java new file mode 100644 index 0000000..8bffeb6 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java @@ -0,0 +1,110 @@ +package org.apache.samza.zk; + +import java.util.Arrays; +import org.I0Itec.zkclient.IZkDataListener; +import org.apache.samza.SamzaException; +import org.apache.samza.coordinator.leaderelection.LeaderElector; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.Random; + + +public class ZkLeaderElector implements LeaderElector { + public static final Logger log = LoggerFactory.getLogger(ZkLeaderElector.class); + private final ZkUtils zkUtils; + private final ZkListener zkListener; + private final String processorIdStr; + private final ZkKeyBuilder keyBuilder; + + private String leaderId = null; + private final ZkLeaderListener zkLeaderListener = new ZkLeaderListener(); + private String currentSubscription = null; + private final Random random = new Random(); + + public ZkLeaderElector (String processorIdStr, ZkUtils zkUtils, ZkListener zkListener) { + this.processorIdStr = processorIdStr; + this.zkUtils = zkUtils; + this.keyBuilder = 
this.zkUtils.getKeyBuilder(); + this.zkListener = zkListener; + } + + @Override + public boolean tryBecomeLeader() { + String currentPath = zkUtils.getEphemeralPath(); + + if (currentPath == null || currentPath.isEmpty()) { + zkUtils.registerProcessorAndGetId(); + currentPath = zkUtils.getEphemeralPath(); + } + + List<String> children = zkUtils.getActiveProcessors(); + int index = children.indexOf(ZkKeyBuilder.parseIdFromPath(currentPath)); + + if (index == -1) { + // Retry register here?? + throw new SamzaException("Looks like we are no longer connected to Zk. Need to reconnect??"); + } + + if (index == 0) { + log.info("pid=" + processorIdStr + " Eligible to be the leader!"); + leaderId = ZkKeyBuilder.parseIdFromPath(currentPath); + return true; + } + + log.info("pid=" + processorIdStr + ";index=" + index + ";children=" + Arrays.toString(children.toArray()) + " Not eligible to be a leader yet!"); + leaderId = ZkKeyBuilder.parseIdFromPath(children.get(0)); + String prevCandidate = children.get(index - 1); + if (!prevCandidate.equals(currentSubscription)) { + if (currentSubscription != null) { + zkUtils.unsubscribeDataChanges(keyBuilder.getProcessorsPath() + "/" + currentSubscription, zkLeaderListener); + } + currentSubscription = prevCandidate; + log.info("pid=" + processorIdStr + "Subscribing to " + prevCandidate); + zkUtils.subscribeDataChanges(keyBuilder.getProcessorsPath() + "/" + currentSubscription, zkLeaderListener); + } + + // Double check that the previous candidate still exists + boolean prevCandidateExists = zkUtils.exists(keyBuilder.getProcessorsPath() + "/" + currentSubscription); + if (prevCandidateExists) { + log.info("pid=" + processorIdStr + "Previous candidate still exists. Continuing as non-leader"); + } else { + // TODO - what actually happens here.. + try { + Thread.sleep(random.nextInt(1000)); + } catch (InterruptedException e) { + Thread.interrupted(); + } + log.info("pid=" + processorIdStr + "Previous candidate doesn't exist anymore. 
Trying to become leader again..."); + return tryBecomeLeader(); + } + return false; + } + + @Override + public void resignLeadership() { + } + + @Override + public boolean amILeader() { + return zkUtils.getEphemeralPath() != null + && leaderId != null + && leaderId.equals(ZkKeyBuilder.parseIdFromPath(zkUtils.getEphemeralPath())); + } + + // Only by non-leaders + class ZkLeaderListener implements IZkDataListener { + + @Override + public void handleDataChange(String dataPath, Object data) throws Exception { + log.info("ZkLeaderListener::handleDataChange on path " + dataPath + " Data: " + data); + } + + @Override + public void handleDataDeleted(String dataPath) throws Exception { + log.info("ZkLeaderListener::handleDataDeleted on path " + dataPath); + tryBecomeLeader(); + } + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/7d6332b6/samza-core/src/main/java/org/apache/samza/zk/ZkListener.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkListener.java b/samza-core/src/main/java/org/apache/samza/zk/ZkListener.java new file mode 100644 index 0000000..4a1c491 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkListener.java @@ -0,0 +1,11 @@ +package org.apache.samza.zk; + +import java.util.List; + +public interface ZkListener { + void onBecomeLeader(); + void onProcessorChange(List<String> processorIds); + + void onNewJobModelAvailable(String version); // start job model update (stop current work) + void onNewJobModelConfirmed(String version); // start new work according to the new model +} http://git-wip-us.apache.org/repos/asf/samza/blob/7d6332b6/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java new file mode 100644 index 0000000..6655468 --- /dev/null +++ 
b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java @@ -0,0 +1,237 @@ +package org.apache.samza.zk; + +import java.io.IOException; +import org.I0Itec.zkclient.IZkChildListener; +import org.I0Itec.zkclient.IZkDataListener; +import org.I0Itec.zkclient.IZkStateListener; +import org.I0Itec.zkclient.ZkClient; +import org.I0Itec.zkclient.ZkConnection; +import org.I0Itec.zkclient.exception.ZkInterruptedException; +import org.apache.samza.SamzaException; +import org.apache.samza.config.ZkConfig; +import org.apache.samza.job.model.JobModel; +import org.apache.samza.serializers.model.SamzaObjectMapper; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.data.Stat; +import org.codehaus.jackson.map.ObjectMapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantLock; + +public class ZkUtils { + private static final Logger LOG = LoggerFactory.getLogger(ZkUtils.class); + + public final ReentrantLock lock = new ReentrantLock(); + + private final ZkStateChangeHandler zkStateChangeHandler; + private final ZkClient zkClient; + private final ZkConnection zkConnnection; + private volatile String ephemeralPath = null; + private final ZkKeyBuilder keyBuilder; + private final int sessionTimeoutMs; + private final int connectionTimeoutMs; + private final ScheduleAfterDebounceTime debounceTimer; + private final String processorId; + + public ZkUtils(String zkConnectString, ScheduleAfterDebounceTime debounceTimer, String processorId) { + this(new ZkKeyBuilder(), zkConnectString, debounceTimer, processorId, ZkConfig.DEFAULT_SESSION_TIMEOUT_MS, ZkConfig.DEFAULT_CONNECTION_TIMEOUT_MS); + } + + public ZkUtils(ZkKeyBuilder zkKeyBuilder, String zkConnectString, ScheduleAfterDebounceTime debounceTimer, String processorId, int sessionTimeoutMs, int 
connectionTimeoutMs) { + this.keyBuilder = zkKeyBuilder; + this.sessionTimeoutMs = sessionTimeoutMs; + this.connectionTimeoutMs = connectionTimeoutMs; + this.zkConnnection = new ZkConnection(zkConnectString, this.sessionTimeoutMs); + this.zkClient = new ZkClient(zkConnnection, this.connectionTimeoutMs); + this.zkClient.waitForKeeperState(Watcher.Event.KeeperState.SyncConnected, 10000, TimeUnit.MILLISECONDS); + this.debounceTimer = debounceTimer; + this.zkStateChangeHandler = new ZkStateChangeHandler(debounceTimer); + this.processorId = processorId; + } + + public void connect() throws ZkInterruptedException { + boolean isConnected = zkClient.waitUntilConnected(connectionTimeoutMs, TimeUnit.MILLISECONDS); + if (!isConnected) { + throw new RuntimeException("Unable to connect to Zookeeper within connectionTimeout " + connectionTimeoutMs + "ms. Shutting down!"); + } else { + zkClient.subscribeStateChanges(zkStateChangeHandler); + } + } + + public ZkClient getZkClient() { + return zkClient; + } + + public ZkConnection getZkConnnection() { + return zkConnnection; + } + + public ZkKeyBuilder getKeyBuilder() { + return keyBuilder; + } + public void makeSurePersistentPathsExists(String[] paths) { + for(String path: paths) { + if (!zkClient.exists(path)) { + zkClient.createPersistent(path, true); + } + } + } + + public synchronized String registerProcessorAndGetId() { + try { + // TODO: Data should be more than just the hostname. Use Json serialized data + ephemeralPath = + zkClient.createEphemeralSequential(keyBuilder.getProcessorsPath() + "/processor-", InetAddress.getLocalHost().getHostName()); + return ephemeralPath; + } catch (UnknownHostException e) { + throw new RuntimeException("Failed to register as worker. 
Aborting..."); + } + } + + public synchronized String getEphemeralPath() { + return ephemeralPath; + } + + public List<String> getActiveProcessors() { + List<String> children = zkClient.getChildren(keyBuilder.getProcessorsPath()); + assert children.size() > 0; + Collections.sort(children); + LOG.info("Found these children - " + children); + return children; + } + + ////////////////////////// TEMP ///////////////// NEEDS to be discussed //////////////// + public void publishNewJobModel(String jobModelVersion, JobModel jobModel) { + try { + // We assume (needs to be verified) that this call will FAIL if the node already exists!!!!!!!! + ObjectMapper mmapper = SamzaObjectMapper.getObjectMapper(); + String jobModelStr = mmapper.writeValueAsString(jobModel); + LOG.info("pid=" + processorId + " jobModelAsString=" + jobModelStr); + zkClient.createPersistent(keyBuilder.getJobModelPath(jobModelVersion), jobModelStr); + LOG.info("wrote jobModel path =" + keyBuilder.getJobModelPath(jobModelVersion)); + } catch (Exception e) { + throw new SamzaException(e); + } + } + public JobModel getJobModel(String jobModelVersion) { + LOG.info("pid=" + processorId + "read the model ver=" + jobModelVersion + " from " + keyBuilder.getJobModelPath(jobModelVersion)); + Object data = zkClient.readData(keyBuilder.getJobModelPath(jobModelVersion)); + ObjectMapper mmapper = SamzaObjectMapper.getObjectMapper(); + JobModel jm; + try { + jm = mmapper.readValue((String)data, JobModel.class); + } catch (IOException e) { + throw new SamzaException("failed to read JobModel from ZK", e); + } + return jm; + } + /////////////////////////////////////////////////////////////////////////// + + + public String getJobModelVersion() { + return zkClient.<String>readData(keyBuilder.getJobModelVersionPath()); + } + + public void publishNewJobModelVersion(String oldVersion, String newVersion) { + Stat stat = new Stat(); + String currentVersion = zkClient.<String>readData(keyBuilder.getJobModelVersionPath(), stat); + 
LOG.info("pid=" + processorId + " publishing new version: " + newVersion + "; oldVersion = " + oldVersion + "(" + stat.getVersion() + ")"); + if(currentVersion != null && !currentVersion.equals(oldVersion)) { + throw new SamzaException("Someone change JMVersion while Leader was generating: expected" + oldVersion + ", got " + currentVersion); + } + int dataVersion = stat.getVersion(); + stat = zkClient.writeDataReturnStat(keyBuilder.getJobModelVersionPath(), newVersion, dataVersion); + if(stat.getVersion() != dataVersion + 1) + throw new SamzaException("Someone changed data version of the JMVersion while Leader was generating a new one. current= " + dataVersion + ", old version = " + stat.getVersion()); + + LOG.info("pid=" + processorId + + " published new version: " + newVersion + "; expected dataVersion = " + dataVersion + "(" + stat.getVersion() + + ")"); + } + + /** + * subscribe for changes of JobModel version + * @param dataListener + */ + public void subscribeToJobModelVersionChange(IZkDataListener dataListener) { + LOG.info("pid=" + processorId + " subscribing for jm version change at:" + keyBuilder.getJobModelVersionPath()); + zkClient.subscribeDataChanges(keyBuilder.getJobModelVersionPath(), dataListener); + } + + public void subscribeToProcessorChange(IZkChildListener listener) { + LOG.info("pid=" + processorId + " subscribing for child change at:" + keyBuilder.getProcessorsPath()); + zkClient.subscribeChildChanges(keyBuilder.getProcessorsPath(), listener); + } + + /* Wrapper for standard I0Itec methods */ + public void unsubscribeDataChanges(String path, IZkDataListener dataListener) { + LOG.info("pid=" + processorId + " unsubscribing for data change at:" + path); + zkClient.unsubscribeDataChanges(path, dataListener); + } + + public void subscribeDataChanges(String path, IZkDataListener dataListener) { + LOG.info("pid=" + processorId + " subscribing for data change at:" + path); + zkClient.subscribeDataChanges(path, dataListener); + } + + public boolean 
exists(String path) { + return zkClient.exists(path); + } + + public void close() { + zkClient.close(); + } + + public void deleteRoot() { + String rootPath = keyBuilder.getRootPath(); + if(rootPath != null && !rootPath.isEmpty() && zkClient.exists(rootPath)) { + LOG.info("pid=" + processorId + " Deleteing root: " + rootPath); + zkClient.deleteRecursive(rootPath); + } + } + + class ZkStateChangeHandler implements IZkStateListener { + private final ScheduleAfterDebounceTime debounceTimer; + public ZkStateChangeHandler(ScheduleAfterDebounceTime debounceTimer) { + this.debounceTimer = debounceTimer; + } + + /** + * Called when the zookeeper connection state has changed. + * + * @param state The new state. + * @throws Exception On any error. + */ + @Override + public void handleStateChanged(Watcher.Event.KeeperState state) throws Exception { } + + /** + * Called after the zookeeper session has expired and a new session has been created. You would have to re-create + * any ephemeral nodes here. + * + * @throws Exception On any error. + */ + @Override + public void handleNewSession() throws Exception { + + } + + /** + * Called when a session cannot be re-established. This should be used to implement connection + * failure handling e.g. retry to connect or pass the error up + * + * @param error The error that prevents a session from being established + * @throws Exception On any error. 
+ */ + @Override + public void handleSessionEstablishmentError(Throwable error) throws Exception { + + } + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/7d6332b6/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala index f1d62c5..f4d605f 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala @@ -32,7 +32,7 @@ import org.apache.samza.checkpoint.{CheckpointListener, CheckpointManagerFactory import org.apache.samza.config.JobConfig.Config2Job import org.apache.samza.config.MetricsConfig.Config2Metrics import org.apache.samza.config.SerializerConfig.Config2Serializer -import org.apache.samza.config.ShellCommandConfig +import org.apache.samza.config.{Config, ShellCommandConfig} import org.apache.samza.config.StorageConfig.Config2Storage import org.apache.samza.config.StreamConfig.Config2Stream import org.apache.samza.config.SystemConfig.Config2System @@ -112,7 +112,14 @@ object SamzaContainer extends Logging { try { jmxServer = newJmxServer() - SamzaContainer(containerModel, jobModel, jmxServer).run + val containerModel = jobModel.getContainers.get(containerId.toInt) + SamzaContainer( + containerId.toInt, + containerModel, + config, + jobModel.maxChangeLogStreamPartitions, + getLocalityManager(containerId, config), + jmxServer).run } finally { if (jmxServer != null) { jmxServer.stop @@ -120,6 +127,17 @@ object SamzaContainer extends Logging { } } + def getLocalityManager(containerId: Int, config: Config): LocalityManager = { + val containerName = getSamzaContainerName(containerId) + val registryMap = new MetricsRegistryMap(containerName) + val coordinatorSystemProducer = + new 
CoordinatorStreamSystemFactory() + .getCoordinatorStreamSystemProducer( + config, + new SamzaContainerMetrics(containerName, registryMap).registry) + new LocalityManager(coordinatorSystemProducer) + } + /** * Fetches config, task:SSP assignments, and task:changelog partition * assignments, and returns objects to be used for SamzaContainer's @@ -136,10 +154,19 @@ object SamzaContainer extends Logging { classOf[JobModel]) } - def apply(containerModel: ContainerModel, jobModel: JobModel, jmxServer: JmxServer) = { - val config = jobModel.getConfig - val containerId = containerModel.getContainerId - val containerName = "samza-container-%s" format containerId + def getSamzaContainerName(containerId: Int): String = { + "samza-container-%d" format containerId + } + + def apply( + containerId: Int, + containerModel: ContainerModel, + config: Config, + maxChangeLogStreamPartitions: Int, + localityManager: LocalityManager, + jmxServer: JmxServer, + customReporters: Map[String, MetricsReporter] = Map[String, MetricsReporter]()) = { + val containerName = getSamzaContainerName(containerId) val containerPID = Util.getContainerPID info("Setting up Samza container: %s" format containerName) @@ -528,7 +555,7 @@ object SamzaContainer extends Logging { taskStores = taskStores, storeConsumers = storeConsumers, changeLogSystemStreams = changeLogSystemStreams, - jobModel.maxChangeLogStreamPartitions, + maxChangeLogStreamPartitions, streamMetadataCache = streamMetadataCache, storeBaseDir = defaultStoreBaseDir, loggedStoreBaseDir = loggedStorageBaseDir, http://git-wip-us.apache.org/repos/asf/samza/blob/7d6332b6/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala index eaab3a6..9ccf6fc 100644 --- 
a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala +++ b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala @@ -38,7 +38,8 @@ class ThreadJobFactory extends StreamJobFactory with Logging { def getJob(config: Config): StreamJob = { info("Creating a ThreadJob, which is only meant for debugging.") val coordinator = JobModelManager(config) - val containerModel = coordinator.jobModel.getContainers.get(0) + val jobModel = coordinator.jobModel + val containerModel = jobModel.getContainers.get(0) // Give developers a nice friendly warning if they've specified task.opts and are using a threaded job. config.getTaskOpts match { @@ -52,7 +53,13 @@ class ThreadJobFactory extends StreamJobFactory with Logging { override def run(): Unit = { val jmxServer = new JmxServer try { - SamzaContainer(containerModel, coordinator.jobModel, jmxServer).run() + SamzaContainer( + containerModel.getContainerId, + containerModel, + config, + jobModel.maxChangeLogStreamPartitions, + null, + new JmxServer) } finally { jmxServer.stop }
