This is an automated email from the ASF dual-hosted git repository.
yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 254519b857a Add Lineage metrics to KafkaIO (#32170)
254519b857a is described below
commit 254519b857ae36aacf739fd08a1a341cd6b24211
Author: Yi Hu <[email protected]>
AuthorDate: Tue Aug 20 15:30:25 2024 -0400
Add Lineage metrics to KafkaIO (#32170)
* Add Lineage metrics to KafkaIO
* Add asserts in tests
---
.../beam/sdk/io/kafka/KafkaExactlyOnceSink.java | 16 +++++
.../beam/sdk/io/kafka/KafkaUnboundedSource.java | 13 ++++
.../org/apache/beam/sdk/io/kafka/KafkaWriter.java | 18 +++++-
.../beam/sdk/io/kafka/ReadFromKafkaDoFn.java | 28 +++++----
...KafkaIOReadImplementationCompatibilityTest.java | 23 +++++--
.../org/apache/beam/sdk/io/kafka/KafkaIOTest.java | 73 ++++++++++++++++------
6 files changed, 133 insertions(+), 38 deletions(-)
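
For context while reading the hunks: the commit threads one reporting pattern through each Kafka source and sink path. A minimal sketch of that pattern, assuming only the Lineage API surface exercised in the diff below (bootstrapServers and topic are placeholder variables, not names from the patch):

    // Record the Kafka cluster and topic as lineage. The two segments are
    // later rendered into an FQN of the form "kafka:<bootstrapServers>.<topic>"
    // that can be queried from the pipeline's metrics.
    Lineage.getSources().add("kafka", ImmutableList.of(bootstrapServers, topic)); // reads
    Lineage.getSinks().add("kafka", ImmutableList.of(bootstrapServers, topic)); // writes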
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java
index ced1e24a8c2..4ffc880d013 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java
@@ -39,6 +39,7 @@ import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.io.kafka.KafkaIO.WriteRecords;
import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Lineage;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.metrics.SinkMetrics;
import org.apache.beam.sdk.state.BagState;
@@ -458,6 +459,7 @@ class KafkaExactlyOnceSink<K, V>
private final String producerName;
private final WriteRecords<K, V> spec;
private long committedId;
+ private transient boolean reportedLineage;
ShardWriter(
int shard,
@@ -524,6 +526,20 @@ class KafkaExactlyOnceSink<K, V>
ProducerSpEL.commitTransaction(producer);
numTransactions.inc();
+      if (!reportedLineage) {
+        Lineage.getSinks()
+            .add(
+                "kafka",
+                ImmutableList.of(
+                    // withBootstrapServers() was required in WriteRecord.expand, expect to be
+                    // non-null
+                    (String)
+                        Preconditions.checkStateNotNull(
+                            spec.getProducerConfig()
+                                .get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)),
+                    topic));
+        reportedLineage = true;
+      }
       LOG.debug("{} : committed {} records", shard, lastRecordId - committedId);
       committedId = lastRecordId;
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedSource.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedSource.java
index 5d8a2556e47..9685d859b0a 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedSource.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedSource.java
@@ -30,10 +30,13 @@ import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.io.kafka.KafkaIO.Read;
+import org.apache.beam.sdk.metrics.Lineage;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.util.Preconditions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Joiner;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.checkerframework.checker.nullness.qual.Nullable;
@@ -65,6 +68,10 @@ class KafkaUnboundedSource<K, V> extends UnboundedSource<KafkaRecord<K, V>, Kafk
// (b) sort by <topic, partition>
// (c) round-robin assign the partitions to splits
+    String bootStrapServers =
+        (String)
+            Preconditions.checkArgumentNotNull(
+                spec.getConsumerConfig().get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
     if (partitions.isEmpty()) {
       try (Consumer<?, ?> consumer = spec.getConsumerFactoryFn().apply(spec.getConsumerConfig())) {
         List<String> topics = Preconditions.checkStateNotNull(spec.getTopics());
@@ -74,6 +81,7 @@ class KafkaUnboundedSource<K, V> extends UnboundedSource<KafkaRecord<K, V>, Kafk
if (pattern.matcher(entry.getKey()).matches()) {
for (PartitionInfo p : entry.getValue()) {
partitions.add(new TopicPartition(p.topic(), p.partition()));
+              Lineage.getSources().add("kafka", ImmutableList.of(bootStrapServers, p.topic()));
}
}
}
@@ -87,9 +95,14 @@ class KafkaUnboundedSource<K, V> extends UnboundedSource<KafkaRecord<K, V>, Kafk
for (PartitionInfo p : partitionInfoList) {
partitions.add(new TopicPartition(p.topic(), p.partition()));
}
+          Lineage.getSources().add("kafka", ImmutableList.of(bootStrapServers, topic));
}
}
}
+    } else {
+      for (TopicPartition p : partitions) {
+        Lineage.getSources().add("kafka", ImmutableList.of(bootStrapServers, p.topic()));
+      }
     }
partitions.sort(
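
Note that all three topic-discovery paths in split() above (explicit TopicPartitions, topic list, and topic pattern) funnel into the same per-topic call, so one source entry is recorded per (cluster, topic) pair. A hedged illustration with hypothetical values, not taken from the patch:

    // Hypothetical: reading topics "orders" and "payments" from "host:9092"
    // yields one source lineage entry per topic at split() time.
    for (String t : ImmutableList.of("orders", "payments")) {
      Lineage.getSources().add("kafka", ImmutableList.of("host:9092", t));
    }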
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriter.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriter.java
index 4f4663aa8cc..6ac819e5705 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriter.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriter.java
@@ -23,10 +23,12 @@ import java.util.Map;
import java.util.concurrent.Future;
import org.apache.beam.sdk.io.kafka.KafkaIO.WriteRecords;
import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Lineage;
import org.apache.beam.sdk.metrics.SinkMetrics;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.errorhandling.BadRecordRouter;
import org.apache.beam.sdk.util.Preconditions;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
@@ -74,7 +76,7 @@ class KafkaWriter<K, V> extends DoFn<ProducerRecord<K, V>, Void> {
@Nullable String topicName = record.topic();
if (topicName == null) {
- topicName = spec.getTopic();
+ topicName = Preconditions.checkStateNotNull(spec.getTopic());
}
try {
@@ -91,6 +93,19 @@ class KafkaWriter<K, V> extends DoFn<ProducerRecord<K, V>, Void> {
callback);
elementsWritten.inc();
+      if (!topicName.equals(reportedLineage)) {
+
+        Lineage.getSinks()
+            .add(
+                "kafka",
+                // withBootstrapServers() was required in WriteRecord.expand, expect to be non-null
+                ImmutableList.of(
+                    (String)
+                        Preconditions.checkStateNotNull(
+                            producerConfig.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)),
+                    topicName));
+        reportedLineage = topicName;
+      }
} catch (SerializationException e) {
       // This exception should only occur during the key and value deserialization when
       // creating the Kafka Record. We can catch the exception here as producer.send serializes
@@ -129,6 +144,7 @@ class KafkaWriter<K, V> extends DoFn<ProducerRecord<K, V>, Void> {
private transient @Nullable Producer<K, V> producer = null;
   // first exception and number of failures since last invocation of checkForFailures():
private transient @Nullable Exception sendException = null;
+ private transient @Nullable String reportedLineage;
private transient long numSendFailures = 0;
private final Counter elementsWritten = SinkMetrics.elementsWritten();
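
Unlike the exactly-once sink's boolean flag, KafkaWriter keys its dedup on the last topic written, since each ProducerRecord may carry its own destination topic. A condensed sketch of the pattern (names taken from the hunks above; an illustration, not the verbatim method body):

    // Re-report only when the destination topic changes between records; this
    // dedups consecutive repeats rather than the full set of topics ever seen.
    if (!topicName.equals(reportedLineage)) {
      Lineage.getSinks().add("kafka", ImmutableList.of(bootstrapServers, topicName));
      reportedLineage = topicName;
    }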
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
index d6ec9015a95..9bb950bb8e6 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
@@ -33,6 +33,7 @@ import org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.TimestampPolicyContext;
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.metrics.Distribution;
import org.apache.beam.sdk.metrics.Gauge;
+import org.apache.beam.sdk.metrics.Lineage;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.SerializableFunction;
@@ -49,6 +50,7 @@ import org.apache.beam.sdk.util.Preconditions;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Stopwatch;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Supplier;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Suppliers;
@@ -289,22 +291,18 @@ abstract class ReadFromKafkaDoFn<K, V>
   public OffsetRange initialRestriction(@Element KafkaSourceDescriptor kafkaSourceDescriptor) {
     Map<String, Object> updatedConsumerConfig =
         overrideBootstrapServersConfig(consumerConfig, kafkaSourceDescriptor);
-    LOG.info(
-        "Creating Kafka consumer for initial restriction for {}",
-        kafkaSourceDescriptor.getTopicPartition());
+    TopicPartition partition = kafkaSourceDescriptor.getTopicPartition();
+    LOG.info("Creating Kafka consumer for initial restriction for {}", partition);
     try (Consumer<byte[], byte[]> offsetConsumer =
         consumerFactoryFn.apply(updatedConsumerConfig)) {
-      ConsumerSpEL.evaluateAssign(
-          offsetConsumer, ImmutableList.of(kafkaSourceDescriptor.getTopicPartition()));
+      ConsumerSpEL.evaluateAssign(offsetConsumer, ImmutableList.of(partition));
       long startOffset;
       @Nullable Instant startReadTime = kafkaSourceDescriptor.getStartReadTime();
       if (kafkaSourceDescriptor.getStartReadOffset() != null) {
         startOffset = kafkaSourceDescriptor.getStartReadOffset();
       } else if (startReadTime != null) {
-        startOffset =
-            ConsumerSpEL.offsetForTime(
-                offsetConsumer, kafkaSourceDescriptor.getTopicPartition(), startReadTime);
+        startOffset = ConsumerSpEL.offsetForTime(offsetConsumer, partition, startReadTime);
       } else {
-        startOffset = offsetConsumer.position(kafkaSourceDescriptor.getTopicPartition());
+        startOffset = offsetConsumer.position(partition);
       }
long endOffset = Long.MAX_VALUE;
@@ -312,11 +310,15 @@ abstract class ReadFromKafkaDoFn<K, V>
if (kafkaSourceDescriptor.getStopReadOffset() != null) {
endOffset = kafkaSourceDescriptor.getStopReadOffset();
} else if (stopReadTime != null) {
-        endOffset =
-            ConsumerSpEL.offsetForTime(
-                offsetConsumer, kafkaSourceDescriptor.getTopicPartition(), stopReadTime);
+        endOffset = ConsumerSpEL.offsetForTime(offsetConsumer, partition, stopReadTime);
       }
-
+      new OffsetRange(startOffset, endOffset);
+      Lineage.getSources()
+          .add(
+              "kafka",
+              ImmutableList.of(
+                  (String) updatedConsumerConfig.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG),
+                  MoreObjects.firstNonNull(kafkaSourceDescriptor.getTopic(), partition.topic())));
return new OffsetRange(startOffset, endOffset);
}
}
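
In the SDF read path a KafkaSourceDescriptor may carry only a TopicPartition, so the topic segment falls back to the partition's topic. A sketch of just the resolution step, using the names from the hunk above:

    // Prefer the descriptor's explicit topic; fall back to the partition's
    // topic when the source was built from TopicPartitions alone.
    String topic =
        MoreObjects.firstNonNull(kafkaSourceDescriptor.getTopic(), partition.topic());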
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOReadImplementationCompatibilityTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOReadImplementationCompatibilityTest.java
index ae939d66c21..74f1e83fd86 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOReadImplementationCompatibilityTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOReadImplementationCompatibilityTest.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.io.kafka;
import static org.apache.beam.sdk.io.kafka.KafkaIOTest.mkKafkaReadTransform;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
@@ -29,8 +30,10 @@ import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.io.kafka.KafkaIOReadImplementationCompatibility.KafkaIOReadProperties;
import org.apache.beam.sdk.io.kafka.KafkaIOTest.ValueAsTimestampFn;
+import org.apache.beam.sdk.metrics.Lineage;
import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.CaseFormat;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
@@ -101,12 +104,12 @@ public class KafkaIOReadImplementationCompatibilityTest {
}
}
-  private void testReadTransformCreationWithImplementationBoundProperties(
+  private PipelineResult testReadTransformCreationWithImplementationBoundProperties(
       Function<KafkaIO.Read<Integer, Long>, KafkaIO.Read<Integer, Long>> kafkaReadDecorator) {
     p.apply(
         kafkaReadDecorator.apply(
             mkKafkaReadTransform(1000, null, new ValueAsTimestampFn(), false, 0)));
-    p.run();
+    return p.run();
   }
private Function<KafkaIO.Read<Integer, Long>, KafkaIO.Read<Integer, Long>>
@@ -121,12 +124,24 @@ public class KafkaIOReadImplementationCompatibilityTest {
@Test
   public void testReadTransformCreationWithLegacyImplementationBoundProperty() {
-    testReadTransformCreationWithImplementationBoundProperties(legacyDecoratorFunction());
+    PipelineResult r =
+        testReadTransformCreationWithImplementationBoundProperties(legacyDecoratorFunction());
+    String[] expect =
+        KafkaIOTest.mkKafkaTopics.stream()
+            .map(topic -> String.format("kafka:`%s`.%s", KafkaIOTest.mkKafkaServers, topic))
+            .toArray(String[]::new);
+    assertThat(Lineage.query(r.metrics(), Lineage.Type.SOURCE), containsInAnyOrder(expect));
}
@Test
public void testReadTransformCreationWithSdfImplementationBoundProperty() {
-    testReadTransformCreationWithImplementationBoundProperties(sdfDecoratorFunction());
+    PipelineResult r =
+        testReadTransformCreationWithImplementationBoundProperties(sdfDecoratorFunction());
+    String[] expect =
+        KafkaIOTest.mkKafkaTopics.stream()
+            .map(topic -> String.format("kafka:`%s`.%s", KafkaIOTest.mkKafkaServers, topic))
+            .toArray(String[]::new);
+    assertThat(Lineage.query(r.metrics(), Lineage.Type.SOURCE), containsInAnyOrder(expect));
}
@Test
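
The expected strings in these tests also show how the FQN is rendered: a segment containing reserved characters (the comma- and colon-bearing server list) is backtick-quoted, while plain segments are not. A sketch of one concrete expectation, assuming that quoting rule holds:

    // With mkKafkaServers = "myServer1:9092,myServer2:9092" and topic "topic_a",
    // the server segment is backtick-quoted in the rendered FQN:
    assertThat(
        Lineage.query(r.metrics(), Lineage.Type.SOURCE),
        hasItem("kafka:`myServer1:9092,myServer2:9092`.topic_a"));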
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
index 73aee5aeeef..1fe1147a739 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
@@ -21,11 +21,13 @@ import static org.apache.beam.sdk.io.kafka.ConfluentSchemaRegistryDeserializerPr
 import static org.apache.beam.sdk.metrics.MetricResultsMatchers.attemptedMetricsResult;
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.isA;
+import static org.hamcrest.Matchers.not;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
@@ -77,6 +79,7 @@ import org.apache.beam.sdk.io.kafka.KafkaIO.Read.FakeFlinkPipelineOptions;
import org.apache.beam.sdk.io.kafka.KafkaMocks.PositionErrorConsumerFactory;
import org.apache.beam.sdk.io.kafka.KafkaMocks.SendErrorProducerFactory;
import org.apache.beam.sdk.metrics.DistributionResult;
+import org.apache.beam.sdk.metrics.Lineage;
import org.apache.beam.sdk.metrics.MetricName;
import org.apache.beam.sdk.metrics.MetricNameFilter;
import org.apache.beam.sdk.metrics.MetricQueryResults;
@@ -185,6 +188,8 @@ public class KafkaIOTest {
private static final Instant LOG_APPEND_START_TIME = new Instant(600 * 1000);
   private static final String TIMESTAMP_START_MILLIS_CONFIG = "test.timestamp.start.millis";
private static final String TIMESTAMP_TYPE_CONFIG = "test.timestamp.type";
+ static List<String> mkKafkaTopics = ImmutableList.of("topic_a", "topic_b");
+ static String mkKafkaServers = "myServer1:9092,myServer2:9092";
   // Update mock consumer with records distributed among the given topics, each with given number
   // of partitions. Records are assigned in round-robin order among the partitions.
@@ -390,15 +395,13 @@ public class KafkaIOTest {
@Nullable Boolean redistribute,
@Nullable Integer numKeys) {
- List<String> topics = ImmutableList.of("topic_a", "topic_b");
-
KafkaIO.Read<Integer, Long> reader =
KafkaIO.<Integer, Long>read()
- .withBootstrapServers("myServer1:9092,myServer2:9092")
- .withTopics(topics)
+ .withBootstrapServers(mkKafkaServers)
+ .withTopics(mkKafkaTopics)
.withConsumerFactoryFn(
new ConsumerFactoryFn(
-                topics, 10, numElements, OffsetResetStrategy.EARLIEST)) // 20 partitions
+                mkKafkaTopics, 10, numElements, OffsetResetStrategy.EARLIEST)) // 20 partitions
.withKeyDeserializer(IntegerDeserializer.class)
.withValueDeserializer(LongDeserializer.class);
if (maxNumRecords != null) {
@@ -729,10 +732,11 @@ public class KafkaIOTest {
int numElements = 1000;
String topic = "my_topic";
+ String bootStrapServer = "none";
KafkaIO.Read<Integer, Long> reader =
KafkaIO.<Integer, Long>read()
- .withBootstrapServers("none")
+ .withBootstrapServers(bootStrapServer)
.withTopic("my_topic")
.withConsumerFactoryFn(
new ConsumerFactoryFn(
@@ -744,19 +748,24 @@ public class KafkaIOTest {
     PCollection<Long> input = p.apply(reader.withoutMetadata()).apply(Values.create());
addCountingAsserts(input, numElements);
- p.run();
+ PipelineResult result = p.run();
+ assertThat(
+ Lineage.query(result.metrics(), Lineage.Type.SOURCE),
+ hasItem(String.format("kafka:%s.%s", bootStrapServer, topic)));
}
@Test
public void testUnboundedSourceWithExplicitPartitions() {
int numElements = 1000;
- List<String> topics = ImmutableList.of("test");
+ String topic = "test";
+ List<String> topics = ImmutableList.of(topic);
+ String bootStrapServer = "none";
KafkaIO.Read<byte[], Long> reader =
KafkaIO.<byte[], Long>read()
- .withBootstrapServers("none")
-          .withTopicPartitions(ImmutableList.of(new TopicPartition("test", 5)))
+          .withBootstrapServers(bootStrapServer)
+          .withTopicPartitions(ImmutableList.of(new TopicPartition(topic, 5)))
.withConsumerFactoryFn(
new ConsumerFactoryFn(
                 topics, 10, numElements, OffsetResetStrategy.EARLIEST)) // 10 partitions
@@ -771,7 +780,10 @@ public class KafkaIOTest {
     PAssert.thatSingleton(input.apply(Count.globally())).isEqualTo(numElements / 10L);
- p.run();
+ PipelineResult result = p.run();
+ assertThat(
+ Lineage.query(result.metrics(), Lineage.Type.SOURCE),
+ hasItem(String.format("kafka:%s.%s", bootStrapServer, topic)));
}
@Test
@@ -782,6 +794,7 @@ public class KafkaIOTest {
         ImmutableList.of(
             "best", "gest", "hest", "jest", "lest", "nest", "pest", "rest", "test", "vest", "west",
             "zest");
+ String bootStrapServer = "none";
KafkaIO.Read<byte[], Long> reader =
KafkaIO.<byte[], Long>read()
@@ -796,7 +809,12 @@ public class KafkaIOTest {
     PCollection<Long> input = p.apply(reader.withoutMetadata()).apply(Values.create());
addCountingAsserts(input, numElements);
- p.run();
+ PipelineResult result = p.run();
+ String[] expect =
+ topics.stream()
+ .map(topic -> String.format("kafka:%s.%s", bootStrapServer, topic))
+ .toArray(String[]::new);
+    assertThat(Lineage.query(result.metrics(), Lineage.Type.SOURCE), containsInAnyOrder(expect));
}
@Test
@@ -805,10 +823,11 @@ public class KafkaIOTest {
     long numMatchedElements = numElements / 2; // Expected elements if split across 2 topics
List<String> topics = ImmutableList.of("test", "Test");
+ String bootStrapServer = "none";
KafkaIO.Read<byte[], Long> reader =
KafkaIO.<byte[], Long>read()
- .withBootstrapServers("none")
+ .withBootstrapServers(bootStrapServer)
.withTopicPattern("[a-z]est")
.withConsumerFactoryFn(
             new ConsumerFactoryFn(topics, 1, numElements, OffsetResetStrategy.EARLIEST))
@@ -825,7 +844,13 @@ public class KafkaIOTest {
     PAssert.thatSingleton(input.apply("Count", Count.globally())).isEqualTo(numMatchedElements);
- p.run();
+ PipelineResult result = p.run();
+ assertThat(
+ Lineage.query(result.metrics(), Lineage.Type.SOURCE),
+ hasItem(String.format("kafka:%s.test", bootStrapServer)));
+ assertThat(
+ Lineage.query(result.metrics(), Lineage.Type.SOURCE),
+ not(hasItem(String.format("kafka:%s.Test", bootStrapServer))));
}
@Test
@@ -1435,22 +1460,26 @@ public class KafkaIOTest {
         new ProducerSendCompletionThread(producerWrapper.mockProducer).start();
String topic = "test";
+ String bootStrapServer = "none";
     p.apply(mkKafkaReadTransform(numElements, new ValueAsTimestampFn()).withoutMetadata())
.apply(
KafkaIO.<Integer, Long>write()
- .withBootstrapServers("none")
+ .withBootstrapServers(bootStrapServer)
.withTopic(topic)
.withKeySerializer(IntegerSerializer.class)
.withValueSerializer(LongSerializer.class)
.withInputTimestamp()
             .withProducerFactoryFn(new ProducerFactoryFn(producerWrapper.producerKey)));
- p.run();
+ PipelineResult result = p.run();
completionThread.shutdown();
     verifyProducerRecords(producerWrapper.mockProducer, topic, numElements, false, true);
+ assertThat(
+ Lineage.query(result.metrics(), Lineage.Type.SINK),
+ hasItem(String.format("kafka:%s.%s", bootStrapServer, topic)));
}
}
@@ -1836,27 +1865,31 @@ public class KafkaIOTest {
     ProducerSendCompletionThread completionThread =
         new ProducerSendCompletionThread(producerWrapper.mockProducer).start();
- String topic = "test";
+ String topic = "test-eos";
+ String bootStrapServer = "none";
     p.apply(mkKafkaReadTransform(numElements, new ValueAsTimestampFn()).withoutMetadata())
.apply(
KafkaIO.<Integer, Long>write()
- .withBootstrapServers("none")
+ .withBootstrapServers(bootStrapServer)
.withTopic(topic)
.withKeySerializer(IntegerSerializer.class)
.withValueSerializer(LongSerializer.class)
- .withEOS(1, "test")
+ .withEOS(1, "test-eos")
.withConsumerFactoryFn(
new ConsumerFactoryFn(
                     Lists.newArrayList(topic), 10, 10, OffsetResetStrategy.EARLIEST))
.withPublishTimestampFunction((e, ts) -> ts)
             .withProducerFactoryFn(new ProducerFactoryFn(producerWrapper.producerKey)));
- p.run();
+ PipelineResult result = p.run();
completionThread.shutdown();
     verifyProducerRecords(producerWrapper.mockProducer, topic, numElements, false, true);
+ assertThat(
+ Lineage.query(result.metrics(), Lineage.Type.SINK),
+ hasItem(String.format("kafka:%s.%s", bootStrapServer, topic)));
}
}
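
Taken together, the test changes follow one recipe that other IO tests can reuse. A minimal sketch, assuming Lineage.query returns the set of rendered FQN strings, as the assertions above suggest:

    // Run the pipeline, then query attempted lineage metrics from the result.
    PipelineResult result = p.run();
    Set<String> sources = Lineage.query(result.metrics(), Lineage.Type.SOURCE);
    Set<String> sinks = Lineage.query(result.metrics(), Lineage.Type.SINK);
    assertThat(sources, hasItem(String.format("kafka:%s.%s", bootStrapServer, topic)));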