kennknowles commented on code in PR #28548:
URL: https://github.com/apache/beam/pull/28548#discussion_r1338653438
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSources.java:
##########
@@ -776,7 +776,7 @@ public double getRemainingParallelism() {
private static class UnboundedReaderIterator<T>
extends
NativeReader.NativeReaderIterator<WindowedValue<ValueWithRecordId<T>>> {
- private final UnboundedSource.UnboundedReader<T> reader;
+ private final UnboundedSource.UnboundedReader<T> reader; // not owned
Review Comment:
What does ownership mean here? Not to do with memory management obviously.
If I understand correctly, it means that there may be other calls to various
state methods so you can never assume what state the reader will be in? (it
would be worth writing the whole long comment about what this means here)
##########
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java:
##########
@@ -505,6 +514,7 @@ BoundedSource<T> getSource() {
}
Checkpoint<T> getCheckpointMark() {
+ checkArgument(!closed, "getCheckpointMark() call on closed %s",
getClass().getName());
Review Comment:
This should be `checkState` since there are no arguments.
##########
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java:
##########
@@ -288,6 +288,15 @@ private void init(
residualElementsList == null
? new ResidualElements(Collections.emptyList())
: new ResidualElements(residualElementsList);
+
+ if (this.residualSource != null) {
Review Comment:
Oh yea based on that the change is obviously correct. Thanks!
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]