This is an automated email from the ASF dual-hosted git repository. twalthr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 29c81a0b1d3e3ae27eef386001db4db6e4a6b4ca Author: Timo Walther <[email protected]> AuthorDate: Mon Apr 12 15:15:14 2021 +0200 [FLINK-20613][table-planner-blink] Support unidirectional ExternalSerializer --- ...stDistinctAggCalls[isMiniBatchEnabled=true].out | 8 ++-- .../testIncrementalAggregate.out | 54 +++++++++++----------- .../testEventTimeTumbleWindow.out | 12 ++--- .../runtime/typeutils/ExternalSerializer.java | 47 +++++++++++++++---- .../table/runtime/typeutils/ExternalTypeInfo.java | 25 ++++++++-- 5 files changed, 97 insertions(+), 49 deletions(-) diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testDistinctAggCalls[isMiniBatchEnabled=true].out b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testDistinctAggCalls[isMiniBatchEnabled=true].out index b543285..5768acf 100644 --- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testDistinctAggCalls[isMiniBatchEnabled=true].out +++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testDistinctAggCalls[isMiniBatchEnabled=true].out @@ -262,9 +262,9 @@ }, { "count$5" : "BIGINT" }, { - "distinct$0" : "RAW('org.apache.flink.table.api.dataview.MapView', 'AFZvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnJ1bnRpbWUudHlwZXV0aWxzLkV4dGVybmFsU2VyaWFsaXplciRFeHRlcm5hbFNlcmlhbGl6ZXJTbmFwc2hvdAAAAAMADecEAAAAAaztAAVzcgArb3JnLmFwYWNoZS5mbGluay50YWJsZS50eXBlcy5GaWVsZHNEYXRhVHlwZfSwrBytgZ9fAgABTAAOZmllbGREYXRhVHlwZXN0ABBMamF2YS91dGlsL0xpc3Q7eHIAJW9yZy5hcGFjaGUuZmxpbmsudGFibGUudHlwZXMuRGF0YVR5cGV5y2rIj5/EeAIAAkwAD2NvbnZlcnNpb25DbGFzc3QAEUxqYXZhL2xhbmcvQ2xhc3M7TAALbG9naWNhbFR5cGV0ADJMb3JnL2Fw [...] + "distinct$0" : "RAW('org.apache.flink.table.api.dataview.MapView', 'AFZvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnJ1bnRpbWUudHlwZXV0aWxzLkV4dGVybmFsU2VyaWFsaXplciRFeHRlcm5hbFNlcmlhbGl6ZXJTbmFwc2hvdAAAAAMADecEAAAAAaztAAVzcgArb3JnLmFwYWNoZS5mbGluay50YWJsZS50eXBlcy5GaWVsZHNEYXRhVHlwZfSwrBytgZ9fAgABTAAOZmllbGREYXRhVHlwZXN0ABBMamF2YS91dGlsL0xpc3Q7eHIAJW9yZy5hcGFjaGUuZmxpbmsudGFibGUudHlwZXMuRGF0YVR5cGV5y2rIj5/EeAIAAkwAD2NvbnZlcnNpb25DbGFzc3QAEUxqYXZhL2xhbmcvQ2xhc3M7TAALbG9naWNhbFR5cGV0ADJMb3JnL2Fw [...] }, { - "distinct$1" : "RAW('org.apache.flink.table.api.dataview.MapView', 'AFZvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnJ1bnRpbWUudHlwZXV0aWxzLkV4dGVybmFsU2VyaWFsaXplciRFeHRlcm5hbFNlcmlhbGl6ZXJTbmFwc2hvdAAAAAMADecEAAAAAaztAAVzcgArb3JnLmFwYWNoZS5mbGluay50YWJsZS50eXBlcy5GaWVsZHNEYXRhVHlwZfSwrBytgZ9fAgABTAAOZmllbGREYXRhVHlwZXN0ABBMamF2YS91dGlsL0xpc3Q7eHIAJW9yZy5hcGFjaGUuZmxpbmsudGFibGUudHlwZXMuRGF0YVR5cGV5y2rIj5/EeAIAAkwAD2NvbnZlcnNpb25DbGFzc3QAEUxqYXZhL2xhbmcvQ2xhc3M7TAALbG9naWNhbFR5cGV0ADJMb3JnL2Fw [...] + "distinct$1" : "RAW('org.apache.flink.table.api.dataview.MapView', 'AFZvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnJ1bnRpbWUudHlwZXV0aWxzLkV4dGVybmFsU2VyaWFsaXplciRFeHRlcm5hbFNlcmlhbGl6ZXJTbmFwc2hvdAAAAAMADecEAAAAAaztAAVzcgArb3JnLmFwYWNoZS5mbGluay50YWJsZS50eXBlcy5GaWVsZHNEYXRhVHlwZfSwrBytgZ9fAgABTAAOZmllbGREYXRhVHlwZXN0ABBMamF2YS91dGlsL0xpc3Q7eHIAJW9yZy5hcGFjaGUuZmxpbmsudGFibGUudHlwZXMuRGF0YVR5cGV5y2rIj5/EeAIAAkwAD2NvbnZlcnNpb25DbGFzc3QAEUxqYXZhL2xhbmcvQ2xhc3M7TAALbG9naWNhbFR5cGV0ADJMb3JnL2Fw [...] } ] }, "description" : "LocalGroupAggregate(groupBy=[c], select=[c, COUNT(distinct$0 a) FILTER $f2 AS count$0, COUNT(distinct$0 a) AS count$1, SUM(distinct$0 a) AS sum$2, SUM(distinct$1 b) AS sum$3, AVG(b) AS (sum$4, count$5), DISTINCT(a) AS distinct$0, DISTINCT(b) AS distinct$1])" @@ -297,9 +297,9 @@ }, { "count$5" : "BIGINT" }, { - "distinct$0" : "RAW('org.apache.flink.table.api.dataview.MapView', 'AFZvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnJ1bnRpbWUudHlwZXV0aWxzLkV4dGVybmFsU2VyaWFsaXplciRFeHRlcm5hbFNlcmlhbGl6ZXJTbmFwc2hvdAAAAAMADecEAAAAAaztAAVzcgArb3JnLmFwYWNoZS5mbGluay50YWJsZS50eXBlcy5GaWVsZHNEYXRhVHlwZfSwrBytgZ9fAgABTAAOZmllbGREYXRhVHlwZXN0ABBMamF2YS91dGlsL0xpc3Q7eHIAJW9yZy5hcGFjaGUuZmxpbmsudGFibGUudHlwZXMuRGF0YVR5cGV5y2rIj5/EeAIAAkwAD2NvbnZlcnNpb25DbGFzc3QAEUxqYXZhL2xhbmcvQ2xhc3M7TAALbG9naWNhbFR5cGV0ADJMb3JnL2Fw [...] + "distinct$0" : "RAW('org.apache.flink.table.api.dataview.MapView', 'AFZvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnJ1bnRpbWUudHlwZXV0aWxzLkV4dGVybmFsU2VyaWFsaXplciRFeHRlcm5hbFNlcmlhbGl6ZXJTbmFwc2hvdAAAAAMADecEAAAAAaztAAVzcgArb3JnLmFwYWNoZS5mbGluay50YWJsZS50eXBlcy5GaWVsZHNEYXRhVHlwZfSwrBytgZ9fAgABTAAOZmllbGREYXRhVHlwZXN0ABBMamF2YS91dGlsL0xpc3Q7eHIAJW9yZy5hcGFjaGUuZmxpbmsudGFibGUudHlwZXMuRGF0YVR5cGV5y2rIj5/EeAIAAkwAD2NvbnZlcnNpb25DbGFzc3QAEUxqYXZhL2xhbmcvQ2xhc3M7TAALbG9naWNhbFR5cGV0ADJMb3JnL2Fw [...] }, { - "distinct$1" : "RAW('org.apache.flink.table.api.dataview.MapView', 'AFZvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnJ1bnRpbWUudHlwZXV0aWxzLkV4dGVybmFsU2VyaWFsaXplciRFeHRlcm5hbFNlcmlhbGl6ZXJTbmFwc2hvdAAAAAMADecEAAAAAaztAAVzcgArb3JnLmFwYWNoZS5mbGluay50YWJsZS50eXBlcy5GaWVsZHNEYXRhVHlwZfSwrBytgZ9fAgABTAAOZmllbGREYXRhVHlwZXN0ABBMamF2YS91dGlsL0xpc3Q7eHIAJW9yZy5hcGFjaGUuZmxpbmsudGFibGUudHlwZXMuRGF0YVR5cGV5y2rIj5/EeAIAAkwAD2NvbnZlcnNpb25DbGFzc3QAEUxqYXZhL2xhbmcvQ2xhc3M7TAALbG9naWNhbFR5cGV0ADJMb3JnL2Fw [...] + "distinct$1" : "RAW('org.apache.flink.table.api.dataview.MapView', 'AFZvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnJ1bnRpbWUudHlwZXV0aWxzLkV4dGVybmFsU2VyaWFsaXplciRFeHRlcm5hbFNlcmlhbGl6ZXJTbmFwc2hvdAAAAAMADecEAAAAAaztAAVzcgArb3JnLmFwYWNoZS5mbGluay50YWJsZS50eXBlcy5GaWVsZHNEYXRhVHlwZfSwrBytgZ9fAgABTAAOZmllbGREYXRhVHlwZXN0ABBMamF2YS91dGlsL0xpc3Q7eHIAJW9yZy5hcGFjaGUuZmxpbmsudGFibGUudHlwZXMuRGF0YVR5cGV5y2rIj5/EeAIAAkwAD2NvbnZlcnNpb25DbGFzc3QAEUxqYXZhL2xhbmcvQ2xhc3M7TAALbG9naWNhbFR5cGV0ADJMb3JnL2Fw [...] } ] }, "description" : "Exchange(distribution=[hash[c]])" diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/IncrementalAggregateJsonPlanTest_jsonplan/testIncrementalAggregate.out b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/IncrementalAggregateJsonPlanTest_jsonplan/testIncrementalAggregate.out index 7d1af16..a54cb42 100644 --- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/IncrementalAggregateJsonPlanTest_jsonplan/testIncrementalAggregate.out +++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/IncrementalAggregateJsonPlanTest_jsonplan/testIncrementalAggregate.out @@ -34,7 +34,7 @@ } } ] }, - "id" : 1, + "id" : 18, "outputType" : { "type" : "ROW", "nullable" : true, @@ -52,7 +52,7 @@ "interval" : 10000, "mode" : "ProcTime" }, - "id" : 2, + "id" : 19, "inputProperties" : [ { "requiredDistribution" : { "type" : "UNKNOWN" @@ -128,7 +128,7 @@ } } ], "condition" : null, - "id" : 3, + "id" : 20, "inputProperties" : [ { "requiredDistribution" : { "type" : "UNKNOWN" @@ -170,7 +170,7 @@ } ], "aggCallNeedRetractions" : [ false ], "needRetraction" : false, - "id" : 4, + "id" : 21, "inputProperties" : [ { "requiredDistribution" : { "type" : "UNKNOWN" @@ -188,13 +188,13 @@ }, { "count$0" : "BIGINT" }, { - "distinct$0" : "RAW('org.apache.flink.table.api.dataview.MapView', 'AFZvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnJ1bnRpbWUudHlwZXV0aWxzLkV4dGVybmFsU2VyaWFsaXplciRFeHRlcm5hbFNlcmlhbGl6ZXJTbmFwc2hvdAAAAAMADecEAAAAAaztAAVzcgArb3JnLmFwYWNoZS5mbGluay50YWJsZS50eXBlcy5GaWVsZHNEYXRhVHlwZfSwrBytgZ9fAgABTAAOZmllbGREYXRhVHlwZXN0ABBMamF2YS91dGlsL0xpc3Q7eHIAJW9yZy5hcGFjaGUuZmxpbmsudGFibGUudHlwZXMuRGF0YVR5cGV5y2rIj5/EeAIAAkwAD2NvbnZlcnNpb25DbGFzc3QAEUxqYXZhL2xhbmcvQ2xhc3M7TAALbG9naWNhbFR5cGV0ADJMb3JnL2Fw [...] + "distinct$0" : "RAW('org.apache.flink.table.api.dataview.MapView', 'AFZvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnJ1bnRpbWUudHlwZXV0aWxzLkV4dGVybmFsU2VyaWFsaXplciRFeHRlcm5hbFNlcmlhbGl6ZXJTbmFwc2hvdAAAAAMADecEAAAAAaztAAVzcgArb3JnLmFwYWNoZS5mbGluay50YWJsZS50eXBlcy5GaWVsZHNEYXRhVHlwZfSwrBytgZ9fAgABTAAOZmllbGREYXRhVHlwZXN0ABBMamF2YS91dGlsL0xpc3Q7eHIAJW9yZy5hcGFjaGUuZmxpbmsudGFibGUudHlwZXMuRGF0YVR5cGV5y2rIj5/EeAIAAkwAD2NvbnZlcnNpb25DbGFzc3QAEUxqYXZhL2xhbmcvQ2xhc3M7TAALbG9naWNhbFR5cGV0ADJMb3JnL2Fw [...] } ] }, "description" : "LocalGroupAggregate(groupBy=[a, $f2], partialFinalType=[PARTIAL], select=[a, $f2, COUNT(distinct$0 c) AS count$0, DISTINCT(c) AS distinct$0])" }, { "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecExchange", - "id" : 5, + "id" : 22, "inputProperties" : [ { "requiredDistribution" : { "type" : "HASH", @@ -213,7 +213,7 @@ }, { "count$0" : "BIGINT" }, { - "distinct$0" : "RAW('org.apache.flink.table.api.dataview.MapView', 'AFZvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnJ1bnRpbWUudHlwZXV0aWxzLkV4dGVybmFsU2VyaWFsaXplciRFeHRlcm5hbFNlcmlhbGl6ZXJTbmFwc2hvdAAAAAMADecEAAAAAaztAAVzcgArb3JnLmFwYWNoZS5mbGluay50YWJsZS50eXBlcy5GaWVsZHNEYXRhVHlwZfSwrBytgZ9fAgABTAAOZmllbGREYXRhVHlwZXN0ABBMamF2YS91dGlsL0xpc3Q7eHIAJW9yZy5hcGFjaGUuZmxpbmsudGFibGUudHlwZXMuRGF0YVR5cGV5y2rIj5/EeAIAAkwAD2NvbnZlcnNpb25DbGFzc3QAEUxqYXZhL2xhbmcvQ2xhc3M7TAALbG9naWNhbFR5cGV0ADJMb3JnL2Fw [...] + "distinct$0" : "RAW('org.apache.flink.table.api.dataview.MapView', 'AFZvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnJ1bnRpbWUudHlwZXV0aWxzLkV4dGVybmFsU2VyaWFsaXplciRFeHRlcm5hbFNlcmlhbGl6ZXJTbmFwc2hvdAAAAAMADecEAAAAAaztAAVzcgArb3JnLmFwYWNoZS5mbGluay50YWJsZS50eXBlcy5GaWVsZHNEYXRhVHlwZfSwrBytgZ9fAgABTAAOZmllbGREYXRhVHlwZXN0ABBMamF2YS91dGlsL0xpc3Q7eHIAJW9yZy5hcGFjaGUuZmxpbmsudGFibGUudHlwZXMuRGF0YVR5cGV5y2rIj5/EeAIAAkwAD2NvbnZlcnNpb25DbGFzc3QAEUxqYXZhL2xhbmcvQ2xhc3M7TAALbG9naWNhbFR5cGV0ADJMb3JnL2Fw [...] } ] }, "description" : "Exchange(distribution=[hash[a, $f2]])" @@ -251,7 +251,7 @@ } ] }, "partialAggNeedRetraction" : false, - "id" : 6, + "id" : 23, "inputProperties" : [ { "requiredDistribution" : { "type" : "UNKNOWN" @@ -271,7 +271,7 @@ "description" : "IncrementalGroupAggregate(partialAggGrouping=[a, $f2], finalAggGrouping=[a], select=[a, COUNT(distinct$0 count$0) AS count$0])" }, { "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecExchange", - "id" : 7, + "id" : 24, "inputProperties" : [ { "requiredDistribution" : { "type" : "HASH", @@ -324,7 +324,7 @@ }, "generateUpdateBefore" : true, "needRetraction" : false, - "id" : 8, + "id" : 25, "inputProperties" : [ { "requiredDistribution" : { "type" : "UNKNOWN" @@ -361,7 +361,7 @@ } }, "inputChangelogMode" : [ "INSERT", "UPDATE_BEFORE", "UPDATE_AFTER", "DELETE" ], - "id" : 9, + "id" : 26, "inputProperties" : [ { "requiredDistribution" : { "type" : "UNKNOWN" @@ -381,57 +381,57 @@ "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[a, $f1])" } ], "edges" : [ { - "source" : 1, - "target" : 2, + "source" : 18, + "target" : 19, "shuffle" : { "type" : "FORWARD" }, "shuffleMode" : "PIPELINED" }, { - "source" : 2, - "target" : 3, + "source" : 19, + "target" : 20, "shuffle" : { "type" : "FORWARD" }, "shuffleMode" : "PIPELINED" }, { - "source" : 3, - "target" : 4, + "source" : 20, + "target" : 21, "shuffle" : { "type" : "FORWARD" }, "shuffleMode" : "PIPELINED" }, { - "source" : 4, - "target" : 5, + "source" : 21, + "target" : 22, "shuffle" : { "type" : "FORWARD" }, "shuffleMode" : "PIPELINED" }, { - "source" : 5, - "target" : 6, + "source" : 22, + "target" : 23, "shuffle" : { "type" : "FORWARD" }, "shuffleMode" : "PIPELINED" }, { - "source" : 6, - "target" : 7, + "source" : 23, + "target" : 24, "shuffle" : { "type" : "FORWARD" }, "shuffleMode" : "PIPELINED" }, { - "source" : 7, - "target" : 8, + "source" : 24, + "target" : 25, "shuffle" : { "type" : "FORWARD" }, "shuffleMode" : "PIPELINED" }, { - "source" : 8, - "target" : 9, + "source" : 25, + "target" : 26, "shuffle" : { "type" : "FORWARD" }, diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateJsonPlanTest_jsonplan/testEventTimeTumbleWindow.out b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateJsonPlanTest_jsonplan/testEventTimeTumbleWindow.out index 267d2a4..328331f 100644 --- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateJsonPlanTest_jsonplan/testEventTimeTumbleWindow.out +++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateJsonPlanTest_jsonplan/testEventTimeTumbleWindow.out @@ -377,17 +377,17 @@ "implementationClass" : "org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions$ConcatAcc", "attributes" : [ { "name" : "list", - "logicalType" : "RAW('org.apache.flink.table.api.dataview.ListView', 'AFZvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnJ1bnRpbWUudHlwZXV0aWxzLkV4dGVybmFsU2VyaWFsaXplciRFeHRlcm5hbFNlcmlhbGl6ZXJTbmFwc2hvdAAAAAMADecEAAAAAaztAAVzcgArb3JnLmFwYWNoZS5mbGluay50YWJsZS50eXBlcy5GaWVsZHNEYXRhVHlwZfSwrBytgZ9fAgABTAAOZmllbGREYXRhVHlwZXN0ABBMamF2YS91dGlsL0xpc3Q7eHIAJW9yZy5hcGFjaGUuZmxpbmsudGFibGUudHlwZXMuRGF0YVR5cGV5y2rIj5/EeAIAAkwAD2NvbnZlcnNpb25DbGFzc3QAEUxqYXZhL2xhbmcvQ2xhc3M7TAALbG9naWNhbFR5cGV0ADJMb3 [...] + "logicalType" : "RAW('org.apache.flink.table.api.dataview.ListView', 'AFZvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnJ1bnRpbWUudHlwZXV0aWxzLkV4dGVybmFsU2VyaWFsaXplciRFeHRlcm5hbFNlcmlhbGl6ZXJTbmFwc2hvdAAAAAMADecEAAAAAaztAAVzcgArb3JnLmFwYWNoZS5mbGluay50YWJsZS50eXBlcy5GaWVsZHNEYXRhVHlwZfSwrBytgZ9fAgABTAAOZmllbGREYXRhVHlwZXN0ABBMamF2YS91dGlsL0xpc3Q7eHIAJW9yZy5hcGFjaGUuZmxpbmsudGFibGUudHlwZXMuRGF0YVR5cGV5y2rIj5/EeAIAAkwAD2NvbnZlcnNpb25DbGFzc3QAEUxqYXZhL2xhbmcvQ2xhc3M7TAALbG9naWNhbFR5cGV0ADJMb3 [...] }, { "name" : "map", - "logicalType" : "RAW('org.apache.flink.table.api.dataview.MapView', 'AFZvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnJ1bnRpbWUudHlwZXV0aWxzLkV4dGVybmFsU2VyaWFsaXplciRFeHRlcm5hbFNlcmlhbGl6ZXJTbmFwc2hvdAAAAAMADecEAAAAAaztAAVzcgArb3JnLmFwYWNoZS5mbGluay50YWJsZS50eXBlcy5GaWVsZHNEYXRhVHlwZfSwrBytgZ9fAgABTAAOZmllbGREYXRhVHlwZXN0ABBMamF2YS91dGlsL0xpc3Q7eHIAJW9yZy5hcGFjaGUuZmxpbmsudGFibGUudHlwZXMuRGF0YVR5cGV5y2rIj5/EeAIAAkwAD2NvbnZlcnNpb25DbGFzc3QAEUxqYXZhL2xhbmcvQ2xhc3M7TAALbG9naWNhbFR5cGV0ADJMb3J [...] + "logicalType" : "RAW('org.apache.flink.table.api.dataview.MapView', 'AFZvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnJ1bnRpbWUudHlwZXV0aWxzLkV4dGVybmFsU2VyaWFsaXplciRFeHRlcm5hbFNlcmlhbGl6ZXJTbmFwc2hvdAAAAAMADecEAAAAAaztAAVzcgArb3JnLmFwYWNoZS5mbGluay50YWJsZS50eXBlcy5GaWVsZHNEYXRhVHlwZfSwrBytgZ9fAgABTAAOZmllbGREYXRhVHlwZXN0ABBMamF2YS91dGlsL0xpc3Q7eHIAJW9yZy5hcGFjaGUuZmxpbmsudGFibGUudHlwZXMuRGF0YVR5cGV5y2rIj5/EeAIAAkwAD2NvbnZlcnNpb25DbGFzc3QAEUxqYXZhL2xhbmcvQ2xhc3M7TAALbG9naWNhbFR5cGV0ADJMb3J [...] } ], "final" : true, "instantiable" : true, "comparision" : "NONE" } }, { - "distinct$0" : "RAW('org.apache.flink.table.api.dataview.MapView', 'AFZvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnJ1bnRpbWUudHlwZXV0aWxzLkV4dGVybmFsU2VyaWFsaXplciRFeHRlcm5hbFNlcmlhbGl6ZXJTbmFwc2hvdAAAAAMADecEAAAAAaztAAVzcgArb3JnLmFwYWNoZS5mbGluay50YWJsZS50eXBlcy5GaWVsZHNEYXRhVHlwZfSwrBytgZ9fAgABTAAOZmllbGREYXRhVHlwZXN0ABBMamF2YS91dGlsL0xpc3Q7eHIAJW9yZy5hcGFjaGUuZmxpbmsudGFibGUudHlwZXMuRGF0YVR5cGV5y2rIj5/EeAIAAkwAD2NvbnZlcnNpb25DbGFzc3QAEUxqYXZhL2xhbmcvQ2xhc3M7TAALbG9naWNhbFR5cGV0ADJMb3JnL2Fw [...] + "distinct$0" : "RAW('org.apache.flink.table.api.dataview.MapView', 'AFZvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnJ1bnRpbWUudHlwZXV0aWxzLkV4dGVybmFsU2VyaWFsaXplciRFeHRlcm5hbFNlcmlhbGl6ZXJTbmFwc2hvdAAAAAMADecEAAAAAaztAAVzcgArb3JnLmFwYWNoZS5mbGluay50YWJsZS50eXBlcy5GaWVsZHNEYXRhVHlwZfSwrBytgZ9fAgABTAAOZmllbGREYXRhVHlwZXN0ABBMamF2YS91dGlsL0xpc3Q7eHIAJW9yZy5hcGFjaGUuZmxpbmsudGFibGUudHlwZXMuRGF0YVR5cGV5y2rIj5/EeAIAAkwAD2NvbnZlcnNpb25DbGFzc3QAEUxqYXZhL2xhbmcvQ2xhc3M7TAALbG9naWNhbFR5cGV0ADJMb3JnL2Fw [...] }, { "$slice_end" : "BIGINT" } ] @@ -422,17 +422,17 @@ "implementationClass" : "org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions$ConcatAcc", "attributes" : [ { "name" : "list", - "logicalType" : "RAW('org.apache.flink.table.api.dataview.ListView', 'AFZvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnJ1bnRpbWUudHlwZXV0aWxzLkV4dGVybmFsU2VyaWFsaXplciRFeHRlcm5hbFNlcmlhbGl6ZXJTbmFwc2hvdAAAAAMADecEAAAAAaztAAVzcgArb3JnLmFwYWNoZS5mbGluay50YWJsZS50eXBlcy5GaWVsZHNEYXRhVHlwZfSwrBytgZ9fAgABTAAOZmllbGREYXRhVHlwZXN0ABBMamF2YS91dGlsL0xpc3Q7eHIAJW9yZy5hcGFjaGUuZmxpbmsudGFibGUudHlwZXMuRGF0YVR5cGV5y2rIj5/EeAIAAkwAD2NvbnZlcnNpb25DbGFzc3QAEUxqYXZhL2xhbmcvQ2xhc3M7TAALbG9naWNhbFR5cGV0ADJMb3 [...] + "logicalType" : "RAW('org.apache.flink.table.api.dataview.ListView', 'AFZvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnJ1bnRpbWUudHlwZXV0aWxzLkV4dGVybmFsU2VyaWFsaXplciRFeHRlcm5hbFNlcmlhbGl6ZXJTbmFwc2hvdAAAAAMADecEAAAAAaztAAVzcgArb3JnLmFwYWNoZS5mbGluay50YWJsZS50eXBlcy5GaWVsZHNEYXRhVHlwZfSwrBytgZ9fAgABTAAOZmllbGREYXRhVHlwZXN0ABBMamF2YS91dGlsL0xpc3Q7eHIAJW9yZy5hcGFjaGUuZmxpbmsudGFibGUudHlwZXMuRGF0YVR5cGV5y2rIj5/EeAIAAkwAD2NvbnZlcnNpb25DbGFzc3QAEUxqYXZhL2xhbmcvQ2xhc3M7TAALbG9naWNhbFR5cGV0ADJMb3 [...] }, { "name" : "map", - "logicalType" : "RAW('org.apache.flink.table.api.dataview.MapView', 'AFZvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnJ1bnRpbWUudHlwZXV0aWxzLkV4dGVybmFsU2VyaWFsaXplciRFeHRlcm5hbFNlcmlhbGl6ZXJTbmFwc2hvdAAAAAMADecEAAAAAaztAAVzcgArb3JnLmFwYWNoZS5mbGluay50YWJsZS50eXBlcy5GaWVsZHNEYXRhVHlwZfSwrBytgZ9fAgABTAAOZmllbGREYXRhVHlwZXN0ABBMamF2YS91dGlsL0xpc3Q7eHIAJW9yZy5hcGFjaGUuZmxpbmsudGFibGUudHlwZXMuRGF0YVR5cGV5y2rIj5/EeAIAAkwAD2NvbnZlcnNpb25DbGFzc3QAEUxqYXZhL2xhbmcvQ2xhc3M7TAALbG9naWNhbFR5cGV0ADJMb3J [...] + "logicalType" : "RAW('org.apache.flink.table.api.dataview.MapView', 'AFZvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnJ1bnRpbWUudHlwZXV0aWxzLkV4dGVybmFsU2VyaWFsaXplciRFeHRlcm5hbFNlcmlhbGl6ZXJTbmFwc2hvdAAAAAMADecEAAAAAaztAAVzcgArb3JnLmFwYWNoZS5mbGluay50YWJsZS50eXBlcy5GaWVsZHNEYXRhVHlwZfSwrBytgZ9fAgABTAAOZmllbGREYXRhVHlwZXN0ABBMamF2YS91dGlsL0xpc3Q7eHIAJW9yZy5hcGFjaGUuZmxpbmsudGFibGUudHlwZXMuRGF0YVR5cGV5y2rIj5/EeAIAAkwAD2NvbnZlcnNpb25DbGFzc3QAEUxqYXZhL2xhbmcvQ2xhc3M7TAALbG9naWNhbFR5cGV0ADJMb3J [...] } ], "final" : true, "instantiable" : true, "comparision" : "NONE" } }, { - "distinct$0" : "RAW('org.apache.flink.table.api.dataview.MapView', 'AFZvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnJ1bnRpbWUudHlwZXV0aWxzLkV4dGVybmFsU2VyaWFsaXplciRFeHRlcm5hbFNlcmlhbGl6ZXJTbmFwc2hvdAAAAAMADecEAAAAAaztAAVzcgArb3JnLmFwYWNoZS5mbGluay50YWJsZS50eXBlcy5GaWVsZHNEYXRhVHlwZfSwrBytgZ9fAgABTAAOZmllbGREYXRhVHlwZXN0ABBMamF2YS91dGlsL0xpc3Q7eHIAJW9yZy5hcGFjaGUuZmxpbmsudGFibGUudHlwZXMuRGF0YVR5cGV5y2rIj5/EeAIAAkwAD2NvbnZlcnNpb25DbGFzc3QAEUxqYXZhL2xhbmcvQ2xhc3M7TAALbG9naWNhbFR5cGV0ADJMb3JnL2Fw [...] + "distinct$0" : "RAW('org.apache.flink.table.api.dataview.MapView', 'AFZvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnJ1bnRpbWUudHlwZXV0aWxzLkV4dGVybmFsU2VyaWFsaXplciRFeHRlcm5hbFNlcmlhbGl6ZXJTbmFwc2hvdAAAAAMADecEAAAAAaztAAVzcgArb3JnLmFwYWNoZS5mbGluay50YWJsZS50eXBlcy5GaWVsZHNEYXRhVHlwZfSwrBytgZ9fAgABTAAOZmllbGREYXRhVHlwZXN0ABBMamF2YS91dGlsL0xpc3Q7eHIAJW9yZy5hcGFjaGUuZmxpbmsudGFibGUudHlwZXMuRGF0YVR5cGV5y2rIj5/EeAIAAkwAD2NvbnZlcnNpb25DbGFzc3QAEUxqYXZhL2xhbmcvQ2xhc3M7TAALbG9naWNhbFR5cGV0ADJMb3JnL2Fw [...] }, { "$slice_end" : "BIGINT" } ] diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/typeutils/ExternalSerializer.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/typeutils/ExternalSerializer.java index 9993bb3..a277659 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/typeutils/ExternalSerializer.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/typeutils/ExternalSerializer.java @@ -59,15 +59,19 @@ public final class ExternalSerializer<I, E> extends TypeSerializer<E> { private final TypeSerializer<I> internalSerializer; + private final boolean isInternalInput; + private final boolean isReuseEnabled; private transient I reuse; private transient DataStructureConverter<I, E> converter; - private ExternalSerializer(DataType dataType, TypeSerializer<I> internalSerializer) { + private ExternalSerializer( + DataType dataType, TypeSerializer<I> internalSerializer, boolean isInternalInput) { this.dataType = dataType; this.internalSerializer = internalSerializer; + this.isInternalInput = isInternalInput; // if no data structures that use memory segments are exposed in the external data // structure, we can reuse intermediate internal data structures this.isReuseEnabled = !hasBinaryData(dataType); @@ -78,8 +82,15 @@ public final class ExternalSerializer<I, E> extends TypeSerializer<E> { * Creates an instance of a {@link ExternalSerializer} defined by the given {@link DataType}. */ public static <I, E> ExternalSerializer<I, E> of(DataType dataType) { + return of(dataType, false); + } + + /** + * Creates an instance of a {@link ExternalSerializer} defined by the given {@link DataType}. + */ + public static <I, E> ExternalSerializer<I, E> of(DataType dataType, boolean isInternalInput) { return new ExternalSerializer<>( - dataType, InternalSerializers.create(dataType.getLogicalType())); + dataType, InternalSerializers.create(dataType.getLogicalType()), isInternalInput); } @Override @@ -89,7 +100,7 @@ public final class ExternalSerializer<I, E> extends TypeSerializer<E> { @Override public TypeSerializer<E> duplicate() { - return new ExternalSerializer<>(dataType, internalSerializer.duplicate()); + return new ExternalSerializer<>(dataType, internalSerializer.duplicate(), isInternalInput); } @Override @@ -105,8 +116,14 @@ public final class ExternalSerializer<I, E> extends TypeSerializer<E> { } @Override + @SuppressWarnings("unchecked") public E copy(E from) { - final I internalFrom = converter.toInternal(from); + final I internalFrom; + if (isInternalInput) { + internalFrom = (I) from; + } else { + internalFrom = converter.toInternal(from); + } final I copy = internalSerializer.copy(internalFrom); return converter.toExternal(copy); } @@ -122,8 +139,14 @@ public final class ExternalSerializer<I, E> extends TypeSerializer<E> { } @Override + @SuppressWarnings("unchecked") public void serialize(E record, DataOutputView target) throws IOException { - final I internalRecord = converter.toInternal(record); + final I internalRecord; + if (isInternalInput) { + internalRecord = (I) record; + } else { + internalRecord = converter.toInternal(record); + } internalSerializer.serialize(internalRecord, target); } @@ -157,12 +180,14 @@ public final class ExternalSerializer<I, E> extends TypeSerializer<E> { return false; } ExternalSerializer<?, ?> that = (ExternalSerializer<?, ?>) o; - return dataType.equals(that.dataType) && internalSerializer.equals(that.internalSerializer); + return dataType.equals(that.dataType) + && internalSerializer.equals(that.internalSerializer) + && isInternalInput == that.isInternalInput; } @Override public int hashCode() { - return Objects.hash(dataType, internalSerializer); + return Objects.hash(dataType, internalSerializer, isInternalInput); } @Override @@ -213,6 +238,8 @@ public final class ExternalSerializer<I, E> extends TypeSerializer<E> { private DataType dataType; + private boolean isInternalInput; + public ExternalSerializerSnapshot() { super(ExternalSerializer.class); } @@ -220,6 +247,7 @@ public final class ExternalSerializer<I, E> extends TypeSerializer<E> { public ExternalSerializerSnapshot(ExternalSerializer<I, E> externalSerializer) { super(externalSerializer); this.dataType = externalSerializer.dataType; + this.isInternalInput = externalSerializer.isInternalInput; } @Override @@ -231,6 +259,7 @@ public final class ExternalSerializer<I, E> extends TypeSerializer<E> { protected void writeOuterSnapshot(DataOutputView out) throws IOException { final DataOutputViewStream stream = new DataOutputViewStream(out); InstantiationUtil.serializeObject(stream, dataType); + out.writeBoolean(isInternalInput); } @Override @@ -243,6 +272,7 @@ public final class ExternalSerializer<I, E> extends TypeSerializer<E> { } catch (ClassNotFoundException e) { throw new IOException(e); } + isInternalInput = in.readBoolean(); } @Override @@ -255,7 +285,8 @@ public final class ExternalSerializer<I, E> extends TypeSerializer<E> { @SuppressWarnings("unchecked") protected ExternalSerializer<I, E> createOuterSerializerWithNestedSerializers( TypeSerializer<?>[] nestedSerializers) { - return new ExternalSerializer<>(dataType, (TypeSerializer<I>) nestedSerializers[0]); + return new ExternalSerializer<>( + dataType, (TypeSerializer<I>) nestedSerializers[0], isInternalInput); } } } diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/typeutils/ExternalTypeInfo.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/typeutils/ExternalTypeInfo.java index ad8e6e5..afe48d0 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/typeutils/ExternalTypeInfo.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/typeutils/ExternalTypeInfo.java @@ -64,20 +64,37 @@ public final class ExternalTypeInfo<T> extends TypeInformation<T> implements Dat * structures but serialized and deserialized into external data structures. */ public static <T> ExternalTypeInfo<T> of(DataType dataType) { - final TypeSerializer<T> serializer = createExternalTypeSerializer(dataType); + final TypeSerializer<T> serializer = createExternalTypeSerializer(dataType, false); + return new ExternalTypeInfo<>(dataType, serializer); + } + + /** + * Creates type information for a {@link DataType} that is possibly represented by internal data + * structures but serialized and deserialized into external data structures. + * + * @param isInternalInput allows for a non-bidirectional serializer from internal to external + */ + public static <T> ExternalTypeInfo<T> of(DataType dataType, boolean isInternalInput) { + final TypeSerializer<T> serializer = + createExternalTypeSerializer(dataType, isInternalInput); return new ExternalTypeInfo<>(dataType, serializer); } @SuppressWarnings("unchecked") - private static <T> TypeSerializer<T> createExternalTypeSerializer(DataType dataType) { + private static <T> TypeSerializer<T> createExternalTypeSerializer( + DataType dataType, boolean isInternalInput) { final LogicalType logicalType = dataType.getLogicalType(); - if (logicalType instanceof RawType) { + if (logicalType instanceof RawType && !isInternalInput) { final RawType<?> rawType = (RawType<?>) logicalType; if (dataType.getConversionClass() == rawType.getOriginatingClass()) { return (TypeSerializer<T>) rawType.getTypeSerializer(); } } - return ExternalSerializer.of(dataType); + // note: we can add more special cases in the future to make the serialization more + // efficient, for example we can translate to RowTypeInfo if we know that field types can + // be mapped to type information as well, the external serializer in its current shape is + // the most general serializer implementation for all conversion classes + return ExternalSerializer.of(dataType, isInternalInput); } // --------------------------------------------------------------------------------------------
