chamikaramj commented on a change in pull request #15082:
URL: https://github.com/apache/beam/pull/15082#discussion_r664797025
##########
File path:
sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java
##########
@@ -462,6 +488,9 @@ default InputT createInput(Pipeline p, Map<String, PCollection<?>> inputs) {
throw new RuntimeException(exn);
}
}));
+
+ SplittableParDo.convertReadBasedSplittableDoFnsToPrimitiveReadsIfNecessary(pipeline);
Review comment:
Please move this into the "else" block of the "use_deprecated_read" check
above.
##########
File path:
sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java
##########
@@ -524,4 +554,15 @@ public static void main(String[] args) throws Exception {
server.start();
server.awaitTermination();
}
+
+ private static class NoOpRunner extends PipelineRunner<PipelineResult> {
+ public static NoOpRunner fromOptions(PipelineOptions opts) {
+ return new NoOpRunner();
+ }
+
Review comment:
I mean some basic tests to make sure that a Pipeline can be created and
expanded with "NoOpRunner" (not tests of the functionality of "NoOpRunner"
itself).
##########
File path:
sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java
##########
@@ -362,6 +368,19 @@ default InputT createInput(Pipeline p, Map<String, PCollection<?>> inputs) {
}
private @MonotonicNonNull Map<String, TransformProvider> registeredTransforms;
+ private final PipelineOptions pipelineOptions;
+
+ public ExpansionService() {
+ this(new String[] {});
+ }
+
+ public ExpansionService(String[] args) {
+ this(PipelineOptionsFactory.fromArgs(args).create());
+ }
+
+ public ExpansionService(PipelineOptions opts) {
Review comment:
Until we have proper validation in place, can we explicitly create a second
PipelineOptions containing only the set of options that we wish to support?
That would make sure that arbitrary options, which may or may not be
supported, do not get passed to the external transform expansion.
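One way to approximate this is to filter the raw command-line args against an
allowlist before any PipelineOptions is built. The sketch below is only an
illustration using the Java standard library: the class OptionAllowlist, its
filter method, and the choice of "experiments" as the only supported option
are all hypothetical and are not part of Beam or of this PR.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical helper (not a Beam class): keeps only allowlisted
// "--name" or "--name=value" arguments and drops everything else.
final class OptionAllowlist {
  private OptionAllowlist() {}

  static String[] filter(String[] args, Set<String> allowedNames) {
    List<String> kept = new ArrayList<>();
    for (String arg : args) {
      if (!arg.startsWith("--")) {
        continue; // not in option form; drop it
      }
      int eq = arg.indexOf('=');
      // Option name is the text between the leading "--" and "=" (if any).
      String name = eq < 0 ? arg.substring(2) : arg.substring(2, eq);
      if (allowedNames.contains(name)) {
        kept.add(arg);
      }
    }
    return kept.toArray(new String[0]);
  }

  public static void main(String[] args) {
    // Assumed allowlist, for illustration only.
    Set<String> supported = Set.of("experiments");
    String[] filtered =
        filter(
            new String[] {"--experiments=use_deprecated_read", "--runner=FlinkRunner"},
            supported);
    System.out.println(String.join(" ", filtered));
  }
}
```

The filtered array could then be handed to
PipelineOptionsFactory.fromArgs(...).create() as in the constructor above, so
that only the explicitly supported options reach the expansion.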
##########
File path:
sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java
##########
@@ -486,6 +504,11 @@ default InputT createInput(Pipeline p, Map<String, PCollection<?>> inputs) {
.build();
}
+ protected Pipeline createPipeline() {
+ pipelineOptions.setRunner(NoOpRunner.class);
Review comment:
It's redundant if DirectRunner works for expansion, right? I would also add
a comment explaining why this was needed, and a TODO with a JIRA to remove
this once DirectRunner no longer conflicts with Flink.