[ 
https://issues.apache.org/jira/browse/BEAM-5064?focusedWorklogId=135201&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-135201
 ]

ASF GitHub Bot logged work on BEAM-5064:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 16/Aug/18 00:02
            Start Date: 16/Aug/18 00:02
    Worklog Time Spent: 10m 
      Work Description: amaliujia closed pull request #6154: [BEAM-5064][SQL] Support Order By in Global Window
URL: https://github.com/apache/beam/pull/6154
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
index cfcb16291ac..7a37ff9e8a0 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
@@ -31,6 +31,7 @@
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.Pipeline;
@@ -75,6 +76,7 @@
 import org.apache.beam.sdk.values.ValueInSingleWindow;
 import org.apache.beam.sdk.values.WindowingStrategy;
 import org.joda.time.Duration;
+import org.junit.Assert;
 
 /**
  * An assertion on the contents of a {@link PCollection} incorporated into the 
pipeline. Such an
@@ -281,6 +283,14 @@ public int hashCode() {
      */
     IterableAssert<T> containsInAnyOrder(Iterable<T> expectedElements);
 
+    /**
+     * Asserts that the iterable in question contains the provided elements in 
the same order.
+     *
+     * @param expectedElements
+     * @return
+     */
+    IterableAssert<T> containsInSameOrder(Iterable<T> expectedElements);
+
     /**
      * Asserts that the iterable in question is empty.
      *
@@ -554,6 +564,11 @@ public PCollectionContentsAssert(
       return satisfies(new AssertContainsInAnyOrderRelation<>(), 
expectedElements);
     }
 
+    @Override
+    public IterableAssert<T> containsInSameOrder(Iterable<T> expectedElements) 
{
+      return satisfies(new AssertContainsInSameOrderRelation<>(), 
expectedElements);
+    }
+
     @Override
     public PCollectionContentsAssert<T> empty() {
       containsInAnyOrder(Collections.emptyList());
@@ -737,6 +752,11 @@ public PCollectionSingletonIterableAssert(
       return satisfies(new AssertContainsInAnyOrderRelation<>(), 
expectedElements);
     }
 
+    @Override
+    public IterableAssert<T> containsInSameOrder(Iterable<T> expectedElements) 
{
+      return satisfies(new AssertContainsInSameOrderRelation<>(), 
expectedElements);
+    }
+
     @Override
     public PCollectionSingletonIterableAssert<T> satisfies(
         SerializableFunction<Iterable<T>, Void> checkerFn) {
@@ -1315,6 +1335,22 @@ public Void apply(Iterable<T> actual) {
     }
   }
 
+  private static class AssertContainsInSameOrder<T>
+      implements SerializableFunction<Iterable<T>, Void> {
+    private List<T> expected;
+
+    public AssertContainsInSameOrder(Iterable<T> expected) {
+      this.expected = Lists.newArrayList(expected);
+    }
+
+    @Override
+    @Nullable
+    public Void apply(Iterable<T> actual) {
+      Assert.assertEquals(expected, Lists.newArrayList(actual));
+      return null;
+    }
+  }
+
   ////////////////////////////////////////////////////////////
 
   /**
@@ -1354,6 +1390,18 @@ public Void apply(Iterable<T> actual) {
     }
   }
 
+  /**
+   * An {@code AssertRelation} implementing the binary predicate that two collections contain
+   * equal elements in the same order.
+   */
+  private static class AssertContainsInSameOrderRelation<T>
+      implements AssertRelation<Iterable<T>, Iterable<T>> {
+    @Override
+    public SerializableFunction<Iterable<T>, Void> assertFor(Iterable<T> 
expectedElements) {
+      return new AssertContainsInSameOrder<T>(expectedElements);
+    }
+  }
+
   
////////////////////////////////////////////////////////////////////////////////////////////////
 
   /**
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRel.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRel.java
index 466543568e3..0d10aa56f6c 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRel.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRel.java
@@ -20,12 +20,20 @@
 
 import static com.google.common.base.Preconditions.checkArgument;
 
+import com.google.common.collect.TreeMultiset;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 import java.io.Serializable;
 import java.math.BigDecimal;
 import java.util.ArrayList;
 import java.util.Comparator;
+import java.util.Iterator;
 import java.util.List;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.ListCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
@@ -35,6 +43,9 @@
 import org.apache.beam.sdk.state.StateSpec;
 import org.apache.beam.sdk.state.StateSpecs;
 import org.apache.beam.sdk.state.ValueState;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.Combine.AccumulatingCombineFn;
+import 
org.apache.beam.sdk.transforms.Combine.AccumulatingCombineFn.Accumulator;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -120,13 +131,13 @@ public BeamSortRel(
       nullsFirst.add(rawNullDirection == 
RelFieldCollation.NullDirection.FIRST);
     }
 
-    if (fetch == null) {
-      throw new UnsupportedOperationException("ORDER BY without a LIMIT is not 
supported!");
+    if (fetch != null) {
+      RexLiteral fetchLiteral = (RexLiteral) fetch;
+      count = ((BigDecimal) fetchLiteral.getValue()).intValue();
+    } else {
+      count = -1;
     }
 
-    RexLiteral fetchLiteral = (RexLiteral) fetch;
-    count = ((BigDecimal) fetchLiteral.getValue()).intValue();
-
     if (offset != null) {
       RexLiteral offsetLiteral = (RexLiteral) offset;
       startIndex = ((BigDecimal) offsetLiteral.getValue()).intValue();
@@ -161,7 +172,7 @@ public int getCount() {
       //  - GroupByKey (used in Top) is not allowed on unbounded data in 
global window so ORDER BY ... LIMIT
       //    works only on bounded data.
       //  - Just LIMIT operates on unbounded data, but across windows.
-      if (fieldIndices.size() == 0) {
+      if (fieldIndices.size() == 0) { // LIMIT
         // TODO(https://issues.apache.org/jira/projects/BEAM/issues/BEAM-4702)
         // Figure out which operations are per-window and which are not.
         return upstream
@@ -169,7 +180,6 @@ public int getCount() {
             .apply(new LimitTransform<>())
             .setRowSchema(CalciteUtils.toSchema(getRowType()));
       } else {
-
         WindowingStrategy<?, ?> windowingStrategy = 
upstream.getWindowingStrategy();
         if (!(windowingStrategy.getWindowFn() instanceof GlobalWindows)) {
           throw new UnsupportedOperationException(
@@ -178,25 +188,37 @@ public int getCount() {
                   GlobalWindows.class.getSimpleName(), windowingStrategy));
         }
 
-        BeamSqlRowComparator comparator =
-            new BeamSqlRowComparator(fieldIndices, orientation, nullsFirst);
+        PCollection<List<Row>> rawStream;
+        if (count == -1) { // ORDER BY
+          BeamSqlRowComparator comparator =
+              new BeamSqlRowComparator(fieldIndices, orientation, nullsFirst);
 
-        // first find the top (offset + count)
-        PCollection<List<Row>> rawStream =
-            upstream
-                .apply(
-                    "extractTopOffsetAndFetch",
-                    Top.of(startIndex + count, comparator).withoutDefaults())
-                .setCoder(ListCoder.of(upstream.getCoder()));
+          rawStream =
+              upstream
+                  .apply("sort", Combine.globally(new SortFn(comparator)))
+                  .setCoder(ListCoder.of(upstream.getCoder()));
 
-        // strip the `leading offset`
-        if (startIndex > 0) {
+        } else { // ORDER BY LIMIT
+          ReversedBeamSqlRowComparator comparator =
+              new ReversedBeamSqlRowComparator(fieldIndices, orientation, 
nullsFirst);
+
+          // first find the top (offset + count)
           rawStream =
-              rawStream
+              upstream
                   .apply(
-                      "stripLeadingOffset",
-                      ParDo.of(new SubListFn<>(startIndex, startIndex + 
count)))
+                      "extractTopOffsetAndFetch",
+                      Top.of(startIndex + count, comparator).withoutDefaults())
                   .setCoder(ListCoder.of(upstream.getCoder()));
+
+          // strip the `leading offset`
+          if (startIndex > 0) {
+            rawStream =
+                rawStream
+                    .apply(
+                        "stripLeadingOffset",
+                        ParDo.of(new SubListFn<>(startIndex, startIndex + 
count)))
+                    .setCoder(ListCoder.of(upstream.getCoder()));
+          }
         }
 
         return rawStream
@@ -256,6 +278,86 @@ public void processElement(ProcessContext ctx) {
     }
   }
 
+  /** All the elements of the result's {@code List} must fit into the memory 
of a single machine. */
+  private static class SortFunAccumulator
+      implements Accumulator<Row, SortFunAccumulator, List<Row>> {
+
+    public TreeMultiset<Row> treeMultiset;
+
+    public SortFunAccumulator(BeamSqlRowComparator comparator) {
+      treeMultiset = TreeMultiset.create(comparator);
+    }
+
+    public SortFunAccumulator(List<Row> rows, BeamSqlRowComparator comparator) 
{
+      treeMultiset = TreeMultiset.create(comparator);
+      treeMultiset.addAll(rows);
+    }
+
+    @Override
+    public void addInput(Row input) {
+      treeMultiset.add(input);
+    }
+
+    @Override
+    public void mergeAccumulator(SortFunAccumulator other) {
+      treeMultiset.addAll(other.treeMultiset);
+    }
+
+    @Override
+    public List<Row> extractOutput() {
+      return toList();
+    }
+
+    public List<Row> toList() {
+      List<Row> ret = new ArrayList<>();
+      Iterator<Row> iterator = treeMultiset.iterator();
+      while (iterator.hasNext()) {
+        ret.add(iterator.next());
+      }
+      return ret;
+    }
+  }
+
+  private static class SortFunAccumulatorCoder extends 
CustomCoder<SortFunAccumulator> {
+    private final ListCoder<Row> listCoder;
+    private final BeamSqlRowComparator comparator;
+
+    public SortFunAccumulatorCoder(BeamSqlRowComparator comparator, Coder<Row> 
inputCoder) {
+      this.comparator = comparator;
+      this.listCoder = ListCoder.of(inputCoder);
+    }
+
+    @Override
+    public void encode(SortFunAccumulator value, OutputStream outStream)
+        throws CoderException, IOException {
+      listCoder.encode(value.toList(), outStream);
+    }
+
+    @Override
+    public SortFunAccumulator decode(InputStream inStream) throws 
CoderException, IOException {
+      return new SortFunAccumulator(listCoder.decode(inStream), comparator);
+    }
+  }
+
+  private static class SortFn extends AccumulatingCombineFn<Row, 
SortFunAccumulator, List<Row>> {
+    private BeamSqlRowComparator comparator;
+
+    public SortFn(BeamSqlRowComparator comparator) {
+      this.comparator = comparator;
+    }
+
+    @Override
+    public SortFunAccumulator createAccumulator() {
+      return new SortFunAccumulator(comparator);
+    }
+
+    @Override
+    public Coder<SortFunAccumulator> getAccumulatorCoder(
+        CoderRegistry registry, Coder<Row> inputCoder) {
+      return new SortFunAccumulatorCoder(comparator, inputCoder);
+    }
+  }
+
   @Override
   public Sort copy(
       RelTraitSet traitSet,
@@ -317,7 +419,7 @@ public int compare(Row row1, Row row2) {
           }
         }
 
-        fieldRet *= (orientation.get(i) ? -1 : 1);
+        fieldRet *= (orientation.get(i) ? 1 : -1);
         if (fieldRet != 0) {
           return fieldRet;
         }
@@ -325,4 +427,22 @@ public int compare(Row row1, Row row2) {
       return 0;
     }
   }
+
+  private static class ReversedBeamSqlRowComparator implements 
Comparator<Row>, Serializable {
+    private BeamSqlRowComparator comparator;
+
+    public ReversedBeamSqlRowComparator(
+        List<Integer> fieldsIndices, List<Boolean> orientation, List<Boolean> 
nullsFirst) {
+      comparator = new BeamSqlRowComparator(fieldsIndices, orientation, 
nullsFirst);
+    }
+
+    @Override
+    public int compare(Row row1, Row row2) {
+      int ret = comparator.compare(row1, row2);
+      if (ret != 0) {
+        ret *= -1;
+      }
+      return ret;
+    }
+  }
 }
diff --git 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRelTest.java
 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRelTest.java
index 10dace36327..6dad990d937 100644
--- 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRelTest.java
+++ 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRelTest.java
@@ -95,6 +95,8 @@ public void prepare() {
                 .addField("site_id", Schema.FieldType.INT32)
                 .addNullableField("price", Schema.FieldType.DOUBLE)
                 .build()));
+
+    registerTable("ORDER_ID_TABLE", 
MockedBoundedTable.of(Schema.FieldType.INT64, "order_id"));
   }
 
   @Test
@@ -117,6 +119,40 @@ public void testOrderBy_basic() throws Exception {
     pipeline.run().waitUntilFinish();
   }
 
+  @Test
+  public void testOnlyOrderBy_asc() {
+    String sql =
+        "INSERT INTO ORDER_ID_TABLE(order_id)  SELECT "
+            + " order_id "
+            + "FROM ORDER_DETAILS "
+            + "ORDER BY order_id ASC";
+
+    PCollection<Row> rows = compilePipeline(sql, pipeline);
+    PAssert.that(rows)
+        .containsInSameOrder(
+            TestUtils.RowsBuilder.of(Schema.FieldType.INT64, "order_id")
+                .addRows(1L, 1L, 2L, 2L, 5L, 6L, 7L, 8L, 8L, 10L)
+                .getRows());
+    pipeline.run().waitUntilFinish();
+  }
+
+  @Test
+  public void testOnlyOrderBy_desc() {
+    String sql =
+        "INSERT INTO ORDER_ID_TABLE(order_id)  SELECT "
+            + " order_id "
+            + "FROM ORDER_DETAILS "
+            + "ORDER BY order_id DESC";
+
+    PCollection<Row> rows = compilePipeline(sql, pipeline);
+    PAssert.that(rows)
+        .containsInSameOrder(
+            TestUtils.RowsBuilder.of(Schema.FieldType.INT64, "order_id")
+                .addRows(10L, 8L, 8L, 7L, 6L, 5L, 2L, 2L, 1L, 1L)
+                .getRows());
+    pipeline.run().waitUntilFinish();
+  }
+
   @Test
   public void testOrderBy_timestamp() throws Exception {
     String sql =


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 135201)
    Time Spent: 1h  (was: 50m)

> Support ORDER BY in Global window
> ---------------------------------
>
>                 Key: BEAM-5064
>                 URL: https://issues.apache.org/jira/browse/BEAM-5064
>             Project: Beam
>          Issue Type: Sub-task
>          Components: dsl-sql
>            Reporter: Rui Wang
>            Assignee: Rui Wang
>            Priority: Major
>          Time Spent: 1h
>  Remaining Estimate: 0h
>
> Beam does not support `ORDER BY` in global window.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to