dpcollins-google commented on a change in pull request #17125:
URL: https://github.com/apache/beam/pull/17125#discussion_r831160922
##########
File path:
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/OffsetByteRangeTracker.java
##########
@@ -76,97 +57,47 @@ public IsBounded isBounded() {
@Override
public boolean tryClaim(OffsetByteProgress position) {
- long toClaim = position.lastOffset().value();
- checkArgument(
- lastClaimed == null || toClaim > lastClaimed,
- "Trying to claim offset %s while last attempted was %s",
- position.lastOffset().value(),
- lastClaimed);
- checkArgument(
- toClaim >= range.getRange().getFrom(),
- "Trying to claim offset %s before start of the range %s",
- toClaim,
- range);
- // split() has already been called, truncating this range. No more offsets
may be claimed.
- if (range.getRange().getTo() != Long.MAX_VALUE) {
- boolean isRangeEmpty = range.getRange().getTo() ==
range.getRange().getFrom();
- boolean isValidClosedRange = nextOffset() == range.getRange().getTo();
- checkState(
- isRangeEmpty || isValidClosedRange,
- "Violated class precondition: offset range improperly split. Please
report a beam bug.");
- return false;
- }
- lastClaimed = toClaim;
- range = OffsetByteRange.of(range.getRange(), range.getByteCount() +
position.batchBytes());
+ if (!rangeTracker.tryClaim(position.lastOffset().value())) return false;
+ lastClaimed = position.lastOffset().value();
+ bytes += position.batchBytes();
return true;
}
@Override
public OffsetByteRange currentRestriction() {
- return range;
+ return OffsetByteRange.of(rangeTracker.currentRestriction(), bytes);
}
private long nextOffset() {
checkState(lastClaimed == null || lastClaimed < Long.MAX_VALUE);
return lastClaimed == null ? currentRestriction().getRange().getFrom() :
lastClaimed + 1;
}
- /**
- * Whether the tracker has received enough data/been running for enough time
that it can
- * checkpoint and be confident it can get sufficient throughput.
- */
- private boolean receivedEnough() {
- Duration duration =
Duration.millis(stopwatch.elapsed(TimeUnit.MILLISECONDS));
- if (duration.isLongerThan(minTrackingTime)) {
- return true;
- }
- if (currentRestriction().getByteCount() >= minBytesReceived) {
- return true;
- }
- return false;
- }
-
@Override
public @Nullable SplitResult<OffsetByteRange> trySplit(double
fractionOfRemainder) {
// Cannot split a bounded range. This should already be completely claimed.
- if (range.getRange().getTo() != Long.MAX_VALUE) {
+ if (rangeTracker.currentRestriction().getTo() != Long.MAX_VALUE) {
return null;
}
- if (!receivedEnough()) {
+ @Nullable SplitResult<OffsetRange> ranges =
rangeTracker.trySplit(fractionOfRemainder);
+ if (ranges == null) {
return null;
}
- range =
- OffsetByteRange.of(
- new OffsetRange(currentRestriction().getRange().getFrom(),
nextOffset()),
- range.getByteCount());
+
checkArgument(rangeTracker.currentRestriction().equals(ranges.getPrimary()));
return SplitResult.of(
- this.range, OffsetByteRange.of(new OffsetRange(nextOffset(),
Long.MAX_VALUE), 0));
+ currentRestriction(),
OffsetByteRange.of(checkArgumentNotNull(ranges.getResidual())));
}
@Override
@SuppressWarnings("unboxing.of.nullable")
Review comment:
Done.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]