reuvenlax commented on a change in pull request #12864: URL: https://github.com/apache/beam/pull/12864#discussion_r493143979
########## File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillStateInternals.java ########## @@ -470,81 +541,514 @@ protected WorkItemCommitRequest persistDirectly(WindmillStateCache.ForKey cache) } } - private static class WindmillOrderedList<T> extends SimpleWindmillState - implements OrderedListState<T> { + // Coder for closed-open ranges. + private static class RangeCoder<T extends Comparable> extends CustomCoder<Range<T>> { + private Coder<T> boundCoder; + + RangeCoder(Coder<T> boundCoder) { + this.boundCoder = NullableCoder.of(boundCoder); + } + + @Override + public void encode(Range<T> value, OutputStream outStream) throws CoderException, IOException { + Preconditions.checkState( + value.lowerBoundType().equals(BoundType.CLOSED), "unexpected range " + value); + Preconditions.checkState( + value.upperBoundType().equals(BoundType.OPEN), "unexpected range " + value); + boundCoder.encode(value.hasLowerBound() ? value.lowerEndpoint() : null, outStream); + boundCoder.encode(value.hasUpperBound() ? value.upperEndpoint() : null, outStream); + } + + @Override + public Range<T> decode(InputStream inStream) throws CoderException, IOException { + @Nullable T lower = boundCoder.decode(inStream); + @Nullable T upper = boundCoder.decode(inStream); + if (lower == null) { + return upper != null ? 
Range.lessThan(upper) : Range.all(); + } else if (upper == null) { + return Range.atLeast(lower); + } else { + return Range.closedOpen(lower, upper); + } + } + } + + private static class RangeSetCoder<T extends Comparable> extends CustomCoder<RangeSet<T>> { + private SetCoder<Range<T>> rangesCoder; + + RangeSetCoder(Coder<T> boundCoder) { + this.rangesCoder = SetCoder.of(new RangeCoder<>(boundCoder)); + } + + @Override + public void encode(RangeSet<T> value, OutputStream outStream) throws IOException { + rangesCoder.encode(value.asRanges(), outStream); + } + + @Override + public RangeSet<T> decode(InputStream inStream) throws CoderException, IOException { + return TreeRangeSet.create(rangesCoder.decode(inStream)); + } + } + + /** + * Tracker for the ids used in an ordered list. + * + * <p>Windmill accepts an int64 id for each timestamped-element in the list. Unique elements are + * identified by the pair of timestamp and id. This means that two unique elements e1, e2 must + * have different (ts1, id1), (ts2, id2) pairs. To accomplish this we bucket time into five-minute Review comment: I'm not sure I understand. These ids are not visible to the user; rather, they are an internal implementation detail. ########## File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillStateInternals.java ########## @@ -470,81 +541,514 @@ protected WorkItemCommitRequest persistDirectly(WindmillStateCache.ForKey cache) } } - private static class WindmillOrderedList<T> extends SimpleWindmillState - implements OrderedListState<T> { + // Coder for closed-open ranges.
+ private static class RangeCoder<T extends Comparable> extends CustomCoder<Range<T>> { + private Coder<T> boundCoder; + + RangeCoder(Coder<T> boundCoder) { + this.boundCoder = NullableCoder.of(boundCoder); + } + + @Override + public void encode(Range<T> value, OutputStream outStream) throws CoderException, IOException { + Preconditions.checkState( + value.lowerBoundType().equals(BoundType.CLOSED), "unexpected range " + value); + Preconditions.checkState( + value.upperBoundType().equals(BoundType.OPEN), "unexpected range " + value); + boundCoder.encode(value.hasLowerBound() ? value.lowerEndpoint() : null, outStream); + boundCoder.encode(value.hasUpperBound() ? value.upperEndpoint() : null, outStream); + } + + @Override + public Range<T> decode(InputStream inStream) throws CoderException, IOException { + @Nullable T lower = boundCoder.decode(inStream); + @Nullable T upper = boundCoder.decode(inStream); + if (lower == null) { + return upper != null ? Range.lessThan(upper) : Range.all(); + } else if (upper == null) { + return Range.atLeast(lower); + } else { + return Range.closedOpen(lower, upper); + } + } + } + + private static class RangeSetCoder<T extends Comparable> extends CustomCoder<RangeSet<T>> { + private SetCoder<Range<T>> rangesCoder; + + RangeSetCoder(Coder<T> boundCoder) { + this.rangesCoder = SetCoder.of(new RangeCoder<>(boundCoder)); + } + + @Override + public void encode(RangeSet<T> value, OutputStream outStream) throws IOException { + rangesCoder.encode(value.asRanges(), outStream); + } + + @Override + public RangeSet<T> decode(InputStream inStream) throws CoderException, IOException { + return TreeRangeSet.create(rangesCoder.decode(inStream)); + } + } + + /** + * Tracker for the ids used in an ordered list. + * + * <p>Windmill accepts an int64 id for each timestamped-element in the list. Unique elements are + * identified by the pair of timestamp and id. This means that two unique elements e1, e2 must + * have different (ts1, id1), (ts2, id2) pairs.
To accomplish this we bucket time into five-minute + * buckets, and store a free list of ids available for each bucket. + * + * <p>When a timestamp range is deleted, we remove id tracking for elements in that range. In + * order to handle the case where a range is deleted piecemeal, we track sub-range deletions for + * each range. For example: + * + * <p>12:00 - 12:05 ids 12:05 - 12:10 ids + * + * <p>delete 12:00-12:06 + * + * <p>12:00 - 12:05 *removed* 12:05 - 12:10 ids subranges deleted 12:05-12:06 + * + * <p>delete 12:06 - 12:07 + * + * <p>12:05 - 12:10 ids subranges deleted 12:05-12:07 + * + * <p>delete 12:07 - 12:10 + * + * <p>12:05 - 12:10 *removed* + */ + static final class IdTracker { + static final String IDS_AVAILABLE_STR = "IdsAvailable"; + static final String DELETIONS_STR = "Deletions"; + + static final long MIN_ID = Long.MIN_VALUE; + static final long MAX_ID = Long.MAX_VALUE; + + // We track ids on five-minute boundaries. + private static final Duration RESOLUTION = Duration.standardMinutes(5); + static final MapCoder<Range<Instant>, RangeSet<Long>> IDS_AVAILABLE_CODER = + MapCoder.of(new RangeCoder<>(InstantCoder.of()), new RangeSetCoder<>(VarLongCoder.of())); + static final MapCoder<Range<Instant>, RangeSet<Instant>> SUBRANGE_DELETIONS_CODER = + MapCoder.of(new RangeCoder<>(InstantCoder.of()), new RangeSetCoder<>(InstantCoder.of())); + private final StateTag<ValueState<Map<Range<Instant>, RangeSet<Long>>>> idsAvailableTag; + // A map from five-minute ranges to the set of ids available in that interval. + final ValueState<Map<Range<Instant>, RangeSet<Long>>> idsAvailableValue; + private final StateTag<ValueState<Map<Range<Instant>, RangeSet<Instant>>>> subRangeDeletionsTag; + // If a timestamp-range in the map has been partially cleared, the cleared intervals are stored + // here. 
+ final ValueState<Map<Range<Instant>, RangeSet<Instant>>> subRangeDeletionsValue; + + IdTracker( + StateTable stateTable, + StateNamespace namespace, + StateTag<?> spec, + String stateFamily, + boolean complete) { + this.idsAvailableTag = + StateTags.makeSystemTagInternal( + StateTags.value(spec.getId() + IDS_AVAILABLE_STR, IDS_AVAILABLE_CODER)); + this.idsAvailableValue = + stateTable.get(namespace, idsAvailableTag, StateContexts.nullContext()); + this.subRangeDeletionsTag = + StateTags.makeSystemTagInternal( + StateTags.value(spec.getId() + DELETIONS_STR, SUBRANGE_DELETIONS_CODER)); + this.subRangeDeletionsValue = + stateTable.get(namespace, subRangeDeletionsTag, StateContexts.nullContext()); + } + + static <ValueT extends Comparable<? super ValueT>> + Map<Range<Instant>, RangeSet<ValueT>> newSortedRangeMap(Class<ValueT> valueClass) { + return Maps.newTreeMap( + Comparator.<Range<Instant>, Instant>comparing(Range::lowerEndpoint) + .thenComparing(Range::upperEndpoint)); + } + + private Range<Instant> getTrackedRange(Instant ts) { + Instant snapped = + new Instant(ts.getMillis() - ts.plus(RESOLUTION).getMillis() % RESOLUTION.getMillis()); + return Range.closedOpen(snapped, snapped.plus(RESOLUTION)); + } + + @SuppressWarnings("FutureReturnValueIgnored") + void readLater() { + idsAvailableValue.readLater(); + subRangeDeletionsValue.readLater(); + } + + Map<Range<Instant>, RangeSet<Long>> readIdsAvailable() { + Map<Range<Instant>, RangeSet<Long>> idsAvailable = idsAvailableValue.read(); + return idsAvailable != null ? idsAvailable : newSortedRangeMap(Long.class); + } + + Map<Range<Instant>, RangeSet<Instant>> readSubRangeDeletions() { + Map<Range<Instant>, RangeSet<Instant>> subRangeDeletions = subRangeDeletionsValue.read(); + return subRangeDeletions != null ? 
subRangeDeletions : newSortedRangeMap(Instant.class); + } + + void clear() throws ExecutionException, InterruptedException { + idsAvailableValue.clear(); + subRangeDeletionsValue.clear(); + } + + <T> void add( + SortedSet<TimestampedValueWithId<T>> elements, BiConsumer<TimestampedValue<T>, Long> output) + throws ExecutionException, InterruptedException { + Range<Long> currentIdRange = null; + long currentId = 0; + + Range<Instant> currentTsRange = null; + RangeSet<Instant> currentTsRangeDeletions = null; + + Map<Range<Instant>, RangeSet<Long>> idsAvailable = readIdsAvailable(); + Map<Range<Instant>, RangeSet<Instant>> subRangeDeletions = readSubRangeDeletions(); + + RangeSet<Long> availableIdsForTsRange = null; + Iterator<Range<Long>> idRangeIter = null; + RangeSet<Long> idsUsed = TreeRangeSet.create(); + for (TimestampedValueWithId<T> pendingAdd : elements) { + // Since elements are in increasing ts order, often we'll be able to reuse the previous + // iteration's range. + if (currentTsRange == null + || !currentTsRange.contains(pendingAdd.getValue().getTimestamp())) { + if (availableIdsForTsRange != null) { + // We're moving onto a new ts range. Remove all used ids + availableIdsForTsRange.removeAll(idsUsed); + idsUsed = TreeRangeSet.create(); + } + + // Lookup the range for the current timestamp. + currentTsRange = getTrackedRange(pendingAdd.getValue().getTimestamp()); + // Lookup available ids for this timestamp range. If nothing there, we default to all ids + // available. + availableIdsForTsRange = + idsAvailable.computeIfAbsent( + currentTsRange, + r -> TreeRangeSet.create(ImmutableList.of(Range.closedOpen(MIN_ID, MAX_ID)))); + idRangeIter = availableIdsForTsRange.asRanges().iterator(); + currentIdRange = null; + currentTsRangeDeletions = subRangeDeletions.get(currentTsRange); + } + if (currentIdRange == null || currentId >= currentIdRange.upperEndpoint()) { + // Move to the next range of free ids, and start assigning ranges from there. 
+ currentIdRange = idRangeIter.next(); + currentId = currentIdRange.lowerEndpoint(); + } + + if (currentTsRangeDeletions != null) { + currentTsRangeDeletions.remove( + Range.closedOpen( + pendingAdd.getValue().getTimestamp(), + pendingAdd.getValue().getTimestamp().plus(1))); + } + idsUsed.add(Range.closedOpen(currentId, currentId + 1)); + output.accept(pendingAdd.getValue(), currentId++); + } + if (availableIdsForTsRange != null) { + availableIdsForTsRange.removeAll(idsUsed); + } + idsAvailableValue.write(idsAvailable); + subRangeDeletionsValue.write(subRangeDeletions); + } + + // Remove a timestamp range. Returns ids freed up. + void remove(Range<Instant> tsRange) throws ExecutionException, InterruptedException { + Map<Range<Instant>, RangeSet<Long>> idsAvailable = readIdsAvailable(); + Map<Range<Instant>, RangeSet<Instant>> subRangeDeletions = readSubRangeDeletions(); + + for (Range<Instant> current = getTrackedRange(tsRange.lowerEndpoint()); + current.lowerEndpoint().isBefore(tsRange.upperEndpoint()); + current = getTrackedRange(current.lowerEndpoint().plus(RESOLUTION))) { + // TODO: shouldn't need to iterate over all ranges. + boolean rangeCleared; + if (!tsRange.encloses(current)) { + // This can happen if the beginning or the end of tsRange doesn't fall on a RESOLUTION + // boundary. Since we + // are deleting a portion of a tracked range, track what we are deleting. + RangeSet<Instant> rangeDeletions = + subRangeDeletions.computeIfAbsent(current, r -> TreeRangeSet.create()); + rangeDeletions.add(tsRange.intersection(current)); + // If we ended up deleting the whole range, than we can simply remove it from the tracking + // map. + rangeCleared = rangeDeletions.encloses(current); + } else { + rangeCleared = true; + } + if (rangeCleared) { + // Remove the range from both maps. 
+ idsAvailable.remove(current); + subRangeDeletions.remove(current); + } + } + idsAvailableValue.write(idsAvailable); + subRangeDeletionsValue.write(subRangeDeletions); + } + } + + @AutoValue + abstract static class TimestampedValueWithId<T> { + private static final Comparator<TimestampedValueWithId<?>> COMPARATOR = + Comparator.<TimestampedValueWithId<?>, Instant>comparing(v -> v.getValue().getTimestamp()) + .thenComparingLong(TimestampedValueWithId::getId); + + abstract TimestampedValue<T> getValue(); + + abstract long getId(); + + static <T> TimestampedValueWithId<T> of(TimestampedValue<T> value, long id) { + return new AutoValue_WindmillStateInternals_TimestampedValueWithId<>(value, id); + } + + static <T> TimestampedValueWithId<T> bound(Instant ts) { + return of(TimestampedValue.of(null, ts), Long.MIN_VALUE); + } + } + + static class WindmillOrderedList<T> extends SimpleWindmillState implements OrderedListState<T> { private final StateNamespace namespace; private final StateTag<OrderedListState<T>> spec; private final ByteString stateKey; private final String stateFamily; private final Coder<T> elemCoder; + private boolean complete; + private boolean cleared = false; + // We need to sort based on timestamp, but we need objects with the same timestamp to be treated + // as unique. We can't use a MultiSet as we can't construct a comparator that uniquely + // identifies objects, + // so we construct a unique in-memory long ids for each element. + private SortedSet<TimestampedValueWithId<T>> pendingAdds = + Sets.newTreeSet(TimestampedValueWithId.COMPARATOR); + + private RangeSet<Instant> pendingDeletes = TreeRangeSet.create(); + private IdTracker idTracker; + + // The default proto values for SortedListRange correspond to the minimum and maximum + // timestamps. 
+ static final long MIN_TS_MICROS = SortedListRange.getDefaultInstance().getStart(); + static final long MAX_TS_MICROS = SortedListRange.getDefaultInstance().getLimit(); private WindmillOrderedList( + StateTable derivedStateTable, StateNamespace namespace, StateTag<OrderedListState<T>> spec, String stateFamily, Coder<T> elemCoder, boolean isNewKey) { this.namespace = namespace; this.spec = spec; + this.stateKey = encodeKey(namespace, spec); this.stateFamily = stateFamily; this.elemCoder = elemCoder; + this.complete = isNewKey; + this.idTracker = new IdTracker(derivedStateTable, namespace, spec, stateFamily, complete); } @Override public Iterable<TimestampedValue<T>> read() { - throw new UnsupportedOperationException( - String.format("%s is not supported", OrderedListState.class.getSimpleName())); + return readRange(null, null); + } + + private SortedSet<TimestampedValueWithId<T>> getPendingAddRange( + @Nullable Instant minTimestamp, @Nullable Instant limitTimestamp) { + SortedSet<TimestampedValueWithId<T>> pendingInRange = pendingAdds; + if (minTimestamp != null && limitTimestamp != null) { + pendingInRange = + pendingInRange.subSet( + TimestampedValueWithId.bound(minTimestamp), + TimestampedValueWithId.bound(limitTimestamp)); + } else if (minTimestamp == null && limitTimestamp != null) { + pendingInRange = pendingInRange.headSet(TimestampedValueWithId.bound(limitTimestamp)); + } else if (limitTimestamp == null && minTimestamp != null) { + pendingInRange = pendingInRange.tailSet(TimestampedValueWithId.bound(minTimestamp)); + } + return pendingInRange; } @Override - public Iterable<TimestampedValue<T>> readRange(Instant minTimestamp, Instant limitTimestamp) { - throw new UnsupportedOperationException( - String.format("%s is not supported", OrderedListState.class.getSimpleName())); + public Iterable<TimestampedValue<T>> readRange( Review comment: maybe, but right now I think that would be premature. 
Users only get charged for bytes transferred, so it's unclear that this would save end users much. ########## File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillStateInternals.java ########## @@ -470,81 +541,514 @@ protected WorkItemCommitRequest persistDirectly(WindmillStateCache.ForKey cache) } } - private static class WindmillOrderedList<T> extends SimpleWindmillState - implements OrderedListState<T> { + // Coder for closed-open ranges. + private static class RangeCoder<T extends Comparable> extends CustomCoder<Range<T>> { + private Coder<T> boundCoder; + + RangeCoder(Coder<T> boundCoder) { + this.boundCoder = NullableCoder.of(boundCoder); + } + + @Override + public void encode(Range<T> value, OutputStream outStream) throws CoderException, IOException { + Preconditions.checkState( + value.lowerBoundType().equals(BoundType.CLOSED), "unexpected range " + value); + Preconditions.checkState( + value.upperBoundType().equals(BoundType.OPEN), "unexpected range " + value); + boundCoder.encode(value.hasLowerBound() ? value.lowerEndpoint() : null, outStream); + boundCoder.encode(value.hasUpperBound() ? value.upperEndpoint() : null, outStream); + } + + @Override + public Range<T> decode(InputStream inStream) throws CoderException, IOException { + @Nullable T lower = boundCoder.decode(inStream); + @Nullable T upper = boundCoder.decode(inStream); + if (lower == null) { + return upper != null ? 
Range.lessThan(upper) : Range.all(); + } else if (upper == null) { + return Range.atLeast(lower); + } else { + return Range.closedOpen(lower, upper); + } + } + } + + private static class RangeSetCoder<T extends Comparable> extends CustomCoder<RangeSet<T>> { + private SetCoder<Range<T>> rangesCoder; + + RangeSetCoder(Coder<T> boundCoder) { + this.rangesCoder = SetCoder.of(new RangeCoder<>(boundCoder)); + } + + @Override + public void encode(RangeSet<T> value, OutputStream outStream) throws IOException { + rangesCoder.encode(value.asRanges(), outStream); + } + + @Override + public RangeSet<T> decode(InputStream inStream) throws CoderException, IOException { + return TreeRangeSet.create(rangesCoder.decode(inStream)); + } + } + + /** + * Tracker for the ids used in an ordered list. + * + * <p>Windmill accepts an int64 id for each timestamped-element in the list. Unique elements are + * identified by the pair of timestamp and id. This means that two unique elements e1, e2 must + * have different (ts1, id1), (ts2, id2) pairs. To accomplish this we bucket time into five-minute Review comment: No. A set of available id ranges is stored in an (internal) ValueState, and this class picks a free id from the range. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org