This is an automated email from the ASF dual-hosted git repository.
kurt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new e3248d8 [FLINK-11790][table-planner-blink] Introduce FlinkRelNode interface and FlinkConventions
e3248d8 is described below
commit e3248d844c728c714857c5d69520f08ddf4e4c85
Author: godfreyhe <[email protected]>
AuthorDate: Sat Mar 2 13:35:36 2019 +0800
[FLINK-11790][table-planner-blink] Introduce FlinkRelNode interface and FlinkConventions
This closes #7878
---
.../flink/table/plan/nodes/FlinkConventions.scala | 55 ++++++++++++++++++++++
.../flink/table/plan/nodes/FlinkRelNode.scala | 28 +++++++++++
.../table/plan/nodes/logical/FlinkLogicalRel.scala | 28 +++++++++++
.../plan/nodes/physical/FlinkPhysicalRel.scala | 42 +++++++++++++++++
.../nodes/physical/batch/BatchPhysicalRel.scala | 28 +++++++++++
.../nodes/physical/stream/StreamPhysicalRel.scala | 51 ++++++++++++++++++++
6 files changed, 232 insertions(+)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/FlinkConventions.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/FlinkConventions.scala
new file mode 100644
index 0000000..2ef58c6
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/FlinkConventions.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes
+
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalRel
+import org.apache.flink.table.plan.nodes.physical.batch.BatchPhysicalRel
+import org.apache.flink.table.plan.nodes.physical.stream.StreamPhysicalRel
+
+import org.apache.calcite.plan.{Convention, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+
+/**
+ * Override the default convention implementation to support using AbstractConverter for conversion
+ */
+class FlinkConvention(name: String, relClass: Class[_ <: RelNode])
+ extends Convention.Impl(name, relClass) {
+
+ override def useAbstractConvertersForConversion(
+ fromTraits: RelTraitSet,
+ toTraits: RelTraitSet): Boolean = {
+ if (relClass == classOf[StreamPhysicalRel]) {
+ // stream: fromTraits must not satisfy toTraits yet; both are checked against STREAM_PHYSICAL
+ !fromTraits.satisfies(toTraits) &&
+ fromTraits.containsIfApplicable(FlinkConventions.STREAM_PHYSICAL) &&
+ toTraits.containsIfApplicable(FlinkConventions.STREAM_PHYSICAL)
+ } else {
+ // batch: same checks against BATCH_PHYSICAL; every non-stream relClass takes this branch
+ !fromTraits.satisfies(toTraits) &&
+ fromTraits.containsIfApplicable(FlinkConventions.BATCH_PHYSICAL) &&
+ toTraits.containsIfApplicable(FlinkConventions.BATCH_PHYSICAL)
+ }
+ }
+}
+
+object FlinkConventions {
+ val LOGICAL = new Convention.Impl("LOGICAL", classOf[FlinkLogicalRel])
+ val STREAM_PHYSICAL = new FlinkConvention("STREAM_PHYSICAL", classOf[StreamPhysicalRel])
+ val BATCH_PHYSICAL = new FlinkConvention("BATCH_PHYSICAL", classOf[BatchPhysicalRel])
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRelNode.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRelNode.scala
new file mode 100644
index 0000000..48b0bc6
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRelNode.scala
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes
+
+import org.apache.calcite.rel.RelNode
+
+/**
+ * Base trait for Flink relational expressions, extending Calcite's [[RelNode]].
+ */
+trait FlinkRelNode extends RelNode {
+
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalRel.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalRel.scala
new file mode 100644
index 0000000..5c17535
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalRel.scala
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.logical
+
+import org.apache.flink.table.plan.nodes.FlinkRelNode
+
+/**
+ * Base trait for Flink logical relational expressions, extending [[FlinkRelNode]].
+ */
+trait FlinkLogicalRel extends FlinkRelNode {
+
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/FlinkPhysicalRel.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/FlinkPhysicalRel.scala
new file mode 100644
index 0000000..cd85e7b
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/FlinkPhysicalRel.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.physical
+
+import org.apache.flink.table.plan.nodes.FlinkRelNode
+
+import org.apache.calcite.plan.RelTraitSet
+import org.apache.calcite.rel.RelNode
+
+/**
+ * Base trait for Flink physical relational expressions, extending [[FlinkRelNode]].
+ */
+trait FlinkPhysicalRel extends FlinkRelNode {
+
+ /**
+ * Tries to satisfy the required traits by a descendant of the current node. If a descendant
+ * can satisfy the required traits and the current node will not destroy them, returns the
+ * new node with converted inputs. The default implementation returns null.
+ *
+ * @param requiredTraitSet required traits
+ * @return A converted node which satisfies the required traits by the inputs of the current
+ * node, or null if the required traits cannot be pushed down into the inputs.
+ */
+ def satisfyTraitsByInput(requiredTraitSet: RelTraitSet): RelNode = null
+
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchPhysicalRel.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchPhysicalRel.scala
new file mode 100644
index 0000000..a71d1bb
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchPhysicalRel.scala
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.physical.batch
+
+import org.apache.flink.table.plan.nodes.physical.FlinkPhysicalRel
+
+/**
+ * Base trait for batch physical relational expressions, extending [[FlinkPhysicalRel]].
+ */
+trait BatchPhysicalRel extends FlinkPhysicalRel {
+
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamPhysicalRel.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamPhysicalRel.scala
new file mode 100644
index 0000000..b402d1f
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamPhysicalRel.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.physical.stream
+
+import org.apache.flink.table.plan.nodes.physical.FlinkPhysicalRel
+
+import org.apache.calcite.rel.RelNode
+
+/**
+ * Base trait for stream physical relational expressions, extending [[FlinkPhysicalRel]].
+ */
+trait StreamPhysicalRel extends FlinkPhysicalRel {
+
+ /**
+ * Whether the [[StreamPhysicalRel]] produces update and delete changes. Defaults to false.
+ */
+ def producesUpdates: Boolean = false
+
+ /**
+ * Whether the [[StreamPhysicalRel]] requires retraction messages or not. Defaults to false.
+ */
+ def needsUpdatesAsRetraction(input: RelNode): Boolean = false
+
+ /**
+ * Whether the [[StreamPhysicalRel]] consumes retraction messages instead of forwarding them.
+ * The node might or might not produce new retraction messages. Defaults to false.
+ */
+ def consumesRetractions: Boolean = false
+
+ /**
+ * Whether the [[StreamPhysicalRel]] produces retraction messages. Defaults to false.
+ */
+ def producesRetractions: Boolean = false
+
+}