sjvanrossum commented on PR #36099: URL: https://github.com/apache/beam/pull/36099#issuecomment-3277196022
I'll keep the comments I provided offline short. Now that I'm seeing how this would be used based on your tests, I've got another suggestion that may make this more generic and easier to configure in cross-language pipelines. :)

I think these additions should live under `sdks/java/extensions/kafka`, since this is an optional extension that only affects Kafka client configuration and not Beam IO, but let's not dwell too much on that right now.

The functionality is very much Dataflow and GCP centric, and I agree that this is largely a Dataflow problem because Dataflow provides no way to mount shared file systems or secrets. Still, it may not be possible to add volume mounts in the execution environment of other runners either, so this can be useful outside of Dataflow and GCP. I'd suggest using Beam's [`FileSystems`](https://beam.apache.org/releases/javadoc/2.67.0/org/apache/beam/sdk/io/FileSystems.html) instead of the GCS client libraries to give Beam users the flexibility they might need.

If I understood correctly, these consumer factory functions process the Kafka client configuration and replace occurrences of a few template strings based on a prefix, right? Kafka already supports variable substitution with custom substitution functions through [`ConfigProvider`](https://kafka.apache.org/39/javadoc/org/apache/kafka/common/config/provider/ConfigProvider.html) implementations. Have a look at https://gist.github.com/sjvanrossum/c7d65bf30c2e8e52a178979a40e4ab79 and https://gist.github.com/sjvanrossum/e63fa805d808f2b284c8a8572fe27461 for examples.

```
config.providers=beam,gce
config.providers.beam.class=com.google.cloud.dataflow.examples.RuntimePipelineOptionsProvider
config.providers.gce.class=com.google.cloud.dataflow.examples.GceMetadataConfigProvider
bootstrap.servers=localhost:9092
group.id=${beam:org.apache.beam.sdk.options.PipelineOptions:jobName}
enable.auto.commit=false
client.id=${beam:org.apache.beam.sdk.options.ApplicationNameOptions:appName}
rack.id=${gce:zone}
auto.offset.reset=earliest
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="user" password="${beam:com.google.cloud.dataflow.examples.JobOptions:kafkaPassword}";
```

Template variables are written as `${config-provider:path:key}` and the `ConfigProvider` resolves a set of keys for a single path, e.g. `${copyto:/path/to/local/dir:scheme://path/to/remote/file}`. There's a project at https://github.com/jcustenborder/kafka-config-provider-gcloud that provides this for Google Cloud Secret Manager.
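To make the `copyto` idea concrete, here's a minimal sketch of such a provider built on Beam's `FileSystems` rather than the GCS client libraries. The class name is hypothetical, error handling is kept to a minimum, and it assumes calling `FileSystems.setDefaultPipelineOptions(...)` is sufficient to register the pluggable file systems in the Kafka client's environment:

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.channels.Channels;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.fs.MatchResult;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.kafka.common.config.ConfigData;
import org.apache.kafka.common.config.provider.ConfigProvider;

/**
 * Hypothetical ConfigProvider sketch: resolves ${copyto:<local-dir>:<remote-uri>}
 * by copying each remote file to the local directory via Beam's FileSystems and
 * substituting the variable with the local path of the copy.
 */
public class CopyToConfigProvider implements ConfigProvider {

  @Override
  public void configure(Map<String, ?> configs) {
    // Registers Beam's pluggable file systems (gs://, s3://, hdfs://, ...)
    // through their registrars; on a Beam worker this may already have happened.
    FileSystems.setDefaultPipelineOptions(PipelineOptionsFactory.create());
  }

  @Override
  public ConfigData get(String path) {
    // Without keys there are no remote URIs to resolve.
    return new ConfigData(new HashMap<>());
  }

  @Override
  public ConfigData get(String path, Set<String> keys) {
    // path = local destination directory, keys = remote file URIs.
    Map<String, String> resolved = new HashMap<>();
    try {
      Path localDir = Files.createDirectories(Paths.get(path));
      for (String remoteUri : keys) {
        MatchResult.Metadata metadata = FileSystems.matchSingleFileSpec(remoteUri);
        Path localFile = localDir.resolve(metadata.resourceId().getFilename());
        try (InputStream in =
            Channels.newInputStream(FileSystems.open(metadata.resourceId()))) {
          Files.copy(in, localFile, StandardCopyOption.REPLACE_EXISTING);
        }
        resolved.put(remoteUri, localFile.toString());
      }
    } catch (IOException e) {
      throw new RuntimeException("Failed to copy remote file(s) to " + path, e);
    }
    return new ConfigData(resolved);
  }

  @Override
  public void close() {}
}
```

With something like that in place, a consumer config would only need `config.providers=copyto`, `config.providers.copyto.class=CopyToConfigProvider` and, say, `ssl.truststore.location=${copyto:/tmp/kafka-certs:gs://my-bucket/truststore.jks}` (bucket name made up here), and the Kafka client performs the substitution itself, with no custom consumer factory function required.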
