This is an automated email from the ASF dual-hosted git repository.

godfrey pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.13 by this push:
     new b813e72  [FLINK-22463][table-planner-blink] Fix 
IllegalArgumentException in WindowAttachedWindowingStrategy when two phase is 
enabled for distinct agg
b813e72 is described below

commit b813e7289b013fe077f048562ef5f4118bae4a3c
Author: godfreyhe <[email protected]>
AuthorDate: Mon Apr 26 11:55:02 2021 +0800

    [FLINK-22463][table-planner-blink] Fix IllegalArgumentException in 
WindowAttachedWindowingStrategy when two phase is enabled for distinct agg
    
    This closes #15759
    
    (cherry picked from commit 1f31505a7ab1cca31a99282f1ccc4703a102abcb)
---
 .../logical/WindowAttachedWindowingStrategy.java   |    2 +-
 .../exec/stream/WindowAggregateJsonPlanTest.java   |   33 +
 .../stream/jsonplan/WindowAggregateJsonITCase.java |   45 +-
 .../testDistinctSplitEnabled.out                   | 1261 ++++++++++++++++++++
 4 files changed, 1339 insertions(+), 2 deletions(-)

diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/logical/WindowAttachedWindowingStrategy.java
 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/logical/WindowAttachedWindowingStrategy.java
index a28c77a..cb3cf7c 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/logical/WindowAttachedWindowingStrategy.java
+++ 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/logical/WindowAttachedWindowingStrategy.java
@@ -48,7 +48,7 @@ public class WindowAttachedWindowingStrategy extends 
WindowingStrategy {
             @JsonProperty(FIELD_NAME_WINDOW_START) int windowStart,
             @JsonProperty(FIELD_NAME_WINDOW_END) int windowEnd) {
         super(window, timeAttributeType);
-        checkArgument(windowEnd >= 0 && windowStart >= 0);
+        checkArgument(windowEnd >= 0);
         this.windowStart = windowStart;
         this.windowEnd = windowEnd;
     }
diff --git 
a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateJsonPlanTest.java
 
b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateJsonPlanTest.java
index a0bc8d9..f71f991 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateJsonPlanTest.java
+++ 
b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateJsonPlanTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.planner.plan.nodes.exec.stream;
 
 import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
 import 
org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions.ConcatDistinctAggFunction;
 import org.apache.flink.table.planner.utils.StreamTableTestUtil;
 import org.apache.flink.table.planner.utils.TableTestBase;
@@ -185,4 +186,36 @@ public class WindowAggregateJsonPlanTest extends 
TableTestBase {
                         + "     INTERVAL '15' SECOND))\n"
                         + "GROUP BY b, window_start, window_end");
     }
+
+    @Test
+    public void testDistinctSplitEnabled() {
+        tEnv.getConfig()
+                .getConfiguration()
+                .setBoolean(
+                        
OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED, true);
+        String sinkTableDdl =
+                "CREATE TABLE MySink (\n"
+                        + "  a bigint,\n"
+                        + "  window_start timestamp(3),\n"
+                        + "  window_end timestamp(3),\n"
+                        + "  cnt_star bigint,\n"
+                        + "  sum_b bigint,\n"
+                        + "  cnt_distinct_c bigint\n"
+                        + ") with (\n"
+                        + "  'connector' = 'values',\n"
+                        + "  'sink-insert-only' = 'false',\n"
+                        + "  'table-sink-class' = 'DEFAULT')";
+        tEnv.executeSql(sinkTableDdl);
+
+        util.verifyJsonPlan(
+                "insert into MySink select a, "
+                        + "   window_start, "
+                        + "   window_end, "
+                        + "   count(*), "
+                        + "   sum(b), "
+                        + "   count(distinct c) AS uv "
+                        + "FROM TABLE ("
+                        + "   CUMULATE(TABLE MyTable, DESCRIPTOR(rowtime), 
INTERVAL '10' MINUTE, INTERVAL '1' HOUR)) "
+                        + "GROUP BY a, window_start, window_end");
+    }
 }
diff --git 
a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/WindowAggregateJsonITCase.java
 
b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/WindowAggregateJsonITCase.java
index 1e3e122..e18d14a 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/WindowAggregateJsonITCase.java
+++ 
b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/WindowAggregateJsonITCase.java
@@ -42,7 +42,7 @@ public class WindowAggregateJsonITCase extends 
JsonPlanTestBase {
     public static Object[] parameters() {
         return new Object[][] {
             new Object[] {AggregatePhaseStrategy.ONE_PHASE},
-            new Object[] {AggregatePhaseStrategy.ONE_PHASE}
+            new Object[] {AggregatePhaseStrategy.TWO_PHASE}
         };
     }
 
@@ -180,4 +180,47 @@ public class WindowAggregateJsonITCase extends 
JsonPlanTestBase {
                         "+I[null, 1]"),
                 result);
     }
+
+    @Test
+    public void testDistinctSplitEnabled() throws Exception {
+        tableEnv.getConfig()
+                .getConfiguration()
+                .setBoolean(
+                        
OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED, true);
+        createTestValuesSinkTable(
+                "MySink", "name STRING", "max_double DOUBLE", 
"cnt_distinct_int BIGINT");
+
+        String jsonPlan =
+                tableEnv.getJsonPlan(
+                        "insert into MySink select name, "
+                                + "   max(`double`),\n"
+                                + "   count(distinct `int`) "
+                                + "FROM TABLE ("
+                                + "  CUMULATE(\n"
+                                + "     TABLE MyTable,\n"
+                                + "     DESCRIPTOR(rowtime),\n"
+                                + "     INTERVAL '5' SECOND,\n"
+                                + "     INTERVAL '15' SECOND))"
+                                + "GROUP BY name, window_start, window_end");
+        tableEnv.executeJsonPlan(jsonPlan).await();
+
+        List<String> result = TestValuesTableFactory.getResults("MySink");
+        assertResult(
+                Arrays.asList(
+                        "+I[a, 5.0, 3]",
+                        "+I[a, 5.0, 4]",
+                        "+I[a, 5.0, 4]",
+                        "+I[b, 3.0, 1]",
+                        "+I[b, 3.0, 1]",
+                        "+I[b, 3.0, 1]",
+                        "+I[b, 4.0, 1]",
+                        "+I[b, 4.0, 1]",
+                        "+I[b, 4.0, 1]",
+                        "+I[b, 6.0, 2]",
+                        "+I[b, 6.0, 2]",
+                        "+I[null, 7.0, 1]",
+                        "+I[null, 7.0, 1]",
+                        "+I[null, 7.0, 1]"),
+                result);
+    }
 }
diff --git 
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateJsonPlanTest_jsonplan/testDistinctSplitEnabled.out
 
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateJsonPlanTest_jsonplan/testDistinctSplitEnabled.out
new file mode 100644
index 0000000..f6b73fe
--- /dev/null
+++ 
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateJsonPlanTest_jsonplan/testDistinctSplitEnabled.out
@@ -0,0 +1,1261 @@
+{
+  "flinkVersion" : "",
+  "nodes" : [ {
+    "class" : 
"org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecTableSourceScan",
+    "scanTableSource" : {
+      "identifier" : {
+        "catalogName" : "default_catalog",
+        "databaseName" : "default_database",
+        "tableName" : "MyTable"
+      },
+      "catalogTable" : {
+        "schema.watermark.0.strategy.expr" : "`rowtime` - INTERVAL '1' SECOND",
+        "schema.4.expr" : "PROCTIME()",
+        "schema.0.data-type" : "INT",
+        "schema.2.name" : "c",
+        "schema.1.name" : "b",
+        "schema.4.name" : "proctime",
+        "schema.1.data-type" : "BIGINT",
+        "schema.3.data-type" : "TIMESTAMP(3)",
+        "schema.2.data-type" : "VARCHAR(2147483647)",
+        "schema.3.name" : "rowtime",
+        "connector" : "values",
+        "schema.watermark.0.rowtime" : "rowtime",
+        "schema.watermark.0.strategy.data-type" : "TIMESTAMP(3)",
+        "schema.3.expr" : "TO_TIMESTAMP(`c`)",
+        "schema.4.data-type" : "TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL",
+        "schema.0.name" : "a"
+      }
+    },
+    "id" : 1,
+    "outputType" : {
+      "type" : "ROW",
+      "nullable" : true,
+      "fields" : [ {
+        "a" : "INT"
+      }, {
+        "b" : "BIGINT"
+      }, {
+        "c" : "VARCHAR(2147483647)"
+      } ]
+    },
+    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, MyTable]], fields=[a, b, c])",
+    "inputProperties" : [ ]
+  }, {
+    "class" : 
"org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecCalc",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : {
+        "typeName" : "INTEGER",
+        "nullable" : true
+      }
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 1,
+      "type" : {
+        "typeName" : "BIGINT",
+        "nullable" : true
+      }
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 2,
+      "type" : {
+        "typeName" : "VARCHAR",
+        "nullable" : true,
+        "precision" : 2147483647
+      }
+    }, {
+      "kind" : "REX_CALL",
+      "operator" : {
+        "name" : "TO_TIMESTAMP",
+        "kind" : "OTHER_FUNCTION",
+        "syntax" : "FUNCTION"
+      },
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 2,
+        "type" : {
+          "typeName" : "VARCHAR",
+          "nullable" : true,
+          "precision" : 2147483647
+        }
+      } ],
+      "type" : {
+        "typeName" : "TIMESTAMP",
+        "nullable" : true,
+        "precision" : 3
+      }
+    }, {
+      "kind" : "REX_CALL",
+      "operator" : {
+        "name" : "PROCTIME",
+        "kind" : "OTHER_FUNCTION",
+        "syntax" : "FUNCTION"
+      },
+      "operands" : [ ],
+      "type" : {
+        "timestampKind" : "PROCTIME",
+        "typeName" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+        "nullable" : false
+      }
+    } ],
+    "condition" : null,
+    "id" : 2,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "nullable" : true,
+      "fields" : [ {
+        "a" : "INT"
+      }, {
+        "b" : "BIGINT"
+      }, {
+        "c" : "VARCHAR(2147483647)"
+      }, {
+        "rowtime" : {
+          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+          "nullable" : true,
+          "precision" : 3,
+          "kind" : "REGULAR"
+        }
+      }, {
+        "proctime" : {
+          "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+          "nullable" : false,
+          "precision" : 3,
+          "kind" : "PROCTIME"
+        }
+      } ]
+    },
+    "description" : "Calc(select=[a, b, c, TO_TIMESTAMP(c) AS rowtime, 
PROCTIME() AS proctime])"
+  }, {
+    "class" : 
"org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecWatermarkAssigner",
+    "watermarkExpr" : {
+      "kind" : "REX_CALL",
+      "operator" : {
+        "name" : "-",
+        "kind" : "MINUS",
+        "syntax" : "SPECIAL"
+      },
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 3,
+        "type" : {
+          "typeName" : "TIMESTAMP",
+          "nullable" : true,
+          "precision" : 3
+        }
+      }, {
+        "kind" : "LITERAL",
+        "value" : 1000,
+        "type" : {
+          "typeName" : "INTERVAL_SECOND",
+          "nullable" : false,
+          "precision" : 2,
+          "scale" : 6
+        }
+      } ],
+      "type" : {
+        "typeName" : "TIMESTAMP",
+        "nullable" : true,
+        "precision" : 3
+      }
+    },
+    "rowtimeFieldIndex" : 3,
+    "id" : 3,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "nullable" : true,
+      "fields" : [ {
+        "a" : "INT"
+      }, {
+        "b" : "BIGINT"
+      }, {
+        "c" : "VARCHAR(2147483647)"
+      }, {
+        "rowtime" : {
+          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+          "nullable" : true,
+          "precision" : 3,
+          "kind" : "ROWTIME"
+        }
+      }, {
+        "proctime" : {
+          "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+          "nullable" : false,
+          "precision" : 3,
+          "kind" : "PROCTIME"
+        }
+      } ]
+    },
+    "description" : "WatermarkAssigner(rowtime=[rowtime], watermark=[(rowtime 
- 1000:INTERVAL SECOND)])"
+  }, {
+    "class" : 
"org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecCalc",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : {
+        "typeName" : "INTEGER",
+        "nullable" : true
+      }
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 1,
+      "type" : {
+        "typeName" : "BIGINT",
+        "nullable" : true
+      }
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 2,
+      "type" : {
+        "typeName" : "VARCHAR",
+        "nullable" : true,
+        "precision" : 2147483647
+      }
+    }, {
+      "kind" : "REX_CALL",
+      "operator" : {
+        "name" : "MOD",
+        "kind" : "MOD",
+        "syntax" : "FUNCTION"
+      },
+      "operands" : [ {
+        "kind" : "REX_CALL",
+        "operator" : {
+          "name" : "HASH_CODE",
+          "kind" : "OTHER_FUNCTION",
+          "syntax" : "FUNCTION"
+        },
+        "operands" : [ {
+          "kind" : "INPUT_REF",
+          "inputIndex" : 2,
+          "type" : {
+            "typeName" : "VARCHAR",
+            "nullable" : true,
+            "precision" : 2147483647
+          }
+        } ],
+        "type" : {
+          "typeName" : "INTEGER",
+          "nullable" : true
+        }
+      }, {
+        "kind" : "LITERAL",
+        "value" : "1024",
+        "type" : {
+          "typeName" : "INTEGER",
+          "nullable" : false
+        }
+      } ],
+      "type" : {
+        "typeName" : "INTEGER",
+        "nullable" : true
+      }
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 3,
+      "type" : {
+        "timestampKind" : "ROWTIME",
+        "typeName" : "TIMESTAMP",
+        "nullable" : true
+      }
+    } ],
+    "condition" : null,
+    "id" : 4,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "nullable" : true,
+      "fields" : [ {
+        "a" : "INT"
+      }, {
+        "b" : "BIGINT"
+      }, {
+        "c" : "VARCHAR(2147483647)"
+      }, {
+        "$f5" : "INT"
+      }, {
+        "rowtime" : {
+          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+          "nullable" : true,
+          "precision" : 3,
+          "kind" : "ROWTIME"
+        }
+      } ]
+    },
+    "description" : "Calc(select=[a, b, c, MOD(HASH_CODE(c), 1024) AS $f5, 
rowtime])"
+  }, {
+    "class" : 
"org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecLocalWindowAggregate",
+    "grouping" : [ 0, 3 ],
+    "aggCalls" : [ {
+      "name" : null,
+      "aggFunction" : {
+        "name" : "COUNT",
+        "kind" : "COUNT",
+        "syntax" : "FUNCTION_STAR"
+      },
+      "argList" : [ ],
+      "filterArg" : -1,
+      "distinct" : false,
+      "approximate" : false,
+      "ignoreNulls" : false,
+      "type" : {
+        "typeName" : "BIGINT",
+        "nullable" : false
+      }
+    }, {
+      "name" : null,
+      "aggFunction" : {
+        "name" : "SUM",
+        "kind" : "SUM",
+        "syntax" : "FUNCTION"
+      },
+      "argList" : [ 1 ],
+      "filterArg" : -1,
+      "distinct" : false,
+      "approximate" : false,
+      "ignoreNulls" : false,
+      "type" : {
+        "typeName" : "BIGINT",
+        "nullable" : true
+      }
+    }, {
+      "name" : null,
+      "aggFunction" : {
+        "name" : "COUNT",
+        "kind" : "COUNT",
+        "syntax" : "FUNCTION_STAR"
+      },
+      "argList" : [ 2 ],
+      "filterArg" : -1,
+      "distinct" : true,
+      "approximate" : false,
+      "ignoreNulls" : false,
+      "type" : {
+        "typeName" : "BIGINT",
+        "nullable" : false
+      }
+    } ],
+    "windowing" : {
+      "strategy" : "TimeAttribute",
+      "window" : {
+        "type" : "CumulativeWindow",
+        "maxSize" : "PT1H",
+        "step" : "PT10M"
+      },
+      "timeAttributeType" : {
+        "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+        "nullable" : true,
+        "precision" : 3,
+        "kind" : "ROWTIME"
+      },
+      "timeAttributeIndex" : 4,
+      "isRowtime" : true
+    },
+    "id" : 5,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "nullable" : true,
+      "fields" : [ {
+        "a" : "INT"
+      }, {
+        "$f5" : "INT"
+      }, {
+        "count1$0" : "BIGINT"
+      }, {
+        "sum$1" : "BIGINT"
+      }, {
+        "count$2" : "BIGINT"
+      }, {
+        "distinct$0" : "RAW('org.apache.flink.table.api.dataview.MapView', 
'AFZvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnJ1bnRpbWUudHlwZXV0aWxzLkV4dGVybmFsU2VyaWFsaXplciRFeHRlcm5hbFNlcmlhbGl6ZXJTbmFwc2hvdAAAAAMADecEAAAAAaztAAVzcgArb3JnLmFwYWNoZS5mbGluay50YWJsZS50eXBlcy5GaWVsZHNEYXRhVHlwZfSwrBytgZ9fAgABTAAOZmllbGREYXRhVHlwZXN0ABBMamF2YS91dGlsL0xpc3Q7eHIAJW9yZy5hcGFjaGUuZmxpbmsudGFibGUudHlwZXMuRGF0YVR5cGV5y2rIj5/EeAIAAkwAD2NvbnZlcnNpb25DbGFzc3QAEUxqYXZhL2xhbmcvQ2xhc3M7TAALbG9naWNhbFR5cGV0ADJMb3JnL2Fw
 [...]
+      }, {
+        "$slice_end" : "BIGINT"
+      } ]
+    },
+    "description" : "LocalWindowAggregate(groupBy=[a, $f5], 
window=[CUMULATE(time_col=[rowtime], max_size=[1 h], step=[10 min])], 
select=[a, $f5, COUNT(*) AS count1$0, SUM(b) AS sum$1, COUNT(distinct$0 c) AS 
count$2, DISTINCT(c) AS distinct$0, slice_end('w$) AS $slice_end])"
+  }, {
+    "class" : 
"org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecExchange",
+    "id" : 6,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "HASH",
+        "keys" : [ 0, 1 ]
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "nullable" : true,
+      "fields" : [ {
+        "a" : "INT"
+      }, {
+        "$f5" : "INT"
+      }, {
+        "count1$0" : "BIGINT"
+      }, {
+        "sum$1" : "BIGINT"
+      }, {
+        "count$2" : "BIGINT"
+      }, {
+        "distinct$0" : "RAW('org.apache.flink.table.api.dataview.MapView', 
'AFZvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnJ1bnRpbWUudHlwZXV0aWxzLkV4dGVybmFsU2VyaWFsaXplciRFeHRlcm5hbFNlcmlhbGl6ZXJTbmFwc2hvdAAAAAMADecEAAAAAaztAAVzcgArb3JnLmFwYWNoZS5mbGluay50YWJsZS50eXBlcy5GaWVsZHNEYXRhVHlwZfSwrBytgZ9fAgABTAAOZmllbGREYXRhVHlwZXN0ABBMamF2YS91dGlsL0xpc3Q7eHIAJW9yZy5hcGFjaGUuZmxpbmsudGFibGUudHlwZXMuRGF0YVR5cGV5y2rIj5/EeAIAAkwAD2NvbnZlcnNpb25DbGFzc3QAEUxqYXZhL2xhbmcvQ2xhc3M7TAALbG9naWNhbFR5cGV0ADJMb3JnL2Fw
 [...]
+      }, {
+        "$slice_end" : "BIGINT"
+      } ]
+    },
+    "description" : "Exchange(distribution=[hash[a, $f5]])"
+  }, {
+    "class" : 
"org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecGlobalWindowAggregate",
+    "grouping" : [ 0, 1 ],
+    "aggCalls" : [ {
+      "name" : null,
+      "aggFunction" : {
+        "name" : "COUNT",
+        "kind" : "COUNT",
+        "syntax" : "FUNCTION_STAR"
+      },
+      "argList" : [ ],
+      "filterArg" : -1,
+      "distinct" : false,
+      "approximate" : false,
+      "ignoreNulls" : false,
+      "type" : {
+        "typeName" : "BIGINT",
+        "nullable" : false
+      }
+    }, {
+      "name" : null,
+      "aggFunction" : {
+        "name" : "SUM",
+        "kind" : "SUM",
+        "syntax" : "FUNCTION"
+      },
+      "argList" : [ 1 ],
+      "filterArg" : -1,
+      "distinct" : false,
+      "approximate" : false,
+      "ignoreNulls" : false,
+      "type" : {
+        "typeName" : "BIGINT",
+        "nullable" : true
+      }
+    }, {
+      "name" : null,
+      "aggFunction" : {
+        "name" : "COUNT",
+        "kind" : "COUNT",
+        "syntax" : "FUNCTION_STAR"
+      },
+      "argList" : [ 2 ],
+      "filterArg" : -1,
+      "distinct" : true,
+      "approximate" : false,
+      "ignoreNulls" : false,
+      "type" : {
+        "typeName" : "BIGINT",
+        "nullable" : false
+      }
+    } ],
+    "windowing" : {
+      "strategy" : "SliceAttached",
+      "window" : {
+        "type" : "CumulativeWindow",
+        "maxSize" : "PT1H",
+        "step" : "PT10M"
+      },
+      "timeAttributeType" : {
+        "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+        "nullable" : true,
+        "precision" : 3,
+        "kind" : "ROWTIME"
+      },
+      "sliceEnd" : 6,
+      "isRowtime" : true
+    },
+    "namedWindowProperties" : [ {
+      "name" : "window_start",
+      "property" : {
+        "kind" : "WindowStart",
+        "reference" : {
+          "name" : "w$",
+          "type" : {
+            "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+            "nullable" : true,
+            "precision" : 3,
+            "kind" : "ROWTIME"
+          }
+        }
+      }
+    }, {
+      "name" : "window_end",
+      "property" : {
+        "kind" : "WindowEnd",
+        "reference" : {
+          "name" : "w$",
+          "type" : {
+            "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+            "nullable" : true,
+            "precision" : 3,
+            "kind" : "ROWTIME"
+          }
+        }
+      }
+    } ],
+    "id" : 7,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "localAggInputRowType" : {
+      "type" : "ROW",
+      "nullable" : true,
+      "fields" : [ {
+        "a" : "INT"
+      }, {
+        "b" : "BIGINT"
+      }, {
+        "c" : "VARCHAR(2147483647)"
+      }, {
+        "$f5" : "INT"
+      }, {
+        "rowtime" : {
+          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+          "nullable" : true,
+          "precision" : 3,
+          "kind" : "ROWTIME"
+        }
+      } ]
+    },
+    "outputType" : {
+      "type" : "ROW",
+      "nullable" : true,
+      "fields" : [ {
+        "a" : "INT"
+      }, {
+        "$f5" : "INT"
+      }, {
+        "$f2" : "BIGINT NOT NULL"
+      }, {
+        "$f3" : "BIGINT"
+      }, {
+        "$f4" : "BIGINT NOT NULL"
+      }, {
+        "window_start" : {
+          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+          "nullable" : false,
+          "precision" : 3,
+          "kind" : "REGULAR"
+        }
+      }, {
+        "window_end" : {
+          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+          "nullable" : false,
+          "precision" : 3,
+          "kind" : "REGULAR"
+        }
+      } ]
+    },
+    "description" : "GlobalWindowAggregate(groupBy=[a, $f5], 
window=[CUMULATE(slice_end=[$slice_end], max_size=[1 h], step=[10 min])], 
select=[a, $f5, COUNT(count1$0) AS $f2, SUM(sum$1) AS $f3, COUNT(distinct$0 
count$2) AS $f4, start('w$) AS window_start, end('w$) AS window_end])"
+  }, {
+    "class" : 
"org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecCalc",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : {
+        "typeName" : "INTEGER",
+        "nullable" : true
+      }
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 5,
+      "type" : {
+        "typeName" : "TIMESTAMP",
+        "nullable" : false,
+        "precision" : 3
+      }
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 6,
+      "type" : {
+        "typeName" : "TIMESTAMP",
+        "nullable" : false,
+        "precision" : 3
+      }
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 1,
+      "type" : {
+        "typeName" : "INTEGER",
+        "nullable" : true
+      }
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 2,
+      "type" : {
+        "typeName" : "BIGINT",
+        "nullable" : false
+      }
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 3,
+      "type" : {
+        "typeName" : "BIGINT",
+        "nullable" : true
+      }
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 4,
+      "type" : {
+        "typeName" : "BIGINT",
+        "nullable" : false
+      }
+    } ],
+    "condition" : null,
+    "id" : 8,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "nullable" : true,
+      "fields" : [ {
+        "a" : "INT"
+      }, {
+        "window_start" : {
+          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+          "nullable" : false,
+          "precision" : 3,
+          "kind" : "REGULAR"
+        }
+      }, {
+        "window_end" : {
+          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+          "nullable" : false,
+          "precision" : 3,
+          "kind" : "REGULAR"
+        }
+      }, {
+        "$f5" : "INT"
+      }, {
+        "$f4" : "BIGINT NOT NULL"
+      }, {
+        "$f5_0" : "BIGINT"
+      }, {
+        "$f6" : "BIGINT NOT NULL"
+      } ]
+    },
+    "description" : "Calc(select=[a, window_start, window_end, $f5, $f2 AS 
$f4, $f3 AS $f5_0, $f4 AS $f6])"
+  }, {
+    "class" : 
"org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecLocalWindowAggregate",
+    "grouping" : [ 0 ],
+    "aggCalls" : [ {
+      "name" : null,
+      "aggFunction" : {
+        "name" : "$SUM0",
+        "kind" : "SUM0",
+        "syntax" : "FUNCTION"
+      },
+      "argList" : [ 4 ],
+      "filterArg" : -1,
+      "distinct" : false,
+      "approximate" : false,
+      "ignoreNulls" : false,
+      "type" : {
+        "typeName" : "BIGINT",
+        "nullable" : false
+      }
+    }, {
+      "name" : null,
+      "aggFunction" : {
+        "name" : "SUM",
+        "kind" : "SUM",
+        "syntax" : "FUNCTION"
+      },
+      "argList" : [ 5 ],
+      "filterArg" : -1,
+      "distinct" : false,
+      "approximate" : false,
+      "ignoreNulls" : false,
+      "type" : {
+        "typeName" : "BIGINT",
+        "nullable" : true
+      }
+    }, {
+      "name" : null,
+      "aggFunction" : {
+        "name" : "$SUM0",
+        "kind" : "SUM0",
+        "syntax" : "FUNCTION"
+      },
+      "argList" : [ 6 ],
+      "filterArg" : -1,
+      "distinct" : false,
+      "approximate" : false,
+      "ignoreNulls" : false,
+      "type" : {
+        "typeName" : "BIGINT",
+        "nullable" : false
+      }
+    } ],
+    "windowing" : {
+      "strategy" : "WindowAttached",
+      "window" : {
+        "type" : "CumulativeWindow",
+        "maxSize" : "PT1H",
+        "step" : "PT10M"
+      },
+      "timeAttributeType" : {
+        "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+        "nullable" : true,
+        "precision" : 3,
+        "kind" : "ROWTIME"
+      },
+      "windowStart" : 1,
+      "windowEnd" : 2,
+      "isRowtime" : true
+    },
+    "id" : 9,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "nullable" : true,
+      "fields" : [ {
+        "a" : "INT"
+      }, {
+        "sum$0" : "BIGINT"
+      }, {
+        "sum$1" : "BIGINT"
+      }, {
+        "sum$2" : "BIGINT"
+      }, {
+        "$window_end" : "BIGINT"
+      } ]
+    },
+    "description" : "LocalWindowAggregate(groupBy=[a], 
window=[CUMULATE(win_start=[window_start], win_end=[window_end], max_size=[1 
h], step=[10 min])], select=[a, $SUM0($f4) AS sum$0, SUM($f5_0) AS sum$1, 
$SUM0($f6) AS sum$2, slice_end('w$) AS $window_end])"
+  }, {
+    "class" : 
"org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecExchange",
+    "id" : 10,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "HASH",
+        "keys" : [ 0 ]
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "nullable" : true,
+      "fields" : [ {
+        "a" : "INT"
+      }, {
+        "sum$0" : "BIGINT"
+      }, {
+        "sum$1" : "BIGINT"
+      }, {
+        "sum$2" : "BIGINT"
+      }, {
+        "$window_end" : "BIGINT"
+      } ]
+    },
+    "description" : "Exchange(distribution=[hash[a]])"
+  }, {
+    "class" : 
"org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecGlobalWindowAggregate",
+    "grouping" : [ 0 ],
+    "aggCalls" : [ {
+      "name" : null,
+      "aggFunction" : {
+        "name" : "$SUM0",
+        "kind" : "SUM0",
+        "syntax" : "FUNCTION"
+      },
+      "argList" : [ 4 ],
+      "filterArg" : -1,
+      "distinct" : false,
+      "approximate" : false,
+      "ignoreNulls" : false,
+      "type" : {
+        "typeName" : "BIGINT",
+        "nullable" : false
+      }
+    }, {
+      "name" : null,
+      "aggFunction" : {
+        "name" : "SUM",
+        "kind" : "SUM",
+        "syntax" : "FUNCTION"
+      },
+      "argList" : [ 5 ],
+      "filterArg" : -1,
+      "distinct" : false,
+      "approximate" : false,
+      "ignoreNulls" : false,
+      "type" : {
+        "typeName" : "BIGINT",
+        "nullable" : true
+      }
+    }, {
+      "name" : null,
+      "aggFunction" : {
+        "name" : "$SUM0",
+        "kind" : "SUM0",
+        "syntax" : "FUNCTION"
+      },
+      "argList" : [ 6 ],
+      "filterArg" : -1,
+      "distinct" : false,
+      "approximate" : false,
+      "ignoreNulls" : false,
+      "type" : {
+        "typeName" : "BIGINT",
+        "nullable" : false
+      }
+    } ],
+    "windowing" : {
+      "strategy" : "WindowAttached",
+      "window" : {
+        "type" : "CumulativeWindow",
+        "maxSize" : "PT1H",
+        "step" : "PT10M"
+      },
+      "timeAttributeType" : {
+        "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+        "nullable" : true,
+        "precision" : 3,
+        "kind" : "ROWTIME"
+      },
+      "windowStart" : -1,
+      "windowEnd" : 4,
+      "isRowtime" : true
+    },
+    "namedWindowProperties" : [ {
+      "name" : "window_start",
+      "property" : {
+        "kind" : "WindowStart",
+        "reference" : {
+          "name" : "w$",
+          "type" : {
+            "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+            "nullable" : true,
+            "precision" : 3,
+            "kind" : "ROWTIME"
+          }
+        }
+      }
+    }, {
+      "name" : "window_end",
+      "property" : {
+        "kind" : "WindowEnd",
+        "reference" : {
+          "name" : "w$",
+          "type" : {
+            "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+            "nullable" : true,
+            "precision" : 3,
+            "kind" : "ROWTIME"
+          }
+        }
+      }
+    } ],
+    "id" : 11,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "localAggInputRowType" : {
+      "type" : "ROW",
+      "nullable" : true,
+      "fields" : [ {
+        "a" : "INT"
+      }, {
+        "window_start" : {
+          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+          "nullable" : false,
+          "precision" : 3,
+          "kind" : "REGULAR"
+        }
+      }, {
+        "window_end" : {
+          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+          "nullable" : false,
+          "precision" : 3,
+          "kind" : "REGULAR"
+        }
+      }, {
+        "$f5" : "INT"
+      }, {
+        "$f4" : "BIGINT NOT NULL"
+      }, {
+        "$f5_0" : "BIGINT"
+      }, {
+        "$f6" : "BIGINT NOT NULL"
+      } ]
+    },
+    "outputType" : {
+      "type" : "ROW",
+      "nullable" : true,
+      "fields" : [ {
+        "a" : "INT"
+      }, {
+        "$f1" : "BIGINT NOT NULL"
+      }, {
+        "$f2" : "BIGINT"
+      }, {
+        "$f3" : "BIGINT NOT NULL"
+      }, {
+        "window_start" : {
+          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+          "nullable" : false,
+          "precision" : 3,
+          "kind" : "REGULAR"
+        }
+      }, {
+        "window_end" : {
+          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+          "nullable" : false,
+          "precision" : 3,
+          "kind" : "REGULAR"
+        }
+      } ]
+    },
+    "description" : "GlobalWindowAggregate(groupBy=[a], 
window=[CUMULATE(win_end=[$window_end], max_size=[1 h], step=[10 min])], 
select=[a, $SUM0(sum$0) AS $f1, SUM(sum$1) AS $f2, $SUM0(sum$2) AS $f3, 
start('w$) AS window_start, end('w$) AS window_end])"
+  }, {
+    "class" : 
"org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecCalc",
+    "projection" : [ {
+      "kind" : "REX_CALL",
+      "operator" : {
+        "name" : "CAST",
+        "kind" : "CAST",
+        "syntax" : "SPECIAL"
+      },
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 0,
+        "type" : {
+          "typeName" : "INTEGER",
+          "nullable" : true
+        }
+      } ],
+      "type" : {
+        "typeName" : "BIGINT",
+        "nullable" : true
+      }
+    }, {
+      "kind" : "REX_CALL",
+      "operator" : {
+        "name" : "CAST",
+        "kind" : "CAST",
+        "syntax" : "SPECIAL"
+      },
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 4,
+        "type" : {
+          "typeName" : "TIMESTAMP",
+          "nullable" : false,
+          "precision" : 3
+        }
+      } ],
+      "type" : {
+        "typeName" : "TIMESTAMP",
+        "nullable" : true,
+        "precision" : 3
+      }
+    }, {
+      "kind" : "REX_CALL",
+      "operator" : {
+        "name" : "CAST",
+        "kind" : "CAST",
+        "syntax" : "SPECIAL"
+      },
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 5,
+        "type" : {
+          "typeName" : "TIMESTAMP",
+          "nullable" : false,
+          "precision" : 3
+        }
+      } ],
+      "type" : {
+        "typeName" : "TIMESTAMP",
+        "nullable" : true,
+        "precision" : 3
+      }
+    }, {
+      "kind" : "REX_CALL",
+      "operator" : {
+        "name" : "CAST",
+        "kind" : "CAST",
+        "syntax" : "SPECIAL"
+      },
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 1,
+        "type" : {
+          "typeName" : "BIGINT",
+          "nullable" : false
+        }
+      } ],
+      "type" : {
+        "typeName" : "BIGINT",
+        "nullable" : true
+      }
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 2,
+      "type" : {
+        "typeName" : "BIGINT",
+        "nullable" : true
+      }
+    }, {
+      "kind" : "REX_CALL",
+      "operator" : {
+        "name" : "CAST",
+        "kind" : "CAST",
+        "syntax" : "SPECIAL"
+      },
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 3,
+        "type" : {
+          "typeName" : "BIGINT",
+          "nullable" : false
+        }
+      } ],
+      "type" : {
+        "typeName" : "BIGINT",
+        "nullable" : true
+      }
+    } ],
+    "condition" : null,
+    "id" : 12,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "nullable" : true,
+      "fields" : [ {
+        "a" : "BIGINT"
+      }, {
+        "window_start" : {
+          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+          "nullable" : true,
+          "precision" : 3,
+          "kind" : "REGULAR"
+        }
+      }, {
+        "window_end" : {
+          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+          "nullable" : true,
+          "precision" : 3,
+          "kind" : "REGULAR"
+        }
+      }, {
+        "cnt_star" : "BIGINT"
+      }, {
+        "sum_b" : "BIGINT"
+      }, {
+        "cnt_distinct_c" : "BIGINT"
+      } ]
+    },
+    "description" : "Calc(select=[CAST(a) AS a, CAST(window_start) AS 
window_start, CAST(window_end) AS window_end, CAST($f1) AS cnt_star, $f2 AS 
sum_b, CAST($f3) AS cnt_distinct_c])"
+  }, {
+    "class" : 
"org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSink",
+    "dynamicTableSink" : {
+      "identifier" : {
+        "catalogName" : "default_catalog",
+        "databaseName" : "default_database",
+        "tableName" : "MySink"
+      },
+      "catalogTable" : {
+        "schema.5.name" : "cnt_distinct_c",
+        "sink-insert-only" : "false",
+        "schema.0.data-type" : "BIGINT",
+        "schema.2.name" : "window_end",
+        "schema.1.name" : "window_start",
+        "schema.4.name" : "sum_b",
+        "schema.1.data-type" : "TIMESTAMP(3)",
+        "schema.3.data-type" : "BIGINT",
+        "table-sink-class" : "DEFAULT",
+        "schema.2.data-type" : "TIMESTAMP(3)",
+        "schema.3.name" : "cnt_star",
+        "connector" : "values",
+        "schema.5.data-type" : "BIGINT",
+        "schema.4.data-type" : "BIGINT",
+        "schema.0.name" : "a"
+      }
+    },
+    "inputChangelogMode" : [ "INSERT" ],
+    "id" : 13,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "nullable" : true,
+      "fields" : [ {
+        "a" : "BIGINT"
+      }, {
+        "window_start" : {
+          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+          "nullable" : true,
+          "precision" : 3,
+          "kind" : "REGULAR"
+        }
+      }, {
+        "window_end" : {
+          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+          "nullable" : true,
+          "precision" : 3,
+          "kind" : "REGULAR"
+        }
+      }, {
+        "cnt_star" : "BIGINT"
+      }, {
+        "sum_b" : "BIGINT"
+      }, {
+        "cnt_distinct_c" : "BIGINT"
+      } ]
+    },
+    "description" : "Sink(table=[default_catalog.default_database.MySink], 
fields=[a, window_start, window_end, cnt_star, sum_b, cnt_distinct_c])"
+  } ],
+  "edges" : [ {
+    "source" : 1,
+    "target" : 2,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 2,
+    "target" : 3,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 3,
+    "target" : 4,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 4,
+    "target" : 5,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 5,
+    "target" : 6,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 6,
+    "target" : 7,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 7,
+    "target" : 8,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 8,
+    "target" : 9,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 9,
+    "target" : 10,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 10,
+    "target" : 11,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 11,
+    "target" : 12,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 12,
+    "target" : 13,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file

Reply via email to