This is an automated email from the ASF dual-hosted git repository.
boyuanz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new e1b148b Implement GrowableOffsetRangeTracker
new 87dbf84 Merge pull request #11715 from boyuanzz/grow
e1b148b is described below
commit e1b148bf1f2e735ee082c06e1097fe72bb152365
Author: Boyuan Zhang <[email protected]>
AuthorDate: Tue May 12 20:24:56 2020 -0700
Implement GrowableOffsetRangeTracker
---
.../splittabledofn/GrowableOffsetRangeTracker.java | 143 ++++++++++++
.../splittabledofn/OffsetRangeTracker.java | 58 +++--
.../GrowableOffsetRangeTrackerTest.java | 255 +++++++++++++++++++++
.../splittabledofn/OffsetRangeTrackerTest.java | 43 ++++
4 files changed, 485 insertions(+), 14 deletions(-)
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/GrowableOffsetRangeTracker.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/GrowableOffsetRangeTracker.java
new file mode 100644
index 0000000..a74d72b
--- /dev/null
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/GrowableOffsetRangeTracker.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.splittabledofn;
+
+import static
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
+
+import java.math.BigDecimal;
+import java.math.MathContext;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Suppliers;
+
+/**
+ * An {@link OffsetRangeTracker} for tracking a growable offset range. {@code
Long.MAX_VALUE} is
+ * used as the end of the range to indicate infinity.
+ *
+ * <p>An offset range is considered growable when the end offset could grow
(or change) during
+ * execution time (e.g., Kafka topic partition offset, appended file, ...).
+ *
+ * <p>The growable range is marked as done by claiming {@code Long.MAX_VALUE}.
+ */
+@Experimental(Kind.SPLITTABLE_DO_FN)
+public class GrowableOffsetRangeTracker extends OffsetRangeTracker {
+ /**
+ * Provides the estimated end offset of the range.
+ *
+ * <p>{@link #estimate} is called to give the end offset when {@link
#trySplit} or {@link
+ * #getProgress} is invoked. The end offset is exclusive for the range. The
estimated end is not
+ * required to monotonically increase as it will only be taken into
consideration when the
+ * estimated end offset is larger than the current position. Returning
{@code Long.MAX_VALUE} as
+ * the estimate implies the largest possible position for the range is
{@code Long.MAX_VALUE - 1}.
+ * Return {@code Long.MIN_VALUE} if an estimate can not be provided.
+ *
+ * <p>Providing a good estimate is important for an accurate progress signal
and will impact
+ * splitting decisions by the runner.
+ *
+ * <p>If {@link #estimate} is expensive to compute, consider wrapping the
implementation with
+ * {@link Suppliers#memoizeWithExpiration} or equivalent as an optimization.
+ *
+ * <p>TODO(BEAM-10032): Also consider using {@link RangeEndEstimator} when
the range is not ended
+ * with {@code Long.MAX_VALUE}.
+ */
+ @FunctionalInterface
+ public interface RangeEndEstimator {
+ long estimate();
+ }
+
+ private final RangeEndEstimator rangeEndEstimator;
+
+ public GrowableOffsetRangeTracker(long start, RangeEndEstimator
rangeEndEstimator) {
+ super(new OffsetRange(start, Long.MAX_VALUE));
+ this.rangeEndEstimator = checkNotNull(rangeEndEstimator);
+ }
+
+ @Override
+ public SplitResult<OffsetRange> trySplit(double fractionOfRemainder) {
+ // If current tracking range is no longer growable, split it as a normal
range.
+ if (range.getTo() != Long.MAX_VALUE || range.getTo() == range.getFrom()) {
+ return super.trySplit(fractionOfRemainder);
+ }
+ // If current range has been done, there is no more space to split.
+ if (lastAttemptedOffset != null && lastAttemptedOffset == Long.MAX_VALUE) {
+ return null;
+ }
+ BigDecimal cur =
+ (lastAttemptedOffset == null)
+ ? BigDecimal.valueOf(range.getFrom()).subtract(BigDecimal.ONE,
MathContext.DECIMAL128)
+ : BigDecimal.valueOf(lastAttemptedOffset);
+
+ // Fetch the estimated end offset. If the estimated end is smaller than
the next offset, use
+ // the next offset as end.
+ BigDecimal estimateRangeEnd =
+ BigDecimal.valueOf(rangeEndEstimator.estimate())
+ .max(cur.add(BigDecimal.ONE, MathContext.DECIMAL128));
+
+ // Convert to BigDecimal in computation to prevent overflow, which may
result in loss of
+ // precision.
+ // split = cur + max(1, (estimateRangeEnd - cur) * fractionOfRemainder)
+ BigDecimal splitPos =
+ cur.add(
+ estimateRangeEnd
+ .subtract(cur, MathContext.DECIMAL128)
+ .multiply(BigDecimal.valueOf(fractionOfRemainder),
MathContext.DECIMAL128)
+ .max(BigDecimal.ONE),
+ MathContext.DECIMAL128);
+ long split = splitPos.longValue();
+ if (split > estimateRangeEnd.longValue()) {
+ return null;
+ }
+ OffsetRange res = new OffsetRange(split, range.getTo());
+ this.range = new OffsetRange(range.getFrom(), split);
+ return SplitResult.of(range, res);
+ }
+
+ @Override
+ public Progress getProgress() {
+ // If current tracking range is no longer growable, get progress as a
normal range.
+ if (range.getTo() != Long.MAX_VALUE || range.getTo() == range.getFrom()) {
+ return super.getProgress();
+ }
+
+ // Convert to BigDecimal in computation to prevent overflow, which may
result in lost of
+ // precision.
+ BigDecimal estimateRangeEnd =
BigDecimal.valueOf(rangeEndEstimator.estimate());
+
+ if (lastAttemptedOffset == null) {
+ return Progress.from(
+ 0,
+ estimateRangeEnd
+ .subtract(BigDecimal.valueOf(range.getFrom()),
MathContext.DECIMAL128)
+ .max(BigDecimal.ZERO)
+ .doubleValue());
+ }
+
+ BigDecimal workRemaining =
+ estimateRangeEnd
+ .subtract(BigDecimal.valueOf(lastAttemptedOffset),
MathContext.DECIMAL128)
+ .max(BigDecimal.ZERO);
+ BigDecimal totalWork =
+ estimateRangeEnd
+ .max(BigDecimal.valueOf(lastAttemptedOffset))
+ .subtract(BigDecimal.valueOf(range.getFrom()),
MathContext.DECIMAL128);
+ return Progress.from(
+ totalWork.subtract(workRemaining,
MathContext.DECIMAL128).doubleValue(),
+ workRemaining.doubleValue());
+ }
+}
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTracker.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTracker.java
index 2743a1d..9c7116f 100644
---
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTracker.java
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTracker.java
@@ -21,6 +21,8 @@ import static
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Prec
import static
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
import static
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
+import java.math.BigDecimal;
+import java.math.MathContext;
import javax.annotation.Nullable;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Experimental.Kind;
@@ -31,13 +33,16 @@ import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects
/**
* A {@link RestrictionTracker} for claiming offsets in an {@link OffsetRange}
in a monotonically
* increasing fashion.
+ *
+ * <p>The smallest offset is {@code Long.MIN_VALUE} and the largest offset is
{@code Long.MAX_VALUE
+ * - 1}.
*/
@Experimental(Kind.SPLITTABLE_DO_FN)
public class OffsetRangeTracker extends RestrictionTracker<OffsetRange, Long>
implements HasProgress {
- private OffsetRange range;
- @Nullable private Long lastClaimedOffset = null;
- @Nullable private Long lastAttemptedOffset = null;
+ protected OffsetRange range;
+ @Nullable protected Long lastClaimedOffset = null;
+ @Nullable protected Long lastAttemptedOffset = null;
public OffsetRangeTracker(OffsetRange range) {
this.range = checkNotNull(range);
@@ -50,16 +55,27 @@ public class OffsetRangeTracker extends
RestrictionTracker<OffsetRange, Long>
@Override
public SplitResult<OffsetRange> trySplit(double fractionOfRemainder) {
- long cur = (lastAttemptedOffset == null) ? range.getFrom() - 1 :
lastAttemptedOffset;
- long splitPos =
- cur
- + Math.max(
- 1L, (Double.valueOf((range.getTo() - cur) *
fractionOfRemainder)).longValue());
- if (splitPos >= range.getTo()) {
+ // Convert to BigDecimal in computation to prevent overflow, which may
result in loss of
+ // precision.
+ BigDecimal cur =
+ (lastAttemptedOffset == null)
+ ? BigDecimal.valueOf(range.getFrom()).subtract(BigDecimal.ONE,
MathContext.DECIMAL128)
+ : BigDecimal.valueOf(lastAttemptedOffset);
+ // split = cur + max(1, (range.getTo() - cur) * fractionOfRemainder)
+ BigDecimal splitPos =
+ cur.add(
+ BigDecimal.valueOf(range.getTo())
+ .subtract(cur, MathContext.DECIMAL128)
+ .multiply(BigDecimal.valueOf(fractionOfRemainder),
MathContext.DECIMAL128)
+ .max(BigDecimal.ONE),
+ MathContext.DECIMAL128);
+
+ long split = splitPos.longValue();
+ if (split >= range.getTo()) {
return null;
}
- OffsetRange res = new OffsetRange(splitPos, range.getTo());
- this.range = new OffsetRange(range.getFrom(), splitPos);
+ OffsetRange res = new OffsetRange(split, range.getTo());
+ this.range = new OffsetRange(range.getFrom(), split);
return SplitResult.of(range, res);
}
@@ -120,13 +136,27 @@ public class OffsetRangeTracker extends
RestrictionTracker<OffsetRange, Long>
public Progress getProgress() {
// If we have never attempted an offset, we return the length of the
entire range as work
// remaining.
+ // Convert to BigDecimal in computation to prevent overflow, which may
result in loss of
+ // precision.
if (lastAttemptedOffset == null) {
- return Progress.from(0, range.getTo() - range.getFrom());
+ return Progress.from(
+ 0,
+ BigDecimal.valueOf(range.getTo())
+ .subtract(BigDecimal.valueOf(range.getFrom()),
MathContext.DECIMAL128)
+ .doubleValue());
}
// Compute the amount of work remaining from where we are to where we are
attempting to get to
// with a minimum of zero in case we have claimed beyond the end of the
range.
- long workRemaining = Math.max(range.getTo() - lastAttemptedOffset, 0);
- return Progress.from(range.getTo() - range.getFrom() - workRemaining,
workRemaining);
+ BigDecimal workRemaining =
+ BigDecimal.valueOf(range.getTo())
+ .subtract(BigDecimal.valueOf(lastAttemptedOffset),
MathContext.DECIMAL128)
+ .max(BigDecimal.ZERO);
+ BigDecimal totalWork =
+ BigDecimal.valueOf(range.getTo())
+ .subtract(BigDecimal.valueOf(range.getFrom()),
MathContext.DECIMAL128);
+ return Progress.from(
+ totalWork.subtract(workRemaining,
MathContext.DECIMAL128).doubleValue(),
+ workRemaining.doubleValue());
}
}
diff --git
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/splittabledofn/GrowableOffsetRangeTrackerTest.java
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/splittabledofn/GrowableOffsetRangeTrackerTest.java
new file mode 100644
index 0000000..1705015
--- /dev/null
+++
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/splittabledofn/GrowableOffsetRangeTrackerTest.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.splittabledofn;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.math.BigDecimal;
+import java.math.MathContext;
+import org.apache.beam.sdk.io.range.OffsetRange;
+import
org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker.Progress;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link GrowableOffsetRangeTracker}. */
+@RunWith(JUnit4.class)
+public class GrowableOffsetRangeTrackerTest {
+ private static class SimpleEstimator implements
GrowableOffsetRangeTracker.RangeEndEstimator {
+ private long estimateRangeEnd = 0;
+
+ @Override
+ public long estimate() {
+ return estimateRangeEnd;
+ }
+
+ public void setEstimateRangeEnd(long offset) {
+ estimateRangeEnd = offset;
+ }
+ }
+
+ @Rule public final ExpectedException expected = ExpectedException.none();
+
+ @Test
+ public void testIllegalInitialization() throws Exception {
+ expected.expect(NullPointerException.class);
+ GrowableOffsetRangeTracker tracker = new GrowableOffsetRangeTracker(0L,
null);
+ }
+
+ @Test
+ public void testTryClaim() throws Exception {
+ GrowableOffsetRangeTracker tracker = new GrowableOffsetRangeTracker(0L,
new SimpleEstimator());
+ assertTrue(tracker.tryClaim(10L));
+ assertTrue(tracker.tryClaim(100L));
+ assertFalse(tracker.tryClaim(Long.MAX_VALUE));
+ tracker.checkDone();
+ }
+
+ @Test
+ public void testCheckpointBeforeStart() throws Exception {
+ SimpleEstimator simpleEstimator = new SimpleEstimator();
+ GrowableOffsetRangeTracker tracker = new GrowableOffsetRangeTracker(0L,
simpleEstimator);
+ simpleEstimator.setEstimateRangeEnd(10);
+ SplitResult res = tracker.trySplit(0);
+ tracker.checkDone();
+ assertEquals(new OffsetRange(0, 0), res.getPrimary());
+ assertEquals(new OffsetRange(0, 0), tracker.currentRestriction());
+ assertEquals(new OffsetRange(0, Long.MAX_VALUE), res.getResidual());
+ }
+
+ @Test
+ public void testCheckpointJustStarted() throws Exception {
+ SimpleEstimator simpleEstimator = new SimpleEstimator();
+ GrowableOffsetRangeTracker tracker = new GrowableOffsetRangeTracker(0L,
simpleEstimator);
+ assertTrue(tracker.tryClaim(5L));
+ simpleEstimator.setEstimateRangeEnd(0L);
+ SplitResult res = tracker.trySplit(0);
+ tracker.checkDone();
+ assertEquals(new OffsetRange(0, 6), res.getPrimary());
+ assertEquals(new OffsetRange(0, 6), tracker.currentRestriction());
+ assertEquals(new OffsetRange(6, Long.MAX_VALUE), res.getResidual());
+
+ tracker = new GrowableOffsetRangeTracker(0L, simpleEstimator);
+ assertTrue(tracker.tryClaim(5L));
+ simpleEstimator.setEstimateRangeEnd(20L);
+ res = tracker.trySplit(0);
+ tracker.checkDone();
+ assertEquals(new OffsetRange(0, 6), res.getPrimary());
+ assertEquals(new OffsetRange(6, Long.MAX_VALUE), res.getResidual());
+ }
+
+ @Test
+ public void testCheckpointAfterAllProcessed() throws Exception {
+ SimpleEstimator simpleEstimator = new SimpleEstimator();
+ GrowableOffsetRangeTracker tracker = new GrowableOffsetRangeTracker(0L,
simpleEstimator);
+ assertFalse(tracker.tryClaim(Long.MAX_VALUE));
+ tracker.checkDone();
+ assertNull(tracker.trySplit(0));
+ }
+
+ @Test
+ public void testCheckpointAtEmptyRange() throws Exception {
+ GrowableOffsetRangeTracker tracker =
+ new GrowableOffsetRangeTracker(Long.MAX_VALUE, new SimpleEstimator());
+ tracker.checkDone();
+ assertNull(tracker.trySplit(0));
+ }
+
+ @Test
+ public void testSplit() throws Exception {
+ SimpleEstimator simpleEstimator = new SimpleEstimator();
+ GrowableOffsetRangeTracker tracker = new GrowableOffsetRangeTracker(0L,
simpleEstimator);
+ assertTrue(tracker.tryClaim(0L));
+
+ simpleEstimator.setEstimateRangeEnd(16L);
+ // The split of infinite range results in one finite range and one
infinite range.
+ SplitResult res = tracker.trySplit(0.5);
+ assertEquals(new OffsetRange(0, 8), res.getPrimary());
+ assertEquals(new OffsetRange(0, 8), tracker.currentRestriction());
+ assertEquals(new OffsetRange(8, Long.MAX_VALUE), res.getResidual());
+
+ // After the first split, the tracker should track a finite range.
Estimate offset should not
+ // impact split.
+ simpleEstimator.setEstimateRangeEnd(12L);
+ res = tracker.trySplit(0.5);
+ assertEquals(new OffsetRange(0, 4), res.getPrimary());
+ assertEquals(new OffsetRange(0, 4), tracker.currentRestriction());
+ assertEquals(new OffsetRange(4, 8), res.getResidual());
+ assertFalse(tracker.tryClaim(4L));
+ tracker.checkDone();
+ }
+
+ @Test
+ public void testSplitWithMaxEstimateRangeEnd() throws Exception {
+ SimpleEstimator simpleEstimator = new SimpleEstimator();
+ GrowableOffsetRangeTracker tracker = new GrowableOffsetRangeTracker(0L,
simpleEstimator);
+ assertTrue(tracker.tryClaim(1L));
+ simpleEstimator.setEstimateRangeEnd(Long.MAX_VALUE);
+ SplitResult res = tracker.trySplit(0.5);
+ long expectedEnd = 1L + (Long.MAX_VALUE - 1L) / 2;
+ assertEquals(new OffsetRange(0L, expectedEnd), res.getPrimary());
+ assertEquals(new OffsetRange(expectedEnd, Long.MAX_VALUE),
res.getResidual());
+ }
+
+ @Test
+ public void testProgressBeforeStart() throws Exception {
+ SimpleEstimator simpleEstimator = new SimpleEstimator();
+ GrowableOffsetRangeTracker tracker = new GrowableOffsetRangeTracker(10L,
simpleEstimator);
+ simpleEstimator.setEstimateRangeEnd(20);
+ Progress currentProcess = tracker.getProgress();
+ assertEquals(0, currentProcess.getWorkCompleted(), 0.001);
+ assertEquals(10, currentProcess.getWorkRemaining(), 0.001);
+
+ simpleEstimator.setEstimateRangeEnd(15);
+ currentProcess = tracker.getProgress();
+ assertEquals(0, currentProcess.getWorkCompleted(), 0.001);
+ assertEquals(5, currentProcess.getWorkRemaining(), 0.001);
+
+ simpleEstimator.setEstimateRangeEnd(5);
+ currentProcess = tracker.getProgress();
+ assertEquals(0, currentProcess.getWorkCompleted(), 0.001);
+ assertEquals(0, currentProcess.getWorkRemaining(), 0.001);
+ }
+
+ @Test
+ public void testProgressAfterFinished() throws Exception {
+ SimpleEstimator simpleEstimator = new SimpleEstimator();
+ GrowableOffsetRangeTracker tracker = new GrowableOffsetRangeTracker(10L,
simpleEstimator);
+ assertFalse(tracker.tryClaim(Long.MAX_VALUE));
+ tracker.checkDone();
+ simpleEstimator.setEstimateRangeEnd(0L);
+ Progress currentProgress = tracker.getProgress();
+ assertEquals(Long.MAX_VALUE - 10L, currentProgress.getWorkCompleted(),
0.001);
+ assertEquals(0, currentProgress.getWorkRemaining(), 0.001);
+ }
+
+ @Test
+ public void testProgress() throws Exception {
+ long start = 10L;
+ SimpleEstimator simpleEstimator = new SimpleEstimator();
+ GrowableOffsetRangeTracker tracker = new GrowableOffsetRangeTracker(start,
simpleEstimator);
+ long cur = 20L;
+ assertTrue(tracker.tryClaim(cur));
+
+ simpleEstimator.setEstimateRangeEnd(5L);
+ Progress currentProgress = tracker.getProgress();
+ assertEquals(cur - start, currentProgress.getWorkCompleted(), 0.001);
+ assertEquals(0, currentProgress.getWorkRemaining(), 0.001);
+
+ simpleEstimator.setEstimateRangeEnd(35L);
+ currentProgress = tracker.getProgress();
+ assertEquals(cur - start, currentProgress.getWorkCompleted(), 0.001);
+ assertEquals(35L - cur, currentProgress.getWorkRemaining(), 0.001);
+
+ simpleEstimator.setEstimateRangeEnd(25L);
+ currentProgress = tracker.getProgress();
+ assertEquals(cur - start, currentProgress.getWorkCompleted(), 0.001);
+ assertEquals(25L - cur, currentProgress.getWorkRemaining(), 0.001);
+
+ simpleEstimator.setEstimateRangeEnd(Long.MAX_VALUE);
+ currentProgress = tracker.getProgress();
+ assertEquals(cur - start, currentProgress.getWorkCompleted(), 0.001);
+ assertEquals(Long.MAX_VALUE - cur, currentProgress.getWorkRemaining(),
0.001);
+ }
+
+ @Test
+ public void testLargeRange() throws Exception {
+ SimpleEstimator simpleEstimator = new SimpleEstimator();
+ GrowableOffsetRangeTracker tracker =
+ new GrowableOffsetRangeTracker(Long.MIN_VALUE, simpleEstimator);
+
+ simpleEstimator.setEstimateRangeEnd(Long.MAX_VALUE);
+ Progress progress = tracker.getProgress();
+ assertEquals(0, progress.getWorkCompleted(), 0.001);
+ assertEquals(
+ BigDecimal.valueOf(Long.MAX_VALUE)
+ .subtract(BigDecimal.valueOf(Long.MIN_VALUE),
MathContext.DECIMAL128)
+ .doubleValue(),
+ progress.getWorkRemaining(),
+ 0.001);
+
+ simpleEstimator.setEstimateRangeEnd(Long.MIN_VALUE);
+ SplitResult res = tracker.trySplit(0);
+ assertEquals(new OffsetRange(Long.MIN_VALUE, Long.MIN_VALUE),
res.getPrimary());
+ assertEquals(new OffsetRange(Long.MIN_VALUE, Long.MAX_VALUE),
res.getResidual());
+ }
+
+ @Test
+ public void testSmallRangeWithLargeValue() throws Exception {
+ SimpleEstimator simpleEstimator = new SimpleEstimator();
+ GrowableOffsetRangeTracker tracker =
+ new GrowableOffsetRangeTracker(123456789012345677L, simpleEstimator);
+ assertTrue(tracker.tryClaim(123456789012345677L));
+ simpleEstimator.setEstimateRangeEnd(123456789012345679L);
+ SplitResult res = tracker.trySplit(0.5);
+ assertEquals(new OffsetRange(123456789012345677L, 123456789012345678L),
res.getPrimary());
+ assertEquals(new OffsetRange(123456789012345678L, Long.MAX_VALUE),
res.getResidual());
+
+ tracker = new GrowableOffsetRangeTracker(123456789012345681L,
simpleEstimator);
+ assertTrue(tracker.tryClaim(123456789012345681L));
+ simpleEstimator.setEstimateRangeEnd(123456789012345683L);
+ res = tracker.trySplit(0.5);
+ assertEquals(new OffsetRange(123456789012345681L, 123456789012345682L),
res.getPrimary());
+ assertEquals(new OffsetRange(123456789012345682L, Long.MAX_VALUE),
res.getResidual());
+ }
+}
diff --git
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTrackerTest.java
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTrackerTest.java
index 6fdd254..b2c381c 100644
---
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTrackerTest.java
+++
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTrackerTest.java
@@ -22,6 +22,8 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
+import java.math.BigDecimal;
+import java.math.MathContext;
import org.apache.beam.sdk.io.range.OffsetRange;
import
org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker.Progress;
import org.junit.Rule;
@@ -36,6 +38,12 @@ public class OffsetRangeTrackerTest {
@Rule public final ExpectedException expected = ExpectedException.none();
@Test
+ public void testIllegalInitialization() throws Exception {
+ expected.expect(NullPointerException.class);
+ OffsetRangeTracker tracker = new OffsetRangeTracker(null);
+ }
+
+ @Test
public void testTryClaim() throws Exception {
OffsetRange range = new OffsetRange(100, 200);
OffsetRangeTracker tracker = new OffsetRangeTracker(range);
@@ -231,4 +239,39 @@ public class OffsetRangeTrackerTest {
assertEquals(50, progress.getWorkCompleted(), 0.001);
assertEquals(50, progress.getWorkRemaining(), 0.001);
}
+
+ @Test
+ public void testLargeRange() throws Exception {
+ OffsetRangeTracker tracker =
+ new OffsetRangeTracker(new OffsetRange(Long.MIN_VALUE,
Long.MAX_VALUE));
+
+ Progress progress = tracker.getProgress();
+ assertEquals(0, progress.getWorkCompleted(), 0.001);
+ assertEquals(
+ BigDecimal.valueOf(Long.MAX_VALUE)
+ .subtract(BigDecimal.valueOf(Long.MIN_VALUE),
MathContext.DECIMAL128)
+ .doubleValue(),
+ progress.getWorkRemaining(),
+ 0.001);
+
+ SplitResult res = tracker.trySplit(0);
+ assertEquals(new OffsetRange(Long.MIN_VALUE, Long.MIN_VALUE),
res.getPrimary());
+ assertEquals(new OffsetRange(Long.MIN_VALUE, Long.MAX_VALUE),
res.getResidual());
+ }
+
+ @Test
+ public void testSmallRangeWithLargeValue() throws Exception {
+ OffsetRangeTracker tracker =
+ new OffsetRangeTracker(new OffsetRange(123456789012345677L,
123456789012345679L));
+ assertTrue(tracker.tryClaim(123456789012345677L));
+ SplitResult res = tracker.trySplit(0.5);
+ assertEquals(new OffsetRange(123456789012345677L, 123456789012345678L),
res.getPrimary());
+ assertEquals(new OffsetRange(123456789012345678L, 123456789012345679L),
res.getResidual());
+
+ tracker = new OffsetRangeTracker(new OffsetRange(123456789012345681L,
123456789012345683L));
+ assertTrue(tracker.tryClaim(123456789012345681L));
+ res = tracker.trySplit(0.5);
+ assertEquals(new OffsetRange(123456789012345681L, 123456789012345682L),
res.getPrimary());
+ assertEquals(new OffsetRange(123456789012345682L, 123456789012345683L),
res.getResidual());
+ }
}