kennknowles commented on code in PR #31347:
URL: https://github.com/apache/beam/pull/31347#discussion_r1612123815
##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java:
##########
@@ -2295,6 +2391,15 @@ public ReadSourceDescriptors<K, V> withProcessingTime() {
ReadSourceDescriptors.ExtractOutputTimestampFns.useProcessingTime());
}
+ /** Enable Redistribute. */
+ public ReadSourceDescriptors<K, V> withRedistributeEnabled() {
+ return toBuilder().setRedistributeEnabled(true).build();
+ }
+
+ public ReadSourceDescriptors<K, V> withNumShards(int numShards) {
+ return toBuilder().setNumShards(numShards).build();
Review Comment:
`setNumRedistributeShards`, because it has to do with the redistribute, not the top-level KafkaIO transform.
##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java:
##########
@@ -2136,6 +2230,8 @@ public static <K, V> ReadSourceDescriptors<K, V> read() {
.setBadRecordRouter(BadRecordRouter.THROWING_ROUTER)
.setBadRecordErrorHandler(new ErrorHandler.DefaultErrorHandler<>())
.setConsumerPollingTimeout(2L)
+ .setRedistributeEnabled(false)
Review Comment:
nit: just say `withRedistribute`
##########
sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java:
##########
@@ -616,6 +624,58 @@ public void testRiskyConfigurationWarnsProperly() {
p.run();
}
+ @Test
+ public void testRiskyConfigurationWarnsProperlyWithNumShardsNotSet() {
+ int numElements = 1000;
+
+ PCollection<Long> input =
+ p.apply(
+             mkKafkaReadTransform(numElements, numElements, new ValueAsTimestampFn(), true, 0)
+                 .withConsumerConfigUpdates(
+                     ImmutableMap.of(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true))
+ .withoutMetadata())
+ .apply(Values.create());
+
+ addCountingAsserts(input, numElements);
+
+ kafkaIOExpectedLogs.verifyWarn(
"This will redistribute the load across the same number of shards as the Kafka source.");
Review Comment:
Won't `Redistribute.arbitrarily` currently create a random key for each record, and the problem is simply that this creates far too many keys? In other words, numShards is used to _decrease_ the number of keys, not to increase it. And that is an implementation detail that could and should change in the future if we do something clever and actually make it a black box that uses e.g. least-loaded algorithms.
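
To make the key-count point concrete, here is a minimal standalone sketch (not Beam's actual `Redistribute` implementation; the class and method names below are hypothetical) contrasting an unbounded random key per record with keys drawn from a bounded shard range. The bounded variant is what limits the number of distinct keys the runner must track:

```java
import java.util.concurrent.ThreadLocalRandom;

public class RedistributeKeySketch {

  // Hypothetical stand-in for a redistribute with no shard bound: every
  // record can receive its own random key, so the number of distinct keys
  // grows with the record count.
  static int arbitraryKey() {
    return ThreadLocalRandom.current().nextInt();
  }

  // Hypothetical stand-in for a redistribute with numShards set: keys are
  // drawn from [0, numShards), which *decreases* the number of distinct keys.
  static int boundedKey(int numShards) {
    return ThreadLocalRandom.current().nextInt(numShards);
  }

  public static void main(String[] args) {
    int numShards = 4;
    for (int i = 0; i < 1000; i++) {
      int k = boundedKey(numShards);
      if (k < 0 || k >= numShards) {
        throw new AssertionError("key out of shard range: " + k);
      }
    }
    System.out.println("all keys within " + numShards + " shards");
  }
}
```

Under this framing, treating the key-assignment strategy as a black box would let the runner swap the random draw for something like a least-loaded assignment without changing the user-facing API.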
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]