dpmills commented on a change in pull request #12864:
URL: https://github.com/apache/beam/pull/12864#discussion_r492927734



##########
File path: 
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillStateInternals.java
##########
@@ -470,81 +541,514 @@ protected WorkItemCommitRequest 
persistDirectly(WindmillStateCache.ForKey cache)
     }
   }
 
-  private static class WindmillOrderedList<T> extends SimpleWindmillState
-      implements OrderedListState<T> {
+  // Coder for closed-open ranges.
+  private static class RangeCoder<T extends Comparable> extends 
CustomCoder<Range<T>> {

Review comment:
       This should probably extend StructuredCoder instead of CustomCoder,
although it may not matter if it is never serialized. If you leave it as a
CustomCoder, implement equals and hashCode.

##########
File path: 
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillStateInternals.java
##########
@@ -470,81 +541,514 @@ protected WorkItemCommitRequest 
persistDirectly(WindmillStateCache.ForKey cache)
     }
   }
 
-  private static class WindmillOrderedList<T> extends SimpleWindmillState
-      implements OrderedListState<T> {
+  // Coder for closed-open ranges.
+  private static class RangeCoder<T extends Comparable> extends 
CustomCoder<Range<T>> {
+    private Coder<T> boundCoder;
+
+    RangeCoder(Coder<T> boundCoder) {
+      this.boundCoder = NullableCoder.of(boundCoder);
+    }
+
+    @Override
+    public void encode(Range<T> value, OutputStream outStream) throws 
CoderException, IOException {
+      Preconditions.checkState(
+          value.lowerBoundType().equals(BoundType.CLOSED), "unexpected range " 
+ value);
+      Preconditions.checkState(
+          value.upperBoundType().equals(BoundType.OPEN), "unexpected range " + 
value);
+      boundCoder.encode(value.hasLowerBound() ? value.lowerEndpoint() : null, 
outStream);
+      boundCoder.encode(value.hasUpperBound() ? value.upperEndpoint() : null, 
outStream);
+    }
+
+    @Override
+    public Range<T> decode(InputStream inStream) throws CoderException, 
IOException {
+      @Nullable T lower = boundCoder.decode(inStream);
+      @Nullable T upper = boundCoder.decode(inStream);
+      if (lower == null) {
+        return upper != null ? Range.lessThan(upper) : Range.all();
+      } else if (upper == null) {
+        return Range.atLeast(lower);
+      } else {
+        return Range.closedOpen(lower, upper);
+      }
+    }
+  }
+
+  private static class RangeSetCoder<T extends Comparable> extends 
CustomCoder<RangeSet<T>> {
+    private SetCoder<Range<T>> rangesCoder;
+
+    RangeSetCoder(Coder<T> boundCoder) {
+      this.rangesCoder = SetCoder.of(new RangeCoder<>(boundCoder));
+    }
+
+    @Override
+    public void encode(RangeSet<T> value, OutputStream outStream) throws 
IOException {
+      rangesCoder.encode(value.asRanges(), outStream);
+    }
+
+    @Override
+    public RangeSet<T> decode(InputStream inStream) throws CoderException, 
IOException {
+      return TreeRangeSet.create(rangesCoder.decode(inStream));
+    }
+  }
+
+  /**
+   * Tracker for the ids used in an ordered list.
+   *
+   * <p>Windmill accepts an int64 id for each timestamped-element in the list. 
Unique elements are
+   * identified by the pair of timestamp and id. This means that tow unique 
elements e1, e2 must
+   * have different (ts1, id1), (ts2, id2) pairs. To accomplish this we bucket 
time into five-minute
+   * buckets, and store a free list of ids available for each bucket.
+   *
+   * <p>When a timestamp range is deleted, we remove id tracking for elements 
in that range. In
+   * order to handle the case where a range is deleted piecemeal, we track 
sub-range deletions for
+   * each range. For example:
+   *
+   * <p>12:00 - 12:05 ids 12:05 - 12:10 ids
+   *
+   * <p>delete 12:00-12:06
+   *
+   * <p>12:00 - 12:05 *removed* 12:05 - 12:10 ids subranges deleted 12:05-12:06
+   *
+   * <p>delete 12:06 - 12:07
+   *
+   * <p>12:05 - 12:10 ids subranges deleted 12:05-12:07
+   *
+   * <p>delete 12:07 - 12:10
+   *
+   * <p>12:05 - 12:10 *removed*
+   */
+  static final class IdTracker {
+    static final String IDS_AVAILABLE_STR = "IdsAvailable";
+    static final String DELETIONS_STR = "Deletions";
+
+    static final long MIN_ID = Long.MIN_VALUE;
+    static final long MAX_ID = Long.MAX_VALUE;
+
+    // We track ids on five-minute boundaries.
+    private static final Duration RESOLUTION = Duration.standardMinutes(5);
+    static final MapCoder<Range<Instant>, RangeSet<Long>> IDS_AVAILABLE_CODER =
+        MapCoder.of(new RangeCoder<>(InstantCoder.of()), new 
RangeSetCoder<>(VarLongCoder.of()));
+    static final MapCoder<Range<Instant>, RangeSet<Instant>> 
SUBRANGE_DELETIONS_CODER =
+        MapCoder.of(new RangeCoder<>(InstantCoder.of()), new 
RangeSetCoder<>(InstantCoder.of()));
+    private final StateTag<ValueState<Map<Range<Instant>, RangeSet<Long>>>> 
idsAvailableTag;
+    // A map from five-minute ranges to the set of ids available in that 
interval.
+    final ValueState<Map<Range<Instant>, RangeSet<Long>>> idsAvailableValue;
+    private final StateTag<ValueState<Map<Range<Instant>, RangeSet<Instant>>>> 
subRangeDeletionsTag;
+    // If a timestamp-range in the map has been partially cleared, the cleared 
intervals are stored
+    // here.
+    final ValueState<Map<Range<Instant>, RangeSet<Instant>>> 
subRangeDeletionsValue;
+
+    IdTracker(
+        StateTable stateTable,
+        StateNamespace namespace,
+        StateTag<?> spec,
+        String stateFamily,
+        boolean complete) {
+      this.idsAvailableTag =
+          StateTags.makeSystemTagInternal(
+              StateTags.value(spec.getId() + IDS_AVAILABLE_STR, 
IDS_AVAILABLE_CODER));
+      this.idsAvailableValue =
+          stateTable.get(namespace, idsAvailableTag, 
StateContexts.nullContext());
+      this.subRangeDeletionsTag =
+          StateTags.makeSystemTagInternal(
+              StateTags.value(spec.getId() + DELETIONS_STR, 
SUBRANGE_DELETIONS_CODER));
+      this.subRangeDeletionsValue =
+          stateTable.get(namespace, subRangeDeletionsTag, 
StateContexts.nullContext());
+    }
+
+    static <ValueT extends Comparable<? super ValueT>>
+        Map<Range<Instant>, RangeSet<ValueT>> newSortedRangeMap(Class<ValueT> 
valueClass) {
+      return Maps.newTreeMap(
+          Comparator.<Range<Instant>, Instant>comparing(Range::lowerEndpoint)
+              .thenComparing(Range::upperEndpoint));
+    }
+
+    private Range<Instant> getTrackedRange(Instant ts) {
+      Instant snapped =
+          new Instant(ts.getMillis() - ts.plus(RESOLUTION).getMillis() % 
RESOLUTION.getMillis());
+      return Range.closedOpen(snapped, snapped.plus(RESOLUTION));
+    }
+
+    @SuppressWarnings("FutureReturnValueIgnored")
+    void readLater() {
+      idsAvailableValue.readLater();
+      subRangeDeletionsValue.readLater();
+    }
+
+    Map<Range<Instant>, RangeSet<Long>> readIdsAvailable() {
+      Map<Range<Instant>, RangeSet<Long>> idsAvailable = 
idsAvailableValue.read();
+      return idsAvailable != null ? idsAvailable : 
newSortedRangeMap(Long.class);
+    }
+
+    Map<Range<Instant>, RangeSet<Instant>> readSubRangeDeletions() {
+      Map<Range<Instant>, RangeSet<Instant>> subRangeDeletions = 
subRangeDeletionsValue.read();
+      return subRangeDeletions != null ? subRangeDeletions : 
newSortedRangeMap(Instant.class);
+    }
+
+    void clear() throws ExecutionException, InterruptedException {
+      idsAvailableValue.clear();
+      subRangeDeletionsValue.clear();
+    }
+
+    <T> void add(
+        SortedSet<TimestampedValueWithId<T>> elements, 
BiConsumer<TimestampedValue<T>, Long> output)
+        throws ExecutionException, InterruptedException {
+      Range<Long> currentIdRange = null;
+      long currentId = 0;
+
+      Range<Instant> currentTsRange = null;
+      RangeSet<Instant> currentTsRangeDeletions = null;
+
+      Map<Range<Instant>, RangeSet<Long>> idsAvailable = readIdsAvailable();
+      Map<Range<Instant>, RangeSet<Instant>> subRangeDeletions = 
readSubRangeDeletions();
+
+      RangeSet<Long> availableIdsForTsRange = null;
+      Iterator<Range<Long>> idRangeIter = null;
+      RangeSet<Long> idsUsed = TreeRangeSet.create();
+      for (TimestampedValueWithId<T> pendingAdd : elements) {
+        // Since elements are in increasing ts order, often we'll be able to 
reuse the previous
+        // iteration's range.
+        if (currentTsRange == null
+            || !currentTsRange.contains(pendingAdd.getValue().getTimestamp())) 
{
+          if (availableIdsForTsRange != null) {
+            // We're moving onto a new ts range. Remove all used ids
+            availableIdsForTsRange.removeAll(idsUsed);
+            idsUsed = TreeRangeSet.create();
+          }
+
+          // Lookup the range for the current timestamp.
+          currentTsRange = 
getTrackedRange(pendingAdd.getValue().getTimestamp());
+          // Lookup available ids for this timestamp range. If nothing there, 
we default to all ids
+          // available.
+          availableIdsForTsRange =
+              idsAvailable.computeIfAbsent(
+                  currentTsRange,
+                  r -> 
TreeRangeSet.create(ImmutableList.of(Range.closedOpen(MIN_ID, MAX_ID))));
+          idRangeIter = availableIdsForTsRange.asRanges().iterator();
+          currentIdRange = null;
+          currentTsRangeDeletions = subRangeDeletions.get(currentTsRange);
+        }
 
+        if (currentIdRange == null || currentId >= 
currentIdRange.upperEndpoint()) {
+          // Move to the next range of free ids, and start assigning ranges 
from there.
+          currentIdRange = idRangeIter.next();
+          currentId = currentIdRange.lowerEndpoint();
+        }
+
+        if (currentTsRangeDeletions != null) {
+          currentTsRangeDeletions.remove(
+              Range.closedOpen(
+                  pendingAdd.getValue().getTimestamp(),
+                  pendingAdd.getValue().getTimestamp().plus(1)));
+        }
+        idsUsed.add(Range.closedOpen(currentId, currentId + 1));
+        output.accept(pendingAdd.getValue(), currentId++);
+      }
+      if (availableIdsForTsRange != null) {
+        availableIdsForTsRange.removeAll(idsUsed);
+      }
+      idsAvailableValue.write(idsAvailable);
+      subRangeDeletionsValue.write(subRangeDeletions);
+    }
+
+    // Remove a timestamp range. Returns ids freed up.
+    void remove(Range<Instant> tsRange) throws ExecutionException, 
InterruptedException {
+      Map<Range<Instant>, RangeSet<Long>> idsAvailable = readIdsAvailable();
+      Map<Range<Instant>, RangeSet<Instant>> subRangeDeletions = 
readSubRangeDeletions();
+
+      for (Range<Instant> current = getTrackedRange(tsRange.lowerEndpoint());
+          current.lowerEndpoint().isBefore(tsRange.upperEndpoint());
+          current = getTrackedRange(current.lowerEndpoint().plus(RESOLUTION))) 
{
+        // TODO: shouldn't need to iterate over all ranges.

Review comment:
       Put your username or a JIRA issue in the TODOs, e.g. `TODO(username): ...`.

##########
File path: 
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillStateInternals.java
##########
@@ -470,81 +541,514 @@ protected WorkItemCommitRequest 
persistDirectly(WindmillStateCache.ForKey cache)
     }
   }
 
-  private static class WindmillOrderedList<T> extends SimpleWindmillState
-      implements OrderedListState<T> {
+  // Coder for closed-open ranges.
+  private static class RangeCoder<T extends Comparable> extends 
CustomCoder<Range<T>> {
+    private Coder<T> boundCoder;
+
+    RangeCoder(Coder<T> boundCoder) {
+      this.boundCoder = NullableCoder.of(boundCoder);
+    }
+
+    @Override
+    public void encode(Range<T> value, OutputStream outStream) throws 
CoderException, IOException {
+      Preconditions.checkState(
+          value.lowerBoundType().equals(BoundType.CLOSED), "unexpected range " 
+ value);
+      Preconditions.checkState(
+          value.upperBoundType().equals(BoundType.OPEN), "unexpected range " + 
value);
+      boundCoder.encode(value.hasLowerBound() ? value.lowerEndpoint() : null, 
outStream);
+      boundCoder.encode(value.hasUpperBound() ? value.upperEndpoint() : null, 
outStream);
+    }
+
+    @Override
+    public Range<T> decode(InputStream inStream) throws CoderException, 
IOException {
+      @Nullable T lower = boundCoder.decode(inStream);
+      @Nullable T upper = boundCoder.decode(inStream);
+      if (lower == null) {
+        return upper != null ? Range.lessThan(upper) : Range.all();
+      } else if (upper == null) {
+        return Range.atLeast(lower);
+      } else {
+        return Range.closedOpen(lower, upper);
+      }
+    }
+  }
+
+  private static class RangeSetCoder<T extends Comparable> extends 
CustomCoder<RangeSet<T>> {

Review comment:
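       This could be simplified a little with DelegateCoder: wrap the
SetCoder<Range<T>> and convert with RangeSet.asRanges() on encode and
TreeRangeSet.create() on decode.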

##########
File path: 
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillStateReader.java
##########
@@ -577,9 +655,30 @@ public void addWeighted(T elem, long weight) {
     return valueList;
   }
 
-  private <T> void consumeBag(TagBag bag, StateTag stateTag) {
+  private <T> List<TimestampedValue<T>> sortedListPageValues(
+      Windmill.TagSortedListFetchResponse sortedListFetchResponse, Coder<T> 
elemCoder) {
+    if (sortedListFetchResponse.getEntriesCount() == 0) {
+      return new WeightedList<>(Collections.emptyList());
+    }
+
+    WeightedList<TimestampedValue<T>> entryList =
+        new WeightedList<>(new 
ArrayList<>(sortedListFetchResponse.getEntriesCount()));
+    for (SortedListEntry entry : sortedListFetchResponse.getEntriesList()) {
+      try {
+        T value = elemCoder.decode(entry.getValue().newInput(), 
Coder.Context.OUTER);
+        entryList.addWeighted(
+            TimestampedValue.of(value, Instant.ofEpochMilli(entry.getSortKey() 
/ 1000)),

Review comment:
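       Use WindmillTimeUtils.windmillToHarnessTimestamp(). The current code is
wrong for negative timestamps: `entry.getSortKey() / 1000` uses Java integer
division, which truncates toward zero. A negative microsecond sort key is
therefore rounded up to the next millisecond instead of down (e.g. -1 us
becomes 0 ms rather than -1 ms).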

##########
File path: 
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillStateInternals.java
##########
@@ -470,81 +541,514 @@ protected WorkItemCommitRequest 
persistDirectly(WindmillStateCache.ForKey cache)
     }
   }
 
-  private static class WindmillOrderedList<T> extends SimpleWindmillState
-      implements OrderedListState<T> {
+  // Coder for closed-open ranges.
+  private static class RangeCoder<T extends Comparable> extends 
CustomCoder<Range<T>> {
+    private Coder<T> boundCoder;
+
+    RangeCoder(Coder<T> boundCoder) {
+      this.boundCoder = NullableCoder.of(boundCoder);
+    }
+
+    @Override
+    public void encode(Range<T> value, OutputStream outStream) throws 
CoderException, IOException {
+      Preconditions.checkState(
+          value.lowerBoundType().equals(BoundType.CLOSED), "unexpected range " 
+ value);
+      Preconditions.checkState(
+          value.upperBoundType().equals(BoundType.OPEN), "unexpected range " + 
value);
+      boundCoder.encode(value.hasLowerBound() ? value.lowerEndpoint() : null, 
outStream);
+      boundCoder.encode(value.hasUpperBound() ? value.upperEndpoint() : null, 
outStream);
+    }
+
+    @Override
+    public Range<T> decode(InputStream inStream) throws CoderException, 
IOException {
+      @Nullable T lower = boundCoder.decode(inStream);
+      @Nullable T upper = boundCoder.decode(inStream);
+      if (lower == null) {
+        return upper != null ? Range.lessThan(upper) : Range.all();
+      } else if (upper == null) {
+        return Range.atLeast(lower);
+      } else {
+        return Range.closedOpen(lower, upper);
+      }
+    }
+  }
+
+  private static class RangeSetCoder<T extends Comparable> extends 
CustomCoder<RangeSet<T>> {
+    private SetCoder<Range<T>> rangesCoder;
+
+    RangeSetCoder(Coder<T> boundCoder) {
+      this.rangesCoder = SetCoder.of(new RangeCoder<>(boundCoder));
+    }
+
+    @Override
+    public void encode(RangeSet<T> value, OutputStream outStream) throws 
IOException {
+      rangesCoder.encode(value.asRanges(), outStream);
+    }
+
+    @Override
+    public RangeSet<T> decode(InputStream inStream) throws CoderException, 
IOException {
+      return TreeRangeSet.create(rangesCoder.decode(inStream));
+    }
+  }
+
+  /**
+   * Tracker for the ids used in an ordered list.
+   *
+   * <p>Windmill accepts an int64 id for each timestamped-element in the list. 
Unique elements are
+   * identified by the pair of timestamp and id. This means that tow unique 
elements e1, e2 must
+   * have different (ts1, id1), (ts2, id2) pairs. To accomplish this we bucket 
time into five-minute
+   * buckets, and store a free list of ids available for each bucket.
+   *
+   * <p>When a timestamp range is deleted, we remove id tracking for elements 
in that range. In
+   * order to handle the case where a range is deleted piecemeal, we track 
sub-range deletions for
+   * each range. For example:
+   *
+   * <p>12:00 - 12:05 ids 12:05 - 12:10 ids
+   *
+   * <p>delete 12:00-12:06
+   *
+   * <p>12:00 - 12:05 *removed* 12:05 - 12:10 ids subranges deleted 12:05-12:06
+   *
+   * <p>delete 12:06 - 12:07
+   *
+   * <p>12:05 - 12:10 ids subranges deleted 12:05-12:07
+   *
+   * <p>delete 12:07 - 12:10
+   *
+   * <p>12:05 - 12:10 *removed*
+   */
+  static final class IdTracker {
+    static final String IDS_AVAILABLE_STR = "IdsAvailable";
+    static final String DELETIONS_STR = "Deletions";
+
+    static final long MIN_ID = Long.MIN_VALUE;
+    static final long MAX_ID = Long.MAX_VALUE;
+
+    // We track ids on five-minute boundaries.
+    private static final Duration RESOLUTION = Duration.standardMinutes(5);
+    static final MapCoder<Range<Instant>, RangeSet<Long>> IDS_AVAILABLE_CODER =
+        MapCoder.of(new RangeCoder<>(InstantCoder.of()), new 
RangeSetCoder<>(VarLongCoder.of()));
+    static final MapCoder<Range<Instant>, RangeSet<Instant>> 
SUBRANGE_DELETIONS_CODER =
+        MapCoder.of(new RangeCoder<>(InstantCoder.of()), new 
RangeSetCoder<>(InstantCoder.of()));
+    private final StateTag<ValueState<Map<Range<Instant>, RangeSet<Long>>>> 
idsAvailableTag;
+    // A map from five-minute ranges to the set of ids available in that 
interval.
+    final ValueState<Map<Range<Instant>, RangeSet<Long>>> idsAvailableValue;
+    private final StateTag<ValueState<Map<Range<Instant>, RangeSet<Instant>>>> 
subRangeDeletionsTag;
+    // If a timestamp-range in the map has been partially cleared, the cleared 
intervals are stored
+    // here.
+    final ValueState<Map<Range<Instant>, RangeSet<Instant>>> 
subRangeDeletionsValue;
+
+    IdTracker(
+        StateTable stateTable,
+        StateNamespace namespace,
+        StateTag<?> spec,
+        String stateFamily,
+        boolean complete) {
+      this.idsAvailableTag =
+          StateTags.makeSystemTagInternal(
+              StateTags.value(spec.getId() + IDS_AVAILABLE_STR, 
IDS_AVAILABLE_CODER));
+      this.idsAvailableValue =
+          stateTable.get(namespace, idsAvailableTag, 
StateContexts.nullContext());
+      this.subRangeDeletionsTag =
+          StateTags.makeSystemTagInternal(
+              StateTags.value(spec.getId() + DELETIONS_STR, 
SUBRANGE_DELETIONS_CODER));
+      this.subRangeDeletionsValue =
+          stateTable.get(namespace, subRangeDeletionsTag, 
StateContexts.nullContext());
+    }
+
+    static <ValueT extends Comparable<? super ValueT>>
+        Map<Range<Instant>, RangeSet<ValueT>> newSortedRangeMap(Class<ValueT> 
valueClass) {
+      return Maps.newTreeMap(
+          Comparator.<Range<Instant>, Instant>comparing(Range::lowerEndpoint)
+              .thenComparing(Range::upperEndpoint));
+    }
+
+    private Range<Instant> getTrackedRange(Instant ts) {
+      Instant snapped =
+          new Instant(ts.getMillis() - ts.plus(RESOLUTION).getMillis() % 
RESOLUTION.getMillis());
+      return Range.closedOpen(snapped, snapped.plus(RESOLUTION));
+    }
+
+    @SuppressWarnings("FutureReturnValueIgnored")
+    void readLater() {
+      idsAvailableValue.readLater();
+      subRangeDeletionsValue.readLater();
+    }
+
+    Map<Range<Instant>, RangeSet<Long>> readIdsAvailable() {
+      Map<Range<Instant>, RangeSet<Long>> idsAvailable = 
idsAvailableValue.read();
+      return idsAvailable != null ? idsAvailable : 
newSortedRangeMap(Long.class);
+    }
+
+    Map<Range<Instant>, RangeSet<Instant>> readSubRangeDeletions() {
+      Map<Range<Instant>, RangeSet<Instant>> subRangeDeletions = 
subRangeDeletionsValue.read();
+      return subRangeDeletions != null ? subRangeDeletions : 
newSortedRangeMap(Instant.class);
+    }
+
+    void clear() throws ExecutionException, InterruptedException {
+      idsAvailableValue.clear();
+      subRangeDeletionsValue.clear();
+    }
+
+    <T> void add(
+        SortedSet<TimestampedValueWithId<T>> elements, 
BiConsumer<TimestampedValue<T>, Long> output)
+        throws ExecutionException, InterruptedException {
+      Range<Long> currentIdRange = null;
+      long currentId = 0;
+
+      Range<Instant> currentTsRange = null;
+      RangeSet<Instant> currentTsRangeDeletions = null;
+
+      Map<Range<Instant>, RangeSet<Long>> idsAvailable = readIdsAvailable();
+      Map<Range<Instant>, RangeSet<Instant>> subRangeDeletions = 
readSubRangeDeletions();
+
+      RangeSet<Long> availableIdsForTsRange = null;
+      Iterator<Range<Long>> idRangeIter = null;
+      RangeSet<Long> idsUsed = TreeRangeSet.create();
+      for (TimestampedValueWithId<T> pendingAdd : elements) {
+        // Since elements are in increasing ts order, often we'll be able to 
reuse the previous
+        // iteration's range.
+        if (currentTsRange == null
+            || !currentTsRange.contains(pendingAdd.getValue().getTimestamp())) 
{
+          if (availableIdsForTsRange != null) {
+            // We're moving onto a new ts range. Remove all used ids
+            availableIdsForTsRange.removeAll(idsUsed);
+            idsUsed = TreeRangeSet.create();
+          }
+
+          // Lookup the range for the current timestamp.
+          currentTsRange = 
getTrackedRange(pendingAdd.getValue().getTimestamp());
+          // Lookup available ids for this timestamp range. If nothing there, 
we default to all ids
+          // available.
+          availableIdsForTsRange =
+              idsAvailable.computeIfAbsent(
+                  currentTsRange,
+                  r -> 
TreeRangeSet.create(ImmutableList.of(Range.closedOpen(MIN_ID, MAX_ID))));
+          idRangeIter = availableIdsForTsRange.asRanges().iterator();
+          currentIdRange = null;
+          currentTsRangeDeletions = subRangeDeletions.get(currentTsRange);
+        }
 
+        if (currentIdRange == null || currentId >= 
currentIdRange.upperEndpoint()) {
+          // Move to the next range of free ids, and start assigning ranges 
from there.
+          currentIdRange = idRangeIter.next();
+          currentId = currentIdRange.lowerEndpoint();
+        }
+
+        if (currentTsRangeDeletions != null) {
+          currentTsRangeDeletions.remove(
+              Range.closedOpen(
+                  pendingAdd.getValue().getTimestamp(),
+                  pendingAdd.getValue().getTimestamp().plus(1)));
+        }
+        idsUsed.add(Range.closedOpen(currentId, currentId + 1));
+        output.accept(pendingAdd.getValue(), currentId++);
+      }
+      if (availableIdsForTsRange != null) {
+        availableIdsForTsRange.removeAll(idsUsed);
+      }
+      idsAvailableValue.write(idsAvailable);
+      subRangeDeletionsValue.write(subRangeDeletions);
+    }
+
+    // Remove a timestamp range. Returns ids freed up.
+    void remove(Range<Instant> tsRange) throws ExecutionException, 
InterruptedException {
+      Map<Range<Instant>, RangeSet<Long>> idsAvailable = readIdsAvailable();
+      Map<Range<Instant>, RangeSet<Instant>> subRangeDeletions = 
readSubRangeDeletions();
+
+      for (Range<Instant> current = getTrackedRange(tsRange.lowerEndpoint());
+          current.lowerEndpoint().isBefore(tsRange.upperEndpoint());
+          current = getTrackedRange(current.lowerEndpoint().plus(RESOLUTION))) 
{
+        // TODO: shouldn't need to iterate over all ranges.
+        boolean rangeCleared;
+        if (!tsRange.encloses(current)) {
+          // This can happen if the beginning or the end of tsRange doesn't 
fall on a RESOLUTION
+          // boundary. Since we
+          // are deleting a portion of a tracked range, track what we are 
deleting.
+          RangeSet<Instant> rangeDeletions =
+              subRangeDeletions.computeIfAbsent(current, r -> 
TreeRangeSet.create());
+          rangeDeletions.add(tsRange.intersection(current));
+          // If we ended up deleting the whole range, than we can simply 
remove it from the tracking
+          // map.
+          rangeCleared = rangeDeletions.encloses(current);
+        } else {
+          rangeCleared = true;
+        }
+        if (rangeCleared) {
+          // Remove the range from both maps.
+          idsAvailable.remove(current);
+          subRangeDeletions.remove(current);
+        }
+      }
+      idsAvailableValue.write(idsAvailable);
+      subRangeDeletionsValue.write(subRangeDeletions);
+    }
+  }
+
+  @AutoValue
+  abstract static class TimestampedValueWithId<T> {
+    private static final Comparator<TimestampedValueWithId<?>> COMPARATOR =
+        Comparator.<TimestampedValueWithId<?>, Instant>comparing(v -> 
v.getValue().getTimestamp())
+            .thenComparingLong(TimestampedValueWithId::getId);
+
+    abstract TimestampedValue<T> getValue();
+
+    abstract long getId();
+
+    static <T> TimestampedValueWithId<T> of(TimestampedValue<T> value, long 
id) {
+      return new 
AutoValue_WindmillStateInternals_TimestampedValueWithId<>(value, id);
+    }
+
+    static <T> TimestampedValueWithId<T> bound(Instant ts) {
+      return of(TimestampedValue.of(null, ts), Long.MIN_VALUE);
+    }
+  }
+
+  static class WindmillOrderedList<T> extends SimpleWindmillState implements 
OrderedListState<T> {
     private final StateNamespace namespace;
     private final StateTag<OrderedListState<T>> spec;
     private final ByteString stateKey;
     private final String stateFamily;
     private final Coder<T> elemCoder;
+    private boolean complete;
+    private boolean cleared = false;
+    // We need to sort based on timestamp, but we need objects with the same 
timestamp to be treated
+    // as unique. We can't use a MultiSet as we can't construct a comparator 
that uniquely
+    // identifies objects,
+    // so we construct a unique in-memory long ids for each element.
+    private SortedSet<TimestampedValueWithId<T>> pendingAdds =
+        Sets.newTreeSet(TimestampedValueWithId.COMPARATOR);
+
+    private RangeSet<Instant> pendingDeletes = TreeRangeSet.create();
+    private IdTracker idTracker;
+
+    // The default proto values for SortedListRange correspond to the minimum 
and maximum
+    // timestamps.
+    static final long MIN_TS_MICROS = 
SortedListRange.getDefaultInstance().getStart();
+    static final long MAX_TS_MICROS = 
SortedListRange.getDefaultInstance().getLimit();
 
     private WindmillOrderedList(
+        StateTable derivedStateTable,
         StateNamespace namespace,
         StateTag<OrderedListState<T>> spec,
         String stateFamily,
         Coder<T> elemCoder,
         boolean isNewKey) {
       this.namespace = namespace;
       this.spec = spec;
+
       this.stateKey = encodeKey(namespace, spec);
       this.stateFamily = stateFamily;
       this.elemCoder = elemCoder;
+      this.complete = isNewKey;
+      this.idTracker = new IdTracker(derivedStateTable, namespace, spec, 
stateFamily, complete);
     }
 
     @Override
     public Iterable<TimestampedValue<T>> read() {
-      throw new UnsupportedOperationException(
-          String.format("%s is not supported", 
OrderedListState.class.getSimpleName()));
+      return readRange(null, null);
+    }
+
+    private SortedSet<TimestampedValueWithId<T>> getPendingAddRange(
+        @Nullable Instant minTimestamp, @Nullable Instant limitTimestamp) {
+      SortedSet<TimestampedValueWithId<T>> pendingInRange = pendingAdds;
+      if (minTimestamp != null && limitTimestamp != null) {
+        pendingInRange =
+            pendingInRange.subSet(
+                TimestampedValueWithId.bound(minTimestamp),
+                TimestampedValueWithId.bound(limitTimestamp));
+      } else if (minTimestamp == null && limitTimestamp != null) {
+        pendingInRange = 
pendingInRange.headSet(TimestampedValueWithId.bound(limitTimestamp));
+      } else if (limitTimestamp == null && minTimestamp != null) {
+        pendingInRange = 
pendingInRange.tailSet(TimestampedValueWithId.bound(minTimestamp));
+      }
+      return pendingInRange;
     }
 
     @Override
-    public Iterable<TimestampedValue<T>> readRange(Instant minTimestamp, 
Instant limitTimestamp) {
-      throw new UnsupportedOperationException(
-          String.format("%s is not supported", 
OrderedListState.class.getSimpleName()));
+    public Iterable<TimestampedValue<T>> readRange(
+        @Nullable Instant minTimestamp, @Nullable Instant limitTimestamp) {
+      idTracker.readLater();
+
+      final Future<Iterable<TimestampedValue<T>>> future = 
getFuture(minTimestamp, limitTimestamp);
+      try (Closeable scope = scopedReadState()) {
+        SortedSet<TimestampedValueWithId<T>> pendingInRange =
+            getPendingAddRange(minTimestamp, limitTimestamp);
+
+        // Transform the return iterator so it has the same type as 
pendingAdds. We need to ensure
+        // that the ids don't overlap with any in pendingAdds, so begin with 
pendingAdfds.size().
+        Iterable<TimestampedValueWithId<T>> data =
+            new Iterable<TimestampedValueWithId<T>>() {
+              private Iterator<TimestampedValue<T>> iter = 
future.get().iterator();
+
+              @Override
+              public Iterator<TimestampedValueWithId<T>> iterator() {
+                return new Iterator<TimestampedValueWithId<T>>() {
+                  private long currentId = pendingAdds.size();
+
+                  @Override
+                  public boolean hasNext() {
+                    return iter.hasNext();
+                  }
+
+                  @Override
+                  public TimestampedValueWithId<T> next() {
+                    return TimestampedValueWithId.of(iter.next(), currentId++);
+                  }
+                };
+              }
+            };
+
+        Iterable<TimestampedValueWithId<T>> includingAdds =
+            Iterables.mergeSorted(
+                ImmutableList.of(data, pendingInRange), 
TimestampedValueWithId.COMPARATOR);
+
+        Iterable<TimestampedValue<T>> fullIterable =
+            Iterables.filter(
+                Iterables.transform(includingAdds, 
TimestampedValueWithId::getValue),
+                tv -> !pendingDeletes.contains(tv.getTimestamp()));
+        // TODO: If we have a known bounded amount of data, cache known ranges.
+        return fullIterable;
+      } catch (InterruptedException | ExecutionException | IOException e) {
+        if (e instanceof InterruptedException) {
+          Thread.currentThread().interrupt();
+        }
+        throw new RuntimeException("Unable to read state", e);
+      }
     }
 
     @Override
     public void clear() {
-      throw new UnsupportedOperationException(
-          String.format("%s is not supported", 
OrderedListState.class.getSimpleName()));
+      cleared = true;
+      complete = true;
+      pendingAdds.clear();
+      pendingDeletes.clear();
+      try {
+        idTracker.clear();
+      } catch (ExecutionException | InterruptedException e) {
+        throw new RuntimeException(e);
+      }
     }
 
     @Override
     public void clearRange(Instant minTimestamp, Instant limitTimestamp) {
-      throw new UnsupportedOperationException(
-          String.format("%s is not supported", 
OrderedListState.class.getSimpleName()));
+      getPendingAddRange(minTimestamp, limitTimestamp).clear();
+      pendingDeletes.add(Range.closedOpen(minTimestamp, limitTimestamp));
     }
 
     @Override
     public void add(TimestampedValue<T> value) {
-      throw new UnsupportedOperationException(
-          String.format("%s is not supported", 
OrderedListState.class.getSimpleName()));
+      // We use the current size of the container as the in-memory id. This 
works because
+      // pendingAdds is completely
+      // cleared when it is processed (otherwise we could end up with 
duplicate elements in the same
+      // container). These
+      // are not the ids that will be sent to windmill.
+      pendingAdds.add(TimestampedValueWithId.of(value, pendingAdds.size()));
+      // Leave pendingDeletes alone. Since we can have multiple values with 
the same timestamp, we
+      // may still need
+      // overlapping deletes to remove previous entries at this timestamp.
     }
 
     @Override
     public ReadableState<Boolean> isEmpty() {
-      throw new UnsupportedOperationException(
-          String.format("%s is not supported", 
OrderedListState.class.getSimpleName()));
+      return new ReadableState<Boolean>() {
+        @Override
+        public ReadableState<Boolean> readLater() {
+          WindmillOrderedList.this.readLater();
+          return this;
+        }
+
+        @Override
+        public Boolean read() {
+          return Iterables.isEmpty(WindmillOrderedList.this.read());
+        }
+      };
     }
 
     @Override
     public OrderedListState<T> readLater() {
-      throw new UnsupportedOperationException(
-          String.format("%s is not supported", 
OrderedListState.class.getSimpleName()));
+      return readRangeLater(null, null);
     }
 
     @Override
-    public OrderedListState<T> readRangeLater(Instant minTimestamp, Instant 
limitTimestamp) {
-      throw new UnsupportedOperationException(
-          String.format("%s is not supported", 
OrderedListState.class.getSimpleName()));
+    @SuppressWarnings("FutureReturnValueIgnored")
+    public OrderedListState<T> readRangeLater(
+        @Nullable Instant minTimestamp, @Nullable Instant limitTimestamp) {
+      idTracker.readLater();
+      getFuture(minTimestamp, limitTimestamp);
+      return this;
     }
 
     @Override
     public WorkItemCommitRequest persistDirectly(ForKey cache) throws 
IOException {
       WorkItemCommitRequest.Builder commitBuilder = 
WorkItemCommitRequest.newBuilder();
+      TagSortedListUpdateRequest.Builder updatesBuilder =
+          
commitBuilder.addSortedListUpdatesBuilder().setStateFamily(stateFamily).setTag(stateKey);
+      try {
+        if (cleared) {
+          // Default range.
+          updatesBuilder.addDeletesBuilder().build();
+          cleared = false;
+        }
+
+        if (!pendingAdds.isEmpty()) {
+          // TODO: Once we start caching data, we should remove this line. We 
have it here now
+          // because once we persist
+          // added data we forget about it from the cache, so the object is no 
longer complete.
+          complete = false;
+
+          TagSortedListInsertRequest.Builder insertBuilder = 
updatesBuilder.addInsertsBuilder();
+          idTracker.add(
+              pendingAdds,
+              (elem, id) -> {
+                try {
+                  ByteString.Output elementStream = ByteString.newOutput();
+                  elemCoder.encode(elem.getValue(), elementStream, 
Context.OUTER);
+                  insertBuilder.addEntries(
+                      SortedListEntry.newBuilder()
+                          .setValue(elementStream.toByteString())
+                          .setSortKey(elem.getTimestamp().getMillis() * 1000)

Review comment:
       For all of these "millis * 1000" calls, use WindmillTimeUtils.harnessToWindmillTimestamp() instead.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to