http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlOverlayExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlOverlayExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlOverlayExpression.java new file mode 100644 index 0000000..cb6a523 --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlOverlayExpression.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.dsls.sql.interpreter.operator.string; + +import java.util.List; + +import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression; +import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive; +import org.apache.beam.dsls.sql.schema.BeamSqlRow; +import org.apache.calcite.sql.type.SqlTypeName; + +/** + * 'OVERLAY' operator. 
+ * + * <p> + * OVERLAY(string1 PLACING string2 FROM integer [ FOR integer2 ]) + * </p> + */ +public class BeamSqlOverlayExpression extends BeamSqlExpression { + public BeamSqlOverlayExpression(List<BeamSqlExpression> operands) { + super(operands, SqlTypeName.VARCHAR); + } + + @Override public boolean accept() { + if (operands.size() < 3 || operands.size() > 4) { + return false; + } + + if (!SqlTypeName.CHAR_TYPES.contains(opType(0)) + || !SqlTypeName.CHAR_TYPES.contains(opType(1)) + || !SqlTypeName.INT_TYPES.contains(opType(2))) { + return false; + } + + if (operands.size() == 4 && !SqlTypeName.INT_TYPES.contains(opType(3))) { + return false; + } + + return true; + } + + @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) { + String str = opValueEvaluated(0, inputRow); + String replaceStr = opValueEvaluated(1, inputRow); + int idx = opValueEvaluated(2, inputRow); + // the index is 1 based. + idx -= 1; + int length = replaceStr.length(); + if (operands.size() == 4) { + length = opValueEvaluated(3, inputRow); + } + + StringBuilder result = new StringBuilder( + str.length() + replaceStr.length() - length); + result.append(str.substring(0, idx)) + .append(replaceStr) + .append(str.substring(idx + length)); + + return BeamSqlPrimitive.of(SqlTypeName.VARCHAR, result.toString()); + } +}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlPositionExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlPositionExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlPositionExpression.java new file mode 100644 index 0000000..144acbf --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlPositionExpression.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.dsls.sql.interpreter.operator.string; + +import java.util.List; + +import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression; +import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive; +import org.apache.beam.dsls.sql.schema.BeamSqlRow; +import org.apache.calcite.sql.type.SqlTypeName; +
/**
 * String position operator.
 *
 * <p>
 * example:
 * POSITION(string1 IN string2)
 * POSITION(string1 IN string2 FROM integer)
 * </p>
 *
 * <p>Returns the 1-based position of the first occurrence of {@code string1} in
 * {@code string2} at or after the optional 1-based {@code FROM} position, or 0 when there is
 * no such occurrence, following SQL's 1-based string positions.
 */
public class BeamSqlPositionExpression extends BeamSqlExpression {
  public BeamSqlPositionExpression(List<BeamSqlExpression> operands) {
    super(operands, SqlTypeName.INTEGER);
  }

  /**
   * Accepts two character operands, optionally followed by an integer start position.
   */
  @Override public boolean accept() {
    if (operands.size() < 2 || operands.size() > 3) {
      return false;
    }

    if (!SqlTypeName.CHAR_TYPES.contains(opType(0))
        || !SqlTypeName.CHAR_TYPES.contains(opType(1))) {
      return false;
    }

    if (operands.size() == 3
        && !SqlTypeName.INT_TYPES.contains(opType(2))) {
      return false;
    }

    return true;
  }

  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
    String targetStr = opValueEvaluated(0, inputRow);
    String containingStr = opValueEvaluated(1, inputRow);
    // 0-based offset where the search begins; String.indexOf treats negatives as 0.
    int from = 0;
    if (operands.size() == 3) {
      Number tmp = opValueEvaluated(2, inputRow);
      // SQL positions are 1 based, Java string offsets are 0 based.
      from = tmp.intValue() - 1;
    }

    int idx = containingStr.indexOf(targetStr, from);

    // indexOf yields -1 when absent, so +1 maps "not found" to SQL's 0 and a hit to its
    // 1-based position.
    return BeamSqlPrimitive.of(SqlTypeName.INTEGER, idx + 1);
  }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlStringUnaryExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlStringUnaryExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlStringUnaryExpression.java new file mode 100644 index 0000000..d931db9 --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlStringUnaryExpression.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.dsls.sql.interpreter.operator.string; + +import java.util.List; + +import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression; +import org.apache.calcite.sql.type.SqlTypeName; +
/**
 * Base class for all string unary operators.
 *
 * <p>Subclasses are accepted only when invoked with exactly one operand whose type is one of
 * {@link SqlTypeName#CHAR_TYPES}.
 */
public abstract class BeamSqlStringUnaryExpression extends BeamSqlExpression {
  public BeamSqlStringUnaryExpression(List<BeamSqlExpression> operands, SqlTypeName outputType) {
    super(operands, outputType);
  }

  @Override public boolean accept() {
    // Short-circuit: the operand type is only inspected once a single operand is known to exist.
    return operands.size() == 1 && SqlTypeName.CHAR_TYPES.contains(opType(0));
  }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlSubstringExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlSubstringExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlSubstringExpression.java new file mode 100644 index 0000000..8b33125 --- /dev/null +++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlSubstringExpression.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.dsls.sql.interpreter.operator.string; + +import java.util.List; + +import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression; +import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive; +import org.apache.beam.dsls.sql.schema.BeamSqlRow; +import org.apache.calcite.sql.type.SqlTypeName; +
/**
 * 'SUBSTRING' operator.
 *
 * <p>
 * SUBSTRING(string FROM integer)
 * SUBSTRING(string FROM integer FOR integer)
 * </p>
 *
 * <p>Positions are 1-based; a negative position counts back from the end of the string and a
 * position of 0 yields an empty string. A start position outside the string, or a negative
 * length, also yields an empty string; a length running past the end stops at the end.
 */
public class BeamSqlSubstringExpression extends BeamSqlExpression {
  public BeamSqlSubstringExpression(List<BeamSqlExpression> operands) {
    super(operands, SqlTypeName.VARCHAR);
  }

  /**
   * Accepts a character operand followed by an integer position and an optional integer
   * length.
   */
  @Override public boolean accept() {
    if (operands.size() < 2 || operands.size() > 3) {
      return false;
    }

    if (!SqlTypeName.CHAR_TYPES.contains(opType(0))
        || !SqlTypeName.INT_TYPES.contains(opType(1))) {
      return false;
    }

    if (operands.size() == 3 && !SqlTypeName.INT_TYPES.contains(opType(2))) {
      return false;
    }

    return true;
  }

  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
    String str = opValueEvaluated(0, inputRow);
    // accept() admits any of SqlTypeName.INT_TYPES, so read integer operands as Number rather
    // than unboxing straight to int (which throws ClassCastException for a Long value).
    Number position = opValueEvaluated(1, inputRow);
    long idx = position.longValue();
    int strLength = str.length();

    long startIdx;
    if (idx > 0) {
      // NOTE: SQL substring is 1 based (rather than 0 based)
      startIdx = idx - 1;
    } else if (idx < 0) {
      // NOTE: SQL also supports a negative index, counted back from the end of the string
      startIdx = idx + strLength;
    } else {
      return BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "");
    }

    // A start before the beginning or past the end of the string selects nothing, rather
    // than letting String.substring throw StringIndexOutOfBoundsException.
    if (startIdx < 0 || startIdx > strLength) {
      return BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "");
    }
    int beginIdx = (int) startIdx;

    if (operands.size() == 3) {
      Number forLength = opValueEvaluated(2, inputRow);
      long length = Math.max(forLength.longValue(), 0L);
      // Bound by the characters remaining instead of computing beginIdx + length, which
      // could overflow for very large lengths.
      int endIdx = beginIdx + (int) Math.min(length, strLength - beginIdx);
      return BeamSqlPrimitive.of(SqlTypeName.VARCHAR, str.substring(beginIdx, endIdx));
    }
    return BeamSqlPrimitive.of(SqlTypeName.VARCHAR, str.substring(beginIdx));
  }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlTrimExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlTrimExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlTrimExpression.java new file mode 100644 index 0000000..5e6c2bb
--- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlTrimExpression.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.dsls.sql.interpreter.operator.string; + +import java.util.List; +import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression; +import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive; +import org.apache.beam.dsls.sql.schema.BeamSqlRow; +import org.apache.calcite.sql.fun.SqlTrimFunction; +import org.apache.calcite.sql.type.SqlTypeName; +
/**
 * Trim operator.
 *
 * <p>
 * TRIM( { BOTH | LEADING | TRAILING } string1 FROM string2)
 * </p>
 *
 * <p>With a single operand the string is trimmed via {@link String#trim()}. With three
 * operands, repeated occurrences of {@code string1} are removed from the requested end(s) of
 * {@code string2}; an empty {@code string1} leaves {@code string2} unchanged.
 */
public class BeamSqlTrimExpression extends BeamSqlExpression {
  public BeamSqlTrimExpression(List<BeamSqlExpression> operands) {
    super(operands, SqlTypeName.VARCHAR);
  }

  /**
   * Accepts either one character operand, or a trim-flag SYMBOL followed by two character
   * operands.
   */
  @Override public boolean accept() {
    if (operands.size() != 1 && operands.size() != 3) {
      return false;
    }

    if (operands.size() == 1 && !SqlTypeName.CHAR_TYPES.contains(opType(0))) {
      return false;
    }

    if (operands.size() == 3
        && (SqlTypeName.SYMBOL != opType(0)
            || !SqlTypeName.CHAR_TYPES.contains(opType(1))
            || !SqlTypeName.CHAR_TYPES.contains(opType(2)))) {
      return false;
    }

    return true;
  }

  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
    if (operands.size() == 1) {
      // NOTE: String.trim() strips every leading/trailing char <= U+0020, not only spaces.
      return BeamSqlPrimitive.of(SqlTypeName.VARCHAR,
          opValueEvaluated(0, inputRow).toString().trim());
    } else {
      SqlTrimFunction.Flag type = opValueEvaluated(0, inputRow);
      String targetStr = opValueEvaluated(1, inputRow);
      String containingStr = opValueEvaluated(2, inputRow);

      switch (type) {
        case LEADING:
          return BeamSqlPrimitive.of(SqlTypeName.VARCHAR, leadingTrim(containingStr, targetStr));
        case TRAILING:
          return BeamSqlPrimitive.of(SqlTypeName.VARCHAR, trailingTrim(containingStr, targetStr));
        case BOTH:
        default:
          return BeamSqlPrimitive.of(SqlTypeName.VARCHAR,
              trailingTrim(leadingTrim(containingStr, targetStr), targetStr));
      }
    }
  }

  /**
   * Removes every leading repetition of {@code targetStr} from {@code containingStr}.
   * An empty {@code targetStr} returns {@code containingStr} unchanged.
   */
  static String leadingTrim(String containingStr, String targetStr) {
    // An empty target matches at every offset without advancing idx, so without this guard
    // the loop below would never terminate.
    if (targetStr.isEmpty()) {
      return containingStr;
    }

    int idx = 0;
    while (containingStr.startsWith(targetStr, idx)) {
      idx += targetStr.length();
    }

    return containingStr.substring(idx);
  }

  /**
   * Removes every trailing repetition of {@code targetStr} from {@code containingStr}.
   * An empty {@code targetStr} returns {@code containingStr} unchanged.
   */
  static String trailingTrim(String containingStr, String targetStr) {
    // Same non-termination hazard as leadingTrim: idx would never move for an empty target.
    if (targetStr.isEmpty()) {
      return containingStr;
    }

    int idx = containingStr.length() - targetStr.length();
    // startsWith returns false for a negative offset, which ends the scan at the front.
    while (containingStr.startsWith(targetStr, idx)) {
      idx -= targetStr.length();
    }

    // idx now points at the last non-matching window; step forward past it.
    idx += targetStr.length();
    return containingStr.substring(0, idx);
  }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlUpperExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlUpperExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlUpperExpression.java new file mode 100644 index 0000000..efa9c95 --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlUpperExpression.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.dsls.sql.interpreter.operator.string; + +import java.util.List; + +import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression; +import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive; +import org.apache.beam.dsls.sql.schema.BeamSqlRow; +import org.apache.calcite.sql.type.SqlTypeName; +
/**
 * 'UPPER' operator.
 *
 * <p>Upper-cases its string operand using {@link java.util.Locale#ROOT}, so the result does not
 * depend on the default locale of the JVM evaluating it.
 */
public class BeamSqlUpperExpression extends BeamSqlStringUnaryExpression {
  public BeamSqlUpperExpression(List<BeamSqlExpression> operands) {
    super(operands, SqlTypeName.VARCHAR);
  }

  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
    String str = opValueEvaluated(0, inputRow);
    // The no-arg toUpperCase() uses the default locale (e.g. Turkish maps 'i' to a dotted
    // capital I); a locale-neutral mapping keeps results identical on every worker.
    return BeamSqlPrimitive.of(SqlTypeName.VARCHAR, str.toUpperCase(java.util.Locale.ROOT));
  }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/package-info.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/package-info.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/package-info.java new file mode 100644 index 0000000..f2c63f3 --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * String operators.
+ */ +package org.apache.beam.dsls.sql.interpreter.operator.string; http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/package-info.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/package-info.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/package-info.java new file mode 100644 index 0000000..178d35f --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * The interpreter generates runnable 'code' to execute SQL relational expressions.
+ */ +package org.apache.beam.dsls.sql.interpreter; http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/package-info.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/package-info.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/package-info.java new file mode 100644 index 0000000..b26e8c4 --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * BeamSQL provides a new interface to run a SQL statement with Beam. 
+ */ +package org.apache.beam.dsls.sql; http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamQueryPlanner.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamQueryPlanner.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamQueryPlanner.java new file mode 100644 index 0000000..93f9a2f --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamQueryPlanner.java @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.dsls.sql.planner; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.beam.dsls.sql.BeamSqlEnv; +import org.apache.beam.dsls.sql.rel.BeamLogicalConvention; +import org.apache.beam.dsls.sql.rel.BeamRelNode; +import org.apache.beam.dsls.sql.schema.BaseBeamTable; +import org.apache.beam.dsls.sql.schema.BeamSqlRow; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.calcite.adapter.java.JavaTypeFactory; +import org.apache.calcite.config.Lex; +import org.apache.calcite.jdbc.CalciteSchema; +import org.apache.calcite.jdbc.JavaTypeFactoryImpl; +import org.apache.calcite.plan.Contexts; +import org.apache.calcite.plan.ConventionTraitDef; +import org.apache.calcite.plan.RelOptUtil; +import org.apache.calcite.plan.RelTraitDef; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.prepare.CalciteCatalogReader; +import org.apache.calcite.rel.RelCollationTraitDef; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.type.RelDataTypeSystem; +import org.apache.calcite.schema.SchemaPlus; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.SqlOperatorTable; +import org.apache.calcite.sql.fun.SqlStdOperatorTable; +import org.apache.calcite.sql.parser.SqlParseException; +import org.apache.calcite.sql.parser.SqlParser; +import org.apache.calcite.sql.util.ChainedSqlOperatorTable; +import org.apache.calcite.tools.FrameworkConfig; +import org.apache.calcite.tools.Frameworks; +import org.apache.calcite.tools.Planner; +import org.apache.calcite.tools.RelConversionException; +import org.apache.calcite.tools.ValidationException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +
/**
 * The core component that takes a SQL statement through parsing, validation and planning,
 * and then generates a Beam pipeline from the resulting plan.
 */
public class BeamQueryPlanner {
  private static final Logger LOG = LoggerFactory.getLogger(BeamQueryPlanner.class);

  protected final Planner planner;
  private final Map<String, BaseBeamTable> sourceTables = new HashMap<>();

  public static final JavaTypeFactory TYPE_FACTORY = new JavaTypeFactoryImpl(
      RelDataTypeSystem.DEFAULT);

  /**
   * Creates a planner over {@code schema}, using MySQL lexical rules and the Beam rule sets.
   * Every table of the schema is registered in {@link #getSourceTables()}; each is cast to
   * {@link BaseBeamTable}, so the schema is expected to hold only Beam tables.
   */
  public BeamQueryPlanner(SchemaPlus schema) {
    final List<RelTraitDef> traitDefs = new ArrayList<>();
    traitDefs.add(ConventionTraitDef.INSTANCE);
    traitDefs.add(RelCollationTraitDef.INSTANCE);

    List<SqlOperatorTable> sqlOperatorTables = new ArrayList<>();
    sqlOperatorTables.add(SqlStdOperatorTable.instance());
    sqlOperatorTables.add(new CalciteCatalogReader(CalciteSchema.from(schema), false,
        Collections.<String>emptyList(), TYPE_FACTORY));

    FrameworkConfig config = Frameworks.newConfigBuilder()
        .parserConfig(SqlParser.configBuilder().setLex(Lex.MYSQL).build()).defaultSchema(schema)
        .traitDefs(traitDefs).context(Contexts.EMPTY_CONTEXT).ruleSets(BeamRuleSets.getRuleSets())
        .costFactory(null).typeSystem(BeamRelDataTypeSystem.BEAM_REL_DATATYPE_SYSTEM)
        .operatorTable(new ChainedSqlOperatorTable(sqlOperatorTables))
        .build();
    this.planner = Frameworks.getPlanner(config);

    for (String t : schema.getTableNames()) {
      sourceTables.put(t, (BaseBeamTable) schema.getTable(t));
    }
  }

  /**
   * Parse input SQL query, and return a {@link SqlNode} as grammar tree.
   *
   * <p>Unlike {@link #convertToBeamRel(String)}, this does not close {@link #planner}
   * afterwards. NOTE(review): Calcite planners are stateful, so callers presumably need to
   * close or reset the planner before parsing another statement — confirm.
   */
  public SqlNode parseQuery(String sqlQuery) throws SqlParseException {
    return planner.parse(sqlQuery);
  }

  /**
   * {@code compileBeamPipeline} translates a SQL statement into a Beam data flow attached to
   * the given {@code basePipeline}. The final output stream is returned as a
   * {@code PCollection} so more operations can be applied.
   */
  public PCollection<BeamSqlRow> compileBeamPipeline(String sqlStatement, Pipeline basePipeline,
      BeamSqlEnv sqlEnv) throws Exception {
    BeamRelNode relNode = convertToBeamRel(sqlStatement);

    // the input PCollectionTuple is empty; it is rebuilt in BeamIOSourceRel.
    return relNode.buildBeamPipeline(PCollectionTuple.empty(basePipeline), sqlEnv);
  }

  /**
   * Parses and validates the input query, then converts it into a {@link BeamRelNode} tree.
   * {@link #planner} is closed afterwards, whether or not the conversion succeeds.
   */
  public BeamRelNode convertToBeamRel(String sqlStatement)
      throws ValidationException, RelConversionException, SqlParseException {
    try {
      return (BeamRelNode) validateAndConvert(planner.parse(sqlStatement));
    } finally {
      planner.close();
    }
  }

  /** Validates {@code sqlNode}, converts it to a logical plan, then to a Beam plan. */
  private RelNode validateAndConvert(SqlNode sqlNode)
      throws ValidationException, RelConversionException {
    SqlNode validated = validateNode(sqlNode);
    LOG.info("SQL:\n{}", validated);
    RelNode relNode = convertToRelNode(validated);
    return convertToBeamRel(relNode);
  }

  /** Optimizes a logical plan into the {@link BeamLogicalConvention} using the rule sets. */
  private RelNode convertToBeamRel(RelNode relNode) throws RelConversionException {
    RelTraitSet traitSet = relNode.getTraitSet();

    // RelOptUtil.toString renders the whole plan, so only build it when it will be logged.
    if (LOG.isInfoEnabled()) {
      LOG.info("SQLPlan>\n{}", RelOptUtil.toString(relNode));
    }

    // PlannerImpl.transform() optimizes RelNode with ruleset
    return planner.transform(0, traitSet.plus(BeamLogicalConvention.INSTANCE), relNode);
  }

  private RelNode convertToRelNode(SqlNode sqlNode) throws RelConversionException {
    return planner.rel(sqlNode).rel;
  }

  private SqlNode validateNode(SqlNode sqlNode) throws ValidationException {
    return planner.validate(sqlNode);
  }

  /** Returns the table map populated from the schema in the constructor. */
  public Map<String, BaseBeamTable> getSourceTables() {
    return sourceTables;
  }

  public Planner getPlanner() {
    return planner;
  }

}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamRelDataTypeSystem.java
---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamRelDataTypeSystem.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamRelDataTypeSystem.java new file mode 100644 index 0000000..c89a740 --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamRelDataTypeSystem.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.dsls.sql.planner; + +import org.apache.calcite.rel.type.RelDataTypeSystem; +import org.apache.calcite.rel.type.RelDataTypeSystemImpl; +
/**
 * Customized {@link RelDataTypeSystem} for Beam SQL.
 *
 * <p>Reports a maximum numeric precision and a maximum numeric scale of 38; all other limits
 * come from {@link RelDataTypeSystemImpl}.
 */
public class BeamRelDataTypeSystem extends RelDataTypeSystemImpl {
  public static final RelDataTypeSystem BEAM_REL_DATATYPE_SYSTEM = new BeamRelDataTypeSystem();

  /** Limit shared by numeric precision and numeric scale (a compile-time constant). */
  private static final int MAX_NUMERIC_DIGITS = 38;

  @Override
  public int getMaxNumericPrecision() {
    return MAX_NUMERIC_DIGITS;
  }

  @Override
  public int getMaxNumericScale() {
    return MAX_NUMERIC_DIGITS;
  }

}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamRuleSets.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamRuleSets.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamRuleSets.java new file mode 100644 index 0000000..552ff8f --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamRuleSets.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.beam.dsls.sql.planner; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import java.util.Iterator; +import org.apache.beam.dsls.sql.rel.BeamRelNode; +import org.apache.beam.dsls.sql.rule.BeamAggregationRule; +import org.apache.beam.dsls.sql.rule.BeamFilterRule; +import org.apache.beam.dsls.sql.rule.BeamIOSinkRule; +import org.apache.beam.dsls.sql.rule.BeamIOSourceRule; +import org.apache.beam.dsls.sql.rule.BeamIntersectRule; +import org.apache.beam.dsls.sql.rule.BeamJoinRule; +import org.apache.beam.dsls.sql.rule.BeamMinusRule; +import org.apache.beam.dsls.sql.rule.BeamProjectRule; +import org.apache.beam.dsls.sql.rule.BeamSortRule; +import org.apache.beam.dsls.sql.rule.BeamUnionRule; +import org.apache.beam.dsls.sql.rule.BeamValuesRule; +import org.apache.calcite.plan.RelOptRule; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.tools.RuleSet; + +/** + * {@link RuleSet} used in {@link BeamQueryPlanner}. 
It translates a standard + * Calcite {@link RelNode} tree, to represent with {@link BeamRelNode} + * + */ +public class BeamRuleSets { + private static final ImmutableSet<RelOptRule> calciteToBeamConversionRules = ImmutableSet + .<RelOptRule>builder().add(BeamIOSourceRule.INSTANCE, BeamProjectRule.INSTANCE, + BeamFilterRule.INSTANCE, BeamIOSinkRule.INSTANCE, + BeamAggregationRule.INSTANCE, BeamSortRule.INSTANCE, BeamValuesRule.INSTANCE, + BeamIntersectRule.INSTANCE, BeamMinusRule.INSTANCE, BeamUnionRule.INSTANCE, + BeamJoinRule.INSTANCE) + .build(); + + public static RuleSet[] getRuleSets() { + return new RuleSet[] { new BeamRuleSet( + ImmutableSet.<RelOptRule>builder().addAll(calciteToBeamConversionRules).build()) }; + } + + private static class BeamRuleSet implements RuleSet { + final ImmutableSet<RelOptRule> rules; + + public BeamRuleSet(ImmutableSet<RelOptRule> rules) { + this.rules = rules; + } + + public BeamRuleSet(ImmutableList<RelOptRule> rules) { + this.rules = ImmutableSet.<RelOptRule>builder().addAll(rules).build(); + } + + @Override + public Iterator<RelOptRule> iterator() { + return rules.iterator(); + } + } + +} http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/planner/package-info.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/planner/package-info.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/planner/package-info.java new file mode 100644 index 0000000..0506c5b --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/planner/package-info.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * {@link org.apache.beam.dsls.sql.planner.BeamQueryPlanner} is the main interface. + * It defines data sources, validate a SQL statement, and convert it as a Beam + * pipeline. + */ +package org.apache.beam.dsls.sql.planner; http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java new file mode 100644 index 0000000..9dcb079 --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.dsls.sql.rel; + +import java.util.ArrayList; +import java.util.List; +import org.apache.beam.dsls.sql.BeamSqlEnv; +import org.apache.beam.dsls.sql.schema.BeamSqlRow; +import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder; +import org.apache.beam.dsls.sql.schema.BeamSqlRowType; +import org.apache.beam.dsls.sql.transform.BeamAggregationTransforms; +import org.apache.beam.dsls.sql.utils.CalciteUtils; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.transforms.Combine; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.WithKeys; +import org.apache.beam.sdk.transforms.WithTimestamps; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.Trigger; +import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.transforms.windowing.WindowFn; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.calcite.linq4j.Ord; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.RelWriter; +import org.apache.calcite.rel.core.Aggregate; +import org.apache.calcite.rel.core.AggregateCall; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.util.ImmutableBitSet; +import org.apache.calcite.util.Util; +import org.joda.time.Duration; + +/** + * {@link BeamRelNode} to replace a 
{@link Aggregate} node. + * + */ +public class BeamAggregationRel extends Aggregate implements BeamRelNode { + private int windowFieldIdx = -1; + private WindowFn<BeamSqlRow, BoundedWindow> windowFn; + private Trigger trigger; + private Duration allowedLatence = Duration.ZERO; + + public BeamAggregationRel(RelOptCluster cluster, RelTraitSet traits + , RelNode child, boolean indicator, + ImmutableBitSet groupSet, List<ImmutableBitSet> groupSets, List<AggregateCall> aggCalls + , WindowFn windowFn, Trigger trigger, int windowFieldIdx, Duration allowedLatence) { + super(cluster, traits, child, indicator, groupSet, groupSets, aggCalls); + this.windowFn = windowFn; + this.trigger = trigger; + this.windowFieldIdx = windowFieldIdx; + this.allowedLatence = allowedLatence; + } + + @Override + public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections + , BeamSqlEnv sqlEnv) throws Exception { + RelNode input = getInput(); + String stageName = BeamSqlRelUtils.getStageName(this) + "_"; + + PCollection<BeamSqlRow> upstream = + BeamSqlRelUtils.getBeamRelInput(input).buildBeamPipeline(inputPCollections, sqlEnv); + if (windowFieldIdx != -1) { + upstream = upstream.apply(stageName + "assignEventTimestamp", WithTimestamps + .of(new BeamAggregationTransforms.WindowTimestampFn(windowFieldIdx))) + .setCoder(upstream.getCoder()); + } + + PCollection<BeamSqlRow> windowStream = upstream.apply(stageName + "window", + Window.into(windowFn) + .triggering(trigger) + .withAllowedLateness(allowedLatence) + .accumulatingFiredPanes()); + + BeamSqlRowCoder keyCoder = new BeamSqlRowCoder(exKeyFieldsSchema(input.getRowType())); + PCollection<KV<BeamSqlRow, BeamSqlRow>> exCombineByStream = windowStream.apply( + stageName + "exCombineBy", + WithKeys + .of(new BeamAggregationTransforms.AggregationGroupByKeyFn( + windowFieldIdx, groupSet))) + .setCoder(KvCoder.of(keyCoder, upstream.getCoder())); + + + BeamSqlRowCoder aggCoder = new BeamSqlRowCoder(exAggFieldsSchema()); + + 
PCollection<KV<BeamSqlRow, BeamSqlRow>> aggregatedStream = exCombineByStream.apply( + stageName + "combineBy", + Combine.<BeamSqlRow, BeamSqlRow, BeamSqlRow>perKey( + new BeamAggregationTransforms.AggregationAdaptor(getAggCallList(), + CalciteUtils.toBeamRowType(input.getRowType())))) + .setCoder(KvCoder.of(keyCoder, aggCoder)); + + PCollection<BeamSqlRow> mergedStream = aggregatedStream.apply(stageName + "mergeRecord", + ParDo.of(new BeamAggregationTransforms.MergeAggregationRecord( + CalciteUtils.toBeamRowType(getRowType()), getAggCallList(), windowFieldIdx))); + mergedStream.setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRowType(getRowType()))); + + return mergedStream; + } + + /** + * Type of sub-rowrecord used as Group-By keys. + */ + private BeamSqlRowType exKeyFieldsSchema(RelDataType relDataType) { + BeamSqlRowType inputRowType = CalciteUtils.toBeamRowType(relDataType); + List<String> fieldNames = new ArrayList<>(); + List<Integer> fieldTypes = new ArrayList<>(); + for (int i : groupSet.asList()) { + if (i != windowFieldIdx) { + fieldNames.add(inputRowType.getFieldsName().get(i)); + fieldTypes.add(inputRowType.getFieldsType().get(i)); + } + } + return BeamSqlRowType.create(fieldNames, fieldTypes); + } + + /** + * Type of sub-rowrecord, that represents the list of aggregation fields. 
+ */ + private BeamSqlRowType exAggFieldsSchema() { + List<String> fieldNames = new ArrayList<>(); + List<Integer> fieldTypes = new ArrayList<>(); + for (AggregateCall ac : getAggCallList()) { + fieldNames.add(ac.name); + fieldTypes.add(CalciteUtils.toJavaType(ac.type.getSqlTypeName())); + } + + return BeamSqlRowType.create(fieldNames, fieldTypes); + } + + @Override + public Aggregate copy(RelTraitSet traitSet, RelNode input, boolean indicator + , ImmutableBitSet groupSet, + List<ImmutableBitSet> groupSets, List<AggregateCall> aggCalls) { + return new BeamAggregationRel(getCluster(), traitSet, input, indicator + , groupSet, groupSets, aggCalls, windowFn, trigger, windowFieldIdx, allowedLatence); + } + + public void setWindowFn(WindowFn windowFn) { + this.windowFn = windowFn; + } + + public void setTrigger(Trigger trigger) { + this.trigger = trigger; + } + + public RelWriter explainTerms(RelWriter pw) { + // We skip the "groups" element if it is a singleton of "group". + pw.item("group", groupSet) + .itemIf("window", windowFn, windowFn != null) + .itemIf("trigger", trigger, trigger != null) + .itemIf("event_time", windowFieldIdx, windowFieldIdx != -1) + .itemIf("groups", groupSets, getGroupType() != Group.SIMPLE) + .itemIf("indicator", indicator, indicator) + .itemIf("aggs", aggCalls, pw.nest()); + if (!pw.nest()) { + for (Ord<AggregateCall> ord : Ord.zip(aggCalls)) { + pw.item(Util.first(ord.e.name, "agg#" + ord.i), ord.e); + } + } + return pw; + } + +} http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamFilterRel.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamFilterRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamFilterRel.java new file mode 100644 index 0000000..f802104 --- /dev/null +++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamFilterRel.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.dsls.sql.rel; + +import org.apache.beam.dsls.sql.BeamSqlEnv; +import org.apache.beam.dsls.sql.interpreter.BeamSqlExpressionExecutor; +import org.apache.beam.dsls.sql.interpreter.BeamSqlFnExecutor; +import org.apache.beam.dsls.sql.schema.BeamSqlRow; +import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder; +import org.apache.beam.dsls.sql.transform.BeamSqlFilterFn; +import org.apache.beam.dsls.sql.utils.CalciteUtils; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Filter; +import org.apache.calcite.rex.RexNode; + +/** + * BeamRelNode to replace a {@code Filter} node. 
+ * + */ +public class BeamFilterRel extends Filter implements BeamRelNode { + + public BeamFilterRel(RelOptCluster cluster, RelTraitSet traits, RelNode child, + RexNode condition) { + super(cluster, traits, child, condition); + } + + @Override + public Filter copy(RelTraitSet traitSet, RelNode input, RexNode condition) { + return new BeamFilterRel(getCluster(), traitSet, input, condition); + } + + @Override + public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections + , BeamSqlEnv sqlEnv) throws Exception { + RelNode input = getInput(); + String stageName = BeamSqlRelUtils.getStageName(this); + + PCollection<BeamSqlRow> upstream = + BeamSqlRelUtils.getBeamRelInput(input).buildBeamPipeline(inputPCollections, sqlEnv); + + BeamSqlExpressionExecutor executor = new BeamSqlFnExecutor(this); + + PCollection<BeamSqlRow> filterStream = upstream.apply(stageName, + ParDo.of(new BeamSqlFilterFn(getRelTypeName(), executor))); + filterStream.setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRowType(getRowType()))); + + return filterStream; + } + +} http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSinkRel.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSinkRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSinkRel.java new file mode 100644 index 0000000..d70f94a --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSinkRel.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.dsls.sql.rel; + +import com.google.common.base.Joiner; +import java.util.List; +import org.apache.beam.dsls.sql.BeamSqlEnv; +import org.apache.beam.dsls.sql.schema.BaseBeamTable; +import org.apache.beam.dsls.sql.schema.BeamSqlRow; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptTable; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.prepare.Prepare; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.TableModify; +import org.apache.calcite.rex.RexNode; + +/** + * BeamRelNode to replace a {@code TableModify} node. 
+ * + */ +public class BeamIOSinkRel extends TableModify implements BeamRelNode { + public BeamIOSinkRel(RelOptCluster cluster, RelTraitSet traits, RelOptTable table, + Prepare.CatalogReader catalogReader, RelNode child, Operation operation, + List<String> updateColumnList, List<RexNode> sourceExpressionList, boolean flattened) { + super(cluster, traits, table, catalogReader, child, operation, updateColumnList, + sourceExpressionList, flattened); + } + + @Override + public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) { + return new BeamIOSinkRel(getCluster(), traitSet, getTable(), getCatalogReader(), sole(inputs), + getOperation(), getUpdateColumnList(), getSourceExpressionList(), isFlattened()); + } + + /** + * Note that {@code BeamIOSinkRel} returns the input PCollection, + * which is the persisted PCollection. + */ + @Override + public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections + , BeamSqlEnv sqlEnv) throws Exception { + RelNode input = getInput(); + String stageName = BeamSqlRelUtils.getStageName(this); + + PCollection<BeamSqlRow> upstream = + BeamSqlRelUtils.getBeamRelInput(input).buildBeamPipeline(inputPCollections, sqlEnv); + + String sourceName = Joiner.on('.').join(getTable().getQualifiedName()); + + BaseBeamTable targetTable = sqlEnv.findTable(sourceName); + + upstream.apply(stageName, targetTable.buildIOWriter()); + + return upstream; + } + +} http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java new file mode 100644 index 0000000..6754991 --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java @@ -0,0 
+1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.dsls.sql.rel; + +import com.google.common.base.Joiner; +import org.apache.beam.dsls.sql.BeamSqlEnv; +import org.apache.beam.dsls.sql.schema.BaseBeamTable; +import org.apache.beam.dsls.sql.schema.BeamSqlRow; +import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder; +import org.apache.beam.dsls.sql.utils.CalciteUtils; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptTable; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.core.TableScan; + +/** + * BeamRelNode to replace a {@code TableScan} node. 
+ * + */ +public class BeamIOSourceRel extends TableScan implements BeamRelNode { + + public BeamIOSourceRel(RelOptCluster cluster, RelTraitSet traitSet, RelOptTable table) { + super(cluster, traitSet, table); + } + + @Override + public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections + , BeamSqlEnv sqlEnv) throws Exception { + String sourceName = Joiner.on('.').join(getTable().getQualifiedName()); + + TupleTag<BeamSqlRow> sourceTupleTag = new TupleTag<>(sourceName); + if (inputPCollections.has(sourceTupleTag)) { + //choose PCollection from input PCollectionTuple if exists there. + PCollection<BeamSqlRow> sourceStream = inputPCollections + .get(new TupleTag<BeamSqlRow>(sourceName)); + return sourceStream; + } else { + //If not, the source PColection is provided with BaseBeamTable.buildIOReader(). + BaseBeamTable sourceTable = sqlEnv.findTable(sourceName); + return sourceTable.buildIOReader(inputPCollections.getPipeline()) + .setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRowType(getRowType()))); + } + } + +} http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIntersectRel.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIntersectRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIntersectRel.java new file mode 100644 index 0000000..7cab171 --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIntersectRel.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.dsls.sql.rel; + +import java.util.List; +import org.apache.beam.dsls.sql.BeamSqlEnv; +import org.apache.beam.dsls.sql.schema.BeamSqlRow; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Intersect; +import org.apache.calcite.rel.core.SetOp; + +/** + * {@code BeamRelNode} to replace a {@code Intersect} node. + * + * <p>This is used to combine two SELECT statements, but returns rows only from the + * first SELECT statement that are identical to a row in the second SELECT statement. 
+ */ +public class BeamIntersectRel extends Intersect implements BeamRelNode { + private BeamSetOperatorRelBase delegate; + public BeamIntersectRel( + RelOptCluster cluster, + RelTraitSet traits, + List<RelNode> inputs, + boolean all) { + super(cluster, traits, inputs, all); + delegate = new BeamSetOperatorRelBase(this, + BeamSetOperatorRelBase.OpType.INTERSECT, inputs, all); + } + + @Override public SetOp copy(RelTraitSet traitSet, List<RelNode> inputs, boolean all) { + return new BeamIntersectRel(getCluster(), traitSet, inputs, all); + } + + @Override public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections + , BeamSqlEnv sqlEnv) throws Exception { + return delegate.buildBeamPipeline(inputPCollections, sqlEnv); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamJoinRel.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamJoinRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamJoinRel.java new file mode 100644 index 0000000..3ebf152 --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamJoinRel.java @@ -0,0 +1,302 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.dsls.sql.rel; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.beam.dsls.sql.BeamSqlEnv; +import org.apache.beam.dsls.sql.schema.BeamSqlRow; +import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder; +import org.apache.beam.dsls.sql.schema.BeamSqlRowType; +import org.apache.beam.dsls.sql.transform.BeamJoinTransforms; +import org.apache.beam.dsls.sql.utils.CalciteUtils; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.View; +import org.apache.beam.sdk.transforms.windowing.IncompatibleWindowException; +import org.apache.beam.sdk.transforms.windowing.WindowFn; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.PCollectionView; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.CorrelationId; +import org.apache.calcite.rel.core.Join; +import org.apache.calcite.rel.core.JoinRelType; +import org.apache.calcite.rex.RexCall; +import org.apache.calcite.rex.RexInputRef; +import org.apache.calcite.rex.RexLiteral; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.util.Pair; + +/** + * {@code BeamRelNode} to replace a {@code Join} node. 
+ * + * <p>Support for join can be categorized into 3 cases: + * <ul> + * <li>BoundedTable JOIN BoundedTable</li> + * <li>UnboundedTable JOIN UnboundedTable</li> + * <li>BoundedTable JOIN UnboundedTable</li> + * </ul> + * + * <p>For the first two cases, a standard join is utilized as long as the windowFn of the both + * sides match. + * + * <p>For the third case, {@code sideInput} is utilized to implement the join, so there are some + * constraints: + * + * <ul> + * <li>{@code FULL OUTER JOIN} is not supported.</li> + * <li>If it's a {@code LEFT OUTER JOIN}, the unbounded table should on the left side.</li> + * <li>If it's a {@code RIGHT OUTER JOIN}, the unbounded table should on the right side.</li> + * </ul> + * + * + * <p>There are also some general constraints: + * + * <ul> + * <li>Only equi-join is supported.</li> + * <li>CROSS JOIN is not supported.</li> + * </ul> + */ +public class BeamJoinRel extends Join implements BeamRelNode { + public BeamJoinRel(RelOptCluster cluster, RelTraitSet traits, RelNode left, RelNode right, + RexNode condition, Set<CorrelationId> variablesSet, JoinRelType joinType) { + super(cluster, traits, left, right, condition, variablesSet, joinType); + } + + @Override public Join copy(RelTraitSet traitSet, RexNode conditionExpr, RelNode left, + RelNode right, JoinRelType joinType, boolean semiJoinDone) { + return new BeamJoinRel(getCluster(), traitSet, left, right, conditionExpr, variablesSet, + joinType); + } + + @Override public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections, + BeamSqlEnv sqlEnv) + throws Exception { + BeamRelNode leftRelNode = BeamSqlRelUtils.getBeamRelInput(left); + BeamSqlRowType leftRowType = CalciteUtils.toBeamRowType(left.getRowType()); + PCollection<BeamSqlRow> leftRows = leftRelNode.buildBeamPipeline(inputPCollections, sqlEnv); + + final BeamRelNode rightRelNode = BeamSqlRelUtils.getBeamRelInput(right); + PCollection<BeamSqlRow> rightRows = 
rightRelNode.buildBeamPipeline(inputPCollections, sqlEnv); + + String stageName = BeamSqlRelUtils.getStageName(this); + WindowFn leftWinFn = leftRows.getWindowingStrategy().getWindowFn(); + WindowFn rightWinFn = rightRows.getWindowingStrategy().getWindowFn(); + + // extract the join fields + List<Pair<Integer, Integer>> pairs = extractJoinColumns( + leftRelNode.getRowType().getFieldCount()); + + // build the extract key type + // the name of the join field is not important + List<String> names = new ArrayList<>(pairs.size()); + List<Integer> types = new ArrayList<>(pairs.size()); + for (int i = 0; i < pairs.size(); i++) { + names.add("c" + i); + types.add(leftRowType.getFieldsType().get(pairs.get(i).getKey())); + } + BeamSqlRowType extractKeyRowType = BeamSqlRowType.create(names, types); + + Coder extractKeyRowCoder = new BeamSqlRowCoder(extractKeyRowType); + + // BeamSqlRow -> KV<BeamSqlRow, BeamSqlRow> + PCollection<KV<BeamSqlRow, BeamSqlRow>> extractedLeftRows = leftRows + .apply(stageName + "_left_ExtractJoinFields", + MapElements.via(new BeamJoinTransforms.ExtractJoinFields(true, pairs))) + .setCoder(KvCoder.of(extractKeyRowCoder, leftRows.getCoder())); + + PCollection<KV<BeamSqlRow, BeamSqlRow>> extractedRightRows = rightRows + .apply(stageName + "_right_ExtractJoinFields", + MapElements.via(new BeamJoinTransforms.ExtractJoinFields(false, pairs))) + .setCoder(KvCoder.of(extractKeyRowCoder, rightRows.getCoder())); + + // prepare the NullRows + BeamSqlRow leftNullRow = buildNullRow(leftRelNode); + BeamSqlRow rightNullRow = buildNullRow(rightRelNode); + + // a regular join + if ((leftRows.isBounded() == PCollection.IsBounded.BOUNDED + && rightRows.isBounded() == PCollection.IsBounded.BOUNDED) + || (leftRows.isBounded() == PCollection.IsBounded.UNBOUNDED + && rightRows.isBounded() == PCollection.IsBounded.UNBOUNDED)) { + try { + leftWinFn.verifyCompatibility(rightWinFn); + } catch (IncompatibleWindowException e) { + throw new IllegalArgumentException( + 
"WindowFns must match for a bounded-vs-bounded/unbounded-vs-unbounded join.", e); + } + + return standardJoin(extractedLeftRows, extractedRightRows, + leftNullRow, rightNullRow, stageName); + } else if ( + (leftRows.isBounded() == PCollection.IsBounded.BOUNDED + && rightRows.isBounded() == PCollection.IsBounded.UNBOUNDED) + || (leftRows.isBounded() == PCollection.IsBounded.UNBOUNDED + && rightRows.isBounded() == PCollection.IsBounded.BOUNDED) + ) { + // if one of the sides is Bounded & the other is Unbounded + // then do a sideInput join + // when doing a sideInput join, the windowFn does not need to match + // Only support INNER JOIN & LEFT OUTER JOIN where left side of the join must be + // the unbounded + if (joinType == JoinRelType.FULL) { + throw new UnsupportedOperationException("FULL OUTER JOIN is not supported when join " + + "a bounded table with an unbounded table."); + } + + if ((joinType == JoinRelType.LEFT + && leftRows.isBounded() == PCollection.IsBounded.BOUNDED) + || (joinType == JoinRelType.RIGHT + && rightRows.isBounded() == PCollection.IsBounded.BOUNDED)) { + throw new UnsupportedOperationException( + "LEFT side of an OUTER JOIN must be Unbounded table."); + } + + return sideInputJoin(extractedLeftRows, extractedRightRows, + leftNullRow, rightNullRow); + } else { + throw new UnsupportedOperationException( + "The inputs to the JOIN have un-joinnable windowFns: " + leftWinFn + ", " + rightWinFn); + } + } + + private PCollection<BeamSqlRow> standardJoin( + PCollection<KV<BeamSqlRow, BeamSqlRow>> extractedLeftRows, + PCollection<KV<BeamSqlRow, BeamSqlRow>> extractedRightRows, + BeamSqlRow leftNullRow, BeamSqlRow rightNullRow, String stageName) { + PCollection<KV<BeamSqlRow, KV<BeamSqlRow, BeamSqlRow>>> joinedRows = null; + switch (joinType) { + case LEFT: + joinedRows = org.apache.beam.sdk.extensions.joinlibrary.Join + .leftOuterJoin(extractedLeftRows, extractedRightRows, rightNullRow); + break; + case RIGHT: + joinedRows = 
org.apache.beam.sdk.extensions.joinlibrary.Join + .rightOuterJoin(extractedLeftRows, extractedRightRows, leftNullRow); + break; + case FULL: + joinedRows = org.apache.beam.sdk.extensions.joinlibrary.Join + .fullOuterJoin(extractedLeftRows, extractedRightRows, leftNullRow, + rightNullRow); + break; + case INNER: + default: + joinedRows = org.apache.beam.sdk.extensions.joinlibrary.Join + .innerJoin(extractedLeftRows, extractedRightRows); + break; + } + + PCollection<BeamSqlRow> ret = joinedRows + .apply(stageName + "_JoinParts2WholeRow", + MapElements.via(new BeamJoinTransforms.JoinParts2WholeRow())) + .setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRowType(getRowType()))); + return ret; + } + + public PCollection<BeamSqlRow> sideInputJoin( + PCollection<KV<BeamSqlRow, BeamSqlRow>> extractedLeftRows, + PCollection<KV<BeamSqlRow, BeamSqlRow>> extractedRightRows, + BeamSqlRow leftNullRow, BeamSqlRow rightNullRow) { + // we always make the Unbounded table on the left to do the sideInput join + // (will convert the result accordingly before return) + boolean swapped = (extractedLeftRows.isBounded() == PCollection.IsBounded.BOUNDED); + JoinRelType realJoinType = + (swapped && joinType != JoinRelType.INNER) ? JoinRelType.LEFT : joinType; + + PCollection<KV<BeamSqlRow, BeamSqlRow>> realLeftRows = + swapped ? extractedRightRows : extractedLeftRows; + PCollection<KV<BeamSqlRow, BeamSqlRow>> realRightRows = + swapped ? extractedLeftRows : extractedRightRows; + BeamSqlRow realRightNullRow = swapped ? leftNullRow : rightNullRow; + + // swapped still need to pass down because, we need to swap the result back. 
+ return sideInputJoinHelper(realJoinType, realLeftRows, realRightRows, + realRightNullRow, swapped); + } + + private PCollection<BeamSqlRow> sideInputJoinHelper( + JoinRelType joinType, + PCollection<KV<BeamSqlRow, BeamSqlRow>> leftRows, + PCollection<KV<BeamSqlRow, BeamSqlRow>> rightRows, + BeamSqlRow rightNullRow, boolean swapped) { + final PCollectionView<Map<BeamSqlRow, Iterable<BeamSqlRow>>> rowsView = rightRows + .apply(View.<BeamSqlRow, BeamSqlRow>asMultimap()); + + PCollection<BeamSqlRow> ret = leftRows + .apply(ParDo.of(new BeamJoinTransforms.SideInputJoinDoFn( + joinType, rightNullRow, rowsView, swapped)).withSideInputs(rowsView)) + .setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRowType(getRowType()))); + + return ret; + } + + private BeamSqlRow buildNullRow(BeamRelNode relNode) { + BeamSqlRowType leftType = CalciteUtils.toBeamRowType(relNode.getRowType()); + BeamSqlRow nullRow = new BeamSqlRow(leftType); + for (int i = 0; i < leftType.size(); i++) { + nullRow.addField(i, null); + } + return nullRow; + } + + private List<Pair<Integer, Integer>> extractJoinColumns(int leftRowColumnCount) { + // it's a CROSS JOIN because: condition == true + if (condition instanceof RexLiteral && (Boolean) ((RexLiteral) condition).getValue()) { + throw new UnsupportedOperationException("CROSS JOIN is not supported!"); + } + + RexCall call = (RexCall) condition; + List<Pair<Integer, Integer>> pairs = new ArrayList<>(); + if ("AND".equals(call.getOperator().getName())) { + List<RexNode> operands = call.getOperands(); + for (RexNode rexNode : operands) { + Pair<Integer, Integer> pair = extractOneJoinColumn((RexCall) rexNode, leftRowColumnCount); + pairs.add(pair); + } + } else if ("=".equals(call.getOperator().getName())) { + pairs.add(extractOneJoinColumn(call, leftRowColumnCount)); + } else { + throw new UnsupportedOperationException( + "Operator " + call.getOperator().getName() + " is not supported in join condition"); + } + + return pairs; + } + + private Pair<Integer, 
Integer> extractOneJoinColumn(RexCall oneCondition, + int leftRowColumnCount) { + List<RexNode> operands = oneCondition.getOperands(); + final int leftIndex = Math.min(((RexInputRef) operands.get(0)).getIndex(), + ((RexInputRef) operands.get(1)).getIndex()); + + final int rightIndex1 = Math.max(((RexInputRef) operands.get(0)).getIndex(), + ((RexInputRef) operands.get(1)).getIndex()); + final int rightIndex = rightIndex1 - leftRowColumnCount; + + return new Pair<>(leftIndex, rightIndex); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamLogicalConvention.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamLogicalConvention.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamLogicalConvention.java new file mode 100644 index 0000000..704a374 --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamLogicalConvention.java @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.beam.dsls.sql.rel;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.ConventionTraitDef;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelTrait;
+import org.apache.calcite.plan.RelTraitDef;
+import org.apache.calcite.plan.RelTraitSet;
+
+/**
+ * Calcite {@link Convention} for Beam SQL.
+ *
+ * <p>Nodes in this convention implement {@link BeamRelNode}. The convention belongs to the
+ * {@link ConventionTraitDef} and reports that it cannot be converted to any other convention.
+ */
+public enum BeamLogicalConvention implements Convention {
+  INSTANCE;
+
+  /** Nodes of this convention are {@link BeamRelNode}s. */
+  @Override
+  public Class getInterface() {
+    return BeamRelNode.class;
+  }
+
+  @Override
+  public String getName() {
+    return "BEAM_LOGICAL";
+  }
+
+  /** Prints the convention as its {@link #getName() name}. */
+  @Override
+  public String toString() {
+    return getName();
+  }
+
+  @Override
+  public RelTraitDef getTraitDef() {
+    return ConventionTraitDef.INSTANCE;
+  }
+
+  /** Only this very convention instance satisfies the given trait. */
+  @Override
+  public boolean satisfies(RelTrait trait) {
+    return trait == this;
+  }
+
+  /** No-op: nothing is registered with the planner for this convention. */
+  @Override
+  public void register(RelOptPlanner planner) {
+    // intentionally empty
+  }
+
+  /** Always {@code false}: conversion to another convention is not supported. */
+  @Override
+  public boolean canConvertConvention(Convention toConvention) {
+    return false;
+  }
+
+  /** Always {@code false}: abstract converters are not used between trait sets. */
+  @Override
+  public boolean useAbstractConvertersForConversion(RelTraitSet fromTraits, RelTraitSet toTraits) {
+    return false;
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamMinusRel.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamMinusRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamMinusRel.java
new file mode 100644
index 0000000..b558f4b
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamMinusRel.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.rel;
+
+import java.util.List;
+import org.apache.beam.dsls.sql.BeamSqlEnv;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Minus;
+import org.apache.calcite.rel.core.SetOp;
+
+/**
+ * {@code BeamRelNode} that replaces a Calcite {@code Minus} node, i.e. the SQL {@code EXCEPT}
+ * operator.
+ *
+ * <p>Pipeline construction is delegated to a {@link BeamSetOperatorRelBase} configured with
+ * {@link BeamSetOperatorRelBase.OpType#MINUS}.
+ */
+public class BeamMinusRel extends Minus implements BeamRelNode {
+
+  /** Shared set-operator logic that builds the Beam transforms for this node. */
+  private final BeamSetOperatorRelBase delegate;
+
+  public BeamMinusRel(RelOptCluster cluster, RelTraitSet traits, List<RelNode> inputs,
+      boolean all) {
+    super(cluster, traits, inputs, all);
+    this.delegate =
+        new BeamSetOperatorRelBase(this, BeamSetOperatorRelBase.OpType.MINUS, inputs, all);
+  }
+
+  /** Creates a new {@link BeamMinusRel} with the given traits, inputs and {@code ALL} flag. */
+  @Override
+  public SetOp copy(RelTraitSet traitSet, List<RelNode> inputs, boolean all) {
+    return new BeamMinusRel(getCluster(), traitSet, inputs, all);
+  }
+
+  @Override
+  public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections,
+      BeamSqlEnv sqlEnv) throws Exception {
+    return delegate.buildBeamPipeline(inputPCollections, sqlEnv);
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamProjectRel.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamProjectRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamProjectRel.java
new file mode 100644
index 0000000..8f8e5ce
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamProjectRel.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.rel;
+
+import java.util.List;
+import org.apache.beam.dsls.sql.BeamSqlEnv;
+import org.apache.beam.dsls.sql.interpreter.BeamSqlExpressionExecutor;
+import org.apache.beam.dsls.sql.interpreter.BeamSqlFnExecutor;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder;
+import org.apache.beam.dsls.sql.transform.BeamSqlProjectFn;
+import org.apache.beam.dsls.sql.utils.CalciteUtils;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+
+/**
+ * BeamRelNode that replaces a Calcite {@code Project} node.
+ *
+ * <p>The projection expressions are evaluated by a {@link BeamSqlFnExecutor} inside a
+ * {@link BeamSqlProjectFn}, applied to the upstream rows with {@link ParDo}.
+ */
+public class BeamProjectRel extends Project implements BeamRelNode {
+
+  /**
+   * Creates a projection node.
+   *
+   * <p>Projection expressions may be {@link RexLiteral}s, {@link RexInputRef}s or
+   * {@link RexCall}s.
+   */
+  public BeamProjectRel(RelOptCluster cluster, RelTraitSet traits, RelNode input,
+      List<? extends RexNode> projects, RelDataType rowType) {
+    super(cluster, traits, input, projects, rowType);
+  }
+
+  /** Creates a new {@link BeamProjectRel} with the given traits, input, projects and type. */
+  @Override
+  public Project copy(RelTraitSet traitSet, RelNode input, List<RexNode> projects,
+      RelDataType rowType) {
+    return new BeamProjectRel(getCluster(), traitSet, input, projects, rowType);
+  }
+
+  @Override
+  public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections,
+      BeamSqlEnv sqlEnv) throws Exception {
+    String stageName = BeamSqlRelUtils.getStageName(this);
+
+    // Build the upstream pipeline first; the projection is applied on top of its output.
+    PCollection<BeamSqlRow> upstream = BeamSqlRelUtils.getBeamRelInput(getInput())
+        .buildBeamPipeline(inputPCollections, sqlEnv);
+
+    BeamSqlExpressionExecutor projectionExecutor = new BeamSqlFnExecutor(this);
+    BeamSqlProjectFn projectFn = new BeamSqlProjectFn(
+        getRelTypeName(), projectionExecutor, CalciteUtils.toBeamRowType(rowType));
+
+    return upstream
+        .apply(stageName, ParDo.of(projectFn))
+        .setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRowType(getRowType())));
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamRelNode.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamRelNode.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamRelNode.java
new file mode 100644
index 0000000..d4c98a3
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamRelNode.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.rel;
+
+import org.apache.beam.dsls.sql.BeamSqlEnv;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.calcite.rel.RelNode;
+
+/**
+ * A Calcite {@link RelNode} that can translate itself into a Beam pipeline through
+ * {@link #buildBeamPipeline(PCollectionTuple, BeamSqlEnv)}.
+ */
+public interface BeamRelNode extends RelNode {
+
+  /**
+   * Builds the part of the Beam pipeline that computes this node and returns its output.
+   *
+   * <p>A {@link BeamRelNode} is a recursive structure; the {@code BeamQueryPlanner} visits it
+   * with a DFS (Depth-First-Search) algorithm. Implementations with inputs typically build their
+   * inputs' pipelines first by calling this method on them.
+   *
+   * @param inputPCollections the input collections, typically passed down unchanged to inputs
+   * @param sqlEnv the Beam SQL environment, typically passed down unchanged to inputs
+   * @return the {@link PCollection} of rows produced by this node
+   * @throws Exception if the pipeline for this node cannot be built
+   */
+  PCollection<BeamSqlRow> buildBeamPipeline(
+      PCollectionTuple inputPCollections, BeamSqlEnv sqlEnv) throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSetOperatorRelBase.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSetOperatorRelBase.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSetOperatorRelBase.java
new file mode 100644
index 0000000..939c9c8
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSetOperatorRelBase.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.rel;
+
+import java.io.Serializable;
+import java.util.List;
+import org.apache.beam.dsls.sql.BeamSqlEnv;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.dsls.sql.transform.BeamSetOperatorsTransforms;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.join.CoGbkResult;
+import org.apache.beam.sdk.transforms.join.CoGroupByKey;
+import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
+import org.apache.beam.sdk.transforms.windowing.IncompatibleWindowException;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.calcite.rel.RelNode;
+
+/**
+ * Delegate for Set operators: {@code BeamUnionRel}, {@code BeamIntersectRel}
+ * and {@code BeamMinusRel}.
+ *
+ * <p>Exactly two inputs are supported, and their windowing must be compatible. Both inputs are
+ * keyed, co-grouped with {@link CoGroupByKey}, and the grouped results are filtered according
+ * to the {@link OpType} and the {@code ALL} flag.
+ */
+public class BeamSetOperatorRelBase {
+  /**
+   * Set operator type.
+   */
+  public enum OpType implements Serializable {
+    UNION,
+    INTERSECT,
+    MINUS
+  }
+
+  private final BeamRelNode beamRelNode;
+  private final List<RelNode> inputs;
+  private final boolean all;
+  private final OpType opType;
+
+  /**
+   * Creates a delegate.
+   *
+   * @param beamRelNode the set-operator node this delegate builds for; used for stage naming
+   * @param opType which set operation to perform
+   * @param inputs the node's inputs; {@link #buildBeamPipeline} supports exactly two
+   * @param all whether the {@code ALL} variant of the operator was requested
+   */
+  public BeamSetOperatorRelBase(BeamRelNode beamRelNode, OpType opType,
+      List<RelNode> inputs, boolean all) {
+    this.beamRelNode = beamRelNode;
+    this.opType = opType;
+    this.inputs = inputs;
+    this.all = all;
+  }
+
+  /**
+   * Builds the set-operator pipeline on top of the pipelines of both inputs.
+   *
+   * @throws UnsupportedOperationException if the node does not have exactly two inputs
+   * @throws IllegalArgumentException if the inputs' window functions are incompatible
+   */
+  public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections,
+      BeamSqlEnv sqlEnv) throws Exception {
+    // Only binary set operations are implemented; extra inputs would otherwise be silently
+    // ignored and produce wrong results.
+    if (inputs.size() != 2) {
+      throw new UnsupportedOperationException(
+          opType + " with " + inputs.size() + " inputs is not supported, exactly 2 are required.");
+    }
+
+    PCollection<BeamSqlRow> leftRows = BeamSqlRelUtils.getBeamRelInput(inputs.get(0))
+        .buildBeamPipeline(inputPCollections, sqlEnv);
+    PCollection<BeamSqlRow> rightRows = BeamSqlRelUtils.getBeamRelInput(inputs.get(1))
+        .buildBeamPipeline(inputPCollections, sqlEnv);
+
+    // Both sides are co-grouped, so their windowing must be compatible.
+    WindowFn<?, ?> leftWindow = leftRows.getWindowingStrategy().getWindowFn();
+    WindowFn<?, ?> rightWindow = rightRows.getWindowingStrategy().getWindowFn();
+    try {
+      leftWindow.verifyCompatibility(rightWindow);
+    } catch (IncompatibleWindowException e) {
+      throw new IllegalArgumentException(
+          "inputs of " + opType + " have different window strategy: "
+              + leftWindow + " VS " + rightWindow, e);
+    }
+
+    final TupleTag<BeamSqlRow> leftTag = new TupleTag<>();
+    final TupleTag<BeamSqlRow> rightTag = new TupleTag<>();
+
+    // co-group both sides on the BeamSqlRow key produced by BeamSqlRow2KvFn
+    String stageName = BeamSqlRelUtils.getStageName(beamRelNode);
+    PCollection<KV<BeamSqlRow, CoGbkResult>> coGbkResultCollection = KeyedPCollectionTuple
+        .of(leftTag, leftRows.apply(
+            stageName + "_CreateLeftIndex", MapElements.via(
+                new BeamSetOperatorsTransforms.BeamSqlRow2KvFn())))
+        .and(rightTag, rightRows.apply(
+            stageName + "_CreateRightIndex", MapElements.via(
+                new BeamSetOperatorsTransforms.BeamSqlRow2KvFn())))
+        .apply(CoGroupByKey.<BeamSqlRow>create());
+
+    // decide per co-grouped key which rows to emit, based on opType and the ALL flag
+    return coGbkResultCollection
+        .apply(ParDo.of(new BeamSetOperatorsTransforms.SetOperatorFilteringDoFn(leftTag, rightTag,
+            opType, all)));
+  }
+}
