mpiraino commented on issue #30663: URL: https://github.com/apache/beam/issues/30663#issuecomment-2792453120
These configurations:

```
--environment_type=EXTERNAL --environment_config=175.26.0.25:50000
```

specify the location where Beam will run your Python code. `ReadFromKafka` is made available in the Python SDK through a Java expansion service, so you also need to override the default environment type of that Java expansion service by doing something like:

```
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.kafka import ReadFromKafka, default_io_expansion_service

if __name__ == "__main__":
    options = PipelineOptions([
        "--runner=PortableRunner",
        "--job_endpoint=localhost:8099",
        "--environment_type=EXTERNAL",
        "--environment_config=175.26.0.25:50000",
    ])
    with beam.Pipeline(options=options) as pipeline:
        (
            pipeline
            | "Read Data" >> ReadFromKafka(
                consumer_config={
                    "bootstrap.servers": "175.26.0.5:9092",
                    "auto.offset.reset": "earliest",
                },
                topics=["member_entered_store"],
                # Run the Java ReadFromKafka transform in a PROCESS environment
                # instead of the default Docker environment.
                expansion_service=default_io_expansion_service(
                    append_args=[
                        '--defaultEnvironmentType=PROCESS',
                        '--defaultEnvironmentConfig={"command":"/opt/apache/beam/boot"}',
                    ]
                ),
            )
        )
```

Note that using the PROCESS type requires you to have the Beam Java SDK boot executable available in the Flink task manager (one way to bake it into the task manager image is sketched below). I think it is theoretically possible to specify EXTERNAL instead, provided you have a pool of Java SDK harness workers running (I have never actually tried to set one of those up, so I'm not sure how that works, but I would be very interested to see an example if someone makes it work).
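For reference, a minimal sketch of one way to satisfy the PROCESS requirement above: build a custom Flink task manager image that copies the Beam Java SDK harness out of the published SDK container, so `/opt/apache/beam/boot` exists inside the task manager. The Flink and Beam image tags here are placeholders, not specific recommendations; swap in whatever versions you actually run.

```
# Hypothetical image tags -- replace with the Flink and Beam versions you use.
FROM flink:1.17-java11

# The published Beam Java SDK container ships the harness under /opt/apache/beam,
# including the /opt/apache/beam/boot executable referenced in the PROCESS config above.
COPY --from=apache/beam_java11_sdk:2.54.0 /opt/apache/beam /opt/apache/beam
```

With an image like that, the task manager running the PROCESS-based Java workers and the `--defaultEnvironmentConfig` passed to the expansion service both refer to the same `/opt/apache/beam/boot` path.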