Apply checkstyle fixes and rename the package: correct the package from org.beam.dsls.sql to org.apache.beam.dsls.sql
update with checkstyle Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/529bc9d9 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/529bc9d9 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/529bc9d9 Branch: refs/heads/DSL_SQL Commit: 529bc9d9375ae162d70de274975c547a878053a3 Parents: f1c2b65 Author: mingmxu <[email protected]> Authored: Thu Apr 13 14:46:28 2017 -0700 Committer: mingmxu <[email protected]> Committed: Thu Apr 13 15:44:07 2017 -0700 ---------------------------------------------------------------------- dsls/pom.xml | 7 + .../beam/dsls/sql/example/BeamSqlExample.java | 103 ++++++++ .../beam/dsls/sql/example/package-info.java | 23 ++ .../interpreter/BeamSQLExpressionExecutor.java | 43 ++++ .../sql/interpreter/BeamSQLSpELExecutor.java | 127 +++++++++ .../dsls/sql/interpreter/CalciteToSpEL.java | 81 ++++++ .../beam/dsls/sql/interpreter/package-info.java | 22 ++ .../org/apache/beam/dsls/sql/package-info.java | 22 ++ .../dsls/sql/planner/BeamPipelineCreator.java | 86 +++++++ .../beam/dsls/sql/planner/BeamQueryPlanner.java | 159 ++++++++++++ .../dsls/sql/planner/BeamRelDataTypeSystem.java | 40 +++ .../beam/dsls/sql/planner/BeamRuleSets.java | 66 +++++ .../beam/dsls/sql/planner/BeamSQLRelUtils.java | 74 ++++++ .../beam/dsls/sql/planner/BeamSqlRunner.java | 94 +++++++ .../planner/BeamSqlUnsupportedException.java | 38 +++ .../planner/UnsupportedOperatorsVisitor.java | 28 ++ .../beam/dsls/sql/planner/package-info.java | 24 ++ .../apache/beam/dsls/sql/rel/BeamFilterRel.java | 71 +++++ .../apache/beam/dsls/sql/rel/BeamIOSinkRel.java | 76 ++++++ .../beam/dsls/sql/rel/BeamIOSourceRel.java | 60 +++++ .../dsls/sql/rel/BeamLogicalConvention.java | 72 ++++++ .../beam/dsls/sql/rel/BeamProjectRel.java | 83 ++++++ .../apache/beam/dsls/sql/rel/BeamRelNode.java | 38 +++ .../apache/beam/dsls/sql/rel/package-info.java | 23 ++ .../beam/dsls/sql/rule/BeamFilterRule.java | 49 ++++ 
.../beam/dsls/sql/rule/BeamIOSinkRule.java | 82 ++++++ .../beam/dsls/sql/rule/BeamIOSourceRule.java | 49 ++++ .../beam/dsls/sql/rule/BeamProjectRule.java | 50 ++++ .../apache/beam/dsls/sql/rule/package-info.java | 23 ++ .../beam/dsls/sql/schema/BaseBeamTable.java | 99 +++++++ .../apache/beam/dsls/sql/schema/BeamIOType.java | 28 ++ .../beam/dsls/sql/schema/BeamSQLRecordType.java | 74 ++++++ .../dsls/sql/schema/BeamSQLRecordTypeCoder.java | 88 +++++++ .../apache/beam/dsls/sql/schema/BeamSQLRow.java | 256 +++++++++++++++++++ .../beam/dsls/sql/schema/BeamSqlRowCoder.java | 149 +++++++++++ .../dsls/sql/schema/InvalidFieldException.java | 34 +++ .../schema/UnsupportedDataTypeException.java | 32 +++ .../sql/schema/kafka/BeamKafkaCSVTable.java | 127 +++++++++ .../dsls/sql/schema/kafka/BeamKafkaTable.java | 111 ++++++++ .../dsls/sql/schema/kafka/package-info.java | 22 ++ .../beam/dsls/sql/schema/package-info.java | 23 ++ .../dsls/sql/transform/BeamSQLFilterFn.java | 66 +++++ .../sql/transform/BeamSQLOutputToConsoleFn.java | 45 ++++ .../dsls/sql/transform/BeamSQLProjectFn.java | 72 ++++++ .../beam/dsls/sql/transform/package-info.java | 22 ++ .../beam/dsls/sql/example/BeamSqlExample.java | 102 -------- .../org/beam/dsls/sql/example/package-info.java | 23 -- .../interpreter/BeamSQLExpressionExecutor.java | 43 ---- .../sql/interpreter/BeamSQLSpELExecutor.java | 126 --------- .../dsls/sql/interpreter/CalciteToSpEL.java | 80 ------ .../beam/dsls/sql/interpreter/package-info.java | 22 -- .../java/org/beam/dsls/sql/package-info.java | 22 -- .../dsls/sql/planner/BeamPipelineCreator.java | 85 ------ .../beam/dsls/sql/planner/BeamQueryPlanner.java | 157 ------------ .../dsls/sql/planner/BeamRelDataTypeSystem.java | 40 --- .../org/beam/dsls/sql/planner/BeamRuleSets.java | 65 ----- .../beam/dsls/sql/planner/BeamSQLRelUtils.java | 73 ------ .../beam/dsls/sql/planner/BeamSqlRunner.java | 93 ------- .../planner/BeamSqlUnsupportedException.java | 38 --- 
.../planner/UnsupportedOperatorsVisitor.java | 28 -- .../org/beam/dsls/sql/planner/package-info.java | 24 -- .../org/beam/dsls/sql/rel/BeamFilterRel.java | 71 ----- .../org/beam/dsls/sql/rel/BeamIOSinkRel.java | 75 ------ .../org/beam/dsls/sql/rel/BeamIOSourceRel.java | 59 ----- .../dsls/sql/rel/BeamLogicalConvention.java | 72 ------ .../org/beam/dsls/sql/rel/BeamProjectRel.java | 82 ------ .../java/org/beam/dsls/sql/rel/BeamRelNode.java | 38 --- .../org/beam/dsls/sql/rel/package-info.java | 23 -- .../org/beam/dsls/sql/rule/BeamFilterRule.java | 49 ---- .../org/beam/dsls/sql/rule/BeamIOSinkRule.java | 81 ------ .../beam/dsls/sql/rule/BeamIOSourceRule.java | 49 ---- .../org/beam/dsls/sql/rule/BeamProjectRule.java | 50 ---- .../org/beam/dsls/sql/rule/package-info.java | 22 -- .../org/beam/dsls/sql/schema/BaseBeamTable.java | 99 ------- .../org/beam/dsls/sql/schema/BeamIOType.java | 28 -- .../beam/dsls/sql/schema/BeamSQLRecordType.java | 74 ------ .../dsls/sql/schema/BeamSQLRecordTypeCoder.java | 88 ------- .../org/beam/dsls/sql/schema/BeamSQLRow.java | 242 ------------------ .../beam/dsls/sql/schema/BeamSqlRowCoder.java | 149 ----------- .../dsls/sql/schema/InvalidFieldException.java | 30 --- .../schema/UnsupportedDataTypeException.java | 28 -- .../sql/schema/kafka/BeamKafkaCSVTable.java | 127 --------- .../dsls/sql/schema/kafka/BeamKafkaTable.java | 111 -------- .../dsls/sql/schema/kafka/package-info.java | 22 -- .../org/beam/dsls/sql/schema/package-info.java | 23 -- .../dsls/sql/transform/BeamSQLFilterFn.java | 66 ----- .../sql/transform/BeamSQLOutputToConsoleFn.java | 45 ---- .../dsls/sql/transform/BeamSQLProjectFn.java | 72 ------ .../beam/dsls/sql/transform/package-info.java | 22 -- .../beam/dsls/sql/planner/BasePlanner.java | 74 ++++++ .../sql/planner/BeamPlannerExplainTest.java | 68 +++++ .../dsls/sql/planner/BeamPlannerSubmitTest.java | 43 ++++ .../dsls/sql/planner/MockedBeamSQLTable.java | 123 +++++++++ .../org/beam/dsls/sql/planner/BasePlanner.java | 74 
------ .../sql/planner/BeamPlannerExplainTest.java | 68 ----- .../dsls/sql/planner/BeamPlannerSubmitTest.java | 42 --- .../dsls/sql/planner/MockedBeamSQLTable.java | 123 --------- 97 files changed, 3269 insertions(+), 3225 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/pom.xml ---------------------------------------------------------------------- diff --git a/dsls/pom.xml b/dsls/pom.xml index a1bb0ee..6f9d635 100644 --- a/dsls/pom.xml +++ b/dsls/pom.xml @@ -52,6 +52,13 @@ </plugin> </plugins> </pluginManagement> + + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-checkstyle-plugin</artifactId> + </plugin> + </plugins> </build> </project> http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java new file mode 100644 index 0000000..d32bc59 --- /dev/null +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.dsls.sql.example; + +import java.io.Serializable; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import org.apache.beam.dsls.sql.planner.BeamSqlRunner; +import org.apache.beam.dsls.sql.schema.BaseBeamTable; +import org.apache.beam.dsls.sql.schema.kafka.BeamKafkaCSVTable; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.rel.type.RelProtoDataType; +import org.apache.calcite.sql.type.SqlTypeName; +import org.apache.kafka.clients.consumer.ConsumerConfig; + +/** + * This is one quick example. 
+ * + * <p>Before start, follow https://kafka.apache.org/quickstart to setup a Kafka + * cluster locally, and run below commands to create required Kafka topics: + * <pre> + * <code> + * bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 \ + * --partitions 1 --topic orders + * bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 \ + * --partitions 1 --topic sub_orders + * </code> + * </pre> + * After run the application, produce several test records: + * <pre> + * <code> + * bin/kafka-console-producer.sh --broker-list localhost:9092 --topic orders + * invalid,record + * 123445,0,100,3413423 + * 234123,3,232,3451231234 + * 234234,0,5,1234123 + * 345234,0,345234.345,3423 + * </code> + * </pre> + * Meanwhile, open another console to see the output: + * <pre> + * <code> + * bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic sub_orders + * **Expected : + * 123445,0,100.0 + * 345234,0,345234.345 + * </code> + * </pre> + */ +public class BeamSqlExample implements Serializable { + + /** + * + */ + private static final long serialVersionUID = 3673487843555563904L; + + public static void main(String[] args) throws Exception { + BeamSqlRunner runner = new BeamSqlRunner(); + runner.addTable("ORDER_DETAILS", getTable("127.0.0.1:9092", "orders")); + runner.addTable("SUB_ORDER", getTable("127.0.0.1:9092", "sub_orders")); + + // case 2: insert into <table>(<fields>) select STREAM <fields> from + // <table> from <clause> + String sql = "INSERT INTO SUB_ORDER(order_id, site_id, price) " + "SELECT " + + " order_id, site_id, price " + "FROM ORDER_DETAILS " + "WHERE SITE_ID = 0 and price > 20"; + + runner.explainQuery(sql); + runner.submitQuery(sql); + } + + public static BaseBeamTable getTable(String bootstrapServer, String topic) { + final RelProtoDataType protoRowType = new RelProtoDataType() { + @Override + public RelDataType apply(RelDataTypeFactory a0) { + return a0.builder().add("order_id", 
SqlTypeName.BIGINT).add("site_id", SqlTypeName.INTEGER) + .add("price", SqlTypeName.DOUBLE).add("order_time", SqlTypeName.TIMESTAMP).build(); + } + }; + + Map<String, Object> consumerPara = new HashMap<String, Object>(); + consumerPara.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); + + return new BeamKafkaCSVTable(protoRowType, bootstrapServer, Arrays.asList(topic)) + .updateConsumerProperties(consumerPara); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/package-info.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/package-info.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/package-info.java new file mode 100644 index 0000000..52a9fce --- /dev/null +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/package-info.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * examples on how to use BeamSQL. 
+ * + */ +package org.apache.beam.dsls.sql.example; http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSQLExpressionExecutor.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSQLExpressionExecutor.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSQLExpressionExecutor.java new file mode 100644 index 0000000..1285280 --- /dev/null +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSQLExpressionExecutor.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.dsls.sql.interpreter; + +import java.io.Serializable; +import java.util.List; +import org.apache.beam.dsls.sql.schema.BeamSQLRow; + +/** + * {@code BeamSQLExpressionExecutor} fills the gap between relational + * expressions in Calcite SQL and executable code. + * + */ +public interface BeamSQLExpressionExecutor extends Serializable { + + /** + * invoked before data processing. + */ + void prepare(); + + /** + * apply transformation to input record {@link BeamSQLRow}. 
+ * + */ + List<Object> execute(BeamSQLRow inputRecord); + + void close(); +} http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSQLSpELExecutor.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSQLSpELExecutor.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSQLSpELExecutor.java new file mode 100644 index 0000000..9c9c37f --- /dev/null +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSQLSpELExecutor.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.dsls.sql.interpreter; + +import static com.google.common.base.Preconditions.checkArgument; +import java.util.ArrayList; +import java.util.List; + +import org.apache.beam.dsls.sql.planner.BeamSqlUnsupportedException; +import org.apache.beam.dsls.sql.rel.BeamFilterRel; +import org.apache.beam.dsls.sql.rel.BeamProjectRel; +import org.apache.beam.dsls.sql.rel.BeamRelNode; +import org.apache.beam.dsls.sql.schema.BeamSQLRow; +import org.apache.calcite.rex.RexCall; +import org.apache.calcite.rex.RexInputRef; +import org.apache.calcite.rex.RexLiteral; +import org.apache.calcite.rex.RexNode; +import org.springframework.expression.Expression; +import org.springframework.expression.ExpressionParser; +import org.springframework.expression.spel.SpelParserConfiguration; +import org.springframework.expression.spel.standard.SpelExpressionParser; +import org.springframework.expression.spel.support.StandardEvaluationContext; + +/** + * {@code BeamSQLSpELExecutor} is one implementation, to convert Calcite SQL + * relational expression to SpEL expression. 
+ * + */ +public class BeamSQLSpELExecutor implements BeamSQLExpressionExecutor { + /** + * + */ + private static final long serialVersionUID = 6777232573390074408L; + + private List<String> spelString; + private List<Expression> spelExpressions; + + public BeamSQLSpELExecutor(BeamRelNode relNode) { + this.spelString = new ArrayList<>(); + if (relNode instanceof BeamFilterRel) { + String filterSpEL = CalciteToSpEL + .rexcall2SpEL((RexCall) ((BeamFilterRel) relNode).getCondition()); + spelString.add(filterSpEL); + } else if (relNode instanceof BeamProjectRel) { + spelString.addAll(createProjectExps((BeamProjectRel) relNode)); + // List<ProjectRule> projectRules = + // for (int idx = 0; idx < projectRules.size(); ++idx) { + // spelString.add(projectRules.get(idx).getProjectExp()); + // } + } else { + throw new BeamSqlUnsupportedException( + String.format("%s is not supported yet", relNode.getClass().toString())); + } + } + + @Override + public void prepare() { + this.spelExpressions = new ArrayList<>(); + + SpelParserConfiguration config = new SpelParserConfiguration(true, true); + ExpressionParser parser = new SpelExpressionParser(config); + for (String el : spelString) { + spelExpressions.add(parser.parseExpression(el)); + } + } + + @Override + public List<Object> execute(BeamSQLRow inputRecord) { + StandardEvaluationContext inContext = new StandardEvaluationContext(); + inContext.setVariable("in", inputRecord); + + List<Object> results = new ArrayList<>(); + for (Expression ep : spelExpressions) { + results.add(ep.getValue(inContext)); + } + return results; + } + + @Override + public void close() { + + } + + private List<String> createProjectExps(BeamProjectRel projectRel) { + List<String> rules = new ArrayList<>(); + + List<RexNode> exps = projectRel.getProjects(); + + for (int idx = 0; idx < exps.size(); ++idx) { + RexNode node = exps.get(idx); + if (node == null) { + rules.add("null"); + } + + if (node instanceof RexLiteral) { + rules.add(((RexLiteral) 
node).getValue() + ""); + } else { + if (node instanceof RexInputRef) { + rules.add("#in.getFieldValue(" + ((RexInputRef) node).getIndex() + ")"); + } + if (node instanceof RexCall) { + rules.add(CalciteToSpEL.rexcall2SpEL((RexCall) node)); + } + } + } + + checkArgument(rules.size() == exps.size(), "missing projects rules after conversion."); + + return rules; + } + +} http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/CalciteToSpEL.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/CalciteToSpEL.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/CalciteToSpEL.java new file mode 100644 index 0000000..6cdc31b --- /dev/null +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/CalciteToSpEL.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.dsls.sql.interpreter; + +import com.google.common.base.Joiner; +import java.util.ArrayList; +import java.util.List; + +import org.apache.beam.dsls.sql.planner.BeamSqlUnsupportedException; +import org.apache.calcite.rex.RexCall; +import org.apache.calcite.rex.RexInputRef; +import org.apache.calcite.rex.RexNode; + +/** + * {@code CalciteToSpEL} is used in {@link BeamSQLSpELExecutor}, to convert a + * relational expression {@link RexCall} to SpEL expression. + * + */ +public class CalciteToSpEL { + + public static String rexcall2SpEL(RexCall cdn) { + List<String> parts = new ArrayList<>(); + for (RexNode subcdn : cdn.operands) { + if (subcdn instanceof RexCall) { + parts.add(rexcall2SpEL((RexCall) subcdn)); + } else { + parts.add(subcdn instanceof RexInputRef + ? "#in.getFieldValue(" + ((RexInputRef) subcdn).getIndex() + ")" : subcdn.toString()); + } + } + + String opName = cdn.op.getName(); + switch (cdn.op.getClass().getSimpleName()) { + case "SqlMonotonicBinaryOperator": // +-* + case "SqlBinaryOperator": // > < = >= <= <> OR AND || / . + switch (cdn.op.getName().toUpperCase()) { + case "AND": + return String.format(" ( %s ) ", Joiner.on("&&").join(parts)); + case "OR": + return String.format(" ( %s ) ", Joiner.on("||").join(parts)); + case "=": + return String.format(" ( %s ) ", Joiner.on("==").join(parts)); + case "<>": + return String.format(" ( %s ) ", Joiner.on("!=").join(parts)); + default: + return String.format(" ( %s ) ", Joiner.on(cdn.op.getName().toUpperCase()).join(parts)); + } + case "SqlCaseOperator": // CASE + return String.format(" (%s ? 
%s : %s)", parts.get(0), parts.get(1), parts.get(2)); + case "SqlCastFunction": // CAST + return parts.get(0); + case "SqlPostfixOperator": + switch (opName.toUpperCase()) { + case "IS NULL": + return String.format(" null == %s ", parts.get(0)); + case "IS NOT NULL": + return String.format(" null != %s ", parts.get(0)); + default: + throw new BeamSqlUnsupportedException(); + } + default: + throw new BeamSqlUnsupportedException(); + } + } + +} http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/package-info.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/package-info.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/package-info.java new file mode 100644 index 0000000..178d35f --- /dev/null +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * interpreter generate runnable 'code' to execute SQL relational expressions. 
+ */ +package org.apache.beam.dsls.sql.interpreter; http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/main/java/org/apache/beam/dsls/sql/package-info.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/package-info.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/package-info.java new file mode 100644 index 0000000..b26e8c4 --- /dev/null +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * BeamSQL provides a new interface to run a SQL statement with Beam. 
+ */ +package org.apache.beam.dsls.sql; http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamPipelineCreator.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamPipelineCreator.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamPipelineCreator.java new file mode 100644 index 0000000..00274a2 --- /dev/null +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamPipelineCreator.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.dsls.sql.planner; + +import java.util.Map; + +import org.apache.beam.dsls.sql.rel.BeamRelNode; +import org.apache.beam.dsls.sql.schema.BaseBeamTable; +import org.apache.beam.dsls.sql.schema.BeamSQLRecordType; +import org.apache.beam.dsls.sql.schema.BeamSQLRecordTypeCoder; +import org.apache.beam.dsls.sql.schema.BeamSQLRow; +import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.coders.CoderRegistry; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.values.PCollection; + +/** + * {@link BeamPipelineCreator} converts a {@link BeamRelNode} tree, into a Beam + * pipeline. + * + */ +public class BeamPipelineCreator { + private Map<String, BaseBeamTable> sourceTables; + private PCollection<BeamSQLRow> latestStream; + + private PipelineOptions options; + + private Pipeline pipeline; + + private boolean hasPersistent = false; + + public BeamPipelineCreator(Map<String, BaseBeamTable> sourceTables) { + this.sourceTables = sourceTables; + + options = PipelineOptionsFactory.fromArgs(new String[] {}).withValidation() + .as(PipelineOptions.class); // FlinkPipelineOptions.class + options.setJobName("BeamPlanCreator"); + + pipeline = Pipeline.create(options); + CoderRegistry cr = pipeline.getCoderRegistry(); + cr.registerCoder(BeamSQLRow.class, BeamSqlRowCoder.of()); + cr.registerCoder(BeamSQLRecordType.class, BeamSQLRecordTypeCoder.of()); + } + + public PCollection<BeamSQLRow> getLatestStream() { + return latestStream; + } + + public void setLatestStream(PCollection<BeamSQLRow> latestStream) { + this.latestStream = latestStream; + } + + public Map<String, BaseBeamTable> getSourceTables() { + return sourceTables; + } + + public Pipeline getPipeline() { + return pipeline; + } + + public boolean isHasPersistent() { + return hasPersistent; + } + + public void setHasPersistent(boolean hasPersistent) 
{ + this.hasPersistent = hasPersistent; + } + +} http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamQueryPlanner.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamQueryPlanner.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamQueryPlanner.java new file mode 100644 index 0000000..aac86d6 --- /dev/null +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamQueryPlanner.java @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.dsls.sql.planner; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.beam.dsls.sql.rel.BeamLogicalConvention; +import org.apache.beam.dsls.sql.rel.BeamRelNode; +import org.apache.beam.dsls.sql.schema.BaseBeamTable; + +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.PipelineResult; +import org.apache.calcite.adapter.java.JavaTypeFactory; +import org.apache.calcite.config.Lex; +import org.apache.calcite.jdbc.CalciteSchema; +import org.apache.calcite.jdbc.JavaTypeFactoryImpl; +import org.apache.calcite.plan.Contexts; +import org.apache.calcite.plan.ConventionTraitDef; +import org.apache.calcite.plan.RelOptUtil; +import org.apache.calcite.plan.RelTraitDef; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.prepare.CalciteCatalogReader; +import org.apache.calcite.rel.RelCollationTraitDef; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.type.RelDataTypeSystem; +import org.apache.calcite.schema.SchemaPlus; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.SqlOperatorTable; +import org.apache.calcite.sql.fun.SqlStdOperatorTable; +import org.apache.calcite.sql.parser.SqlParseException; +import org.apache.calcite.sql.parser.SqlParser; +import org.apache.calcite.tools.FrameworkConfig; +import org.apache.calcite.tools.Frameworks; +import org.apache.calcite.tools.Planner; +import org.apache.calcite.tools.RelConversionException; +import org.apache.calcite.tools.ValidationException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * The core component to handle through a SQL statement, to submit a Beam + * pipeline. 
+ * + */ +public class BeamQueryPlanner { + private static final Logger LOG = LoggerFactory.getLogger(BeamQueryPlanner.class); + + protected final Planner planner; + private Map<String, BaseBeamTable> sourceTables = new HashMap<>(); + + public static final JavaTypeFactory TYPE_FACTORY = new JavaTypeFactoryImpl( + RelDataTypeSystem.DEFAULT); + + public BeamQueryPlanner(SchemaPlus schema) { + final List<RelTraitDef> traitDefs = new ArrayList<RelTraitDef>(); + traitDefs.add(ConventionTraitDef.INSTANCE); + traitDefs.add(RelCollationTraitDef.INSTANCE); + + List<SqlOperatorTable> sqlOperatorTables = new ArrayList<>(); + sqlOperatorTables.add(SqlStdOperatorTable.instance()); + sqlOperatorTables.add(new CalciteCatalogReader(CalciteSchema.from(schema), false, + Collections.<String>emptyList(), TYPE_FACTORY)); + + FrameworkConfig config = Frameworks.newConfigBuilder() + .parserConfig(SqlParser.configBuilder().setLex(Lex.MYSQL).build()).defaultSchema(schema) + .traitDefs(traitDefs).context(Contexts.EMPTY_CONTEXT).ruleSets(BeamRuleSets.getRuleSets()) + .costFactory(null).typeSystem(BeamRelDataTypeSystem.BEAM_REL_DATATYPE_SYSTEM).build(); + this.planner = Frameworks.getPlanner(config); + + for (String t : schema.getTableNames()) { + sourceTables.put(t, (BaseBeamTable) schema.getTable(t)); + } + } + + /** + * Compiles the statement with {@link #compileBeamPipeline(String)}, then + * runs the resulting pipeline and waits until it finishes. + * + */ + public void submitToRun(String sqlStatement) throws Exception { + Pipeline pipeline = compileBeamPipeline(sqlStatement); + + PipelineResult result = pipeline.run(); + result.waitUntilFinish(); + } + + /** + * Builds a Beam pipeline from the {@link BeamRelNode} tree generated in + * {@link #convertToBeamRel(String)}.
+ * + */ + public Pipeline compileBeamPipeline(String sqlStatement) throws Exception { + BeamRelNode relNode = convertToBeamRel(sqlStatement); + + BeamPipelineCreator planCreator = new BeamPipelineCreator(sourceTables); + return relNode.buildBeamPipeline(planCreator); + } + + /** + * Parses and validates the input query, then converts it into a + * {@link BeamRelNode} tree. + * + */ + public BeamRelNode convertToBeamRel(String sqlStatement) + throws ValidationException, RelConversionException, SqlParseException { + return (BeamRelNode) validateAndConvert(planner.parse(sqlStatement)); + } + + private RelNode validateAndConvert(SqlNode sqlNode) + throws ValidationException, RelConversionException { + SqlNode validated = validateNode(sqlNode); + LOG.info("SQL:\n" + validated); + RelNode relNode = convertToRelNode(validated); + return convertToBeamRel(relNode); + } + + private RelNode convertToBeamRel(RelNode relNode) throws RelConversionException { + RelTraitSet traitSet = relNode.getTraitSet(); + + LOG.info("SQLPlan>\n" + RelOptUtil.toString(relNode)); + + // PlannerImpl.transform() optimizes RelNode with ruleset + return planner.transform(0, traitSet.plus(BeamLogicalConvention.INSTANCE), relNode); + } + + private RelNode convertToRelNode(SqlNode sqlNode) throws RelConversionException { + return planner.rel(sqlNode).rel; + } + + private SqlNode validateNode(SqlNode sqlNode) throws ValidationException { + SqlNode validatedSqlNode = planner.validate(sqlNode); + validatedSqlNode.accept(new UnsupportedOperatorsVisitor()); + return validatedSqlNode; + } + + public Map<String, BaseBeamTable> getSourceTables() { + return sourceTables; + } + +} http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamRelDataTypeSystem.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamRelDataTypeSystem.java
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamRelDataTypeSystem.java new file mode 100644 index 0000000..c89a740 --- /dev/null +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamRelDataTypeSystem.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.dsls.sql.planner; + +import org.apache.calcite.rel.type.RelDataTypeSystem; +import org.apache.calcite.rel.type.RelDataTypeSystemImpl; + +/** + * customized data type in Beam. 
+ * + */ +public class BeamRelDataTypeSystem extends RelDataTypeSystemImpl { + public static final RelDataTypeSystem BEAM_REL_DATATYPE_SYSTEM = new BeamRelDataTypeSystem(); + + @Override + public int getMaxNumericScale() { + return 38; + } + + @Override + public int getMaxNumericPrecision() { + return 38; + } + +} http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamRuleSets.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamRuleSets.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamRuleSets.java new file mode 100644 index 0000000..2af31dc --- /dev/null +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamRuleSets.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.dsls.sql.planner; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import java.util.Iterator; + +import org.apache.beam.dsls.sql.rel.BeamRelNode; +import org.apache.beam.dsls.sql.rule.BeamFilterRule; +import org.apache.beam.dsls.sql.rule.BeamIOSinkRule; +import org.apache.beam.dsls.sql.rule.BeamIOSourceRule; +import org.apache.beam.dsls.sql.rule.BeamProjectRule; +import org.apache.calcite.plan.RelOptRule; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.tools.RuleSet; + +/** + * {@link RuleSet} used in {@link BeamQueryPlanner}. It translates a standard + * Calcite {@link RelNode} tree, to represent with {@link BeamRelNode} + * + */ +public class BeamRuleSets { + private static final ImmutableSet<RelOptRule> calciteToBeamConversionRules = ImmutableSet + .<RelOptRule>builder().add(BeamIOSourceRule.INSTANCE, BeamProjectRule.INSTANCE, + BeamFilterRule.INSTANCE, BeamIOSinkRule.INSTANCE) + .build(); + + public static RuleSet[] getRuleSets() { + return new RuleSet[] { new BeamRuleSet( + ImmutableSet.<RelOptRule>builder().addAll(calciteToBeamConversionRules).build()) }; + } + + private static class BeamRuleSet implements RuleSet { + final ImmutableSet<RelOptRule> rules; + + public BeamRuleSet(ImmutableSet<RelOptRule> rules) { + this.rules = rules; + } + + public BeamRuleSet(ImmutableList<RelOptRule> rules) { + this.rules = ImmutableSet.<RelOptRule>builder().addAll(rules).build(); + } + + @Override + public Iterator<RelOptRule> iterator() { + return rules.iterator(); + } + } + +} http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamSQLRelUtils.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamSQLRelUtils.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamSQLRelUtils.java new file mode 100644 index 
0000000..5e5f215 --- /dev/null +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamSQLRelUtils.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.dsls.sql.planner; + +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.beam.dsls.sql.rel.BeamRelNode; +import org.apache.calcite.plan.RelOptUtil; +import org.apache.calcite.plan.volcano.RelSubset; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.sql.SqlExplainLevel; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Utilities for {@code BeamRelNode}. 
+ */ +public class BeamSQLRelUtils { + private static final Logger LOG = LoggerFactory.getLogger(BeamSQLRelUtils.class); + + private static final AtomicInteger sequence = new AtomicInteger(0); + private static final AtomicInteger classSequence = new AtomicInteger(0); + + public static String getStageName(BeamRelNode relNode) { + return relNode.getClass().getSimpleName().toUpperCase() + "_" + relNode.getId() + "_" + + sequence.getAndIncrement(); + } + + public static String getClassName(BeamRelNode relNode) { + return "Generated_" + relNode.getClass().getSimpleName().toUpperCase() + "_" + relNode.getId() + + "_" + classSequence.getAndIncrement(); + } + + public static BeamRelNode getBeamRelInput(RelNode input) { + if (input instanceof RelSubset) { + // go with known best input + input = ((RelSubset) input).getBest(); + } + return (BeamRelNode) input; + } + + public static String explain(final RelNode rel) { + return explain(rel, SqlExplainLevel.EXPPLAN_ATTRIBUTES); + } + + public static String explain(final RelNode rel, SqlExplainLevel detailLevel) { + String explain = ""; + try { + explain = RelOptUtil.toString(rel); + } catch (StackOverflowError e) { + LOG.error("StackOverflowError occurred while extracting plan. " + + "Please report it to the dev@ mailing list."); + LOG.error("RelNode " + rel + " ExplainLevel " + detailLevel, e); + LOG.error("Forcing plan to empty string and continue... 
" + + "SQL Runner may not working properly after."); + } + return explain; + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamSqlRunner.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamSqlRunner.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamSqlRunner.java new file mode 100644 index 0000000..e457e80 --- /dev/null +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamSqlRunner.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.dsls.sql.planner; + +import java.io.Serializable; + +import org.apache.beam.dsls.sql.rel.BeamRelNode; +import org.apache.beam.dsls.sql.schema.BaseBeamTable; +import org.apache.calcite.plan.RelOptUtil; +import org.apache.calcite.schema.Schema; +import org.apache.calcite.schema.SchemaPlus; +import org.apache.calcite.sql.parser.SqlParseException; +import org.apache.calcite.tools.Frameworks; +import org.apache.calcite.tools.RelConversionException; +import org.apache.calcite.tools.ValidationException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Interface to explain, submit a SQL query. + * + */ +public class BeamSqlRunner implements Serializable { + /** + * + */ + private static final long serialVersionUID = -4708693435115005182L; + + private static final Logger LOG = LoggerFactory.getLogger(BeamSqlRunner.class); + + private SchemaPlus schema = Frameworks.createRootSchema(true); + + private BeamQueryPlanner planner = new BeamQueryPlanner(schema); + + /** + * Registers {@code scheme} under {@code schemaName} in the root schema. + * + */ + public void addSchema(String schemaName, Schema scheme) { + schema.add(schemaName, scheme); + } + + /** + * add a {@link BaseBeamTable} to schema repository. + * + */ + public void addTable(String tableName, BaseBeamTable table) { + schema.add(tableName, table); + planner.getSourceTables().put(tableName, table); + } + + /** + * submit as a Beam pipeline. + * + */ + public void submitQuery(String sqlString) throws Exception { + planner.submitToRun(sqlString); + planner.planner.close(); + } + + /** + * explain and display the execution plan.
+ * + */ + public String explainQuery(String sqlString) + throws ValidationException, RelConversionException, SqlParseException { + BeamRelNode exeTree = planner.convertToBeamRel(sqlString); + String beamPlan = RelOptUtil.toString(exeTree); + System.out.println(String.format("beamPlan>\n%s", beamPlan)); + + planner.planner.close(); + return beamPlan; + } + + protected BeamQueryPlanner getPlanner() { + return planner; + } + +} http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamSqlUnsupportedException.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamSqlUnsupportedException.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamSqlUnsupportedException.java new file mode 100644 index 0000000..7cb5243 --- /dev/null +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamSqlUnsupportedException.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.dsls.sql.planner; + +/** + * Generic exception for un-supported operations.
+ * + */ +public class BeamSqlUnsupportedException extends RuntimeException { + /** + * + */ + private static final long serialVersionUID = 3445015747629217342L; + + public BeamSqlUnsupportedException(String string) { + super(string); + } + + public BeamSqlUnsupportedException() { + super(); + } + +} http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/UnsupportedOperatorsVisitor.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/UnsupportedOperatorsVisitor.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/UnsupportedOperatorsVisitor.java new file mode 100644 index 0000000..9dfa21d --- /dev/null +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/UnsupportedOperatorsVisitor.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.dsls.sql.planner; + +import org.apache.calcite.sql.util.SqlShuttle; + +/** + * Unsupported operation to visit a RelNode. 
+ * + */ +public class UnsupportedOperatorsVisitor extends SqlShuttle { + +} http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/package-info.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/package-info.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/package-info.java new file mode 100644 index 0000000..0506c5b --- /dev/null +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/package-info.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * {@link org.apache.beam.dsls.sql.planner.BeamQueryPlanner} is the main interface. + * It defines data sources, validate a SQL statement, and convert it as a Beam + * pipeline. 
+ */ +package org.apache.beam.dsls.sql.planner; http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamFilterRel.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamFilterRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamFilterRel.java new file mode 100644 index 0000000..10dd1be --- /dev/null +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamFilterRel.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.dsls.sql.rel; + +import org.apache.beam.dsls.sql.interpreter.BeamSQLExpressionExecutor; +import org.apache.beam.dsls.sql.interpreter.BeamSQLSpELExecutor; +import org.apache.beam.dsls.sql.planner.BeamPipelineCreator; +import org.apache.beam.dsls.sql.planner.BeamSQLRelUtils; +import org.apache.beam.dsls.sql.schema.BeamSQLRow; +import org.apache.beam.dsls.sql.transform.BeamSQLFilterFn; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PCollection; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Filter; +import org.apache.calcite.rex.RexNode; + +/** + * BeamRelNode to replace a {@code Filter} node. + * + */ +public class BeamFilterRel extends Filter implements BeamRelNode { + + public BeamFilterRel(RelOptCluster cluster, RelTraitSet traits, RelNode child, + RexNode condition) { + super(cluster, traits, child, condition); + } + + @Override + public Filter copy(RelTraitSet traitSet, RelNode input, RexNode condition) { + return new BeamFilterRel(getCluster(), traitSet, input, condition); + } + + @Override + public Pipeline buildBeamPipeline(BeamPipelineCreator planCreator) throws Exception { + + RelNode input = getInput(); + BeamSQLRelUtils.getBeamRelInput(input).buildBeamPipeline(planCreator); + + String stageName = BeamSQLRelUtils.getStageName(this); + + PCollection<BeamSQLRow> upstream = planCreator.getLatestStream(); + + BeamSQLExpressionExecutor executor = new BeamSQLSpELExecutor(this); + + PCollection<BeamSQLRow> projectStream = upstream.apply(stageName, + ParDo.of(new BeamSQLFilterFn(getRelTypeName(), executor))); + + planCreator.setLatestStream(projectStream); + + return planCreator.getPipeline(); + } + +} http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSinkRel.java 
---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSinkRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSinkRel.java new file mode 100644 index 0000000..cad0b3c --- /dev/null +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSinkRel.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.dsls.sql.rel; + +import com.google.common.base.Joiner; +import java.util.List; + +import org.apache.beam.dsls.sql.planner.BeamPipelineCreator; +import org.apache.beam.dsls.sql.planner.BeamSQLRelUtils; +import org.apache.beam.dsls.sql.schema.BaseBeamTable; +import org.apache.beam.dsls.sql.schema.BeamSQLRow; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.values.PCollection; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptTable; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.prepare.Prepare; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.TableModify; +import org.apache.calcite.rex.RexNode; + +/** + * BeamRelNode to replace a {@code TableModify} node. 
+ * + */ +public class BeamIOSinkRel extends TableModify implements BeamRelNode { + public BeamIOSinkRel(RelOptCluster cluster, RelTraitSet traits, RelOptTable table, + Prepare.CatalogReader catalogReader, RelNode child, Operation operation, + List<String> updateColumnList, List<RexNode> sourceExpressionList, boolean flattened) { + super(cluster, traits, table, catalogReader, child, operation, updateColumnList, + sourceExpressionList, flattened); + } + + @Override + public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) { + return new BeamIOSinkRel(getCluster(), traitSet, getTable(), getCatalogReader(), sole(inputs), + getOperation(), getUpdateColumnList(), getSourceExpressionList(), isFlattened()); + } + + @Override + public Pipeline buildBeamPipeline(BeamPipelineCreator planCreator) throws Exception { + + RelNode input = getInput(); + BeamSQLRelUtils.getBeamRelInput(input).buildBeamPipeline(planCreator); + + String stageName = BeamSQLRelUtils.getStageName(this); + + PCollection<BeamSQLRow> upstream = planCreator.getLatestStream(); + + String sourceName = Joiner.on('.').join(getTable().getQualifiedName()); + + BaseBeamTable targetTable = planCreator.getSourceTables().get(sourceName); + + upstream.apply(stageName, targetTable.buildIOWriter()); + + planCreator.setHasPersistent(true); + + return planCreator.getPipeline(); + } + +} http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java new file mode 100644 index 0000000..6b1b6cd --- /dev/null +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.dsls.sql.rel; + +import com.google.common.base.Joiner; + +import org.apache.beam.dsls.sql.planner.BeamPipelineCreator; +import org.apache.beam.dsls.sql.planner.BeamSQLRelUtils; +import org.apache.beam.dsls.sql.schema.BaseBeamTable; +import org.apache.beam.dsls.sql.schema.BeamSQLRow; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.values.PCollection; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptTable; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.core.TableScan; + +/** + * BeamRelNode to replace a {@code TableScan} node. 
+ * + */ +public class BeamIOSourceRel extends TableScan implements BeamRelNode { + + public BeamIOSourceRel(RelOptCluster cluster, RelTraitSet traitSet, RelOptTable table) { + super(cluster, traitSet, table); + } + + @Override + public Pipeline buildBeamPipeline(BeamPipelineCreator planCreator) throws Exception { + + String sourceName = Joiner.on('.').join(getTable().getQualifiedName()).replace(".(STREAM)", ""); + + BaseBeamTable sourceTable = planCreator.getSourceTables().get(sourceName); + + String stageName = BeamSQLRelUtils.getStageName(this); + + PCollection<BeamSQLRow> sourceStream = planCreator.getPipeline().apply(stageName, + sourceTable.buildIOReader()); + + planCreator.setLatestStream(sourceStream); + + return planCreator.getPipeline(); + } + +} http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamLogicalConvention.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamLogicalConvention.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamLogicalConvention.java new file mode 100644 index 0000000..704a374 --- /dev/null +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamLogicalConvention.java @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.dsls.sql.rel; + +import org.apache.calcite.plan.Convention; +import org.apache.calcite.plan.ConventionTraitDef; +import org.apache.calcite.plan.RelOptPlanner; +import org.apache.calcite.plan.RelTrait; +import org.apache.calcite.plan.RelTraitDef; +import org.apache.calcite.plan.RelTraitSet; + +/** + * Calcite {@link Convention} for Beam SQL; its interface is {@link BeamRelNode}. + * + */ +public enum BeamLogicalConvention implements Convention { + INSTANCE; + + @Override + public Class getInterface() { + return BeamRelNode.class; + } + + @Override + public String getName() { + return "BEAM_LOGICAL"; + } + + @Override + public RelTraitDef getTraitDef() { + return ConventionTraitDef.INSTANCE; + } + + @Override + public boolean satisfies(RelTrait trait) { + return this == trait; + } + + @Override + public void register(RelOptPlanner planner) { + } + + @Override + public String toString() { + return getName(); + } + + @Override + public boolean canConvertConvention(Convention toConvention) { + return false; + } + + @Override + public boolean useAbstractConvertersForConversion(RelTraitSet fromTraits, RelTraitSet toTraits) { + return false; + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamProjectRel.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamProjectRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamProjectRel.java new file mode 100644 index 0000000..dd731f8 --- /dev/null +++
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamProjectRel.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.dsls.sql.rel; + +import java.util.List; + +import org.apache.beam.dsls.sql.interpreter.BeamSQLExpressionExecutor; +import org.apache.beam.dsls.sql.interpreter.BeamSQLSpELExecutor; +import org.apache.beam.dsls.sql.planner.BeamPipelineCreator; +import org.apache.beam.dsls.sql.planner.BeamSQLRelUtils; +import org.apache.beam.dsls.sql.schema.BeamSQLRecordType; +import org.apache.beam.dsls.sql.schema.BeamSQLRow; +import org.apache.beam.dsls.sql.transform.BeamSQLProjectFn; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PCollection; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Project; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rex.RexCall; +import org.apache.calcite.rex.RexInputRef; +import org.apache.calcite.rex.RexLiteral; +import org.apache.calcite.rex.RexNode; + +/** + * BeamRelNode to replace a {@code Project} node. 
+ *
+ */
+public class BeamProjectRel extends Project implements BeamRelNode {
+
+  /**
+   * Passes all arguments unchanged to the {@code Project} super constructor.
+   *
+   * @param projects projection expressions: {@link RexLiteral}, {@link RexInputRef},
+   *        {@link RexCall}
+   */
+  public BeamProjectRel(RelOptCluster cluster, RelTraitSet traits, RelNode input,
+      List<? extends RexNode> projects, RelDataType rowType) {
+    super(cluster, traits, input, projects, rowType);
+  }
+
+  /**
+   * Returns a new {@code BeamProjectRel} built from the given arguments and this
+   * node's cluster.
+   */
+  @Override
+  public Project copy(RelTraitSet traitSet, RelNode input, List<RexNode> projects,
+      RelDataType rowType) {
+    return new BeamProjectRel(getCluster(), traitSet, input, projects, rowType);
+  }
+
+  /**
+   * Builds the input node first, then applies a {@code ParDo} of
+   * {@code BeamSQLProjectFn} to the latest stream and records the result as the
+   * new latest stream.
+   *
+   * @param planCreator holds the pipeline and the latest stream
+   * @return the pipeline obtained from {@code planCreator}
+   */
+  @Override
+  public Pipeline buildBeamPipeline(BeamPipelineCreator planCreator) throws Exception {
+    RelNode input = getInput();
+    // Build the input first; its output is then read back through getLatestStream().
+    BeamSQLRelUtils.getBeamRelInput(input).buildBeamPipeline(planCreator);
+
+    String stageName = BeamSQLRelUtils.getStageName(this);
+
+    PCollection<BeamSQLRow> upstream = planCreator.getLatestStream();
+
+    BeamSQLExpressionExecutor executor = new BeamSQLSpELExecutor(this);
+
+    PCollection<BeamSQLRow> projectStream = upstream.apply(stageName, ParDo
+        .of(new BeamSQLProjectFn(getRelTypeName(), executor, BeamSQLRecordType.from(rowType))));
+
+    planCreator.setLatestStream(projectStream);
+
+    return planCreator.getPipeline();
+
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamRelNode.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamRelNode.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamRelNode.java
new file mode 100644
index 0000000..e50d71a
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamRelNode.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.rel;
+
+import org.apache.beam.dsls.sql.planner.BeamPipelineCreator;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.calcite.rel.RelNode;
+
+/**
+ * A {@link RelNode} that additionally declares
+ * {@link #buildBeamPipeline(BeamPipelineCreator)}, which is called by
+ * {@link BeamPipelineCreator}.
+ *
+ */
+public interface BeamRelNode extends RelNode {
+
+  /**
+   * A {@link BeamRelNode} is a recursive structure; the
+   * {@link BeamPipelineCreator} visits it with a DFS (Depth-First-Search)
+   * algorithm.
+   *
+   * @param planCreator the creator that drives the visit
+   * @return a {@link Pipeline}
+   */
+  Pipeline buildBeamPipeline(BeamPipelineCreator planCreator) throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/package-info.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/package-info.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/package-info.java
new file mode 100644
index 0000000..77d6204
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * BeamSQL specified nodes, to replace {@link org.apache.calcite.rel.RelNode}. + * + */ +package org.apache.beam.dsls.sql.rel; http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamFilterRule.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamFilterRule.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamFilterRule.java new file mode 100644 index 0000000..414b666 --- /dev/null +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamFilterRule.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.rule;
+
+import org.apache.beam.dsls.sql.rel.BeamFilterRel;
+import org.apache.beam.dsls.sql.rel.BeamLogicalConvention;
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.logical.LogicalFilter;
+
+/**
+ * A {@code ConverterRule} that replaces a {@link LogicalFilter} (handled as a
+ * {@link Filter}) with a {@link BeamFilterRel}.
+ *
+ */
+public class BeamFilterRule extends ConverterRule {
+  /** The only instance; the constructor is private. */
+  public static final BeamFilterRule INSTANCE = new BeamFilterRule();
+
+  /**
+   * Configures the rule for {@code LogicalFilter}, from {@code Convention.NONE} to
+   * {@code BeamLogicalConvention.INSTANCE}, with description "BeamFilterRule".
+   */
+  private BeamFilterRule() {
+    super(LogicalFilter.class, Convention.NONE, BeamLogicalConvention.INSTANCE, "BeamFilterRule");
+  }
+
+  /**
+   * Builds a {@code BeamFilterRel} with the same cluster and condition as {@code rel}.
+   *
+   * @param rel the node to convert; it is cast to {@link Filter}
+   * @return the new {@code BeamFilterRel}
+   */
+  @Override
+  public RelNode convert(RelNode rel) {
+    final Filter filter = (Filter) rel;
+    final RelNode input = filter.getInput();
+
+    // Both the new node and its converted input get BeamLogicalConvention.INSTANCE
+    // swapped into their trait sets.
+    return new BeamFilterRel(filter.getCluster(),
+        filter.getTraitSet().replace(BeamLogicalConvention.INSTANCE),
+        convert(input, input.getTraitSet().replace(BeamLogicalConvention.INSTANCE)),
+        filter.getCondition());
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamIOSinkRule.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamIOSinkRule.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamIOSinkRule.java
new file mode 100644
index 0000000..4cc4ef5
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamIOSinkRule.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.dsls.sql.rule; + +import java.util.List; + +import org.apache.beam.dsls.sql.rel.BeamIOSinkRel; +import org.apache.beam.dsls.sql.rel.BeamLogicalConvention; +import org.apache.calcite.plan.Convention; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptTable; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.prepare.Prepare; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.convert.ConverterRule; +import org.apache.calcite.rel.core.TableModify; +import org.apache.calcite.rel.logical.LogicalTableModify; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.schema.Table; + +/** + * A {@code ConverterRule} to replace {@link TableModify} with + * {@link BeamIOSinkRel}. 
+ *
+ */
+public class BeamIOSinkRule extends ConverterRule {
+  /** The only instance; the constructor is private. */
+  public static final BeamIOSinkRule INSTANCE = new BeamIOSinkRule();
+
+  /**
+   * Configures the rule for {@code LogicalTableModify}, from {@code Convention.NONE}
+   * to {@code BeamLogicalConvention.INSTANCE}, with description "BeamIOSinkRule".
+   */
+  private BeamIOSinkRule() {
+    super(LogicalTableModify.class, Convention.NONE, BeamLogicalConvention.INSTANCE,
+        "BeamIOSinkRule");
+  }
+
+  /**
+   * Builds a {@code BeamIOSinkRel} from the matched {@link TableModify}.
+   *
+   * @param rel the node to convert; it is cast to {@link TableModify}
+   * @return the new {@code BeamIOSinkRel}
+   * @throws UnsupportedOperationException if the operation is not {@code INSERT}
+   * @throws IllegalArgumentException if the table type is neither TABLE nor STREAM
+   */
+  @Override
+  public RelNode convert(RelNode rel) {
+    final TableModify tableModify = (TableModify) rel;
+    final RelNode input = tableModify.getInput();
+
+    // Collect every argument that the BeamIOSinkRel constructor below receives.
+    final RelOptCluster cluster = tableModify.getCluster();
+    final RelTraitSet traitSet = tableModify.getTraitSet().replace(BeamLogicalConvention.INSTANCE);
+    final RelOptTable relOptTable = tableModify.getTable();
+    final Prepare.CatalogReader catalogReader = tableModify.getCatalogReader();
+    final RelNode convertedInput = convert(input,
+        input.getTraitSet().replace(BeamLogicalConvention.INSTANCE));
+    final TableModify.Operation operation = tableModify.getOperation();
+    final List<String> updateColumnList = tableModify.getUpdateColumnList();
+    final List<RexNode> sourceExpressionList = tableModify.getSourceExpressionList();
+    final boolean flattened = tableModify.isFlattened();
+
+    final Table table = tableModify.getTable().unwrap(Table.class);
+
+    switch (table.getJdbcTableType()) {
+    // TABLE falls through to STREAM: both run the same branch, so the "Streams ..."
+    // message below is also what a TABLE target reports.
+    case TABLE:
+    case STREAM:
+      if (operation != TableModify.Operation.INSERT) {
+        throw new UnsupportedOperationException(
+            String.format("Streams doesn't support %s modify operation", operation));
+      }
+      return new BeamIOSinkRel(cluster, traitSet,
+          relOptTable, catalogReader, convertedInput, operation, updateColumnList,
+          sourceExpressionList, flattened);
+    default:
+      throw new IllegalArgumentException(
+          String.format("Unsupported table type: %s", table.getJdbcTableType()));
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamIOSourceRule.java
----------------------------------------------------------------------
diff --git
a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamIOSourceRule.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamIOSourceRule.java new file mode 100644 index 0000000..85a69ff --- /dev/null +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamIOSourceRule.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.dsls.sql.rule; + +import org.apache.beam.dsls.sql.rel.BeamIOSourceRel; +import org.apache.beam.dsls.sql.rel.BeamLogicalConvention; +import org.apache.calcite.plan.Convention; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.convert.ConverterRule; +import org.apache.calcite.rel.core.TableScan; +import org.apache.calcite.rel.logical.LogicalTableScan; + +/** + * A {@code ConverterRule} to replace {@link TableScan} with + * {@link BeamIOSourceRel}. 
+ *
+ */
+public class BeamIOSourceRule extends ConverterRule {
+  /** The only instance; the constructor is private. */
+  public static final BeamIOSourceRule INSTANCE = new BeamIOSourceRule();
+
+  /**
+   * Configures the rule for {@code LogicalTableScan}, from {@code Convention.NONE}
+   * to {@code BeamLogicalConvention.INSTANCE}, with description "BeamIOSourceRule".
+   */
+  private BeamIOSourceRule() {
+    super(LogicalTableScan.class, Convention.NONE, BeamLogicalConvention.INSTANCE,
+        "BeamIOSourceRule");
+  }
+
+  /**
+   * Builds a {@code BeamIOSourceRel} over the same cluster and table as {@code rel}.
+   *
+   * @param rel the node to convert; it is cast to {@link TableScan}
+   * @return the new {@code BeamIOSourceRel}
+   */
+  @Override
+  public RelNode convert(RelNode rel) {
+    final TableScan scan = (TableScan) rel;
+
+    // The new node gets BeamLogicalConvention.INSTANCE swapped into its trait set.
+    return new BeamIOSourceRel(scan.getCluster(),
+        scan.getTraitSet().replace(BeamLogicalConvention.INSTANCE), scan.getTable());
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamProjectRule.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamProjectRule.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamProjectRule.java
new file mode 100644
index 0000000..6dc3b57
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamProjectRule.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.rule;
+
+import org.apache.beam.dsls.sql.rel.BeamLogicalConvention;
+import org.apache.beam.dsls.sql.rel.BeamProjectRel;
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.logical.LogicalProject;
+
+/**
+ * A {@code ConverterRule} that replaces a {@link LogicalProject} (handled as a
+ * {@link Project}) with a {@link BeamProjectRel}.
+ *
+ */
+public class BeamProjectRule extends ConverterRule {
+  /** The only instance; the constructor is private. */
+  public static final BeamProjectRule INSTANCE = new BeamProjectRule();
+
+  /**
+   * Configures the rule for {@code LogicalProject}, from {@code Convention.NONE} to
+   * {@code BeamLogicalConvention.INSTANCE}, with description "BeamProjectRule".
+   */
+  private BeamProjectRule() {
+    super(LogicalProject.class, Convention.NONE, BeamLogicalConvention.INSTANCE, "BeamProjectRule");
+  }
+
+  /**
+   * Builds a {@code BeamProjectRel} with the same cluster, projects and row type
+   * as {@code rel}.
+   *
+   * @param rel the node to convert; it is cast to {@link Project}
+   * @return the new {@code BeamProjectRel}
+   */
+  @Override
+  public RelNode convert(RelNode rel) {
+    final Project project = (Project) rel;
+    final RelNode input = project.getInput();
+
+    // Both the new node and its converted input get BeamLogicalConvention.INSTANCE
+    // swapped into their trait sets.
+    return new BeamProjectRel(project.getCluster(),
+        project.getTraitSet().replace(BeamLogicalConvention.INSTANCE),
+        convert(input, input.getTraitSet().replace(BeamLogicalConvention.INSTANCE)),
+        project.getProjects(), project.getRowType());
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/package-info.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/package-info.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/package-info.java
new file mode 100644
index 0000000..5d32647
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * {@link org.apache.calcite.plan.RelOptRule} to generate + * {@link org.apache.beam.dsls.sql.rel.BeamRelNode}. + */ +package org.apache.beam.dsls.sql.rule; http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BaseBeamTable.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BaseBeamTable.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BaseBeamTable.java new file mode 100644 index 0000000..81829e9 --- /dev/null +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BaseBeamTable.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.schema;
+
+import java.io.Serializable;
+import org.apache.beam.dsls.sql.planner.BeamQueryPlanner;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.calcite.DataContext;
+import org.apache.calcite.linq4j.Enumerable;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelProtoDataType;
+import org.apache.calcite.schema.ScannableTable;
+import org.apache.calcite.schema.Schema.TableType;
+import org.apache.calcite.schema.Statistic;
+import org.apache.calcite.schema.Statistics;
+
+/**
+ * Base class for Beam SQL tables: each IO in Beam has one table schema, defined
+ * by extending {@link BaseBeamTable}.
+ */
+public abstract class BaseBeamTable implements ScannableTable, Serializable {
+
+  /**
+   * Serialization version identifier of this {@link Serializable} class.
+   */
+  private static final long serialVersionUID = -1262988061830914193L;
+  // Row type computed once in the constructor and returned by getRowType().
+  private RelDataType relDataType;
+
+  // Record type derived from relDataType in the constructor.
+  protected BeamSQLRecordType beamSqlRecordType;
+
+  /**
+   * Applies {@code protoRowType} to {@code BeamQueryPlanner.TYPE_FACTORY} to get
+   * the row type, then derives {@code beamSqlRecordType} from it.
+   */
+  public BaseBeamTable(RelProtoDataType protoRowType) {
+    this.relDataType = protoRowType.apply(BeamQueryPlanner.TYPE_FACTORY);
+    this.beamSqlRecordType = BeamSQLRecordType.from(relDataType);
+  }
+
+  /**
+   * In Beam SQL, there's no difference between a batch query and a streaming
+   * query. {@link BeamIOType} is used to validate the sources.
+   */
+  public abstract BeamIOType getSourceType();
+
+  /**
+   * Creates an {@code IO.read()} instance to read from the source.
+   *
+   */
+  public abstract PTransform<? super PBegin, PCollection<BeamSQLRow>> buildIOReader();
+
+  /**
+   * Creates an {@code IO.write()} instance to write to the target.
+   *
+   */
+  public abstract PTransform<? super PCollection<BeamSQLRow>, PDone> buildIOWriter();
+
+  /**
+   * Always returns {@code null}.
+   */
+  @Override
+  public Enumerable<Object[]> scan(DataContext root) {
+    // not used as Beam SQL uses its own execution engine
+    return null;
+  }
+
+  /**
+   * Returns the row type computed in the constructor; {@code typeFactory} is ignored.
+   */
+  @Override
+  public RelDataType getRowType(RelDataTypeFactory typeFactory) {
+    return relDataType;
+  }
+
+  /**
+   * {@link Statistic} is not used to optimize the plan; always returns
+   * {@code Statistics.UNKNOWN}.
+   */
+  @Override
+  public Statistic getStatistic() {
+    return Statistics.UNKNOWN;
+  }
+
+  /**
+   * All sources are treated as TABLE in Beam SQL.
+   */
+  @Override
+  public TableType getJdbcTableType() {
+    return TableType.TABLE;
+  }
+
+}
