[ 
https://issues.apache.org/jira/browse/BEAM-3247?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16269722#comment-16269722
 ] 

ASF GitHub Bot commented on BEAM-3247:
--------------------------------------

asfgit closed pull request #4175: [BEAM-3247] fix Sample.any performance
URL: https://github.com/apache/beam/pull/4175
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java
index f3bd07a27ac..bda30816f6b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java
@@ -27,12 +27,10 @@
 import org.apache.beam.sdk.coders.CoderRegistry;
 import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionView;
 
 /**
  * {@code PTransform}s for taking samples of the elements in a
@@ -57,10 +55,6 @@
    * <p>If limit is greater than or equal to the size of the input
    * {@code PCollection}, then all the input's elements will be selected.
    *
-   * <p>All of the elements of the output {@code PCollection} should fit into
-   * main memory of a single worker machine.  This operation does not
-   * run in parallel.
-   *
    * <p>Example of use:
    * <pre> {@code
    * PCollection<String> input = ...;
@@ -149,11 +143,9 @@ private Any(long limit) {
 
     @Override
     public PCollection<T> expand(PCollection<T> in) {
-      PCollectionView<Iterable<T>> iterableView = 
in.apply(View.<T>asIterable());
-      return in.getPipeline()
-          .apply(Create.of((Void) null).withCoder(VoidCoder.of()))
-          .apply(ParDo.of(new SampleAnyDoFn<>(limit, 
iterableView)).withSideInputs(iterableView))
-          .setCoder(in.getCoder());
+      return in
+          .apply(Combine.globally(new SampleAnyCombineFn<T>(limit)))
+          .apply(Flatten.<T>iterables());
     }
 
     @Override
@@ -209,25 +201,45 @@ public void populateDisplayData(DisplayData.Builder 
builder) {
   }
 
   /**
-   * A {@link DoFn} that returns up to limit elements from the side input 
PCollection.
+   * A {@link CombineFn} that combines into a {@link List} of up to limit 
elements.
    */
-  private static class SampleAnyDoFn<T> extends DoFn<Void, T> {
-    long limit;
-    final PCollectionView<Iterable<T>> iterableView;
+  private static class SampleAnyCombineFn<T> extends CombineFn<T, List<T>, 
Iterable<T>> {
+    private final long limit;
 
-    public SampleAnyDoFn(long limit, PCollectionView<Iterable<T>> 
iterableView) {
+    private SampleAnyCombineFn(long limit) {
       this.limit = limit;
-      this.iterableView = iterableView;
     }
 
-    @ProcessElement
-    public void processElement(ProcessContext c) {
-      for (T i : c.sideInput(iterableView)) {
-        if (limit-- <= 0) {
-          break;
+    @Override
+    public List<T> createAccumulator() {
+      return new ArrayList<>();
+    }
+
+    @Override
+    public List<T> addInput(List<T> accumulator, T input) {
+      if (accumulator.size() < limit) {
+        accumulator.add(input);
+      }
+      return accumulator;
+    }
+
+    @Override
+    public List<T> mergeAccumulators(Iterable<List<T>> accumulators) {
+      List<T> merged = new ArrayList<>();
+      for (List<T> accumulator : accumulators) {
+        for (T t : accumulator) {
+          merged.add(t);
+          if (merged.size() >= limit) {
+            return merged;
+          }
         }
-        c.output(i);
       }
+      return merged;
+    }
+
+    @Override
+    public Iterable<T> extractOutput(List<T> accumulator) {
+      return accumulator;
     }
   }
 
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SampleTest.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SampleTest.java
index 80f361f1c8a..0677a7919b7 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SampleTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SampleTest.java
@@ -40,7 +40,12 @@
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.testing.ValidatesRunner;
 import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.values.PCollection;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -234,6 +239,103 @@ public Void apply(Iterable<T> in) {
       }
     }
 
+    private class TimestampAndWindow
+        extends PTransform<PCollection<Integer>, PCollection<Integer>> {
+      private final int windowSize;
+
+      private TimestampAndWindow(int windowSize) {
+        this.windowSize = windowSize;
+      }
+
+      @Override
+      public PCollection<Integer> expand(PCollection<Integer> input) {
+        return input
+            .apply(ParDo.of(new DoFn<Integer, Integer>() {
+              @ProcessElement
+              public void processElement(ProcessContext c) {
+                Integer elem = c.element();
+                c.outputWithTimestamp(elem, new Instant(elem * 1000));
+              }
+            }))
+            
.apply(Window.<Integer>into(FixedWindows.of(Duration.standardSeconds(windowSize))));
+      }
+    }
+
+    @Test
+    @Category(ValidatesRunner.class)
+    public void testSampleAny() {
+      PCollection<Integer> input =
+          pipeline.apply(
+              Create.of(ImmutableList.of(1, 2, 3, 4, 5, 
6)).withCoder(BigEndianIntegerCoder.of()));
+      PCollection<Integer> output = input
+          .apply(new TimestampAndWindow(3))
+          .apply(Sample.<Integer>any(2));
+
+      PAssert.that(output)
+          .inWindow(new IntervalWindow(new Instant(1000), 
Duration.standardSeconds(3)))
+          .satisfies(new VerifyCorrectSample<>(2, 1, 2, 3));
+      PAssert.that(output)
+          .inWindow(new IntervalWindow(new Instant(4000), 
Duration.standardSeconds(3)))
+          .satisfies(new VerifyCorrectSample<>(2, 4, 5, 6));
+      pipeline.run();
+    }
+
+    @Test
+    @Category(ValidatesRunner.class)
+    public void testSampleAnyEmpty() {
+      PCollection<Integer> input = 
pipeline.apply(Create.empty(BigEndianIntegerCoder.of()));
+      PCollection<Integer> output = input
+          .apply(new TimestampAndWindow(3))
+          .apply(Sample.<Integer>any(2));
+
+      PAssert.that(output).satisfies(new VerifyCorrectSample<>(0, EMPTY));
+      pipeline.run();
+    }
+
+    @Test
+    @Category(ValidatesRunner.class)
+    public void testSampleAnyZero() {
+      PCollection<Integer> input = 
pipeline.apply(Create.empty(BigEndianIntegerCoder.of()));
+      PCollection<Integer> output = input
+          .apply(new TimestampAndWindow(3))
+          .apply(Sample.<Integer>any(0));
+
+      PAssert.that(output)
+          .inWindow(new IntervalWindow(new Instant(1000), 
Duration.standardSeconds(3)))
+          .satisfies(new VerifyCorrectSample<>(0, DATA));
+      PAssert.that(output)
+          .inWindow(new IntervalWindow(new Instant(4000), 
Duration.standardSeconds(3)))
+          .satisfies(new VerifyCorrectSample<>(0, DATA));
+      pipeline.run();
+    }
+
+    @Test
+    @Category(ValidatesRunner.class)
+    public void testSampleAnyInsufficientElements() {
+      PCollection<Integer> input = 
pipeline.apply(Create.empty(BigEndianIntegerCoder.of()));
+      PCollection<Integer> output = input
+          .apply(new TimestampAndWindow(3))
+          .apply(Sample.<Integer>any(10));
+
+      PAssert.that(output)
+          .inWindow(new IntervalWindow(new Instant(1000), 
Duration.standardSeconds(3)))
+          .satisfies(new VerifyCorrectSample<>(3, DATA));
+      PAssert.that(output)
+          .inWindow(new IntervalWindow(new Instant(4000), 
Duration.standardSeconds(3)))
+          .satisfies(new VerifyCorrectSample<>(3, DATA));
+      pipeline.run();
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testSampleAnyNegative() {
+      pipeline.enableAbandonedNodeEnforcement(false);
+
+      PCollection<Integer> input = 
pipeline.apply(Create.empty(BigEndianIntegerCoder.of()));
+      PCollection<Integer> output = input
+          .apply(new TimestampAndWindow(3))
+          .apply(Sample.<Integer>any(-10));
+    }
+
     @Test
     @Category(ValidatesRunner.class)
     public void testSample() {


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


> Sample.any memory constraint
> ----------------------------
>
>                 Key: BEAM-3247
>                 URL: https://issues.apache.org/jira/browse/BEAM-3247
>             Project: Beam
>          Issue Type: Improvement
>          Components: sdk-java-core
>    Affects Versions: 2.1.0
>            Reporter: Neville Li
>            Assignee: Neville Li
>            Priority: Minor
>
> Right now {{Sample.any}} converts the collection to an iterable view and takes 
> the first n elements from it as a side input. This may require materializing the 
> entire collection to disk and is potentially inefficient.
> https://github.com/apache/beam/blob/v2.1.0/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java#L74
> This can be fixed by applying a truncating `DoFn` first, then combining into a 
> `List<T>` whose size is capped at the limit, and finally flattening the list.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to