This is an automated email from the ASF dual-hosted git repository.
scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 1b2d21acf8b Fix Kafka with Redistribute and commits enabled (#32344)
1b2d21acf8b is described below
commit 1b2d21acf8b88c14cd9395f6e69c5802e65786f0
Author: Naireen Hussain <[email protected]>
AuthorDate: Tue Sep 17 01:50:22 2024 -0700
Fix Kafka with Redistribute and commits enabled (#32344)
---
.../java/org/apache/beam/sdk/io/kafka/KafkaIO.java | 30 ++++--
...KafkaIOReadImplementationCompatibilityTest.java | 8 +-
.../org/apache/beam/sdk/io/kafka/KafkaIOTest.java | 118 +++++++++++++++++++--
3 files changed, 136 insertions(+), 20 deletions(-)
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index 1fd3e3e044e..0f28edf19dd 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -890,7 +890,6 @@ public class KafkaIO {
builder.setRedistributeNumKeys(0);
builder.setAllowDuplicates(false);
}
- System.out.println("xxx builder service" + builder.toString());
}
private static <T> Coder<T> resolveCoder(Class<Deserializer<T>>
deserializer) {
@@ -1697,11 +1696,12 @@ public class KafkaIO {
}
if (kafkaRead.isRedistributed()) {
- // fail here instead.
- checkArgument(
- kafkaRead.isCommitOffsetsInFinalizeEnabled(),
- "commitOffsetsInFinalize() can't be enabled with
isRedistributed");
+ if (kafkaRead.isCommitOffsetsInFinalizeEnabled() &&
kafkaRead.isAllowDuplicates()) {
+ LOG.warn(
+ "Offsets committed due to usage of commitOffsetsInFinalize()
and may not capture all work processed due to use of withRedistribute() with
duplicates enabled");
+ }
PCollection<KafkaRecord<K, V>> output =
input.getPipeline().apply(transform);
+
if (kafkaRead.getRedistributeNumKeys() == 0) {
return output.apply(
"Insert Redistribute",
@@ -1797,7 +1797,7 @@ public class KafkaIO {
return pcol.apply(
"Insert Redistribute with Shards",
Redistribute.<KafkaRecord<K, V>>arbitrarily()
- .withAllowDuplicates(true)
+ .withAllowDuplicates(kafkaRead.isAllowDuplicates())
.withNumBuckets((int) kafkaRead.getRedistributeNumKeys()));
}
}
@@ -2654,6 +2654,12 @@ public class KafkaIO {
if (getRedistributeNumKeys() == 0) {
LOG.warn("This will create a key per record, which is sub-optimal
for most use cases.");
}
+ if ((isCommitOffsetEnabled() || configuredKafkaCommit()) &&
isAllowDuplicates()) {
+ LOG.warn(
+ "Either auto_commit is set, or commitOffsetEnabled is enabled
(or both), but since "
+ + "withRestribute() is enabled with allow duplicates, the
runner may have additional work processed that "
+ + "is ahead of the current checkpoint");
+ }
}
if (getConsumerConfig().get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG) ==
null) {
@@ -2687,8 +2693,7 @@ public class KafkaIO {
.getSchemaCoder(KafkaSourceDescriptor.class),
recordCoder));
- boolean applyCommitOffsets =
- isCommitOffsetEnabled() && !configuredKafkaCommit() &&
!isRedistribute();
+ boolean applyCommitOffsets = isCommitOffsetEnabled() &&
!configuredKafkaCommit();
if (!applyCommitOffsets) {
return outputWithDescriptor
.apply(MapElements.into(new TypeDescriptor<KafkaRecord<K, V>>()
{}).via(KV::getValue))
@@ -2710,6 +2715,15 @@ public class KafkaIO {
if (Comparators.lexicographical(Comparator.<String>naturalOrder())
.compare(requestedVersion, targetVersion)
< 0) {
+ // Redistribute is not allowed with commits prior to 2.59.0, since
there is a Reshuffle
+ // prior to the redistribute. The reshuffle will occur before
commits are offsetted and
+ // before outputting KafkaRecords. Adding a redistribute then
afterwards doesn't provide
+ // additional performance benefit.
+ checkArgument(
+ !isRedistribute(),
+ "Can not enable isRedistribute() while committing offsets
prior to "
+ + String.join(".", targetVersion));
+
return expand259Commits(
outputWithDescriptor, recordCoder,
input.getPipeline().getSchemaRegistry());
}
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOReadImplementationCompatibilityTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOReadImplementationCompatibilityTest.java
index 74f1e83fd86..29c920bf9a6 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOReadImplementationCompatibilityTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOReadImplementationCompatibilityTest.java
@@ -108,7 +108,13 @@ public class KafkaIOReadImplementationCompatibilityTest {
Function<KafkaIO.Read<Integer, Long>, KafkaIO.Read<Integer, Long>>
kafkaReadDecorator) {
p.apply(
kafkaReadDecorator.apply(
- mkKafkaReadTransform(1000, null, new ValueAsTimestampFn(), false,
0)));
+ mkKafkaReadTransform(
+ 1000,
+ null,
+ new ValueAsTimestampFn(),
+ false, /*redistribute*/
+ false, /*allowDuplicates*/
+ 0)));
return p.run();
}
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
index 1fe1147a739..25ff6dad124 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
@@ -88,6 +88,7 @@ import org.apache.beam.sdk.metrics.MetricsFilter;
import org.apache.beam.sdk.metrics.SinkMetrics;
import org.apache.beam.sdk.metrics.SourceMetrics;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.testing.ExpectedLogs;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
@@ -381,7 +382,13 @@ public class KafkaIOTest {
static KafkaIO.Read<Integer, Long> mkKafkaReadTransform(
int numElements, @Nullable SerializableFunction<KV<Integer, Long>,
Instant> timestampFn) {
- return mkKafkaReadTransform(numElements, numElements, timestampFn, false,
0);
+ return mkKafkaReadTransform(
+ numElements,
+ numElements,
+ timestampFn,
+ false, /*redistribute*/
+ false, /*allowDuplicates*/
+ 0);
}
/**
@@ -393,6 +400,7 @@ public class KafkaIOTest {
@Nullable Integer maxNumRecords,
@Nullable SerializableFunction<KV<Integer, Long>, Instant> timestampFn,
@Nullable Boolean redistribute,
+ @Nullable Boolean withAllowDuplicates,
@Nullable Integer numKeys) {
KafkaIO.Read<Integer, Long> reader =
@@ -408,13 +416,21 @@ public class KafkaIOTest {
reader = reader.withMaxNumRecords(maxNumRecords);
}
+ if (withAllowDuplicates == null) {
+ withAllowDuplicates = false;
+ }
+
if (timestampFn != null) {
reader = reader.withTimestampFn(timestampFn);
}
if (redistribute) {
if (numKeys != null) {
- reader = reader.withRedistribute().withRedistributeNumKeys(numKeys);
+ reader =
+ reader
+ .withRedistribute()
+ .withAllowDuplicates(withAllowDuplicates)
+ .withRedistributeNumKeys(numKeys);
}
reader = reader.withRedistribute();
}
@@ -628,17 +644,47 @@ public class KafkaIOTest {
}
@Test
- public void testCommitOffsetsInFinalizeAndRedistributeErrors() {
- thrown.expect(Exception.class);
- thrown.expectMessage("commitOffsetsInFinalize() can't be enabled with
isRedistributed");
+ public void warningsWithAllowDuplicatesEnabledAndCommitOffsets() {
+ int numElements = 1000;
+ PCollection<Long> input =
+ p.apply(
+ mkKafkaReadTransform(
+ numElements,
+ numElements,
+ new ValueAsTimestampFn(),
+ true, /*redistribute*/
+ true, /*allowDuplicates*/
+ 0)
+ .commitOffsetsInFinalize()
+ .withConsumerConfigUpdates(
+ ImmutableMap.of(ConsumerConfig.GROUP_ID_CONFIG,
"group_id"))
+ .withoutMetadata())
+ .apply(Values.create());
+
+ addCountingAsserts(input, numElements);
+ p.run();
+
+ kafkaIOExpectedLogs.verifyWarn(
+ "Offsets committed due to usage of commitOffsetsInFinalize() and may
not capture all work processed due to use of withRedistribute() with duplicates
enabled");
+ }
+
+ @Test
+ public void noWarningsWithNoAllowDuplicatesAndCommitOffsets() {
int numElements = 1000;
PCollection<Long> input =
p.apply(
- mkKafkaReadTransform(numElements, numElements, new
ValueAsTimestampFn(), true, 0)
+ mkKafkaReadTransform(
+ numElements,
+ numElements,
+ new ValueAsTimestampFn(),
+ true, /*redistribute*/
+ false, /*allowDuplicates*/
+ 0)
+ .commitOffsetsInFinalize()
.withConsumerConfigUpdates(
-
ImmutableMap.of(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true))
+ ImmutableMap.of(ConsumerConfig.GROUP_ID_CONFIG,
"group_id"))
.withoutMetadata())
.apply(Values.create());
@@ -648,13 +694,25 @@ public class KafkaIOTest {
@Test
public void testNumKeysIgnoredWithRedistributeNotEnabled() {
+ thrown.expect(Exception.class);
+ thrown.expectMessage(
+ "withRedistributeNumKeys is ignored if withRedistribute() is not
enabled on the transform");
+
int numElements = 1000;
PCollection<Long> input =
p.apply(
- mkKafkaReadTransform(numElements, numElements, new
ValueAsTimestampFn(), false, 0)
+ mkKafkaReadTransform(
+ numElements,
+ numElements,
+ new ValueAsTimestampFn(),
+ false, /*redistribute*/
+ false, /*allowDuplicates*/
+ 0)
+ .withRedistributeNumKeys(100)
+ .commitOffsetsInFinalize()
.withConsumerConfigUpdates(
-
ImmutableMap.of(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true))
+ ImmutableMap.of(ConsumerConfig.GROUP_ID_CONFIG,
"group_id"))
.withoutMetadata())
.apply(Values.create());
@@ -663,6 +721,32 @@ public class KafkaIOTest {
p.run();
}
+ @Test
+ public void testDisableRedistributeKafkaOffsetLegacy() {
+ thrown.expect(Exception.class);
+ thrown.expectMessage(
+ "Can not enable isRedistribute() while committing offsets prior to
2.60.0");
+
p.getOptions().as(StreamingOptions.class).setUpdateCompatibilityVersion("2.59.0");
+
+ p.apply(
+ Create.of(
+ KafkaSourceDescriptor.of(
+ new TopicPartition("topic", 1),
+ null,
+ null,
+ null,
+ null,
+ ImmutableList.of("8.8.8.8:9092"))))
+ .apply(
+ KafkaIO.<Long, Long>readSourceDescriptors()
+ .withKeyDeserializer(LongDeserializer.class)
+ .withValueDeserializer(LongDeserializer.class)
+ .withRedistribute()
+ .withProcessingTime()
+ .commitOffsets());
+ p.run();
+ }
+
@Test
public void testUnreachableKafkaBrokers() {
// Expect an exception when the Kafka brokers are not reachable on the
workers.
@@ -1982,7 +2066,13 @@ public class KafkaIOTest {
PCollection<Long> input =
p.apply(
- mkKafkaReadTransform(numElements, maxNumRecords, new
ValueAsTimestampFn(), false, 0)
+ mkKafkaReadTransform(
+ numElements,
+ maxNumRecords,
+ new ValueAsTimestampFn(),
+ false, /*redistribute*/
+ false, /*allowDuplicates*/
+ 0)
.withStartReadTime(new Instant(startTime))
.withoutMetadata())
.apply(Values.create());
@@ -2006,7 +2096,13 @@ public class KafkaIOTest {
int startTime = numElements / 20;
p.apply(
- mkKafkaReadTransform(numElements, numElements, new
ValueAsTimestampFn(), false, 0)
+ mkKafkaReadTransform(
+ numElements,
+ numElements,
+ new ValueAsTimestampFn(),
+ false, /*redistribute*/
+ false, /*allowDuplicates*/
+ 0)
.withStartReadTime(new Instant(startTime))
.withoutMetadata())
.apply(Values.create());