[
https://issues.apache.org/jira/browse/BEAM-5974?focusedWorklogId=163300&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-163300
]
ASF GitHub Bot logged work on BEAM-5974:
----------------------------------------
Author: ASF GitHub Bot
Created on: 07/Nov/18 00:55
Start Date: 07/Nov/18 00:55
Worklog Time Spent: 10m
Work Description: lukecwik closed pull request #6949: [BEAM-5974] Fix
ByteKeyRangeTracker to handle tryClaim(ByteKey.EMPTY) instead of exposing
markDone
URL: https://github.com/apache/beam/pull/6949
This is a PR merged from a forked repository.
Because GitHub hides the original diff of a merged pull request from a fork,
the diff is reproduced below for the sake of provenance:
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/ByteKeyRangeTracker.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/ByteKeyRangeTracker.java
index 0a553f7d931..52ee359135f 100644
---
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/ByteKeyRangeTracker.java
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/ByteKeyRangeTracker.java
@@ -27,14 +27,20 @@
import javax.annotation.Nullable;
import org.apache.beam.sdk.io.range.ByteKey;
import org.apache.beam.sdk.io.range.ByteKeyRange;
-import org.apache.beam.sdk.transforms.DoFn;
/**
* A {@link RestrictionTracker} for claiming {@link ByteKey}s in a {@link
ByteKeyRange} in a
* monotonically increasing fashion. The range is a semi-open bounded interval
[startKey, endKey)
- * where the limits are both represented by ByteKey.EMPTY.
+ * where the limits are both represented by {@link ByteKey#EMPTY}.
+ *
+ * <p>Note, one can complete a range by claiming the {@link ByteKey#EMPTY}
once one runs out of keys
+ * to process.
*/
public class ByteKeyRangeTracker extends RestrictionTracker<ByteKeyRange,
ByteKey> {
+ /* An empty range which contains no keys. */
+ @VisibleForTesting
+ static final ByteKeyRange NO_KEYS = ByteKeyRange.of(ByteKey.EMPTY,
ByteKey.of(0x00));
+
private ByteKeyRange range;
@Nullable private ByteKey lastClaimedKey = null;
@Nullable private ByteKey lastAttemptedKey = null;
@@ -54,8 +60,25 @@ public synchronized ByteKeyRange currentRestriction() {
@Override
public synchronized ByteKeyRange checkpoint() {
- checkState(lastClaimedKey != null, "Can't checkpoint before any key was
successfully claimed");
- ByteKey nextKey = next(lastClaimedKey);
+ // If we haven't done any work, we should return the original range we
were processing
+ // as the checkpoint.
+ if (lastAttemptedKey == null) {
+ ByteKeyRange rval = ByteKeyRange.of(range.getStartKey(),
range.getEndKey());
+ // We update our current range to an interval that contains no elements.
+ range = NO_KEYS;
+ return rval;
+ }
+
+ // Return an empty range if the current range is done.
+ if (lastAttemptedKey.isEmpty()
+ || !(range.getEndKey().isEmpty() ||
range.getEndKey().compareTo(lastAttemptedKey) > 0)) {
+ return NO_KEYS;
+ }
+
+ // Otherwise we compute the "remainder" of the range from the last key.
+ assert lastAttemptedKey.equals(lastClaimedKey)
+ : "Expect both keys to be equal since the last key attempted was a
valid key in the range.";
+ ByteKey nextKey = next(lastAttemptedKey);
ByteKeyRange res = ByteKeyRange.of(nextKey, range.getEndKey());
this.range = ByteKeyRange.of(range.getStartKey(), nextKey);
return res;
@@ -64,16 +87,28 @@ public synchronized ByteKeyRange checkpoint() {
/**
* Attempts to claim the given key.
*
- * <p>Must be larger than the last successfully claimed key.
+ * <p>Must be larger than the last attempted key. Note that passing in
{@link ByteKey#EMPTY}
+ * claims all keys to the end of range and can only be claimed once.
*
* @return {@code true} if the key was successfully claimed, {@code false}
if it is outside the
* current {@link ByteKeyRange} of this tracker (in that case this
operation is a no-op).
*/
@Override
protected synchronized boolean tryClaimImpl(ByteKey key) {
+ // Handle claiming the end of range EMPTY key
+ if (key.isEmpty()) {
+ checkArgument(
+ lastAttemptedKey == null || !lastAttemptedKey.isEmpty(),
+ "Trying to claim key %s while last attempted key was %s",
+ key,
+ lastAttemptedKey);
+ lastAttemptedKey = key;
+ return false;
+ }
+
checkArgument(
lastAttemptedKey == null || key.compareTo(lastAttemptedKey) > 0,
- "Trying to claim key %s while last attempted was %s",
+ "Trying to claim key %s while last attempted key was %s",
key,
lastAttemptedKey);
checkArgument(
@@ -81,6 +116,7 @@ protected synchronized boolean tryClaimImpl(ByteKey key) {
"Trying to claim key %s before start of the range %s",
key,
range);
+
lastAttemptedKey = key;
// No respective checkArgument for i < range.to() - it's ok to try
claiming keys beyond
if (!range.getEndKey().isEmpty() && key.compareTo(range.getEndKey()) > -1)
{
@@ -90,32 +126,38 @@ protected synchronized boolean tryClaimImpl(ByteKey key) {
return true;
}
- /**
- * Marks that there are no more keys to be claimed in the range.
- *
- * <p>E.g., a {@link DoFn} reading a file and claiming the key of each
record in the file might
- * call this if it hits EOF - even though the last attempted claim was
before the end of the
- * range, there are no more keys to claim.
- */
- public synchronized void markDone() {
- lastAttemptedKey = range.getEndKey();
- }
-
@Override
public synchronized void checkDone() throws IllegalStateException {
- checkState(lastAttemptedKey != null, "Can't check if done before any key
claim was attempted");
- ByteKey nextKey = next(lastAttemptedKey);
+ // Handle checking the empty range which is implicitly done.
+ // This case can occur if the range tracker is checkpointed before any
keys have been claimed
+ // or if the range tracker is checkpointed once the range is done.
+ if (NO_KEYS.equals(range)) {
+ return;
+ }
+
checkState(
- nextKey.compareTo(range.getEndKey()) > -1,
- "Last attempted key was %s in range %s, claiming work in [%s, %s) was
not attempted",
- lastAttemptedKey,
- range,
- nextKey,
- range.getEndKey());
+ lastAttemptedKey != null,
+ "Key range is non-empty %s and no keys have been attempted.",
+ range);
+
+ // Return if the last attempted key was the empty key representing the end
of range for
+ // all ranges.
+ if (lastAttemptedKey.isEmpty()) {
+ return;
+ }
+
+ // If the last attempted key was not at or beyond the end of the range
then throw.
+ if (range.getEndKey().isEmpty() ||
range.getEndKey().compareTo(lastAttemptedKey) > 0) {
+ ByteKey nextKey = next(lastAttemptedKey);
+ throw new IllegalStateException(
+ String.format(
+ "Last attempted key was %s in range %s, claiming work in [%s,
%s) was not attempted",
+ lastAttemptedKey, range, nextKey, range.getEndKey()));
+ }
}
@Override
- public String toString() {
+ public synchronized String toString() {
return MoreObjects.toStringHelper(this)
.add("range", range)
.add("lastClaimedKey", lastClaimedKey)
diff --git
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/splittabledofn/ByteKeyRangeTrackerTest.java
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/splittabledofn/ByteKeyRangeTrackerTest.java
index a285ec82ef6..3c3204e7e7c 100644
---
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/splittabledofn/ByteKeyRangeTrackerTest.java
+++
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/splittabledofn/ByteKeyRangeTrackerTest.java
@@ -52,8 +52,23 @@ public void testTryClaim() throws Exception {
public void testCheckpointUnstarted() throws Exception {
ByteKeyRangeTracker tracker =
ByteKeyRangeTracker.of(ByteKeyRange.of(ByteKey.of(0x10),
ByteKey.of(0xc0)));
- expected.expect(IllegalStateException.class);
- tracker.checkpoint();
+
+ ByteKeyRange checkpoint = tracker.checkpoint();
+ // We expect to get the original range back and that the current
restriction
+ // is effectively made empty.
+ assertEquals(ByteKeyRange.of(ByteKey.of(0x10), ByteKey.of(0xc0)),
checkpoint);
+ assertEquals(ByteKeyRangeTracker.NO_KEYS, tracker.currentRestriction());
+ }
+
+ @Test
+ public void testCheckpointUnstartedForAllKeysRange() throws Exception {
+ ByteKeyRangeTracker tracker =
ByteKeyRangeTracker.of(ByteKeyRange.ALL_KEYS);
+
+ ByteKeyRange checkpoint = tracker.checkpoint();
+ // We expect to get the original range back and that the current
restriction
+ // is effectively made empty.
+ assertEquals(ByteKeyRange.ALL_KEYS, checkpoint);
+ assertEquals(ByteKeyRangeTracker.NO_KEYS, tracker.currentRestriction());
}
@Test
@@ -61,8 +76,9 @@ public void testCheckpointOnlyFailedClaim() throws Exception {
ByteKeyRangeTracker tracker =
ByteKeyRangeTracker.of(ByteKeyRange.of(ByteKey.of(0x10),
ByteKey.of(0xc0)));
assertFalse(tracker.tryClaim(ByteKey.of(0xd0)));
- expected.expect(IllegalStateException.class);
- tracker.checkpoint();
+ ByteKeyRange checkpoint = tracker.checkpoint();
+ assertEquals(ByteKeyRange.of(ByteKey.of(0x10), ByteKey.of(0xc0)),
tracker.currentRestriction());
+ assertEquals(ByteKeyRangeTracker.NO_KEYS, checkpoint);
}
@Test
@@ -89,20 +105,30 @@ public void testCheckpointRegular() throws Exception {
}
@Test
- public void testCheckpointClaimedLast() throws Exception {
+ public void testCheckpointAtLast() throws Exception {
ByteKeyRangeTracker tracker =
ByteKeyRangeTracker.of(ByteKeyRange.of(ByteKey.of(0x10),
ByteKey.of(0xc0)));
assertTrue(tracker.tryClaim(ByteKey.of(0x50)));
assertTrue(tracker.tryClaim(ByteKey.of(0x90)));
- assertTrue(tracker.tryClaim(ByteKey.of(0xbf)));
+ assertFalse(tracker.tryClaim(ByteKey.of(0xc0)));
ByteKeyRange checkpoint = tracker.checkpoint();
- assertEquals(
- ByteKeyRange.of(ByteKey.of(0x10), ByteKey.of(0xbf, 0x00)),
tracker.currentRestriction());
- assertEquals(ByteKeyRange.of(ByteKey.of(0xbf, 0x00), ByteKey.of(0xc0)),
checkpoint);
+ assertEquals(ByteKeyRange.of(ByteKey.of(0x10), ByteKey.of(0xc0)),
tracker.currentRestriction());
+ assertEquals(ByteKeyRangeTracker.NO_KEYS, checkpoint);
}
@Test
- public void testCheckpointAfterFailedClaim() throws Exception {
+ public void testCheckpointAtLastUsingAllKeysAndEmptyKey() throws Exception {
+ ByteKeyRangeTracker tracker =
ByteKeyRangeTracker.of(ByteKeyRange.ALL_KEYS);
+ assertTrue(tracker.tryClaim(ByteKey.of(0x50)));
+ assertTrue(tracker.tryClaim(ByteKey.of(0x90)));
+ assertFalse(tracker.tryClaim(ByteKey.EMPTY));
+ ByteKeyRange checkpoint = tracker.checkpoint();
+ assertEquals(ByteKeyRange.ALL_KEYS, tracker.currentRestriction());
+ assertEquals(ByteKeyRangeTracker.NO_KEYS, checkpoint);
+ }
+
+ @Test
+ public void testCheckpointAfterLast() throws Exception {
ByteKeyRangeTracker tracker =
ByteKeyRangeTracker.of(ByteKeyRange.of(ByteKey.of(0x10),
ByteKey.of(0xc0)));
assertTrue(tracker.tryClaim(ByteKey.of(0x50)));
@@ -110,28 +136,40 @@ public void testCheckpointAfterFailedClaim() throws
Exception {
assertTrue(tracker.tryClaim(ByteKey.of(0xa0)));
assertFalse(tracker.tryClaim(ByteKey.of(0xd0)));
ByteKeyRange checkpoint = tracker.checkpoint();
- assertEquals(
- ByteKeyRange.of(ByteKey.of(0x10), ByteKey.of(0xa0, 0x00)),
tracker.currentRestriction());
- assertEquals(ByteKeyRange.of(ByteKey.of(0xa0, 0x00), ByteKey.of(0xc0)),
checkpoint);
+ assertEquals(ByteKeyRange.of(ByteKey.of(0x10), ByteKey.of(0xc0)),
tracker.currentRestriction());
+ assertEquals(ByteKeyRangeTracker.NO_KEYS, checkpoint);
+ }
+
+ @Test
+ public void testCheckpointAfterLastUsingEmptyKey() throws Exception {
+ ByteKeyRangeTracker tracker =
+ ByteKeyRangeTracker.of(ByteKeyRange.of(ByteKey.of(0x10),
ByteKey.of(0xc0)));
+ assertTrue(tracker.tryClaim(ByteKey.of(0x50)));
+ assertTrue(tracker.tryClaim(ByteKey.of(0x90)));
+ assertTrue(tracker.tryClaim(ByteKey.of(0xa0)));
+ assertFalse(tracker.tryClaim(ByteKey.EMPTY));
+ ByteKeyRange checkpoint = tracker.checkpoint();
+ assertEquals(ByteKeyRange.of(ByteKey.of(0x10), ByteKey.of(0xc0)),
tracker.currentRestriction());
+ assertEquals(ByteKeyRangeTracker.NO_KEYS, checkpoint);
}
@Test
public void testNonMonotonicClaim() throws Exception {
- expected.expectMessage("Trying to claim key [70] while last attempted was
[90]");
ByteKeyRangeTracker tracker =
ByteKeyRangeTracker.of(ByteKeyRange.of(ByteKey.of(0x10),
ByteKey.of(0xc0)));
assertTrue(tracker.tryClaim(ByteKey.of(0x50)));
assertTrue(tracker.tryClaim(ByteKey.of(0x90)));
+ expected.expectMessage("Trying to claim key [70] while last attempted key
was [90]");
tracker.tryClaim(ByteKey.of(0x70));
}
@Test
public void testClaimBeforeStartOfRange() throws Exception {
+ ByteKeyRangeTracker tracker =
+ ByteKeyRangeTracker.of(ByteKeyRange.of(ByteKey.of(0x10),
ByteKey.of(0xc0)));
expected.expectMessage(
"Trying to claim key [05] before start of the range "
+ "ByteKeyRange{startKey=[10], endKey=[c0]}");
- ByteKeyRangeTracker tracker =
- ByteKeyRangeTracker.of(ByteKeyRange.of(ByteKey.of(0x10),
ByteKey.of(0xc0)));
tracker.tryClaim(ByteKey.of(0x05));
}
@@ -155,6 +193,16 @@ public void testCheckDoneAfterTryClaimAtEndOfRange() {
tracker.checkDone();
}
+ @Test
+ public void testCheckDoneWhenClaimingEndOfRangeForEmptyKey() {
+ ByteKeyRangeTracker tracker =
+ ByteKeyRangeTracker.of(ByteKeyRange.of(ByteKey.of(0x10),
ByteKey.EMPTY));
+ assertTrue(tracker.tryClaim(ByteKey.of(0x50)));
+ assertTrue(tracker.tryClaim(ByteKey.of(0x90)));
+ assertFalse(tracker.tryClaim(ByteKey.EMPTY));
+ tracker.checkDone();
+ }
+
@Test
public void testCheckDoneAfterTryClaimRightBeforeEndOfRange() {
ByteKeyRangeTracker tracker =
@@ -169,24 +217,20 @@ public void
testCheckDoneAfterTryClaimRightBeforeEndOfRange() {
}
@Test
- public void testCheckDoneWhenNotDone() {
- ByteKeyRangeTracker tracker =
- ByteKeyRangeTracker.of(ByteKeyRange.of(ByteKey.of(0x10),
ByteKey.of(0xc0)));
- assertTrue(tracker.tryClaim(ByteKey.of(0x50)));
- assertTrue(tracker.tryClaim(ByteKey.of(0x90)));
- expected.expectMessage(
- "Last attempted key was [90] in range ByteKeyRange{startKey=[10],
endKey=[c0]}, "
- + "claiming work in [[9000], [c0]) was not attempted");
+ public void testCheckDoneForEmptyRange() {
+ ByteKeyRangeTracker tracker =
ByteKeyRangeTracker.of(ByteKeyRangeTracker.NO_KEYS);
tracker.checkDone();
}
@Test
- public void testCheckDoneWhenExplicitlyMarkedDone() {
+ public void testCheckDoneWhenNotDone() {
ByteKeyRangeTracker tracker =
ByteKeyRangeTracker.of(ByteKeyRange.of(ByteKey.of(0x10),
ByteKey.of(0xc0)));
assertTrue(tracker.tryClaim(ByteKey.of(0x50)));
assertTrue(tracker.tryClaim(ByteKey.of(0x90)));
- tracker.markDone();
+ expected.expectMessage(
+ "Last attempted key was [90] in range ByteKeyRange{startKey=[10],
endKey=[c0]}, "
+ + "claiming work in [[9000], [c0]) was not attempted");
tracker.checkDone();
}
diff --git
a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseReadSplittableDoFn.java
b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseReadSplittableDoFn.java
index 5df8c037ea0..a3c17e6ca1b 100644
---
a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseReadSplittableDoFn.java
+++
b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseReadSplittableDoFn.java
@@ -70,7 +70,7 @@ public void processElement(ProcessContext c,
ByteKeyRangeTracker tracker) throws
}
c.output(result);
}
- tracker.markDone();
+ tracker.tryClaim(ByteKey.EMPTY);
}
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
Issue Time Tracking
-------------------
Worklog Id: (was: 163300)
Time Spent: 1.5h (was: 1h 20m)
> Migrate ByteKeyRangeTracker to use tryClaim(ByteKey.EMPTY) as end of range
> claim instead of markDone
> ----------------------------------------------------------------------------------------------------
>
> Key: BEAM-5974
> URL: https://issues.apache.org/jira/browse/BEAM-5974
> Project: Beam
> Issue Type: Bug
> Components: io-java-hbase, sdk-java-core
> Affects Versions: 2.8.0
> Reporter: Luke Cwik
> Assignee: Luke Cwik
> Priority: Major
> Time Spent: 1.5h
> Remaining Estimate: 0h
>
> org.apache.beam.sdk.transforms.splittabledofn.ByteKeyRangeTracker doesn't
> handle tryClaim(ByteKey.EMPTY), nor the related doneness check or
> checkpointing: doneness checking can't handle the empty interval, and
> checkpointing either returns invalid checkpoints or errors out because it uses
> the lastClaimedKey instead of the lastAttemptedKey to determine doneness.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)