This is an automated email from the ASF dual-hosted git repository.
boyuanz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new d2b2d9d Use unbounded wrapper for Kafka Read.
new c22417b Merge pull request #12577 from boyuanzz/use_wrapper
d2b2d9d is described below
commit d2b2d9d992199b7f2a47c7650df6d91b02ae8eba
Author: Boyuan Zhang <[email protected]>
AuthorDate: Thu Aug 13 17:06:00 2020 -0700
Use unbounded wrapper for Kafka Read.
---
.../java/org/apache/beam/sdk/io/kafka/KafkaIO.java | 4 +-
.../beam/sdk/io/kafka/KafkaIOExternalTest.java | 66 +++++-----------------
2 files changed, 18 insertions(+), 52 deletions(-)
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index b14ef22..305a475 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -1047,7 +1047,9 @@ public class KafkaIO {
// "beam_fn_api_use_deprecated_read" is not enabled.
       if (!ExperimentalOptions.hasExperiment(input.getPipeline().getOptions(), "beam_fn_api")
           || ExperimentalOptions.hasExperiment(
-              input.getPipeline().getOptions(), "beam_fn_api_use_deprecated_read")) {
+              input.getPipeline().getOptions(), "beam_fn_api_use_deprecated_read")
+          || !ExperimentalOptions.hasExperiment(
+              input.getPipeline().getOptions(), "use_sdf_kafka_read")) {
         // Handles unbounded source to bounded conversion if maxNumRecords or maxReadTime is set.
Unbounded<KafkaRecord<K, V>> unbounded =
org.apache.beam.sdk.io.Read.from(
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.java
index 0352ba4..706cd98 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.java
@@ -30,6 +30,7 @@ import org.apache.beam.model.pipeline.v1.ExternalTransforms.ExternalConfiguratio
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.construction.ParDoTranslation;
import org.apache.beam.runners.core.construction.PipelineTranslation;
+import org.apache.beam.runners.core.construction.ReadTranslation;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.KvCoder;
@@ -105,11 +106,9 @@ public class KafkaIOExternalTest {
.setPayload(payload.toByteString())))
.setNamespace("test_namespace")
.build();
-
ExpansionService expansionService = new ExpansionService();
     TestStreamObserver<ExpansionApi.ExpansionResponse> observer = new TestStreamObserver<>();
expansionService.expand(request, observer);
-
ExpansionApi.ExpansionResponse result = observer.result;
RunnerApi.PTransform transform = result.getTransform();
assertThat(
@@ -121,60 +120,25 @@ public class KafkaIOExternalTest {
RunnerApi.PTransform kafkaComposite =
result.getComponents().getTransformsOrThrow(transform.getSubtransforms(0));
-
- // KafkaIO.Read should be expanded into SDF transform.
- assertThat(
- kafkaComposite.getSubtransformsList(),
- Matchers.contains(
- "test_namespacetest/KafkaIO.Read/Impulse",
-            "test_namespacetest/KafkaIO.Read/ParDo(GenerateKafkaSourceDescriptor)",
- "test_namespacetest/KafkaIO.Read/KafkaIO.ReadSourceDescriptors"));
-
- // Verify the consumerConfig and topics are populated correctly to
- // GenerateKafkaSourceDescriptor.
- RunnerApi.PTransform generateParDo =
-        result.getComponents().getTransformsOrThrow(kafkaComposite.getSubtransforms(1));
- KafkaIO.Read.GenerateKafkaSourceDescriptor generateDoFn =
- (KafkaIO.Read.GenerateKafkaSourceDescriptor)
- ParDoTranslation.getDoFn(
- RunnerApi.ParDoPayload.parseFrom(
- result
- .getComponents()
-                        .getTransformsOrThrow(generateParDo.getSubtransforms(0))
- .getSpec()
- .getPayload()));
- assertThat(generateDoFn.consumerConfig, Matchers.is(consumerConfig));
- assertThat(generateDoFn.topics, Matchers.is(topics));
-
-    // Verify that the consumerConfig, keyDeserializerProvider, valueDeserializerProvider are
- // populated correctly to the SDF.
- RunnerApi.PTransform readViaSDF =
-        result.getComponents().getTransformsOrThrow(kafkaComposite.getSubtransforms(2));
- RunnerApi.PTransform subTransform =
-        result.getComponents().getTransformsOrThrow(readViaSDF.getSubtransforms(0));
-
- ReadFromKafkaDoFn readSDF =
- (ReadFromKafkaDoFn)
- ParDoTranslation.getDoFn(
- RunnerApi.ParDoPayload.parseFrom(
- result
- .getComponents()
- .getTransformsOrThrow(subTransform.getSubtransforms(0))
- .getSpec()
- .getPayload()));
-
- assertThat(readSDF.consumerConfig, Matchers.is(consumerConfig));
+ RunnerApi.PTransform kafkaRead =
+        result.getComponents().getTransformsOrThrow(kafkaComposite.getSubtransforms(0));
+ RunnerApi.ReadPayload readPayload =
+ RunnerApi.ReadPayload.parseFrom(kafkaRead.getSpec().getPayload());
+ KafkaUnboundedSource source =
+        (KafkaUnboundedSource) ReadTranslation.unboundedSourceFromProto(readPayload);
+ KafkaIO.Read spec = source.getSpec();
+
+ assertThat(spec.getConsumerConfig(), Matchers.is(consumerConfig));
+ assertThat(spec.getTopics(), Matchers.is(topics));
assertThat(
- readSDF
- .keyDeserializerProvider
- .getDeserializer(readSDF.consumerConfig, true)
+ spec.getKeyDeserializerProvider()
+ .getDeserializer(spec.getConsumerConfig(), true)
.getClass()
.getName(),
Matchers.is(keyDeserializer));
assertThat(
- readSDF
- .valueDeserializerProvider
- .getDeserializer(readSDF.consumerConfig, false)
+ spec.getValueDeserializerProvider()
+ .getDeserializer(spec.getConsumerConfig(), false)
.getClass()
.getName(),
Matchers.is(valueDeserializer));