tomstepp commented on code in PR #33596:
URL: https://github.com/apache/beam/pull/33596#discussion_r1938700730


##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java:
##########
@@ -299,6 +300,34 @@ public Instant getCurrentTimestamp() throws NoSuchElementException {
     return curTimestamp;
   }
 
+  private static final byte[] EMPTY_RECORD_ID = new byte[0];
+
+  @Override
+  public byte[] getCurrentRecordId() throws NoSuchElementException {
+    if (!this.offsetBasedDeduplicationSupported) {
+      // BoundedReadFromUnboundedSource and tests may call getCurrentRecordId(), even for non offset
+      // deduplication cases. Therefore, Kafka reader cannot produce an exception when offset
+      // deduplication is disabled. Instead an empty record ID is provided.
+      return EMPTY_RECORD_ID;
+    }
+    if (curRecord != null) {

Review Comment:
   Sounds good. I had switched things around while confused by a nullable lint check.



##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java:
##########
@@ -299,6 +300,34 @@ public Instant getCurrentTimestamp() throws NoSuchElementException {
     return curTimestamp;
   }
 
+  private static final byte[] EMPTY_RECORD_ID = new byte[0];
+
+  @Override
+  public byte[] getCurrentRecordId() throws NoSuchElementException {
+    if (!this.offsetBasedDeduplicationSupported) {
+      // BoundedReadFromUnboundedSource and tests may call getCurrentRecordId(), even for non offset
+      // deduplication cases. Therefore, Kafka reader cannot produce an exception when offset
+      // deduplication is disabled. Instead an empty record ID is provided.
+      return EMPTY_RECORD_ID;
+    }
+    if (curRecord != null) {
+      return KafkaIOUtils.OffsetBasedDeduplication.getUniqueId(
+          curRecord.getTopic(), curRecord.getPartition(), curRecord.getOffset());
+    }
+    throw new NoSuchElementException("KafkaUnboundedReader's curRecord is null.");
+  }
+
+  @Override
+  public byte[] getCurrentRecordOffset() throws NoSuchElementException {
+    if (!this.offsetBasedDeduplicationSupported) {
+      throw new RuntimeException("UnboundedSource must enable offset-based deduplication.");
+    }
+    if (curRecord != null) {

Review Comment:
   Sounds good.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

