Repository: samza Updated Branches: refs/heads/master 72ad7523f -> 02153fa50
SAMZA-1647: Fix NPE in onJobModelExpired handler in StreamProcessor. **Changes:** * Switch to an explicit lock object in StreamProcessor to simplify state updates. * Stop using synchronized methods in ZkJobCoordinator to prevent potential deadlocks between two threads (where one thread holds the StreamProcessor lock and the other thread holds the ZkJobCoordinator lock). * Misc cleanups in StreamProcessor: Remove volatile qualifiers from state variables in StreamProcessor. Stop re-instantiating the executorService in onNewJobModel; it is now created once in the constructor. * ZkJobCoordinator cleanups: Make some state variables immutable. **NOTE**: The classes in which these changes were made were anonymous inner classes, so adding proper unit tests would require a major refactor. Author: Shanthoosh Venkataraman <[email protected]> Reviewers: Jagadish <[email protected]> Closes #493 from shanthoosh/fix_npe_in_jobmodel_expired_handler Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/02153fa5 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/02153fa5 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/02153fa5 Branch: refs/heads/master Commit: 02153fa506e38b2e7f01c0374089e200bfe1e363 Parents: 72ad752 Author: Shanthoosh Venkataraman <[email protected]> Authored: Mon May 21 11:09:44 2018 -0700 Committer: Jagadish <[email protected]> Committed: Mon May 21 11:09:44 2018 -0700 ---------------------------------------------------------------------- .../apache/samza/processor/StreamProcessor.java | 216 ++++++++++--------- .../org/apache/samza/zk/ZkJobCoordinator.java | 84 ++++---- 2 files changed, 152 insertions(+), 148 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/02153fa5/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java ---------------------------------------------------------------------- diff --git 
a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java index 40deb1b..73f32e7 100644 --- a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java +++ b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java @@ -66,16 +66,16 @@ public class StreamProcessor { private final Config config; private final long taskShutdownMs; private final String processorId; + private final ExecutorService executorService; + private final Object lock = new Object(); - private ExecutorService executorService; - - private volatile SamzaContainer container = null; - private volatile Throwable containerException = null; + private SamzaContainer container = null; + private Throwable containerException = null; + private boolean processorOnStartCalled = false; // Latch used to synchronize between the JobCoordinator thread and the container thread, when the container is // stopped due to re-balancing volatile CountDownLatch jcContainerShutdownLatch; - private volatile boolean processorOnStartCalled = false; @VisibleForTesting JobCoordinatorListener jobCoordinatorListener = null; @@ -97,7 +97,7 @@ public class StreamProcessor { */ public StreamProcessor(Config config, Map<String, MetricsReporter> customMetricsReporters, AsyncStreamTaskFactory asyncStreamTaskFactory, StreamProcessorLifecycleListener processorListener) { - this(config, customMetricsReporters, (Object) asyncStreamTaskFactory, processorListener, null); + this(config, customMetricsReporters, asyncStreamTaskFactory, processorListener, null); } /** @@ -110,7 +110,7 @@ public class StreamProcessor { */ public StreamProcessor(Config config, Map<String, MetricsReporter> customMetricsReporters, StreamTaskFactory streamTaskFactory, StreamProcessorLifecycleListener processorListener) { - this(config, customMetricsReporters, (Object) streamTaskFactory, processorListener, null); + this(config, 
customMetricsReporters, streamTaskFactory, processorListener, null); } /* package private */ @@ -134,8 +134,9 @@ public class StreamProcessor { this.jobCoordinator = (jobCoordinator != null) ? jobCoordinator : getJobCoordinator(); this.jobCoordinatorListener = createJobCoordinatorListener(); this.jobCoordinator.setListener(jobCoordinatorListener); - - processorId = this.jobCoordinator.getProcessorId(); + ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat(CONTAINER_THREAD_NAME_FORMAT).setDaemon(true).build(); + this.executorService = Executors.newSingleThreadExecutor(threadFactory); + this.processorId = this.jobCoordinator.getProcessorId(); } /** @@ -175,32 +176,28 @@ public class StreamProcessor { * If container is not running, then this method will simply shutdown the {@link JobCoordinator}. * */ - public synchronized void stop() { - boolean containerShutdownInvoked = false; - if (container != null) { - try { - LOGGER.info("Shutting down the container: {} of stream processor: {}.", container, processorId); - container.shutdown(); - LOGGER.info("Waiting {} milliseconds for the container: {} to shutdown.", taskShutdownMs, container); - containerShutdownInvoked = true; - } catch (Exception exception) { - LOGGER.error(String.format("Ignoring the exception during the shutdown of container: %s.", container), exception); + public void stop() { + synchronized (lock) { + boolean containerShutdownInvoked = false; + if (container != null) { + try { + LOGGER.info("Shutting down the container: {} of stream processor: {}.", container, processorId); + container.shutdown(); + containerShutdownInvoked = true; + } catch (Exception exception) { + LOGGER.error(String.format("Ignoring the exception during the shutdown of container: %s.", container), exception); + } } - } - if (!containerShutdownInvoked) { - LOGGER.info("Shutting down JobCoordinator from StreamProcessor"); - jobCoordinator.stop(); + if (!containerShutdownInvoked) { + LOGGER.info("Shutting down 
JobCoordinator from StreamProcessor"); + jobCoordinator.stop(); + } } } SamzaContainer createSamzaContainer(String processorId, JobModel jobModel) { - return SamzaContainer.apply( - processorId, - jobModel, - config, - ScalaJavaUtil.toScalaMap(customMetricsReporter), - taskFactory); + return SamzaContainer.apply(processorId, jobModel, config, ScalaJavaUtil.toScalaMap(customMetricsReporter), taskFactory); } JobCoordinatorListener createJobCoordinatorListener() { @@ -208,91 +205,52 @@ public class StreamProcessor { @Override public void onJobModelExpired() { - if (container != null) { - SamzaContainerStatus status = container.getStatus(); - if (SamzaContainerStatus.NOT_STARTED.equals(status) || SamzaContainerStatus.STARTED.equals(status)) { - boolean shutdownComplete = false; - try { - LOGGER.info("Job model expired. Shutting down the container: {} of stream processor: {}.", container, processorId); - container.pause(); - shutdownComplete = jcContainerShutdownLatch.await(taskShutdownMs, TimeUnit.MILLISECONDS); - LOGGER.info(String.format("Shutdown status of container: %s for stream processor: %s is: %s.", container, processorId, shutdownComplete)); - } catch (IllegalContainerStateException icse) { - // Ignored since container is not running - LOGGER.info(String.format("Cannot shutdown container: %s for stream processor: %s. Container is not running.", container, processorId), icse); - shutdownComplete = true; - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - LOGGER.warn(String.format("Shutdown of container: %s for stream processor: %s was interrupted", container, processorId), e); - } - if (!shutdownComplete) { - LOGGER.warn("Container: {} shutdown was unsuccessful. 
Stopping the stream processor: {}.", container, processorId); - container = null; - stop(); + synchronized (lock) { + if (container != null) { + SamzaContainerStatus status = container.getStatus(); + if (SamzaContainerStatus.NOT_STARTED.equals(status) || SamzaContainerStatus.STARTED.equals(status)) { + boolean shutdownComplete = false; + try { + LOGGER.info("Job model expired. Shutting down the container: {} of stream processor: {}.", container, + processorId); + container.pause(); + shutdownComplete = jcContainerShutdownLatch.await(taskShutdownMs, TimeUnit.MILLISECONDS); + LOGGER.info(String.format("Shutdown status of container: %s for stream processor: %s is: %s.", container, processorId, shutdownComplete)); + } catch (IllegalContainerStateException icse) { + // Ignored since container is not running + LOGGER.info(String.format("Cannot shutdown container: %s for stream processor: %s. Container is not running.", container, processorId), icse); + shutdownComplete = true; + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOGGER.warn(String.format("Shutdown of container: %s for stream processor: %s was interrupted", container, processorId), e); + } catch (Exception e) { + LOGGER.error("Exception occurred when shutting down the container: {}.", container, e); + } + if (!shutdownComplete) { + LOGGER.warn("Container: {} shutdown was unsuccessful. 
Stopping the stream processor: {}.", container, processorId); + container = null; + stop(); + } else { + LOGGER.info("Container: {} shutdown completed for stream processor: {}.", container, processorId); + } } else { - LOGGER.info("Container: {} shutdown completed for stream processor: {}.", container, processorId); + LOGGER.info("Container: {} of the stream processor: {} is not running.", container, processorId); } } else { - LOGGER.info("Container: {} of the stream processor: {} is not running.", container, processorId); + LOGGER.info("Container is not instantiated for stream processor: {}.", processorId); } - } else { - LOGGER.info("Container is not instantiated for stream processor: {}.", processorId); } } @Override public void onNewJobModel(String processorId, JobModel jobModel) { - jcContainerShutdownLatch = new CountDownLatch(1); - - SamzaContainerListener containerListener = new SamzaContainerListener() { - @Override - public void onContainerStart() { - if (!processorOnStartCalled) { - // processorListener is called on start only the first time the container starts. - // It is not called after every re-balance of partitions among the processors - processorOnStartCalled = true; - if (processorListener != null) { - processorListener.onStart(); - } - } else { - LOGGER.warn("Received duplicate container start notification for container: {} in stream processor: {}.", container, processorId); - } - } - - @Override - public void onContainerStop(boolean pauseByJm) { - if (pauseByJm) { - LOGGER.info("Container: {} of the stream processor: {} was stopped by the JobCoordinator.", container, processorId); - if (jcContainerShutdownLatch != null) { - jcContainerShutdownLatch.countDown(); - } - } else { // sp.stop was called or container stopped by itself - LOGGER.info("Container: {} stopped. 
Stopping the stream processor: {}.", container, processorId); - container = null; // this guarantees that stop() doesn't try to stop container again - stop(); - } - } - - @Override - public void onContainerFailed(Throwable t) { - if (jcContainerShutdownLatch != null) { - jcContainerShutdownLatch.countDown(); - } else { - LOGGER.warn("JobCoordinatorLatch was null. It is possible for some component to be waiting."); - } - containerException = t; - LOGGER.error(String.format("Container: %s failed with an exception. Stopping the stream processor: %s. Original exception:", container, processorId), containerException); - container = null; - stop(); - } - }; - - container = createSamzaContainer(processorId, jobModel); - container.setContainerListener(containerListener); - LOGGER.info("Starting the container: {} for the stream processor: {}.", container, processorId); - ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat(CONTAINER_THREAD_NAME_FORMAT).build(); - executorService = Executors.newSingleThreadExecutor(threadFactory); - executorService.submit(container::run); + synchronized (lock) { + jcContainerShutdownLatch = new CountDownLatch(1); + container = createSamzaContainer(processorId, jobModel); + container.setContainerListener(new ContainerListener()); + LOGGER.info("Starting the container: {} for the stream processor: {}.", container, processorId); + executorService.submit(container::run); + } } @Override @@ -324,4 +282,52 @@ public class StreamProcessor { SamzaContainer getContainer() { return container; } + + class ContainerListener implements SamzaContainerListener { + + @Override + public void onContainerStart() { + if (!processorOnStartCalled) { + // processorListener is called on start only the first time the container starts. 
+ // It is not called after every re-balance of partitions among the processors + processorOnStartCalled = true; + if (processorListener != null) { + processorListener.onStart(); + } + } else { + LOGGER.warn("Received duplicate container start notification for container: {} in stream processor: {}.", container, processorId); + } + } + + @Override + public void onContainerStop(boolean pauseByJm) { + if (pauseByJm) { + LOGGER.info("Container: {} of the stream processor: {} was stopped by the JobCoordinator.", container, processorId); + if (jcContainerShutdownLatch != null) { + jcContainerShutdownLatch.countDown(); + } + } else { // sp.stop was called or container stopped by itself + LOGGER.info("Container: {} stopped. Stopping the stream processor: {}.", container, processorId); + synchronized (lock) { + container = null; // this guarantees that stop() doesn't try to stop container again + stop(); + } + } + } + + @Override + public void onContainerFailed(Throwable t) { + if (jcContainerShutdownLatch != null) { + jcContainerShutdownLatch.countDown(); + } else { + LOGGER.warn("JobCoordinatorLatch was null. It is possible for some component to be waiting."); + } + synchronized (lock) { + containerException = t; + LOGGER.error(String.format("Container: %s failed with an exception. Stopping the stream processor: %s. 
Original exception:", container, processorId), containerException); + container = null; + stop(); + } + } + } } http://git-wip-us.apache.org/repos/asf/samza/blob/02153fa5/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java index 3f16f2b..74abf55 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java @@ -26,6 +26,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; import org.I0Itec.zkclient.IZkStateListener; import org.apache.commons.lang3.StringUtils; import org.apache.samza.checkpoint.CheckpointManager; @@ -58,8 +59,6 @@ import org.apache.zookeeper.Watcher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static org.apache.zookeeper.Watcher.Event.KeeperState.*; - /** * JobCoordinator for stand alone processor managed via Zookeeper. 
*/ @@ -92,19 +91,19 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener { private final ZkJobCoordinatorMetrics metrics; private final Map<String, MetricsReporter> reporters; private final ZkLeaderElector leaderElector; + private final AtomicBoolean initiatedShutdown = new AtomicBoolean(false); + private final StreamMetadataCache streamMetadataCache; + private final SystemAdmins systemAdmins; + private final int debounceTimeMs; + private final Map<TaskName, Integer> changeLogPartitionMap = new HashMap<>(); - private StreamMetadataCache streamMetadataCache = null; - private SystemAdmins systemAdmins = null; - - @VisibleForTesting - ScheduleAfterDebounceTime debounceTimer = null; private JobCoordinatorListener coordinatorListener = null; private JobModel newJobModel; - private int debounceTimeMs; private boolean hasCreatedStreams = false; - private boolean initiatedShutdown = false; private String cachedJobModelVersion = null; - private Map<TaskName, Integer> changeLogPartitionMap = new HashMap<>(); + + @VisibleForTesting + ScheduleAfterDebounceTime debounceTimer; ZkJobCoordinator(Config config, MetricsRegistry metricsRegistry, ZkUtils zkUtils) { this.config = config; @@ -142,50 +141,49 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener { } @Override - public synchronized void stop() { + public void stop() { // Make the shutdown idempotent - if (initiatedShutdown) { - LOG.debug("Job Coordinator shutdown is already in progress!"); - return; - } + if (initiatedShutdown.compareAndSet(false, true)) { - LOG.info("Shutting down Job Coordinator..."); - initiatedShutdown = true; - boolean shutdownSuccessful = false; + LOG.info("Shutting down JobCoordinator."); + boolean shutdownSuccessful = false; - // Notify the metrics about abandoning the leadership. 
Moving it up the chain in the shutdown sequence so that - // in case of unclean shutdown, we get notified about lack of leader and we can set up some alerts around the absence of leader. - metrics.isLeader.set(false); + // Notify the metrics about abandoning the leadership. Moving it up the chain in the shutdown sequence so that + // in case of unclean shutdown, we get notified about lack of leader and we can set up some alerts around the absence of leader. + metrics.isLeader.set(false); - try { - // todo: what does it mean for coordinator listener to be null? why not have it part of constructor? - if (coordinatorListener != null) { - coordinatorListener.onJobModelExpired(); - } + try { + // todo: what does it mean for coordinator listener to be null? why not have it part of constructor? + if (coordinatorListener != null) { + coordinatorListener.onJobModelExpired(); + } - debounceTimer.stopScheduler(); + debounceTimer.stopScheduler(); - LOG.debug("Shutting down ZkController."); - zkController.stop(); + LOG.debug("Shutting down ZkController."); + zkController.stop(); - LOG.debug("Shutting down system admins."); - systemAdmins.stop(); + LOG.debug("Shutting down system admins."); + systemAdmins.stop(); - LOG.debug("Shutting down metrics."); - shutdownMetrics(); + LOG.debug("Shutting down metrics."); + shutdownMetrics(); - if (coordinatorListener != null) { - coordinatorListener.onCoordinatorStop(); - } + if (coordinatorListener != null) { + coordinatorListener.onCoordinatorStop(); + } - shutdownSuccessful = true; - } catch (Throwable t) { - LOG.error("Encountered errors during job coordinator stop.", t); - if (coordinatorListener != null) { - coordinatorListener.onCoordinatorFailure(t); + shutdownSuccessful = true; + } catch (Throwable t) { + LOG.error("Encountered errors during job coordinator stop.", t); + if (coordinatorListener != null) { + coordinatorListener.onCoordinatorFailure(t); + } + } finally { + LOG.info("Job Coordinator shutdown finished with 
ShutdownComplete=" + shutdownSuccessful); } - } finally { - LOG.info("Job Coordinator shutdown finished with ShutdownComplete=" + shutdownSuccessful); + } else { + LOG.info("Job Coordinator shutdown is in progress!"); } }
