twalthr commented on a change in pull request #17441:
URL: https://github.com/apache/flink/pull/17441#discussion_r731066521



##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/CollectDynamicSink.java
##########
@@ -115,6 +105,9 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
                                     externalSerializer,
                                     accumulatorName,
                                     checkpointConfig);
+                    this.converter = context.createDataStructureConverter(consumedDataType);
+                    this.converter.open(
+                            RuntimeConverter.Context.create(config.getClass().getClassLoader()));

Review comment:
       Ideally, the classloader of the session (table environment) would be used here. Currently, we don't have a central way of declaring the classloader in `EnvironmentSettings` when initializing the environment, but this might happen soon. For now, we should use `Thread.currentThread().getContextClassLoader()`. Should we already prepare the call stack so that the classloader can be passed from the table environment down to this location? If that is too complicated here, I would call `open()` later.
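
       A minimal sketch of the interim approach, assuming the surrounding fields and the `context` from the diff above:

```java
// Interim solution: use the current thread's context classloader until a
// session classloader can be passed down from the table environment.
this.converter = context.createDataStructureConverter(consumedDataType);
this.converter.open(
        RuntimeConverter.Context.create(Thread.currentThread().getContextClassLoader()));
```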

##########
File path: flink-table/flink-table-common/src/test/java/org/apache/flink/table/utils/PrintUtilsTest.java
##########
@@ -437,10 +463,28 @@ private ResolvedSchema getSchema() {
                         RowKind.DELETE,
                         null,
                         -1,
-                        -1,
+                        -1L,
                         "これは日本語をテストするための文です",
                         BigDecimal.valueOf(-12345.06789),
                         Timestamp.valueOf("2020-03-04 18:39:14")));
-        return data;
+        return data.stream()

Review comment:
       Use `DataStructureConverter` here as well, similar to the other test, and with the feedback given there addressed too.
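
       A rough sketch of what that could look like, assuming the schema accessor and converter setup (both hypothetical here, modeled on the converter usage elsewhere in this PR):

```java
// Hypothetical: derive the internal rows from the external Row test data
// via DataStructureConverter instead of constructing RowData by hand.
final DataStructureConverter<Object, Object> converter =
        DataStructureConverters.getConverter(getSchema().toPhysicalRowDataType());
converter.open(RuntimeConverter.Context.create(PrintUtilsTest.class.getClassLoader()));
return data.stream()
        .map(row -> (RowData) converter.toInternal(row))
        .collect(Collectors.toList());
```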

##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/CollectDynamicSink.java
##########
@@ -124,17 +117,86 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
 
     @Override
     public DynamicTableSink copy() {
-        final CollectDynamicSink copy =
-                new CollectDynamicSink(
-                        tableIdentifier, consumedDataType, maxBatchSize, socketTimeout);
-        // kind of violates the contract of copy() but should not harm
-        // as it is null during optimization anyway until physical translation
-        copy.iterator = iterator;
-        return copy;
+        return new CollectDynamicSink(
+                tableIdentifier, consumedDataType, maxBatchSize, socketTimeout);
     }
 
     @Override
     public String asSummaryString() {
         return String.format("TableToCollect(type=%s)", consumedDataType);
     }
+
+    private final class CollectResultProvider implements ResultProvider {
+
+        private CloseableRowIteratorWrapper<RowData> rowDataIterator;
+        private CloseableRowIteratorWrapper<Row> rowIterator;
+
+        private void initialize() {
+            if (this.rowIterator == null) {
+                this.rowDataIterator =
+                        new CloseableRowIteratorWrapper<>(iterator, Function.identity());
+                this.rowIterator =
+                        new CloseableRowIteratorWrapper<>(
+                                iterator, r -> (Row) converter.toExternal(r));
+            }
+        }
+
+        @Override
+        public ResultProvider setJobClient(JobClient jobClient) {
+            iterator.setJobClient(jobClient);
+            return this;
+        }
+
+        @Override
+        public CloseableIterator<RowData> toInternalIterator() {
+            initialize();

Review comment:
       How about introducing dedicated `initializeRowDataIterator()` and `initializeRowIterator()` methods, instead of instantiating both iterators when only one is used?
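
       A minimal sketch of that split; the bodies mirror the diff above, only the lazy initialization is separated:

```java
private void initializeRowDataIterator() {
    // Lazily create only the internal RowData iterator.
    if (rowDataIterator == null) {
        rowDataIterator = new CloseableRowIteratorWrapper<>(iterator, Function.identity());
    }
}

private void initializeRowIterator() {
    // Lazily create only the external Row iterator.
    if (rowIterator == null) {
        rowIterator =
                new CloseableRowIteratorWrapper<>(iterator, r -> (Row) converter.toExternal(r));
    }
}
```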

##########
File path: flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/result/BaseMaterializedResultTest.java
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.gateway.local.result;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.data.conversion.DataStructureConverter;
+import org.apache.flink.table.data.conversion.DataStructureConverters;
+import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.Row;
+
+import java.util.List;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+class BaseMaterializedResultTest {
+
+    static Function<Row, BinaryRowData> createInternalBinaryRowDataConverter(DataType dataType) {
+        DataStructureConverter<Object, Object> converter =
+                DataStructureConverters.getConverter(dataType);
+        RowDataSerializer serializer = new RowDataSerializer((RowType) dataType.getLogicalType());
+
+        return row -> serializer.toBinaryRow((RowData) converter.toInternalOrNull(row)).copy();
+    }
+
+    static void assertRowEquals(
+            List<Row> expected,
+            List<RowData> actual,
+            DataStructureConverter<Object, Object> converter) {

Review comment:
       Could we use narrower generics here already, i.e. `DataStructureConverter<Row, RowData>`?
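
       A sketch of the typed variant; note that `DataStructureConverter<I, E>` declares the internal type first, so the narrowed signature would read `DataStructureConverter<RowData, Row>` (the comparison body below is an assumption):

```java
static void assertRowEquals(
        List<Row> expected,
        List<RowData> actual,
        DataStructureConverter<RowData, Row> converter) {
    // Convert every internal RowData to its external Row representation
    // before comparing against the expected rows.
    final List<Row> actualRows =
            actual.stream().map(converter::toExternalOrNull).collect(Collectors.toList());
    assertEquals(expected, actualRows);
}
```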

##########
File path: flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectBatchResultTest.java
##########
@@ -80,41 +97,55 @@ public void testLimitedSnapshot() throws Exception {
         ResolvedSchema schema =
                 ResolvedSchema.physical(
                         new String[] {"f0", "f1"},
-                        new DataType[] {DataTypes.STRING(), DataTypes.BIGINT()});
+                        new DataType[] {DataTypes.STRING(), DataTypes.INT()});
+
+        DataStructureConverter<Object, Object> dataStructureConverter =

Review comment:
       Could we cast here already to `DataStructureConverter<Row, RowData>`?
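
       For illustration, a sketch of that narrowing, using the `<internal, external>` parameter order of the converter (the unchecked double cast from the raw `getConverter` result is an assumption):

```java
// Narrow the converter's type parameters once at creation time;
// DataStructureConverters.getConverter returns <Object, Object>.
@SuppressWarnings("unchecked")
final DataStructureConverter<RowData, Row> dataStructureConverter =
        (DataStructureConverter<RowData, Row>)
                (DataStructureConverter<?, ?>)
                        DataStructureConverters.getConverter(schema.toPhysicalRowDataType());
```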



