This is an automated email from the ASF dual-hosted git repository.

johncasey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 913fefa8f7d Reverts Python Kafka source to use the Java SDF-based 
implementation (#25684)
913fefa8f7d is described below

commit 913fefa8f7dfa16774d880a97cb9e78dcd98be5c
Author: Chamikara Jayalath <[email protected]>
AuthorDate: Thu Mar 2 06:23:49 2023 -0800

    Reverts Python Kafka source to use the Java SDF-based implementation 
(#25684)
    
    * Reverts Python Kafka source to use the Java SDF-based implementation
    
    * Remove the expansion service workaround for the Kafka perf test
---
 sdks/python/apache_beam/io/external/xlang_kafkaio_perf_test.py | 3 +--
 sdks/python/apache_beam/io/kafka.py                            | 7 +------
 2 files changed, 2 insertions(+), 8 deletions(-)

diff --git a/sdks/python/apache_beam/io/external/xlang_kafkaio_perf_test.py 
b/sdks/python/apache_beam/io/external/xlang_kafkaio_perf_test.py
index 72e2e63e412..08a6baee468 100644
--- a/sdks/python/apache_beam/io/external/xlang_kafkaio_perf_test.py
+++ b/sdks/python/apache_beam/io/external/xlang_kafkaio_perf_test.py
@@ -110,8 +110,7 @@ class _KafkaIOSDFReadPerfTest(LoadTest):
                 'bootstrap.servers': self.test_options.bootstrap_servers,
                 'auto.offset.reset': 'earliest'
             },
-            topics=[self.kafka_topic],
-            expansion_service=kafka.default_io_expansion_service())
+            topics=[self.kafka_topic])
         | 'Count records' >> beam.ParDo(CountMessages(self.metrics_namespace))
         | 'Measure time' >> beam.ParDo(MeasureTime(self.metrics_namespace)))
 
diff --git a/sdks/python/apache_beam/io/kafka.py 
b/sdks/python/apache_beam/io/kafka.py
index eaabaf36363..b96576b4efb 100644
--- a/sdks/python/apache_beam/io/kafka.py
+++ b/sdks/python/apache_beam/io/kafka.py
@@ -187,12 +187,7 @@ class ReadFromKafka(ExternalTransform):
                 start_read_time=start_read_time,
                 commit_offset_in_finalize=commit_offset_in_finalize,
                 timestamp_policy=timestamp_policy)),
-        expansion_service or default_io_expansion_service(
-            append_args=['--experiments=use_unbounded_sdf_wrapper']))
-    # TODO(https://github.com/apache/beam/issues/21730): remove
-    #  'use_unbounded_sdf_wrapper' which opts default expansion
-    #  service into using SDF wrapped legacy Kafka source instead of pure SDF
-    #  Kafka source.
+        expansion_service or default_io_expansion_service())
 
 
 WriteToKafkaSchema = typing.NamedTuple(

Reply via email to