mynameborat commented on a change in pull request #912: SEP-19 : Refactoring 
sideInputs from SamzaContainer to ContainerStorageManager
URL: https://github.com/apache/samza/pull/912#discussion_r258228794
 
 

 ##########
 File path: 
samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
 ##########
 @@ -112,15 +144,49 @@
 
   public ContainerStorageManager(ContainerModel containerModel, 
StreamMetadataCache streamMetadataCache,
       SystemAdmins systemAdmins, Map<String, SystemStream> 
changelogSystemStreams,
+      Map<String, List<SystemStream>> sideInputSystemStreams,
       Map<String, StorageEngineFactory<Object, Object>> storageEngineFactories,
       Map<String, SystemFactory> systemFactories, Map<String, Serde<Object>> 
serdes, Config config,
       Map<TaskName, TaskInstanceMetrics> taskInstanceMetrics, 
SamzaContainerMetrics samzaContainerMetrics,
       JobContext jobContext, ContainerContext containerContext,
       Map<TaskName, TaskInstanceCollector> taskInstanceCollectors, File 
loggedStoreBaseDirectory,
-      File nonLoggedStoreBaseDirectory, int maxChangeLogStreamPartitions, 
Clock clock) {
+      File nonLoggedStoreBaseDirectory, int maxChangeLogStreamPartitions, 
SerdeManager serdeManager, Clock clock) {
 
     this.containerModel = containerModel;
-    this.changelogSystemStreams = changelogSystemStreams;
+    this.sideInputSystemStreams = new HashMap<>(sideInputSystemStreams);
+    this.sideInputSSPs = new HashMap<>();
+
+    // Add all side inputs to the map of sideInputSSPs indexed by taskName
+    containerModel.getTasks().forEach((taskName, taskModel) -> {
+        sideInputSystemStreams.keySet().forEach(storeName -> {
+            Set<SystemStreamPartition> taskSideInputSSPs = 
taskModel.getSystemStreamPartitions().stream().filter(ssp -> 
sideInputSystemStreams.get(storeName).contains(ssp.getSystemStream())).collect(Collectors.toSet());
+            this.sideInputSSPs.putIfAbsent(taskName, new HashMap<>());
+            this.sideInputSSPs.get(taskName).put(storeName, taskSideInputSSPs);
+          });
+      });
+
+    // Create a map of changeLogSSP to storeName across all tasks, assuming no 
stores have the same changelogSSP
+    Map<SystemStreamPartition, String> changelogSSPToStore = new HashMap<>();
+    changelogSystemStreams.forEach((storeName, systemStream) ->
+            containerModel.getTasks().forEach((taskName, taskModel) -> { 
changelogSSPToStore.put(new SystemStreamPartition(systemStream, 
taskModel.getChangelogPartition()), storeName); })
+    );
+
+    // We now handle standby tasks. For each standby task, we remove its 
changeLogSSPs from changelogSSP map and add it to the task's sideInputSSPs
+    // The task's sideInputManager will now consume and restore these as well.
+    getTasks(containerModel, TaskMode.Standby).forEach((taskName, taskModel) 
-> {
+        changelogSystemStreams.forEach((storeName, systemStream) -> {
+            SystemStreamPartition ssp = new 
SystemStreamPartition(systemStream, taskModel.getChangelogPartition());
+            changelogSSPToStore.remove(ssp);
+            this.sideInputSSPs.putIfAbsent(taskName, new HashMap<>());
+            this.sideInputSystemStreams.put(storeName, 
Collections.singletonList(ssp.getSystemStream()));
+            this.sideInputSSPs.get(taskName).put(storeName, 
Collections.singleton(ssp));
+          });
+      });
+
+    // changelogSystemStreams correspond only to active tasks (since those of 
standby-tasks moved to side inputs above)
+    this.changelogSystemStreams = 
changelogSSPToStore.entrySet().stream().collect(Collectors.toMap(x -> 
x.getValue(), x -> x.getKey().getSystemStream(), (x, y) -> x));
 
 Review comment:
   IIUC, you are grouping the SSPs by store and projecting each SSP to its 
system stream.
   You can also use Collectors.groupingBy to simplify this. 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to