Redesign BeamSqlExpression to execute Calcite SQL expressions. Changes: 1. revert the BEAM dependency to 0.6.0 to avoid the impact of changes in the master branch; 2. updates as discussed during review;
refine BeamSqlRowCoder Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/464cc275 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/464cc275 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/464cc275 Branch: refs/heads/DSL_SQL Commit: 464cc275952305bf51ecfdf056e784441c9c2272 Parents: aa07a1d Author: mingmxu <[email protected]> Authored: Sat Apr 22 22:20:25 2017 -0700 Committer: Davor Bonaci <[email protected]> Committed: Thu May 4 00:12:11 2017 -0700 ---------------------------------------------------------------------- dsls/sql/pom.xml | 47 ++++++- .../beam/dsls/sql/example/BeamSqlExample.java | 5 - .../exception/BeamInvalidOperatorException.java | 34 +++++ .../exception/BeamSqlUnsupportedException.java | 34 +++++ .../sql/exception/InvalidFieldException.java | 34 +++++ .../beam/dsls/sql/exception/package-info.java | 23 +++ .../dsls/sql/interpreter/BeamSQLFnExecutor.java | 140 +++++++++++++++++++ .../sql/interpreter/BeamSQLSpELExecutor.java | 127 ----------------- .../dsls/sql/interpreter/CalciteToSpEL.java | 81 ----------- .../operator/BeamSqlAndExpression.java | 60 ++++++++ .../operator/BeamSqlCompareExpression.java | 94 +++++++++++++ .../operator/BeamSqlEqualExpression.java | 48 +++++++ .../interpreter/operator/BeamSqlExpression.java | 62 ++++++++ .../operator/BeamSqlInputRefExpression.java | 46 ++++++ .../operator/BeamSqlIsNotNullExpression.java | 51 +++++++ .../operator/BeamSqlIsNullExpression.java | 51 +++++++ .../BeamSqlLargerThanEqualExpression.java | 49 +++++++ .../operator/BeamSqlLargerThanExpression.java | 49 +++++++ .../BeamSqlLessThanEqualExpression.java | 49 +++++++ .../operator/BeamSqlLessThanExpression.java | 49 +++++++ .../operator/BeamSqlNotEqualExpression.java | 48 +++++++ .../operator/BeamSqlOrExpression.java | 60 ++++++++ .../interpreter/operator/BeamSqlPrimitive.java | 102 ++++++++++++++ .../sql/interpreter/operator/package-info.java | 22 +++ 
.../beam/dsls/sql/planner/BeamQueryPlanner.java | 8 +- .../beam/dsls/sql/planner/BeamSqlRunner.java | 15 +- .../planner/BeamSqlUnsupportedException.java | 38 ----- .../apache/beam/dsls/sql/rel/BeamFilterRel.java | 4 +- .../beam/dsls/sql/rel/BeamProjectRel.java | 4 +- .../beam/dsls/sql/schema/BaseBeamTable.java | 5 - .../beam/dsls/sql/schema/BeamSQLRecordType.java | 4 - .../dsls/sql/schema/BeamSQLRecordTypeCoder.java | 15 +- .../apache/beam/dsls/sql/schema/BeamSQLRow.java | 76 ++++++---- .../beam/dsls/sql/schema/BeamSqlRowCoder.java | 48 ++++--- .../dsls/sql/schema/InvalidFieldException.java | 34 ----- .../sql/schema/kafka/BeamKafkaCSVTable.java | 5 - .../dsls/sql/schema/kafka/BeamKafkaTable.java | 5 - .../dsls/sql/transform/BeamSQLFilterFn.java | 4 - .../sql/transform/BeamSQLOutputToConsoleFn.java | 4 - .../dsls/sql/transform/BeamSQLProjectFn.java | 5 - .../sql/interpreter/BeamSQLFnExecutorTest.java | 101 +++++++++++++ .../interpreter/BeamSQLFnExecutorTestBase.java | 91 ++++++++++++ .../operator/BeamNullExperssionTest.java | 53 +++++++ .../operator/BeamSqlAndOrExpressionTest.java | 59 ++++++++ .../operator/BeamSqlCompareExpressionTest.java | 108 ++++++++++++++ .../operator/BeamSqlInputRefExpressionTest.java | 58 ++++++++ .../operator/BeamSqlPrimitiveTest.java | 60 ++++++++ .../beam/dsls/sql/planner/BasePlanner.java | 28 +++- .../sql/planner/BeamPlannerExplainTest.java | 5 +- .../dsls/sql/planner/BeamPlannerSubmitTest.java | 7 +- .../dsls/sql/planner/MockedBeamSQLTable.java | 14 +- 51 files changed, 1803 insertions(+), 420 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/464cc275/dsls/sql/pom.xml ---------------------------------------------------------------------- diff --git a/dsls/sql/pom.xml b/dsls/sql/pom.xml index 21c8def..e2f09be 100644 --- a/dsls/sql/pom.xml +++ b/dsls/sql/pom.xml @@ -116,7 +116,42 @@ </plugin> </plugins> </build> - + + <dependencyManagement> + <dependencies> 
+ <dependency> + <groupId>org.apache.beam</groupId> + <artifactId>beam-sdks-java-core</artifactId> + <version>0.6.0</version> + </dependency> + <dependency> + <groupId>org.apache.beam</groupId> + <artifactId>beam-runners-direct-java</artifactId> + <version>0.6.0</version> + </dependency> + <dependency> + <groupId>org.apache.beam</groupId> + <artifactId>beam-sdks-java-io-kafka</artifactId> + <version>0.6.0</version> + </dependency> + <dependency> + <groupId>org.apache.beam</groupId> + <artifactId>beam-runners-core-java</artifactId> + <version>0.6.0</version> + </dependency> + <dependency> + <groupId>org.apache.beam</groupId> + <artifactId>beam-sdks-common-runner-api</artifactId> + <version>0.6.0</version> + </dependency> + <dependency> + <groupId>org.apache.beam</groupId> + <artifactId>beam-runners-core-construction-java</artifactId> + <version>0.6.0</version> + </dependency> + </dependencies> + </dependencyManagement> + <dependencies> <dependency> <groupId>junit</groupId> @@ -130,6 +165,12 @@ <dependency> <groupId>org.apache.beam</groupId> <artifactId>beam-sdks-java-core</artifactId> + <exclusions> + <exclusion> + <groupId>com.google.protobuf</groupId> + <artifactId>protobuf-lite</artifactId> + </exclusion> + </exclusions> </dependency> <dependency> <groupId>org.apache.beam</groupId> @@ -142,10 +183,6 @@ <scope>provided</scope> </dependency> <dependency> - <groupId>org.springframework</groupId> - <artifactId>spring-expression</artifactId> - </dependency> - <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> </dependency> http://git-wip-us.apache.org/repos/asf/beam/blob/464cc275/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java index d32bc59..303835f 100644 --- 
a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java @@ -66,11 +66,6 @@ import org.apache.kafka.clients.consumer.ConsumerConfig; */ public class BeamSqlExample implements Serializable { - /** - * - */ - private static final long serialVersionUID = 3673487843555563904L; - public static void main(String[] args) throws Exception { BeamSqlRunner runner = new BeamSqlRunner(); runner.addTable("ORDER_DETAILS", getTable("127.0.0.1:9092", "orders")); http://git-wip-us.apache.org/repos/asf/beam/blob/464cc275/dsls/sql/src/main/java/org/apache/beam/dsls/sql/exception/BeamInvalidOperatorException.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/exception/BeamInvalidOperatorException.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/exception/BeamInvalidOperatorException.java new file mode 100644 index 0000000..281ef89 --- /dev/null +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/exception/BeamInvalidOperatorException.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.dsls.sql.exception; + +/** + * operation is not supported. + * + */ +public class BeamInvalidOperatorException extends RuntimeException { + + public BeamInvalidOperatorException(String string) { + super(string); + } + + public BeamInvalidOperatorException() { + super(); + } + +} http://git-wip-us.apache.org/repos/asf/beam/blob/464cc275/dsls/sql/src/main/java/org/apache/beam/dsls/sql/exception/BeamSqlUnsupportedException.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/exception/BeamSqlUnsupportedException.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/exception/BeamSqlUnsupportedException.java new file mode 100644 index 0000000..02e843b --- /dev/null +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/exception/BeamSqlUnsupportedException.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.dsls.sql.exception; + +/** + * Generic exception for un-supported features/functions in BeamSQL. 
+ * + */ +public class BeamSqlUnsupportedException extends RuntimeException { + + public BeamSqlUnsupportedException(String string) { + super(string); + } + + public BeamSqlUnsupportedException() { + super(); + } + +} http://git-wip-us.apache.org/repos/asf/beam/blob/464cc275/dsls/sql/src/main/java/org/apache/beam/dsls/sql/exception/InvalidFieldException.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/exception/InvalidFieldException.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/exception/InvalidFieldException.java new file mode 100644 index 0000000..82ebabe --- /dev/null +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/exception/InvalidFieldException.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.dsls.sql.exception; + +/** + * Exception when the field value and field type is not compatible. 
+ * + */ +public class InvalidFieldException extends RuntimeException { + + public InvalidFieldException() { + super(); + } + + public InvalidFieldException(String message) { + super(message); + } + +} http://git-wip-us.apache.org/repos/asf/beam/blob/464cc275/dsls/sql/src/main/java/org/apache/beam/dsls/sql/exception/package-info.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/exception/package-info.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/exception/package-info.java new file mode 100644 index 0000000..619100c --- /dev/null +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/exception/package-info.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Exceptions in BeamSQL. 
+ * + */ +package org.apache.beam.dsls.sql.exception; http://git-wip-us.apache.org/repos/asf/beam/blob/464cc275/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSQLFnExecutor.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSQLFnExecutor.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSQLFnExecutor.java new file mode 100644 index 0000000..32e2ffc --- /dev/null +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSQLFnExecutor.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.dsls.sql.interpreter; + +import java.util.ArrayList; +import java.util.List; +import org.apache.beam.dsls.sql.exception.BeamSqlUnsupportedException; +import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlAndExpression; +import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlEqualExpression; +import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression; +import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlInputRefExpression; +import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlIsNotNullExpression; +import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlIsNullExpression; +import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlLargerThanEqualExpression; +import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlLargerThanExpression; +import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlLessThanEqualExpression; +import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlLessThanExpression; +import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlNotEqualExpression; +import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlOrExpression; +import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive; +import org.apache.beam.dsls.sql.rel.BeamFilterRel; +import org.apache.beam.dsls.sql.rel.BeamProjectRel; +import org.apache.beam.dsls.sql.rel.BeamRelNode; +import org.apache.beam.dsls.sql.schema.BeamSQLRow; +import org.apache.calcite.rex.RexCall; +import org.apache.calcite.rex.RexInputRef; +import org.apache.calcite.rex.RexLiteral; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.sql.SqlOperator; + +/** + * Executor based on {@link BeamSqlExpression} and {@link BeamSqlPrimitive}. + * {@code BeamSQLFnExecutor} converts a {@link BeamRelNode} to a {@link BeamSqlExpression}, + * which can be evaluated against the {@link BeamSQLRow}. 
+ * + */ +public class BeamSQLFnExecutor implements BeamSQLExpressionExecutor { + protected List<BeamSqlExpression> exps; + + public BeamSQLFnExecutor(BeamRelNode relNode) { + this.exps = new ArrayList<>(); + if (relNode instanceof BeamFilterRel) { + BeamFilterRel filterNode = (BeamFilterRel) relNode; + RexNode condition = filterNode.getCondition(); + exps.add(buildExpression(condition)); + } else if (relNode instanceof BeamProjectRel) { + BeamProjectRel projectNode = (BeamProjectRel) relNode; + List<RexNode> projects = projectNode.getProjects(); + for (RexNode rexNode : projects) { + exps.add(buildExpression(rexNode)); + } + } else { + throw new BeamSqlUnsupportedException( + String.format("%s is not supported yet", relNode.getClass().toString())); + } + } + + /** + * {@link #buildExpression(RexNode)} visits the operands of {@link RexNode} recursively, + * and represent each {@link SqlOperator} with a corresponding {@link BeamSqlExpression}. + */ + private BeamSqlExpression buildExpression(RexNode rexNode) { + if (rexNode instanceof RexLiteral) { + RexLiteral node = (RexLiteral) rexNode; + return BeamSqlPrimitive.of(node.getTypeName(), node.getValue()); + } else if (rexNode instanceof RexInputRef) { + RexInputRef node = (RexInputRef) rexNode; + return new BeamSqlInputRefExpression(node.getType().getSqlTypeName(), node.getIndex()); + } else if (rexNode instanceof RexCall) { + RexCall node = (RexCall) rexNode; + String opName = node.op.getName(); + List<BeamSqlExpression> subExps = new ArrayList<>(); + for (RexNode subNode : node.operands) { + subExps.add(buildExpression(subNode)); + } + switch (opName) { + case "AND": + return new BeamSqlAndExpression(subExps); + case "OR": + return new BeamSqlOrExpression(subExps); + + case "=": + return new BeamSqlEqualExpression(subExps); + case "<>=": + return new BeamSqlNotEqualExpression(subExps); + case ">": + return new BeamSqlLargerThanExpression(subExps); + case ">=": + return new 
BeamSqlLargerThanEqualExpression(subExps); + case "<": + return new BeamSqlLessThanExpression(subExps); + case "<=": + return new BeamSqlLessThanEqualExpression(subExps); + + case "IS NULL": + return new BeamSqlIsNullExpression(subExps.get(0)); + case "IS NOT NULL": + return new BeamSqlIsNotNullExpression(subExps.get(0)); + default: + throw new BeamSqlUnsupportedException(); + } + } else { + throw new BeamSqlUnsupportedException( + String.format("%s is not supported yet", rexNode.getClass().toString())); + } + } + + @Override + public void prepare() { + } + + @Override + public List<Object> execute(BeamSQLRow inputRecord) { + List<Object> results = new ArrayList<>(); + for (BeamSqlExpression exp : exps) { + results.add(exp.evaluate(inputRecord).getValue()); + } + return results; + } + + @Override + public void close() { + } + +} http://git-wip-us.apache.org/repos/asf/beam/blob/464cc275/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSQLSpELExecutor.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSQLSpELExecutor.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSQLSpELExecutor.java deleted file mode 100644 index 9c9c37f..0000000 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSQLSpELExecutor.java +++ /dev/null @@ -1,127 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.dsls.sql.interpreter; - -import static com.google.common.base.Preconditions.checkArgument; -import java.util.ArrayList; -import java.util.List; - -import org.apache.beam.dsls.sql.planner.BeamSqlUnsupportedException; -import org.apache.beam.dsls.sql.rel.BeamFilterRel; -import org.apache.beam.dsls.sql.rel.BeamProjectRel; -import org.apache.beam.dsls.sql.rel.BeamRelNode; -import org.apache.beam.dsls.sql.schema.BeamSQLRow; -import org.apache.calcite.rex.RexCall; -import org.apache.calcite.rex.RexInputRef; -import org.apache.calcite.rex.RexLiteral; -import org.apache.calcite.rex.RexNode; -import org.springframework.expression.Expression; -import org.springframework.expression.ExpressionParser; -import org.springframework.expression.spel.SpelParserConfiguration; -import org.springframework.expression.spel.standard.SpelExpressionParser; -import org.springframework.expression.spel.support.StandardEvaluationContext; - -/** - * {@code BeamSQLSpELExecutor} is one implementation, to convert Calcite SQL - * relational expression to SpEL expression. 
- * - */ -public class BeamSQLSpELExecutor implements BeamSQLExpressionExecutor { - /** - * - */ - private static final long serialVersionUID = 6777232573390074408L; - - private List<String> spelString; - private List<Expression> spelExpressions; - - public BeamSQLSpELExecutor(BeamRelNode relNode) { - this.spelString = new ArrayList<>(); - if (relNode instanceof BeamFilterRel) { - String filterSpEL = CalciteToSpEL - .rexcall2SpEL((RexCall) ((BeamFilterRel) relNode).getCondition()); - spelString.add(filterSpEL); - } else if (relNode instanceof BeamProjectRel) { - spelString.addAll(createProjectExps((BeamProjectRel) relNode)); - // List<ProjectRule> projectRules = - // for (int idx = 0; idx < projectRules.size(); ++idx) { - // spelString.add(projectRules.get(idx).getProjectExp()); - // } - } else { - throw new BeamSqlUnsupportedException( - String.format("%s is not supported yet", relNode.getClass().toString())); - } - } - - @Override - public void prepare() { - this.spelExpressions = new ArrayList<>(); - - SpelParserConfiguration config = new SpelParserConfiguration(true, true); - ExpressionParser parser = new SpelExpressionParser(config); - for (String el : spelString) { - spelExpressions.add(parser.parseExpression(el)); - } - } - - @Override - public List<Object> execute(BeamSQLRow inputRecord) { - StandardEvaluationContext inContext = new StandardEvaluationContext(); - inContext.setVariable("in", inputRecord); - - List<Object> results = new ArrayList<>(); - for (Expression ep : spelExpressions) { - results.add(ep.getValue(inContext)); - } - return results; - } - - @Override - public void close() { - - } - - private List<String> createProjectExps(BeamProjectRel projectRel) { - List<String> rules = new ArrayList<>(); - - List<RexNode> exps = projectRel.getProjects(); - - for (int idx = 0; idx < exps.size(); ++idx) { - RexNode node = exps.get(idx); - if (node == null) { - rules.add("null"); - } - - if (node instanceof RexLiteral) { - rules.add(((RexLiteral) 
node).getValue() + ""); - } else { - if (node instanceof RexInputRef) { - rules.add("#in.getFieldValue(" + ((RexInputRef) node).getIndex() + ")"); - } - if (node instanceof RexCall) { - rules.add(CalciteToSpEL.rexcall2SpEL((RexCall) node)); - } - } - } - - checkArgument(rules.size() == exps.size(), "missing projects rules after conversion."); - - return rules; - } - -} http://git-wip-us.apache.org/repos/asf/beam/blob/464cc275/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/CalciteToSpEL.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/CalciteToSpEL.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/CalciteToSpEL.java deleted file mode 100644 index 6cdc31b..0000000 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/CalciteToSpEL.java +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.dsls.sql.interpreter; - -import com.google.common.base.Joiner; -import java.util.ArrayList; -import java.util.List; - -import org.apache.beam.dsls.sql.planner.BeamSqlUnsupportedException; -import org.apache.calcite.rex.RexCall; -import org.apache.calcite.rex.RexInputRef; -import org.apache.calcite.rex.RexNode; - -/** - * {@code CalciteToSpEL} is used in {@link BeamSQLSpELExecutor}, to convert a - * relational expression {@link RexCall} to SpEL expression. - * - */ -public class CalciteToSpEL { - - public static String rexcall2SpEL(RexCall cdn) { - List<String> parts = new ArrayList<>(); - for (RexNode subcdn : cdn.operands) { - if (subcdn instanceof RexCall) { - parts.add(rexcall2SpEL((RexCall) subcdn)); - } else { - parts.add(subcdn instanceof RexInputRef - ? "#in.getFieldValue(" + ((RexInputRef) subcdn).getIndex() + ")" : subcdn.toString()); - } - } - - String opName = cdn.op.getName(); - switch (cdn.op.getClass().getSimpleName()) { - case "SqlMonotonicBinaryOperator": // +-* - case "SqlBinaryOperator": // > < = >= <= <> OR AND || / . - switch (cdn.op.getName().toUpperCase()) { - case "AND": - return String.format(" ( %s ) ", Joiner.on("&&").join(parts)); - case "OR": - return String.format(" ( %s ) ", Joiner.on("||").join(parts)); - case "=": - return String.format(" ( %s ) ", Joiner.on("==").join(parts)); - case "<>": - return String.format(" ( %s ) ", Joiner.on("!=").join(parts)); - default: - return String.format(" ( %s ) ", Joiner.on(cdn.op.getName().toUpperCase()).join(parts)); - } - case "SqlCaseOperator": // CASE - return String.format(" (%s ? 
%s : %s)", parts.get(0), parts.get(1), parts.get(2)); - case "SqlCastFunction": // CAST - return parts.get(0); - case "SqlPostfixOperator": - switch (opName.toUpperCase()) { - case "IS NULL": - return String.format(" null == %s ", parts.get(0)); - case "IS NOT NULL": - return String.format(" null != %s ", parts.get(0)); - default: - throw new BeamSqlUnsupportedException(); - } - default: - throw new BeamSqlUnsupportedException(); - } - } - -} http://git-wip-us.apache.org/repos/asf/beam/blob/464cc275/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlAndExpression.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlAndExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlAndExpression.java new file mode 100644 index 0000000..55473b5 --- /dev/null +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlAndExpression.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.dsls.sql.interpreter.operator; + +import java.util.List; +import org.apache.beam.dsls.sql.schema.BeamSQLRow; +import org.apache.calcite.sql.type.SqlTypeName; + +/** + * {@code BeamSqlExpression} for 'AND' operation. + */ +public class BeamSqlAndExpression extends BeamSqlExpression { + + private BeamSqlAndExpression(List<BeamSqlExpression> operands, SqlTypeName outputType) { + super(operands, outputType); + } + public BeamSqlAndExpression(List<BeamSqlExpression> operands) { + this(operands, SqlTypeName.BOOLEAN); + } + + @Override + public boolean accept() { + for (BeamSqlExpression exp : operands) { + // only accept BOOLEAN expression as operand + if (!exp.outputType.equals(SqlTypeName.BOOLEAN)) { + return false; + } + } + return true; + } + + @Override + public BeamSqlPrimitive<Boolean> evaluate(BeamSQLRow inputRecord) { + boolean result = true; + for (BeamSqlExpression exp : operands) { + BeamSqlPrimitive<Boolean> expOut = exp.evaluate(inputRecord); + result = result && expOut.getValue(); + if (!result) { + break; + } + } + return BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, result); + } + +} http://git-wip-us.apache.org/repos/asf/beam/blob/464cc275/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCompareExpression.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCompareExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCompareExpression.java new file mode 100644 index 0000000..bfb798d --- /dev/null +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCompareExpression.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.dsls.sql.interpreter.operator; + +import java.util.List; +import org.apache.beam.dsls.sql.exception.BeamSqlUnsupportedException; +import org.apache.beam.dsls.sql.schema.BeamSQLRow; +import org.apache.calcite.sql.type.SqlTypeName; + +/** + * {@link BeamSqlCompareExpression} is used for compare operations. + * + * <p>See {@link BeamSqlEqualExpression}, {@link BeamSqlLessThanExpression}, + * {@link BeamSqlLessThanEqualExpression}, {@link BeamSqlLargerThanExpression}, + * {@link BeamSqlLargerThanEqualExpression} and {@link BeamSqlNotEqualExpression} for more details. + * + */ +public abstract class BeamSqlCompareExpression extends BeamSqlExpression { + + private BeamSqlCompareExpression(List<BeamSqlExpression> operands, SqlTypeName outputType) { + super(operands, outputType); + } + + public BeamSqlCompareExpression(List<BeamSqlExpression> operands) { + this(operands, SqlTypeName.BOOLEAN); + } + + /** + * Compare operation must have 2 operands. 
+ */ + @Override + public boolean accept() { + return operands.size() == 2; + } + + @Override + public BeamSqlPrimitive<Boolean> evaluate(BeamSQLRow inputRecord) { + Object leftValue = operands.get(0).evaluate(inputRecord).getValue(); + Object rightValue = operands.get(1).evaluate(inputRecord).getValue(); + switch (operands.get(0).outputType) { + case BIGINT: + case DECIMAL: + case DOUBLE: + case FLOAT: + case INTEGER: + case SMALLINT: + case TINYINT: + return BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, + compare((Number) leftValue, (Number) rightValue)); + case BOOLEAN: + return BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, + compare((Boolean) leftValue, (Boolean) rightValue)); + case VARCHAR: + return BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, + compare((CharSequence) leftValue, (CharSequence) rightValue)); + default: + throw new BeamSqlUnsupportedException(toString()); + } + } + + /** + * Compare between String values, mapping to {@link SqlTypeName#VARCHAR}. + */ + public abstract Boolean compare(CharSequence leftValue, CharSequence rightValue); + + /** + * Compare between Boolean values, mapping to {@link SqlTypeName#BOOLEAN}. + */ + public abstract Boolean compare(Boolean leftValue, Boolean rightValue); + + /** + * Compare between Number values, including {@link SqlTypeName#BIGINT}, + * {@link SqlTypeName#DECIMAL}, {@link SqlTypeName#DOUBLE}, {@link SqlTypeName#FLOAT}, + * {@link SqlTypeName#INTEGER}, {@link SqlTypeName#SMALLINT} and {@link SqlTypeName#TINYINT}. 
+ */ + public abstract Boolean compare(Number leftValue, Number rightValue); + + +} http://git-wip-us.apache.org/repos/asf/beam/blob/464cc275/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlEqualExpression.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlEqualExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlEqualExpression.java new file mode 100644 index 0000000..4bc487b --- /dev/null +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlEqualExpression.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.dsls.sql.interpreter.operator; + +import java.util.List; + +/** + * {@code BeamSqlExpression} for {@code =} operation. 
+ */ +public class BeamSqlEqualExpression extends BeamSqlCompareExpression { + + public BeamSqlEqualExpression(List<BeamSqlExpression> operands) { + super(operands); + } + + @Override + public Boolean compare(CharSequence leftValue, CharSequence rightValue) { + return String.valueOf(leftValue).compareTo(String.valueOf(rightValue)) == 0; + } + + @Override + public Boolean compare(Boolean leftValue, Boolean rightValue) { + return !(leftValue ^ rightValue); + } + + @Override + public Boolean compare(Number leftValue, Number rightValue) { + return (leftValue == null && rightValue == null) + || (leftValue != null && rightValue != null + && leftValue.floatValue() == (rightValue).floatValue()); + } + +} http://git-wip-us.apache.org/repos/asf/beam/blob/464cc275/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlExpression.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlExpression.java new file mode 100644 index 0000000..c44795f --- /dev/null +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlExpression.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.dsls.sql.interpreter.operator; + +import java.io.Serializable; +import java.util.List; +import org.apache.beam.dsls.sql.schema.BeamSQLRow; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.sql.type.SqlTypeName; + +/** + * {@code BeamSqlExpression} is an equivalent expression in BeamSQL, of {@link RexNode} in Calcite. + * + * <p>An implementation of {@link BeamSqlExpression} takes one or more {@code BeamSqlExpression} + * as its operands, and return a value with type {@link SqlTypeName}. + * + */ +public abstract class BeamSqlExpression implements Serializable{ + protected List<BeamSqlExpression> operands; + protected SqlTypeName outputType; + + protected BeamSqlExpression(){} + + public BeamSqlExpression(List<BeamSqlExpression> operands, SqlTypeName outputType) { + this.operands = operands; + this.outputType = outputType; + } + + /** + * assertion to make sure the input and output are supported in this expression. + */ + public abstract boolean accept(); + + /** + * Apply input record {@link BeamSQLRow} to this expression, + * the output value is wrapped with {@link BeamSqlPrimitive}. 
+ */ + public abstract BeamSqlPrimitive evaluate(BeamSQLRow inputRecord); + + public List<BeamSqlExpression> getOperands() { + return operands; + } + + public SqlTypeName getOutputType() { + return outputType; + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/464cc275/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlInputRefExpression.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlInputRefExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlInputRefExpression.java new file mode 100644 index 0000000..612108f --- /dev/null +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlInputRefExpression.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.dsls.sql.interpreter.operator; + +import org.apache.beam.dsls.sql.schema.BeamSQLRow; +import org.apache.calcite.sql.type.SqlTypeName; + +/** + * An primitive operation for direct field extraction. 
+ */ +public class BeamSqlInputRefExpression extends BeamSqlExpression{ + private int inputRef; + + public BeamSqlInputRefExpression(SqlTypeName sqlTypeName, int inputRef) { + super(null, sqlTypeName); + this.inputRef = inputRef; + } + + @Override + public boolean accept() { + // TODO Auto-generated method stub + return false; + } + + @Override + public BeamSqlPrimitive evaluate(BeamSQLRow inputRecord) { + return BeamSqlPrimitive.of(outputType, inputRecord.getFieldValue(inputRef)); + } + + +} http://git-wip-us.apache.org/repos/asf/beam/blob/464cc275/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlIsNotNullExpression.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlIsNotNullExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlIsNotNullExpression.java new file mode 100644 index 0000000..784584e --- /dev/null +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlIsNotNullExpression.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.dsls.sql.interpreter.operator; + +import java.util.Arrays; +import java.util.List; +import org.apache.beam.dsls.sql.schema.BeamSQLRow; +import org.apache.calcite.sql.type.SqlTypeName; + +/** + * {@code BeamSqlExpression} for 'IS NOT NULL' operation. + */ +public class BeamSqlIsNotNullExpression extends BeamSqlExpression { + + private BeamSqlIsNotNullExpression(List<BeamSqlExpression> operands, SqlTypeName outputType) { + super(operands, outputType); + } + + public BeamSqlIsNotNullExpression(BeamSqlExpression operand){ + this(Arrays.asList(operand), SqlTypeName.BOOLEAN); + } + + /** + * only one operand is required. + */ + @Override + public boolean accept() { + return operands.size() == 1; + } + + @Override + public BeamSqlPrimitive<Boolean> evaluate(BeamSQLRow inputRecord) { + Object leftValue = operands.get(0).evaluate(inputRecord).getValue(); + return BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, leftValue != null); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/464cc275/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlIsNullExpression.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlIsNullExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlIsNullExpression.java new file mode 100644 index 0000000..b09ddbf --- /dev/null +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlIsNullExpression.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.dsls.sql.interpreter.operator; + +import java.util.Arrays; +import java.util.List; +import org.apache.beam.dsls.sql.schema.BeamSQLRow; +import org.apache.calcite.sql.type.SqlTypeName; + +/** + * {@code BeamSqlExpression} for 'IS NULL' operation. + */ +public class BeamSqlIsNullExpression extends BeamSqlExpression { + + private BeamSqlIsNullExpression(List<BeamSqlExpression> operands, SqlTypeName outputType) { + super(operands, outputType); + } + + public BeamSqlIsNullExpression(BeamSqlExpression operand){ + this(Arrays.asList(operand), SqlTypeName.BOOLEAN); + } + + /** + * only one operand is required. 
+ */ + @Override + public boolean accept() { + return operands.size() == 1; + } + + @Override + public BeamSqlPrimitive<Boolean> evaluate(BeamSQLRow inputRecord) { + Object leftValue = operands.get(0).evaluate(inputRecord).getValue(); + return BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, leftValue == null); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/464cc275/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlLargerThanEqualExpression.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlLargerThanEqualExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlLargerThanEqualExpression.java new file mode 100644 index 0000000..d78c020 --- /dev/null +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlLargerThanEqualExpression.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.dsls.sql.interpreter.operator; + +import java.util.List; +import org.apache.beam.dsls.sql.exception.BeamInvalidOperatorException; + +/** + * {@code BeamSqlExpression} for {@code >=} operation. 
+ */ +public class BeamSqlLargerThanEqualExpression extends BeamSqlCompareExpression { + + public BeamSqlLargerThanEqualExpression(List<BeamSqlExpression> operands) { + super(operands); + } + + @Override + public Boolean compare(CharSequence leftValue, CharSequence rightValue) { + return String.valueOf(leftValue).compareTo(String.valueOf(rightValue)) >= 0; + } + + @Override + public Boolean compare(Boolean leftValue, Boolean rightValue) { + throw new BeamInvalidOperatorException(">= is not supported for Boolean."); + } + + @Override + public Boolean compare(Number leftValue, Number rightValue) { + return (leftValue == null && rightValue == null) + || (leftValue != null && rightValue != null + && leftValue.floatValue() >= (rightValue).floatValue()); + } + +} http://git-wip-us.apache.org/repos/asf/beam/blob/464cc275/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlLargerThanExpression.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlLargerThanExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlLargerThanExpression.java new file mode 100644 index 0000000..0b0d6f1 --- /dev/null +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlLargerThanExpression.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.dsls.sql.interpreter.operator; + +import java.util.List; +import org.apache.beam.dsls.sql.exception.BeamInvalidOperatorException; + +/** + * {@code BeamSqlExpression} for {@code >} operation. + */ +public class BeamSqlLargerThanExpression extends BeamSqlCompareExpression { + + public BeamSqlLargerThanExpression(List<BeamSqlExpression> operands) { + super(operands); + } + + @Override + public Boolean compare(CharSequence leftValue, CharSequence rightValue) { + return String.valueOf(leftValue).compareTo(String.valueOf(rightValue)) > 0; + } + + @Override + public Boolean compare(Boolean leftValue, Boolean rightValue) { + throw new BeamInvalidOperatorException("> is not supported for Boolean."); + } + + @Override + public Boolean compare(Number leftValue, Number rightValue) { + return (leftValue == null && rightValue == null) + || (leftValue != null && rightValue != null + && leftValue.floatValue() > (rightValue).floatValue()); + } + +} http://git-wip-us.apache.org/repos/asf/beam/blob/464cc275/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlLessThanEqualExpression.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlLessThanEqualExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlLessThanEqualExpression.java new file mode 100644 index 0000000..b6f7c9a --- /dev/null +++ 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlLessThanEqualExpression.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.dsls.sql.interpreter.operator; + +import java.util.List; +import org.apache.beam.dsls.sql.exception.BeamInvalidOperatorException; + +/** + * {@code BeamSqlExpression} for {@code <=} operation. 
+ */ +public class BeamSqlLessThanEqualExpression extends BeamSqlCompareExpression { + + public BeamSqlLessThanEqualExpression(List<BeamSqlExpression> operands) { + super(operands); + } + + @Override + public Boolean compare(CharSequence leftValue, CharSequence rightValue) { + return String.valueOf(leftValue).compareTo(String.valueOf(rightValue)) <= 0; + } + + @Override + public Boolean compare(Boolean leftValue, Boolean rightValue) { + throw new BeamInvalidOperatorException("<= is not supported for Boolean."); + } + + @Override + public Boolean compare(Number leftValue, Number rightValue) { + return (leftValue == null && rightValue == null) + || (leftValue != null && rightValue != null + && leftValue.floatValue() <= (rightValue).floatValue()); + } + +} http://git-wip-us.apache.org/repos/asf/beam/blob/464cc275/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlLessThanExpression.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlLessThanExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlLessThanExpression.java new file mode 100644 index 0000000..216a621 --- /dev/null +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlLessThanExpression.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.dsls.sql.interpreter.operator; + +import java.util.List; +import org.apache.beam.dsls.sql.exception.BeamInvalidOperatorException; + +/** + * {@code BeamSqlExpression} for {@code <} operation. + */ +public class BeamSqlLessThanExpression extends BeamSqlCompareExpression { + + public BeamSqlLessThanExpression(List<BeamSqlExpression> operands) { + super(operands); + } + + @Override + public Boolean compare(CharSequence leftValue, CharSequence rightValue) { + return String.valueOf(leftValue).compareTo(String.valueOf(rightValue)) < 0; + } + + @Override + public Boolean compare(Boolean leftValue, Boolean rightValue) { + throw new BeamInvalidOperatorException("< is not supported for Boolean."); + } + + @Override + public Boolean compare(Number leftValue, Number rightValue) { + return (leftValue == null && rightValue == null) + || (leftValue != null && rightValue != null + && leftValue.floatValue() < (rightValue).floatValue()); + } + +} http://git-wip-us.apache.org/repos/asf/beam/blob/464cc275/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlNotEqualExpression.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlNotEqualExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlNotEqualExpression.java new file mode 100644 index 0000000..2b093bf --- /dev/null +++ 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlNotEqualExpression.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.dsls.sql.interpreter.operator; + +import java.util.List; + +/** + * {@code BeamSqlExpression} for {@code <>} operation. 
+ */ +public class BeamSqlNotEqualExpression extends BeamSqlCompareExpression { + + public BeamSqlNotEqualExpression(List<BeamSqlExpression> operands) { + super(operands); + } + + @Override + public Boolean compare(CharSequence leftValue, CharSequence rightValue) { + return String.valueOf(leftValue).compareTo(String.valueOf(rightValue)) != 0; + } + + @Override + public Boolean compare(Boolean leftValue, Boolean rightValue) { + return leftValue ^ rightValue; + } + + @Override + public Boolean compare(Number leftValue, Number rightValue) { + return (leftValue == null && rightValue == null) + || (leftValue != null && rightValue != null + && leftValue.floatValue() != (rightValue).floatValue()); + } + +} http://git-wip-us.apache.org/repos/asf/beam/blob/464cc275/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlOrExpression.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlOrExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlOrExpression.java new file mode 100644 index 0000000..4d07af8 --- /dev/null +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlOrExpression.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.dsls.sql.interpreter.operator; + +import java.util.List; +import org.apache.beam.dsls.sql.schema.BeamSQLRow; +import org.apache.calcite.sql.type.SqlTypeName; + +/** + * {@code BeamSqlExpression} for 'OR' operation. + */ +public class BeamSqlOrExpression extends BeamSqlExpression { + + private BeamSqlOrExpression(List<BeamSqlExpression> operands, SqlTypeName outputType) { + super(operands, outputType); + } + public BeamSqlOrExpression(List<BeamSqlExpression> operands) { + this(operands, SqlTypeName.BOOLEAN); + } + + @Override + public boolean accept() { + for (BeamSqlExpression exp : operands) { + // only accept BOOLEAN expression as operand + if (!exp.outputType.equals(SqlTypeName.BOOLEAN)) { + return false; + } + } + return true; + } + + @Override + public BeamSqlPrimitive<Boolean> evaluate(BeamSQLRow inputRecord) { + boolean result = false; + for (BeamSqlExpression exp : operands) { + BeamSqlPrimitive<Boolean> expOut = exp.evaluate(inputRecord); + result = result || expOut.getValue(); + if (result) { + break; + } + } + return BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, result); + } + +} http://git-wip-us.apache.org/repos/asf/beam/blob/464cc275/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlPrimitive.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlPrimitive.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlPrimitive.java new file mode 100644 index 
0000000..71852ff --- /dev/null +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlPrimitive.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.dsls.sql.interpreter.operator; + +import java.math.BigDecimal; +import java.util.List; +import org.apache.beam.dsls.sql.exception.BeamInvalidOperatorException; +import org.apache.beam.dsls.sql.exception.BeamSqlUnsupportedException; +import org.apache.beam.dsls.sql.schema.BeamSQLRow; +import org.apache.calcite.sql.type.SqlTypeName; + +/** + * {@link BeamSqlPrimitive} is a special, self-reference {@link BeamSqlExpression}. + * It holds the value, and return it directly during {@link #evaluate(BeamSQLRow)}. + * + */ +public class BeamSqlPrimitive<T> extends BeamSqlExpression{ + private SqlTypeName outputType; + private T value; + + private BeamSqlPrimitive() { + } + + private BeamSqlPrimitive(List<BeamSqlExpression> operands, SqlTypeName outputType) { + super(operands, outputType); + } + + /** + * A builder function to create from Type and value directly. 
+ */ + public static <T> BeamSqlPrimitive<T> of(SqlTypeName outputType, T value){ + BeamSqlPrimitive<T> exp = new BeamSqlPrimitive<T>(); + exp.outputType = outputType; + exp.value = value; + if (!exp.accept()) { + throw new BeamInvalidOperatorException( + String.format("value [%s] doesn't match type [%s].", value, outputType)); + } + return exp; + } + + public SqlTypeName getOutputType() { + return outputType; + } + + public T getValue() { + return value; + } + + @Override + public boolean accept() { + if (value == null) { + return true; + } + + switch (outputType) { + case BIGINT: + return value instanceof Long; + case DECIMAL: + return value instanceof BigDecimal; + case DOUBLE: + return value instanceof Double; + case FLOAT: + return value instanceof Float; + case INTEGER: + return value instanceof Integer; + case SMALLINT: + return value instanceof Short; + case TINYINT: + return value instanceof Byte; + case BOOLEAN: + return value instanceof Boolean; + case CHAR: + return value instanceof Character; + case VARCHAR: + return value instanceof String; + default: + throw new BeamSqlUnsupportedException(outputType.name()); + } + } + + @Override + public BeamSqlPrimitive<T> evaluate(BeamSQLRow inputRecord) { + return this; + } + +} http://git-wip-us.apache.org/repos/asf/beam/blob/464cc275/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/package-info.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/package-info.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/package-info.java new file mode 100644 index 0000000..9b0a9a7 --- /dev/null +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Implementation for operators in {@link org.apache.calcite.sql.fun.SqlStdOperatorTable}. + */ +package org.apache.beam.dsls.sql.interpreter.operator; http://git-wip-us.apache.org/repos/asf/beam/blob/464cc275/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamQueryPlanner.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamQueryPlanner.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamQueryPlanner.java index aac86d6..935dae7 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamQueryPlanner.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamQueryPlanner.java @@ -122,7 +122,13 @@ public class BeamQueryPlanner { */ public BeamRelNode convertToBeamRel(String sqlStatement) throws ValidationException, RelConversionException, SqlParseException { - return (BeamRelNode) validateAndConvert(planner.parse(sqlStatement)); + BeamRelNode beamRelNode; + try { + beamRelNode = (BeamRelNode) validateAndConvert(planner.parse(sqlStatement)); + } finally { + planner.close(); + } + return beamRelNode; } private RelNode validateAndConvert(SqlNode sqlNode) 
http://git-wip-us.apache.org/repos/asf/beam/blob/464cc275/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamSqlRunner.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamSqlRunner.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamSqlRunner.java index e457e80..708c507 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamSqlRunner.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamSqlRunner.java @@ -36,17 +36,17 @@ import org.slf4j.LoggerFactory; * */ public class BeamSqlRunner implements Serializable { - /** - * - */ - private static final long serialVersionUID = -4708693435115005182L; - private static final Logger LOG = LoggerFactory.getLogger(BeamSqlRunner.class); private SchemaPlus schema = Frameworks.createRootSchema(true); private BeamQueryPlanner planner = new BeamQueryPlanner(schema); + public BeamSqlRunner() { + //disable assertions in Calcite. + ClassLoader.getSystemClassLoader().setDefaultAssertionStatus(false); + } + /** * Add a schema. 
* @@ -70,7 +70,6 @@ public class BeamSqlRunner implements Serializable { */ public void submitQuery(String sqlString) throws Exception { planner.submitToRun(sqlString); - planner.planner.close(); } /** @@ -78,12 +77,10 @@ public class BeamSqlRunner implements Serializable { * */ public String explainQuery(String sqlString) - throws ValidationException, RelConversionException, SqlParseException { + throws ValidationException, RelConversionException, SqlParseException { BeamRelNode exeTree = planner.convertToBeamRel(sqlString); String beamPlan = RelOptUtil.toString(exeTree); System.out.println(String.format("beamPlan>\n%s", beamPlan)); - - planner.planner.close(); return beamPlan; } http://git-wip-us.apache.org/repos/asf/beam/blob/464cc275/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamSqlUnsupportedException.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamSqlUnsupportedException.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamSqlUnsupportedException.java deleted file mode 100644 index 7cb5243..0000000 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamSqlUnsupportedException.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.dsls.sql.planner; - -/** - * Generic exception for un-supported operations. - * - */ -public class BeamSqlUnsupportedException extends RuntimeException { - /** - * - */ - private static final long serialVersionUID = 3445015747629217342L; - - public BeamSqlUnsupportedException(String string) { - super(string); - } - - public BeamSqlUnsupportedException() { - super(); - } - -} http://git-wip-us.apache.org/repos/asf/beam/blob/464cc275/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamFilterRel.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamFilterRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamFilterRel.java index 10dd1be..477be5a 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamFilterRel.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamFilterRel.java @@ -18,7 +18,7 @@ package org.apache.beam.dsls.sql.rel; import org.apache.beam.dsls.sql.interpreter.BeamSQLExpressionExecutor; -import org.apache.beam.dsls.sql.interpreter.BeamSQLSpELExecutor; +import org.apache.beam.dsls.sql.interpreter.BeamSQLFnExecutor; import org.apache.beam.dsls.sql.planner.BeamPipelineCreator; import org.apache.beam.dsls.sql.planner.BeamSQLRelUtils; import org.apache.beam.dsls.sql.schema.BeamSQLRow; @@ -58,7 +58,7 @@ public class BeamFilterRel extends Filter implements BeamRelNode { PCollection<BeamSQLRow> upstream = planCreator.getLatestStream(); - BeamSQLExpressionExecutor executor = new BeamSQLSpELExecutor(this); + BeamSQLExpressionExecutor executor = new BeamSQLFnExecutor(this); PCollection<BeamSQLRow> projectStream = upstream.apply(stageName, ParDo.of(new BeamSQLFilterFn(getRelTypeName(), executor))); 
http://git-wip-us.apache.org/repos/asf/beam/blob/464cc275/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamProjectRel.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamProjectRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamProjectRel.java index dd731f8..7e27ab3 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamProjectRel.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamProjectRel.java @@ -20,7 +20,7 @@ package org.apache.beam.dsls.sql.rel; import java.util.List; import org.apache.beam.dsls.sql.interpreter.BeamSQLExpressionExecutor; -import org.apache.beam.dsls.sql.interpreter.BeamSQLSpELExecutor; +import org.apache.beam.dsls.sql.interpreter.BeamSQLFnExecutor; import org.apache.beam.dsls.sql.planner.BeamPipelineCreator; import org.apache.beam.dsls.sql.planner.BeamSQLRelUtils; import org.apache.beam.dsls.sql.schema.BeamSQLRecordType; @@ -69,7 +69,7 @@ public class BeamProjectRel extends Project implements BeamRelNode { PCollection<BeamSQLRow> upstream = planCreator.getLatestStream(); - BeamSQLExpressionExecutor executor = new BeamSQLSpELExecutor(this); + BeamSQLExpressionExecutor executor = new BeamSQLFnExecutor(this); PCollection<BeamSQLRow> projectStream = upstream.apply(stageName, ParDo .of(new BeamSQLProjectFn(getRelTypeName(), executor, BeamSQLRecordType.from(rowType)))); http://git-wip-us.apache.org/repos/asf/beam/blob/464cc275/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BaseBeamTable.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BaseBeamTable.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BaseBeamTable.java index 81829e9..2ecfa38 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BaseBeamTable.java +++ 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BaseBeamTable.java @@ -37,11 +37,6 @@ import org.apache.calcite.schema.Statistics; * Each IO in Beam has one table schema, by extending {@link BaseBeamTable}. */ public abstract class BaseBeamTable implements ScannableTable, Serializable { - - /** - * - */ - private static final long serialVersionUID = -1262988061830914193L; private RelDataType relDataType; protected BeamSQLRecordType beamSqlRecordType; http://git-wip-us.apache.org/repos/asf/beam/blob/464cc275/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRecordType.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRecordType.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRecordType.java index 661b155..e4013bc 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRecordType.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRecordType.java @@ -30,10 +30,6 @@ import org.apache.calcite.sql.type.SqlTypeName; */ //@DefaultCoder(BeamSQLRecordTypeCoder.class) public class BeamSQLRecordType implements Serializable { - /** - * - */ - private static final long serialVersionUID = -5318734648766104712L; private List<String> fieldsName = new ArrayList<>(); private List<SqlTypeName> fieldsType = new ArrayList<>(); http://git-wip-us.apache.org/repos/asf/beam/blob/464cc275/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRecordTypeCoder.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRecordTypeCoder.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRecordTypeCoder.java index ec330f1..b88a195 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRecordTypeCoder.java +++ 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRecordTypeCoder.java @@ -54,35 +54,34 @@ public class BeamSQLRecordTypeCoder extends StandardCoder<BeamSQLRecordType> { for (SqlTypeName fieldType : value.getFieldsType()) { stringCoder.encode(fieldType.name(), outStream, nested); } - outStream.flush(); + //add a dummy field to indicate the end of record + intCoder.encode(value.size(), outStream, context); } @Override public BeamSQLRecordType decode(InputStream inStream, org.apache.beam.sdk.coders.Coder.Context context) throws CoderException, IOException { BeamSQLRecordType typeRecord = new BeamSQLRecordType(); - Context nested = context.nested(); - int size = intCoder.decode(inStream, nested); + int size = intCoder.decode(inStream, context.nested()); for (int idx = 0; idx < size; ++idx) { - typeRecord.getFieldsName().add(stringCoder.decode(inStream, nested)); + typeRecord.getFieldsName().add(stringCoder.decode(inStream, context.nested())); } for (int idx = 0; idx < size; ++idx) { - typeRecord.getFieldsType().add(SqlTypeName.valueOf(stringCoder.decode(inStream, nested))); + typeRecord.getFieldsType().add( + SqlTypeName.valueOf(stringCoder.decode(inStream, context.nested()))); } + intCoder.decode(inStream, context); return typeRecord; } @Override public List<? 
extends Coder<?>> getCoderArguments() { - // TODO Auto-generated method stub return null; } @Override public void verifyDeterministic() throws org.apache.beam.sdk.coders.Coder.NonDeterministicException { - // TODO Auto-generated method stub - } } http://git-wip-us.apache.org/repos/asf/beam/blob/464cc275/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRow.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRow.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRow.java index b65e23b..f9dab8a 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRow.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRow.java @@ -21,6 +21,7 @@ import java.io.Serializable; import java.util.ArrayList; import java.util.Date; import java.util.List; +import org.apache.beam.dsls.sql.exception.InvalidFieldException; import org.apache.calcite.sql.type.SqlTypeName; /** @@ -28,10 +29,6 @@ import org.apache.calcite.sql.type.SqlTypeName; * */ public class BeamSQLRow implements Serializable { - /** - * - */ - private static final long serialVersionUID = 4569220242480160895L; private List<Integer> nullFields = new ArrayList<>(); private List<Object> dataValues; @@ -42,12 +39,15 @@ public class BeamSQLRow implements Serializable { this.dataValues = new ArrayList<>(); for (int idx = 0; idx < dataType.size(); ++idx) { dataValues.add(null); + nullFields.add(idx); } } public BeamSQLRow(BeamSQLRecordType dataType, List<Object> dataValues) { - this.dataValues = dataValues; - this.dataType = dataType; + this(dataType); + for (int idx = 0; idx < dataValues.size(); ++idx) { + addField(idx, dataValues.get(idx)); + } } public void addField(String fieldName, Object fieldValue) { @@ -56,19 +56,29 @@ public class BeamSQLRow implements Serializable { public void addField(int index, Object fieldValue) { if (fieldValue == null) { - 
dataValues.set(index, fieldValue); - if (!nullFields.contains(index)) { - nullFields.add(index); - } return; + } else { + if (nullFields.contains(index)) { + nullFields.remove(nullFields.indexOf(index)); + } } SqlTypeName fieldType = dataType.getFieldsType().get(index); switch (fieldType) { case INTEGER: + if (!(fieldValue instanceof Integer)) { + throw new InvalidFieldException( + String.format("[%s] doesn't match type [%s]", fieldValue, fieldType)); + } + break; case SMALLINT: + if (!(fieldValue instanceof Short)) { + throw new InvalidFieldException( + String.format("[%s] doesn't match type [%s]", fieldValue, fieldType)); + } + break; case TINYINT: - if (!(fieldValue instanceof Integer)) { + if (!(fieldValue instanceof Byte)) { throw new InvalidFieldException( String.format("[%s] doesn't match type [%s]", fieldValue, fieldType)); } @@ -97,24 +107,24 @@ public class BeamSQLRow implements Serializable { String.format("[%s] doesn't match type [%s]", fieldValue, fieldType)); } break; - case TIME: - case TIMESTAMP: - if (!(fieldValue instanceof Date)) { - throw new InvalidFieldException( - String.format("[%s] doesn't match type [%s]", fieldValue, fieldType)); - } - break; default: throw new UnsupportedDataTypeException(fieldType); } dataValues.set(index, fieldValue); } + public short getShort(int idx) { + return (Short) getFieldValue(idx); + } public int getInteger(int idx) { return (Integer) getFieldValue(idx); } + public float getFloat(int idx) { + return (Float) getFieldValue(idx); + } + public double getDouble(int idx) { return (Double) getFieldValue(idx); } @@ -145,48 +155,52 @@ public class BeamSQLRow implements Serializable { switch (fieldType) { case INTEGER: + if (!(fieldValue instanceof Integer)) { + throw new InvalidFieldException( + String.format("[%s] doesn't match type [%s]", fieldValue, fieldType)); + } else { + return fieldValue; + } case SMALLINT: + if (!(fieldValue instanceof Short)) { + throw new InvalidFieldException( + String.format("[%s] doesn't 
match type [%s]", fieldValue, fieldType)); + } else { + return fieldValue; + } case TINYINT: - if (!(fieldValue instanceof Integer)) { + if (!(fieldValue instanceof Byte)) { throw new InvalidFieldException( String.format("[%s] doesn't match type [%s]", fieldValue, fieldType)); } else { - return Integer.valueOf(fieldValue.toString()); + return fieldValue; } case DOUBLE: if (!(fieldValue instanceof Double)) { throw new InvalidFieldException( String.format("[%s] doesn't match type [%s]", fieldValue, fieldType)); } else { - return Double.valueOf(fieldValue.toString()); + return fieldValue; } case BIGINT: if (!(fieldValue instanceof Long)) { throw new InvalidFieldException( String.format("[%s] doesn't match type [%s]", fieldValue, fieldType)); } else { - return Long.valueOf(fieldValue.toString()); + return fieldValue; } case FLOAT: if (!(fieldValue instanceof Float)) { throw new InvalidFieldException( String.format("[%s] doesn't match type [%s]", fieldValue, fieldType)); } else { - return Float.valueOf(fieldValue.toString()); + return fieldValue; } case VARCHAR: if (!(fieldValue instanceof String)) { throw new InvalidFieldException( String.format("[%s] doesn't match type [%s]", fieldValue, fieldType)); } else { - return fieldValue.toString(); - } - case TIME: - case TIMESTAMP: - if (!(fieldValue instanceof Date)) { - throw new InvalidFieldException( - String.format("[%s] doesn't match type [%s]", fieldValue, fieldType)); - } else { return fieldValue; } default:
