This is an automated email from the ASF dual-hosted git repository.

jagadish pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 9acb373  SAMZA-2256: Hotstandby fix for exception while reading side inputs
9acb373 is described below

commit 9acb37334499c988a2119e3d921b5bf5a6322868
Author: Ray Matharu <[email protected]>
AuthorDate: Wed Jun 19 17:02:05 2019 -0700

    SAMZA-2256: Hotstandby fix for exception while reading side inputs
    
    Author: Ray Matharu <[email protected]>
    
    Reviewers: Jagadish<[email protected]>
    
    Closes #1086 from rmatharu/test-hotstandbyfix
---
 .../samza/storage/TaskSideInputStorageManager.java |  18 ++-
 .../samza/storage/ContainerStorageManager.java     | 123 ++++++++++++---------
 2 files changed, 82 insertions(+), 59 deletions(-)

diff --git a/samza-core/src/main/java/org/apache/samza/storage/TaskSideInputStorageManager.java b/samza-core/src/main/java/org/apache/samza/storage/TaskSideInputStorageManager.java
index 59c31d9..3de4f5d 100644
--- a/samza-core/src/main/java/org/apache/samza/storage/TaskSideInputStorageManager.java
+++ b/samza-core/src/main/java/org/apache/samza/storage/TaskSideInputStorageManager.java
@@ -20,8 +20,6 @@
 package org.apache.samza.storage;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.ImmutableList;
-
 import java.io.File;
 import java.util.Collection;
 import java.util.HashMap;
@@ -34,7 +32,6 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 import java.util.stream.Collectors;
-
 import org.apache.samza.Partition;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
@@ -50,7 +47,6 @@ import org.apache.samza.system.SystemStreamMetadata;
 import org.apache.samza.system.SystemStreamPartition;
 import org.apache.samza.util.Clock;
 import org.apache.samza.util.FileUtil;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.collection.JavaConverters;
@@ -215,7 +211,19 @@ public class TaskSideInputStorageManager {
 
       KeyValueStore keyValueStore = (KeyValueStore) stores.get(storeName);
       Collection<Entry<?, ?>> entriesToBeWritten = 
sideInputsProcessor.process(message, keyValueStore);
-      keyValueStore.putAll(ImmutableList.copyOf(entriesToBeWritten));
+
+      // Iterate over the list to be written.
+      // TODO: SAMZA-2255: Optimize value writes in TaskSideInputStorageManager
+      for (Entry entry : entriesToBeWritten) {
+        // If the key is null we ignore, if the value is null, we issue a 
delete, else we issue a put
+        if (entry.getKey() != null) {
+          if (entry.getValue() != null) {
+            keyValueStore.put(entry.getKey(), entry.getValue());
+          } else {
+            keyValueStore.delete(entry.getKey());
+          }
+        }
+      }
     }
 
     // update the last processed offset
diff --git a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
index 980b41f..9cfa9ab 100644
--- a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
+++ b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
@@ -107,14 +107,19 @@ import scala.collection.JavaConverters;
  *  It provides bootstrap semantics for sideinputs -- the main thread is 
blocked until
  *  all sideInputSSPs have not caught up. Side input store flushes are not in 
sync with task-commit, although
  *  they happen at the same frequency.
- *  In case, where a user explicitly requests a task-commit, it will not 
include committing side inputs.
+ *  In case, where a user explicitly requests a task-commit, it will not 
include committing sideInputs.
  */
 public class ContainerStorageManager {
   private static final Logger LOG = 
LoggerFactory.getLogger(ContainerStorageManager.class);
   private static final String RESTORE_THREAD_NAME = "Samza Restore Thread-%d";
+  private static final String SIDEINPUTS_READ_THREAD_NAME = "SideInputs Read 
Thread";
   private static final String SIDEINPUTS_FLUSH_THREAD_NAME = "SideInputs Flush 
Thread";
   private static final String SIDEINPUTS_METRICS_PREFIX = "side-inputs-";
-  // We use a prefix to differentiate the SystemConsumersMetrics for 
side-inputs from the ones in SamzaContainer
+  // We use a prefix to differentiate the SystemConsumersMetrics for 
sideInputs from the ones in SamzaContainer
+
+  private static final int SIDE_INPUT_READ_THREAD_TIMEOUT_SECONDS = 10; // 
Timeout with which sideinput read thread checks for exceptions
+  private static final Duration SIDE_INPUT_FLUSH_TIMEOUT = 
Duration.ofMinutes(1); // Period with which sideinputs are flushed
+
 
   /** Maps containing relevant per-task objects */
   private final Map<TaskName, Map<String, StorageEngine>> taskStores;
@@ -144,7 +149,7 @@ public class ContainerStorageManager {
   private final int maxChangeLogStreamPartitions; // The partition count of 
each changelog-stream topic. This is used for validating changelog streams 
before restoring.
 
   /* Sideinput related parameters */
-  private final Map<String, Set<SystemStream>> sideInputSystemStreams; // Map 
of side input system-streams indexed by store name
+  private final Map<String, Set<SystemStream>> sideInputSystemStreams; // Map 
of sideInput system-streams indexed by store name
   private final Map<TaskName, Map<String, Set<SystemStreamPartition>>> 
taskSideInputSSPs;
   private final Map<SystemStreamPartition, TaskSideInputStorageManager> 
sideInputStorageManagers; // Map of sideInput storageManagers indexed by ssp, 
for simpler lookup for process()
   private final Map<String, SystemConsumer> sideInputConsumers; // Mapping 
from storeSystemNames to SystemConsumers
@@ -152,12 +157,15 @@ public class ContainerStorageManager {
   private final Map<SystemStreamPartition, 
SystemStreamMetadata.SystemStreamPartitionMetadata> initialSideInputSSPMetadata
       = new ConcurrentHashMap<>(); // Recorded sspMetadata of the 
taskSideInputSSPs recorded at start, used to determine when sideInputs are 
caughtup and container init can proceed
   private volatile CountDownLatch sideInputsCaughtUp; // Used by the 
sideInput-read thread to signal to the main thread
-  private volatile boolean shutDownSideInputRead = false;
+  private volatile boolean shouldShutdown = false;
+
+  private final ExecutorService sideInputsReadExecutor = 
Executors.newSingleThreadExecutor(
+      new 
ThreadFactoryBuilder().setDaemon(true).setNameFormat(SIDEINPUTS_READ_THREAD_NAME).build());
+
   private final ScheduledExecutorService sideInputsFlushExecutor = 
Executors.newSingleThreadScheduledExecutor(
       new 
ThreadFactoryBuilder().setDaemon(true).setNameFormat(SIDEINPUTS_FLUSH_THREAD_NAME).build());
   private ScheduledFuture sideInputsFlushFuture;
-  private static final Duration SIDE_INPUT_FLUSH_TIMEOUT = 
Duration.ofMinutes(1);
-  private volatile Optional<Throwable> sideInputException = Optional.empty();
+  private volatile Throwable sideInputException = null;
 
   private final Config config;
 
@@ -229,10 +237,10 @@ public class ContainerStorageManager {
     // creating task restore managers
     this.taskRestoreManagers = createTaskRestoreManagers(systemAdmins, clock, 
this.samzaContainerMetrics);
 
-    // create side input storage managers
+    // create sideInput storage managers
     sideInputStorageManagers = createSideInputStorageManagers(clock, 
classLoader);
 
-    // create side Input consumers indexed by systemName
+    // create sideInput consumers indexed by systemName
     this.sideInputConsumers = createConsumers(this.sideInputSystemStreams, 
systemFactories, config, this.samzaContainerMetrics.registry());
 
     // create SystemConsumers for consuming from taskSideInputSSPs, if 
sideInputs are being used
@@ -256,10 +264,10 @@ public class ContainerStorageManager {
   }
 
   /**
-   * Add all side inputs to a map of maps, indexed first by taskName, then by 
sideInput store name.
+   * Add all sideInputs to a map of maps, indexed first by taskName, then by 
sideInput store name.
    *
    * @param containerModel the containerModel to use
-   * @param sideInputSystemStreams the map of store to side input system stream
+   * @param sideInputSystemStreams the map of store to sideInput system stream
    * @return taskSideInputSSPs map
    */
   private Map<TaskName, Map<String, Set<SystemStreamPartition>>> 
getTaskSideInputSSPs(ContainerModel containerModel, Map<String, 
Set<SystemStream>> sideInputSystemStreams) {
@@ -304,7 +312,7 @@ public class ContainerStorageManager {
           });
       });
 
-    // changelogSystemStreams correspond only to active tasks (since those of 
standby-tasks moved to side inputs above)
+    // changelogSystemStreams correspond only to active tasks (since those of 
standby-tasks moved to sideInputs above)
     return 
MapUtils.invertMap(changelogSSPToStore).entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey,
 x -> x.getValue().getSystemStream()));
   }
 
@@ -366,7 +374,7 @@ public class ContainerStorageManager {
 
   /**
    * Create taskStores for all stores in storageEngineFactories.
-   * The store mode is chosen as bulk-load if its a non-sideinput store, and 
readWrite if its a side input store
+   * The store mode is chosen as bulk-load if its a non-sideinput store, and 
readWrite if its a sideInput store
    */
   private Map<TaskName, Map<String, StorageEngine>> 
createTaskStores(ContainerModel containerModel, JobContext jobContext, 
ContainerContext containerContext,
       Map<String, StorageEngineFactory<Object, Object>> 
storageEngineFactories, Map<String, Serde<Object>> serdes,
@@ -453,7 +461,7 @@ public class ContainerStorageManager {
         (changelogSystemStreams.containsKey(storeName)) ? new 
SystemStreamPartition(
             changelogSystemStreams.get(storeName), 
taskModel.getChangelogPartition()) : null;
 
-    // Use the logged-store-base-directory for change logged stores and side 
input stores, and non-logged-store-base-dir
+    // Use the logged-store-base-directory for change logged stores and 
sideInput stores, and non-logged-store-base-dir
     // for non logged stores
     File storeDirectory;
     if (changeLogSystemStreamPartition != null || 
sideInputSystemStreams.containsKey(storeName)) {
@@ -498,7 +506,7 @@ public class ContainerStorageManager {
   }
 
 
-  // Create side input store processors, one per store per task
+  // Create sideInput store processors, one per store per task
   private Map<TaskName, Map<String, SideInputsProcessor>> 
createSideInputProcessors(StorageConfig config,
       ContainerModel containerModel, Map<String, Set<SystemStream>> 
sideInputSystemStreams,
       Map<TaskName, TaskInstanceMetrics> taskInstanceMetrics, ClassLoader 
classLoader) {
@@ -516,7 +524,7 @@ public class ContainerStorageManager {
           } else {
             String sideInputsProcessorFactoryClassName = 
config.getSideInputsProcessorFactory(storeName)
                 .orElseThrow(() -> new SamzaException(
-                    String.format("Could not find side inputs processor 
factory for store: %s", storeName)));
+                    String.format("Could not find sideInputs processor factory 
for store: %s", storeName)));
             SideInputsProcessorFactory sideInputsProcessorFactory =
                 ReflectionUtil.getObj(classLoader, 
sideInputsProcessorFactoryClassName,
                     SideInputsProcessorFactory.class);
@@ -540,7 +548,14 @@ public class ContainerStorageManager {
           sideInputStoresToProcessors.get(taskName).put(storeName, new 
SideInputsProcessor() {
             @Override
             public Collection<Entry<?, ?>> process(IncomingMessageEnvelope 
message, KeyValueStore store) {
-              return ImmutableList.of(new Entry<>(keySerde.fromBytes((byte[]) 
message.getKey()), msgSerde.fromBytes((byte[]) message.getMessage())));
+              // Ignore message if the key is null
+              if (message.getKey() == null) {
+                return ImmutableList.of();
+              } else {
+                // Skip serde if the message is null
+                return ImmutableList.of(new 
Entry<>(keySerde.fromBytes((byte[]) message.getKey()),
+                    message.getMessage() == null ? null : 
msgSerde.fromBytes((byte[]) message.getMessage())));
+              }
             }
           });
         }
@@ -549,10 +564,10 @@ public class ContainerStorageManager {
     return sideInputStoresToProcessors;
   }
 
-  // Create task side input storage managers, one per task, index by the SSP 
they are responsible for consuming
+  // Create task sideInput storage managers, one per task, index by the SSP 
they are responsible for consuming
   private Map<SystemStreamPartition, TaskSideInputStorageManager> 
createSideInputStorageManagers(Clock clock,
       ClassLoader classLoader) {
-    // creating side input store processors, one per store per task
+    // creating sideInput store processors, one per store per task
     Map<TaskName, Map<String, SideInputsProcessor>> taskSideInputProcessors =
         createSideInputProcessors(new StorageConfig(config), 
this.containerModel, this.sideInputSystemStreams,
             this.taskInstanceMetrics, classLoader);
@@ -668,8 +683,8 @@ public class ContainerStorageManager {
         try {
           getSideInputStorageManagers().forEach(sideInputStorageManager -> 
sideInputStorageManager.flush());
         } catch (Exception e) {
-          LOG.error("Exception during flushing side inputs", e);
-          sideInputException = Optional.of(e);
+          LOG.error("Exception during flushing sideInputs", e);
+          sideInputException = e;
         }
       }
     }, 0, taskConfig.getCommitMs(), TimeUnit.MILLISECONDS);
@@ -677,7 +692,7 @@ public class ContainerStorageManager {
     // set the latch to the number of sideInput SSPs
     this.sideInputsCaughtUp = new 
CountDownLatch(this.sideInputStorageManagers.keySet().size());
 
-    // register all side input SSPs with the consumers
+    // register all sideInput SSPs with the consumers
     for (SystemStreamPartition ssp : sideInputStorageManagers.keySet()) {
       String startingOffset = 
sideInputStorageManagers.get(ssp).getStartingOffset(ssp);
 
@@ -704,44 +719,43 @@ public class ContainerStorageManager {
     // start the systemConsumers for consuming input
     this.sideInputSystemConsumers.start();
 
-    // create a thread for sideInput reads
-    Thread readSideInputs = new Thread(() -> {
-        while (!shutDownSideInputRead) {
-          IncomingMessageEnvelope envelope = 
sideInputSystemConsumers.choose(true);
-          if (envelope != null) {
 
-            if (!envelope.isEndOfStream())
-              
sideInputStorageManagers.get(envelope.getSystemStreamPartition()).process(envelope);
-
-            checkSideInputCaughtUp(envelope.getSystemStreamPartition(), 
envelope.getOffset(),
-                SystemStreamMetadata.OffsetType.NEWEST, 
envelope.isEndOfStream());
+    try {
 
-          } else {
-            LOG.trace("No incoming message was available");
+    // submit the sideInput read runnable
+      sideInputsReadExecutor.submit(() -> {
+          try {
+            while (!shouldShutdown) {
+              IncomingMessageEnvelope envelope = 
sideInputSystemConsumers.choose(true);
+
+              if (envelope != null) {
+                if (!envelope.isEndOfStream()) {
+                  
sideInputStorageManagers.get(envelope.getSystemStreamPartition()).process(envelope);
+                }
+
+                checkSideInputCaughtUp(envelope.getSystemStreamPartition(), 
envelope.getOffset(),
+                    SystemStreamMetadata.OffsetType.NEWEST, 
envelope.isEndOfStream());
+              } else {
+                LOG.trace("No incoming message was available");
+              }
+            }
+          } catch (Exception e) {
+            LOG.error("Exception in reading sideInputs", e);
+            sideInputException = e;
           }
-        }
-      });
+        });
 
-    readSideInputs.setDaemon(true);
-    readSideInputs.setUncaughtExceptionHandler(new 
Thread.UncaughtExceptionHandler() {
-      @Override
-      public void uncaughtException(Thread t, Throwable e) {
-        sideInputException = Optional.of(e);
-        sideInputsCaughtUp.countDown();
+      // Make the main thread wait until all sideInputs have been caughtup or 
an exception was thrown
+      while (!shouldShutdown && sideInputException == null &&
+          
!this.sideInputsCaughtUp.await(SIDE_INPUT_READ_THREAD_TIMEOUT_SECONDS, 
TimeUnit.SECONDS)) {
+        LOG.debug("Waiting for SideInput bootstrap to complete");
       }
-    });
 
-    try {
-      readSideInputs.start();
-      // Make the main thread wait until all sideInputs have been caughtup or 
thrown an exception
-      this.sideInputsCaughtUp.await();
-
-      if (sideInputException.isPresent()) { // Throw exception if there was an 
exception in catching-up sideInputs
-        // TODO: SAMZA-2113 relay exception to main thread
-        throw new SamzaException("Exception in restoring side inputs", 
sideInputException.get());
+      if (sideInputException != null) { // Throw exception if there was an 
exception in catching-up sideInputs
+        throw new SamzaException("Exception in restoring sideInputs", 
sideInputException);
       }
+
     } catch (InterruptedException e) {
-      sideInputException = Optional.of(e);
       throw new SamzaException("Side inputs read was interrupted", e);
     }
 
@@ -825,10 +839,11 @@ public class ContainerStorageManager {
         getNonSideInputStores(taskName).forEach((storeName, store) -> 
store.stop())
     );
 
+    this.shouldShutdown = true;
+
     // stop all sideinput consumers and stores
     if (sideInputsPresent()) {
-      // stop reading sideInputs
-      this.shutDownSideInputRead = true;
+      sideInputsReadExecutor.shutdownNow();
 
       this.sideInputSystemConsumers.stop();
 
@@ -838,7 +853,7 @@ public class ContainerStorageManager {
       try {
         
sideInputsFlushExecutor.awaitTermination(SIDE_INPUT_FLUSH_TIMEOUT.toMillis(), 
TimeUnit.MILLISECONDS);
       } catch (InterruptedException e) {
-        throw new SamzaException("Exception while shutting down side inputs", 
e);
+        throw new SamzaException("Exception while shutting down sideInputs", 
e);
       }
 
       // stop all sideInputStores -- this will perform one last flush on the 
KV stores, and write the offset file

Reply via email to