This is an automated email from the ASF dual-hosted git repository.

kurt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new e3248d8  [FLINK-11790][table-planner-blink] Introduce FlinkRelNode 
interface and FlinkConventions
e3248d8 is described below

commit e3248d844c728c714857c5d69520f08ddf4e4c85
Author: godfreyhe <[email protected]>
AuthorDate: Sat Mar 2 13:35:36 2019 +0800

    [FLINK-11790][table-planner-blink] Introduce FlinkRelNode interface and 
FlinkConventions
    
    This closes #7878
---
 .../flink/table/plan/nodes/FlinkConventions.scala  | 55 ++++++++++++++++++++++
 .../flink/table/plan/nodes/FlinkRelNode.scala      | 28 +++++++++++
 .../table/plan/nodes/logical/FlinkLogicalRel.scala | 28 +++++++++++
 .../plan/nodes/physical/FlinkPhysicalRel.scala     | 42 +++++++++++++++++
 .../nodes/physical/batch/BatchPhysicalRel.scala    | 28 +++++++++++
 .../nodes/physical/stream/StreamPhysicalRel.scala  | 51 ++++++++++++++++++++
 6 files changed, 232 insertions(+)

diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/FlinkConventions.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/FlinkConventions.scala
new file mode 100644
index 0000000..2ef58c6
--- /dev/null
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/FlinkConventions.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes
+
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalRel
+import org.apache.flink.table.plan.nodes.physical.batch.BatchPhysicalRel
+import org.apache.flink.table.plan.nodes.physical.stream.StreamPhysicalRel
+
+import org.apache.calcite.plan.{Convention, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+
+/**
+  * Override the default convention implementation to support using 
AbstractConverter for conversion
+  */
+class FlinkConvention(name: String, relClass: Class[_ <: RelNode])
+  extends Convention.Impl(name, relClass) {
+
+  override def useAbstractConvertersForConversion(
+      fromTraits: RelTraitSet,
+      toTraits: RelTraitSet): Boolean = {
+    if (relClass == classOf[StreamPhysicalRel]) {
+      // stream
+      !fromTraits.satisfies(toTraits) &&
+        fromTraits.containsIfApplicable(FlinkConventions.STREAM_PHYSICAL) &&
+        toTraits.containsIfApplicable(FlinkConventions.STREAM_PHYSICAL)
+    } else {
+      // batch
+      !fromTraits.satisfies(toTraits) &&
+        fromTraits.containsIfApplicable(FlinkConventions.BATCH_PHYSICAL) &&
+        toTraits.containsIfApplicable(FlinkConventions.BATCH_PHYSICAL)
+    }
+  }
+}
+
+object FlinkConventions {
+  val LOGICAL = new Convention.Impl("LOGICAL", classOf[FlinkLogicalRel])
+  val STREAM_PHYSICAL = new FlinkConvention("STREAM_PHYSICAL", 
classOf[StreamPhysicalRel])
+  val BATCH_PHYSICAL = new FlinkConvention("BATCH_PHYSICAL", 
classOf[BatchPhysicalRel])
+}
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRelNode.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRelNode.scala
new file mode 100644
index 0000000..48b0bc6
--- /dev/null
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRelNode.scala
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes
+
+import org.apache.calcite.rel.RelNode
+
+/**
+  * Base class for flink relational expression.
+  */
+trait FlinkRelNode extends RelNode {
+
+}
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalRel.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalRel.scala
new file mode 100644
index 0000000..5c17535
--- /dev/null
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalRel.scala
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.logical
+
+import org.apache.flink.table.plan.nodes.FlinkRelNode
+
+/**
+  * Base class for flink logical relational expression.
+  */
+trait FlinkLogicalRel extends FlinkRelNode {
+
+}
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/FlinkPhysicalRel.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/FlinkPhysicalRel.scala
new file mode 100644
index 0000000..cd85e7b
--- /dev/null
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/FlinkPhysicalRel.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.physical
+
+import org.apache.flink.table.plan.nodes.FlinkRelNode
+
+import org.apache.calcite.plan.RelTraitSet
+import org.apache.calcite.rel.RelNode
+
+/**
+  * Base class for flink physical relational expression.
+  */
+trait FlinkPhysicalRel extends FlinkRelNode {
+
+  /**
+    * Try to satisfy required traits by descendant of current node. If 
descendant can satisfy
+    * required traits, and current node will not destroy it, then returns the 
new node with
+    * converted inputs.
+    *
+    * @param requiredTraitSet required traits
+    * @return A converted node which satisfy required traits by inputs node of 
current node.
+    *         Returns null if required traits cannot be pushed down into 
inputs.
+    */
+  def satisfyTraitsByInput(requiredTraitSet: RelTraitSet): RelNode = null
+
+}
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchPhysicalRel.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchPhysicalRel.scala
new file mode 100644
index 0000000..a71d1bb
--- /dev/null
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchPhysicalRel.scala
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.physical.batch
+
+import org.apache.flink.table.plan.nodes.physical.FlinkPhysicalRel
+
+/**
+  * Base class for batch physical relational expression.
+  */
+trait BatchPhysicalRel extends FlinkPhysicalRel {
+
+}
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamPhysicalRel.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamPhysicalRel.scala
new file mode 100644
index 0000000..b402d1f
--- /dev/null
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamPhysicalRel.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.physical.stream
+
+import org.apache.flink.table.plan.nodes.physical.FlinkPhysicalRel
+
+import org.apache.calcite.rel.RelNode
+
+/**
+  * Base class for stream physical relational expression.
+  */
+trait StreamPhysicalRel extends FlinkPhysicalRel {
+
+  /**
+    * Whether the [[StreamPhysicalRel]] produces update and delete changes.
+    */
+  def producesUpdates: Boolean = false
+
+  /**
+    * Whether the [[StreamPhysicalRel]] requires retraction messages or not.
+    */
+  def needsUpdatesAsRetraction(input: RelNode): Boolean = false
+
+  /**
+    * Whether the [[StreamPhysicalRel]] consumes retraction messages instead 
of forwarding them.
+    * The node might or might not produce new retraction messages.
+    */
+  def consumesRetractions: Boolean = false
+
+  /**
+    * Whether the [[StreamPhysicalRel]] produces retraction messages.
+    */
+  def producesRetractions: Boolean = false
+
+}

Reply via email to