This is an automated email from the ASF dual-hosted git repository.

boyuanz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new e1b148b  Implement GrowableOffsetRangeTracker
     new 87dbf84  Merge pull request #11715 from boyuanzz/grow
e1b148b is described below

commit e1b148bf1f2e735ee082c06e1097fe72bb152365
Author: Boyuan Zhang <[email protected]>
AuthorDate: Tue May 12 20:24:56 2020 -0700

    Implement GrowableOffsetRangeTracker
---
 .../splittabledofn/GrowableOffsetRangeTracker.java | 143 ++++++++++++
 .../splittabledofn/OffsetRangeTracker.java         |  58 +++--
 .../GrowableOffsetRangeTrackerTest.java            | 255 +++++++++++++++++++++
 .../splittabledofn/OffsetRangeTrackerTest.java     |  43 ++++
 4 files changed, 485 insertions(+), 14 deletions(-)

diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/GrowableOffsetRangeTracker.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/GrowableOffsetRangeTracker.java
new file mode 100644
index 0000000..a74d72b
--- /dev/null
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/GrowableOffsetRangeTracker.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.splittabledofn;
+
+import static 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
+
+import java.math.BigDecimal;
+import java.math.MathContext;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Suppliers;
+
+/**
+ * An {@link OffsetRangeTracker} for tracking a growable offset range. {@code 
Long.MAX_VALUE} is
+ * used as the end of the range to indicate infinity.
+ *
+ * <p>An offset range is considered growable when the end offset could grow 
(or change) during
+ * execution time (e.g., Kafka topic partition offset, appended file, ...).
+ *
+ * <p>The growable range is marked as done by claiming {@code Long.MAX_VALUE}.
+ */
+@Experimental(Kind.SPLITTABLE_DO_FN)
+public class GrowableOffsetRangeTracker extends OffsetRangeTracker {
+  /**
+   * Provides the estimated end offset of the range.
+   *
+   * <p>{@link #estimate} is called to give the end offset when {@link 
#trySplit} or {@link
+   * #getProgress} is invoked. The end offset is exclusive for the range. The 
estimated end is not
+   * required to monotonically increase as it will only be taken into 
consideration when the
+   * estimated end offset is larger than the current position. Returning 
{@code Long.MAX_VALUE} as
+   * the estimate implies the largest possible position for the range is 
{@code Long.MAX_VALUE - 1}.
+   * Return {@code Long.MIN_VALUE} if an estimate cannot be provided.
+   *
+   * <p>Providing a good estimate is important for an accurate progress signal 
and will impact
+   * splitting decisions by the runner.
+   *
+   * <p>If {@link #estimate} is expensive to compute, consider wrapping the 
implementation with
+   * {@link Suppliers#memoizeWithExpiration} or equivalent as an optimization.
+   *
+   * <p>TODO(BEAM-10032): Also consider using {@link RangeEndEstimator} when 
the range does not end
+   * with {@code Long.MAX_VALUE}.
+   */
+  @FunctionalInterface
+  public interface RangeEndEstimator {
+    long estimate();
+  }
+
+  private final RangeEndEstimator rangeEndEstimator;
+
+  public GrowableOffsetRangeTracker(long start, RangeEndEstimator 
rangeEndEstimator) {
+    super(new OffsetRange(start, Long.MAX_VALUE));
+    this.rangeEndEstimator = checkNotNull(rangeEndEstimator);
+  }
+
+  @Override
+  public SplitResult<OffsetRange> trySplit(double fractionOfRemainder) {
+    // If current tracking range is no longer growable, split it as a normal 
range.
+    if (range.getTo() != Long.MAX_VALUE || range.getTo() == range.getFrom()) {
+      return super.trySplit(fractionOfRemainder);
+    }
+    // If the current range is already done, there is nothing left to split.
+    if (lastAttemptedOffset != null && lastAttemptedOffset == Long.MAX_VALUE) {
+      return null;
+    }
+    BigDecimal cur =
+        (lastAttemptedOffset == null)
+            ? BigDecimal.valueOf(range.getFrom()).subtract(BigDecimal.ONE, 
MathContext.DECIMAL128)
+            : BigDecimal.valueOf(lastAttemptedOffset);
+
+    // Fetch the estimated end offset. If the estimated end is smaller than 
the next offset, use
+    // the next offset as end.
+    BigDecimal estimateRangeEnd =
+        BigDecimal.valueOf(rangeEndEstimator.estimate())
+            .max(cur.add(BigDecimal.ONE, MathContext.DECIMAL128));
+
+    // Convert to BigDecimal in computation to prevent overflow, which may 
result in loss of
+    // precision.
+    // split = cur + max(1, (estimateRangeEnd - cur) * fractionOfRemainder)
+    BigDecimal splitPos =
+        cur.add(
+            estimateRangeEnd
+                .subtract(cur, MathContext.DECIMAL128)
+                .multiply(BigDecimal.valueOf(fractionOfRemainder), 
MathContext.DECIMAL128)
+                .max(BigDecimal.ONE),
+            MathContext.DECIMAL128);
+    long split = splitPos.longValue();
+    if (split > estimateRangeEnd.longValue()) {
+      return null;
+    }
+    OffsetRange res = new OffsetRange(split, range.getTo());
+    this.range = new OffsetRange(range.getFrom(), split);
+    return SplitResult.of(range, res);
+  }
+
+  @Override
+  public Progress getProgress() {
+    // If current tracking range is no longer growable, get progress as a 
normal range.
+    if (range.getTo() != Long.MAX_VALUE || range.getTo() == range.getFrom()) {
+      return super.getProgress();
+    }
+
+    // Convert to BigDecimal in computation to prevent overflow, which may 
result in loss of
+    // precision.
+    BigDecimal estimateRangeEnd = 
BigDecimal.valueOf(rangeEndEstimator.estimate());
+
+    if (lastAttemptedOffset == null) {
+      return Progress.from(
+          0,
+          estimateRangeEnd
+              .subtract(BigDecimal.valueOf(range.getFrom()), 
MathContext.DECIMAL128)
+              .max(BigDecimal.ZERO)
+              .doubleValue());
+    }
+
+    BigDecimal workRemaining =
+        estimateRangeEnd
+            .subtract(BigDecimal.valueOf(lastAttemptedOffset), 
MathContext.DECIMAL128)
+            .max(BigDecimal.ZERO);
+    BigDecimal totalWork =
+        estimateRangeEnd
+            .max(BigDecimal.valueOf(lastAttemptedOffset))
+            .subtract(BigDecimal.valueOf(range.getFrom()), 
MathContext.DECIMAL128);
+    return Progress.from(
+        totalWork.subtract(workRemaining, 
MathContext.DECIMAL128).doubleValue(),
+        workRemaining.doubleValue());
+  }
+}
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTracker.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTracker.java
index 2743a1d..9c7116f 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTracker.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTracker.java
@@ -21,6 +21,8 @@ import static 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Prec
 import static 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
 import static 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
 
+import java.math.BigDecimal;
+import java.math.MathContext;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Experimental.Kind;
@@ -31,13 +33,16 @@ import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects
 /**
  * A {@link RestrictionTracker} for claiming offsets in an {@link OffsetRange} 
in a monotonically
  * increasing fashion.
+ *
+ * <p>The smallest offset is {@code Long.MIN_VALUE} and the largest offset is 
{@code Long.MAX_VALUE
+ * - 1}.
  */
 @Experimental(Kind.SPLITTABLE_DO_FN)
 public class OffsetRangeTracker extends RestrictionTracker<OffsetRange, Long>
     implements HasProgress {
-  private OffsetRange range;
-  @Nullable private Long lastClaimedOffset = null;
-  @Nullable private Long lastAttemptedOffset = null;
+  protected OffsetRange range;
+  @Nullable protected Long lastClaimedOffset = null;
+  @Nullable protected Long lastAttemptedOffset = null;
 
   public OffsetRangeTracker(OffsetRange range) {
     this.range = checkNotNull(range);
@@ -50,16 +55,27 @@ public class OffsetRangeTracker extends 
RestrictionTracker<OffsetRange, Long>
 
   @Override
   public SplitResult<OffsetRange> trySplit(double fractionOfRemainder) {
-    long cur = (lastAttemptedOffset == null) ? range.getFrom() - 1 : 
lastAttemptedOffset;
-    long splitPos =
-        cur
-            + Math.max(
-                1L, (Double.valueOf((range.getTo() - cur) * 
fractionOfRemainder)).longValue());
-    if (splitPos >= range.getTo()) {
+    // Convert to BigDecimal in computation to prevent overflow, which may 
result in loss of
+    // precision.
+    BigDecimal cur =
+        (lastAttemptedOffset == null)
+            ? BigDecimal.valueOf(range.getFrom()).subtract(BigDecimal.ONE, 
MathContext.DECIMAL128)
+            : BigDecimal.valueOf(lastAttemptedOffset);
+    // split = cur + max(1, (range.getTo() - cur) * fractionOfRemainder)
+    BigDecimal splitPos =
+        cur.add(
+            BigDecimal.valueOf(range.getTo())
+                .subtract(cur, MathContext.DECIMAL128)
+                .multiply(BigDecimal.valueOf(fractionOfRemainder), 
MathContext.DECIMAL128)
+                .max(BigDecimal.ONE),
+            MathContext.DECIMAL128);
+
+    long split = splitPos.longValue();
+    if (split >= range.getTo()) {
       return null;
     }
-    OffsetRange res = new OffsetRange(splitPos, range.getTo());
-    this.range = new OffsetRange(range.getFrom(), splitPos);
+    OffsetRange res = new OffsetRange(split, range.getTo());
+    this.range = new OffsetRange(range.getFrom(), split);
     return SplitResult.of(range, res);
   }
 
@@ -120,13 +136,27 @@ public class OffsetRangeTracker extends 
RestrictionTracker<OffsetRange, Long>
   public Progress getProgress() {
     // If we have never attempted an offset, we return the length of the 
entire range as work
     // remaining.
+    // Convert to BigDecimal in computation to prevent overflow, which may 
result in loss of
+    // precision.
     if (lastAttemptedOffset == null) {
-      return Progress.from(0, range.getTo() - range.getFrom());
+      return Progress.from(
+          0,
+          BigDecimal.valueOf(range.getTo())
+              .subtract(BigDecimal.valueOf(range.getFrom()), 
MathContext.DECIMAL128)
+              .doubleValue());
     }
 
     // Compute the amount of work remaining from where we are to where we are 
attempting to get to
     // with a minimum of zero in case we have claimed beyond the end of the 
range.
-    long workRemaining = Math.max(range.getTo() - lastAttemptedOffset, 0);
-    return Progress.from(range.getTo() - range.getFrom() - workRemaining, 
workRemaining);
+    BigDecimal workRemaining =
+        BigDecimal.valueOf(range.getTo())
+            .subtract(BigDecimal.valueOf(lastAttemptedOffset), 
MathContext.DECIMAL128)
+            .max(BigDecimal.ZERO);
+    BigDecimal totalWork =
+        BigDecimal.valueOf(range.getTo())
+            .subtract(BigDecimal.valueOf(range.getFrom()), 
MathContext.DECIMAL128);
+    return Progress.from(
+        totalWork.subtract(workRemaining, 
MathContext.DECIMAL128).doubleValue(),
+        workRemaining.doubleValue());
   }
 }
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/splittabledofn/GrowableOffsetRangeTrackerTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/splittabledofn/GrowableOffsetRangeTrackerTest.java
new file mode 100644
index 0000000..1705015
--- /dev/null
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/splittabledofn/GrowableOffsetRangeTrackerTest.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.splittabledofn;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.math.BigDecimal;
+import java.math.MathContext;
+import org.apache.beam.sdk.io.range.OffsetRange;
+import 
org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker.Progress;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link GrowableOffsetRangeTracker}. */
+@RunWith(JUnit4.class)
+public class GrowableOffsetRangeTrackerTest {
+  private static class SimpleEstimator implements 
GrowableOffsetRangeTracker.RangeEndEstimator {
+    private long estimateRangeEnd = 0;
+
+    @Override
+    public long estimate() {
+      return estimateRangeEnd;
+    }
+
+    public void setEstimateRangeEnd(long offset) {
+      estimateRangeEnd = offset;
+    }
+  }
+
+  @Rule public final ExpectedException expected = ExpectedException.none();
+
+  @Test
+  public void testIllegalInitialization() throws Exception {
+    expected.expect(NullPointerException.class);
+    GrowableOffsetRangeTracker tracker = new GrowableOffsetRangeTracker(0L, 
null);
+  }
+
+  @Test
+  public void testTryClaim() throws Exception {
+    GrowableOffsetRangeTracker tracker = new GrowableOffsetRangeTracker(0L, 
new SimpleEstimator());
+    assertTrue(tracker.tryClaim(10L));
+    assertTrue(tracker.tryClaim(100L));
+    assertFalse(tracker.tryClaim(Long.MAX_VALUE));
+    tracker.checkDone();
+  }
+
+  @Test
+  public void testCheckpointBeforeStart() throws Exception {
+    SimpleEstimator simpleEstimator = new SimpleEstimator();
+    GrowableOffsetRangeTracker tracker = new GrowableOffsetRangeTracker(0L, 
simpleEstimator);
+    simpleEstimator.setEstimateRangeEnd(10);
+    SplitResult res = tracker.trySplit(0);
+    tracker.checkDone();
+    assertEquals(new OffsetRange(0, 0), res.getPrimary());
+    assertEquals(new OffsetRange(0, 0), tracker.currentRestriction());
+    assertEquals(new OffsetRange(0, Long.MAX_VALUE), res.getResidual());
+  }
+
+  @Test
+  public void testCheckpointJustStarted() throws Exception {
+    SimpleEstimator simpleEstimator = new SimpleEstimator();
+    GrowableOffsetRangeTracker tracker = new GrowableOffsetRangeTracker(0L, 
simpleEstimator);
+    assertTrue(tracker.tryClaim(5L));
+    simpleEstimator.setEstimateRangeEnd(0L);
+    SplitResult res = tracker.trySplit(0);
+    tracker.checkDone();
+    assertEquals(new OffsetRange(0, 6), res.getPrimary());
+    assertEquals(new OffsetRange(0, 6), tracker.currentRestriction());
+    assertEquals(new OffsetRange(6, Long.MAX_VALUE), res.getResidual());
+
+    tracker = new GrowableOffsetRangeTracker(0L, simpleEstimator);
+    assertTrue(tracker.tryClaim(5L));
+    simpleEstimator.setEstimateRangeEnd(20L);
+    res = tracker.trySplit(0);
+    tracker.checkDone();
+    assertEquals(new OffsetRange(0, 6), res.getPrimary());
+    assertEquals(new OffsetRange(6, Long.MAX_VALUE), res.getResidual());
+  }
+
+  @Test
+  public void testCheckpointAfterAllProcessed() throws Exception {
+    SimpleEstimator simpleEstimator = new SimpleEstimator();
+    GrowableOffsetRangeTracker tracker = new GrowableOffsetRangeTracker(0L, 
simpleEstimator);
+    assertFalse(tracker.tryClaim(Long.MAX_VALUE));
+    tracker.checkDone();
+    assertNull(tracker.trySplit(0));
+  }
+
+  @Test
+  public void testCheckpointAtEmptyRange() throws Exception {
+    GrowableOffsetRangeTracker tracker =
+        new GrowableOffsetRangeTracker(Long.MAX_VALUE, new SimpleEstimator());
+    tracker.checkDone();
+    assertNull(tracker.trySplit(0));
+  }
+
+  @Test
+  public void testSplit() throws Exception {
+    SimpleEstimator simpleEstimator = new SimpleEstimator();
+    GrowableOffsetRangeTracker tracker = new GrowableOffsetRangeTracker(0L, 
simpleEstimator);
+    assertTrue(tracker.tryClaim(0L));
+
+    simpleEstimator.setEstimateRangeEnd(16L);
+    // The split of infinite range results in one finite range and one 
infinite range.
+    SplitResult res = tracker.trySplit(0.5);
+    assertEquals(new OffsetRange(0, 8), res.getPrimary());
+    assertEquals(new OffsetRange(0, 8), tracker.currentRestriction());
+    assertEquals(new OffsetRange(8, Long.MAX_VALUE), res.getResidual());
+
+    // After the first split, the tracker should track a finite range. 
Estimate offset should not
+    // impact split.
+    simpleEstimator.setEstimateRangeEnd(12L);
+    res = tracker.trySplit(0.5);
+    assertEquals(new OffsetRange(0, 4), res.getPrimary());
+    assertEquals(new OffsetRange(0, 4), tracker.currentRestriction());
+    assertEquals(new OffsetRange(4, 8), res.getResidual());
+    assertFalse(tracker.tryClaim(4L));
+    tracker.checkDone();
+  }
+
+  @Test
+  public void testSplitWithMaxEstimateRangeEnd() throws Exception {
+    SimpleEstimator simpleEstimator = new SimpleEstimator();
+    GrowableOffsetRangeTracker tracker = new GrowableOffsetRangeTracker(0L, 
simpleEstimator);
+    assertTrue(tracker.tryClaim(1L));
+    simpleEstimator.setEstimateRangeEnd(Long.MAX_VALUE);
+    SplitResult res = tracker.trySplit(0.5);
+    long expectedEnd = 1L + (Long.MAX_VALUE - 1L) / 2;
+    assertEquals(new OffsetRange(0L, expectedEnd), res.getPrimary());
+    assertEquals(new OffsetRange(expectedEnd, Long.MAX_VALUE), 
res.getResidual());
+  }
+
+  @Test
+  public void testProgressBeforeStart() throws Exception {
+    SimpleEstimator simpleEstimator = new SimpleEstimator();
+    GrowableOffsetRangeTracker tracker = new GrowableOffsetRangeTracker(10L, 
simpleEstimator);
+    simpleEstimator.setEstimateRangeEnd(20);
+    Progress currentProcess = tracker.getProgress();
+    assertEquals(0, currentProcess.getWorkCompleted(), 0.001);
+    assertEquals(10, currentProcess.getWorkRemaining(), 0.001);
+
+    simpleEstimator.setEstimateRangeEnd(15);
+    currentProcess = tracker.getProgress();
+    assertEquals(0, currentProcess.getWorkCompleted(), 0.001);
+    assertEquals(5, currentProcess.getWorkRemaining(), 0.001);
+
+    simpleEstimator.setEstimateRangeEnd(5);
+    currentProcess = tracker.getProgress();
+    assertEquals(0, currentProcess.getWorkCompleted(), 0.001);
+    assertEquals(0, currentProcess.getWorkRemaining(), 0.001);
+  }
+
+  @Test
+  public void testProgressAfterFinished() throws Exception {
+    SimpleEstimator simpleEstimator = new SimpleEstimator();
+    GrowableOffsetRangeTracker tracker = new GrowableOffsetRangeTracker(10L, 
simpleEstimator);
+    assertFalse(tracker.tryClaim(Long.MAX_VALUE));
+    tracker.checkDone();
+    simpleEstimator.setEstimateRangeEnd(0L);
+    Progress currentProgress = tracker.getProgress();
+    assertEquals(Long.MAX_VALUE - 10L, currentProgress.getWorkCompleted(), 
0.001);
+    assertEquals(0, currentProgress.getWorkRemaining(), 0.001);
+  }
+
+  @Test
+  public void testProgress() throws Exception {
+    long start = 10L;
+    SimpleEstimator simpleEstimator = new SimpleEstimator();
+    GrowableOffsetRangeTracker tracker = new GrowableOffsetRangeTracker(start, 
simpleEstimator);
+    long cur = 20L;
+    assertTrue(tracker.tryClaim(cur));
+
+    simpleEstimator.setEstimateRangeEnd(5L);
+    Progress currentProgress = tracker.getProgress();
+    assertEquals(cur - start, currentProgress.getWorkCompleted(), 0.001);
+    assertEquals(0, currentProgress.getWorkRemaining(), 0.001);
+
+    simpleEstimator.setEstimateRangeEnd(35L);
+    currentProgress = tracker.getProgress();
+    assertEquals(cur - start, currentProgress.getWorkCompleted(), 0.001);
+    assertEquals(35L - cur, currentProgress.getWorkRemaining(), 0.001);
+
+    simpleEstimator.setEstimateRangeEnd(25L);
+    currentProgress = tracker.getProgress();
+    assertEquals(cur - start, currentProgress.getWorkCompleted(), 0.001);
+    assertEquals(25L - cur, currentProgress.getWorkRemaining(), 0.001);
+
+    simpleEstimator.setEstimateRangeEnd(Long.MAX_VALUE);
+    currentProgress = tracker.getProgress();
+    assertEquals(cur - start, currentProgress.getWorkCompleted(), 0.001);
+    assertEquals(Long.MAX_VALUE - cur, currentProgress.getWorkRemaining(), 
0.001);
+  }
+
+  @Test
+  public void testLargeRange() throws Exception {
+    SimpleEstimator simpleEstimator = new SimpleEstimator();
+    GrowableOffsetRangeTracker tracker =
+        new GrowableOffsetRangeTracker(Long.MIN_VALUE, simpleEstimator);
+
+    simpleEstimator.setEstimateRangeEnd(Long.MAX_VALUE);
+    Progress progress = tracker.getProgress();
+    assertEquals(0, progress.getWorkCompleted(), 0.001);
+    assertEquals(
+        BigDecimal.valueOf(Long.MAX_VALUE)
+            .subtract(BigDecimal.valueOf(Long.MIN_VALUE), 
MathContext.DECIMAL128)
+            .doubleValue(),
+        progress.getWorkRemaining(),
+        0.001);
+
+    simpleEstimator.setEstimateRangeEnd(Long.MIN_VALUE);
+    SplitResult res = tracker.trySplit(0);
+    assertEquals(new OffsetRange(Long.MIN_VALUE, Long.MIN_VALUE), 
res.getPrimary());
+    assertEquals(new OffsetRange(Long.MIN_VALUE, Long.MAX_VALUE), 
res.getResidual());
+  }
+
+  @Test
+  public void testSmallRangeWithLargeValue() throws Exception {
+    SimpleEstimator simpleEstimator = new SimpleEstimator();
+    GrowableOffsetRangeTracker tracker =
+        new GrowableOffsetRangeTracker(123456789012345677L, simpleEstimator);
+    assertTrue(tracker.tryClaim(123456789012345677L));
+    simpleEstimator.setEstimateRangeEnd(123456789012345679L);
+    SplitResult res = tracker.trySplit(0.5);
+    assertEquals(new OffsetRange(123456789012345677L, 123456789012345678L), 
res.getPrimary());
+    assertEquals(new OffsetRange(123456789012345678L, Long.MAX_VALUE), 
res.getResidual());
+
+    tracker = new GrowableOffsetRangeTracker(123456789012345681L, 
simpleEstimator);
+    assertTrue(tracker.tryClaim(123456789012345681L));
+    simpleEstimator.setEstimateRangeEnd(123456789012345683L);
+    res = tracker.trySplit(0.5);
+    assertEquals(new OffsetRange(123456789012345681L, 123456789012345682L), 
res.getPrimary());
+    assertEquals(new OffsetRange(123456789012345682L, Long.MAX_VALUE), 
res.getResidual());
+  }
+}
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTrackerTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTrackerTest.java
index 6fdd254..b2c381c 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTrackerTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTrackerTest.java
@@ -22,6 +22,8 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
+import java.math.BigDecimal;
+import java.math.MathContext;
 import org.apache.beam.sdk.io.range.OffsetRange;
 import 
org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker.Progress;
 import org.junit.Rule;
@@ -36,6 +38,12 @@ public class OffsetRangeTrackerTest {
   @Rule public final ExpectedException expected = ExpectedException.none();
 
   @Test
+  public void testIllegalInitialization() throws Exception {
+    expected.expect(NullPointerException.class);
+    OffsetRangeTracker tracker = new OffsetRangeTracker(null);
+  }
+
+  @Test
   public void testTryClaim() throws Exception {
     OffsetRange range = new OffsetRange(100, 200);
     OffsetRangeTracker tracker = new OffsetRangeTracker(range);
@@ -231,4 +239,39 @@ public class OffsetRangeTrackerTest {
     assertEquals(50, progress.getWorkCompleted(), 0.001);
     assertEquals(50, progress.getWorkRemaining(), 0.001);
   }
+
+  @Test
+  public void testLargeRange() throws Exception {
+    OffsetRangeTracker tracker =
+        new OffsetRangeTracker(new OffsetRange(Long.MIN_VALUE, 
Long.MAX_VALUE));
+
+    Progress progress = tracker.getProgress();
+    assertEquals(0, progress.getWorkCompleted(), 0.001);
+    assertEquals(
+        BigDecimal.valueOf(Long.MAX_VALUE)
+            .subtract(BigDecimal.valueOf(Long.MIN_VALUE), 
MathContext.DECIMAL128)
+            .doubleValue(),
+        progress.getWorkRemaining(),
+        0.001);
+
+    SplitResult res = tracker.trySplit(0);
+    assertEquals(new OffsetRange(Long.MIN_VALUE, Long.MIN_VALUE), 
res.getPrimary());
+    assertEquals(new OffsetRange(Long.MIN_VALUE, Long.MAX_VALUE), 
res.getResidual());
+  }
+
+  @Test
+  public void testSmallRangeWithLargeValue() throws Exception {
+    OffsetRangeTracker tracker =
+        new OffsetRangeTracker(new OffsetRange(123456789012345677L, 
123456789012345679L));
+    assertTrue(tracker.tryClaim(123456789012345677L));
+    SplitResult res = tracker.trySplit(0.5);
+    assertEquals(new OffsetRange(123456789012345677L, 123456789012345678L), 
res.getPrimary());
+    assertEquals(new OffsetRange(123456789012345678L, 123456789012345679L), 
res.getResidual());
+
+    tracker = new OffsetRangeTracker(new OffsetRange(123456789012345681L, 
123456789012345683L));
+    assertTrue(tracker.tryClaim(123456789012345681L));
+    res = tracker.trySplit(0.5);
+    assertEquals(new OffsetRange(123456789012345681L, 123456789012345682L), 
res.getPrimary());
+    assertEquals(new OffsetRange(123456789012345682L, 123456789012345683L), 
res.getResidual());
+  }
 }

Reply via email to