This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 29c81a0b1d3e3ae27eef386001db4db6e4a6b4ca
Author: Timo Walther <[email protected]>
AuthorDate: Mon Apr 12 15:15:14 2021 +0200

    [FLINK-20613][table-planner-blink] Support unidirectional ExternalSerializer
---
 ...stDistinctAggCalls[isMiniBatchEnabled=true].out |  8 ++--
 .../testIncrementalAggregate.out                   | 54 +++++++++++-----------
 .../testEventTimeTumbleWindow.out                  | 12 ++---
 .../runtime/typeutils/ExternalSerializer.java      | 47 +++++++++++++++----
 .../table/runtime/typeutils/ExternalTypeInfo.java  | 25 ++++++++--
 5 files changed, 97 insertions(+), 49 deletions(-)

diff --git 
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testDistinctAggCalls[isMiniBatchEnabled=true].out
 
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testDistinctAggCalls[isMiniBatchEnabled=true].out
index b543285..5768acf 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testDistinctAggCalls[isMiniBatchEnabled=true].out
+++ 
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testDistinctAggCalls[isMiniBatchEnabled=true].out
@@ -262,9 +262,9 @@
       }, {
         "count$5" : "BIGINT"
       }, {
-        "distinct$0" : "RAW('org.apache.flink.table.api.dataview.MapView', 
'AFZvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnJ1bnRpbWUudHlwZXV0aWxzLkV4dGVybmFsU2VyaWFsaXplciRFeHRlcm5hbFNlcmlhbGl6ZXJTbmFwc2hvdAAAAAMADecEAAAAAaztAAVzcgArb3JnLmFwYWNoZS5mbGluay50YWJsZS50eXBlcy5GaWVsZHNEYXRhVHlwZfSwrBytgZ9fAgABTAAOZmllbGREYXRhVHlwZXN0ABBMamF2YS91dGlsL0xpc3Q7eHIAJW9yZy5hcGFjaGUuZmxpbmsudGFibGUudHlwZXMuRGF0YVR5cGV5y2rIj5/EeAIAAkwAD2NvbnZlcnNpb25DbGFzc3QAEUxqYXZhL2xhbmcvQ2xhc3M7TAALbG9naWNhbFR5cGV0ADJMb3JnL2Fw
 [...]
+        "distinct$0" : "RAW('org.apache.flink.table.api.dataview.MapView', 
'AFZvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnJ1bnRpbWUudHlwZXV0aWxzLkV4dGVybmFsU2VyaWFsaXplciRFeHRlcm5hbFNlcmlhbGl6ZXJTbmFwc2hvdAAAAAMADecEAAAAAaztAAVzcgArb3JnLmFwYWNoZS5mbGluay50YWJsZS50eXBlcy5GaWVsZHNEYXRhVHlwZfSwrBytgZ9fAgABTAAOZmllbGREYXRhVHlwZXN0ABBMamF2YS91dGlsL0xpc3Q7eHIAJW9yZy5hcGFjaGUuZmxpbmsudGFibGUudHlwZXMuRGF0YVR5cGV5y2rIj5/EeAIAAkwAD2NvbnZlcnNpb25DbGFzc3QAEUxqYXZhL2xhbmcvQ2xhc3M7TAALbG9naWNhbFR5cGV0ADJMb3JnL2Fw
 [...]
       }, {
-        "distinct$1" : "RAW('org.apache.flink.table.api.dataview.MapView', 
'AFZvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnJ1bnRpbWUudHlwZXV0aWxzLkV4dGVybmFsU2VyaWFsaXplciRFeHRlcm5hbFNlcmlhbGl6ZXJTbmFwc2hvdAAAAAMADecEAAAAAaztAAVzcgArb3JnLmFwYWNoZS5mbGluay50YWJsZS50eXBlcy5GaWVsZHNEYXRhVHlwZfSwrBytgZ9fAgABTAAOZmllbGREYXRhVHlwZXN0ABBMamF2YS91dGlsL0xpc3Q7eHIAJW9yZy5hcGFjaGUuZmxpbmsudGFibGUudHlwZXMuRGF0YVR5cGV5y2rIj5/EeAIAAkwAD2NvbnZlcnNpb25DbGFzc3QAEUxqYXZhL2xhbmcvQ2xhc3M7TAALbG9naWNhbFR5cGV0ADJMb3JnL2Fw
 [...]
+        "distinct$1" : "RAW('org.apache.flink.table.api.dataview.MapView', 
'AFZvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnJ1bnRpbWUudHlwZXV0aWxzLkV4dGVybmFsU2VyaWFsaXplciRFeHRlcm5hbFNlcmlhbGl6ZXJTbmFwc2hvdAAAAAMADecEAAAAAaztAAVzcgArb3JnLmFwYWNoZS5mbGluay50YWJsZS50eXBlcy5GaWVsZHNEYXRhVHlwZfSwrBytgZ9fAgABTAAOZmllbGREYXRhVHlwZXN0ABBMamF2YS91dGlsL0xpc3Q7eHIAJW9yZy5hcGFjaGUuZmxpbmsudGFibGUudHlwZXMuRGF0YVR5cGV5y2rIj5/EeAIAAkwAD2NvbnZlcnNpb25DbGFzc3QAEUxqYXZhL2xhbmcvQ2xhc3M7TAALbG9naWNhbFR5cGV0ADJMb3JnL2Fw
 [...]
       } ]
     },
     "description" : "LocalGroupAggregate(groupBy=[c], select=[c, 
COUNT(distinct$0 a) FILTER $f2 AS count$0, COUNT(distinct$0 a) AS count$1, 
SUM(distinct$0 a) AS sum$2, SUM(distinct$1 b) AS sum$3, AVG(b) AS (sum$4, 
count$5), DISTINCT(a) AS distinct$0, DISTINCT(b) AS distinct$1])"
@@ -297,9 +297,9 @@
       }, {
         "count$5" : "BIGINT"
       }, {
-        "distinct$0" : "RAW('org.apache.flink.table.api.dataview.MapView', 
'AFZvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnJ1bnRpbWUudHlwZXV0aWxzLkV4dGVybmFsU2VyaWFsaXplciRFeHRlcm5hbFNlcmlhbGl6ZXJTbmFwc2hvdAAAAAMADecEAAAAAaztAAVzcgArb3JnLmFwYWNoZS5mbGluay50YWJsZS50eXBlcy5GaWVsZHNEYXRhVHlwZfSwrBytgZ9fAgABTAAOZmllbGREYXRhVHlwZXN0ABBMamF2YS91dGlsL0xpc3Q7eHIAJW9yZy5hcGFjaGUuZmxpbmsudGFibGUudHlwZXMuRGF0YVR5cGV5y2rIj5/EeAIAAkwAD2NvbnZlcnNpb25DbGFzc3QAEUxqYXZhL2xhbmcvQ2xhc3M7TAALbG9naWNhbFR5cGV0ADJMb3JnL2Fw
 [...]
+        "distinct$0" : "RAW('org.apache.flink.table.api.dataview.MapView', 
'AFZvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnJ1bnRpbWUudHlwZXV0aWxzLkV4dGVybmFsU2VyaWFsaXplciRFeHRlcm5hbFNlcmlhbGl6ZXJTbmFwc2hvdAAAAAMADecEAAAAAaztAAVzcgArb3JnLmFwYWNoZS5mbGluay50YWJsZS50eXBlcy5GaWVsZHNEYXRhVHlwZfSwrBytgZ9fAgABTAAOZmllbGREYXRhVHlwZXN0ABBMamF2YS91dGlsL0xpc3Q7eHIAJW9yZy5hcGFjaGUuZmxpbmsudGFibGUudHlwZXMuRGF0YVR5cGV5y2rIj5/EeAIAAkwAD2NvbnZlcnNpb25DbGFzc3QAEUxqYXZhL2xhbmcvQ2xhc3M7TAALbG9naWNhbFR5cGV0ADJMb3JnL2Fw
 [...]
       }, {
-        "distinct$1" : "RAW('org.apache.flink.table.api.dataview.MapView', 
'AFZvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnJ1bnRpbWUudHlwZXV0aWxzLkV4dGVybmFsU2VyaWFsaXplciRFeHRlcm5hbFNlcmlhbGl6ZXJTbmFwc2hvdAAAAAMADecEAAAAAaztAAVzcgArb3JnLmFwYWNoZS5mbGluay50YWJsZS50eXBlcy5GaWVsZHNEYXRhVHlwZfSwrBytgZ9fAgABTAAOZmllbGREYXRhVHlwZXN0ABBMamF2YS91dGlsL0xpc3Q7eHIAJW9yZy5hcGFjaGUuZmxpbmsudGFibGUudHlwZXMuRGF0YVR5cGV5y2rIj5/EeAIAAkwAD2NvbnZlcnNpb25DbGFzc3QAEUxqYXZhL2xhbmcvQ2xhc3M7TAALbG9naWNhbFR5cGV0ADJMb3JnL2Fw
 [...]
+        "distinct$1" : "RAW('org.apache.flink.table.api.dataview.MapView', 
'AFZvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnJ1bnRpbWUudHlwZXV0aWxzLkV4dGVybmFsU2VyaWFsaXplciRFeHRlcm5hbFNlcmlhbGl6ZXJTbmFwc2hvdAAAAAMADecEAAAAAaztAAVzcgArb3JnLmFwYWNoZS5mbGluay50YWJsZS50eXBlcy5GaWVsZHNEYXRhVHlwZfSwrBytgZ9fAgABTAAOZmllbGREYXRhVHlwZXN0ABBMamF2YS91dGlsL0xpc3Q7eHIAJW9yZy5hcGFjaGUuZmxpbmsudGFibGUudHlwZXMuRGF0YVR5cGV5y2rIj5/EeAIAAkwAD2NvbnZlcnNpb25DbGFzc3QAEUxqYXZhL2xhbmcvQ2xhc3M7TAALbG9naWNhbFR5cGV0ADJMb3JnL2Fw
 [...]
       } ]
     },
     "description" : "Exchange(distribution=[hash[c]])"
diff --git 
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/IncrementalAggregateJsonPlanTest_jsonplan/testIncrementalAggregate.out
 
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/IncrementalAggregateJsonPlanTest_jsonplan/testIncrementalAggregate.out
index 7d1af16..a54cb42 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/IncrementalAggregateJsonPlanTest_jsonplan/testIncrementalAggregate.out
+++ 
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/IncrementalAggregateJsonPlanTest_jsonplan/testIncrementalAggregate.out
@@ -34,7 +34,7 @@
         }
       } ]
     },
-    "id" : 1,
+    "id" : 18,
     "outputType" : {
       "type" : "ROW",
       "nullable" : true,
@@ -52,7 +52,7 @@
       "interval" : 10000,
       "mode" : "ProcTime"
     },
-    "id" : 2,
+    "id" : 19,
     "inputProperties" : [ {
       "requiredDistribution" : {
         "type" : "UNKNOWN"
@@ -128,7 +128,7 @@
       }
     } ],
     "condition" : null,
-    "id" : 3,
+    "id" : 20,
     "inputProperties" : [ {
       "requiredDistribution" : {
         "type" : "UNKNOWN"
@@ -170,7 +170,7 @@
     } ],
     "aggCallNeedRetractions" : [ false ],
     "needRetraction" : false,
-    "id" : 4,
+    "id" : 21,
     "inputProperties" : [ {
       "requiredDistribution" : {
         "type" : "UNKNOWN"
@@ -188,13 +188,13 @@
       }, {
         "count$0" : "BIGINT"
       }, {
-        "distinct$0" : "RAW('org.apache.flink.table.api.dataview.MapView', 
'AFZvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnJ1bnRpbWUudHlwZXV0aWxzLkV4dGVybmFsU2VyaWFsaXplciRFeHRlcm5hbFNlcmlhbGl6ZXJTbmFwc2hvdAAAAAMADecEAAAAAaztAAVzcgArb3JnLmFwYWNoZS5mbGluay50YWJsZS50eXBlcy5GaWVsZHNEYXRhVHlwZfSwrBytgZ9fAgABTAAOZmllbGREYXRhVHlwZXN0ABBMamF2YS91dGlsL0xpc3Q7eHIAJW9yZy5hcGFjaGUuZmxpbmsudGFibGUudHlwZXMuRGF0YVR5cGV5y2rIj5/EeAIAAkwAD2NvbnZlcnNpb25DbGFzc3QAEUxqYXZhL2xhbmcvQ2xhc3M7TAALbG9naWNhbFR5cGV0ADJMb3JnL2Fw
 [...]
+        "distinct$0" : "RAW('org.apache.flink.table.api.dataview.MapView', 
'AFZvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnJ1bnRpbWUudHlwZXV0aWxzLkV4dGVybmFsU2VyaWFsaXplciRFeHRlcm5hbFNlcmlhbGl6ZXJTbmFwc2hvdAAAAAMADecEAAAAAaztAAVzcgArb3JnLmFwYWNoZS5mbGluay50YWJsZS50eXBlcy5GaWVsZHNEYXRhVHlwZfSwrBytgZ9fAgABTAAOZmllbGREYXRhVHlwZXN0ABBMamF2YS91dGlsL0xpc3Q7eHIAJW9yZy5hcGFjaGUuZmxpbmsudGFibGUudHlwZXMuRGF0YVR5cGV5y2rIj5/EeAIAAkwAD2NvbnZlcnNpb25DbGFzc3QAEUxqYXZhL2xhbmcvQ2xhc3M7TAALbG9naWNhbFR5cGV0ADJMb3JnL2Fw
 [...]
       } ]
     },
     "description" : "LocalGroupAggregate(groupBy=[a, $f2], 
partialFinalType=[PARTIAL], select=[a, $f2, COUNT(distinct$0 c) AS count$0, 
DISTINCT(c) AS distinct$0])"
   }, {
     "class" : 
"org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecExchange",
-    "id" : 5,
+    "id" : 22,
     "inputProperties" : [ {
       "requiredDistribution" : {
         "type" : "HASH",
@@ -213,7 +213,7 @@
       }, {
         "count$0" : "BIGINT"
       }, {
-        "distinct$0" : "RAW('org.apache.flink.table.api.dataview.MapView', 
'AFZvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnJ1bnRpbWUudHlwZXV0aWxzLkV4dGVybmFsU2VyaWFsaXplciRFeHRlcm5hbFNlcmlhbGl6ZXJTbmFwc2hvdAAAAAMADecEAAAAAaztAAVzcgArb3JnLmFwYWNoZS5mbGluay50YWJsZS50eXBlcy5GaWVsZHNEYXRhVHlwZfSwrBytgZ9fAgABTAAOZmllbGREYXRhVHlwZXN0ABBMamF2YS91dGlsL0xpc3Q7eHIAJW9yZy5hcGFjaGUuZmxpbmsudGFibGUudHlwZXMuRGF0YVR5cGV5y2rIj5/EeAIAAkwAD2NvbnZlcnNpb25DbGFzc3QAEUxqYXZhL2xhbmcvQ2xhc3M7TAALbG9naWNhbFR5cGV0ADJMb3JnL2Fw
 [...]
+        "distinct$0" : "RAW('org.apache.flink.table.api.dataview.MapView', 
'AFZvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnJ1bnRpbWUudHlwZXV0aWxzLkV4dGVybmFsU2VyaWFsaXplciRFeHRlcm5hbFNlcmlhbGl6ZXJTbmFwc2hvdAAAAAMADecEAAAAAaztAAVzcgArb3JnLmFwYWNoZS5mbGluay50YWJsZS50eXBlcy5GaWVsZHNEYXRhVHlwZfSwrBytgZ9fAgABTAAOZmllbGREYXRhVHlwZXN0ABBMamF2YS91dGlsL0xpc3Q7eHIAJW9yZy5hcGFjaGUuZmxpbmsudGFibGUudHlwZXMuRGF0YVR5cGV5y2rIj5/EeAIAAkwAD2NvbnZlcnNpb25DbGFzc3QAEUxqYXZhL2xhbmcvQ2xhc3M7TAALbG9naWNhbFR5cGV0ADJMb3JnL2Fw
 [...]
       } ]
     },
     "description" : "Exchange(distribution=[hash[a, $f2]])"
@@ -251,7 +251,7 @@
       } ]
     },
     "partialAggNeedRetraction" : false,
-    "id" : 6,
+    "id" : 23,
     "inputProperties" : [ {
       "requiredDistribution" : {
         "type" : "UNKNOWN"
@@ -271,7 +271,7 @@
     "description" : "IncrementalGroupAggregate(partialAggGrouping=[a, $f2], 
finalAggGrouping=[a], select=[a, COUNT(distinct$0 count$0) AS count$0])"
   }, {
     "class" : 
"org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecExchange",
-    "id" : 7,
+    "id" : 24,
     "inputProperties" : [ {
       "requiredDistribution" : {
         "type" : "HASH",
@@ -324,7 +324,7 @@
     },
     "generateUpdateBefore" : true,
     "needRetraction" : false,
-    "id" : 8,
+    "id" : 25,
     "inputProperties" : [ {
       "requiredDistribution" : {
         "type" : "UNKNOWN"
@@ -361,7 +361,7 @@
       }
     },
     "inputChangelogMode" : [ "INSERT", "UPDATE_BEFORE", "UPDATE_AFTER", 
"DELETE" ],
-    "id" : 9,
+    "id" : 26,
     "inputProperties" : [ {
       "requiredDistribution" : {
         "type" : "UNKNOWN"
@@ -381,57 +381,57 @@
     "description" : "Sink(table=[default_catalog.default_database.MySink], 
fields=[a, $f1])"
   } ],
   "edges" : [ {
-    "source" : 1,
-    "target" : 2,
+    "source" : 18,
+    "target" : 19,
     "shuffle" : {
       "type" : "FORWARD"
     },
     "shuffleMode" : "PIPELINED"
   }, {
-    "source" : 2,
-    "target" : 3,
+    "source" : 19,
+    "target" : 20,
     "shuffle" : {
       "type" : "FORWARD"
     },
     "shuffleMode" : "PIPELINED"
   }, {
-    "source" : 3,
-    "target" : 4,
+    "source" : 20,
+    "target" : 21,
     "shuffle" : {
       "type" : "FORWARD"
     },
     "shuffleMode" : "PIPELINED"
   }, {
-    "source" : 4,
-    "target" : 5,
+    "source" : 21,
+    "target" : 22,
     "shuffle" : {
       "type" : "FORWARD"
     },
     "shuffleMode" : "PIPELINED"
   }, {
-    "source" : 5,
-    "target" : 6,
+    "source" : 22,
+    "target" : 23,
     "shuffle" : {
       "type" : "FORWARD"
     },
     "shuffleMode" : "PIPELINED"
   }, {
-    "source" : 6,
-    "target" : 7,
+    "source" : 23,
+    "target" : 24,
     "shuffle" : {
       "type" : "FORWARD"
     },
     "shuffleMode" : "PIPELINED"
   }, {
-    "source" : 7,
-    "target" : 8,
+    "source" : 24,
+    "target" : 25,
     "shuffle" : {
       "type" : "FORWARD"
     },
     "shuffleMode" : "PIPELINED"
   }, {
-    "source" : 8,
-    "target" : 9,
+    "source" : 25,
+    "target" : 26,
     "shuffle" : {
       "type" : "FORWARD"
     },
diff --git 
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateJsonPlanTest_jsonplan/testEventTimeTumbleWindow.out
 
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateJsonPlanTest_jsonplan/testEventTimeTumbleWindow.out
index 267d2a4..328331f 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateJsonPlanTest_jsonplan/testEventTimeTumbleWindow.out
+++ 
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateJsonPlanTest_jsonplan/testEventTimeTumbleWindow.out
@@ -377,17 +377,17 @@
           "implementationClass" : 
"org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions$ConcatAcc",
           "attributes" : [ {
             "name" : "list",
-            "logicalType" : 
"RAW('org.apache.flink.table.api.dataview.ListView', 
'AFZvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnJ1bnRpbWUudHlwZXV0aWxzLkV4dGVybmFsU2VyaWFsaXplciRFeHRlcm5hbFNlcmlhbGl6ZXJTbmFwc2hvdAAAAAMADecEAAAAAaztAAVzcgArb3JnLmFwYWNoZS5mbGluay50YWJsZS50eXBlcy5GaWVsZHNEYXRhVHlwZfSwrBytgZ9fAgABTAAOZmllbGREYXRhVHlwZXN0ABBMamF2YS91dGlsL0xpc3Q7eHIAJW9yZy5hcGFjaGUuZmxpbmsudGFibGUudHlwZXMuRGF0YVR5cGV5y2rIj5/EeAIAAkwAD2NvbnZlcnNpb25DbGFzc3QAEUxqYXZhL2xhbmcvQ2xhc3M7TAALbG9naWNhbFR5cGV0ADJMb3
 [...]
+            "logicalType" : 
"RAW('org.apache.flink.table.api.dataview.ListView', 
'AFZvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnJ1bnRpbWUudHlwZXV0aWxzLkV4dGVybmFsU2VyaWFsaXplciRFeHRlcm5hbFNlcmlhbGl6ZXJTbmFwc2hvdAAAAAMADecEAAAAAaztAAVzcgArb3JnLmFwYWNoZS5mbGluay50YWJsZS50eXBlcy5GaWVsZHNEYXRhVHlwZfSwrBytgZ9fAgABTAAOZmllbGREYXRhVHlwZXN0ABBMamF2YS91dGlsL0xpc3Q7eHIAJW9yZy5hcGFjaGUuZmxpbmsudGFibGUudHlwZXMuRGF0YVR5cGV5y2rIj5/EeAIAAkwAD2NvbnZlcnNpb25DbGFzc3QAEUxqYXZhL2xhbmcvQ2xhc3M7TAALbG9naWNhbFR5cGV0ADJMb3
 [...]
           }, {
             "name" : "map",
-            "logicalType" : 
"RAW('org.apache.flink.table.api.dataview.MapView', 
'AFZvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnJ1bnRpbWUudHlwZXV0aWxzLkV4dGVybmFsU2VyaWFsaXplciRFeHRlcm5hbFNlcmlhbGl6ZXJTbmFwc2hvdAAAAAMADecEAAAAAaztAAVzcgArb3JnLmFwYWNoZS5mbGluay50YWJsZS50eXBlcy5GaWVsZHNEYXRhVHlwZfSwrBytgZ9fAgABTAAOZmllbGREYXRhVHlwZXN0ABBMamF2YS91dGlsL0xpc3Q7eHIAJW9yZy5hcGFjaGUuZmxpbmsudGFibGUudHlwZXMuRGF0YVR5cGV5y2rIj5/EeAIAAkwAD2NvbnZlcnNpb25DbGFzc3QAEUxqYXZhL2xhbmcvQ2xhc3M7TAALbG9naWNhbFR5cGV0ADJMb3J
 [...]
+            "logicalType" : 
"RAW('org.apache.flink.table.api.dataview.MapView', 
'AFZvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnJ1bnRpbWUudHlwZXV0aWxzLkV4dGVybmFsU2VyaWFsaXplciRFeHRlcm5hbFNlcmlhbGl6ZXJTbmFwc2hvdAAAAAMADecEAAAAAaztAAVzcgArb3JnLmFwYWNoZS5mbGluay50YWJsZS50eXBlcy5GaWVsZHNEYXRhVHlwZfSwrBytgZ9fAgABTAAOZmllbGREYXRhVHlwZXN0ABBMamF2YS91dGlsL0xpc3Q7eHIAJW9yZy5hcGFjaGUuZmxpbmsudGFibGUudHlwZXMuRGF0YVR5cGV5y2rIj5/EeAIAAkwAD2NvbnZlcnNpb25DbGFzc3QAEUxqYXZhL2xhbmcvQ2xhc3M7TAALbG9naWNhbFR5cGV0ADJMb3J
 [...]
           } ],
           "final" : true,
           "instantiable" : true,
           "comparision" : "NONE"
         }
       }, {
-        "distinct$0" : "RAW('org.apache.flink.table.api.dataview.MapView', 
'AFZvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnJ1bnRpbWUudHlwZXV0aWxzLkV4dGVybmFsU2VyaWFsaXplciRFeHRlcm5hbFNlcmlhbGl6ZXJTbmFwc2hvdAAAAAMADecEAAAAAaztAAVzcgArb3JnLmFwYWNoZS5mbGluay50YWJsZS50eXBlcy5GaWVsZHNEYXRhVHlwZfSwrBytgZ9fAgABTAAOZmllbGREYXRhVHlwZXN0ABBMamF2YS91dGlsL0xpc3Q7eHIAJW9yZy5hcGFjaGUuZmxpbmsudGFibGUudHlwZXMuRGF0YVR5cGV5y2rIj5/EeAIAAkwAD2NvbnZlcnNpb25DbGFzc3QAEUxqYXZhL2xhbmcvQ2xhc3M7TAALbG9naWNhbFR5cGV0ADJMb3JnL2Fw
 [...]
+        "distinct$0" : "RAW('org.apache.flink.table.api.dataview.MapView', 
'AFZvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnJ1bnRpbWUudHlwZXV0aWxzLkV4dGVybmFsU2VyaWFsaXplciRFeHRlcm5hbFNlcmlhbGl6ZXJTbmFwc2hvdAAAAAMADecEAAAAAaztAAVzcgArb3JnLmFwYWNoZS5mbGluay50YWJsZS50eXBlcy5GaWVsZHNEYXRhVHlwZfSwrBytgZ9fAgABTAAOZmllbGREYXRhVHlwZXN0ABBMamF2YS91dGlsL0xpc3Q7eHIAJW9yZy5hcGFjaGUuZmxpbmsudGFibGUudHlwZXMuRGF0YVR5cGV5y2rIj5/EeAIAAkwAD2NvbnZlcnNpb25DbGFzc3QAEUxqYXZhL2xhbmcvQ2xhc3M7TAALbG9naWNhbFR5cGV0ADJMb3JnL2Fw
 [...]
       }, {
         "$slice_end" : "BIGINT"
       } ]
@@ -422,17 +422,17 @@
           "implementationClass" : 
"org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions$ConcatAcc",
           "attributes" : [ {
             "name" : "list",
-            "logicalType" : 
"RAW('org.apache.flink.table.api.dataview.ListView', 
'AFZvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnJ1bnRpbWUudHlwZXV0aWxzLkV4dGVybmFsU2VyaWFsaXplciRFeHRlcm5hbFNlcmlhbGl6ZXJTbmFwc2hvdAAAAAMADecEAAAAAaztAAVzcgArb3JnLmFwYWNoZS5mbGluay50YWJsZS50eXBlcy5GaWVsZHNEYXRhVHlwZfSwrBytgZ9fAgABTAAOZmllbGREYXRhVHlwZXN0ABBMamF2YS91dGlsL0xpc3Q7eHIAJW9yZy5hcGFjaGUuZmxpbmsudGFibGUudHlwZXMuRGF0YVR5cGV5y2rIj5/EeAIAAkwAD2NvbnZlcnNpb25DbGFzc3QAEUxqYXZhL2xhbmcvQ2xhc3M7TAALbG9naWNhbFR5cGV0ADJMb3
 [...]
+            "logicalType" : 
"RAW('org.apache.flink.table.api.dataview.ListView', 
'AFZvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnJ1bnRpbWUudHlwZXV0aWxzLkV4dGVybmFsU2VyaWFsaXplciRFeHRlcm5hbFNlcmlhbGl6ZXJTbmFwc2hvdAAAAAMADecEAAAAAaztAAVzcgArb3JnLmFwYWNoZS5mbGluay50YWJsZS50eXBlcy5GaWVsZHNEYXRhVHlwZfSwrBytgZ9fAgABTAAOZmllbGREYXRhVHlwZXN0ABBMamF2YS91dGlsL0xpc3Q7eHIAJW9yZy5hcGFjaGUuZmxpbmsudGFibGUudHlwZXMuRGF0YVR5cGV5y2rIj5/EeAIAAkwAD2NvbnZlcnNpb25DbGFzc3QAEUxqYXZhL2xhbmcvQ2xhc3M7TAALbG9naWNhbFR5cGV0ADJMb3
 [...]
           }, {
             "name" : "map",
-            "logicalType" : 
"RAW('org.apache.flink.table.api.dataview.MapView', 
'AFZvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnJ1bnRpbWUudHlwZXV0aWxzLkV4dGVybmFsU2VyaWFsaXplciRFeHRlcm5hbFNlcmlhbGl6ZXJTbmFwc2hvdAAAAAMADecEAAAAAaztAAVzcgArb3JnLmFwYWNoZS5mbGluay50YWJsZS50eXBlcy5GaWVsZHNEYXRhVHlwZfSwrBytgZ9fAgABTAAOZmllbGREYXRhVHlwZXN0ABBMamF2YS91dGlsL0xpc3Q7eHIAJW9yZy5hcGFjaGUuZmxpbmsudGFibGUudHlwZXMuRGF0YVR5cGV5y2rIj5/EeAIAAkwAD2NvbnZlcnNpb25DbGFzc3QAEUxqYXZhL2xhbmcvQ2xhc3M7TAALbG9naWNhbFR5cGV0ADJMb3J
 [...]
+            "logicalType" : 
"RAW('org.apache.flink.table.api.dataview.MapView', 
'AFZvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnJ1bnRpbWUudHlwZXV0aWxzLkV4dGVybmFsU2VyaWFsaXplciRFeHRlcm5hbFNlcmlhbGl6ZXJTbmFwc2hvdAAAAAMADecEAAAAAaztAAVzcgArb3JnLmFwYWNoZS5mbGluay50YWJsZS50eXBlcy5GaWVsZHNEYXRhVHlwZfSwrBytgZ9fAgABTAAOZmllbGREYXRhVHlwZXN0ABBMamF2YS91dGlsL0xpc3Q7eHIAJW9yZy5hcGFjaGUuZmxpbmsudGFibGUudHlwZXMuRGF0YVR5cGV5y2rIj5/EeAIAAkwAD2NvbnZlcnNpb25DbGFzc3QAEUxqYXZhL2xhbmcvQ2xhc3M7TAALbG9naWNhbFR5cGV0ADJMb3J
 [...]
           } ],
           "final" : true,
           "instantiable" : true,
           "comparision" : "NONE"
         }
       }, {
-        "distinct$0" : "RAW('org.apache.flink.table.api.dataview.MapView', 
'AFZvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnJ1bnRpbWUudHlwZXV0aWxzLkV4dGVybmFsU2VyaWFsaXplciRFeHRlcm5hbFNlcmlhbGl6ZXJTbmFwc2hvdAAAAAMADecEAAAAAaztAAVzcgArb3JnLmFwYWNoZS5mbGluay50YWJsZS50eXBlcy5GaWVsZHNEYXRhVHlwZfSwrBytgZ9fAgABTAAOZmllbGREYXRhVHlwZXN0ABBMamF2YS91dGlsL0xpc3Q7eHIAJW9yZy5hcGFjaGUuZmxpbmsudGFibGUudHlwZXMuRGF0YVR5cGV5y2rIj5/EeAIAAkwAD2NvbnZlcnNpb25DbGFzc3QAEUxqYXZhL2xhbmcvQ2xhc3M7TAALbG9naWNhbFR5cGV0ADJMb3JnL2Fw
 [...]
+        "distinct$0" : "RAW('org.apache.flink.table.api.dataview.MapView', 
'AFZvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnJ1bnRpbWUudHlwZXV0aWxzLkV4dGVybmFsU2VyaWFsaXplciRFeHRlcm5hbFNlcmlhbGl6ZXJTbmFwc2hvdAAAAAMADecEAAAAAaztAAVzcgArb3JnLmFwYWNoZS5mbGluay50YWJsZS50eXBlcy5GaWVsZHNEYXRhVHlwZfSwrBytgZ9fAgABTAAOZmllbGREYXRhVHlwZXN0ABBMamF2YS91dGlsL0xpc3Q7eHIAJW9yZy5hcGFjaGUuZmxpbmsudGFibGUudHlwZXMuRGF0YVR5cGV5y2rIj5/EeAIAAkwAD2NvbnZlcnNpb25DbGFzc3QAEUxqYXZhL2xhbmcvQ2xhc3M7TAALbG9naWNhbFR5cGV0ADJMb3JnL2Fw
 [...]
       }, {
         "$slice_end" : "BIGINT"
       } ]
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/typeutils/ExternalSerializer.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/typeutils/ExternalSerializer.java
index 9993bb3..a277659 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/typeutils/ExternalSerializer.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/typeutils/ExternalSerializer.java
@@ -59,15 +59,19 @@ public final class ExternalSerializer<I, E> extends 
TypeSerializer<E> {
 
     private final TypeSerializer<I> internalSerializer;
 
+    private final boolean isInternalInput;
+
     private final boolean isReuseEnabled;
 
     private transient I reuse;
 
     private transient DataStructureConverter<I, E> converter;
 
-    private ExternalSerializer(DataType dataType, TypeSerializer<I> 
internalSerializer) {
+    private ExternalSerializer(
+            DataType dataType, TypeSerializer<I> internalSerializer, boolean 
isInternalInput) {
         this.dataType = dataType;
         this.internalSerializer = internalSerializer;
+        this.isInternalInput = isInternalInput;
         // if no data structures that use memory segments are exposed in the 
external data
         // structure, we can reuse intermediate internal data structures
         this.isReuseEnabled = !hasBinaryData(dataType);
@@ -78,8 +82,15 @@ public final class ExternalSerializer<I, E> extends 
TypeSerializer<E> {
      * Creates an instance of a {@link ExternalSerializer} defined by the 
given {@link DataType}.
      */
     public static <I, E> ExternalSerializer<I, E> of(DataType dataType) {
+        return of(dataType, false);
+    }
+
+    /**
+     * Creates an instance of a {@link ExternalSerializer} defined by the 
given {@link DataType}.
+     */
+    public static <I, E> ExternalSerializer<I, E> of(DataType dataType, 
boolean isInternalInput) {
         return new ExternalSerializer<>(
-                dataType, 
InternalSerializers.create(dataType.getLogicalType()));
+                dataType, 
InternalSerializers.create(dataType.getLogicalType()), isInternalInput);
     }
 
     @Override
@@ -89,7 +100,7 @@ public final class ExternalSerializer<I, E> extends 
TypeSerializer<E> {
 
     @Override
     public TypeSerializer<E> duplicate() {
-        return new ExternalSerializer<>(dataType, 
internalSerializer.duplicate());
+        return new ExternalSerializer<>(dataType, 
internalSerializer.duplicate(), isInternalInput);
     }
 
     @Override
@@ -105,8 +116,14 @@ public final class ExternalSerializer<I, E> extends 
TypeSerializer<E> {
     }
 
     @Override
+    @SuppressWarnings("unchecked")
     public E copy(E from) {
-        final I internalFrom = converter.toInternal(from);
+        final I internalFrom;
+        if (isInternalInput) {
+            internalFrom = (I) from;
+        } else {
+            internalFrom = converter.toInternal(from);
+        }
         final I copy = internalSerializer.copy(internalFrom);
         return converter.toExternal(copy);
     }
@@ -122,8 +139,14 @@ public final class ExternalSerializer<I, E> extends 
TypeSerializer<E> {
     }
 
     @Override
+    @SuppressWarnings("unchecked")
     public void serialize(E record, DataOutputView target) throws IOException {
-        final I internalRecord = converter.toInternal(record);
+        final I internalRecord;
+        if (isInternalInput) {
+            internalRecord = (I) record;
+        } else {
+            internalRecord = converter.toInternal(record);
+        }
         internalSerializer.serialize(internalRecord, target);
     }
 
@@ -157,12 +180,14 @@ public final class ExternalSerializer<I, E> extends 
TypeSerializer<E> {
             return false;
         }
         ExternalSerializer<?, ?> that = (ExternalSerializer<?, ?>) o;
-        return dataType.equals(that.dataType) && 
internalSerializer.equals(that.internalSerializer);
+        return dataType.equals(that.dataType)
+                && internalSerializer.equals(that.internalSerializer)
+                && isInternalInput == that.isInternalInput;
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(dataType, internalSerializer);
+        return Objects.hash(dataType, internalSerializer, isInternalInput);
     }
 
     @Override
@@ -213,6 +238,8 @@ public final class ExternalSerializer<I, E> extends 
TypeSerializer<E> {
 
         private DataType dataType;
 
+        private boolean isInternalInput;
+
         public ExternalSerializerSnapshot() {
             super(ExternalSerializer.class);
         }
@@ -220,6 +247,7 @@ public final class ExternalSerializer<I, E> extends 
TypeSerializer<E> {
         public ExternalSerializerSnapshot(ExternalSerializer<I, E> 
externalSerializer) {
             super(externalSerializer);
             this.dataType = externalSerializer.dataType;
+            this.isInternalInput = externalSerializer.isInternalInput;
         }
 
         @Override
@@ -231,6 +259,7 @@ public final class ExternalSerializer<I, E> extends 
TypeSerializer<E> {
         protected void writeOuterSnapshot(DataOutputView out) throws 
IOException {
             final DataOutputViewStream stream = new DataOutputViewStream(out);
             InstantiationUtil.serializeObject(stream, dataType);
+            out.writeBoolean(isInternalInput);
         }
 
         @Override
@@ -243,6 +272,7 @@ public final class ExternalSerializer<I, E> extends 
TypeSerializer<E> {
             } catch (ClassNotFoundException e) {
                 throw new IOException(e);
             }
+            isInternalInput = in.readBoolean();
         }
 
         @Override
@@ -255,7 +285,8 @@ public final class ExternalSerializer<I, E> extends 
TypeSerializer<E> {
         @SuppressWarnings("unchecked")
         protected ExternalSerializer<I, E> 
createOuterSerializerWithNestedSerializers(
                 TypeSerializer<?>[] nestedSerializers) {
-            return new ExternalSerializer<>(dataType, (TypeSerializer<I>) 
nestedSerializers[0]);
+            return new ExternalSerializer<>(
+                    dataType, (TypeSerializer<I>) nestedSerializers[0], 
isInternalInput);
         }
     }
 }
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/typeutils/ExternalTypeInfo.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/typeutils/ExternalTypeInfo.java
index ad8e6e5..afe48d0 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/typeutils/ExternalTypeInfo.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/typeutils/ExternalTypeInfo.java
@@ -64,20 +64,37 @@ public final class ExternalTypeInfo<T> extends 
TypeInformation<T> implements Dat
      * structures but serialized and deserialized into external data 
structures.
      */
     public static <T> ExternalTypeInfo<T> of(DataType dataType) {
-        final TypeSerializer<T> serializer = 
createExternalTypeSerializer(dataType);
+        final TypeSerializer<T> serializer = 
createExternalTypeSerializer(dataType, false);
+        return new ExternalTypeInfo<>(dataType, serializer);
+    }
+
+    /**
+     * Creates type information for a {@link DataType} that is possibly 
represented by internal data
+     * structures but serialized and deserialized into external data 
structures.
+     *
+     * @param isInternalInput allows for a non-bidirectional serializer from 
internal to external
+     */
+    public static <T> ExternalTypeInfo<T> of(DataType dataType, boolean 
isInternalInput) {
+        final TypeSerializer<T> serializer =
+                createExternalTypeSerializer(dataType, isInternalInput);
         return new ExternalTypeInfo<>(dataType, serializer);
     }
 
     @SuppressWarnings("unchecked")
-    private static <T> TypeSerializer<T> createExternalTypeSerializer(DataType 
dataType) {
+    private static <T> TypeSerializer<T> createExternalTypeSerializer(
+            DataType dataType, boolean isInternalInput) {
         final LogicalType logicalType = dataType.getLogicalType();
-        if (logicalType instanceof RawType) {
+        if (logicalType instanceof RawType && !isInternalInput) {
             final RawType<?> rawType = (RawType<?>) logicalType;
             if (dataType.getConversionClass() == 
rawType.getOriginatingClass()) {
                 return (TypeSerializer<T>) rawType.getTypeSerializer();
             }
         }
-        return ExternalSerializer.of(dataType);
+        // note: we can add more special cases in the future to make the 
serialization more
+        // efficient, for example we can translate to RowTypeInfo if we know 
that field types can
+        // be mapped to type information as well, the external serializer in 
its current shape is
+        // the most general serializer implementation for all conversion 
classes
+        return ExternalSerializer.of(dataType, isInternalInput);
     }
 
     // 
--------------------------------------------------------------------------------------------

Reply via email to