This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f6e08cd6e9229d3d313bbae2a8e4b894e0854be8
Author: Timo Walther <[email protected]>
AuthorDate: Mon Apr 12 15:20:59 2021 +0200

    [FLINK-20613][table] Update TableResult.collect() to the new type system
    
    This closes #15588.
---
 .../flink/table/runtime/arrow/ArrowUtils.java      |  18 +--
 ...ultProvider.java => CollectResultProvider.java} |   5 +-
 .../table/api/internal/TableEnvironmentImpl.java   |  13 +-
 ...kOperation.java => CollectModifyOperation.java} |  33 +++--
 .../table/operations/ModifyOperationVisitor.java   |   2 +-
 .../planner/connectors/BatchSelectTableSink.java   |  45 -------
 .../planner/connectors/CollectDynamicSink.java     | 124 +++++++++++++++++++
 .../table/planner/connectors/DynamicSinkUtils.java |  48 +++++++-
 .../planner/connectors/DynamicSourceUtils.java     |   2 +-
 .../planner/connectors/ExternalDynamicSink.java    |   2 +-
 ...alCatalogTable.java => InlineCatalogTable.java} |  22 ++--
 .../planner/connectors/SelectTableSinkBase.java    | 137 ---------------------
 .../connectors/SelectTableSinkSchemaConverter.java |  71 -----------
 .../planner/connectors/StreamSelectTableSink.java  |  60 ---------
 .../table/planner/delegation/BatchPlanner.scala    |   7 +-
 .../table/planner/delegation/PlannerBase.scala     |  35 +-----
 .../table/planner/delegation/StreamPlanner.scala   |   7 +-
 .../runtime/stream/sql/DataStreamJavaITCase.java   |  48 +++++++-
 .../runtime/batch/table/AggregationITCase.scala    |  14 +--
 .../planner/runtime/batch/table/CalcITCase.scala   |   8 +-
 .../runtime/batch/table/DecimalITCase.scala        |   4 +-
 .../flink/table/sinks/BatchSelectTableSink.java    |   6 +-
 .../flink/table/sinks/StreamSelectTableSink.java   |   6 +-
 .../apache/flink/table/planner/StreamPlanner.scala |   2 +-
 24 files changed, 299 insertions(+), 420 deletions(-)
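
For context on the user-facing API this change touches: TableResult.collect() keeps its public
shape, but is now backed by the new CollectDynamicSink / CollectModifyOperation /
CollectResultProvider introduced below. A minimal usage sketch, not part of the commit (the inline
values and the column name "amount" are made up):

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.TableEnvironment;
    import org.apache.flink.types.Row;
    import org.apache.flink.util.CloseableIterator;

    public final class CollectExample {
        public static void main(String[] args) throws Exception {
            TableEnvironment tableEnv =
                    TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

            // hypothetical inline table; any query works the same way
            Table table = tableEnv.fromValues(1, 2, 3).as("amount");

            // collect() ships rows back to the client through the collect sink
            // that this commit re-implements on top of the new type system
            try (CloseableIterator<Row> it = table.execute().collect()) {
                while (it.hasNext()) {
                    System.out.println(it.next());
                }
            }
        }
    }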

diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/ArrowUtils.java b/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/ArrowUtils.java
index 3302e343..f381bf3 100644
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/ArrowUtils.java
+++ b/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/ArrowUtils.java
@@ -23,7 +23,6 @@ import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.TableEnvironment;
-import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;
 import org.apache.flink.table.api.internal.BatchTableEnvImpl;
 import org.apache.flink.table.api.internal.TableEnvImpl;
@@ -35,7 +34,6 @@ import org.apache.flink.table.data.util.DataFormatConverters;
 import org.apache.flink.table.data.vector.ColumnVector;
 import org.apache.flink.table.delegation.Planner;
 import org.apache.flink.table.operations.OutputConversionModifyOperation;
-import org.apache.flink.table.planner.connectors.SelectTableSinkSchemaConverter;
 import org.apache.flink.table.planner.delegation.PlannerBase;
 import org.apache.flink.table.runtime.arrow.readers.ArrayFieldReader;
 import org.apache.flink.table.runtime.arrow.readers.ArrowFieldReader;
@@ -657,7 +655,9 @@ public final class ArrowUtils {
         checkArrowUsable();
         BufferAllocator allocator =
                 getRootAllocator().newChildAllocator("collectAsPandasDataFrame", 0, Long.MAX_VALUE);
-        RowType rowType = (RowType) table.getSchema().toRowDataType().getLogicalType();
+        RowType rowType =
+                (RowType) table.getResolvedSchema().toSourceRowDataType().getLogicalType();
+        DataType defaultRowDataType = TypeConversions.fromLogicalToDataType(rowType);
         VectorSchemaRoot root =
                 VectorSchemaRoot.create(ArrowUtils.toArrowSchema(rowType), allocator);
         ByteArrayOutputStream baos = new ByteArrayOutputStream();
@@ -685,19 +685,9 @@ public final class ArrowUtils {
 
                         @Override
                         public RowData next() {
-                            // The SelectTableSink of blink planner will convert the table schema
-                            // and we
-                            // need to keep the table schema used here be consistent with the
-                            // converted table schema
-                            TableSchema convertedTableSchema =
-                                    SelectTableSinkSchemaConverter
-                                            .convertTimeAttributeToRegularTimestamp(
-                                                    SelectTableSinkSchemaConverter
-                                                            .changeDefaultConversionClass(
-                                                                    table.getSchema()));
                             DataFormatConverters.DataFormatConverter converter =
                                     DataFormatConverters.getConverterForDataType(
-                                            convertedTableSchema.toRowDataType());
+                                            defaultRowDataType);
                             return (RowData) converter.toInternal(appendOnlyResults.next());
                         }
                     };
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/SelectResultProvider.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/CollectResultProvider.java
similarity index 92%
rename from flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/SelectResultProvider.java
rename to flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/CollectResultProvider.java
index 087ed39..e3ec02a 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/SelectResultProvider.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/CollectResultProvider.java
@@ -20,16 +20,17 @@ package org.apache.flink.table.api.internal;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.table.api.TableResult;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.CloseableIterator;
 
 /**
  * An internal class which helps the client to get the execute result from a specific sink.
  *
- * <p>This class is generated by specific sink and brings the result info to a TableResult.
+ * <p>This class is generated by specific sink and brings the result info to a {@link TableResult}.
  */
 @Internal
-public interface SelectResultProvider {
+public interface CollectResultProvider {
     /** Set the job client associated with the select job to retrieve the result. */
     void setJobClient(JobClient jobClient);
 
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index 1259cdf..730b7ac 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -87,6 +87,7 @@ import org.apache.flink.table.module.ModuleEntry;
 import org.apache.flink.table.module.ModuleManager;
 import org.apache.flink.table.operations.CatalogQueryOperation;
 import org.apache.flink.table.operations.CatalogSinkModifyOperation;
+import org.apache.flink.table.operations.CollectModifyOperation;
 import org.apache.flink.table.operations.DescribeTableOperation;
 import org.apache.flink.table.operations.ExplainOperation;
 import org.apache.flink.table.operations.LoadModuleOperation;
@@ -94,7 +95,6 @@ import org.apache.flink.table.operations.ModifyOperation;
 import org.apache.flink.table.operations.NopOperation;
 import org.apache.flink.table.operations.Operation;
 import org.apache.flink.table.operations.QueryOperation;
-import org.apache.flink.table.operations.SelectSinkOperation;
 import org.apache.flink.table.operations.ShowCatalogsOperation;
 import org.apache.flink.table.operations.ShowCurrentCatalogOperation;
 import org.apache.flink.table.operations.ShowCurrentDatabaseOperation;
@@ -777,14 +777,21 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
     }
 
     private TableResult executeQueryOperation(QueryOperation operation) {
-        SelectSinkOperation sinkOperation = new SelectSinkOperation(operation);
+        final UnresolvedIdentifier unresolvedIdentifier =
+                UnresolvedIdentifier.of(
+                        "Unregistered_Collect_Sink_" + CollectModifyOperation.getUniqueId());
+        final ObjectIdentifier objectIdentifier =
+                catalogManager.qualifyIdentifier(unresolvedIdentifier);
+
+        CollectModifyOperation sinkOperation =
+                new CollectModifyOperation(objectIdentifier, operation);
         List<Transformation<?>> transformations =
                 translate(Collections.singletonList(sinkOperation));
         String jobName = getJobName("collect");
         Pipeline pipeline = execEnv.createPipeline(transformations, tableConfig, jobName);
         try {
             JobClient jobClient = execEnv.executeAsync(pipeline);
-            SelectResultProvider resultProvider = sinkOperation.getSelectResultProvider();
+            CollectResultProvider resultProvider = sinkOperation.getSelectResultProvider();
             resultProvider.setJobClient(jobClient);
             return TableResultImpl.builder()
                     .jobClient(jobClient)
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/SelectSinkOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/CollectModifyOperation.java
similarity index 63%
rename from flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/SelectSinkOperation.java
rename to flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/CollectModifyOperation.java
index 747b58a..1d95b4e 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/SelectSinkOperation.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/CollectModifyOperation.java
@@ -19,31 +19,46 @@
 package org.apache.flink.table.operations;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.table.api.internal.SelectResultProvider;
+import org.apache.flink.table.api.internal.CollectResultProvider;
+import org.apache.flink.table.catalog.ObjectIdentifier;
 
 import java.util.Collections;
-import java.util.HashMap;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * Special, internal kind of {@link ModifyOperation} that collects the content of {@link
  * QueryOperation} to local.
  */
 @Internal
-public class SelectSinkOperation implements ModifyOperation {
+public final class CollectModifyOperation implements ModifyOperation {
+
+    private static final AtomicInteger uniqueId = new AtomicInteger(0);
+
+    private final ObjectIdentifier tableIdentifier;
 
     private final QueryOperation child;
+
     // help the client to get the execute result from a specific sink.
-    private SelectResultProvider resultProvider;
+    private CollectResultProvider resultProvider;
 
-    public SelectSinkOperation(QueryOperation child) {
+    public CollectModifyOperation(ObjectIdentifier tableIdentifier, QueryOperation child) {
+        this.tableIdentifier = tableIdentifier;
         this.child = child;
     }
 
-    public void setSelectResultProvider(SelectResultProvider resultProvider) {
+    public static int getUniqueId() {
+        return uniqueId.incrementAndGet();
+    }
+
+    public ObjectIdentifier getTableIdentifier() {
+        return tableIdentifier;
+    }
+
+    public void setSelectResultProvider(CollectResultProvider resultProvider) {
         this.resultProvider = resultProvider;
     }
 
-    public SelectResultProvider getSelectResultProvider() {
+    public CollectResultProvider getSelectResultProvider() {
         return resultProvider;
     }
 
@@ -60,8 +75,8 @@ public class SelectSinkOperation implements ModifyOperation {
     @Override
     public String asSummaryString() {
         return OperationUtils.formatWithChildren(
-                "SelectSink",
-                new HashMap<>(),
+                "CollectSink",
+                Collections.singletonMap("identifier", tableIdentifier),
                 Collections.singletonList(child),
                 Operation::asSummaryString);
     }
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ModifyOperationVisitor.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ModifyOperationVisitor.java
index 2d6445f..40428de 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ModifyOperationVisitor.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ModifyOperationVisitor.java
@@ -34,5 +34,5 @@ public interface ModifyOperationVisitor<T> {
 
     <U> T visit(UnregisteredSinkModifyOperation<U> unregisteredSink);
 
-    T visit(SelectSinkOperation selectOperation);
+    T visit(CollectModifyOperation selectOperation);
 }
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/connectors/BatchSelectTableSink.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/connectors/BatchSelectTableSink.java
deleted file mode 100644
index f07b19a..0000000
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/connectors/BatchSelectTableSink.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.planner.connectors;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.sinks.StreamTableSink;
-import org.apache.flink.types.Row;
-
-/** A {@link StreamTableSink} for batch select job to collect the result to local. */
-public class BatchSelectTableSink extends SelectTableSinkBase<RowData>
-        implements StreamTableSink<RowData> {
-
-    public BatchSelectTableSink(TableSchema tableSchema) {
-        super(tableSchema, createTypeInfo(tableSchema).toRowSerializer());
-    }
-
-    @Override
-    public TypeInformation<RowData> getOutputType() {
-        return createTypeInfo(getTableSchema());
-    }
-
-    @Override
-    protected Row convertToRow(RowData element) {
-        // convert RowData to Row
-        return converter.toExternal(element);
-    }
-}
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/connectors/CollectDynamicSink.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/connectors/CollectDynamicSink.java
new file mode 100644
index 0000000..ec33f39
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/connectors/CollectDynamicSink.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.connectors;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
+import org.apache.flink.streaming.api.operators.collect.CollectResultIterator;
+import org.apache.flink.streaming.api.operators.collect.CollectSinkOperator;
+import org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorFactory;
+import org.apache.flink.streaming.api.operators.collect.CollectStreamSink;
+import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.api.internal.CollectResultProvider;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.typeutils.ExternalTypeInfo;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+
+/** Table sink for {@link TableResult#collect()}. */
+@Internal
+final class CollectDynamicSink implements DynamicTableSink {
+
+    private final ObjectIdentifier tableIdentifier;
+
+    private final DataType consumedDataType;
+
+    // mutable attributes
+
+    private CollectResultIterator<RowData> iterator;
+
+    CollectDynamicSink(ObjectIdentifier tableIdentifier, DataType consumedDataType) {
+        this.tableIdentifier = tableIdentifier;
+        this.consumedDataType = consumedDataType;
+    }
+
+    public CollectResultProvider getSelectResultProvider() {
+        return new CollectResultProvider() {
+            @Override
+            public void setJobClient(JobClient jobClient) {
+                iterator.setJobClient(jobClient);
+            }
+
+            @Override
+            @SuppressWarnings({"unchecked", "rawtypes"})
+            public CloseableIterator<Row> getResultIterator() {
+                // Row after deserialization
+                return (CloseableIterator) iterator;
+            }
+        };
+    }
+
+    @Override
+    public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
+        return requestedMode;
+    }
+
+    @Override
+    public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
+        return (DataStreamSinkProvider)
+                inputStream -> {
+                    final CheckpointConfig checkpointConfig =
+                            inputStream.getExecutionEnvironment().getCheckpointConfig();
+                    final ExecutionConfig config = inputStream.getExecutionConfig();
+
+                    final TypeSerializer<RowData> externalSerializer =
+                            ExternalTypeInfo.<RowData>of(consumedDataType, true)
+                                    .createSerializer(config);
+                    final String accumulatorName = tableIdentifier.getObjectName();
+
+                    final CollectSinkOperatorFactory<RowData> factory =
+                            new CollectSinkOperatorFactory<>(externalSerializer, accumulatorName);
+                    final CollectSinkOperator<RowData> operator =
+                            (CollectSinkOperator<RowData>) factory.getOperator();
+
+                    this.iterator =
+                            new CollectResultIterator<>(
+                                    operator.getOperatorIdFuture(),
+                                    externalSerializer,
+                                    accumulatorName,
+                                    checkpointConfig);
+
+                    final CollectStreamSink<RowData> sink =
+                            new CollectStreamSink<>(inputStream, factory);
+                    return sink.name("Collect table sink");
+                };
+    }
+
+    @Override
+    public DynamicTableSink copy() {
+        final CollectDynamicSink copy = new CollectDynamicSink(tableIdentifier, consumedDataType);
+        // kind of violates the contract of copy() but should not harm
+        // as it is null during optimization anyway until physical translation
+        copy.iterator = iterator;
+        return copy;
+    }
+
+    @Override
+    public String asSummaryString() {
+        return String.format("TableToCollect(type=%s)", consumedDataType);
+    }
+}
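
The CollectResultProvider contract above is consumed on the client side roughly as follows (a
sketch mirroring TableEnvironmentImpl#executeQueryOperation earlier in this patch; the
sinkOperation and jobClient variables are assumed to be in scope):

    // assumes a translated CollectModifyOperation "sinkOperation" and the JobClient
    // "jobClient" returned by the executor's executeAsync()
    CollectResultProvider provider = sinkOperation.getSelectResultProvider();
    provider.setJobClient(jobClient); // the iterator needs the job client to fetch results
    try (CloseableIterator<Row> rows = provider.getResultIterator()) {
        while (rows.hasNext()) {
            System.out.println(rows.next());
        }
    }
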
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java
index d982b70..acd398f 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java
@@ -23,6 +23,7 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.table.api.TableColumn;
 import org.apache.flink.table.api.TableColumn.MetadataColumn;
 import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.TableResult;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.CatalogTable;
@@ -35,6 +36,7 @@ import org.apache.flink.table.connector.sink.abilities.SupportsOverwrite;
 import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning;
 import org.apache.flink.table.connector.sink.abilities.SupportsWritingMetadata;
 import org.apache.flink.table.operations.CatalogSinkModifyOperation;
+import org.apache.flink.table.operations.CollectModifyOperation;
 import org.apache.flink.table.operations.ExternalModifyOperation;
 import org.apache.flink.table.planner.calcite.FlinkRelBuilder;
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
@@ -48,6 +50,7 @@ import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.types.logical.RowType.RowField;
 import org.apache.flink.table.types.utils.DataTypeUtils;
+import org.apache.flink.table.types.utils.TypeConversions;
 
 import org.apache.calcite.plan.RelOptUtil;
 import org.apache.calcite.rel.RelNode;
@@ -77,6 +80,49 @@ import static org.apache.flink.table.types.logical.utils.LogicalTypeCasts.suppor
 @Internal
 public final class DynamicSinkUtils {
 
+    /** Converts an {@link TableResult#collect()} sink to a {@link RelNode}. */
+    public static RelNode convertCollectToRel(
+            FlinkRelBuilder relBuilder,
+            RelNode input,
+            CollectModifyOperation collectModifyOperation) {
+        final DataTypeFactory dataTypeFactory =
+                unwrapContext(relBuilder).getCatalogManager().getDataTypeFactory();
+        final ResolvedSchema childSchema = collectModifyOperation.getChild().getResolvedSchema();
+        final ResolvedSchema schema =
+                ResolvedSchema.physical(
+                        childSchema.getColumnNames(), childSchema.getColumnDataTypes());
+        final CatalogTable unresolvedTable = new InlineCatalogTable(schema);
+        final ResolvedCatalogTable catalogTable = new ResolvedCatalogTable(unresolvedTable, schema);
+
+        final DataType consumedDataType = fixCollectDataType(dataTypeFactory, schema);
+
+        final CollectDynamicSink tableSink =
+                new CollectDynamicSink(
+                        collectModifyOperation.getTableIdentifier(), consumedDataType);
+        collectModifyOperation.setSelectResultProvider(tableSink.getSelectResultProvider());
+        return convertSinkToRel(
+                relBuilder,
+                input,
+                collectModifyOperation.getTableIdentifier(),
+                Collections.emptyMap(),
+                false,
+                tableSink,
+                catalogTable);
+    }
+
+    /** Temporary solution until we drop legacy types. */
+    private static DataType fixCollectDataType(
+            DataTypeFactory dataTypeFactory, ResolvedSchema schema) {
+        final DataType fixedDataType =
+                DataTypeUtils.transform(
+                        dataTypeFactory,
+                        schema.toSourceRowDataType(),
+                        TypeTransformations.legacyRawToTypeInfoRaw(),
+                        TypeTransformations.legacyToNonLegacy());
+        // TODO erase the conversion class earlier when dropping legacy code, esp. FLINK-22321
+        return TypeConversions.fromLogicalToDataType(fixedDataType.getLogicalType());
+    }
+
     /**
      * Converts an external sink (i.e. further {@link DataStream} transformations) to a {@link
      * RelNode}.
@@ -86,7 +132,7 @@ public final class DynamicSinkUtils {
             RelNode input,
             ExternalModifyOperation externalModifyOperation) {
         final ResolvedSchema schema = externalModifyOperation.getResolvedSchema();
-        final CatalogTable unresolvedTable = new ExternalCatalogTable(schema);
+        final CatalogTable unresolvedTable = new InlineCatalogTable(schema);
         final ResolvedCatalogTable catalogTable = new ResolvedCatalogTable(unresolvedTable, schema);
         final DynamicTableSink tableSink =
                 new ExternalDynamicSink(
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/connectors/DynamicSourceUtils.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/connectors/DynamicSourceUtils.java
index 07ff041..800eca6 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/connectors/DynamicSourceUtils.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/connectors/DynamicSourceUtils.java
@@ -84,7 +84,7 @@ public final class DynamicSourceUtils {
             DataType physicalDataType,
             boolean isTopLevelRecord,
             ChangelogMode changelogMode) {
-        final CatalogTable unresolvedTable = new ExternalCatalogTable(schema);
+        final CatalogTable unresolvedTable = new InlineCatalogTable(schema);
         final ResolvedCatalogTable catalogTable = new ResolvedCatalogTable(unresolvedTable, schema);
         final DynamicTableSource tableSource =
                 new ExternalDynamicSource<>(
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/connectors/ExternalDynamicSink.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/connectors/ExternalDynamicSink.java
index 9356c52..6e64b81 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/connectors/ExternalDynamicSink.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/connectors/ExternalDynamicSink.java
@@ -39,7 +39,7 @@ final class ExternalDynamicSink implements DynamicTableSink {
 
     private final DataType physicalDataType;
 
-    public ExternalDynamicSink(ChangelogMode changelogMode, DataType physicalDataType) {
+    ExternalDynamicSink(ChangelogMode changelogMode, DataType physicalDataType) {
         this.changelogMode = changelogMode;
         this.physicalDataType = physicalDataType;
     }
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/connectors/ExternalCatalogTable.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/connectors/InlineCatalogTable.java
similarity index 79%
rename from flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/connectors/ExternalCatalogTable.java
rename to flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/connectors/InlineCatalogTable.java
index 494ac53..e14a99b 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/connectors/ExternalCatalogTable.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/connectors/InlineCatalogTable.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.table.api.Schema;
 import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.TableResult;
 import org.apache.flink.table.catalog.CatalogBaseTable;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.ResolvedSchema;
@@ -32,15 +33,15 @@ import java.util.Map;
 import java.util.Optional;
 
 /**
- * Helper {@link CatalogTable} for representing a table that is backed by external {@link
- * DataStream} API.
+ * Helper {@link CatalogTable} for representing a table that is backed by some inline connector
+ * (i.e. {@link DataStream} or {@link TableResult#collect()}).
  */
 @Internal
-final class ExternalCatalogTable implements CatalogTable {
+final class InlineCatalogTable implements CatalogTable {
 
     private final ResolvedSchema schema;
 
-    ExternalCatalogTable(ResolvedSchema schema) {
+    InlineCatalogTable(ResolvedSchema schema) {
         this.schema = schema;
     }
 
@@ -52,18 +53,18 @@ final class ExternalCatalogTable implements CatalogTable {
     @Override
     public Map<String, String> getOptions() {
         throw new TableException(
-                "A catalog table that is backed by a DataStream cannot be expressed with "
-                        + "options and can thus also not be persisted.");
+                "A catalog table that is backed by a DataStream or used for TableResult.collect() "
+                        + "cannot be expressed with options and can thus also not be persisted.");
     }
 
     @Override
     public String getComment() {
-        return "Data Stream API";
+        return "Inline catalog table";
     }
 
     @Override
     public CatalogBaseTable copy() {
-        return new ExternalCatalogTable(schema);
+        return new InlineCatalogTable(schema);
     }
 
     @Override
@@ -89,7 +90,8 @@ final class ExternalCatalogTable implements CatalogTable {
     @Override
     public CatalogTable copy(Map<String, String> options) {
         throw new TableException(
-                "A catalog table that is backed by a DataStream cannot be expressed with "
-                        + "options and can thus also not be enriched with hints.");
+                "A catalog table that is backed by a DataStream or used for TableResult.collect() "
+                        + "cannot be expressed with options and can thus also not be enriched "
+                        + "with hints.");
     }
 }
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/connectors/SelectTableSinkBase.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/connectors/SelectTableSinkBase.java
deleted file mode 100644
index 64f8796..0000000
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/connectors/SelectTableSinkBase.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.planner.connectors;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.execution.JobClient;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSink;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.operators.collect.CollectResultIterator;
-import org.apache.flink.streaming.api.operators.collect.CollectSinkOperator;
-import org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorFactory;
-import org.apache.flink.streaming.api.operators.collect.CollectStreamSink;
-import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.api.internal.SelectResultProvider;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.data.util.DataFormatConverters;
-import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
-import org.apache.flink.table.sinks.StreamTableSink;
-import org.apache.flink.table.sinks.TableSink;
-import org.apache.flink.types.Row;
-import org.apache.flink.util.CloseableIterator;
-
-import java.util.UUID;
-
-/**
- * Basic implementation of {@link StreamTableSink} for select job to collect the result to local.
- */
-public abstract class SelectTableSinkBase<T> implements StreamTableSink<T> {
-
-    private final TableSchema tableSchema;
-    protected final DataFormatConverters.DataFormatConverter<RowData, Row> converter;
-    private final TypeSerializer<T> typeSerializer;
-
-    private CollectResultIterator<T> iterator;
-
-    @SuppressWarnings("unchecked")
-    public SelectTableSinkBase(TableSchema schema, TypeSerializer<T> typeSerializer) {
-        this.tableSchema = schema;
-        this.converter =
-                DataFormatConverters.getConverterForDataType(
-                        this.tableSchema.toPhysicalRowDataType());
-        this.typeSerializer = typeSerializer;
-    }
-
-    @Override
-    public TableSchema getTableSchema() {
-        return tableSchema;
-    }
-
-    @Override
-    public TableSink<T> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public DataStreamSink<?> consumeDataStream(DataStream<T> dataStream) {
-        StreamExecutionEnvironment env = dataStream.getExecutionEnvironment();
-
-        String accumulatorName = "tableResultCollect_" + UUID.randomUUID();
-        CollectSinkOperatorFactory<T> factory =
-                new CollectSinkOperatorFactory<>(typeSerializer, accumulatorName);
-        CollectSinkOperator<Row> operator = (CollectSinkOperator<Row>) factory.getOperator();
-        this.iterator =
-                new CollectResultIterator<>(
-                        operator.getOperatorIdFuture(),
-                        typeSerializer,
-                        accumulatorName,
-                        env.getCheckpointConfig());
-
-        CollectStreamSink<?> sink = new CollectStreamSink<>(dataStream, factory);
-        env.addOperator(sink.getTransformation());
-        return sink.name("Select table sink");
-    }
-
-    public SelectResultProvider getSelectResultProvider() {
-        return new SelectResultProvider() {
-            @Override
-            public void setJobClient(JobClient jobClient) {
-                iterator.setJobClient(jobClient);
-            }
-
-            @Override
-            public CloseableIterator<Row> getResultIterator() {
-                return new RowIteratorWrapper(iterator);
-            }
-        };
-    }
-
-    /** An Iterator wrapper class that converts Iterator&lt;T&gt; to Iterator&lt;Row&gt;. */
-    private class RowIteratorWrapper implements CloseableIterator<Row> {
-        private final CollectResultIterator<T> iterator;
-
-        public RowIteratorWrapper(CollectResultIterator<T> iterator) {
-            this.iterator = iterator;
-        }
-
-        @Override
-        public boolean hasNext() {
-            return iterator.hasNext();
-        }
-
-        @Override
-        public Row next() {
-            return convertToRow(iterator.next());
-        }
-
-        @Override
-        public void close() throws Exception {
-            iterator.close();
-        }
-    }
-
-    protected abstract Row convertToRow(T element);
-
-    /** Create {@link InternalTypeInfo} of {@link RowData} based on given table schema. */
-    protected static InternalTypeInfo<RowData> createTypeInfo(TableSchema tableSchema) {
-        return InternalTypeInfo.of(tableSchema.toRowDataType().getLogicalType());
-    }
-}
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/connectors/SelectTableSinkSchemaConverter.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/connectors/SelectTableSinkSchemaConverter.java
deleted file mode 100644
index 8e6d950..0000000
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/connectors/SelectTableSinkSchemaConverter.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.planner.connectors;
-
-import org.apache.flink.table.api.DataTypes;
-import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter;
-import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.logical.TimestampKind;
-import org.apache.flink.table.types.logical.TimestampType;
-
-/** An utility class that provides abilities to change {@link TableSchema}. */
-public class SelectTableSinkSchemaConverter {
-
-    /** Change to default conversion class and build a new {@link TableSchema}. */
-    public static TableSchema changeDefaultConversionClass(TableSchema tableSchema) {
-        DataType[] oldTypes = tableSchema.getFieldDataTypes();
-        String[] fieldNames = tableSchema.getFieldNames();
-
-        TableSchema.Builder builder = TableSchema.builder();
-        for (int i = 0; i < tableSchema.getFieldCount(); i++) {
-            DataType fieldType =
-                    LogicalTypeDataTypeConverter.fromLogicalTypeToDataType(
-                            LogicalTypeDataTypeConverter.fromDataTypeToLogicalType(oldTypes[i]));
-            builder.field(fieldNames[i], fieldType);
-        }
-        return builder.build();
-    }
-
-    /**
-     * Convert time attributes (proc time / event time) to regular timestamp and build a new {@link
-     * TableSchema}.
-     */
-    public static TableSchema convertTimeAttributeToRegularTimestamp(TableSchema tableSchema) {
-        DataType[] dataTypes = tableSchema.getFieldDataTypes();
-        String[] oldNames = tableSchema.getFieldNames();
-
-        TableSchema.Builder builder = TableSchema.builder();
-        for (int i = 0; i < tableSchema.getFieldCount(); i++) {
-            DataType fieldType = dataTypes[i];
-            String fieldName = oldNames[i];
-            if (fieldType.getLogicalType() instanceof TimestampType) {
-                TimestampType timestampType = (TimestampType) fieldType.getLogicalType();
-                if (!timestampType.getKind().equals(TimestampKind.REGULAR)) {
-                    // converts `TIME ATTRIBUTE(ROWTIME)`/`TIME ATTRIBUTE(PROCTIME)` to
-                    // `TIMESTAMP(3)`
-                    builder.field(fieldName, DataTypes.TIMESTAMP(3));
-                    continue;
-                }
-            }
-            builder.field(fieldName, fieldType);
-        }
-        return builder.build();
-    }
-}
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/connectors/StreamSelectTableSink.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/connectors/StreamSelectTableSink.java
deleted file mode 100644
index ea948af..0000000
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/connectors/StreamSelectTableSink.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.planner.connectors;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeinfo.Types;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.sinks.RetractStreamTableSink;
-import org.apache.flink.types.Row;
-import org.apache.flink.types.RowKind;
-
-/**
- * A {@link RetractStreamTableSink} for streaming select job to collect the result to local.
- *
- * <p>{@link RowData} contains {@link RowKind} attribute which can represents all kind of changes.
- * The boolean flag is useless here, only because {@link RetractStreamTableSink} requires
- * Tuple2&lt;Boolean, T&gt; type.
- */
-public class StreamSelectTableSink extends SelectTableSinkBase<Tuple2<Boolean, RowData>>
-        implements RetractStreamTableSink<RowData> {
-
-    public StreamSelectTableSink(TableSchema tableSchema) {
-        super(
-                tableSchema,
-                new TupleTypeInfo<Tuple2<Boolean, RowData>>(
-                                Types.BOOLEAN, createTypeInfo(tableSchema))
-                        .createSerializer(new ExecutionConfig()));
-    }
-
-    @Override
-    public TypeInformation<RowData> getRecordType() {
-        return createTypeInfo(getTableSchema());
-    }
-
-    @Override
-    protected Row convertToRow(Tuple2<Boolean, RowData> tuple2) {
-        // convert Tuple2<Boolean, RowData> to Row
-        return converter.toExternal(tuple2.f1);
-    }
-}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala
index 4edc0c4..0326b18 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala
@@ -22,11 +22,10 @@ import org.apache.flink.api.common.RuntimeExecutionMode
 import org.apache.flink.api.dag.Transformation
 import org.apache.flink.configuration.ExecutionOptions
 import org.apache.flink.table.api.config.OptimizerConfigOptions
-import org.apache.flink.table.api.{ExplainDetail, TableConfig, TableException, TableSchema}
+import org.apache.flink.table.api.{ExplainDetail, TableConfig, TableException}
 import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog, ObjectIdentifier}
 import org.apache.flink.table.delegation.Executor
 import org.apache.flink.table.operations.{CatalogSinkModifyOperation, ModifyOperation, Operation, QueryOperation}
-import org.apache.flink.table.planner.connectors.{BatchSelectTableSink, SelectTableSinkBase}
 import org.apache.flink.table.planner.operations.PlannerQueryOperation
 import org.apache.flink.table.planner.plan.`trait`.FlinkRelDistributionTraitDef
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeGraph
@@ -85,10 +84,6 @@ class BatchPlanner(
     }
   }
 
-  override protected def createSelectTableSink(tableSchema: TableSchema): SelectTableSinkBase[_] = {
-    new BatchSelectTableSink(tableSchema)
-  }
-
   override def explain(operations: util.List[Operation], extraDetails: ExplainDetail*): String = {
     require(operations.nonEmpty, "operations should not be empty")
     validateAndOverrideConfiguration()
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
index eab74f7..58122cc 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
@@ -22,7 +22,7 @@ import org.apache.flink.annotation.VisibleForTesting
 import org.apache.flink.api.dag.Transformation
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
 import org.apache.flink.table.api.config.{ExecutionConfigOptions, TableConfigOptions}
-import org.apache.flink.table.api.{PlannerType, SqlDialect, TableConfig, TableEnvironment, TableException, TableSchema}
+import org.apache.flink.table.api.{PlannerType, SqlDialect, TableConfig, TableEnvironment, TableException}
 import org.apache.flink.table.catalog._
 import org.apache.flink.table.connector.sink.DynamicTableSink
 import org.apache.flink.table.delegation.{Executor, Parser, Planner}
@@ -33,8 +33,8 @@ import org.apache.flink.table.operations._
 import org.apache.flink.table.planner.JMap
 import org.apache.flink.table.planner.calcite._
 import org.apache.flink.table.planner.catalog.CatalogManagerCalciteSchema
+import org.apache.flink.table.planner.connectors.DynamicSinkUtils
 import org.apache.flink.table.planner.connectors.DynamicSinkUtils.validateSchemaAndApplyImplicitCast
-import org.apache.flink.table.planner.connectors.{DynamicSinkUtils, SelectTableSinkBase, SelectTableSinkSchemaConverter}
 import org.apache.flink.table.planner.expressions.PlannerTypeInferenceUtilImpl
 import org.apache.flink.table.planner.hint.FlinkHints
 import org.apache.flink.table.planner.plan.nodes.calcite.LogicalLegacySink
@@ -193,26 +193,9 @@ abstract class PlannerBase(
           "UnregisteredSink",
           ConnectorCatalogTable.sink(s.getSink, !isStreamingMode))
 
-      case s: SelectSinkOperation =>
-        val input = getRelBuilder.queryOperation(s.getChild).build()
-        // convert query schema to sink schema
-        val sinkSchema = SelectTableSinkSchemaConverter.convertTimeAttributeToRegularTimestamp(
-          SelectTableSinkSchemaConverter.changeDefaultConversionClass(
-            TableSchema.fromResolvedSchema(s.getChild.getResolvedSchema)))
-        // validate query schema and sink schema, and apply cast if possible
-        val query = validateSchemaAndApplyImplicitCast(
-          input,
-          sinkSchema,
-          null,
-          dataTypeFactory,
-          getTypeFactory)
-        val sink = createSelectTableSink(sinkSchema)
-        s.setSelectResultProvider(sink.getSelectResultProvider)
-        LogicalLegacySink.create(
-          query,
-          sink,
-          "collect",
-          ConnectorCatalogTable.sink(sink, !isStreamingMode))
+      case collectModifyOperation: CollectModifyOperation =>
+        val input = getRelBuilder.queryOperation(modifyOperation.getChild).build()
+        DynamicSinkUtils.convertCollectToRel(getRelBuilder, input, collectModifyOperation)
 
       case catalogSink: CatalogSinkModifyOperation =>
         val input = getRelBuilder.queryOperation(modifyOperation.getChild).build()
@@ -342,14 +325,6 @@ abstract class PlannerBase(
     */
   protected def translateToPlan(execGraph: ExecNodeGraph): util.List[Transformation[_]]
 
-  /**
-   * Creates a [[SelectTableSinkBase]] for a select query.
-   *
-   * @param tableSchema the table schema of select result.
-   * @return The sink to fetch the select result.
-   */
-  protected def createSelectTableSink(tableSchema: TableSchema): SelectTableSinkBase[_]
-
   private def getTableSink(
       objectIdentifier: ObjectIdentifier,
       dynamicOptions: JMap[String, String])
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala
index 868f612..c5e96b0 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala
@@ -21,11 +21,10 @@ package org.apache.flink.table.planner.delegation
 import org.apache.flink.api.common.RuntimeExecutionMode
 import org.apache.flink.api.dag.Transformation
 import org.apache.flink.configuration.ExecutionOptions
-import org.apache.flink.table.api.{ExplainDetail, TableConfig, TableException, TableSchema}
+import org.apache.flink.table.api.{ExplainDetail, TableConfig, TableException}
 import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog, ObjectIdentifier}
 import org.apache.flink.table.delegation.Executor
 import org.apache.flink.table.operations.{CatalogSinkModifyOperation, ModifyOperation, Operation, QueryOperation}
-import org.apache.flink.table.planner.connectors.{SelectTableSinkBase, StreamSelectTableSink}
 import org.apache.flink.table.planner.operations.PlannerQueryOperation
 import org.apache.flink.table.planner.plan.`trait`._
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeGraph
@@ -75,10 +74,6 @@ class StreamPlanner(
     }
   }
 
-  override protected def createSelectTableSink(tableSchema: TableSchema): SelectTableSinkBase[_] = {
-    new StreamSelectTableSink(tableSchema)
-  }
-
   override def explain(operations: util.List[Operation], extraDetails: ExplainDetail*): String = {
     require(operations.nonEmpty, "operations should not be empty")
     validateAndOverrideConfiguration()
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/DataStreamJavaITCase.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/DataStreamJavaITCase.java
index 67cfe26..5f3cbe6 100644
--- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/DataStreamJavaITCase.java
+++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/DataStreamJavaITCase.java
@@ -21,7 +21,11 @@ package org.apache.flink.table.planner.runtime.stream.sql;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.EnumTypeInfo;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
@@ -35,6 +39,8 @@ import org.apache.flink.table.catalog.Column;
 import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.catalog.WatermarkSpec;
 import org.apache.flink.table.expressions.utils.ResolvedExpressionMock;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RawType;
 import org.apache.flink.test.util.AbstractTestBase;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.CloseableIterator;
@@ -47,12 +53,15 @@ import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameter;
 import org.junit.runners.Parameterized.Parameters;
 
+import java.time.DayOfWeek;
+import java.time.ZoneOffset;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
 
 import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.instanceOf;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
 
@@ -184,14 +193,49 @@ public class DataStreamJavaITCase extends AbstractTestBase {
 
         tableEnv.createTemporaryView("t", table);
 
-        final TableResult result = tableEnv.executeSql("SELECT p.d, p.b FROM t");
+        final TableResult result = tableEnv.executeSql("SELECT p, p.d, p.b FROM t");
 
-        testResult(result, Row.of(42.0, null), Row.of(null, null));
+        testResult(
+                result,
+                Row.of(new ImmutablePojo(42.0, null), 42.0, null),
+                Row.of(null, null, null));
 
         testResult(tableEnv.toDataStream(table, ComplexPojo.class), pojos);
     }
 
     @Test
+    public void testFromAndToDataStreamWithRaw() throws Exception {
+        final List<Tuple2<DayOfWeek, ZoneOffset>> rawRecords =
+                Arrays.asList(
+                        Tuple2.of(DayOfWeek.MONDAY, ZoneOffset.UTC),
+                        Tuple2.of(DayOfWeek.FRIDAY, ZoneOffset.ofHours(5)));
+
+        final DataStream<Tuple2<DayOfWeek, ZoneOffset>> dataStream = env.fromCollection(rawRecords);
+
+        // verify incoming type information
+        assertThat(dataStream.getType(), instanceOf(TupleTypeInfo.class));
+        final TupleTypeInfo<?> tupleInfo = (TupleTypeInfo<?>) dataStream.getType();
+        assertThat(tupleInfo.getFieldTypes()[0], instanceOf(EnumTypeInfo.class));
+        assertThat(tupleInfo.getFieldTypes()[1], instanceOf(GenericTypeInfo.class));
+
+        final Table table = tableEnv.fromDataStream(dataStream);
+
+        // verify schema conversion
+        final List<DataType> columnDataTypes = table.getResolvedSchema().getColumnDataTypes();
+        assertThat(columnDataTypes.get(0).getLogicalType(), instanceOf(RawType.class));
+        assertThat(columnDataTypes.get(1).getLogicalType(), instanceOf(RawType.class));
+
+        // test reverse operation
+        testResult(
+                table.execute(),
+                Row.of(DayOfWeek.MONDAY, ZoneOffset.UTC),
+                Row.of(DayOfWeek.FRIDAY, ZoneOffset.ofHours(5)));
+        testResult(
+                tableEnv.toDataStream(table, DataTypes.of(dataStream.getType())),
+                rawRecords.toArray(new Tuple2[0]));
+    }
+
+    @Test
     public void testFromAndToDataStreamEventTime() throws Exception {
         final DataStream<Tuple3<Long, Integer, String>> dataStream = getWatermarkedDataStream();
 
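For readers following along, the new testFromAndToDataStreamWithRaw hunk above boils down to the standalone sketch below (the environment setup and the class name are assumptions of this sketch, not part of the commit): classes without a dedicated logical type, such as enums and generic classes, are mapped to RAW columns and survive a round trip through TableResult.collect().

// Sketch only, assuming a local environment; mirrors the test above.
import java.time.DayOfWeek;
import java.time.ZoneOffset;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;

public class RawRoundTripSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        DataStream<Tuple2<DayOfWeek, ZoneOffset>> stream =
                env.fromElements(Tuple2.of(DayOfWeek.MONDAY, ZoneOffset.UTC));

        // both columns resolve to RAW(...) because neither class has a dedicated logical type
        Table table = tableEnv.fromDataStream(stream);

        // collect() converts back to the external objects of the column data types
        try (CloseableIterator<Row> it = table.execute().collect()) {
            it.forEachRemaining(row -> System.out.println(row.getField(0)));
        }
    }
}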
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/AggregationITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/AggregationITCase.scala
index 527083d..5370cf1 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/AggregationITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/AggregationITCase.scala
@@ -358,15 +358,15 @@ class AggregationITCase extends BatchTestBase {
 
     val t = CollectionBatchExecTable.get3TupleDataSet(tEnv, "a, b, c")
       .groupBy('b)
-      .select('b, top10Fun('b.cast(Types.INT), 'a.cast(Types.FLOAT)))
+      .select('b, top10Fun('b.cast(DataTypes.INT()).ifNull(0), 'a.cast(DataTypes.FLOAT).ifNull(0f)))
 
     val expected =
-      "1,[1,1.0, null, null, null, null, null, null, null, null, null]\n" +
-        "2,[2,3.0, 2,2.0, null, null, null, null, null, null, null, null]\n" +
-        "3,[3,6.0, 3,5.0, 3,4.0, null, null, null, null, null, null, null]\n" +
-        "4,[4,10.0, 4,9.0, 4,8.0, 4,7.0, null, null, null, null, null, 
null]\n" +
-        "5,[5,15.0, 5,14.0, 5,13.0, 5,12.0, 5,11.0, null, null, null, null, 
null]\n" +
-        "6,[6,21.0, 6,20.0, 6,19.0, 6,18.0, 6,17.0, 6,16.0, null, null, null, 
null]"
+      "1,[(1,1.0), null, null, null, null, null, null, null, null, null]\n" +
+        "2,[(2,3.0), (2,2.0), null, null, null, null, null, null, null, 
null]\n" +
+        "3,[(3,6.0), (3,5.0), (3,4.0), null, null, null, null, null, null, 
null]\n" +
+        "4,[(4,10.0), (4,9.0), (4,8.0), (4,7.0), null, null, null, null, null, 
null]\n" +
+        "5,[(5,15.0), (5,14.0), (5,13.0), (5,12.0), (5,11.0), null, null, 
null, null, null]\n" +
+        "6,[(6,21.0), (6,20.0), (6,19.0), (6,18.0), (6,17.0), (6,16.0), null, 
null, null, null]"
     val results = executeQuery(t)
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/CalcITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/CalcITCase.scala
index 7dcb2e9..c0b56f0 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/CalcITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/CalcITCase.scala
@@ -587,8 +587,6 @@ class CalcITCase extends BatchTestBase {
   @Test
   def testSelectStarFromNestedTable(): Unit = {
 
-    val sqlQuery = "SELECT * FROM MyTable"
-
     val table = BatchTableEnvUtil.fromCollection(tEnv, Seq(
       ((0, 0), "0"),
       ((1, 1), "1"),
@@ -598,9 +596,9 @@ class CalcITCase extends BatchTestBase {
     val results = executeQuery(table)
     results.zipWithIndex.foreach {
       case (row, i) =>
-        val nestedRow = row.getField(0).asInstanceOf[Row]
-        assertEquals(i, nestedRow.getField(0))
-        assertEquals(i, nestedRow.getField(1))
+        val nestedRow = row.getField(0).asInstanceOf[(Int, Int)]
+        assertEquals(i, nestedRow._1)
+        assertEquals(i, nestedRow._2)
         assertEquals(i.toString, row.getField(1))
     }
   }
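The CalcITCase adjustment above shows the same behavior from the caller's side: with the new type system, a nested structured value is handed back by collect() as its original external object instead of a nested Row. A hedged sketch of that idea follows (the table name and the POJO are illustrative assumptions, not taken from this commit):

// Sketch only, under the assumptions stated above.
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;

public class NestedResultSketch {
    // hypothetical nested POJO, standing in for the Scala tuple used in the test
    public static class Point {
        public int x;
        public int y;
    }

    public static void main(String[] args) throws Exception {
        TableEnvironment tableEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());

        // assumes a table `MyTable` with a structured column `p` of type Point
        try (CloseableIterator<Row> it =
                tableEnv.sqlQuery("SELECT p FROM MyTable").execute().collect()) {
            Row row = it.next();
            Point p = (Point) row.getField(0); // no longer a nested Row
            System.out.println(p.x + "," + p.y);
        }
    }
}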
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/DecimalITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/DecimalITCase.scala
index bcf7605..5b22ac5 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/DecimalITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/DecimalITCase.scala
@@ -27,7 +27,7 @@ import org.apache.flink.table.types.DataType
 import org.apache.flink.types.Row
 
 import org.junit.Assert.assertEquals
-import org.junit.{Assert, Test}
+import org.junit.Test
 
 import java.math.{BigDecimal => JBigDecimal}
 
@@ -52,7 +52,7 @@ class DecimalITCase extends BatchTestBase {
     // check result schema
     val resultTable = tableTransfer(t)
     val ts2 = resultTable.getResolvedSchema.getColumnDataTypes.asScala
-    Assert.assertEquals(expectedColTypes.length, ts2.length)
+    assertEquals(expectedColTypes.length, ts2.length)
 
     expectedColTypes.zip(ts2).foreach {
       case (t1, t2) => assertEquals(t1, t2)
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sinks/BatchSelectTableSink.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sinks/BatchSelectTableSink.java
index f9c576d..5799a5a 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sinks/BatchSelectTableSink.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sinks/BatchSelectTableSink.java
@@ -29,7 +29,7 @@ import org.apache.flink.api.java.operators.DataSink;
 import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.api.internal.SelectResultProvider;
+import org.apache.flink.table.api.internal.CollectResultProvider;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.AbstractID;
@@ -76,8 +76,8 @@ public class BatchSelectTableSink implements BatchTableSink<Row> {
                 .setParallelism(1);
     }
 
-    public SelectResultProvider getSelectResultProvider() {
-        return new SelectResultProvider() {
+    public CollectResultProvider getSelectResultProvider() {
+        return new CollectResultProvider() {
             private JobClient jobClient;
 
             @Override
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sinks/StreamSelectTableSink.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sinks/StreamSelectTableSink.java
index 13096c0..17a67bf 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sinks/StreamSelectTableSink.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sinks/StreamSelectTableSink.java
@@ -33,7 +33,7 @@ import org.apache.flink.streaming.api.operators.collect.CollectSinkOperator;
 import org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorFactory;
 import org.apache.flink.streaming.api.operators.collect.CollectStreamSink;
 import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.api.internal.SelectResultProvider;
+import org.apache.flink.table.api.internal.CollectResultProvider;
 import org.apache.flink.types.Row;
 import org.apache.flink.types.RowKind;
 import org.apache.flink.util.CloseableIterator;
@@ -93,8 +93,8 @@ public class StreamSelectTableSink implements RetractStreamTableSink<Row> {
         return sink.name("Streaming select table sink");
     }
 
-    public SelectResultProvider getSelectResultProvider() {
-        return new SelectResultProvider() {
+    public CollectResultProvider getSelectResultProvider() {
+        return new CollectResultProvider() {
 
             @Override
             public void setJobClient(JobClient jobClient) {
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala
index 6164e62..3c79184 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala
@@ -191,7 +191,7 @@ class StreamPlanner(
       case s: UnregisteredSinkModifyOperation[_] =>
         writeToSink(s.getChild, s.getSink, "UnregisteredSink")
 
-      case s: SelectSinkOperation =>
+      case s: CollectModifyOperation =>
         val sink = new StreamSelectTableSink(
           TableSchema.fromResolvedSchema(s.getChild.getResolvedSchema))
         s.setSelectResultProvider(sink.getSelectResultProvider)
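The legacy planner above keeps its sink-based implementation and only adapts to the renamed CollectModifyOperation. From the API side nothing changes for users; a minimal usage sketch of the entry point that this commit rewires internally (the Orders table and the class name are assumptions of this sketch):

// Sketch only, assuming a table `Orders` registered elsewhere.
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;

public class CollectUsageSketch {
    public static void main(String[] args) throws Exception {
        TableEnvironment tableEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

        // executeSql() plans the query; collect() streams the rows back to the client
        TableResult result = tableEnv.executeSql("SELECT * FROM Orders");
        try (CloseableIterator<Row> rows = result.collect()) {
            while (rows.hasNext()) {
                System.out.println(rows.next());
            }
        }
    }
}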
