xinyuiscool commented on code in PR #22370:
URL: https://github.com/apache/beam/pull/22370#discussion_r927082169


##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java:
##########
@@ -274,6 +282,49 @@ public void open(
     }
   }
 
+  private String toStepName(ExecutableStage executableStage) {

Review Comment:
   Should this be static? Since it is more of a utility function, please move it to a util class, e.g. DoFnUtils or similar.
   
   Please also add unit tests for this, covering the cases where the ExecutableStage starts from PBegin or ends with PEnd.
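   A minimal sketch of what the static helper and its edge-case checks could look like. Everything here is hypothetical: `Node` stands in for `PipelineNode.PTransformNode`, and the stage is passed as its transforms plus input/output PCollection ids instead of a real `ExecutableStage`. Rendering each side as sorted, comma-joined names is also an assumption. The JDK's `Collections.disjoint` replaces `CollectionUtils.containsAny` so the sketch needs no commons dependency. Presumably the PBegin/PEnd cases surface as an empty match on one side of the name.

```java
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;

/** Hypothetical stand-in for PipelineNode.PTransformNode: a name plus input/output PCollection ids. */
final class Node {
  final String name;
  final Set<String> inputs;
  final Set<String> outputs;

  Node(String name, Set<String> inputs, Set<String> outputs) {
    this.name = name;
    this.inputs = inputs;
    this.outputs = outputs;
  }
}

/** Sketch of a stateless util class; the real helper would take an ExecutableStage. */
final class DoFnUtils {
  private DoFnUtils() {} // static helpers only, no instances

  static String toStepName(List<Node> transforms, String inputId, Set<String> outputIds) {
    // First/input transforms: those consuming the stage's input PCollection.
    Set<String> first =
        transforms.stream()
            .filter(t -> t.inputs.contains(inputId))
            .map(t -> t.name)
            .collect(Collectors.toCollection(TreeSet::new));
    // Last/output transforms: those producing any of the stage's output PCollections.
    Set<String> last =
        transforms.stream()
            .filter(t -> !Collections.disjoint(t.outputs, outputIds))
            .map(t -> t.name)
            .collect(Collectors.toCollection(TreeSet::new));
    return String.format("[%s-%s]", String.join(",", first), String.join(",", last));
  }
}
```

   A real unit test would need to construct actual `ExecutableStage` instances, but the checks would keep the same three shapes: a middle stage, a stage whose input no transform consumes, and a stage with no outputs.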



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaDoFnRunners.java:
##########
@@ -369,6 +419,7 @@ public void finishBundle() {
         // RemoteBundle close blocks until all results are received
         remoteBundle.close();
         emitResults();
+        emitMetrics();

Review Comment:
   Since we include all the other state ops in the metrics, let's call emitMetrics() after clearing the state. That way the metrics cover the full operation of invoking the bundle.
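   A hypothetical sketch of the ordering being suggested. The method names mirror the diff, `clearState()` is a made-up placeholder for whatever state cleanup `finishBundle()` performs, and the bodies only record call order rather than doing real work.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical, simplified model of finishBundle(). The real runner talks to the
 * remote bundle and state backends; here each step only records its name.
 */
final class FinishBundleSketch {
  final List<String> calls = new ArrayList<>();

  void finishBundle() {
    closeRemoteBundle(); // in the real code, remoteBundle.close() blocks until all results arrive
    emitResults();
    clearState(); // placeholder for the runner's state cleanup
    emitMetrics(); // last, so the metrics span the whole bundle, state ops included
  }

  private void closeRemoteBundle() { calls.add("close"); }

  private void emitResults() { calls.add("emitResults"); }

  private void clearState() { calls.add("clearState"); }

  private void emitMetrics() { calls.add("emitMetrics"); }
}
```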



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java:
##########
@@ -274,6 +282,49 @@ public void open(
     }
   }
 
+  private String toStepName(ExecutableStage executableStage) {
+    /*
+     * Look for the first/input ParDo/DoFn in this executable stage by
+     * matching ParDo/DoFn's input PCollection with executable stage's
+     * input PCollection
+     */
+    Set<PipelineNode.PTransformNode> inputs =
+        executableStage.getTransforms().stream()
+            .filter(
+                transform ->
+                    transform
+                        .getTransform()
+                        .getInputsMap()
+                        .containsValue(executableStage.getInputPCollection().getId()))
+            .collect(Collectors.toSet());
+
+    Set<String> outputIds =
+        executableStage.getOutputPCollections().stream()
+            .map(PipelineNode.PCollectionNode::getId)
+            .collect(Collectors.toSet());
+
+    /*
+     * Look for the last/output ParDo/DoFn in this executable stage by
+     * matching ParDo/DoFn's output PCollection(s) with executable stage's
+     * output PCollection(s)
+     */
+    Set<PipelineNode.PTransformNode> outputs =
+        executableStage.getTransforms().stream()
+            .filter(
+                transform ->
+                    CollectionUtils.containsAny(
+                        transform.getTransform().getOutputsMap().values(), outputIds))
+            .collect(Collectors.toSet());
+
+    return String.format("[%s-%s]", toStepName(inputs), toStepName(outputs));
+  }
+
+  private String toStepName(Set<PipelineNode.PTransformNode> nodes) {

Review Comment:
   Same as above: make it static and move it to a util class.
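   One thing worth checking while moving this helper: the `Collectors.toSet()` contract gives no guarantee about the returned Set's type or iteration order, so if several transforms match, joining their names in iteration order ties the step name to an unspecified detail. A minimal sketch that sorts first for a stable result, assuming the helper joins transform names (the diff cuts off before its body, and both `StepNames` and the `,` separator are hypothetical):

```java
import java.util.Set;
import java.util.stream.Collectors;

/** Hypothetical util class; takes plain names instead of PipelineNode.PTransformNode. */
final class StepNames {
  private StepNames() {} // static helpers only, no instances

  /** Joins transform names in sorted order so the result doesn't depend on Set iteration order. */
  static String toStepName(Set<String> transformNames) {
    return transformNames.stream().sorted().collect(Collectors.joining(","));
  }
}
```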


