xinyuiscool commented on code in PR #26276:
URL: https://github.com/apache/beam/pull/26276#discussion_r1172872669
##########
runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SamzaPipelineTranslator.java:
##########
@@ -107,6 +114,47 @@ public static void createConfig(
pipeline.traverseTopologically(visitor);
}
+ /**
+ * Builds a map from PTransform to its input and output PValues. The map is serialized and stored
+ * in the job config.
+ */
+ public static Map<String, Map.Entry<String, String>> buildTransformIOMap(
Review Comment:
Move this to the JSON renderer class (PipelineJsonRenderer). This method has
nothing to do with translation.
##########
runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java:
##########
@@ -130,21 +130,23 @@ public SamzaPipelineResult run(Pipeline pipeline) {
PipelineDotRenderer.toDotString(pipeline));
LOG.debug(
"Pre-processed Beam pipeline in json format:\n{}",
- PipelineJsonRenderer.toJsonString(pipeline));
+ PipelineJsonRenderer.toJsonString(pipeline, Collections.emptyMap()));
}
pipeline.replaceAll(SamzaTransformOverrides.getDefaultOverrides());
+ final Map<PValue, String> idMap = PViewToIdMapper.buildIdMap(pipeline);
+ final Set<String> nonUniqueStateIds = StateIdParser.scan(pipeline);
+ final Map<String, Map.Entry<String, String>> transformIOMap =
+ SamzaPipelineTranslator.buildTransformIOMap(pipeline, options, idMap, nonUniqueStateIds);
+
final String dotGraph = PipelineDotRenderer.toDotString(pipeline);
LOG.info("Beam pipeline DOT graph:\n{}", dotGraph);
- final String jsonGraph = PipelineJsonRenderer.toJsonString(pipeline);
+ final String jsonGraph = PipelineJsonRenderer.toJsonString(pipeline, transformIOMap);
Review Comment:
Same as above: create a ConfigContext from idMap... and pass it in.
##########
runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java:
##########
@@ -130,21 +130,23 @@ public SamzaPipelineResult run(Pipeline pipeline) {
PipelineDotRenderer.toDotString(pipeline));
LOG.debug(
"Pre-processed Beam pipeline in json format:\n{}",
- PipelineJsonRenderer.toJsonString(pipeline));
+ PipelineJsonRenderer.toJsonString(pipeline, Collections.emptyMap()));
Review Comment:
Instead of passing in this emptyMap(), let's create a ConfigContext and pass
it in here. This map should be constructed inside the JsonRenderer.
##########
runners/samza/src/main/java/org/apache/beam/runners/samza/util/PipelineJsonRenderer.java:
##########
@@ -86,9 +87,12 @@ public static String toJsonString(RunnerApi.Pipeline pipeline) {
private final StringBuilder jsonBuilder = new StringBuilder();
private final StringBuilder graphLinks = new StringBuilder();
private final Map<PValue, String> valueToProducerNodeName = new HashMap<>();
+ private final Map<String, Map.Entry<String, String>> transformIOMap;
private int indent;
- private PipelineJsonRenderer() {}
+ private PipelineJsonRenderer(Map<String, Map.Entry<String, String>> transformIOMap) {
Review Comment:
Same as above.
##########
runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java:
##########
@@ -130,21 +130,23 @@ public SamzaPipelineResult run(Pipeline pipeline) {
PipelineDotRenderer.toDotString(pipeline));
LOG.debug(
"Pre-processed Beam pipeline in json format:\n{}",
- PipelineJsonRenderer.toJsonString(pipeline));
+ PipelineJsonRenderer.toJsonString(pipeline, Collections.emptyMap()));
}
pipeline.replaceAll(SamzaTransformOverrides.getDefaultOverrides());
+ final Map<PValue, String> idMap = PViewToIdMapper.buildIdMap(pipeline);
+ final Set<String> nonUniqueStateIds = StateIdParser.scan(pipeline);
+ final Map<String, Map.Entry<String, String>> transformIOMap =
+ SamzaPipelineTranslator.buildTransformIOMap(pipeline, options, idMap, nonUniqueStateIds);
+
final String dotGraph = PipelineDotRenderer.toDotString(pipeline);
LOG.info("Beam pipeline DOT graph:\n{}", dotGraph);
- final String jsonGraph = PipelineJsonRenderer.toJsonString(pipeline);
+ final String jsonGraph = PipelineJsonRenderer.toJsonString(pipeline, transformIOMap);
LOG.info("Beam pipeline JSON graph:\n{}", jsonGraph);
- final Map<PValue, String> idMap = PViewToIdMapper.buildIdMap(pipeline);
- final Set<String> nonUniqueStateIds = StateIdParser.scan(pipeline);
final ConfigBuilder configBuilder = new ConfigBuilder(options);
-
SamzaPipelineTranslator.createConfig(
pipeline, options, idMap, nonUniqueStateIds, configBuilder);
Review Comment:
Let's refactor this method a bit to pass in the previously created
ConfigContext instead of options, idMap, nonUni.. That way the code is much
more readable and extensible.
##########
runners/samza/src/main/java/org/apache/beam/runners/samza/util/PipelineJsonRenderer.java:
##########
@@ -66,8 +66,9 @@ public interface SamzaIORegistrar {
* @param pipeline The beam pipeline
* @return JSON string representation of the pipeline
*/
- public static String toJsonString(Pipeline pipeline) {
- final PipelineJsonRenderer visitor = new PipelineJsonRenderer();
+ public static String toJsonString(
+ Pipeline pipeline, Map<String, Map.Entry<String, String>> transformIOMap) {
Review Comment:
Instead of passing in transformIOMap, we should pass in a ConfigContext and
then construct the IO map here. The IO map is only used for creating the JSON.
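A rough sketch of the shape this suggestion points at, not the actual Beam code. SketchConfigContext, SketchTransform and SketchJsonRenderer are hypothetical stand-ins invented for illustration. The real ConfigContext and Pipeline types carry more state, and the one-input, one-output transform model here is a simplifying assumption. The point is only that the caller passes a context object and the renderer builds the IO map privately:

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for ConfigContext; the real class carries more state.
final class SketchConfigContext {
  private final Map<String, String> idMap; // value name -> id

  SketchConfigContext(Map<String, String> idMap) {
    this.idMap = Collections.unmodifiableMap(new LinkedHashMap<>(idMap));
  }

  // Returns the id for a value, or an empty string when the value is unknown.
  String idFor(String valueName) {
    return idMap.getOrDefault(valueName, "");
  }
}

// Hypothetical, simplified transform: a name plus one input and one output value.
final class SketchTransform {
  final String name;
  final String input;
  final String output;

  SketchTransform(String name, String input, String output) {
    this.name = name;
    this.input = input;
    this.output = output;
  }
}

final class SketchJsonRenderer {
  private SketchJsonRenderer() {}

  // Callers hand over only the pipeline and the context, never a prebuilt IO map.
  static String toJsonString(List<SketchTransform> pipeline, SketchConfigContext ctx) {
    final Map<String, Map.Entry<String, String>> ioMap = buildTransformIOMap(pipeline, ctx);
    final StringBuilder json = new StringBuilder("{");
    String separator = "";
    for (Map.Entry<String, Map.Entry<String, String>> e : ioMap.entrySet()) {
      // No escaping: the sketch assumes names and ids contain no quotes or backslashes.
      json.append(separator)
          .append('"').append(e.getKey()).append("\":{\"in\":\"")
          .append(e.getValue().getKey()).append("\",\"out\":\"")
          .append(e.getValue().getValue()).append("\"}");
      separator = ",";
    }
    return json.append('}').toString();
  }

  // The IO map is built where it is used, so it does not leak into the runner.
  private static Map<String, Map.Entry<String, String>> buildTransformIOMap(
      List<SketchTransform> pipeline, SketchConfigContext ctx) {
    final Map<String, Map.Entry<String, String>> ioMap = new LinkedHashMap<>();
    for (SketchTransform t : pipeline) {
      ioMap.put(t.name, new SimpleImmutableEntry<>(ctx.idFor(t.input), ctx.idFor(t.output)));
    }
    return ioMap;
  }
}
```

With this shape the call site in the runner would shrink to building one context and passing it along, and the pre-override debug call would no longer need an empty-map placeholder.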
##########
runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java:
##########
@@ -130,21 +130,23 @@ public SamzaPipelineResult run(Pipeline pipeline) {
PipelineDotRenderer.toDotString(pipeline));
LOG.debug(
"Pre-processed Beam pipeline in json format:\n{}",
- PipelineJsonRenderer.toJsonString(pipeline));
+ PipelineJsonRenderer.toJsonString(pipeline, Collections.emptyMap()));
}
pipeline.replaceAll(SamzaTransformOverrides.getDefaultOverrides());
+ final Map<PValue, String> idMap = PViewToIdMapper.buildIdMap(pipeline);
+ final Set<String> nonUniqueStateIds = StateIdParser.scan(pipeline);
+ final Map<String, Map.Entry<String, String>> transformIOMap =
Review Comment:
Let's not create this map here. Instead, create a ConfigContext and pass it
in below.