twalthr commented on a change in pull request #17441:
URL: https://github.com/apache/flink/pull/17441#discussion_r726925586



##########
File path: 
flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectBatchResultTest.java
##########
@@ -82,38 +109,53 @@ public void testLimitedSnapshot() throws Exception {
                         new String[] {"f0", "f1"},
                         new DataType[] {DataTypes.STRING(), 
DataTypes.BIGINT()});
 
+        DataStructureConverter<Object, Object> dataStructureConverter =
+                
DataStructureConverters.getConverter(schema.toPhysicalRowDataType());
+        Function<Row, RowData> converter =
+                row -> (RowData) dataStructureConverter.toInternalOrNull(row);
+
         try (TestMaterializedCollectBatchResult result =
                 new TestMaterializedCollectBatchResult(
                         new TestTableResult(ResultKind.SUCCESS_WITH_CONTENT, 
schema),
                         2, // limit the materialized table to 2 rows
                         3)) { // with 3 rows overcommitment
             result.isRetrieving = true;
 
-            result.processRecord(Row.of("D", 1));
-            result.processRecord(Row.of("A", 1));
-            result.processRecord(Row.of("B", 1));
-            result.processRecord(Row.of("A", 1));
+            result.processRecord(converter.apply(Row.of("D", 1)));

Review comment:
       instead of calling `converter.apply` we can introduce a helper method 
`toRowData`, this could even be put in a SQL Client test utils class

##########
File path: 
flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
##########
@@ -466,8 +476,14 @@ private void executeStreamQueryTable(
                 IntStream.rangeClosed(1, result.getPayload())
                         .forEach(
                                 (page) -> {
-                                    for (Row row : 
executor.retrieveResultPage(resultID, page)) {
-                                        actualResults.add(row.toString());
+                                    for (RowData row :
+                                            
executor.retrieveResultPage(resultID, page)) {
+                                        actualResults.add(
+                                                StringUtils.arrayAwareToString(

Review comment:
       nit: Use `Arrays.toString` we don't expect nested arrays here

##########
File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/StaticResultProvider.java
##########
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.internal;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+
+import java.util.List;
+import java.util.function.Function;
+
+/** Create result provider from a static set of data using external types. */
+@Internal
+public class StaticResultProvider implements ResultProvider {

Review comment:
       package private?

##########
File path: 
flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliTableauResultViewTest.java
##########
@@ -76,87 +83,91 @@ public void setUp() {
                         Column.physical("decimal(10, 5)", 
DataTypes.DECIMAL(10, 5)),
                         Column.physical("timestamp", DataTypes.TIMESTAMP(6)));
 
-        data = new ArrayList<>();
-        data.add(
-                Row.ofKind(
-                        RowKind.INSERT,
-                        null,
-                        1,
-                        2,
-                        "abc",
-                        BigDecimal.valueOf(1.23),
-                        Timestamp.valueOf("2020-03-01 18:39:14")));
-        data.add(
-                Row.ofKind(
-                        RowKind.UPDATE_BEFORE,
-                        false,
-                        null,
-                        0,
-                        "",
-                        BigDecimal.valueOf(1),
-                        Timestamp.valueOf("2020-03-01 18:39:14.1")));
-        data.add(
-                Row.ofKind(
-                        RowKind.UPDATE_AFTER,
-                        true,
-                        Integer.MAX_VALUE,
-                        null,
-                        "abcdefg",
-                        BigDecimal.valueOf(1234567890),
-                        Timestamp.valueOf("2020-03-01 18:39:14.12")));
-        data.add(
-                Row.ofKind(
-                        RowKind.DELETE,
-                        false,
-                        Integer.MIN_VALUE,
-                        Long.MAX_VALUE,
-                        null,
-                        BigDecimal.valueOf(12345.06789),
-                        Timestamp.valueOf("2020-03-01 18:39:14.123")));
-        data.add(
-                Row.ofKind(
-                        RowKind.INSERT,
-                        true,
-                        100,
-                        Long.MIN_VALUE,
-                        "abcdefg111",
-                        null,
-                        Timestamp.valueOf("2020-03-01 18:39:14.123456")));
-        data.add(
-                Row.ofKind(
-                        RowKind.DELETE,
-                        null,
-                        -1,
-                        -1,
-                        "abcdefghijklmnopqrstuvwxyz",
-                        BigDecimal.valueOf(-12345.06789),
-                        null));
-
-        data.add(
-                Row.ofKind(
-                        RowKind.INSERT,
-                        null,
-                        -1,
-                        -1,
-                        "这是一段中文",
-                        BigDecimal.valueOf(-12345.06789),
-                        Timestamp.valueOf("2020-03-04 18:39:14")));
-
-        data.add(
-                Row.ofKind(
-                        RowKind.DELETE,
-                        null,
-                        -1,
-                        -1,
-                        "これは日本語をテストするための文です",
-                        BigDecimal.valueOf(-12345.06789),
-                        Timestamp.valueOf("2020-03-04 18:39:14")));
-
-        streamingData = new ArrayList<>();
-        for (Row datum : data) {
-            Row row = Row.copy(datum);
-            streamingData.add(row);
-        }
+        List<Row> rows =
+                Arrays.asList(
+                        Row.ofKind(
+                                RowKind.INSERT,
+                                null,
+                                1,
+                                2L,
+                                "abc",
+                                BigDecimal.valueOf(1.23),
+                                Timestamp.valueOf("2020-03-01 18:39:14")),
+                        Row.ofKind(
+                                RowKind.UPDATE_BEFORE,
+                                false,
+                                null,
+                                0L,
+                                "",
+                                BigDecimal.valueOf(1),
+                                Timestamp.valueOf("2020-03-01 18:39:14.1")),
+                        Row.ofKind(
+                                RowKind.UPDATE_AFTER,
+                                true,
+                                Integer.MAX_VALUE,
+                                null,
+                                "abcdefg",
+                                BigDecimal.valueOf(12345),
+                                Timestamp.valueOf("2020-03-01 18:39:14.12")),
+                        Row.ofKind(
+                                RowKind.DELETE,
+                                false,
+                                Integer.MIN_VALUE,
+                                Long.MAX_VALUE,
+                                null,
+                                BigDecimal.valueOf(12345.06789),
+                                Timestamp.valueOf("2020-03-01 18:39:14.123")),
+                        Row.ofKind(
+                                RowKind.INSERT,
+                                true,
+                                100,
+                                Long.MIN_VALUE,
+                                "abcdefg111",
+                                null,
+                                Timestamp.valueOf("2020-03-01 
18:39:14.123456")),
+                        Row.ofKind(
+                                RowKind.DELETE,
+                                null,
+                                -1,
+                                -1L,
+                                "abcdefghijklmnopqrstuvwxyz",
+                                BigDecimal.valueOf(-12345.06789),
+                                null),
+                        Row.ofKind(
+                                RowKind.INSERT,
+                                null,
+                                -1,
+                                -1L,
+                                "这是一段中文",
+                                BigDecimal.valueOf(-12345.06789),
+                                Timestamp.valueOf("2020-03-04 18:39:14")),
+                        Row.ofKind(
+                                RowKind.DELETE,
+                                null,
+                                -1,
+                                -1L,
+                                "これは日本語をテストするための文です",
+                                BigDecimal.valueOf(-12345.06789),
+                                Timestamp.valueOf("2020-03-04 18:39:14")));
+
+        Function<Row, RowData> toInternalMapper =

Review comment:
       ideally, the data should have been serialized once to model the 
production behavior, because it will be unlikely that `GenericRowData` will be 
used as input for the view

##########
File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/StaticResultProvider.java
##########
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.internal;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+
+import java.util.List;
+import java.util.function.Function;
+
+/** Create result provider from a static set of data using external types. */
+@Internal
+public class StaticResultProvider implements ResultProvider {
+
+    private final List<Row> rows;
+    private final Function<Row, RowData> externalToInternalConverter;

Review comment:
       why not using internal directly?

##########
File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/PrintUtils.java
##########
@@ -269,6 +273,18 @@ private static Object formattedTimestamp(
                                 formattedTimestamp(array.get(i), elementType, 
sessionTimeZone);
                     }
                     return formattedArray;
+                } else if (field instanceof ArrayData) {

Review comment:
       a similar change should be necessary for nested `RowData`. how do 
structured types behave with this change?

##########
File path: 
flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliTableauResultViewTest.java
##########
@@ -76,87 +83,91 @@ public void setUp() {
                         Column.physical("decimal(10, 5)", 
DataTypes.DECIMAL(10, 5)),
                         Column.physical("timestamp", DataTypes.TIMESTAMP(6)));
 
-        data = new ArrayList<>();
-        data.add(
-                Row.ofKind(
-                        RowKind.INSERT,
-                        null,
-                        1,
-                        2,
-                        "abc",
-                        BigDecimal.valueOf(1.23),
-                        Timestamp.valueOf("2020-03-01 18:39:14")));
-        data.add(
-                Row.ofKind(
-                        RowKind.UPDATE_BEFORE,
-                        false,
-                        null,
-                        0,
-                        "",
-                        BigDecimal.valueOf(1),
-                        Timestamp.valueOf("2020-03-01 18:39:14.1")));
-        data.add(
-                Row.ofKind(
-                        RowKind.UPDATE_AFTER,
-                        true,
-                        Integer.MAX_VALUE,
-                        null,
-                        "abcdefg",
-                        BigDecimal.valueOf(1234567890),
-                        Timestamp.valueOf("2020-03-01 18:39:14.12")));
-        data.add(
-                Row.ofKind(
-                        RowKind.DELETE,
-                        false,
-                        Integer.MIN_VALUE,
-                        Long.MAX_VALUE,
-                        null,
-                        BigDecimal.valueOf(12345.06789),
-                        Timestamp.valueOf("2020-03-01 18:39:14.123")));
-        data.add(
-                Row.ofKind(
-                        RowKind.INSERT,
-                        true,
-                        100,
-                        Long.MIN_VALUE,
-                        "abcdefg111",
-                        null,
-                        Timestamp.valueOf("2020-03-01 18:39:14.123456")));
-        data.add(
-                Row.ofKind(
-                        RowKind.DELETE,
-                        null,
-                        -1,
-                        -1,
-                        "abcdefghijklmnopqrstuvwxyz",
-                        BigDecimal.valueOf(-12345.06789),
-                        null));
-
-        data.add(
-                Row.ofKind(
-                        RowKind.INSERT,
-                        null,
-                        -1,
-                        -1,
-                        "这是一段中文",
-                        BigDecimal.valueOf(-12345.06789),
-                        Timestamp.valueOf("2020-03-04 18:39:14")));
-
-        data.add(
-                Row.ofKind(
-                        RowKind.DELETE,
-                        null,
-                        -1,
-                        -1,
-                        "これは日本語をテストするための文です",
-                        BigDecimal.valueOf(-12345.06789),
-                        Timestamp.valueOf("2020-03-04 18:39:14")));
-
-        streamingData = new ArrayList<>();
-        for (Row datum : data) {
-            Row row = Row.copy(datum);
-            streamingData.add(row);
-        }
+        List<Row> rows =
+                Arrays.asList(
+                        Row.ofKind(
+                                RowKind.INSERT,
+                                null,
+                                1,
+                                2L,
+                                "abc",
+                                BigDecimal.valueOf(1.23),
+                                Timestamp.valueOf("2020-03-01 18:39:14")),
+                        Row.ofKind(
+                                RowKind.UPDATE_BEFORE,
+                                false,
+                                null,
+                                0L,
+                                "",
+                                BigDecimal.valueOf(1),
+                                Timestamp.valueOf("2020-03-01 18:39:14.1")),
+                        Row.ofKind(
+                                RowKind.UPDATE_AFTER,
+                                true,
+                                Integer.MAX_VALUE,
+                                null,
+                                "abcdefg",
+                                BigDecimal.valueOf(12345),
+                                Timestamp.valueOf("2020-03-01 18:39:14.12")),
+                        Row.ofKind(
+                                RowKind.DELETE,
+                                false,
+                                Integer.MIN_VALUE,
+                                Long.MAX_VALUE,
+                                null,
+                                BigDecimal.valueOf(12345.06789),
+                                Timestamp.valueOf("2020-03-01 18:39:14.123")),
+                        Row.ofKind(
+                                RowKind.INSERT,
+                                true,
+                                100,
+                                Long.MIN_VALUE,
+                                "abcdefg111",
+                                null,
+                                Timestamp.valueOf("2020-03-01 
18:39:14.123456")),
+                        Row.ofKind(
+                                RowKind.DELETE,
+                                null,
+                                -1,
+                                -1L,
+                                "abcdefghijklmnopqrstuvwxyz",
+                                BigDecimal.valueOf(-12345.06789),
+                                null),
+                        Row.ofKind(
+                                RowKind.INSERT,
+                                null,
+                                -1,
+                                -1L,
+                                "这是一段中文",
+                                BigDecimal.valueOf(-12345.06789),
+                                Timestamp.valueOf("2020-03-04 18:39:14")),
+                        Row.ofKind(
+                                RowKind.DELETE,
+                                null,
+                                -1,
+                                -1L,
+                                "これは日本語をテストするための文です",
+                                BigDecimal.valueOf(-12345.06789),
+                                Timestamp.valueOf("2020-03-04 18:39:14")));
+
+        Function<Row, RowData> toInternalMapper =

Review comment:
       either use `DataStructureConverters` to improve the readability of the 
code or better update the test data to use `List<RowData>` directly

##########
File path: 
flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/utils/TestTableResult.java
##########
@@ -97,4 +101,27 @@ public ResultKind getResultKind() {
     public void print() {
         // do nothing
     }
+
+    @Override
+    public CloseableIterator<RowData> collectInternal() {
+        DataStructureConverter<Object, Object> converter =

Review comment:
       call the `open()` method of the converter

##########
File path: 
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/Executor.java
##########
@@ -43,7 +42,7 @@
      * @return used session identifier to track the session.
      * @throws SqlExecutionException if any error happen
      */
-    String openSession(@Nullable String sessionId) throws 
SqlExecutionException;
+    String openSession(@javax.annotation.Nullable String sessionId) throws 
SqlExecutionException;

Review comment:
       undo this change

##########
File path: 
flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/utils/TestTableResult.java
##########
@@ -97,4 +101,27 @@ public ResultKind getResultKind() {
     public void print() {
         // do nothing
     }
+
+    @Override
+    public CloseableIterator<RowData> collectInternal() {
+        DataStructureConverter<Object, Object> converter =
+                
DataStructureConverters.getConverter(resolvedSchema.toPhysicalRowDataType());
+        return new CloseableIterator<RowData>() {

Review comment:
       nit: avoid anonymous inner classes and use static class instead

##########
File path: 
flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectBatchResultTest.java
##########
@@ -44,34 +48,57 @@ public void testSnapshot() throws Exception {
                         new String[] {"f0", "f1"},
                         new DataType[] {DataTypes.STRING(), 
DataTypes.BIGINT()});
 
+        DataStructureConverter<Object, Object> dataStructureConverter =
+                
DataStructureConverters.getConverter(schema.toPhysicalRowDataType());
+        Function<Row, RowData> converter =
+                row -> (RowData) dataStructureConverter.toInternalOrNull(row);
+
         try (TestMaterializedCollectBatchResult result =
                 new TestMaterializedCollectBatchResult(
                         new TestTableResult(ResultKind.SUCCESS_WITH_CONTENT, 
schema),
                         Integer.MAX_VALUE)) {
 
             result.isRetrieving = true;
 
-            result.processRecord(Row.of("A", 1));
-            result.processRecord(Row.of("B", 1));
-            result.processRecord(Row.of("A", 1));
-            result.processRecord(Row.of("C", 2));
+            result.processRecord(converter.apply(Row.of("A", 1)));
+            result.processRecord(converter.apply(Row.of("B", 1)));
+            result.processRecord(converter.apply(Row.of("A", 1)));
+            result.processRecord(converter.apply(Row.of("C", 2)));
 
             assertEquals(TypedResult.payload(4), result.snapshot(1));
 
-            assertEquals(Collections.singletonList(Row.of("A", 1)), 
result.retrievePage(1));
-            assertEquals(Collections.singletonList(Row.of("B", 1)), 
result.retrievePage(2));
-            assertEquals(Collections.singletonList(Row.of("A", 1)), 
result.retrievePage(3));
-            assertEquals(Collections.singletonList(Row.of("C", 2)), 
result.retrievePage(4));
+            assertEquals(
+                    Collections.singletonList(converter.apply(Row.of("A", 1))),
+                    result.retrievePage(1));
+            assertEquals(
+                    Collections.singletonList(converter.apply(Row.of("B", 1))),
+                    result.retrievePage(2));
+            assertEquals(
+                    Collections.singletonList(converter.apply(Row.of("A", 1))),
+                    result.retrievePage(3));
+            assertEquals(
+                    Collections.singletonList(converter.apply(Row.of("C", 2))),
+                    result.retrievePage(4));
 
-            result.processRecord(Row.of("A", 1));
+            result.processRecord(converter.apply(Row.of("A", 1)));
 
             assertEquals(TypedResult.payload(5), result.snapshot(1));
 
-            assertEquals(Collections.singletonList(Row.of("A", 1)), 
result.retrievePage(1));
-            assertEquals(Collections.singletonList(Row.of("B", 1)), 
result.retrievePage(2));
-            assertEquals(Collections.singletonList(Row.of("A", 1)), 
result.retrievePage(3));
-            assertEquals(Collections.singletonList(Row.of("C", 2)), 
result.retrievePage(4));
-            assertEquals(Collections.singletonList(Row.of("A", 1)), 
result.retrievePage(5));
+            assertEquals(
+                    Collections.singletonList(converter.apply(Row.of("A", 1))),

Review comment:
       instead of calling 
`assertEquals(Collections.singletonList(converter.apply(Row.of` many times, we 
can introduce a helper method that makes the code more readable

##########
File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/InsertResultProvider.java
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.internal;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+
+import javax.annotation.Nullable;
+
+import java.util.NoSuchElementException;
+import java.util.function.Supplier;
+
+/** A {@link ResultProvider} with custom wait logic for insert operation 
result. */
+@Internal
+class InsertResultProvider implements ResultProvider {
+
+    private final Long[] affectedRowCountsRow;
+
+    @Nullable private Boolean hasNext = null;

Review comment:
       nit: for team consistency, can we write this as `private @Nullable 
Boolean hasNext`, next to the data type

##########
File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableResultImpl.java
##########
@@ -145,12 +138,17 @@ public ResultKind getResultKind() {
 
     @Override
     public CloseableIterator<Row> collect() {
-        return data;
+        return resultProvider.toExternalIterator();
+    }
+
+    @Override
+    public CloseableIterator<RowData> collectInternal() {
+        return resultProvider.toInternalIterator();
     }
 
     @Override
     public void print() {
-        Iterator<Row> it = collect();
+        Iterator<RowData> it = this.resultProvider.toInternalIterator();

Review comment:
       nit: `this.` can be omitted

##########
File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/ResultProvider.java
##########
@@ -21,19 +21,39 @@
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.data.RowData;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.CloseableIterator;
 
 /**
  * An internal class which helps the client to get the execute result from a 
specific sink.
  *
+ * <p>The two iterators are exclusive, that is you can use only one of the two 
at the same time.
+ *
  * <p>This class is generated by specific sink and brings the result info to a 
{@link TableResult}.
  */
 @Internal
-public interface CollectResultProvider {
+public interface ResultProvider {
     /** Set the job client associated with the select job to retrieve the 
result. */
-    void setJobClient(JobClient jobClient);
+    ResultProvider setJobClient(JobClient jobClient);

Review comment:
       why has the result type changed from `void` to `ResultProvider`?

##########
File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
##########
@@ -1462,7 +1478,17 @@ private TableResult buildDescribeResult(ResolvedSchema 
schema) {
         return new String[] {"name", "type", "null", "key", "extras", 
"watermark"};
     }
 
-    private TableResult buildShowColumnsResult(
+    private static RowData generateTableColumnsDataTypesConverter(Row row) {
+        return GenericRowData.of(

Review comment:
       don't create a `Row` in the first place

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/CollectDynamicSink.java
##########
@@ -124,17 +117,86 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context 
context) {
 
     @Override
     public DynamicTableSink copy() {
-        final CollectDynamicSink copy =
-                new CollectDynamicSink(
-                        tableIdentifier, consumedDataType, maxBatchSize, 
socketTimeout);
-        // kind of violates the contract of copy() but should not harm
-        // as it is null during optimization anyway until physical translation
-        copy.iterator = iterator;
-        return copy;
+        return new CollectDynamicSink(
+                tableIdentifier, consumedDataType, maxBatchSize, 
socketTimeout);
     }
 
     @Override
     public String asSummaryString() {
         return String.format("TableToCollect(type=%s)", consumedDataType);
     }
+
+    private final class CollectResultProvider implements ResultProvider {
+
+        private CloseableRowIteratorWrapper<RowData> rowDataIterator;
+        private CloseableRowIteratorWrapper<Row> rowIterator;
+
+        private void initialize() {
+            if (this.rowIterator == null) {
+                this.rowDataIterator =
+                        new CloseableRowIteratorWrapper<>(iterator, 
Function.identity());
+                this.rowIterator =
+                        new CloseableRowIteratorWrapper<>(
+                                iterator, r -> (Row) converter.toExternal(r));
+            }
+        }
+
+        @Override
+        public ResultProvider setJobClient(JobClient jobClient) {
+            iterator.setJobClient(jobClient);
+            return this;
+        }
+
+        @Override
+        public CloseableIterator<RowData> toInternalIterator() {
+            initialize();
+            return this.rowDataIterator;
+        }
+
+        @Override
+        public CloseableIterator<Row> toExternalIterator() {
+            initialize();
+            return this.rowIterator;
+        }
+
+        @Override
+        public boolean isFirstRowReady() {
+            initialize();
+            return this.rowDataIterator.firstRowProcessed
+                    || this.rowIterator.firstRowProcessed
+                    || iterator.hasNext();
+        }
+    }
+
+    private static final class CloseableRowIteratorWrapper<T> implements 
CloseableIterator<T> {
+        private final CloseableIterator<RowData> iterator;
+        private final Function<RowData, T> mapper;

Review comment:
       we should avoid too many generic `Function<RowData, T>`, here we could 
simple use our `DataStructureConverter` class

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/CollectDynamicSink.java
##########
@@ -124,17 +117,86 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context 
context) {
 
     @Override
     public DynamicTableSink copy() {
-        final CollectDynamicSink copy =
-                new CollectDynamicSink(
-                        tableIdentifier, consumedDataType, maxBatchSize, 
socketTimeout);
-        // kind of violates the contract of copy() but should not harm
-        // as it is null during optimization anyway until physical translation
-        copy.iterator = iterator;
-        return copy;
+        return new CollectDynamicSink(
+                tableIdentifier, consumedDataType, maxBatchSize, 
socketTimeout);
     }
 
     @Override
     public String asSummaryString() {
         return String.format("TableToCollect(type=%s)", consumedDataType);
     }
+
+    private final class CollectResultProvider implements ResultProvider {
+
+        private CloseableRowIteratorWrapper<RowData> rowDataIterator;
+        private CloseableRowIteratorWrapper<Row> rowIterator;
+
+        private void initialize() {
+            if (this.rowIterator == null) {
+                this.rowDataIterator =
+                        new CloseableRowIteratorWrapper<>(iterator, 
Function.identity());
+                this.rowIterator =
+                        new CloseableRowIteratorWrapper<>(
+                                iterator, r -> (Row) converter.toExternal(r));
+            }
+        }
+
+        @Override
+        public ResultProvider setJobClient(JobClient jobClient) {
+            iterator.setJobClient(jobClient);
+            return this;
+        }
+
+        @Override
+        public CloseableIterator<RowData> toInternalIterator() {
+            initialize();
+            return this.rowDataIterator;
+        }
+
+        @Override
+        public CloseableIterator<Row> toExternalIterator() {
+            initialize();
+            return this.rowIterator;
+        }
+
+        @Override
+        public boolean isFirstRowReady() {
+            initialize();
+            return this.rowDataIterator.firstRowProcessed
+                    || this.rowIterator.firstRowProcessed
+                    || iterator.hasNext();
+        }
+    }
+
+    private static final class CloseableRowIteratorWrapper<T> implements 
CloseableIterator<T> {
+        private final CloseableIterator<RowData> iterator;
+        private final Function<RowData, T> mapper;

Review comment:
       the converter needs to be opened on the client side

##########
File path: 
flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliTableauResultViewTest.java
##########
@@ -76,87 +83,91 @@ public void setUp() {
                         Column.physical("decimal(10, 5)", 
DataTypes.DECIMAL(10, 5)),
                         Column.physical("timestamp", DataTypes.TIMESTAMP(6)));
 
-        data = new ArrayList<>();
-        data.add(
-                Row.ofKind(
-                        RowKind.INSERT,
-                        null,
-                        1,
-                        2,
-                        "abc",
-                        BigDecimal.valueOf(1.23),
-                        Timestamp.valueOf("2020-03-01 18:39:14")));
-        data.add(
-                Row.ofKind(
-                        RowKind.UPDATE_BEFORE,
-                        false,
-                        null,
-                        0,
-                        "",
-                        BigDecimal.valueOf(1),
-                        Timestamp.valueOf("2020-03-01 18:39:14.1")));
-        data.add(
-                Row.ofKind(
-                        RowKind.UPDATE_AFTER,
-                        true,
-                        Integer.MAX_VALUE,
-                        null,
-                        "abcdefg",
-                        BigDecimal.valueOf(1234567890),
-                        Timestamp.valueOf("2020-03-01 18:39:14.12")));
-        data.add(
-                Row.ofKind(
-                        RowKind.DELETE,
-                        false,
-                        Integer.MIN_VALUE,
-                        Long.MAX_VALUE,
-                        null,
-                        BigDecimal.valueOf(12345.06789),
-                        Timestamp.valueOf("2020-03-01 18:39:14.123")));
-        data.add(
-                Row.ofKind(
-                        RowKind.INSERT,
-                        true,
-                        100,
-                        Long.MIN_VALUE,
-                        "abcdefg111",
-                        null,
-                        Timestamp.valueOf("2020-03-01 18:39:14.123456")));
-        data.add(
-                Row.ofKind(
-                        RowKind.DELETE,
-                        null,
-                        -1,
-                        -1,
-                        "abcdefghijklmnopqrstuvwxyz",
-                        BigDecimal.valueOf(-12345.06789),
-                        null));
-
-        data.add(
-                Row.ofKind(
-                        RowKind.INSERT,
-                        null,
-                        -1,
-                        -1,
-                        "这是一段中文",
-                        BigDecimal.valueOf(-12345.06789),
-                        Timestamp.valueOf("2020-03-04 18:39:14")));
-
-        data.add(
-                Row.ofKind(
-                        RowKind.DELETE,
-                        null,
-                        -1,
-                        -1,
-                        "これは日本語をテストするための文です",
-                        BigDecimal.valueOf(-12345.06789),
-                        Timestamp.valueOf("2020-03-04 18:39:14")));
-
-        streamingData = new ArrayList<>();
-        for (Row datum : data) {
-            Row row = Row.copy(datum);
-            streamingData.add(row);
-        }
+        List<Row> rows =
+                Arrays.asList(
+                        Row.ofKind(
+                                RowKind.INSERT,
+                                null,
+                                1,
+                                2L,
+                                "abc",
+                                BigDecimal.valueOf(1.23),
+                                Timestamp.valueOf("2020-03-01 18:39:14")),
+                        Row.ofKind(
+                                RowKind.UPDATE_BEFORE,
+                                false,
+                                null,
+                                0L,
+                                "",
+                                BigDecimal.valueOf(1),
+                                Timestamp.valueOf("2020-03-01 18:39:14.1")),
+                        Row.ofKind(
+                                RowKind.UPDATE_AFTER,
+                                true,
+                                Integer.MAX_VALUE,
+                                null,
+                                "abcdefg",
+                                BigDecimal.valueOf(12345),
+                                Timestamp.valueOf("2020-03-01 18:39:14.12")),
+                        Row.ofKind(
+                                RowKind.DELETE,
+                                false,
+                                Integer.MIN_VALUE,
+                                Long.MAX_VALUE,
+                                null,
+                                BigDecimal.valueOf(12345.06789),
+                                Timestamp.valueOf("2020-03-01 18:39:14.123")),
+                        Row.ofKind(
+                                RowKind.INSERT,
+                                true,
+                                100,
+                                Long.MIN_VALUE,
+                                "abcdefg111",
+                                null,
+                                Timestamp.valueOf("2020-03-01 
18:39:14.123456")),
+                        Row.ofKind(
+                                RowKind.DELETE,
+                                null,
+                                -1,
+                                -1L,
+                                "abcdefghijklmnopqrstuvwxyz",
+                                BigDecimal.valueOf(-12345.06789),
+                                null),
+                        Row.ofKind(
+                                RowKind.INSERT,
+                                null,
+                                -1,
+                                -1L,
+                                "这是一段中文",
+                                BigDecimal.valueOf(-12345.06789),
+                                Timestamp.valueOf("2020-03-04 18:39:14")),
+                        Row.ofKind(
+                                RowKind.DELETE,
+                                null,
+                                -1,
+                                -1L,
+                                "これは日本語をテストするための文です",
+                                BigDecimal.valueOf(-12345.06789),
+                                Timestamp.valueOf("2020-03-04 18:39:14")));
+
+        Function<Row, RowData> toInternalMapper =

Review comment:
       also test a more complex type such as array, map, or nested row

##########
File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
##########
@@ -1298,7 +1304,12 @@ public TableResult executeInternal(Operation operation) {
         }
     }
 
-    private TableResult createCatalog(CreateCatalogOperation operation) {
+    private static RowData convertOneColumnStringRow(Row row) {

Review comment:
       don't create a `Row` in the first place




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to