This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit f6e08cd6e9229d3d313bbae2a8e4b894e0854be8 Author: Timo Walther <[email protected]> AuthorDate: Mon Apr 12 15:20:59 2021 +0200 [FLINK-20613][table] Update TableResult.collect() to the new type system This closes #15588. --- .../flink/table/runtime/arrow/ArrowUtils.java | 18 +-- ...ultProvider.java => CollectResultProvider.java} | 5 +- .../table/api/internal/TableEnvironmentImpl.java | 13 +- ...kOperation.java => CollectModifyOperation.java} | 33 +++-- .../table/operations/ModifyOperationVisitor.java | 2 +- .../planner/connectors/BatchSelectTableSink.java | 45 ------- .../planner/connectors/CollectDynamicSink.java | 124 +++++++++++++++++++ .../table/planner/connectors/DynamicSinkUtils.java | 48 +++++++- .../planner/connectors/DynamicSourceUtils.java | 2 +- .../planner/connectors/ExternalDynamicSink.java | 2 +- ...alCatalogTable.java => InlineCatalogTable.java} | 22 ++-- .../planner/connectors/SelectTableSinkBase.java | 137 --------------------- .../connectors/SelectTableSinkSchemaConverter.java | 71 ----------- .../planner/connectors/StreamSelectTableSink.java | 60 --------- .../table/planner/delegation/BatchPlanner.scala | 7 +- .../table/planner/delegation/PlannerBase.scala | 35 +----- .../table/planner/delegation/StreamPlanner.scala | 7 +- .../runtime/stream/sql/DataStreamJavaITCase.java | 48 +++++++- .../runtime/batch/table/AggregationITCase.scala | 14 +-- .../planner/runtime/batch/table/CalcITCase.scala | 8 +- .../runtime/batch/table/DecimalITCase.scala | 4 +- .../flink/table/sinks/BatchSelectTableSink.java | 6 +- .../flink/table/sinks/StreamSelectTableSink.java | 6 +- .../apache/flink/table/planner/StreamPlanner.scala | 2 +- 24 files changed, 299 insertions(+), 420 deletions(-) diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/ArrowUtils.java b/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/ArrowUtils.java index 3302e343..f381bf3 100644 --- a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/ArrowUtils.java +++ b/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/ArrowUtils.java @@ -23,7 +23,6 @@ import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.TableEnvironment; -import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.api.bridge.java.BatchTableEnvironment; import org.apache.flink.table.api.internal.BatchTableEnvImpl; import org.apache.flink.table.api.internal.TableEnvImpl; @@ -35,7 +34,6 @@ import org.apache.flink.table.data.util.DataFormatConverters; import org.apache.flink.table.data.vector.ColumnVector; import org.apache.flink.table.delegation.Planner; import org.apache.flink.table.operations.OutputConversionModifyOperation; -import org.apache.flink.table.planner.connectors.SelectTableSinkSchemaConverter; import org.apache.flink.table.planner.delegation.PlannerBase; import org.apache.flink.table.runtime.arrow.readers.ArrayFieldReader; import org.apache.flink.table.runtime.arrow.readers.ArrowFieldReader; @@ -657,7 +655,9 @@ public final class ArrowUtils { checkArrowUsable(); BufferAllocator allocator = getRootAllocator().newChildAllocator("collectAsPandasDataFrame", 0, Long.MAX_VALUE); - RowType rowType = (RowType) table.getSchema().toRowDataType().getLogicalType(); + RowType rowType = + (RowType) table.getResolvedSchema().toSourceRowDataType().getLogicalType(); + DataType defaultRowDataType = TypeConversions.fromLogicalToDataType(rowType); 
VectorSchemaRoot root = VectorSchemaRoot.create(ArrowUtils.toArrowSchema(rowType), allocator); ByteArrayOutputStream baos = new ByteArrayOutputStream(); @@ -685,19 +685,9 @@ public final class ArrowUtils { @Override public RowData next() { - // The SelectTableSink of blink planner will convert the table schema - // and we - // need to keep the table schema used here be consistent with the - // converted table schema - TableSchema convertedTableSchema = - SelectTableSinkSchemaConverter - .convertTimeAttributeToRegularTimestamp( - SelectTableSinkSchemaConverter - .changeDefaultConversionClass( - table.getSchema())); DataFormatConverters.DataFormatConverter converter = DataFormatConverters.getConverterForDataType( - convertedTableSchema.toRowDataType()); + defaultRowDataType); return (RowData) converter.toInternal(appendOnlyResults.next()); } }; diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/SelectResultProvider.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/CollectResultProvider.java similarity index 92% rename from flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/SelectResultProvider.java rename to flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/CollectResultProvider.java index 087ed39..e3ec02a 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/SelectResultProvider.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/CollectResultProvider.java @@ -20,16 +20,17 @@ package org.apache.flink.table.api.internal; import org.apache.flink.annotation.Internal; import org.apache.flink.core.execution.JobClient; +import org.apache.flink.table.api.TableResult; import org.apache.flink.types.Row; import org.apache.flink.util.CloseableIterator; /** * An internal class which helps the client to get the execute result from a specific sink. * - * <p>This class is generated by specific sink and brings the result info to a TableResult. + * <p>This class is generated by specific sink and brings the result info to a {@link TableResult}. */ @Internal -public interface SelectResultProvider { +public interface CollectResultProvider { /** Set the job client associated with the select job to retrieve the result. 
*/ void setJobClient(JobClient jobClient); diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java index 1259cdf..730b7ac 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java @@ -87,6 +87,7 @@ import org.apache.flink.table.module.ModuleEntry; import org.apache.flink.table.module.ModuleManager; import org.apache.flink.table.operations.CatalogQueryOperation; import org.apache.flink.table.operations.CatalogSinkModifyOperation; +import org.apache.flink.table.operations.CollectModifyOperation; import org.apache.flink.table.operations.DescribeTableOperation; import org.apache.flink.table.operations.ExplainOperation; import org.apache.flink.table.operations.LoadModuleOperation; @@ -94,7 +95,6 @@ import org.apache.flink.table.operations.ModifyOperation; import org.apache.flink.table.operations.NopOperation; import org.apache.flink.table.operations.Operation; import org.apache.flink.table.operations.QueryOperation; -import org.apache.flink.table.operations.SelectSinkOperation; import org.apache.flink.table.operations.ShowCatalogsOperation; import org.apache.flink.table.operations.ShowCurrentCatalogOperation; import org.apache.flink.table.operations.ShowCurrentDatabaseOperation; @@ -777,14 +777,21 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal { } private TableResult executeQueryOperation(QueryOperation operation) { - SelectSinkOperation sinkOperation = new SelectSinkOperation(operation); + final UnresolvedIdentifier unresolvedIdentifier = + UnresolvedIdentifier.of( + "Unregistered_Collect_Sink_" + CollectModifyOperation.getUniqueId()); + final ObjectIdentifier objectIdentifier = + catalogManager.qualifyIdentifier(unresolvedIdentifier); + + CollectModifyOperation sinkOperation = + new CollectModifyOperation(objectIdentifier, operation); List<Transformation<?>> transformations = translate(Collections.singletonList(sinkOperation)); String jobName = getJobName("collect"); Pipeline pipeline = execEnv.createPipeline(transformations, tableConfig, jobName); try { JobClient jobClient = execEnv.executeAsync(pipeline); - SelectResultProvider resultProvider = sinkOperation.getSelectResultProvider(); + CollectResultProvider resultProvider = sinkOperation.getSelectResultProvider(); resultProvider.setJobClient(jobClient); return TableResultImpl.builder() .jobClient(jobClient) diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/SelectSinkOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/CollectModifyOperation.java similarity index 63% rename from flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/SelectSinkOperation.java rename to flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/CollectModifyOperation.java index 747b58a..1d95b4e 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/SelectSinkOperation.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/CollectModifyOperation.java @@ -19,31 +19,46 @@ package org.apache.flink.table.operations; import org.apache.flink.annotation.Internal; -import 
org.apache.flink.table.api.internal.SelectResultProvider; +import org.apache.flink.table.api.internal.CollectResultProvider; +import org.apache.flink.table.catalog.ObjectIdentifier; import java.util.Collections; -import java.util.HashMap; +import java.util.concurrent.atomic.AtomicInteger; /** * Special, internal kind of {@link ModifyOperation} that collects the content of {@link * QueryOperation} to local. */ @Internal -public class SelectSinkOperation implements ModifyOperation { +public final class CollectModifyOperation implements ModifyOperation { + + private static final AtomicInteger uniqueId = new AtomicInteger(0); + + private final ObjectIdentifier tableIdentifier; private final QueryOperation child; + // help the client to get the execute result from a specific sink. - private SelectResultProvider resultProvider; + private CollectResultProvider resultProvider; - public SelectSinkOperation(QueryOperation child) { + public CollectModifyOperation(ObjectIdentifier tableIdentifier, QueryOperation child) { + this.tableIdentifier = tableIdentifier; this.child = child; } - public void setSelectResultProvider(SelectResultProvider resultProvider) { + public static int getUniqueId() { + return uniqueId.incrementAndGet(); + } + + public ObjectIdentifier getTableIdentifier() { + return tableIdentifier; + } + + public void setSelectResultProvider(CollectResultProvider resultProvider) { this.resultProvider = resultProvider; } - public SelectResultProvider getSelectResultProvider() { + public CollectResultProvider getSelectResultProvider() { return resultProvider; } @@ -60,8 +75,8 @@ public class SelectSinkOperation implements ModifyOperation { @Override public String asSummaryString() { return OperationUtils.formatWithChildren( - "SelectSink", - new HashMap<>(), + "CollectSink", + Collections.singletonMap("identifier", tableIdentifier), Collections.singletonList(child), Operation::asSummaryString); } diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ModifyOperationVisitor.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ModifyOperationVisitor.java index 2d6445f..40428de 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ModifyOperationVisitor.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ModifyOperationVisitor.java @@ -34,5 +34,5 @@ public interface ModifyOperationVisitor<T> { <U> T visit(UnregisteredSinkModifyOperation<U> unregisteredSink); - T visit(SelectSinkOperation selectOperation); + T visit(CollectModifyOperation selectOperation); } diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/connectors/BatchSelectTableSink.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/connectors/BatchSelectTableSink.java deleted file mode 100644 index f07b19a..0000000 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/connectors/BatchSelectTableSink.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.planner.connectors; - -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.table.api.TableSchema; -import org.apache.flink.table.data.RowData; -import org.apache.flink.table.sinks.StreamTableSink; -import org.apache.flink.types.Row; - -/** A {@link StreamTableSink} for batch select job to collect the result to local. */ -public class BatchSelectTableSink extends SelectTableSinkBase<RowData> - implements StreamTableSink<RowData> { - - public BatchSelectTableSink(TableSchema tableSchema) { - super(tableSchema, createTypeInfo(tableSchema).toRowSerializer()); - } - - @Override - public TypeInformation<RowData> getOutputType() { - return createTypeInfo(getTableSchema()); - } - - @Override - protected Row convertToRow(RowData element) { - // convert RowData to Row - return converter.toExternal(element); - } -} diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/connectors/CollectDynamicSink.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/connectors/CollectDynamicSink.java new file mode 100644 index 0000000..ec33f39 --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/connectors/CollectDynamicSink.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.planner.connectors; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.execution.JobClient; +import org.apache.flink.streaming.api.environment.CheckpointConfig; +import org.apache.flink.streaming.api.operators.collect.CollectResultIterator; +import org.apache.flink.streaming.api.operators.collect.CollectSinkOperator; +import org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorFactory; +import org.apache.flink.streaming.api.operators.collect.CollectStreamSink; +import org.apache.flink.table.api.TableResult; +import org.apache.flink.table.api.internal.CollectResultProvider; +import org.apache.flink.table.catalog.ObjectIdentifier; +import org.apache.flink.table.connector.ChangelogMode; +import org.apache.flink.table.connector.sink.DataStreamSinkProvider; +import org.apache.flink.table.connector.sink.DynamicTableSink; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.runtime.typeutils.ExternalTypeInfo; +import org.apache.flink.table.types.DataType; +import org.apache.flink.types.Row; +import org.apache.flink.util.CloseableIterator; + +/** Table sink for {@link TableResult#collect()}. */ +@Internal +final class CollectDynamicSink implements DynamicTableSink { + + private final ObjectIdentifier tableIdentifier; + + private final DataType consumedDataType; + + // mutable attributes + + private CollectResultIterator<RowData> iterator; + + CollectDynamicSink(ObjectIdentifier tableIdentifier, DataType consumedDataType) { + this.tableIdentifier = tableIdentifier; + this.consumedDataType = consumedDataType; + } + + public CollectResultProvider getSelectResultProvider() { + return new CollectResultProvider() { + @Override + public void setJobClient(JobClient jobClient) { + iterator.setJobClient(jobClient); + } + + @Override + @SuppressWarnings({"unchecked", "rawtypes"}) + public CloseableIterator<Row> getResultIterator() { + // Row after deserialization + return (CloseableIterator) iterator; + } + }; + } + + @Override + public ChangelogMode getChangelogMode(ChangelogMode requestedMode) { + return requestedMode; + } + + @Override + public SinkRuntimeProvider getSinkRuntimeProvider(Context context) { + return (DataStreamSinkProvider) + inputStream -> { + final CheckpointConfig checkpointConfig = + inputStream.getExecutionEnvironment().getCheckpointConfig(); + final ExecutionConfig config = inputStream.getExecutionConfig(); + + final TypeSerializer<RowData> externalSerializer = + ExternalTypeInfo.<RowData>of(consumedDataType, true) + .createSerializer(config); + final String accumulatorName = tableIdentifier.getObjectName(); + + final CollectSinkOperatorFactory<RowData> factory = + new CollectSinkOperatorFactory<>(externalSerializer, accumulatorName); + final CollectSinkOperator<RowData> operator = + (CollectSinkOperator<RowData>) factory.getOperator(); + + this.iterator = + new CollectResultIterator<>( + operator.getOperatorIdFuture(), + externalSerializer, + accumulatorName, + checkpointConfig); + + final CollectStreamSink<RowData> sink = + new CollectStreamSink<>(inputStream, factory); + return sink.name("Collect table sink"); + }; + } + + @Override + public DynamicTableSink copy() { + final CollectDynamicSink copy = new CollectDynamicSink(tableIdentifier, consumedDataType); + // kind of violates the contract of copy() but should not harm + // as it is null during optimization anyway 
until physical translation + copy.iterator = iterator; + return copy; + } + + @Override + public String asSummaryString() { + return String.format("TableToCollect(type=%s)", consumedDataType); + } +} diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java index d982b70..acd398f 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java @@ -23,6 +23,7 @@ import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.table.api.TableColumn; import org.apache.flink.table.api.TableColumn.MetadataColumn; import org.apache.flink.table.api.TableException; +import org.apache.flink.table.api.TableResult; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.catalog.CatalogTable; @@ -35,6 +36,7 @@ import org.apache.flink.table.connector.sink.abilities.SupportsOverwrite; import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning; import org.apache.flink.table.connector.sink.abilities.SupportsWritingMetadata; import org.apache.flink.table.operations.CatalogSinkModifyOperation; +import org.apache.flink.table.operations.CollectModifyOperation; import org.apache.flink.table.operations.ExternalModifyOperation; import org.apache.flink.table.planner.calcite.FlinkRelBuilder; import org.apache.flink.table.planner.calcite.FlinkTypeFactory; @@ -48,6 +50,7 @@ import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.RowType; import org.apache.flink.table.types.logical.RowType.RowField; import org.apache.flink.table.types.utils.DataTypeUtils; +import org.apache.flink.table.types.utils.TypeConversions; import org.apache.calcite.plan.RelOptUtil; import org.apache.calcite.rel.RelNode; @@ -77,6 +80,49 @@ import static org.apache.flink.table.types.logical.utils.LogicalTypeCasts.suppor @Internal public final class DynamicSinkUtils { + /** Converts an {@link TableResult#collect()} sink to a {@link RelNode}. */ + public static RelNode convertCollectToRel( + FlinkRelBuilder relBuilder, + RelNode input, + CollectModifyOperation collectModifyOperation) { + final DataTypeFactory dataTypeFactory = + unwrapContext(relBuilder).getCatalogManager().getDataTypeFactory(); + final ResolvedSchema childSchema = collectModifyOperation.getChild().getResolvedSchema(); + final ResolvedSchema schema = + ResolvedSchema.physical( + childSchema.getColumnNames(), childSchema.getColumnDataTypes()); + final CatalogTable unresolvedTable = new InlineCatalogTable(schema); + final ResolvedCatalogTable catalogTable = new ResolvedCatalogTable(unresolvedTable, schema); + + final DataType consumedDataType = fixCollectDataType(dataTypeFactory, schema); + + final CollectDynamicSink tableSink = + new CollectDynamicSink( + collectModifyOperation.getTableIdentifier(), consumedDataType); + collectModifyOperation.setSelectResultProvider(tableSink.getSelectResultProvider()); + return convertSinkToRel( + relBuilder, + input, + collectModifyOperation.getTableIdentifier(), + Collections.emptyMap(), + false, + tableSink, + catalogTable); + } + + /** Temporary solution until we drop legacy types. 
*/ + private static DataType fixCollectDataType( + DataTypeFactory dataTypeFactory, ResolvedSchema schema) { + final DataType fixedDataType = + DataTypeUtils.transform( + dataTypeFactory, + schema.toSourceRowDataType(), + TypeTransformations.legacyRawToTypeInfoRaw(), + TypeTransformations.legacyToNonLegacy()); + // TODO erase the conversion class earlier when dropping legacy code, esp. FLINK-22321 + return TypeConversions.fromLogicalToDataType(fixedDataType.getLogicalType()); + } + /** * Converts an external sink (i.e. further {@link DataStream} transformations) to a {@link * RelNode}. @@ -86,7 +132,7 @@ public final class DynamicSinkUtils { RelNode input, ExternalModifyOperation externalModifyOperation) { final ResolvedSchema schema = externalModifyOperation.getResolvedSchema(); - final CatalogTable unresolvedTable = new ExternalCatalogTable(schema); + final CatalogTable unresolvedTable = new InlineCatalogTable(schema); final ResolvedCatalogTable catalogTable = new ResolvedCatalogTable(unresolvedTable, schema); final DynamicTableSink tableSink = new ExternalDynamicSink( diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/connectors/DynamicSourceUtils.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/connectors/DynamicSourceUtils.java index 07ff041..800eca6 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/connectors/DynamicSourceUtils.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/connectors/DynamicSourceUtils.java @@ -84,7 +84,7 @@ public final class DynamicSourceUtils { DataType physicalDataType, boolean isTopLevelRecord, ChangelogMode changelogMode) { - final CatalogTable unresolvedTable = new ExternalCatalogTable(schema); + final CatalogTable unresolvedTable = new InlineCatalogTable(schema); final ResolvedCatalogTable catalogTable = new ResolvedCatalogTable(unresolvedTable, schema); final DynamicTableSource tableSource = new ExternalDynamicSource<>( diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/connectors/ExternalDynamicSink.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/connectors/ExternalDynamicSink.java index 9356c52..6e64b81 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/connectors/ExternalDynamicSink.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/connectors/ExternalDynamicSink.java @@ -39,7 +39,7 @@ final class ExternalDynamicSink implements DynamicTableSink { private final DataType physicalDataType; - public ExternalDynamicSink(ChangelogMode changelogMode, DataType physicalDataType) { + ExternalDynamicSink(ChangelogMode changelogMode, DataType physicalDataType) { this.changelogMode = changelogMode; this.physicalDataType = physicalDataType; } diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/connectors/ExternalCatalogTable.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/connectors/InlineCatalogTable.java similarity index 79% rename from flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/connectors/ExternalCatalogTable.java rename to flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/connectors/InlineCatalogTable.java index 494ac53..e14a99b 100644 --- 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/connectors/ExternalCatalogTable.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/connectors/InlineCatalogTable.java @@ -22,6 +22,7 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.table.api.Schema; import org.apache.flink.table.api.TableException; +import org.apache.flink.table.api.TableResult; import org.apache.flink.table.catalog.CatalogBaseTable; import org.apache.flink.table.catalog.CatalogTable; import org.apache.flink.table.catalog.ResolvedSchema; @@ -32,15 +33,15 @@ import java.util.Map; import java.util.Optional; /** - * Helper {@link CatalogTable} for representing a table that is backed by external {@link - * DataStream} API. + * Helper {@link CatalogTable} for representing a table that is backed by some inline connector + * (i.e. {@link DataStream} or {@link TableResult#collect()}). */ @Internal -final class ExternalCatalogTable implements CatalogTable { +final class InlineCatalogTable implements CatalogTable { private final ResolvedSchema schema; - ExternalCatalogTable(ResolvedSchema schema) { + InlineCatalogTable(ResolvedSchema schema) { this.schema = schema; } @@ -52,18 +53,18 @@ final class ExternalCatalogTable implements CatalogTable { @Override public Map<String, String> getOptions() { throw new TableException( - "A catalog table that is backed by a DataStream cannot be expressed with " - + "options and can thus also not be persisted."); + "A catalog table that is backed by a DataStream or used for TableResult.collect() " + + "cannot be expressed with options and can thus also not be persisted."); } @Override public String getComment() { - return "Data Stream API"; + return "Inline catalog table"; } @Override public CatalogBaseTable copy() { - return new ExternalCatalogTable(schema); + return new InlineCatalogTable(schema); } @Override @@ -89,7 +90,8 @@ final class ExternalCatalogTable implements CatalogTable { @Override public CatalogTable copy(Map<String, String> options) { throw new TableException( - "A catalog table that is backed by a DataStream cannot be expressed with " - + "options and can thus also not be enriched with hints."); + "A catalog table that is backed by a DataStream or used for TableResult.collect() " + + "cannot be expressed with options and can thus also not be enriched " + + "with hints."); } } diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/connectors/SelectTableSinkBase.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/connectors/SelectTableSinkBase.java deleted file mode 100644 index 64f8796..0000000 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/connectors/SelectTableSinkBase.java +++ /dev/null @@ -1,137 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.planner.connectors; - -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.core.execution.JobClient; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.datastream.DataStreamSink; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.operators.collect.CollectResultIterator; -import org.apache.flink.streaming.api.operators.collect.CollectSinkOperator; -import org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorFactory; -import org.apache.flink.streaming.api.operators.collect.CollectStreamSink; -import org.apache.flink.table.api.TableSchema; -import org.apache.flink.table.api.internal.SelectResultProvider; -import org.apache.flink.table.data.RowData; -import org.apache.flink.table.data.util.DataFormatConverters; -import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; -import org.apache.flink.table.sinks.StreamTableSink; -import org.apache.flink.table.sinks.TableSink; -import org.apache.flink.types.Row; -import org.apache.flink.util.CloseableIterator; - -import java.util.UUID; - -/** - * Basic implementation of {@link StreamTableSink} for select job to collect the result to local. 
- */ -public abstract class SelectTableSinkBase<T> implements StreamTableSink<T> { - - private final TableSchema tableSchema; - protected final DataFormatConverters.DataFormatConverter<RowData, Row> converter; - private final TypeSerializer<T> typeSerializer; - - private CollectResultIterator<T> iterator; - - @SuppressWarnings("unchecked") - public SelectTableSinkBase(TableSchema schema, TypeSerializer<T> typeSerializer) { - this.tableSchema = schema; - this.converter = - DataFormatConverters.getConverterForDataType( - this.tableSchema.toPhysicalRowDataType()); - this.typeSerializer = typeSerializer; - } - - @Override - public TableSchema getTableSchema() { - return tableSchema; - } - - @Override - public TableSink<T> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) { - throw new UnsupportedOperationException(); - } - - @Override - public DataStreamSink<?> consumeDataStream(DataStream<T> dataStream) { - StreamExecutionEnvironment env = dataStream.getExecutionEnvironment(); - - String accumulatorName = "tableResultCollect_" + UUID.randomUUID(); - CollectSinkOperatorFactory<T> factory = - new CollectSinkOperatorFactory<>(typeSerializer, accumulatorName); - CollectSinkOperator<Row> operator = (CollectSinkOperator<Row>) factory.getOperator(); - this.iterator = - new CollectResultIterator<>( - operator.getOperatorIdFuture(), - typeSerializer, - accumulatorName, - env.getCheckpointConfig()); - - CollectStreamSink<?> sink = new CollectStreamSink<>(dataStream, factory); - env.addOperator(sink.getTransformation()); - return sink.name("Select table sink"); - } - - public SelectResultProvider getSelectResultProvider() { - return new SelectResultProvider() { - @Override - public void setJobClient(JobClient jobClient) { - iterator.setJobClient(jobClient); - } - - @Override - public CloseableIterator<Row> getResultIterator() { - return new RowIteratorWrapper(iterator); - } - }; - } - - /** An Iterator wrapper class that converts Iterator<T> to Iterator<Row>. */ - private class RowIteratorWrapper implements CloseableIterator<Row> { - private final CollectResultIterator<T> iterator; - - public RowIteratorWrapper(CollectResultIterator<T> iterator) { - this.iterator = iterator; - } - - @Override - public boolean hasNext() { - return iterator.hasNext(); - } - - @Override - public Row next() { - return convertToRow(iterator.next()); - } - - @Override - public void close() throws Exception { - iterator.close(); - } - } - - protected abstract Row convertToRow(T element); - - /** Create {@link InternalTypeInfo} of {@link RowData} based on given table schema. */ - protected static InternalTypeInfo<RowData> createTypeInfo(TableSchema tableSchema) { - return InternalTypeInfo.of(tableSchema.toRowDataType().getLogicalType()); - } -} diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/connectors/SelectTableSinkSchemaConverter.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/connectors/SelectTableSinkSchemaConverter.java deleted file mode 100644 index 8e6d950..0000000 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/connectors/SelectTableSinkSchemaConverter.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.planner.connectors; - -import org.apache.flink.table.api.DataTypes; -import org.apache.flink.table.api.TableSchema; -import org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter; -import org.apache.flink.table.types.DataType; -import org.apache.flink.table.types.logical.TimestampKind; -import org.apache.flink.table.types.logical.TimestampType; - -/** An utility class that provides abilities to change {@link TableSchema}. */ -public class SelectTableSinkSchemaConverter { - - /** Change to default conversion class and build a new {@link TableSchema}. */ - public static TableSchema changeDefaultConversionClass(TableSchema tableSchema) { - DataType[] oldTypes = tableSchema.getFieldDataTypes(); - String[] fieldNames = tableSchema.getFieldNames(); - - TableSchema.Builder builder = TableSchema.builder(); - for (int i = 0; i < tableSchema.getFieldCount(); i++) { - DataType fieldType = - LogicalTypeDataTypeConverter.fromLogicalTypeToDataType( - LogicalTypeDataTypeConverter.fromDataTypeToLogicalType(oldTypes[i])); - builder.field(fieldNames[i], fieldType); - } - return builder.build(); - } - - /** - * Convert time attributes (proc time / event time) to regular timestamp and build a new {@link - * TableSchema}. - */ - public static TableSchema convertTimeAttributeToRegularTimestamp(TableSchema tableSchema) { - DataType[] dataTypes = tableSchema.getFieldDataTypes(); - String[] oldNames = tableSchema.getFieldNames(); - - TableSchema.Builder builder = TableSchema.builder(); - for (int i = 0; i < tableSchema.getFieldCount(); i++) { - DataType fieldType = dataTypes[i]; - String fieldName = oldNames[i]; - if (fieldType.getLogicalType() instanceof TimestampType) { - TimestampType timestampType = (TimestampType) fieldType.getLogicalType(); - if (!timestampType.getKind().equals(TimestampKind.REGULAR)) { - // converts `TIME ATTRIBUTE(ROWTIME)`/`TIME ATTRIBUTE(PROCTIME)` to - // `TIMESTAMP(3)` - builder.field(fieldName, DataTypes.TIMESTAMP(3)); - continue; - } - } - builder.field(fieldName, fieldType); - } - return builder.build(); - } -} diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/connectors/StreamSelectTableSink.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/connectors/StreamSelectTableSink.java deleted file mode 100644 index ea948af..0000000 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/connectors/StreamSelectTableSink.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.planner.connectors; - -import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.common.typeinfo.Types; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.typeutils.TupleTypeInfo; -import org.apache.flink.table.api.TableSchema; -import org.apache.flink.table.data.RowData; -import org.apache.flink.table.sinks.RetractStreamTableSink; -import org.apache.flink.types.Row; -import org.apache.flink.types.RowKind; - -/** - * A {@link RetractStreamTableSink} for streaming select job to collect the result to local. - * - * <p>{@link RowData} contains {@link RowKind} attribute which can represents all kind of changes. - * The boolean flag is useless here, only because {@link RetractStreamTableSink} requires - * Tuple2<Boolean, T> type. - */ -public class StreamSelectTableSink extends SelectTableSinkBase<Tuple2<Boolean, RowData>> - implements RetractStreamTableSink<RowData> { - - public StreamSelectTableSink(TableSchema tableSchema) { - super( - tableSchema, - new TupleTypeInfo<Tuple2<Boolean, RowData>>( - Types.BOOLEAN, createTypeInfo(tableSchema)) - .createSerializer(new ExecutionConfig())); - } - - @Override - public TypeInformation<RowData> getRecordType() { - return createTypeInfo(getTableSchema()); - } - - @Override - protected Row convertToRow(Tuple2<Boolean, RowData> tuple2) { - // convert Tuple2<Boolean, RowData> to Row - return converter.toExternal(tuple2.f1); - } -} diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala index 4edc0c4..0326b18 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala @@ -22,11 +22,10 @@ import org.apache.flink.api.common.RuntimeExecutionMode import org.apache.flink.api.dag.Transformation import org.apache.flink.configuration.ExecutionOptions import org.apache.flink.table.api.config.OptimizerConfigOptions -import org.apache.flink.table.api.{ExplainDetail, TableConfig, TableException, TableSchema} +import org.apache.flink.table.api.{ExplainDetail, TableConfig, TableException} import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog, ObjectIdentifier} import org.apache.flink.table.delegation.Executor import org.apache.flink.table.operations.{CatalogSinkModifyOperation, ModifyOperation, Operation, QueryOperation} -import org.apache.flink.table.planner.connectors.{BatchSelectTableSink, SelectTableSinkBase} import org.apache.flink.table.planner.operations.PlannerQueryOperation import org.apache.flink.table.planner.plan.`trait`.FlinkRelDistributionTraitDef import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeGraph @@ -85,10 +84,6 @@ class BatchPlanner( } } - override protected def 
createSelectTableSink(tableSchema: TableSchema): SelectTableSinkBase[_] = { - new BatchSelectTableSink(tableSchema) - } - override def explain(operations: util.List[Operation], extraDetails: ExplainDetail*): String = { require(operations.nonEmpty, "operations should not be empty") validateAndOverrideConfiguration() diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala index eab74f7..58122cc 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala @@ -22,7 +22,7 @@ import org.apache.flink.annotation.VisibleForTesting import org.apache.flink.api.dag.Transformation import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment import org.apache.flink.table.api.config.{ExecutionConfigOptions, TableConfigOptions} -import org.apache.flink.table.api.{PlannerType, SqlDialect, TableConfig, TableEnvironment, TableException, TableSchema} +import org.apache.flink.table.api.{PlannerType, SqlDialect, TableConfig, TableEnvironment, TableException} import org.apache.flink.table.catalog._ import org.apache.flink.table.connector.sink.DynamicTableSink import org.apache.flink.table.delegation.{Executor, Parser, Planner} @@ -33,8 +33,8 @@ import org.apache.flink.table.operations._ import org.apache.flink.table.planner.JMap import org.apache.flink.table.planner.calcite._ import org.apache.flink.table.planner.catalog.CatalogManagerCalciteSchema +import org.apache.flink.table.planner.connectors.DynamicSinkUtils import org.apache.flink.table.planner.connectors.DynamicSinkUtils.validateSchemaAndApplyImplicitCast -import org.apache.flink.table.planner.connectors.{DynamicSinkUtils, SelectTableSinkBase, SelectTableSinkSchemaConverter} import org.apache.flink.table.planner.expressions.PlannerTypeInferenceUtilImpl import org.apache.flink.table.planner.hint.FlinkHints import org.apache.flink.table.planner.plan.nodes.calcite.LogicalLegacySink @@ -193,26 +193,9 @@ abstract class PlannerBase( "UnregisteredSink", ConnectorCatalogTable.sink(s.getSink, !isStreamingMode)) - case s: SelectSinkOperation => - val input = getRelBuilder.queryOperation(s.getChild).build() - // convert query schema to sink schema - val sinkSchema = SelectTableSinkSchemaConverter.convertTimeAttributeToRegularTimestamp( - SelectTableSinkSchemaConverter.changeDefaultConversionClass( - TableSchema.fromResolvedSchema(s.getChild.getResolvedSchema))) - // validate query schema and sink schema, and apply cast if possible - val query = validateSchemaAndApplyImplicitCast( - input, - sinkSchema, - null, - dataTypeFactory, - getTypeFactory) - val sink = createSelectTableSink(sinkSchema) - s.setSelectResultProvider(sink.getSelectResultProvider) - LogicalLegacySink.create( - query, - sink, - "collect", - ConnectorCatalogTable.sink(sink, !isStreamingMode)) + case collectModifyOperation: CollectModifyOperation => + val input = getRelBuilder.queryOperation(modifyOperation.getChild).build() + DynamicSinkUtils.convertCollectToRel(getRelBuilder, input, collectModifyOperation) case catalogSink: CatalogSinkModifyOperation => val input = getRelBuilder.queryOperation(modifyOperation.getChild).build() @@ -342,14 +325,6 @@ abstract class PlannerBase( */ protected def translateToPlan(execGraph: 
ExecNodeGraph): util.List[Transformation[_]] - /** - * Creates a [[SelectTableSinkBase]] for a select query. - * - * @param tableSchema the table schema of select result. - * @return The sink to fetch the select result. - */ - protected def createSelectTableSink(tableSchema: TableSchema): SelectTableSinkBase[_] - private def getTableSink( objectIdentifier: ObjectIdentifier, dynamicOptions: JMap[String, String]) diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala index 868f612..c5e96b0 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala @@ -21,11 +21,10 @@ package org.apache.flink.table.planner.delegation import org.apache.flink.api.common.RuntimeExecutionMode import org.apache.flink.api.dag.Transformation import org.apache.flink.configuration.ExecutionOptions -import org.apache.flink.table.api.{ExplainDetail, TableConfig, TableException, TableSchema} +import org.apache.flink.table.api.{ExplainDetail, TableConfig, TableException} import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog, ObjectIdentifier} import org.apache.flink.table.delegation.Executor import org.apache.flink.table.operations.{CatalogSinkModifyOperation, ModifyOperation, Operation, QueryOperation} -import org.apache.flink.table.planner.connectors.{SelectTableSinkBase, StreamSelectTableSink} import org.apache.flink.table.planner.operations.PlannerQueryOperation import org.apache.flink.table.planner.plan.`trait`._ import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeGraph @@ -75,10 +74,6 @@ class StreamPlanner( } } - override protected def createSelectTableSink(tableSchema: TableSchema): SelectTableSinkBase[_] = { - new StreamSelectTableSink(tableSchema) - } - override def explain(operations: util.List[Operation], extraDetails: ExplainDetail*): String = { require(operations.nonEmpty, "operations should not be empty") validateAndOverrideConfiguration() diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/DataStreamJavaITCase.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/DataStreamJavaITCase.java index 67cfe26..5f3cbe6 100644 --- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/DataStreamJavaITCase.java +++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/DataStreamJavaITCase.java @@ -21,7 +21,11 @@ package org.apache.flink.table.planner.runtime.stream.sql; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.api.java.typeutils.EnumTypeInfo; +import org.apache.flink.api.java.typeutils.GenericTypeInfo; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import 
org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; @@ -35,6 +39,8 @@ import org.apache.flink.table.catalog.Column; import org.apache.flink.table.catalog.ResolvedSchema; import org.apache.flink.table.catalog.WatermarkSpec; import org.apache.flink.table.expressions.utils.ResolvedExpressionMock; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.RawType; import org.apache.flink.test.util.AbstractTestBase; import org.apache.flink.types.Row; import org.apache.flink.util.CloseableIterator; @@ -47,12 +53,15 @@ import org.junit.runners.Parameterized; import org.junit.runners.Parameterized.Parameter; import org.junit.runners.Parameterized.Parameters; +import java.time.DayOfWeek; +import java.time.ZoneOffset; import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Objects; import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.instanceOf; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; @@ -184,14 +193,49 @@ public class DataStreamJavaITCase extends AbstractTestBase { tableEnv.createTemporaryView("t", table); - final TableResult result = tableEnv.executeSql("SELECT p.d, p.b FROM t"); + final TableResult result = tableEnv.executeSql("SELECT p, p.d, p.b FROM t"); - testResult(result, Row.of(42.0, null), Row.of(null, null)); + testResult( + result, + Row.of(new ImmutablePojo(42.0, null), 42.0, null), + Row.of(null, null, null)); testResult(tableEnv.toDataStream(table, ComplexPojo.class), pojos); } @Test + public void testFromAndToDataStreamWithRaw() throws Exception { + final List<Tuple2<DayOfWeek, ZoneOffset>> rawRecords = + Arrays.asList( + Tuple2.of(DayOfWeek.MONDAY, ZoneOffset.UTC), + Tuple2.of(DayOfWeek.FRIDAY, ZoneOffset.ofHours(5))); + + final DataStream<Tuple2<DayOfWeek, ZoneOffset>> dataStream = env.fromCollection(rawRecords); + + // verify incoming type information + assertThat(dataStream.getType(), instanceOf(TupleTypeInfo.class)); + final TupleTypeInfo<?> tupleInfo = (TupleTypeInfo<?>) dataStream.getType(); + assertThat(tupleInfo.getFieldTypes()[0], instanceOf(EnumTypeInfo.class)); + assertThat(tupleInfo.getFieldTypes()[1], instanceOf(GenericTypeInfo.class)); + + final Table table = tableEnv.fromDataStream(dataStream); + + // verify schema conversion + final List<DataType> columnDataTypes = table.getResolvedSchema().getColumnDataTypes(); + assertThat(columnDataTypes.get(0).getLogicalType(), instanceOf(RawType.class)); + assertThat(columnDataTypes.get(1).getLogicalType(), instanceOf(RawType.class)); + + // test reverse operation + testResult( + table.execute(), + Row.of(DayOfWeek.MONDAY, ZoneOffset.UTC), + Row.of(DayOfWeek.FRIDAY, ZoneOffset.ofHours(5))); + testResult( + tableEnv.toDataStream(table, DataTypes.of(dataStream.getType())), + rawRecords.toArray(new Tuple2[0])); + } + + @Test public void testFromAndToDataStreamEventTime() throws Exception { final DataStream<Tuple3<Long, Integer, String>> dataStream = getWatermarkedDataStream(); diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/AggregationITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/AggregationITCase.scala index 527083d..5370cf1 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/AggregationITCase.scala +++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/AggregationITCase.scala @@ -358,15 +358,15 @@ class AggregationITCase extends BatchTestBase { val t = CollectionBatchExecTable.get3TupleDataSet(tEnv, "a, b, c") .groupBy('b) - .select('b, top10Fun('b.cast(Types.INT), 'a.cast(Types.FLOAT))) + .select('b, top10Fun('b.cast(DataTypes.INT()).ifNull(0), 'a.cast(DataTypes.FLOAT).ifNull(0f))) val expected = - "1,[1,1.0, null, null, null, null, null, null, null, null, null]\n" + - "2,[2,3.0, 2,2.0, null, null, null, null, null, null, null, null]\n" + - "3,[3,6.0, 3,5.0, 3,4.0, null, null, null, null, null, null, null]\n" + - "4,[4,10.0, 4,9.0, 4,8.0, 4,7.0, null, null, null, null, null, null]\n" + - "5,[5,15.0, 5,14.0, 5,13.0, 5,12.0, 5,11.0, null, null, null, null, null]\n" + - "6,[6,21.0, 6,20.0, 6,19.0, 6,18.0, 6,17.0, 6,16.0, null, null, null, null]" + "1,[(1,1.0), null, null, null, null, null, null, null, null, null]\n" + + "2,[(2,3.0), (2,2.0), null, null, null, null, null, null, null, null]\n" + + "3,[(3,6.0), (3,5.0), (3,4.0), null, null, null, null, null, null, null]\n" + + "4,[(4,10.0), (4,9.0), (4,8.0), (4,7.0), null, null, null, null, null, null]\n" + + "5,[(5,15.0), (5,14.0), (5,13.0), (5,12.0), (5,11.0), null, null, null, null, null]\n" + + "6,[(6,21.0), (6,20.0), (6,19.0), (6,18.0), (6,17.0), (6,16.0), null, null, null, null]" val results = executeQuery(t) TestBaseUtils.compareResultAsText(results.asJava, expected) } diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/CalcITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/CalcITCase.scala index 7dcb2e9..c0b56f0 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/CalcITCase.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/CalcITCase.scala @@ -587,8 +587,6 @@ class CalcITCase extends BatchTestBase { @Test def testSelectStarFromNestedTable(): Unit = { - val sqlQuery = "SELECT * FROM MyTable" - val table = BatchTableEnvUtil.fromCollection(tEnv, Seq( ((0, 0), "0"), ((1, 1), "1"), @@ -598,9 +596,9 @@ class CalcITCase extends BatchTestBase { val results = executeQuery(table) results.zipWithIndex.foreach { case (row, i) => - val nestedRow = row.getField(0).asInstanceOf[Row] - assertEquals(i, nestedRow.getField(0)) - assertEquals(i, nestedRow.getField(1)) + val nestedRow = row.getField(0).asInstanceOf[(Int, Int)] + assertEquals(i, nestedRow._1) + assertEquals(i, nestedRow._2) assertEquals(i.toString, row.getField(1)) } } diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/DecimalITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/DecimalITCase.scala index bcf7605..5b22ac5 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/DecimalITCase.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/DecimalITCase.scala @@ -27,7 +27,7 @@ import org.apache.flink.table.types.DataType import org.apache.flink.types.Row import org.junit.Assert.assertEquals -import org.junit.{Assert, Test} +import org.junit.Test import java.math.{BigDecimal => JBigDecimal} @@ -52,7 +52,7 @@ class 
DecimalITCase extends BatchTestBase { // check result schema val resultTable = tableTransfer(t) val ts2 = resultTable.getResolvedSchema.getColumnDataTypes.asScala - Assert.assertEquals(expectedColTypes.length, ts2.length) + assertEquals(expectedColTypes.length, ts2.length) expectedColTypes.zip(ts2).foreach { case (t1, t2) => assertEquals(t1, t2) diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sinks/BatchSelectTableSink.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sinks/BatchSelectTableSink.java index f9c576d..5799a5a 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sinks/BatchSelectTableSink.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sinks/BatchSelectTableSink.java @@ -29,7 +29,7 @@ import org.apache.flink.api.java.operators.DataSink; import org.apache.flink.core.execution.JobClient; import org.apache.flink.table.api.TableException; import org.apache.flink.table.api.TableSchema; -import org.apache.flink.table.api.internal.SelectResultProvider; +import org.apache.flink.table.api.internal.CollectResultProvider; import org.apache.flink.table.types.DataType; import org.apache.flink.types.Row; import org.apache.flink.util.AbstractID; @@ -76,8 +76,8 @@ public class BatchSelectTableSink implements BatchTableSink<Row> { .setParallelism(1); } - public SelectResultProvider getSelectResultProvider() { - return new SelectResultProvider() { + public CollectResultProvider getSelectResultProvider() { + return new CollectResultProvider() { private JobClient jobClient; @Override diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sinks/StreamSelectTableSink.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sinks/StreamSelectTableSink.java index 13096c0..17a67bf 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sinks/StreamSelectTableSink.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sinks/StreamSelectTableSink.java @@ -33,7 +33,7 @@ import org.apache.flink.streaming.api.operators.collect.CollectSinkOperator; import org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorFactory; import org.apache.flink.streaming.api.operators.collect.CollectStreamSink; import org.apache.flink.table.api.TableSchema; -import org.apache.flink.table.api.internal.SelectResultProvider; +import org.apache.flink.table.api.internal.CollectResultProvider; import org.apache.flink.types.Row; import org.apache.flink.types.RowKind; import org.apache.flink.util.CloseableIterator; @@ -93,8 +93,8 @@ public class StreamSelectTableSink implements RetractStreamTableSink<Row> { return sink.name("Streaming select table sink"); } - public SelectResultProvider getSelectResultProvider() { - return new SelectResultProvider() { + public CollectResultProvider getSelectResultProvider() { + return new CollectResultProvider() { @Override public void setJobClient(JobClient jobClient) { diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala index 6164e62..3c79184 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala @@ -191,7 +191,7 @@ class StreamPlanner( case s: 
UnregisteredSinkModifyOperation[_] => writeToSink(s.getChild, s.getSink, "UnregisteredSink") - case s: SelectSinkOperation => + case s: CollectModifyOperation => val sink = new StreamSelectTableSink( TableSchema.fromResolvedSchema(s.getChild.getResolvedSchema)) s.setSelectResultProvider(sink.getSelectResultProvider)
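
For context, here is a minimal user-facing sketch (not part of the patch) of the API path this commit updates: TableResult.collect() hands back a CloseableIterator<Row>, and with this change the rows are produced through the new type system via the CollectDynamicSink / CollectModifyOperation introduced above. The Table API calls are the standard Flink ones; the query and class name are made up for illustration.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;

public class CollectExample {
    public static void main(String[] args) throws Exception {
        TableEnvironment tableEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

        // a bounded VALUES query keeps the sketch self-contained
        TableResult result =
                tableEnv.executeSql(
                        "SELECT name, score FROM (VALUES ('Bob', 1), ('Alice', 2)) AS T(name, score)");

        // collect() is backed by an unregistered collect sink planned from a CollectModifyOperation;
        // closing the iterator also cleans up the job if it is still running
        try (CloseableIterator<Row> it = result.collect()) {
            while (it.hasNext()) {
                Row row = it.next();
                System.out.println(row.getField(0) + " -> " + row.getField(1));
            }
        }
    }
}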
