Sanil15 commented on code in PR #26437:
URL: https://github.com/apache/beam/pull/26437#discussion_r1181788990
##########
runners/samza/src/main/java/org/apache/beam/runners/samza/util/PipelineJsonRenderer.java:
##########
@@ -292,4 +302,29 @@ static Map<String, Map.Entry<String, String>>
buildTransformIOMap(
private static Supplier<List<String>> getIOPValueList(Map<TupleTag<?>,
PCollection<?>> map) {
return () -> map.values().stream().map(pColl ->
pColl.getName()).collect(Collectors.toList());
}
+
+ // Reads the config to build transformIOMap, i.e. map of inputs & output
PValues for each
+ // PTransform
+ public static Map<String, Map.Entry<String, String>>
getTransformIOMap(Config config) {
+ checkNotNull(config, "Config cannot be null");
+ final Map<String, Map.Entry<String, String>> result = new HashMap<>();
+ final String pipelineJsonGraph = config.get(SamzaRunner.BEAM_JSON_GRAPH);
+ if (pipelineJsonGraph == null) {
+ LOG.warn(
+ "Cannot build transformIOMap since Config: {} is found null ",
+ SamzaRunner.BEAM_JSON_GRAPH);
+ return result;
+ }
+ JsonObject jsonObject =
JsonParser.parseString(pipelineJsonGraph).getAsJsonObject();
+ JsonArray transformIOInfo = jsonObject.getAsJsonArray("transformIOInfo");
+ transformIOInfo.forEach(
+ transform -> {
+ final String transformName =
+ transform.getAsJsonObject().get("transformName").getAsString();
+ final String inputs =
transform.getAsJsonObject().get("inputs").getAsString();
Review Comment:
Done; changed this to return the input and output PValues as a list.
##########
runners/samza/src/main/java/org/apache/beam/runners/samza/TestSamzaRunner.java:
##########
@@ -30,22 +31,33 @@
import org.apache.beam.sdk.PipelineRunner;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsValidator;
+import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.commons.io.FileUtils;
+import org.apache.samza.metrics.MetricsReporter;
+import org.apache.samza.metrics.ReadableMetricsRegistry;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
/** Test {@link SamzaRunner}. */
public class TestSamzaRunner extends PipelineRunner<PipelineResult> {
private final SamzaRunner delegate;
+ private static InMemoryMetricsReporter testMetricsReporter = new
InMemoryMetricsReporter();
Review Comment:
done
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]