This is an automated email from the ASF dual-hosted git repository. lincoln pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit b7d32069c0971fba1c33950aa373624bb6018505 Author: luoyuxia <[email protected]> AuthorDate: Sun Jan 15 20:14:33 2023 +0800 [FLINK-30662][table] Planner supports row-level delete This closes #21676 --- .../table/api/internal/TableEnvironmentImpl.java | 7 +- .../flink/table/catalog/ContextResolvedTable.java | 9 + .../table/planner/connectors/DynamicSinkUtils.java | 379 ++++++++++- .../planner/connectors/DynamicSourceUtils.java | 26 +- .../operations/SqlToOperationConverter.java | 21 +- .../plan/abilities/sink/RowLevelDeleteSpec.java | 97 +++ .../plan/abilities/sink/SinkAbilitySpec.java | 3 +- .../plan/nodes/exec/common/CommonExecSink.java | 44 ++ .../utils/RowLevelModificationContextUtils.java | 57 ++ .../table/planner/delegation/PlannerBase.scala | 14 + .../planner/plan/schema/TableSourceTable.scala | 30 + .../factories/TestUpdateDeleteTableFactory.java | 384 ++++++++++- .../operations/SqlToOperationConverterTest.java | 24 +- .../planner/plan/batch/sql/RowLevelDeleteTest.java | 129 ++++ .../runtime/batch/sql/DeleteTableITCase.java | 132 +++- .../runtime/stream/sql/DeleteTableITCase.java | 7 + .../planner/plan/batch/sql/RowLevelDeleteTest.xml | 714 +++++++++++++++++++++ .../runtime/operators/sink/RowKindSetter.java | 46 ++ .../runtime/operators/sink/RowKindSetterTest.java | 70 ++ 19 files changed, 2129 insertions(+), 64 deletions(-) diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java index 3578e1845cb..536db5c430a 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java @@ -721,7 +721,8 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal { } else { if (operations.size() > 1 && 
operations.stream().anyMatch(this::isRowLevelModification)) { - throw new TableException("Only single UPDATE/DELETE statement is supported."); + throw new TableException( + "Unsupported SQL query! Only accept a single SQL statement of type DELETE, UPDATE."); } return planner.explain(operations, format, extraDetails); } @@ -784,6 +785,7 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal { if (operations.size() != 1 || !(operations.get(0) instanceof ModifyOperation) + || isRowLevelModification(operations.get(0)) || operations.get(0) instanceof CreateTableASOperation) { throw new TableException(UNSUPPORTED_QUERY_IN_COMPILE_PLAN_SQL_MSG); } @@ -859,7 +861,8 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal { if (operations.size() > 1) { throw new TableException( String.format( - "Only single %s statement is supported.", modifyType)); + "Unsupported SQL query! Only accept a single SQL statement of type %s.", + modifyType)); } if (isStreamingMode) { throw new TableException( diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ContextResolvedTable.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ContextResolvedTable.java index 52c84cfcccf..e7b9e5f0835 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ContextResolvedTable.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ContextResolvedTable.java @@ -157,6 +157,15 @@ public final class ContextResolvedTable { false); } + /** Copy the {@link ContextResolvedTable}, replacing the underlying {@link ResolvedSchema}. 
*/ + public ContextResolvedTable copy(ResolvedSchema newSchema) { + return new ContextResolvedTable( + objectIdentifier, + catalog, + new ResolvedCatalogTable((CatalogTable) resolvedTable.getOrigin(), newSchema), + false); + } + /** * This method tries to return the connector name of the table, trying to provide a bit more * helpful toString for anonymous tables. It's only to help users to debug, and its return value diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java index b810e71c615..69574fabd74 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java @@ -19,9 +19,11 @@ package org.apache.flink.table.planner.connectors; import org.apache.flink.annotation.Internal; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorFactory; +import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.Schema; import org.apache.flink.table.api.TableException; import org.apache.flink.table.api.TableResult; @@ -35,19 +37,28 @@ import org.apache.flink.table.catalog.DataTypeFactory; import org.apache.flink.table.catalog.ExternalCatalogTable; import org.apache.flink.table.catalog.ResolvedCatalogTable; import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.catalog.UnresolvedIdentifier; +import org.apache.flink.table.connector.RowLevelModificationScanContext; import org.apache.flink.table.connector.sink.DynamicTableSink; import org.apache.flink.table.connector.sink.abilities.SupportsOverwrite; 
import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning; +import org.apache.flink.table.connector.sink.abilities.SupportsRowLevelDelete; import org.apache.flink.table.connector.sink.abilities.SupportsWritingMetadata; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata; import org.apache.flink.table.operations.CollectModifyOperation; import org.apache.flink.table.operations.ExternalModifyOperation; import org.apache.flink.table.operations.SinkModifyOperation; import org.apache.flink.table.planner.calcite.FlinkRelBuilder; import org.apache.flink.table.planner.calcite.FlinkTypeFactory; +import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable; import org.apache.flink.table.planner.plan.abilities.sink.OverwriteSpec; +import org.apache.flink.table.planner.plan.abilities.sink.RowLevelDeleteSpec; import org.apache.flink.table.planner.plan.abilities.sink.SinkAbilitySpec; import org.apache.flink.table.planner.plan.abilities.sink.WritingMetadataSpec; import org.apache.flink.table.planner.plan.nodes.calcite.LogicalSink; +import org.apache.flink.table.planner.plan.schema.TableSourceTable; +import org.apache.flink.table.planner.utils.RowLevelModificationContextUtils; import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.inference.TypeTransformations; import org.apache.flink.table.types.logical.LogicalType; @@ -58,10 +69,20 @@ import org.apache.flink.table.types.utils.TypeConversions; import org.apache.calcite.plan.RelOptUtil; import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Project; +import org.apache.calcite.rel.core.TableModify; import org.apache.calcite.rel.hint.RelHint; +import org.apache.calcite.rel.logical.LogicalFilter; +import org.apache.calcite.rel.logical.LogicalTableModify; +import org.apache.calcite.rel.logical.LogicalTableScan; import org.apache.calcite.rel.type.RelDataType; 
+import org.apache.calcite.rel.type.RelDataTypeField; +import org.apache.calcite.rel.type.RelDataTypeFieldImpl; +import org.apache.calcite.rel.type.RelRecordType; +import org.apache.calcite.rel.type.StructKind; import org.apache.calcite.rex.RexBuilder; import org.apache.calcite.rex.RexNode; +import org.apache.calcite.rex.RexUtil; import java.time.ZoneId; import java.util.ArrayList; @@ -69,6 +90,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -193,6 +215,12 @@ public final class DynamicSinkUtils { List<SinkAbilitySpec> sinkAbilitySpecs = new ArrayList<>(); + boolean isDelete = false; + if (input instanceof LogicalTableModify) { + LogicalTableModify tableModify = (LogicalTableModify) input; + isDelete = tableModify.getOperation() == TableModify.Operation.DELETE; + } + // 1. prepare table sink prepareDynamicSink( tableDebugName, @@ -201,12 +229,31 @@ public final class DynamicSinkUtils { sink, contextResolvedTable.getResolvedTable(), sinkAbilitySpecs); + + // rewrite rel node for delete + if (isDelete) { + input = + convertDelete( + (LogicalTableModify) input, + sink, + contextResolvedTable, + tableDebugName, + dataTypeFactory, + typeFactory, + sinkAbilitySpecs); + } + sinkAbilitySpecs.forEach(spec -> spec.apply(sink)); // 2. validate the query schema to the sink's table schema and apply cast if possible - final RelNode query = - validateSchemaAndApplyImplicitCast( - input, schema, tableDebugName, dataTypeFactory, typeFactory); + RelNode query = input; + // skip validate and implicit cast when it's delete as it been done before + if (!isDelete) { + query = + validateSchemaAndApplyImplicitCast( + input, schema, tableDebugName, dataTypeFactory, typeFactory); + } + relBuilder.push(query); // 3. 
convert the sink's table schema to the consumed data type of the sink @@ -230,26 +277,48 @@ public final class DynamicSinkUtils { sinkAbilitySpecs.toArray(new SinkAbilitySpec[0])); } - /** - * Checks if the given query can be written into the given sink's table schema. - * - * <p>It checks whether field types are compatible (types should be equal including precisions). - * If types are not compatible, but can be implicitly cast, a cast projection will be applied. - * Otherwise, an exception will be thrown. - */ + /** Checks if the given query can be written into the given sink's table schema. */ public static RelNode validateSchemaAndApplyImplicitCast( RelNode query, ResolvedSchema sinkSchema, String tableDebugName, DataTypeFactory dataTypeFactory, FlinkTypeFactory typeFactory) { - final RowType queryType = FlinkTypeFactory.toLogicalRowType(query.getRowType()); - final List<RowField> queryFields = queryType.getFields(); - final RowType sinkType = (RowType) fixSinkDataType(dataTypeFactory, sinkSchema.toSinkRowDataType()) .getLogicalType(); + + return validateSchemaAndApplyImplicitCast(query, sinkType, tableDebugName, typeFactory); + } + + /** Checks if the given query can be written into the given target types. */ + public static RelNode validateSchemaAndApplyImplicitCast( + RelNode query, + List<DataType> targetTypes, + String tableDebugName, + DataTypeFactory dataTypeFactory, + FlinkTypeFactory typeFactory) { + final RowType sinkType = + (RowType) + fixSinkDataType( + dataTypeFactory, + DataTypes.ROW(targetTypes.toArray(new DataType[0]))) + .getLogicalType(); + return validateSchemaAndApplyImplicitCast(query, sinkType, tableDebugName, typeFactory); + } + + /** + * Checks if the given query can be written into the given sink type. + * + * <p>It checks whether field types are compatible (types should be equal including precisions). + * If types are not compatible, but can be implicitly cast, a cast projection will be applied. 
+ * Otherwise, an exception will be thrown. + */ + private static RelNode validateSchemaAndApplyImplicitCast( + RelNode query, RowType sinkType, String tableDebugName, FlinkTypeFactory typeFactory) { + final RowType queryType = FlinkTypeFactory.toLogicalRowType(query.getRowType()); + final List<RowField> queryFields = queryType.getFields(); final List<RowField> sinkFields = sinkType.getFields(); if (queryFields.size() != sinkFields.size()) { @@ -282,6 +351,290 @@ public final class DynamicSinkUtils { return query; } + private static RelNode convertDelete( + LogicalTableModify tableModify, + DynamicTableSink sink, + ContextResolvedTable contextResolvedTable, + String tableDebugName, + DataTypeFactory dataTypeFactory, + FlinkTypeFactory typeFactory, + List<SinkAbilitySpec> sinkAbilitySpecs) { + if (!(sink instanceof SupportsRowLevelDelete)) { + throw new UnsupportedOperationException( + String.format( + "Can't perform delete operation of the table %s because the corresponding dynamic table sink has not yet implemented %s.", + tableDebugName, SupportsRowLevelDelete.class.getName())); + } + + // get the row-level delete info + SupportsRowLevelDelete supportsRowLevelDelete = (SupportsRowLevelDelete) sink; + RowLevelModificationScanContext context = RowLevelModificationContextUtils.getScanContext(); + SupportsRowLevelDelete.RowLevelDeleteInfo rowLevelDeleteInfo = + supportsRowLevelDelete.applyRowLevelDelete(context); + sinkAbilitySpecs.add( + new RowLevelDeleteSpec(rowLevelDeleteInfo.getRowLevelDeleteMode(), context)); + + if (rowLevelDeleteInfo.getRowLevelDeleteMode() + == SupportsRowLevelDelete.RowLevelDeleteMode.DELETED_ROWS) { + // convert the LogicalTableModify node to a rel node representing row-level delete + return convertToRowLevelDelete( + tableModify, + contextResolvedTable, + rowLevelDeleteInfo, + tableDebugName, + dataTypeFactory, + typeFactory); + } else if (rowLevelDeleteInfo.getRowLevelDeleteMode() + == 
SupportsRowLevelDelete.RowLevelDeleteMode.REMAINING_ROWS) { + // if it's for remaining row, convert the predicate in where clause + // to the negative predicate + convertPredicateToNegative(tableModify); + // convert the LogicalTableModify node to a rel node representing row-level delete + return convertToRowLevelDelete( + tableModify, + contextResolvedTable, + rowLevelDeleteInfo, + tableDebugName, + dataTypeFactory, + typeFactory); + } else { + throw new TableException( + "Unknown delete mode: " + rowLevelDeleteInfo.getRowLevelDeleteMode()); + } + } + + /** Convert tableModify node to a rel node representing for row-level delete. */ + private static RelNode convertToRowLevelDelete( + LogicalTableModify tableModify, + ContextResolvedTable contextResolvedTable, + SupportsRowLevelDelete.RowLevelDeleteInfo rowLevelDeleteInfo, + String tableDebugName, + DataTypeFactory dataTypeFactory, + FlinkTypeFactory typeFactory) { + // get the required columns + ResolvedSchema resolvedSchema = contextResolvedTable.getResolvedSchema(); + Optional<List<Column>> optionalColumns = rowLevelDeleteInfo.requiredColumns(); + List<Column> requiredColumns = optionalColumns.orElse(resolvedSchema.getColumns()); + // get the root table scan which we may need rewrite it + LogicalTableScan tableScan = getSourceTableScan(tableModify); + // get the index for the required columns and extra meta cols if necessary + Tuple2<List<Integer>, List<MetadataColumn>> colsIndexAndExtraMetaCols = + getRequireColumnsIndexAndExtraMetaCols(tableScan, requiredColumns, resolvedSchema); + List<Integer> colIndexes = colsIndexAndExtraMetaCols.f0; + List<MetadataColumn> metadataColumns = colsIndexAndExtraMetaCols.f1; + // if meta columns size is greater than 0, we need to modify the underlying + // LogicalTableScan to make it can read meta column + if (metadataColumns.size() > 0) { + resolvedSchema = + addExtraMetaCols( + tableModify, tableScan, tableDebugName, metadataColumns, typeFactory); + } + // create a project 
only select the required columns for delete + return projectColumnsForDelete( + tableModify, + resolvedSchema, + colIndexes, + tableDebugName, + dataTypeFactory, + typeFactory); + } + + /** Convert the predicate in WHERE clause to the negative predicate. */ + private static void convertPredicateToNegative(LogicalTableModify tableModify) { + RexBuilder rexBuilder = tableModify.getCluster().getRexBuilder(); + RelNode input = tableModify.getInput(); + LogicalFilter newFilter; + // if the input is a table scan, there's no predicate which means it's always true + // the negative predicate should be false + if (input.getInput(0) instanceof LogicalTableScan) { + newFilter = LogicalFilter.create(input.getInput(0), rexBuilder.makeLiteral(false)); + } else { + LogicalFilter filter = (LogicalFilter) input.getInput(0); + // create a filter with negative predicate + RexNode complementFilter = + rexBuilder.makeCall( + filter.getCondition().getType(), + FlinkSqlOperatorTable.NOT, + Collections.singletonList(filter.getCondition())); + newFilter = filter.copy(filter.getTraitSet(), filter.getInput(), complementFilter); + } + // replace with the new filter + input.replaceInput(0, newFilter); + } + + /** Get the index for the required columns and extra meta cols if necessary. 
*/ + private static Tuple2<List<Integer>, List<MetadataColumn>> + getRequireColumnsIndexAndExtraMetaCols( + LogicalTableScan tableScan, + List<Column> requiredColumns, + ResolvedSchema resolvedSchema) { + // index list for the required columns + List<Integer> columnIndexList = new ArrayList<>(); + // extra meta cols + List<MetadataColumn> extraMetadataColumns = new ArrayList<>(); + List<String> fieldNames = resolvedSchema.getColumnNames(); + final TableSourceTable sourceTable = tableScan.getTable().unwrap(TableSourceTable.class); + DynamicTableSource dynamicTableSource = sourceTable.tableSource(); + int additionCols = 0; + // iterate for each required column + for (Column column : requiredColumns) { + int index = fieldNames.indexOf(column.getName()); + // if we can't find the column, we may need to add extra column + if (index <= -1) { + // we only consider add metadata column + if (column instanceof Column.MetadataColumn) { + // need to add meta column + columnIndexList.add(fieldNames.size() + additionCols); + if (!(dynamicTableSource instanceof SupportsReadingMetadata)) { + throw new UnsupportedOperationException( + String.format( + "The table source don't support reading metadata, but the require columns contains the meta columns: %s.", + column)); + } + // list what metas the source supports to read + SupportsReadingMetadata supportsReadingMetadata = + (SupportsReadingMetadata) dynamicTableSource; + Map<String, DataType> readableMetadata = + supportsReadingMetadata.listReadableMetadata(); + // check the source can read the meta column + String metaCol = + ((MetadataColumn) column).getMetadataKey().orElse(column.getName()); + if (!readableMetadata.containsKey(metaCol)) { + throw new IllegalArgumentException( + String.format( + "Expect to read the meta column %s, but the table source for table %s doesn't support read the metadata column." 
+ + "Please make sure the readable metadata for the source contains %s.", + column, + UnresolvedIdentifier.of( + tableScan.getTable().getQualifiedName()), + metaCol)); + } + // mark it as extra col + additionCols += 1; + DataType dataType = readableMetadata.get(metaCol); + if (!dataType.equals(column.getDataType())) { + throw new IllegalArgumentException( + String.format( + "Un-matched data type: the required column %s has datatype %s, but the data type in readable metadata for the table %s has data type %s. ", + column, + column.getDataType(), + UnresolvedIdentifier.of( + tableScan.getTable().getQualifiedName()), + dataType)); + } + extraMetadataColumns.add((MetadataColumn) column); + } else { + throw new IllegalArgumentException("Unknown required column " + column); + } + } else { + columnIndexList.add(index); + } + } + return Tuple2.of(columnIndexList, extraMetadataColumns); + } + + private static LogicalTableScan getSourceTableScan(RelNode relNode) { + while (!(relNode instanceof LogicalTableScan)) { + relNode = relNode.getInput(0); + } + return (LogicalTableScan) relNode; + } + + /** + * Add extra meta columns for underlying table scan, return a new resolve schema after adding + * extra meta columns. 
+ */ + private static ResolvedSchema addExtraMetaCols( + LogicalTableModify tableModify, + LogicalTableScan tableScan, + String tableDebugName, + List<MetadataColumn> metadataColumns, + FlinkTypeFactory typeFactory) { + final TableSourceTable sourceTable = tableScan.getTable().unwrap(TableSourceTable.class); + DynamicTableSource dynamicTableSource = sourceTable.tableSource(); + // get old schema and new schema after add some cols + ResolvedSchema oldSchema = sourceTable.contextResolvedTable().getResolvedSchema(); + List<Column> newColumns = new ArrayList<>(oldSchema.getColumns()); + newColumns.addAll(metadataColumns); + // get the new resolved schema after adding extra meta columns + ResolvedSchema resolvedSchema = ResolvedSchema.of(newColumns); + + List<RelDataTypeField> oldFields = sourceTable.getRowType().getFieldList(); + List<RelDataTypeField> newFields = new ArrayList<>(sourceTable.getRowType().getFieldList()); + for (int i = 0; i < metadataColumns.size(); i++) { + MetadataColumn column = metadataColumns.get(i); + // add a new field + newFields.add( + new RelDataTypeFieldImpl( + column.getName(), + oldFields.size() + i, + typeFactory.createFieldTypeFromLogicalType( + column.getDataType().getLogicalType()))); + } + // create a copy for TableSourceTable with new resolved schema + TableSourceTable newTableSourceTab = + sourceTable.copy( + dynamicTableSource, + sourceTable.contextResolvedTable().copy(resolvedSchema), + new RelRecordType(StructKind.FULLY_QUALIFIED, newFields, false), + sourceTable.abilitySpecs()); + + // create a copy for table scan with new TableSourceTable + LogicalTableScan newTableScan = + new LogicalTableScan( + tableScan.getCluster(), + tableScan.getTraitSet(), + tableScan.getHints(), + newTableSourceTab); + Project project = (Project) tableModify.getInput(); + // replace with the new table scan + if (project.getInput() instanceof LogicalFilter) { + LogicalFilter logicalFilter = (LogicalFilter) project.getInput(); + project.replaceInput( + 
0, + logicalFilter.copy( + logicalFilter.getTraitSet(), + newTableScan, + logicalFilter.getCondition())); + } else { + project.replaceInput(0, newTableScan); + } + // validate and apply metadata + DynamicSourceUtils.validateAndApplyMetadata( + tableDebugName, resolvedSchema, newTableSourceTab.tableSource()); + return resolvedSchema; + } + + private static RelNode projectColumnsForDelete( + LogicalTableModify tableModify, + ResolvedSchema resolvedSchema, + List<Integer> colIndexes, + String tableDebugName, + DataTypeFactory dataTypeFactory, + FlinkTypeFactory typeFactory) { + // now we know which columns we may need + List<RexNode> newRexNodeList = new ArrayList<>(); + List<String> newFieldNames = new ArrayList<>(); + List<DataType> deleteTargetDataTypes = new ArrayList<>(); + Project project = (Project) (tableModify.getInput()); + RexBuilder rexBuilder = tableModify.getCluster().getRexBuilder(); + // iterate each index for the column, create an input ref node for it. + for (int index : colIndexes) { + newRexNodeList.add(rexBuilder.makeInputRef(project.getInput(), index)); + newFieldNames.add(resolvedSchema.getColumnNames().get(index)); + deleteTargetDataTypes.add(resolvedSchema.getColumnDataTypes().get(index)); + } + // a project to only get specific columns + project = + project.copy( + project.getTraitSet(), + project.getInput(), + newRexNodeList, + RexUtil.createStructType(typeFactory, newRexNodeList, newFieldNames, null)); + return validateSchemaAndApplyImplicitCast( + project, deleteTargetDataTypes, tableDebugName, dataTypeFactory, typeFactory); + } + // -------------------------------------------------------------------------------------------- /** Temporary solution until we drop legacy types. 
*/ diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSourceUtils.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSourceUtils.java index 989a20c6670..db5b4090877 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSourceUtils.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSourceUtils.java @@ -33,15 +33,18 @@ import org.apache.flink.table.catalog.ResolvedCatalogTable; import org.apache.flink.table.catalog.ResolvedSchema; import org.apache.flink.table.catalog.WatermarkSpec; import org.apache.flink.table.connector.ChangelogMode; +import org.apache.flink.table.connector.RowLevelModificationScanContext; import org.apache.flink.table.connector.source.DynamicTableSource; import org.apache.flink.table.connector.source.ScanTableSource; import org.apache.flink.table.connector.source.ScanTableSource.ScanRuntimeProvider; import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata; +import org.apache.flink.table.connector.source.abilities.SupportsRowLevelModificationScan; import org.apache.flink.table.planner.calcite.FlinkRelBuilder; import org.apache.flink.table.planner.expressions.converter.ExpressionConverter; import org.apache.flink.table.planner.plan.abilities.source.SourceAbilitySpec; import org.apache.flink.table.planner.plan.schema.TableSourceTable; import org.apache.flink.table.planner.plan.stats.FlinkStatistic; +import org.apache.flink.table.planner.utils.RowLevelModificationContextUtils; import org.apache.flink.table.planner.utils.ShortcutUtils; import org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext; import org.apache.flink.table.types.DataType; @@ -157,6 +160,7 @@ public final class DynamicSourceUtils { if (source instanceof ScanTableSource) { validateScanSource( tableDebugName, schema, 
(ScanTableSource) source, isBatchMode, config); + prepareRowLevelModificationScan(source); } // lookup table source is validated in LookupJoin node @@ -378,7 +382,7 @@ public final class DynamicSourceUtils { .collect(Collectors.toList()); } - private static void validateAndApplyMetadata( + public static void validateAndApplyMetadata( String tableDebugName, ResolvedSchema schema, DynamicTableSource source) { final List<MetadataColumn> metadataColumns = extractMetadataColumns(schema); @@ -555,6 +559,26 @@ public final class DynamicSourceUtils { } } + private static void prepareRowLevelModificationScan(DynamicTableSource dynamicTableSource) { + // if the modification type has been set and the dynamic source supports row-level + // modification scan + if (RowLevelModificationContextUtils.getModificationType() != null + && dynamicTableSource instanceof SupportsRowLevelModificationScan) { + SupportsRowLevelModificationScan modificationScan = + (SupportsRowLevelModificationScan) dynamicTableSource; + // get the previous scan context + RowLevelModificationScanContext scanContext = + RowLevelModificationContextUtils.getScanContext(); + // pass the previous scan context to current table source and + // get a new scan context + RowLevelModificationScanContext newScanContext = + modificationScan.applyRowLevelModificationScan( + RowLevelModificationContextUtils.getModificationType(), scanContext); + // set the scan context + RowLevelModificationContextUtils.setScanContext(newScanContext); + } + } + private DynamicSourceUtils() { // no instantiation } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java index 1851cce8c11..d07f45a26a6 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java +++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java @@ -120,6 +120,7 @@ import org.apache.flink.table.catalog.exceptions.TableNotExistException; import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException; import org.apache.flink.table.connector.sink.DynamicTableSink; import org.apache.flink.table.connector.sink.abilities.SupportsDeletePushDown; +import org.apache.flink.table.connector.source.abilities.SupportsRowLevelModificationScan; import org.apache.flink.table.expressions.CallExpression; import org.apache.flink.table.expressions.Expression; import org.apache.flink.table.expressions.FieldReferenceExpression; @@ -194,6 +195,7 @@ import org.apache.flink.table.planner.calcite.FlinkPlannerImpl; import org.apache.flink.table.planner.hint.FlinkHints; import org.apache.flink.table.planner.utils.Expander; import org.apache.flink.table.planner.utils.OperationConverterUtils; +import org.apache.flink.table.planner.utils.RowLevelModificationContextUtils; import org.apache.flink.table.resource.ResourceType; import org.apache.flink.table.resource.ResourceUri; import org.apache.flink.table.types.DataType; @@ -285,6 +287,7 @@ public class SqlToOperationConverter { /** Convert a validated sql node to Operation. */ private static Optional<Operation> convertValidatedSqlNode( FlinkPlannerImpl flinkPlanner, CatalogManager catalogManager, SqlNode validated) { + beforeConversion(); SqlToOperationConverter converter = new SqlToOperationConverter(flinkPlanner, catalogManager); if (validated instanceof SqlCreateCatalog) { @@ -417,6 +420,11 @@ public class SqlToOperationConverter { // ~ Tools ------------------------------------------------------------------ + private static void beforeConversion() { + // clear row-level modification context + RowLevelModificationContextUtils.clearContext(); + } + /** Convert DROP TABLE statement. 
*/ private Operation convertDropTable(SqlDropTable sqlDropTable) { UnresolvedIdentifier unresolvedIdentifier = @@ -1501,6 +1509,9 @@ public class SqlToOperationConverter { } private Operation convertDelete(SqlDelete sqlDelete) { + // set it's delete + RowLevelModificationContextUtils.setModificationType( + SupportsRowLevelModificationScan.RowLevelModificationType.DELETE); RelRoot updateRelational = flinkPlanner.rel(sqlDelete); LogicalTableModify tableModify = (LogicalTableModify) updateRelational.rel; UnresolvedIdentifier unresolvedTableIdentifier = @@ -1528,12 +1539,10 @@ public class SqlToOperationConverter { } } } - - // otherwise, delete push down is not applicable, throw unsupported exception - throw new UnsupportedOperationException( - String.format( - "Only delete push down is supported currently, but the delete statement can't pushed to table sink %s.", - unresolvedTableIdentifier.asSummaryString())); + // delete push down is not applicable, use row-level delete + PlannerQueryOperation queryOperation = new PlannerQueryOperation(tableModify); + return new SinkModifyOperation( + contextResolvedTable, queryOperation, SinkModifyOperation.ModifyType.DELETE); } private void validateTableConstraint(SqlTableConstraint constraint) { diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/sink/RowLevelDeleteSpec.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/sink/RowLevelDeleteSpec.java new file mode 100644 index 00000000000..04d9ca5bf12 --- /dev/null +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/sink/RowLevelDeleteSpec.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.abilities.sink; + +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.connector.RowLevelModificationScanContext; +import org.apache.flink.table.connector.sink.DynamicTableSink; +import org.apache.flink.table.connector.sink.abilities.SupportsRowLevelDelete; +import org.apache.flink.util.Preconditions; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.util.Objects; + +/** + * A sub-class of {@link SinkAbilitySpec} that can not only serialize/deserialize the row-level + * delete mode to/from JSON, but also can delete existing data for {@link + * org.apache.flink.table.connector.sink.abilities.SupportsRowLevelDelete}. 
+ */ +@JsonIgnoreProperties(ignoreUnknown = true) +@JsonTypeName("RowLevelDelete") +public class RowLevelDeleteSpec implements SinkAbilitySpec { + public static final String FIELD_NAME_ROW_LEVEL_DELETE_MODE = "rowLevelDeleteMode"; + + @JsonProperty(FIELD_NAME_ROW_LEVEL_DELETE_MODE) + @Nonnull + private final SupportsRowLevelDelete.RowLevelDeleteMode rowLevelDeleteMode; + + @JsonIgnore @Nullable private final RowLevelModificationScanContext scanContext; + + @JsonCreator + public RowLevelDeleteSpec( + @JsonProperty(FIELD_NAME_ROW_LEVEL_DELETE_MODE) @Nonnull + SupportsRowLevelDelete.RowLevelDeleteMode rowLevelDeleteMode, + @Nullable RowLevelModificationScanContext scanContext) { + this.rowLevelDeleteMode = Preconditions.checkNotNull(rowLevelDeleteMode); + this.scanContext = scanContext; + } + + @Override + public void apply(DynamicTableSink tableSink) { + if (tableSink instanceof SupportsRowLevelDelete) { + ((SupportsRowLevelDelete) tableSink).applyRowLevelDelete(scanContext); + } else { + throw new TableException( + String.format( + "%s does not support SupportsRowLevelDelete.", + tableSink.getClass().getName())); + } + } + + public SupportsRowLevelDelete.RowLevelDeleteMode getRowLevelDeleteMode() { + return rowLevelDeleteMode; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + RowLevelDeleteSpec that = (RowLevelDeleteSpec) o; + return rowLevelDeleteMode == that.rowLevelDeleteMode + && Objects.equals(scanContext, that.scanContext); + } + + @Override + public int hashCode() { + return Objects.hash(rowLevelDeleteMode, scanContext); + } +} diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/sink/SinkAbilitySpec.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/sink/SinkAbilitySpec.java index 25d2d78eda1..fcfb6078433 100644 --- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/sink/SinkAbilitySpec.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/sink/SinkAbilitySpec.java @@ -34,7 +34,8 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTyp @JsonSubTypes({ @JsonSubTypes.Type(value = OverwriteSpec.class), @JsonSubTypes.Type(value = PartitioningSpec.class), - @JsonSubTypes.Type(value = WritingMetadataSpec.class) + @JsonSubTypes.Type(value = WritingMetadataSpec.class), + @JsonSubTypes.Type(value = RowLevelDeleteSpec.class) }) @Internal public interface SinkAbilitySpec { diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java index 7f978565060..e97791a9b1f 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java @@ -47,9 +47,12 @@ import org.apache.flink.table.connector.sink.OutputFormatProvider; import org.apache.flink.table.connector.sink.SinkFunctionProvider; import org.apache.flink.table.connector.sink.SinkProvider; import org.apache.flink.table.connector.sink.SinkV2Provider; +import org.apache.flink.table.connector.sink.abilities.SupportsRowLevelDelete; import org.apache.flink.table.data.RowData; import org.apache.flink.table.planner.codegen.EqualiserCodeGenerator; import org.apache.flink.table.planner.connectors.TransformationSinkProvider; +import org.apache.flink.table.planner.plan.abilities.sink.RowLevelDeleteSpec; +import org.apache.flink.table.planner.plan.abilities.sink.SinkAbilitySpec; import org.apache.flink.table.planner.plan.nodes.exec.ExecNode; import 
org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase; import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig; @@ -66,6 +69,7 @@ import org.apache.flink.table.runtime.connector.sink.SinkRuntimeProviderContext; import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser; import org.apache.flink.table.runtime.keyselector.RowDataKeySelector; import org.apache.flink.table.runtime.operators.sink.ConstraintEnforcer; +import org.apache.flink.table.runtime.operators.sink.RowKindSetter; import org.apache.flink.table.runtime.operators.sink.SinkOperator; import org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializer; import org.apache.flink.table.runtime.operators.sink.StreamRecordTimestampInserter; @@ -99,6 +103,7 @@ public abstract class CommonExecSink extends ExecNodeBase<Object> public static final String PARTITIONER_TRANSFORMATION = "partitioner"; public static final String UPSERT_MATERIALIZE_TRANSFORMATION = "upsert-materialize"; public static final String TIMESTAMP_INSERTER_TRANSFORMATION = "timestamp-inserter"; + public static final String ROW_KIND_SETTER = "row-kind-setter"; public static final String SINK_TRANSFORMATION = "sink"; public static final String FIELD_NAME_DYNAMIC_TABLE_SINK = "dynamicTableSink"; @@ -199,6 +204,11 @@ public abstract class CommonExecSink extends ExecNodeBase<Object> inputUpsertKey); } + Optional<RowKind> targetRowKind = getTargetRowKind(); + if (targetRowKind.isPresent()) { + sinkTransform = applyRowKindSetter(sinkTransform, targetRowKind.get(), config); + } + return (Transformation<Object>) applySinkProvider( sinkTransform, @@ -454,6 +464,20 @@ public abstract class CommonExecSink extends ExecNodeBase<Object> return materializeTransform; } + private Transformation<RowData> applyRowKindSetter( + Transformation<RowData> inputTransform, RowKind rowKind, ExecNodeConfig config) { + return ExecNodeUtil.createOneInputTransformation( + inputTransform, + createTransformationMeta( + ROW_KIND_SETTER, + 
String.format("RowKindSetter(TargetRowKind=[%s])", rowKind), + "RowKindSetter", + config), + new RowKindSetter(rowKind), + inputTransform.getOutputType(), + inputTransform.getParallelism()); + } + private Transformation<?> applySinkProvider( Transformation<RowData> inputTransform, StreamExecutionEnvironment env, @@ -618,4 +642,24 @@ public abstract class CommonExecSink extends ExecNodeBase<Object> CHAR, BINARY } + + /** + * Get the target row-kind that the row data should change to, assuming the current row kind is + * RowKind.INSERT. Return Optional.empty() if it doesn't need to change. Currently, it'll only + * consider row-level delete. + */ + private Optional<RowKind> getTargetRowKind() { + if (tableSinkSpec.getSinkAbilities() != null) { + for (SinkAbilitySpec sinkAbilitySpec : tableSinkSpec.getSinkAbilities()) { + if (sinkAbilitySpec instanceof RowLevelDeleteSpec) { + RowLevelDeleteSpec deleteSpec = (RowLevelDeleteSpec) sinkAbilitySpec; + if (deleteSpec.getRowLevelDeleteMode() + == SupportsRowLevelDelete.RowLevelDeleteMode.DELETED_ROWS) { + return Optional.of(RowKind.DELETE); + } + } + } + } + return Optional.empty(); + } } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/RowLevelModificationContextUtils.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/RowLevelModificationContextUtils.java new file mode 100644 index 00000000000..71b7baf341e --- /dev/null +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/RowLevelModificationContextUtils.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.utils; + +import org.apache.flink.table.connector.RowLevelModificationScanContext; +import org.apache.flink.table.connector.source.abilities.SupportsRowLevelModificationScan; + +/** + * Utils for putting/getting {@link RowLevelModificationScanContext}/{@link + * SupportsRowLevelModificationScan.RowLevelModificationType} in the current thread, which allows + * them to be put/got from different methods.
+ */ +public class RowLevelModificationContextUtils { + private static final ThreadLocal<SupportsRowLevelModificationScan.RowLevelModificationType> + modificationTypeThreadLocal = new ThreadLocal<>(); + + private static final ThreadLocal<RowLevelModificationScanContext> scanContextThreadLocal = + new ThreadLocal<>(); + + public static void setModificationType( + SupportsRowLevelModificationScan.RowLevelModificationType rowLevelModificationType) { + modificationTypeThreadLocal.set(rowLevelModificationType); + } + + public static SupportsRowLevelModificationScan.RowLevelModificationType getModificationType() { + return modificationTypeThreadLocal.get(); + } + + public static void setScanContext(RowLevelModificationScanContext scanContext) { + scanContextThreadLocal.set(scanContext); + } + + public static RowLevelModificationScanContext getScanContext() { + return scanContextThreadLocal.get(); + } + + public static void clearContext() { + modificationTypeThreadLocal.remove(); + scanContextThreadLocal.remove(); + } +} diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala index fab1ee0db84..cf9f302164e 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala @@ -237,6 +237,20 @@ abstract class PlannerBase( case (table, sink: TableSink[_]) => // Legacy tables can't be anonymous val identifier = catalogSink.getContextResolvedTable.getIdentifier + + // check it's not for UPDATE/DELETE because they're not supported for Legacy table + if (catalogSink.isDelete || catalogSink.isUpdate) { + throw new TableException( + String.format( + "Can't perform %s operation of the table %s " + + " because the corresponding table sink is the 
legacy TableSink," + " Please implement %s for it.", + if (catalogSink.isDelete) "delete" else "update", + identifier, + classOf[DynamicTableSink].getName + )) + } + // check the logical field type and physical field type are compatible val queryLogicalType = FlinkTypeFactory.toLogicalRowType(input.getRowType) // validate logical schema and physical schema are compatible diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/schema/TableSourceTable.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/schema/TableSourceTable.scala index c7f8ae95422..c5d1ea41004 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/schema/TableSourceTable.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/schema/TableSourceTable.scala @@ -115,6 +115,36 @@ class TableSourceTable( ) } + /** + * Creates a copy of this table with the specified table source, context resolved table and row type, appending the given ability specs to the existing ones + * + * @param newTableSource + * tableSource to replace + * @param newResolveTable + * resolved table to replace + * @param newRowType + * new row type + * @return + * new TableSourceTable instance with newAbilitySpecs appended to the existing ability specs + */ + def copy( + newTableSource: DynamicTableSource, + newResolveTable: ContextResolvedTable, + newRowType: RelDataType, + newAbilitySpecs: Array[SourceAbilitySpec]): TableSourceTable = { + new TableSourceTable( + relOptSchema, + newRowType, + statistic, + newTableSource, + isStreamingMode, + newResolveTable, + flinkContext, + flinkTypeFactory, + abilitySpecs ++ newAbilitySpecs + ) + } + /** * Creates a copy of this table with specified digest and statistic.
* diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestUpdateDeleteTableFactory.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestUpdateDeleteTableFactory.java index df114031ecf..b7c0ce53d11 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestUpdateDeleteTableFactory.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestUpdateDeleteTableFactory.java @@ -21,16 +21,31 @@ package org.apache.flink.table.planner.factories; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSink; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import org.apache.flink.streaming.api.functions.source.FromElementsFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.catalog.Column; +import org.apache.flink.table.catalog.ObjectIdentifier; import org.apache.flink.table.catalog.ResolvedCatalogTable; import org.apache.flink.table.catalog.ResolvedSchema; import org.apache.flink.table.connector.ChangelogMode; +import org.apache.flink.table.connector.ProviderContext; +import org.apache.flink.table.connector.RowLevelModificationScanContext; +import org.apache.flink.table.connector.sink.DataStreamSinkProvider; import org.apache.flink.table.connector.sink.DynamicTableSink; import org.apache.flink.table.connector.sink.abilities.SupportsDeletePushDown; +import org.apache.flink.table.connector.sink.abilities.SupportsRowLevelDelete; import 
org.apache.flink.table.connector.source.DynamicTableSource; import org.apache.flink.table.connector.source.ScanTableSource; import org.apache.flink.table.connector.source.SourceFunctionProvider; +import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata; +import org.apache.flink.table.connector.source.abilities.SupportsRowLevelModificationScan; +import org.apache.flink.table.data.GenericRowData; import org.apache.flink.table.data.RowData; import org.apache.flink.table.expressions.CallExpression; import org.apache.flink.table.expressions.FieldReferenceExpression; @@ -41,6 +56,10 @@ import org.apache.flink.table.factories.DynamicTableSourceFactory; import org.apache.flink.table.factories.FactoryUtil; import org.apache.flink.table.functions.BuiltInFunctionDefinitions; import org.apache.flink.table.types.DataType; +import org.apache.flink.types.RowKind; +import org.apache.flink.util.Preconditions; + +import org.jetbrains.annotations.Nullable; import java.util.ArrayList; import java.util.Arrays; @@ -82,6 +101,34 @@ public class TestUpdateDeleteTableFactory .defaultValue(true) .withDescription("Whether the table supports delete push down."); + private static final ConfigOption<Boolean> MIX_DELETE = + ConfigOptions.key("mix-delete") + .booleanType() + .defaultValue(false) + .withDescription( + "Whether the table support both delete push down and row-level delete. 
" + + "Note: for supporting delete push down, only the filter pushed is empty, can the filter be accepted."); + + private static final ConfigOption<SupportsRowLevelDelete.RowLevelDeleteMode> DELETE_MODE = + ConfigOptions.key("delete-mode") + .enumType(SupportsRowLevelDelete.RowLevelDeleteMode.class) + .defaultValue(SupportsRowLevelDelete.RowLevelDeleteMode.DELETED_ROWS) + .withDescription("The delete mode for row level delete."); + + private static final ConfigOption<List<String>> REQUIRED_COLUMNS_FOR_DELETE = + ConfigOptions.key("required-columns-for-delete") + .stringType() + .asList() + .noDefaultValue() + .withDescription( + "The columns' name for the required columns in row-level delete"); + + private static final List<Column.MetadataColumn> META_COLUMNS = + Arrays.asList( + Column.metadata("g", DataTypes.STRING(), null, true), + Column.metadata("meta_f1", DataTypes.INT().notNull(), null, false), + Column.metadata("meta_f2", DataTypes.STRING().notNull(), "meta_k2", false)); + private static final AtomicInteger idCounter = new AtomicInteger(0); private static final Map<String, Collection<RowData>> registeredRowData = new HashMap<>(); @@ -97,13 +144,29 @@ public class TestUpdateDeleteTableFactory helper.validate(); String dataId = helper.getOptions().getOptional(DATA_ID).orElse(String.valueOf(idCounter.get())); - if (helper.getOptions().get(SUPPORT_DELETE_PUSH_DOWN)) { - return new SupportsDeletePushDownSink( + SupportsRowLevelDelete.RowLevelDeleteMode deleteMode = helper.getOptions().get(DELETE_MODE); + List<String> requireCols = helper.getOptions().get(REQUIRED_COLUMNS_FOR_DELETE); + if (helper.getOptions().get(MIX_DELETE)) { + return new SupportsDeleteSink( + context.getObjectIdentifier(), + context.getCatalogTable(), + deleteMode, dataId, - helper.getOptions().get(ONLY_ACCEPT_EQUAL_PREDICATE), - context.getCatalogTable()); + requireCols); } else { - return new TestSink(); + if (helper.getOptions().get(SUPPORT_DELETE_PUSH_DOWN)) { + return new 
SupportsDeletePushDownSink( + dataId, + helper.getOptions().get(ONLY_ACCEPT_EQUAL_PREDICATE), + context.getCatalogTable()); + } else { + return new SupportsRowLevelDeleteSink( + context.getObjectIdentifier(), + context.getCatalogTable(), + deleteMode, + dataId, + requireCols); + } } } @@ -114,7 +177,7 @@ public class TestUpdateDeleteTableFactory String dataId = helper.getOptions().getOptional(DATA_ID).orElse(String.valueOf(idCounter.get())); - return new TestTableSource(dataId); + return new TestTableSource(dataId, context.getObjectIdentifier()); } @Override @@ -130,20 +193,29 @@ public class TestUpdateDeleteTableFactory @Override public Set<ConfigOption<?>> optionalOptions() { return new HashSet<>( - Arrays.asList(DATA_ID, ONLY_ACCEPT_EQUAL_PREDICATE, SUPPORT_DELETE_PUSH_DOWN)); + Arrays.asList( + DATA_ID, + ONLY_ACCEPT_EQUAL_PREDICATE, + SUPPORT_DELETE_PUSH_DOWN, + MIX_DELETE, + DELETE_MODE, + REQUIRED_COLUMNS_FOR_DELETE)); } /** A test table source which supports reading metadata. */ - private static class TestTableSource implements ScanTableSource { + private static class TestTableSource + implements ScanTableSource, SupportsReadingMetadata, SupportsRowLevelModificationScan { private final String dataId; + private final ObjectIdentifier tableIdentifier; - public TestTableSource(String dataId) { + public TestTableSource(String dataId, ObjectIdentifier tableIdentifier) { this.dataId = dataId; + this.tableIdentifier = tableIdentifier; } @Override public DynamicTableSource copy() { - return new TestTableSource(dataId); + return new TestTableSource(dataId, tableIdentifier); } @Override @@ -175,6 +247,37 @@ public class TestUpdateDeleteTableFactory } }; } + + @Override + public Map<String, DataType> listReadableMetadata() { + Map<String, DataType> metaData = new HashMap<>(); + META_COLUMNS.forEach( + column -> + metaData.put( + column.getMetadataKey().orElse(column.getName()), + column.getDataType())); + return metaData; + } + + @Override + public void 
applyReadableMetadata(List<String> metadataKeys, DataType producedDataType) {} + + @Override + public RowLevelModificationScanContext applyRowLevelModificationScan( + RowLevelModificationType rowLevelModificationType, + @Nullable RowLevelModificationScanContext previousContext) { + TestScanContext scanContext = + previousContext == null + ? new TestScanContext() + : (TestScanContext) previousContext; + scanContext.scanTables.add(tableIdentifier); + return scanContext; + } + } + + /** A test scan context for row-level modification scan. */ + private static class TestScanContext implements RowLevelModificationScanContext { + private final Set<ObjectIdentifier> scanTables = new HashSet<>(); } /** A common test sink. */ @@ -192,12 +295,188 @@ public class TestUpdateDeleteTableFactory @Override public DynamicTableSink copy() { - return null; + return new TestSink(); } @Override public String asSummaryString() { - return null; + return "Test Sink"; + } + } + + /** A sink that supports row-level delete.
*/ + private static class SupportsRowLevelDeleteSink extends TestSink + implements SupportsRowLevelDelete { + + private final ObjectIdentifier tableIdentifier; + private final ResolvedCatalogTable resolvedCatalogTable; + private final RowLevelDeleteMode deleteMode; + protected final String dataId; + private final List<String> requireColumnsForDelete; + + private boolean isDelete; + + public SupportsRowLevelDeleteSink( + ObjectIdentifier tableIdentifier, + ResolvedCatalogTable resolvedCatalogTable, + RowLevelDeleteMode deleteMode, + String dataId, + List<String> requireColumnsForDelete) { + this( + tableIdentifier, + resolvedCatalogTable, + deleteMode, + dataId, + requireColumnsForDelete, + false); + } + + public SupportsRowLevelDeleteSink( + ObjectIdentifier tableIdentifier, + ResolvedCatalogTable resolvedCatalogTable, + RowLevelDeleteMode deleteMode, + String dataId, + List<String> requireColumnsForDelete, + boolean isDelete) { + this.tableIdentifier = tableIdentifier; + this.resolvedCatalogTable = resolvedCatalogTable; + this.deleteMode = deleteMode; + this.dataId = dataId; + this.requireColumnsForDelete = requireColumnsForDelete; + this.isDelete = isDelete; + } + + @Override + public ChangelogMode getChangelogMode(ChangelogMode requestedMode) { + return ChangelogMode.all(); + } + + @Override + public SinkRuntimeProvider getSinkRuntimeProvider(Context context) { + return new DataStreamSinkProvider() { + @Override + public DataStreamSink<?> consumeDataStream( + ProviderContext providerContext, DataStream<RowData> dataStream) { + return dataStream + .addSink( + new DeleteDataSinkFunction( + dataId, + getAllFieldGetter( + resolvedCatalogTable.getResolvedSchema()), + deleteMode)) + .setParallelism(1); + } + }; + } + + @Override + public DynamicTableSink copy() { + return new SupportsRowLevelDeleteSink( + tableIdentifier, + resolvedCatalogTable, + deleteMode, + dataId, + requireColumnsForDelete, + isDelete); + } + + @Override + public String asSummaryString() { + 
return "support row-level delete sink"; + } + + @Override + public RowLevelDeleteInfo applyRowLevelDelete( + @Nullable RowLevelModificationScanContext context) { + // the context should contain the object identifier of the table to be written + Preconditions.checkArgument(context instanceof TestScanContext); + TestScanContext scanContext = (TestScanContext) context; + Preconditions.checkArgument( + scanContext.scanTables.contains(tableIdentifier), + String.format( + "The scan context should contains the object identifier for table %s in row-level delete.", + tableIdentifier)); + + this.isDelete = true; + return new RowLevelDeleteInfo() { + @Override + public Optional<List<Column>> requiredColumns() { + List<Column> requiredCols = null; + if (requireColumnsForDelete != null) { + requiredCols = + getRequiredColumns( + requireColumnsForDelete, + resolvedCatalogTable.getResolvedSchema()); + } + return Optional.ofNullable(requiredCols); + } + + @Override + public RowLevelDeleteMode getRowLevelDeleteMode() { + return deleteMode; + } + }; + } + } + + /** The sink for delete existing data. 
*/ + private static class DeleteDataSinkFunction extends RichSinkFunction<RowData> { + private final String dataId; + private final RowData.FieldGetter[] fieldGetters; + private final SupportsRowLevelDelete.RowLevelDeleteMode deleteMode; + + private transient Collection<RowData> data; + private transient List<RowData> newData; + + DeleteDataSinkFunction( + String dataId, + RowData.FieldGetter[] fieldGetters, + SupportsRowLevelDelete.RowLevelDeleteMode deleteMode) { + this.dataId = dataId; + this.fieldGetters = fieldGetters; + this.deleteMode = deleteMode; + } + + @Override + public void open(Configuration parameters) { + data = registeredRowData.get(dataId); + newData = new ArrayList<>(); + } + + @Override + public void invoke(RowData value, Context context) { + if (deleteMode == SupportsRowLevelDelete.RowLevelDeleteMode.DELETED_ROWS) { + consumeDeletedRows(value); + } else if (deleteMode == SupportsRowLevelDelete.RowLevelDeleteMode.REMAINING_ROWS) { + consumeRemainingRows(value); + } else { + throw new TableException(String.format("Unknown delete mode: %s.", deleteMode)); + } + } + + private void consumeDeletedRows(RowData deletedRow) { + Preconditions.checkState( + deletedRow.getRowKind() == RowKind.DELETE, + String.format( + "The RowKind for the coming rows should be %s in delete mode %s.", + RowKind.DELETE, DELETE_MODE)); + data.removeIf(rowData -> equal(rowData, deletedRow, fieldGetters)); + } + + private void consumeRemainingRows(RowData remainingRow) { + Preconditions.checkState( + remainingRow.getRowKind() == RowKind.INSERT, + String.format( + "The RowKind for the coming rows should be %s in delete mode %s.", + RowKind.INSERT, DELETE_MODE)); + newData.add(copyRowData(remainingRow, fieldGetters)); + } + + @Override + public void finish() { + if (deleteMode == SupportsRowLevelDelete.RowLevelDeleteMode.REMAINING_ROWS) { + registeredRowData.put(dataId, newData); + } } } @@ -315,6 +594,41 @@ public class TestUpdateDeleteTableFactory return true; } + /** A sink 
that supports both delete push down and row-level delete. */ + private static class SupportsDeleteSink extends SupportsRowLevelDeleteSink + implements SupportsDeletePushDown { + + public SupportsDeleteSink( + ObjectIdentifier tableIdentifier, + ResolvedCatalogTable resolvedCatalogTable, + SupportsRowLevelDelete.RowLevelDeleteMode deleteMode, + String dataId, + List<String> requireColumnsForDelete) { + super( + tableIdentifier, + resolvedCatalogTable, + deleteMode, + dataId, + requireColumnsForDelete); + } + + @Override + public boolean applyDeleteFilters(List<ResolvedExpression> filters) { + // only accept when the filters are empty + return filters.isEmpty(); + } + + @Override + public Optional<Long> executeDeletion() { + Collection<RowData> oldRows = registeredRowData.get(dataId); + if (oldRows != null) { + registeredRowData.put(dataId, new ArrayList<>()); + return Optional.of((long) oldRows.size()); + } + return Optional.empty(); + } + } + private static RowData.FieldGetter[] getAllFieldGetter(ResolvedSchema resolvedSchema) { List<DataType> dataTypes = resolvedSchema.getColumnDataTypes(); RowData.FieldGetter[] fieldGetters = new RowData.FieldGetter[dataTypes.size()]; @@ -323,4 +637,50 @@ public class TestUpdateDeleteTableFactory } return fieldGetters; } + + private static boolean equal( + RowData value1, RowData value2, RowData.FieldGetter[] fieldGetters) { + for (RowData.FieldGetter fieldGetter : fieldGetters) { + if (!Objects.equals( + fieldGetter.getFieldOrNull(value1), fieldGetter.getFieldOrNull(value2))) { + return false; + } + } + return true; + } + + private static RowData copyRowData(RowData rowData, RowData.FieldGetter[] fieldGetters) { + Object[] values = new Object[fieldGetters.length]; + for (int i = 0; i < fieldGetters.length; i++) { + values[i] = fieldGetters[i].getFieldOrNull(rowData); + } + return GenericRowData.of(values); + } + + private static List<Column> getRequiredColumns( + List<String> requiredColName, ResolvedSchema schema) { + 
List<Column> requiredCols = new ArrayList<>(); + for (String colName : requiredColName) { + Optional<Column> optionalColumn = schema.getColumn(colName); + if (optionalColumn.isPresent()) { + requiredCols.add(optionalColumn.get()); + } else { + Column metaCol = null; + for (Column.MetadataColumn metadataColumn : META_COLUMNS) { + String metaColName = + metadataColumn.getMetadataKey().orElse(metadataColumn.getName()); + if (metaColName.equals(colName)) { + metaCol = metadataColumn; + break; + } + } + if (metaCol == null) { + throw new TableException( + String.format("Can't find the required column: `%s`.", colName)); + } + requiredCols.add(metaCol); + } + } + return requiredCols; + } } diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java index 129b969b9a0..86d09ee4be5 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java @@ -2710,7 +2710,7 @@ public class SqlToOperationConverterTest { } @Test - public void testDeletePushDown() throws Exception { + public void testDelete() throws Exception { Map<String, String> options = new HashMap<>(); options.put("connector", TestUpdateDeleteTableFactory.IDENTIFIER); CatalogTable catalogTable = @@ -2722,31 +2722,25 @@ public class SqlToOperationConverterTest { null, Collections.emptyList(), options); - ObjectIdentifier tableIdentifier = - ObjectIdentifier.of("builtin", "default", "test_push_down"); + ObjectIdentifier tableIdentifier = ObjectIdentifier.of("builtin", "default", "test_delete"); catalogManager.createTable(catalogTable, tableIdentifier, false); // no filter in delete statement - Operation operation = parse("DELETE 
FROM test_push_down"); + Operation operation = parse("DELETE FROM test_delete"); checkDeleteFromFilterOperation(operation, "[]"); // with filters in delete statement - operation = parse("DELETE FROM test_push_down where a = 1 and c = '123'"); + operation = parse("DELETE FROM test_delete where a = 1 and c = '123'"); checkDeleteFromFilterOperation(operation, "[equals(a, 1), equals(c, '123')]"); // with filter = false after reduced in delete statement - operation = parse("DELETE FROM test_push_down where a = 1 + 6 and a = 2"); + operation = parse("DELETE FROM test_delete where a = 1 + 6 and a = 2"); checkDeleteFromFilterOperation(operation, "[false]"); - assertThatThrownBy( - () -> - parse( - "DELETE FROM test_push_down where a = (select count(*) from test_push_down)")) - .isInstanceOf(UnsupportedOperationException.class) - .hasMessage( - String.format( - "Only delete push down is supported currently, but the delete statement can't pushed to table sink %s.", - tableIdentifier.asSerializableString())); + operation = parse("DELETE FROM test_delete where a = (select count(*) from test_delete)"); + assertThat(operation).isInstanceOf(SinkModifyOperation.class); + SinkModifyOperation modifyOperation = (SinkModifyOperation) operation; + assertThat(modifyOperation.isDelete()).isTrue(); } // ~ Tool Methods ---------------------------------------------------------- diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/batch/sql/RowLevelDeleteTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/batch/sql/RowLevelDeleteTest.java new file mode 100644 index 00000000000..7bc295ce6fd --- /dev/null +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/batch/sql/RowLevelDeleteTest.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.batch.sql; + +import org.apache.flink.table.api.ExplainDetail; +import org.apache.flink.table.api.TableConfig; +import org.apache.flink.table.api.config.ExecutionConfigOptions; +import org.apache.flink.table.connector.sink.abilities.SupportsRowLevelDelete; +import org.apache.flink.table.planner.utils.BatchTableTestUtil; +import org.apache.flink.table.planner.utils.JavaScalaConversionUtil; +import org.apache.flink.table.planner.utils.TableTestBase; + +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; + +import scala.collection.Seq; + +/** Test for row-level delete. 
*/ +@RunWith(Parameterized.class) +public class RowLevelDeleteTest extends TableTestBase { + + private final SupportsRowLevelDelete.RowLevelDeleteMode deleteMode; + private final Seq<ExplainDetail> explainDetails = + JavaScalaConversionUtil.toScala( + Collections.singletonList(ExplainDetail.JSON_EXECUTION_PLAN)); + + private BatchTableTestUtil util; + + @Parameterized.Parameters(name = "deleteMode = {0}") + public static Collection<SupportsRowLevelDelete.RowLevelDeleteMode> data() { + return Arrays.asList( + SupportsRowLevelDelete.RowLevelDeleteMode.DELETED_ROWS, + SupportsRowLevelDelete.RowLevelDeleteMode.REMAINING_ROWS); + } + + public RowLevelDeleteTest(SupportsRowLevelDelete.RowLevelDeleteMode deleteMode) { + this.deleteMode = deleteMode; + } + + @Before + public void before() { + util = batchTestUtil(TableConfig.getDefault()); + util.tableEnv() + .getConfig() + .getConfiguration() + .set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 12); + } + + @Test + public void testDeleteWithoutFilter() { + createTableForDelete(); + util.verifyExplainInsert("DELETE FROM t", explainDetails); + } + + @Test + public void testDeleteWithFilter() { + createTableForDelete(); + util.verifyExplainInsert("DELETE FROM t where a = 1 and b = '123'", explainDetails); + } + + @Test + public void testDeleteWithSubQuery() { + createTableForDelete(); + util.verifyExplainInsert( + "DELETE FROM t where b = '123' and a = (select count(*) from t)", explainDetails); + } + + @Test + public void testDeleteWithCustomColumns() { + util.tableEnv() + .executeSql( + String.format( + "CREATE TABLE t (a int, b string, c double) WITH" + + " (" + + "'connector' = 'test-update-delete', " + + "'required-columns-for-delete' = 'b;c', " + + "'delete-mode' = '%s', 'support-delete-push-down' = 'false'" + + ") ", + deleteMode)); + util.verifyExplainInsert("DELETE FROM t where b = '123'", explainDetails); + } + + @Test + public void testDeleteWithMetaColumns() { + util.tableEnv() + .executeSql( + 
String.format( + "CREATE TABLE t (a int, b string, c double) WITH" + + " (" + + "'connector' = 'test-update-delete', " + + "'required-columns-for-delete' = 'meta_f1;meta_k2;b', " + + "'delete-mode' = '%s', 'support-delete-push-down' = 'false'" + + ") ", + deleteMode)); + util.verifyExplainInsert("DELETE FROM t where b = '123'", explainDetails); + } + + private void createTableForDelete() { + util.tableEnv() + .executeSql( + String.format( + "CREATE TABLE t (a int, b string) WITH " + + "('connector' = 'test-update-delete'," + + " 'delete-mode' = '%s', 'support-delete-push-down' = 'false') ", + deleteMode)); + } +} diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/DeleteTableITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/DeleteTableITCase.java index 8a60d24dfbe..ad706fc9466 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/DeleteTableITCase.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/DeleteTableITCase.java @@ -18,6 +18,11 @@ package org.apache.flink.table.planner.runtime.batch.sql; +import org.apache.flink.table.api.StatementSet; +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.api.TableResult; +import org.apache.flink.table.connector.sink.DynamicTableSink; +import org.apache.flink.table.connector.sink.abilities.SupportsRowLevelDelete; import org.apache.flink.table.data.GenericRowData; import org.apache.flink.table.data.RowData; import org.apache.flink.table.data.StringData; @@ -27,17 +32,35 @@ import org.apache.flink.types.Row; import org.apache.flink.util.CollectionUtil; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; import java.util.List; import static 
org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; /** The IT case for DELETE statement in batch mode. */ +@RunWith(Parameterized.class) public class DeleteTableITCase extends BatchTestBase { private static final int ROW_NUM = 5; + private final SupportsRowLevelDelete.RowLevelDeleteMode deleteMode; + + @Parameterized.Parameters(name = "deleteMode = {0}") + public static Collection<SupportsRowLevelDelete.RowLevelDeleteMode> data() { + return Arrays.asList( + SupportsRowLevelDelete.RowLevelDeleteMode.DELETED_ROWS, + SupportsRowLevelDelete.RowLevelDeleteMode.REMAINING_ROWS); + } + + public DeleteTableITCase(SupportsRowLevelDelete.RowLevelDeleteMode deleteMode) { + this.deleteMode = deleteMode; + } + @Test public void testDeletePushDown() throws Exception { String dataId = registerData(); @@ -50,11 +73,9 @@ public class DeleteTableITCase extends BatchTestBase { + ")", dataId)); // it only contains equal expression, should be pushed down - List<Row> rows = - CollectionUtil.iteratorToList( - tEnv().executeSql("DELETE FROM t WHERE a = 1").collect()); + List<Row> rows = toRows(tEnv().executeSql("DELETE FROM t where a = 1")); assertThat(rows.toString()).isEqualTo("[+I[1], +I[OK]]"); - rows = CollectionUtil.iteratorToList(tEnv().executeSql("SELECT * FROM t").collect()); + rows = toRows(tEnv().executeSql("SELECT * FROM t")); assertThat(rows.toString()) .isEqualTo("[+I[0, b_0, 0.0], +I[2, b_2, 4.0], +I[3, b_3, 6.0], +I[4, b_4, 8.0]]"); @@ -63,19 +84,98 @@ public class DeleteTableITCase extends BatchTestBase { assertThatThrownBy(() -> tEnv().executeSql("DELETE FROM t where a > 1")) .isInstanceOf(UnsupportedOperationException.class) .hasMessage( - "Only delete push down is supported currently, " - + "but the delete statement can't pushed to table sink `default_catalog`.`default_database`.`t`."); + String.format( + "Can't perform delete operation of the table %s because the corresponding dynamic table sink has not yet 
implemented %s.", + "default_catalog.default_database.t", + SupportsRowLevelDelete.class.getName())); + } + @Test + public void testRowLevelDelete() throws Exception { + String dataId = registerData(); tEnv().executeSql( - "CREATE TABLE t1 (a int) WITH" - + " ('connector' = 'test-update-delete'," - + " 'support-delete-push-down' = 'false')"); - // should throw exception for sink that doesn't implement SupportsDeletePushDown interface - assertThatThrownBy(() -> tEnv().executeSql("DELETE FROM t1")) - .isInstanceOf(UnsupportedOperationException.class) + String.format( + "CREATE TABLE t (a int, b string, c double) WITH" + + " ('connector' = 'test-update-delete'," + + " 'data-id' = '%s'," + + " 'delete-mode' = '%s'," + + " 'support-delete-push-down' = 'false'" + + ")", + dataId, deleteMode)); + tEnv().executeSql("DELETE FROM t WHERE a > 1").await(); + List<Row> rows = toRows(tEnv().executeSql("SELECT * FROM t")); + assertThat(rows.toString()).isEqualTo("[+I[0, b_0, 0.0], +I[1, b_1, 2.0]]"); + + tEnv().executeSql("DELETE FROM t WHERE a >= (select count(1) from t where c > 1)").await(); + rows = toRows(tEnv().executeSql("SELECT * FROM t")); + assertThat(rows.toString()).isEqualTo("[+I[0, b_0, 0.0]]"); + } + + @Test + public void testMixDelete() throws Exception { + // test mix delete push down and row-level delete + String dataId = registerData(); + tEnv().executeSql( + String.format( + "CREATE TABLE t (a int, b string, c double) WITH" + + " ('connector' = 'test-update-delete'," + + " 'data-id' = '%s'," + + " 'mix-delete' = 'true')", + dataId)); + // the deletion can't be pushed down, but the deletion should still success. 
+ tEnv().executeSql("DELETE FROM t WHERE a >= (select count(1) from t where c > 2)").await(); + List<Row> rows = toRows(tEnv().executeSql("SELECT * FROM t")); + assertThat(rows.toString()) + .isEqualTo("[+I[0, b_0, 0.0], +I[1, b_1, 2.0], +I[2, b_2, 4.0]]"); + + // should fall back to delete push down + rows = toRows(tEnv().executeSql("DELETE FROM t")); + assertThat(rows.toString()).isEqualTo("[+I[3], +I[OK]]"); + rows = toRows(tEnv().executeSql("SELECT * FROM t")); + assertThat(rows).isEmpty(); + } + + @Test + public void testStatementSetContainDeleteAndInsert() throws Exception { + tEnv().executeSql( + "CREATE TABLE t (a int, b string, c double) WITH" + + " ('connector' = 'test-update-delete')"); + StatementSet statementSet = tEnv().createStatementSet(); + // should throw exception when statement set contains insert and delete statement + statementSet.addInsertSql("INSERT INTO t VALUES (1, 'v1', 1)"); + statementSet.addInsertSql("DELETE FROM t"); + assertThatThrownBy(statementSet::execute) + .isInstanceOf(TableException.class) + .hasMessage( + "Unsupported SQL query! Only accept a single SQL statement of type DELETE."); + } + + @Test + public void testCompilePlanSql() throws Exception { + tEnv().executeSql( + "CREATE TABLE t (a int, b string, c double) WITH" + + " ('connector' = 'test-update-delete')"); + // should throw exception when compile sql for delete statement + assertThatThrownBy(() -> tEnv().compilePlanSql("DELETE FROM t")) + .isInstanceOf(TableException.class) .hasMessage( - "Only delete push down is supported currently, " - + "but the delete statement can't pushed to table sink `default_catalog`.`default_database`.`t1`."); + "Unsupported SQL query! 
compilePlanSql() only accepts a single SQL statement of type INSERT"); + } + + @Test + public void testDeleteWithLegacyTableSink() { + tEnv().executeSql( + "CREATE TABLE t (a int, b string, c double) WITH" + + " ('connector' = 'COLLECTION')"); + assertThatThrownBy(() -> tEnv().executeSql("DELETE FROM t")) + .isInstanceOf(TableException.class) + .hasMessage( + String.format( + "Can't perform delete operation of the table %s " + + " because the corresponding table sink is the legacy TableSink," + + " Please implement %s for it.", + "`default_catalog`.`default_database`.`t`", + DynamicTableSink.class.getName())); } private String registerData() { @@ -90,4 +190,8 @@ public class DeleteTableITCase extends BatchTestBase { } return values; } + + private List<Row> toRows(TableResult result) { + return CollectionUtil.iteratorToList(result.collect()); + } } diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/DeleteTableITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/DeleteTableITCase.java index e6ba79ef2b4..fe96af95d52 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/DeleteTableITCase.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/DeleteTableITCase.java @@ -34,5 +34,12 @@ public class DeleteTableITCase extends StreamingTestBase { assertThatThrownBy(() -> tEnv().executeSql("DELETE FROM t")) .isInstanceOf(TableException.class) .hasMessage("DELETE statement is not supported for streaming mode now."); + + tEnv().executeSql( + "CREATE TABLE t1 (a int) WITH ('connector' = 'test-update-delete', " + + "'support-delete-push-down' = 'false')"); + assertThatThrownBy(() -> tEnv().executeSql("DELETE FROM t1")) + .isInstanceOf(TableException.class) + .hasMessage("DELETE statement is not supported for streaming mode now."); } } diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/RowLevelDeleteTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/RowLevelDeleteTest.xml new file mode 100644 index 00000000000..b835e944ea3 --- /dev/null +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/RowLevelDeleteTest.xml @@ -0,0 +1,714 @@ +<?xml version="1.0" ?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to you under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+--> +<Root> + <TestCase name="testDeleteWithoutFilter[deleteMode = DELETED_ROWS]"> + <Resource name="explain"> + <![CDATA[== Abstract Syntax Tree == +LogicalSink(table=[default_catalog.default_database.t], fields=[a, b]) ++- LogicalProject(a=[$0], b=[$1]) + +- LogicalTableScan(table=[[default_catalog, default_database, t]]) + +== Optimized Physical Plan == +Sink(table=[default_catalog.default_database.t], fields=[a, b]) ++- TableSourceScan(table=[[default_catalog, default_database, t]], fields=[a, b]) + +== Optimized Execution Plan == +Sink(table=[default_catalog.default_database.t], fields=[a, b]) ++- TableSourceScan(table=[[default_catalog, default_database, t]], fields=[a, b]) + +== Physical Execution Plan == +{ + "nodes" : [ { + "id" : , + "type" : "Source: t[]", + "pact" : "Data Source", + "contents" : "[]:TableSourceScan(table=[[default_catalog, default_database, t]], fields=[a, b])", + "parallelism" : 1 + }, { + "id" : , + "type" : "RowKindSetter[]", + "pact" : "Operator", + "contents" : "[]:RowKindSetter(TargetRowKind=[DELETE])", + "parallelism" : 1, + "predecessors" : [ { + "id" : , + "ship_strategy" : "FORWARD", + "side" : "second" + } ] + }, { + "id" : , + "type" : "Sink: Unnamed", + "pact" : "Data Sink", + "contents" : "Sink: Unnamed", + "parallelism" : 1, + "predecessors" : [ { + "id" : , + "ship_strategy" : "FORWARD", + "side" : "second" + } ] + } ] +}]]> + </Resource> + </TestCase> + <TestCase name="testDeleteWithFilter[deleteMode = DELETED_ROWS]"> + <Resource name="explain"> + <![CDATA[== Abstract Syntax Tree == +LogicalSink(table=[default_catalog.default_database.t], fields=[a, b]) ++- LogicalProject(a=[$0], b=[$1]) + +- LogicalFilter(condition=[AND(=($0, 1), =($1, _UTF-16LE'123'))]) + +- LogicalTableScan(table=[[default_catalog, default_database, t]]) + +== Optimized Physical Plan == +Sink(table=[default_catalog.default_database.t], fields=[a, b]) ++- Calc(select=[CAST(1 AS INTEGER) AS a, CAST('123' AS VARCHAR(2147483647)) AS b], where=[AND(=(a, 
1), =(b, '123'))]) + +- TableSourceScan(table=[[default_catalog, default_database, t]], fields=[a, b]) + +== Optimized Execution Plan == +Sink(table=[default_catalog.default_database.t], fields=[a, b]) ++- Calc(select=[CAST(1 AS INTEGER) AS a, CAST('123' AS VARCHAR(2147483647)) AS b], where=[((a = 1) AND (b = '123'))]) + +- TableSourceScan(table=[[default_catalog, default_database, t]], fields=[a, b]) + +== Physical Execution Plan == +{ + "nodes" : [ { + "id" : , + "type" : "Source: t[]", + "pact" : "Data Source", + "contents" : "[]:TableSourceScan(table=[[default_catalog, default_database, t]], fields=[a, b])", + "parallelism" : 1 + }, { + "id" : , + "type" : "Calc[]", + "pact" : "Operator", + "contents" : "[]:Calc(select=[CAST(1 AS INTEGER) AS a, CAST('123' AS VARCHAR(2147483647)) AS b], where=[((a = 1) AND (b = '123'))])", + "parallelism" : 1, + "predecessors" : [ { + "id" : , + "ship_strategy" : "FORWARD", + "side" : "second" + } ] + }, { + "id" : , + "type" : "RowKindSetter[]", + "pact" : "Operator", + "contents" : "[]:RowKindSetter(TargetRowKind=[DELETE])", + "parallelism" : 1, + "predecessors" : [ { + "id" : , + "ship_strategy" : "FORWARD", + "side" : "second" + } ] + }, { + "id" : , + "type" : "Sink: Unnamed", + "pact" : "Data Sink", + "contents" : "Sink: Unnamed", + "parallelism" : 1, + "predecessors" : [ { + "id" : , + "ship_strategy" : "FORWARD", + "side" : "second" + } ] + } ] +}]]> + </Resource> + </TestCase> + <TestCase name="testDeleteWithSubQuery[deleteMode = DELETED_ROWS]"> + <Resource name="explain"> + <![CDATA[== Abstract Syntax Tree == +LogicalSink(table=[default_catalog.default_database.t], fields=[a, b]) ++- LogicalProject(a=[$0], b=[$1]) + +- LogicalFilter(condition=[AND(=($1, _UTF-16LE'123'), =(CAST($0):BIGINT, $SCALAR_QUERY({ +LogicalAggregate(group=[{}], EXPR$0=[COUNT()]) + LogicalProject($f0=[0]) + LogicalTableScan(table=[[default_catalog, default_database, t]]) +})))]) + +- LogicalTableScan(table=[[default_catalog, default_database, t]]) 
+ +== Optimized Physical Plan == +Sink(table=[default_catalog.default_database.t], fields=[a, b]) ++- Calc(select=[a, CAST('123' AS VARCHAR(2147483647)) AS b]) + +- NestedLoopJoin(joinType=[InnerJoin], where=[=(a0, EXPR$0)], select=[a, a0, EXPR$0], build=[right], singleRowJoin=[true]) + :- Calc(select=[a, CAST(a AS BIGINT) AS a0], where=[=(b, '123')]) + : +- TableSourceScan(table=[[default_catalog, default_database, t]], fields=[a, b]) + +- Exchange(distribution=[broadcast]) + +- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS EXPR$0]) + +- Exchange(distribution=[single]) + +- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0]) + +- Calc(select=[0 AS $f0]) + +- TableSourceScan(table=[[default_catalog, default_database, t]], fields=[a, b]) + +== Optimized Execution Plan == +Sink(table=[default_catalog.default_database.t], fields=[a, b]) ++- Calc(select=[a, CAST('123' AS VARCHAR(2147483647)) AS b]) + +- NestedLoopJoin(joinType=[InnerJoin], where=[(a0 = EXPR$0)], select=[a, a0, EXPR$0], build=[right], singleRowJoin=[true]) + :- Exchange(distribution=[any], shuffle_mode=[BATCH]) + : +- Calc(select=[a, CAST(a AS BIGINT) AS a0], where=[(b = '123')]) + : +- TableSourceScan(table=[[default_catalog, default_database, t]], fields=[a, b])(reuse_id=[1]) + +- Exchange(distribution=[broadcast]) + +- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS EXPR$0]) + +- Exchange(distribution=[single]) + +- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0]) + +- Calc(select=[0 AS $f0]) + +- Reused(reference_id=[1]) + +== Physical Execution Plan == +{ + "nodes" : [ { + "id" : , + "type" : "Source: t[]", + "pact" : "Data Source", + "contents" : "[]:TableSourceScan(table=[[default_catalog, default_database, t]], fields=[a, b])", + "parallelism" : 1 + }, { + "id" : , + "type" : "Calc[]", + "pact" : "Operator", + "contents" : "[]:Calc(select=[a, CAST(a AS BIGINT) AS a0], where=[(b = '123')])", + "parallelism" : 1, + "predecessors" : [ { + "id" : , + 
"ship_strategy" : "FORWARD", + "side" : "second" + } ] + }, { + "id" : , + "type" : "Calc[]", + "pact" : "Operator", + "contents" : "[]:Calc(select=[0 AS $f0])", + "parallelism" : 1, + "predecessors" : [ { + "id" : , + "ship_strategy" : "FORWARD", + "side" : "second" + } ] + }, { + "id" : , + "type" : "HashAggregate[]", + "pact" : "Operator", + "contents" : "[]:LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0])", + "parallelism" : 1, + "predecessors" : [ { + "id" : , + "ship_strategy" : "FORWARD", + "side" : "second" + } ] + }, { + "id" : , + "type" : "HashAggregate[]", + "pact" : "Operator", + "contents" : "[]:HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS EXPR$0])", + "parallelism" : 1, + "predecessors" : [ { + "id" : , + "ship_strategy" : "GLOBAL", + "side" : "second" + } ] + }, { + "id" : , + "type" : "NestedLoopJoin[]", + "pact" : "Operator", + "contents" : "[]:NestedLoopJoin(joinType=[InnerJoin], where=[(a0 = EXPR$0)], select=[a, a0, EXPR$0], build=[right], singleRowJoin=[true])", + "parallelism" : 12, + "predecessors" : [ { + "id" : , + "ship_strategy" : "REBALANCE", + "side" : "second" + }, { + "id" : , + "ship_strategy" : "BROADCAST", + "side" : "second" + } ] + }, { + "id" : , + "type" : "Calc[]", + "pact" : "Operator", + "contents" : "[]:Calc(select=[a, CAST('123' AS VARCHAR(2147483647)) AS b])", + "parallelism" : 12, + "predecessors" : [ { + "id" : , + "ship_strategy" : "FORWARD", + "side" : "second" + } ] + }, { + "id" : , + "type" : "RowKindSetter[]", + "pact" : "Operator", + "contents" : "[]:RowKindSetter(TargetRowKind=[DELETE])", + "parallelism" : 12, + "predecessors" : [ { + "id" : , + "ship_strategy" : "FORWARD", + "side" : "second" + } ] + }, { + "id" : , + "type" : "Sink: Unnamed", + "pact" : "Data Sink", + "contents" : "Sink: Unnamed", + "parallelism" : 1, + "predecessors" : [ { + "id" : , + "ship_strategy" : "REBALANCE", + "side" : "second" + } ] + } ] +}]]> + </Resource> + </TestCase> + <TestCase 
name="testDeleteWithCustomColumns[deleteMode = DELETED_ROWS]"> + <Resource name="explain"> + <![CDATA[== Abstract Syntax Tree == +LogicalSink(table=[default_catalog.default_database.t], fields=[b, c]) ++- LogicalProject(b=[$1], c=[$2]) + +- LogicalFilter(condition=[=($1, _UTF-16LE'123')]) + +- LogicalTableScan(table=[[default_catalog, default_database, t]]) + +== Optimized Physical Plan == +Sink(table=[default_catalog.default_database.t], fields=[b, c]) ++- Calc(select=[CAST('123' AS VARCHAR(2147483647)) AS b, c], where=[=(b, '123')]) + +- TableSourceScan(table=[[default_catalog, default_database, t]], fields=[a, b, c]) + +== Optimized Execution Plan == +Sink(table=[default_catalog.default_database.t], fields=[b, c]) ++- Calc(select=[CAST('123' AS VARCHAR(2147483647)) AS b, c], where=[(b = '123')]) + +- TableSourceScan(table=[[default_catalog, default_database, t]], fields=[a, b, c]) + +== Physical Execution Plan == +{ + "nodes" : [ { + "id" : , + "type" : "Source: t[]", + "pact" : "Data Source", + "contents" : "[]:TableSourceScan(table=[[default_catalog, default_database, t]], fields=[a, b, c])", + "parallelism" : 1 + }, { + "id" : , + "type" : "Calc[]", + "pact" : "Operator", + "contents" : "[]:Calc(select=[CAST('123' AS VARCHAR(2147483647)) AS b, c], where=[(b = '123')])", + "parallelism" : 1, + "predecessors" : [ { + "id" : , + "ship_strategy" : "FORWARD", + "side" : "second" + } ] + }, { + "id" : , + "type" : "RowKindSetter[]", + "pact" : "Operator", + "contents" : "[]:RowKindSetter(TargetRowKind=[DELETE])", + "parallelism" : 1, + "predecessors" : [ { + "id" : , + "ship_strategy" : "FORWARD", + "side" : "second" + } ] + }, { + "id" : , + "type" : "Sink: Unnamed", + "pact" : "Data Sink", + "contents" : "Sink: Unnamed", + "parallelism" : 1, + "predecessors" : [ { + "id" : , + "ship_strategy" : "FORWARD", + "side" : "second" + } ] + } ] +}]]> + </Resource> + </TestCase> + <TestCase name="testDeleteWithMetaColumns[deleteMode = DELETED_ROWS]"> + <Resource 
name="explain"> + <![CDATA[== Abstract Syntax Tree == +LogicalSink(table=[default_catalog.default_database.t], fields=[meta_f1, meta_f2, b]) ++- LogicalProject(meta_f1=[$3], meta_f2=[$4], b=[$1]) + +- LogicalFilter(condition=[=($1, _UTF-16LE'123')]) + +- LogicalTableScan(table=[[default_catalog, default_database, t]]) + +== Optimized Physical Plan == +Sink(table=[default_catalog.default_database.t], fields=[meta_f1, meta_f2, b]) ++- Calc(select=[meta_f1, meta_f2, CAST('123' AS VARCHAR(2147483647)) AS b], where=[=(b, '123')]) + +- TableSourceScan(table=[[default_catalog, default_database, t]], fields=[a, b, c, meta_f1, meta_f2]) + +== Optimized Execution Plan == +Sink(table=[default_catalog.default_database.t], fields=[meta_f1, meta_f2, b]) ++- Calc(select=[meta_f1, meta_f2, CAST('123' AS VARCHAR(2147483647)) AS b], where=[(b = '123')]) + +- TableSourceScan(table=[[default_catalog, default_database, t]], fields=[a, b, c, meta_f1, meta_f2]) + +== Physical Execution Plan == +{ + "nodes" : [ { + "id" : , + "type" : "Source: t[]", + "pact" : "Data Source", + "contents" : "[]:TableSourceScan(table=[[default_catalog, default_database, t]], fields=[a, b, c, meta_f1, meta_f2])", + "parallelism" : 1 + }, { + "id" : , + "type" : "Calc[]", + "pact" : "Operator", + "contents" : "[]:Calc(select=[meta_f1, meta_f2, CAST('123' AS VARCHAR(2147483647)) AS b], where=[(b = '123')])", + "parallelism" : 1, + "predecessors" : [ { + "id" : , + "ship_strategy" : "FORWARD", + "side" : "second" + } ] + }, { + "id" : , + "type" : "RowKindSetter[]", + "pact" : "Operator", + "contents" : "[]:RowKindSetter(TargetRowKind=[DELETE])", + "parallelism" : 1, + "predecessors" : [ { + "id" : , + "ship_strategy" : "FORWARD", + "side" : "second" + } ] + }, { + "id" : , + "type" : "Sink: Unnamed", + "pact" : "Data Sink", + "contents" : "Sink: Unnamed", + "parallelism" : 1, + "predecessors" : [ { + "id" : , + "ship_strategy" : "FORWARD", + "side" : "second" + } ] + } ] +}]]> + </Resource> + </TestCase> + 
<TestCase name="testDeleteWithoutFilter[deleteMode = REMAINING_ROWS]"> + <Resource name="explain"> + <![CDATA[== Abstract Syntax Tree == +LogicalSink(table=[default_catalog.default_database.t], fields=[a, b]) ++- LogicalProject(a=[$0], b=[$1]) + +- LogicalFilter(condition=[false]) + +- LogicalTableScan(table=[[default_catalog, default_database, t]]) + +== Optimized Physical Plan == +Sink(table=[default_catalog.default_database.t], fields=[a, b]) ++- Values(tuples=[[]], values=[a, b]) + +== Optimized Execution Plan == +Sink(table=[default_catalog.default_database.t], fields=[a, b]) ++- Values(tuples=[[]], values=[a, b]) + +== Physical Execution Plan == +{ + "nodes" : [ { + "id" : , + "type" : "Source: Values[]", + "pact" : "Data Source", + "contents" : "[]:Values(tuples=[[]], values=[a, b])", + "parallelism" : 1 + }, { + "id" : , + "type" : "Sink: Unnamed", + "pact" : "Data Sink", + "contents" : "Sink: Unnamed", + "parallelism" : 1, + "predecessors" : [ { + "id" : , + "ship_strategy" : "FORWARD", + "side" : "second" + } ] + } ] +}]]> + </Resource> + </TestCase> + <TestCase name="testDeleteWithFilter[deleteMode = REMAINING_ROWS]"> + <Resource name="explain"> + <![CDATA[== Abstract Syntax Tree == +LogicalSink(table=[default_catalog.default_database.t], fields=[a, b]) ++- LogicalProject(a=[$0], b=[$1]) + +- LogicalFilter(condition=[NOT(AND(=($0, 1), =($1, _UTF-16LE'123')))]) + +- LogicalTableScan(table=[[default_catalog, default_database, t]]) + +== Optimized Physical Plan == +Sink(table=[default_catalog.default_database.t], fields=[a, b]) ++- Calc(select=[a, b], where=[OR(<>(a, 1), <>(b, '123'))]) + +- TableSourceScan(table=[[default_catalog, default_database, t]], fields=[a, b]) + +== Optimized Execution Plan == +Sink(table=[default_catalog.default_database.t], fields=[a, b]) ++- Calc(select=[a, b], where=[((a <> 1) OR (b <> '123'))]) + +- TableSourceScan(table=[[default_catalog, default_database, t]], fields=[a, b]) + +== Physical Execution Plan == +{ + "nodes" : [ 
{ + "id" : , + "type" : "Source: t[]", + "pact" : "Data Source", + "contents" : "[]:TableSourceScan(table=[[default_catalog, default_database, t]], fields=[a, b])", + "parallelism" : 1 + }, { + "id" : , + "type" : "Calc[]", + "pact" : "Operator", + "contents" : "[]:Calc(select=[a, b], where=[((a <> 1) OR (b <> '123'))])", + "parallelism" : 1, + "predecessors" : [ { + "id" : , + "ship_strategy" : "FORWARD", + "side" : "second" + } ] + }, { + "id" : , + "type" : "Sink: Unnamed", + "pact" : "Data Sink", + "contents" : "Sink: Unnamed", + "parallelism" : 1, + "predecessors" : [ { + "id" : , + "ship_strategy" : "FORWARD", + "side" : "second" + } ] + } ] +}]]> + </Resource> + </TestCase> + <TestCase name="testDeleteWithSubQuery[deleteMode = REMAINING_ROWS]"> + <Resource name="explain"> + <![CDATA[== Abstract Syntax Tree == +LogicalSink(table=[default_catalog.default_database.t], fields=[a, b]) ++- LogicalProject(a=[$0], b=[$1]) + +- LogicalFilter(condition=[NOT(AND(=($1, _UTF-16LE'123'), =(CAST($0):BIGINT, $SCALAR_QUERY({ +LogicalAggregate(group=[{}], EXPR$0=[COUNT()]) + LogicalProject($f0=[0]) + LogicalTableScan(table=[[default_catalog, default_database, t]]) +}))))]) + +- LogicalTableScan(table=[[default_catalog, default_database, t]]) + +== Optimized Physical Plan == +Sink(table=[default_catalog.default_database.t], fields=[a, b]) ++- Calc(select=[a, b], where=[OR(<>(b, '123'), <>(CAST(a AS BIGINT), EXPR$0))]) + +- NestedLoopJoin(joinType=[LeftOuterJoin], where=[true], select=[a, b, EXPR$0], build=[right], singleRowJoin=[true]) + :- TableSourceScan(table=[[default_catalog, default_database, t]], fields=[a, b]) + +- Exchange(distribution=[broadcast]) + +- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS EXPR$0]) + +- Exchange(distribution=[single]) + +- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0]) + +- Calc(select=[0 AS $f0]) + +- TableSourceScan(table=[[default_catalog, default_database, t]], fields=[a, b]) + +== Optimized Execution Plan == 
+Sink(table=[default_catalog.default_database.t], fields=[a, b]) ++- Calc(select=[a, b], where=[((b <> '123') OR (CAST(a AS BIGINT) <> EXPR$0))]) + +- NestedLoopJoin(joinType=[LeftOuterJoin], where=[true], select=[a, b, EXPR$0], build=[right], singleRowJoin=[true]) + :- Exchange(distribution=[any], shuffle_mode=[BATCH]) + : +- TableSourceScan(table=[[default_catalog, default_database, t]], fields=[a, b])(reuse_id=[1]) + +- Exchange(distribution=[broadcast]) + +- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS EXPR$0]) + +- Exchange(distribution=[single]) + +- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0]) + +- Calc(select=[0 AS $f0]) + +- Reused(reference_id=[1]) + +== Physical Execution Plan == +{ + "nodes" : [ { + "id" : , + "type" : "Source: t[]", + "pact" : "Data Source", + "contents" : "[]:TableSourceScan(table=[[default_catalog, default_database, t]], fields=[a, b])", + "parallelism" : 1 + }, { + "id" : , + "type" : "Calc[]", + "pact" : "Operator", + "contents" : "[]:Calc(select=[0 AS $f0])", + "parallelism" : 1, + "predecessors" : [ { + "id" : , + "ship_strategy" : "FORWARD", + "side" : "second" + } ] + }, { + "id" : , + "type" : "HashAggregate[]", + "pact" : "Operator", + "contents" : "[]:LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0])", + "parallelism" : 1, + "predecessors" : [ { + "id" : , + "ship_strategy" : "FORWARD", + "side" : "second" + } ] + }, { + "id" : , + "type" : "HashAggregate[]", + "pact" : "Operator", + "contents" : "[]:HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS EXPR$0])", + "parallelism" : 1, + "predecessors" : [ { + "id" : , + "ship_strategy" : "GLOBAL", + "side" : "second" + } ] + }, { + "id" : , + "type" : "NestedLoopJoin[]", + "pact" : "Operator", + "contents" : "[]:NestedLoopJoin(joinType=[LeftOuterJoin], where=[true], select=[a, b, EXPR$0], build=[right], singleRowJoin=[true])", + "parallelism" : 12, + "predecessors" : [ { + "id" : , + "ship_strategy" : "REBALANCE", + "side" : 
"second" + }, { + "id" : , + "ship_strategy" : "BROADCAST", + "side" : "second" + } ] + }, { + "id" : , + "type" : "Calc[]", + "pact" : "Operator", + "contents" : "[]:Calc(select=[a, b], where=[((b <> '123') OR (CAST(a AS BIGINT) <> EXPR$0))])", + "parallelism" : 12, + "predecessors" : [ { + "id" : , + "ship_strategy" : "FORWARD", + "side" : "second" + } ] + }, { + "id" : , + "type" : "Sink: Unnamed", + "pact" : "Data Sink", + "contents" : "Sink: Unnamed", + "parallelism" : 1, + "predecessors" : [ { + "id" : , + "ship_strategy" : "REBALANCE", + "side" : "second" + } ] + } ] +}]]> + </Resource> + </TestCase> + <TestCase name="testDeleteWithCustomColumns[deleteMode = REMAINING_ROWS]"> + <Resource name="explain"> + <![CDATA[== Abstract Syntax Tree == +LogicalSink(table=[default_catalog.default_database.t], fields=[b, c]) ++- LogicalProject(b=[$1], c=[$2]) + +- LogicalFilter(condition=[NOT(=($1, _UTF-16LE'123'))]) + +- LogicalTableScan(table=[[default_catalog, default_database, t]]) + +== Optimized Physical Plan == +Sink(table=[default_catalog.default_database.t], fields=[b, c]) ++- Calc(select=[b, c], where=[<>(b, '123')]) + +- TableSourceScan(table=[[default_catalog, default_database, t]], fields=[a, b, c]) + +== Optimized Execution Plan == +Sink(table=[default_catalog.default_database.t], fields=[b, c]) ++- Calc(select=[b, c], where=[(b <> '123')]) + +- TableSourceScan(table=[[default_catalog, default_database, t]], fields=[a, b, c]) + +== Physical Execution Plan == +{ + "nodes" : [ { + "id" : , + "type" : "Source: t[]", + "pact" : "Data Source", + "contents" : "[]:TableSourceScan(table=[[default_catalog, default_database, t]], fields=[a, b, c])", + "parallelism" : 1 + }, { + "id" : , + "type" : "Calc[]", + "pact" : "Operator", + "contents" : "[]:Calc(select=[b, c], where=[(b <> '123')])", + "parallelism" : 1, + "predecessors" : [ { + "id" : , + "ship_strategy" : "FORWARD", + "side" : "second" + } ] + }, { + "id" : , + "type" : "Sink: Unnamed", + "pact" : "Data 
Sink", + "contents" : "Sink: Unnamed", + "parallelism" : 1, + "predecessors" : [ { + "id" : , + "ship_strategy" : "FORWARD", + "side" : "second" + } ] + } ] +}]]> + </Resource> + </TestCase> + <TestCase name="testDeleteWithMetaColumns[deleteMode = REMAINING_ROWS]"> + <Resource name="explain"> + <![CDATA[== Abstract Syntax Tree == +LogicalSink(table=[default_catalog.default_database.t], fields=[meta_f1, meta_f2, b]) ++- LogicalProject(meta_f1=[$3], meta_f2=[$4], b=[$1]) + +- LogicalFilter(condition=[NOT(=($1, _UTF-16LE'123'))]) + +- LogicalTableScan(table=[[default_catalog, default_database, t]]) + +== Optimized Physical Plan == +Sink(table=[default_catalog.default_database.t], fields=[meta_f1, meta_f2, b]) ++- Calc(select=[meta_f1, meta_f2, b], where=[<>(b, '123')]) + +- TableSourceScan(table=[[default_catalog, default_database, t]], fields=[a, b, c, meta_f1, meta_f2]) + +== Optimized Execution Plan == +Sink(table=[default_catalog.default_database.t], fields=[meta_f1, meta_f2, b]) ++- Calc(select=[meta_f1, meta_f2, b], where=[(b <> '123')]) + +- TableSourceScan(table=[[default_catalog, default_database, t]], fields=[a, b, c, meta_f1, meta_f2]) + +== Physical Execution Plan == +{ + "nodes" : [ { + "id" : , + "type" : "Source: t[]", + "pact" : "Data Source", + "contents" : "[]:TableSourceScan(table=[[default_catalog, default_database, t]], fields=[a, b, c, meta_f1, meta_f2])", + "parallelism" : 1 + }, { + "id" : , + "type" : "Calc[]", + "pact" : "Operator", + "contents" : "[]:Calc(select=[meta_f1, meta_f2, b], where=[(b <> '123')])", + "parallelism" : 1, + "predecessors" : [ { + "id" : , + "ship_strategy" : "FORWARD", + "side" : "second" + } ] + }, { + "id" : , + "type" : "Sink: Unnamed", + "pact" : "Data Sink", + "contents" : "Sink: Unnamed", + "parallelism" : 1, + "predecessors" : [ { + "id" : , + "ship_strategy" : "FORWARD", + "side" : "second" + } ] + } ] +}]]> + </Resource> + </TestCase> +</Root> diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/RowKindSetter.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/RowKindSetter.java new file mode 100644 index 00000000000..8944f6605f5 --- /dev/null +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/RowKindSetter.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.operators.sink; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.runtime.operators.TableStreamOperator; +import org.apache.flink.types.RowKind; + +/** An operator that sets the row kind of the incoming records to a specific row kind. 
*/ +@Internal +public class RowKindSetter extends TableStreamOperator<RowData> + implements OneInputStreamOperator<RowData, RowData> { + + private static final long serialVersionUID = 1L; + + private final RowKind targetRowKind; + + public RowKindSetter(RowKind targetRowKind) { + this.targetRowKind = targetRowKind; + } + + @Override + public void processElement(StreamRecord<RowData> element) throws Exception { + element.getValue().setRowKind(targetRowKind); + output.collect(element); + } +} diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/RowKindSetterTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/RowKindSetterTest.java new file mode 100644 index 00000000000..0127cdc53dd --- /dev/null +++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/RowKindSetterTest.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.operators.sink; + +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.types.RowKind; + +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Test for {@link RowKindSetter}. */ +public class RowKindSetterTest { + + @Test + public void testSetRowKind() throws Exception { + // test set to all row kind + for (RowKind targetRowKind : RowKind.values()) { + RowKindSetter rowKindSetter = new RowKindSetter(targetRowKind); + try (OneInputStreamOperatorTestHarness<RowData, RowData> operatorTestHarness = + new OneInputStreamOperatorTestHarness<>(rowKindSetter)) { + operatorTestHarness.open(); + + // get the rows with all row kind + List<RowData> rows = getRowsWithAllRowKind(); + for (RowData row : rows) { + operatorTestHarness.processElement(new StreamRecord<>(row)); + } + // verify the row kind of output + verifyRowKind(operatorTestHarness.extractOutputValues(), targetRowKind); + } + } + } + + private List<RowData> getRowsWithAllRowKind() { + List<RowData> rows = new ArrayList<>(); + for (RowKind rowKind : RowKind.values()) { + rows.add(GenericRowData.of(rowKind, 1)); + } + return rows; + } + + private void verifyRowKind(List<RowData> rows, RowKind rowKind) { + for (RowData row : rows) { + assertThat(row.getRowKind()).isEqualTo(rowKind); + } + } +}
