mynameborat commented on a change in pull request #912: SEP-19 : Refactoring sideInputs from SamzaContainer to ContainerStorageManager URL: https://github.com/apache/samza/pull/912#discussion_r258228794
########## File path: samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java ########## @@ -112,15 +144,49 @@ public ContainerStorageManager(ContainerModel containerModel, StreamMetadataCache streamMetadataCache, SystemAdmins systemAdmins, Map<String, SystemStream> changelogSystemStreams, + Map<String, List<SystemStream>> sideInputSystemStreams, Map<String, StorageEngineFactory<Object, Object>> storageEngineFactories, Map<String, SystemFactory> systemFactories, Map<String, Serde<Object>> serdes, Config config, Map<TaskName, TaskInstanceMetrics> taskInstanceMetrics, SamzaContainerMetrics samzaContainerMetrics, JobContext jobContext, ContainerContext containerContext, Map<TaskName, TaskInstanceCollector> taskInstanceCollectors, File loggedStoreBaseDirectory, - File nonLoggedStoreBaseDirectory, int maxChangeLogStreamPartitions, Clock clock) { + File nonLoggedStoreBaseDirectory, int maxChangeLogStreamPartitions, SerdeManager serdeManager, Clock clock) { this.containerModel = containerModel; - this.changelogSystemStreams = changelogSystemStreams; + this.sideInputSystemStreams = new HashMap<>(sideInputSystemStreams); + this.sideInputSSPs = new HashMap<>(); + + // Add all side inputs to the map of sideInputSSPs indexed by taskName + containerModel.getTasks().forEach((taskName, taskModel) -> { + sideInputSystemStreams.keySet().forEach(storeName -> { + Set<SystemStreamPartition> taskSideInputSSPs = taskModel.getSystemStreamPartitions().stream().filter(ssp -> sideInputSystemStreams.get(storeName).contains(ssp.getSystemStream())).collect(Collectors.toSet()); + this.sideInputSSPs.putIfAbsent(taskName, new HashMap<>()); + this.sideInputSSPs.get(taskName).put(storeName, taskSideInputSSPs); + }); + }); + + // Create a map of changeLogSSP to storeName across all tasks, assuming no stores have the same changelogSSP + Map<SystemStreamPartition, String> changelogSSPToStore = new HashMap<>(); + changelogSystemStreams.forEach((storeName, systemStream) -> + 
containerModel.getTasks().forEach((taskName, taskModel) -> { changelogSSPToStore.put(new SystemStreamPartition(systemStream, taskModel.getChangelogPartition()), storeName); }) + ); + + // We now handle standby tasks. For each standby task, we remove its changeLogSSPs from changelogSSP map and add it to the task's sideInputSSPs + // The task's sideInputManager will now consume and restore these as well. + getTasks(containerModel, TaskMode.Standby).forEach((taskName, taskModel) -> { + changelogSystemStreams.forEach((storeName, systemStream) -> { + SystemStreamPartition ssp = new SystemStreamPartition(systemStream, taskModel.getChangelogPartition()); + changelogSSPToStore.remove(ssp); + this.sideInputSSPs.putIfAbsent(taskName, new HashMap<>()); + this.sideInputSystemStreams.put(storeName, Collections.singletonList(ssp.getSystemStream())); + this.sideInputSSPs.get(taskName).put(storeName, Collections.singleton(ssp)); + }); + }); + + // changelogSystemStreams correspond only to active tasks (since those of standby-tasks moved to side inputs above) + this.changelogSystemStreams = changelogSSPToStore.entrySet().stream().collect(Collectors.toMap(x -> x.getValue(), x -> x.getKey().getSystemStream(), (x, y) -> x)); Review comment: IIUC, you are grouping the SSPs by store and projecting each SSP to its system stream. You could also use Collectors.groupingBy to simplify this. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services