lukecwik commented on a change in pull request #11454:
URL: https://github.com/apache/beam/pull/11454#discussion_r414023186



##########
File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/ByteKeyRangeTracker.java
##########
@@ -64,31 +64,56 @@ public ByteKeyRange currentRestriction() {
 
   @Override
   public SplitResult<ByteKeyRange> trySplit(double fractionOfRemainder) {
-    // TODO(BEAM-8871): Add support for splitting off a fixed amount of work 
for this restriction
-    // instead of only supporting checkpointing.
+    // No split on an empty range.
+    if (NO_KEYS.equals(range)) {

Review comment:
       or startKey == endKey

##########
File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/ByteKeyRangeTracker.java
##########
@@ -64,31 +64,56 @@ public ByteKeyRange currentRestriction() {
 
   @Override
   public SplitResult<ByteKeyRange> trySplit(double fractionOfRemainder) {
-    // TODO(BEAM-8871): Add support for splitting off a fixed amount of work 
for this restriction
-    // instead of only supporting checkpointing.
+    // No split on an empty range.
+    if (NO_KEYS.equals(range)) {
+      return null;
+    }
+    // There is no more remaining work after the entire range has been claimed.
+    if (lastAttemptedKey != null && lastAttemptedKey.isEmpty()) {
+      return null;
+    }
 
-    // If we haven't done any work, we should return the original range we 
were processing
-    // as the checkpoint.
-    if (lastAttemptedKey == null) {
-      ByteKeyRange rval = range;
-      // We update our current range to an interval that contains no elements.
-      range = NO_KEYS;
-      return SplitResult.of(range, rval);
+    ByteKey startKey = (lastAttemptedKey == null) ? range.getStartKey() : 
next(lastAttemptedKey);
+    ByteKey endKey = range.getEndKey();

Review comment:
       I would get rid of this local variable since it's not being computed and 
is always range.getEndKey(). We can see that it doesn't add value since you use 
range.getEndKey() and not endKey below.

##########
File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/ByteKeyRangeTracker.java
##########
@@ -156,6 +181,11 @@ public void checkDone() throws IllegalStateException {
       return;

Review comment:
       We should also cover the case where startKey == endKey in the NO_KEYS 
check at the top, since it is valid to be done without having attempted 
anything because of how checkpointing is implemented.

##########
File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/ByteKeyRangeTracker.java
##########
@@ -64,31 +64,56 @@ public ByteKeyRange currentRestriction() {
 
   @Override
   public SplitResult<ByteKeyRange> trySplit(double fractionOfRemainder) {
-    // TODO(BEAM-8871): Add support for splitting off a fixed amount of work 
for this restriction
-    // instead of only supporting checkpointing.
+    // No split on an empty range.
+    if (NO_KEYS.equals(range)) {
+      return null;
+    }
+    // There is no more remaining work after the entire range has been claimed.
+    if (lastAttemptedKey != null && lastAttemptedKey.isEmpty()) {
+      return null;
+    }
 
-    // If we haven't done any work, we should return the original range we 
were processing
-    // as the checkpoint.
-    if (lastAttemptedKey == null) {
-      ByteKeyRange rval = range;
-      // We update our current range to an interval that contains no elements.
-      range = NO_KEYS;
-      return SplitResult.of(range, rval);
+    ByteKey startKey = (lastAttemptedKey == null) ? range.getStartKey() : 
next(lastAttemptedKey);
+    ByteKey endKey = range.getEndKey();
+    // There is no more space for split.
+    if (!endKey.isEmpty() && startKey.compareTo(endKey) >= 0) {
+      return null;
     }
 
-    // Return an empty range if the current range is done.
-    if (lastAttemptedKey.isEmpty()
-        || !(range.getEndKey().isEmpty() || 
range.getEndKey().compareTo(lastAttemptedKey) > 0)) {
-      return SplitResult.of(range, NO_KEYS);
+    // Treat checkpoint specially because {@link ByteKeyRange#interpolateKey} 
computes a key with
+    // trailing zeros when fraction is 0.
+    if (fractionOfRemainder == 0.0) {
+      // If we haven't done any work, we should return the original range we 
were processing
+      // as the checkpoint.
+      if (lastAttemptedKey == null) {
+        // We update our current range to an interval that contains no 
elements.
+        ByteKeyRange rval = range;
+        range = NO_KEYS;
+        return SplitResult.of(range, rval);
+      } else {
+        range = ByteKeyRange.of(range.getStartKey(), startKey);
+        return SplitResult.of(range, ByteKeyRange.of(startKey, endKey));
+      }
     }
 
-    // Otherwise we compute the "remainder" of the range from the last key.
-    assert lastAttemptedKey.equals(lastClaimedKey)
-        : "Expect both keys to be equal since the last key attempted was a 
valid key in the range.";
-    ByteKey nextKey = next(lastAttemptedKey);
-    ByteKeyRange res = ByteKeyRange.of(nextKey, range.getEndKey());
-    this.range = ByteKeyRange.of(range.getStartKey(), nextKey);
-    return SplitResult.of(range, res);
+    ByteKeyRange unprocessedRange = ByteKeyRange.of(startKey, 
range.getEndKey());
+    ByteKey splitPos;
+    try {
+      // The interpolateKey shouldn't return empty key. Please refer to {@link
+      // ByteKeyRange#interpolateKey}.
+      splitPos = unprocessedRange.interpolateKey(fractionOfRemainder);
+      checkState(!splitPos.isEmpty());
+    } catch (Exception e) {
+      // There is no way to interpolate a key based on provided fraction.
+      return null;
+    }
+    // Computed splitPos is out of current tracking restriction.
+    if (!range.getEndKey().isEmpty() && splitPos.equals(range.getEndKey())) {

Review comment:
       compare whether splitPos >= range.getEndKey()

##########
File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/ByteKeyRangeTracker.java
##########
@@ -64,31 +64,56 @@ public ByteKeyRange currentRestriction() {
 
   @Override
   public SplitResult<ByteKeyRange> trySplit(double fractionOfRemainder) {
-    // TODO(BEAM-8871): Add support for splitting off a fixed amount of work 
for this restriction
-    // instead of only supporting checkpointing.
+    // No split on an empty range.
+    if (NO_KEYS.equals(range)) {
+      return null;
+    }
+    // There is no more remaining work after the entire range has been claimed.
+    if (lastAttemptedKey != null && lastAttemptedKey.isEmpty()) {
+      return null;
+    }
 
-    // If we haven't done any work, we should return the original range we 
were processing
-    // as the checkpoint.
-    if (lastAttemptedKey == null) {
-      ByteKeyRange rval = range;
-      // We update our current range to an interval that contains no elements.
-      range = NO_KEYS;
-      return SplitResult.of(range, rval);
+    ByteKey startKey = (lastAttemptedKey == null) ? range.getStartKey() : 
next(lastAttemptedKey);

Review comment:
       startKey -> unprocessedRangeStartKey

##########
File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/ByteKeyRangeTracker.java
##########
@@ -64,31 +64,56 @@ public ByteKeyRange currentRestriction() {
 
   @Override
   public SplitResult<ByteKeyRange> trySplit(double fractionOfRemainder) {
-    // TODO(BEAM-8871): Add support for splitting off a fixed amount of work 
for this restriction
-    // instead of only supporting checkpointing.
+    // No split on an empty range.
+    if (NO_KEYS.equals(range)) {
+      return null;
+    }
+    // There is no more remaining work after the entire range has been claimed.
+    if (lastAttemptedKey != null && lastAttemptedKey.isEmpty()) {
+      return null;
+    }
 
-    // If we haven't done any work, we should return the original range we 
were processing
-    // as the checkpoint.
-    if (lastAttemptedKey == null) {
-      ByteKeyRange rval = range;
-      // We update our current range to an interval that contains no elements.
-      range = NO_KEYS;
-      return SplitResult.of(range, rval);
+    ByteKey startKey = (lastAttemptedKey == null) ? range.getStartKey() : 
next(lastAttemptedKey);
+    ByteKey endKey = range.getEndKey();
+    // There is no more space for split.
+    if (!endKey.isEmpty() && startKey.compareTo(endKey) >= 0) {
+      return null;
     }
 
-    // Return an empty range if the current range is done.
-    if (lastAttemptedKey.isEmpty()
-        || !(range.getEndKey().isEmpty() || 
range.getEndKey().compareTo(lastAttemptedKey) > 0)) {
-      return SplitResult.of(range, NO_KEYS);
+    // Treat checkpoint specially because {@link ByteKeyRange#interpolateKey} 
computes a key with
+    // trailing zeros when fraction is 0.
+    if (fractionOfRemainder == 0.0) {
+      // If we haven't done any work, we should return the original range we 
were processing
+      // as the checkpoint.
+      if (lastAttemptedKey == null) {
+        // We update our current range to an interval that contains no 
elements.
+        ByteKeyRange rval = range;
+        range = NO_KEYS;

Review comment:
       We should return `[startKey, startKey)` as the primary unless `startKey` 
is `""`, in which case we should return `NO_KEYS`.

##########
File path: 
sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/splittabledofn/ByteKeyRangeTrackerTest.java
##########
@@ -152,9 +158,35 @@ public void testCheckpointAfterLastUsingEmptyKey() throws 
Exception {
     assertTrue(tracker.tryClaim(ByteKey.of(0x90)));
     assertTrue(tracker.tryClaim(ByteKey.of(0xa0)));
     assertFalse(tracker.tryClaim(ByteKey.EMPTY));
-    ByteKeyRange checkpoint = tracker.trySplit(0).getResidual();
+    assertNull(tracker.trySplit(0));
     assertEquals(ByteKeyRange.of(ByteKey.of(0x10), ByteKey.of(0xc0)), 
tracker.currentRestriction());
-    assertEquals(ByteKeyRangeTracker.NO_KEYS, checkpoint);
+    tracker.checkDone();
+  }
+
+  @Test
+  public void testTrySplit() throws Exception {
+    ByteKeyRangeTracker tracker =
+        ByteKeyRangeTracker.of(ByteKeyRange.of(ByteKey.EMPTY, 
ByteKey.of(0x80)));

Review comment:
       Start with the ALL_KEYS range.

##########
File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java
##########
@@ -78,8 +79,11 @@
    *
    * @param fractionOfRemainder A hint as to the fraction of work the primary 
restriction should
    *     represent based upon the current known remaining amount of work.
-   * @return a {@link SplitResult} if a split was possible, otherwise returns 
{@code null}.
+   * @return a {@link SplitResult} if a split was possible, otherwise returns 
{@code null}. A
+   *     checkpoint(fractionOfRemainder == 0) must either return a valid split 
result or null which
+   *     means there is no more left work.

Review comment:
       ```suggestion
      * @return a {@link SplitResult} if a split was possible, otherwise 
returns {@code null}. If the {@code fractionOfRemainder == 0}, a {@code null} 
result MUST imply that the restriction tracker is done and there is no more 
work left to do.
   ```

##########
File path: 
sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/splittabledofn/ByteKeyRangeTrackerTest.java
##########
@@ -152,9 +158,35 @@ public void testCheckpointAfterLastUsingEmptyKey() throws 
Exception {
     assertTrue(tracker.tryClaim(ByteKey.of(0x90)));
     assertTrue(tracker.tryClaim(ByteKey.of(0xa0)));
     assertFalse(tracker.tryClaim(ByteKey.EMPTY));
-    ByteKeyRange checkpoint = tracker.trySplit(0).getResidual();
+    assertNull(tracker.trySplit(0));
     assertEquals(ByteKeyRange.of(ByteKey.of(0x10), ByteKey.of(0xc0)), 
tracker.currentRestriction());
-    assertEquals(ByteKeyRangeTracker.NO_KEYS, checkpoint);
+    tracker.checkDone();
+  }
+
+  @Test
+  public void testTrySplit() throws Exception {
+    ByteKeyRangeTracker tracker =
+        ByteKeyRangeTracker.of(ByteKeyRange.of(ByteKey.EMPTY, 
ByteKey.of(0x80)));
+    SplitResult<ByteKeyRange> res = tracker.trySplit(0.5);
+    assertKeyRangeEqualExceptPadding(
+        ByteKeyRange.of(ByteKey.EMPTY, ByteKey.of(0x40)), res.getPrimary());
+    assertKeyRangeEqualExceptPadding(
+        ByteKeyRange.of(ByteKey.of(0x40), ByteKey.of(0x80)), 
res.getResidual());
+    tracker.tryClaim(ByteKey.of(0x00));
+    res = tracker.trySplit(0.5);
+    assertKeyRangeEqualExceptPadding(
+        ByteKeyRange.of(ByteKey.of(0x00), ByteKey.of(0x20)), res.getPrimary());

Review comment:
       Why is the primary here 0x00 for the start key and not ByteKey.EMPTY?

##########
File path: 
sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/splittabledofn/ByteKeyRangeTrackerTest.java
##########
@@ -50,6 +52,7 @@ public void testTryClaim() throws Exception {
     assertTrue(tracker.tryClaim(ByteKey.of(0x50)));
     assertTrue(tracker.tryClaim(ByteKey.of(0x99)));
     assertFalse(tracker.tryClaim(ByteKey.of(0xc0)));
+    tracker.checkDone();
   }
 

Review comment:
       Add a tryClaim test for empty ranges covering NO_KEYS and when startKey 
== endKey

##########
File path: 
sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/splittabledofn/ByteKeyRangeTrackerTest.java
##########
@@ -152,9 +158,35 @@ public void testCheckpointAfterLastUsingEmptyKey() throws 
Exception {
     assertTrue(tracker.tryClaim(ByteKey.of(0x90)));
     assertTrue(tracker.tryClaim(ByteKey.of(0xa0)));
     assertFalse(tracker.tryClaim(ByteKey.EMPTY));
-    ByteKeyRange checkpoint = tracker.trySplit(0).getResidual();
+    assertNull(tracker.trySplit(0));
     assertEquals(ByteKeyRange.of(ByteKey.of(0x10), ByteKey.of(0xc0)), 
tracker.currentRestriction());
-    assertEquals(ByteKeyRangeTracker.NO_KEYS, checkpoint);
+    tracker.checkDone();
+  }
+
+  @Test
+  public void testTrySplit() throws Exception {
+    ByteKeyRangeTracker tracker =
+        ByteKeyRangeTracker.of(ByteKeyRange.of(ByteKey.EMPTY, 
ByteKey.of(0x80)));
+    SplitResult<ByteKeyRange> res = tracker.trySplit(0.5);
+    assertKeyRangeEqualExceptPadding(
+        ByteKeyRange.of(ByteKey.EMPTY, ByteKey.of(0x40)), res.getPrimary());
+    assertKeyRangeEqualExceptPadding(
+        ByteKeyRange.of(ByteKey.of(0x40), ByteKey.of(0x80)), 
res.getResidual());
+    tracker.tryClaim(ByteKey.of(0x00));
+    res = tracker.trySplit(0.5);
+    assertKeyRangeEqualExceptPadding(
+        ByteKeyRange.of(ByteKey.of(0x00), ByteKey.of(0x20)), res.getPrimary());
+    assertKeyRangeEqualExceptPadding(
+        ByteKeyRange.of(ByteKey.of(0x20), ByteKey.of(0x40)), 
res.getResidual());
+    assertNull(tracker.trySplit(1));
+  }
+
+  @Test
+  public void testTrySplitAtEmptyRange() throws Exception {
+    ByteKeyRangeTracker tracker = 
ByteKeyRangeTracker.of(ByteKeyRangeTracker.NO_KEYS);

Review comment:
       Cover the case where startKey == endKey and also add a call to trySplit 
at 0.5




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to