This is an automated email from the ASF dual-hosted git repository.
lhaiesp pushed a commit to branch 1.3.1
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/1.3.1 by this push:
new 27aa391 SAMZA-2305: Stream processor should ensure previous container
is stopped during a rebalance (#1213)
27aa391 is described below
commit 27aa391821e3ac457a00cb991bd2d779e3ca28bd
Author: mynameborat <[email protected]>
AuthorDate: Tue Dec 10 14:04:45 2019 -0800
SAMZA-2305: Stream processor should ensure previous container is stopped
during a rebalance (#1213)
Stream processor should ensure previous container is stopped during a
rebalance
---
.../org/apache/samza/storage/StorageEngine.java | 6 +-
.../apache/samza/processor/StreamProcessor.java | 72 ++++++++++++++++------
.../NonTransactionalStateTaskRestoreManager.java | 2 +-
.../org/apache/samza/storage/StorageRecovery.java | 9 ++-
.../apache/samza/storage/TaskRestoreManager.java | 11 +++-
.../TransactionalStateTaskRestoreManager.java | 2 +-
.../apache/samza/container/SamzaContainer.scala | 22 ++++++-
.../samza/storage/ContainerStorageManager.java | 60 ++++++++++++------
.../org/apache/samza/system/SystemConsumers.scala | 21 ++++++-
.../org/apache/samza/system/SystemProducers.scala | 17 ++++-
.../samza/processor/TestStreamProcessor.java | 62 ++++++++++++++++++-
.../samza/container/TestSamzaContainer.scala | 42 +++++++++++++
.../samza/storage/TestContainerStorageManager.java | 4 +-
.../samza/storage/kv/KeyValueStorageEngine.scala | 7 ++-
.../storage/kv/TestKeyValueStorageEngine.scala | 48 +++++++++++++++
15 files changed, 332 insertions(+), 53 deletions(-)
diff --git
a/samza-api/src/main/java/org/apache/samza/storage/StorageEngine.java
b/samza-api/src/main/java/org/apache/samza/storage/StorageEngine.java
index 8add1de..7b12c85 100644
--- a/samza-api/src/main/java/org/apache/samza/storage/StorageEngine.java
+++ b/samza-api/src/main/java/org/apache/samza/storage/StorageEngine.java
@@ -43,11 +43,15 @@ public interface StorageEngine {
* provided in one {@link java.util.Iterator} and not deserialized for
* efficiency, allowing the implementation to optimize replay, if possible.
*
+ * Implementers are expected to handle interrupt signals sent to the
restoration thread and rethrow the exception
+ * upstream so that {@code TaskRestoreManager} can notify the
container accordingly.
+ *
* @param envelopes
* An iterator of envelopes that the storage engine can read from to
* restore its state on startup.
+ * @throws InterruptedException if the restoring thread is interrupted during restoration
*/
- void restore(ChangelogSSPIterator envelopes);
+ void restore(ChangelogSSPIterator envelopes) throws InterruptedException;
/**
* Flush any cached messages
diff --git
a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
index f952f6e..5a39520 100644
--- a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
+++ b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
@@ -85,16 +85,18 @@ import scala.Option;
*
* Describes the valid state transitions of the {@link StreamProcessor}.
*
- *
- *
────────────────────────────────
- *
│ │
- *
│ │
- *
│ │
- *
│ │
- * New StreamProcessor.start()
Rebalance triggered V Receives JobModel │
- * StreamProcessor ──────────▶ NEW ───────────────────────────▶ STARTED
──────────────────▶ IN_REBALANCE ─────────────────────▶ RUNNING
- * Creation │ │ by
group leader │ and starts Container │
- * │ │
│ │
+ *
Receives another re-balance request when the container
+ *
from the previous re-balance is still in INIT phase
+ *
────────────────────────────────────────────────
+ *
│ │ │
+ *
│ │ │
+ *
│ │ │
+ *
│ │ │
+ * New StreamProcessor.start()
Rebalance triggered V Receives JobModel │ │
+ * StreamProcessor ──────────▶ NEW ───────────────────────────▶ STARTED
──────────────────▶ IN_REBALANCE ─────────────────────▶ RUNNING │
+ * Creation │ │ by
group leader │ and starts │Container │ │
+ * │ │
│ │ │ │
+ * │ │
│ ───────────────────────────────
* Stre│amProcessor.stop()
Stre│amProcessor.stop() Stre│amProcessor.stop()
Stre│amProcessor.stop()
* │ │
│ │
* │ │
│ │
@@ -133,7 +135,6 @@ public class StreamProcessor {
private final Config config;
private final long taskShutdownMs;
private final String processorId;
- private final ExecutorService containerExcecutorService;
private final Object lock = new Object();
private final MetricsRegistryMap metricsRegistry;
private final MetadataStore metadataStore;
@@ -176,6 +177,9 @@ public class StreamProcessor {
@VisibleForTesting
JobCoordinatorListener jobCoordinatorListener = null;
+ @VisibleForTesting
+ ExecutorService containerExecutorService;
+
/**
* Same as {@link #StreamProcessor(String, Config, Map, TaskFactory,
ProcessorLifecycleListener, JobCoordinator)}, except
* it creates a {@link JobCoordinator} instead of accepting it as an
argument.
@@ -288,11 +292,15 @@ public class StreamProcessor {
: createJobCoordinator(config, processorId, metricsRegistry,
metadataStore);
this.jobCoordinatorListener = createJobCoordinatorListener();
this.jobCoordinator.setListener(jobCoordinatorListener);
- ThreadFactory threadFactory = new
ThreadFactoryBuilder().setNameFormat(CONTAINER_THREAD_NAME_FORMAT).setDaemon(true).build();
- this.containerExcecutorService =
Executors.newSingleThreadExecutor(threadFactory);
+ this.containerExecutorService = createExecutorService();
this.processorListener = listenerFactory.createInstance(this);
}
+ private ExecutorService createExecutorService() {
+ ThreadFactory threadFactory = new
ThreadFactoryBuilder().setNameFormat(CONTAINER_THREAD_NAME_FORMAT).setDaemon(true).build();
+ return Executors.newSingleThreadExecutor(threadFactory);
+ }
+
/**
* Asynchronously starts this {@link StreamProcessor}.
* <p>
@@ -348,7 +356,7 @@ public class StreamProcessor {
boolean hasContainerShutdown = stopSamzaContainer();
if (!hasContainerShutdown) {
LOGGER.info("Interrupting the container: {} thread to die.",
container);
- containerExcecutorService.shutdownNow();
+ containerExecutorService.shutdownNow();
}
} catch (Throwable throwable) {
LOGGER.error(String.format("Exception occurred on container: %s
shutdown of stream processor: %s.", container, processorId), throwable);
@@ -444,6 +452,19 @@ public class StreamProcessor {
return hasContainerShutdown;
}
+ private boolean interruptContainerAndShutdownExecutorService() {
+ try {
+ containerExecutorService.shutdownNow();
+ containerShutdownLatch.await(taskShutdownMs, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ LOGGER.info("Received an interrupt during interrupting container.
Proceeding to check if the container callback "
+ + "decremented the shutdown latch. ");
+ }
+
+ // we call interrupt successful as long as the shut down latch is
decremented by the container call back.
+ return containerShutdownLatch.getCount() == 0;
+ }
+
private JobCoordinatorListener createJobCoordinatorListener() {
return new JobCoordinatorListener() {
@@ -461,8 +482,23 @@ public class StreamProcessor {
} else {
LOGGER.info("Container: {} shutdown completed for stream
processor: {}.", container, processorId);
}
+ } else if (state == State.IN_REBALANCE) {
+ if (container != null) {
+ boolean hasContainerShutdown =
interruptContainerAndShutdownExecutorService();
+ if (!hasContainerShutdown) {
+ LOGGER.warn("Job model expire unsuccessful. Failed to
interrupt container: {} safely. "
+ + "Stopping the stream processor: {}", container,
processorId);
+ state = State.STOPPING;
+ jobCoordinator.stop();
+ } else {
+ containerExecutorService = createExecutorService();
+ }
+ } else {
+ LOGGER.info("Ignoring Job model expired since a rebalance is
already in progress");
+ }
} else {
- LOGGER.info("Ignoring onJobModelExpired invocation since the
current state is {} and not in {}.", state, ImmutableList.of(State.RUNNING,
State.STARTED));
+ LOGGER.info("Ignoring onJobModelExpired invocation since the
current state is {} and not in {}.", state,
+ ImmutableList.of(State.RUNNING, State.STARTED,
State.IN_REBALANCE));
}
}
}
@@ -475,7 +511,7 @@ public class StreamProcessor {
container = createSamzaContainer(processorId, jobModel);
container.setContainerListener(new ContainerListener());
LOGGER.info("Starting the container: {} for the stream processor:
{}.", container, processorId);
- containerExcecutorService.submit(container);
+ containerExecutorService.submit(container);
} else {
LOGGER.info("Ignoring onNewJobModel invocation since the current
state is {} and not {}.", state, State.IN_REBALANCE);
}
@@ -490,7 +526,7 @@ public class StreamProcessor {
// we only want to interrupt when container shutdown times out.
if (!hasContainerShutdown) {
- containerExcecutorService.shutdownNow();
+ containerExecutorService.shutdownNow();
}
state = State.STOPPED;
}
@@ -508,7 +544,7 @@ public class StreamProcessor {
// we only want to interrupt when container shutdown times out.
if (!hasContainerShutdown) {
- containerExcecutorService.shutdownNow();
+ containerExecutorService.shutdownNow();
}
state = State.STOPPED;
}
diff --git
a/samza-core/src/main/java/org/apache/samza/storage/NonTransactionalStateTaskRestoreManager.java
b/samza-core/src/main/java/org/apache/samza/storage/NonTransactionalStateTaskRestoreManager.java
index 5952647..56ef822 100644
---
a/samza-core/src/main/java/org/apache/samza/storage/NonTransactionalStateTaskRestoreManager.java
+++
b/samza-core/src/main/java/org/apache/samza/storage/NonTransactionalStateTaskRestoreManager.java
@@ -317,7 +317,7 @@ class NonTransactionalStateTaskRestoreManager implements
TaskRestoreManager {
* Restore each store in taskStoresToRestore sequentially
*/
@Override
- public void restore() {
+ public void restore() throws InterruptedException {
for (String storeName : taskStoresToRestore) {
LOG.info("Restoring store: {} for task: {}", storeName,
taskModel.getTaskName());
SystemConsumer systemConsumer = storeConsumers.get(storeName);
diff --git
a/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
b/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
index 2cbeddc..11237a8 100644
--- a/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
+++ b/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
@@ -117,7 +117,14 @@ public class StorageRecovery extends CommandLine {
systemAdmins.start();
this.containerStorageManagers.forEach((containerName,
containerStorageManager) -> {
- containerStorageManager.start();
+ try {
+ containerStorageManager.start();
+ } catch (InterruptedException e) {
+ // we can ignore the exception since it's only used in the context of
a command line tool and bubbling the
+ // exception upstream isn't needed.
+ LOG.warn("Received an interrupt during store restoration for
container {}."
+ + " Proceeding with the next container", containerName);
+ }
});
this.containerStorageManagers.forEach((containerName,
containerStorageManager) -> {
containerStorageManager.shutdown();
diff --git
a/samza-core/src/main/java/org/apache/samza/storage/TaskRestoreManager.java
b/samza-core/src/main/java/org/apache/samza/storage/TaskRestoreManager.java
index 2bdeeea..f60e148 100644
--- a/samza-core/src/main/java/org/apache/samza/storage/TaskRestoreManager.java
+++ b/samza-core/src/main/java/org/apache/samza/storage/TaskRestoreManager.java
@@ -35,8 +35,17 @@ public interface TaskRestoreManager {
/**
* Restore state from checkpoints, state snapshots and changelog.
+ * Currently, store restoration happens on a separate thread pool within
{@code ContainerStorageManager}. In case of
+ * interrupt/shutdown signals from {@code SamzaContainer}, {@code
ContainerStorageManager} may interrupt the restore
+ * thread.
+ *
+ * Note: Typically, interrupt signals don't bubble up as {@link
InterruptedException} unless the restore thread is
+ * waiting on IO/network. In case of busy looping, implementors are expected
to check the interrupt status of the
+ * thread periodically and shutdown gracefully before throwing {@link
InterruptedException} upstream.
+ * {@code SamzaContainer} will not wait for clean up and the interrupt
signal is the best effort by the container
+ * to notify that it's shutting down.
*/
- void restore();
+ void restore() throws InterruptedException;
/**
* Stop all persistent stores after restoring.
diff --git
a/samza-core/src/main/java/org/apache/samza/storage/TransactionalStateTaskRestoreManager.java
b/samza-core/src/main/java/org/apache/samza/storage/TransactionalStateTaskRestoreManager.java
index 1e54ea1..6f59a32 100644
---
a/samza-core/src/main/java/org/apache/samza/storage/TransactionalStateTaskRestoreManager.java
+++
b/samza-core/src/main/java/org/apache/samza/storage/TransactionalStateTaskRestoreManager.java
@@ -117,7 +117,7 @@ public class TransactionalStateTaskRestoreManager
implements TaskRestoreManager
}
@Override
- public void restore() {
+ public void restore() throws InterruptedException {
Map<String, RestoreOffsets> storesToRestore = storeActions.storesToRestore;
for (Map.Entry<String, RestoreOffsets> entry : storesToRestore.entrySet())
{
diff --git
a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index d15c109..092175d 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -27,7 +27,6 @@ import java.time.Duration
import java.util
import java.util.{Base64, Optional}
import java.util.concurrent.{ExecutorService, Executors,
ScheduledExecutorService, TimeUnit}
-import java.util.stream.Collectors
import com.google.common.annotations.VisibleForTesting
import com.google.common.util.concurrent.ThreadFactoryBuilder
@@ -769,6 +768,27 @@ class SamzaContainer(
else
Thread.sleep(Long.MaxValue)
} catch {
+ case e: InterruptedException =>
+ /*
+ * We don't want to categorize interrupts as failure since the only
place the container thread gets interrupted within
+ * our code inside stream processor is during the following two
scenarios
+ * 1. During a re-balance, if the container has not started or
hasn't reported start status to StreamProcessor.
+ * Subsequently stream processor attempts to interrupt the
container thread before proceeding to join the barrier
+ * to agree on the new work assignment.
+ * 2. During shutdown signals to stream processor (external or
internal), the stream processor signals the container to
+ * shutdown and waits for `task.shutdown.ms` before forcefully
shutting down the container executor service which in
+ * turn interrupts the container thread.
+ *
+ * In both of these scenarios, the failure cause is either
captured externally (timing out scenario) or internally
+ * (failed attempt to shut down the container). The act of
interrupting the container thread is an explicit intent to shutdown
+ * the container since it is not capable of reacting to shutdown
signals in all scenarios.
+ *
+ */
+ if (status.equals(SamzaContainerStatus.STARTED)) {
+ warn("Received an interrupt in run loop.", e)
+ } else {
+ warn("Received an interrupt during initialization.", e)
+ }
case e: Throwable =>
if (status.equals(SamzaContainerStatus.STARTED)) {
error("Caught exception/error in run loop.", e)
diff --git
a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
index afd3e69..8623e5d 100644
---
a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
+++
b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
@@ -632,7 +632,7 @@ public class ContainerStorageManager {
return
this.sideInputStorageManagers.values().stream().collect(Collectors.toSet());
}
- public void start() throws SamzaException {
+ public void start() throws SamzaException, InterruptedException {
Map<SystemStreamPartition, String> checkpointedChangelogSSPOffsets = new
HashMap<>();
if (new TaskConfig(config).getTransactionalStateRestoreEnabled()) {
getTasks(containerModel, TaskMode.Active).forEach((taskName, taskModel)
-> {
@@ -657,7 +657,8 @@ public class ContainerStorageManager {
}
// Restoration of all stores, in parallel across tasks
- private void restoreStores(Map<SystemStreamPartition, String>
checkpointedChangelogSSPOffsets) {
+ private void restoreStores(Map<SystemStreamPartition, String>
checkpointedChangelogSSPOffsets)
+ throws InterruptedException {
LOG.info("Store Restore started");
// initialize each TaskStorageManager
@@ -665,7 +666,7 @@ public class ContainerStorageManager {
taskStorageManager.init(checkpointedChangelogSSPOffsets));
// Start each store consumer once
- this.storeConsumers.values().stream().distinct().forEach(systemConsumer ->
systemConsumer.start());
+
this.storeConsumers.values().stream().distinct().forEach(SystemConsumer::start);
// Create a thread pool for parallel restores (and stopping of persistent
stores)
ExecutorService executorService =
Executors.newFixedThreadPool(this.parallelRestoreThreadPoolSize,
@@ -684,6 +685,11 @@ public class ContainerStorageManager {
for (Future future : taskRestoreFutures) {
try {
future.get();
+ } catch (InterruptedException e) {
+ LOG.warn("Received an interrupt during store restoration. Issuing
interrupts to the store restoration workers to exit "
+ + "prematurely without restoring full state.");
+ executorService.shutdownNow();
+ throw e;
} catch (Exception e) {
LOG.error("Exception when restoring ", e);
throw new SamzaException("Exception when restoring ", e);
@@ -693,7 +699,7 @@ public class ContainerStorageManager {
executorService.shutdown();
// Stop each store consumer once
- this.storeConsumers.values().stream().distinct().forEach(systemConsumer ->
systemConsumer.stop());
+
this.storeConsumers.values().stream().distinct().forEach(SystemConsumer::stop);
// Now re-create persistent stores in read-write mode, leave
non-persistent stores as-is
recreatePersistentTaskStoresInReadWriteMode(this.containerModel,
jobContext, containerContext,
@@ -791,6 +797,14 @@ public class ContainerStorageManager {
}
} catch (InterruptedException e) {
+ LOG.warn("Received an interrupt during side inputs store restoration."
+ + " Exiting prematurely without completing store restore.");
+ /*
+ * We want to stop side input restoration and rethrow the exception
upstream. Container should handle the
+ * interrupt exception and shutdown the components and cleaning up the
resource. We don't want to clean up the
+ * resources prematurely here.
+ */
+ shouldShutdown = true; // todo: should we cancel the flush future right
away or wait for container to handle it as part of shutdown sequence?
throw new SamzaException("Side inputs read was interrupted", e);
}
@@ -920,22 +934,32 @@ public class ContainerStorageManager {
@Override
public Void call() {
long startTime = System.currentTimeMillis();
- LOG.info("Starting stores in task instance {}",
this.taskName.getTaskName());
- taskRestoreManager.restore();
-
- // Stop all persistent stores after restoring. Certain persistent stores
opened in BulkLoad mode are compacted
- // on stop, so paralleling stop() also parallelizes their compaction (a
time-intensive operation).
- taskRestoreManager.stopPersistentStores();
-
- long timeToRestore = System.currentTimeMillis() - startTime;
-
- if (this.samzaContainerMetrics != null) {
- Gauge taskGauge =
this.samzaContainerMetrics.taskStoreRestorationMetrics().getOrDefault(this.taskName,
null);
-
- if (taskGauge != null) {
- taskGauge.set(timeToRestore);
+ try {
+ LOG.info("Starting stores in task instance {}",
this.taskName.getTaskName());
+ taskRestoreManager.restore();
+ } catch (InterruptedException e) {
+ /*
+ * The container thread is the only external source to trigger an
interrupt to the restoration thread and thus
+ * it is okay to swallow this exception and not propagate it upstream.
If the container is interrupted during
+ * the store restoration, ContainerStorageManager signals the restore
workers to abandon restoration and then
+ * finally propagates the exception upstream to trigger container
shutdown.
+ */
+ LOG.warn("Received an interrupt during store restoration for task:
{}.", this.taskName.getTaskName());
+ } finally {
+ // Stop all persistent stores after restoring. Certain persistent
stores opened in BulkLoad mode are compacted
+ // on stop, so paralleling stop() also parallelizes their compaction
(a time-intensive operation).
+ taskRestoreManager.stopPersistentStores();
+ long timeToRestore = System.currentTimeMillis() - startTime;
+
+ if (this.samzaContainerMetrics != null) {
+ Gauge taskGauge =
this.samzaContainerMetrics.taskStoreRestorationMetrics().getOrDefault(this.taskName,
null);
+
+ if (taskGauge != null) {
+ taskGauge.set(timeToRestore);
+ }
}
}
+
return null;
}
}
diff --git
a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
index b41c245..580fd60 100644
--- a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
+++ b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
@@ -138,6 +138,13 @@ class SystemConsumers (
private val emptySystemStreamPartitionsBySystem = new HashMap[String,
Set[SystemStreamPartition]]()
/**
+ * Denotes if the SystemConsumers have started. The flag is useful in the
event of shutting down since interrupt
+ * on Samza Container will shutdown components and container currently
doesn't track what components have started
+ * successfully.
+ */
+ private var started = false
+
+ /**
* Default timeout to noNewMessagesTimeout. Every time SystemConsumers
* receives incoming messages, it sets timeout to 0. Every time
* SystemConsumers receives no new incoming messages from the MessageChooser,
@@ -185,15 +192,23 @@ class SystemConsumers (
chooser.start
+ started = true
+
refresh
}
def stop {
- debug("Stopping consumers.")
+ if (started) {
+ debug("Stopping consumers.")
- consumers.values.foreach(_.stop)
+ consumers.values.foreach(_.stop)
- chooser.stop
+ chooser.stop
+
+ started = false
+ } else {
+ debug("Ignoring the consumers stop request since it never started.")
+ }
}
diff --git
a/samza-core/src/main/scala/org/apache/samza/system/SystemProducers.scala
b/samza-core/src/main/scala/org/apache/samza/system/SystemProducers.scala
index e8ce961..1744195 100644
--- a/samza-core/src/main/scala/org/apache/samza/system/SystemProducers.scala
+++ b/samza-core/src/main/scala/org/apache/samza/system/SystemProducers.scala
@@ -35,16 +35,29 @@ class SystemProducers(
*/
dropSerializationError: Boolean = false) extends Logging {
+ /**
+ * Denotes if the SystemProducers have started. The flag is useful in the
event of shutting down since interrupt
+ * on Samza Container will shutdown components and container currently
doesn't track what components have started
+ * successfully.
+ */
+ private var started = false
+
def start {
debug("Starting producers.")
producers.values.foreach(_.start)
+ started = true
}
def stop {
- debug("Stopping producers.")
+ if (started) {
+ debug("Stopping producers.")
- producers.values.foreach(_.stop)
+ producers.values.foreach(_.stop)
+ started = false
+ } else {
+ debug("Ignoring the producers stop request since it never started.")
+ }
}
def register(source: String) {
diff --git
a/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java
b/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java
index 6d78b77..0978738 100644
---
a/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java
+++
b/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java
@@ -27,6 +27,7 @@ import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
@@ -55,6 +56,7 @@ import org.mockito.Mockito;
import org.powermock.api.mockito.PowerMockito;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
@@ -467,13 +469,67 @@ public class TestStreamProcessor {
assertEquals(State.STOPPING, streamProcessor.getState());
Mockito.verify(mockSamzaContainer, Mockito.times(1)).shutdown();
Mockito.verify(mockJobCoordinator, Mockito.times(1)).stop();
+ }
- // If StreamProcessor is in IN_REBALANCE state, onJobModelExpired should
be a NO_OP.
- streamProcessor.state = State.IN_REBALANCE;
+ @Test
+ public void testJobModelExpiredDuringAnExistingRebalance() {
+ JobCoordinator mockJobCoordinator = Mockito.mock(JobCoordinator.class);
+ ProcessorLifecycleListener lifecycleListener =
Mockito.mock(ProcessorLifecycleListener.class);
+ ExecutorService mockExecutorService = Mockito.mock(ExecutorService.class);
+ MapConfig config = new MapConfig(ImmutableMap.of("task.shutdown.ms", "0"));
+ StreamProcessor streamProcessor = new StreamProcessor("TestProcessorId",
config, new HashMap<>(), null,
+ Optional.empty(), Optional.empty(), Optional.empty(), sp ->
lifecycleListener, mockJobCoordinator, Mockito.mock(MetadataStore.class));
- streamProcessor.jobCoordinatorListener.onJobModelExpired();
+ runJobModelExpireDuringRebalance(streamProcessor, mockExecutorService,
false);
assertEquals(State.IN_REBALANCE, streamProcessor.state);
+ assertNotEquals(mockExecutorService,
streamProcessor.containerExecutorService);
+ Mockito.verify(mockExecutorService, Mockito.times(1)).shutdownNow();
+ }
+
+ @Test
+ public void
testJobModelExpiredDuringAnExistingRebalanceWithContainerInterruptFailed() {
+ JobCoordinator mockJobCoordinator = Mockito.mock(JobCoordinator.class);
+ ProcessorLifecycleListener lifecycleListener =
Mockito.mock(ProcessorLifecycleListener.class);
+ ExecutorService mockExecutorService = Mockito.mock(ExecutorService.class);
+ MapConfig config = new MapConfig(ImmutableMap.of("task.shutdown.ms", "0"));
+ StreamProcessor streamProcessor = new StreamProcessor("TestProcessorId",
config, new HashMap<>(), null,
+ Optional.empty(), Optional.empty(), Optional.empty(), sp ->
lifecycleListener, mockJobCoordinator, Mockito.mock(MetadataStore.class));
+
+ runJobModelExpireDuringRebalance(streamProcessor, mockExecutorService,
true);
+
+ assertEquals(State.STOPPING, streamProcessor.state);
+ assertEquals(mockExecutorService,
streamProcessor.containerExecutorService);
+ Mockito.verify(mockExecutorService, Mockito.times(1)).shutdownNow();
+ Mockito.verify(mockJobCoordinator, Mockito.times(1)).stop();
+ }
+
+ private void runJobModelExpireDuringRebalance(StreamProcessor
streamProcessor, ExecutorService executorService,
+ boolean failContainerInterrupt) {
+ SamzaContainer mockSamzaContainer = Mockito.mock(SamzaContainer.class);
+ CountDownLatch shutdownLatch = new CountDownLatch(1);
+
+ /*
+ * When there is an initialized container that hasn't started and the
stream processor is still in re-balance phase,
+ * subsequent job model expire request should attempt to interrupt the
existing container to safely shut it down
+ * before proceeding to join the barrier. As part of safe shutdown
sequence, we want to ensure shutdownNow is invoked
+ * on the existing executorService to signal interrupt and make sure new
executor service is created.
+ */
+
+ Mockito.when(executorService.shutdownNow()).thenAnswer(ctx -> {
+ if (!failContainerInterrupt) {
+ shutdownLatch.countDown();
+ }
+ return null;
+ });
+ Mockito.when(executorService.isShutdown()).thenReturn(true);
+
+ streamProcessor.state = State.IN_REBALANCE;
+ streamProcessor.container = mockSamzaContainer;
+ streamProcessor.containerExecutorService = executorService;
+ streamProcessor.containerShutdownLatch = shutdownLatch;
+
+ streamProcessor.jobCoordinatorListener.onJobModelExpired();
}
@Test
diff --git
a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
index 78136bf..69223df 100644
---
a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
+++
b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
@@ -35,6 +35,8 @@ import org.junit.Assert._
import org.junit.{Before, Test}
import org.mockito.Matchers.{any, notNull}
import org.mockito.Mockito._
+import org.mockito.invocation.InvocationOnMock
+import org.mockito.stubbing.Answer
import org.mockito.{Mock, Mockito, MockitoAnnotations}
import org.scalatest.junit.AssertionsForJUnit
import org.scalatest.mockito.MockitoSugar
@@ -140,6 +142,46 @@ class TestSamzaContainer extends AssertionsForJUnit with
MockitoSugar {
}
@Test
+ def testInterruptDuringStoreRestorationShutdownContainer(): Unit = {
+ when(this.containerStorageManager.start())
+ .thenAnswer(new Answer[Void] {
+ override def answer(mock: InvocationOnMock): Void = {
+ Thread.sleep(1000)
+ throw new InterruptedException("Injecting interrupt into container
storage manager")
+ }
+ })
+
+ this.samzaContainer.run
+
+ assertEquals(SamzaContainerStatus.STOPPED, this.samzaContainer.getStatus())
+ verify(this.samzaContainerListener).beforeStart()
+ verify(this.samzaContainerListener).afterStop()
+ verify(this.samzaContainerListener, never()).afterFailure(any())
+ verify(this.runLoop, times(0)).run()
+ }
+
+ @Test
+ def testInterruptDuringStoreRestorationWithErrorsDuringContainerShutdown():
Unit = {
+ when(this.containerStorageManager.start())
+ .thenAnswer(new Answer[Void] {
+ override def answer(mock: InvocationOnMock): Void = {
+ Thread.sleep(1000)
+ throw new InterruptedException("Injecting interrupt into container
storage manager")
+ }
+ })
+
+ when(this.taskInstance.shutdownTask).thenThrow(new
RuntimeException("Trigger a shutdown, please."))
+
+ this.samzaContainer.run
+
+ assertEquals(SamzaContainerStatus.FAILED, this.samzaContainer.getStatus())
+ verify(this.samzaContainerListener).beforeStart()
+ verify(this.samzaContainerListener).afterFailure(any())
+ verify(this.samzaContainerListener, never()).afterStop()
+ verify(this.runLoop, times(0)).run()
+ }
+
+ @Test
def testFailureDuringShutdown(): Unit = {
doNothing().when(this.runLoop).run() // run loop completes successfully
when(this.taskInstance.shutdownTask).thenThrow(new
RuntimeException("Trigger a shutdown, please."))
diff --git
a/samza-core/src/test/scala/org/apache/samza/storage/TestContainerStorageManager.java
b/samza-core/src/test/scala/org/apache/samza/storage/TestContainerStorageManager.java
index 43872e9..bdc7e0e 100644
---
a/samza-core/src/test/scala/org/apache/samza/storage/TestContainerStorageManager.java
+++
b/samza-core/src/test/scala/org/apache/samza/storage/TestContainerStorageManager.java
@@ -101,7 +101,7 @@ public class TestContainerStorageManager {
* Method to create a containerStorageManager with mocked dependencies
*/
@Before
- public void setUp() {
+ public void setUp() throws InterruptedException {
taskRestoreMetricGauges = new HashMap<>();
this.tasks = new HashMap<>();
this.taskInstanceMetrics = new HashMap<>();
@@ -243,7 +243,7 @@ public class TestContainerStorageManager {
}
@Test
- public void testParallelismAndMetrics() {
+ public void testParallelismAndMetrics() throws InterruptedException {
this.containerStorageManager.start();
this.containerStorageManager.shutdown();
diff --git
a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala
b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala
index bc6778e..afba824 100644
---
a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala
+++
b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala
@@ -124,7 +124,7 @@ class KeyValueStorageEngine[K, V](
val batch = new java.util.ArrayList[Entry[Array[Byte],
Array[Byte]]](batchSize)
var lastBatchFlushed = false
- while(iterator.hasNext) {
+ while(iterator.hasNext && !Thread.currentThread().isInterrupted) {
val envelope = iterator.next()
val keyBytes = envelope.getKey.asInstanceOf[Array[Byte]]
val valBytes = envelope.getMessage.asInstanceOf[Array[Byte]]
@@ -200,6 +200,11 @@ class KeyValueStorageEngine[K, V](
// flush the store and the changelog producer
flush() // TODO HIGH pmaheshw SAMZA-2338: Need a way to flush changelog
producers. This only flushes the stores.
+
+ if (Thread.currentThread().isInterrupted) {
+ warn("Received an interrupt during store restoration. Exiting without
restoring the full state.")
+ throw new InterruptedException("Received an interrupt during store
restoration.")
+ }
}
def flush() = {
diff --git
a/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStorageEngine.scala
b/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStorageEngine.scala
index 25445c2..151fd84 100644
---
a/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStorageEngine.scala
+++
b/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStorageEngine.scala
@@ -21,8 +21,10 @@ package org.apache.samza.storage.kv
import java.io.File
import java.util.Arrays
+import java.util.concurrent.{Callable, ExecutionException, ExecutorService,
Executors}
import org.apache.samza.Partition
+import org.apache.samza.context.Context
import org.apache.samza.storage.StoreProperties
import org.apache.samza.system.ChangelogSSPIterator.Mode
import org.apache.samza.system.{ChangelogSSPIterator, IncomingMessageEnvelope,
SystemStreamPartition}
@@ -30,6 +32,8 @@ import org.apache.samza.task.MessageCollector
import org.junit.Assert._
import org.junit.{After, Before, Test}
import org.mockito.Mockito._
+import org.mockito.invocation.InvocationOnMock
+import org.mockito.stubbing.Answer
class TestKeyValueStorageEngine {
var engine: KeyValueStorageEngine[String, String] = null
@@ -163,6 +167,50 @@ class TestKeyValueStorageEngine {
assertEquals(15, metrics.restoredBytesGauge.getValue) // 3 keys * 2
bytes/key + 3 msgs * 3 bytes/msg
}
+ @Test
+ def testRestoreInterruptedThrowsInterruptException(): Unit = {
+ val changelogSSP = new SystemStreamPartition("TestSystem", "TestStream",
new Partition(0))
+ val iterator = mock(classOf[ChangelogSSPIterator])
+ val executorService = Executors.newSingleThreadExecutor()
+ val restore = new Callable[Void] {
+ override def call(): Void = {
+ engine.restore(iterator)
+ null
+ }
+ }
+
+ when(iterator.hasNext)
+ .thenReturn(true)
+ .thenReturn(true)
+ .thenAnswer(new Answer[Boolean] {
+ override def answer(invocation: InvocationOnMock): Boolean = {
+ executorService.shutdownNow()
+ true
+ }
+ })
+ .thenReturn(false)
+ when(iterator.next())
+ .thenReturn(new IncomingMessageEnvelope(changelogSSP, "0",
Array[Byte](1, 2), Array[Byte](3, 4, 5)))
+ .thenReturn(new IncomingMessageEnvelope(changelogSSP, "1",
Array[Byte](2, 3), Array[Byte](4, 5, 6)))
+ .thenReturn(new IncomingMessageEnvelope(changelogSSP, "2",
Array[Byte](3, 4), Array[Byte](5, 6, 7)))
+ when(iterator.getMode)
+ .thenReturn(Mode.RESTORE)
+ .thenReturn(Mode.RESTORE)
+ .thenReturn(Mode.RESTORE)
+
+ try {
+ val restoreFuture = executorService.submit(restore)
+ restoreFuture.get()
+ fail("Expected execution exception during restoration")
+ } catch {
+ case e: ExecutionException => {
+ assertTrue(e.getCause.isInstanceOf[InterruptedException])
+ // Make sure we don't restore any more records and bail out
+ assertEquals(2, metrics.restoredMessagesGauge.getValue)
+ }
+ }
+ }
+
@Test(expected = classOf[IllegalStateException])
def testThrowsIfIteratorModeChangesFromTrimToRestore(): Unit = {
val changelogSSP = new SystemStreamPartition("TestSystem", "TestStream",
new Partition(0))