sjvanrossum commented on PR #36099:
URL: https://github.com/apache/beam/pull/36099#issuecomment-3277196022

   I'll keep the comments I provided offline short. Now that I can see how this would be used based on your tests, I've got another suggestion that may help make this more generic and easier to configure in cross-language pipelines. :)
   
   I think these additions should live under `sdks/java/extensions/kafka`, since this is an optional extension that only affects Kafka client configuration and not Beam IO, but let's not dwell too much on that right now.
   
   The functionality is very Dataflow and GCP centric, and I agree that this is largely a Dataflow problem because Dataflow provides no way to mount shared file systems or secrets. Still, adding volume mounts may not be possible in the execution environments of other runners either, so this can be useful outside of Dataflow and GCP. I'd suggest using Beam's [`FileSystems`](https://beam.apache.org/releases/javadoc/2.67.0/org/apache/beam/sdk/io/FileSystems.html) instead of the GCS client libraries to give Beam users the flexibility they might need.
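
   To illustrate, a minimal sketch of reading a remote file through `FileSystems` (a hypothetical snippet, not from this PR; the `gs://` spec is just a placeholder and any registered scheme would work):
   ```java
   import java.nio.channels.ReadableByteChannel;
   import org.apache.beam.sdk.io.FileSystems;
   import org.apache.beam.sdk.io.fs.MatchResult;
   import org.apache.beam.sdk.options.PipelineOptionsFactory;

   public class ReadRemoteFile {
     public static void main(String[] args) throws Exception {
       // Registers every FileSystem implementation on the classpath (gs://, s3://, hdfs://, ...).
       FileSystems.setDefaultPipelineOptions(PipelineOptionsFactory.create());
       MatchResult.Metadata metadata = FileSystems.matchSingleFileSpec("gs://bucket/truststore.jks");
       try (ReadableByteChannel channel = FileSystems.open(metadata.resourceId())) {
         // Consume the bytes, e.g. copy them to a local file for the Kafka client to load.
       }
     }
   }
   ```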
   
   If I understood correctly, these consumer factory functions process the Kafka client configuration and replace occurrences of a few template strings based on a prefix, right? Kafka supports variable substitution with custom substitution functions through [`ConfigProvider`](https://kafka.apache.org/39/javadoc/org/apache/kafka/common/config/provider/ConfigProvider.html) implementations. Have a look at https://gist.github.com/sjvanrossum/c7d65bf30c2e8e52a178979a40e4ab79 and https://gist.github.com/sjvanrossum/e63fa805d808f2b284c8a8572fe27461 for examples.
   ```
   config.providers=beam,gce
   config.providers.beam.class=com.google.cloud.dataflow.examples.RuntimePipelineOptionsProvider
   config.providers.gce.class=com.google.cloud.dataflow.examples.GceMetadataConfigProvider
   bootstrap.servers=localhost:9092
   group.id=${beam:org.apache.beam.sdk.options.PipelineOptions:jobName}
   enable.auto.commit=false
   client.id=${beam:org.apache.beam.sdk.options.ApplicationNameOptions:appName}
   client.rack=${gce:zone}
   auto.offset.reset=earliest
   security.protocol=SASL_SSL
   sasl.mechanism=PLAIN
   sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username=\"user\" password=\"${beam:com.google.cloud.dataflow.examples.JobOptions:kafkaPassword}\";
   ```
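
   Since the substitution happens inside the Kafka client, the provider wiring travels as plain strings in the consumer config, which is what makes this straightforward to set from a cross-language pipeline. A sketch of the wiring (assuming the provider classes from the gists above are on the worker's classpath):
   ```java
   import java.util.Map;
   import org.apache.beam.sdk.Pipeline;
   import org.apache.beam.sdk.io.kafka.KafkaIO;
   import org.apache.kafka.common.serialization.StringDeserializer;

   public class ConfigProviderPipeline {
     public static void main(String[] args) {
       Pipeline pipeline = Pipeline.create();
       pipeline.apply(
           KafkaIO.<String, String>read()
               .withBootstrapServers("localhost:9092")
               .withTopic("topic")
               .withKeyDeserializer(StringDeserializer.class)
               .withValueDeserializer(StringDeserializer.class)
               // Plain strings end to end: the Kafka client resolves the templates,
               // so nothing here needs runner- or language-specific handling.
               .withConsumerConfigUpdates(
                   Map.of(
                       "config.providers", "gce",
                       "config.providers.gce.class",
                       "com.google.cloud.dataflow.examples.GceMetadataConfigProvider",
                       "client.rack", "${gce:zone}")));
       pipeline.run();
     }
   }
   ```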
   
   The template variables are written as `${config-provider:path:key}` and the `ConfigProvider` resolves a set of keys for a single path, e.g. `${copyto:/path/to/local/dir:scheme://path/to/remote/file}`. There's a project at https://github.com/jcustenborder/kafka-config-provider-gcloud that provides this for Google Cloud Secret Manager.
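
   Combining that idea with the `FileSystems` suggestion above, a hypothetical `copyto` provider could look roughly like this (`CopyToConfigProvider` is a made-up name, not an existing class):
   ```java
   import java.io.IOException;
   import java.io.UncheckedIOException;
   import java.nio.channels.Channels;
   import java.nio.channels.ReadableByteChannel;
   import java.nio.file.Files;
   import java.nio.file.Path;
   import java.nio.file.Paths;
   import java.nio.file.StandardCopyOption;
   import java.util.Collections;
   import java.util.HashMap;
   import java.util.Map;
   import java.util.Set;
   import org.apache.beam.sdk.io.FileSystems;
   import org.apache.beam.sdk.io.fs.MatchResult;
   import org.apache.beam.sdk.options.PipelineOptionsFactory;
   import org.apache.kafka.common.config.ConfigData;
   import org.apache.kafka.common.config.provider.ConfigProvider;

   /**
    * Resolves ${copyto:/local/dir:scheme://remote/file} by copying the remote file
    * to the local directory and substituting the variable with the local path.
    */
   public class CopyToConfigProvider implements ConfigProvider {

     @Override
     public void configure(Map<String, ?> configs) {
       // Registers every FileSystem implementation on the classpath (gs://, s3://, hdfs://, ...).
       FileSystems.setDefaultPipelineOptions(PipelineOptionsFactory.create());
     }

     @Override
     public ConfigData get(String path) {
       // Nothing to enumerate for a bare path; the keys carry the remote file specs.
       return new ConfigData(Collections.emptyMap());
     }

     @Override
     public ConfigData get(String path, Set<String> keys) {
       // 'path' is the local target directory, each 'key' is a remote file spec.
       Map<String, String> resolved = new HashMap<>();
       try {
         Files.createDirectories(Paths.get(path));
         for (String key : keys) {
           MatchResult.Metadata remote = FileSystems.matchSingleFileSpec(key);
           Path target = Paths.get(path, remote.resourceId().getFilename());
           try (ReadableByteChannel in = FileSystems.open(remote.resourceId())) {
             Files.copy(Channels.newInputStream(in), target, StandardCopyOption.REPLACE_EXISTING);
           }
           resolved.put(key, target.toString());
         }
       } catch (IOException e) {
         throw new UncheckedIOException(e);
       }
       return new ConfigData(resolved);
     }

     @Override
     public void close() {}
   }
   ```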

