chamikaramj commented on a change in pull request #15082:
URL: https://github.com/apache/beam/pull/15082#discussion_r663144696



##########
File path: 
sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java
##########
@@ -362,6 +368,19 @@ default InputT createInput(Pipeline p, Map<String, PCollection<?>> inputs) {
   }
 
   private @MonotonicNonNull Map<String, TransformProvider> registeredTransforms;
+  private final PipelineOptions pipelineOptions;
+
+  public ExpansionService() {
+    this(new String[] {});
+  }
+
+  public ExpansionService(String[] args) {
+    this(PipelineOptionsFactory.fromArgs(args).create());
+  }
+
+  public ExpansionService(PipelineOptions opts) {

Review comment:
       I think we should implement some form of validation that limits the 
supported options in the first version of the PR (this can be based on 
annotations, as you mentioned on the mailing list).
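
To make the annotation idea concrete, here is a rough sketch only. It is plain Java with no Beam dependency: `ExpansionServiceSupported`, `ExampleOptions`, and both helper methods are hypothetical names invented for illustration, not existing Beam APIs. The idea is that annotated getters define an allowlist, and any `--name=value` argument outside it is reported before the options are built.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

public class OptionAllowlistSketch {

  /** Hypothetical marker annotation; this is NOT an existing Beam annotation. */
  @Retention(RetentionPolicy.RUNTIME)
  @Target(ElementType.METHOD)
  public @interface ExpansionServiceSupported {}

  /** Hypothetical stand-in for a PipelineOptions sub-interface. */
  public interface ExampleOptions {
    @ExpansionServiceSupported
    List<String> getExperiments();

    // Not annotated, so it is treated as unsupported by the service.
    String getTempLocation();
  }

  /** Collects option names (getter name minus "get", first letter lowered) from annotated getters. */
  public static Set<String> supportedOptions(Class<?> optionsInterface) {
    Set<String> names = new TreeSet<>();
    for (Method m : optionsInterface.getMethods()) {
      String getter = m.getName();
      if (m.isAnnotationPresent(ExpansionServiceSupported.class)
          && getter.startsWith("get")
          && getter.length() > 3) {
        String prop = getter.substring(3);
        names.add(Character.toLowerCase(prop.charAt(0)) + prop.substring(1));
      }
    }
    return names;
  }

  /** Returns the arguments that are malformed or name an option outside the allowlist. */
  public static List<String> unsupportedArgs(String[] args, Set<String> supported) {
    List<String> rejected = new ArrayList<>();
    for (String arg : args) {
      if (!arg.startsWith("--")) {
        rejected.add(arg);
        continue;
      }
      int eq = arg.indexOf('=');
      String name = eq < 0 ? arg.substring(2) : arg.substring(2, eq);
      if (!supported.contains(name)) {
        rejected.add(arg);
      }
    }
    return rejected;
  }
}
```

The `ExpansionService(String[] args)` constructor could run a check like `unsupportedArgs` and fail fast (or log) when the returned list is non-empty. Whether to reject or merely warn is a design choice this sketch leaves open.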

##########
File path: 
sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java
##########
@@ -391,17 +410,19 @@ default InputT createInput(Pipeline p, Map<String, PCollection<?>> inputs) {
         request.getTransform().getSpec().getUrn());
     LOG.debug("Full transform: {}", request.getTransform());
     Set<String> existingTransformIds = request.getComponents().getTransformsMap().keySet();
-    Pipeline pipeline = Pipeline.create();
-    ExperimentalOptions.addExperiment(
-        pipeline.getOptions().as(ExperimentalOptions.class), "beam_fn_api");
-    // TODO(BEAM-10670): Remove this when we address performance issue.
-    ExperimentalOptions.addExperiment(
-        pipeline.getOptions().as(ExperimentalOptions.class), "use_sdf_read");
+    Pipeline pipeline = createPipeline();
+    if (!ExperimentalOptions.hasExperiment(pipelineOptions, "use_deprecated_read")) {

Review comment:
       As discussed on the mailing list, "use_deprecated_read" only works for 
Flink, since that runner has special handling for non-SDF sources. Such 
sources cannot be read in the SDK Harness without SDF. We should log a 
warning (or at least an info message) when this experiment is set.
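
A minimal sketch of the kind of log meant here, with assumptions stated: it uses `java.util.logging` and a plain `List<String>` of experiments so that it runs without Beam, and the class and method names are hypothetical. In the actual service the check would presumably go through `ExperimentalOptions.hasExperiment` and the existing `LOG`.

```java
import java.util.List;
import java.util.logging.Logger;

public class DeprecatedReadWarningSketch {
  private static final Logger LOG =
      Logger.getLogger(DeprecatedReadWarningSketch.class.getName());

  /**
   * Returns true when "use_sdf_read" should be added, i.e. when the caller has not asked
   * for "use_deprecated_read". Logs a warning in the opt-out case.
   */
  public static boolean shouldAddSdfRead(List<String> experiments) {
    if (experiments.contains("use_deprecated_read")) {
      // Wording is illustrative; it restates the limitation described in this comment.
      LOG.warning(
          "Experiment use_deprecated_read is set: non-SDF sources are only supported by "
              + "runners with special handling for them (Flink, per the mailing list "
              + "discussion) and cannot be read in the SDK Harness without SDF.");
      return false;
    }
    return true;
  }
}
```

This keeps the branch in the diff unchanged and only adds the log on the opt-out path.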

##########
File path: 
sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java
##########
@@ -524,4 +554,15 @@ public static void main(String[] args) throws Exception {
     server.start();
     server.awaitTermination();
   }
+
+  private static class NoOpRunner extends PipelineRunner<PipelineResult> {

Review comment:
       I'm a bit hesitant about the NoOpRunner since I'm not sure it 
implements the runner contract correctly (for the expansion part), but I'm OK 
with it as long as it does not cause failures in existing tests or use cases.

##########
File path: 
sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java
##########
@@ -524,4 +554,15 @@ public static void main(String[] args) throws Exception {
     server.start();
     server.awaitTermination();
   }
+
+  private static class NoOpRunner extends PipelineRunner<PipelineResult> {
+    public static NoOpRunner fromOptions(PipelineOptions opts) {
+      return new NoOpRunner();
+    }
+

Review comment:
       Please move NoOpRunner to a separate class and add basic unit tests 
(pipeline creation with and without options, expansion, etc.).

##########
File path: 
sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java
##########
@@ -486,6 +504,11 @@ default InputT createInput(Pipeline p, Map<String, PCollection<?>> inputs) {
         .build();
   }
 
+  protected Pipeline createPipeline() {
+    pipelineOptions.setRunner(NoOpRunner.class);

Review comment:
       I would not remove the current validations (which have been around 
forever) without further discussion on the mailing list.
   
   



