[ 
https://issues.apache.org/jira/browse/BEAM-2939?focusedWorklogId=171741&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-171741
 ]

ASF GitHub Bot logged work on BEAM-2939:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 03/Dec/18 22:42
            Start Date: 03/Dec/18 22:42
    Worklog Time Spent: 10m 
      Work Description: lukecwik closed pull request #7177: [BEAM-2939] Add 
support for backlog reporting to byte key and offset restriction trackers.
URL: https://github.com/apache/beam/pull/7177
 
 
   

This pull request was merged from a forked repository. Because GitHub
does not show the original diff of a fork-based pull request once it has
been merged, the diff is reproduced below for the sake of provenance:

diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/ByteKeyRangeTracker.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/ByteKeyRangeTracker.java
index db83af4c9765..14216c194d56 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/ByteKeyRangeTracker.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/ByteKeyRangeTracker.java
@@ -24,6 +24,7 @@
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.MoreObjects;
 import com.google.common.primitives.Bytes;
+import java.math.BigDecimal;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.io.range.ByteKey;
 import org.apache.beam.sdk.io.range.ByteKeyRange;
@@ -36,7 +37,8 @@
  * <p>Note, one can complete a range by claiming the {@link ByteKey#EMPTY} 
once one runs out of keys
  * to process.
  */
-public class ByteKeyRangeTracker extends RestrictionTracker<ByteKeyRange, 
ByteKey> {
+public class ByteKeyRangeTracker extends RestrictionTracker<ByteKeyRange, 
ByteKey>
+    implements Backlogs.HasBacklog {
   /* An empty range which contains no keys. */
   @VisibleForTesting
   static final ByteKeyRange NO_KEYS = ByteKeyRange.of(ByteKey.EMPTY, 
ByteKey.of(0x00));
@@ -180,4 +182,30 @@ static ByteKey next(ByteKey key) {
   }
 
   private static final byte[] ZERO_BYTE_ARRAY = new byte[] {0};
+
+  @Override
+  public Backlog getBacklog() {
+    // Return 0 for the empty range which is implicitly done.
+    // This case can occur if the range tracker is checkpointed before any 
keys have been claimed
+    // or if the range tracker is checkpointed once the range is done.
+    if (NO_KEYS.equals(range)) {
+      return Backlog.of(BigDecimal.ZERO);
+    }
+
+    // If we are attempting to get the backlog without processing a single 
key, we return 1.0
+    if (lastAttemptedKey == null) {
+      return Backlog.of(BigDecimal.ONE);
+    }
+
+    // Return 0 if the last attempted key was the empty key representing the 
end of range for
+    // all ranges or the last attempted key is beyond the end of the range.
+    if (lastAttemptedKey.isEmpty()
+        || !(range.getEndKey().isEmpty() || 
range.getEndKey().compareTo(lastAttemptedKey) > 0)) {
+      return Backlog.of(BigDecimal.ZERO);
+    }
+
+    // TODO: Use the ability of BigDecimal's additional precision to more 
accurately report backlog
+    // for keys which are long.
+    return 
Backlog.of(BigDecimal.valueOf(range.estimateFractionForKey(lastAttemptedKey)));
+  }
 }
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTracker.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTracker.java
index 2d5626b8d7c3..a8287edf54ca 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTracker.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTracker.java
@@ -22,15 +22,16 @@
 import static com.google.common.base.Preconditions.checkState;
 
 import com.google.common.base.MoreObjects;
+import java.math.BigDecimal;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.io.range.OffsetRange;
-import org.apache.beam.sdk.transforms.DoFn;
 
 /**
  * A {@link RestrictionTracker} for claiming offsets in an {@link OffsetRange} 
in a monotonically
  * increasing fashion.
  */
-public class OffsetRangeTracker extends RestrictionTracker<OffsetRange, Long> {
+public class OffsetRangeTracker extends RestrictionTracker<OffsetRange, Long>
+    implements Backlogs.HasBacklog {
   private OffsetRange range;
   @Nullable private Long lastClaimedOffset = null;
   @Nullable private Long lastAttemptedOffset = null;
@@ -79,17 +80,6 @@ public boolean tryClaim(Long i) {
     return true;
   }
 
-  /**
-   * Marks that there are no more offsets to be claimed in the range.
-   *
-   * <p>E.g., a {@link DoFn} reading a file and claiming the offset of each 
record in the file might
-   * call this if it hits EOF - even though the last attempted claim was 
before the end of the
-   * range, there are no more offsets to claim.
-   */
-  public void markDone() {
-    lastAttemptedOffset = Long.MAX_VALUE;
-  }
-
   @Override
   public void checkDone() throws IllegalStateException {
     checkState(
@@ -109,4 +99,16 @@ public String toString() {
         .add("lastAttemptedOffset", lastAttemptedOffset)
         .toString();
   }
+
+  @Override
+  public Backlog getBacklog() {
+    // If we have never attempted an offset, we return the length of the 
entire range.
+    if (lastAttemptedOffset == null) {
+      return Backlog.of(BigDecimal.valueOf(range.getTo() - range.getFrom()));
+    }
+
+    // Otherwise we return the length from where we are to where we are 
attempting to get to
+    // with a minimum of zero in case we have claimed beyond the end of the 
range.
+    return Backlog.of(BigDecimal.valueOf(Math.max(range.getTo() - 
lastAttemptedOffset, 0)));
+  }
 }
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/Restrictions.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/Restrictions.java
index fd10f57459fe..7489d59146f6 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/Restrictions.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/Restrictions.java
@@ -25,5 +25,5 @@
    * authors mark their restriction type with this interface if the 
restriction produces a bounded
    * amount of output.
    */
-  interface IsBounded {}
+  public interface IsBounded {}
 }
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/splittabledofn/ByteKeyRangeTrackerTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/splittabledofn/ByteKeyRangeTrackerTest.java
index 3c3204e7e7ce..bc6da0673e4f 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/splittabledofn/ByteKeyRangeTrackerTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/splittabledofn/ByteKeyRangeTrackerTest.java
@@ -18,10 +18,15 @@
 package org.apache.beam.sdk.transforms.splittabledofn;
 
 import static 
org.apache.beam.sdk.transforms.splittabledofn.ByteKeyRangeTracker.next;
+import static org.hamcrest.Matchers.allOf;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.lessThan;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
+import java.math.BigDecimal;
 import org.apache.beam.sdk.io.range.ByteKey;
 import org.apache.beam.sdk.io.range.ByteKeyRange;
 import org.junit.Rule;
@@ -252,4 +257,39 @@ public void testNextByteKey() {
     assertEquals(next(ByteKey.of(0x00, 0xff)), ByteKey.of(0x00, 0xff, 0x00));
     assertEquals(next(ByteKey.of(0xff, 0xff)), ByteKey.of(0xff, 0xff, 0x00));
   }
+
+  @Test
+  public void testBacklogUnstarted() {
+    ByteKeyRangeTracker tracker = 
ByteKeyRangeTracker.of(ByteKeyRange.ALL_KEYS);
+    assertEquals(BigDecimal.ONE, tracker.getBacklog().backlog());
+
+    tracker = ByteKeyRangeTracker.of(ByteKeyRange.of(ByteKey.of(0x10), 
ByteKey.of(0xc0)));
+    assertEquals(BigDecimal.ONE, tracker.getBacklog().backlog());
+  }
+
+  @Test
+  public void testBacklogFinished() {
+    ByteKeyRangeTracker tracker = 
ByteKeyRangeTracker.of(ByteKeyRange.ALL_KEYS);
+    tracker.tryClaim(ByteKey.EMPTY);
+    assertEquals(BigDecimal.ZERO, tracker.getBacklog().backlog());
+
+    tracker = ByteKeyRangeTracker.of(ByteKeyRange.of(ByteKey.of(0x10), 
ByteKey.of(0xc0)));
+    tracker.tryClaim(ByteKey.of(0xd0));
+    assertEquals(BigDecimal.ZERO, tracker.getBacklog().backlog());
+  }
+
+  @Test
+  public void testBacklogPartiallyCompleted() {
+    ByteKeyRangeTracker tracker = 
ByteKeyRangeTracker.of(ByteKeyRange.ALL_KEYS);
+    tracker.tryClaim(ByteKey.of(0xa0));
+    assertThat(
+        tracker.getBacklog().backlog(),
+        allOf(greaterThan(BigDecimal.ZERO), lessThan(BigDecimal.ONE)));
+
+    tracker = ByteKeyRangeTracker.of(ByteKeyRange.of(ByteKey.of(0x10), 
ByteKey.of(0xc0)));
+    tracker.tryClaim(ByteKey.of(0xa0));
+    assertThat(
+        tracker.getBacklog().backlog(),
+        allOf(greaterThan(BigDecimal.ZERO), lessThan(BigDecimal.ONE)));
+  }
 }
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTrackerTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTrackerTest.java
index b723dd886603..2f162c6791b4 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTrackerTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTrackerTest.java
@@ -21,6 +21,7 @@
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
+import java.math.BigDecimal;
 import org.apache.beam.sdk.io.range.OffsetRange;
 import org.junit.Rule;
 import org.junit.Test;
@@ -156,11 +157,33 @@ public void testCheckDoneWhenNotDone() {
   }
 
   @Test
-  public void testCheckDoneWhenExplicitlyMarkedDone() {
-    OffsetRangeTracker tracker = new OffsetRangeTracker(new OffsetRange(100, 
200));
-    assertTrue(tracker.tryClaim(150L));
-    assertTrue(tracker.tryClaim(175L));
-    tracker.markDone();
-    tracker.checkDone();
+  public void testBacklogUnstarted() {
+    OffsetRangeTracker tracker = new OffsetRangeTracker(new OffsetRange(0, 
200));
+    assertEquals(BigDecimal.valueOf(200), tracker.getBacklog().backlog());
+
+    tracker = new OffsetRangeTracker(new OffsetRange(100, 200));
+    assertEquals(BigDecimal.valueOf(100), tracker.getBacklog().backlog());
+  }
+
+  @Test
+  public void testBacklogFinished() {
+    OffsetRangeTracker tracker = new OffsetRangeTracker(new OffsetRange(0, 
200));
+    tracker.tryClaim(300L);
+    assertEquals(BigDecimal.ZERO, tracker.getBacklog().backlog());
+
+    tracker = new OffsetRangeTracker(new OffsetRange(100, 200));
+    tracker.tryClaim(300L);
+    assertEquals(BigDecimal.ZERO, tracker.getBacklog().backlog());
+  }
+
+  @Test
+  public void testBacklogPartiallyCompleted() {
+    OffsetRangeTracker tracker = new OffsetRangeTracker(new OffsetRange(0, 
200));
+    tracker.tryClaim(150L);
+    assertEquals(BigDecimal.valueOf(50), tracker.getBacklog().backlog());
+
+    tracker = new OffsetRangeTracker(new OffsetRange(100, 200));
+    tracker.tryClaim(150L);
+    assertEquals(BigDecimal.valueOf(50), tracker.getBacklog().backlog());
   }
 }
diff --git 
a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/splittabledofn/RestrictionTrackers.java
 
b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/splittabledofn/RestrictionTrackers.java
index 53270f474c02..addeb687caf2 100644
--- 
a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/splittabledofn/RestrictionTrackers.java
+++ 
b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/splittabledofn/RestrictionTrackers.java
@@ -17,6 +17,9 @@
  */
 package org.apache.beam.sdk.fn.splittabledofn;
 
+import javax.annotation.concurrent.ThreadSafe;
+import org.apache.beam.sdk.transforms.splittabledofn.Backlog;
+import org.apache.beam.sdk.transforms.splittabledofn.Backlogs;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
 
 /** Support utilities for interacting with {@link RestrictionTracker 
RestrictionTrackers}. */
@@ -35,70 +38,101 @@
    * A {@link RestrictionTracker} which forwards all calls to the delegate 
{@link
    * RestrictionTracker}.
    */
-  private static class ForwardingRestrictionTracker<RestrictionT, PositionT>
+  @ThreadSafe
+  private static class RestrictionTrackerObserver<RestrictionT, PositionT>
       extends RestrictionTracker<RestrictionT, PositionT> {
-    private final RestrictionTracker<RestrictionT, PositionT> delegate;
+    protected final RestrictionTracker<RestrictionT, PositionT> delegate;
+    private final ClaimObserver<PositionT> claimObserver;
 
-    protected ForwardingRestrictionTracker(RestrictionTracker<RestrictionT, 
PositionT> delegate) {
+    protected RestrictionTrackerObserver(
+        RestrictionTracker<RestrictionT, PositionT> delegate,
+        ClaimObserver<PositionT> claimObserver) {
       this.delegate = delegate;
+      this.claimObserver = claimObserver;
     }
 
     @Override
-    public boolean tryClaim(PositionT position) {
-      return delegate.tryClaim(position);
+    public synchronized boolean tryClaim(PositionT position) {
+      if (delegate.tryClaim(position)) {
+        claimObserver.onClaimed(position);
+        return true;
+      } else {
+        claimObserver.onClaimFailed(position);
+        return false;
+      }
     }
 
     @Override
-    public RestrictionT currentRestriction() {
+    public synchronized RestrictionT currentRestriction() {
       return delegate.currentRestriction();
     }
 
     @Override
-    public RestrictionT checkpoint() {
+    public synchronized RestrictionT checkpoint() {
       return delegate.checkpoint();
     }
 
     @Override
-    public void checkDone() throws IllegalStateException {
+    public synchronized void checkDone() throws IllegalStateException {
       delegate.checkDone();
     }
   }
 
   /**
-   * A {@link RestrictionTracker} which notifies the {@link ClaimObserver} if 
a claim succeeded or
-   * failed.
+   * A {@link RestrictionTracker} which forwards all calls to the delegate 
backlog reporting {@link
+   * RestrictionTracker}.
    */
-  private static class RestrictionTrackerObserver<RestrictionT, PositionT>
-      extends ForwardingRestrictionTracker<RestrictionT, PositionT> {
-    private final ClaimObserver<PositionT> claimObserver;
+  @ThreadSafe
+  private static class RestrictionTrackerObserverWithBacklog<RestrictionT, 
PositionT>
+      extends RestrictionTrackerObserver<RestrictionT, PositionT> implements 
Backlogs.HasBacklog {
 
-    private RestrictionTrackerObserver(
+    protected RestrictionTrackerObserverWithBacklog(
         RestrictionTracker<RestrictionT, PositionT> delegate,
         ClaimObserver<PositionT> claimObserver) {
-      super(delegate);
-      this.claimObserver = claimObserver;
+      super(delegate, claimObserver);
     }
 
     @Override
-    public boolean tryClaim(PositionT position) {
-      if (super.tryClaim(position)) {
-        claimObserver.onClaimed(position);
-        return true;
-      } else {
-        claimObserver.onClaimFailed(position);
-        return false;
-      }
+    public synchronized Backlog getBacklog() {
+      return ((Backlogs.HasBacklog) delegate).getBacklog();
+    }
+  }
+
+  /**
+   * A {@link RestrictionTracker} which forwards all calls to the delegate 
partitioned backlog
+   * reporting {@link RestrictionTracker}.
+   */
+  @ThreadSafe
+  private static class 
RestrictionTrackerObserverWithPartitionedBacklog<RestrictionT, PositionT>
+      extends RestrictionTrackerObserverWithBacklog<RestrictionT, PositionT>
+      implements Backlogs.HasPartitionedBacklog {
+
+    protected RestrictionTrackerObserverWithPartitionedBacklog(
+        RestrictionTracker<RestrictionT, PositionT> delegate,
+        ClaimObserver<PositionT> claimObserver) {
+      super(delegate, claimObserver);
+    }
+
+    @Override
+    public synchronized byte[] getBacklogPartition() {
+      return ((Backlogs.HasPartitionedBacklog) delegate).getBacklogPartition();
     }
   }
 
   /**
-   * Returns a {@link RestrictionTracker} which reports all claim attempts to 
the specified {@link
-   * ClaimObserver}.
+   * Returns a thread safe {@link RestrictionTracker} which reports all claim 
attempts to the
+   * specified {@link ClaimObserver}.
    */
   public static <RestrictionT, PositionT> RestrictionTracker<RestrictionT, 
PositionT> observe(
       RestrictionTracker<RestrictionT, PositionT> restrictionTracker,
       ClaimObserver<PositionT> claimObserver) {
-    return new RestrictionTrackerObserver<RestrictionT, PositionT>(
-        restrictionTracker, claimObserver);
+    if (restrictionTracker instanceof Backlogs.HasPartitionedBacklog) {
+      return new RestrictionTrackerObserverWithPartitionedBacklog<>(
+          restrictionTracker, claimObserver);
+    } else if (restrictionTracker instanceof Backlogs.HasBacklog) {
+      return new RestrictionTrackerObserverWithBacklog<>(restrictionTracker, 
claimObserver);
+    } else {
+      return new RestrictionTrackerObserver<>(restrictionTracker, 
claimObserver);
+    }
   }
 }
diff --git 
a/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/splittabledofn/RestrictionTrackersTest.java
 
b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/splittabledofn/RestrictionTrackersTest.java
index 52d03ed11d1f..c3bb28906fde 100644
--- 
a/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/splittabledofn/RestrictionTrackersTest.java
+++ 
b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/splittabledofn/RestrictionTrackersTest.java
@@ -18,12 +18,15 @@
 package org.apache.beam.sdk.fn.splittabledofn;
 
 import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.instanceOf;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
 
 import java.util.ArrayList;
 import java.util.List;
 import org.apache.beam.sdk.fn.splittabledofn.RestrictionTrackers.ClaimObserver;
+import org.apache.beam.sdk.transforms.splittabledofn.Backlog;
+import org.apache.beam.sdk.transforms.splittabledofn.Backlogs;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -81,4 +84,73 @@ public void onClaimFailed(String position) {
 
     assertThat(positionsObserved, contains("goodClaim", "badClaim"));
   }
+
+  private static class RestrictionTrackerWithBacklog extends 
RestrictionTracker<Object, Object>
+      implements Backlogs.HasBacklog {
+
+    @Override
+    public Backlog getBacklog() {
+      return null;
+    }
+
+    @Override
+    public boolean tryClaim(Object position) {
+      return false;
+    }
+
+    @Override
+    public Object currentRestriction() {
+      return null;
+    }
+
+    @Override
+    public Object checkpoint() {
+      return null;
+    }
+
+    @Override
+    public void checkDone() throws IllegalStateException {}
+  }
+
+  private static class RestrictionTrackerWithBacklogPartitionedBacklog
+      extends RestrictionTracker<Object, Object> implements 
Backlogs.HasPartitionedBacklog {
+
+    @Override
+    public Backlog getBacklog() {
+      return null;
+    }
+
+    @Override
+    public boolean tryClaim(Object position) {
+      return false;
+    }
+
+    @Override
+    public Object currentRestriction() {
+      return null;
+    }
+
+    @Override
+    public Object checkpoint() {
+      return null;
+    }
+
+    @Override
+    public void checkDone() throws IllegalStateException {}
+
+    @Override
+    public byte[] getBacklogPartition() {
+      return null;
+    }
+  }
+
+  @Test
+  public void testClaimObserversMaintainBacklogInterfaces() {
+    RestrictionTracker hasBacklog =
+        RestrictionTrackers.observe(new RestrictionTrackerWithBacklog(), null);
+    assertThat(hasBacklog, instanceOf(Backlogs.HasBacklog.class));
+    RestrictionTracker hasPartitionedBacklog =
+        RestrictionTrackers.observe(new 
RestrictionTrackerWithBacklogPartitionedBacklog(), null);
+    assertThat(hasPartitionedBacklog, 
instanceOf(Backlogs.HasPartitionedBacklog.class));
+  }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 171741)
    Time Spent: 12h 20m  (was: 12h 10m)

> Fn API streaming SDF support
> ----------------------------
>
>                 Key: BEAM-2939
>                 URL: https://issues.apache.org/jira/browse/BEAM-2939
>             Project: Beam
>          Issue Type: Improvement
>          Components: beam-model
>            Reporter: Henning Rohde
>            Assignee: Luke Cwik
>            Priority: Major
>              Labels: portability
>          Time Spent: 12h 20m
>  Remaining Estimate: 0h
>
> The Fn API should support streaming SDF. Detailed design TBD.
> Once design is ready, expand subtasks similarly to BEAM-2822.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to