mpiraino commented on issue #30663:
URL: https://github.com/apache/beam/issues/30663#issuecomment-2792453120
These configurations:
```
--environment_type=EXTERNAL
--environment_config=175.26.0.25:50000
```
specify the environment where Beam will run your Python code. `ReadFromKafka`
is made available in the Python SDK through a Java expansion service, so you
need to override that expansion service's default environment type by doing
something like:
```
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.kafka import ReadFromKafka, default_io_expansion_service

if __name__ == "__main__":
    options = PipelineOptions([
        "--runner=PortableRunner",
        "--job_endpoint=localhost:8099",
        "--environment_type=EXTERNAL",
        "--environment_config=175.26.0.25:50000",
    ])
    with beam.Pipeline(options=options) as pipeline:
        (
            pipeline
            | "Read Data" >> ReadFromKafka(
                consumer_config={
                    "bootstrap.servers": "175.26.0.5:9092",
                    "auto.offset.reset": "earliest",
                },
                topics=["member_entered_store"],
                # Pass expansion_service as an argument to ReadFromKafka so the
                # Java transform is expanded with a PROCESS environment instead
                # of the default Docker environment.
                expansion_service=default_io_expansion_service(
                    append_args=[
                        '--defaultEnvironmentType=PROCESS',
                        '--defaultEnvironmentConfig={"command":"/opt/apache/beam/boot"}',
                    ]
                ),
            )
        )
```
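For completeness: the `EXTERNAL` environment in the options above assumes a Python SDK worker pool is already listening at `175.26.0.25:50000`. One way to start one (the image tag is an assumption; match it to your Beam and Python versions, and run it on the host the address points at) is:

```
# Start a Beam Python SDK harness in worker-pool mode, listening on port 50000.
# Image tag 2.55.0 / python3.10 is an example, not a requirement.
docker run -p 50000:50000 apache/beam_python3.10_sdk:2.55.0 --worker_pool
```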
Note that using the `PROCESS` environment type requires the boot executable to
be present on the Flink task manager. I think that theoretically you could also
specify `EXTERNAL` here, provided you have a pool of Java SDK harness workers
running (I have never actually tried to set one of those up, so I'm not sure
how that works, but I'd be very interested to see an example if someone makes
it work).
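One way to get the boot executable onto the task manager is to build a custom Flink image that copies it out of the Beam Java SDK container. A minimal sketch (the Flink and Beam image tags are assumptions; align them with your cluster and pipeline versions):

```
FROM flink:1.16
# Copy the Java SDK harness boot entrypoint (and bundled jars) out of the
# Beam Java SDK image so PROCESS mode can find /opt/apache/beam/boot.
COPY --from=apache/beam_java11_sdk:2.55.0 /opt/apache/beam/ /opt/apache/beam/
```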