[BEAM-301] Initial skeleton for Beam SQL
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7867ce62 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7867ce62 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7867ce62 Branch: refs/heads/DSL_SQL Commit: 7867ce62e43bef7bfd8011c605379df05494dfcf Parents: 3625dbd Author: mingmxu <[email protected]> Authored: Sun Apr 9 19:49:08 2017 -0700 Committer: Jean-Baptiste Onofré <[email protected]> Committed: Wed Apr 12 21:59:46 2017 +0200 ---------------------------------------------------------------------- dsls/pom.xml | 5 +- dsls/sql/README.md | 24 ++ dsls/sql/pom.xml | 166 +++++++++++++ .../beam/dsls/sql/example/BeamSqlExample.java | 102 ++++++++ .../org/beam/dsls/sql/example/package-info.java | 23 ++ .../interpreter/BeamSQLExpressionExecutor.java | 43 ++++ .../sql/interpreter/BeamSQLSpELExecutor.java | 126 ++++++++++ .../dsls/sql/interpreter/CalciteToSpEL.java | 80 ++++++ .../beam/dsls/sql/interpreter/package-info.java | 22 ++ .../java/org/beam/dsls/sql/package-info.java | 22 ++ .../dsls/sql/planner/BeamPipelineCreator.java | 85 +++++++ .../beam/dsls/sql/planner/BeamQueryPlanner.java | 157 ++++++++++++ .../dsls/sql/planner/BeamRelDataTypeSystem.java | 40 +++ .../org/beam/dsls/sql/planner/BeamRuleSets.java | 65 +++++ .../beam/dsls/sql/planner/BeamSQLRelUtils.java | 73 ++++++ .../beam/dsls/sql/planner/BeamSqlRunner.java | 93 +++++++ .../planner/BeamSqlUnsupportedException.java | 38 +++ .../planner/UnsupportedOperatorsVisitor.java | 28 +++ .../org/beam/dsls/sql/planner/package-info.java | 24 ++ .../org/beam/dsls/sql/rel/BeamFilterRel.java | 71 ++++++ .../org/beam/dsls/sql/rel/BeamIOSinkRel.java | 75 ++++++ .../org/beam/dsls/sql/rel/BeamIOSourceRel.java | 59 +++++ .../dsls/sql/rel/BeamLogicalConvention.java | 72 ++++++ .../org/beam/dsls/sql/rel/BeamProjectRel.java | 82 +++++++ .../java/org/beam/dsls/sql/rel/BeamRelNode.java | 38 +++ .../org/beam/dsls/sql/rel/package-info.java | 23 ++ 
.../org/beam/dsls/sql/rule/BeamFilterRule.java | 49 ++++ .../org/beam/dsls/sql/rule/BeamIOSinkRule.java | 81 +++++++ .../beam/dsls/sql/rule/BeamIOSourceRule.java | 49 ++++ .../org/beam/dsls/sql/rule/BeamProjectRule.java | 50 ++++ .../org/beam/dsls/sql/rule/package-info.java | 22 ++ .../org/beam/dsls/sql/schema/BaseBeamTable.java | 99 ++++++++ .../org/beam/dsls/sql/schema/BeamIOType.java | 28 +++ .../beam/dsls/sql/schema/BeamSQLRecordType.java | 74 ++++++ .../dsls/sql/schema/BeamSQLRecordTypeCoder.java | 88 +++++++ .../org/beam/dsls/sql/schema/BeamSQLRow.java | 242 +++++++++++++++++++ .../beam/dsls/sql/schema/BeamSqlRowCoder.java | 149 ++++++++++++ .../dsls/sql/schema/InvalidFieldException.java | 30 +++ .../schema/UnsupportedDataTypeException.java | 28 +++ .../sql/schema/kafka/BeamKafkaCSVTable.java | 127 ++++++++++ .../dsls/sql/schema/kafka/BeamKafkaTable.java | 111 +++++++++ .../dsls/sql/schema/kafka/package-info.java | 22 ++ .../org/beam/dsls/sql/schema/package-info.java | 23 ++ .../dsls/sql/transform/BeamSQLFilterFn.java | 66 +++++ .../sql/transform/BeamSQLOutputToConsoleFn.java | 45 ++++ .../dsls/sql/transform/BeamSQLProjectFn.java | 72 ++++++ .../beam/dsls/sql/transform/package-info.java | 22 ++ dsls/sql/src/main/resources/log4j.properties | 23 ++ .../org/beam/dsls/sql/planner/BasePlanner.java | 74 ++++++ .../sql/planner/BeamPlannerExplainTest.java | 68 ++++++ .../dsls/sql/planner/BeamPlannerSubmitTest.java | 42 ++++ .../dsls/sql/planner/MockedBeamSQLTable.java | 123 ++++++++++ 52 files changed, 3441 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/7867ce62/dsls/pom.xml ---------------------------------------------------------------------- diff --git a/dsls/pom.xml b/dsls/pom.xml index 6e00171..a1bb0ee 100644 --- a/dsls/pom.xml +++ b/dsls/pom.xml @@ -27,10 +27,11 @@ </parent> <artifactId>beam-dsls-parent</artifactId> + <packaging>pom</packaging> <name>Apache 
Beam :: DSLs</name> <modules> - <!-- <module>sql</module> --> + <module>sql</module> </modules> <build> @@ -53,4 +54,4 @@ </pluginManagement> </build> -</project> \ No newline at end of file +</project> http://git-wip-us.apache.org/repos/asf/beam/blob/7867ce62/dsls/sql/README.md ---------------------------------------------------------------------- diff --git a/dsls/sql/README.md b/dsls/sql/README.md new file mode 100644 index 0000000..ae9e0f3 --- /dev/null +++ b/dsls/sql/README.md @@ -0,0 +1,24 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. +--> + +# Beam SQL + +Beam SQL provides a new interface to execute a SQL query as a Beam pipeline. + +*It's a work in progress...* http://git-wip-us.apache.org/repos/asf/beam/blob/7867ce62/dsls/sql/pom.xml ---------------------------------------------------------------------- diff --git a/dsls/sql/pom.xml b/dsls/sql/pom.xml new file mode 100644 index 0000000..21c8def --- /dev/null +++ b/dsls/sql/pom.xml @@ -0,0 +1,166 @@ +<?xml version="1.0"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<project + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" + xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.beam</groupId> + <artifactId>beam-dsls-parent</artifactId> + <version>0.7.0-SNAPSHOT</version> + </parent> + + <artifactId>beam-dsls-sql</artifactId> + <name>Apache Beam :: DSLs :: SQL</name> + <description>Beam SQL provides a new interface to generate a Beam pipeline from SQL statement</description> + + <packaging>jar</packaging> + + <properties> + <timestamp>${maven.build.timestamp}</timestamp> + <maven.build.timestamp.format>yyyy-MM-dd HH:mm</maven.build.timestamp.format> + <calcite-version>1.11.0</calcite-version> + </properties> + + <build> + <resources> + <resource> + <directory>src/main/resources</directory> + <filtering>true</filtering> + </resource> + </resources> + + <pluginManagement> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-checkstyle-plugin</artifactId> + <configuration> + <!-- Set testSourceDirectory in order to exclude generated-test-sources --> + <testSourceDirectory>${project.basedir}/src/test/</testSourceDirectory> + </configuration> + </plugin> + </plugins> + </pluginManagement> + + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + 
<artifactId>maven-compiler-plugin</artifactId> + </plugin> + + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + <configuration> + <argLine>-da</argLine> <!-- disable assert in Calcite converter validation --> + </configuration> + </plugin> + + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + </plugin> + + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + <executions> + <execution> + <id>bundle-and-repackage</id> + <phase>package</phase> + <goals> + <goal>shade</goal> + </goals> + <configuration> + <shadeTestJar>true</shadeTestJar> + <artifactSet> + <includes> + <include>com.google.guava:guava</include> + </includes> + </artifactSet> + <filters> + <filter> + <artifact>*:*</artifact> + <excludes> + <exclude>META-INF/*.SF</exclude> + <exclude>META-INF/*.DSA</exclude> + <exclude>META-INF/*.RSA</exclude> + </excludes> + </filter> + </filters> + </configuration> + </execution> + </executions> + </plugin> + + <!-- Coverage analysis for unit tests. 
--> + <plugin> + <groupId>org.jacoco</groupId> + <artifactId>jacoco-maven-plugin</artifactId> + </plugin> + </plugins> + </build> + + <dependencies> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + </dependency> + <dependency> + <groupId>org.apache.calcite</groupId> + <artifactId>calcite-core</artifactId> + <version>${calcite-version}</version> + </dependency> + <dependency> + <groupId>org.apache.beam</groupId> + <artifactId>beam-sdks-java-core</artifactId> + </dependency> + <dependency> + <groupId>org.apache.beam</groupId> + <artifactId>beam-runners-direct-java</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.beam</groupId> + <artifactId>beam-sdks-java-io-kafka</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.springframework</groupId> + <artifactId>spring-expression</artifactId> + </dependency> + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-jdk14</artifactId> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + </dependency> + <dependency> + <groupId>org.apache.calcite</groupId> + <artifactId>calcite-linq4j</artifactId> + <version>${calcite-version}</version> + </dependency> + </dependencies> +</project> http://git-wip-us.apache.org/repos/asf/beam/blob/7867ce62/dsls/sql/src/main/java/org/beam/dsls/sql/example/BeamSqlExample.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/example/BeamSqlExample.java b/dsls/sql/src/main/java/org/beam/dsls/sql/example/BeamSqlExample.java new file mode 100644 index 0000000..7fb8def --- /dev/null +++ b/dsls/sql/src/main/java/org/beam/dsls/sql/example/BeamSqlExample.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.beam.dsls.sql.example;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelProtoDataType;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.beam.dsls.sql.planner.BeamSqlRunner;
+import org.beam.dsls.sql.schema.BaseBeamTable;
+import org.beam.dsls.sql.schema.kafka.BeamKafkaCSVTable;
+
+/**
+ * A quick end-to-end example: Beam SQL filters records from one Kafka topic into another.
+ *
+ * <p>Before starting, follow https://kafka.apache.org/quickstart to set up a Kafka
+ * cluster locally, and run the commands below to create the required Kafka topics:
+ * <pre>
+ * <code>
+ * bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 \
+ *   --partitions 1 --topic orders
+ * bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 \
+ *   --partitions 1 --topic sub_orders
+ * </code>
+ * </pre>
+ * After starting the application, produce several test records:
+ * <pre>
+ * <code>
+ * bin/kafka-console-producer.sh --broker-list localhost:9092 --topic orders
+ * invalid,record
+ * 123445,0,100,3413423
+ * 234123,3,232,3451231234
+ * 234234,0,5,1234123
+ * 345234,0,345234.345,3423
+ * </code>
+ * </pre>
+ * Meanwhile, open another console to see the output:
+ * <pre>
+ * <code>
+ * bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic sub_orders
+ * Expected output:
+ * 123445,0,100.0
+ * 345234,0,345234.345
+ * </code>
+ * </pre>
+ */
+public class BeamSqlExample implements Serializable {
+
+  private static final long serialVersionUID = 3673487843555563904L;
+
+  /**
+   * Registers {@code ORDER_DETAILS} (topic {@code orders}) and {@code SUB_ORDER} (topic
+   * {@code sub_orders}) against a local Kafka broker, then explains and submits a query that
+   * copies the orders of site 0 with a price above 20 into {@code SUB_ORDER}.
+   */
+  public static void main(String[] args) throws Exception {
+    BeamSqlRunner runner = new BeamSqlRunner();
+    runner.addTable("ORDER_DETAILS", getTable("127.0.0.1:9092", "orders"));
+    runner.addTable("SUB_ORDER", getTable("127.0.0.1:9092", "sub_orders"));
+
+    // INSERT INTO <table>(<fields>) SELECT <fields> FROM <table> WHERE <condition>
+    String sql = "INSERT INTO SUB_ORDER(order_id, site_id, price) "
+        + "SELECT  order_id, site_id, price "
+        + "FROM ORDER_DETAILS "
+        + "WHERE SITE_ID = 0 and price > 20";
+
+    runner.explainQuery(sql);
+    runner.submitQuery(sql);
+  }
+
+  /**
+   * Builds a Kafka-backed CSV table with the columns {@code order_id BIGINT, site_id INTEGER,
+   * price DOUBLE, order_time TIMESTAMP}; this example uses it for both source and sink.
+   *
+   * @param bootstrapServer Kafka bootstrap server, as {@code host:port}
+   * @param topic the single Kafka topic backing the table
+   * @return the table, with the consumer property {@code auto.offset.reset} set to
+   *     {@code latest}
+   */
+  public static BaseBeamTable getTable(String bootstrapServer, String topic) {
+    final RelProtoDataType protoRowType = new RelProtoDataType() {
+      @Override
+      public RelDataType apply(RelDataTypeFactory typeFactory) {
+        return typeFactory.builder()
+            .add("order_id", SqlTypeName.BIGINT)
+            .add("site_id", SqlTypeName.INTEGER)
+            .add("price", SqlTypeName.DOUBLE)
+            .add("order_time", SqlTypeName.TIMESTAMP)
+            .build();
+      }
+    };
+
+    Map<String, Object> consumerPara = new HashMap<>();
+    consumerPara.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
+
+    return new BeamKafkaCSVTable(protoRowType, bootstrapServer, Arrays.asList(topic))
+        .updateConsumerProperties(consumerPara);
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/7867ce62/dsls/sql/src/main/java/org/beam/dsls/sql/example/package-info.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/example/package-info.java b/dsls/sql/src/main/java/org/beam/dsls/sql/example/package-info.java
new file mode 100644
index 0000000..ae678e4
--- /dev/null
+++ b/dsls/sql/src/main/java/org/beam/dsls/sql/example/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * examples on how to use BeamSQL.
+ *
+ */
+package org.beam.dsls.sql.example;
http://git-wip-us.apache.org/repos/asf/beam/blob/7867ce62/dsls/sql/src/main/java/org/beam/dsls/sql/interpreter/BeamSQLExpressionExecutor.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/interpreter/BeamSQLExpressionExecutor.java b/dsls/sql/src/main/java/org/beam/dsls/sql/interpreter/BeamSQLExpressionExecutor.java
new file mode 100644
index 0000000..e9d425d
--- /dev/null
+++ b/dsls/sql/src/main/java/org/beam/dsls/sql/interpreter/BeamSQLExpressionExecutor.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.beam.dsls.sql.interpreter;
+
+import java.io.Serializable;
+import java.util.List;
+import org.beam.dsls.sql.schema.BeamSQLRow;
+
+/**
+ * Bridges Calcite relational expressions and runnable code: an implementation turns the
+ * expressions of a relational node into something that can be evaluated against each
+ * {@link BeamSQLRow}.
+ *
+ * <p>Expected call order: {@link #prepare()} before data processing, then
+ * {@link #execute(BeamSQLRow)} per record, then {@link #close()}.
+ */
+public interface BeamSQLExpressionExecutor extends Serializable {
+
+  /** Sets the executor up; invoked before any record is processed. */
+  void prepare();
+
+  /**
+   * Applies the executor's transformation to a single input record.
+   *
+   * @param inputRecord the {@link BeamSQLRow} to evaluate
+   * @return the evaluated values; their exact layout is defined by the implementation
+   */
+  List<Object> execute(BeamSQLRow inputRecord);
+
+  /** Releases whatever the executor holds once processing is finished. */
+  void close();
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/7867ce62/dsls/sql/src/main/java/org/beam/dsls/sql/interpreter/BeamSQLSpELExecutor.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/interpreter/BeamSQLSpELExecutor.java b/dsls/sql/src/main/java/org/beam/dsls/sql/interpreter/BeamSQLSpELExecutor.java
new file mode 100644
index 0000000..48306da
--- /dev/null
+++ b/dsls/sql/src/main/java/org/beam/dsls/sql/interpreter/BeamSQLSpELExecutor.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.beam.dsls.sql.interpreter;
+
+import static com.google.common.base.Preconditions.checkState;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.beam.dsls.sql.planner.BeamSqlUnsupportedException;
+import org.beam.dsls.sql.rel.BeamFilterRel;
+import org.beam.dsls.sql.rel.BeamProjectRel;
+import org.beam.dsls.sql.rel.BeamRelNode;
+import org.beam.dsls.sql.schema.BeamSQLRow;
+import org.springframework.expression.Expression;
+import org.springframework.expression.ExpressionParser;
+import org.springframework.expression.spel.SpelParserConfiguration;
+import org.springframework.expression.spel.standard.SpelExpressionParser;
+import org.springframework.expression.spel.support.StandardEvaluationContext;
+
+/**
+ * {@code BeamSQLSpELExecutor} is a {@link BeamSQLExpressionExecutor} that converts the Calcite
+ * relational expressions of a {@link BeamFilterRel} or {@link BeamProjectRel} into Spring
+ * Expression Language (SpEL) expressions and evaluates them per record.
+ *
+ * <p>The record being evaluated is bound to the SpEL variable {@code #in}.
+ */
+public class BeamSQLSpELExecutor implements BeamSQLExpressionExecutor {
+  private static final long serialVersionUID = 6777232573390074408L;
+
+  /** SpEL source text, one entry per expression; built in the constructor and serialized. */
+  private final List<String> spelString;
+  /**
+   * Parsed form of {@link #spelString}, built by {@link #prepare()}. Transient because it is
+   * derived state and parsed SpEL expressions are not guaranteed to be serializable.
+   */
+  private transient List<Expression> spelExpressions;
+
+  /**
+   * Converts the expressions of {@code relNode} to SpEL source text.
+   *
+   * @param relNode a {@link BeamFilterRel} (its condition becomes one expression) or a
+   *     {@link BeamProjectRel} (one expression per projected column)
+   * @throws BeamSqlUnsupportedException for any other node type, or for an expression that
+   *     cannot be converted
+   */
+  public BeamSQLSpELExecutor(BeamRelNode relNode) {
+    this.spelString = new ArrayList<>();
+    if (relNode instanceof BeamFilterRel) {
+      // NOTE(review): assumes the filter condition is always a RexCall; a bare boolean column
+      // reference would fail this cast.
+      String filterSpEL = CalciteToSpEL
+          .rexcall2SpEL((RexCall) ((BeamFilterRel) relNode).getCondition());
+      spelString.add(filterSpEL);
+    } else if (relNode instanceof BeamProjectRel) {
+      spelString.addAll(createProjectExps((BeamProjectRel) relNode));
+    } else {
+      throw new BeamSqlUnsupportedException(
+          String.format("%s is not supported yet", relNode.getClass().toString()));
+    }
+  }
+
+  /** Parses every SpEL source string; must run before {@link #execute(BeamSQLRow)}. */
+  @Override
+  public void prepare() {
+    this.spelExpressions = new ArrayList<>(spelString.size());
+
+    // (true, true): auto-grow null references and auto-grow collections during evaluation.
+    SpelParserConfiguration config = new SpelParserConfiguration(true, true);
+    ExpressionParser parser = new SpelExpressionParser(config);
+    for (String el : spelString) {
+      spelExpressions.add(parser.parseExpression(el));
+    }
+  }
+
+  /**
+   * Evaluates every prepared expression against {@code inputRecord}, bound as {@code #in}.
+   *
+   * @return one value per expression, in the order the expressions were built
+   * @throws IllegalStateException if {@link #prepare()} has not been called on this instance
+   */
+  @Override
+  public List<Object> execute(BeamSQLRow inputRecord) {
+    checkState(spelExpressions != null, "prepare() must be called before execute()");
+    StandardEvaluationContext inContext = new StandardEvaluationContext();
+    inContext.setVariable("in", inputRecord);
+
+    List<Object> results = new ArrayList<>(spelExpressions.size());
+    for (Expression ep : spelExpressions) {
+      results.add(ep.getValue(inContext));
+    }
+    return results;
+  }
+
+  /** This executor holds no resources, so there is nothing to release. */
+  @Override
+  public void close() {
+  }
+
+  /**
+   * Converts each projected expression of {@code projectRel} to SpEL source text, in order.
+   *
+   * @throws BeamSqlUnsupportedException if an expression is not a literal, an input reference
+   *     or a call
+   */
+  private List<String> createProjectExps(BeamProjectRel projectRel) {
+    List<RexNode> exps = projectRel.getProjects();
+    List<String> rules = new ArrayList<>(exps.size());
+    for (RexNode node : exps) {
+      rules.add(projectExp2SpEL(node));
+    }
+    return rules;
+  }
+
+  /** Converts a single projected expression to SpEL source text. */
+  private static String projectExp2SpEL(RexNode node) {
+    if (node == null) {
+      return "null";
+    }
+    if (node instanceof RexLiteral) {
+      // NOTE(review): the literal is emitted via its value's toString(); confirm that
+      // character literals render as valid (quoted) SpEL.
+      return String.valueOf(((RexLiteral) node).getValue());
+    }
+    if (node instanceof RexInputRef) {
+      return "#in.getFieldValue(" + ((RexInputRef) node).getIndex() + ")";
+    }
+    if (node instanceof RexCall) {
+      return CalciteToSpEL.rexcall2SpEL((RexCall) node);
+    }
+    throw new BeamSqlUnsupportedException(String.format(
+        "project expression %s (%s) is not supported yet", node, node.getClass().getSimpleName()));
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/7867ce62/dsls/sql/src/main/java/org/beam/dsls/sql/interpreter/CalciteToSpEL.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/interpreter/CalciteToSpEL.java b/dsls/sql/src/main/java/org/beam/dsls/sql/interpreter/CalciteToSpEL.java
new file mode 100644
index 0000000..c7cbace
--- /dev/null
+++ b/dsls/sql/src/main/java/org/beam/dsls/sql/interpreter/CalciteToSpEL.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.beam.dsls.sql.interpreter;
+
+import com.google.common.base.Joiner;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Locale;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexNode;
+import org.beam.dsls.sql.planner.BeamSqlUnsupportedException;
+
+/**
+ * {@code CalciteToSpEL} is used in {@link BeamSQLSpELExecutor} to convert a
+ * relational expression {@link RexCall} to a SpEL expression.
+ */
+public class CalciteToSpEL {
+
+  /**
+   * Converts a Calcite {@link RexCall} into the text of an equivalent SpEL expression.
+   *
+   * <p>Operands are converted first. Nested calls are converted recursively, column references
+   * ({@link RexInputRef}) become {@code #in.getFieldValue(index)}, and any other operand (for
+   * example a literal) is emitted through its {@code toString()} form.
+   *
+   * @param cdn the call to convert
+   * @return SpEL expression text
+   * @throws BeamSqlUnsupportedException if the operator is not supported yet
+   */
+  public static String rexcall2SpEL(RexCall cdn) {
+    List<String> parts = new ArrayList<>(cdn.operands.size());
+    for (RexNode subcdn : cdn.operands) {
+      parts.add(operand2SpEL(subcdn));
+    }
+
+    String opName = cdn.op.getName().toUpperCase(Locale.ROOT);
+    String opClass = cdn.op.getClass().getSimpleName();
+    switch (opClass) {
+      case "SqlMonotonicBinaryOperator": // + - *
+      case "SqlBinaryOperator": // > < = >= <= <> OR AND || / .
+        return String.format(" ( %s ) ", Joiner.on(binaryOp2SpEL(opName)).join(parts));
+      case "SqlCaseOperator": // CASE
+        return case2SpEL(parts);
+      case "SqlCastFunction": // CAST: the operand is passed through unchanged
+        return parts.get(0);
+      case "SqlPostfixOperator":
+        switch (opName) {
+          case "IS NULL":
+            return String.format(" null == %s ", parts.get(0));
+          case "IS NOT NULL":
+            return String.format(" null != %s ", parts.get(0));
+          default:
+            throw unsupported(opName, opClass);
+        }
+      default:
+        throw unsupported(opName, opClass);
+    }
+  }
+
+  /** Converts one operand of a call to SpEL text. */
+  private static String operand2SpEL(RexNode operand) {
+    if (operand instanceof RexCall) {
+      return rexcall2SpEL((RexCall) operand);
+    }
+    if (operand instanceof RexInputRef) {
+      return "#in.getFieldValue(" + ((RexInputRef) operand).getIndex() + ")";
+    }
+    return operand.toString();
+  }
+
+  /** Maps an upper-cased SQL binary operator name to the SpEL operator used to join operands. */
+  private static String binaryOp2SpEL(String opName) {
+    switch (opName) {
+      case "AND":
+        return "&&";
+      case "OR":
+        return "||";
+      case "=":
+        return "==";
+      case "<>":
+        return "!=";
+      case "||":
+        // SQL string concatenation; in SpEL '||' would be a logical OR, '+' concatenates.
+        return "+";
+      default:
+        return opName;
+    }
+  }
+
+  /**
+   * Folds CASE operands, laid out as {@code [when1, then1, ..., whenN, thenN, else]}, into
+   * nested SpEL ternary expressions so that every WHEN branch is kept.
+   */
+  private static String case2SpEL(List<String> parts) {
+    int size = parts.size();
+    if (size < 3 || size % 2 == 0) {
+      throw new BeamSqlUnsupportedException(
+          String.format("CASE with %d operands is not supported", size));
+    }
+    String result = parts.get(size - 1);
+    for (int i = size - 3; i >= 0; i -= 2) {
+      result = String.format(" (%s ? %s : %s)", parts.get(i), parts.get(i + 1), result);
+    }
+    return result;
+  }
+
+  /** Builds the exception reported for an operator this converter cannot translate. */
+  private static BeamSqlUnsupportedException unsupported(String opName, String opClass) {
+    return new BeamSqlUnsupportedException(
+        String.format("operator %s (%s) is not supported yet", opName, opClass));
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/7867ce62/dsls/sql/src/main/java/org/beam/dsls/sql/interpreter/package-info.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/interpreter/package-info.java b/dsls/sql/src/main/java/org/beam/dsls/sql/interpreter/package-info.java
new file mode 100644
index 0000000..85235e2
--- /dev/null
+++ b/dsls/sql/src/main/java/org/beam/dsls/sql/interpreter/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * interpreter generate runnable 'code' to execute SQL relational expressions.
+ */ +package org.beam.dsls.sql.interpreter; http://git-wip-us.apache.org/repos/asf/beam/blob/7867ce62/dsls/sql/src/main/java/org/beam/dsls/sql/package-info.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/package-info.java b/dsls/sql/src/main/java/org/beam/dsls/sql/package-info.java new file mode 100644 index 0000000..c6f5cf6 --- /dev/null +++ b/dsls/sql/src/main/java/org/beam/dsls/sql/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * BeamSQL provides a new interface to run a SQL statement with Beam. 
+ */ +package org.beam.dsls.sql; http://git-wip-us.apache.org/repos/asf/beam/blob/7867ce62/dsls/sql/src/main/java/org/beam/dsls/sql/planner/BeamPipelineCreator.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/planner/BeamPipelineCreator.java b/dsls/sql/src/main/java/org/beam/dsls/sql/planner/BeamPipelineCreator.java new file mode 100644 index 0000000..5a0c73d --- /dev/null +++ b/dsls/sql/src/main/java/org/beam/dsls/sql/planner/BeamPipelineCreator.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.beam.dsls.sql.planner;
+
+import java.util.Map;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.values.PCollection;
+import org.beam.dsls.sql.rel.BeamRelNode;
+import org.beam.dsls.sql.schema.BaseBeamTable;
+import org.beam.dsls.sql.schema.BeamSQLRecordType;
+import org.beam.dsls.sql.schema.BeamSQLRecordTypeCoder;
+import org.beam.dsls.sql.schema.BeamSQLRow;
+import org.beam.dsls.sql.schema.BeamSqlRowCoder;
+
+/**
+ * {@link BeamPipelineCreator} holds the state used while converting a {@link BeamRelNode} tree
+ * into a Beam pipeline: the source tables, the pipeline under construction, and the most
+ * recently produced {@link PCollection}.
+ */
+public class BeamPipelineCreator {
+  private final Map<String, BaseBeamTable> sourceTables;
+  private final Pipeline pipeline;
+  private PCollection<BeamSQLRow> latestStream;
+  private boolean hasPersistent = false;
+
+  /**
+   * Creates a pipeline from empty, validated default options (job name
+   * {@code BeamPlanCreator}) and registers coders for {@link BeamSQLRow} and
+   * {@link BeamSQLRecordType}.
+   *
+   * @param sourceTables tables available to the query, keyed by table name; stored as-is
+   *     (not copied)
+   */
+  public BeamPipelineCreator(Map<String, BaseBeamTable> sourceTables) {
+    this.sourceTables = sourceTables;
+
+    PipelineOptions options = PipelineOptionsFactory.fromArgs(new String[0]).withValidation()
+        .as(PipelineOptions.class);
+    options.setJobName("BeamPlanCreator");
+
+    this.pipeline = Pipeline.create(options);
+    CoderRegistry cr = pipeline.getCoderRegistry();
+    cr.registerCoder(BeamSQLRow.class, BeamSqlRowCoder.of());
+    cr.registerCoder(BeamSQLRecordType.class, BeamSQLRecordTypeCoder.of());
+  }
+
+  /** Returns the most recently set output stream, or {@code null} if none has been set. */
+  public PCollection<BeamSQLRow> getLatestStream() {
+    return latestStream;
+  }
+
+  /** Records {@code latestStream} as the most recent output stream. */
+  public void setLatestStream(PCollection<BeamSQLRow> latestStream) {
+    this.latestStream = latestStream;
+  }
+
+  /** Returns the source-table map passed to the constructor (the same instance). */
+  public Map<String, BaseBeamTable> getSourceTables() {
+    return sourceTables;
+  }
+
+  /** Returns the pipeline under construction. */
+  public Pipeline getPipeline() {
+    return pipeline;
+  }
+
+  /**
+   * Returns the flag last set by {@link #setHasPersistent(boolean)}; {@code false} by default.
+   * NOTE(review): presumably marks that a sink (persistent) step was added — confirm with
+   * callers.
+   */
+  public boolean isHasPersistent() {
+    return hasPersistent;
+  }
+
+  /** Sets the flag returned by {@link #isHasPersistent()}. */
+  public void setHasPersistent(boolean hasPersistent) {
+    this.hasPersistent = hasPersistent;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/7867ce62/dsls/sql/src/main/java/org/beam/dsls/sql/planner/BeamQueryPlanner.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/planner/BeamQueryPlanner.java b/dsls/sql/src/main/java/org/beam/dsls/sql/planner/BeamQueryPlanner.java new file mode 100644 index 0000000..a31ace0 --- /dev/null +++ b/dsls/sql/src/main/java/org/beam/dsls/sql/planner/BeamQueryPlanner.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.beam.dsls.sql.planner;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.config.Lex;
+import org.apache.calcite.jdbc.CalciteSchema;
+import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
+import org.apache.calcite.plan.Contexts;
+import org.apache.calcite.plan.ConventionTraitDef;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.plan.RelTraitDef;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.prepare.CalciteCatalogReader;
+import org.apache.calcite.rel.RelCollationTraitDef;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.type.RelDataTypeSystem;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlOperatorTable;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.sql.parser.SqlParseException;
+import org.apache.calcite.sql.parser.SqlParser;
+import org.apache.calcite.tools.FrameworkConfig;
+import org.apache.calcite.tools.Frameworks;
+import org.apache.calcite.tools.Planner;
+import org.apache.calcite.tools.RelConversionException;
+import org.apache.calcite.tools.ValidationException;
+import org.beam.dsls.sql.rel.BeamLogicalConvention;
+import org.beam.dsls.sql.rel.BeamRelNode;
+import org.beam.dsls.sql.schema.BaseBeamTable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Plans a SQL statement and submits it as a Beam pipeline. The statement is parsed,
+ * validated, converted into a Calcite {@link RelNode} tree, optimized into
+ * {@link BeamRelNode}s and finally translated into Beam transforms.
+ */
+public class BeamQueryPlanner {
+  private static final Logger LOG = LoggerFactory.getLogger(BeamQueryPlanner.class);
+
+  /** Type factory handed to the catalog reader; built on Calcite's default type system. */
+  public static final JavaTypeFactory TYPE_FACTORY = new JavaTypeFactoryImpl(
+      RelDataTypeSystem.DEFAULT);
+
+  protected final Planner planner;
+  private Map<String, BaseBeamTable> sourceTables = new HashMap<>();
+
+  public BeamQueryPlanner(SchemaPlus schema) {
+    this.planner = Frameworks.getPlanner(createFrameworkConfig(schema));
+
+    // Index every table already present in the schema by its name.
+    for (String tableName : schema.getTableNames()) {
+      sourceTables.put(tableName, (BaseBeamTable) schema.getTable(tableName));
+    }
+  }
+
+  /** Assembles the Calcite framework configuration for {@code schema}. */
+  private static FrameworkConfig createFrameworkConfig(SchemaPlus schema) {
+    final List<RelTraitDef> traitDefs = new ArrayList<RelTraitDef>();
+    traitDefs.add(ConventionTraitDef.INSTANCE);
+    traitDefs.add(RelCollationTraitDef.INSTANCE);
+
+    // NOTE(review): this operator-table list is built but never passed to the config
+    // below, presumably leaving Calcite's default operator table in effect; confirm intent.
+    List<SqlOperatorTable> sqlOperatorTables = new ArrayList<>();
+    sqlOperatorTables.add(SqlStdOperatorTable.instance());
+    sqlOperatorTables.add(new CalciteCatalogReader(CalciteSchema.from(schema), false,
+        Collections.<String>emptyList(), TYPE_FACTORY));
+
+    return Frameworks.newConfigBuilder()
+        .parserConfig(SqlParser.configBuilder().setLex(Lex.MYSQL).build())
+        .defaultSchema(schema)
+        .traitDefs(traitDefs)
+        .context(Contexts.EMPTY_CONTEXT)
+        .ruleSets(BeamRuleSets.getRuleSets())
+        .costFactory(null)
+        .typeSystem(BeamRelDataTypeSystem.BEAM_REL_DATATYPE_SYSTEM)
+        .build();
+  }
+
+  /**
+   * Compiles {@code sqlStatement} via {@link #compileBeamPipeline(String)}, runs the
+   * resulting pipeline and blocks until it finishes.
+   */
+  public void submitToRun(String sqlStatement) throws Exception {
+    PipelineResult result = compileBeamPipeline(sqlStatement).run();
+    result.waitUntilFinish();
+  }
+
+  /**
+   * Converts {@code sqlStatement} with {@link #convertToBeamRel(String)} and translates
+   * the resulting {@link BeamRelNode} tree into a Beam {@link Pipeline}.
+   */
+  public Pipeline compileBeamPipeline(String sqlStatement) throws Exception {
+    BeamRelNode root = convertToBeamRel(sqlStatement);
+    return root.buildBeamPipeline(new BeamPipelineCreator(sourceTables));
+  }
+
+  /**
+   * Parses and validates {@code sqlStatement}, then converts it into a
+   * {@link BeamRelNode} tree.
+   */
+  public BeamRelNode convertToBeamRel(String sqlStatement)
+      throws ValidationException, RelConversionException, SqlParseException {
+    SqlNode parsed = planner.parse(sqlStatement);
+    return (BeamRelNode) validateAndConvert(parsed);
+  }
+
+  /** Validates {@code sqlNode}, converts it to a logical plan and optimizes that plan. */
+  private RelNode validateAndConvert(SqlNode sqlNode)
+      throws ValidationException, RelConversionException {
+    SqlNode validated = planner.validate(sqlNode);
+    // Run UnsupportedOperatorsVisitor over the validated tree; its result is discarded.
+    validated.accept(new UnsupportedOperatorsVisitor());
+    LOG.info("SQL:\n" + validated);
+
+    RelNode logicalPlan = planner.rel(validated).rel;
+    return convertToBeamRel(logicalPlan);
+  }
+
+  /** Optimizes a logical plan into {@link BeamLogicalConvention} using rule set #0. */
+  private RelNode convertToBeamRel(RelNode relNode) throws RelConversionException {
+    LOG.info("SQLPlan>\n" + RelOptUtil.toString(relNode));
+
+    // PlannerImpl.transform() optimizes the RelNode with the configured rule set.
+    RelTraitSet desiredTraits = relNode.getTraitSet().plus(BeamLogicalConvention.INSTANCE);
+    return planner.transform(0, desiredTraits, relNode);
+  }
+
+  public Map<String, BaseBeamTable> getSourceTables() {
+    return sourceTables;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/7867ce62/dsls/sql/src/main/java/org/beam/dsls/sql/planner/BeamRelDataTypeSystem.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/planner/BeamRelDataTypeSystem.java
b/dsls/sql/src/main/java/org/beam/dsls/sql/planner/BeamRelDataTypeSystem.java
new file mode 100644
index 0000000..bf35296
--- /dev/null
+++ b/dsls/sql/src/main/java/org/beam/dsls/sql/planner/BeamRelDataTypeSystem.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.beam.dsls.sql.planner;
+
+import org.apache.calcite.rel.type.RelDataTypeSystem;
+import org.apache.calcite.rel.type.RelDataTypeSystemImpl;
+
+/**
+ * Calcite type system customized for Beam SQL: the maximum precision and the
+ * maximum scale of numeric types are both set to 38 digits.
+ */
+public class BeamRelDataTypeSystem extends RelDataTypeSystemImpl {
+  /** Digit limit shared by numeric precision and numeric scale. */
+  private static final int MAX_NUMERIC_DIGITS = 38;
+
+  /** Shared singleton instance. */
+  public static final RelDataTypeSystem BEAM_REL_DATATYPE_SYSTEM = new BeamRelDataTypeSystem();
+
+  @Override
+  public int getMaxNumericPrecision() {
+    return MAX_NUMERIC_DIGITS;
+  }
+
+  @Override
+  public int getMaxNumericScale() {
+    return MAX_NUMERIC_DIGITS;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/7867ce62/dsls/sql/src/main/java/org/beam/dsls/sql/planner/BeamRuleSets.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/planner/BeamRuleSets.java b/dsls/sql/src/main/java/org/beam/dsls/sql/planner/BeamRuleSets.java
new file mode 100644
index 0000000..3f40c27
--- /dev/null
+++ b/dsls/sql/src/main/java/org/beam/dsls/sql/planner/BeamRuleSets.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.beam.dsls.sql.planner;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import java.util.Iterator;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.tools.RuleSet;
+import org.beam.dsls.sql.rel.BeamRelNode;
+import org.beam.dsls.sql.rule.BeamFilterRule;
+import org.beam.dsls.sql.rule.BeamIOSinkRule;
+import org.beam.dsls.sql.rule.BeamIOSourceRule;
+import org.beam.dsls.sql.rule.BeamProjectRule;
+
+/**
+ * Supplies the {@link RuleSet}s that {@link BeamQueryPlanner} uses to rewrite a
+ * standard Calcite {@link RelNode} tree into a tree of {@link BeamRelNode}s.
+ */
+public class BeamRuleSets {
+  /** Calcite-to-Beam conversion rules, kept in registration order. */
+  private static final ImmutableSet<RelOptRule> CALCITE_TO_BEAM_CONVERSION_RULES =
+      ImmutableSet.<RelOptRule>of(BeamIOSourceRule.INSTANCE, BeamProjectRule.INSTANCE,
+          BeamFilterRule.INSTANCE, BeamIOSinkRule.INSTANCE);
+
+  /** Returns a fresh single-element array holding the conversion rule set. */
+  public static RuleSet[] getRuleSets() {
+    RuleSet conversion = new BeamRuleSet(ImmutableSet.copyOf(CALCITE_TO_BEAM_CONVERSION_RULES));
+    return new RuleSet[] {conversion};
+  }
+
+  /** Minimal {@link RuleSet} backed by an immutable set of rules. */
+  private static class BeamRuleSet implements RuleSet {
+    final ImmutableSet<RelOptRule> rules;
+
+    public BeamRuleSet(ImmutableSet<RelOptRule> rules) {
+      this.rules = rules;
+    }
+
+    public BeamRuleSet(ImmutableList<RelOptRule> rules) {
+      this.rules = ImmutableSet.copyOf(rules);
+    }
+
+    @Override
+    public Iterator<RelOptRule> iterator() {
+      return rules.iterator();
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/7867ce62/dsls/sql/src/main/java/org/beam/dsls/sql/planner/BeamSQLRelUtils.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/planner/BeamSQLRelUtils.java b/dsls/sql/src/main/java/org/beam/dsls/sql/planner/BeamSQLRelUtils.java
new file mode 100644
index 0000000..94b341c
--- /dev/null
+++
b/dsls/sql/src/main/java/org/beam/dsls/sql/planner/BeamSQLRelUtils.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.beam.dsls.sql.planner;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.plan.volcano.RelSubset;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.sql.SqlExplainLevel;
+import org.beam.dsls.sql.rel.BeamRelNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utilities for {@code BeamRelNode}: unique stage and class naming, input resolution
+ * and plan rendering.
+ */
+public class BeamSQLRelUtils {
+  private static final Logger LOG = LoggerFactory.getLogger(BeamSQLRelUtils.class);
+
+  // Static counters that keep generated stage and class names unique.
+  private static final AtomicInteger sequence = new AtomicInteger(0);
+  private static final AtomicInteger classSequence = new AtomicInteger(0);
+
+  /**
+   * Returns a stage name built from the upper-cased simple class name of
+   * {@code relNode}, its id and a counter that increases on every call.
+   */
+  public static String getStageName(BeamRelNode relNode) {
+    return relNode.getClass().getSimpleName().toUpperCase() + "_" + relNode.getId() + "_"
+        + sequence.getAndIncrement();
+  }
+
+  /**
+   * Returns a {@code Generated_}-prefixed class name derived from {@code relNode};
+   * a counter that increases on every call keeps it unique.
+   */
+  public static String getClassName(BeamRelNode relNode) {
+    return "Generated_" + relNode.getClass().getSimpleName().toUpperCase() + "_" + relNode.getId()
+        + "_" + classSequence.getAndIncrement();
+  }
+
+  /**
+   * Resolves {@code input} to a {@link BeamRelNode}. A Volcano {@link RelSubset} is
+   * first replaced by its best known member.
+   */
+  public static BeamRelNode getBeamRelInput(RelNode input) {
+    if (input instanceof RelSubset) {
+      // go with known best input
+      input = ((RelSubset) input).getBest();
+    }
+    return (BeamRelNode) input;
+  }
+
+  /** Renders {@code rel} at {@link SqlExplainLevel#EXPPLAN_ATTRIBUTES}. */
+  public static String explain(final RelNode rel) {
+    return explain(rel, SqlExplainLevel.EXPPLAN_ATTRIBUTES);
+  }
+
+  /**
+   * Renders {@code rel} as a plan string at the requested {@code detailLevel}.
+   *
+   * <p>A {@link StackOverflowError} raised while rendering is logged, and an empty
+   * string is returned instead of propagating the error.
+   */
+  public static String explain(final RelNode rel, SqlExplainLevel detailLevel) {
+    String explain = "";
+    try {
+      // Pass the requested level through; single-argument RelOptUtil.toString(rel)
+      // always renders at its default level.
+      explain = RelOptUtil.toString(rel, detailLevel);
+    } catch (StackOverflowError e) {
+      LOG.error("StackOverflowError occurred while extracting plan. "
+          + "Please report it to the dev@ mailing list.");
+      LOG.error("RelNode " + rel + " ExplainLevel " + detailLevel, e);
+      LOG.error("Forcing plan to empty string and continue... "
+          + "SQL Runner may not working properly after.");
+    }
+    return explain;
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/7867ce62/dsls/sql/src/main/java/org/beam/dsls/sql/planner/BeamSqlRunner.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/planner/BeamSqlRunner.java b/dsls/sql/src/main/java/org/beam/dsls/sql/planner/BeamSqlRunner.java
new file mode 100644
index 0000000..9581fcd
--- /dev/null
+++ b/dsls/sql/src/main/java/org/beam/dsls/sql/planner/BeamSqlRunner.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.beam.dsls.sql.planner;
+
+import java.io.Serializable;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.schema.Schema;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.sql.parser.SqlParseException;
+import org.apache.calcite.tools.Frameworks;
+import org.apache.calcite.tools.RelConversionException;
+import org.apache.calcite.tools.ValidationException;
+import org.beam.dsls.sql.rel.BeamRelNode;
+import org.beam.dsls.sql.schema.BaseBeamTable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Interface to explain or submit a SQL query against the registered tables.
+ *
+ * <p>NOTE(review): this class is declared {@link Serializable}, but its
+ * {@code BeamQueryPlanner} field does not implement {@link Serializable}, so Java
+ * serialization of an instance presumably fails; confirm whether it is needed.
+ */
+public class BeamSqlRunner implements Serializable {
+  private static final long serialVersionUID = -4708693435115005182L;
+
+  private static final Logger LOG = LoggerFactory.getLogger(BeamSqlRunner.class);
+
+  /** Root Calcite schema holding every registered table and sub-schema. */
+  private SchemaPlus schema = Frameworks.createRootSchema(true);
+
+  private BeamQueryPlanner planner = new BeamQueryPlanner(schema);
+
+  /**
+   * Registers {@code scheme} as a sub-schema of the root schema, under
+   * {@code schemaName}.
+   */
+  public void addSchema(String schemaName, Schema scheme) {
+    // Add the caller-supplied schema, not the root schema itself.
+    schema.add(schemaName, scheme);
+  }
+
+  /**
+   * Adds a {@link BaseBeamTable} to the schema and to the planner's source tables.
+   */
+  public void addTable(String tableName, BaseBeamTable table) {
+    schema.add(tableName, table);
+    planner.getSourceTables().put(tableName, table);
+  }
+
+  /**
+   * Submits {@code sqlString} as a Beam pipeline and waits for it to finish. The
+   * Calcite planner is closed afterwards, whether or not submission succeeds.
+   */
+  public void submitQuery(String sqlString) throws Exception {
+    try {
+      planner.submitToRun(sqlString);
+    } finally {
+      planner.planner.close();
+    }
+  }
+
+  /**
+   * Explains {@code sqlString}: prints its Beam execution plan to standard output and
+   * returns it. The Calcite planner is closed afterwards, even when conversion fails.
+   */
+  public String explainQuery(String sqlString)
+      throws ValidationException, RelConversionException, SqlParseException {
+    try {
+      BeamRelNode exeTree = planner.convertToBeamRel(sqlString);
+      String beamPlan = RelOptUtil.toString(exeTree);
+      System.out.println(String.format("beamPlan>\n%s", beamPlan));
+      return beamPlan;
+    } finally {
+      planner.planner.close();
+    }
+  }
+
+  protected BeamQueryPlanner getPlanner() {
+    return planner;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/7867ce62/dsls/sql/src/main/java/org/beam/dsls/sql/planner/BeamSqlUnsupportedException.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/planner/BeamSqlUnsupportedException.java b/dsls/sql/src/main/java/org/beam/dsls/sql/planner/BeamSqlUnsupportedException.java
new file mode 100644
index 0000000..a3475bb
--- /dev/null
+++ b/dsls/sql/src/main/java/org/beam/dsls/sql/planner/BeamSqlUnsupportedException.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.beam.dsls.sql.planner;
+
+/**
+ * Generic exception for un-supported operations.
+ *
+ * <p>Unchecked: it extends {@link RuntimeException}, so callers need not declare it.
+ */
+public class BeamSqlUnsupportedException extends RuntimeException {
+  private static final long serialVersionUID = 3445015747629217342L;
+
+  /** Creates an exception without a detail message. */
+  public BeamSqlUnsupportedException() {
+    super();
+  }
+
+  /** Creates an exception carrying the given detail {@code message}. */
+  public BeamSqlUnsupportedException(String message) {
+    super(message);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/7867ce62/dsls/sql/src/main/java/org/beam/dsls/sql/planner/UnsupportedOperatorsVisitor.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/planner/UnsupportedOperatorsVisitor.java b/dsls/sql/src/main/java/org/beam/dsls/sql/planner/UnsupportedOperatorsVisitor.java
new file mode 100644
index 0000000..702381d
--- /dev/null
+++ b/dsls/sql/src/main/java/org/beam/dsls/sql/planner/UnsupportedOperatorsVisitor.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.beam.dsls.sql.planner;
+
+import org.apache.calcite.sql.util.SqlShuttle;
+
+/**
+ * {@link SqlShuttle} over validated {@code SqlNode} trees, reserved for rejecting unsupported SQL operators.
+ *
+ */
+public class UnsupportedOperatorsVisitor extends SqlShuttle {
+  // Empty body: no visit methods are overridden, so every node is handled by the
+  // inherited SqlShuttle implementation.
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/7867ce62/dsls/sql/src/main/java/org/beam/dsls/sql/planner/package-info.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/planner/package-info.java b/dsls/sql/src/main/java/org/beam/dsls/sql/planner/package-info.java
new file mode 100644
index 0000000..d98c584
--- /dev/null
+++ b/dsls/sql/src/main/java/org/beam/dsls/sql/planner/package-info.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * {@link org.beam.dsls.sql.planner.BeamQueryPlanner} is the main interface.
+ * It defines data sources, validate a SQL statement, and convert it as a Beam
+ * pipeline.
+ */ +package org.beam.dsls.sql.planner; http://git-wip-us.apache.org/repos/asf/beam/blob/7867ce62/dsls/sql/src/main/java/org/beam/dsls/sql/rel/BeamFilterRel.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/rel/BeamFilterRel.java b/dsls/sql/src/main/java/org/beam/dsls/sql/rel/BeamFilterRel.java new file mode 100644 index 0000000..64f2d1f --- /dev/null +++ b/dsls/sql/src/main/java/org/beam/dsls/sql/rel/BeamFilterRel.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.beam.dsls.sql.rel;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rex.RexNode;
+import org.beam.dsls.sql.interpreter.BeamSQLExpressionExecutor;
+import org.beam.dsls.sql.interpreter.BeamSQLSpELExecutor;
+import org.beam.dsls.sql.planner.BeamPipelineCreator;
+import org.beam.dsls.sql.planner.BeamSQLRelUtils;
+import org.beam.dsls.sql.schema.BeamSQLRow;
+import org.beam.dsls.sql.transform.BeamSQLFilterFn;
+
+/**
+ * {@link BeamRelNode} that takes the place of a Calcite {@code Filter} node. The
+ * condition is applied as a {@code ParDo} of {@link BeamSQLFilterFn}.
+ */
+public class BeamFilterRel extends Filter implements BeamRelNode {
+
+  public BeamFilterRel(RelOptCluster cluster, RelTraitSet traits, RelNode child,
+      RexNode condition) {
+    super(cluster, traits, child, condition);
+  }
+
+  @Override
+  public Filter copy(RelTraitSet traitSet, RelNode input, RexNode condition) {
+    return new BeamFilterRel(getCluster(), traitSet, input, condition);
+  }
+
+  /**
+   * Translates the input subtree first, then filters the stream it produced and
+   * publishes the filtered stream as the latest one.
+   */
+  @Override
+  public Pipeline buildBeamPipeline(BeamPipelineCreator planCreator) throws Exception {
+    BeamRelNode inputNode = BeamSQLRelUtils.getBeamRelInput(getInput());
+    inputNode.buildBeamPipeline(planCreator);
+
+    String stageName = BeamSQLRelUtils.getStageName(this);
+    PCollection<BeamSQLRow> upstream = planCreator.getLatestStream();
+    BeamSQLExpressionExecutor executor = new BeamSQLSpELExecutor(this);
+
+    PCollection<BeamSQLRow> filteredStream =
+        upstream.apply(stageName, ParDo.of(new BeamSQLFilterFn(getRelTypeName(), executor)));
+    planCreator.setLatestStream(filteredStream);
+
+    return planCreator.getPipeline();
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/7867ce62/dsls/sql/src/main/java/org/beam/dsls/sql/rel/BeamIOSinkRel.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/rel/BeamIOSinkRel.java b/dsls/sql/src/main/java/org/beam/dsls/sql/rel/BeamIOSinkRel.java
new file mode 100644
index 0000000..46654e5
--- /dev/null
+++ b/dsls/sql/src/main/java/org/beam/dsls/sql/rel/BeamIOSinkRel.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.beam.dsls.sql.rel;
+
+import com.google.common.base.Joiner;
+import java.util.List;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.prepare.Prepare;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.TableModify;
+import org.apache.calcite.rex.RexNode;
+import org.beam.dsls.sql.planner.BeamPipelineCreator;
+import org.beam.dsls.sql.planner.BeamSQLRelUtils;
+import org.beam.dsls.sql.schema.BaseBeamTable;
+import org.beam.dsls.sql.schema.BeamSQLRow;
+
+/**
+ * BeamRelNode to replace a {@code TableModify} node.
+ *
+ * <p>It applies the target table's IO writer to the upstream stream and marks the
+ * pipeline creator as having a persistent output.
+ */
+public class BeamIOSinkRel extends TableModify implements BeamRelNode {
+  public BeamIOSinkRel(RelOptCluster cluster, RelTraitSet traits, RelOptTable table,
+      Prepare.CatalogReader catalogReader, RelNode child, Operation operation,
+      List<String> updateColumnList, List<RexNode> sourceExpressionList, boolean flattened) {
+    super(cluster, traits, table, catalogReader, child, operation, updateColumnList,
+        sourceExpressionList, flattened);
+  }
+
+  @Override
+  public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
+    return new BeamIOSinkRel(getCluster(), traitSet, getTable(), getCatalogReader(), sole(inputs),
+        getOperation(), getUpdateColumnList(), getSourceExpressionList(), isFlattened());
+  }
+
+  /**
+   * Translates the input subtree, then writes its output stream to the table this
+   * node targets.
+   *
+   * @throws IllegalStateException if no table is registered under the target
+   *     table's dot-joined qualified name
+   */
+  @Override
+  public Pipeline buildBeamPipeline(BeamPipelineCreator planCreator) throws Exception {
+
+    RelNode input = getInput();
+    BeamSQLRelUtils.getBeamRelInput(input).buildBeamPipeline(planCreator);
+
+    String stageName = BeamSQLRelUtils.getStageName(this);
+
+    PCollection<BeamSQLRow> upstream = planCreator.getLatestStream();
+
+    String sourceName = Joiner.on('.').join(getTable().getQualifiedName());
+
+    BaseBeamTable targetTable = planCreator.getSourceTables().get(sourceName);
+    if (targetTable == null) {
+      // Report the missing table by name instead of failing with a bare NullPointerException.
+      throw new IllegalStateException("Target table not registered: " + sourceName);
+    }
+
+    upstream.apply(stageName, targetTable.buildIOWriter());
+
+    planCreator.setHasPersistent(true);
+
+    return planCreator.getPipeline();
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/7867ce62/dsls/sql/src/main/java/org/beam/dsls/sql/rel/BeamIOSourceRel.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/rel/BeamIOSourceRel.java b/dsls/sql/src/main/java/org/beam/dsls/sql/rel/BeamIOSourceRel.java
new file mode 100644
index 0000000..f14db92
--- /dev/null
+++ b/dsls/sql/src/main/java/org/beam/dsls/sql/rel/BeamIOSourceRel.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.
See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.beam.dsls.sql.rel;
+
+import com.google.common.base.Joiner;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.core.TableScan;
+import org.beam.dsls.sql.planner.BeamPipelineCreator;
+import org.beam.dsls.sql.planner.BeamSQLRelUtils;
+import org.beam.dsls.sql.schema.BaseBeamTable;
+import org.beam.dsls.sql.schema.BeamSQLRow;
+
+/**
+ * BeamRelNode to replace a {@code TableScan} node.
+ *
+ * <p>It reads the scanned table through the table's IO reader and publishes the
+ * result as the latest stream.
+ */
+public class BeamIOSourceRel extends TableScan implements BeamRelNode {
+
+  public BeamIOSourceRel(RelOptCluster cluster, RelTraitSet traitSet, RelOptTable table) {
+    super(cluster, traitSet, table);
+  }
+
+  /**
+   * Adds a read of the scanned table to the pipeline and records it as the latest
+   * stream.
+   *
+   * @throws IllegalStateException if no table is registered under the scanned
+   *     table's name
+   */
+  @Override
+  public Pipeline buildBeamPipeline(BeamPipelineCreator planCreator) throws Exception {
+
+    // Dot-joined qualified name with any ".(STREAM)" segment removed.
+    String sourceName =
+        Joiner.on('.').join(getTable().getQualifiedName()).replace(".(STREAM)", "");
+
+    BaseBeamTable sourceTable = planCreator.getSourceTables().get(sourceName);
+    if (sourceTable == null) {
+      // Report the missing table by name instead of failing with a bare NullPointerException.
+      throw new IllegalStateException("Source table not registered: " + sourceName);
+    }
+
+    String stageName = BeamSQLRelUtils.getStageName(this);
+
+    PCollection<BeamSQLRow> sourceStream = planCreator.getPipeline().apply(stageName,
+        sourceTable.buildIOReader());
+
+    planCreator.setLatestStream(sourceStream);
+
+    return planCreator.getPipeline();
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/7867ce62/dsls/sql/src/main/java/org/beam/dsls/sql/rel/BeamLogicalConvention.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/rel/BeamLogicalConvention.java b/dsls/sql/src/main/java/org/beam/dsls/sql/rel/BeamLogicalConvention.java
new file mode 100644
index 0000000..50fe8e0
--- /dev/null
+++ b/dsls/sql/src/main/java/org/beam/dsls/sql/rel/BeamLogicalConvention.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.beam.dsls.sql.rel;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.ConventionTraitDef;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelTrait;
+import org.apache.calcite.plan.RelTraitDef;
+import org.apache.calcite.plan.RelTraitSet;
+
+/**
+ * Calcite calling {@link Convention} for Beam SQL plans.
+ *
+ * <p>Nodes in this convention implement {@link BeamRelNode}. The convention is satisfied only by
+ * itself, reports that it cannot be converted to any other convention, and does not request
+ * abstract converters.
+ */
+public enum BeamLogicalConvention implements Convention {
+  INSTANCE;
+
+  /** Name reported by both {@link #getName()} and {@link #toString()}. */
+  private static final String NAME = "BEAM_LOGICAL";
+
+  @Override
+  public String getName() {
+    return NAME;
+  }
+
+  @Override
+  public String toString() {
+    return NAME;
+  }
+
+  @Override
+  public Class getInterface() {
+    return BeamRelNode.class;
+  }
+
+  @Override
+  public RelTraitDef getTraitDef() {
+    return ConventionTraitDef.INSTANCE;
+  }
+
+  @Override
+  public boolean satisfies(RelTrait trait) {
+    // Only this very convention satisfies itself.
+    return trait == this;
+  }
+
+  @Override
+  public void register(RelOptPlanner planner) {
+    // Intentionally empty: nothing is registered with the planner.
+  }
+
+  @Override
+  public boolean canConvertConvention(Convention toConvention) {
+    return false;
+  }
+
+  @Override
+  public boolean useAbstractConvertersForConversion(RelTraitSet fromTraits,
+      RelTraitSet toTraits) {
+    return false;
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/7867ce62/dsls/sql/src/main/java/org/beam/dsls/sql/rel/BeamProjectRel.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/rel/BeamProjectRel.java b/dsls/sql/src/main/java/org/beam/dsls/sql/rel/BeamProjectRel.java
new file mode 100644
index 0000000..e41d74e
--- /dev/null
+++ b/dsls/sql/src/main/java/org/beam/dsls/sql/rel/BeamProjectRel.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.beam.dsls.sql.rel;
+
+import java.util.List;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.beam.dsls.sql.interpreter.BeamSQLExpressionExecutor;
+import org.beam.dsls.sql.interpreter.BeamSQLSpELExecutor;
+import org.beam.dsls.sql.planner.BeamPipelineCreator;
+import org.beam.dsls.sql.planner.BeamSQLRelUtils;
+import org.beam.dsls.sql.schema.BeamSQLRecordType;
+import org.beam.dsls.sql.schema.BeamSQLRow;
+import org.beam.dsls.sql.transform.BeamSQLProjectFn;
+
+/**
+ * BeamRelNode to replace a {@code Project} node.
+ *
+ * <p>The projection is evaluated by a {@link BeamSQLProjectFn}, applied as a {@link ParDo} to the
+ * stream produced by the input node.
+ */
+public class BeamProjectRel extends Project implements BeamRelNode {
+
+  /**
+   * Creates a projection node.
+   *
+   * @param projects the projected expressions, e.g. {@link RexLiteral}, {@link RexInputRef} or
+   *     {@link RexCall}
+   * @param rowType the row type of the projection's output
+   */
+  public BeamProjectRel(RelOptCluster cluster, RelTraitSet traits, RelNode input,
+      List<? extends RexNode> projects, RelDataType rowType) {
+    super(cluster, traits, input, projects, rowType);
+  }
+
+  @Override
+  public Project copy(RelTraitSet traitSet, RelNode input, List<RexNode> projects,
+      RelDataType rowType) {
+    return new BeamProjectRel(getCluster(), traitSet, input, projects, rowType);
+  }
+
+  @Override
+  public Pipeline buildBeamPipeline(BeamPipelineCreator planCreator) throws Exception {
+    // Depth-first: build the input sub-plan before appending this node's transform.
+    BeamSQLRelUtils.getBeamRelInput(getInput()).buildBeamPipeline(planCreator);
+
+    String stage = BeamSQLRelUtils.getStageName(this);
+    PCollection<BeamSQLRow> inputRows = planCreator.getLatestStream();
+
+    BeamSQLExpressionExecutor projector = new BeamSQLSpELExecutor(this);
+    BeamSQLProjectFn projectFn =
+        new BeamSQLProjectFn(getRelTypeName(), projector, BeamSQLRecordType.from(rowType));
+
+    PCollection<BeamSQLRow> projectedRows = inputRows.apply(stage, ParDo.of(projectFn));
+    planCreator.setLatestStream(projectedRows);
+    return planCreator.getPipeline();
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/7867ce62/dsls/sql/src/main/java/org/beam/dsls/sql/rel/BeamRelNode.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/rel/BeamRelNode.java b/dsls/sql/src/main/java/org/beam/dsls/sql/rel/BeamRelNode.java
new file mode 100644
index 0000000..07ffee5
--- /dev/null
+++ b/dsls/sql/src/main/java/org/beam/dsls/sql/rel/BeamRelNode.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.beam.dsls.sql.rel;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.calcite.rel.RelNode;
+import org.beam.dsls.sql.planner.BeamPipelineCreator;
+
+/**
+ * A Calcite {@link RelNode} that can translate itself into Beam transforms.
+ *
+ * <p>It adds a single method, {@link #buildBeamPipeline(BeamPipelineCreator)}, which is called
+ * by {@link BeamPipelineCreator}.
+ */
+public interface BeamRelNode extends RelNode {
+
+  /**
+   * Appends the transforms for this node to the pipeline held by {@code planCreator}.
+   *
+   * <p>{@link BeamRelNode}s form a recursive structure that {@link BeamPipelineCreator} visits
+   * depth-first, so an implementation is expected to build its input node(s) before adding its
+   * own transform.
+   *
+   * @param planCreator the shared pipeline-building context
+   * @return the resulting pipeline
+   * @throws Exception if translating this node fails
+   */
+  Pipeline buildBeamPipeline(BeamPipelineCreator planCreator) throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/7867ce62/dsls/sql/src/main/java/org/beam/dsls/sql/rel/package-info.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/rel/package-info.java b/dsls/sql/src/main/java/org/beam/dsls/sql/rel/package-info.java
new file mode 100644
index 0000000..13dc962
--- /dev/null
+++ b/dsls/sql/src/main/java/org/beam/dsls/sql/rel/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Beam SQL specific relational nodes, used in place of Calcite's
+ * {@link org.apache.calcite.rel.RelNode}s.
+ */
+package org.beam.dsls.sql.rel;
http://git-wip-us.apache.org/repos/asf/beam/blob/7867ce62/dsls/sql/src/main/java/org/beam/dsls/sql/rule/BeamFilterRule.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/rule/BeamFilterRule.java b/dsls/sql/src/main/java/org/beam/dsls/sql/rule/BeamFilterRule.java
new file mode 100644
index 0000000..2ad7c07
--- /dev/null
+++ b/dsls/sql/src/main/java/org/beam/dsls/sql/rule/BeamFilterRule.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.beam.dsls.sql.rule;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.logical.LogicalFilter;
+import org.beam.dsls.sql.rel.BeamFilterRel;
+import org.beam.dsls.sql.rel.BeamLogicalConvention;
+
+/**
+ * A {@code ConverterRule} to replace {@link Filter} with {@link BeamFilterRel}.
+ *
+ * <p>It matches {@link LogicalFilter} nodes in {@link Convention#NONE} and rebuilds them in
+ * {@link BeamLogicalConvention}, converting the filter's input as well.
+ */
+public class BeamFilterRule extends ConverterRule {
+  public static final BeamFilterRule INSTANCE = new BeamFilterRule();
+
+  private BeamFilterRule() {
+    super(LogicalFilter.class, Convention.NONE, BeamLogicalConvention.INSTANCE, "BeamFilterRule");
+  }
+
+  @Override
+  public RelNode convert(RelNode rel) {
+    final Filter logicalFilter = (Filter) rel;
+    final RelNode child = logicalFilter.getInput();
+    final RelNode beamChild =
+        convert(child, child.getTraitSet().replace(BeamLogicalConvention.INSTANCE));
+
+    return new BeamFilterRel(
+        logicalFilter.getCluster(),
+        logicalFilter.getTraitSet().replace(BeamLogicalConvention.INSTANCE),
+        beamChild,
+        logicalFilter.getCondition());
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/7867ce62/dsls/sql/src/main/java/org/beam/dsls/sql/rule/BeamIOSinkRule.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/rule/BeamIOSinkRule.java b/dsls/sql/src/main/java/org/beam/dsls/sql/rule/BeamIOSinkRule.java
new file mode 100644
index 0000000..a44c002
--- /dev/null
+++ b/dsls/sql/src/main/java/org/beam/dsls/sql/rule/BeamIOSinkRule.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.beam.dsls.sql.rule;
+
+import java.util.List;
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.prepare.Prepare;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.TableModify;
+import org.apache.calcite.rel.logical.LogicalTableModify;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.schema.Table;
+import org.beam.dsls.sql.rel.BeamIOSinkRel;
+import org.beam.dsls.sql.rel.BeamLogicalConvention;
+
+/**
+ * A {@code ConverterRule} to replace {@link TableModify} with
+ * {@link BeamIOSinkRel}.
+ *
+ * <p>Only {@code INSERT} into targets whose table type is {@code TABLE} or {@code STREAM} is
+ * supported.
+ */
+public class BeamIOSinkRule extends ConverterRule {
+  public static final BeamIOSinkRule INSTANCE = new BeamIOSinkRule();
+
+  private BeamIOSinkRule() {
+    super(LogicalTableModify.class, Convention.NONE, BeamLogicalConvention.INSTANCE,
+        "BeamIOSinkRule");
+  }
+
+  /**
+   * Converts a {@link TableModify} into a {@link BeamIOSinkRel}.
+   *
+   * @throws IllegalArgumentException if the target cannot be unwrapped to a {@link Table}, or its
+   *     table type is neither {@code TABLE} nor {@code STREAM}
+   * @throws UnsupportedOperationException if the modify operation is not {@code INSERT}
+   */
+  @Override
+  public RelNode convert(RelNode rel) {
+    final TableModify tableModify = (TableModify) rel;
+    final RelOptTable relOptTable = tableModify.getTable();
+    final TableModify.Operation operation = tableModify.getOperation();
+
+    // Validate the target first, so nothing is converted for a statement that gets rejected.
+    final Table table = relOptTable.unwrap(Table.class);
+    if (table == null) {
+      // unwrap() yields null when the target is not backed by a Calcite Table.
+      throw new IllegalArgumentException(
+          String.format("Table %s cannot be unwrapped to %s", relOptTable.getQualifiedName(),
+              Table.class.getName()));
+    }
+
+    switch (table.getJdbcTableType()) {
+      case TABLE:
+      case STREAM:
+        if (operation != TableModify.Operation.INSERT) {
+          throw new UnsupportedOperationException(
+              String.format("Unsupported modify operation: %s; only INSERT is supported",
+                  operation));
+        }
+        break;
+      default:
+        throw new IllegalArgumentException(
+            String.format("Unsupported table type: %s", table.getJdbcTableType()));
+    }
+
+    final RelOptCluster cluster = tableModify.getCluster();
+    final RelTraitSet traitSet = tableModify.getTraitSet().replace(BeamLogicalConvention.INSTANCE);
+    final Prepare.CatalogReader catalogReader = tableModify.getCatalogReader();
+    final RelNode input = tableModify.getInput();
+    final RelNode convertedInput = convert(input,
+        input.getTraitSet().replace(BeamLogicalConvention.INSTANCE));
+    final List<String> updateColumnList = tableModify.getUpdateColumnList();
+    final List<RexNode> sourceExpressionList = tableModify.getSourceExpressionList();
+    final boolean flattened = tableModify.isFlattened();
+
+    return new BeamIOSinkRel(cluster, traitSet, relOptTable, catalogReader, convertedInput,
+        operation, updateColumnList, sourceExpressionList, flattened);
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/7867ce62/dsls/sql/src/main/java/org/beam/dsls/sql/rule/BeamIOSourceRule.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/rule/BeamIOSourceRule.java
b/dsls/sql/src/main/java/org/beam/dsls/sql/rule/BeamIOSourceRule.java
new file mode 100644
index 0000000..9e4778b
--- /dev/null
+++ b/dsls/sql/src/main/java/org/beam/dsls/sql/rule/BeamIOSourceRule.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.beam.dsls.sql.rule;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.logical.LogicalTableScan;
+import org.beam.dsls.sql.rel.BeamIOSourceRel;
+import org.beam.dsls.sql.rel.BeamLogicalConvention;
+
+/**
+ * A {@code ConverterRule} to replace {@link TableScan} with
+ * {@link BeamIOSourceRel}.
+ *
+ * <p>It matches {@link LogicalTableScan} nodes in {@link Convention#NONE}; the scanned table is
+ * carried over unchanged.
+ */
+public class BeamIOSourceRule extends ConverterRule {
+  public static final BeamIOSourceRule INSTANCE = new BeamIOSourceRule();
+
+  private BeamIOSourceRule() {
+    super(LogicalTableScan.class, Convention.NONE, BeamLogicalConvention.INSTANCE,
+        "BeamIOSourceRule");
+  }
+
+  @Override
+  public RelNode convert(RelNode rel) {
+    TableScan logicalScan = (TableScan) rel;
+    return new BeamIOSourceRel(
+        logicalScan.getCluster(),
+        logicalScan.getTraitSet().replace(BeamLogicalConvention.INSTANCE),
+        logicalScan.getTable());
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/7867ce62/dsls/sql/src/main/java/org/beam/dsls/sql/rule/BeamProjectRule.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/rule/BeamProjectRule.java b/dsls/sql/src/main/java/org/beam/dsls/sql/rule/BeamProjectRule.java
new file mode 100644
index 0000000..117a056
--- /dev/null
+++ b/dsls/sql/src/main/java/org/beam/dsls/sql/rule/BeamProjectRule.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.beam.dsls.sql.rule;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.logical.LogicalProject;
+import org.beam.dsls.sql.rel.BeamLogicalConvention;
+import org.beam.dsls.sql.rel.BeamProjectRel;
+
+/**
+ * A {@code ConverterRule} to replace {@link Project} with
+ * {@link BeamProjectRel}.
+ *
+ * <p>It matches {@link LogicalProject} nodes in {@link Convention#NONE}. The project expressions
+ * and row type are carried over unchanged, and the input is converted as well.
+ */
+public class BeamProjectRule extends ConverterRule {
+  public static final BeamProjectRule INSTANCE = new BeamProjectRule();
+
+  private BeamProjectRule() {
+    super(LogicalProject.class, Convention.NONE, BeamLogicalConvention.INSTANCE, "BeamProjectRule");
+  }
+
+  @Override
+  public RelNode convert(RelNode rel) {
+    final Project project = (Project) rel;
+    final RelNode input = project.getInput();
+
+    return new BeamProjectRel(project.getCluster(),
+        project.getTraitSet().replace(BeamLogicalConvention.INSTANCE),
+        convert(input, input.getTraitSet().replace(BeamLogicalConvention.INSTANCE)),
+        project.getProjects(), project.getRowType());
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/7867ce62/dsls/sql/src/main/java/org/beam/dsls/sql/rule/package-info.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/rule/package-info.java b/dsls/sql/src/main/java/org/beam/dsls/sql/rule/package-info.java
new file mode 100644
index 0000000..56ddcf3
--- /dev/null
+++ b/dsls/sql/src/main/java/org/beam/dsls/sql/rule/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * {@link org.apache.calcite.plan.RelOptRule}s that convert Calcite logical nodes into
+ * {@link org.beam.dsls.sql.rel.BeamRelNode}s.
+ */
+package org.beam.dsls.sql.rule;
