chamikaramj commented on a change in pull request #15082:
URL: https://github.com/apache/beam/pull/15082#discussion_r667497140



##########
File path: sdks/java/expansion-service/src/test/java/org/apache/beam/sdk/expansion/service/ExpansionServerTest.java
##########
@@ -45,4 +47,20 @@ public void testHostPortAvailableAfterClose() throws Exception {
     assertThat(expansionServer.getHost(), is("localhost"));
     assertThat(expansionServer.getPort(), greaterThan(0));
   }
+
+  @Test

Review comment:
       Do you also need an end-to-end integration test to make sure that 
"PROCESS" mode works for cross-language transforms on Flink? (This can be a 
separate PR.)

##########
File path: sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java
##########
@@ -462,6 +488,9 @@ default InputT createInput(Pipeline p, Map<String, PCollection<?>> inputs) {
                         throw new RuntimeException(exn);
                       }
                     }));
+
+    SplittableParDo.convertReadBasedSplittableDoFnsToPrimitiveReadsIfNecessary(pipeline);

Review comment:
       But since this call to 
"SplittableParDo.convertReadBasedSplittableDoFnsToPrimitiveReadsIfNecessary" 
is on the default path, it looks like we will convert SDFs to 
"PrimitiveBoundedRead" whenever "use_sdf_read" is not set. Hence my request to 
move it to the else block above.
   
https://github.com/apache/beam/blob/403ad51d595677d580c7a5cfd9ba9aea6f5793c2/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java#L655
   
   (I understand that we set "use_sdf_read" above for now, but removing that 
in the future will break the default path for SDF, which is required by all 
runners other than Flink.)
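
   To make the concern concrete, here is a minimal stand-alone sketch of the 
gating as I understand it. It deliberately uses no Beam classes: the class 
name, the method names, and the "optedIntoPrimitiveReads" flag are all 
hypothetical, and "convertsIfNecessary" only models the check described in 
this comment (convert whenever "use_sdf_read" is absent), not the actual 
SplittableParDo implementation.

```java
import java.util.Set;

// Stand-alone model of the experiment gating discussed in this comment.
// Nothing here is Beam API; all names are hypothetical.
class SdfReadGatingSketch {
  static final String USE_SDF_READ = "use_sdf_read";

  // Models the "IfNecessary" check as described above:
  // conversion happens whenever use_sdf_read is absent.
  public static boolean convertsIfNecessary(Set<String> experiments) {
    return !experiments.contains(USE_SDF_READ);
  }

  // Call placed on the default path: the outcome depends only on the experiment.
  public static boolean convertsOnDefaultPath(Set<String> experiments) {
    return convertsIfNecessary(experiments);
  }

  // Call placed behind an explicit branch. The boolean is a hypothetical
  // stand-in for "we are in the branch that wants primitive reads".
  public static boolean convertsWhenGuarded(
      Set<String> experiments, boolean optedIntoPrimitiveReads) {
    return optedIntoPrimitiveReads && convertsIfNecessary(experiments);
  }

  public static void main(String[] args) {
    Set<String> withExperiment = Set.of(USE_SDF_READ);
    Set<String> withoutExperiment = Set.of();

    // Today: the experiment is set, so nothing is converted.
    System.out.println(convertsOnDefaultPath(withExperiment)); // false
    // If the experiment is later dropped, the default path starts converting.
    System.out.println(convertsOnDefaultPath(withoutExperiment)); // true
    // With the call guarded, dropping the experiment does not change the default.
    System.out.println(convertsWhenGuarded(withoutExperiment, false)); // false
  }
}
```

   The point of the guarded variant is that SDF reads stay intact by default 
even after "use_sdf_read" is no longer set explicitly.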
   

##########
File path: sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java
##########
@@ -486,6 +515,19 @@ default InputT createInput(Pipeline p, Map<String, PCollection<?>> inputs) {
         .build();
   }
 
+  protected Pipeline createPipeline() {
+    PipelineOptions effectiveOpts = PipelineOptionsFactory.create();

Review comment:
       I think this is OK for now, but let's add a TODO and a Jira to implement 
proper validation.





