This is an automated email from the ASF dual-hosted git repository.
snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
     new fd447f08f90 [FLINK-38890][table] Migrate `PullUpWindowTableFunctionIntoWindowAggregateRule` to java
fd447f08f90 is described below
commit fd447f08f9091efd3f8dfb35b63c4572471071d9
Author: Roman <[email protected]>
AuthorDate: Mon Jan 26 23:40:19 2026 +0100
    [FLINK-38890][table] Migrate `PullUpWindowTableFunctionIntoWindowAggregateRule` to java
---
...WindowTableFunctionIntoWindowAggregateRule.java | 298 +++++++++++++++++++++
...indowTableFunctionIntoWindowAggregateRule.scala | 205 --------------
2 files changed, 298 insertions(+), 205 deletions(-)
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/PullUpWindowTableFunctionIntoWindowAggregateRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/PullUpWindowTableFunctionIntoWindowAggregateRule.java
new file mode 100644
index 00000000000..b0aaaa460ff
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/PullUpWindowTableFunctionIntoWindowAggregateRule.java
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.physical.stream;
+
+import org.apache.flink.table.planner.plan.logical.SessionWindowSpec;
+import org.apache.flink.table.planner.plan.logical.TimeAttributeWindowingStrategy;
+import org.apache.flink.table.planner.plan.logical.WindowSpec;
+import org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery;
+import org.apache.flink.table.planner.plan.nodes.FlinkConventions;
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalCalc;
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalExchange;
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalWindowAggregate;
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalWindowTableFunction;
+import org.apache.flink.table.planner.plan.trait.FlinkRelDistribution;
+import org.apache.flink.table.planner.plan.utils.WindowUtil;
+import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelRule;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.plan.volcano.RelSubset;
+import org.apache.calcite.rel.RelCollations;
+import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexLocalRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexProgram;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.calcite.util.Pair;
+import org.immutables.value.Value;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import scala.Tuple4;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * Planner rule that tries to pull up {@link StreamPhysicalWindowTableFunction} into a {@link
+ * StreamPhysicalWindowAggregate}.
+ */
[email protected]
+public class PullUpWindowTableFunctionIntoWindowAggregateRule
+        extends RelRule<
+                PullUpWindowTableFunctionIntoWindowAggregateRule
+                        .PullUpWindowTableFunctionIntoWindowAggregateRuleConfig> {
+
+    public static final PullUpWindowTableFunctionIntoWindowAggregateRule INSTANCE =
+            PullUpWindowTableFunctionIntoWindowAggregateRule
+                    .PullUpWindowTableFunctionIntoWindowAggregateRuleConfig.DEFAULT
+                    .toRule();
+
+    protected PullUpWindowTableFunctionIntoWindowAggregateRule(
+            PullUpWindowTableFunctionIntoWindowAggregateRule
+                            .PullUpWindowTableFunctionIntoWindowAggregateRuleConfig
+                    config) {
+ super(config);
+ }
+
+ @Override
+ public boolean matches(RelOptRuleCall call) {
+ StreamPhysicalWindowAggregate windowAgg = call.rel(0);
+ StreamPhysicalCalc calc = call.rel(2);
+        FlinkRelMetadataQuery fmq =
+                FlinkRelMetadataQuery.reuseOrCreate(windowAgg.getCluster().getMetadataQuery());
+
+        // condition and projection of Calc shouldn't contain calls on window columns,
+ // otherwise, we can't transpose WindowTVF and Calc
+ if (WindowUtil.calcContainsCallsOnWindowColumns(calc, fmq)) {
+ return false;
+ }
+
+        ImmutableBitSet aggInputWindowProps = fmq.getRelWindowProperties(calc).getWindowColumns();
+        // aggregate call shouldn't be on window columns
+        // TODO: this can be supported in the future by referencing them as a RexFieldVariable
+        return JavaScalaConversionUtil.toJava(windowAgg.aggCalls()).stream()
+                .allMatch(
+                        aggCall ->
+                                aggInputWindowProps
+                                        .intersect(ImmutableBitSet.of(aggCall.getArgList()))
+                                        .isEmpty());
+ }
+
+ @Override
+ public void onMatch(RelOptRuleCall call) {
+ StreamPhysicalWindowAggregate windowAgg = call.rel(0);
+ StreamPhysicalCalc calc = call.rel(2);
+ StreamPhysicalWindowTableFunction windowTVF = call.rel(3);
+        FlinkRelMetadataQuery fmq =
+                FlinkRelMetadataQuery.reuseOrCreate(windowAgg.getCluster().getMetadataQuery());
+ RelOptCluster cluster = windowAgg.getCluster();
+ RelNode input = unwrapRel(windowTVF.getInput());
+
+ if (input instanceof StreamPhysicalExchange) {
+ input = ((StreamPhysicalExchange) input).getInput();
+ }
+
+ RelDataType inputRowType = input.getRowType();
+
+        RelTraitSet requiredInputTraitSet =
+                input.getTraitSet().replace(FlinkConventions.STREAM_PHYSICAL());
+ RelNode newInput = RelOptRule.convert(input, requiredInputTraitSet);
+
+        // -------------------------------------------------------------------------
+        // 1. transpose Calc and WindowTVF, build the new Calc node
+        // -------------------------------------------------------------------------
+        ImmutableBitSet windowColumns = fmq.getRelWindowProperties(windowTVF).getWindowColumns();
+ Tuple4<RexProgram, int[], Object, Object> programInfo =
+ WindowUtil.buildNewProgramWithoutWindowColumns(
+ cluster.getRexBuilder(),
+ calc.getProgram(),
+ inputRowType,
+ windowTVF.windowing().getTimeAttributeIndex(),
+ windowColumns.toArray());
+ RexProgram newProgram = programInfo._1();
+ int[] aggInputFieldsShift = programInfo._2();
+ int timeAttributeIndex = (int) programInfo._3();
+
+ StreamPhysicalCalc newCalc =
+ new StreamPhysicalCalc(
+ cluster,
+ calc.getTraitSet(),
+ newInput,
+ newProgram,
+ newProgram.getOutputRowType());
+
+        // -------------------------------------------------------------------------
+        // 2. Adjust grouping index and convert Calc with new distribution
+        // -------------------------------------------------------------------------
+ int[] newGrouping =
+ Arrays.stream(windowAgg.grouping())
+ .map(grouping -> aggInputFieldsShift[grouping])
+ .toArray();
+ FlinkRelDistribution requiredDistribution =
+ (newGrouping.length != 0)
+ ? FlinkRelDistribution.hash(newGrouping, true)
+ : FlinkRelDistribution.SINGLETON();
+ RelTraitSet requiredTraitSet =
+ newCalc.getTraitSet()
+ .replace(FlinkConventions.STREAM_PHYSICAL())
+ .replace(requiredDistribution);
+ RelNode convertedCalc = RelOptRule.convert(newCalc, requiredTraitSet);
+
+        // -----------------------------------------------------------------------------
+        // 3. Adjust aggregate arguments index and construct new window aggregate node
+        // -----------------------------------------------------------------------------
+        WindowSpec newWindowSpec = updateWindowSpec(windowTVF.windowing().getWindow(), newCalc);
+ TimeAttributeWindowingStrategy newWindowing =
+ new TimeAttributeWindowingStrategy(
+ newWindowSpec,
+ windowTVF.windowing().getTimeAttributeType(),
+ timeAttributeIndex);
+        RelTraitSet providedTraitSet =
+                windowAgg.getTraitSet().replace(FlinkConventions.STREAM_PHYSICAL());
+
+        List<AggregateCall> newAggCalls =
+                JavaScalaConversionUtil.toJava(windowAgg.aggCalls()).stream()
+                        .map(
+                                aggCall -> {
+                                    List<Integer> newArgList =
+                                            aggCall.getArgList().stream()
+                                                    .map(arg -> aggInputFieldsShift[arg])
+                                                    .collect(Collectors.toList());
+                                    int newFilterArg =
+                                            aggCall.hasFilter()
+                                                    ? aggInputFieldsShift[aggCall.filterArg]
+                                                    : aggCall.filterArg;
+                                    List<RelFieldCollation> newFieldCollations =
+                                            aggCall.getCollation().getFieldCollations().stream()
+                                                    .map(
+                                                            field ->
+                                                                    field.withFieldIndex(
+                                                                            aggInputFieldsShift[
+                                                                                    field
+                                                                                            .getFieldIndex()]))
+                                                    .collect(Collectors.toList());
+                                    return aggCall.copy(
+                                            newArgList,
+                                            newFilterArg,
+                                            RelCollations.of(newFieldCollations));
+                                })
+                        .collect(Collectors.toList());
+
+ StreamPhysicalWindowAggregate newWindowAgg =
+ new StreamPhysicalWindowAggregate(
+ cluster,
+ providedTraitSet,
+ convertedCalc,
+ newGrouping,
+ JavaScalaConversionUtil.toScala(newAggCalls),
+ newWindowing,
+ windowAgg.namedWindowProperties());
+
+ call.transformTo(newWindowAgg);
+ }
+
+ private RelNode unwrapRel(RelNode rel) {
+ RelNode current = rel;
+ while (current instanceof RelSubset) {
+ current = ((RelSubset) current).getOriginal();
+ }
+ return current;
+ }
+
+    private WindowSpec updateWindowSpec(WindowSpec oldWindowSpec, StreamPhysicalCalc calc) {
+        if (oldWindowSpec instanceof SessionWindowSpec) {
+            final SessionWindowSpec sessionWindowSpec = (SessionWindowSpec) oldWindowSpec;
+            final int[] windowPartitionKeys = sessionWindowSpec.getPartitionKeyIndices();
+            final int[] newPartitionKeysThroughCalc =
+                    getSessionPartitionKeysThroughCalc(windowPartitionKeys, calc);
+            checkArgument(windowPartitionKeys.length == newPartitionKeysThroughCalc.length);
+            return new SessionWindowSpec(sessionWindowSpec.getGap(), newPartitionKeysThroughCalc);
+ }
+ return oldWindowSpec;
+ }
+
+ private int[] getSessionPartitionKeysThroughCalc(
+ int[] sessionWindowPartitionKeyIndices, StreamPhysicalCalc calc) {
+ final List<Integer> newPartitionKeyIndices = new ArrayList<>();
+ final RexProgram program = calc.getProgram();
+        for (int index = 0; index < program.getNamedProjects().size(); index++) {
+            final Pair<RexLocalRef, String> project = program.getNamedProjects().get(index);
+            final RexNode expr = program.expandLocalRef(project.left);
+            if (expr instanceof RexInputRef) {
+                final int inputIndex = ((RexInputRef) expr).getIndex();
+                if (IntStream.of(sessionWindowPartitionKeyIndices).anyMatch(i -> i == inputIndex)) {
+                    newPartitionKeyIndices.add(index);
+                }
+            }
+        }
+        return newPartitionKeyIndices.stream().mapToInt(Integer::intValue).toArray();
+ }
+
+    /** Configuration for {@link PullUpWindowTableFunctionIntoWindowAggregateRule}. */
+    @Value.Immutable(singleton = false)
+    public interface PullUpWindowTableFunctionIntoWindowAggregateRuleConfig extends RelRule.Config {
+        PullUpWindowTableFunctionIntoWindowAggregateRule
+                        .PullUpWindowTableFunctionIntoWindowAggregateRuleConfig
+                DEFAULT =
+                        ImmutablePullUpWindowTableFunctionIntoWindowAggregateRule
+                                .PullUpWindowTableFunctionIntoWindowAggregateRuleConfig.builder()
+                                .build()
+                                .withOperandSupplier(
+                                        b0 ->
+                                                b0.operand(StreamPhysicalWindowAggregate.class)
+                                                        .oneInput(
+                                                                b1 ->
+                                                                        b1.operand(
+                                                                                        StreamPhysicalExchange
+                                                                                                .class)
+                                                                                .oneInput(
+                                                                                        b2 ->
+                                                                                                b2.operand(
+                                                                                                                StreamPhysicalCalc
+                                                                                                                        .class)
+                                                                                                        .oneInput(
+                                                                                                                b3 ->
+                                                                                                                        b3.operand(
+                                                                                                                                        StreamPhysicalWindowTableFunction
+                                                                                                                                                .class)
+                                                                                                                                .anyInputs()))))
+                                .withDescription("PullUpWindowTableFunctionIntoWindowAggregateRule")
+                                .as(
+                                        PullUpWindowTableFunctionIntoWindowAggregateRule
+                                                .PullUpWindowTableFunctionIntoWindowAggregateRuleConfig
+                                                .class);
+
+ @Override
+ default PullUpWindowTableFunctionIntoWindowAggregateRule toRule() {
+ return new PullUpWindowTableFunctionIntoWindowAggregateRule(this);
+ }
+ }
+}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/PullUpWindowTableFunctionIntoWindowAggregateRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/PullUpWindowTableFunctionIntoWindowAggregateRule.scala
deleted file mode 100644
index f98252eed98..00000000000
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/PullUpWindowTableFunctionIntoWindowAggregateRule.scala
+++ /dev/null
@@ -1,205 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.planner.plan.rules.physical.stream
-
-import org.apache.flink.table.planner.plan.`trait`.FlinkRelDistribution
-import org.apache.flink.table.planner.plan.logical.{SessionWindowSpec, TimeAttributeWindowingStrategy, WindowSpec}
-import org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery
-import org.apache.flink.table.planner.plan.nodes.FlinkConventions
-import org.apache.flink.table.planner.plan.nodes.physical.stream.{StreamPhysicalCalc, StreamPhysicalExchange, StreamPhysicalWindowAggregate, StreamPhysicalWindowTableFunction}
-import org.apache.flink.table.planner.plan.utils.WindowUtil
-import org.apache.flink.table.planner.plan.utils.WindowUtil.buildNewProgramWithoutWindowColumns
-import org.apache.flink.util.Preconditions.checkArgument
-
-import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
-import org.apache.calcite.plan.RelOptRule.{any, operand}
-import org.apache.calcite.plan.volcano.RelSubset
-import org.apache.calcite.rel.{RelCollations, RelNode}
-import org.apache.calcite.rex.RexInputRef
-import org.apache.calcite.util.ImmutableBitSet
-
-import scala.annotation.tailrec
-import scala.collection.JavaConversions._
-import scala.collection.mutable.ArrayBuffer
-
-/**
- * Planner rule that tries to pull up [[StreamPhysicalWindowTableFunction]] into a
- * [[StreamPhysicalWindowAggregate]].
- */
-class PullUpWindowTableFunctionIntoWindowAggregateRule
- extends RelOptRule(
- operand(
- classOf[StreamPhysicalWindowAggregate],
- operand(
- classOf[StreamPhysicalExchange],
- operand(
- classOf[StreamPhysicalCalc],
- operand(classOf[StreamPhysicalWindowTableFunction], any())))
- ),
- "PullUpWindowTableFunctionIntoWindowAggregateRule") {
-
- override def matches(call: RelOptRuleCall): Boolean = {
- val windowAgg: StreamPhysicalWindowAggregate = call.rel(0)
- val calc: StreamPhysicalCalc = call.rel(2)
-    val fmq = FlinkRelMetadataQuery.reuseOrCreate(windowAgg.getCluster.getMetadataQuery)
-
-    // condition and projection of Calc shouldn't contain calls on window columns,
- // otherwise, we can't transpose WindowTVF and Calc
- if (WindowUtil.calcContainsCallsOnWindowColumns(calc, fmq)) {
- return false
- }
-
- val aggInputWindowProps = fmq.getRelWindowProperties(calc).getWindowColumns
- // aggregate call shouldn't be on window columns
-    // TODO: this can be supported in the future by referencing them as a RexFieldVariable
- windowAgg.aggCalls.forall {
-      call => aggInputWindowProps.intersect(ImmutableBitSet.of(call.getArgList)).isEmpty
- }
- }
-
- override def onMatch(call: RelOptRuleCall): Unit = {
- val windowAgg: StreamPhysicalWindowAggregate = call.rel(0)
- val calc: StreamPhysicalCalc = call.rel(2)
- val windowTVF: StreamPhysicalWindowTableFunction = call.rel(3)
-    val fmq = FlinkRelMetadataQuery.reuseOrCreate(windowAgg.getCluster.getMetadataQuery)
- val cluster = windowAgg.getCluster
- val input = unwrapRel(windowTVF.getInput) match {
- case exchange: StreamPhysicalExchange =>
- exchange.getInput
- case other =>
- other
- }
- val inputRowType = input.getRowType
-
-    val requiredInputTraitSet = input.getTraitSet.replace(FlinkConventions.STREAM_PHYSICAL)
- val newInput: RelNode = RelOptRule.convert(input, requiredInputTraitSet)
-
-    // -------------------------------------------------------------------------
-    // 1. transpose Calc and WindowTVF, build the new Calc node
-    // -------------------------------------------------------------------------
- val windowColumns = fmq.getRelWindowProperties(windowTVF).getWindowColumns
- val (newProgram, aggInputFieldsShift, timeAttributeIndex, _) =
- buildNewProgramWithoutWindowColumns(
- cluster.getRexBuilder,
- calc.getProgram,
- inputRowType,
- windowTVF.windowing.getTimeAttributeIndex,
- windowColumns.toArray)
- val newCalc = new StreamPhysicalCalc(
- cluster,
- calc.getTraitSet,
- newInput,
- newProgram,
- newProgram.getOutputRowType)
-
-    // -------------------------------------------------------------------------
-    // 2. Adjust grouping index and convert Calc with new distribution
-    // -------------------------------------------------------------------------
- val newGrouping = windowAgg.grouping
- .map(aggInputFieldsShift(_))
- val requiredDistribution = if (newGrouping.length != 0) {
- FlinkRelDistribution.hash(newGrouping, requireStrict = true)
- } else {
- FlinkRelDistribution.SINGLETON
- }
- val requiredTraitSet = newCalc.getTraitSet
- .replace(FlinkConventions.STREAM_PHYSICAL)
- .replace(requiredDistribution)
- val convertedCalc = RelOptRule.convert(newCalc, requiredTraitSet)
-
-    // -----------------------------------------------------------------------------
-    // 3. Adjust aggregate arguments index and construct new window aggregate node
-    // -----------------------------------------------------------------------------
-    val newWindowSpec = updateWindowSpec(windowTVF.windowing.getWindow, newCalc)
- val newWindowing = new TimeAttributeWindowingStrategy(
- newWindowSpec,
- windowTVF.windowing.getTimeAttributeType,
- timeAttributeIndex)
-    val providedTraitSet = windowAgg.getTraitSet.replace(FlinkConventions.STREAM_PHYSICAL)
- val newAggCalls = windowAgg.aggCalls.map {
- call =>
-        val newArgList = call.getArgList.map(arg => Int.box(aggInputFieldsShift(arg)))
- val newFilterArg = if (call.hasFilter) {
- aggInputFieldsShift(call.filterArg)
- } else {
- call.filterArg
- }
- val newFiledCollations = call.getCollation.getFieldCollations.map {
-          field => field.withFieldIndex(aggInputFieldsShift(field.getFieldIndex))
- }
- val newCollation = RelCollations.of(newFiledCollations)
- call.copy(newArgList, newFilterArg, newCollation)
- }
-
- val newWindowAgg = new StreamPhysicalWindowAggregate(
- cluster,
- providedTraitSet,
- convertedCalc,
- newGrouping,
- newAggCalls,
- newWindowing,
- windowAgg.namedWindowProperties)
-
- call.transformTo(newWindowAgg)
- }
-
- @tailrec
- private def unwrapRel(rel: RelNode): RelNode = {
- rel match {
- case relSubset: RelSubset =>
- unwrapRel(relSubset.getOriginal)
- case _ =>
- rel
- }
- }
-
-  private def updateWindowSpec(oldWindowSpec: WindowSpec, calc: StreamPhysicalCalc): WindowSpec = {
- oldWindowSpec match {
- case sessionWindowSpec: SessionWindowSpec =>
- val windowPartitionKeys = sessionWindowSpec.getPartitionKeyIndices
- val newPartitionKeysThroughCalc =
- getSessionPartitionKeysThroughCalc(windowPartitionKeys, calc)
-      checkArgument(windowPartitionKeys.length == newPartitionKeysThroughCalc.length)
-      new SessionWindowSpec(sessionWindowSpec.getGap, newPartitionKeysThroughCalc)
- case _ => oldWindowSpec
- }
- }
-
- private def getSessionPartitionKeysThroughCalc(
- sessionWindowPartitionKeyIndices: Array[Int],
- calc: StreamPhysicalCalc): Array[Int] = {
- val newPartitionKeyIndices = ArrayBuffer[Int]()
- val program = calc.getProgram
- program.getNamedProjects.zipWithIndex.foreach {
- case (project, index) =>
- val expr = program.expandLocalRef(project.left)
- expr match {
- case inputRef: RexInputRef =>
-          if (sessionWindowPartitionKeyIndices.indexOf(inputRef.getIndex) != -1) {
- newPartitionKeyIndices += index
- }
- case _ => // ignore
- }
- }
- newPartitionKeyIndices.toArray
- }
-}
-
-object PullUpWindowTableFunctionIntoWindowAggregateRule {
- val INSTANCE = new PullUpWindowTableFunctionIntoWindowAggregateRule
-}
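
For readers tracking the Scala-to-Java planner rule migrations: the Java port still exposes the rule through a static INSTANCE field, so code that referenced the old Scala companion-object singleton keeps resolving; only the construction moves from the Scala RelOptRule constructor to the RelRule.Config operand-supplier pattern shown in the diff above. Below is a minimal sketch, assuming nothing beyond Calcite's org.apache.calcite.tools.RuleSets API (the class name RuleWiringSketch is hypothetical and not part of this commit), of how such an instance can be collected into a rule set:

    import org.apache.calcite.tools.RuleSet;
    import org.apache.calcite.tools.RuleSets;

    import org.apache.flink.table.planner.plan.rules.physical.stream.PullUpWindowTableFunctionIntoWindowAggregateRule;

    /** Illustrative only: builds a one-element rule set around the migrated rule's singleton. */
    public class RuleWiringSketch {

        public static RuleSet physicalRewriteRules() {
            // INSTANCE is the static field added in the Java file above;
            // RuleSets.ofList accepts RelOptRule varargs.
            return RuleSets.ofList(PullUpWindowTableFunctionIntoWindowAggregateRule.INSTANCE);
        }
    }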