slinkydeveloper commented on a change in pull request #17441:
URL: https://github.com/apache/flink/pull/17441#discussion_r727086466



##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/CollectDynamicSink.java
##########
@@ -124,17 +117,86 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
 
     @Override
     public DynamicTableSink copy() {
-        final CollectDynamicSink copy =
-                new CollectDynamicSink(
-                        tableIdentifier, consumedDataType, maxBatchSize, socketTimeout);
-        // kind of violates the contract of copy() but should not harm
-        // as it is null during optimization anyway until physical translation
-        copy.iterator = iterator;
-        return copy;
+        return new CollectDynamicSink(
+                tableIdentifier, consumedDataType, maxBatchSize, socketTimeout);
     }
 
     @Override
     public String asSummaryString() {
         return String.format("TableToCollect(type=%s)", consumedDataType);
     }
+
+    private final class CollectResultProvider implements ResultProvider {
+
+        private CloseableRowIteratorWrapper<RowData> rowDataIterator;
+        private CloseableRowIteratorWrapper<Row> rowIterator;
+
+        private void initialize() {
+            if (this.rowIterator == null) {
+                this.rowDataIterator =
+                        new CloseableRowIteratorWrapper<>(iterator, Function.identity());
+                this.rowIterator =
+                        new CloseableRowIteratorWrapper<>(
+                                iterator, r -> (Row) converter.toExternal(r));
+            }
+        }
+
+        @Override
+        public ResultProvider setJobClient(JobClient jobClient) {
+            iterator.setJobClient(jobClient);
+            return this;
+        }
+
+        @Override
+        public CloseableIterator<RowData> toInternalIterator() {
+            initialize();
+            return this.rowDataIterator;
+        }
+
+        @Override
+        public CloseableIterator<Row> toExternalIterator() {
+            initialize();
+            return this.rowIterator;
+        }
+
+        @Override
+        public boolean isFirstRowReady() {
+            initialize();
+            return this.rowDataIterator.firstRowProcessed
+                    || this.rowIterator.firstRowProcessed
+                    || iterator.hasNext();
+        }
+    }
+
+    private static final class CloseableRowIteratorWrapper<T> implements CloseableIterator<T> {
+        private final CloseableIterator<RowData> iterator;
+        private final Function<RowData, T> mapper;

Review comment:
       In this particular case, I can't use `DataStructureConverter` because 
this class serves two purposes: `Row` iteration (where I need to invoke the 
converter) and `RowData` iteration (where no conversion is needed, hence the 
function is the identity).
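
To illustrate the point, here is a minimal, Flink-free sketch of a wrapper that takes a `Function` as its mapper, so one class can serve both the converting and the pass-through case. `MappingIterator`, its type parameters, and the way `firstRowProcessed` gets set are assumptions made for this example; they are not taken from the PR, which shows only the wrapper's first two fields.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.function.Function;

/**
 * Hypothetical stand-in for the wrapper in the diff (not Flink code).
 * The mapper decides whether rows are converted or passed through unchanged.
 */
final class MappingIterator<I, O> implements Iterator<O>, AutoCloseable {
    private final Iterator<I> source;
    private final Function<I, O> mapper;
    // Assumption: the flag flips once the first element has been handed out.
    private boolean firstRowProcessed = false;

    MappingIterator(Iterator<I> source, Function<I, O> mapper) {
        this.source = source;
        this.mapper = mapper;
    }

    @Override
    public boolean hasNext() {
        return source.hasNext();
    }

    @Override
    public O next() {
        O mapped = mapper.apply(source.next());
        firstRowProcessed = true;
        return mapped;
    }

    boolean isFirstRowProcessed() {
        return firstRowProcessed;
    }

    @Override
    public void close() {
        // Nothing to release in this sketch; a real wrapper would close its source.
    }

    public static void main(String[] args) {
        // "Internal" view: no conversion, so the mapper is the identity.
        MappingIterator<String, String> internal =
                new MappingIterator<>(
                        Arrays.asList("1", "2").iterator(), Function.<String>identity());
        // "External" view: the same class, but with a converting mapper.
        MappingIterator<String, Integer> external =
                new MappingIterator<>(Arrays.asList("1", "2").iterator(), Integer::parseInt);

        System.out.println(internal.next());
        System.out.println(external.next() + 1);
    }
}
```

With a dedicated converter type as the field, the pass-through case would need a no-op converter; a plain `Function` lets `Function.identity()` cover it.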





