scwhittle commented on code in PR #33596:
URL: https://github.com/apache/beam/pull/33596#discussion_r1939764376


##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java:
##########
@@ -299,6 +300,34 @@ public Instant getCurrentTimestamp() throws 
NoSuchElementException {
     return curTimestamp;
   }
 
+  private static final byte[] EMPTY_RECORD_ID = new byte[0];
+
+  @Override
+  public byte[] getCurrentRecordId() throws NoSuchElementException {

Review Comment:
   the super impl throws an exception if getCurrentSource().requiresDeduping() 
is requested and this isn't implemented.
   
   Maybe it woudl be better to delegate to super if 
!offsetBasedDeduplicationSupported so it fails more loudly if that gets set in 
the future?
   



##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java:
##########
@@ -314,6 +343,10 @@ public long getSplitBacklogBytes() {
     return backlogBytes;
   }
 
+  public boolean offsetBasedDeduplicationSupported() {
+    return this.offsetBasedDeduplicationSupported;

Review Comment:
   seems confusing to have function and member with the same name, prefix the 
method with get?
   Or you could remove the member variable and just have this method delegate 
to the source each time



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to