scwhittle commented on code in PR #33591:
URL: https://github.com/apache/beam/pull/33591#discussion_r1924239234
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillSink.java:
##########
@@ -215,7 +215,13 @@ public long add(WindowedValue<T> data) throws IOException {
.setMetadata(metadata);
keyedOutput.addMessages(builder.build());
keyedOutput.addMessagesIds(id);
- return (long) key.size() + value.size() + metadata.size() + id.size();
+
+ ByteString offset =
ByteString.copyFrom(context.getCurrentRecordOffset());
Review Comment:
It would be nice to avoid the allocation when there is no current record, since that is the common case.
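A minimal sketch of one way to do this, assuming `getCurrentRecordOffset()` returns null or an empty array when there is no current record (the PR does not confirm this). `OffsetBytes` and `copyFromOrEmpty` are hypothetical stand-ins so the example is self-contained; with protobuf the shared constant would be `ByteString.EMPTY`.

```java
import java.util.Arrays;

// Hypothetical stand-in for an immutable byte container such as protobuf's ByteString.
final class OffsetBytes {
  // Shared instance reused whenever there is no current record offset.
  static final OffsetBytes EMPTY = new OffsetBytes(new byte[0]);

  private final byte[] bytes;

  private OffsetBytes(byte[] bytes) {
    this.bytes = bytes;
  }

  // Copies only when there is something to copy; otherwise returns the shared EMPTY.
  static OffsetBytes copyFromOrEmpty(byte[] offset) {
    if (offset == null || offset.length == 0) {
      return EMPTY;
    }
    return new OffsetBytes(Arrays.copyOf(offset, offset.length));
  }

  int size() {
    return bytes.length;
  }
}
```

In the common no-record case this returns the same instance every time, so `add` only pays for a copy when an offset is actually present.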
##########
sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedSource.java:
##########
@@ -139,6 +146,11 @@ public void finalizeCheckpoint() throws IOException {
// nothing to do
}
}
+
+ /* Get offset limit for unbounded source split checkpoint. */
+ default byte[] getOffsetLimit() throws IOException {
Review Comment:
Maybe we should just throw a runtime exception? It seems like this shouldn't
have to do IO, and IOException needs to be handled at callers if the method is
annotated as throwing a non-runtime exception.
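A sketch of what that could look like, assuming the default is meant for checkpoint marks that do not support offset limits. `CheckpointMarkSketch`, `NoLimitMark`, and `FixedLimitMark` are hypothetical stand-ins, and the choice of `UnsupportedOperationException` is my assumption rather than something settled in this PR.

```java
// Hypothetical stand-in for UnboundedSource.CheckpointMark; only the method under discussion.
interface CheckpointMarkSketch {
  /** Gets the offset limit for an unbounded source split checkpoint. */
  default byte[] getOffsetLimit() {
    // Unchecked, so callers are not forced to catch or redeclare anything.
    throw new UnsupportedOperationException(
        getClass().getName() + " does not provide an offset limit");
  }
}

// Marks that do not support offset limits inherit the throwing default.
final class NoLimitMark implements CheckpointMarkSketch {}

// Marks that support offset limits override the method, still without a throws clause.
final class FixedLimitMark implements CheckpointMarkSketch {
  @Override
  public byte[] getOffsetLimit() {
    return new byte[] {0, 0, 0, 42};
  }
}
```

With no checked exception in the signature, call sites can invoke `getOffsetLimit()` directly without a try/catch.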
##########
sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedSource.java:
##########
@@ -93,6 +93,13 @@ public boolean requiresDeduping() {
return false;
}
+ /**
+ * Returns whether this source is configured for offset-based deduplication by the runner.
+ */
+ public boolean offsetDeduplication() {
Review Comment:
Can this be named as a property of the source instead? The difference is that
the property is available for runners to use as an optimization if desired, but
the runner isn't required to use it.
So maybe this should be named isOffsetBased, with a comment that if it returns
true, then the source needs to provide offsets that are unique for each element
and can be ordered lexicographically, for both the UnboundedReaders it creates
and the checkpoint marks it vends. Each checkpoint mark must have an offset
greater than or equal to all elements read and less than the next element (or
whatever the edge-case semantics are).
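To make the proposed contract concrete, here is a rough sketch of the checks it implies. It assumes offsets are `byte[]` values compared as unsigned bytes in lexicographic order (`Arrays.compareUnsigned`, Java 9+); `OffsetContract` and its methods are hypothetical names, not part of Beam, and the exact edge-case semantics are still open.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper that spells out the proposed isOffsetBased contract.
final class OffsetContract {
  private OffsetContract() {}

  // Unsigned lexicographic comparison of two offsets (Java 9+).
  static int compare(byte[] a, byte[] b) {
    return Arrays.compareUnsigned(a, b);
  }

  // True if every offset is strictly greater than the one before it (unique and ordered).
  static boolean strictlyIncreasing(List<byte[]> offsets) {
    for (int i = 1; i < offsets.size(); i++) {
      if (compare(offsets.get(i - 1), offsets.get(i)) >= 0) {
        return false;
      }
    }
    return true;
  }

  // True if the checkpoint offset is >= the last element read and < the next element.
  static boolean checkpointInRange(byte[] lastRead, byte[] checkpoint, byte[] next) {
    return compare(lastRead, checkpoint) <= 0 && compare(checkpoint, next) < 0;
  }
}
```

One consequence of lexicographic ordering is that numeric offsets would need a fixed-width big-endian encoding to sort in numeric order.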