kennknowles commented on code in PR #31347:
URL: https://github.com/apache/beam/pull/31347#discussion_r1621076103


##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java:
##########
@@ -1606,6 +1658,24 @@ public PCollection<KafkaRecord<K, V>> expand(PBegin input) {
                   .withMaxNumRecords(kafkaRead.getMaxNumRecords());
         }
 
+        if (kafkaRead.isRedistributed()) {
+          if (kafkaRead.isCommitOffsetsInFinalizeEnabled()) {
+            LOG.warn(
+                "Both isRedistribute and isCommitOffsetsInFinalizeEnabled are enabled, isRedistribute will take precendence");

Review Comment:
   Redistribute is not guaranteed to perform the checkpoint that 
commitOffsetsInFinalize requires. Either we make both work correctly together, 
or this should be rejected as an invalid configuration (throw an exception).
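A minimal sketch of the "reject it" option, assuming the two flags are plain booleans at the point of validation. The class and method names (`KafkaReadConfigCheck`, `validate`) are hypothetical and not part of KafkaIO; it throws a plain `IllegalArgumentException` to stay dependency-free.

```java
// Hypothetical sketch: KafkaReadConfigCheck and validate() are illustrative
// names, not KafkaIO's actual API.
final class KafkaReadConfigCheck {
  private KafkaReadConfigCheck() {}

  /**
   * Fails fast on the unsupported combination instead of logging a warning
   * and silently letting one setting win over the other.
   */
  static void validate(boolean redistribute, boolean commitOffsetsInFinalize) {
    if (redistribute && commitOffsetsInFinalize) {
      throw new IllegalArgumentException(
          "redistribute and commitOffsetsInFinalize cannot both be enabled: "
              + "Redistribute does not guarantee the checkpoint that "
              + "commitOffsetsInFinalize relies on.");
    }
  }

  public static void main(String[] args) {
    validate(true, false); // accepted
    validate(false, true); // accepted
    try {
      validate(true, true);
      System.out.println("unexpectedly accepted");
    } catch (IllegalArgumentException e) {
      System.out.println("rejected: " + e.getMessage());
    }
  }
}
```

Failing at construction time surfaces the conflict to the pipeline author immediately, whereas a warning is easy to miss in worker logs.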



##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java:
##########
@@ -948,6 +975,14 @@ public void setTimestampPolicy(String timestampPolicy) {
         public void setConsumerPollingTimeout(Long consumerPollingTimeout) {
           this.consumerPollingTimeout = consumerPollingTimeout;
         }
+
+        public void setRedistributeNumShards(Long redistributeNumShards) {
+          this.redistributeNumShards = redistributeNumShards;
+        }
+
+        public void setRedistribute(Boolean redistribute) {

Review Comment:
   You also need to have `allowDuplicates` as a parameter. It also makes sense 
to Redistribute for the exactly-once use case.
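A rough sketch of what an extra `allowDuplicates` setter on the configuration bean could look like. The class name `RedistributeConfigSketch` and the mode strings are hypothetical; it assumes that `allowDuplicates=true` lets the runner emit duplicates (at-least-once), while `false` keeps redistribution usable for the exactly-once case.

```java
// Hypothetical sketch of a configuration bean; the class name and the mode
// strings are illustrative, not Beam's actual API.
final class RedistributeConfigSketch {
  private Boolean redistribute;
  private Boolean allowDuplicates;

  public void setRedistribute(Boolean redistribute) {
    this.redistribute = redistribute;
  }

  public void setAllowDuplicates(Boolean allowDuplicates) {
    this.allowDuplicates = allowDuplicates;
  }

  /** Describes the resulting behavior, treating unset (null) flags as false. */
  String mode() {
    boolean redist = Boolean.TRUE.equals(redistribute);
    boolean dups = Boolean.TRUE.equals(allowDuplicates);
    if (!redist) {
      return "no-redistribute";
    }
    // Redistribute is meaningful either way; allowDuplicates only decides
    // whether duplicates are tolerated (assumed at-least-once) or not.
    return dups ? "redistribute-at-least-once" : "redistribute-exactly-once";
  }

  public static void main(String[] args) {
    RedistributeConfigSketch config = new RedistributeConfigSketch();
    config.setRedistribute(true);
    System.out.println(config.mode()); // redistribute-exactly-once
    config.setAllowDuplicates(true);
    System.out.println(config.mode()); // redistribute-at-least-once
  }
}
```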



##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java:
##########
@@ -948,6 +975,14 @@ public void setTimestampPolicy(String timestampPolicy) {
         public void setConsumerPollingTimeout(Long consumerPollingTimeout) {
           this.consumerPollingTimeout = consumerPollingTimeout;
         }
+
+        public void setRedistributeNumShards(Long redistributeNumShards) {

Review Comment:
   Since it is boxed, you can make it `@Nullable`, and that way you can check 
whether it has been set or not. It is also fine to rely on a default of zero, 
but then make it unboxed (if you can; I don't know how AutoValue handles 
that).
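A small sketch contrasting the two options. The `@Nullable` annotation is left as a comment so the example has no dependencies; the nested class names and `isNumShardsSet()` are hypothetical helpers for illustration.

```java
// Sketch of the two options; class and helper names are hypothetical.
final class NumShardsOptions {
  /** Option 1: boxed and nullable, so "not set" (null) is distinct from 0. */
  static final class Boxed {
    private /* @Nullable */ Long redistributeNumShards;

    public void setRedistributeNumShards(Long n) {
      this.redistributeNumShards = n;
    }

    boolean isNumShardsSet() {
      return redistributeNumShards != null;
    }
  }

  /** Option 2: unboxed, relying on the default of 0 to mean "not set". */
  static final class Unboxed {
    private long redistributeNumShards; // Java initializes this to 0

    public void setRedistributeNumShards(long n) {
      this.redistributeNumShards = n;
    }

    boolean isNumShardsSet() {
      // An explicit 0 is indistinguishable from "never set" here.
      return redistributeNumShards != 0;
    }
  }
}
```

The trade-off: the boxed form can tell "explicitly 0" apart from "unset", while the unboxed form avoids null checks but gives 0 a reserved meaning.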



##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java:
##########
@@ -904,6 +929,8 @@ public static class Configuration {
         private Boolean commitOffsetInFinalize;
         private Long consumerPollingTimeout;
         private String timestampPolicy;
+        private Long redistributeNumShards;

Review Comment:
   When you pass this to Redistribute, you cast it to an `int`. Since it has 
to fit in an `int` anyway, just use `Integer` here (which is plenty big, since 
the goal is to reduce the number of keys).
   
   Actually, we should rename all of this to `numKeys`, because a "shard" is a 
unit of processing, and it is a Dataflow-specific quirk that key == shard even 
when that is not required.
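A short illustration of why the `Long` to `int` cast is worth avoiding, plus a sketch of the suggested `Integer` field under the proposed `numKeys` name. `NumKeysSketch` is a hypothetical class, not the actual Configuration.

```java
// Hypothetical sketch: NumKeysSketch is illustrative, not KafkaIO's Configuration.
final class NumKeysSketch {
  private Integer numKeys; // null means "not set"

  public void setNumKeys(Integer numKeys) {
    this.numKeys = numKeys;
  }

  Integer getNumKeys() {
    return numKeys;
  }

  public static void main(String[] args) {
    long tooBig = 3_000_000_000L;
    // A plain narrowing cast silently wraps around instead of failing.
    System.out.println((int) tooBig); // -1294967296
    // Math.toIntExact throws ArithmeticException on overflow instead.
    try {
      Math.toIntExact(tooBig);
    } catch (ArithmeticException e) {
      System.out.println("overflow detected");
    }
    // Declaring the field as Integer removes the need for any cast.
    NumKeysSketch config = new NumKeysSketch();
    config.setNumKeys(128);
    System.out.println(config.getNumKeys()); // 128
  }
}
```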



-- 
This is an automated message from the Apache Git Service.