This is an automated email from the ASF dual-hosted git repository.

boyuanz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new f2cc926  Eliminate beam_fn_api from KafkaIO expansion
     new 3b85447  Merge pull request #14419 from [BEAM-12114] Eliminate 
beam_fn_api from KafkaIO expansion
f2cc926 is described below

commit f2cc92663ad8ae685183e076cdb652d8fc3ba4e0
Author: Boyuan Zhang <[email protected]>
AuthorDate: Fri Apr 2 15:20:53 2021 -0700

    Eliminate beam_fn_api from KafkaIO expansion
---
 runners/google-cloud-dataflow-java/build.gradle    |   1 +
 .../beam/runners/dataflow/DataflowRunner.java      |   4 +
 .../SparkStructuredStreamingRunner.java            |   3 +
 runners/spark/spark_runner.gradle                  |   1 +
 .../org/apache/beam/runners/spark/SparkRunner.java |   3 +
 .../beam/runners/spark/SparkRunnerDebugger.java    |   3 +
 sdks/java/io/kafka/build.gradle                    |   1 +
 .../java/org/apache/beam/sdk/io/kafka/KafkaIO.java | 176 +++++++++++++++------
 .../beam/sdk/io/kafka/KafkaIOExternalTest.java     |   4 +-
 9 files changed, 147 insertions(+), 49 deletions(-)

diff --git a/runners/google-cloud-dataflow-java/build.gradle 
b/runners/google-cloud-dataflow-java/build.gradle
index 290ea94..476e8c5 100644
--- a/runners/google-cloud-dataflow-java/build.gradle
+++ b/runners/google-cloud-dataflow-java/build.gradle
@@ -72,6 +72,7 @@ dependencies {
   compile project(path: ":model:pipeline", configuration: "shadow")
   compile project(path: ":sdks:java:core", configuration: "shadow")
   compile project(":sdks:java:extensions:google-cloud-platform-core")
+  compile project(":sdks:java:io:kafka")
   compile project(":sdks:java:io:google-cloud-platform")
   compile project(":runners:core-construction-java")
   compile library.java.avro
diff --git 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index c81631e..0a79cd9 100644
--- 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -115,6 +115,7 @@ import 
org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageWithAttributesAndMessageId
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageWithAttributesCoder;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSink;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSource;
+import org.apache.beam.sdk.io.kafka.KafkaIO;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsValidator;
 import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
@@ -491,6 +492,9 @@ public class DataflowRunner extends 
PipelineRunner<DataflowPipelineJob> {
                   new StreamingPubsubIOWriteOverrideFactory(this)));
         }
       }
+      if (useUnifiedWorker(options)) {
+        overridesBuilder.add(KafkaIO.Read.KAFKA_READ_OVERRIDE);
+      }
       overridesBuilder.add(
           PTransformOverride.of(
               PTransformMatchers.writeWithRunnerDeterminedSharding(),
diff --git 
a/runners/spark/2/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingRunner.java
 
b/runners/spark/2/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingRunner.java
index f08c36b..5d8230e 100644
--- 
a/runners/spark/2/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingRunner.java
+++ 
b/runners/spark/2/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingRunner.java
@@ -37,12 +37,14 @@ import 
org.apache.beam.runners.spark.structuredstreaming.translation.batch.Pipel
 import 
org.apache.beam.runners.spark.structuredstreaming.translation.streaming.PipelineTranslatorStreaming;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineRunner;
+import org.apache.beam.sdk.io.kafka.KafkaIO;
 import org.apache.beam.sdk.metrics.MetricsEnvironment;
 import org.apache.beam.sdk.metrics.MetricsOptions;
 import org.apache.beam.sdk.options.ExperimentalOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.options.PipelineOptionsValidator;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
 import org.apache.spark.SparkEnv$;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.metrics.MetricsSystem;
@@ -193,6 +195,7 @@ public final class SparkStructuredStreamingRunner
         || ExperimentalOptions.hasExperiment(
             pipeline.getOptions(), "beam_fn_api_use_deprecated_read")
         || ExperimentalOptions.hasExperiment(pipeline.getOptions(), 
"use_deprecated_read")) {
+      pipeline.replaceAll(ImmutableList.of(KafkaIO.Read.KAFKA_READ_OVERRIDE));
       
SplittableParDo.convertReadBasedSplittableDoFnsToPrimitiveReads(pipeline);
     }
 
diff --git a/runners/spark/spark_runner.gradle 
b/runners/spark/spark_runner.gradle
index fe2393e..38519d0 100644
--- a/runners/spark/spark_runner.gradle
+++ b/runners/spark/spark_runner.gradle
@@ -152,6 +152,7 @@ dependencies {
   compile project(":runners:core-java")
   compile project(":runners:java-fn-execution")
   compile project(":runners:java-job-service")
+  compile project(":sdks:java:io:kafka")
   compile project(":sdks:java:extensions:google-cloud-platform-core")
   compile library.java.jackson_annotations
   compile library.java.slf4j_api
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
index 364e549..60a113e 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
@@ -48,6 +48,7 @@ import 
org.apache.beam.runners.spark.util.GlobalWatermarkHolder.WatermarkAdvanci
 import org.apache.beam.runners.spark.util.SparkCompat;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineRunner;
+import org.apache.beam.sdk.io.kafka.KafkaIO;
 import org.apache.beam.sdk.metrics.MetricsEnvironment;
 import org.apache.beam.sdk.metrics.MetricsOptions;
 import org.apache.beam.sdk.options.ExperimentalOptions;
@@ -66,6 +67,7 @@ import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.POutput;
 import org.apache.beam.sdk.values.PValue;
 import org.apache.beam.sdk.values.TupleTag;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
 import org.apache.spark.SparkEnv$;
 import org.apache.spark.api.java.JavaSparkContext;
@@ -181,6 +183,7 @@ public final class SparkRunner extends 
PipelineRunner<SparkPipelineResult> {
         || ExperimentalOptions.hasExperiment(
             pipeline.getOptions(), "beam_fn_api_use_deprecated_read")
         || ExperimentalOptions.hasExperiment(pipeline.getOptions(), 
"use_deprecated_read")) {
+      pipeline.replaceAll(ImmutableList.of(KafkaIO.Read.KAFKA_READ_OVERRIDE));
       
SplittableParDo.convertReadBasedSplittableDoFnsToPrimitiveReads(pipeline);
     }
 
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerDebugger.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerDebugger.java
index 33b8408..37d9d54 100644
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerDebugger.java
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerDebugger.java
@@ -26,9 +26,11 @@ import 
org.apache.beam.runners.spark.translation.TransformTranslator;
 import 
org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineRunner;
+import org.apache.beam.sdk.io.kafka.KafkaIO;
 import org.apache.beam.sdk.options.ExperimentalOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsValidator;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.streaming.api.java.JavaStreamingContext;
 import org.joda.time.Duration;
@@ -85,6 +87,7 @@ public final class SparkRunnerDebugger extends 
PipelineRunner<SparkPipelineResul
         || ExperimentalOptions.hasExperiment(
             pipeline.getOptions(), "beam_fn_api_use_deprecated_read")
         || ExperimentalOptions.hasExperiment(pipeline.getOptions(), 
"use_deprecated_read")) {
+      pipeline.replaceAll(ImmutableList.of(KafkaIO.Read.KAFKA_READ_OVERRIDE));
       
SplittableParDo.convertReadBasedSplittableDoFnsToPrimitiveReads(pipeline);
     }
     JavaSparkContext jsc = new JavaSparkContext("local[1]", "Debug_Pipeline");
diff --git a/sdks/java/io/kafka/build.gradle b/sdks/java/io/kafka/build.gradle
index df76a82..7a4ca21 100644
--- a/sdks/java/io/kafka/build.gradle
+++ b/sdks/java/io/kafka/build.gradle
@@ -48,6 +48,7 @@ kafkaVersions.each{k,v -> 
configurations.create("kafkaVersion$k")}
 dependencies {
   compile library.java.vendored_guava_26_0_jre
   compile project(path: ":sdks:java:core", configuration: "shadow")
+  compile project(":runners:core-construction-java")
   compile project(":sdks:java:expansion-service")
   permitUnusedDeclared project(":sdks:java:expansion-service") // BEAM-11761
   compile library.java.avro
diff --git 
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java 
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index 06d4a5d..8b6058c 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -33,8 +33,11 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import org.apache.beam.runners.core.construction.PTransformMatchers;
+import org.apache.beam.runners.core.construction.ReplacementOutputs;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.annotations.Internal;
 import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.AvroCoder;
 import org.apache.beam.sdk.coders.ByteArrayCoder;
@@ -51,6 +54,9 @@ import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark;
 import org.apache.beam.sdk.options.ExperimentalOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.runners.AppliedPTransform;
+import org.apache.beam.sdk.runners.PTransformOverride;
+import org.apache.beam.sdk.runners.PTransformOverrideFactory;
 import org.apache.beam.sdk.schemas.NoSuchSchemaException;
 import org.apache.beam.sdk.schemas.transforms.Convert;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -72,6 +78,7 @@ import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PDone;
 import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TypeDescriptor;
 import org.apache.beam.sdk.values.TypeDescriptors;
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
@@ -1209,67 +1216,144 @@ public class KafkaIO {
       Coder<K> keyCoder = getKeyCoder(coderRegistry);
       Coder<V> valueCoder = getValueCoder(coderRegistry);
 
-      // The Read will be expanded into SDF transform when "beam_fn_api" is 
enabled.
-      if (!ExperimentalOptions.hasExperiment(input.getPipeline().getOptions(), 
"beam_fn_api")
-          || ExperimentalOptions.hasExperiment(
+      // For read from unbounded in a bounded manner, we actually are not 
going through Read or SDF.
+      if (ExperimentalOptions.hasExperiment(
               input.getPipeline().getOptions(), 
"beam_fn_api_use_deprecated_read")
+          || ExperimentalOptions.hasExperiment(
+              input.getPipeline().getOptions(), "use_deprecated_read")
           || getMaxNumRecords() < Long.MAX_VALUE
           || getMaxReadTime() != null) {
+        return input.apply(new ReadFromKafkaViaUnbounded<>(this, keyCoder, 
valueCoder));
+      }
+      return input.apply(new ReadFromKafkaViaSDF<>(this, keyCoder, 
valueCoder));
+    }
+
+    /**
+     * A {@link PTransformOverride} for runners to swap {@link 
ReadFromKafkaViaSDF} to legacy Kafka
+     * read if runners doesn't have a good support on executing unbounded 
Splittable DoFn.
+     */
+    @Internal
+    public static final PTransformOverride KAFKA_READ_OVERRIDE =
+        PTransformOverride.of(
+            PTransformMatchers.classEqualTo(ReadFromKafkaViaSDF.class),
+            new KafkaReadOverrideFactory<>());
+
+    private static class KafkaReadOverrideFactory<K, V>
+        implements PTransformOverrideFactory<
+            PBegin, PCollection<KafkaRecord<K, V>>, ReadFromKafkaViaSDF<K, V>> 
{
+
+      @Override
+      public PTransformReplacement<PBegin, PCollection<KafkaRecord<K, V>>> 
getReplacementTransform(
+          AppliedPTransform<PBegin, PCollection<KafkaRecord<K, V>>, 
ReadFromKafkaViaSDF<K, V>>
+              transform) {
+        return PTransformReplacement.of(
+            transform.getPipeline().begin(),
+            new ReadFromKafkaViaUnbounded<>(
+                transform.getTransform().kafkaRead,
+                transform.getTransform().keyCoder,
+                transform.getTransform().valueCoder));
+      }
+
+      @Override
+      public Map<PCollection<?>, ReplacementOutput> mapOutputs(
+          Map<TupleTag<?>, PCollection<?>> outputs, PCollection<KafkaRecord<K, 
V>> newOutput) {
+        return ReplacementOutputs.singleton(outputs, newOutput);
+      }
+    }
+
+    private static class ReadFromKafkaViaUnbounded<K, V>
+        extends PTransform<PBegin, PCollection<KafkaRecord<K, V>>> {
+      Read<K, V> kafkaRead;
+      Coder<K> keyCoder;
+      Coder<V> valueCoder;
+
+      ReadFromKafkaViaUnbounded(Read<K, V> kafkaRead, Coder<K> keyCoder, 
Coder<V> valueCoder) {
+        this.kafkaRead = kafkaRead;
+        this.keyCoder = keyCoder;
+        this.valueCoder = valueCoder;
+      }
+
+      @Override
+      public PCollection<KafkaRecord<K, V>> expand(PBegin input) {
         // Handles unbounded source to bounded conversion if maxNumRecords or 
maxReadTime is set.
         Unbounded<KafkaRecord<K, V>> unbounded =
             org.apache.beam.sdk.io.Read.from(
-                
toBuilder().setKeyCoder(keyCoder).setValueCoder(valueCoder).build().makeSource());
+                kafkaRead
+                    .toBuilder()
+                    .setKeyCoder(keyCoder)
+                    .setValueCoder(valueCoder)
+                    .build()
+                    .makeSource());
 
         PTransform<PBegin, PCollection<KafkaRecord<K, V>>> transform = 
unbounded;
 
-        if (getMaxNumRecords() < Long.MAX_VALUE || getMaxReadTime() != null) {
+        if (kafkaRead.getMaxNumRecords() < Long.MAX_VALUE || 
kafkaRead.getMaxReadTime() != null) {
           transform =
-              
unbounded.withMaxReadTime(getMaxReadTime()).withMaxNumRecords(getMaxNumRecords());
+              unbounded
+                  .withMaxReadTime(kafkaRead.getMaxReadTime())
+                  .withMaxNumRecords(kafkaRead.getMaxNumRecords());
         }
 
         return input.getPipeline().apply(transform);
       }
-      ReadSourceDescriptors<K, V> readTransform =
-          ReadSourceDescriptors.<K, V>read()
-              .withConsumerConfigOverrides(getConsumerConfig())
-              .withOffsetConsumerConfigOverrides(getOffsetConsumerConfig())
-              .withConsumerFactoryFn(getConsumerFactoryFn())
-              .withKeyDeserializerProvider(getKeyDeserializerProvider())
-              .withValueDeserializerProvider(getValueDeserializerProvider())
-              .withManualWatermarkEstimator()
-              .withTimestampPolicyFactory(getTimestampPolicyFactory())
-              .withCheckStopReadingFn(getCheckStopReadingFn());
-      if (isCommitOffsetsInFinalizeEnabled()) {
-        readTransform = readTransform.commitOffsets();
+    }
+
+    static class ReadFromKafkaViaSDF<K, V>
+        extends PTransform<PBegin, PCollection<KafkaRecord<K, V>>> {
+      Read<K, V> kafkaRead;
+      Coder<K> keyCoder;
+      Coder<V> valueCoder;
+
+      ReadFromKafkaViaSDF(Read<K, V> kafkaRead, Coder<K> keyCoder, Coder<V> 
valueCoder) {
+        this.kafkaRead = kafkaRead;
+        this.keyCoder = keyCoder;
+        this.valueCoder = valueCoder;
       }
-      PCollection<KafkaSourceDescriptor> output;
-      if (isDynamicRead()) {
-        output =
-            input
-                .getPipeline()
-                .apply(Impulse.create())
-                .apply(
-                    MapElements.into(
-                            TypeDescriptors.kvs(
-                                new TypeDescriptor<byte[]>() {}, new 
TypeDescriptor<byte[]>() {}))
-                        .via(element -> KV.of(element, element)))
-                .apply(
-                    ParDo.of(
-                        new WatchKafkaTopicPartitionDoFn(
-                            getWatchTopicPartitionDuration(),
-                            getConsumerFactoryFn(),
-                            getCheckStopReadingFn(),
-                            getConsumerConfig(),
-                            getStartReadTime())));
 
-      } else {
-        output =
-            input
-                .getPipeline()
-                .apply(Impulse.create())
-                .apply(ParDo.of(new GenerateKafkaSourceDescriptor(this)));
+      @Override
+      public PCollection<KafkaRecord<K, V>> expand(PBegin input) {
+        ReadSourceDescriptors<K, V> readTransform =
+            ReadSourceDescriptors.<K, V>read()
+                .withConsumerConfigOverrides(kafkaRead.getConsumerConfig())
+                
.withOffsetConsumerConfigOverrides(kafkaRead.getOffsetConsumerConfig())
+                .withConsumerFactoryFn(kafkaRead.getConsumerFactoryFn())
+                
.withKeyDeserializerProvider(kafkaRead.getKeyDeserializerProvider())
+                
.withValueDeserializerProvider(kafkaRead.getValueDeserializerProvider())
+                .withManualWatermarkEstimator()
+                
.withTimestampPolicyFactory(kafkaRead.getTimestampPolicyFactory())
+                .withCheckStopReadingFn(kafkaRead.getCheckStopReadingFn());
+        if (kafkaRead.isCommitOffsetsInFinalizeEnabled()) {
+          readTransform = readTransform.commitOffsets();
+        }
+        PCollection<KafkaSourceDescriptor> output;
+        if (kafkaRead.isDynamicRead()) {
+          output =
+              input
+                  .getPipeline()
+                  .apply(Impulse.create())
+                  .apply(
+                      MapElements.into(
+                              TypeDescriptors.kvs(
+                                  new TypeDescriptor<byte[]>() {}, new 
TypeDescriptor<byte[]>() {}))
+                          .via(element -> KV.of(element, element)))
+                  .apply(
+                      ParDo.of(
+                          new WatchKafkaTopicPartitionDoFn(
+                              kafkaRead.getWatchTopicPartitionDuration(),
+                              kafkaRead.getConsumerFactoryFn(),
+                              kafkaRead.getCheckStopReadingFn(),
+                              kafkaRead.getConsumerConfig(),
+                              kafkaRead.getStartReadTime())));
+
+        } else {
+          output =
+              input
+                  .getPipeline()
+                  .apply(Impulse.create())
+                  .apply(ParDo.of(new 
GenerateKafkaSourceDescriptor(kafkaRead)));
+        }
+        return 
output.apply(readTransform).setCoder(KafkaRecordCoder.of(keyCoder, valueCoder));
       }
-      return 
output.apply(readTransform).setCoder(KafkaRecordCoder.of(keyCoder, valueCoder));
     }
 
     /**
@@ -1798,10 +1882,6 @@ public class KafkaIO {
 
     @Override
     public PCollection<KafkaRecord<K, V>> 
expand(PCollection<KafkaSourceDescriptor> input) {
-      checkArgument(
-          ExperimentalOptions.hasExperiment(input.getPipeline().getOptions(), 
"beam_fn_api"),
-          "The ReadSourceDescriptors can only used when beam_fn_api is 
enabled.");
-
       checkArgument(getKeyDeserializerProvider() != null, 
"withKeyDeserializer() is required");
       checkArgument(getValueDeserializerProvider() != null, 
"withValueDeserializer() is required");
 
diff --git 
a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.java
 
b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.java
index 108d858..bb15e42 100644
--- 
a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.java
+++ 
b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.java
@@ -128,8 +128,10 @@ public class KafkaIOExternalTest {
     assertThat(transform.getInputsCount(), Matchers.is(0));
     assertThat(transform.getOutputsCount(), Matchers.is(1));
 
-    RunnerApi.PTransform kafkaComposite =
+    RunnerApi.PTransform kafkaReadComposite =
         
result.getComponents().getTransformsOrThrow(transform.getSubtransforms(0));
+    RunnerApi.PTransform kafkaComposite =
+        
result.getComponents().getTransformsOrThrow(kafkaReadComposite.getSubtransforms(0));
     assertThat(
         kafkaComposite.getSubtransformsList(),
         Matchers.hasItem(MatchesPattern.matchesPattern(".*Impulse.*")));

Reply via email to