This is an automated email from the ASF dual-hosted git repository.
godfrey pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.13 by this push:
new b813e72 [FLINK-22463][table-planner-blink] Fix
IllegalArgumentException in WindowAttachedWindowingStrategy when two phase is
enabled for distinct agg
b813e72 is described below
commit b813e7289b013fe077f048562ef5f4118bae4a3c
Author: godfreyhe <[email protected]>
AuthorDate: Mon Apr 26 11:55:02 2021 +0800
[FLINK-22463][table-planner-blink] Fix IllegalArgumentException in
WindowAttachedWindowingStrategy when two phase is enabled for distinct agg
This closes #15759
(cherry picked from commit 1f31505a7ab1cca31a99282f1ccc4703a102abcb)
---
.../logical/WindowAttachedWindowingStrategy.java | 2 +-
.../exec/stream/WindowAggregateJsonPlanTest.java | 33 +
.../stream/jsonplan/WindowAggregateJsonITCase.java | 45 +-
.../testDistinctSplitEnabled.out | 1261 ++++++++++++++++++++
4 files changed, 1339 insertions(+), 2 deletions(-)
diff --git
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/logical/WindowAttachedWindowingStrategy.java
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/logical/WindowAttachedWindowingStrategy.java
index a28c77a..cb3cf7c 100644
---
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/logical/WindowAttachedWindowingStrategy.java
+++
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/logical/WindowAttachedWindowingStrategy.java
@@ -48,7 +48,7 @@ public class WindowAttachedWindowingStrategy extends
WindowingStrategy {
@JsonProperty(FIELD_NAME_WINDOW_START) int windowStart,
@JsonProperty(FIELD_NAME_WINDOW_END) int windowEnd) {
super(window, timeAttributeType);
- checkArgument(windowEnd >= 0 && windowStart >= 0);
+ checkArgument(windowEnd >= 0);
this.windowStart = windowStart;
this.windowEnd = windowEnd;
}
diff --git
a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateJsonPlanTest.java
b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateJsonPlanTest.java
index a0bc8d9..f71f991 100644
---
a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateJsonPlanTest.java
+++
b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateJsonPlanTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.planner.plan.nodes.exec.stream;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
import
org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions.ConcatDistinctAggFunction;
import org.apache.flink.table.planner.utils.StreamTableTestUtil;
import org.apache.flink.table.planner.utils.TableTestBase;
@@ -185,4 +186,36 @@ public class WindowAggregateJsonPlanTest extends
TableTestBase {
+ " INTERVAL '15' SECOND))\n"
+ "GROUP BY b, window_start, window_end");
}
+
+ @Test
+ public void testDistinctSplitEnabled() {
+ tEnv.getConfig()
+ .getConfiguration()
+ .setBoolean(
+
OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED, true);
+ String sinkTableDdl =
+ "CREATE TABLE MySink (\n"
+ + " a bigint,\n"
+ + " window_start timestamp(3),\n"
+ + " window_end timestamp(3),\n"
+ + " cnt_star bigint,\n"
+ + " sum_b bigint,\n"
+ + " cnt_distinct_c bigint\n"
+ + ") with (\n"
+ + " 'connector' = 'values',\n"
+ + " 'sink-insert-only' = 'false',\n"
+ + " 'table-sink-class' = 'DEFAULT')";
+ tEnv.executeSql(sinkTableDdl);
+
+ util.verifyJsonPlan(
+ "insert into MySink select a, "
+ + " window_start, "
+ + " window_end, "
+ + " count(*), "
+ + " sum(b), "
+ + " count(distinct c) AS uv "
+ + "FROM TABLE ("
+ + " CUMULATE(TABLE MyTable, DESCRIPTOR(rowtime),
INTERVAL '10' MINUTE, INTERVAL '1' HOUR)) "
+ + "GROUP BY a, window_start, window_end");
+ }
}
diff --git
a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/WindowAggregateJsonITCase.java
b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/WindowAggregateJsonITCase.java
index 1e3e122..e18d14a 100644
---
a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/WindowAggregateJsonITCase.java
+++
b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/WindowAggregateJsonITCase.java
@@ -42,7 +42,7 @@ public class WindowAggregateJsonITCase extends
JsonPlanTestBase {
public static Object[] parameters() {
return new Object[][] {
new Object[] {AggregatePhaseStrategy.ONE_PHASE},
- new Object[] {AggregatePhaseStrategy.ONE_PHASE}
+ new Object[] {AggregatePhaseStrategy.TWO_PHASE}
};
}
@@ -180,4 +180,47 @@ public class WindowAggregateJsonITCase extends
JsonPlanTestBase {
"+I[null, 1]"),
result);
}
+
+ @Test
+ public void testDistinctSplitEnabled() throws Exception {
+ tableEnv.getConfig()
+ .getConfiguration()
+ .setBoolean(
+
OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED, true);
+ createTestValuesSinkTable(
+ "MySink", "name STRING", "max_double DOUBLE",
"cnt_distinct_int BIGINT");
+
+ String jsonPlan =
+ tableEnv.getJsonPlan(
+ "insert into MySink select name, "
+ + " max(`double`),\n"
+ + " count(distinct `int`) "
+ + "FROM TABLE ("
+ + " CUMULATE(\n"
+ + " TABLE MyTable,\n"
+ + " DESCRIPTOR(rowtime),\n"
+ + " INTERVAL '5' SECOND,\n"
+ + " INTERVAL '15' SECOND))"
+ + "GROUP BY name, window_start, window_end");
+ tableEnv.executeJsonPlan(jsonPlan).await();
+
+ List<String> result = TestValuesTableFactory.getResults("MySink");
+ assertResult(
+ Arrays.asList(
+ "+I[a, 5.0, 3]",
+ "+I[a, 5.0, 4]",
+ "+I[a, 5.0, 4]",
+ "+I[b, 3.0, 1]",
+ "+I[b, 3.0, 1]",
+ "+I[b, 3.0, 1]",
+ "+I[b, 4.0, 1]",
+ "+I[b, 4.0, 1]",
+ "+I[b, 4.0, 1]",
+ "+I[b, 6.0, 2]",
+ "+I[b, 6.0, 2]",
+ "+I[null, 7.0, 1]",
+ "+I[null, 7.0, 1]",
+ "+I[null, 7.0, 1]"),
+ result);
+ }
}
diff --git
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateJsonPlanTest_jsonplan/testDistinctSplitEnabled.out
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateJsonPlanTest_jsonplan/testDistinctSplitEnabled.out
new file mode 100644
index 0000000..f6b73fe
--- /dev/null
+++
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateJsonPlanTest_jsonplan/testDistinctSplitEnabled.out
@@ -0,0 +1,1261 @@
+{
+ "flinkVersion" : "",
+ "nodes" : [ {
+ "class" :
"org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecTableSourceScan",
+ "scanTableSource" : {
+ "identifier" : {
+ "catalogName" : "default_catalog",
+ "databaseName" : "default_database",
+ "tableName" : "MyTable"
+ },
+ "catalogTable" : {
+ "schema.watermark.0.strategy.expr" : "`rowtime` - INTERVAL '1' SECOND",
+ "schema.4.expr" : "PROCTIME()",
+ "schema.0.data-type" : "INT",
+ "schema.2.name" : "c",
+ "schema.1.name" : "b",
+ "schema.4.name" : "proctime",
+ "schema.1.data-type" : "BIGINT",
+ "schema.3.data-type" : "TIMESTAMP(3)",
+ "schema.2.data-type" : "VARCHAR(2147483647)",
+ "schema.3.name" : "rowtime",
+ "connector" : "values",
+ "schema.watermark.0.rowtime" : "rowtime",
+ "schema.watermark.0.strategy.data-type" : "TIMESTAMP(3)",
+ "schema.3.expr" : "TO_TIMESTAMP(`c`)",
+ "schema.4.data-type" : "TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL",
+ "schema.0.name" : "a"
+ }
+ },
+ "id" : 1,
+ "outputType" : {
+ "type" : "ROW",
+ "nullable" : true,
+ "fields" : [ {
+ "a" : "INT"
+ }, {
+ "b" : "BIGINT"
+ }, {
+ "c" : "VARCHAR(2147483647)"
+ } ]
+ },
+ "description" : "TableSourceScan(table=[[default_catalog,
default_database, MyTable]], fields=[a, b, c])",
+ "inputProperties" : [ ]
+ }, {
+ "class" :
"org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecCalc",
+ "projection" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : {
+ "typeName" : "INTEGER",
+ "nullable" : true
+ }
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 1,
+ "type" : {
+ "typeName" : "BIGINT",
+ "nullable" : true
+ }
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 2,
+ "type" : {
+ "typeName" : "VARCHAR",
+ "nullable" : true,
+ "precision" : 2147483647
+ }
+ }, {
+ "kind" : "REX_CALL",
+ "operator" : {
+ "name" : "TO_TIMESTAMP",
+ "kind" : "OTHER_FUNCTION",
+ "syntax" : "FUNCTION"
+ },
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 2,
+ "type" : {
+ "typeName" : "VARCHAR",
+ "nullable" : true,
+ "precision" : 2147483647
+ }
+ } ],
+ "type" : {
+ "typeName" : "TIMESTAMP",
+ "nullable" : true,
+ "precision" : 3
+ }
+ }, {
+ "kind" : "REX_CALL",
+ "operator" : {
+ "name" : "PROCTIME",
+ "kind" : "OTHER_FUNCTION",
+ "syntax" : "FUNCTION"
+ },
+ "operands" : [ ],
+ "type" : {
+ "timestampKind" : "PROCTIME",
+ "typeName" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+ "nullable" : false
+ }
+ } ],
+ "condition" : null,
+ "id" : 2,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : {
+ "type" : "ROW",
+ "nullable" : true,
+ "fields" : [ {
+ "a" : "INT"
+ }, {
+ "b" : "BIGINT"
+ }, {
+ "c" : "VARCHAR(2147483647)"
+ }, {
+ "rowtime" : {
+ "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+ "nullable" : true,
+ "precision" : 3,
+ "kind" : "REGULAR"
+ }
+ }, {
+ "proctime" : {
+ "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+ "nullable" : false,
+ "precision" : 3,
+ "kind" : "PROCTIME"
+ }
+ } ]
+ },
+ "description" : "Calc(select=[a, b, c, TO_TIMESTAMP(c) AS rowtime,
PROCTIME() AS proctime])"
+ }, {
+ "class" :
"org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecWatermarkAssigner",
+ "watermarkExpr" : {
+ "kind" : "REX_CALL",
+ "operator" : {
+ "name" : "-",
+ "kind" : "MINUS",
+ "syntax" : "SPECIAL"
+ },
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 3,
+ "type" : {
+ "typeName" : "TIMESTAMP",
+ "nullable" : true,
+ "precision" : 3
+ }
+ }, {
+ "kind" : "LITERAL",
+ "value" : 1000,
+ "type" : {
+ "typeName" : "INTERVAL_SECOND",
+ "nullable" : false,
+ "precision" : 2,
+ "scale" : 6
+ }
+ } ],
+ "type" : {
+ "typeName" : "TIMESTAMP",
+ "nullable" : true,
+ "precision" : 3
+ }
+ },
+ "rowtimeFieldIndex" : 3,
+ "id" : 3,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : {
+ "type" : "ROW",
+ "nullable" : true,
+ "fields" : [ {
+ "a" : "INT"
+ }, {
+ "b" : "BIGINT"
+ }, {
+ "c" : "VARCHAR(2147483647)"
+ }, {
+ "rowtime" : {
+ "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+ "nullable" : true,
+ "precision" : 3,
+ "kind" : "ROWTIME"
+ }
+ }, {
+ "proctime" : {
+ "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+ "nullable" : false,
+ "precision" : 3,
+ "kind" : "PROCTIME"
+ }
+ } ]
+ },
+ "description" : "WatermarkAssigner(rowtime=[rowtime], watermark=[(rowtime
- 1000:INTERVAL SECOND)])"
+ }, {
+ "class" :
"org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecCalc",
+ "projection" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : {
+ "typeName" : "INTEGER",
+ "nullable" : true
+ }
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 1,
+ "type" : {
+ "typeName" : "BIGINT",
+ "nullable" : true
+ }
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 2,
+ "type" : {
+ "typeName" : "VARCHAR",
+ "nullable" : true,
+ "precision" : 2147483647
+ }
+ }, {
+ "kind" : "REX_CALL",
+ "operator" : {
+ "name" : "MOD",
+ "kind" : "MOD",
+ "syntax" : "FUNCTION"
+ },
+ "operands" : [ {
+ "kind" : "REX_CALL",
+ "operator" : {
+ "name" : "HASH_CODE",
+ "kind" : "OTHER_FUNCTION",
+ "syntax" : "FUNCTION"
+ },
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 2,
+ "type" : {
+ "typeName" : "VARCHAR",
+ "nullable" : true,
+ "precision" : 2147483647
+ }
+ } ],
+ "type" : {
+ "typeName" : "INTEGER",
+ "nullable" : true
+ }
+ }, {
+ "kind" : "LITERAL",
+ "value" : "1024",
+ "type" : {
+ "typeName" : "INTEGER",
+ "nullable" : false
+ }
+ } ],
+ "type" : {
+ "typeName" : "INTEGER",
+ "nullable" : true
+ }
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 3,
+ "type" : {
+ "timestampKind" : "ROWTIME",
+ "typeName" : "TIMESTAMP",
+ "nullable" : true
+ }
+ } ],
+ "condition" : null,
+ "id" : 4,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : {
+ "type" : "ROW",
+ "nullable" : true,
+ "fields" : [ {
+ "a" : "INT"
+ }, {
+ "b" : "BIGINT"
+ }, {
+ "c" : "VARCHAR(2147483647)"
+ }, {
+ "$f5" : "INT"
+ }, {
+ "rowtime" : {
+ "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+ "nullable" : true,
+ "precision" : 3,
+ "kind" : "ROWTIME"
+ }
+ } ]
+ },
+ "description" : "Calc(select=[a, b, c, MOD(HASH_CODE(c), 1024) AS $f5,
rowtime])"
+ }, {
+ "class" :
"org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecLocalWindowAggregate",
+ "grouping" : [ 0, 3 ],
+ "aggCalls" : [ {
+ "name" : null,
+ "aggFunction" : {
+ "name" : "COUNT",
+ "kind" : "COUNT",
+ "syntax" : "FUNCTION_STAR"
+ },
+ "argList" : [ ],
+ "filterArg" : -1,
+ "distinct" : false,
+ "approximate" : false,
+ "ignoreNulls" : false,
+ "type" : {
+ "typeName" : "BIGINT",
+ "nullable" : false
+ }
+ }, {
+ "name" : null,
+ "aggFunction" : {
+ "name" : "SUM",
+ "kind" : "SUM",
+ "syntax" : "FUNCTION"
+ },
+ "argList" : [ 1 ],
+ "filterArg" : -1,
+ "distinct" : false,
+ "approximate" : false,
+ "ignoreNulls" : false,
+ "type" : {
+ "typeName" : "BIGINT",
+ "nullable" : true
+ }
+ }, {
+ "name" : null,
+ "aggFunction" : {
+ "name" : "COUNT",
+ "kind" : "COUNT",
+ "syntax" : "FUNCTION_STAR"
+ },
+ "argList" : [ 2 ],
+ "filterArg" : -1,
+ "distinct" : true,
+ "approximate" : false,
+ "ignoreNulls" : false,
+ "type" : {
+ "typeName" : "BIGINT",
+ "nullable" : false
+ }
+ } ],
+ "windowing" : {
+ "strategy" : "TimeAttribute",
+ "window" : {
+ "type" : "CumulativeWindow",
+ "maxSize" : "PT1H",
+ "step" : "PT10M"
+ },
+ "timeAttributeType" : {
+ "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+ "nullable" : true,
+ "precision" : 3,
+ "kind" : "ROWTIME"
+ },
+ "timeAttributeIndex" : 4,
+ "isRowtime" : true
+ },
+ "id" : 5,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : {
+ "type" : "ROW",
+ "nullable" : true,
+ "fields" : [ {
+ "a" : "INT"
+ }, {
+ "$f5" : "INT"
+ }, {
+ "count1$0" : "BIGINT"
+ }, {
+ "sum$1" : "BIGINT"
+ }, {
+ "count$2" : "BIGINT"
+ }, {
+ "distinct$0" : "RAW('org.apache.flink.table.api.dataview.MapView',
'AFZvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnJ1bnRpbWUudHlwZXV0aWxzLkV4dGVybmFsU2VyaWFsaXplciRFeHRlcm5hbFNlcmlhbGl6ZXJTbmFwc2hvdAAAAAMADecEAAAAAaztAAVzcgArb3JnLmFwYWNoZS5mbGluay50YWJsZS50eXBlcy5GaWVsZHNEYXRhVHlwZfSwrBytgZ9fAgABTAAOZmllbGREYXRhVHlwZXN0ABBMamF2YS91dGlsL0xpc3Q7eHIAJW9yZy5hcGFjaGUuZmxpbmsudGFibGUudHlwZXMuRGF0YVR5cGV5y2rIj5/EeAIAAkwAD2NvbnZlcnNpb25DbGFzc3QAEUxqYXZhL2xhbmcvQ2xhc3M7TAALbG9naWNhbFR5cGV0ADJMb3JnL2Fw
[...]
+ }, {
+ "$slice_end" : "BIGINT"
+ } ]
+ },
+ "description" : "LocalWindowAggregate(groupBy=[a, $f5],
window=[CUMULATE(time_col=[rowtime], max_size=[1 h], step=[10 min])],
select=[a, $f5, COUNT(*) AS count1$0, SUM(b) AS sum$1, COUNT(distinct$0 c) AS
count$2, DISTINCT(c) AS distinct$0, slice_end('w$) AS $slice_end])"
+ }, {
+ "class" :
"org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecExchange",
+ "id" : 6,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "HASH",
+ "keys" : [ 0, 1 ]
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : {
+ "type" : "ROW",
+ "nullable" : true,
+ "fields" : [ {
+ "a" : "INT"
+ }, {
+ "$f5" : "INT"
+ }, {
+ "count1$0" : "BIGINT"
+ }, {
+ "sum$1" : "BIGINT"
+ }, {
+ "count$2" : "BIGINT"
+ }, {
+ "distinct$0" : "RAW('org.apache.flink.table.api.dataview.MapView',
'AFZvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnJ1bnRpbWUudHlwZXV0aWxzLkV4dGVybmFsU2VyaWFsaXplciRFeHRlcm5hbFNlcmlhbGl6ZXJTbmFwc2hvdAAAAAMADecEAAAAAaztAAVzcgArb3JnLmFwYWNoZS5mbGluay50YWJsZS50eXBlcy5GaWVsZHNEYXRhVHlwZfSwrBytgZ9fAgABTAAOZmllbGREYXRhVHlwZXN0ABBMamF2YS91dGlsL0xpc3Q7eHIAJW9yZy5hcGFjaGUuZmxpbmsudGFibGUudHlwZXMuRGF0YVR5cGV5y2rIj5/EeAIAAkwAD2NvbnZlcnNpb25DbGFzc3QAEUxqYXZhL2xhbmcvQ2xhc3M7TAALbG9naWNhbFR5cGV0ADJMb3JnL2Fw
[...]
+ }, {
+ "$slice_end" : "BIGINT"
+ } ]
+ },
+ "description" : "Exchange(distribution=[hash[a, $f5]])"
+ }, {
+ "class" :
"org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecGlobalWindowAggregate",
+ "grouping" : [ 0, 1 ],
+ "aggCalls" : [ {
+ "name" : null,
+ "aggFunction" : {
+ "name" : "COUNT",
+ "kind" : "COUNT",
+ "syntax" : "FUNCTION_STAR"
+ },
+ "argList" : [ ],
+ "filterArg" : -1,
+ "distinct" : false,
+ "approximate" : false,
+ "ignoreNulls" : false,
+ "type" : {
+ "typeName" : "BIGINT",
+ "nullable" : false
+ }
+ }, {
+ "name" : null,
+ "aggFunction" : {
+ "name" : "SUM",
+ "kind" : "SUM",
+ "syntax" : "FUNCTION"
+ },
+ "argList" : [ 1 ],
+ "filterArg" : -1,
+ "distinct" : false,
+ "approximate" : false,
+ "ignoreNulls" : false,
+ "type" : {
+ "typeName" : "BIGINT",
+ "nullable" : true
+ }
+ }, {
+ "name" : null,
+ "aggFunction" : {
+ "name" : "COUNT",
+ "kind" : "COUNT",
+ "syntax" : "FUNCTION_STAR"
+ },
+ "argList" : [ 2 ],
+ "filterArg" : -1,
+ "distinct" : true,
+ "approximate" : false,
+ "ignoreNulls" : false,
+ "type" : {
+ "typeName" : "BIGINT",
+ "nullable" : false
+ }
+ } ],
+ "windowing" : {
+ "strategy" : "SliceAttached",
+ "window" : {
+ "type" : "CumulativeWindow",
+ "maxSize" : "PT1H",
+ "step" : "PT10M"
+ },
+ "timeAttributeType" : {
+ "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+ "nullable" : true,
+ "precision" : 3,
+ "kind" : "ROWTIME"
+ },
+ "sliceEnd" : 6,
+ "isRowtime" : true
+ },
+ "namedWindowProperties" : [ {
+ "name" : "window_start",
+ "property" : {
+ "kind" : "WindowStart",
+ "reference" : {
+ "name" : "w$",
+ "type" : {
+ "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+ "nullable" : true,
+ "precision" : 3,
+ "kind" : "ROWTIME"
+ }
+ }
+ }
+ }, {
+ "name" : "window_end",
+ "property" : {
+ "kind" : "WindowEnd",
+ "reference" : {
+ "name" : "w$",
+ "type" : {
+ "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+ "nullable" : true,
+ "precision" : 3,
+ "kind" : "ROWTIME"
+ }
+ }
+ }
+ } ],
+ "id" : 7,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "localAggInputRowType" : {
+ "type" : "ROW",
+ "nullable" : true,
+ "fields" : [ {
+ "a" : "INT"
+ }, {
+ "b" : "BIGINT"
+ }, {
+ "c" : "VARCHAR(2147483647)"
+ }, {
+ "$f5" : "INT"
+ }, {
+ "rowtime" : {
+ "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+ "nullable" : true,
+ "precision" : 3,
+ "kind" : "ROWTIME"
+ }
+ } ]
+ },
+ "outputType" : {
+ "type" : "ROW",
+ "nullable" : true,
+ "fields" : [ {
+ "a" : "INT"
+ }, {
+ "$f5" : "INT"
+ }, {
+ "$f2" : "BIGINT NOT NULL"
+ }, {
+ "$f3" : "BIGINT"
+ }, {
+ "$f4" : "BIGINT NOT NULL"
+ }, {
+ "window_start" : {
+ "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+ "nullable" : false,
+ "precision" : 3,
+ "kind" : "REGULAR"
+ }
+ }, {
+ "window_end" : {
+ "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+ "nullable" : false,
+ "precision" : 3,
+ "kind" : "REGULAR"
+ }
+ } ]
+ },
+ "description" : "GlobalWindowAggregate(groupBy=[a, $f5],
window=[CUMULATE(slice_end=[$slice_end], max_size=[1 h], step=[10 min])],
select=[a, $f5, COUNT(count1$0) AS $f2, SUM(sum$1) AS $f3, COUNT(distinct$0
count$2) AS $f4, start('w$) AS window_start, end('w$) AS window_end])"
+ }, {
+ "class" :
"org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecCalc",
+ "projection" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : {
+ "typeName" : "INTEGER",
+ "nullable" : true
+ }
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 5,
+ "type" : {
+ "typeName" : "TIMESTAMP",
+ "nullable" : false,
+ "precision" : 3
+ }
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 6,
+ "type" : {
+ "typeName" : "TIMESTAMP",
+ "nullable" : false,
+ "precision" : 3
+ }
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 1,
+ "type" : {
+ "typeName" : "INTEGER",
+ "nullable" : true
+ }
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 2,
+ "type" : {
+ "typeName" : "BIGINT",
+ "nullable" : false
+ }
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 3,
+ "type" : {
+ "typeName" : "BIGINT",
+ "nullable" : true
+ }
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 4,
+ "type" : {
+ "typeName" : "BIGINT",
+ "nullable" : false
+ }
+ } ],
+ "condition" : null,
+ "id" : 8,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : {
+ "type" : "ROW",
+ "nullable" : true,
+ "fields" : [ {
+ "a" : "INT"
+ }, {
+ "window_start" : {
+ "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+ "nullable" : false,
+ "precision" : 3,
+ "kind" : "REGULAR"
+ }
+ }, {
+ "window_end" : {
+ "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+ "nullable" : false,
+ "precision" : 3,
+ "kind" : "REGULAR"
+ }
+ }, {
+ "$f5" : "INT"
+ }, {
+ "$f4" : "BIGINT NOT NULL"
+ }, {
+ "$f5_0" : "BIGINT"
+ }, {
+ "$f6" : "BIGINT NOT NULL"
+ } ]
+ },
+ "description" : "Calc(select=[a, window_start, window_end, $f5, $f2 AS
$f4, $f3 AS $f5_0, $f4 AS $f6])"
+ }, {
+ "class" :
"org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecLocalWindowAggregate",
+ "grouping" : [ 0 ],
+ "aggCalls" : [ {
+ "name" : null,
+ "aggFunction" : {
+ "name" : "$SUM0",
+ "kind" : "SUM0",
+ "syntax" : "FUNCTION"
+ },
+ "argList" : [ 4 ],
+ "filterArg" : -1,
+ "distinct" : false,
+ "approximate" : false,
+ "ignoreNulls" : false,
+ "type" : {
+ "typeName" : "BIGINT",
+ "nullable" : false
+ }
+ }, {
+ "name" : null,
+ "aggFunction" : {
+ "name" : "SUM",
+ "kind" : "SUM",
+ "syntax" : "FUNCTION"
+ },
+ "argList" : [ 5 ],
+ "filterArg" : -1,
+ "distinct" : false,
+ "approximate" : false,
+ "ignoreNulls" : false,
+ "type" : {
+ "typeName" : "BIGINT",
+ "nullable" : true
+ }
+ }, {
+ "name" : null,
+ "aggFunction" : {
+ "name" : "$SUM0",
+ "kind" : "SUM0",
+ "syntax" : "FUNCTION"
+ },
+ "argList" : [ 6 ],
+ "filterArg" : -1,
+ "distinct" : false,
+ "approximate" : false,
+ "ignoreNulls" : false,
+ "type" : {
+ "typeName" : "BIGINT",
+ "nullable" : false
+ }
+ } ],
+ "windowing" : {
+ "strategy" : "WindowAttached",
+ "window" : {
+ "type" : "CumulativeWindow",
+ "maxSize" : "PT1H",
+ "step" : "PT10M"
+ },
+ "timeAttributeType" : {
+ "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+ "nullable" : true,
+ "precision" : 3,
+ "kind" : "ROWTIME"
+ },
+ "windowStart" : 1,
+ "windowEnd" : 2,
+ "isRowtime" : true
+ },
+ "id" : 9,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : {
+ "type" : "ROW",
+ "nullable" : true,
+ "fields" : [ {
+ "a" : "INT"
+ }, {
+ "sum$0" : "BIGINT"
+ }, {
+ "sum$1" : "BIGINT"
+ }, {
+ "sum$2" : "BIGINT"
+ }, {
+ "$window_end" : "BIGINT"
+ } ]
+ },
+ "description" : "LocalWindowAggregate(groupBy=[a],
window=[CUMULATE(win_start=[window_start], win_end=[window_end], max_size=[1
h], step=[10 min])], select=[a, $SUM0($f4) AS sum$0, SUM($f5_0) AS sum$1,
$SUM0($f6) AS sum$2, slice_end('w$) AS $window_end])"
+ }, {
+ "class" :
"org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecExchange",
+ "id" : 10,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "HASH",
+ "keys" : [ 0 ]
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : {
+ "type" : "ROW",
+ "nullable" : true,
+ "fields" : [ {
+ "a" : "INT"
+ }, {
+ "sum$0" : "BIGINT"
+ }, {
+ "sum$1" : "BIGINT"
+ }, {
+ "sum$2" : "BIGINT"
+ }, {
+ "$window_end" : "BIGINT"
+ } ]
+ },
+ "description" : "Exchange(distribution=[hash[a]])"
+ }, {
+ "class" :
"org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecGlobalWindowAggregate",
+ "grouping" : [ 0 ],
+ "aggCalls" : [ {
+ "name" : null,
+ "aggFunction" : {
+ "name" : "$SUM0",
+ "kind" : "SUM0",
+ "syntax" : "FUNCTION"
+ },
+ "argList" : [ 4 ],
+ "filterArg" : -1,
+ "distinct" : false,
+ "approximate" : false,
+ "ignoreNulls" : false,
+ "type" : {
+ "typeName" : "BIGINT",
+ "nullable" : false
+ }
+ }, {
+ "name" : null,
+ "aggFunction" : {
+ "name" : "SUM",
+ "kind" : "SUM",
+ "syntax" : "FUNCTION"
+ },
+ "argList" : [ 5 ],
+ "filterArg" : -1,
+ "distinct" : false,
+ "approximate" : false,
+ "ignoreNulls" : false,
+ "type" : {
+ "typeName" : "BIGINT",
+ "nullable" : true
+ }
+ }, {
+ "name" : null,
+ "aggFunction" : {
+ "name" : "$SUM0",
+ "kind" : "SUM0",
+ "syntax" : "FUNCTION"
+ },
+ "argList" : [ 6 ],
+ "filterArg" : -1,
+ "distinct" : false,
+ "approximate" : false,
+ "ignoreNulls" : false,
+ "type" : {
+ "typeName" : "BIGINT",
+ "nullable" : false
+ }
+ } ],
+ "windowing" : {
+ "strategy" : "WindowAttached",
+ "window" : {
+ "type" : "CumulativeWindow",
+ "maxSize" : "PT1H",
+ "step" : "PT10M"
+ },
+ "timeAttributeType" : {
+ "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+ "nullable" : true,
+ "precision" : 3,
+ "kind" : "ROWTIME"
+ },
+ "windowStart" : -1,
+ "windowEnd" : 4,
+ "isRowtime" : true
+ },
+ "namedWindowProperties" : [ {
+ "name" : "window_start",
+ "property" : {
+ "kind" : "WindowStart",
+ "reference" : {
+ "name" : "w$",
+ "type" : {
+ "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+ "nullable" : true,
+ "precision" : 3,
+ "kind" : "ROWTIME"
+ }
+ }
+ }
+ }, {
+ "name" : "window_end",
+ "property" : {
+ "kind" : "WindowEnd",
+ "reference" : {
+ "name" : "w$",
+ "type" : {
+ "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+ "nullable" : true,
+ "precision" : 3,
+ "kind" : "ROWTIME"
+ }
+ }
+ }
+ } ],
+ "id" : 11,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "localAggInputRowType" : {
+ "type" : "ROW",
+ "nullable" : true,
+ "fields" : [ {
+ "a" : "INT"
+ }, {
+ "window_start" : {
+ "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+ "nullable" : false,
+ "precision" : 3,
+ "kind" : "REGULAR"
+ }
+ }, {
+ "window_end" : {
+ "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+ "nullable" : false,
+ "precision" : 3,
+ "kind" : "REGULAR"
+ }
+ }, {
+ "$f5" : "INT"
+ }, {
+ "$f4" : "BIGINT NOT NULL"
+ }, {
+ "$f5_0" : "BIGINT"
+ }, {
+ "$f6" : "BIGINT NOT NULL"
+ } ]
+ },
+ "outputType" : {
+ "type" : "ROW",
+ "nullable" : true,
+ "fields" : [ {
+ "a" : "INT"
+ }, {
+ "$f1" : "BIGINT NOT NULL"
+ }, {
+ "$f2" : "BIGINT"
+ }, {
+ "$f3" : "BIGINT NOT NULL"
+ }, {
+ "window_start" : {
+ "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+ "nullable" : false,
+ "precision" : 3,
+ "kind" : "REGULAR"
+ }
+ }, {
+ "window_end" : {
+ "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+ "nullable" : false,
+ "precision" : 3,
+ "kind" : "REGULAR"
+ }
+ } ]
+ },
+ "description" : "GlobalWindowAggregate(groupBy=[a],
window=[CUMULATE(win_end=[$window_end], max_size=[1 h], step=[10 min])],
select=[a, $SUM0(sum$0) AS $f1, SUM(sum$1) AS $f2, $SUM0(sum$2) AS $f3,
start('w$) AS window_start, end('w$) AS window_end])"
+ }, {
+ "class" :
"org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecCalc",
+ "projection" : [ {
+ "kind" : "REX_CALL",
+ "operator" : {
+ "name" : "CAST",
+ "kind" : "CAST",
+ "syntax" : "SPECIAL"
+ },
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : {
+ "typeName" : "INTEGER",
+ "nullable" : true
+ }
+ } ],
+ "type" : {
+ "typeName" : "BIGINT",
+ "nullable" : true
+ }
+ }, {
+ "kind" : "REX_CALL",
+ "operator" : {
+ "name" : "CAST",
+ "kind" : "CAST",
+ "syntax" : "SPECIAL"
+ },
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 4,
+ "type" : {
+ "typeName" : "TIMESTAMP",
+ "nullable" : false,
+ "precision" : 3
+ }
+ } ],
+ "type" : {
+ "typeName" : "TIMESTAMP",
+ "nullable" : true,
+ "precision" : 3
+ }
+ }, {
+ "kind" : "REX_CALL",
+ "operator" : {
+ "name" : "CAST",
+ "kind" : "CAST",
+ "syntax" : "SPECIAL"
+ },
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 5,
+ "type" : {
+ "typeName" : "TIMESTAMP",
+ "nullable" : false,
+ "precision" : 3
+ }
+ } ],
+ "type" : {
+ "typeName" : "TIMESTAMP",
+ "nullable" : true,
+ "precision" : 3
+ }
+ }, {
+ "kind" : "REX_CALL",
+ "operator" : {
+ "name" : "CAST",
+ "kind" : "CAST",
+ "syntax" : "SPECIAL"
+ },
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 1,
+ "type" : {
+ "typeName" : "BIGINT",
+ "nullable" : false
+ }
+ } ],
+ "type" : {
+ "typeName" : "BIGINT",
+ "nullable" : true
+ }
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 2,
+ "type" : {
+ "typeName" : "BIGINT",
+ "nullable" : true
+ }
+ }, {
+ "kind" : "REX_CALL",
+ "operator" : {
+ "name" : "CAST",
+ "kind" : "CAST",
+ "syntax" : "SPECIAL"
+ },
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 3,
+ "type" : {
+ "typeName" : "BIGINT",
+ "nullable" : false
+ }
+ } ],
+ "type" : {
+ "typeName" : "BIGINT",
+ "nullable" : true
+ }
+ } ],
+ "condition" : null,
+ "id" : 12,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : {
+ "type" : "ROW",
+ "nullable" : true,
+ "fields" : [ {
+ "a" : "BIGINT"
+ }, {
+ "window_start" : {
+ "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+ "nullable" : true,
+ "precision" : 3,
+ "kind" : "REGULAR"
+ }
+ }, {
+ "window_end" : {
+ "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+ "nullable" : true,
+ "precision" : 3,
+ "kind" : "REGULAR"
+ }
+ }, {
+ "cnt_star" : "BIGINT"
+ }, {
+ "sum_b" : "BIGINT"
+ }, {
+ "cnt_distinct_c" : "BIGINT"
+ } ]
+ },
+ "description" : "Calc(select=[CAST(a) AS a, CAST(window_start) AS
window_start, CAST(window_end) AS window_end, CAST($f1) AS cnt_star, $f2 AS
sum_b, CAST($f3) AS cnt_distinct_c])"
+ }, {
+ "class" :
"org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSink",
+ "dynamicTableSink" : {
+ "identifier" : {
+ "catalogName" : "default_catalog",
+ "databaseName" : "default_database",
+ "tableName" : "MySink"
+ },
+ "catalogTable" : {
+ "schema.5.name" : "cnt_distinct_c",
+ "sink-insert-only" : "false",
+ "schema.0.data-type" : "BIGINT",
+ "schema.2.name" : "window_end",
+ "schema.1.name" : "window_start",
+ "schema.4.name" : "sum_b",
+ "schema.1.data-type" : "TIMESTAMP(3)",
+ "schema.3.data-type" : "BIGINT",
+ "table-sink-class" : "DEFAULT",
+ "schema.2.data-type" : "TIMESTAMP(3)",
+ "schema.3.name" : "cnt_star",
+ "connector" : "values",
+ "schema.5.data-type" : "BIGINT",
+ "schema.4.data-type" : "BIGINT",
+ "schema.0.name" : "a"
+ }
+ },
+ "inputChangelogMode" : [ "INSERT" ],
+ "id" : 13,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : {
+ "type" : "ROW",
+ "nullable" : true,
+ "fields" : [ {
+ "a" : "BIGINT"
+ }, {
+ "window_start" : {
+ "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+ "nullable" : true,
+ "precision" : 3,
+ "kind" : "REGULAR"
+ }
+ }, {
+ "window_end" : {
+ "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+ "nullable" : true,
+ "precision" : 3,
+ "kind" : "REGULAR"
+ }
+ }, {
+ "cnt_star" : "BIGINT"
+ }, {
+ "sum_b" : "BIGINT"
+ }, {
+ "cnt_distinct_c" : "BIGINT"
+ } ]
+ },
+ "description" : "Sink(table=[default_catalog.default_database.MySink],
fields=[a, window_start, window_end, cnt_star, sum_b, cnt_distinct_c])"
+ } ],
+ "edges" : [ {
+ "source" : 1,
+ "target" : 2,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 2,
+ "target" : 3,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 3,
+ "target" : 4,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 4,
+ "target" : 5,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 5,
+ "target" : 6,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 6,
+ "target" : 7,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 7,
+ "target" : 8,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 8,
+ "target" : 9,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 9,
+ "target" : 10,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 10,
+ "target" : 11,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 11,
+ "target" : 12,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 12,
+ "target" : 13,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ } ]
+}
\ No newline at end of file