This is an automated email from the ASF dual-hosted git repository.

lhaiesp pushed a commit to branch 1.3.1
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/1.3.1 by this push:
     new 27aa391  SAMZA-2305: Stream processor should ensure previous container 
is stopped during a rebalance (#1213)
27aa391 is described below

commit 27aa391821e3ac457a00cb991bd2d779e3ca28bd
Author: mynameborat <[email protected]>
AuthorDate: Tue Dec 10 14:04:45 2019 -0800

    SAMZA-2305: Stream processor should ensure previous container is stopped 
during a rebalance (#1213)
    
    Stream processor should ensure previous container is stopped during a 
rebalance
---
 .../org/apache/samza/storage/StorageEngine.java    |  6 +-
 .../apache/samza/processor/StreamProcessor.java    | 72 ++++++++++++++++------
 .../NonTransactionalStateTaskRestoreManager.java   |  2 +-
 .../org/apache/samza/storage/StorageRecovery.java  |  9 ++-
 .../apache/samza/storage/TaskRestoreManager.java   | 11 +++-
 .../TransactionalStateTaskRestoreManager.java      |  2 +-
 .../apache/samza/container/SamzaContainer.scala    | 22 ++++++-
 .../samza/storage/ContainerStorageManager.java     | 60 ++++++++++++------
 .../org/apache/samza/system/SystemConsumers.scala  | 21 ++++++-
 .../org/apache/samza/system/SystemProducers.scala  | 17 ++++-
 .../samza/processor/TestStreamProcessor.java       | 62 ++++++++++++++++++-
 .../samza/container/TestSamzaContainer.scala       | 42 +++++++++++++
 .../samza/storage/TestContainerStorageManager.java |  4 +-
 .../samza/storage/kv/KeyValueStorageEngine.scala   |  7 ++-
 .../storage/kv/TestKeyValueStorageEngine.scala     | 48 +++++++++++++++
 15 files changed, 332 insertions(+), 53 deletions(-)

diff --git 
a/samza-api/src/main/java/org/apache/samza/storage/StorageEngine.java 
b/samza-api/src/main/java/org/apache/samza/storage/StorageEngine.java
index 8add1de..7b12c85 100644
--- a/samza-api/src/main/java/org/apache/samza/storage/StorageEngine.java
+++ b/samza-api/src/main/java/org/apache/samza/storage/StorageEngine.java
@@ -43,11 +43,15 @@ public interface StorageEngine {
    * provided in one {@link java.util.Iterator} and not deserialized for
    * efficiency, allowing the implementation to optimize replay, if possible.
    *
+   * The implementers are expected to handle interrupt signals to the 
restoration thread and rethrow the exception to
+   * upstream so that {@code TaskRestoreManager} can accordingly notify the 
container.
+   *
    * @param envelopes
    *          An iterator of envelopes that the storage engine can read from to
    *          restore its state on startup.
+   * @throws InterruptedException if the restoration thread is interrupted during restoration
    */
-  void restore(ChangelogSSPIterator envelopes);
+  void restore(ChangelogSSPIterator envelopes) throws InterruptedException;
 
   /**
    * Flush any cached messages
diff --git 
a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java 
b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
index f952f6e..5a39520 100644
--- a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
+++ b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
@@ -85,16 +85,18 @@ import scala.Option;
  *
  * Describes the valid state transitions of the {@link StreamProcessor}.
  *
- *
- *                                                                             
                      ────────────────────────────────
- *                                                                             
                     │                               │
- *                                                                             
                     │                               │
- *                                                                             
                     │                               │
- *                                                                             
                     │                               │
- *     New                                StreamProcessor.start()          
Rebalance triggered      V        Receives JobModel      │
- *  StreamProcessor ──────────▶   NEW ───────────────────────────▶ STARTED 
──────────────────▶ IN_REBALANCE ─────────────────────▶ RUNNING
- *   Creation                      │                                 │     by 
group leader          │     and starts Container      │
- *                                 │                                 │         
                     │                               │
+ *                                                                             
                        Receives another re-balance request when the container
+ *                                                                             
                        from the previous re-balance is still in INIT phase
+ *                                                                             
                      ────────────────────────────────────────────────
+ *                                                                             
                     │                               │                │
+ *                                                                             
                     │                               │                │
+ *                                                                             
                     │                               │                │
+ *                                                                             
                     │                               │                │
+ *     New                                StreamProcessor.start()          
Rebalance triggered      V        Receives JobModel      │                │
+ *  StreamProcessor ──────────▶   NEW ───────────────────────────▶ STARTED 
──────────────────▶ IN_REBALANCE ─────────────────────▶ RUNNING           │
+ *   Creation                      │                                 │     by 
group leader          │     and starts │Container     │                │
+ *                                 │                                 │         
                     │                │              │                │
+ *                                 │                                 │         
                     │                 ───────────────────────────────
  *                             Stre│amProcessor.stop()           
Stre│amProcessor.stop()        Stre│amProcessor.stop()         
Stre│amProcessor.stop()
  *                                 │                                 │         
                     │                               │
  *                                 │                                 │         
                     │                               │
@@ -133,7 +135,6 @@ public class StreamProcessor {
   private final Config config;
   private final long taskShutdownMs;
   private final String processorId;
-  private final ExecutorService containerExcecutorService;
   private final Object lock = new Object();
   private final MetricsRegistryMap metricsRegistry;
   private final MetadataStore metadataStore;
@@ -176,6 +177,9 @@ public class StreamProcessor {
   @VisibleForTesting
   JobCoordinatorListener jobCoordinatorListener = null;
 
+  @VisibleForTesting
+  ExecutorService containerExecutorService;
+
   /**
    * Same as {@link #StreamProcessor(String, Config, Map, TaskFactory, 
ProcessorLifecycleListener, JobCoordinator)}, except
    * it creates a {@link JobCoordinator} instead of accepting it as an 
argument.
@@ -288,11 +292,15 @@ public class StreamProcessor {
         : createJobCoordinator(config, processorId, metricsRegistry, 
metadataStore);
     this.jobCoordinatorListener = createJobCoordinatorListener();
     this.jobCoordinator.setListener(jobCoordinatorListener);
-    ThreadFactory threadFactory = new 
ThreadFactoryBuilder().setNameFormat(CONTAINER_THREAD_NAME_FORMAT).setDaemon(true).build();
-    this.containerExcecutorService = 
Executors.newSingleThreadExecutor(threadFactory);
+    this.containerExecutorService = createExecutorService();
     this.processorListener = listenerFactory.createInstance(this);
   }
 
+  private ExecutorService createExecutorService() {
+    ThreadFactory threadFactory = new 
ThreadFactoryBuilder().setNameFormat(CONTAINER_THREAD_NAME_FORMAT).setDaemon(true).build();
+    return Executors.newSingleThreadExecutor(threadFactory);
+  }
+
   /**
    * Asynchronously starts this {@link StreamProcessor}.
    * <p>
@@ -348,7 +356,7 @@ public class StreamProcessor {
           boolean hasContainerShutdown = stopSamzaContainer();
           if (!hasContainerShutdown) {
             LOGGER.info("Interrupting the container: {} thread to die.", 
container);
-            containerExcecutorService.shutdownNow();
+            containerExecutorService.shutdownNow();
           }
         } catch (Throwable throwable) {
           LOGGER.error(String.format("Exception occurred on container: %s 
shutdown of stream processor: %s.", container, processorId), throwable);
@@ -444,6 +452,19 @@ public class StreamProcessor {
     return hasContainerShutdown;
   }
 
+  private boolean interruptContainerAndShutdownExecutorService() {
+    try {
+      containerExecutorService.shutdownNow();
+      containerShutdownLatch.await(taskShutdownMs, TimeUnit.MILLISECONDS);
+    } catch (InterruptedException e) {
+      LOGGER.info("Received an interrupt during interrupting container. 
Proceeding to check if the container callback "
+          + "decremented the shutdown latch. ");
+    }
+
+    // we call interrupt successful as long as the shut down latch is 
decremented by the container call back.
+    return containerShutdownLatch.getCount() == 0;
+  }
+
   private JobCoordinatorListener createJobCoordinatorListener() {
     return new JobCoordinatorListener() {
 
@@ -461,8 +482,23 @@ public class StreamProcessor {
             } else {
               LOGGER.info("Container: {} shutdown completed for stream 
processor: {}.", container, processorId);
             }
+          } else if (state == State.IN_REBALANCE) {
+            if (container != null) {
+              boolean hasContainerShutdown = 
interruptContainerAndShutdownExecutorService();
+              if (!hasContainerShutdown) {
+                LOGGER.warn("Job model expire unsuccessful. Failed to 
interrupt container: {} safely. "
+                    + "Stopping the stream processor: {}", container, 
processorId);
+                state = State.STOPPING;
+                jobCoordinator.stop();
+              } else {
+                containerExecutorService = createExecutorService();
+              }
+            } else {
+              LOGGER.info("Ignoring Job model expired since a rebalance is 
already in progress");
+            }
           } else {
-            LOGGER.info("Ignoring onJobModelExpired invocation since the 
current state is {} and not in {}.", state, ImmutableList.of(State.RUNNING, 
State.STARTED));
+            LOGGER.info("Ignoring onJobModelExpired invocation since the 
current state is {} and not in {}.", state,
+                ImmutableList.of(State.RUNNING, State.STARTED, 
State.IN_REBALANCE));
           }
         }
       }
@@ -475,7 +511,7 @@ public class StreamProcessor {
             container = createSamzaContainer(processorId, jobModel);
             container.setContainerListener(new ContainerListener());
             LOGGER.info("Starting the container: {} for the stream processor: 
{}.", container, processorId);
-            containerExcecutorService.submit(container);
+            containerExecutorService.submit(container);
           } else {
             LOGGER.info("Ignoring onNewJobModel invocation since the current 
state is {} and not {}.", state, State.IN_REBALANCE);
           }
@@ -490,7 +526,7 @@ public class StreamProcessor {
 
           // we only want to interrupt when container shutdown times out.
           if (!hasContainerShutdown) {
-            containerExcecutorService.shutdownNow();
+            containerExecutorService.shutdownNow();
           }
           state = State.STOPPED;
         }
@@ -508,7 +544,7 @@ public class StreamProcessor {
 
           // we only want to interrupt when container shutdown times out.
           if (!hasContainerShutdown) {
-            containerExcecutorService.shutdownNow();
+            containerExecutorService.shutdownNow();
           }
           state = State.STOPPED;
         }
diff --git 
a/samza-core/src/main/java/org/apache/samza/storage/NonTransactionalStateTaskRestoreManager.java
 
b/samza-core/src/main/java/org/apache/samza/storage/NonTransactionalStateTaskRestoreManager.java
index 5952647..56ef822 100644
--- 
a/samza-core/src/main/java/org/apache/samza/storage/NonTransactionalStateTaskRestoreManager.java
+++ 
b/samza-core/src/main/java/org/apache/samza/storage/NonTransactionalStateTaskRestoreManager.java
@@ -317,7 +317,7 @@ class NonTransactionalStateTaskRestoreManager implements 
TaskRestoreManager {
    * Restore each store in taskStoresToRestore sequentially
    */
   @Override
-  public void restore() {
+  public void restore() throws InterruptedException {
     for (String storeName : taskStoresToRestore) {
       LOG.info("Restoring store: {} for task: {}", storeName, 
taskModel.getTaskName());
       SystemConsumer systemConsumer = storeConsumers.get(storeName);
diff --git 
a/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java 
b/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
index 2cbeddc..11237a8 100644
--- a/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
+++ b/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
@@ -117,7 +117,14 @@ public class StorageRecovery extends CommandLine {
 
     systemAdmins.start();
     this.containerStorageManagers.forEach((containerName, 
containerStorageManager) -> {
-        containerStorageManager.start();
+        try {
+          containerStorageManager.start();
+        } catch (InterruptedException e) {
+          // we can ignore the exception since it's only used in the context of 
a command line tool and bubbling the
+          // exception upstream isn't needed.
+          LOG.warn("Received an interrupt during store restoration for 
container {}."
+              + " Proceeding with the next container", containerName);
+        }
       });
     this.containerStorageManagers.forEach((containerName, 
containerStorageManager) -> {
         containerStorageManager.shutdown();
diff --git 
a/samza-core/src/main/java/org/apache/samza/storage/TaskRestoreManager.java 
b/samza-core/src/main/java/org/apache/samza/storage/TaskRestoreManager.java
index 2bdeeea..f60e148 100644
--- a/samza-core/src/main/java/org/apache/samza/storage/TaskRestoreManager.java
+++ b/samza-core/src/main/java/org/apache/samza/storage/TaskRestoreManager.java
@@ -35,8 +35,17 @@ public interface TaskRestoreManager {
 
   /**
    * Restore state from checkpoints, state snapshots and changelog.
+   * Currently, store restoration happens on a separate thread pool within 
{@code ContainerStorageManager}. In case of
+   * interrupt/shutdown signals from {@code SamzaContainer}, {@code 
ContainerStorageManager} may interrupt the restore
+   * thread.
+   *
+   * Note: Typically, interrupt signals don't bubble up as {@link 
InterruptedException} unless the restore thread is
+   * waiting on IO/network. In case of busy looping, implementors are expected 
to check the interrupt status of the
+   * thread periodically and shutdown gracefully before throwing {@link 
InterruptedException} upstream.
+   * {@code SamzaContainer} will not wait for clean up and the interrupt 
signal is the best effort by the container
+   * to notify that it is shutting down.
    */
-  void restore();
+  void restore() throws InterruptedException;
 
   /**
    * Stop all persistent stores after restoring.
diff --git 
a/samza-core/src/main/java/org/apache/samza/storage/TransactionalStateTaskRestoreManager.java
 
b/samza-core/src/main/java/org/apache/samza/storage/TransactionalStateTaskRestoreManager.java
index 1e54ea1..6f59a32 100644
--- 
a/samza-core/src/main/java/org/apache/samza/storage/TransactionalStateTaskRestoreManager.java
+++ 
b/samza-core/src/main/java/org/apache/samza/storage/TransactionalStateTaskRestoreManager.java
@@ -117,7 +117,7 @@ public class TransactionalStateTaskRestoreManager 
implements TaskRestoreManager
   }
 
   @Override
-  public void restore() {
+  public void restore() throws InterruptedException {
     Map<String, RestoreOffsets> storesToRestore = storeActions.storesToRestore;
 
     for (Map.Entry<String, RestoreOffsets> entry : storesToRestore.entrySet()) 
{
diff --git 
a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index d15c109..092175d 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -27,7 +27,6 @@ import java.time.Duration
 import java.util
 import java.util.{Base64, Optional}
 import java.util.concurrent.{ExecutorService, Executors, 
ScheduledExecutorService, TimeUnit}
-import java.util.stream.Collectors
 
 import com.google.common.annotations.VisibleForTesting
 import com.google.common.util.concurrent.ThreadFactoryBuilder
@@ -769,6 +768,27 @@ class SamzaContainer(
       else
         Thread.sleep(Long.MaxValue)
     } catch {
+      case e: InterruptedException =>
+        /*
+         * We don't want to categorize interrupts as failure since the only 
place the container thread gets interrupted within
+         * our code inside stream processor is during the following two 
scenarios
+         *    1. During a re-balance, if the container has not started or 
hasn't reported start status to StreamProcessor.
+         *       Subsequently stream processor attempts to interrupt the 
container thread before proceeding to join the barrier
+         *       to agree on the new work assignment.
+         *    2. During shutdown signals to stream processor (external or 
internal), the stream processor signals the container to
+         *       shutdown and waits for `task.shutdown.ms` before forcefully 
shutting down the container executor service which in
+         *       turn interrupts the container thread.
+         *
+         * In both of these scenarios, the failure cause is either 
captured externally (timing out scenario) or internally
+         * (failed attempt to shut down the container). The act of 
interrupting the container thread is an explicit intent to shutdown
+         * the container since it is not capable of reacting to shutdown 
signals in all scenarios.
+         *
+         */
+        if (status.equals(SamzaContainerStatus.STARTED)) {
+          warn("Received an interrupt in run loop.", e)
+        } else {
+          warn("Received an interrupt during initialization.", e)
+        }
       case e: Throwable =>
         if (status.equals(SamzaContainerStatus.STARTED)) {
           error("Caught exception/error in run loop.", e)
diff --git 
a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
 
b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
index afd3e69..8623e5d 100644
--- 
a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
+++ 
b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
@@ -632,7 +632,7 @@ public class ContainerStorageManager {
     return 
this.sideInputStorageManagers.values().stream().collect(Collectors.toSet());
   }
 
-  public void start() throws SamzaException {
+  public void start() throws SamzaException, InterruptedException {
     Map<SystemStreamPartition, String> checkpointedChangelogSSPOffsets = new 
HashMap<>();
     if (new TaskConfig(config).getTransactionalStateRestoreEnabled()) {
       getTasks(containerModel, TaskMode.Active).forEach((taskName, taskModel) 
-> {
@@ -657,7 +657,8 @@ public class ContainerStorageManager {
   }
 
   // Restoration of all stores, in parallel across tasks
-  private void restoreStores(Map<SystemStreamPartition, String> 
checkpointedChangelogSSPOffsets) {
+  private void restoreStores(Map<SystemStreamPartition, String> 
checkpointedChangelogSSPOffsets)
+      throws InterruptedException {
     LOG.info("Store Restore started");
 
     // initialize each TaskStorageManager
@@ -665,7 +666,7 @@ public class ContainerStorageManager {
        taskStorageManager.init(checkpointedChangelogSSPOffsets));
 
     // Start each store consumer once
-    this.storeConsumers.values().stream().distinct().forEach(systemConsumer -> 
systemConsumer.start());
+    
this.storeConsumers.values().stream().distinct().forEach(SystemConsumer::start);
 
     // Create a thread pool for parallel restores (and stopping of persistent 
stores)
     ExecutorService executorService = 
Executors.newFixedThreadPool(this.parallelRestoreThreadPoolSize,
@@ -684,6 +685,11 @@ public class ContainerStorageManager {
     for (Future future : taskRestoreFutures) {
       try {
         future.get();
+      } catch (InterruptedException e) {
+        LOG.warn("Received an interrupt during store restoration. Issuing 
interrupts to the store restoration workers to exit "
+            + "prematurely without restoring full state.");
+        executorService.shutdownNow();
+        throw e;
       } catch (Exception e) {
         LOG.error("Exception when restoring ", e);
         throw new SamzaException("Exception when restoring ", e);
@@ -693,7 +699,7 @@ public class ContainerStorageManager {
     executorService.shutdown();
 
     // Stop each store consumer once
-    this.storeConsumers.values().stream().distinct().forEach(systemConsumer -> 
systemConsumer.stop());
+    
this.storeConsumers.values().stream().distinct().forEach(SystemConsumer::stop);
 
     // Now re-create persistent stores in read-write mode, leave 
non-persistent stores as-is
     recreatePersistentTaskStoresInReadWriteMode(this.containerModel, 
jobContext, containerContext,
@@ -791,6 +797,14 @@ public class ContainerStorageManager {
       }
 
     } catch (InterruptedException e) {
+      LOG.warn("Received an interrupt during side inputs store restoration."
+          + " Exiting prematurely without completing store restore.");
+      /*
+       * We want to stop side input restoration and rethrow the exception 
upstream. Container should handle the
+       * interrupt exception and shutdown the components and cleaning up the 
resource. We don't want to clean up the
+       * resources prematurely here.
+       */
+      shouldShutdown = true; // todo: should we cancel the flush future right 
away or wait for container to handle it as part of shutdown sequence?
       throw new SamzaException("Side inputs read was interrupted", e);
     }
 
@@ -920,22 +934,32 @@ public class ContainerStorageManager {
     @Override
     public Void call() {
       long startTime = System.currentTimeMillis();
-      LOG.info("Starting stores in task instance {}", 
this.taskName.getTaskName());
-      taskRestoreManager.restore();
-
-      // Stop all persistent stores after restoring. Certain persistent stores 
opened in BulkLoad mode are compacted
-      // on stop, so paralleling stop() also parallelizes their compaction (a 
time-intensive operation).
-      taskRestoreManager.stopPersistentStores();
-
-      long timeToRestore = System.currentTimeMillis() - startTime;
-
-      if (this.samzaContainerMetrics != null) {
-        Gauge taskGauge = 
this.samzaContainerMetrics.taskStoreRestorationMetrics().getOrDefault(this.taskName,
 null);
-
-        if (taskGauge != null) {
-          taskGauge.set(timeToRestore);
+      try {
+        LOG.info("Starting stores in task instance {}", 
this.taskName.getTaskName());
+        taskRestoreManager.restore();
+      } catch (InterruptedException e) {
+        /*
+         * The container thread is the only external source to trigger an 
interrupt to the restoration thread and thus
+         * it is okay to swallow this exception and not propagate it upstream. 
If the container is interrupted during
+         * the store restoration, ContainerStorageManager signals the restore 
workers to abandon restoration and then
+         * finally propagates the exception upstream to trigger container 
shutdown.
+         */
+        LOG.warn("Received an interrupt during store restoration for task: 
{}.", this.taskName.getTaskName());
+      } finally {
+        // Stop all persistent stores after restoring. Certain persistent 
stores opened in BulkLoad mode are compacted
+        // on stop, so paralleling stop() also parallelizes their compaction 
(a time-intensive operation).
+        taskRestoreManager.stopPersistentStores();
+        long timeToRestore = System.currentTimeMillis() - startTime;
+
+        if (this.samzaContainerMetrics != null) {
+          Gauge taskGauge = 
this.samzaContainerMetrics.taskStoreRestorationMetrics().getOrDefault(this.taskName,
 null);
+
+          if (taskGauge != null) {
+            taskGauge.set(timeToRestore);
+          }
         }
       }
+
       return null;
     }
   }
diff --git 
a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala 
b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
index b41c245..580fd60 100644
--- a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
+++ b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
@@ -138,6 +138,13 @@ class SystemConsumers (
   private val emptySystemStreamPartitionsBySystem = new HashMap[String, 
Set[SystemStreamPartition]]()
 
   /**
+    * Denotes if the SystemConsumers have started. The flag is useful in the 
event of shutting down since interrupt
+    * on Samza Container will shutdown components and container currently 
doesn't track what components have started
+    * successfully.
+    */
+  private var started = false
+
+  /**
    * Default timeout to noNewMessagesTimeout. Every time SystemConsumers
    * receives incoming messages, it sets timeout to 0. Every time
    * SystemConsumers receives no new incoming messages from the MessageChooser,
@@ -185,15 +192,23 @@ class SystemConsumers (
 
     chooser.start
 
+    started = true
+
     refresh
   }
 
   def stop {
-    debug("Stopping consumers.")
+    if (started) {
+      debug("Stopping consumers.")
 
-    consumers.values.foreach(_.stop)
+      consumers.values.foreach(_.stop)
 
-    chooser.stop
+      chooser.stop
+
+      started = false
+    } else {
+      debug("Ignoring the consumers stop request since it never started.")
+    }
   }
 
 
diff --git 
a/samza-core/src/main/scala/org/apache/samza/system/SystemProducers.scala 
b/samza-core/src/main/scala/org/apache/samza/system/SystemProducers.scala
index e8ce961..1744195 100644
--- a/samza-core/src/main/scala/org/apache/samza/system/SystemProducers.scala
+++ b/samza-core/src/main/scala/org/apache/samza/system/SystemProducers.scala
@@ -35,16 +35,29 @@ class SystemProducers(
    */
   dropSerializationError: Boolean = false) extends Logging {
 
+  /**
+    * Denotes if the SystemProducers have started. The flag is useful in the 
event of shutting down since interrupt
+    * on Samza Container will shutdown components and container currently 
doesn't track what components have started
+    * successfully.
+    */
+  private var started = false
+
   def start {
     debug("Starting producers.")
 
     producers.values.foreach(_.start)
+    started = true
   }
 
   def stop {
-    debug("Stopping producers.")
+    if (started) {
+      debug("Stopping producers.")
 
-    producers.values.foreach(_.stop)
+      producers.values.foreach(_.stop)
+      started = false
+    } else {
+      debug("Ignoring the producers stop request since it never started.")
+    }
   }
 
   def register(source: String) {
diff --git 
a/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java 
b/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java
index 6d78b77..0978738 100644
--- 
a/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java
+++ 
b/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java
@@ -27,6 +27,7 @@ import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
@@ -55,6 +56,7 @@ import org.mockito.Mockito;
 import org.powermock.api.mockito.PowerMockito;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
@@ -467,13 +469,67 @@ public class TestStreamProcessor {
     assertEquals(State.STOPPING, streamProcessor.getState());
     Mockito.verify(mockSamzaContainer, Mockito.times(1)).shutdown();
     Mockito.verify(mockJobCoordinator, Mockito.times(1)).stop();
+  }
 
-    // If StreamProcessor is in IN_REBALANCE state, onJobModelExpired should 
be a NO_OP.
-    streamProcessor.state = State.IN_REBALANCE;
+  @Test
+  public void testJobModelExpiredDuringAnExistingRebalance() {
+    JobCoordinator mockJobCoordinator = Mockito.mock(JobCoordinator.class);
+    ProcessorLifecycleListener lifecycleListener = 
Mockito.mock(ProcessorLifecycleListener.class);
+    ExecutorService mockExecutorService = Mockito.mock(ExecutorService.class);
+    MapConfig config = new MapConfig(ImmutableMap.of("task.shutdown.ms", "0"));
+    StreamProcessor streamProcessor = new StreamProcessor("TestProcessorId", 
config, new HashMap<>(), null,
+        Optional.empty(), Optional.empty(), Optional.empty(), sp -> 
lifecycleListener, mockJobCoordinator, Mockito.mock(MetadataStore.class));
 
-    streamProcessor.jobCoordinatorListener.onJobModelExpired();
+    runJobModelExpireDuringRebalance(streamProcessor, mockExecutorService, 
false);
 
     assertEquals(State.IN_REBALANCE, streamProcessor.state);
+    assertNotEquals(mockExecutorService, 
streamProcessor.containerExecutorService);
+    Mockito.verify(mockExecutorService, Mockito.times(1)).shutdownNow();
+  }
+
+  @Test
+  public void 
testJobModelExpiredDuringAnExistingRebalanceWithContainerInterruptFailed() {
+    JobCoordinator mockJobCoordinator = Mockito.mock(JobCoordinator.class);
+    ProcessorLifecycleListener lifecycleListener = 
Mockito.mock(ProcessorLifecycleListener.class);
+    ExecutorService mockExecutorService = Mockito.mock(ExecutorService.class);
+    MapConfig config = new MapConfig(ImmutableMap.of("task.shutdown.ms", "0"));
+    StreamProcessor streamProcessor = new StreamProcessor("TestProcessorId", 
config, new HashMap<>(), null,
+        Optional.empty(), Optional.empty(), Optional.empty(), sp -> 
lifecycleListener, mockJobCoordinator, Mockito.mock(MetadataStore.class));
+
+    runJobModelExpireDuringRebalance(streamProcessor, mockExecutorService, 
true);
+
+    assertEquals(State.STOPPING, streamProcessor.state);
+    assertEquals(mockExecutorService, 
streamProcessor.containerExecutorService);
+    Mockito.verify(mockExecutorService, Mockito.times(1)).shutdownNow();
+    Mockito.verify(mockJobCoordinator, Mockito.times(1)).stop();
+  }
+
+  private void runJobModelExpireDuringRebalance(StreamProcessor 
streamProcessor, ExecutorService executorService,
+      boolean failContainerInterrupt) {
+    SamzaContainer mockSamzaContainer = Mockito.mock(SamzaContainer.class);
+    CountDownLatch shutdownLatch = new CountDownLatch(1);
+
+    /*
+     * When there is an initialized container that hasn't started and the 
stream processor is still in re-balance phase,
+     * subsequent job model expire request should attempt to interrupt the 
existing container to safely shut it down
+     * before proceeding to join the barrier. As part of safe shutdown 
sequence,  we want to ensure shutdownNow is invoked
+     * on the existing executorService to signal interrupt and make sure new 
executor service is created.
+     */
+
+    Mockito.when(executorService.shutdownNow()).thenAnswer(ctx -> {
+        if (!failContainerInterrupt) {
+          shutdownLatch.countDown();
+        }
+        return null;
+      });
+    Mockito.when(executorService.isShutdown()).thenReturn(true);
+
+    streamProcessor.state = State.IN_REBALANCE;
+    streamProcessor.container = mockSamzaContainer;
+    streamProcessor.containerExecutorService = executorService;
+    streamProcessor.containerShutdownLatch = shutdownLatch;
+
+    streamProcessor.jobCoordinatorListener.onJobModelExpired();
   }
 
   @Test
diff --git 
a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala 
b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
index 78136bf..69223df 100644
--- 
a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
+++ 
b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
@@ -35,6 +35,8 @@ import org.junit.Assert._
 import org.junit.{Before, Test}
 import org.mockito.Matchers.{any, notNull}
 import org.mockito.Mockito._
+import org.mockito.invocation.InvocationOnMock
+import org.mockito.stubbing.Answer
 import org.mockito.{Mock, Mockito, MockitoAnnotations}
 import org.scalatest.junit.AssertionsForJUnit
 import org.scalatest.mockito.MockitoSugar
@@ -140,6 +142,46 @@ class TestSamzaContainer extends AssertionsForJUnit with 
MockitoSugar {
   }
 
   @Test
+  def testInterruptDuringStoreRestorationShutdownContainer(): Unit = {
+    when(this.containerStorageManager.start())
+      .thenAnswer(new Answer[Void] {
+        override def answer(mock: InvocationOnMock): Void = {
+        Thread.sleep(1000)
+        throw new InterruptedException("Injecting interrupt into container 
storage manager")
+      }
+      })
+
+    this.samzaContainer.run
+
+    assertEquals(SamzaContainerStatus.STOPPED, this.samzaContainer.getStatus())
+    verify(this.samzaContainerListener).beforeStart()
+    verify(this.samzaContainerListener).afterStop()
+    verify(this.samzaContainerListener, never()).afterFailure(any())
+    verify(this.runLoop, times(0)).run()
+  }
+
+  @Test
+  def testInterruptDuringStoreRestorationWithErrorsDuringContainerShutdown(): 
Unit = {
+    when(this.containerStorageManager.start())
+      .thenAnswer(new Answer[Void] {
+        override def answer(mock: InvocationOnMock): Void = {
+          Thread.sleep(1000)
+          throw new InterruptedException("Injecting interrupt into container 
storage manager")
+        }
+      })
+
+    when(this.taskInstance.shutdownTask).thenThrow(new 
RuntimeException("Trigger a shutdown, please."))
+
+    this.samzaContainer.run
+
+    assertEquals(SamzaContainerStatus.FAILED, this.samzaContainer.getStatus())
+    verify(this.samzaContainerListener).beforeStart()
+    verify(this.samzaContainerListener).afterFailure(any())
+    verify(this.samzaContainerListener, never()).afterStop()
+    verify(this.runLoop, times(0)).run()
+  }
+
+  @Test
   def testFailureDuringShutdown(): Unit = {
     doNothing().when(this.runLoop).run() // run loop completes successfully
     when(this.taskInstance.shutdownTask).thenThrow(new 
RuntimeException("Trigger a shutdown, please."))
diff --git 
a/samza-core/src/test/scala/org/apache/samza/storage/TestContainerStorageManager.java
 
b/samza-core/src/test/scala/org/apache/samza/storage/TestContainerStorageManager.java
index 43872e9..bdc7e0e 100644
--- 
a/samza-core/src/test/scala/org/apache/samza/storage/TestContainerStorageManager.java
+++ 
b/samza-core/src/test/scala/org/apache/samza/storage/TestContainerStorageManager.java
@@ -101,7 +101,7 @@ public class TestContainerStorageManager {
    * Method to create a containerStorageManager with mocked dependencies
    */
   @Before
-  public void setUp() {
+  public void setUp() throws InterruptedException {
     taskRestoreMetricGauges = new HashMap<>();
     this.tasks = new HashMap<>();
     this.taskInstanceMetrics = new HashMap<>();
@@ -243,7 +243,7 @@ public class TestContainerStorageManager {
   }
 
   @Test
-  public void testParallelismAndMetrics() {
+  public void testParallelismAndMetrics() throws InterruptedException {
     this.containerStorageManager.start();
     this.containerStorageManager.shutdown();
 
diff --git 
a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala
 
b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala
index bc6778e..afba824 100644
--- 
a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala
+++ 
b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala
@@ -124,7 +124,7 @@ class KeyValueStorageEngine[K, V](
     val batch = new java.util.ArrayList[Entry[Array[Byte], 
Array[Byte]]](batchSize)
     var lastBatchFlushed = false
 
-    while(iterator.hasNext) {
+    while(iterator.hasNext && !Thread.currentThread().isInterrupted) {
       val envelope = iterator.next()
       val keyBytes = envelope.getKey.asInstanceOf[Array[Byte]]
       val valBytes = envelope.getMessage.asInstanceOf[Array[Byte]]
@@ -200,6 +200,11 @@ class KeyValueStorageEngine[K, V](
 
     // flush the store and the changelog producer
     flush() // TODO HIGH pmaheshw SAMZA-2338: Need a way to flush changelog 
producers. This only flushes the stores.
+
+    if (Thread.currentThread().isInterrupted) {
+      warn("Received an interrupt during store restoration. Exiting without 
restoring the full state.")
+      throw new InterruptedException("Received an interrupt during store 
restoration.")
+    }
   }
 
   def flush() = {
diff --git 
a/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStorageEngine.scala
 
b/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStorageEngine.scala
index 25445c2..151fd84 100644
--- 
a/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStorageEngine.scala
+++ 
b/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStorageEngine.scala
@@ -21,8 +21,10 @@ package org.apache.samza.storage.kv
 
 import java.io.File
 import java.util.Arrays
+import java.util.concurrent.{Callable, ExecutionException, ExecutorService, 
Executors}
 
 import org.apache.samza.Partition
+import org.apache.samza.context.Context
 import org.apache.samza.storage.StoreProperties
 import org.apache.samza.system.ChangelogSSPIterator.Mode
 import org.apache.samza.system.{ChangelogSSPIterator, IncomingMessageEnvelope, 
SystemStreamPartition}
@@ -30,6 +32,8 @@ import org.apache.samza.task.MessageCollector
 import org.junit.Assert._
 import org.junit.{After, Before, Test}
 import org.mockito.Mockito._
+import org.mockito.invocation.InvocationOnMock
+import org.mockito.stubbing.Answer
 
 class TestKeyValueStorageEngine {
   var engine: KeyValueStorageEngine[String, String] = null
@@ -163,6 +167,50 @@ class TestKeyValueStorageEngine {
     assertEquals(15, metrics.restoredBytesGauge.getValue) // 3 keys * 2 
bytes/key +  3 msgs * 3 bytes/msg
   }
 
+  @Test
+  def testRestoreInterruptedThrowsInterruptException(): Unit = {
+    val changelogSSP = new SystemStreamPartition("TestSystem", "TestStream", 
new Partition(0))
+    val iterator = mock(classOf[ChangelogSSPIterator])
+    val executorService = Executors.newSingleThreadExecutor()
+    val restore = new Callable[Void] {
+      override def call(): Void = {
+        engine.restore(iterator)
+        null
+      }
+    }
+
+    when(iterator.hasNext)
+      .thenReturn(true)
+      .thenReturn(true)
+      .thenAnswer(new Answer[Boolean] {
+        override def answer(invocation: InvocationOnMock): Boolean = {
+          executorService.shutdownNow()
+          true
+        }
+      })
+      .thenReturn(false)
+    when(iterator.next())
+      .thenReturn(new IncomingMessageEnvelope(changelogSSP, "0", 
Array[Byte](1, 2), Array[Byte](3, 4, 5)))
+      .thenReturn(new IncomingMessageEnvelope(changelogSSP, "1", 
Array[Byte](2, 3), Array[Byte](4, 5, 6)))
+      .thenReturn(new IncomingMessageEnvelope(changelogSSP, "2", 
Array[Byte](3, 4), Array[Byte](5, 6, 7)))
+    when(iterator.getMode)
+      .thenReturn(Mode.RESTORE)
+      .thenReturn(Mode.RESTORE)
+      .thenReturn(Mode.RESTORE)
+
+    try {
+      val restoreFuture = executorService.submit(restore)
+      restoreFuture.get()
+      fail("Expected execution exception during restoration")
+    } catch {
+      case e: ExecutionException => {
+        assertTrue(e.getCause.isInstanceOf[InterruptedException])
+        // Make sure we don't restore any more records and bail out
+        assertEquals(2, metrics.restoredMessagesGauge.getValue)
+      }
+    }
+  }
+
   @Test(expected = classOf[IllegalStateException])
   def testThrowsIfIteratorModeChangesFromTrimToRestore(): Unit = {
     val changelogSSP = new SystemStreamPartition("TestSystem", "TestStream", 
new Partition(0))

Reply via email to