scwhittle commented on code in PR #33596:
URL: https://github.com/apache/beam/pull/33596#discussion_r1939764376
##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java:
##########
@@ -299,6 +300,34 @@ public Instant getCurrentTimestamp() throws NoSuchElementException {
return curTimestamp;
}
+ private static final byte[] EMPTY_RECORD_ID = new byte[0];
+
+ @Override
+ public byte[] getCurrentRecordId() throws NoSuchElementException {
Review Comment:
The super implementation throws an exception if
getCurrentSource().requiresDeduping() returns true and this method isn't
overridden.
Maybe it would be better to delegate to super if
!offsetBasedDeduplicationSupported, so it fails more loudly if that gets set in
the future?
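A minimal sketch of the delegation suggested above, not the PR's actual code. BaseReaderSketch is a hypothetical stand-in for the Beam base reader whose behavior is assumed from the description in this comment, and KafkaReaderSketch, its constructor, and the 8-byte offset encoding are illustrative assumptions.

```java
import java.nio.ByteBuffer;
import java.util.NoSuchElementException;

// Hypothetical stand-in for the base reader. Its behavior is assumed from the
// review comment: throw if deduping is required but no record id is provided.
abstract class BaseReaderSketch {
  private static final byte[] EMPTY_RECORD_ID = new byte[0];

  abstract boolean requiresDeduping();

  public byte[] getCurrentRecordId() throws NoSuchElementException {
    if (requiresDeduping()) {
      throw new RuntimeException(
          "getCurrentRecordId() must be overridden when deduping is required");
    }
    return EMPTY_RECORD_ID;
  }
}

// Hypothetical reader illustrating the suggested delegation.
final class KafkaReaderSketch extends BaseReaderSketch {
  private final boolean offsetBasedDeduplicationSupported;
  private final boolean requiresDeduping;
  private final long offset;

  KafkaReaderSketch(
      boolean offsetBasedDeduplicationSupported, boolean requiresDeduping, long offset) {
    this.offsetBasedDeduplicationSupported = offsetBasedDeduplicationSupported;
    this.requiresDeduping = requiresDeduping;
    this.offset = offset;
  }

  @Override
  boolean requiresDeduping() {
    return requiresDeduping;
  }

  @Override
  public byte[] getCurrentRecordId() throws NoSuchElementException {
    if (!offsetBasedDeduplicationSupported) {
      // Delegate so the base class fails loudly if deduping is ever required
      // while offset-based deduplication is not supported.
      return super.getCurrentRecordId();
    }
    // Assumed encoding for illustration only: the offset as 8 big-endian bytes.
    return ByteBuffer.allocate(Long.BYTES).putLong(offset).array();
  }
}
```

With this shape, the unsupported path never returns an empty id silently once deduping is required; the base class check is the single place that raises the error.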
##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java:
##########
@@ -314,6 +343,10 @@ public long getSplitBacklogBytes() {
return backlogBytes;
}
+ public boolean offsetBasedDeduplicationSupported() {
+ return this.offsetBasedDeduplicationSupported;
Review Comment:
It seems confusing to have a method and a member with the same name; prefix
the method with get?
Or you could remove the member variable and just have this method delegate
to the source each time.
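A rough sketch of the second option, assuming the source exposes the flag itself. DedupSourceSketch and DelegatingReaderSketch are hypothetical names used only for illustration, not classes from the PR.

```java
// Hypothetical stand-in for the source that owns the configuration flag.
final class DedupSourceSketch {
  private final boolean offsetBasedDeduplicationSupported;

  DedupSourceSketch(boolean offsetBasedDeduplicationSupported) {
    this.offsetBasedDeduplicationSupported = offsetBasedDeduplicationSupported;
  }

  boolean offsetBasedDeduplicationSupported() {
    return offsetBasedDeduplicationSupported;
  }
}

// Hypothetical reader with no cached copy of the flag, so there is no member
// variable sharing a name with the method.
final class DelegatingReaderSketch {
  private final DedupSourceSketch source;

  DelegatingReaderSketch(DedupSourceSketch source) {
    this.source = source;
  }

  public boolean offsetBasedDeduplicationSupported() {
    // Ask the source each time instead of reading a local field.
    return source.offsetBasedDeduplicationSupported();
  }
}
```

The trade-off is one extra method call per lookup in exchange for a single source of truth for the flag.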