[ https://issues.apache.org/jira/browse/BEAM-12538?focusedWorklogId=621368&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-621368 ]
ASF GitHub Bot logged work on BEAM-12538:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 11/Jul/21 15:23
Start Date: 11/Jul/21 15:23
Worklog Time Spent: 10m
Work Description: chamikaramj commented on a change in pull request #15082:
URL: https://github.com/apache/beam/pull/15082#discussion_r667497140
##########
File path:
sdks/java/expansion-service/src/test/java/org/apache/beam/sdk/expansion/service/ExpansionServerTest.java
##########
@@ -45,4 +47,20 @@ public void testHostPortAvailableAfterClose() throws Exception {
assertThat(expansionServer.getHost(), is("localhost"));
assertThat(expansionServer.getPort(), greaterThan(0));
}
+
+ @Test
Review comment:
Do you also need an end-to-end integration test to make sure that "PROCESS"
mode works for cross-language transforms on Flink? (This can be a separate
PR.)
##########
File path:
sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java
##########
@@ -462,6 +488,9 @@ default InputT createInput(Pipeline p, Map<String, PCollection<?>> inputs) {
throw new RuntimeException(exn);
}
}));
+
+ SplittableParDo.convertReadBasedSplittableDoFnsToPrimitiveReadsIfNecessary(pipeline);
Review comment:
But because the call to
"SplittableParDo.convertReadBasedSplittableDoFnsToPrimitiveReadsIfNecessary"
is on the default path, it looks like we will convert SDFs to
"PrimitiveBoundedRead" whenever "use_sdf_read" is not set. Hence my request to
move this call into the else block above.
https://github.com/apache/beam/blob/403ad51d595677d580c7a5cfd9ba9aea6f5793c2/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java#L655
(I understand that we set "use_sdf_read" above for now, but removing that in
the future would break the default path for SDF, which is required by all
runners other than Flink.)
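To make the concern concrete, here is a minimal, self-contained sketch of the
two placements. It does not use Beam classes: the class name, method names, and
the boolean parameters are hypothetical stand-ins, and the condition guarding
the else block is not shown in this message, so a plain boolean is assumed for
it. The only behavior taken from the comment is that conversion happens when
"use_sdf_read" is not set.

```java
import java.util.HashSet;
import java.util.Set;

public class ReadConversionSketch {

  // Stand-in for the check described in the comment:
  // convert to primitive reads when "use_sdf_read" is absent.
  static boolean wouldConvert(Set<String> experiments) {
    return !experiments.contains("use_sdf_read");
  }

  // Current placement: the conversion call runs after the if/else, on every path.
  static boolean callOnDefaultPath(boolean elseBranch, boolean setsUseSdfRead) {
    Set<String> experiments = new HashSet<>();
    if (!elseBranch && setsUseSdfRead) {
      experiments.add("use_sdf_read");
    }
    return wouldConvert(experiments);
  }

  // Requested placement: the conversion call is reached only inside the else block.
  static boolean callInElseBlock(boolean elseBranch, boolean setsUseSdfRead) {
    Set<String> experiments = new HashSet<>();
    if (!elseBranch) {
      if (setsUseSdfRead) {
        experiments.add("use_sdf_read");
      }
      return false; // the conversion call is never reached on the default path
    }
    return wouldConvert(experiments);
  }

  public static void main(String[] args) {
    // Second argument models whether the default path still sets "use_sdf_read".
    System.out.println("default-path call, flag still set: " + callOnDefaultPath(false, true));
    System.out.println("default-path call, flag removed:   " + callOnDefaultPath(false, false));
    System.out.println("else-block call, flag removed:     " + callInElseBlock(false, false));
    System.out.println("else-block call, else branch:      " + callInElseBlock(true, false));
  }
}
```

Under these assumptions, the default path only starts converting SDFs once the
"use_sdf_read" setting is dropped, and moving the call into the else block
removes that dependency.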
##########
File path:
sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java
##########
@@ -486,6 +515,19 @@ default InputT createInput(Pipeline p, Map<String, PCollection<?>> inputs) {
.build();
}
+ protected Pipeline createPipeline() {
+ PipelineOptions effectiveOpts = PipelineOptionsFactory.create();
Review comment:
I think this is OK for now but let's add a TODO and a Jira to implement
proper validation.
Issue Time Tracking
-------------------
Worklog Id: (was: 621368)
Time Spent: 7h (was: 6h 50m)
> Allow ExpansionService to accept PipelineOptions
> ------------------------------------------------
>
> Key: BEAM-12538
> URL: https://issues.apache.org/jira/browse/BEAM-12538
> Project: Beam
> Issue Type: Improvement
> Components: cross-language
> Affects Versions: 2.31.0
> Reporter: Jan Lukavský
> Assignee: Jan Lukavský
> Priority: P2
> Time Spent: 7h
> Remaining Estimate: 0h
>
> ExpansionService does not accept command-line arguments other than the port
> number. It should accept at least {{defaultEnvironmentType}} and
> {{defaultEnvironmentConfig}} from {{PortablePipelineOptions}} so that the
> environment of the expanded transform can be specified.