mynameborat commented on a change in pull request #1218: Samza 2382: User specified side input processor should take precedence over the identity processor
URL: https://github.com/apache/samza/pull/1218#discussion_r345524476
 
 

 ##########
 File path: samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
 ##########
 @@ -526,51 +526,57 @@ private StorageEngine createStore(String storeName, TaskName taskName, TaskModel
       Map<TaskName, TaskInstanceMetrics> taskInstanceMetrics) {
 
     Map<TaskName, Map<String, SideInputsProcessor>> sideInputStoresToProcessors = new HashMap<>();
-    getTasks(containerModel, TaskMode.Active).forEach((taskName, taskModel) -> {
+    containerModel.getTasks().forEach((taskName, taskModel) -> {
         sideInputStoresToProcessors.put(taskName, new HashMap<>());
+        TaskMode taskMode = taskModel.getTaskMode();
+
         for (String storeName : sideInputSystemStreams.keySet()) {
+
+          SideInputsProcessor sideInputsProcessor;
           Optional<String> sideInputsProcessorSerializedInstance =
               config.getSideInputsProcessorSerializedInstance(storeName);
+
           if (sideInputsProcessorSerializedInstance.isPresent()) {
-            sideInputStoresToProcessors.get(taskName)
-                .put(storeName, SerdeUtils.deserialize("Side Inputs Processor",
-                    sideInputsProcessorSerializedInstance.get()));
-          } else {
-            String sideInputsProcessorFactoryClassName = config.getSideInputsProcessorFactory(storeName)
-                .orElseThrow(() -> new SamzaException(
-                    String.format("Could not find sideInputs processor factory for store: %s", storeName)));
+
+            sideInputsProcessor = SerdeUtils.deserialize("Side Inputs Processor", sideInputsProcessorSerializedInstance.get());
+            LOG.info("Using serialized side-inputs-processor for store: {}, task: {}", storeName, taskName);
+
+          } else if (config.getSideInputsProcessorFactory(storeName).isPresent()) {
+            String sideInputsProcessorFactoryClassName = config.getSideInputsProcessorFactory(storeName).get();
             SideInputsProcessorFactory sideInputsProcessorFactory =
                 ReflectionUtil.getObj(sideInputsProcessorFactoryClassName, SideInputsProcessorFactory.class);
-            SideInputsProcessor sideInputsProcessor =
-                sideInputsProcessorFactory.getSideInputsProcessor(config, taskInstanceMetrics.get(taskName).registry());
-            sideInputStoresToProcessors.get(taskName).put(storeName, sideInputsProcessor);
-          }
-        }
-      });
+            sideInputsProcessor = sideInputsProcessorFactory.getSideInputsProcessor(config, taskInstanceMetrics.get(taskName).registry());
+            LOG.info("Using side-inputs-processor from factory: {} for store: {}, task: {}", config.getSideInputsProcessorFactory(storeName).get(), storeName, taskName);
 
-    // creating identity sideInputProcessor for stores of standbyTasks
-    getTasks(containerModel, TaskMode.Standby).forEach((taskName, taskModel) -> {
-        sideInputStoresToProcessors.put(taskName, new HashMap<>());
-        for (String storeName : sideInputSystemStreams.keySet()) {
+          } else if (!config.getSideInputsProcessorFactory(storeName).isPresent() && taskMode.equals(TaskMode.Active)) {
 
 Review comment:
  Ideally, every side input should have a side inputs processor, irrespective of the task mode. At this point, however, the line between actual side inputs and changelog SSPs is blurred: changelog SSPs are also treated as side inputs here, and because we can't tell the two apart, the only validation we can do is for the active task mode.
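A minimal Java sketch of the resolution order this implies, assuming the task-mode check is dropped: a serialized processor takes precedence, then a factory-created one, and the identity processor is the fallback for everything else (for example, changelog stores restored by standby tasks). `SideInputsProcessorResolution`, the simplified `SideInputsProcessor` interface, `IDENTITY`, and the `Supplier` standing in for `SideInputsProcessorFactory` are hypothetical stand-ins, not Samza's actual types or signatures.

```java
import java.util.Optional;
import java.util.function.Supplier;

// Hypothetical stand-ins for illustration only; these are not Samza's real types or signatures.
final class SideInputsProcessorResolution {

  /** Simplified stand-in for a side inputs processor: turns an incoming message into a store value. */
  interface SideInputsProcessor {
    Object process(Object message);
  }

  /** Simplified identity processor: passes each message through unchanged. */
  static final SideInputsProcessor IDENTITY = message -> message;

  private SideInputsProcessorResolution() {}

  /**
   * Picks the processor for one store without looking at the task mode:
   * a serialized instance wins, then a factory-created processor, then the identity processor.
   */
  static SideInputsProcessor resolve(
      Optional<SideInputsProcessor> serializedInstance,
      Optional<Supplier<SideInputsProcessor>> factory) {
    if (serializedInstance.isPresent()) {
      return serializedInstance.get();
    }
    if (factory.isPresent()) {
      // The Supplier plays the role of the factory's getSideInputsProcessor(...) call.
      return factory.get().get();
    }
    // Nothing user-specified is configured, e.g. a changelog store restored by a standby task.
    return IDENTITY;
  }
}
```

Since the choice no longer depends on `TaskMode`, active and standby tasks go through the same code path; only the configuration decides which processor is used.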
   
  I suggest we remove this validation, since an upstream validation already checks that side inputs are specified along with either a processor factory or a processor lambda.
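A rough sketch of the kind of upstream check referred to here, assuming it runs against the user config before any task is created and that changelog stores do not declare side inputs there, so only real side-input stores are checked and no task mode is needed. The class and method names are hypothetical, and the `stores.<store>.side.inputs...` key names are assumptions modeled on Samza's store config naming; the actual upstream validation may differ.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of an upstream config check; key names are assumptions, not verified Samza constants.
final class SideInputsConfigValidation {

  private SideInputsConfigValidation() {}

  /** Builds an assumed per-store key such as "stores.profiles.side.inputs". */
  static String key(String storeName, String suffix) {
    return "stores." + storeName + "." + suffix;
  }

  /** Returns the stores that declare side inputs but configure neither a factory nor a serialized processor. */
  static List<String> storesMissingProcessor(Map<String, String> config, List<String> storeNames) {
    List<String> missing = new ArrayList<>();
    for (String store : storeNames) {
      boolean hasSideInputs = isSet(config, key(store, "side.inputs"));
      boolean hasFactory = isSet(config, key(store, "side.inputs.processor.factory"));
      boolean hasSerialized = isSet(config, key(store, "side.inputs.processor.serialized.instance"));
      if (hasSideInputs && !hasFactory && !hasSerialized) {
        missing.add(store);
      }
    }
    return missing;
  }

  /** Fails fast on user config, so per-task processor resolution does not need its own check. */
  static void validate(Map<String, String> config, List<String> storeNames) {
    List<String> missing = storesMissingProcessor(config, storeNames);
    if (!missing.isEmpty()) {
      throw new IllegalStateException("Side inputs configured without a processor for stores: " + missing);
    }
  }

  private static boolean isSet(Map<String, String> config, String key) {
    String value = config.get(key);
    return value != null && !value.trim().isEmpty();
  }
}
```

With a check like this in place, a store that reaches `ContainerStorageManager` without a configured processor can only be one the user never declared as a side input, which is where the identity processor applies.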
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services