lukecwik commented on a change in pull request #11821:
URL: https://github.com/apache/beam/pull/11821#discussion_r440428475



##########
File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java
##########
@@ -278,6 +528,398 @@ public IterableViewFn(TypeDescriptorSupplier<T> 
typeDescriptorSupplier) {
    * <p>Instantiate via {@link PCollectionViews#listView}.
    */
   @Experimental(Kind.CORE_RUNNERS_ONLY)
+  @VisibleForTesting
+  static class ListViewFn2<T> extends ViewFn<MultimapView<Long, MetaOr<T, 
OffsetRange>>, List<T>> {
+    private TypeDescriptorSupplier<T> typeDescriptorSupplier;
+
+    public ListViewFn2(TypeDescriptorSupplier<T> typeDescriptorSupplier) {
+      this.typeDescriptorSupplier = typeDescriptorSupplier;
+    }
+
+    @Override
+    public Materialization<MultimapView<Long, MetaOr<T, OffsetRange>>> 
getMaterialization() {
+      return Materializations.multimap();
+    }
+
+    @Override
+    public List<T> apply(MultimapView<Long, MetaOr<T, OffsetRange>> 
primitiveViewT) {
+      return Collections.unmodifiableList(new 
ListOverMultimapView<>(primitiveViewT));
+    }
+
+    @Override
+    public TypeDescriptor<List<T>> getTypeDescriptor() {
+      return TypeDescriptors.lists(typeDescriptorSupplier.get());
+    }
+
+    /**
+     * A {@link List} adapter over a {@link MultimapView}.
+     *
+     * <p>See {@link View.AsList} for a description of the materialized format 
and {@code index} to
+     * {@code (position, sub-position)} mapping details.
+     */
+    private static class ListOverMultimapView<T> extends AbstractList<T> 
implements RandomAccess {
+      private final MultimapView<Long, MetaOr<T, OffsetRange>> primitiveView;
+      /**
+       * A mapping from non over-lapping ranges to the number of elements at 
each position within
+       * that range. Ranges not specified in the mapping implicitly have 0 
elements at those
+       * positions.
+       *
+       * <p>Used to quickly compute the {@code index} -> {@code (position, 
sub-position} within the
+       * map.
+       */
+      private final Supplier<SortedMap<OffsetRange, Integer>>
+          nonOverlappingRangesToNumElementsPerPosition;
+
+      private final Supplier<Integer> size;
+
+      private ListOverMultimapView(MultimapView<Long, MetaOr<T, OffsetRange>> 
primitiveView) {
+        this.primitiveView = primitiveView;
+        this.nonOverlappingRangesToNumElementsPerPosition =
+            Suppliers.memoize(
+                () ->
+                    computeOverlappingRanges(
+                        Iterables.transform(
+                            primitiveView.get(Long.MIN_VALUE), (value) -> 
value.getMetadata())));
+        this.size =
+            Suppliers.memoize(
+                () -> 
computeTotalNumElements(nonOverlappingRangesToNumElementsPerPosition.get()));
+      }
+
+      @Override
+      public T get(int index) {
+        if (index < 0 || index >= size.get()) {
+          throw new IndexOutOfBoundsException();
+        }
+        KV<Long, Integer> position =
+            
computePositionForIndex(nonOverlappingRangesToNumElementsPerPosition.get(), 
index);
+        return Iterables.get(primitiveView.get(position.getKey()), 
position.getValue()).get();
+      }
+
+      @Override
+      public int size() {
+        return size.get();
+      }
+
+      @Override
+      public Iterator<T> iterator() {
+        return listIterator();
+      }
+
+      @Override
+      public ListIterator<T> listIterator() {
+        return super.listIterator();
+      }
+
+      /** A {@link ListIterator} over {@link MultimapView} adapter. */
+      private class ListIteratorOverMultimapView implements ListIterator<T> {
+        private int position;
+
+        @Override
+        public boolean hasNext() {
+          return position < size();
+        }
+
+        @Override
+        public T next() {
+          if (!hasNext()) {
+            throw new NoSuchElementException();
+          }
+          T rval = get(position);
+          position += 1;
+          return rval;
+        }
+
+        @Override
+        public boolean hasPrevious() {
+          return position > 0;
+        }
+
+        @Override
+        public T previous() {
+          if (!hasPrevious()) {
+            throw new NoSuchElementException();
+          }
+          position -= 1;
+          return get(position);
+        }
+
+        @Override
+        public int nextIndex() {
+          return position;
+        }
+
+        @Override
+        public int previousIndex() {
+          return position - 1;
+        }
+
+        @Override
+        public void remove() {
+          throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public void set(T e) {
+          throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public void add(T e) {
+          throw new UnsupportedOperationException();
+        }
+      }
+    }
+  }
+
+  /**
+   * Orders {@link OffsetRange}s ascending by {@code from}; ranges that share a {@code from} are
+   * ordered ascending by {@code to}.
+   */
+  @VisibleForTesting
+  static class OffsetRangeComparator implements Comparator<OffsetRange> {
+    private static final OffsetRangeComparator INSTANCE = new OffsetRangeComparator();
+
+    @Override
+    public int compare(OffsetRange o1, OffsetRange o2) {
+      int byFrom = Long.compare(o1.getFrom(), o2.getFrom());
+      // Only a tie on 'from' falls through to comparing 'to'.
+      return byFrom == 0 ? Long.compare(o1.getTo(), o2.getTo()) : byFrom;
+    }
+  }
+
+  /**
+   * Splits the given, possibly overlapping, ranges into non-overlapping ranges and maps each one
+   * to the number of input ranges that cover it.
+   *
+   * <p>Empty ranges ({@code from == to}) are ignored. For example, the inputs {@code [3, 4)},
+   * {@code [3, 5)}, {@code [3, 5)} and {@code [3, 6)} produce {@code [3, 4) := 4}, {@code [4, 5)
+   * := 3} and {@code [5, 6) := 1}.
+   *
+   * @param ranges the ranges to split, in any order
+   * @return an immutable map ordered by {@link OffsetRangeComparator}
+   */
+  @VisibleForTesting
+  static SortedMap<OffsetRange, Integer> computeOverlappingRanges(Iterable<OffsetRange> ranges) {
+    ImmutableSortedMap.Builder<OffsetRange, Integer> result =
+        ImmutableSortedMap.orderedBy(OffsetRangeComparator.INSTANCE);
+    List<OffsetRange> ordered = Lists.newArrayList(ranges);
+    if (ordered.isEmpty()) {
+      return result.build();
+    }
+    Collections.sort(ordered, OffsetRangeComparator.INSTANCE);
+
+    // Ranges that have not been fully emitted yet. Every range in here shares the same 'from';
+    // the head of the queue is the one with the smallest 'to'.
+    PriorityQueue<OffsetRange> open = new PriorityQueue<>(OffsetRangeComparator.INSTANCE);
+
+    for (OffsetRange current : ordered) {
+      // Empty ranges contribute no positions.
+      if (current.getFrom() == current.getTo()) {
+        continue;
+      }
+      long boundary = current.getFrom();
+
+      // When 'current' starts at a different 'from' than the open ranges, everything the open
+      // ranges cover in [open.from, boundary) can be emitted now.
+      while (!open.isEmpty() && open.peek().getFrom() != boundary) {
+        List<OffsetRange> pending = new ArrayList<>(open);
+        Collections.sort(pending, OffsetRangeComparator.INSTANCE);
+        open.clear();
+        int total = pending.size();
+
+        int i = 0;
+        long lastTo = pending.get(0).getFrom();
+        // Emit a segment for each distinct 'to' that does not extend past 'boundary'. With
+        // boundary = 7 and pending = [3, 4), [3, 5), [3, 5), [3, 6) this emits
+        // [3, 4) := 4, [4, 5) := 3 and [5, 6) := 1.
+        while (i < total && pending.get(i).getTo() <= boundary) {
+          long to = pending.get(i).getTo();
+          boolean repeatsPrevious = i > 0 && pending.get(i - 1).getTo() == to;
+          // A repeated 'to' was already accounted for by the first range that had it.
+          if (!repeatsPrevious) {
+            result.put(new OffsetRange(lastTo, to), total - i);
+            lastTo = to;
+          }
+          i++;
+        }
+
+        // Some ranges reach past 'boundary': emit the non-empty stretch [lastTo, boundary) that
+        // only they cover.
+        if (i != total && lastTo < boundary) {
+          result.put(new OffsetRange(lastTo, boundary), total - i);
+        }
+
+        // The ranges that reach past 'boundary' overlap with 'current'; keep them open, trimmed
+        // so that they start at 'boundary'.
+        while (i < total) {
+          open.add(new OffsetRange(boundary, pending.get(i).getTo()));
+          i++;
+        }
+      }
+      open.add(current);
+    }
+
+    // Drain the ranges that are still open once the input is exhausted.
+    while (!open.isEmpty()) {
+      // The head of the queue is the open range with the smallest 'to'.
+      OffsetRange shortest = open.remove();
+
+      List<OffsetRange> others = new ArrayList<>(open);
+      Collections.sort(others, OffsetRangeComparator.INSTANCE);
+      open.clear();
+
+      // Every open range covers 'shortest', hence the + 1 for 'shortest' itself.
+      result.put(shortest, others.size() + 1);
+
+      // Re-open the longer ranges from where 'shortest' ended; duplicates of 'shortest' are
+      // dropped because nothing of them remains.
+      for (OffsetRange other : others) {
+        if (other.getTo() > shortest.getTo()) {
+          open.add(new OffsetRange(shortest.getTo(), other.getTo()));
+        }
+      }
+    }
+    return result.build();
+  }
+
+  /**
+   * Computes the total number of elements described by the mapping: the sum, over every range, of
+   * the range's length multiplied by its number of elements per position.
+   *
+   * @param nonOverlappingRangesToNumElementsPerPosition ranges mapped to the number of elements at
+   *     each position within the range
+   * @return the total number of elements
+   * @throws ArithmeticException if an intermediate {@code long} computation overflows
+   * @throws IllegalArgumentException if the total does not fit in an {@code int}
+   */
+  @VisibleForTesting
+  static int computeTotalNumElements(
+      Map<OffsetRange, Integer> nonOverlappingRangesToNumElementsPerPosition) {
+    long sum = 0;
+    for (Map.Entry<OffsetRange, Integer> range :
+        nonOverlappingRangesToNumElementsPerPosition.entrySet()) {
+      long numPositions = Math.subtractExact(range.getKey().getTo(), range.getKey().getFrom());
+      // Accumulate with addExact: a silently wrapped sum could land back inside the int range and
+      // slip past the checked cast below.
+      sum = Math.addExact(sum, Math.multiplyExact(numPositions, range.getValue()));
+    }
+    return Ints.checkedCast(sum);
+  }
+
+  /**
+   * Translates a list {@code index} into a {@code (position, sub-position)} pair.
+   *
+   * <p>Ranges are visited in the map's iteration order. Each range holds {@code (to - from) *
+   * elementsPerPosition} consecutive indices, laid out position by position.
+   *
+   * @param nonOverlappingRangesToNumElementsPerPosition ranges mapped to the number of elements at
+   *     each position within the range
+   * @param index the zero-based index to translate
+   * @return the position that holds the element and the element's offset within that position
+   * @throws IndexOutOfBoundsException if {@code index} is negative or is not covered by the ranges
+   */
+  @VisibleForTesting
+  static KV<Long, Integer> computePositionForIndex(
+      Map<OffsetRange, Integer> nonOverlappingRangesToNumElementsPerPosition, int index) {
+    if (index < 0) {
+      throw new IndexOutOfBoundsException(
+          String.format(
+              "Position %s was out of bounds for ranges %s.",
+              index, nonOverlappingRangesToNumElementsPerPosition));
+    }
+    // Work on a copy so that the error message below can still report the requested index.
+    int remaining = index;
+    for (Map.Entry<OffsetRange, Integer> range :
+        nonOverlappingRangesToNumElementsPerPosition.entrySet()) {
+      int numElementsInRange =
+          Ints.checkedCast(
+              Math.multiplyExact(
+                  Math.subtractExact(range.getKey().getTo(), range.getKey().getFrom()),
+                  range.getValue()));
+      // The index lies beyond this range: skip all of the range's elements.
+      if (numElementsInRange <= remaining) {
+        remaining -= numElementsInRange;
+        continue;
+      }
+      long position = range.getKey().getFrom() + remaining / range.getValue();
+      int subPosition = remaining % range.getValue();
+      return KV.of(position, subPosition);
+    }
+    throw new IndexOutOfBoundsException(
+        String.format(
+            "Position %s was out of bounds for ranges %s.",
+            index, nonOverlappingRangesToNumElementsPerPosition));
+  }
+
+  /** Stores values or metadata about values. */
+  public static class MetaOr<T, MetaT> {

Review comment:
       Done




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to