This is an automated email from the ASF dual-hosted git repository.

lincoln pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 3fa5ecbc8534662fe7e20bf75e23f12890318d02 Author: luoyuxia <[email protected]> AuthorDate: Sun Jan 15 17:48:13 2023 +0800 [FLINK-30662][table] Planner supports delete push down. This closes #21676 --- .../table/api/internal/TableEnvironmentImpl.java | 47 +++ .../operations/DeleteFromFilterOperation.java | 78 +++++ .../table/operations/SinkModifyOperation.java | 51 +++- .../planner/operations/DeletePushDownUtils.java | 283 ++++++++++++++++++ .../operations/SqlToOperationConverter.java | 44 +++ .../PushFilterInCalcIntoTableSourceScanRule.java | 2 +- .../logical/PushFilterIntoSourceScanRuleBase.java | 24 -- .../logical/PushFilterIntoTableSourceScanRule.java | 2 +- .../table/planner/plan/utils/FlinkRexUtil.scala | 29 +- .../factories/TestUpdateDeleteTableFactory.java | 326 +++++++++++++++++++++ .../operations/DeletePushDownUtilsTest.java | 184 ++++++++++++ .../operations/SqlToOperationConverterTest.java | 52 ++++ .../runtime/batch/sql/DeleteTableITCase.java | 93 ++++++ .../runtime/stream/sql/DeleteTableITCase.java | 38 +++ .../org.apache.flink.table.factories.Factory | 1 + 15 files changed, 1226 insertions(+), 28 deletions(-) diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java index c54c3d5fa2d..3578e1845cb 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java @@ -89,6 +89,7 @@ import org.apache.flink.table.module.ModuleManager; import org.apache.flink.table.operations.CollectModifyOperation; import org.apache.flink.table.operations.CompileAndExecutePlanOperation; import org.apache.flink.table.operations.CreateTableASOperation; +import org.apache.flink.table.operations.DeleteFromFilterOperation; import org.apache.flink.table.operations.DescribeTableOperation; import org.apache.flink.table.operations.ExplainOperation; import org.apache.flink.table.operations.LoadModuleOperation; @@ -718,6 +719,10 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal { if (operations.isEmpty()) { return ""; } else { + if (operations.size() > 1 + && operations.stream().anyMatch(this::isRowLevelModification)) { + throw new TableException("Only single UPDATE/DELETE statement is supported."); + } return planner.explain(operations, format, extraDetails); } } @@ -847,6 +852,25 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal { executeInternal(ctasOperation.getCreateTableOperation()); mapOperations.add(ctasOperation.toSinkModifyOperation(catalogManager)); } else { + boolean isRowLevelModification = isRowLevelModification(modify); + if (isRowLevelModification) { + String modifyType = + ((SinkModifyOperation) modify).isDelete() ? 
"DELETE" : "UPDATE"; + if (operations.size() > 1) { + throw new TableException( + String.format( + "Only single %s statement is supported.", modifyType)); + } + if (isStreamingMode) { + throw new TableException( + String.format( + "%s statement is not supported for streaming mode now.", + modifyType)); + } + if (modify instanceof DeleteFromFilterOperation) { + return executeInternal((DeleteFromFilterOperation) modify); + } + } mapOperations.add(modify); } } @@ -865,6 +889,21 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal { return result; } + private TableResultInternal executeInternal( + DeleteFromFilterOperation deleteFromFilterOperation) { + Optional<Long> rows = + deleteFromFilterOperation.getSupportsDeletePushDownSink().executeDeletion(); + if (rows.isPresent()) { + return TableResultImpl.builder() + .resultKind(ResultKind.SUCCESS) + .schema(ResolvedSchema.of(Column.physical("result", DataTypes.STRING()))) + .data(Arrays.asList(Row.of(String.valueOf(rows.get())), Row.of("OK"))) + .build(); + } else { + return TableResultImpl.TABLE_RESULT_OK; + } + } + private TableResultInternal executeInternal( List<Transformation<?>> transformations, List<String> sinkIdentifierNames) { final String defaultJobName = "insert-into_" + String.join(",", sinkIdentifierNames); @@ -1973,4 +2012,12 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal { public String explainPlan(InternalPlan compiledPlan, ExplainDetail... extraDetails) { return planner.explainPlan(compiledPlan, extraDetails); } + + private boolean isRowLevelModification(Operation operation) { + if (operation instanceof SinkModifyOperation) { + SinkModifyOperation sinkModifyOperation = (SinkModifyOperation) operation; + return sinkModifyOperation.isDelete() || sinkModifyOperation.isUpdate(); + } + return true; + } } diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/DeleteFromFilterOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/DeleteFromFilterOperation.java new file mode 100644 index 00000000000..f902663d732 --- /dev/null +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/DeleteFromFilterOperation.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.operations; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.catalog.ContextResolvedTable; +import org.apache.flink.table.connector.sink.abilities.SupportsDeletePushDown; +import org.apache.flink.table.expressions.ResolvedExpression; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nonnull; + +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +/** The operation for deleting data in a table according to filters directly. */ +@Internal +public class DeleteFromFilterOperation extends SinkModifyOperation { + + @Nonnull private final SupportsDeletePushDown supportsDeletePushDownSink; + @Nonnull private final List<ResolvedExpression> filters; + + public DeleteFromFilterOperation( + ContextResolvedTable contextResolvedTable, + @Nonnull SupportsDeletePushDown supportsDeletePushDownSink, + @Nonnull List<ResolvedExpression> filters) { + super(contextResolvedTable, null, ModifyType.DELETE); + this.supportsDeletePushDownSink = Preconditions.checkNotNull(supportsDeletePushDownSink); + this.filters = Preconditions.checkNotNull(filters); + } + + @Nonnull + public SupportsDeletePushDown getSupportsDeletePushDownSink() { + return supportsDeletePushDownSink; + } + + @Nonnull + public List<ResolvedExpression> getFilters() { + return filters; + } + + @Override + public String asSummaryString() { + Map<String, Object> params = new LinkedHashMap<>(); + params.put("identifier", getContextResolvedTable().getIdentifier().asSummaryString()); + params.put("filters", filters); + return OperationUtils.formatWithChildren( + "DeleteFromFilter", params, Collections.emptyList(), Operation::asSummaryString); + } + + @Override + public QueryOperation getChild() { + throw new UnsupportedOperationException("This shouldn't be called"); + } + + @Override + public <T> T accept(ModifyOperationVisitor<T> visitor) { + throw new UnsupportedOperationException("This shouldn't be called"); + } +} diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/SinkModifyOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/SinkModifyOperation.java index e9cdb12b3b5..db360c0f5f2 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/SinkModifyOperation.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/SinkModifyOperation.java @@ -38,27 +38,58 @@ import java.util.Map; @Internal public class SinkModifyOperation implements ModifyOperation { - private final ContextResolvedTable contextResolvedTable; + protected final ContextResolvedTable contextResolvedTable; private final Map<String, String> staticPartitions; private final QueryOperation child; private final boolean overwrite; private final Map<String, String> dynamicOptions; + private final ModifyType modifyType; public SinkModifyOperation(ContextResolvedTable contextResolvedTable, QueryOperation child) { this(contextResolvedTable, child, Collections.emptyMap(), false, Collections.emptyMap()); } + public SinkModifyOperation( + ContextResolvedTable contextResolvedTable, + QueryOperation child, + ModifyType modifyType) { + this( + contextResolvedTable, + child, + Collections.emptyMap(), + false, + Collections.emptyMap(), + modifyType); + } + public SinkModifyOperation( ContextResolvedTable contextResolvedTable, QueryOperation child, Map<String, String> staticPartitions, boolean overwrite, 
Map<String, String> dynamicOptions) { + this( + contextResolvedTable, + child, + staticPartitions, + overwrite, + dynamicOptions, + ModifyType.INSERT); + } + + public SinkModifyOperation( + ContextResolvedTable contextResolvedTable, + QueryOperation child, + Map<String, String> staticPartitions, + boolean overwrite, + Map<String, String> dynamicOptions, + ModifyType modifyType) { this.contextResolvedTable = contextResolvedTable; this.child = child; this.staticPartitions = staticPartitions; this.overwrite = overwrite; this.dynamicOptions = dynamicOptions; + this.modifyType = modifyType; } public ContextResolvedTable getContextResolvedTable() { @@ -73,6 +104,14 @@ public class SinkModifyOperation implements ModifyOperation { return overwrite; } + public boolean isUpdate() { + return modifyType == ModifyType.UPDATE; + } + + public boolean isDelete() { + return modifyType == ModifyType.DELETE; + } + public Map<String, String> getDynamicOptions() { return dynamicOptions; } @@ -91,6 +130,7 @@ public class SinkModifyOperation implements ModifyOperation { public String asSummaryString() { Map<String, Object> params = new LinkedHashMap<>(); params.put("identifier", getContextResolvedTable().getIdentifier().asSummaryString()); + params.put("modifyType", modifyType); params.put("staticPartitions", staticPartitions); params.put("overwrite", overwrite); params.put("dynamicOptions", dynamicOptions); @@ -101,4 +141,13 @@ public class SinkModifyOperation implements ModifyOperation { Collections.singletonList(child), Operation::asSummaryString); } + + /** The type of sink modification. */ + public enum ModifyType { + INSERT, + + UPDATE, + + DELETE + } } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/DeletePushDownUtils.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/DeletePushDownUtils.java new file mode 100644 index 00000000000..c9682ed5078 --- /dev/null +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/DeletePushDownUtils.java @@ -0,0 +1,283 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.planner.operations; + +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.catalog.Catalog; +import org.apache.flink.table.catalog.CatalogBaseTable; +import org.apache.flink.table.catalog.CatalogManager; +import org.apache.flink.table.catalog.CatalogTable; +import org.apache.flink.table.catalog.ContextResolvedTable; +import org.apache.flink.table.catalog.ObjectIdentifier; +import org.apache.flink.table.catalog.ResolvedCatalogTable; +import org.apache.flink.table.connector.sink.DynamicTableSink; +import org.apache.flink.table.expressions.Expression; +import org.apache.flink.table.expressions.ResolvedExpression; +import org.apache.flink.table.expressions.resolver.ExpressionResolver; +import org.apache.flink.table.factories.DynamicTableSinkFactory; +import org.apache.flink.table.factories.FactoryUtil; +import org.apache.flink.table.factories.TableFactoryUtil; +import org.apache.flink.table.module.Module; +import org.apache.flink.table.planner.calcite.FlinkContext; +import org.apache.flink.table.planner.plan.rules.logical.SimplifyFilterConditionRule; +import org.apache.flink.table.planner.plan.utils.FlinkRexUtil; +import org.apache.flink.table.planner.plan.utils.RexNodeToExpressionConverter; +import org.apache.flink.table.planner.utils.ShortcutUtils; +import org.apache.flink.table.planner.utils.TableConfigUtils; + +import org.apache.calcite.plan.RelOptPredicateList; +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Filter; +import org.apache.calcite.rel.logical.LogicalFilter; +import org.apache.calcite.rel.logical.LogicalTableModify; +import org.apache.calcite.rel.logical.LogicalTableScan; +import org.apache.calcite.rel.rules.ReduceExpressionsRule; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.rex.RexUtil; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import java.util.TimeZone; +import java.util.stream.Collectors; + +import scala.Option; +import scala.Tuple2; + +/** A utility class for delete push down. */ +public class DeletePushDownUtils { + + /** + * Get the {@link DynamicTableSink} for the table to be modified. Return Optional.empty() if it + * can't get the {@link DynamicTableSink}. 
+ */ + public static Optional<DynamicTableSink> getDynamicTableSink( + ContextResolvedTable contextResolvedTable, + LogicalTableModify tableModify, + CatalogManager catalogManager) { + final FlinkContext context = ShortcutUtils.unwrapContext(tableModify.getCluster()); + + CatalogBaseTable catalogBaseTable = contextResolvedTable.getTable(); + // only consider DynamicTableSink + if (catalogBaseTable instanceof CatalogTable) { + ResolvedCatalogTable resolvedTable = contextResolvedTable.getResolvedTable(); + Optional<Catalog> optionalCatalog = contextResolvedTable.getCatalog(); + ObjectIdentifier objectIdentifier = contextResolvedTable.getIdentifier(); + boolean isTemporary = contextResolvedTable.isTemporary(); + // only consider the CatalogTable that doesn't use legacy connector sink option + if (!contextResolvedTable.isAnonymous() + && !TableFactoryUtil.isLegacyConnectorOptions( + catalogManager + .getCatalog(objectIdentifier.getCatalogName()) + .orElse(null), + context.getTableConfig(), + !context.isBatchMode(), + objectIdentifier, + resolvedTable, + isTemporary)) { + DynamicTableSinkFactory dynamicTableSinkFactory = null; + if (optionalCatalog.isPresent() + && optionalCatalog.get().getFactory().isPresent() + && optionalCatalog.get().getFactory().get() + instanceof DynamicTableSinkFactory) { + // try get from catalog + dynamicTableSinkFactory = + (DynamicTableSinkFactory) optionalCatalog.get().getFactory().get(); + } + + if (dynamicTableSinkFactory == null) { + Optional<DynamicTableSinkFactory> factoryFromModule = + context.getModuleManager().getFactory((Module::getTableSinkFactory)); + // then try get from module + dynamicTableSinkFactory = factoryFromModule.orElse(null); + } + // create table dynamic table sink + DynamicTableSink tableSink = + FactoryUtil.createDynamicTableSink( + dynamicTableSinkFactory, + objectIdentifier, + resolvedTable, + Collections.emptyMap(), + context.getTableConfig(), + context.getClassLoader(), + contextResolvedTable.isTemporary()); + return Optional.of(tableSink); + } + } + return Optional.empty(); + } + + /** + * Get the resolved filter expressions from the {@code WHERE} clause in DELETE statement, return + * Optional.empty() if {@code WHERE} clause contains sub-query. + */ + public static Optional<List<ResolvedExpression>> getResolvedFilterExpressions( + LogicalTableModify tableModify) { + FlinkContext context = ShortcutUtils.unwrapContext(tableModify.getCluster()); + RelNode input = tableModify.getInput().getInput(0); + // no WHERE clause, return an empty list + if (input instanceof LogicalTableScan) { + return Optional.of(Collections.emptyList()); + } + if (!(input instanceof LogicalFilter)) { + return Optional.empty(); + } + + Filter filter = (Filter) input; + if (RexUtil.SubQueryFinder.containsSubQuery(filter)) { + return Optional.empty(); + } + + // optimize the filter + filter = prepareFilter(filter); + + // resolve the filter to get resolved expression + List<ResolvedExpression> resolveExpression = resolveFilter(context, filter); + return Optional.ofNullable(resolveExpression); + } + + /** Prepare the filter with reducing && simplifying. 
*/ + private static Filter prepareFilter(Filter filter) { + // we try to reduce and simplify the filter + ReduceExpressionsRuleProxy reduceExpressionsRuleProxy = ReduceExpressionsRuleProxy.INSTANCE; + SimplifyFilterConditionRule simplifyFilterConditionRule = + SimplifyFilterConditionRule.INSTANCE(); + // max iteration num for reducing and simplifying filter, + // we use 5 as the max iteration num which is same with the iteration num in Flink's plan + // optimizing. + int maxIteration = 5; + + boolean changed = true; + int iteration = 1; + // iterate until it reaches max iteration num or there's no changes in one iterate + while (changed && iteration <= maxIteration) { + changed = false; + // first apply the rule to reduce condition in filter + RexNode newCondition = filter.getCondition(); + List<RexNode> expList = new ArrayList<>(); + expList.add(newCondition); + if (reduceExpressionsRuleProxy.reduce(filter, expList)) { + // get the new condition + newCondition = expList.get(0); + changed = true; + } + // create a new filter + filter = filter.copy(filter.getTraitSet(), filter.getInput(), newCondition); + // then apply the rule to simplify filter + Option<Filter> changedFilter = + simplifyFilterConditionRule.simplify(filter, new boolean[] {false}); + if (changedFilter.isDefined()) { + filter = changedFilter.get(); + changed = true; + } + iteration += 1; + } + return filter; + } + + /** + * A proxy for {@link ReduceExpressionsRule}, which enables us to call the method {@link + * ReduceExpressionsRule#reduceExpressions(RelNode, List, RelOptPredicateList)}. + */ + private static class ReduceExpressionsRuleProxy + extends ReduceExpressionsRule<ReduceExpressionsRule.Config> { + private static final ReduceExpressionsRule.Config config = + FilterReduceExpressionsRule.FilterReduceExpressionsRuleConfig.DEFAULT; + private static final ReduceExpressionsRuleProxy INSTANCE = new ReduceExpressionsRuleProxy(); + + public ReduceExpressionsRuleProxy() { + super(config); + } + + @Override + public void onMatch(RelOptRuleCall relOptRuleCall) { + throw new UnsupportedOperationException("This shouldn't be called"); + } + + private boolean reduce(RelNode rel, List<RexNode> expList) { + return reduceExpressions( + rel, + expList, + RelOptPredicateList.EMPTY, + true, + config.matchNullability(), + config.treatDynamicCallsAsConstant()); + } + } + + /** Return the ResolvedExpression according to Filter. 
*/ + private static List<ResolvedExpression> resolveFilter(FlinkContext context, Filter filter) { + Tuple2<RexNode[], RexNode[]> extractedPredicates = + FlinkRexUtil.extractPredicates( + filter.getInput().getRowType().getFieldNames().toArray(new String[0]), + filter.getCondition(), + filter, + filter.getCluster().getRexBuilder()); + RexNode[] convertiblePredicates = extractedPredicates._1; + RexNode[] unconvertedPredicates = extractedPredicates._2; + if (unconvertedPredicates.length != 0) { + // if contain any unconverted condition, return null + return null; + } + RexNodeToExpressionConverter converter = + new RexNodeToExpressionConverter( + filter.getCluster().getRexBuilder(), + filter.getInput().getRowType().getFieldNames().toArray(new String[0]), + context.getFunctionCatalog(), + context.getCatalogManager(), + TimeZone.getTimeZone( + TableConfigUtils.getLocalTimeZone(context.getTableConfig()))); + List<Expression> filters = + Arrays.stream(convertiblePredicates) + .map( + p -> { + Option<ResolvedExpression> expr = p.accept(converter); + if (expr.isDefined()) { + return expr.get(); + } else { + throw new TableException( + String.format( + "%s can not be converted to Expression", + p)); + } + }) + .collect(Collectors.toList()); + ExpressionResolver resolver = + ExpressionResolver.resolverFor( + context.getTableConfig(), + context.getClassLoader(), + name -> Optional.empty(), + context.getFunctionCatalog() + .asLookup( + str -> { + throw new TableException( + "We should not need to lookup any expressions at this point"); + }), + context.getCatalogManager().getDataTypeFactory(), + (sqlExpression, inputRowType, outputType) -> { + throw new TableException( + "SQL expression parsing is not supported at this location."); + }) + .build(); + return resolver.resolve(filters); + } +} diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java index 96d279517fd..1851cce8c11 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java @@ -118,15 +118,19 @@ import org.apache.flink.table.catalog.UnresolvedIdentifier; import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; import org.apache.flink.table.catalog.exceptions.TableNotExistException; import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException; +import org.apache.flink.table.connector.sink.DynamicTableSink; +import org.apache.flink.table.connector.sink.abilities.SupportsDeletePushDown; import org.apache.flink.table.expressions.CallExpression; import org.apache.flink.table.expressions.Expression; import org.apache.flink.table.expressions.FieldReferenceExpression; +import org.apache.flink.table.expressions.ResolvedExpression; import org.apache.flink.table.expressions.ValueLiteralExpression; import org.apache.flink.table.factories.FactoryUtil; import org.apache.flink.table.functions.BuiltInFunctionDefinitions; import org.apache.flink.table.functions.FunctionIdentifier; import org.apache.flink.table.operations.BeginStatementSetOperation; import org.apache.flink.table.operations.CompileAndExecutePlanOperation; +import org.apache.flink.table.operations.DeleteFromFilterOperation; import 
org.apache.flink.table.operations.DescribeTableOperation; import org.apache.flink.table.operations.EndStatementSetOperation; import org.apache.flink.table.operations.ExplainOperation; @@ -199,6 +203,8 @@ import org.apache.flink.util.StringUtils; import org.apache.calcite.rel.RelRoot; import org.apache.calcite.rel.hint.HintStrategyTable; import org.apache.calcite.rel.hint.RelHint; +import org.apache.calcite.rel.logical.LogicalTableModify; +import org.apache.calcite.sql.SqlDelete; import org.apache.calcite.sql.SqlDialect; import org.apache.calcite.sql.SqlIdentifier; import org.apache.calcite.sql.SqlKind; @@ -392,6 +398,8 @@ public class SqlToOperationConverter { return Optional.of(converter.convertAnalyzeTable((SqlAnalyzeTable) validated)); } else if (validated instanceof SqlStopJob) { return Optional.of(converter.convertStopJob((SqlStopJob) validated)); + } else if (validated instanceof SqlDelete) { + return Optional.of(converter.convertDelete((SqlDelete) validated)); } else { return Optional.empty(); } @@ -1492,6 +1500,42 @@ public class SqlToOperationConverter { sqlStopJob.getId(), sqlStopJob.isWithSavepoint(), sqlStopJob.isWithDrain()); } + private Operation convertDelete(SqlDelete sqlDelete) { + RelRoot deleteRelational = flinkPlanner.rel(sqlDelete); + LogicalTableModify tableModify = (LogicalTableModify) deleteRelational.rel; + UnresolvedIdentifier unresolvedTableIdentifier = + UnresolvedIdentifier.of(tableModify.getTable().getQualifiedName()); + ContextResolvedTable contextResolvedTable = + catalogManager.getTableOrError( + catalogManager.qualifyIdentifier(unresolvedTableIdentifier)); + // try push down delete + Optional<DynamicTableSink> optionalDynamicTableSink = + DeletePushDownUtils.getDynamicTableSink( + contextResolvedTable, tableModify, catalogManager); + if (optionalDynamicTableSink.isPresent()) { + DynamicTableSink dynamicTableSink = optionalDynamicTableSink.get(); + // if the table sink supports delete push down + if (dynamicTableSink instanceof SupportsDeletePushDown) { + SupportsDeletePushDown supportsDeletePushDownSink = + (SupportsDeletePushDown) dynamicTableSink; + // get resolved filter expression + Optional<List<ResolvedExpression>> filters = + DeletePushDownUtils.getResolvedFilterExpressions(tableModify); + if (filters.isPresent() + && supportsDeletePushDownSink.applyDeleteFilters(filters.get())) { + return new DeleteFromFilterOperation( + contextResolvedTable, supportsDeletePushDownSink, filters.get()); + } + } + } + + // otherwise, delete push down is not applicable, throw unsupported exception + throw new UnsupportedOperationException( + String.format( + "Only delete push down is supported currently, but the delete statement can't pushed to table sink %s.", + contextResolvedTable.getIdentifier().asSerializableString())); + } + private void validateTableConstraint(SqlTableConstraint constraint) { if (constraint.isUnique()) { throw new UnsupportedOperationException("UNIQUE constraint is not supported yet"); diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterInCalcIntoTableSourceScanRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterInCalcIntoTableSourceScanRule.java index 33d24fa460f..4dc970af831 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterInCalcIntoTableSourceScanRule.java +++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterInCalcIntoTableSourceScanRule.java @@ -85,7 +85,7 @@ public class PushFilterInCalcIntoTableSourceScanRule extends PushFilterIntoSourc RelBuilder relBuilder = call.builder(); Tuple2<RexNode[], RexNode[]> extractedPredicates = - extractPredicates( + FlinkRexUtil.extractPredicates( originProgram.getInputRowType().getFieldNames().toArray(new String[0]), originProgram.expandLocalRef(originProgram.getCondition()), scan, diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoSourceScanRuleBase.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoSourceScanRuleBase.java index 3e023d50afc..eb5e4f6d25d 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoSourceScanRuleBase.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoSourceScanRuleBase.java @@ -22,29 +22,22 @@ import org.apache.flink.table.api.config.OptimizerConfigOptions; import org.apache.flink.table.connector.source.DynamicTableSource; import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown; import org.apache.flink.table.expressions.ResolvedExpression; -import org.apache.flink.table.planner.calcite.FlinkContext; import org.apache.flink.table.planner.expressions.converter.ExpressionConverter; import org.apache.flink.table.planner.plan.abilities.source.FilterPushDownSpec; import org.apache.flink.table.planner.plan.abilities.source.SourceAbilityContext; import org.apache.flink.table.planner.plan.abilities.source.SourceAbilitySpec; import org.apache.flink.table.planner.plan.schema.TableSourceTable; -import org.apache.flink.table.planner.plan.utils.FlinkRelOptUtil; -import org.apache.flink.table.planner.plan.utils.RexNodeExtractor; -import org.apache.flink.table.planner.plan.utils.RexNodeToExpressionConverter; import org.apache.flink.table.planner.utils.ShortcutUtils; -import org.apache.flink.table.planner.utils.TableConfigUtils; import org.apache.calcite.plan.RelOptRule; import org.apache.calcite.plan.RelOptRuleCall; import org.apache.calcite.plan.RelOptRuleOperand; import org.apache.calcite.rel.core.TableScan; -import org.apache.calcite.rex.RexBuilder; import org.apache.calcite.rex.RexNode; import org.apache.calcite.tools.RelBuilder; import java.util.Arrays; import java.util.List; -import java.util.TimeZone; import java.util.stream.Collectors; import scala.Tuple2; @@ -120,23 +113,6 @@ public abstract class PushFilterIntoSourceScanRuleBase extends RelOptRule { return new Tuple2<>(result, newTableSourceTable); } - protected Tuple2<RexNode[], RexNode[]> extractPredicates( - String[] inputNames, RexNode filterExpression, TableScan scan, RexBuilder rexBuilder) { - FlinkContext context = ShortcutUtils.unwrapContext(scan); - int maxCnfNodeCount = FlinkRelOptUtil.getMaxCnfNodeCount(scan); - RexNodeToExpressionConverter converter = - new RexNodeToExpressionConverter( - rexBuilder, - inputNames, - context.getFunctionCatalog(), - context.getCatalogManager(), - TimeZone.getTimeZone( - TableConfigUtils.getLocalTimeZone(context.getTableConfig()))); - - return RexNodeExtractor.extractConjunctiveConditions( - filterExpression, maxCnfNodeCount, rexBuilder, converter); - } - /** * Determines wether we can pushdown the filter into the source. 
we can not push filter twice, * make sure FilterPushDownSpec has not been assigned as a capability. diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRule.java index 516f00c15cd..eaa6999cdd7 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRule.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRule.java @@ -78,7 +78,7 @@ public class PushFilterIntoTableSourceScanRule extends PushFilterIntoSourceScanR RelBuilder relBuilder = call.builder(); Tuple2<RexNode[], RexNode[]> extractedPredicates = - extractPredicates( + FlinkRexUtil.extractPredicates( filter.getInput().getRowType().getFieldNames().toArray(new String[0]), filter.getCondition(), scan, diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRexUtil.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRexUtil.scala index 81819208b9d..dba27b0eb2e 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRexUtil.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRexUtil.scala @@ -23,12 +23,14 @@ import org.apache.flink.configuration.ConfigOptions.key import org.apache.flink.table.planner.functions.sql.SqlTryCastFunction import org.apache.flink.table.planner.plan.utils.ExpressionDetail.ExpressionDetail import org.apache.flink.table.planner.plan.utils.ExpressionFormat.ExpressionFormat +import org.apache.flink.table.planner.utils.{ShortcutUtils, TableConfigUtils} import com.google.common.base.Function import com.google.common.collect.{ImmutableList, Lists} import org.apache.calcite.avatica.util.ByteString import org.apache.calcite.plan.{RelOptPredicateList, RelOptUtil} import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.RelNode import org.apache.calcite.rex._ import org.apache.calcite.sql.`type`.SqlTypeName import org.apache.calcite.sql.{SqlAsOperator, SqlKind, SqlOperator} @@ -39,7 +41,7 @@ import org.apache.calcite.util._ import java.lang.{Iterable => JIterable} import java.math.BigDecimal import java.util -import java.util.Optional +import java.util.{Optional, TimeZone} import java.util.function.Predicate import scala.collection.JavaConversions._ @@ -634,6 +636,31 @@ object FlinkRexUtil { val projects = rexProgram.getProjectList.map(rexProgram.expandLocalRef) projects.forall(isDeterministicInStreaming) } + + /** + * Return convertible rex nodes and unconverted rex nodes extracted from the filter expression. 
+ */ + def extractPredicates( + inputNames: Array[String], + filterExpression: RexNode, + rel: RelNode, + rexBuilder: RexBuilder): (Array[RexNode], Array[RexNode]) = { + val context = ShortcutUtils.unwrapContext(rel) + val maxCnfNodeCount = FlinkRelOptUtil.getMaxCnfNodeCount(rel); + val converter = + new RexNodeToExpressionConverter( + rexBuilder, + inputNames, + context.getFunctionCatalog, + context.getCatalogManager, + TimeZone.getTimeZone(TableConfigUtils.getLocalTimeZone(context.getTableConfig))); + + RexNodeExtractor.extractConjunctiveConditions( + filterExpression, + maxCnfNodeCount, + rexBuilder, + converter); + } } /** diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestUpdateDeleteTableFactory.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestUpdateDeleteTableFactory.java new file mode 100644 index 00000000000..df114031ecf --- /dev/null +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestUpdateDeleteTableFactory.java @@ -0,0 +1,326 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.planner.factories; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.streaming.api.functions.source.FromElementsFunction; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.table.catalog.ResolvedCatalogTable; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.connector.ChangelogMode; +import org.apache.flink.table.connector.sink.DynamicTableSink; +import org.apache.flink.table.connector.sink.abilities.SupportsDeletePushDown; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.connector.source.ScanTableSource; +import org.apache.flink.table.connector.source.SourceFunctionProvider; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.expressions.CallExpression; +import org.apache.flink.table.expressions.FieldReferenceExpression; +import org.apache.flink.table.expressions.ResolvedExpression; +import org.apache.flink.table.expressions.ValueLiteralExpression; +import org.apache.flink.table.factories.DynamicTableSinkFactory; +import org.apache.flink.table.factories.DynamicTableSourceFactory; +import org.apache.flink.table.factories.FactoryUtil; +import org.apache.flink.table.functions.BuiltInFunctionDefinitions; +import org.apache.flink.table.types.DataType; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.flink.table.data.RowData.createFieldGetter; + +/** A factory to create table to support update/delete for test purpose. 
*/ +public class TestUpdateDeleteTableFactory + implements DynamicTableSourceFactory, DynamicTableSinkFactory { + + public static final String IDENTIFIER = "test-update-delete"; + + private static final ConfigOption<String> DATA_ID = + ConfigOptions.key("data-id") + .stringType() + .noDefaultValue() + .withDescription("The data id used to read the rows."); + + private static final ConfigOption<Boolean> ONLY_ACCEPT_EQUAL_PREDICATE = + ConfigOptions.key("only-accept-equal-predicate") + .booleanType() + .defaultValue(false) + .withDescription( + "Whether only accept when the all predicates in filter is equal expression for delete statement."); + + private static final ConfigOption<Boolean> SUPPORT_DELETE_PUSH_DOWN = + ConfigOptions.key("support-delete-push-down") + .booleanType() + .defaultValue(true) + .withDescription("Whether the table supports delete push down."); + + private static final AtomicInteger idCounter = new AtomicInteger(0); + private static final Map<String, Collection<RowData>> registeredRowData = new HashMap<>(); + + public static String registerRowData(Collection<RowData> data) { + String id = String.valueOf(idCounter.incrementAndGet()); + registeredRowData.put(id, data); + return id; + } + + @Override + public DynamicTableSink createDynamicTableSink(Context context) { + FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context); + helper.validate(); + String dataId = + helper.getOptions().getOptional(DATA_ID).orElse(String.valueOf(idCounter.get())); + if (helper.getOptions().get(SUPPORT_DELETE_PUSH_DOWN)) { + return new SupportsDeletePushDownSink( + dataId, + helper.getOptions().get(ONLY_ACCEPT_EQUAL_PREDICATE), + context.getCatalogTable()); + } else { + return new TestSink(); + } + } + + @Override + public DynamicTableSource createDynamicTableSource(Context context) { + FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context); + helper.validate(); + + String dataId = + helper.getOptions().getOptional(DATA_ID).orElse(String.valueOf(idCounter.get())); + return new TestTableSource(dataId); + } + + @Override + public String factoryIdentifier() { + return IDENTIFIER; + } + + @Override + public Set<ConfigOption<?>> requiredOptions() { + return Collections.emptySet(); + } + + @Override + public Set<ConfigOption<?>> optionalOptions() { + return new HashSet<>( + Arrays.asList(DATA_ID, ONLY_ACCEPT_EQUAL_PREDICATE, SUPPORT_DELETE_PUSH_DOWN)); + } + + /** A test table source which supports reading metadata. */ + private static class TestTableSource implements ScanTableSource { + private final String dataId; + + public TestTableSource(String dataId) { + this.dataId = dataId; + } + + @Override + public DynamicTableSource copy() { + return new TestTableSource(dataId); + } + + @Override + public String asSummaryString() { + return "test table source"; + } + + @Override + public ChangelogMode getChangelogMode() { + return ChangelogMode.insertOnly(); + } + + @Override + public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) { + return new SourceFunctionProvider() { + @Override + public SourceFunction<RowData> createSourceFunction() { + Collection<RowData> rows = registeredRowData.get(dataId); + if (rows != null) { + return new FromElementsFunction<>(rows); + } else { + return new FromElementsFunction<>(); + } + } + + @Override + public boolean isBounded() { + return true; + } + }; + } + } + + /** A common test sink. 
*/ + private static class TestSink implements DynamicTableSink { + + @Override + public ChangelogMode getChangelogMode(ChangelogMode requestedMode) { + return ChangelogMode.insertOnly(); + } + + @Override + public SinkRuntimeProvider getSinkRuntimeProvider(Context context) { + return null; + } + + @Override + public DynamicTableSink copy() { + return null; + } + + @Override + public String asSummaryString() { + return null; + } + } + + /** A sink that supports delete push down. */ + public static class SupportsDeletePushDownSink extends TestSink + implements SupportsDeletePushDown { + + private final String dataId; + private final boolean onlyAcceptEqualPredicate; + private final ResolvedCatalogTable resolvedCatalogTable; + private final RowData.FieldGetter[] fieldGetters; + private final List<String> columns; + + private List<Tuple2<String, Object>> equalPredicates; + + public SupportsDeletePushDownSink( + String dataId, + boolean onlyAcceptEqualPredicate, + ResolvedCatalogTable resolvedCatalogTable) { + this.dataId = dataId; + this.onlyAcceptEqualPredicate = onlyAcceptEqualPredicate; + this.resolvedCatalogTable = resolvedCatalogTable; + this.fieldGetters = getAllFieldGetter(resolvedCatalogTable.getResolvedSchema()); + this.columns = resolvedCatalogTable.getResolvedSchema().getColumnNames(); + } + + @Override + public DynamicTableSink copy() { + return new SupportsDeletePushDownSink( + dataId, onlyAcceptEqualPredicate, resolvedCatalogTable); + } + + @Override + public String asSummaryString() { + return "SupportDeletePushDownSink"; + } + + @Override + public boolean applyDeleteFilters(List<ResolvedExpression> filters) { + if (onlyAcceptEqualPredicate) { + Optional<List<Tuple2<String, Object>>> optionalEqualPredicates = + getEqualPredicates(filters); + if (optionalEqualPredicates.isPresent()) { + equalPredicates = optionalEqualPredicates.get(); + return true; + } + return false; + } + return true; + } + + @Override + public Optional<Long> executeDeletion() { + if (onlyAcceptEqualPredicate) { + Collection<RowData> existingRows = registeredRowData.get(dataId); + long rowsBefore = existingRows.size(); + existingRows.removeIf( + rowData -> + satisfyEqualPredicate( + equalPredicates, rowData, fieldGetters, columns)); + return Optional.of(rowsBefore - existingRows.size()); + } + return Optional.empty(); + } + } + + /** + * Get a list of equal predicate from a list of filter, each contains [column, value]. Return + * Optional.empty() if it contains any non-equal predicate. 
+ */ + private static Optional<List<Tuple2<String, Object>>> getEqualPredicates( + List<ResolvedExpression> filters) { + List<Tuple2<String, Object>> equalPredicates = new ArrayList<>(); + for (ResolvedExpression expression : filters) { + if (!(expression instanceof CallExpression)) { + return Optional.empty(); + } + CallExpression callExpression = (CallExpression) expression; + if (callExpression.getFunctionDefinition() != BuiltInFunctionDefinitions.EQUALS) { + return Optional.empty(); + } + String colName = getColumnName(callExpression); + Object value = getColumnValue(callExpression); + equalPredicates.add(Tuple2.of(colName, value)); + } + return Optional.of(equalPredicates); + } + + private static String getColumnName(CallExpression comp) { + return ((FieldReferenceExpression) comp.getChildren().get(0)).getName(); + } + + private static Object getColumnValue(CallExpression comp) { + ValueLiteralExpression valueLiteralExpression = + (ValueLiteralExpression) comp.getChildren().get(1); + return valueLiteralExpression + .getValueAs(valueLiteralExpression.getOutputDataType().getConversionClass()) + .get(); + } + + /** Check the rowData satisfies the equal predicate. */ + private static boolean satisfyEqualPredicate( + List<Tuple2<String, Object>> equalPredicates, + RowData rowData, + RowData.FieldGetter[] fieldGetters, + List<String> columns) { + for (Tuple2<String, Object> equalPredicate : equalPredicates) { + String colName = equalPredicate.f0; + Object value = equalPredicate.f1; + int colIndex = columns.indexOf(colName); + if (!(Objects.equals(value, fieldGetters[colIndex].getFieldOrNull(rowData)))) { + return false; + } + } + return true; + } + + private static RowData.FieldGetter[] getAllFieldGetter(ResolvedSchema resolvedSchema) { + List<DataType> dataTypes = resolvedSchema.getColumnDataTypes(); + RowData.FieldGetter[] fieldGetters = new RowData.FieldGetter[dataTypes.size()]; + for (int i = 0; i < dataTypes.size(); i++) { + fieldGetters[i] = createFieldGetter(dataTypes.get(i).getLogicalType(), i); + } + return fieldGetters; + } +} diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/DeletePushDownUtilsTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/DeletePushDownUtilsTest.java new file mode 100644 index 00000000000..f134223d243 --- /dev/null +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/DeletePushDownUtilsTest.java @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.planner.operations; + +import org.apache.flink.api.common.RuntimeExecutionMode; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.ExecutionOptions; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.Schema; +import org.apache.flink.table.api.TableConfig; +import org.apache.flink.table.catalog.Catalog; +import org.apache.flink.table.catalog.CatalogManager; +import org.apache.flink.table.catalog.CatalogTable; +import org.apache.flink.table.catalog.ContextResolvedTable; +import org.apache.flink.table.catalog.GenericInMemoryCatalog; +import org.apache.flink.table.catalog.ObjectIdentifier; +import org.apache.flink.table.connector.sink.DynamicTableSink; +import org.apache.flink.table.expressions.ResolvedExpression; +import org.apache.flink.table.planner.calcite.FlinkPlannerImpl; +import org.apache.flink.table.planner.catalog.CatalogManagerCalciteSchema; +import org.apache.flink.table.planner.delegation.PlannerContext; +import org.apache.flink.table.planner.factories.TestUpdateDeleteTableFactory; +import org.apache.flink.table.planner.parse.CalciteParser; +import org.apache.flink.table.planner.utils.PlannerMocks; +import org.apache.flink.table.utils.CatalogManagerMocks; + +import org.apache.calcite.rel.RelRoot; +import org.apache.calcite.rel.logical.LogicalTableModify; +import org.apache.calcite.sql.SqlNode; +import org.junit.jupiter.api.Test; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +import static org.apache.calcite.jdbc.CalciteSchemaBuilder.asRootSchema; +import static org.assertj.core.api.Assertions.assertThat; + +/** Test for {@link DeletePushDownUtils}. 
*/ +public class DeletePushDownUtilsTest { + private final TableConfig tableConfig = TableConfig.getDefault(); + private final Catalog catalog = new GenericInMemoryCatalog("MockCatalog", "default"); + private final CatalogManager catalogManager = + CatalogManagerMocks.preparedCatalogManager() + .defaultCatalog("builtin", catalog) + .config( + Configuration.fromMap( + Collections.singletonMap( + ExecutionOptions.RUNTIME_MODE.key(), + RuntimeExecutionMode.BATCH.name()))) + .build(); + + private final PlannerMocks plannerMocks = + PlannerMocks.newBuilder() + .withBatchMode(true) + .withTableConfig(tableConfig) + .withCatalogManager(catalogManager) + .withRootSchema( + asRootSchema(new CatalogManagerCalciteSchema(catalogManager, false))) + .build(); + private final PlannerContext plannerContext = plannerMocks.getPlannerContext(); + private final CalciteParser parser = plannerContext.createCalciteParser(); + private final FlinkPlannerImpl flinkPlanner = plannerContext.createFlinkPlanner(); + + @Test + public void testGetDynamicTableSink() { + // create a table with connector = test-update-delete + Map<String, String> options = new HashMap<>(); + options.put("connector", "test-update-delete"); + CatalogTable catalogTable = createTestCatalogTable(options); + ObjectIdentifier tableId = ObjectIdentifier.of("builtin", "default", "t"); + catalogManager.createTable(catalogTable, tableId, false); + ContextResolvedTable resolvedTable = + ContextResolvedTable.permanent( + tableId, catalog, catalogManager.resolveCatalogTable(catalogTable)); + LogicalTableModify tableModify = getTableModifyFromSql("DELETE FROM t"); + Optional<DynamicTableSink> optionalDynamicTableSink = + DeletePushDownUtils.getDynamicTableSink(resolvedTable, tableModify, catalogManager); + // verify we can get the dynamic table sink + assertThat(optionalDynamicTableSink).isPresent(); + assertThat(optionalDynamicTableSink.get()) + .isInstanceOf(TestUpdateDeleteTableFactory.SupportsDeletePushDownSink.class); + + // create table with connector = COLLECTION, it's legacy table sink + options.put("connector", "COLLECTION"); + catalogTable = createTestCatalogTable(options); + tableId = ObjectIdentifier.of("builtin", "default", "t1"); + catalogManager.createTable(catalogTable, tableId, false); + resolvedTable = + ContextResolvedTable.permanent( + tableId, catalog, catalogManager.resolveCatalogTable(catalogTable)); + tableModify = getTableModifyFromSql("DELETE FROM t1"); + optionalDynamicTableSink = + DeletePushDownUtils.getDynamicTableSink(resolvedTable, tableModify, catalogManager); + // verify it should be empty since it's not an instance of DynamicTableSink but is legacy + // TableSink + assertThat(optionalDynamicTableSink).isEmpty(); + } + + @Test + public void testGetResolveFilterExpressions() { + CatalogTable catalogTable = + CatalogTable.of( + Schema.newBuilder() + .column("f0", DataTypes.INT().notNull()) + .column("f1", DataTypes.STRING().nullable()) + .column("f2", DataTypes.BIGINT().nullable()) + .build(), + null, + Collections.emptyList(), + Collections.emptyMap()); + catalogManager.createTable( + catalogTable, ObjectIdentifier.of("builtin", "default", "t"), false); + + // verify there's no where clause + LogicalTableModify tableModify = getTableModifyFromSql("DELETE FROM t"); + Optional<List<ResolvedExpression>> optionalResolvedExpressions = + DeletePushDownUtils.getResolvedFilterExpressions(tableModify); + verifyExpression(optionalResolvedExpressions, "[]"); + + tableModify = getTableModifyFromSql("DELETE FROM t where f0 = 1 and f1 = 
'123'"); + optionalResolvedExpressions = DeletePushDownUtils.getResolvedFilterExpressions(tableModify); + verifyExpression(optionalResolvedExpressions, "[equals(f0, 1), equals(f1, '123')]"); + + tableModify = getTableModifyFromSql("DELETE FROM t where f0 = 1 + 6 and f0 < 6"); + optionalResolvedExpressions = DeletePushDownUtils.getResolvedFilterExpressions(tableModify); + assertThat(optionalResolvedExpressions).isPresent(); + verifyExpression(optionalResolvedExpressions, "[false]"); + + tableModify = getTableModifyFromSql("DELETE FROM t where f0 = f2 + 1"); + optionalResolvedExpressions = DeletePushDownUtils.getResolvedFilterExpressions(tableModify); + verifyExpression( + optionalResolvedExpressions, "[equals(cast(f0, BIGINT NOT NULL), plus(f2, 1))]"); + + // resolve filters is not available as it contains sub-query + tableModify = getTableModifyFromSql("DELETE FROM t where f0 > (select count(1) from t)"); + optionalResolvedExpressions = DeletePushDownUtils.getResolvedFilterExpressions(tableModify); + assertThat(optionalResolvedExpressions).isEmpty(); + } + + private CatalogTable createTestCatalogTable(Map<String, String> options) { + return CatalogTable.of( + Schema.newBuilder() + .column("f0", DataTypes.INT().notNull()) + .column("f1", DataTypes.STRING().nullable()) + .column("f2", DataTypes.BIGINT().nullable()) + .build(), + null, + Collections.emptyList(), + options); + } + + private LogicalTableModify getTableModifyFromSql(String sql) { + SqlNode sqlNode = parser.parse(sql); + final SqlNode validated = flinkPlanner.validate(sqlNode); + RelRoot deleteRelational = flinkPlanner.rel(validated); + return (LogicalTableModify) deleteRelational.rel; + } + + @SuppressWarnings("OptionalUsedAsFieldOrParameterType") + private void verifyExpression( + Optional<List<ResolvedExpression>> optionalResolvedExpressions, String expected) { + assertThat(optionalResolvedExpressions).isPresent(); + assertThat(optionalResolvedExpressions.get().toString()).isEqualTo(expected); + } +} diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java index 0197ce8d432..129b969b9a0 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java @@ -52,9 +52,11 @@ import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException; import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException; import org.apache.flink.table.catalog.exceptions.TableNotExistException; import org.apache.flink.table.delegation.Parser; +import org.apache.flink.table.expressions.ResolvedExpression; import org.apache.flink.table.expressions.SqlCallExpression; import org.apache.flink.table.factories.TestManagedTableFactory; import org.apache.flink.table.operations.BeginStatementSetOperation; +import org.apache.flink.table.operations.DeleteFromFilterOperation; import org.apache.flink.table.operations.EndStatementSetOperation; import org.apache.flink.table.operations.ExplainOperation; import org.apache.flink.table.operations.LoadModuleOperation; @@ -96,6 +98,7 @@ import org.apache.flink.table.planner.delegation.PlannerContext; import org.apache.flink.table.planner.expressions.utils.Func0$; import 
org.apache.flink.table.planner.expressions.utils.Func1$; import org.apache.flink.table.planner.expressions.utils.Func8$; +import org.apache.flink.table.planner.factories.TestUpdateDeleteTableFactory; import org.apache.flink.table.planner.parse.CalciteParser; import org.apache.flink.table.planner.parse.ExtendedParser; import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions; @@ -2706,6 +2709,46 @@ public class SqlToOperationConverterTest { assertThat(extendedParser.parse(command)).get().isInstanceOf(QuitOperation.class); } + @Test + public void testDeletePushDown() throws Exception { + Map<String, String> options = new HashMap<>(); + options.put("connector", TestUpdateDeleteTableFactory.IDENTIFIER); + CatalogTable catalogTable = + CatalogTable.of( + Schema.newBuilder() + .column("a", DataTypes.INT().notNull()) + .column("c", DataTypes.STRING().notNull()) + .build(), + null, + Collections.emptyList(), + options); + ObjectIdentifier tableIdentifier = + ObjectIdentifier.of("builtin", "default", "test_push_down"); + catalogManager.createTable(catalogTable, tableIdentifier, false); + + // no filter in delete statement + Operation operation = parse("DELETE FROM test_push_down"); + checkDeleteFromFilterOperation(operation, "[]"); + + // with filters in delete statement + operation = parse("DELETE FROM test_push_down where a = 1 and c = '123'"); + checkDeleteFromFilterOperation(operation, "[equals(a, 1), equals(c, '123')]"); + + // with filter = false after reduced in delete statement + operation = parse("DELETE FROM test_push_down where a = 1 + 6 and a = 2"); + checkDeleteFromFilterOperation(operation, "[false]"); + + assertThatThrownBy( + () -> + parse( + "DELETE FROM test_push_down where a = (select count(*) from test_push_down)")) + .isInstanceOf(UnsupportedOperationException.class) + .hasMessage( + String.format( + "Only delete push down is supported currently, but the delete statement can't pushed to table sink %s.", + tableIdentifier.asSerializableString())); + } + // ~ Tool Methods ---------------------------------------------------------- private static TestItem createTestItem(Object... args) { @@ -2902,6 +2945,15 @@ public class SqlToOperationConverterTest { TestManagedTableFactory.ENRICHED_VALUE); } + private static void checkDeleteFromFilterOperation( + Operation operation, String expectedFilters) { + assertThat(operation).isInstanceOf(DeleteFromFilterOperation.class); + DeleteFromFilterOperation deleteFromFiltersOperation = + (DeleteFromFilterOperation) operation; + List<ResolvedExpression> filters = deleteFromFiltersOperation.getFilters(); + assertThat(filters.toString()).isEqualTo(expectedFilters); + } + // ~ Inner Classes ---------------------------------------------------------- private static class TestItem { diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/DeleteTableITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/DeleteTableITCase.java new file mode 100644 index 00000000000..8a60d24dfbe --- /dev/null +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/DeleteTableITCase.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/DeleteTableITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/DeleteTableITCase.java
new file mode 100644
index 00000000000..8a60d24dfbe
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/DeleteTableITCase.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.runtime.batch.sql;
+
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.planner.factories.TestUpdateDeleteTableFactory;
+import org.apache.flink.table.planner.runtime.utils.BatchTestBase;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CollectionUtil;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** The IT case for the DELETE statement in batch mode. */
+public class DeleteTableITCase extends BatchTestBase {
+    private static final int ROW_NUM = 5;
+
+    @Test
+    public void testDeletePushDown() throws Exception {
+        String dataId = registerData();
+        tEnv().executeSql(
+                        String.format(
+                                "CREATE TABLE t (a int, b string, c double) WITH"
+                                        + " ('connector' = 'test-update-delete',"
+                                        + " 'data-id' = '%s',"
+                                        + " 'only-accept-equal-predicate' = 'true'"
+                                        + ")",
+                                dataId));
+        // the predicate contains only equality expressions, so the deletion is pushed down
+        List<Row> rows =
+                CollectionUtil.iteratorToList(
+                        tEnv().executeSql("DELETE FROM t WHERE a = 1").collect());
+        assertThat(rows.toString()).isEqualTo("[+I[1], +I[OK]]");
+        rows = CollectionUtil.iteratorToList(tEnv().executeSql("SELECT * FROM t").collect());
+        assertThat(rows.toString())
+                .isEqualTo("[+I[0, b_0, 0.0], +I[2, b_2, 4.0], +I[3, b_3, 6.0], +I[4, b_4, 8.0]]");
+
+        // should throw an exception since the deletion cannot be pushed down: the statement
+        // contains a non-equality predicate and the table sink hasn't implemented the
+        // SupportsRowLevelDelete interface
+        assertThatThrownBy(() -> tEnv().executeSql("DELETE FROM t where a > 1"))
+                .isInstanceOf(UnsupportedOperationException.class)
+                .hasMessage(
+                        "Only delete push down is supported currently, "
+                                + "but the delete statement can't pushed to table sink `default_catalog`.`default_database`.`t`.");
+
+        tEnv().executeSql(
+                        "CREATE TABLE t1 (a int) WITH"
+                                + " ('connector' = 'test-update-delete',"
+                                + " 'support-delete-push-down' = 'false')");
+        // should throw an exception for a sink that doesn't implement the
+        // SupportsDeletePushDown interface
+        assertThatThrownBy(() -> tEnv().executeSql("DELETE FROM t1"))
+                .isInstanceOf(UnsupportedOperationException.class)
+                .hasMessage(
+                        "Only delete push down is supported currently, "
+                                + "but the delete statement can't pushed to table sink `default_catalog`.`default_database`.`t1`.");
+    }
+
+    private String registerData() {
+        List<RowData> values = createValue();
+        return TestUpdateDeleteTableFactory.registerRowData(values);
+    }
+
+    private List<RowData> createValue() {
+        List<RowData> values = new ArrayList<>();
+        for (int i = 0; i < ROW_NUM; i++) {
+            values.add(GenericRowData.of(i, StringData.fromString("b_" + i), i * 2.0));
+        }
+        return values;
+    }
+}
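
The batch IT case above drives the push-down path through TestUpdateDeleteTableFactory. For orientation, here is a minimal sketch of a sink implementing the SupportsDeletePushDown ability this commit wires up: applyDeleteFilters reports whether the sink can handle the pushed filters, and executeDeletion performs the deletion, optionally reporting the affected row count. The class name and in-memory storage are invented; a real sink would translate the filters into storage-level predicates instead of deleting everything.

import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.abilities.SupportsDeletePushDown;
import org.apache.flink.table.expressions.ResolvedExpression;

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

/** Hypothetical sink that deletes rows from an in-memory list (illustration only). */
public class InMemoryDeleteSink implements DynamicTableSink, SupportsDeletePushDown {

    private final List<Integer> keys; // stand-in for real storage
    private List<ResolvedExpression> pushedFilters = new ArrayList<>();

    public InMemoryDeleteSink(List<Integer> keys) {
        this.keys = keys;
    }

    @Override
    public boolean applyDeleteFilters(List<ResolvedExpression> filters) {
        // Returning false signals that the filters can't be handled, which currently
        // surfaces as the UnsupportedOperationException asserted in the tests above.
        this.pushedFilters = filters;
        return true;
    }

    @Override
    public Optional<Long> executeDeletion() {
        // Simplified: a real sink would evaluate pushedFilters against its data;
        // an empty filter list means "DELETE FROM t" with no WHERE clause.
        long deleted = keys.size();
        keys.clear();
        return Optional.of(deleted); // Optional.empty() if the count is unknown
    }

    @Override
    public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
        return ChangelogMode.insertOnly();
    }

    @Override
    public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
        throw new UnsupportedOperationException("Not needed for this sketch");
    }

    @Override
    public DynamicTableSink copy() {
        return new InMemoryDeleteSink(keys);
    }

    @Override
    public String asSummaryString() {
        return "in-memory-delete-sink";
    }
}
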
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/DeleteTableITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/DeleteTableITCase.java
new file mode 100644
index 00000000000..e6ba79ef2b4
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/DeleteTableITCase.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.runtime.stream.sql;
+
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.planner.runtime.utils.StreamingTestBase;
+
+import org.junit.Test;
+
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** The IT case for the DELETE statement in streaming mode. */
+public class DeleteTableITCase extends StreamingTestBase {
+
+    @Test
+    public void testDelete() {
+        tEnv().executeSql("CREATE TABLE t (a int) WITH ('connector' = 'test-update-delete')");
+        assertThatThrownBy(() -> tEnv().executeSql("DELETE FROM t"))
+                .isInstanceOf(TableException.class)
+                .hasMessage("DELETE statement is not supported for streaming mode now.");
+    }
+}
diff --git a/flink-table/flink-table-planner/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory b/flink-table/flink-table-planner/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory
index 7386413a5d2..bfcf0178f2c 100644
--- a/flink-table/flink-table-planner/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory
+++ b/flink-table/flink-table-planner/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -18,3 +18,4 @@ org.apache.flink.table.planner.factories.TestValuesTableFactory
 org.apache.flink.table.planner.factories.TestFileFactory
 org.apache.flink.table.planner.factories.TableFactoryHarness$Factory
 org.apache.flink.table.planner.plan.stream.sql.TestTableFactory
+org.apache.flink.table.planner.factories.TestUpdateDeleteTableFactory
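
Finally, the single service-file line added above is what makes the test factory discoverable through Java's ServiceLoader. A connector author registers a production factory the same way: implement DynamicTableSinkFactory and list its fully qualified class name in META-INF/services/org.apache.flink.table.factories.Factory. A hypothetical skeleton, reusing the InMemoryDeleteSink sketched earlier:

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.factories.DynamicTableSinkFactory;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Set;

/** Hypothetical factory; its FQCN would go into the services file shown above. */
public class InMemoryDeleteSinkFactory implements DynamicTableSinkFactory {

    @Override
    public String factoryIdentifier() {
        return "in-memory-delete"; // the value used in WITH ('connector' = ...)
    }

    @Override
    public DynamicTableSink createDynamicTableSink(Context context) {
        return new InMemoryDeleteSink(new ArrayList<>());
    }

    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        return Collections.emptySet();
    }

    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        return Collections.emptySet();
    }
}
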
