If I understand it correctly, there is currently no place to set the defaultEnvironmentType. Python's KafkaIO either uses the 'expansion_service' given by the user (which might be a host:port, or an object that has the appropriate method), or calls 'default_io_expansion_service', which in turn runs ExpansionService using gradle. Either way, it ends up in ExpansionService#main [1].

It would be possible to adapt ExpansionService and call it locally, provided ExpansionService offered a way to extend it (a protected method createPipeline() seems to be enough), but that is not very user-friendly.

If we could specify the defaultEnvironmentConfig when starting the ExpansionService, it would be possible to add these parameters in the Python SDK's KafkaIO, which would mean users do not have to worry about the expansion service at all (leaving aside that using too many ReadFromKafka or WriteToKafka transforms would somewhat hurt performance, but that applies to the pipeline build time only). I have created [2] to track that.
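
To make the idea concrete, here is a minimal sketch of how a client could assemble the command line for the expansion service. It assumes the service accepted pipeline-option style flags after the port, which it currently does not (only the port is read). The function name, the jar path and the boot command path are hypothetical and used for illustration only.

```python
import json


def expansion_service_command(jar_path, port, environment_type=None,
                              environment_config=None, experiments=()):
    """Build the argv for launching a Java expansion service.

    ASSUMPTION: the service accepts --defaultEnvironmentType,
    --defaultEnvironmentConfig and --experiments after the port.
    At the time of this thread it accepts only the port.
    """
    cmd = ["java", "-jar", jar_path, str(port)]
    if environment_type is not None:
        cmd.append("--defaultEnvironmentType=" + environment_type)
    if environment_config is not None:
        cmd.append("--defaultEnvironmentConfig=" + environment_config)
    if experiments:
        cmd.append("--experiments=" + ",".join(experiments))
    return cmd


if __name__ == "__main__":
    # Hypothetical jar and boot paths; a PROCESS environment is configured
    # with a JSON document that names the command to run.
    config = json.dumps({"command": "/opt/beam/boot"})
    print(expansion_service_command(
        "expansion-service.jar", 8097, "PROCESS", config))
```

With something like this in the Python SDK, KafkaIO could forward the environment settings itself when it starts the default expansion service.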

Does that make sense, or is my analysis incorrect?

 Jan

[1] https://github.com/apache/beam/blob/22205ee1a84581e9206c5c61bad88a799779b4bc/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java#L511

[2] https://issues.apache.org/jira/browse/BEAM-12539


On 6/29/21 6:24 PM, Alexey Romanenko wrote:
I’m sorry if I missed something but do you mean that 
PortablePipelineOptions.setDefaultEnvironmentType(String) doesn’t work for you? 
Or it’s only a specific case while using portable KafkaIO?

On 29 Jun 2021, at 09:51, Jan Lukavský <x666je...@gmail.com> wrote:

Hi,

I have come across an issue with cross-language transforms. In my setup, I have 
a working PROCESS environment type and I cannot use DOCKER. When I use Python's 
KafkaIO, it unfortunately expands to the docker environment by default, which 
then fails due to the missing 'docker' command. I have not yet found a solution 
that does not involve changing the expansion service.

I see several possible solutions to that:

  a) I would say that the cleanest solution would be to add the preferred 
environment type to the expansion request sent to the expansion service (probably 
along with additional flags, perhaps --experiments?). This requires deeper 
changes to the expansion RPC definition, probably serializing the 
PipelineOptions from the client environment into the ExpansionRequest.

  b) Another option would be to allow specifying some command-line arguments 
when starting the expansion service, which currently accepts only the port on the 
command line, see [1]. The straightforward 'fix' (see [2]) unfortunately does not 
work, because it requires DirectRunner to be on the classpath, which then breaks 
other runners (see [3]). It seems possible to copy hand-selected options from the 
command line to the Pipeline, but that feels hackish. A cleaner variant would 
require either being able to construct the Pipeline without a runner specified 
(which seems possible when calling Pipeline.create(), but not when using 
PipelineOptions created by parsing command-line arguments), or being able to 
create a Map<String, String> from PipelineOptions and then copy all options into 
the Pipeline's options.

My proposal would be to create a hackish shortcut and just copy 
--defaultEnvironmentType, --defaultEnvironmentConfig and --experiments into the 
Pipeline's options for now, and create an issue for a proper solution (possibly 
a)?).
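
The shortcut can be sketched roughly as follows. This is only an illustration in Python of the selection logic, not the actual change to the Java ExpansionService; the function name and the dict used as the target are assumptions made for the example.

```python
# Options the shortcut would copy; everything else (e.g. --runner) is ignored,
# so no runner has to be present on the classpath.
COPIED_OPTIONS = ("defaultEnvironmentType", "defaultEnvironmentConfig",
                  "experiments")


def select_options(argv, allowed=COPIED_OPTIONS):
    """Pick the allowed --name=value arguments out of a command line."""
    selected = {}
    for arg in argv:
        if not arg.startswith("--") or "=" not in arg:
            continue  # positional arguments such as the port
        name, value = arg[2:].split("=", 1)
        if name not in allowed:
            continue
        if name == "experiments":
            # experiments is a comma-separated, repeatable list
            selected.setdefault(name, []).extend(value.split(","))
        else:
            selected[name] = value
    return selected


if __name__ == "__main__":
    print(select_options(
        ["8097", "--defaultEnvironmentType=PROCESS", "--runner=FlinkRunner"]))
```

The selected values would then be set on the options of the Pipeline created for each expansion request.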

WDYT? Or did I miss a way to override the default expansion?

Thanks for comments,

  Jan

[1] 
https://github.com/apache/beam/blob/22205ee1a84581e9206c5c61bad88a799779b4bc/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java#L511

[2] https://github.com/apache/beam/pull/15082

[3] https://ci-beam.apache.org/job/beam_PreCommit_Java_Commit/18169/
