je-ik commented on a change in pull request #15082:
URL: https://github.com/apache/beam/pull/15082#discussion_r663366088
##########
File path:
sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java
##########
@@ -391,17 +410,19 @@ default InputT createInput(Pipeline p, Map<String, PCollection<?>> inputs) {
request.getTransform().getSpec().getUrn());
LOG.debug("Full transform: {}", request.getTransform());
Set<String> existingTransformIds =
request.getComponents().getTransformsMap().keySet();
- Pipeline pipeline = Pipeline.create();
- ExperimentalOptions.addExperiment(
- pipeline.getOptions().as(ExperimentalOptions.class), "beam_fn_api");
- // TODO(BEAM-10670): Remove this when we address performance issue.
- ExperimentalOptions.addExperiment(
- pipeline.getOptions().as(ExperimentalOptions.class), "use_sdf_read");
+ Pipeline pipeline = createPipeline();
+ if (!ExperimentalOptions.hasExperiment(pipelineOptions, "use_deprecated_read")) {
Review comment:
:+1:
##########
File path:
sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java
##########
@@ -524,4 +554,15 @@ public static void main(String[] args) throws Exception {
server.start();
server.awaitTermination();
}
+
+ private static class NoOpRunner extends PipelineRunner<PipelineResult> {
Review comment:
:+1:
##########
File path:
sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java
##########
@@ -524,4 +554,15 @@ public static void main(String[] args) throws Exception {
server.start();
server.awaitTermination();
}
+
+ private static class NoOpRunner extends PipelineRunner<PipelineResult> {
+ public static NoOpRunner fromOptions(PipelineOptions opts) {
+ return new NoOpRunner();
+ }
+
Review comment:
The Pipeline in ExpansionService is not supposed to be run. First of all, the Pipeline specification is not finished at that point, so calling run() should result in some sort of error. UnsupportedOperationException seems to be one of the best options for that. It is probably even better than using DirectRunner, which in that case might run a partially constructed Pipeline.
I generally like having tests for everything that makes sense to test, but in this case I'm not sure what to test, because the `NoOpRunner` (we might come up with a better name, for instance `ExpansionRunner`?) simply throws an exception and does nothing else.
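Since the runner's only behavior is to refuse execution, the one thing worth asserting is that `run()` fails fast. The plain-Java sketch below illustrates this; `PartialPipeline`, `ExpansionRunnerSketch`, and `ExpansionRunnerCheck` are hypothetical stand-ins (not Beam classes) so that it compiles without Beam. A Beam version would instead extend `PipelineRunner<PipelineResult>` and throw from its `run(Pipeline)` override in the same way.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a Pipeline whose specification is still being built. */
final class PartialPipeline {
  private final List<String> transformIds = new ArrayList<>();

  void addTransform(String id) {
    transformIds.add(id);
  }

  List<String> transformIds() {
    return transformIds;
  }
}

/**
 * Hypothetical stand-in for the proposed NoOpRunner/ExpansionRunner. It exists only so that
 * the expansion pipeline has some runner configured; it never executes anything.
 */
final class ExpansionRunnerSketch {
  // Mirrors the static fromOptions(...) factory in the diff; the options are ignored.
  static ExpansionRunnerSketch fromOptions(Object ignoredOptions) {
    return new ExpansionRunnerSketch();
  }

  // Always fails fast: during expansion the pipeline specification is not finished.
  Void run(PartialPipeline pipeline) {
    throw new UnsupportedOperationException(
        "Pipeline with "
            + pipeline.transformIds().size()
            + " transform(s) is still being expanded and cannot be run");
  }
}

class ExpansionRunnerCheck {
  // The only behavior worth asserting: run() throws instead of executing anything.
  public static void main(String[] args) {
    PartialPipeline pipeline = new PartialPipeline();
    pipeline.addTransform("external/read");
    try {
      ExpansionRunnerSketch.fromOptions(null).run(pipeline);
      throw new AssertionError("run() should not succeed");
    } catch (UnsupportedOperationException expected) {
      System.out.println("run() rejected: " + expected.getMessage());
    }
  }
}
```

A test for the real runner would have the same shape: build a pipeline, call `run`, and expect `UnsupportedOperationException`.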
##########
File path:
sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java
##########
@@ -362,6 +368,19 @@ default InputT createInput(Pipeline p, Map<String, PCollection<?>> inputs) {
}
private @MonotonicNonNull Map<String, TransformProvider>
registeredTransforms;
+ private final PipelineOptions pipelineOptions;
+
+ public ExpansionService() {
+ this(new String[] {});
+ }
+
+ public ExpansionService(String[] args) {
+ this(PipelineOptionsFactory.fromArgs(args).create());
+ }
+
+ public ExpansionService(PipelineOptions opts) {
Review comment:
That is more complex than I expected, and I don't actually know how to do it yet. The only "legal" option seems to be serializing the options to JSON and then parsing them back, which is more complicated than I would expect. The current validation only checks requirements, and it is invoked in `as(Class)`. I could add a new validation type there, but it would require changing the public API of `PipelineOptionsFactory`; probably the best approach would be to annotate getter methods with their allowed scopes. `PipelineOptionsFactory.fromArgs(args)` would then have to be extended to accept the set of allowed scopes (defaulting to all scopes). This seems to be a non-trivial change. I'll try to do a sketch, but I think it really deserves a new JIRA issue and a new PR. We might try to get it into 2.32.0 as well.
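The annotation-based approach described above could look roughly like the following plain-Java sketch (no Beam dependency, so it compiles on its own). All names are hypothetical: `OptionScope`, `@AllowedScopes`, `ExampleOptions`, and the three-argument `ScopedArgs.fromArgs` are illustrations, not existing `PipelineOptionsFactory` API, and the parser only handles simple `--name=value` arguments.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

/** Hypothetical scopes in which an option may be supplied. */
enum OptionScope {
  PIPELINE,
  EXPANSION_SERVICE
}

/** Hypothetical annotation declaring the scopes in which an option getter may be set. */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface AllowedScopes {
  OptionScope[] value();
}

/** Hypothetical options interface; getters without @AllowedScopes are allowed everywhere. */
interface ExampleOptions {
  @AllowedScopes({OptionScope.PIPELINE})
  String getRunner();

  @AllowedScopes({OptionScope.PIPELINE, OptionScope.EXPANSION_SERVICE})
  String getExperiments();

  String getJobName();
}

/** Hypothetical scope-aware variant of fromArgs(args); not PipelineOptionsFactory API. */
final class ScopedArgs {
  // Default overload: all scopes allowed, i.e. today's behavior.
  static Map<String, String> fromArgs(String[] args, Class<?> optionsInterface) {
    return fromArgs(args, optionsInterface, EnumSet.allOf(OptionScope.class));
  }

  // Parses --name=value pairs and rejects options whose getter allows none of the given scopes.
  static Map<String, String> fromArgs(
      String[] args, Class<?> optionsInterface, Set<OptionScope> allowedScopes) {
    Map<String, String> parsed = new LinkedHashMap<>();
    for (String arg : args) {
      int eq = arg.indexOf('=');
      if (!arg.startsWith("--") || eq <= 2) {
        throw new IllegalArgumentException("Expected --name=value but got: " + arg);
      }
      String name = arg.substring(2, eq);
      Method getter = findGetter(optionsInterface, name);
      AllowedScopes scopes = getter.getAnnotation(AllowedScopes.class);
      boolean allowed =
          scopes == null || Arrays.stream(scopes.value()).anyMatch(allowedScopes::contains);
      if (!allowed) {
        throw new IllegalArgumentException(
            "Option --" + name + " is not allowed in scopes " + allowedScopes);
      }
      parsed.put(name, arg.substring(eq + 1));
    }
    return parsed;
  }

  // Maps an option name such as "jobName" to its getter "getJobName".
  private static Method findGetter(Class<?> optionsInterface, String name) {
    String getterName = "get" + Character.toUpperCase(name.charAt(0)) + name.substring(1);
    try {
      return optionsInterface.getMethod(getterName);
    } catch (NoSuchMethodException e) {
      throw new IllegalArgumentException("Unknown option --" + name, e);
    }
  }
}
```

The two-argument overload keeps the current all-scopes behavior, so only a caller such as the expansion service would opt into the narrower check, and unannotated getters stay unrestricted.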
##########
File path:
sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java
##########
@@ -486,6 +504,11 @@ default InputT createInput(Pipeline p, Map<String, PCollection<?>> inputs) {
.build();
}
+ protected Pipeline createPipeline() {
+ pipelineOptions.setRunner(NoOpRunner.class);
Review comment:
I understand that. I think the NoOpRunner (or ExpansionRunner, if we rename it) is actually better than DirectRunner, as mentioned in my other comment.
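The trade-off behind choosing the runner in `createPipeline()` can be sketched without Beam: a runner in the spirit of `DirectRunner` executes whatever transforms exist so far, while an expansion-only runner fails fast. All types below (`SketchRunner`, `EagerRunner`, `RejectingRunner`, `SketchPipeline`, `SketchExpansionService`) are hypothetical stand-ins, not Beam classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

/** Hypothetical minimal runner abstraction standing in for Beam's PipelineRunner. */
interface SketchRunner {
  int run(List<String> transforms);
}

/** Stand-in for a DirectRunner-like runner: executes whatever has been added so far. */
final class EagerRunner implements SketchRunner {
  @Override
  public int run(List<String> transforms) {
    transforms.forEach(t -> System.out.println("executing " + t));
    return transforms.size();
  }
}

/** Stand-in for NoOpRunner/ExpansionRunner: refuses to execute a pipeline under expansion. */
final class RejectingRunner implements SketchRunner {
  @Override
  public int run(List<String> transforms) {
    throw new UnsupportedOperationException("Expansion pipelines are not meant to be run");
  }
}

/** Hypothetical pipeline whose runner is chosen by whoever creates it. */
final class SketchPipeline {
  private final List<String> transforms = new ArrayList<>();
  private final Supplier<SketchRunner> runnerFactory;

  SketchPipeline(Supplier<SketchRunner> runnerFactory) {
    this.runnerFactory = runnerFactory;
  }

  SketchPipeline apply(String transform) {
    transforms.add(transform);
    return this;
  }

  int run() {
    return runnerFactory.get().run(transforms);
  }
}

final class SketchExpansionService {
  // Plays the role of createPipeline(): always pins the non-executing runner.
  static SketchPipeline createPipeline() {
    return new SketchPipeline(RejectingRunner::new);
  }
}
```

With the eager runner, an accidental `run()` on a half-expanded pipeline silently executes it; with the rejecting runner, the same mistake surfaces immediately as an exception.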
--
This is an automated message from the Apache Git Service.