This is an automated email from the ASF dual-hosted git repository.

yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 254519b857a Add Lineage metrics to KafkaIO (#32170)
254519b857a is described below

commit 254519b857ae36aacf739fd08a1a341cd6b24211
Author: Yi Hu <[email protected]>
AuthorDate: Tue Aug 20 15:30:25 2024 -0400

    Add Lineage metrics to KafkaIO (#32170)
    
    * Add Lineage metrics to KafkaIO
    
    * Add asserts in tests
---
 .../beam/sdk/io/kafka/KafkaExactlyOnceSink.java    | 16 +++++
 .../beam/sdk/io/kafka/KafkaUnboundedSource.java    | 13 ++++
 .../org/apache/beam/sdk/io/kafka/KafkaWriter.java  | 18 +++++-
 .../beam/sdk/io/kafka/ReadFromKafkaDoFn.java       | 28 +++++----
 ...KafkaIOReadImplementationCompatibilityTest.java | 23 +++++--
 .../org/apache/beam/sdk/io/kafka/KafkaIOTest.java  | 73 ++++++++++++++++------
 6 files changed, 133 insertions(+), 38 deletions(-)
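
For orientation before the patch itself: KafkaIO sources and sinks now
self-report lineage as string metrics, which a pipeline author can read back
from the PipelineResult. The sketch below is hedged -- the Lineage.query(...)
calls mirror the test assertions added in this patch, while the pipeline body
and class name are illustrative placeholders.

    import java.util.Set;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.PipelineResult;
    import org.apache.beam.sdk.metrics.Lineage;

    public class QueryKafkaLineage {
      public static void main(String[] args) {
        Pipeline p = Pipeline.create();
        // ... apply a pipeline using KafkaIO.read() and/or KafkaIO.write() ...
        PipelineResult result = p.run();
        result.waitUntilFinish();
        // Entries take the form "kafka:<bootstrapServers>.<topic>".
        Set<String> sources = Lineage.query(result.metrics(), Lineage.Type.SOURCE);
        Set<String> sinks = Lineage.query(result.metrics(), Lineage.Type.SINK);
        System.out.println(sources);
        System.out.println(sinks);
      }
    }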

diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java
index ced1e24a8c2..4ffc880d013 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java
@@ -39,6 +39,7 @@ import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.io.kafka.KafkaIO.WriteRecords;
 import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Lineage;
 import org.apache.beam.sdk.metrics.Metrics;
 import org.apache.beam.sdk.metrics.SinkMetrics;
 import org.apache.beam.sdk.state.BagState;
@@ -458,6 +459,7 @@ class KafkaExactlyOnceSink<K, V>
       private final String producerName;
       private final WriteRecords<K, V> spec;
       private long committedId;
+      private transient boolean reportedLineage;
 
       ShardWriter(
           int shard,
@@ -524,6 +526,20 @@ class KafkaExactlyOnceSink<K, V>
           ProducerSpEL.commitTransaction(producer);
 
           numTransactions.inc();
+          if (!reportedLineage) {
+            Lineage.getSinks()
+                .add(
+                    "kafka",
+                    ImmutableList.of(
+                        // withBootstrapServers() was required in WriteRecord.expand, expect to be
+                        // non-null
+                        (String)
+                            Preconditions.checkStateNotNull(
+                                spec.getProducerConfig()
+                                    .get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)),
+                        topic));
+            reportedLineage = true;
+          }
           LOG.debug("{} : committed {} records", shard, lastRecordId - committedId);
 
           committedId = lastRecordId;
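
The hunk above reports sink lineage at most once per ShardWriter, guarded by a
transient boolean: after the writer is serialized to another worker the flag
resets and that worker reports again, which the set-valued queries in the
tests below suggest is harmless. A condensed sketch of the pattern
(hypothetical class; only the Lineage call matches the patch):

    import org.apache.beam.sdk.metrics.Lineage;
    import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;

    class OnceOnlySinkLineage implements java.io.Serializable {
      // transient: resets to false when deserialized on another worker
      private transient boolean reportedLineage;

      void report(String bootstrapServers, String topic) {
        if (!reportedLineage) {
          Lineage.getSinks().add("kafka", ImmutableList.of(bootstrapServers, topic));
          reportedLineage = true;
        }
      }
    }
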
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedSource.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedSource.java
index 5d8a2556e47..9685d859b0a 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedSource.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedSource.java
@@ -30,10 +30,13 @@ import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
 import org.apache.beam.sdk.io.UnboundedSource;
 import org.apache.beam.sdk.io.kafka.KafkaIO.Read;
+import org.apache.beam.sdk.metrics.Lineage;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.util.Preconditions;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Joiner;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
 import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.checkerframework.checker.nullness.qual.Nullable;
@@ -65,6 +68,10 @@ class KafkaUnboundedSource<K, V> extends UnboundedSource<KafkaRecord<K, V>, Kafk
     // (b) sort by <topic, partition>
     // (c) round-robin assign the partitions to splits
 
+    String bootStrapServers =
+        (String)
+            Preconditions.checkArgumentNotNull(
+                spec.getConsumerConfig().get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
     if (partitions.isEmpty()) {
       try (Consumer<?, ?> consumer = spec.getConsumerFactoryFn().apply(spec.getConsumerConfig())) {
         List<String> topics = Preconditions.checkStateNotNull(spec.getTopics());
@@ -74,6 +81,7 @@ class KafkaUnboundedSource<K, V> extends UnboundedSource<KafkaRecord<K, V>, Kafk
             if (pattern.matcher(entry.getKey()).matches()) {
               for (PartitionInfo p : entry.getValue()) {
                 partitions.add(new TopicPartition(p.topic(), p.partition()));
+                Lineage.getSources().add("kafka", ImmutableList.of(bootStrapServers, p.topic()));
               }
             }
           }
@@ -87,9 +95,14 @@ class KafkaUnboundedSource<K, V> extends UnboundedSource<KafkaRecord<K, V>, Kafk
             for (PartitionInfo p : partitionInfoList) {
               partitions.add(new TopicPartition(p.topic(), p.partition()));
             }
+            Lineage.getSources().add("kafka", ImmutableList.of(bootStrapServers, topic));
           }
         }
       }
+    } else {
+      for (TopicPartition p : partitions) {
+        Lineage.getSources().add("kafka", ImmutableList.of(bootStrapServers, p.topic()));
+      }
     }
 
     partitions.sort(
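
With the changes above, KafkaUnboundedSource.split reports source lineage on
all three topic-discovery paths: topics matched by a pattern, explicitly named
topics, and explicitly supplied partitions. A usage sketch of the
pattern-based mode (broker address and deserializers are illustrative; the
matched-topics-only behavior is asserted in the KafkaIOTest changes below):

    import org.apache.beam.sdk.io.kafka.KafkaIO;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;
    import org.apache.kafka.common.serialization.LongDeserializer;

    class PatternReadSketch {
      static KafkaIO.Read<byte[], Long> read() {
        return KafkaIO.<byte[], Long>read()
            .withBootstrapServers("broker1:9092") // illustrative address
            .withTopicPattern("[a-z]est") // matches "test" but not "Test"
            .withKeyDeserializer(ByteArrayDeserializer.class)
            .withValueDeserializer(LongDeserializer.class);
      }
    }
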
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriter.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriter.java
index 4f4663aa8cc..6ac819e5705 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriter.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriter.java
@@ -23,10 +23,12 @@ import java.util.Map;
 import java.util.concurrent.Future;
 import org.apache.beam.sdk.io.kafka.KafkaIO.WriteRecords;
 import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Lineage;
 import org.apache.beam.sdk.metrics.SinkMetrics;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.errorhandling.BadRecordRouter;
 import org.apache.beam.sdk.util.Preconditions;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
 import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
@@ -74,7 +76,7 @@ class KafkaWriter<K, V> extends DoFn<ProducerRecord<K, V>, Void> {
 
     @Nullable String topicName = record.topic();
     if (topicName == null) {
-      topicName = spec.getTopic();
+      topicName = Preconditions.checkStateNotNull(spec.getTopic());
     }
 
     try {
@@ -91,6 +93,19 @@ class KafkaWriter<K, V> extends DoFn<ProducerRecord<K, V>, Void> {
               callback);
 
       elementsWritten.inc();
+      if (!topicName.equals(reportedLineage)) {
+
+        Lineage.getSinks()
+            .add(
+                "kafka",
+                // withBootstrapServers() was required in WriteRecord.expand, expect to be non-null
+                ImmutableList.of(
+                    (String)
+                        Preconditions.checkStateNotNull(
+                            producerConfig.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)),
+                    topicName));
+        reportedLineage = topicName;
+      }
     } catch (SerializationException e) {
       // This exception should only occur during the key and value deserialization when
       // creating the Kafka Record. We can catch the exception here as producer.send serializes
@@ -129,6 +144,7 @@ class KafkaWriter<K, V> extends DoFn<ProducerRecord<K, V>, Void> {
   private transient @Nullable Producer<K, V> producer = null;
   // first exception and number of failures since last invocation of checkForFailures():
   private transient @Nullable Exception sendException = null;
+  private transient @Nullable String reportedLineage;
   private transient long numSendFailures = 0;
 
   private final Counter elementsWritten = SinkMetrics.elementsWritten();
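
Unlike the exactly-once sink, a single KafkaWriter may receive ProducerRecords
addressed to different topics, so its guard above is the last-reported topic
name rather than a boolean: the metric call is skipped only for consecutive
records on the same topic. A hypothetical condensation (only the Lineage call
matches the patch):

    import org.apache.beam.sdk.metrics.Lineage;
    import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
    import org.checkerframework.checker.nullness.qual.Nullable;

    class LastTopicSinkLineage {
      private transient @Nullable String reportedLineage;

      void report(String bootstrapServers, String topicName) {
        // Re-report whenever the topic changes; repeating a value is safe
        // because lineage is queried back as a set of strings.
        if (!topicName.equals(reportedLineage)) {
          Lineage.getSinks().add("kafka", ImmutableList.of(bootstrapServers, topicName));
          reportedLineage = topicName;
        }
      }
    }
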
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
index d6ec9015a95..9bb950bb8e6 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
@@ -33,6 +33,7 @@ import org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.TimestampPolicyContext;
 import org.apache.beam.sdk.io.range.OffsetRange;
 import org.apache.beam.sdk.metrics.Distribution;
 import org.apache.beam.sdk.metrics.Gauge;
+import org.apache.beam.sdk.metrics.Lineage;
 import org.apache.beam.sdk.metrics.Metrics;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.SerializableFunction;
@@ -49,6 +50,7 @@ import org.apache.beam.sdk.util.Preconditions;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Stopwatch;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Supplier;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Suppliers;
@@ -289,22 +291,18 @@ abstract class ReadFromKafkaDoFn<K, V>
   public OffsetRange initialRestriction(@Element KafkaSourceDescriptor kafkaSourceDescriptor) {
     Map<String, Object> updatedConsumerConfig =
         overrideBootstrapServersConfig(consumerConfig, kafkaSourceDescriptor);
-    LOG.info(
-        "Creating Kafka consumer for initial restriction for {}",
-        kafkaSourceDescriptor.getTopicPartition());
+    TopicPartition partition = kafkaSourceDescriptor.getTopicPartition();
+    LOG.info("Creating Kafka consumer for initial restriction for {}", partition);
     try (Consumer<byte[], byte[]> offsetConsumer = consumerFactoryFn.apply(updatedConsumerConfig)) {
-      ConsumerSpEL.evaluateAssign(
-          offsetConsumer, ImmutableList.of(kafkaSourceDescriptor.getTopicPartition()));
+      ConsumerSpEL.evaluateAssign(offsetConsumer, ImmutableList.of(partition));
       long startOffset;
       @Nullable Instant startReadTime = kafkaSourceDescriptor.getStartReadTime();
       if (kafkaSourceDescriptor.getStartReadOffset() != null) {
         startOffset = kafkaSourceDescriptor.getStartReadOffset();
       } else if (startReadTime != null) {
-        startOffset =
-            ConsumerSpEL.offsetForTime(
-                offsetConsumer, kafkaSourceDescriptor.getTopicPartition(), startReadTime);
+        startOffset = ConsumerSpEL.offsetForTime(offsetConsumer, partition, startReadTime);
       } else {
-        startOffset = offsetConsumer.position(kafkaSourceDescriptor.getTopicPartition());
+        startOffset = offsetConsumer.position(partition);
       }
 
       long endOffset = Long.MAX_VALUE;
@@ -312,11 +310,15 @@ abstract class ReadFromKafkaDoFn<K, V>
       if (kafkaSourceDescriptor.getStopReadOffset() != null) {
         endOffset = kafkaSourceDescriptor.getStopReadOffset();
       } else if (stopReadTime != null) {
-        endOffset =
-            ConsumerSpEL.offsetForTime(
-                offsetConsumer, kafkaSourceDescriptor.getTopicPartition(), stopReadTime);
+        endOffset = ConsumerSpEL.offsetForTime(offsetConsumer, partition, stopReadTime);
       }
-
+      new OffsetRange(startOffset, endOffset);
+      Lineage.getSources()
+          .add(
+              "kafka",
+              ImmutableList.of(
+                  (String) updatedConsumerConfig.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG),
+                  MoreObjects.firstNonNull(kafkaSourceDescriptor.getTopic(), partition.topic())));
       return new OffsetRange(startOffset, endOffset);
     }
   }
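
The test expectations below also pin down the rendered form of these metrics:
"kafka:<bootstrapServers>.<topic>", with a segment backtick-quoted when it
contains reserved characters such as the ':' and ',' of a multi-host bootstrap
list. That quoting rule is inferred from the expected strings in the tests,
not from Lineage documentation:

    public class LineageFqnExamples {
      public static void main(String[] args) {
        // A simple segment renders unquoted:
        System.out.println(String.format("kafka:%s.%s", "none", "test"));
        // -> kafka:none.test

        // A bootstrap list containing ':' and ',' renders backtick-quoted:
        System.out.println(
            String.format("kafka:`%s`.%s", "myServer1:9092,myServer2:9092", "topic_a"));
        // -> kafka:`myServer1:9092,myServer2:9092`.topic_a
      }
    }
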
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOReadImplementationCompatibilityTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOReadImplementationCompatibilityTest.java
index ae939d66c21..74f1e83fd86 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOReadImplementationCompatibilityTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOReadImplementationCompatibilityTest.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.io.kafka;
 
 import static org.apache.beam.sdk.io.kafka.KafkaIOTest.mkKafkaReadTransform;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.notNullValue;
@@ -29,8 +30,10 @@ import java.util.Set;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
+import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.io.kafka.KafkaIOReadImplementationCompatibility.KafkaIOReadProperties;
 import org.apache.beam.sdk.io.kafka.KafkaIOTest.ValueAsTimestampFn;
+import org.apache.beam.sdk.metrics.Lineage;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.CaseFormat;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
@@ -101,12 +104,12 @@ public class KafkaIOReadImplementationCompatibilityTest {
     }
   }
 
-  private void testReadTransformCreationWithImplementationBoundProperties(
+  private PipelineResult testReadTransformCreationWithImplementationBoundProperties(
       Function<KafkaIO.Read<Integer, Long>, KafkaIO.Read<Integer, Long>> kafkaReadDecorator) {
     p.apply(
         kafkaReadDecorator.apply(
             mkKafkaReadTransform(1000, null, new ValueAsTimestampFn(), false, 0)));
-    p.run();
+    return p.run();
   }
 
   private Function<KafkaIO.Read<Integer, Long>, KafkaIO.Read<Integer, Long>>
@@ -121,12 +124,24 @@ public class KafkaIOReadImplementationCompatibilityTest {
 
   @Test
   public void testReadTransformCreationWithLegacyImplementationBoundProperty() {
-    testReadTransformCreationWithImplementationBoundProperties(legacyDecoratorFunction());
+    PipelineResult r =
+        testReadTransformCreationWithImplementationBoundProperties(legacyDecoratorFunction());
+    String[] expect =
+        KafkaIOTest.mkKafkaTopics.stream()
+            .map(topic -> String.format("kafka:`%s`.%s", KafkaIOTest.mkKafkaServers, topic))
+            .toArray(String[]::new);
+    assertThat(Lineage.query(r.metrics(), Lineage.Type.SOURCE), containsInAnyOrder(expect));
   }
 
   @Test
   public void testReadTransformCreationWithSdfImplementationBoundProperty() {
-    testReadTransformCreationWithImplementationBoundProperties(sdfDecoratorFunction());
+    PipelineResult r =
+        testReadTransformCreationWithImplementationBoundProperties(sdfDecoratorFunction());
+    String[] expect =
+        KafkaIOTest.mkKafkaTopics.stream()
+            .map(topic -> String.format("kafka:`%s`.%s", KafkaIOTest.mkKafkaServers, topic))
+            .toArray(String[]::new);
+    assertThat(Lineage.query(r.metrics(), Lineage.Type.SOURCE), containsInAnyOrder(expect));
   }
 
   @Test
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
index 73aee5aeeef..1fe1147a739 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
@@ -21,11 +21,13 @@ import static org.apache.beam.sdk.io.kafka.ConfluentSchemaRegistryDeserializerPr
 import static org.apache.beam.sdk.metrics.MetricResultsMatchers.attemptedMetricsResult;
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.Matchers.hasItem;
 import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.isA;
+import static org.hamcrest.Matchers.not;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
@@ -77,6 +79,7 @@ import org.apache.beam.sdk.io.kafka.KafkaIO.Read.FakeFlinkPipelineOptions;
 import org.apache.beam.sdk.io.kafka.KafkaMocks.PositionErrorConsumerFactory;
 import org.apache.beam.sdk.io.kafka.KafkaMocks.SendErrorProducerFactory;
 import org.apache.beam.sdk.metrics.DistributionResult;
+import org.apache.beam.sdk.metrics.Lineage;
 import org.apache.beam.sdk.metrics.MetricName;
 import org.apache.beam.sdk.metrics.MetricNameFilter;
 import org.apache.beam.sdk.metrics.MetricQueryResults;
@@ -185,6 +188,8 @@ public class KafkaIOTest {
   private static final Instant LOG_APPEND_START_TIME = new Instant(600 * 1000);
   private static final String TIMESTAMP_START_MILLIS_CONFIG = "test.timestamp.start.millis";
   private static final String TIMESTAMP_TYPE_CONFIG = "test.timestamp.type";
+  static List<String> mkKafkaTopics = ImmutableList.of("topic_a", "topic_b");
+  static String mkKafkaServers = "myServer1:9092,myServer2:9092";
 
   // Update mock consumer with records distributed among the given topics, each with given number
   // of partitions. Records are assigned in round-robin order among the partitions.
@@ -390,15 +395,13 @@ public class KafkaIOTest {
       @Nullable Boolean redistribute,
       @Nullable Integer numKeys) {
 
-    List<String> topics = ImmutableList.of("topic_a", "topic_b");
-
     KafkaIO.Read<Integer, Long> reader =
         KafkaIO.<Integer, Long>read()
-            .withBootstrapServers("myServer1:9092,myServer2:9092")
-            .withTopics(topics)
+            .withBootstrapServers(mkKafkaServers)
+            .withTopics(mkKafkaTopics)
             .withConsumerFactoryFn(
                 new ConsumerFactoryFn(
-                    topics, 10, numElements, OffsetResetStrategy.EARLIEST)) // 20 partitions
+                    mkKafkaTopics, 10, numElements, OffsetResetStrategy.EARLIEST)) // 20 partitions
             .withKeyDeserializer(IntegerDeserializer.class)
             .withValueDeserializer(LongDeserializer.class);
     if (maxNumRecords != null) {
@@ -729,10 +732,11 @@ public class KafkaIOTest {
 
     int numElements = 1000;
     String topic = "my_topic";
+    String bootStrapServer = "none";
 
     KafkaIO.Read<Integer, Long> reader =
         KafkaIO.<Integer, Long>read()
-            .withBootstrapServers("none")
+            .withBootstrapServers(bootStrapServer)
             .withTopic("my_topic")
             .withConsumerFactoryFn(
                 new ConsumerFactoryFn(
@@ -744,19 +748,24 @@ public class KafkaIOTest {
     PCollection<Long> input = p.apply(reader.withoutMetadata()).apply(Values.create());
 
     addCountingAsserts(input, numElements);
-    p.run();
+    PipelineResult result = p.run();
+    assertThat(
+        Lineage.query(result.metrics(), Lineage.Type.SOURCE),
+        hasItem(String.format("kafka:%s.%s", bootStrapServer, topic)));
   }
 
   @Test
   public void testUnboundedSourceWithExplicitPartitions() {
     int numElements = 1000;
 
-    List<String> topics = ImmutableList.of("test");
+    String topic = "test";
+    List<String> topics = ImmutableList.of(topic);
+    String bootStrapServer = "none";
 
     KafkaIO.Read<byte[], Long> reader =
         KafkaIO.<byte[], Long>read()
-            .withBootstrapServers("none")
-            .withTopicPartitions(ImmutableList.of(new TopicPartition("test", 5)))
+            .withBootstrapServers(bootStrapServer)
+            .withTopicPartitions(ImmutableList.of(new TopicPartition(topic, 5)))
             .withConsumerFactoryFn(
                 new ConsumerFactoryFn(
                     topics, 10, numElements, OffsetResetStrategy.EARLIEST)) // 10 partitions
@@ -771,7 +780,10 @@ public class KafkaIOTest {
 
     PAssert.thatSingleton(input.apply(Count.globally())).isEqualTo(numElements / 10L);
 
-    p.run();
+    PipelineResult result = p.run();
+    assertThat(
+        Lineage.query(result.metrics(), Lineage.Type.SOURCE),
+        hasItem(String.format("kafka:%s.%s", bootStrapServer, topic)));
   }
 
   @Test
@@ -782,6 +794,7 @@ public class KafkaIOTest {
         ImmutableList.of(
             "best", "gest", "hest", "jest", "lest", "nest", "pest", "rest", 
"test", "vest", "west",
             "zest");
+    String bootStrapServer = "none";
 
     KafkaIO.Read<byte[], Long> reader =
         KafkaIO.<byte[], Long>read()
@@ -796,7 +809,12 @@ public class KafkaIOTest {
     PCollection<Long> input = p.apply(reader.withoutMetadata()).apply(Values.create());
 
     addCountingAsserts(input, numElements);
-    p.run();
+    PipelineResult result = p.run();
+    String[] expect =
+        topics.stream()
+            .map(topic -> String.format("kafka:%s.%s", bootStrapServer, topic))
+            .toArray(String[]::new);
+    assertThat(Lineage.query(result.metrics(), Lineage.Type.SOURCE), containsInAnyOrder(expect));
   }
 
   @Test
@@ -805,10 +823,11 @@ public class KafkaIOTest {
     long numMatchedElements = numElements / 2; // Expected elements if split across 2 topics
 
     List<String> topics = ImmutableList.of("test", "Test");
+    String bootStrapServer = "none";
 
     KafkaIO.Read<byte[], Long> reader =
         KafkaIO.<byte[], Long>read()
-            .withBootstrapServers("none")
+            .withBootstrapServers(bootStrapServer)
             .withTopicPattern("[a-z]est")
             .withConsumerFactoryFn(
                 new ConsumerFactoryFn(topics, 1, numElements, OffsetResetStrategy.EARLIEST))
@@ -825,7 +844,13 @@ public class KafkaIOTest {
 
     PAssert.thatSingleton(input.apply("Count", Count.globally())).isEqualTo(numMatchedElements);
 
-    p.run();
+    PipelineResult result = p.run();
+    assertThat(
+        Lineage.query(result.metrics(), Lineage.Type.SOURCE),
+        hasItem(String.format("kafka:%s.test", bootStrapServer)));
+    assertThat(
+        Lineage.query(result.metrics(), Lineage.Type.SOURCE),
+        not(hasItem(String.format("kafka:%s.Test", bootStrapServer))));
   }
 
   @Test
@@ -1435,22 +1460,26 @@ public class KafkaIOTest {
           new ProducerSendCompletionThread(producerWrapper.mockProducer).start();
 
       String topic = "test";
+      String bootStrapServer = "none";
 
       p.apply(mkKafkaReadTransform(numElements, new ValueAsTimestampFn()).withoutMetadata())
           .apply(
               KafkaIO.<Integer, Long>write()
-                  .withBootstrapServers("none")
+                  .withBootstrapServers(bootStrapServer)
                   .withTopic(topic)
                   .withKeySerializer(IntegerSerializer.class)
                   .withValueSerializer(LongSerializer.class)
                   .withInputTimestamp()
                   .withProducerFactoryFn(new ProducerFactoryFn(producerWrapper.producerKey)));
 
-      p.run();
+      PipelineResult result = p.run();
 
       completionThread.shutdown();
 
       verifyProducerRecords(producerWrapper.mockProducer, topic, numElements, false, true);
+      assertThat(
+          Lineage.query(result.metrics(), Lineage.Type.SINK),
+          hasItem(String.format("kafka:%s.%s", bootStrapServer, topic)));
     }
   }
 
@@ -1836,27 +1865,31 @@ public class KafkaIOTest {
       ProducerSendCompletionThread completionThread =
           new ProducerSendCompletionThread(producerWrapper.mockProducer).start();
 
-      String topic = "test";
+      String topic = "test-eos";
+      String bootStrapServer = "none";
 
       p.apply(mkKafkaReadTransform(numElements, new ValueAsTimestampFn()).withoutMetadata())
           .apply(
               KafkaIO.<Integer, Long>write()
-                  .withBootstrapServers("none")
+                  .withBootstrapServers(bootStrapServer)
                   .withTopic(topic)
                   .withKeySerializer(IntegerSerializer.class)
                   .withValueSerializer(LongSerializer.class)
-                  .withEOS(1, "test")
+                  .withEOS(1, "test-eos")
                   .withConsumerFactoryFn(
                       new ConsumerFactoryFn(
                           Lists.newArrayList(topic), 10, 10, OffsetResetStrategy.EARLIEST))
                   .withPublishTimestampFunction((e, ts) -> ts)
                   .withProducerFactoryFn(new ProducerFactoryFn(producerWrapper.producerKey)));
 
-      p.run();
+      PipelineResult result = p.run();
 
       completionThread.shutdown();
 
       verifyProducerRecords(producerWrapper.mockProducer, topic, numElements, false, true);
+      assertThat(
+          Lineage.query(result.metrics(), Lineage.Type.SINK),
+          hasItem(String.format("kafka:%s.%s", bootStrapServer, topic)));
     }
   }
 
