This is an automated email from the ASF dual-hosted git repository.

boyuanz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new d2b2d9d  Use unbounded wrapper for Kafka Read.
     new c22417b  Merge pull request #12577 from boyuanzz/use_wrapper
d2b2d9d is described below

commit d2b2d9d992199b7f2a47c7650df6d91b02ae8eba
Author: Boyuan Zhang <[email protected]>
AuthorDate: Thu Aug 13 17:06:00 2020 -0700

    Use unbounded wrapper for Kafka Read.
---
 .../java/org/apache/beam/sdk/io/kafka/KafkaIO.java |  4 +-
 .../beam/sdk/io/kafka/KafkaIOExternalTest.java     | 66 +++++-----------------
 2 files changed, 18 insertions(+), 52 deletions(-)

diff --git 
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java 
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index b14ef22..305a475 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -1047,7 +1047,9 @@ public class KafkaIO {
       // "beam_fn_api_use_deprecated_read" is not enabled.
       if (!ExperimentalOptions.hasExperiment(input.getPipeline().getOptions(), 
"beam_fn_api")
           || ExperimentalOptions.hasExperiment(
-              input.getPipeline().getOptions(), 
"beam_fn_api_use_deprecated_read")) {
+              input.getPipeline().getOptions(), 
"beam_fn_api_use_deprecated_read")
+          || !ExperimentalOptions.hasExperiment(
+              input.getPipeline().getOptions(), "use_sdf_kafka_read")) {
         // Handles unbounded source to bounded conversion if maxNumRecords or 
maxReadTime is set.
         Unbounded<KafkaRecord<K, V>> unbounded =
             org.apache.beam.sdk.io.Read.from(
diff --git 
a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.java
 
b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.java
index 0352ba4..706cd98 100644
--- 
a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.java
+++ 
b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.java
@@ -30,6 +30,7 @@ import 
org.apache.beam.model.pipeline.v1.ExternalTransforms.ExternalConfiguratio
 import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.runners.core.construction.ParDoTranslation;
 import org.apache.beam.runners.core.construction.PipelineTranslation;
+import org.apache.beam.runners.core.construction.ReadTranslation;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.coders.KvCoder;
@@ -105,11 +106,9 @@ public class KafkaIOExternalTest {
                             .setPayload(payload.toByteString())))
             .setNamespace("test_namespace")
             .build();
-
     ExpansionService expansionService = new ExpansionService();
     TestStreamObserver<ExpansionApi.ExpansionResponse> observer = new 
TestStreamObserver<>();
     expansionService.expand(request, observer);
-
     ExpansionApi.ExpansionResponse result = observer.result;
     RunnerApi.PTransform transform = result.getTransform();
     assertThat(
@@ -121,60 +120,25 @@ public class KafkaIOExternalTest {
 
     RunnerApi.PTransform kafkaComposite =
         
result.getComponents().getTransformsOrThrow(transform.getSubtransforms(0));
-
-    // KafkaIO.Read should be expanded into SDF transform.
-    assertThat(
-        kafkaComposite.getSubtransformsList(),
-        Matchers.contains(
-            "test_namespacetest/KafkaIO.Read/Impulse",
-            
"test_namespacetest/KafkaIO.Read/ParDo(GenerateKafkaSourceDescriptor)",
-            "test_namespacetest/KafkaIO.Read/KafkaIO.ReadSourceDescriptors"));
-
-    // Verify the consumerConfig and topics are populated correctly to
-    // GenerateKafkaSourceDescriptor.
-    RunnerApi.PTransform generateParDo =
-        
result.getComponents().getTransformsOrThrow(kafkaComposite.getSubtransforms(1));
-    KafkaIO.Read.GenerateKafkaSourceDescriptor generateDoFn =
-        (KafkaIO.Read.GenerateKafkaSourceDescriptor)
-            ParDoTranslation.getDoFn(
-                RunnerApi.ParDoPayload.parseFrom(
-                    result
-                        .getComponents()
-                        
.getTransformsOrThrow(generateParDo.getSubtransforms(0))
-                        .getSpec()
-                        .getPayload()));
-    assertThat(generateDoFn.consumerConfig, Matchers.is(consumerConfig));
-    assertThat(generateDoFn.topics, Matchers.is(topics));
-
-    // Verify that the consumerConfig, keyDeserializerProvider, 
valueDeserializerProvider are
-    // populated correctly to the SDF.
-    RunnerApi.PTransform readViaSDF =
-        
result.getComponents().getTransformsOrThrow(kafkaComposite.getSubtransforms(2));
-    RunnerApi.PTransform subTransform =
-        
result.getComponents().getTransformsOrThrow(readViaSDF.getSubtransforms(0));
-
-    ReadFromKafkaDoFn readSDF =
-        (ReadFromKafkaDoFn)
-            ParDoTranslation.getDoFn(
-                RunnerApi.ParDoPayload.parseFrom(
-                    result
-                        .getComponents()
-                        .getTransformsOrThrow(subTransform.getSubtransforms(0))
-                        .getSpec()
-                        .getPayload()));
-
-    assertThat(readSDF.consumerConfig, Matchers.is(consumerConfig));
+    RunnerApi.PTransform kafkaRead =
+        
result.getComponents().getTransformsOrThrow(kafkaComposite.getSubtransforms(0));
+    RunnerApi.ReadPayload readPayload =
+        RunnerApi.ReadPayload.parseFrom(kafkaRead.getSpec().getPayload());
+    KafkaUnboundedSource source =
+        (KafkaUnboundedSource) 
ReadTranslation.unboundedSourceFromProto(readPayload);
+    KafkaIO.Read spec = source.getSpec();
+
+    assertThat(spec.getConsumerConfig(), Matchers.is(consumerConfig));
+    assertThat(spec.getTopics(), Matchers.is(topics));
     assertThat(
-        readSDF
-            .keyDeserializerProvider
-            .getDeserializer(readSDF.consumerConfig, true)
+        spec.getKeyDeserializerProvider()
+            .getDeserializer(spec.getConsumerConfig(), true)
             .getClass()
             .getName(),
         Matchers.is(keyDeserializer));
     assertThat(
-        readSDF
-            .valueDeserializerProvider
-            .getDeserializer(readSDF.consumerConfig, false)
+        spec.getValueDeserializerProvider()
+            .getDeserializer(spec.getConsumerConfig(), false)
             .getClass()
             .getName(),
         Matchers.is(valueDeserializer));

Reply via email to