If I understand it correctly, there is currently no place to set the
defaultEnvironmentType. Python's KafkaIO either uses the
'expansion_service' given by the user (which might be a host:port, or an
object that has the appropriate method), or calls
'default_io_expansion_service', which in turn runs ExpansionService
using gradle. Either way, it ends up in ExpansionService#main [1]. It
would be possible to adapt ExpansionService and call it locally,
provided ExpansionService offered a way to extend it (a protected
createPipeline() method seems to be enough), but that is not very
user-friendly. If we could specify the defaultEnvironmentConfig
when starting the ExpansionService, it would be possible to add these
parameters in the python SDK's KafkaIO, which would mean users do not
have to worry about the expansion service at all (leaving aside that
using too many ReadFromKafka or WriteToKafka transforms would somewhat
hurt performance, though only at pipeline build time). I have created
[2] to track that.
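
To make the idea concrete, here is a rough sketch of how the Python side
might assemble the arguments for the expansion service if it accepted
such flags. This is only an illustration: expansion_service_args is a
hypothetical helper, not an existing Beam function, the expansion
service does not accept these flags today, and the boot path in the
usage line is made up.

```python
import json


def expansion_service_args(port, environment_type=None, environment_config=None):
    """Build a hypothetical argument list for starting the expansion service.

    The port comes first, matching what the service accepts today. The two
    optional flags are the proposed additions, not existing behaviour.
    """
    args = [str(port)]
    if environment_type is not None:
        args.append("--defaultEnvironmentType=" + environment_type)
    if environment_config is not None:
        # Allow a dict for convenience and serialize it to JSON.
        if not isinstance(environment_config, str):
            environment_config = json.dumps(environment_config)
        args.append("--defaultEnvironmentConfig=" + environment_config)
    return args


# Example: PROCESS environment with an assumed location of the boot command.
print(expansion_service_args(8097, "PROCESS", {"command": "/opt/beam/boot"}))
```

KafkaIO could then forward such arguments to whatever starts the
service, so the user would only set the environment type and config in
one place.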
Does that make sense, or is my analysis incorrect?
Jan
[1]
https://github.com/apache/beam/blob/22205ee1a84581e9206c5c61bad88a799779b4bc/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java#L511
[2] https://issues.apache.org/jira/browse/BEAM-12539
On 6/29/21 6:24 PM, Alexey Romanenko wrote:
I’m sorry if I missed something but do you mean that
PortablePipelineOptions.setDefaultEnvironmentType(String) doesn’t work for you?
Or it’s only a specific case while using portable KafkaIO?
On 29 Jun 2021, at 09:51, Jan Lukavský <x666je...@gmail.com> wrote:
Hi,
I have come across an issue with cross-language transforms. In my setup, the
PROCESS environment type works and I cannot use DOCKER. When I use Python's
KafkaIO, it unfortunately expands to the docker environment by default, which
then fails due to the missing 'docker' command. So far I have not found a
solution that avoids touching the expansion service.
I see several possible solutions to that:
a) I would say that the cleanest solution would be to add the preferred
environment type to the expansion request sent to the expansion service (probably
along with additional flags, perhaps --experiments?). This requires deeper
changes to the expansion RPC definition, probably serializing the
PipelineOptions from the client environment into the ExpansionRequest.
b) Another option would be to allow specifying some of the command-line arguments
when starting the expansion service, which currently accepts only the port on the
command line, see [1]. The straightforward 'fix' (see [2]) unfortunately does not work,
because it requires DirectRunner to be on the classpath, which then breaks other
runners (see [3]). It seems possible to copy hand-selected options from the command line
to the Pipeline, but that feels hackish. It would require either being able to
construct the Pipeline without a runner specified (which seems possible when calling
Pipeline.create(), but not when using PipelineOptions created by parsing command-line
arguments), or being able to create a Map<String, String> from PipelineOptions
and then to copy all options into the Pipeline's options.
My proposal would be to create a hackish shortcut and just copy
--defaultEnvironmentType, --defaultEnvironmentConfig and --experiments into
the Pipeline's options for now, and create an issue for a proper solution
(possibly a)?).
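
A minimal sketch of that shortcut, written in Python only to show the
logic (the real change would be in the Java ExpansionService). The
function name select_expansion_options and the choice to ignore all
other flags are assumptions made for illustration.

```python
# Flags that would be copied into the Pipeline's options; everything else
# (for example --runner) is dropped so that no runner has to be on the classpath.
ALLOWED_FLAGS = ("defaultEnvironmentType", "defaultEnvironmentConfig", "experiments")


def select_expansion_options(argv):
    """Split expansion-service style arguments into (port, selected options).

    argv[0] is the port, as today. Remaining arguments of the form
    --name=value are kept only when name is in ALLOWED_FLAGS.
    """
    if not argv:
        raise ValueError("expected the port as the first argument")
    port = int(argv[0])
    options = {}
    for arg in argv[1:]:
        if not arg.startswith("--") or "=" not in arg:
            continue  # not a --name=value argument, ignore it
        # Split on the first '=' only, the config value may contain '='.
        name, value = arg[2:].split("=", 1)
        if name not in ALLOWED_FLAGS:
            continue
        if name == "experiments":
            # --experiments may repeat and may hold a comma-separated list.
            options.setdefault(name, []).extend(value.split(","))
        else:
            options[name] = value
    return port, options
```

For example, passing ["8097", "--defaultEnvironmentType=PROCESS",
"--runner=FlinkRunner"] would keep the port and the environment type and
silently drop the runner.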
WDYT? Or did I miss a way to override the default expansion?
Thanks for comments,
Jan
[1]
https://github.com/apache/beam/blob/22205ee1a84581e9206c5c61bad88a799779b4bc/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java#L511
[2] https://github.com/apache/beam/pull/15082
[3] https://ci-beam.apache.org/job/beam_PreCommit_Java_Commit/18169/