Repository: storm
Updated Branches:
  refs/heads/1.x-branch ae0764769 -> 555146fc6


[STORM-2144] Fix Storm-sql group-by behavior in standalone mode

Fix group-by to not rely on monotonic group-by keys


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/9bff7607
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/9bff7607
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/9bff7607

Branch: refs/heads/1.x-branch
Commit: 9bff7607fb2ed271d4326e81ba66782014e1f418
Parents: 311da95
Author: Arun Mahadevan <ar...@apache.org>
Authored: Mon Oct 10 14:56:56 2016 +0530
Committer: Arun Mahadevan <ar...@apache.org>
Committed: Mon Oct 10 14:56:56 2016 +0530

----------------------------------------------------------------------
 .../backends/standalone/PlanCompiler.java       |  1 +
 .../backends/standalone/RelNodeCompiler.java    | 29 ++++++++++----------
 2 files changed, 15 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/9bff7607/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/PlanCompiler.java
----------------------------------------------------------------------
diff --git 
a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/PlanCompiler.java
 
b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/PlanCompiler.java
index 8131934..4c69da1 100644
--- 
a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/PlanCompiler.java
+++ 
b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/PlanCompiler.java
@@ -40,6 +40,7 @@ public class PlanCompiler {
       "// GENERATED CODE", "package " + PACKAGE_NAME + ";", "",
       "import java.util.Iterator;", "import java.util.Map;", "import 
java.util.HashMap;",
       "import java.util.List;", "import java.util.ArrayList;",
+      "import java.util.LinkedHashMap;",
       "import org.apache.storm.tuple.Values;",
       "import org.apache.storm.sql.runtime.AbstractChannelHandler;",
       "import org.apache.storm.sql.runtime.Channels;",

http://git-wip-us.apache.org/repos/asf/storm/blob/9bff7607/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/RelNodeCompiler.java
----------------------------------------------------------------------
diff --git 
a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/RelNodeCompiler.java
 
b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/RelNodeCompiler.java
index c6adb28..5c674ad 100644
--- 
a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/RelNodeCompiler.java
+++ 
b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/RelNodeCompiler.java
@@ -66,8 +66,7 @@ class RelNodeCompiler extends PostOrderRelNodeVisitor<Void> {
           "  private static final ChannelHandler %1$s = ",
           "    new AbstractChannelHandler() {",
           "    private final Values EMPTY_VALUES = new Values();",
-          "    private List<Object> prevGroupValues = null;",
-          "    private final Map<String, Object> accumulators = new 
HashMap<>();",
+          "    private final Map<List<Object>, Map<String, Object>> state = 
new LinkedHashMap<>();",
           "    private final int[] groupIndices = new int[] {%2$s};",
           "    private List<Object> getGroupValues(Values _data) {",
           "      List<Object> res = new ArrayList<>();",
@@ -81,11 +80,15 @@ class RelNodeCompiler extends PostOrderRelNodeVisitor<Void> 
{
           "    public void flush(ChannelContext ctx) {",
           "      emitAggregateResults(ctx);",
           "      super.flush(ctx);",
-          "      prevGroupValues = null;",
+          "      state.clear();",
           "    }",
           "",
           "    private void emitAggregateResults(ChannelContext ctx) {",
-          "    %3$s",
+          "        for (Map.Entry<List<Object>, Map<String, Object>> entry: 
state.entrySet()) {",
+          "          List<Object> groupValues = entry.getKey();",
+          "          Map<String, Object> accumulators = entry.getValue();",
+          "          %3$s",
+          "        }",
           "    }",
           "",
           "    @Override",
@@ -245,18 +248,16 @@ class RelNodeCompiler extends 
PostOrderRelNodeVisitor<Void> {
   @Override
   public Void visitAggregate(Aggregate aggregate, List<Void> inputStreams) 
throws Exception {
     beginAggregateStage(aggregate);
-    pw.println("        List<Object> curGroupValues = _data == null ? null : 
getGroupValues(_data);");
-    pw.println("        if (prevGroupValues != null && 
!prevGroupValues.equals(curGroupValues)) {");
-    pw.println("          emitAggregateResults(ctx);");
+    pw.println("        if (_data != null) {");
+    pw.println("        List<Object> curGroupValues = getGroupValues(_data);");
+    pw.println("        if (!state.containsKey(curGroupValues)) {");
+    pw.println("          state.put(curGroupValues, new HashMap<String, 
Object>());");
     pw.println("        }");
-    pw.println("        if (curGroupValues != null) {");
+    pw.println("        Map<String, Object> accumulators = 
state.get(curGroupValues);");
     for (AggregateCall call : aggregate.getAggCallList()) {
       aggregate(call);
     }
     pw.println("        }");
-    pw.println("        if (prevGroupValues != curGroupValues) {");
-    pw.println("          prevGroupValues = curGroupValues;");
-    pw.println("        }");
     endStage();
     return null;
   }
@@ -293,10 +294,8 @@ class RelNodeCompiler extends 
PostOrderRelNodeVisitor<Void> {
     }
     return NEW_LINE_JOINER.join(sw.toString(),
                                 String.format("          ctx.emit(new 
Values(%s, %s));",
-                                              
groupValueEmitStr("prevGroupValues", aggregate.getGroupSet().cardinality()),
-                                              Joiner.on(", ").join(res)),
-                                "          accumulators.clear();"
-    );
+                                              groupValueEmitStr("groupValues", 
aggregate.getGroupSet().cardinality()),
+                                              Joiner.on(", ").join(res)));
   }
 
   private String aggregateResult(AggregateCall call, PrintWriter pw) {

Reply via email to