This is an automated email from the ASF dual-hosted git repository.
johncasey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 913fefa8f7d Reverts Python Kafka source to use the Java SDF-based implementation (#25684)
913fefa8f7d is described below
commit 913fefa8f7dfa16774d880a97cb9e78dcd98be5c
Author: Chamikara Jayalath <[email protected]>
AuthorDate: Thu Mar 2 06:23:49 2023 -0800
Reverts Python Kafka source to use the Java SDF-based implementation (#25684)
* Reverts Python Kafka source to use the Java SDF-based implementation
* Remove the expansion service workaround for the Kafka perf test
---
sdks/python/apache_beam/io/external/xlang_kafkaio_perf_test.py | 3 +--
sdks/python/apache_beam/io/kafka.py | 7 +------
2 files changed, 2 insertions(+), 8 deletions(-)
diff --git a/sdks/python/apache_beam/io/external/xlang_kafkaio_perf_test.py b/sdks/python/apache_beam/io/external/xlang_kafkaio_perf_test.py
index 72e2e63e412..08a6baee468 100644
--- a/sdks/python/apache_beam/io/external/xlang_kafkaio_perf_test.py
+++ b/sdks/python/apache_beam/io/external/xlang_kafkaio_perf_test.py
@@ -110,8 +110,7 @@ class _KafkaIOSDFReadPerfTest(LoadTest):
'bootstrap.servers': self.test_options.bootstrap_servers,
'auto.offset.reset': 'earliest'
},
- topics=[self.kafka_topic],
- expansion_service=kafka.default_io_expansion_service())
+ topics=[self.kafka_topic])
| 'Count records' >> beam.ParDo(CountMessages(self.metrics_namespace))
| 'Measure time' >> beam.ParDo(MeasureTime(self.metrics_namespace)))
diff --git a/sdks/python/apache_beam/io/kafka.py b/sdks/python/apache_beam/io/kafka.py
index eaabaf36363..b96576b4efb 100644
--- a/sdks/python/apache_beam/io/kafka.py
+++ b/sdks/python/apache_beam/io/kafka.py
@@ -187,12 +187,7 @@ class ReadFromKafka(ExternalTransform):
start_read_time=start_read_time,
commit_offset_in_finalize=commit_offset_in_finalize,
timestamp_policy=timestamp_policy)),
- expansion_service or default_io_expansion_service(
- append_args=['--experiments=use_unbounded_sdf_wrapper']))
- # TODO(https://github.com/apache/beam/issues/21730): remove
- # 'use_unbounded_sdf_wrapper' which opts default expansion
- # service into using SDF wrapped legacy Kafka source instead of pure SDF
- # Kafka source.
+ expansion_service or default_io_expansion_service())
WriteToKafkaSchema = typing.NamedTuple(