godfreyhe commented on a change in pull request #18408:
URL: https://github.com/apache/flink/pull/18408#discussion_r806430166



##########
File path: flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalMatch.scala
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.physical.batch
+
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory
+import org.apache.flink.table.planner.plan.logical.MatchRecognize
+import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecMatch
+import org.apache.flink.table.planner.plan.nodes.exec.{ExecNode, InputProperty}
+import org.apache.flink.table.planner.plan.utils.MatchUtil
+import org.apache.flink.table.planner.plan.utils.PythonUtil.containsPythonCall
+import org.apache.flink.table.planner.plan.utils.RelExplainUtil._
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel._
+import org.apache.calcite.rel.`type`.RelDataType
+import _root_.java.util
+
+import _root_.scala.collection.JavaConversions._
+
+/**
+ * Batch physical RelNode which matches along with MATCH_RECOGNIZE.
+ */
+class BatchPhysicalMatch(
+                           cluster: RelOptCluster,
+                           traitSet: RelTraitSet,
+                           inputNode: RelNode,
+                           val logicalMatch: MatchRecognize,
+                           outputRowType: RelDataType)
+  extends SingleRel(cluster, traitSet, inputNode)
+    with BatchPhysicalRel {

Review comment:
       ditto

##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecMatch.java
##########
@@ -0,0 +1,470 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.batch;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.cep.EventComparator;
+import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
+import org.apache.flink.cep.nfa.compiler.NFACompiler;
+import org.apache.flink.cep.operator.CepOperator;
+import org.apache.flink.cep.pattern.Pattern;
+import org.apache.flink.cep.pattern.Quantifier;
+import org.apache.flink.cep.pattern.conditions.BooleanConditions;
+import org.apache.flink.cep.pattern.conditions.IterativeCondition;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.codegen.CodeGenUtils;
+import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
+import org.apache.flink.table.planner.codegen.MatchCodeGenerator;
+import org.apache.flink.table.planner.codegen.sort.ComparatorCodeGenerator;
+import org.apache.flink.table.planner.delegation.PlannerBase;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
+import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
+import org.apache.flink.table.planner.plan.nodes.exec.MultipleTransformationTranslator;
+import org.apache.flink.table.planner.plan.nodes.exec.spec.MatchSpec;
+import org.apache.flink.table.planner.plan.nodes.exec.spec.SortSpec;
+import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
+import org.apache.flink.table.planner.plan.utils.KeySelectorUtil;
+import org.apache.flink.table.planner.plan.utils.RexDefaultVisitor;
+import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
+import org.apache.flink.table.runtime.generated.GeneratedRecordComparator;
+import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
+import org.apache.flink.table.runtime.operators.match.PatternProcessFunctionRunner;
+import org.apache.flink.table.runtime.operators.match.RowDataEventComparator;
+import org.apache.flink.table.runtime.operators.sink.StreamRecordTimestampInserter;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.runtime.typeutils.TypeCheckUtils;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.MathUtils;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlMatchRecognize;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.sql.type.SqlTypeFamily;
+import org.apache.calcite.tools.RelBuilder;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Optional;
+
+import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getPrecision;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Batch {@link ExecNode} which matches along with MATCH_RECOGNIZE. */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class BatchExecMatch extends ExecNodeBase<RowData>

Review comment:
       Batch exec nodes do not require JSON ser/de.
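   
   For illustration, a minimal sketch of that change (assumed shape, grounded in the diff above; `translateToPlanInternal` is elided):
   
   ```java
   // BatchExecMatch without JSON ser/de: the Jackson annotations and the
   // FIELD_NAME_MATCH_SPEC constant are simply dropped.
   public class BatchExecMatch extends ExecNodeBase<RowData>
           implements BatchExecNode<RowData>, MultipleTransformationTranslator<RowData> {
   
       private final MatchSpec matchSpec; // plain field, no @JsonProperty
   
       public BatchExecMatch(
               MatchSpec matchSpec,
               InputProperty inputProperty,
               RowType outputType,
               String description) {
           super(getNewNodeId(), Collections.singletonList(inputProperty), outputType, description);
           checkArgument(getInputProperties().size() == 1);
           this.matchSpec = checkNotNull(matchSpec);
       }
   
       // translateToPlanInternal(...) unchanged from the diff above
   }
   ```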

##########
File path: flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/sql/validation/MatchRecognizeValidationTest.scala
##########
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.batch.sql.validation
+
+import java.sql.Timestamp
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api._
+import org.apache.flink.table.api.bridge.scala._
+import org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions.WeightedAvg
+import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions.PythonScalarFunction
+import org.apache.flink.table.planner.runtime.utils.UserDefinedFunctionTestUtils.ToMillis
+import org.apache.flink.table.planner.utils.TableTestBase
+import org.apache.flink.types.Row
+import org.junit.Test
+
+class MatchRecognizeValidationTest extends TableTestBase {
+
+  private val batchUtil = scalaBatchTestUtil()
+  batchUtil.addDataStream[(Int, String, Timestamp)](
+    "MyTable", 'a, 'b, 'rowtime.rowtime, 'proctime.proctime)
+  batchUtil.addDataStream[(String, Long, Int, Int)](
+    "Ticker", 'symbol, 'tstamp, 'price, 'tax, 'proctime.proctime)
+  batchUtil.addFunction("ToMillis", new ToMillis)

Review comment:
       We should use the new API (DDL) instead of the deprecated API.
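   
   For example (a hedged Java-style sketch, assuming the util's `tableEnv` and the `ToMillis` function class from the test utils; the `values` connector options are illustrative):
   
   ```java
   // Register the source table via executeSql + DDL instead of the deprecated
   // addDataStream, and the function via createTemporarySystemFunction
   // instead of the deprecated addFunction.
   util.tableEnv().executeSql(
           "CREATE TABLE Ticker (\n"
                   + "  `symbol` STRING,\n"
                   + "  `tstamp` BIGINT,\n"
                   + "  `price` INT,\n"
                   + "  `tax` INT,\n"
                   + "  `proctime` AS PROCTIME()\n"
                   + ") WITH (\n"
                   + "  'connector' = 'values',\n"
                   + "  'bounded' = 'true'\n"
                   + ")");
   util.tableEnv().createTemporarySystemFunction("ToMillis", new ToMillis());
   ```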

##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecMatch.java
##########
@@ -0,0 +1,470 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.batch;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.cep.EventComparator;
+import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
+import org.apache.flink.cep.nfa.compiler.NFACompiler;
+import org.apache.flink.cep.operator.CepOperator;
+import org.apache.flink.cep.pattern.Pattern;
+import org.apache.flink.cep.pattern.Quantifier;
+import org.apache.flink.cep.pattern.conditions.BooleanConditions;
+import org.apache.flink.cep.pattern.conditions.IterativeCondition;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.codegen.CodeGenUtils;
+import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
+import org.apache.flink.table.planner.codegen.MatchCodeGenerator;
+import org.apache.flink.table.planner.codegen.sort.ComparatorCodeGenerator;
+import org.apache.flink.table.planner.delegation.PlannerBase;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
+import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
+import org.apache.flink.table.planner.plan.nodes.exec.MultipleTransformationTranslator;
+import org.apache.flink.table.planner.plan.nodes.exec.spec.MatchSpec;
+import org.apache.flink.table.planner.plan.nodes.exec.spec.SortSpec;
+import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
+import org.apache.flink.table.planner.plan.utils.KeySelectorUtil;
+import org.apache.flink.table.planner.plan.utils.RexDefaultVisitor;
+import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
+import org.apache.flink.table.runtime.generated.GeneratedRecordComparator;
+import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
+import org.apache.flink.table.runtime.operators.match.PatternProcessFunctionRunner;
+import org.apache.flink.table.runtime.operators.match.RowDataEventComparator;
+import org.apache.flink.table.runtime.operators.sink.StreamRecordTimestampInserter;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.runtime.typeutils.TypeCheckUtils;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.MathUtils;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlMatchRecognize;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.sql.type.SqlTypeFamily;
+import org.apache.calcite.tools.RelBuilder;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Optional;
+
+import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getPrecision;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Batch {@link ExecNode} which matches along with MATCH_RECOGNIZE. */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class BatchExecMatch extends ExecNodeBase<RowData>
+        implements BatchExecNode<RowData>, MultipleTransformationTranslator<RowData> {
+
+    public static final String FIELD_NAME_MATCH_SPEC = "matchSpec";
+
+    @JsonProperty(FIELD_NAME_MATCH_SPEC)
+    private final MatchSpec matchSpec;
+
+    public BatchExecMatch(
+            MatchSpec matchSpec,
+            InputProperty inputProperty,
+            RowType outputType,
+            String description) {
+        super(getNewNodeId(), Collections.singletonList(inputProperty), outputType, description);
+        checkArgument(getInputProperties().size() == 1);
+        this.matchSpec = checkNotNull(matchSpec);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    protected Transformation<RowData> translateToPlanInternal(PlannerBase planner) {
+        final ExecEdge inputEdge = getInputEdges().get(0);
+        final Transformation<RowData> inputTransform =
+                (Transformation<RowData>) inputEdge.translateToPlan(planner);
+        final RowType inputRowType = (RowType) inputEdge.getOutputType();
+
+        checkOrderKeys(inputRowType);
+        final TableConfig config = planner.getTableConfig();
+        final EventComparator<RowData> eventComparator =
+                createEventComparator(config, inputRowType);
+        final Transformation<RowData> timestampedInputTransform =
+                translateOrder(inputTransform, inputRowType);
+
+        final Tuple2<Pattern<RowData, RowData>, List<String>> cepPatternAndNames =
+                translatePattern(matchSpec, config, planner.getRelBuilder(), inputRowType);
+        final Pattern<RowData, RowData> cepPattern = cepPatternAndNames.f0;
+
+        // TODO remove this once it is supported in CEP library
+        if (NFACompiler.canProduceEmptyMatches(cepPattern)) {
+            throw new TableException(
+                    "Patterns that can produce empty matches are not supported. There must be at least one non-optional state.");
+        }
+
+        // TODO remove this once it is supported in CEP library
+        if (cepPattern.getQuantifier().hasProperty(Quantifier.QuantifierProperty.GREEDY)) {
+            throw new TableException(
+                    "Greedy quantifiers are not allowed as the last element of a Pattern yet. "
+                            + "Finish your pattern with either a simple variable or reluctant quantifier.");
+        }
+
+        if (matchSpec.isAllRows()) {
+            throw new TableException("All rows per match mode is not supported yet.");
+        }
+
+        final int[] partitionKeys = matchSpec.getPartition().getFieldIndices();
+        final SortSpec.SortFieldSpec timeOrderField = matchSpec.getOrderKeys().getFieldSpec(0);
+        final LogicalType timeOrderFieldType =
+                inputRowType.getTypeAt(timeOrderField.getFieldIndex());
+
+        final boolean isProctime = TypeCheckUtils.isProcTime(timeOrderFieldType);
+        final InternalTypeInfo<RowData> inputTypeInfo =
+                (InternalTypeInfo<RowData>) inputTransform.getOutputType();
+        final TypeSerializer<RowData> inputSerializer =
+                inputTypeInfo.createSerializer(planner.getExecEnv().getConfig());
+        final NFACompiler.NFAFactory<RowData> nfaFactory =
+                NFACompiler.compileFactory(cepPattern, false);
+        final MatchCodeGenerator generator =
+                new MatchCodeGenerator(
+                        new CodeGeneratorContext(config),
+                        planner.getRelBuilder(),
+                        false, // nullableInput
+                        JavaScalaConversionUtil.toScala(cepPatternAndNames.f1),
+                        JavaScalaConversionUtil.toScala(Optional.empty()),
+                        CodeGenUtils.DEFAULT_COLLECTOR_TERM());
+        generator.bindInput(
+                inputRowType,
+                CodeGenUtils.DEFAULT_INPUT1_TERM(),
+                JavaScalaConversionUtil.toScala(Optional.empty()));
+        final PatternProcessFunctionRunner patternProcessFunction =
+                generator.generateOneRowPerMatchExpression(
+                        (RowType) getOutputType(), partitionKeys, matchSpec.getMeasures());
+        final CepOperator<RowData, RowData, RowData> operator =
+                new CepOperator<>(
+                        inputSerializer,
+                        isProctime,
+                        nfaFactory,
+                        eventComparator,
+                        cepPattern.getAfterMatchSkipStrategy(),
+                        patternProcessFunction,
+                        null);
+        final OneInputTransformation<RowData, RowData> transform =
+                ExecNodeUtil.createOneInputTransformation(
+                        timestampedInputTransform,
+                        getOperatorName(config),
+                        getOperatorDescription(config),
+                        operator,
+                        InternalTypeInfo.of(getOutputType()),
+                        timestampedInputTransform.getParallelism());
+        final RowDataKeySelector selector =
+                KeySelectorUtil.getRowDataSelector(partitionKeys, inputTypeInfo);
+        transform.setStateKeySelector(selector);
+        transform.setStateKeyType(selector.getProducedType());
+
+        if (inputsContainSingleton()) {
+            transform.setParallelism(1);
+            transform.setMaxParallelism(1);
+        }
+        return transform;
+    }
+
+    private void checkOrderKeys(RowType inputRowType) {
+        SortSpec orderKeys = matchSpec.getOrderKeys();
+        if (orderKeys.getFieldSize() == 0) {
+            throw new TableException("You must specify either rowtime or proctime for order by.");
+        }
+
+        SortSpec.SortFieldSpec timeOrderField = orderKeys.getFieldSpec(0);
+        int timeOrderFieldIdx = timeOrderField.getFieldIndex();
+        LogicalType timeOrderFieldType = inputRowType.getTypeAt(timeOrderFieldIdx);
+        // need to identify time between others order fields. Time needs to be first sort element
+        if (!TypeCheckUtils.isRowTime(timeOrderFieldType)

Review comment:
       ditto

##########
File path: flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/MatchRecognizeTest.scala
##########
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.batch.sql
+
+import org.apache.flink.table.api.ValidationException
+import org.apache.flink.table.planner.utils.{BatchTableTestUtil, TableTestBase}
+import org.junit.{Before, Test}
+
+class MatchRecognizeTest extends TableTestBase {
+
+  protected val util: BatchTableTestUtil = batchTestUtil()
+
+  @Before
+  def before(): Unit = {
+    val ddl =
+      """
+        |CREATE TABLE Ticker (
+        | `symbol` STRING,
+        | `ts_ltz` TIMESTAMP_LTZ(3),
+        | `price` INT,
+        | `tax` INT,
+        | WATERMARK FOR `ts_ltz` AS `ts_ltz` - INTERVAL '1' SECOND
+        |) WITH (
+        | 'connector' = 'values',
+        | 'bounded' = 'true'
+        |)
+        |""".stripMargin
+    util.tableEnv.executeSql(ddl)
+  }
+
+  @Test
+  def testMatchRecognizeOnRowtime(): Unit = {

Review comment:
       There is no rowtime concept in batch.

##########
File path: flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/sql/validation/MatchRecognizeValidationTest.scala
##########
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.batch.sql.validation
+
+import java.sql.Timestamp
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api._
+import org.apache.flink.table.api.bridge.scala._
+import org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions.WeightedAvg
+import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions.PythonScalarFunction
+import org.apache.flink.table.planner.runtime.utils.UserDefinedFunctionTestUtils.ToMillis
+import org.apache.flink.table.planner.utils.TableTestBase
+import org.apache.flink.types.Row
+import org.junit.Test
+
+class MatchRecognizeValidationTest extends TableTestBase {

Review comment:
       We should use Java for new classes.

##########
File path: flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalMatch.scala
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.physical.batch
+
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory
+import org.apache.flink.table.planner.plan.logical.MatchRecognize
+import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecMatch
+import org.apache.flink.table.planner.plan.nodes.exec.{ExecNode, InputProperty}
+import org.apache.flink.table.planner.plan.utils.MatchUtil
+import org.apache.flink.table.planner.plan.utils.PythonUtil.containsPythonCall
+import org.apache.flink.table.planner.plan.utils.RelExplainUtil._
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel._
+import org.apache.calcite.rel.`type`.RelDataType
+import _root_.java.util
+
+import _root_.scala.collection.JavaConversions._
+
+/**
+ * Batch physical RelNode which matches along with MATCH_RECOGNIZE.
+ */
+class BatchPhysicalMatch(
+                           cluster: RelOptCluster,
+                           traitSet: RelTraitSet,
+                           inputNode: RelNode,
+                           val logicalMatch: MatchRecognize,
+                           outputRowType: RelDataType)
+  extends SingleRel(cluster, traitSet, inputNode)
+    with BatchPhysicalRel {
+
+  if (logicalMatch.measures.values().exists(containsPythonCall(_)) ||
+    logicalMatch.patternDefinitions.values().exists(containsPythonCall(_))) {
+    throw new TableException("Python Function can not be used in MATCH_RECOGNIZE for now.")
+  }
+
+  override def deriveRowType(): RelDataType = outputRowType
+
+  override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode = {
+    new BatchPhysicalMatch(
+      cluster,
+      traitSet,
+      inputs.get(0),
+      logicalMatch,
+      outputRowType)
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+    val inputRowType = getInput.getRowType
+    val fieldNames = inputRowType.getFieldNames.toList
+    super.explainTerms(pw)
+      .itemIf("partitionBy",
+        fieldToString(logicalMatch.partitionKeys.toArray, inputRowType),
+        !logicalMatch.partitionKeys.isEmpty)
+      .itemIf("orderBy",
+        collationToString(logicalMatch.orderKeys, inputRowType),
+        !logicalMatch.orderKeys.getFieldCollations.isEmpty)
+      .itemIf("measures",
+        measuresDefineToString(
+          logicalMatch.measures,
+          fieldNames,
+          getExpressionString,
+          convertToExpressionDetail(pw.getDetailLevel)),
+        !logicalMatch.measures.isEmpty)
+      .item("rowsPerMatch", rowsPerMatchToString(logicalMatch.allRows))
+      .item("after", afterMatchToString(logicalMatch.after, fieldNames))
+      .item("pattern", logicalMatch.pattern.toString)
+      .itemIf("subset",
+        subsetToString(logicalMatch.subsets),
+        !logicalMatch.subsets.isEmpty)
+      .item("define", logicalMatch.patternDefinitions)
+  }

Review comment:
       Introduce `CommonPhysicalMatch`; most of the code in `BatchPhysicalMatch` and `StreamPhysicalMatch` is the same.
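   
   A rough Java sketch of such a base class (the name comes from this comment; the exact split is an assumption):
   
   ```java
   import org.apache.calcite.plan.RelOptCluster;
   import org.apache.calcite.plan.RelTraitSet;
   import org.apache.calcite.rel.RelNode;
   import org.apache.calcite.rel.SingleRel;
   import org.apache.calcite.rel.type.RelDataType;
   
   import org.apache.flink.table.planner.plan.logical.MatchRecognize;
   
   /** Base physical RelNode holding the logic shared by batch and stream MATCH_RECOGNIZE. */
   public abstract class CommonPhysicalMatch extends SingleRel {
   
       protected final MatchRecognize logicalMatch;
       private final RelDataType outputRowType;
   
       protected CommonPhysicalMatch(
               RelOptCluster cluster,
               RelTraitSet traitSet,
               RelNode inputNode,
               MatchRecognize logicalMatch,
               RelDataType outputRowType) {
           super(cluster, traitSet, inputNode);
           // the shared "no Python functions in MATCH_RECOGNIZE" check from the
           // Scala class above would move here
           this.logicalMatch = logicalMatch;
           this.outputRowType = outputRowType;
       }
   
       @Override
       public RelDataType deriveRowType() {
           return outputRowType;
       }
   
       // explainTerms(...) is likewise shared; the subclasses keep only copy()
       // and their Batch-/StreamPhysicalRel traits.
   }
   ```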

##########
File path: flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalMatch.scala
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.physical.batch
+
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory
+import org.apache.flink.table.planner.plan.logical.MatchRecognize
+import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecMatch
+import org.apache.flink.table.planner.plan.nodes.exec.{ExecNode, InputProperty}
+import org.apache.flink.table.planner.plan.utils.MatchUtil
+import org.apache.flink.table.planner.plan.utils.PythonUtil.containsPythonCall
+import org.apache.flink.table.planner.plan.utils.RelExplainUtil._
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel._
+import org.apache.calcite.rel.`type`.RelDataType
+import _root_.java.util
+
+import _root_.scala.collection.JavaConversions._
+
+/**
+ * Batch physical RelNode which matches along with MATCH_RECOGNIZE.
+ */
+class BatchPhysicalMatch(
+                           cluster: RelOptCluster,
+                           traitSet: RelTraitSet,
+                           inputNode: RelNode,
+                           val logicalMatch: MatchRecognize,
+                           outputRowType: RelDataType)

Review comment:
       nit: pay attention to indentation

##########
File path: flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/MatchRecognizeTest.scala
##########
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.batch.sql
+
+import org.apache.flink.table.api.ValidationException
+import org.apache.flink.table.planner.utils.{BatchTableTestUtil, TableTestBase}
+import org.junit.{Before, Test}
+
+class MatchRecognizeTest extends TableTestBase {
+
+  protected val util: BatchTableTestUtil = batchTestUtil()
+
+  @Before
+  def before(): Unit = {
+    val ddl =
+      """
+        |CREATE TABLE Ticker (
+        | `symbol` STRING,
+        | `ts_ltz` TIMESTAMP_LTZ(3),
+        | `price` INT,
+        | `tax` INT,
+        | WATERMARK FOR `ts_ltz` AS `ts_ltz` - INTERVAL '1' SECOND
+        |) WITH (
+        | 'connector' = 'values',
+        | 'bounded' = 'true'
+        |)
+        |""".stripMargin
+    util.tableEnv.executeSql(ddl)
+  }
+
+  @Test
+  def testMatchRecognizeOnRowtime(): Unit = {
+    val ddl =
+      """
+        |CREATE TABLE Ticker1 (
+        | `symbol` STRING,
+        | `ts` TIMESTAMP(3),
+        | `price` INT,
+        | `tax` INT,
+        | WATERMARK FOR `ts` AS `ts` - INTERVAL '1' SECOND
+        |) WITH (
+        | 'connector' = 'values',
+        | 'bounded' = 'true'
+        |)
+        |""".stripMargin
+    util.tableEnv.executeSql(ddl)
+
+    val sqlQuery =
+      s"""
+         |SELECT
+         |  symbol,
+         |  SUM(price) as price,
+         |  TUMBLE_ROWTIME(matchRowtime, interval '3' second) as rowTime,
+         |  TUMBLE_START(matchRowtime, interval '3' second) as startTime
+         |FROM Ticker1
+         |MATCH_RECOGNIZE (
+         |  PARTITION BY symbol
+         |  ORDER BY ts
+         |  MEASURES
+         |    A.price as price,
+         |    A.tax as tax,
+         |    MATCH_ROWTIME() as matchRowtime
+         |  ONE ROW PER MATCH
+         |  PATTERN (A)
+         |  DEFINE
+         |    A AS A.price > 0
+         |) AS T
+         |GROUP BY symbol, TUMBLE(matchRowtime, interval '3' second)
+         |""".stripMargin
+    util.verifyRelPlan(sqlQuery)
+  }
+
+  // ----------------------------------------------------------------------------------------
+  // Tests for Illegal use of Match_RowTime
+  // ----------------------------------------------------------------------------------------
+
+  @Test
+  def testMatchRowtimeWithoutArgumentOnRowtimeLTZ(): Unit = {
+    val sqlQuery =
+      s"""
+         |SELECT
+         |  symbol,
+         |  SUM(price) as price,
+         |  TUMBLE_ROWTIME(matchRowtime, interval '3' second) as rowTime,
+         |  TUMBLE_START(matchRowtime, interval '3' second) as startTime
+         |FROM Ticker
+         |MATCH_RECOGNIZE (
+         |  PARTITION BY symbol
+         |  ORDER BY ts_ltz
+         |  MEASURES
+         |    A.price as price,
+         |    A.tax as tax,
+         |    MATCH_ROWTIME() as matchRowtime
+         |  ONE ROW PER MATCH
+         |  PATTERN (A)
+         |  DEFINE
+         |    A AS A.price > 0
+         |) AS T
+         |GROUP BY symbol, TUMBLE(matchRowtime, interval '3' second)
+         |""".stripMargin
+    util.verifyRelPlan(sqlQuery)

Review comment:
       Please verify the exec plan instead of the physical plan, e.g. `util.verifyExecPlan(sqlQuery)`.

##########
File path: flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalMatchRule.scala
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.planner.plan.rules.physical.batch
+
+import org.apache.flink.table.api.{TableException, ValidationException}
+import org.apache.flink.table.planner.plan.`trait`.FlinkRelDistribution
+import org.apache.flink.table.planner.plan.logical.MatchRecognize
+import org.apache.flink.table.planner.plan.nodes.FlinkConventions
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalMatch
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalMatch
+import org.apache.flink.table.planner.plan.utils.{MatchUtil, RexDefaultVisitor}
+
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rex.{RexCall, RexNode}
+import org.apache.calcite.sql.SqlAggFunction
+import org.apache.calcite.util.ImmutableBitSet
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+class BatchPhysicalMatchRule

Review comment:
       Introduce `CommonMatchRule` to hold the code shared by `BatchPhysicalMatchRule` and `StreamPhysicalMatchRule`.
   
   We should use Java for new classes.
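   
   A hedged sketch of the shared rule (in Java, as suggested; names and the exact split are assumptions):
   
   ```java
   import org.apache.calcite.plan.RelOptRuleCall;
   import org.apache.calcite.plan.RelTrait;
   import org.apache.calcite.rel.RelNode;
   import org.apache.calcite.rel.convert.ConverterRule;
   
   import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalMatch;
   
   /** Base converter rule holding the validation shared by the batch and stream match rules. */
   public abstract class CommonMatchRule extends ConverterRule {
   
       protected CommonMatchRule(
               Class<? extends RelNode> clazz,
               RelTrait in,
               RelTrait out,
               String descriptionPrefix) {
           super(clazz, in, out, descriptionPrefix);
       }
   
       @Override
       public boolean matches(RelOptRuleCall call) {
           final FlinkLogicalMatch logicalMatch = call.rel(0);
           // the validation currently duplicated in BatchPhysicalMatchRule and
           // StreamPhysicalMatchRule (e.g. rejecting unsupported aggregations
           // in pattern definitions) would move here
           return true;
       }
   
       // convert(RelNode) stays in the subclasses, each creating the
       // Batch-/StreamPhysicalMatch node for its convention.
   }
   ```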

##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecMatch.java
##########
@@ -0,0 +1,470 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.batch;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.cep.EventComparator;
+import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
+import org.apache.flink.cep.nfa.compiler.NFACompiler;
+import org.apache.flink.cep.operator.CepOperator;
+import org.apache.flink.cep.pattern.Pattern;
+import org.apache.flink.cep.pattern.Quantifier;
+import org.apache.flink.cep.pattern.conditions.BooleanConditions;
+import org.apache.flink.cep.pattern.conditions.IterativeCondition;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.codegen.CodeGenUtils;
+import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
+import org.apache.flink.table.planner.codegen.MatchCodeGenerator;
+import org.apache.flink.table.planner.codegen.sort.ComparatorCodeGenerator;
+import org.apache.flink.table.planner.delegation.PlannerBase;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
+import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
+import org.apache.flink.table.planner.plan.nodes.exec.MultipleTransformationTranslator;
+import org.apache.flink.table.planner.plan.nodes.exec.spec.MatchSpec;
+import org.apache.flink.table.planner.plan.nodes.exec.spec.SortSpec;
+import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
+import org.apache.flink.table.planner.plan.utils.KeySelectorUtil;
+import org.apache.flink.table.planner.plan.utils.RexDefaultVisitor;
+import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
+import org.apache.flink.table.runtime.generated.GeneratedRecordComparator;
+import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
+import org.apache.flink.table.runtime.operators.match.PatternProcessFunctionRunner;
+import org.apache.flink.table.runtime.operators.match.RowDataEventComparator;
+import org.apache.flink.table.runtime.operators.sink.StreamRecordTimestampInserter;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.runtime.typeutils.TypeCheckUtils;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.MathUtils;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlMatchRecognize;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.sql.type.SqlTypeFamily;
+import org.apache.calcite.tools.RelBuilder;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Optional;
+
+import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getPrecision;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Batch {@link ExecNode} which matches along with MATCH_RECOGNIZE. */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class BatchExecMatch extends ExecNodeBase<RowData>
+        implements BatchExecNode<RowData>, MultipleTransformationTranslator<RowData> {
+
+    public static final String FIELD_NAME_MATCH_SPEC = "matchSpec";
+
+    @JsonProperty(FIELD_NAME_MATCH_SPEC)
+    private final MatchSpec matchSpec;
+
+    public BatchExecMatch(
+            MatchSpec matchSpec,
+            InputProperty inputProperty,
+            RowType outputType,
+            String description) {
+        super(getNewNodeId(), Collections.singletonList(inputProperty), outputType, description);
+        checkArgument(getInputProperties().size() == 1);
+        this.matchSpec = checkNotNull(matchSpec);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    protected Transformation<RowData> translateToPlanInternal(PlannerBase planner) {

Review comment:
       Introduce `CommonExecMatch` to hold the shared code.
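   
   A hedged sketch of that split (the hook name is an assumption; imports and the shared body as in the class above):
   
   ```java
   // The full translateToPlanInternal body from this diff moves into a common
   // base class; only the semantics that differ between stream and batch are
   // exposed as hooks.
   public abstract class CommonExecMatch extends ExecNodeBase<RowData>
           implements MultipleTransformationTranslator<RowData> {
   
       protected final MatchSpec matchSpec;
   
       protected CommonExecMatch(
               MatchSpec matchSpec,
               InputProperty inputProperty,
               RowType outputType,
               String description) {
           super(getNewNodeId(), Collections.singletonList(inputProperty), outputType, description);
           this.matchSpec = checkNotNull(matchSpec);
       }
   
       // translateToPlanInternal(PlannerBase) as in this diff, shared verbatim,
       // except that time-semantics decisions call the hook below.
   
       /** Stream: the order key must be a time attribute; batch: a plain timestamp column. */
       protected abstract boolean isProcTime(RowType inputRowType);
   }
   ```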

##########
File path: flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/MatchRecognizeTest.scala
##########
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.batch.sql
+
+import org.apache.flink.table.api.ValidationException
+import org.apache.flink.table.planner.utils.{BatchTableTestUtil, TableTestBase}
+import org.junit.{Before, Test}
+
+class MatchRecognizeTest extends TableTestBase {
+
+  protected val util: BatchTableTestUtil = batchTestUtil()
+
+  @Before
+  def before(): Unit = {
+    val ddl =
+      """
+        |CREATE TABLE Ticker (
+        | `symbol` STRING,
+        | `ts_ltz` TIMESTAMP_LTZ(3),
+        | `price` INT,
+        | `tax` INT,
+        | WATERMARK FOR `ts_ltz` AS `ts_ltz` - INTERVAL '1' SECOND

Review comment:
       No need for a watermark definition in batch; the `WATERMARK FOR` clause can be dropped from the DDL.

##########
File path: flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/MatchRecognizeTest.scala
##########
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.batch.sql
+
+import org.apache.flink.table.api.ValidationException
+import org.apache.flink.table.planner.utils.{BatchTableTestUtil, TableTestBase}
+import org.junit.{Before, Test}
+
+class MatchRecognizeTest extends TableTestBase {

Review comment:
       Use Java for new classes.
   
   `testCascadeMatch` in stream.MatchRecognizeTest can also be added to this class.

##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecMatch.java
##########
@@ -0,0 +1,470 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.batch;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.cep.EventComparator;
+import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
+import org.apache.flink.cep.nfa.compiler.NFACompiler;
+import org.apache.flink.cep.operator.CepOperator;
+import org.apache.flink.cep.pattern.Pattern;
+import org.apache.flink.cep.pattern.Quantifier;
+import org.apache.flink.cep.pattern.conditions.BooleanConditions;
+import org.apache.flink.cep.pattern.conditions.IterativeCondition;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.codegen.CodeGenUtils;
+import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
+import org.apache.flink.table.planner.codegen.MatchCodeGenerator;
+import org.apache.flink.table.planner.codegen.sort.ComparatorCodeGenerator;
+import org.apache.flink.table.planner.delegation.PlannerBase;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
+import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
+import org.apache.flink.table.planner.plan.nodes.exec.MultipleTransformationTranslator;
+import org.apache.flink.table.planner.plan.nodes.exec.spec.MatchSpec;
+import org.apache.flink.table.planner.plan.nodes.exec.spec.SortSpec;
+import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
+import org.apache.flink.table.planner.plan.utils.KeySelectorUtil;
+import org.apache.flink.table.planner.plan.utils.RexDefaultVisitor;
+import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
+import org.apache.flink.table.runtime.generated.GeneratedRecordComparator;
+import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
+import org.apache.flink.table.runtime.operators.match.PatternProcessFunctionRunner;
+import org.apache.flink.table.runtime.operators.match.RowDataEventComparator;
+import org.apache.flink.table.runtime.operators.sink.StreamRecordTimestampInserter;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.runtime.typeutils.TypeCheckUtils;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.MathUtils;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlMatchRecognize;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.sql.type.SqlTypeFamily;
+import org.apache.calcite.tools.RelBuilder;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Optional;
+
+import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getPrecision;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Batch {@link ExecNode} which matches along with MATCH_RECOGNIZE. */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class BatchExecMatch extends ExecNodeBase<RowData>
+        implements BatchExecNode<RowData>, MultipleTransformationTranslator<RowData> {
+
+    public static final String FIELD_NAME_MATCH_SPEC = "matchSpec";
+
+    @JsonProperty(FIELD_NAME_MATCH_SPEC)
+    private final MatchSpec matchSpec;
+
+    public BatchExecMatch(
+            MatchSpec matchSpec,
+            InputProperty inputProperty,
+            RowType outputType,
+            String description) {
+        super(getNewNodeId(), Collections.singletonList(inputProperty), outputType, description);
+        checkArgument(getInputProperties().size() == 1);
+        this.matchSpec = checkNotNull(matchSpec);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    protected Transformation<RowData> translateToPlanInternal(PlannerBase planner) {
+        final ExecEdge inputEdge = getInputEdges().get(0);
+        final Transformation<RowData> inputTransform =
+                (Transformation<RowData>) inputEdge.translateToPlan(planner);
+        final RowType inputRowType = (RowType) inputEdge.getOutputType();
+
+        checkOrderKeys(inputRowType);
+        final TableConfig config = planner.getTableConfig();
+        final EventComparator<RowData> eventComparator =
+                createEventComparator(config, inputRowType);
+        final Transformation<RowData> timestampedInputTransform =
+                translateOrder(inputTransform, inputRowType);
+
+        final Tuple2<Pattern<RowData, RowData>, List<String>> cepPatternAndNames =
+                translatePattern(matchSpec, config, planner.getRelBuilder(), inputRowType);
+        final Pattern<RowData, RowData> cepPattern = cepPatternAndNames.f0;
+
+        // TODO remove this once it is supported in CEP library
+        if (NFACompiler.canProduceEmptyMatches(cepPattern)) {
+            throw new TableException(
+                    "Patterns that can produce empty matches are not supported. There must be at least one non-optional state.");
+        }
+
+        // TODO remove this once it is supported in CEP library
+        if (cepPattern.getQuantifier().hasProperty(Quantifier.QuantifierProperty.GREEDY)) {
+            throw new TableException(
+                    "Greedy quantifiers are not allowed as the last element of a Pattern yet. "
+                            + "Finish your pattern with either a simple variable or reluctant quantifier.");
+        }
+
+        if (matchSpec.isAllRows()) {
+            throw new TableException("All rows per match mode is not supported yet.");
+        }
+
+        final int[] partitionKeys = matchSpec.getPartition().getFieldIndices();
+        final SortSpec.SortFieldSpec timeOrderField = matchSpec.getOrderKeys().getFieldSpec(0);
+        final LogicalType timeOrderFieldType =
+                inputRowType.getTypeAt(timeOrderField.getFieldIndex());
+
+        final boolean isProctime = TypeCheckUtils.isProcTime(timeOrderFieldType);

Review comment:
       Batch has no time attribute concept, so this logic should be changed.
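   
   One possible adjustment, as a hedged sketch (an assumption, not the actual fix): since batch columns are never time attributes, a type-family check could replace the `isProcTime`/`isRowTime` time-attribute checks.
   
   ```java
   import org.apache.flink.table.types.logical.LogicalType;
   import org.apache.flink.table.types.logical.LogicalTypeFamily;
   
   final class TimeOrderKeyChecks {
       private TimeOrderKeyChecks() {}
   
       /** Batch: accept any TIMESTAMP / TIMESTAMP_LTZ order key as event time. */
       static boolean isTimestampOrderKey(LogicalType type) {
           return type.is(LogicalTypeFamily.TIMESTAMP);
       }
   }
   ```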

##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecMatch.java
##########
@@ -0,0 +1,470 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.batch;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.cep.EventComparator;
+import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
+import org.apache.flink.cep.nfa.compiler.NFACompiler;
+import org.apache.flink.cep.operator.CepOperator;
+import org.apache.flink.cep.pattern.Pattern;
+import org.apache.flink.cep.pattern.Quantifier;
+import org.apache.flink.cep.pattern.conditions.BooleanConditions;
+import org.apache.flink.cep.pattern.conditions.IterativeCondition;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.codegen.CodeGenUtils;
+import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
+import org.apache.flink.table.planner.codegen.MatchCodeGenerator;
+import org.apache.flink.table.planner.codegen.sort.ComparatorCodeGenerator;
+import org.apache.flink.table.planner.delegation.PlannerBase;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
+import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
+import org.apache.flink.table.planner.plan.nodes.exec.MultipleTransformationTranslator;
+import org.apache.flink.table.planner.plan.nodes.exec.spec.MatchSpec;
+import org.apache.flink.table.planner.plan.nodes.exec.spec.SortSpec;
+import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
+import org.apache.flink.table.planner.plan.utils.KeySelectorUtil;
+import org.apache.flink.table.planner.plan.utils.RexDefaultVisitor;
+import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
+import org.apache.flink.table.runtime.generated.GeneratedRecordComparator;
+import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
+import org.apache.flink.table.runtime.operators.match.PatternProcessFunctionRunner;
+import org.apache.flink.table.runtime.operators.match.RowDataEventComparator;
+import org.apache.flink.table.runtime.operators.sink.StreamRecordTimestampInserter;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.runtime.typeutils.TypeCheckUtils;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.MathUtils;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlMatchRecognize;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.sql.type.SqlTypeFamily;
+import org.apache.calcite.tools.RelBuilder;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Optional;
+
+import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getPrecision;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Batch {@link ExecNode} which matches along with MATCH_RECOGNIZE. */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class BatchExecMatch extends ExecNodeBase<RowData>
+        implements BatchExecNode<RowData>, MultipleTransformationTranslator<RowData> {
+
+    public static final String FIELD_NAME_MATCH_SPEC = "matchSpec";
+
+    @JsonProperty(FIELD_NAME_MATCH_SPEC)
+    private final MatchSpec matchSpec;
+
+    public BatchExecMatch(
+            MatchSpec matchSpec,
+            InputProperty inputProperty,
+            RowType outputType,
+            String description) {
+        super(getNewNodeId(), Collections.singletonList(inputProperty), outputType, description);
+        checkArgument(getInputProperties().size() == 1);
+        this.matchSpec = checkNotNull(matchSpec);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    protected Transformation<RowData> translateToPlanInternal(PlannerBase planner) {
+        final ExecEdge inputEdge = getInputEdges().get(0);
+        final Transformation<RowData> inputTransform =
+                (Transformation<RowData>) inputEdge.translateToPlan(planner);
+        final RowType inputRowType = (RowType) inputEdge.getOutputType();
+
+        checkOrderKeys(inputRowType);
+        final TableConfig config = planner.getTableConfig();
+        final EventComparator<RowData> eventComparator =
+                createEventComparator(config, inputRowType);
+        final Transformation<RowData> timestampedInputTransform =
+                translateOrder(inputTransform, inputRowType);
+
+        final Tuple2<Pattern<RowData, RowData>, List<String>> cepPatternAndNames =
+                translatePattern(matchSpec, config, planner.getRelBuilder(), inputRowType);
+        final Pattern<RowData, RowData> cepPattern = cepPatternAndNames.f0;
+
+        // TODO remove this once it is supported in CEP library
+        if (NFACompiler.canProduceEmptyMatches(cepPattern)) {
+            throw new TableException(
+                    "Patterns that can produce empty matches are not 
supported. There must be at least one non-optional state.");
+        }
+
+        // TODO remove this once it is supported in CEP library
+        if (cepPattern.getQuantifier().hasProperty(Quantifier.QuantifierProperty.GREEDY)) {
+            throw new TableException(
+                    "Greedy quantifiers are not allowed as the last element of a Pattern yet. "
+                            + "Finish your pattern with either a simple variable or reluctant quantifier.");
+        }
+
+        if (matchSpec.isAllRows()) {
+            throw new TableException("All rows per match mode is not supported 
yet.");
+        }
+
+        final int[] partitionKeys = matchSpec.getPartition().getFieldIndices();
+        final SortSpec.SortFieldSpec timeOrderField = matchSpec.getOrderKeys().getFieldSpec(0);
+        final LogicalType timeOrderFieldType =
+                inputRowType.getTypeAt(timeOrderField.getFieldIndex());
+
+        final boolean isProctime = TypeCheckUtils.isProcTime(timeOrderFieldType);
+        final InternalTypeInfo<RowData> inputTypeInfo =
+                (InternalTypeInfo<RowData>) inputTransform.getOutputType();
+        final TypeSerializer<RowData> inputSerializer =
+                inputTypeInfo.createSerializer(planner.getExecEnv().getConfig());
+        final NFACompiler.NFAFactory<RowData> nfaFactory =
+                NFACompiler.compileFactory(cepPattern, false);
+        final MatchCodeGenerator generator =
+                new MatchCodeGenerator(
+                        new CodeGeneratorContext(config),
+                        planner.getRelBuilder(),
+                        false, // nullableInput
+                        JavaScalaConversionUtil.toScala(cepPatternAndNames.f1),
+                        JavaScalaConversionUtil.toScala(Optional.empty()),
+                        CodeGenUtils.DEFAULT_COLLECTOR_TERM());
+        generator.bindInput(
+                inputRowType,
+                CodeGenUtils.DEFAULT_INPUT1_TERM(),
+                JavaScalaConversionUtil.toScala(Optional.empty()));
+        final PatternProcessFunctionRunner patternProcessFunction =
+                generator.generateOneRowPerMatchExpression(
+                        (RowType) getOutputType(), partitionKeys, matchSpec.getMeasures());
+        final CepOperator<RowData, RowData, RowData> operator =
+                new CepOperator<>(
+                        inputSerializer,
+                        isProctime,
+                        nfaFactory,
+                        eventComparator,
+                        cepPattern.getAfterMatchSkipStrategy(),
+                        patternProcessFunction,
+                        null);
+        final OneInputTransformation<RowData, RowData> transform =
+                ExecNodeUtil.createOneInputTransformation(
+                        timestampedInputTransform,
+                        getOperatorName(config),
+                        getOperatorDescription(config),
+                        operator,
+                        InternalTypeInfo.of(getOutputType()),
+                        timestampedInputTransform.getParallelism());
+        final RowDataKeySelector selector =
+                KeySelectorUtil.getRowDataSelector(partitionKeys, inputTypeInfo);
+        transform.setStateKeySelector(selector);
+        transform.setStateKeyType(selector.getProducedType());
+
+        if (inputsContainSingleton()) {
+            transform.setParallelism(1);
+            transform.setMaxParallelism(1);
+        }
+        return transform;
+    }
+
+    private void checkOrderKeys(RowType inputRowType) {
+        SortSpec orderKeys = matchSpec.getOrderKeys();
+        if (orderKeys.getFieldSize() == 0) {
+            throw new TableException("You must specify either rowtime or 
proctime for order by.");
+        }
+
+        SortSpec.SortFieldSpec timeOrderField = orderKeys.getFieldSpec(0);
+        int timeOrderFieldIdx = timeOrderField.getFieldIndex();
+        LogicalType timeOrderFieldType = inputRowType.getTypeAt(timeOrderFieldIdx);
+        // identify the time column among the other order fields; time must be the first sort element
+        if (!TypeCheckUtils.isRowTime(timeOrderFieldType)
+                && !TypeCheckUtils.isProcTime(timeOrderFieldType)) {
+            throw new TableException(
+                    "You must specify either rowtime or proctime for order by 
as the first one.");
+        }
+
+        // time ordering needs to be ascending
+        if (!orderKeys.getAscendingOrders()[0]) {
+            throw new TableException(
+                    "Primary sort order of a streaming table must be ascending 
on time.");
+        }
+    }
+
+    private EventComparator<RowData> createEventComparator(
+            TableConfig config, RowType inputRowType) {
+        SortSpec orderKeys = matchSpec.getOrderKeys();
+        if (orderKeys.getFieldIndices().length > 1) {
+            GeneratedRecordComparator rowComparator =
+                    ComparatorCodeGenerator.gen(
+                            config, "RowDataComparator", inputRowType, orderKeys);
+            return new RowDataEventComparator(rowComparator);
+        } else {
+            return null;
+        }
+    }
+
+    private Transformation<RowData> translateOrder(
+            Transformation<RowData> inputTransform, RowType inputRowType) {
+        SortSpec.SortFieldSpec timeOrderField = matchSpec.getOrderKeys().getFieldSpec(0);
+        int timeOrderFieldIdx = timeOrderField.getFieldIndex();
+        LogicalType timeOrderFieldType = inputRowType.getTypeAt(timeOrderFieldIdx);
+
+        if (TypeCheckUtils.isRowTime(timeOrderFieldType)) {

Review comment:
       ditto
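
       The same concern applies to this branch: TypeCheckUtils.isRowTime can
       never return true for a batch row type, so the timestamp-inserting path
       would silently be skipped. A hedged sketch of how the condition might be
       adapted, reusing the local names from the diff (hypothetical, not the
       PR's code; LogicalTypeRoot would need an additional import):

           // hypothetical: in batch, test the physical timestamp type rather
           // than a rowtime attribute, which does not exist in batch row types
           LogicalType timeOrderFieldType = inputRowType.getTypeAt(timeOrderFieldIdx);
           LogicalTypeRoot root = timeOrderFieldType.getTypeRoot();
           boolean isTimestampOrderKey =
                   root == LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE
                           || root == LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE;
           if (isTimestampOrderKey) {
               // attach the order column's value as the StreamRecord timestamp
               // (e.g. via StreamRecordTimestampInserter) instead of relying on
               // an existing rowtime attribute
               int precision = getPrecision(timeOrderFieldType);
               // ... build the timestamp-inserting transformation here ...
           }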




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

