rmatharu commented on a change in pull request #1218: Samza 2382: User 
specified side input processor should take precedence over the identity 
processor
URL: https://github.com/apache/samza/pull/1218#discussion_r345528437
 
 

 ##########
 File path: 
samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
 ##########
 @@ -526,51 +526,57 @@ private StorageEngine createStore(String storeName, 
TaskName taskName, TaskModel
       Map<TaskName, TaskInstanceMetrics> taskInstanceMetrics) {
 
     Map<TaskName, Map<String, SideInputsProcessor>> 
sideInputStoresToProcessors = new HashMap<>();
-    getTasks(containerModel, TaskMode.Active).forEach((taskName, taskModel) -> 
{
+    containerModel.getTasks().forEach((taskName, taskModel) -> {
         sideInputStoresToProcessors.put(taskName, new HashMap<>());
+        TaskMode taskMode = taskModel.getTaskMode();
+
         for (String storeName : sideInputSystemStreams.keySet()) {
+
+          SideInputsProcessor sideInputsProcessor;
           Optional<String> sideInputsProcessorSerializedInstance =
               config.getSideInputsProcessorSerializedInstance(storeName);
+
           if (sideInputsProcessorSerializedInstance.isPresent()) {
-            sideInputStoresToProcessors.get(taskName)
-                .put(storeName, SerdeUtils.deserialize("Side Inputs Processor",
-                    sideInputsProcessorSerializedInstance.get()));
-          } else {
-            String sideInputsProcessorFactoryClassName = 
config.getSideInputsProcessorFactory(storeName)
-                .orElseThrow(() -> new SamzaException(
-                    String.format("Could not find sideInputs processor factory 
for store: %s", storeName)));
+
+            sideInputsProcessor = SerdeUtils.deserialize("Side Inputs 
Processor", sideInputsProcessorSerializedInstance.get());
+            LOG.info("Using serialized side-inputs-processor for store: {}, 
task: {}", storeName, taskName);
+
+          } else if 
(config.getSideInputsProcessorFactory(storeName).isPresent()) {
+            String sideInputsProcessorFactoryClassName = 
config.getSideInputsProcessorFactory(storeName).get();
             SideInputsProcessorFactory sideInputsProcessorFactory =
                 ReflectionUtil.getObj(sideInputsProcessorFactoryClassName, 
SideInputsProcessorFactory.class);
-            SideInputsProcessor sideInputsProcessor =
-                sideInputsProcessorFactory.getSideInputsProcessor(config, 
taskInstanceMetrics.get(taskName).registry());
-            sideInputStoresToProcessors.get(taskName).put(storeName, 
sideInputsProcessor);
-          }
-        }
-      });
+            sideInputsProcessor = 
sideInputsProcessorFactory.getSideInputsProcessor(config, 
taskInstanceMetrics.get(taskName).registry());
+            LOG.info("Using side-inputs-processor from factory: {} for store: 
{}, task: {}", config.getSideInputsProcessorFactory(storeName).get(), 
storeName, taskName);
 
-    // creating identity sideInputProcessor for stores of standbyTasks
-    getTasks(containerModel, TaskMode.Standby).forEach((taskName, taskModel) 
-> {
-        sideInputStoresToProcessors.put(taskName, new HashMap<>());
-        for (String storeName : sideInputSystemStreams.keySet()) {
+          } else if 
(!config.getSideInputsProcessorFactory(storeName).isPresent() && 
taskMode.equals(TaskMode.Active)) {
 
 Review comment:
   done

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to