reuvenlax commented on a change in pull request #12864:
URL: https://github.com/apache/beam/pull/12864#discussion_r493145233



##########
File path: 
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillStateInternals.java
##########
@@ -470,81 +541,514 @@ protected WorkItemCommitRequest 
persistDirectly(WindmillStateCache.ForKey cache)
     }
   }
 
-  private static class WindmillOrderedList<T> extends SimpleWindmillState
-      implements OrderedListState<T> {
+  // Coder for closed-open ranges.
+  private static class RangeCoder<T extends Comparable> extends 
CustomCoder<Range<T>> {
+    private Coder<T> boundCoder;
+
+    RangeCoder(Coder<T> boundCoder) {
+      this.boundCoder = NullableCoder.of(boundCoder);
+    }
+
+    @Override
+    public void encode(Range<T> value, OutputStream outStream) throws 
CoderException, IOException {
+      Preconditions.checkState(
+          value.lowerBoundType().equals(BoundType.CLOSED), "unexpected range " 
+ value);
+      Preconditions.checkState(
+          value.upperBoundType().equals(BoundType.OPEN), "unexpected range " + 
value);
+      boundCoder.encode(value.hasLowerBound() ? value.lowerEndpoint() : null, 
outStream);
+      boundCoder.encode(value.hasUpperBound() ? value.upperEndpoint() : null, 
outStream);
+    }
+
+    @Override
+    public Range<T> decode(InputStream inStream) throws CoderException, 
IOException {
+      @Nullable T lower = boundCoder.decode(inStream);
+      @Nullable T upper = boundCoder.decode(inStream);
+      if (lower == null) {
+        return upper != null ? Range.lessThan(upper) : Range.all();
+      } else if (upper == null) {
+        return Range.atLeast(lower);
+      } else {
+        return Range.closedOpen(lower, upper);
+      }
+    }
+  }
+
+  private static class RangeSetCoder<T extends Comparable> extends 
CustomCoder<RangeSet<T>> {
+    private SetCoder<Range<T>> rangesCoder;
+
+    RangeSetCoder(Coder<T> boundCoder) {
+      this.rangesCoder = SetCoder.of(new RangeCoder<>(boundCoder));
+    }
+
+    @Override
+    public void encode(RangeSet<T> value, OutputStream outStream) throws 
IOException {
+      rangesCoder.encode(value.asRanges(), outStream);
+    }
+
+    @Override
+    public RangeSet<T> decode(InputStream inStream) throws CoderException, 
IOException {
+      return TreeRangeSet.create(rangesCoder.decode(inStream));
+    }
+  }
+
+  /**
+   * Tracker for the ids used in an ordered list.
+   *
+   * <p>Windmill accepts an int64 id for each timestamped-element in the list. 
Unique elements are
+   * identified by the pair of timestamp and id. This means that tow unique 
elements e1, e2 must
+   * have different (ts1, id1), (ts2, id2) pairs. To accomplish this we bucket 
time into five-minute
+   * buckets, and store a free list of ids available for each bucket.
+   *
+   * <p>When a timestamp range is deleted, we remove id tracking for elements 
in that range. In
+   * order to handle the case where a range is deleted piecemeal, we track 
sub-range deletions for
+   * each range. For example:
+   *
+   * <p>12:00 - 12:05 ids 12:05 - 12:10 ids
+   *
+   * <p>delete 12:00-12:06
+   *
+   * <p>12:00 - 12:05 *removed* 12:05 - 12:10 ids subranges deleted 12:05-12:06
+   *
+   * <p>delete 12:06 - 12:07
+   *
+   * <p>12:05 - 12:10 ids subranges deleted 12:05-12:07
+   *
+   * <p>delete 12:07 - 12:10
+   *
+   * <p>12:05 - 12:10 *removed*
+   */
+  static final class IdTracker {
+    static final String IDS_AVAILABLE_STR = "IdsAvailable";
+    static final String DELETIONS_STR = "Deletions";
+
+    static final long MIN_ID = Long.MIN_VALUE;
+    static final long MAX_ID = Long.MAX_VALUE;
+
+    // We track ids on five-minute boundaries.
+    private static final Duration RESOLUTION = Duration.standardMinutes(5);
+    static final MapCoder<Range<Instant>, RangeSet<Long>> IDS_AVAILABLE_CODER =
+        MapCoder.of(new RangeCoder<>(InstantCoder.of()), new 
RangeSetCoder<>(VarLongCoder.of()));
+    static final MapCoder<Range<Instant>, RangeSet<Instant>> 
SUBRANGE_DELETIONS_CODER =
+        MapCoder.of(new RangeCoder<>(InstantCoder.of()), new 
RangeSetCoder<>(InstantCoder.of()));
+    private final StateTag<ValueState<Map<Range<Instant>, RangeSet<Long>>>> 
idsAvailableTag;
+    // A map from five-minute ranges to the set of ids available in that 
interval.
+    final ValueState<Map<Range<Instant>, RangeSet<Long>>> idsAvailableValue;
+    private final StateTag<ValueState<Map<Range<Instant>, RangeSet<Instant>>>> 
subRangeDeletionsTag;
+    // If a timestamp-range in the map has been partially cleared, the cleared 
intervals are stored
+    // here.
+    final ValueState<Map<Range<Instant>, RangeSet<Instant>>> 
subRangeDeletionsValue;
+
+    IdTracker(
+        StateTable stateTable,
+        StateNamespace namespace,
+        StateTag<?> spec,
+        String stateFamily,
+        boolean complete) {
+      this.idsAvailableTag =
+          StateTags.makeSystemTagInternal(
+              StateTags.value(spec.getId() + IDS_AVAILABLE_STR, 
IDS_AVAILABLE_CODER));
+      this.idsAvailableValue =
+          stateTable.get(namespace, idsAvailableTag, 
StateContexts.nullContext());
+      this.subRangeDeletionsTag =
+          StateTags.makeSystemTagInternal(
+              StateTags.value(spec.getId() + DELETIONS_STR, 
SUBRANGE_DELETIONS_CODER));
+      this.subRangeDeletionsValue =
+          stateTable.get(namespace, subRangeDeletionsTag, 
StateContexts.nullContext());
+    }
+
+    static <ValueT extends Comparable<? super ValueT>>
+        Map<Range<Instant>, RangeSet<ValueT>> newSortedRangeMap(Class<ValueT> 
valueClass) {
+      return Maps.newTreeMap(
+          Comparator.<Range<Instant>, Instant>comparing(Range::lowerEndpoint)
+              .thenComparing(Range::upperEndpoint));
+    }
+
+    private Range<Instant> getTrackedRange(Instant ts) {
+      Instant snapped =
+          new Instant(ts.getMillis() - ts.plus(RESOLUTION).getMillis() % 
RESOLUTION.getMillis());
+      return Range.closedOpen(snapped, snapped.plus(RESOLUTION));
+    }
+
+    @SuppressWarnings("FutureReturnValueIgnored")
+    void readLater() {
+      idsAvailableValue.readLater();
+      subRangeDeletionsValue.readLater();
+    }
+
+    Map<Range<Instant>, RangeSet<Long>> readIdsAvailable() {
+      Map<Range<Instant>, RangeSet<Long>> idsAvailable = 
idsAvailableValue.read();
+      return idsAvailable != null ? idsAvailable : 
newSortedRangeMap(Long.class);
+    }
+
+    Map<Range<Instant>, RangeSet<Instant>> readSubRangeDeletions() {
+      Map<Range<Instant>, RangeSet<Instant>> subRangeDeletions = 
subRangeDeletionsValue.read();
+      return subRangeDeletions != null ? subRangeDeletions : 
newSortedRangeMap(Instant.class);
+    }
+
+    void clear() throws ExecutionException, InterruptedException {
+      idsAvailableValue.clear();
+      subRangeDeletionsValue.clear();
+    }
+
+    <T> void add(
+        SortedSet<TimestampedValueWithId<T>> elements, 
BiConsumer<TimestampedValue<T>, Long> output)
+        throws ExecutionException, InterruptedException {
+      Range<Long> currentIdRange = null;
+      long currentId = 0;
+
+      Range<Instant> currentTsRange = null;
+      RangeSet<Instant> currentTsRangeDeletions = null;
+
+      Map<Range<Instant>, RangeSet<Long>> idsAvailable = readIdsAvailable();
+      Map<Range<Instant>, RangeSet<Instant>> subRangeDeletions = 
readSubRangeDeletions();
+
+      RangeSet<Long> availableIdsForTsRange = null;
+      Iterator<Range<Long>> idRangeIter = null;
+      RangeSet<Long> idsUsed = TreeRangeSet.create();
+      for (TimestampedValueWithId<T> pendingAdd : elements) {
+        // Since elements are in increasing ts order, often we'll be able to 
reuse the previous
+        // iteration's range.
+        if (currentTsRange == null
+            || !currentTsRange.contains(pendingAdd.getValue().getTimestamp())) 
{
+          if (availableIdsForTsRange != null) {
+            // We're moving onto a new ts range. Remove all used ids
+            availableIdsForTsRange.removeAll(idsUsed);
+            idsUsed = TreeRangeSet.create();
+          }
+
+          // Lookup the range for the current timestamp.
+          currentTsRange = 
getTrackedRange(pendingAdd.getValue().getTimestamp());
+          // Lookup available ids for this timestamp range. If nothing there, 
we default to all ids
+          // available.
+          availableIdsForTsRange =
+              idsAvailable.computeIfAbsent(
+                  currentTsRange,
+                  r -> 
TreeRangeSet.create(ImmutableList.of(Range.closedOpen(MIN_ID, MAX_ID))));
+          idRangeIter = availableIdsForTsRange.asRanges().iterator();
+          currentIdRange = null;
+          currentTsRangeDeletions = subRangeDeletions.get(currentTsRange);
+        }
 
+        if (currentIdRange == null || currentId >= 
currentIdRange.upperEndpoint()) {
+          // Move to the next range of free ids, and start assigning ranges 
from there.
+          currentIdRange = idRangeIter.next();
+          currentId = currentIdRange.lowerEndpoint();
+        }
+
+        if (currentTsRangeDeletions != null) {
+          currentTsRangeDeletions.remove(
+              Range.closedOpen(
+                  pendingAdd.getValue().getTimestamp(),
+                  pendingAdd.getValue().getTimestamp().plus(1)));
+        }
+        idsUsed.add(Range.closedOpen(currentId, currentId + 1));
+        output.accept(pendingAdd.getValue(), currentId++);
+      }
+      if (availableIdsForTsRange != null) {
+        availableIdsForTsRange.removeAll(idsUsed);
+      }
+      idsAvailableValue.write(idsAvailable);
+      subRangeDeletionsValue.write(subRangeDeletions);
+    }
+
+    // Remove a timestamp range. Returns ids freed up.
+    void remove(Range<Instant> tsRange) throws ExecutionException, 
InterruptedException {
+      Map<Range<Instant>, RangeSet<Long>> idsAvailable = readIdsAvailable();
+      Map<Range<Instant>, RangeSet<Instant>> subRangeDeletions = 
readSubRangeDeletions();
+
+      for (Range<Instant> current = getTrackedRange(tsRange.lowerEndpoint());
+          current.lowerEndpoint().isBefore(tsRange.upperEndpoint());
+          current = getTrackedRange(current.lowerEndpoint().plus(RESOLUTION))) 
{
+        // TODO: shouldn't need to iterate over all ranges.
+        boolean rangeCleared;
+        if (!tsRange.encloses(current)) {
+          // This can happen if the beginning or the end of tsRange doesn't 
fall on a RESOLUTION
+          // boundary. Since we
+          // are deleting a portion of a tracked range, track what we are 
deleting.
+          RangeSet<Instant> rangeDeletions =
+              subRangeDeletions.computeIfAbsent(current, r -> 
TreeRangeSet.create());
+          rangeDeletions.add(tsRange.intersection(current));
+          // If we ended up deleting the whole range, than we can simply 
remove it from the tracking
+          // map.
+          rangeCleared = rangeDeletions.encloses(current);
+        } else {
+          rangeCleared = true;
+        }
+        if (rangeCleared) {
+          // Remove the range from both maps.
+          idsAvailable.remove(current);
+          subRangeDeletions.remove(current);
+        }
+      }
+      idsAvailableValue.write(idsAvailable);
+      subRangeDeletionsValue.write(subRangeDeletions);
+    }
+  }
+
+  @AutoValue
+  abstract static class TimestampedValueWithId<T> {
+    private static final Comparator<TimestampedValueWithId<?>> COMPARATOR =
+        Comparator.<TimestampedValueWithId<?>, Instant>comparing(v -> 
v.getValue().getTimestamp())
+            .thenComparingLong(TimestampedValueWithId::getId);
+
+    abstract TimestampedValue<T> getValue();
+
+    abstract long getId();
+
+    static <T> TimestampedValueWithId<T> of(TimestampedValue<T> value, long 
id) {
+      return new 
AutoValue_WindmillStateInternals_TimestampedValueWithId<>(value, id);
+    }
+
+    static <T> TimestampedValueWithId<T> bound(Instant ts) {
+      return of(TimestampedValue.of(null, ts), Long.MIN_VALUE);
+    }
+  }
+
+  static class WindmillOrderedList<T> extends SimpleWindmillState implements 
OrderedListState<T> {
     private final StateNamespace namespace;
     private final StateTag<OrderedListState<T>> spec;
     private final ByteString stateKey;
     private final String stateFamily;
     private final Coder<T> elemCoder;
+    private boolean complete;
+    private boolean cleared = false;
+    // We need to sort based on timestamp, but we need objects with the same 
timestamp to be treated
+    // as unique. We can't use a MultiSet as we can't construct a comparator 
that uniquely
+    // identifies objects,
+    // so we construct a unique in-memory long ids for each element.
+    private SortedSet<TimestampedValueWithId<T>> pendingAdds =
+        Sets.newTreeSet(TimestampedValueWithId.COMPARATOR);
+
+    private RangeSet<Instant> pendingDeletes = TreeRangeSet.create();
+    private IdTracker idTracker;
+
+    // The default proto values for SortedListRange correspond to the minimum 
and maximum
+    // timestamps.
+    static final long MIN_TS_MICROS = 
SortedListRange.getDefaultInstance().getStart();
+    static final long MAX_TS_MICROS = 
SortedListRange.getDefaultInstance().getLimit();
 
     private WindmillOrderedList(
+        StateTable derivedStateTable,
         StateNamespace namespace,
         StateTag<OrderedListState<T>> spec,
         String stateFamily,
         Coder<T> elemCoder,
         boolean isNewKey) {
       this.namespace = namespace;
       this.spec = spec;
+
       this.stateKey = encodeKey(namespace, spec);
       this.stateFamily = stateFamily;
       this.elemCoder = elemCoder;
+      this.complete = isNewKey;
+      this.idTracker = new IdTracker(derivedStateTable, namespace, spec, 
stateFamily, complete);
     }
 
     @Override
     public Iterable<TimestampedValue<T>> read() {
-      throw new UnsupportedOperationException(
-          String.format("%s is not supported", 
OrderedListState.class.getSimpleName()));
+      return readRange(null, null);
+    }
+
+    private SortedSet<TimestampedValueWithId<T>> getPendingAddRange(
+        @Nullable Instant minTimestamp, @Nullable Instant limitTimestamp) {
+      SortedSet<TimestampedValueWithId<T>> pendingInRange = pendingAdds;
+      if (minTimestamp != null && limitTimestamp != null) {
+        pendingInRange =
+            pendingInRange.subSet(
+                TimestampedValueWithId.bound(minTimestamp),
+                TimestampedValueWithId.bound(limitTimestamp));
+      } else if (minTimestamp == null && limitTimestamp != null) {
+        pendingInRange = 
pendingInRange.headSet(TimestampedValueWithId.bound(limitTimestamp));
+      } else if (limitTimestamp == null && minTimestamp != null) {
+        pendingInRange = 
pendingInRange.tailSet(TimestampedValueWithId.bound(minTimestamp));
+      }
+      return pendingInRange;
     }
 
     @Override
-    public Iterable<TimestampedValue<T>> readRange(Instant minTimestamp, 
Instant limitTimestamp) {
-      throw new UnsupportedOperationException(
-          String.format("%s is not supported", 
OrderedListState.class.getSimpleName()));
+    public Iterable<TimestampedValue<T>> readRange(

Review comment:
       maybe, but right now I think that would be premature. Users only get 
charged for bytes transferred, so it's unclear that this would save end users 
much.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to