This is an automated email from the ASF dual-hosted git repository.
jagadish pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 9acb373 SAMZA-2256: Hotstandby fix for exception while reading side
inputs
9acb373 is described below
commit 9acb37334499c988a2119e3d921b5bf5a6322868
Author: Ray Matharu <[email protected]>
AuthorDate: Wed Jun 19 17:02:05 2019 -0700
SAMZA-2256: Hotstandby fix for exception while reading side inputs
Author: Ray Matharu <[email protected]>
Reviewers: Jagadish<[email protected]>
Closes #1086 from rmatharu/test-hotstandbyfix
---
.../samza/storage/TaskSideInputStorageManager.java | 18 ++-
.../samza/storage/ContainerStorageManager.java | 123 ++++++++++++---------
2 files changed, 82 insertions(+), 59 deletions(-)
diff --git
a/samza-core/src/main/java/org/apache/samza/storage/TaskSideInputStorageManager.java
b/samza-core/src/main/java/org/apache/samza/storage/TaskSideInputStorageManager.java
index 59c31d9..3de4f5d 100644
---
a/samza-core/src/main/java/org/apache/samza/storage/TaskSideInputStorageManager.java
+++
b/samza-core/src/main/java/org/apache/samza/storage/TaskSideInputStorageManager.java
@@ -20,8 +20,6 @@
package org.apache.samza.storage;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.ImmutableList;
-
import java.io.File;
import java.util.Collection;
import java.util.HashMap;
@@ -34,7 +32,6 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
-
import org.apache.samza.Partition;
import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
@@ -50,7 +47,6 @@ import org.apache.samza.system.SystemStreamMetadata;
import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.util.Clock;
import org.apache.samza.util.FileUtil;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.collection.JavaConverters;
@@ -215,7 +211,19 @@ public class TaskSideInputStorageManager {
KeyValueStore keyValueStore = (KeyValueStore) stores.get(storeName);
Collection<Entry<?, ?>> entriesToBeWritten =
sideInputsProcessor.process(message, keyValueStore);
- keyValueStore.putAll(ImmutableList.copyOf(entriesToBeWritten));
+
+ // Iterate over the list to be written.
+ // TODO: SAMZA-2255: Optimize value writes in TaskSideInputStorageManager
+ for (Entry entry : entriesToBeWritten) {
+ // If the key is null we ignore, if the value is null, we issue a
delete, else we issue a put
+ if (entry.getKey() != null) {
+ if (entry.getValue() != null) {
+ keyValueStore.put(entry.getKey(), entry.getValue());
+ } else {
+ keyValueStore.delete(entry.getKey());
+ }
+ }
+ }
}
// update the last processed offset
diff --git
a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
index 980b41f..9cfa9ab 100644
---
a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
+++
b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
@@ -107,14 +107,19 @@ import scala.collection.JavaConverters;
* It provides bootstrap semantics for sideinputs -- the main thread is
blocked until
* all sideInputSSPs have caught up. Side input store flushes are not in
sync with task-commit, although
* they happen at the same frequency.
- * In case, where a user explicitly requests a task-commit, it will not
include committing side inputs.
+ * In cases where a user explicitly requests a task-commit, it will not
include committing sideInputs.
*/
public class ContainerStorageManager {
private static final Logger LOG =
LoggerFactory.getLogger(ContainerStorageManager.class);
private static final String RESTORE_THREAD_NAME = "Samza Restore Thread-%d";
+ private static final String SIDEINPUTS_READ_THREAD_NAME = "SideInputs Read
Thread";
private static final String SIDEINPUTS_FLUSH_THREAD_NAME = "SideInputs Flush
Thread";
private static final String SIDEINPUTS_METRICS_PREFIX = "side-inputs-";
- // We use a prefix to differentiate the SystemConsumersMetrics for
side-inputs from the ones in SamzaContainer
+ // We use a prefix to differentiate the SystemConsumersMetrics for
sideInputs from the ones in SamzaContainer
+
+ private static final int SIDE_INPUT_READ_THREAD_TIMEOUT_SECONDS = 10; //
Timeout with which sideinput read thread checks for exceptions
+ private static final Duration SIDE_INPUT_FLUSH_TIMEOUT =
Duration.ofMinutes(1); // Period with which sideinputs are flushed
+
/** Maps containing relevant per-task objects */
private final Map<TaskName, Map<String, StorageEngine>> taskStores;
@@ -144,7 +149,7 @@ public class ContainerStorageManager {
private final int maxChangeLogStreamPartitions; // The partition count of
each changelog-stream topic. This is used for validating changelog streams
before restoring.
/* Sideinput related parameters */
- private final Map<String, Set<SystemStream>> sideInputSystemStreams; // Map
of side input system-streams indexed by store name
+ private final Map<String, Set<SystemStream>> sideInputSystemStreams; // Map
of sideInput system-streams indexed by store name
private final Map<TaskName, Map<String, Set<SystemStreamPartition>>>
taskSideInputSSPs;
private final Map<SystemStreamPartition, TaskSideInputStorageManager>
sideInputStorageManagers; // Map of sideInput storageManagers indexed by ssp,
for simpler lookup for process()
private final Map<String, SystemConsumer> sideInputConsumers; // Mapping
from storeSystemNames to SystemConsumers
@@ -152,12 +157,15 @@ public class ContainerStorageManager {
private final Map<SystemStreamPartition,
SystemStreamMetadata.SystemStreamPartitionMetadata> initialSideInputSSPMetadata
= new ConcurrentHashMap<>(); // Recorded sspMetadata of the
taskSideInputSSPs recorded at start, used to determine when sideInputs are
caughtup and container init can proceed
private volatile CountDownLatch sideInputsCaughtUp; // Used by the
sideInput-read thread to signal to the main thread
- private volatile boolean shutDownSideInputRead = false;
+ private volatile boolean shouldShutdown = false;
+
+ private final ExecutorService sideInputsReadExecutor =
Executors.newSingleThreadExecutor(
+ new
ThreadFactoryBuilder().setDaemon(true).setNameFormat(SIDEINPUTS_READ_THREAD_NAME).build());
+
private final ScheduledExecutorService sideInputsFlushExecutor =
Executors.newSingleThreadScheduledExecutor(
new
ThreadFactoryBuilder().setDaemon(true).setNameFormat(SIDEINPUTS_FLUSH_THREAD_NAME).build());
private ScheduledFuture sideInputsFlushFuture;
- private static final Duration SIDE_INPUT_FLUSH_TIMEOUT =
Duration.ofMinutes(1);
- private volatile Optional<Throwable> sideInputException = Optional.empty();
+ private volatile Throwable sideInputException = null;
private final Config config;
@@ -229,10 +237,10 @@ public class ContainerStorageManager {
// creating task restore managers
this.taskRestoreManagers = createTaskRestoreManagers(systemAdmins, clock,
this.samzaContainerMetrics);
- // create side input storage managers
+ // create sideInput storage managers
sideInputStorageManagers = createSideInputStorageManagers(clock,
classLoader);
- // create side Input consumers indexed by systemName
+ // create sideInput consumers indexed by systemName
this.sideInputConsumers = createConsumers(this.sideInputSystemStreams,
systemFactories, config, this.samzaContainerMetrics.registry());
// create SystemConsumers for consuming from taskSideInputSSPs, if
sideInputs are being used
@@ -256,10 +264,10 @@ public class ContainerStorageManager {
}
/**
- * Add all side inputs to a map of maps, indexed first by taskName, then by
sideInput store name.
+ * Add all sideInputs to a map of maps, indexed first by taskName, then by
sideInput store name.
*
* @param containerModel the containerModel to use
- * @param sideInputSystemStreams the map of store to side input system stream
+ * @param sideInputSystemStreams the map of store to sideInput system stream
* @return taskSideInputSSPs map
*/
private Map<TaskName, Map<String, Set<SystemStreamPartition>>>
getTaskSideInputSSPs(ContainerModel containerModel, Map<String,
Set<SystemStream>> sideInputSystemStreams) {
@@ -304,7 +312,7 @@ public class ContainerStorageManager {
});
});
- // changelogSystemStreams correspond only to active tasks (since those of
standby-tasks moved to side inputs above)
+ // changelogSystemStreams correspond only to active tasks (since those of
standby-tasks moved to sideInputs above)
return
MapUtils.invertMap(changelogSSPToStore).entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey,
x -> x.getValue().getSystemStream()));
}
@@ -366,7 +374,7 @@ public class ContainerStorageManager {
/**
* Create taskStores for all stores in storageEngineFactories.
- * The store mode is chosen as bulk-load if its a non-sideinput store, and
readWrite if its a side input store
+ * The store mode is chosen as bulk-load if its a non-sideinput store, and
readWrite if its a sideInput store
*/
private Map<TaskName, Map<String, StorageEngine>>
createTaskStores(ContainerModel containerModel, JobContext jobContext,
ContainerContext containerContext,
Map<String, StorageEngineFactory<Object, Object>>
storageEngineFactories, Map<String, Serde<Object>> serdes,
@@ -453,7 +461,7 @@ public class ContainerStorageManager {
(changelogSystemStreams.containsKey(storeName)) ? new
SystemStreamPartition(
changelogSystemStreams.get(storeName),
taskModel.getChangelogPartition()) : null;
- // Use the logged-store-base-directory for change logged stores and side
input stores, and non-logged-store-base-dir
+ // Use the logged-store-base-directory for change logged stores and
sideInput stores, and non-logged-store-base-dir
// for non logged stores
File storeDirectory;
if (changeLogSystemStreamPartition != null ||
sideInputSystemStreams.containsKey(storeName)) {
@@ -498,7 +506,7 @@ public class ContainerStorageManager {
}
- // Create side input store processors, one per store per task
+ // Create sideInput store processors, one per store per task
private Map<TaskName, Map<String, SideInputsProcessor>>
createSideInputProcessors(StorageConfig config,
ContainerModel containerModel, Map<String, Set<SystemStream>>
sideInputSystemStreams,
Map<TaskName, TaskInstanceMetrics> taskInstanceMetrics, ClassLoader
classLoader) {
@@ -516,7 +524,7 @@ public class ContainerStorageManager {
} else {
String sideInputsProcessorFactoryClassName =
config.getSideInputsProcessorFactory(storeName)
.orElseThrow(() -> new SamzaException(
- String.format("Could not find side inputs processor
factory for store: %s", storeName)));
+ String.format("Could not find sideInputs processor factory
for store: %s", storeName)));
SideInputsProcessorFactory sideInputsProcessorFactory =
ReflectionUtil.getObj(classLoader,
sideInputsProcessorFactoryClassName,
SideInputsProcessorFactory.class);
@@ -540,7 +548,14 @@ public class ContainerStorageManager {
sideInputStoresToProcessors.get(taskName).put(storeName, new
SideInputsProcessor() {
@Override
public Collection<Entry<?, ?>> process(IncomingMessageEnvelope
message, KeyValueStore store) {
- return ImmutableList.of(new Entry<>(keySerde.fromBytes((byte[])
message.getKey()), msgSerde.fromBytes((byte[]) message.getMessage())));
+ // Ignore message if the key is null
+ if (message.getKey() == null) {
+ return ImmutableList.of();
+ } else {
+ // Skip serde if the message is null
+ return ImmutableList.of(new
Entry<>(keySerde.fromBytes((byte[]) message.getKey()),
+ message.getMessage() == null ? null :
msgSerde.fromBytes((byte[]) message.getMessage())));
+ }
}
});
}
@@ -549,10 +564,10 @@ public class ContainerStorageManager {
return sideInputStoresToProcessors;
}
- // Create task side input storage managers, one per task, index by the SSP
they are responsible for consuming
+ // Create task sideInput storage managers, one per task, indexed by the SSP
they are responsible for consuming
private Map<SystemStreamPartition, TaskSideInputStorageManager>
createSideInputStorageManagers(Clock clock,
ClassLoader classLoader) {
- // creating side input store processors, one per store per task
+ // creating sideInput store processors, one per store per task
Map<TaskName, Map<String, SideInputsProcessor>> taskSideInputProcessors =
createSideInputProcessors(new StorageConfig(config),
this.containerModel, this.sideInputSystemStreams,
this.taskInstanceMetrics, classLoader);
@@ -668,8 +683,8 @@ public class ContainerStorageManager {
try {
getSideInputStorageManagers().forEach(sideInputStorageManager ->
sideInputStorageManager.flush());
} catch (Exception e) {
- LOG.error("Exception during flushing side inputs", e);
- sideInputException = Optional.of(e);
+ LOG.error("Exception during flushing sideInputs", e);
+ sideInputException = e;
}
}
}, 0, taskConfig.getCommitMs(), TimeUnit.MILLISECONDS);
@@ -677,7 +692,7 @@ public class ContainerStorageManager {
// set the latch to the number of sideInput SSPs
this.sideInputsCaughtUp = new
CountDownLatch(this.sideInputStorageManagers.keySet().size());
- // register all side input SSPs with the consumers
+ // register all sideInput SSPs with the consumers
for (SystemStreamPartition ssp : sideInputStorageManagers.keySet()) {
String startingOffset =
sideInputStorageManagers.get(ssp).getStartingOffset(ssp);
@@ -704,44 +719,43 @@ public class ContainerStorageManager {
// start the systemConsumers for consuming input
this.sideInputSystemConsumers.start();
- // create a thread for sideInput reads
- Thread readSideInputs = new Thread(() -> {
- while (!shutDownSideInputRead) {
- IncomingMessageEnvelope envelope =
sideInputSystemConsumers.choose(true);
- if (envelope != null) {
- if (!envelope.isEndOfStream())
-
sideInputStorageManagers.get(envelope.getSystemStreamPartition()).process(envelope);
-
- checkSideInputCaughtUp(envelope.getSystemStreamPartition(),
envelope.getOffset(),
- SystemStreamMetadata.OffsetType.NEWEST,
envelope.isEndOfStream());
+ try {
- } else {
- LOG.trace("No incoming message was available");
+ // submit the sideInput read runnable
+ sideInputsReadExecutor.submit(() -> {
+ try {
+ while (!shouldShutdown) {
+ IncomingMessageEnvelope envelope =
sideInputSystemConsumers.choose(true);
+
+ if (envelope != null) {
+ if (!envelope.isEndOfStream()) {
+
sideInputStorageManagers.get(envelope.getSystemStreamPartition()).process(envelope);
+ }
+
+ checkSideInputCaughtUp(envelope.getSystemStreamPartition(),
envelope.getOffset(),
+ SystemStreamMetadata.OffsetType.NEWEST,
envelope.isEndOfStream());
+ } else {
+ LOG.trace("No incoming message was available");
+ }
+ }
+ } catch (Exception e) {
+ LOG.error("Exception in reading sideInputs", e);
+ sideInputException = e;
}
- }
- });
+ });
- readSideInputs.setDaemon(true);
- readSideInputs.setUncaughtExceptionHandler(new
Thread.UncaughtExceptionHandler() {
- @Override
- public void uncaughtException(Thread t, Throwable e) {
- sideInputException = Optional.of(e);
- sideInputsCaughtUp.countDown();
+ // Make the main thread wait until all sideInputs have been caught up or
an exception was thrown
+ while (!shouldShutdown && sideInputException == null &&
+
!this.sideInputsCaughtUp.await(SIDE_INPUT_READ_THREAD_TIMEOUT_SECONDS,
TimeUnit.SECONDS)) {
+ LOG.debug("Waiting for SideInput bootstrap to complete");
}
- });
- try {
- readSideInputs.start();
- // Make the main thread wait until all sideInputs have been caughtup or
thrown an exception
- this.sideInputsCaughtUp.await();
-
- if (sideInputException.isPresent()) { // Throw exception if there was an
exception in catching-up sideInputs
- // TODO: SAMZA-2113 relay exception to main thread
- throw new SamzaException("Exception in restoring side inputs",
sideInputException.get());
+ if (sideInputException != null) { // Throw exception if there was an
exception in catching-up sideInputs
+ throw new SamzaException("Exception in restoring sideInputs",
sideInputException);
}
+
} catch (InterruptedException e) {
- sideInputException = Optional.of(e);
throw new SamzaException("Side inputs read was interrupted", e);
}
@@ -825,10 +839,11 @@ public class ContainerStorageManager {
getNonSideInputStores(taskName).forEach((storeName, store) ->
store.stop())
);
+ this.shouldShutdown = true;
+
// stop all sideinput consumers and stores
if (sideInputsPresent()) {
- // stop reading sideInputs
- this.shutDownSideInputRead = true;
+ sideInputsReadExecutor.shutdownNow();
this.sideInputSystemConsumers.stop();
@@ -838,7 +853,7 @@ public class ContainerStorageManager {
try {
sideInputsFlushExecutor.awaitTermination(SIDE_INPUT_FLUSH_TIMEOUT.toMillis(),
TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
- throw new SamzaException("Exception while shutting down side inputs",
e);
+ throw new SamzaException("Exception while shutting down sideInputs",
e);
}
// stop all sideInputStores -- this will perform one last flush on the
KV stores, and write the offset file