scwhittle commented on code in PR #33596:
URL: https://github.com/apache/beam/pull/33596#discussion_r1925305900


##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOUtils.java:
##########
@@ -142,4 +144,14 @@ void update(double quantity) {
       return avg;
     }
   }
+
+  static byte[] getOrderedCode(long offset) {

Review Comment:
   encodeOffset?



##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOUtils.java:
##########
@@ -142,4 +144,14 @@ void update(double quantity) {
       return avg;
     }
   }
+
+  static byte[] getOrderedCode(long offset) {
+    OrderedCode orderedCode = new OrderedCode();
+    orderedCode.writeNumIncreasing(offset);

Review Comment:
   Could see if this shows up on profiles. It would be cheaper to have a 
static method that encodes a long as fixed-length big-endian bytes, since we 
don't need the separator or infinity support that OrderedCode provides.
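
A minimal sketch of the fixed-length encoding suggested above, not the PR's code. The class name and `decodeOffset` are hypothetical, and `encodeOffset` borrows the name floated in the earlier comment. It assumes offsets are non-negative, which is what keeps unsigned byte-wise ordering consistent with numeric ordering.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical helper class for illustration only.
final class OffsetEncodingSketch {
  // Encodes a long as 8 bytes, most significant byte first.
  // ByteBuffer uses big-endian byte order by default.
  static byte[] encodeOffset(long offset) {
    return ByteBuffer.allocate(Long.BYTES).putLong(offset).array();
  }

  // Inverse of encodeOffset, included to show the encoding is lossless.
  static long decodeOffset(byte[] encoded) {
    return ByteBuffer.wrap(encoded).getLong();
  }

  public static void main(String[] args) {
    byte[] lower = encodeOffset(255L);
    byte[] higher = encodeOffset(256L);
    // For non-negative offsets, unsigned lexicographic order matches numeric order.
    System.out.println(Arrays.compareUnsigned(lower, higher) < 0); // prints true
  }
}
```

Whether this is worth doing still depends on the encoding actually showing up in profiles.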



##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java:
##########
@@ -892,6 +898,10 @@ static <K, V> void setupExternalBuilder(
           builder.setRedistributeNumKeys(0);
           builder.setAllowDuplicates(false);
         }
+        // TODO(tomstepp): Auto-enable offset deduplication if: redistributed and !allowDuplicates.

Review Comment:
   should we validate in build that this is only set if those conditions are 
met?
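
A rough sketch of what such a check could look like, assuming the three settings are available as plain booleans at build time. The class, its fields, and `validate()` are hypothetical stand-ins and do not reflect KafkaIO's actual builder API.

```java
// Hypothetical configuration holder for illustration only.
final class OffsetDedupConfigSketch {
  private final boolean redistributed;
  private final boolean allowDuplicates;
  private final boolean offsetDeduplication;

  OffsetDedupConfigSketch(
      boolean redistributed, boolean allowDuplicates, boolean offsetDeduplication) {
    this.redistributed = redistributed;
    this.allowDuplicates = allowDuplicates;
    this.offsetDeduplication = offsetDeduplication;
  }

  // Rejects offset deduplication unless redistribute is on and duplicates are disallowed.
  void validate() {
    if (offsetDeduplication && !(redistributed && !allowDuplicates)) {
      throw new IllegalStateException(
          "Offset deduplication requires redistribute to be enabled"
              + " and allowDuplicates to be false.");
    }
  }
}
```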
   
   



##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java:
##########
@@ -299,6 +303,30 @@ public Instant getCurrentTimestamp() throws NoSuchElementException {
     return curTimestamp;
   }
 
+  @Override
+  public byte[] getCurrentRecordId() throws NoSuchElementException {
+    if (curId == null) {
+      if (this.offsetDeduplication) {
+        throw new NoSuchElementException();
+      } else {
+        return new byte[0];
+      }
+    }
+    return curId;
+  }
+
+  @Override
+  public byte[] getCurrentRecordOffset() throws NoSuchElementException {

Review Comment:
   ditto (same as the comment on getCurrentRecordId)



##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedSource.java:
##########
@@ -113,10 +113,16 @@ public List<KafkaUnboundedSource<K, V>> split(int desiredNumSplits, PipelineOpti
         partitions.size() > 0,
         "Could not find any partitions. Please check Kafka configuration and topic names");
 
-    int numSplits = Math.min(desiredNumSplits, partitions.size());
-    // XXX make all splits have the same # of partitions
-    while (partitions.size() % numSplits > 0) {
-      ++numSplits;
+    int numSplits;
+    if (isOffsetDeduplication()) {
+      // Enforce 1:1 split to partition ratio for offset deduplication.

Review Comment:
   have a log, indicating this is happening?
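
Something along these lines, as a sketch only. The helper name and class are hypothetical, the non-deduplication branch is inferred from the removed lines in the hunk, and `java.util.logging` is used just to keep the example self-contained (the real code would presumably use the logger already present in the class). It assumes `desiredNumSplits` and `numPartitions` are both positive.

```java
import java.util.logging.Logger;

// Hypothetical helper class for illustration only.
final class SplitCountSketch {
  private static final Logger LOG = Logger.getLogger(SplitCountSketch.class.getName());

  static int computeNumSplits(
      int desiredNumSplits, int numPartitions, boolean offsetDeduplication) {
    if (offsetDeduplication) {
      // Make the override visible, since the requested split count is ignored here.
      LOG.info(
          String.format(
              "Offset deduplication is enabled: ignoring desiredNumSplits=%d"
                  + " and using one split per partition (%d splits).",
              desiredNumSplits, numPartitions));
      return numPartitions;
    }
    int numSplits = Math.min(desiredNumSplits, numPartitions);
    // Grow the split count until every split holds the same number of partitions.
    while (numPartitions % numSplits > 0) {
      ++numSplits;
    }
    return numSplits;
  }
}
```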



##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java:
##########
@@ -299,6 +303,30 @@ public Instant getCurrentTimestamp() throws NoSuchElementException {
     return curTimestamp;
   }
 
+  @Override
+  public byte[] getCurrentRecordId() throws NoSuchElementException {

Review Comment:
   can we make it an error to call this if offsetDeduplication isn't set and 
provide a way for callers to check?
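
For illustration, one possible shape, assuming the reader knows at construction time whether offset deduplication is enabled. The class, the `isOffsetDeduplication()` accessor on the reader, and `setCurrentRecordId` are hypothetical and are not part of the existing reader API.

```java
import java.util.NoSuchElementException;

// Hypothetical stand-in for the reader, for illustration only.
final class RecordIdReaderSketch {
  private final boolean offsetDeduplication;
  private byte[] curId;

  RecordIdReaderSketch(boolean offsetDeduplication) {
    this.offsetDeduplication = offsetDeduplication;
  }

  // Lets callers check before asking for a record id.
  boolean isOffsetDeduplication() {
    return offsetDeduplication;
  }

  // Stand-in for the reader advancing to a new record.
  void setCurrentRecordId(byte[] id) {
    this.curId = id;
  }

  byte[] getCurrentRecordId() {
    if (!offsetDeduplication) {
      // Calling this without offset deduplication is treated as a programming error.
      throw new IllegalStateException(
          "getCurrentRecordId() requires offset deduplication to be enabled.");
    }
    if (curId == null) {
      throw new NoSuchElementException();
    }
    return curId;
  }
}
```

Callers would then guard with `isOffsetDeduplication()` instead of relying on an empty array as a sentinel.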



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
