http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlPositionExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlPositionExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlPositionExpression.java deleted file mode 100644 index 144acbf..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlPositionExpression.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.dsls.sql.interpreter.operator.string; - -import java.util.List; - -import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression; -import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive; -import org.apache.beam.dsls.sql.schema.BeamSqlRow; -import org.apache.calcite.sql.type.SqlTypeName; - -/** - * String position operator. - * - * <p> - * example: - * POSITION(string1 IN string2) - * POSITION(string1 IN string2 FROM integer) - * </p> - */ -public class BeamSqlPositionExpression extends BeamSqlExpression { - public BeamSqlPositionExpression(List<BeamSqlExpression> operands) { - super(operands, SqlTypeName.INTEGER); - } - - @Override public boolean accept() { - if (operands.size() < 2 || operands.size() > 3) { - return false; - } - - if (!SqlTypeName.CHAR_TYPES.contains(opType(0)) - || !SqlTypeName.CHAR_TYPES.contains(opType(1))) { - return false; - } - - if (operands.size() == 3 - && !SqlTypeName.INT_TYPES.contains(opType(2))) { - return false; - } - - return true; - } - - @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) { - String targetStr = opValueEvaluated(0, inputRow); - String containingStr = opValueEvaluated(1, inputRow); - int from = -1; - if (operands.size() == 3) { - Number tmp = opValueEvaluated(2, inputRow); - from = tmp.intValue(); - } - - int idx = containingStr.indexOf(targetStr, from); - - return BeamSqlPrimitive.of(SqlTypeName.INTEGER, idx); - } -}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlStringUnaryExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlStringUnaryExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlStringUnaryExpression.java deleted file mode 100644 index d931db9..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlStringUnaryExpression.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.dsls.sql.interpreter.operator.string; - -import java.util.List; - -import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression; -import org.apache.calcite.sql.type.SqlTypeName; - -/** - * Base class for all string unary operators. - */ -public abstract class BeamSqlStringUnaryExpression extends BeamSqlExpression { - public BeamSqlStringUnaryExpression(List<BeamSqlExpression> operands, SqlTypeName outputType) { - super(operands, outputType); - } - - @Override public boolean accept() { - if (operands.size() != 1) { - return false; - } - - if (!SqlTypeName.CHAR_TYPES.contains(opType(0))) { - return false; - } - - return true; - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlSubstringExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlSubstringExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlSubstringExpression.java deleted file mode 100644 index 8b33125..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlSubstringExpression.java +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.dsls.sql.interpreter.operator.string; - -import java.util.List; - -import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression; -import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive; -import org.apache.beam.dsls.sql.schema.BeamSqlRow; -import org.apache.calcite.sql.type.SqlTypeName; - -/** - * 'SUBSTRING' operator. - * - * <p> - * SUBSTRING(string FROM integer) - * SUBSTRING(string FROM integer FOR integer) - * </p> - */ -public class BeamSqlSubstringExpression extends BeamSqlExpression { - public BeamSqlSubstringExpression(List<BeamSqlExpression> operands) { - super(operands, SqlTypeName.VARCHAR); - } - - @Override public boolean accept() { - if (operands.size() < 2 || operands.size() > 3) { - return false; - } - - if (!SqlTypeName.CHAR_TYPES.contains(opType(0)) - || !SqlTypeName.INT_TYPES.contains(opType(1))) { - return false; - } - - if (operands.size() == 3 && !SqlTypeName.INT_TYPES.contains(opType(2))) { - return false; - } - - return true; - } - - @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) { - String str = opValueEvaluated(0, inputRow); - int idx = opValueEvaluated(1, inputRow); - int startIdx = idx; - if (startIdx > 0) { - // NOTE: SQL substring is 1 based(rather than 0 based) - startIdx -= 1; - } else if (startIdx < 0) { - // NOTE: SQL also support negative index... - startIdx += str.length(); - } else { - return BeamSqlPrimitive.of(SqlTypeName.VARCHAR, ""); - } - - if (operands.size() == 3) { - int length = opValueEvaluated(2, inputRow); - if (length < 0) { - length = 0; - } - int endIdx = Math.min(startIdx + length, str.length()); - return BeamSqlPrimitive.of(SqlTypeName.VARCHAR, str.substring(startIdx, endIdx)); - } else { - return BeamSqlPrimitive.of(SqlTypeName.VARCHAR, str.substring(startIdx)); - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlTrimExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlTrimExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlTrimExpression.java deleted file mode 100644 index 5e6c2bb..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlTrimExpression.java +++ /dev/null @@ -1,101 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.dsls.sql.interpreter.operator.string; - -import java.util.List; -import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression; -import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive; -import org.apache.beam.dsls.sql.schema.BeamSqlRow; -import org.apache.calcite.sql.fun.SqlTrimFunction; -import org.apache.calcite.sql.type.SqlTypeName; - -/** - * Trim operator. - * - * <p> - * TRIM( { BOTH | LEADING | TRAILING } string1 FROM string2) - * </p> - */ -public class BeamSqlTrimExpression extends BeamSqlExpression { - public BeamSqlTrimExpression(List<BeamSqlExpression> operands) { - super(operands, SqlTypeName.VARCHAR); - } - - @Override public boolean accept() { - if (operands.size() != 1 && operands.size() != 3) { - return false; - } - - if (operands.size() == 1 && !SqlTypeName.CHAR_TYPES.contains(opType(0))) { - return false; - } - - if (operands.size() == 3 - && ( - SqlTypeName.SYMBOL != opType(0) - || !SqlTypeName.CHAR_TYPES.contains(opType(1)) - || !SqlTypeName.CHAR_TYPES.contains(opType(2))) - ) { - return false; - } - - return true; - } - - @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) { - if (operands.size() == 1) { - return BeamSqlPrimitive.of(SqlTypeName.VARCHAR, - opValueEvaluated(0, inputRow).toString().trim()); - } else { - SqlTrimFunction.Flag type = opValueEvaluated(0, inputRow); - String targetStr = opValueEvaluated(1, inputRow); - String containingStr = opValueEvaluated(2, inputRow); - - switch (type) { - case LEADING: - return BeamSqlPrimitive.of(SqlTypeName.VARCHAR, leadingTrim(containingStr, targetStr)); - case TRAILING: - return BeamSqlPrimitive.of(SqlTypeName.VARCHAR, trailingTrim(containingStr, targetStr)); - case BOTH: - default: - return BeamSqlPrimitive.of(SqlTypeName.VARCHAR, - trailingTrim(leadingTrim(containingStr, targetStr), targetStr)); - } - } - } - - static String leadingTrim(String containingStr, String targetStr) { - int idx = 0; - while (containingStr.startsWith(targetStr, idx)) { - idx += targetStr.length(); - } - - return containingStr.substring(idx); - } - - static String trailingTrim(String containingStr, String targetStr) { - int idx = containingStr.length() - targetStr.length(); - while (containingStr.startsWith(targetStr, idx)) { - idx -= targetStr.length(); - } - - idx += targetStr.length(); - return containingStr.substring(0, idx); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlUpperExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlUpperExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlUpperExpression.java deleted file mode 100644 index efa9c95..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlUpperExpression.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.dsls.sql.interpreter.operator.string; - -import java.util.List; - -import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression; -import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive; -import org.apache.beam.dsls.sql.schema.BeamSqlRow; -import org.apache.calcite.sql.type.SqlTypeName; - -/** - * 'UPPER' operator. - */ -public class BeamSqlUpperExpression extends BeamSqlStringUnaryExpression { - public BeamSqlUpperExpression(List<BeamSqlExpression> operands) { - super(operands, SqlTypeName.VARCHAR); - } - - @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) { - String str = opValueEvaluated(0, inputRow); - return BeamSqlPrimitive.of(SqlTypeName.VARCHAR, str.toUpperCase()); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/package-info.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/package-info.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/package-info.java deleted file mode 100644 index f2c63f3..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/package-info.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * String operators. - */ -package org.apache.beam.dsls.sql.interpreter.operator.string; http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/package-info.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/package-info.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/package-info.java deleted file mode 100644 index 178d35f..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/package-info.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * interpreter generate runnable 'code' to execute SQL relational expressions. - */ -package org.apache.beam.dsls.sql.interpreter; http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/package-info.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/package-info.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/package-info.java deleted file mode 100644 index b26e8c4..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/package-info.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * BeamSQL provides a new interface to run a SQL statement with Beam. - */ -package org.apache.beam.dsls.sql; http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamQueryPlanner.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamQueryPlanner.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamQueryPlanner.java deleted file mode 100644 index 93f9a2f..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamQueryPlanner.java +++ /dev/null @@ -1,167 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.dsls.sql.planner; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import org.apache.beam.dsls.sql.BeamSqlEnv; -import org.apache.beam.dsls.sql.rel.BeamLogicalConvention; -import org.apache.beam.dsls.sql.rel.BeamRelNode; -import org.apache.beam.dsls.sql.schema.BaseBeamTable; -import org.apache.beam.dsls.sql.schema.BeamSqlRow; -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionTuple; -import org.apache.calcite.adapter.java.JavaTypeFactory; -import org.apache.calcite.config.Lex; -import org.apache.calcite.jdbc.CalciteSchema; -import org.apache.calcite.jdbc.JavaTypeFactoryImpl; -import org.apache.calcite.plan.Contexts; -import org.apache.calcite.plan.ConventionTraitDef; -import org.apache.calcite.plan.RelOptUtil; -import org.apache.calcite.plan.RelTraitDef; -import org.apache.calcite.plan.RelTraitSet; -import org.apache.calcite.prepare.CalciteCatalogReader; -import org.apache.calcite.rel.RelCollationTraitDef; -import org.apache.calcite.rel.RelNode; -import org.apache.calcite.rel.type.RelDataTypeSystem; -import org.apache.calcite.schema.SchemaPlus; -import org.apache.calcite.sql.SqlNode; -import org.apache.calcite.sql.SqlOperatorTable; -import org.apache.calcite.sql.fun.SqlStdOperatorTable; -import org.apache.calcite.sql.parser.SqlParseException; -import org.apache.calcite.sql.parser.SqlParser; -import org.apache.calcite.sql.util.ChainedSqlOperatorTable; -import org.apache.calcite.tools.FrameworkConfig; -import org.apache.calcite.tools.Frameworks; -import org.apache.calcite.tools.Planner; -import org.apache.calcite.tools.RelConversionException; -import org.apache.calcite.tools.ValidationException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * The core component to handle through a SQL statement, from explain execution plan, - * to generate a Beam pipeline. - * - */ -public class BeamQueryPlanner { - private static final Logger LOG = LoggerFactory.getLogger(BeamQueryPlanner.class); - - protected final Planner planner; - private Map<String, BaseBeamTable> sourceTables = new HashMap<>(); - - public static final JavaTypeFactory TYPE_FACTORY = new JavaTypeFactoryImpl( - RelDataTypeSystem.DEFAULT); - - public BeamQueryPlanner(SchemaPlus schema) { - final List<RelTraitDef> traitDefs = new ArrayList<>(); - traitDefs.add(ConventionTraitDef.INSTANCE); - traitDefs.add(RelCollationTraitDef.INSTANCE); - - List<SqlOperatorTable> sqlOperatorTables = new ArrayList<>(); - sqlOperatorTables.add(SqlStdOperatorTable.instance()); - sqlOperatorTables.add(new CalciteCatalogReader(CalciteSchema.from(schema), false, - Collections.<String>emptyList(), TYPE_FACTORY)); - - FrameworkConfig config = Frameworks.newConfigBuilder() - .parserConfig(SqlParser.configBuilder().setLex(Lex.MYSQL).build()).defaultSchema(schema) - .traitDefs(traitDefs).context(Contexts.EMPTY_CONTEXT).ruleSets(BeamRuleSets.getRuleSets()) - .costFactory(null).typeSystem(BeamRelDataTypeSystem.BEAM_REL_DATATYPE_SYSTEM) - .operatorTable(new ChainedSqlOperatorTable(sqlOperatorTables)) - .build(); - this.planner = Frameworks.getPlanner(config); - - for (String t : schema.getTableNames()) { - sourceTables.put(t, (BaseBeamTable) schema.getTable(t)); - } - } - - /** - * Parse input SQL query, and return a {@link SqlNode} as grammar tree. - */ - public SqlNode parseQuery(String sqlQuery) throws SqlParseException{ - return planner.parse(sqlQuery); - } - - /** - * {@code compileBeamPipeline} translate a SQL statement to executed as Beam data flow, - * which is linked with the given {@code pipeline}. The final output stream is returned as - * {@code PCollection} so more operations can be applied. - */ - public PCollection<BeamSqlRow> compileBeamPipeline(String sqlStatement, Pipeline basePipeline - , BeamSqlEnv sqlEnv) throws Exception { - BeamRelNode relNode = convertToBeamRel(sqlStatement); - - // the input PCollectionTuple is empty, and be rebuilt in BeamIOSourceRel. - return relNode.buildBeamPipeline(PCollectionTuple.empty(basePipeline), sqlEnv); - } - - /** - * It parses and validate the input query, then convert into a - * {@link BeamRelNode} tree. - * - */ - public BeamRelNode convertToBeamRel(String sqlStatement) - throws ValidationException, RelConversionException, SqlParseException { - BeamRelNode beamRelNode; - try { - beamRelNode = (BeamRelNode) validateAndConvert(planner.parse(sqlStatement)); - } finally { - planner.close(); - } - return beamRelNode; - } - - private RelNode validateAndConvert(SqlNode sqlNode) - throws ValidationException, RelConversionException { - SqlNode validated = validateNode(sqlNode); - LOG.info("SQL:\n" + validated); - RelNode relNode = convertToRelNode(validated); - return convertToBeamRel(relNode); - } - - private RelNode convertToBeamRel(RelNode relNode) throws RelConversionException { - RelTraitSet traitSet = relNode.getTraitSet(); - - LOG.info("SQLPlan>\n" + RelOptUtil.toString(relNode)); - - // PlannerImpl.transform() optimizes RelNode with ruleset - return planner.transform(0, traitSet.plus(BeamLogicalConvention.INSTANCE), relNode); - } - - private RelNode convertToRelNode(SqlNode sqlNode) throws RelConversionException { - return planner.rel(sqlNode).rel; - } - - private SqlNode validateNode(SqlNode sqlNode) throws ValidationException { - return planner.validate(sqlNode); - } - - public Map<String, BaseBeamTable> getSourceTables() { - return sourceTables; - } - - public Planner getPlanner() { - return planner; - } - -} http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamRelDataTypeSystem.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamRelDataTypeSystem.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamRelDataTypeSystem.java deleted file mode 100644 index c89a740..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamRelDataTypeSystem.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.dsls.sql.planner; - -import org.apache.calcite.rel.type.RelDataTypeSystem; -import org.apache.calcite.rel.type.RelDataTypeSystemImpl; - -/** - * customized data type in Beam. - * - */ -public class BeamRelDataTypeSystem extends RelDataTypeSystemImpl { - public static final RelDataTypeSystem BEAM_REL_DATATYPE_SYSTEM = new BeamRelDataTypeSystem(); - - @Override - public int getMaxNumericScale() { - return 38; - } - - @Override - public int getMaxNumericPrecision() { - return 38; - } - -} http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamRuleSets.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamRuleSets.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamRuleSets.java deleted file mode 100644 index 552ff8f..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamRuleSets.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.dsls.sql.planner; - -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableSet; -import java.util.Iterator; -import org.apache.beam.dsls.sql.rel.BeamRelNode; -import org.apache.beam.dsls.sql.rule.BeamAggregationRule; -import org.apache.beam.dsls.sql.rule.BeamFilterRule; -import org.apache.beam.dsls.sql.rule.BeamIOSinkRule; -import org.apache.beam.dsls.sql.rule.BeamIOSourceRule; -import org.apache.beam.dsls.sql.rule.BeamIntersectRule; -import org.apache.beam.dsls.sql.rule.BeamJoinRule; -import org.apache.beam.dsls.sql.rule.BeamMinusRule; -import org.apache.beam.dsls.sql.rule.BeamProjectRule; -import org.apache.beam.dsls.sql.rule.BeamSortRule; -import org.apache.beam.dsls.sql.rule.BeamUnionRule; -import org.apache.beam.dsls.sql.rule.BeamValuesRule; -import org.apache.calcite.plan.RelOptRule; -import org.apache.calcite.rel.RelNode; -import org.apache.calcite.tools.RuleSet; - -/** - * {@link RuleSet} used in {@link BeamQueryPlanner}. It translates a standard - * Calcite {@link RelNode} tree, to represent with {@link BeamRelNode} - * - */ -public class BeamRuleSets { - private static final ImmutableSet<RelOptRule> calciteToBeamConversionRules = ImmutableSet - .<RelOptRule>builder().add(BeamIOSourceRule.INSTANCE, BeamProjectRule.INSTANCE, - BeamFilterRule.INSTANCE, BeamIOSinkRule.INSTANCE, - BeamAggregationRule.INSTANCE, BeamSortRule.INSTANCE, BeamValuesRule.INSTANCE, - BeamIntersectRule.INSTANCE, BeamMinusRule.INSTANCE, BeamUnionRule.INSTANCE, - BeamJoinRule.INSTANCE) - .build(); - - public static RuleSet[] getRuleSets() { - return new RuleSet[] { new BeamRuleSet( - ImmutableSet.<RelOptRule>builder().addAll(calciteToBeamConversionRules).build()) }; - } - - private static class BeamRuleSet implements RuleSet { - final ImmutableSet<RelOptRule> rules; - - public BeamRuleSet(ImmutableSet<RelOptRule> rules) { - this.rules = rules; - } - - public BeamRuleSet(ImmutableList<RelOptRule> rules) { - this.rules = ImmutableSet.<RelOptRule>builder().addAll(rules).build(); - } - - @Override - public Iterator<RelOptRule> iterator() { - return rules.iterator(); - } - } - -} http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/planner/package-info.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/planner/package-info.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/planner/package-info.java deleted file mode 100644 index 0506c5b..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/planner/package-info.java +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * {@link org.apache.beam.dsls.sql.planner.BeamQueryPlanner} is the main interface. - * It defines data sources, validate a SQL statement, and convert it as a Beam - * pipeline. - */ -package org.apache.beam.dsls.sql.planner; http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java deleted file mode 100644 index 9dcb079..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java +++ /dev/null @@ -1,182 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.dsls.sql.rel; - -import java.util.ArrayList; -import java.util.List; -import org.apache.beam.dsls.sql.BeamSqlEnv; -import org.apache.beam.dsls.sql.schema.BeamSqlRow; -import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder; -import org.apache.beam.dsls.sql.schema.BeamSqlRowType; -import org.apache.beam.dsls.sql.transform.BeamAggregationTransforms; -import org.apache.beam.dsls.sql.utils.CalciteUtils; -import org.apache.beam.sdk.coders.KvCoder; -import org.apache.beam.sdk.transforms.Combine; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.WithKeys; -import org.apache.beam.sdk.transforms.WithTimestamps; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.Trigger; -import org.apache.beam.sdk.transforms.windowing.Window; -import org.apache.beam.sdk.transforms.windowing.WindowFn; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionTuple; -import org.apache.calcite.linq4j.Ord; -import org.apache.calcite.plan.RelOptCluster; -import org.apache.calcite.plan.RelTraitSet; -import org.apache.calcite.rel.RelNode; -import org.apache.calcite.rel.RelWriter; -import org.apache.calcite.rel.core.Aggregate; -import org.apache.calcite.rel.core.AggregateCall; -import org.apache.calcite.rel.type.RelDataType; -import org.apache.calcite.util.ImmutableBitSet; -import org.apache.calcite.util.Util; -import org.joda.time.Duration; - -/** - * {@link BeamRelNode} to replace a {@link Aggregate} node. - * - */ -public class BeamAggregationRel extends Aggregate implements BeamRelNode { - private int windowFieldIdx = -1; - private WindowFn<BeamSqlRow, BoundedWindow> windowFn; - private Trigger trigger; - private Duration allowedLatence = Duration.ZERO; - - public BeamAggregationRel(RelOptCluster cluster, RelTraitSet traits - , RelNode child, boolean indicator, - ImmutableBitSet groupSet, List<ImmutableBitSet> groupSets, List<AggregateCall> aggCalls - , WindowFn windowFn, Trigger trigger, int windowFieldIdx, Duration allowedLatence) { - super(cluster, traits, child, indicator, groupSet, groupSets, aggCalls); - this.windowFn = windowFn; - this.trigger = trigger; - this.windowFieldIdx = windowFieldIdx; - this.allowedLatence = allowedLatence; - } - - @Override - public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections - , BeamSqlEnv sqlEnv) throws Exception { - RelNode input = getInput(); - String stageName = BeamSqlRelUtils.getStageName(this) + "_"; - - PCollection<BeamSqlRow> upstream = - BeamSqlRelUtils.getBeamRelInput(input).buildBeamPipeline(inputPCollections, sqlEnv); - if (windowFieldIdx != -1) { - upstream = upstream.apply(stageName + "assignEventTimestamp", WithTimestamps - .of(new BeamAggregationTransforms.WindowTimestampFn(windowFieldIdx))) - .setCoder(upstream.getCoder()); - } - - PCollection<BeamSqlRow> windowStream = upstream.apply(stageName + "window", - Window.into(windowFn) - .triggering(trigger) - .withAllowedLateness(allowedLatence) - .accumulatingFiredPanes()); - - BeamSqlRowCoder keyCoder = new BeamSqlRowCoder(exKeyFieldsSchema(input.getRowType())); - PCollection<KV<BeamSqlRow, BeamSqlRow>> exCombineByStream = windowStream.apply( - stageName + "exCombineBy", - WithKeys - .of(new BeamAggregationTransforms.AggregationGroupByKeyFn( - windowFieldIdx, groupSet))) - .setCoder(KvCoder.of(keyCoder, upstream.getCoder())); - - - BeamSqlRowCoder aggCoder = new BeamSqlRowCoder(exAggFieldsSchema()); - - PCollection<KV<BeamSqlRow, BeamSqlRow>> aggregatedStream = exCombineByStream.apply( - stageName + "combineBy", - Combine.<BeamSqlRow, BeamSqlRow, BeamSqlRow>perKey( - new BeamAggregationTransforms.AggregationAdaptor(getAggCallList(), - CalciteUtils.toBeamRowType(input.getRowType())))) - .setCoder(KvCoder.of(keyCoder, aggCoder)); - - PCollection<BeamSqlRow> mergedStream = aggregatedStream.apply(stageName + "mergeRecord", - ParDo.of(new BeamAggregationTransforms.MergeAggregationRecord( - CalciteUtils.toBeamRowType(getRowType()), getAggCallList(), windowFieldIdx))); - mergedStream.setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRowType(getRowType()))); - - return mergedStream; - } - - /** - * Type of sub-rowrecord used as Group-By keys. - */ - private BeamSqlRowType exKeyFieldsSchema(RelDataType relDataType) { - BeamSqlRowType inputRowType = CalciteUtils.toBeamRowType(relDataType); - List<String> fieldNames = new ArrayList<>(); - List<Integer> fieldTypes = new ArrayList<>(); - for (int i : groupSet.asList()) { - if (i != windowFieldIdx) { - fieldNames.add(inputRowType.getFieldsName().get(i)); - fieldTypes.add(inputRowType.getFieldsType().get(i)); - } - } - return BeamSqlRowType.create(fieldNames, fieldTypes); - } - - /** - * Type of sub-rowrecord, that represents the list of aggregation fields. - */ - private BeamSqlRowType exAggFieldsSchema() { - List<String> fieldNames = new ArrayList<>(); - List<Integer> fieldTypes = new ArrayList<>(); - for (AggregateCall ac : getAggCallList()) { - fieldNames.add(ac.name); - fieldTypes.add(CalciteUtils.toJavaType(ac.type.getSqlTypeName())); - } - - return BeamSqlRowType.create(fieldNames, fieldTypes); - } - - @Override - public Aggregate copy(RelTraitSet traitSet, RelNode input, boolean indicator - , ImmutableBitSet groupSet, - List<ImmutableBitSet> groupSets, List<AggregateCall> aggCalls) { - return new BeamAggregationRel(getCluster(), traitSet, input, indicator - , groupSet, groupSets, aggCalls, windowFn, trigger, windowFieldIdx, allowedLatence); - } - - public void setWindowFn(WindowFn windowFn) { - this.windowFn = windowFn; - } - - public void setTrigger(Trigger trigger) { - this.trigger = trigger; - } - - public RelWriter explainTerms(RelWriter pw) { - // We skip the "groups" element if it is a singleton of "group". - pw.item("group", groupSet) - .itemIf("window", windowFn, windowFn != null) - .itemIf("trigger", trigger, trigger != null) - .itemIf("event_time", windowFieldIdx, windowFieldIdx != -1) - .itemIf("groups", groupSets, getGroupType() != Group.SIMPLE) - .itemIf("indicator", indicator, indicator) - .itemIf("aggs", aggCalls, pw.nest()); - if (!pw.nest()) { - for (Ord<AggregateCall> ord : Ord.zip(aggCalls)) { - pw.item(Util.first(ord.e.name, "agg#" + ord.i), ord.e); - } - } - return pw; - } - -} http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamFilterRel.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamFilterRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamFilterRel.java deleted file mode 100644 index f802104..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamFilterRel.java +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.dsls.sql.rel; - -import org.apache.beam.dsls.sql.BeamSqlEnv; -import org.apache.beam.dsls.sql.interpreter.BeamSqlExpressionExecutor; -import org.apache.beam.dsls.sql.interpreter.BeamSqlFnExecutor; -import org.apache.beam.dsls.sql.schema.BeamSqlRow; -import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder; -import org.apache.beam.dsls.sql.transform.BeamSqlFilterFn; -import org.apache.beam.dsls.sql.utils.CalciteUtils; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionTuple; -import org.apache.calcite.plan.RelOptCluster; -import org.apache.calcite.plan.RelTraitSet; -import org.apache.calcite.rel.RelNode; -import org.apache.calcite.rel.core.Filter; -import org.apache.calcite.rex.RexNode; - -/** - * BeamRelNode to replace a {@code Filter} node. - * - */ -public class BeamFilterRel extends Filter implements BeamRelNode { - - public BeamFilterRel(RelOptCluster cluster, RelTraitSet traits, RelNode child, - RexNode condition) { - super(cluster, traits, child, condition); - } - - @Override - public Filter copy(RelTraitSet traitSet, RelNode input, RexNode condition) { - return new BeamFilterRel(getCluster(), traitSet, input, condition); - } - - @Override - public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections - , BeamSqlEnv sqlEnv) throws Exception { - RelNode input = getInput(); - String stageName = BeamSqlRelUtils.getStageName(this); - - PCollection<BeamSqlRow> upstream = - BeamSqlRelUtils.getBeamRelInput(input).buildBeamPipeline(inputPCollections, sqlEnv); - - BeamSqlExpressionExecutor executor = new BeamSqlFnExecutor(this); - - PCollection<BeamSqlRow> filterStream = upstream.apply(stageName, - ParDo.of(new BeamSqlFilterFn(getRelTypeName(), executor))); - filterStream.setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRowType(getRowType()))); - - return filterStream; - } - -} http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSinkRel.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSinkRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSinkRel.java deleted file mode 100644 index d70f94a..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSinkRel.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.dsls.sql.rel; - -import com.google.common.base.Joiner; -import java.util.List; -import org.apache.beam.dsls.sql.BeamSqlEnv; -import org.apache.beam.dsls.sql.schema.BaseBeamTable; -import org.apache.beam.dsls.sql.schema.BeamSqlRow; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionTuple; -import org.apache.calcite.plan.RelOptCluster; -import org.apache.calcite.plan.RelOptTable; -import org.apache.calcite.plan.RelTraitSet; -import org.apache.calcite.prepare.Prepare; -import org.apache.calcite.rel.RelNode; -import org.apache.calcite.rel.core.TableModify; -import org.apache.calcite.rex.RexNode; - -/** - * BeamRelNode to replace a {@code TableModify} node. - * - */ -public class BeamIOSinkRel extends TableModify implements BeamRelNode { - public BeamIOSinkRel(RelOptCluster cluster, RelTraitSet traits, RelOptTable table, - Prepare.CatalogReader catalogReader, RelNode child, Operation operation, - List<String> updateColumnList, List<RexNode> sourceExpressionList, boolean flattened) { - super(cluster, traits, table, catalogReader, child, operation, updateColumnList, - sourceExpressionList, flattened); - } - - @Override - public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) { - return new BeamIOSinkRel(getCluster(), traitSet, getTable(), getCatalogReader(), sole(inputs), - getOperation(), getUpdateColumnList(), getSourceExpressionList(), isFlattened()); - } - - /** - * Note that {@code BeamIOSinkRel} returns the input PCollection, - * which is the persisted PCollection. - */ - @Override - public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections - , BeamSqlEnv sqlEnv) throws Exception { - RelNode input = getInput(); - String stageName = BeamSqlRelUtils.getStageName(this); - - PCollection<BeamSqlRow> upstream = - BeamSqlRelUtils.getBeamRelInput(input).buildBeamPipeline(inputPCollections, sqlEnv); - - String sourceName = Joiner.on('.').join(getTable().getQualifiedName()); - - BaseBeamTable targetTable = sqlEnv.findTable(sourceName); - - upstream.apply(stageName, targetTable.buildIOWriter()); - - return upstream; - } - -} http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java deleted file mode 100644 index 6754991..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.dsls.sql.rel; - -import com.google.common.base.Joiner; -import org.apache.beam.dsls.sql.BeamSqlEnv; -import org.apache.beam.dsls.sql.schema.BaseBeamTable; -import org.apache.beam.dsls.sql.schema.BeamSqlRow; -import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder; -import org.apache.beam.dsls.sql.utils.CalciteUtils; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionTuple; -import org.apache.beam.sdk.values.TupleTag; -import org.apache.calcite.plan.RelOptCluster; -import org.apache.calcite.plan.RelOptTable; -import org.apache.calcite.plan.RelTraitSet; -import org.apache.calcite.rel.core.TableScan; - -/** - * BeamRelNode to replace a {@code TableScan} node. - * - */ -public class BeamIOSourceRel extends TableScan implements BeamRelNode { - - public BeamIOSourceRel(RelOptCluster cluster, RelTraitSet traitSet, RelOptTable table) { - super(cluster, traitSet, table); - } - - @Override - public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections - , BeamSqlEnv sqlEnv) throws Exception { - String sourceName = Joiner.on('.').join(getTable().getQualifiedName()); - - TupleTag<BeamSqlRow> sourceTupleTag = new TupleTag<>(sourceName); - if (inputPCollections.has(sourceTupleTag)) { - //choose PCollection from input PCollectionTuple if exists there. - PCollection<BeamSqlRow> sourceStream = inputPCollections - .get(new TupleTag<BeamSqlRow>(sourceName)); - return sourceStream; - } else { - //If not, the source PColection is provided with BaseBeamTable.buildIOReader(). - BaseBeamTable sourceTable = sqlEnv.findTable(sourceName); - return sourceTable.buildIOReader(inputPCollections.getPipeline()) - .setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRowType(getRowType()))); - } - } - -} http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIntersectRel.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIntersectRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIntersectRel.java deleted file mode 100644 index 7cab171..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIntersectRel.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.dsls.sql.rel; - -import java.util.List; -import org.apache.beam.dsls.sql.BeamSqlEnv; -import org.apache.beam.dsls.sql.schema.BeamSqlRow; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionTuple; -import org.apache.calcite.plan.RelOptCluster; -import org.apache.calcite.plan.RelTraitSet; -import org.apache.calcite.rel.RelNode; -import org.apache.calcite.rel.core.Intersect; -import org.apache.calcite.rel.core.SetOp; - -/** - * {@code BeamRelNode} to replace a {@code Intersect} node. - * - * <p>This is used to combine two SELECT statements, but returns rows only from the - * first SELECT statement that are identical to a row in the second SELECT statement. - */ -public class BeamIntersectRel extends Intersect implements BeamRelNode { - private BeamSetOperatorRelBase delegate; - public BeamIntersectRel( - RelOptCluster cluster, - RelTraitSet traits, - List<RelNode> inputs, - boolean all) { - super(cluster, traits, inputs, all); - delegate = new BeamSetOperatorRelBase(this, - BeamSetOperatorRelBase.OpType.INTERSECT, inputs, all); - } - - @Override public SetOp copy(RelTraitSet traitSet, List<RelNode> inputs, boolean all) { - return new BeamIntersectRel(getCluster(), traitSet, inputs, all); - } - - @Override public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections - , BeamSqlEnv sqlEnv) throws Exception { - return delegate.buildBeamPipeline(inputPCollections, sqlEnv); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamJoinRel.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamJoinRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamJoinRel.java deleted file mode 100644 index 3ebf152..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamJoinRel.java +++ /dev/null @@ -1,302 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.dsls.sql.rel; - -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.Set; -import org.apache.beam.dsls.sql.BeamSqlEnv; -import org.apache.beam.dsls.sql.schema.BeamSqlRow; -import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder; -import org.apache.beam.dsls.sql.schema.BeamSqlRowType; -import org.apache.beam.dsls.sql.transform.BeamJoinTransforms; -import org.apache.beam.dsls.sql.utils.CalciteUtils; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.KvCoder; -import org.apache.beam.sdk.transforms.MapElements; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.View; -import org.apache.beam.sdk.transforms.windowing.IncompatibleWindowException; -import org.apache.beam.sdk.transforms.windowing.WindowFn; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionTuple; -import org.apache.beam.sdk.values.PCollectionView; -import org.apache.calcite.plan.RelOptCluster; -import org.apache.calcite.plan.RelTraitSet; -import org.apache.calcite.rel.RelNode; -import org.apache.calcite.rel.core.CorrelationId; -import org.apache.calcite.rel.core.Join; -import org.apache.calcite.rel.core.JoinRelType; -import org.apache.calcite.rex.RexCall; -import org.apache.calcite.rex.RexInputRef; -import org.apache.calcite.rex.RexLiteral; -import org.apache.calcite.rex.RexNode; -import org.apache.calcite.util.Pair; - -/** - * {@code BeamRelNode} to replace a {@code Join} node. - * - * <p>Support for join can be categorized into 3 cases: - * <ul> - * <li>BoundedTable JOIN BoundedTable</li> - * <li>UnboundedTable JOIN UnboundedTable</li> - * <li>BoundedTable JOIN UnboundedTable</li> - * </ul> - * - * <p>For the first two cases, a standard join is utilized as long as the windowFn of the both - * sides match. - * - * <p>For the third case, {@code sideInput} is utilized to implement the join, so there are some - * constraints: - * - * <ul> - * <li>{@code FULL OUTER JOIN} is not supported.</li> - * <li>If it's a {@code LEFT OUTER JOIN}, the unbounded table should on the left side.</li> - * <li>If it's a {@code RIGHT OUTER JOIN}, the unbounded table should on the right side.</li> - * </ul> - * - * - * <p>There are also some general constraints: - * - * <ul> - * <li>Only equi-join is supported.</li> - * <li>CROSS JOIN is not supported.</li> - * </ul> - */ -public class BeamJoinRel extends Join implements BeamRelNode { - public BeamJoinRel(RelOptCluster cluster, RelTraitSet traits, RelNode left, RelNode right, - RexNode condition, Set<CorrelationId> variablesSet, JoinRelType joinType) { - super(cluster, traits, left, right, condition, variablesSet, joinType); - } - - @Override public Join copy(RelTraitSet traitSet, RexNode conditionExpr, RelNode left, - RelNode right, JoinRelType joinType, boolean semiJoinDone) { - return new BeamJoinRel(getCluster(), traitSet, left, right, conditionExpr, variablesSet, - joinType); - } - - @Override public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections, - BeamSqlEnv sqlEnv) - throws Exception { - BeamRelNode leftRelNode = BeamSqlRelUtils.getBeamRelInput(left); - BeamSqlRowType leftRowType = CalciteUtils.toBeamRowType(left.getRowType()); - PCollection<BeamSqlRow> leftRows = leftRelNode.buildBeamPipeline(inputPCollections, sqlEnv); - - final BeamRelNode rightRelNode = BeamSqlRelUtils.getBeamRelInput(right); - PCollection<BeamSqlRow> rightRows = rightRelNode.buildBeamPipeline(inputPCollections, sqlEnv); - - String stageName = BeamSqlRelUtils.getStageName(this); - WindowFn leftWinFn = leftRows.getWindowingStrategy().getWindowFn(); - WindowFn rightWinFn = rightRows.getWindowingStrategy().getWindowFn(); - - // extract the join fields - List<Pair<Integer, Integer>> pairs = extractJoinColumns( - leftRelNode.getRowType().getFieldCount()); - - // build the extract key type - // the name of the join field is not important - List<String> names = new ArrayList<>(pairs.size()); - List<Integer> types = new ArrayList<>(pairs.size()); - for (int i = 0; i < pairs.size(); i++) { - names.add("c" + i); - types.add(leftRowType.getFieldsType().get(pairs.get(i).getKey())); - } - BeamSqlRowType extractKeyRowType = BeamSqlRowType.create(names, types); - - Coder extractKeyRowCoder = new BeamSqlRowCoder(extractKeyRowType); - - // BeamSqlRow -> KV<BeamSqlRow, BeamSqlRow> - PCollection<KV<BeamSqlRow, BeamSqlRow>> extractedLeftRows = leftRows - .apply(stageName + "_left_ExtractJoinFields", - MapElements.via(new BeamJoinTransforms.ExtractJoinFields(true, pairs))) - .setCoder(KvCoder.of(extractKeyRowCoder, leftRows.getCoder())); - - PCollection<KV<BeamSqlRow, BeamSqlRow>> extractedRightRows = rightRows - .apply(stageName + "_right_ExtractJoinFields", - MapElements.via(new BeamJoinTransforms.ExtractJoinFields(false, pairs))) - .setCoder(KvCoder.of(extractKeyRowCoder, rightRows.getCoder())); - - // prepare the NullRows - BeamSqlRow leftNullRow = buildNullRow(leftRelNode); - BeamSqlRow rightNullRow = buildNullRow(rightRelNode); - - // a regular join - if ((leftRows.isBounded() == PCollection.IsBounded.BOUNDED - && rightRows.isBounded() == PCollection.IsBounded.BOUNDED) - || (leftRows.isBounded() == PCollection.IsBounded.UNBOUNDED - && rightRows.isBounded() == PCollection.IsBounded.UNBOUNDED)) { - try { - leftWinFn.verifyCompatibility(rightWinFn); - } catch (IncompatibleWindowException e) { - throw new IllegalArgumentException( - "WindowFns must match for a bounded-vs-bounded/unbounded-vs-unbounded join.", e); - } - - return standardJoin(extractedLeftRows, extractedRightRows, - leftNullRow, rightNullRow, stageName); - } else if ( - (leftRows.isBounded() == PCollection.IsBounded.BOUNDED - && rightRows.isBounded() == PCollection.IsBounded.UNBOUNDED) - || (leftRows.isBounded() == PCollection.IsBounded.UNBOUNDED - && rightRows.isBounded() == PCollection.IsBounded.BOUNDED) - ) { - // if one of the sides is Bounded & the other is Unbounded - // then do a sideInput join - // when doing a sideInput join, the windowFn does not need to match - // Only support INNER JOIN & LEFT OUTER JOIN where left side of the join must be - // the unbounded - if (joinType == JoinRelType.FULL) { - throw new UnsupportedOperationException("FULL OUTER JOIN is not supported when join " - + "a bounded table with an unbounded table."); - } - - if ((joinType == JoinRelType.LEFT - && leftRows.isBounded() == PCollection.IsBounded.BOUNDED) - || (joinType == JoinRelType.RIGHT - && rightRows.isBounded() == PCollection.IsBounded.BOUNDED)) { - throw new UnsupportedOperationException( - "LEFT side of an OUTER JOIN must be Unbounded table."); - } - - return sideInputJoin(extractedLeftRows, extractedRightRows, - leftNullRow, rightNullRow); - } else { - throw new UnsupportedOperationException( - "The inputs to the JOIN have un-joinnable windowFns: " + leftWinFn + ", " + rightWinFn); - } - } - - private PCollection<BeamSqlRow> standardJoin( - PCollection<KV<BeamSqlRow, BeamSqlRow>> extractedLeftRows, - PCollection<KV<BeamSqlRow, BeamSqlRow>> extractedRightRows, - BeamSqlRow leftNullRow, BeamSqlRow rightNullRow, String stageName) { - PCollection<KV<BeamSqlRow, KV<BeamSqlRow, BeamSqlRow>>> joinedRows = null; - switch (joinType) { - case LEFT: - joinedRows = org.apache.beam.sdk.extensions.joinlibrary.Join - .leftOuterJoin(extractedLeftRows, extractedRightRows, rightNullRow); - break; - case RIGHT: - joinedRows = org.apache.beam.sdk.extensions.joinlibrary.Join - .rightOuterJoin(extractedLeftRows, extractedRightRows, leftNullRow); - break; - case FULL: - joinedRows = org.apache.beam.sdk.extensions.joinlibrary.Join - .fullOuterJoin(extractedLeftRows, extractedRightRows, leftNullRow, - rightNullRow); - break; - case INNER: - default: - joinedRows = org.apache.beam.sdk.extensions.joinlibrary.Join - .innerJoin(extractedLeftRows, extractedRightRows); - break; - } - - PCollection<BeamSqlRow> ret = joinedRows - .apply(stageName + "_JoinParts2WholeRow", - MapElements.via(new BeamJoinTransforms.JoinParts2WholeRow())) - .setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRowType(getRowType()))); - return ret; - } - - public PCollection<BeamSqlRow> sideInputJoin( - PCollection<KV<BeamSqlRow, BeamSqlRow>> extractedLeftRows, - PCollection<KV<BeamSqlRow, BeamSqlRow>> extractedRightRows, - BeamSqlRow leftNullRow, BeamSqlRow rightNullRow) { - // we always make the Unbounded table on the left to do the sideInput join - // (will convert the result accordingly before return) - boolean swapped = (extractedLeftRows.isBounded() == PCollection.IsBounded.BOUNDED); - JoinRelType realJoinType = - (swapped && joinType != JoinRelType.INNER) ? JoinRelType.LEFT : joinType; - - PCollection<KV<BeamSqlRow, BeamSqlRow>> realLeftRows = - swapped ? extractedRightRows : extractedLeftRows; - PCollection<KV<BeamSqlRow, BeamSqlRow>> realRightRows = - swapped ? extractedLeftRows : extractedRightRows; - BeamSqlRow realRightNullRow = swapped ? leftNullRow : rightNullRow; - - // swapped still need to pass down because, we need to swap the result back. - return sideInputJoinHelper(realJoinType, realLeftRows, realRightRows, - realRightNullRow, swapped); - } - - private PCollection<BeamSqlRow> sideInputJoinHelper( - JoinRelType joinType, - PCollection<KV<BeamSqlRow, BeamSqlRow>> leftRows, - PCollection<KV<BeamSqlRow, BeamSqlRow>> rightRows, - BeamSqlRow rightNullRow, boolean swapped) { - final PCollectionView<Map<BeamSqlRow, Iterable<BeamSqlRow>>> rowsView = rightRows - .apply(View.<BeamSqlRow, BeamSqlRow>asMultimap()); - - PCollection<BeamSqlRow> ret = leftRows - .apply(ParDo.of(new BeamJoinTransforms.SideInputJoinDoFn( - joinType, rightNullRow, rowsView, swapped)).withSideInputs(rowsView)) - .setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRowType(getRowType()))); - - return ret; - } - - private BeamSqlRow buildNullRow(BeamRelNode relNode) { - BeamSqlRowType leftType = CalciteUtils.toBeamRowType(relNode.getRowType()); - BeamSqlRow nullRow = new BeamSqlRow(leftType); - for (int i = 0; i < leftType.size(); i++) { - nullRow.addField(i, null); - } - return nullRow; - } - - private List<Pair<Integer, Integer>> extractJoinColumns(int leftRowColumnCount) { - // it's a CROSS JOIN because: condition == true - if (condition instanceof RexLiteral && (Boolean) ((RexLiteral) condition).getValue()) { - throw new UnsupportedOperationException("CROSS JOIN is not supported!"); - } - - RexCall call = (RexCall) condition; - List<Pair<Integer, Integer>> pairs = new ArrayList<>(); - if ("AND".equals(call.getOperator().getName())) { - List<RexNode> operands = call.getOperands(); - for (RexNode rexNode : operands) { - Pair<Integer, Integer> pair = extractOneJoinColumn((RexCall) rexNode, leftRowColumnCount); - pairs.add(pair); - } - } else if ("=".equals(call.getOperator().getName())) { - pairs.add(extractOneJoinColumn(call, leftRowColumnCount)); - } else { - throw new UnsupportedOperationException( - "Operator " + call.getOperator().getName() + " is not supported in join condition"); - } - - return pairs; - } - - private Pair<Integer, Integer> extractOneJoinColumn(RexCall oneCondition, - int leftRowColumnCount) { - List<RexNode> operands = oneCondition.getOperands(); - final int leftIndex = Math.min(((RexInputRef) operands.get(0)).getIndex(), - ((RexInputRef) operands.get(1)).getIndex()); - - final int rightIndex1 = Math.max(((RexInputRef) operands.get(0)).getIndex(), - ((RexInputRef) operands.get(1)).getIndex()); - final int rightIndex = rightIndex1 - leftRowColumnCount; - - return new Pair<>(leftIndex, rightIndex); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamLogicalConvention.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamLogicalConvention.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamLogicalConvention.java deleted file mode 100644 index 704a374..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamLogicalConvention.java +++ /dev/null @@ -1,72 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.dsls.sql.rel; - -import org.apache.calcite.plan.Convention; -import org.apache.calcite.plan.ConventionTraitDef; -import org.apache.calcite.plan.RelOptPlanner; -import org.apache.calcite.plan.RelTrait; -import org.apache.calcite.plan.RelTraitDef; -import org.apache.calcite.plan.RelTraitSet; - -/** - * Convertion for Beam SQL. - * - */ -public enum BeamLogicalConvention implements Convention { - INSTANCE; - - @Override - public Class getInterface() { - return BeamRelNode.class; - } - - @Override - public String getName() { - return "BEAM_LOGICAL"; - } - - @Override - public RelTraitDef getTraitDef() { - return ConventionTraitDef.INSTANCE; - } - - @Override - public boolean satisfies(RelTrait trait) { - return this == trait; - } - - @Override - public void register(RelOptPlanner planner) { - } - - @Override - public String toString() { - return getName(); - } - - @Override - public boolean canConvertConvention(Convention toConvention) { - return false; - } - - @Override - public boolean useAbstractConvertersForConversion(RelTraitSet fromTraits, RelTraitSet toTraits) { - return false; - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamMinusRel.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamMinusRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamMinusRel.java deleted file mode 100644 index b558f4b..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamMinusRel.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.dsls.sql.rel; - -import java.util.List; -import org.apache.beam.dsls.sql.BeamSqlEnv; -import org.apache.beam.dsls.sql.schema.BeamSqlRow; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionTuple; -import org.apache.calcite.plan.RelOptCluster; -import org.apache.calcite.plan.RelTraitSet; -import org.apache.calcite.rel.RelNode; -import org.apache.calcite.rel.core.Minus; -import org.apache.calcite.rel.core.SetOp; - -/** - * {@code BeamRelNode} to replace a {@code Minus} node. - * - * <p>Corresponds to the SQL {@code EXCEPT} operator. - */ -public class BeamMinusRel extends Minus implements BeamRelNode { - - private BeamSetOperatorRelBase delegate; - - public BeamMinusRel(RelOptCluster cluster, RelTraitSet traits, List<RelNode> inputs, - boolean all) { - super(cluster, traits, inputs, all); - delegate = new BeamSetOperatorRelBase(this, - BeamSetOperatorRelBase.OpType.MINUS, inputs, all); - } - - @Override public SetOp copy(RelTraitSet traitSet, List<RelNode> inputs, boolean all) { - return new BeamMinusRel(getCluster(), traitSet, inputs, all); - } - - @Override public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections - , BeamSqlEnv sqlEnv) throws Exception { - return delegate.buildBeamPipeline(inputPCollections, sqlEnv); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamProjectRel.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamProjectRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamProjectRel.java deleted file mode 100644 index 8f8e5ce..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamProjectRel.java +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.dsls.sql.rel; - -import java.util.List; -import org.apache.beam.dsls.sql.BeamSqlEnv; -import org.apache.beam.dsls.sql.interpreter.BeamSqlExpressionExecutor; -import org.apache.beam.dsls.sql.interpreter.BeamSqlFnExecutor; -import org.apache.beam.dsls.sql.schema.BeamSqlRow; -import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder; -import org.apache.beam.dsls.sql.transform.BeamSqlProjectFn; -import org.apache.beam.dsls.sql.utils.CalciteUtils; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionTuple; -import org.apache.calcite.plan.RelOptCluster; -import org.apache.calcite.plan.RelTraitSet; -import org.apache.calcite.rel.RelNode; -import org.apache.calcite.rel.core.Project; -import org.apache.calcite.rel.type.RelDataType; -import org.apache.calcite.rex.RexCall; -import org.apache.calcite.rex.RexInputRef; -import org.apache.calcite.rex.RexLiteral; -import org.apache.calcite.rex.RexNode; - -/** - * BeamRelNode to replace a {@code Project} node. - * - */ -public class BeamProjectRel extends Project implements BeamRelNode { - - /** - * projects: {@link RexLiteral}, {@link RexInputRef}, {@link RexCall}. - * - */ - public BeamProjectRel(RelOptCluster cluster, RelTraitSet traits, RelNode input, - List<? extends RexNode> projects, RelDataType rowType) { - super(cluster, traits, input, projects, rowType); - } - - @Override - public Project copy(RelTraitSet traitSet, RelNode input, List<RexNode> projects, - RelDataType rowType) { - return new BeamProjectRel(getCluster(), traitSet, input, projects, rowType); - } - - @Override - public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections - , BeamSqlEnv sqlEnv) throws Exception { - RelNode input = getInput(); - String stageName = BeamSqlRelUtils.getStageName(this); - - PCollection<BeamSqlRow> upstream = - BeamSqlRelUtils.getBeamRelInput(input).buildBeamPipeline(inputPCollections, sqlEnv); - - BeamSqlExpressionExecutor executor = new BeamSqlFnExecutor(this); - - PCollection<BeamSqlRow> projectStream = upstream.apply(stageName, ParDo - .of(new BeamSqlProjectFn(getRelTypeName(), executor, - CalciteUtils.toBeamRowType(rowType)))); - projectStream.setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRowType(getRowType()))); - - return projectStream; - } - -} http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamRelNode.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamRelNode.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamRelNode.java deleted file mode 100644 index d4c98a3..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamRelNode.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.dsls.sql.rel; - -import org.apache.beam.dsls.sql.BeamSqlEnv; -import org.apache.beam.dsls.sql.schema.BeamSqlRow; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionTuple; -import org.apache.calcite.rel.RelNode; - -/** - * A new method {@link #buildBeamPipeline(PCollectionTuple, BeamSqlEnv)} is added. - */ -public interface BeamRelNode extends RelNode { - - /** - * A {@link BeamRelNode} is a recursive structure, the - * {@code BeamQueryPlanner} visits it with a DFS(Depth-First-Search) - * algorithm. - */ - PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections, BeamSqlEnv sqlEnv) - throws Exception; -} http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSetOperatorRelBase.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSetOperatorRelBase.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSetOperatorRelBase.java deleted file mode 100644 index 939c9c8..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSetOperatorRelBase.java +++ /dev/null @@ -1,98 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.dsls.sql.rel; - -import java.io.Serializable; -import java.util.List; -import org.apache.beam.dsls.sql.BeamSqlEnv; -import org.apache.beam.dsls.sql.schema.BeamSqlRow; -import org.apache.beam.dsls.sql.transform.BeamSetOperatorsTransforms; -import org.apache.beam.sdk.transforms.MapElements; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.join.CoGbkResult; -import org.apache.beam.sdk.transforms.join.CoGroupByKey; -import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple; -import org.apache.beam.sdk.transforms.windowing.WindowFn; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionTuple; -import org.apache.beam.sdk.values.TupleTag; -import org.apache.calcite.rel.RelNode; - -/** - * Delegate for Set operators: {@code BeamUnionRel}, {@code BeamIntersectRel} - * and {@code BeamMinusRel}. - */ -public class BeamSetOperatorRelBase { - /** - * Set operator type. - */ - public enum OpType implements Serializable { - UNION, - INTERSECT, - MINUS - } - - private BeamRelNode beamRelNode; - private List<RelNode> inputs; - private boolean all; - private OpType opType; - - public BeamSetOperatorRelBase(BeamRelNode beamRelNode, OpType opType, - List<RelNode> inputs, boolean all) { - this.beamRelNode = beamRelNode; - this.opType = opType; - this.inputs = inputs; - this.all = all; - } - - public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections - , BeamSqlEnv sqlEnv) throws Exception { - PCollection<BeamSqlRow> leftRows = BeamSqlRelUtils.getBeamRelInput(inputs.get(0)) - .buildBeamPipeline(inputPCollections, sqlEnv); - PCollection<BeamSqlRow> rightRows = BeamSqlRelUtils.getBeamRelInput(inputs.get(1)) - .buildBeamPipeline(inputPCollections, sqlEnv); - - WindowFn leftWindow = leftRows.getWindowingStrategy().getWindowFn(); - WindowFn rightWindow = rightRows.getWindowingStrategy().getWindowFn(); - if (!leftWindow.isCompatible(rightWindow)) { - throw new IllegalArgumentException( - "inputs of " + opType + " have different window strategy: " - + leftWindow + " VS " + rightWindow); - } - - final TupleTag<BeamSqlRow> leftTag = new TupleTag<>(); - final TupleTag<BeamSqlRow> rightTag = new TupleTag<>(); - - // co-group - String stageName = BeamSqlRelUtils.getStageName(beamRelNode); - PCollection<KV<BeamSqlRow, CoGbkResult>> coGbkResultCollection = KeyedPCollectionTuple - .of(leftTag, leftRows.apply( - stageName + "_CreateLeftIndex", MapElements.via( - new BeamSetOperatorsTransforms.BeamSqlRow2KvFn()))) - .and(rightTag, rightRows.apply( - stageName + "_CreateRightIndex", MapElements.via( - new BeamSetOperatorsTransforms.BeamSqlRow2KvFn()))) - .apply(CoGroupByKey.<BeamSqlRow>create()); - PCollection<BeamSqlRow> ret = coGbkResultCollection - .apply(ParDo.of(new BeamSetOperatorsTransforms.SetOperatorFilteringDoFn(leftTag, rightTag, - opType, all))); - return ret; - } -}