This is an automated email from the ASF dual-hosted git repository.

snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 5739b16dcb2 [FLINK-34600][table] Migrate 
PushLimitIntoLegacyTableSourceScanRule to java
5739b16dcb2 is described below

commit 5739b16dcb26c5f4a38b5cc914082f69e4a001bf
Author: Sergey Nuyanzin <[email protected]>
AuthorDate: Sun Nov 24 15:32:35 2024 +0100

    [FLINK-34600][table] Migrate PushLimitIntoLegacyTableSourceScanRule to java
---
 .../PushLimitIntoLegacyTableSourceScanRule.java    | 153 +++++++++++++++++++++
 .../PushLimitIntoLegacyTableSourceScanRule.scala   | 126 -----------------
 2 files changed, 153 insertions(+), 126 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushLimitIntoLegacyTableSourceScanRule.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushLimitIntoLegacyTableSourceScanRule.java
new file mode 100644
index 00000000000..834caed78bb
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushLimitIntoLegacyTableSourceScanRule.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.legacy.sources.LimitableTableSource;
+import org.apache.flink.table.legacy.sources.TableSource;
+import org.apache.flink.table.plan.stats.TableStats;
+import 
org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalLegacyTableSourceScan;
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalSort;
+import org.apache.flink.table.planner.plan.schema.FlinkPreparingTableBase;
+import org.apache.flink.table.planner.plan.schema.LegacyTableSourceTable;
+import org.apache.flink.table.planner.plan.stats.FlinkStatistic;
+
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelRule;
+import org.apache.calcite.rel.core.Sort;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.tools.RelBuilder;
+import org.immutables.value.Value;
+
+import java.util.Collections;
+
+/**
+ * Planner rule that tries to push limit into a {@link LimitableTableSource}. 
The original limit
+ * will still be retained.
+ *
+ * <p>The reasons why the limit still be retained: 1.If the source is required 
to return the exact
+ * number of limit number, the implementation of the source is highly 
required. The source is
+ * required to accurately control the record number of split, and the 
parallelism setting also need
+ * to be adjusted accordingly. 2.When remove the limit, maybe filter will be 
pushed down to the
+ * source after limit pushed down. The source need know it should do limit 
first and do the filter
+ * later, it is hard to implement. 3.We can support limit with offset, we can 
push down offset +
+ * fetch to table source.
+ */
+@Internal
[email protected]
+public class PushLimitIntoLegacyTableSourceScanRule
+        extends RelRule<
+                PushLimitIntoLegacyTableSourceScanRule
+                        .PushLimitIntoLegacyTableSourceScanRuleConfig> {
+
+    public static final PushLimitIntoLegacyTableSourceScanRule INSTANCE =
+            PushLimitIntoLegacyTableSourceScanRuleConfig.DEFAULT.toRule();
+
+    protected PushLimitIntoLegacyTableSourceScanRule(
+            PushLimitIntoLegacyTableSourceScanRuleConfig config) {
+        super(config);
+    }
+
+    public boolean matches(RelOptRuleCall call) {
+        Sort sort = call.rel(0);
+        final boolean onlyLimit =
+                sort.getCollation().getFieldCollations().isEmpty() && 
sort.fetch != null;
+        if (onlyLimit) {
+            LegacyTableSourceTable table =
+                    
call.rel(1).getTable().unwrap(LegacyTableSourceTable.class);
+            if (table != null) {
+                TableSource tableSource = table.tableSource();
+                return tableSource instanceof LimitableTableSource
+                        && !((LimitableTableSource) 
tableSource).isLimitPushedDown();
+            }
+        }
+        return false;
+    }
+
+    public void onMatch(RelOptRuleCall call) {
+        Sort sort = call.rel(0);
+        FlinkLogicalLegacyTableSourceScan scan = call.rel(1);
+        LegacyTableSourceTable tableSourceTable =
+                scan.getTable().unwrap(LegacyTableSourceTable.class);
+        int offset = (sort.offset == null) ? 0 : 
RexLiteral.intValue(sort.offset);
+        int limit = offset + RexLiteral.intValue(sort.fetch);
+        RelBuilder relBuilder = call.builder();
+        LegacyTableSourceTable newRelOptTable = applyLimit(limit, 
tableSourceTable);
+        FlinkLogicalLegacyTableSourceScan newScan = 
scan.copy(scan.getTraitSet(), newRelOptTable);
+
+        TableSource newTableSource =
+                
newRelOptTable.unwrap(LegacyTableSourceTable.class).tableSource();
+        TableSource oldTableSource =
+                
tableSourceTable.unwrap(LegacyTableSourceTable.class).tableSource();
+
+        if (((LimitableTableSource) newTableSource).isLimitPushedDown()
+                && 
newTableSource.explainSource().equals(oldTableSource.explainSource())) {
+            throw new TableException(
+                    "Failed to push limit into table source! "
+                            + "table source with pushdown capability must 
override and change "
+                            + "explainSource() API to explain the pushdown 
applied!");
+        }
+
+        call.transformTo(sort.copy(sort.getTraitSet(), 
Collections.singletonList(newScan)));
+    }
+
+    private LegacyTableSourceTable applyLimit(long limit, 
FlinkPreparingTableBase relOptTable) {
+        LegacyTableSourceTable tableSourceTable = 
relOptTable.unwrap(LegacyTableSourceTable.class);
+        LimitableTableSource limitedSource = (LimitableTableSource) 
tableSourceTable.tableSource();
+        TableSource newTableSource = limitedSource.applyLimit(limit);
+
+        FlinkStatistic statistic = relOptTable.getStatistic();
+        long newRowCount =
+                (statistic.getRowCount() != null)
+                        ? Math.min(limit, statistic.getRowCount().longValue())
+                        : limit;
+        // Update TableStats after limit push down
+        TableStats newTableStats = new TableStats(newRowCount);
+        FlinkStatistic newStatistic =
+                
FlinkStatistic.builder().statistic(statistic).tableStats(newTableStats).build();
+        return tableSourceTable.copy(newTableSource, newStatistic);
+    }
+
+    /** Configuration for {@link PushLimitIntoLegacyTableSourceScanRule}. */
+    @Value.Immutable(singleton = false)
+    public interface PushLimitIntoLegacyTableSourceScanRuleConfig extends 
RelRule.Config {
+        
PushLimitIntoLegacyTableSourceScanRule.PushLimitIntoLegacyTableSourceScanRuleConfig
+                DEFAULT =
+                        ImmutablePushLimitIntoLegacyTableSourceScanRule
+                                
.PushLimitIntoLegacyTableSourceScanRuleConfig.builder()
+                                .operandSupplier(
+                                        b0 ->
+                                                
b0.operand(FlinkLogicalSort.class)
+                                                        .oneInput(
+                                                                b1 ->
+                                                                        
b1.operand(
+                                                                               
         FlinkLogicalLegacyTableSourceScan
+                                                                               
                 .class)
+                                                                               
 .noInputs()))
+                                
.description("PushLimitIntoLegacyTableSourceScanRule")
+                                .build()
+                                
.as(PushLimitIntoLegacyTableSourceScanRuleConfig.class);
+
+        @Override
+        default PushLimitIntoLegacyTableSourceScanRule toRule() {
+            return new PushLimitIntoLegacyTableSourceScanRule(this);
+        }
+    }
+}
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/PushLimitIntoLegacyTableSourceScanRule.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/PushLimitIntoLegacyTableSourceScanRule.scala
deleted file mode 100644
index 677de3a76c5..00000000000
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/PushLimitIntoLegacyTableSourceScanRule.scala
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.planner.plan.rules.logical
-
-import org.apache.flink.table.api.TableException
-import org.apache.flink.table.legacy.sources.LimitableTableSource
-import org.apache.flink.table.plan.stats.TableStats
-import 
org.apache.flink.table.planner.plan.nodes.logical.{FlinkLogicalLegacyTableSourceScan,
 FlinkLogicalSort}
-import org.apache.flink.table.planner.plan.schema.{FlinkPreparingTableBase, 
LegacyTableSourceTable}
-import org.apache.flink.table.planner.plan.stats.FlinkStatistic
-
-import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
-import org.apache.calcite.plan.RelOptRule.{none, operand}
-import org.apache.calcite.rel.core.{Sort, TableScan}
-import org.apache.calcite.rex.RexLiteral
-import org.apache.calcite.tools.RelBuilder
-
-import java.util.Collections
-
-/**
- * Planner rule that tries to push limit into a [[LimitableTableSource]]. The 
original limit will
- * still be retained.
- *
- * The reasons why the limit still be retained:
- * 1.If the source is required to return the exact number of limit number, the 
implementation of the
- * source is highly required. The source is required to accurately control the 
record number of
- * split, and the parallelism setting also need to be adjusted accordingly. 
2.When remove the limit,
- * maybe filter will be pushed down to the source after limit pushed down. The 
source need know it
- * should do limit first and do the filter later, it is hard to implement. 
3.We can support limit
- * with offset, we can push down offset + fetch to table source.
- */
-class PushLimitIntoLegacyTableSourceScanRule
-  extends RelOptRule(
-    operand(classOf[FlinkLogicalSort], 
operand(classOf[FlinkLogicalLegacyTableSourceScan], none)),
-    "PushLimitIntoLegacyTableSourceScanRule") {
-
-  override def matches(call: RelOptRuleCall): Boolean = {
-    val sort = call.rel(0).asInstanceOf[Sort]
-    val onlyLimit = sort.getCollation.getFieldCollations.isEmpty && sort.fetch 
!= null
-    if (onlyLimit) {
-      call
-        .rel(1)
-        .asInstanceOf[TableScan]
-        .getTable
-        .unwrap(classOf[LegacyTableSourceTable[_]]) match {
-        case table: LegacyTableSourceTable[_] =>
-          table.tableSource match {
-            case source: LimitableTableSource[_] =>
-              return !source.isLimitPushedDown
-            case _ =>
-          }
-        case _ =>
-      }
-    }
-    false
-  }
-
-  override def onMatch(call: RelOptRuleCall): Unit = {
-    val sort = call.rel(0).asInstanceOf[Sort]
-    val scan = call.rel(1).asInstanceOf[FlinkLogicalLegacyTableSourceScan]
-    val tableSourceTable = 
scan.getTable.unwrap(classOf[LegacyTableSourceTable[_]])
-    val offset = if (sort.offset == null) 0 else 
RexLiteral.intValue(sort.offset)
-    val limit = offset + RexLiteral.intValue(sort.fetch)
-    val relBuilder = call.builder()
-    val newRelOptTable = applyLimit(limit, tableSourceTable, relBuilder)
-    val newScan = scan.copy(scan.getTraitSet, newRelOptTable)
-
-    val newTableSource = 
newRelOptTable.unwrap(classOf[LegacyTableSourceTable[_]]).tableSource
-    val oldTableSource = 
tableSourceTable.unwrap(classOf[LegacyTableSourceTable[_]]).tableSource
-
-    if (
-      newTableSource.asInstanceOf[LimitableTableSource[_]].isLimitPushedDown
-      && newTableSource.explainSource().equals(oldTableSource.explainSource)
-    ) {
-      throw new TableException(
-        "Failed to push limit into table source! "
-          + "table source with pushdown capability must override and change "
-          + "explainSource() API to explain the pushdown applied!")
-    }
-
-    call.transformTo(sort.copy(sort.getTraitSet, 
Collections.singletonList(newScan)))
-  }
-
-  private def applyLimit(
-      limit: Long,
-      relOptTable: FlinkPreparingTableBase,
-      relBuilder: RelBuilder): LegacyTableSourceTable[_] = {
-    val tableSourceTable = 
relOptTable.unwrap(classOf[LegacyTableSourceTable[Any]])
-    val limitedSource = 
tableSourceTable.tableSource.asInstanceOf[LimitableTableSource[Any]]
-    val newTableSource = limitedSource.applyLimit(limit)
-
-    val statistic = relOptTable.getStatistic
-    val newRowCount = if (statistic.getRowCount != null) {
-      Math.min(limit, statistic.getRowCount.toLong)
-    } else {
-      limit
-    }
-    // Update TableStats after limit push down
-    val newTableStats = new TableStats(newRowCount)
-    val newStatistic = FlinkStatistic
-      .builder()
-      .statistic(statistic)
-      .tableStats(newTableStats)
-      .build()
-    tableSourceTable.copy(newTableSource, newStatistic)
-  }
-}
-
-object PushLimitIntoLegacyTableSourceScanRule {
-  val INSTANCE: RelOptRule = new PushLimitIntoLegacyTableSourceScanRule
-}

Reply via email to