bkonold commented on a change in pull request #1343:
URL: https://github.com/apache/samza/pull/1343#discussion_r429146510



##########
File path: samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
##########
@@ -149,22 +153,26 @@
   private final int maxChangeLogStreamPartitions; // The partition count of each changelog-stream topic. This is used for validating changelog streams before restoring.
 
   /* Sideinput related parameters */
-  private final Map<String, Set<SystemStream>> sideInputSystemStreams; // Map of sideInput system-streams indexed by store name
-  private final Map<TaskName, Map<String, Set<SystemStreamPartition>>> taskSideInputSSPs;
-  private final Map<SystemStreamPartition, TaskSideInputStorageManager> sideInputStorageManagers; // Map of sideInput storageManagers indexed by ssp, for simpler lookup for process()
-  private final Map<String, SystemConsumer> sideInputConsumers; // Mapping from storeSystemNames to SystemConsumers
+  // side inputs indexed first by task, then store name
+  private final Map<TaskName, Map<String, Set<SystemStreamPartition>>> taskSideInputStoreSSPs;
+  private final Map<SystemStreamPartition, Object> sideInputSSPLocks;
+  private final Map<TaskName, TaskSideInputStorageManager> taskSideInputStorageManagers;
   private SystemConsumers sideInputSystemConsumers;
-  private final Map<SystemStreamPartition, SystemStreamMetadata.SystemStreamPartitionMetadata> initialSideInputSSPMetadata
-      = new ConcurrentHashMap<>(); // Recorded sspMetadata of the taskSideInputSSPs recorded at start, used to determine when sideInputs are caughtup and container init can proceed
-  private volatile CountDownLatch sideInputsCaughtUp; // Used by the sideInput-read thread to signal to the main thread
+  // Recorded sspMetadata of the taskSideInputSSPs recorded at start, used to determine when sideInputs are caughtup and container init can proceed
+  private final Map<SystemStreamPartition, SystemStreamMetadata.SystemStreamPartitionMetadata> initialSideInputSSPMetadata = new ConcurrentHashMap<>();
+  // Used by the sideInput-read thread to signal to the main thread
+  private volatile CountDownLatch sideInputTasksCaughtUp;
   private volatile boolean shouldShutdown = false;
+  private final ConcurrentHashMap<SystemStreamPartition, Optional<String>> sideInputSSPCheckpointOffsets;
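
The renamed latch sideInputTasksCaughtUp is described above only as the way the side-input read thread signals the main thread. The sketch below shows one way such a signal could work, assuming (from the name) one latch count per side-input task. The class SideInputCatchUpSketch, its methods, and the use of String task names instead of TaskName are all hypothetical. The duplicate-report guard is an added assumption, not something the diff shows.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch, not the PR's code: one latch count per side-input task.
// The side-input read thread counts down as each task catches up; the main
// thread blocks in awaitCaughtUp() before container init proceeds.
final class SideInputCatchUpSketch {
  private final CountDownLatch sideInputTasksCaughtUp;
  // Tasks already counted (assumption), so a task reported twice cannot
  // release the latch before every task has caught up.
  private final Set<String> caughtUpTasks = ConcurrentHashMap.newKeySet();

  SideInputCatchUpSketch(int numSideInputTasks) {
    this.sideInputTasksCaughtUp = new CountDownLatch(numSideInputTasks);
  }

  // Read thread: call once a task's side-input SSPs reach the offsets recorded
  // in the initial metadata. Task names are plain Strings here (assumption).
  void markTaskCaughtUp(String taskName) {
    if (caughtUpTasks.add(taskName)) {
      sideInputTasksCaughtUp.countDown();
    }
  }

  // Main thread: returns true if every task caught up within the timeout.
  boolean awaitCaughtUp(long timeout, TimeUnit unit) {
    try {
      return sideInputTasksCaughtUp.await(timeout, unit);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // preserve the interrupt status
      return false;
    }
  }

  long remainingTasks() {
    return sideInputTasksCaughtUp.getCount();
  }
}
```

A CountDownLatch cannot be reset, so this signal is one-shot. That fits the "container init can proceed" use described in the diff comments.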

Review comment:
   We need to know which SSPs we expect checkpoints for. See TaskSideInputHandler::process.

   For SSPs that we know are checkpointed (i.e., changelog side-input SSPs), we need to block until we get a checkpoint to consume up to. I use Optional to propagate that information: Optional.empty() means we expect a checkpoint but there isn't one yet.

   Do you see a better way to accomplish this? It's not immediately intuitive, so at the very least I should document this choice.
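
The sketch below is a minimal, documented version of the Optional convention described in this comment. It uses String stand-ins for SystemStreamPartition and hypothetical method names (expectCheckpoint, onCheckpoint, shouldBlock, checkpointOffset). It covers only the bookkeeping, not the offset comparison that TaskSideInputHandler::process would also need.

```java
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of the convention described above, not the PR's code.
// SSPs are plain Strings here instead of Samza's SystemStreamPartition.
final class SideInputCheckpointOffsetsSketch {
  // Key absent        -> no checkpoint expected; the SSP is never blocked.
  // Optional.empty()  -> checkpoint expected (e.g. changelog side input) but
  //                      none received yet; processing must block.
  // Optional.of(off)  -> consume up to checkpointed offset `off`.
  // ConcurrentHashMap rejects null values, so Optional.empty() stands in for
  // "expected but not yet known".
  private final ConcurrentHashMap<String, Optional<String>> sideInputSSPCheckpointOffsets =
      new ConcurrentHashMap<>();

  // Registers an SSP whose processing is gated on a checkpoint.
  void expectCheckpoint(String ssp) {
    sideInputSSPCheckpointOffsets.putIfAbsent(ssp, Optional.empty());
  }

  // Records a checkpoint offset. replace() only updates keys that are already
  // present, so offsets for SSPs that were never registered are ignored.
  void onCheckpoint(String ssp, String offset) {
    sideInputSSPCheckpointOffsets.replace(ssp, Optional.of(offset));
  }

  // True only when a checkpoint is expected but has not arrived yet.
  boolean shouldBlock(String ssp) {
    Optional<String> offset = sideInputSSPCheckpointOffsets.get(ssp);
    return offset != null && !offset.isPresent();
  }

  // Offset to consume up to, if one is known.
  Optional<String> checkpointOffset(String ssp) {
    return sideInputSSPCheckpointOffsets.getOrDefault(ssp, Optional.empty());
  }
}
```

If the Optional-valued map feels too implicit, one alternative is to keep the checkpointed SSPs in a separate set and store only known offsets in a plain map. The trade-off is two structures that must be kept consistent instead of one.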




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.