http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java new file mode 100644 index 0000000..b80e045 --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.extensions.sql.impl.utils; + +import java.sql.Types; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.rel.type.RelDataTypeField; +import org.apache.calcite.rel.type.RelProtoDataType; +import org.apache.calcite.sql.type.SqlTypeName; + +/** + * Utility methods for Calcite related operations. + */ +public class CalciteUtils { + private static final Map<Integer, SqlTypeName> JAVA_TO_CALCITE_MAPPING = new HashMap<>(); + private static final Map<SqlTypeName, Integer> CALCITE_TO_JAVA_MAPPING = new HashMap<>(); + static { + JAVA_TO_CALCITE_MAPPING.put(Types.TINYINT, SqlTypeName.TINYINT); + JAVA_TO_CALCITE_MAPPING.put(Types.SMALLINT, SqlTypeName.SMALLINT); + JAVA_TO_CALCITE_MAPPING.put(Types.INTEGER, SqlTypeName.INTEGER); + JAVA_TO_CALCITE_MAPPING.put(Types.BIGINT, SqlTypeName.BIGINT); + + JAVA_TO_CALCITE_MAPPING.put(Types.FLOAT, SqlTypeName.FLOAT); + JAVA_TO_CALCITE_MAPPING.put(Types.DOUBLE, SqlTypeName.DOUBLE); + + JAVA_TO_CALCITE_MAPPING.put(Types.DECIMAL, SqlTypeName.DECIMAL); + + JAVA_TO_CALCITE_MAPPING.put(Types.CHAR, SqlTypeName.CHAR); + JAVA_TO_CALCITE_MAPPING.put(Types.VARCHAR, SqlTypeName.VARCHAR); + + JAVA_TO_CALCITE_MAPPING.put(Types.DATE, SqlTypeName.DATE); + JAVA_TO_CALCITE_MAPPING.put(Types.TIME, SqlTypeName.TIME); + JAVA_TO_CALCITE_MAPPING.put(Types.TIMESTAMP, SqlTypeName.TIMESTAMP); + + JAVA_TO_CALCITE_MAPPING.put(Types.BOOLEAN, SqlTypeName.BOOLEAN); + + for (Map.Entry<Integer, SqlTypeName> pair : JAVA_TO_CALCITE_MAPPING.entrySet()) { + CALCITE_TO_JAVA_MAPPING.put(pair.getValue(), pair.getKey()); + } + } + + /** + * Get the corresponding {@code SqlTypeName} for an integer sql type. + */ + public static SqlTypeName toCalciteType(int type) { + return JAVA_TO_CALCITE_MAPPING.get(type); + } + + /** + * Get the integer sql type from Calcite {@code SqlTypeName}. + */ + public static Integer toJavaType(SqlTypeName typeName) { + return CALCITE_TO_JAVA_MAPPING.get(typeName); + } + + /** + * Get the {@code SqlTypeName} for the specified column of a table. + */ + public static SqlTypeName getFieldType(BeamSqlRowType schema, int index) { + return toCalciteType(schema.getFieldsType().get(index)); + } + + /** + * Generate {@code BeamSqlRowType} from {@code RelDataType} which is used to create table. + */ + public static BeamSqlRowType toBeamRowType(RelDataType tableInfo) { + List<String> fieldNames = new ArrayList<>(); + List<Integer> fieldTypes = new ArrayList<>(); + for (RelDataTypeField f : tableInfo.getFieldList()) { + fieldNames.add(f.getName()); + fieldTypes.add(toJavaType(f.getType().getSqlTypeName())); + } + return BeamSqlRowType.create(fieldNames, fieldTypes); + } + + /** + * Create an instance of {@code RelDataType} so it can be used to create a table. + */ + public static RelProtoDataType toCalciteRowType(final BeamSqlRowType that) { + return new RelProtoDataType() { + @Override + public RelDataType apply(RelDataTypeFactory a) { + RelDataTypeFactory.FieldInfoBuilder builder = a.builder(); + for (int idx = 0; idx < that.getFieldsName().size(); ++idx) { + builder.add(that.getFieldsName().get(idx), toCalciteType(that.getFieldsType().get(idx))); + } + return builder.build(); + } + }; + } +}
http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/package-info.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/package-info.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/package-info.java new file mode 100644 index 0000000..b00ed0c --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Utility classes. + */ +package org.apache.beam.sdk.extensions.sql.impl.utils; http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/BeamSqlExpressionExecutor.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/BeamSqlExpressionExecutor.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/BeamSqlExpressionExecutor.java deleted file mode 100644 index 28f83e4..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/BeamSqlExpressionExecutor.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.extensions.sql.interpreter; - -import java.io.Serializable; -import java.util.List; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; - -/** - * {@code BeamSqlExpressionExecutor} fills the gap between relational - * expressions in Calcite SQL and executable code. - * - */ -public interface BeamSqlExpressionExecutor extends Serializable { - - /** - * invoked before data processing. - */ - void prepare(); - - /** - * apply transformation to input record {@link BeamSqlRow}. - * - */ - List<Object> execute(BeamSqlRow inputRow); - - void close(); -} http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/BeamSqlFnExecutor.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/BeamSqlFnExecutor.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/BeamSqlFnExecutor.java deleted file mode 100644 index 3084cd5..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/BeamSqlFnExecutor.java +++ /dev/null @@ -1,442 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.extensions.sql.interpreter; - -import java.math.BigDecimal; -import java.util.ArrayList; -import java.util.Calendar; -import java.util.List; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlCaseExpression; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlCastExpression; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlInputRefExpression; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlPrimitive; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlReinterpretExpression; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlUdfExpression; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlWindowEndExpression; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlWindowExpression; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlWindowStartExpression; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.arithmetic.BeamSqlDivideExpression; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.arithmetic.BeamSqlMinusExpression; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.arithmetic.BeamSqlModExpression; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.arithmetic.BeamSqlMultiplyExpression; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.arithmetic.BeamSqlPlusExpression; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.comparison.BeamSqlEqualsExpression; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.comparison.BeamSqlGreaterThanExpression; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.comparison.BeamSqlGreaterThanOrEqualsExpression; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.comparison.BeamSqlIsNotNullExpression; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.comparison.BeamSqlIsNullExpression; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.comparison.BeamSqlLessThanExpression; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.comparison.BeamSqlLessThanOrEqualsExpression; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.comparison.BeamSqlNotEqualsExpression; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.date.BeamSqlCurrentDateExpression; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.date.BeamSqlCurrentTimeExpression; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.date.BeamSqlCurrentTimestampExpression; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.date.BeamSqlDateCeilExpression; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.date.BeamSqlDateFloorExpression; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.date.BeamSqlExtractExpression; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.logical.BeamSqlAndExpression; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.logical.BeamSqlNotExpression; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.logical.BeamSqlOrExpression; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.math.BeamSqlAbsExpression; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.math.BeamSqlAcosExpression; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.math.BeamSqlAsinExpression; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.math.BeamSqlAtan2Expression; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.math.BeamSqlAtanExpression; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.math.BeamSqlCeilExpression; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.math.BeamSqlCosExpression; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.math.BeamSqlCotExpression; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.math.BeamSqlDegreesExpression; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.math.BeamSqlExpExpression; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.math.BeamSqlFloorExpression; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.math.BeamSqlLnExpression; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.math.BeamSqlLogExpression; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.math.BeamSqlPiExpression; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.math.BeamSqlPowerExpression; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.math.BeamSqlRadiansExpression; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.math.BeamSqlRandExpression; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.math.BeamSqlRandIntegerExpression; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.math.BeamSqlRoundExpression; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.math.BeamSqlSignExpression; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.math.BeamSqlSinExpression; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.math.BeamSqlTanExpression; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.math.BeamSqlTruncateExpression; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.string.BeamSqlCharLengthExpression; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.string.BeamSqlConcatExpression; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.string.BeamSqlInitCapExpression; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.string.BeamSqlLowerExpression; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.string.BeamSqlOverlayExpression; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.string.BeamSqlPositionExpression; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.string.BeamSqlSubstringExpression; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.string.BeamSqlTrimExpression; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.string.BeamSqlUpperExpression; -import org.apache.beam.sdk.extensions.sql.rel.BeamFilterRel; -import org.apache.beam.sdk.extensions.sql.rel.BeamProjectRel; -import org.apache.beam.sdk.extensions.sql.rel.BeamRelNode; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; -import org.apache.calcite.rex.RexCall; -import org.apache.calcite.rex.RexInputRef; -import org.apache.calcite.rex.RexLiteral; -import org.apache.calcite.rex.RexNode; -import org.apache.calcite.schema.impl.ScalarFunctionImpl; -import org.apache.calcite.sql.SqlOperator; -import org.apache.calcite.sql.type.SqlTypeName; -import org.apache.calcite.sql.validate.SqlUserDefinedFunction; -import org.apache.calcite.util.NlsString; - -/** - * Executor based on {@link BeamSqlExpression} and {@link BeamSqlPrimitive}. - * {@code BeamSqlFnExecutor} converts a {@link BeamRelNode} to a {@link BeamSqlExpression}, - * which can be evaluated against the {@link BeamSqlRow}. - * - */ -public class BeamSqlFnExecutor implements BeamSqlExpressionExecutor { - protected List<BeamSqlExpression> exps; - - public BeamSqlFnExecutor(BeamRelNode relNode) { - this.exps = new ArrayList<>(); - if (relNode instanceof BeamFilterRel) { - BeamFilterRel filterNode = (BeamFilterRel) relNode; - RexNode condition = filterNode.getCondition(); - exps.add(buildExpression(condition)); - } else if (relNode instanceof BeamProjectRel) { - BeamProjectRel projectNode = (BeamProjectRel) relNode; - List<RexNode> projects = projectNode.getProjects(); - for (RexNode rexNode : projects) { - exps.add(buildExpression(rexNode)); - } - } else { - throw new UnsupportedOperationException( - String.format("%s is not supported yet!", relNode.getClass().toString())); - } - } - - /** - * {@link #buildExpression(RexNode)} visits the operands of {@link RexNode} recursively, - * and represent each {@link SqlOperator} with a corresponding {@link BeamSqlExpression}. - */ - static BeamSqlExpression buildExpression(RexNode rexNode) { - BeamSqlExpression ret = null; - if (rexNode instanceof RexLiteral) { - RexLiteral node = (RexLiteral) rexNode; - SqlTypeName type = node.getTypeName(); - Object value = node.getValue(); - - if (SqlTypeName.CHAR_TYPES.contains(type) - && node.getValue() instanceof NlsString) { - // NlsString is not serializable, we need to convert - // it to string explicitly. - return BeamSqlPrimitive.of(type, ((NlsString) value).getValue()); - } else if (type == SqlTypeName.DATE && value instanceof Calendar) { - // does this actually make sense? - // Calcite actually treat Calendar as the java type of Date Literal - return BeamSqlPrimitive.of(type, ((Calendar) value).getTime()); - } else { - // node.getType().getSqlTypeName() and node.getSqlTypeName() can be different - // e.g. sql: "select 1" - // here the literal 1 will be parsed as a RexLiteral where: - // node.getType().getSqlTypeName() = INTEGER (the display type) - // node.getSqlTypeName() = DECIMAL (the actual internal storage format) - // So we need to do a convert here. - // check RexBuilder#makeLiteral for more information. - SqlTypeName realType = node.getType().getSqlTypeName(); - Object realValue = value; - if (type == SqlTypeName.DECIMAL) { - BigDecimal rawValue = (BigDecimal) value; - switch (realType) { - case TINYINT: - realValue = (byte) rawValue.intValue(); - break; - case SMALLINT: - realValue = (short) rawValue.intValue(); - break; - case INTEGER: - realValue = rawValue.intValue(); - break; - case BIGINT: - realValue = rawValue.longValue(); - break; - case DECIMAL: - realValue = rawValue; - break; - default: - throw new IllegalStateException("type/realType mismatch: " - + type + " VS " + realType); - } - } else if (type == SqlTypeName.DOUBLE) { - Double rawValue = (Double) value; - if (realType == SqlTypeName.FLOAT) { - realValue = rawValue.floatValue(); - } - } - return BeamSqlPrimitive.of(realType, realValue); - } - } else if (rexNode instanceof RexInputRef) { - RexInputRef node = (RexInputRef) rexNode; - ret = new BeamSqlInputRefExpression(node.getType().getSqlTypeName(), node.getIndex()); - } else if (rexNode instanceof RexCall) { - RexCall node = (RexCall) rexNode; - String opName = node.op.getName(); - List<BeamSqlExpression> subExps = new ArrayList<>(); - for (RexNode subNode : node.getOperands()) { - subExps.add(buildExpression(subNode)); - } - switch (opName) { - // logical operators - case "AND": - ret = new BeamSqlAndExpression(subExps); - break; - case "OR": - ret = new BeamSqlOrExpression(subExps); - break; - case "NOT": - ret = new BeamSqlNotExpression(subExps); - break; - case "=": - ret = new BeamSqlEqualsExpression(subExps); - break; - case "<>": - ret = new BeamSqlNotEqualsExpression(subExps); - break; - case ">": - ret = new BeamSqlGreaterThanExpression(subExps); - break; - case ">=": - ret = new BeamSqlGreaterThanOrEqualsExpression(subExps); - break; - case "<": - ret = new BeamSqlLessThanExpression(subExps); - break; - case "<=": - ret = new BeamSqlLessThanOrEqualsExpression(subExps); - break; - - // arithmetic operators - case "+": - ret = new BeamSqlPlusExpression(subExps); - break; - case "-": - ret = new BeamSqlMinusExpression(subExps); - break; - case "*": - ret = new BeamSqlMultiplyExpression(subExps); - break; - case "/": - case "/INT": - ret = new BeamSqlDivideExpression(subExps); - break; - case "MOD": - ret = new BeamSqlModExpression(subExps); - break; - - case "ABS": - ret = new BeamSqlAbsExpression(subExps); - break; - case "ROUND": - ret = new BeamSqlRoundExpression(subExps); - break; - case "LN": - ret = new BeamSqlLnExpression(subExps); - break; - case "LOG10": - ret = new BeamSqlLogExpression(subExps); - break; - case "EXP": - ret = new BeamSqlExpExpression(subExps); - break; - case "ACOS": - ret = new BeamSqlAcosExpression(subExps); - break; - case "ASIN": - ret = new BeamSqlAsinExpression(subExps); - break; - case "ATAN": - ret = new BeamSqlAtanExpression(subExps); - break; - case "COT": - ret = new BeamSqlCotExpression(subExps); - break; - case "DEGREES": - ret = new BeamSqlDegreesExpression(subExps); - break; - case "RADIANS": - ret = new BeamSqlRadiansExpression(subExps); - break; - case "COS": - ret = new BeamSqlCosExpression(subExps); - break; - case "SIN": - ret = new BeamSqlSinExpression(subExps); - break; - case "TAN": - ret = new BeamSqlTanExpression(subExps); - break; - case "SIGN": - ret = new BeamSqlSignExpression(subExps); - break; - case "POWER": - ret = new BeamSqlPowerExpression(subExps); - break; - case "PI": - ret = new BeamSqlPiExpression(); - break; - case "ATAN2": - ret = new BeamSqlAtan2Expression(subExps); - break; - case "TRUNCATE": - ret = new BeamSqlTruncateExpression(subExps); - break; - case "RAND": - ret = new BeamSqlRandExpression(subExps); - break; - case "RAND_INTEGER": - ret = new BeamSqlRandIntegerExpression(subExps); - break; - - // string operators - case "||": - ret = new BeamSqlConcatExpression(subExps); - break; - case "POSITION": - ret = new BeamSqlPositionExpression(subExps); - break; - case "CHAR_LENGTH": - case "CHARACTER_LENGTH": - ret = new BeamSqlCharLengthExpression(subExps); - break; - case "UPPER": - ret = new BeamSqlUpperExpression(subExps); - break; - case "LOWER": - ret = new BeamSqlLowerExpression(subExps); - break; - case "TRIM": - ret = new BeamSqlTrimExpression(subExps); - break; - case "SUBSTRING": - ret = new BeamSqlSubstringExpression(subExps); - break; - case "OVERLAY": - ret = new BeamSqlOverlayExpression(subExps); - break; - case "INITCAP": - ret = new BeamSqlInitCapExpression(subExps); - break; - - // date functions - case "Reinterpret": - return new BeamSqlReinterpretExpression(subExps, node.type.getSqlTypeName()); - case "CEIL": - if (SqlTypeName.NUMERIC_TYPES.contains(node.type.getSqlTypeName())) { - return new BeamSqlCeilExpression(subExps); - } else { - return new BeamSqlDateCeilExpression(subExps); - } - case "FLOOR": - if (SqlTypeName.NUMERIC_TYPES.contains(node.type.getSqlTypeName())) { - return new BeamSqlFloorExpression(subExps); - } else { - return new BeamSqlDateFloorExpression(subExps); - } - case "EXTRACT_DATE": - case "EXTRACT": - return new BeamSqlExtractExpression(subExps); - - case "LOCALTIME": - case "CURRENT_TIME": - return new BeamSqlCurrentTimeExpression(subExps); - - case "CURRENT_TIMESTAMP": - case "LOCALTIMESTAMP": - return new BeamSqlCurrentTimestampExpression(subExps); - - case "CURRENT_DATE": - return new BeamSqlCurrentDateExpression(); - - - case "CASE": - ret = new BeamSqlCaseExpression(subExps); - break; - case "CAST": - ret = new BeamSqlCastExpression(subExps, node.type.getSqlTypeName()); - break; - - case "IS NULL": - ret = new BeamSqlIsNullExpression(subExps.get(0)); - break; - case "IS NOT NULL": - ret = new BeamSqlIsNotNullExpression(subExps.get(0)); - break; - - case "HOP": - case "TUMBLE": - case "SESSION": - ret = new BeamSqlWindowExpression(subExps, node.type.getSqlTypeName()); - break; - case "HOP_START": - case "TUMBLE_START": - case "SESSION_START": - ret = new BeamSqlWindowStartExpression(); - break; - case "HOP_END": - case "TUMBLE_END": - case "SESSION_END": - ret = new BeamSqlWindowEndExpression(); - break; - default: - //handle UDF - if (((RexCall) rexNode).getOperator() instanceof SqlUserDefinedFunction) { - SqlUserDefinedFunction udf = (SqlUserDefinedFunction) ((RexCall) rexNode).getOperator(); - ScalarFunctionImpl fn = (ScalarFunctionImpl) udf.getFunction(); - ret = new BeamSqlUdfExpression(fn.method, subExps, - ((RexCall) rexNode).type.getSqlTypeName()); - } else { - throw new UnsupportedOperationException("Operator: " + opName + " is not supported yet!"); - } - } - } else { - throw new UnsupportedOperationException( - String.format("%s is not supported yet!", rexNode.getClass().toString())); - } - - if (ret != null && !ret.accept()) { - throw new IllegalStateException(ret.getClass().getSimpleName() - + " does not accept the operands.(" + rexNode + ")"); - } - - return ret; - } - - @Override - public void prepare() { - } - - @Override - public List<Object> execute(BeamSqlRow inputRow) { - List<Object> results = new ArrayList<>(); - for (BeamSqlExpression exp : exps) { - results.add(exp.evaluate(inputRow).getValue()); - } - return results; - } - - @Override - public void close() { - } - -} http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlCaseExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlCaseExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlCaseExpression.java deleted file mode 100644 index bfbb33e..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlCaseExpression.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.sdk.extensions.sql.interpreter.operator; - -import java.util.List; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; -import org.apache.calcite.sql.type.SqlTypeName; - -/** - * {@code BeamSqlCaseExpression} represents CASE, NULLIF, COALESCE in SQL. - */ -public class BeamSqlCaseExpression extends BeamSqlExpression { - public BeamSqlCaseExpression(List<BeamSqlExpression> operands) { - // the return type of CASE is the type of the `else` condition - super(operands, operands.get(operands.size() - 1).getOutputType()); - } - - @Override public boolean accept() { - // `when`-`then` pair + `else` - if (operands.size() % 2 != 1) { - return false; - } - - for (int i = 0; i < operands.size() - 1; i += 2) { - if (opType(i) != SqlTypeName.BOOLEAN) { - return false; - } else if (opType(i + 1) != outputType) { - return false; - } - } - - return true; - } - - @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) { - for (int i = 0; i < operands.size() - 1; i += 2) { - if (opValueEvaluated(i, inputRow)) { - return BeamSqlPrimitive.of( - outputType, - opValueEvaluated(i + 1, inputRow) - ); - } - } - return BeamSqlPrimitive.of(outputType, - opValueEvaluated(operands.size() - 1, inputRow)); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlCastExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlCastExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlCastExpression.java deleted file mode 100644 index 08abcc6..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlCastExpression.java +++ /dev/null @@ -1,131 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.sdk.extensions.sql.interpreter.operator; - -import java.sql.Date; -import java.sql.Timestamp; -import java.util.List; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; -import org.apache.calcite.runtime.SqlFunctions; -import org.apache.calcite.sql.type.SqlTypeName; -import org.joda.time.format.DateTimeFormat; -import org.joda.time.format.DateTimeFormatter; -import org.joda.time.format.DateTimeFormatterBuilder; -import org.joda.time.format.DateTimeParser; - -/** - * Base class to support 'CAST' operations for all {@link SqlTypeName}. - */ -public class BeamSqlCastExpression extends BeamSqlExpression { - - private static final int index = 0; - private static final String outputTimestampFormat = "yyyy-MM-dd HH:mm:ss"; - private static final String outputDateFormat = "yyyy-MM-dd"; - /** - * Date and Timestamp formats used to parse - * {@link SqlTypeName#DATE}, {@link SqlTypeName#TIMESTAMP}. - */ - private static final DateTimeFormatter dateTimeFormatter = new DateTimeFormatterBuilder() - .append(null/*printer*/, new DateTimeParser[] { - // date formats - DateTimeFormat.forPattern("yy-MM-dd").getParser(), - DateTimeFormat.forPattern("yy/MM/dd").getParser(), - DateTimeFormat.forPattern("yy.MM.dd").getParser(), - DateTimeFormat.forPattern("yyMMdd").getParser(), - DateTimeFormat.forPattern("yyyyMMdd").getParser(), - DateTimeFormat.forPattern("yyyy-MM-dd").getParser(), - DateTimeFormat.forPattern("yyyy/MM/dd").getParser(), - DateTimeFormat.forPattern("yyyy.MM.dd").getParser(), - // datetime formats - DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss").getParser(), - DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ssz").getParser(), - DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss z").getParser(), - DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSSSSSSSS").getParser(), - DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSSSSSSSSz").getParser(), - DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSSSSSSSS z").getParser() }).toFormatter() - .withPivotYear(2020); - - public BeamSqlCastExpression(List<BeamSqlExpression> operands, SqlTypeName castType) { - super(operands, castType); - } - - @Override - public boolean accept() { - return numberOfOperands() == 1; - } - - @Override - public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) { - SqlTypeName castOutputType = getOutputType(); - switch (castOutputType) { - case INTEGER: - return BeamSqlPrimitive - .of(SqlTypeName.INTEGER, SqlFunctions.toInt(opValueEvaluated(index, inputRow))); - case DOUBLE: - return BeamSqlPrimitive - .of(SqlTypeName.DOUBLE, SqlFunctions.toDouble(opValueEvaluated(index, inputRow))); - case SMALLINT: - return BeamSqlPrimitive - .of(SqlTypeName.SMALLINT, SqlFunctions.toShort(opValueEvaluated(index, inputRow))); - case TINYINT: - return BeamSqlPrimitive - .of(SqlTypeName.TINYINT, SqlFunctions.toByte(opValueEvaluated(index, inputRow))); - case BIGINT: - return BeamSqlPrimitive - .of(SqlTypeName.BIGINT, SqlFunctions.toLong(opValueEvaluated(index, inputRow))); - case DECIMAL: - return BeamSqlPrimitive.of(SqlTypeName.DECIMAL, - SqlFunctions.toBigDecimal(opValueEvaluated(index, inputRow))); - case FLOAT: - return BeamSqlPrimitive - .of(SqlTypeName.FLOAT, SqlFunctions.toFloat(opValueEvaluated(index, inputRow))); - case CHAR: - case VARCHAR: - return BeamSqlPrimitive - .of(SqlTypeName.VARCHAR, opValueEvaluated(index, inputRow).toString()); - case DATE: - return BeamSqlPrimitive - .of(SqlTypeName.DATE, toDate(opValueEvaluated(index, inputRow), outputDateFormat)); - case TIMESTAMP: - return BeamSqlPrimitive.of(SqlTypeName.TIMESTAMP, - toTimeStamp(opValueEvaluated(index, inputRow), outputTimestampFormat)); - } - throw new UnsupportedOperationException( - String.format("Cast to type %s not supported", castOutputType)); - } - - private Date toDate(Object inputDate, String outputFormat) { - try { - return Date - .valueOf(dateTimeFormatter.parseLocalDate(inputDate.toString()).toString(outputFormat)); - } catch (IllegalArgumentException | UnsupportedOperationException e) { - throw new UnsupportedOperationException("Can't be cast to type 'Date'"); - } - } - - private Timestamp toTimeStamp(Object inputTimestamp, String outputFormat) { - try { - return Timestamp.valueOf( - dateTimeFormatter.parseDateTime(inputTimestamp.toString()).secondOfMinute() - .roundCeilingCopy().toString(outputFormat)); - } catch (IllegalArgumentException | UnsupportedOperationException e) { - throw new UnsupportedOperationException("Can't be cast to type 'Timestamp'"); - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlExpression.java deleted file mode 100644 index cb8baac..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlExpression.java +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.extensions.sql.interpreter.operator; - -import java.io.Serializable; -import java.util.List; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; -import org.apache.calcite.rex.RexNode; -import org.apache.calcite.sql.type.SqlTypeName; - -/** - * {@code BeamSqlExpression} is an equivalent expression in BeamSQL, of {@link RexNode} in Calcite. - * - * <p>An implementation of {@link BeamSqlExpression} takes one or more {@code BeamSqlExpression} - * as its operands, and return a value with type {@link SqlTypeName}. - * - */ -public abstract class BeamSqlExpression implements Serializable { - protected List<BeamSqlExpression> operands; - protected SqlTypeName outputType; - - protected BeamSqlExpression(){} - - public BeamSqlExpression(List<BeamSqlExpression> operands, SqlTypeName outputType) { - this.operands = operands; - this.outputType = outputType; - } - - public BeamSqlExpression op(int idx) { - return operands.get(idx); - } - - public SqlTypeName opType(int idx) { - return op(idx).getOutputType(); - } - - public <T> T opValueEvaluated(int idx, BeamSqlRow row) { - return (T) op(idx).evaluate(row).getValue(); - } - - /** - * assertion to make sure the input and output are supported in this expression. - */ - public abstract boolean accept(); - - /** - * Apply input record {@link BeamSqlRow} to this expression, - * the output value is wrapped with {@link BeamSqlPrimitive}. - */ - public abstract BeamSqlPrimitive evaluate(BeamSqlRow inputRow); - - public List<BeamSqlExpression> getOperands() { - return operands; - } - - public SqlTypeName getOutputType() { - return outputType; - } - - public int numberOfOperands() { - return operands.size(); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlInputRefExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlInputRefExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlInputRefExpression.java deleted file mode 100644 index 7ba4a46..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlInputRefExpression.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.extensions.sql.interpreter.operator; - -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; -import org.apache.calcite.sql.type.SqlTypeName; - -/** - * An primitive operation for direct field extraction. - */ -public class BeamSqlInputRefExpression extends BeamSqlExpression { - private int inputRef; - - public BeamSqlInputRefExpression(SqlTypeName sqlTypeName, int inputRef) { - super(null, sqlTypeName); - this.inputRef = inputRef; - } - - @Override - public boolean accept() { - return true; - } - - @Override - public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) { - return BeamSqlPrimitive.of(outputType, inputRow.getFieldValue(inputRef)); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlPrimitive.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlPrimitive.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlPrimitive.java deleted file mode 100644 index 6a8216b..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlPrimitive.java +++ /dev/null @@ -1,152 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.extensions.sql.interpreter.operator; - -import java.math.BigDecimal; -import java.util.Date; -import java.util.GregorianCalendar; -import java.util.List; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; -import org.apache.calcite.sql.type.SqlTypeName; -import org.apache.calcite.util.NlsString; - -/** - * {@link BeamSqlPrimitive} is a special, self-reference {@link BeamSqlExpression}. - * It holds the value, and return it directly during {@link #evaluate(BeamSqlRow)}. - * - */ -public class BeamSqlPrimitive<T> extends BeamSqlExpression { - private T value; - - private BeamSqlPrimitive() { - } - - private BeamSqlPrimitive(List<BeamSqlExpression> operands, SqlTypeName outputType) { - super(operands, outputType); - } - - /** - * A builder function to create from Type and value directly. - */ - public static <T> BeamSqlPrimitive<T> of(SqlTypeName outputType, T value){ - BeamSqlPrimitive<T> exp = new BeamSqlPrimitive<>(); - exp.outputType = outputType; - exp.value = value; - if (!exp.accept()) { - throw new IllegalArgumentException( - String.format("value [%s] doesn't match type [%s].", value, outputType)); - } - return exp; - } - - public SqlTypeName getOutputType() { - return outputType; - } - - public T getValue() { - return value; - } - - public long getLong() { - return (Long) getValue(); - } - - public double getDouble() { - return (Double) getValue(); - } - - public float getFloat() { - return (Float) getValue(); - } - - public int getInteger() { - return (Integer) getValue(); - } - - public short getShort() { - return (Short) getValue(); - } - - public byte getByte() { - return (Byte) getValue(); - } - public boolean getBoolean() { - return (Boolean) getValue(); - } - - public String getString() { - return (String) getValue(); - } - - public Date getDate() { - return (Date) getValue(); - } - - public BigDecimal getDecimal() { - return (BigDecimal) getValue(); - } - - @Override - public boolean accept() { - if (value == null) { - return true; - } - - switch (outputType) { - case BIGINT: - return value instanceof Long; - case DECIMAL: - return value instanceof BigDecimal; - case DOUBLE: - return value instanceof Double; - case FLOAT: - return value instanceof Float; - case INTEGER: - return value instanceof Integer; - case SMALLINT: - return value instanceof Short; - case TINYINT: - return value instanceof Byte; - case BOOLEAN: - return value instanceof Boolean; - case CHAR: - case VARCHAR: - return value instanceof String || value instanceof NlsString; - case TIME: - return value instanceof GregorianCalendar; - case TIMESTAMP: - case DATE: - return value instanceof Date; - case INTERVAL_HOUR: - return value instanceof BigDecimal; - case INTERVAL_MINUTE: - return value instanceof BigDecimal; - case SYMBOL: - // for SYMBOL, it supports anything... - return true; - default: - throw new UnsupportedOperationException(outputType.name()); - } - } - - @Override - public BeamSqlPrimitive<T> evaluate(BeamSqlRow inputRow) { - return this; - } - -} http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlReinterpretExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlReinterpretExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlReinterpretExpression.java deleted file mode 100644 index 7b4894a..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlReinterpretExpression.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.sdk.extensions.sql.interpreter.operator; - -import java.util.Date; -import java.util.GregorianCalendar; -import java.util.List; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; -import org.apache.calcite.sql.type.SqlTypeName; - -/** - * {@code BeamSqlExpression} for REINTERPRET. - * - * <p>Currently only converting from {@link SqlTypeName#DATETIME_TYPES} - * to {@code BIGINT} is supported. - */ -public class BeamSqlReinterpretExpression extends BeamSqlExpression { - public BeamSqlReinterpretExpression(List<BeamSqlExpression> operands, SqlTypeName outputType) { - super(operands, outputType); - } - - @Override public boolean accept() { - return getOperands().size() == 1 - && outputType == SqlTypeName.BIGINT - && SqlTypeName.DATETIME_TYPES.contains(opType(0)); - } - - @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) { - if (opType(0) == SqlTypeName.TIME) { - GregorianCalendar date = opValueEvaluated(0, inputRow); - return BeamSqlPrimitive.of(outputType, date.getTimeInMillis()); - - } else { - Date date = opValueEvaluated(0, inputRow); - return BeamSqlPrimitive.of(outputType, date.getTime()); - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlUdfExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlUdfExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlUdfExpression.java deleted file mode 100644 index 42e511d..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlUdfExpression.java +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.extensions.sql.interpreter.operator; - -import java.lang.reflect.Method; -import java.util.ArrayList; -import java.util.List; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; -import org.apache.calcite.sql.type.SqlTypeName; - -/** - * invoke a UDF function. - */ -public class BeamSqlUdfExpression extends BeamSqlExpression { - //as Method is not Serializable, need to keep class/method information, and rebuild it. - private transient Method method; - private String className; - private String methodName; - private List<String> paraClassName = new ArrayList<>(); - - public BeamSqlUdfExpression(Method method, List<BeamSqlExpression> subExps, - SqlTypeName sqlTypeName) { - super(subExps, sqlTypeName); - this.method = method; - - this.className = method.getDeclaringClass().getName(); - this.methodName = method.getName(); - for (Class<?> c : method.getParameterTypes()) { - paraClassName.add(c.getName()); - } - } - - @Override - public boolean accept() { - return true; - } - - @Override - public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) { - if (method == null) { - reConstructMethod(); - } - try { - List<Object> paras = new ArrayList<>(); - for (BeamSqlExpression e : getOperands()) { - paras.add(e.evaluate(inputRow).getValue()); - } - - return BeamSqlPrimitive.of(getOutputType(), - method.invoke(null, paras.toArray(new Object[]{}))); - } catch (Exception ex) { - throw new RuntimeException(ex); - } - } - - /** - * re-construct method from class/method. - */ - private void reConstructMethod() { - try { - List<Class<?>> paraClass = new ArrayList<>(); - for (String pc : paraClassName) { - paraClass.add(Class.forName(pc)); - } - method = Class.forName(className).getMethod(methodName, paraClass.toArray(new Class<?>[] {})); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - -} http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlWindowEndExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlWindowEndExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlWindowEndExpression.java deleted file mode 100644 index 76f602c..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlWindowEndExpression.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.extensions.sql.interpreter.operator; - -import java.util.Date; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; -import org.apache.calcite.sql.type.SqlTypeName; - -/** - * {@code BeamSqlExpression} for {@code HOP_END}, {@code TUMBLE_END}, {@code SESSION_END} operation. - * - * <p>These operators returns the <em>end</em> timestamp of window. - */ -public class BeamSqlWindowEndExpression extends BeamSqlExpression { - - @Override - public boolean accept() { - return true; - } - - @Override - public BeamSqlPrimitive<Date> evaluate(BeamSqlRow inputRow) { - return BeamSqlPrimitive.of(SqlTypeName.TIMESTAMP, - new Date(inputRow.getWindowEnd().getMillis())); - } - -} http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlWindowExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlWindowExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlWindowExpression.java deleted file mode 100644 index 21ec6dc..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlWindowExpression.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.extensions.sql.interpreter.operator; - -import java.util.Date; -import java.util.List; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; -import org.apache.calcite.sql.type.SqlTypeName; - -/** - * {@code BeamSqlExpression} for {@code HOP}, {@code TUMBLE}, {@code SESSION} operation. - * - * <p>These functions don't change the timestamp field, instead it's used to indicate - * the event_timestamp field, and how the window is defined. - */ -public class BeamSqlWindowExpression extends BeamSqlExpression { - - public BeamSqlWindowExpression(List<BeamSqlExpression> operands, SqlTypeName outputType) { - super(operands, outputType); - } - - @Override - public boolean accept() { - return operands.get(0).getOutputType().equals(SqlTypeName.DATE) - || operands.get(0).getOutputType().equals(SqlTypeName.TIME) - || operands.get(0).getOutputType().equals(SqlTypeName.TIMESTAMP); - } - - @Override - public BeamSqlPrimitive<Date> evaluate(BeamSqlRow inputRow) { - return BeamSqlPrimitive.of(SqlTypeName.TIMESTAMP, - (Date) operands.get(0).evaluate(inputRow).getValue()); - } - -} http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlWindowStartExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlWindowStartExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlWindowStartExpression.java deleted file mode 100644 index a38fd12..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlWindowStartExpression.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.extensions.sql.interpreter.operator; - -import java.util.Date; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; -import org.apache.calcite.sql.type.SqlTypeName; - -/** - * {@code BeamSqlExpression} for {@code HOP_START}, {@code TUMBLE_START}, - * {@code SESSION_START} operation. - * - * <p>These operators returns the <em>start</em> timestamp of window. - */ -public class BeamSqlWindowStartExpression extends BeamSqlExpression { - - @Override - public boolean accept() { - return true; - } - - @Override - public BeamSqlPrimitive<Date> evaluate(BeamSqlRow inputRow) { - return BeamSqlPrimitive.of(SqlTypeName.TIMESTAMP, - new Date(inputRow.getWindowStart().getMillis())); - } - -} http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/arithmetic/BeamSqlArithmeticExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/arithmetic/BeamSqlArithmeticExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/arithmetic/BeamSqlArithmeticExpression.java deleted file mode 100644 index 67a35fc..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/arithmetic/BeamSqlArithmeticExpression.java +++ /dev/null @@ -1,122 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.sdk.extensions.sql.interpreter.operator.arithmetic; - -import java.math.BigDecimal; -import java.util.ArrayList; -import java.util.List; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlPrimitive; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; -import org.apache.calcite.sql.type.SqlTypeName; - -/** - * Base class for all arithmetic operators. - */ -public abstract class BeamSqlArithmeticExpression extends BeamSqlExpression { - private static final List<SqlTypeName> ORDERED_APPROX_TYPES = new ArrayList<>(); - static { - ORDERED_APPROX_TYPES.add(SqlTypeName.TINYINT); - ORDERED_APPROX_TYPES.add(SqlTypeName.SMALLINT); - ORDERED_APPROX_TYPES.add(SqlTypeName.INTEGER); - ORDERED_APPROX_TYPES.add(SqlTypeName.BIGINT); - ORDERED_APPROX_TYPES.add(SqlTypeName.FLOAT); - ORDERED_APPROX_TYPES.add(SqlTypeName.DOUBLE); - ORDERED_APPROX_TYPES.add(SqlTypeName.DECIMAL); - } - - protected BeamSqlArithmeticExpression(List<BeamSqlExpression> operands) { - super(operands, deduceOutputType(operands.get(0).getOutputType(), - operands.get(1).getOutputType())); - } - - protected BeamSqlArithmeticExpression(List<BeamSqlExpression> operands, SqlTypeName outputType) { - super(operands, outputType); - } - - @Override public BeamSqlPrimitive<? extends Number> evaluate(BeamSqlRow inputRow) { - BigDecimal left = BigDecimal.valueOf( - Double.valueOf(opValueEvaluated(0, inputRow).toString())); - BigDecimal right = BigDecimal.valueOf( - Double.valueOf(opValueEvaluated(1, inputRow).toString())); - - BigDecimal result = calc(left, right); - return getCorrectlyTypedResult(result); - } - - protected abstract BigDecimal calc(BigDecimal left, BigDecimal right); - - protected static SqlTypeName deduceOutputType(SqlTypeName left, SqlTypeName right) { - int leftIndex = ORDERED_APPROX_TYPES.indexOf(left); - int rightIndex = ORDERED_APPROX_TYPES.indexOf(right); - if ((left == SqlTypeName.FLOAT || right == SqlTypeName.FLOAT) - && (left == SqlTypeName.DECIMAL || right == SqlTypeName.DECIMAL)) { - return SqlTypeName.DOUBLE; - } - - if (leftIndex < rightIndex) { - return right; - } else if (leftIndex > rightIndex) { - return left; - } else { - return left; - } - } - - @Override public boolean accept() { - if (operands.size() != 2) { - return false; - } - - for (BeamSqlExpression operand : operands) { - if (!SqlTypeName.NUMERIC_TYPES.contains(operand.getOutputType())) { - return false; - } - } - return true; - } - - protected BeamSqlPrimitive<? extends Number> getCorrectlyTypedResult(BigDecimal rawResult) { - Number actualValue; - switch (outputType) { - case TINYINT: - actualValue = rawResult.byteValue(); - break; - case SMALLINT: - actualValue = rawResult.shortValue(); - break; - case INTEGER: - actualValue = rawResult.intValue(); - break; - case BIGINT: - actualValue = rawResult.longValue(); - break; - case FLOAT: - actualValue = rawResult.floatValue(); - break; - case DOUBLE: - actualValue = rawResult.doubleValue(); - break; - case DECIMAL: - default: - actualValue = rawResult; - } - return BeamSqlPrimitive.of(outputType, actualValue); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/arithmetic/BeamSqlDivideExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/arithmetic/BeamSqlDivideExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/arithmetic/BeamSqlDivideExpression.java deleted file mode 100644 index fbe3fc4..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/arithmetic/BeamSqlDivideExpression.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.sdk.extensions.sql.interpreter.operator.arithmetic; - -import java.math.BigDecimal; -import java.math.RoundingMode; -import java.util.List; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression; - -/** - * '/' operator. - */ -public class BeamSqlDivideExpression extends BeamSqlArithmeticExpression { - public BeamSqlDivideExpression(List<BeamSqlExpression> operands) { - super(operands); - } - - @Override protected BigDecimal calc(BigDecimal left, BigDecimal right) { - return left.divide(right, 10, RoundingMode.HALF_EVEN); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/arithmetic/BeamSqlMinusExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/arithmetic/BeamSqlMinusExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/arithmetic/BeamSqlMinusExpression.java deleted file mode 100644 index 0241574..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/arithmetic/BeamSqlMinusExpression.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.sdk.extensions.sql.interpreter.operator.arithmetic; - -import java.math.BigDecimal; -import java.util.List; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression; - -/** - * '-' operator. - */ -public class BeamSqlMinusExpression extends BeamSqlArithmeticExpression { - public BeamSqlMinusExpression(List<BeamSqlExpression> operands) { - super(operands); - } - - @Override protected BigDecimal calc(BigDecimal left, BigDecimal right) { - return left.subtract(right); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/arithmetic/BeamSqlModExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/arithmetic/BeamSqlModExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/arithmetic/BeamSqlModExpression.java deleted file mode 100644 index fc137da..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/arithmetic/BeamSqlModExpression.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.sdk.extensions.sql.interpreter.operator.arithmetic; - -import java.math.BigDecimal; -import java.util.List; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression; - -/** - * '%' operator. - */ -public class BeamSqlModExpression extends BeamSqlArithmeticExpression { - public BeamSqlModExpression(List<BeamSqlExpression> operands) { - super(operands, operands.get(1).getOutputType()); - } - - @Override protected BigDecimal calc(BigDecimal left, BigDecimal right) { - return BigDecimal.valueOf(left.doubleValue() % right.doubleValue()); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/arithmetic/BeamSqlMultiplyExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/arithmetic/BeamSqlMultiplyExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/arithmetic/BeamSqlMultiplyExpression.java deleted file mode 100644 index 7ea974c..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/arithmetic/BeamSqlMultiplyExpression.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.sdk.extensions.sql.interpreter.operator.arithmetic; - -import java.math.BigDecimal; -import java.util.List; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression; - -/** - * '*' operator. - */ -public class BeamSqlMultiplyExpression extends BeamSqlArithmeticExpression { - public BeamSqlMultiplyExpression(List<BeamSqlExpression> operands) { - super(operands); - } - - @Override protected BigDecimal calc(BigDecimal left, BigDecimal right) { - return left.multiply(right); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/arithmetic/BeamSqlPlusExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/arithmetic/BeamSqlPlusExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/arithmetic/BeamSqlPlusExpression.java deleted file mode 100644 index 3ce806f..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/arithmetic/BeamSqlPlusExpression.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.sdk.extensions.sql.interpreter.operator.arithmetic; - -import java.math.BigDecimal; -import java.util.List; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression; - -/** - * '+' operator. - */ -public class BeamSqlPlusExpression extends BeamSqlArithmeticExpression { - public BeamSqlPlusExpression(List<BeamSqlExpression> operands) { - super(operands); - } - - @Override protected BigDecimal calc(BigDecimal left, BigDecimal right) { - return left.add(right); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/arithmetic/package-info.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/arithmetic/package-info.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/arithmetic/package-info.java deleted file mode 100644 index 5f8d649..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/arithmetic/package-info.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * Arithmetic operators. - */ -package org.apache.beam.sdk.extensions.sql.interpreter.operator.arithmetic; http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/comparison/BeamSqlCompareExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/comparison/BeamSqlCompareExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/comparison/BeamSqlCompareExpression.java deleted file mode 100644 index 9b6b527..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/comparison/BeamSqlCompareExpression.java +++ /dev/null @@ -1,96 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.extensions.sql.interpreter.operator.comparison; - -import java.util.List; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlPrimitive; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; -import org.apache.calcite.sql.type.SqlTypeName; - -/** - * {@link BeamSqlCompareExpression} is used for compare operations. - * - * <p>See {@link BeamSqlEqualsExpression}, {@link BeamSqlLessThanExpression}, - * {@link BeamSqlLessThanOrEqualsExpression}, {@link BeamSqlGreaterThanExpression}, - * {@link BeamSqlGreaterThanOrEqualsExpression} and {@link BeamSqlNotEqualsExpression} - * for more details. - * - */ -public abstract class BeamSqlCompareExpression extends BeamSqlExpression { - - private BeamSqlCompareExpression(List<BeamSqlExpression> operands, SqlTypeName outputType) { - super(operands, outputType); - } - - public BeamSqlCompareExpression(List<BeamSqlExpression> operands) { - this(operands, SqlTypeName.BOOLEAN); - } - - /** - * Compare operation must have 2 operands. - */ - @Override - public boolean accept() { - return operands.size() == 2; - } - - @Override - public BeamSqlPrimitive<Boolean> evaluate(BeamSqlRow inputRow) { - Object leftValue = operands.get(0).evaluate(inputRow).getValue(); - Object rightValue = operands.get(1).evaluate(inputRow).getValue(); - switch (operands.get(0).getOutputType()) { - case BIGINT: - case DECIMAL: - case DOUBLE: - case FLOAT: - case INTEGER: - case SMALLINT: - case TINYINT: - return BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, - compare((Number) leftValue, (Number) rightValue)); - case BOOLEAN: - return BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, - compare((Boolean) leftValue, (Boolean) rightValue)); - case VARCHAR: - return BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, - compare((CharSequence) leftValue, (CharSequence) rightValue)); - default: - throw new UnsupportedOperationException(toString()); - } - } - - /** - * Compare between String values, mapping to {@link SqlTypeName#VARCHAR}. - */ - public abstract Boolean compare(CharSequence leftValue, CharSequence rightValue); - - /** - * Compare between Boolean values, mapping to {@link SqlTypeName#BOOLEAN}. - */ - public abstract Boolean compare(Boolean leftValue, Boolean rightValue); - - /** - * Compare between Number values, including {@link SqlTypeName#BIGINT}, - * {@link SqlTypeName#DECIMAL}, {@link SqlTypeName#DOUBLE}, {@link SqlTypeName#FLOAT}, - * {@link SqlTypeName#INTEGER}, {@link SqlTypeName#SMALLINT} and {@link SqlTypeName#TINYINT}. - */ - public abstract Boolean compare(Number leftValue, Number rightValue); - - -} http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/comparison/BeamSqlEqualsExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/comparison/BeamSqlEqualsExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/comparison/BeamSqlEqualsExpression.java deleted file mode 100644 index b9767e3..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/comparison/BeamSqlEqualsExpression.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.extensions.sql.interpreter.operator.comparison; - -import java.util.List; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression; - -/** - * {@code BeamSqlExpression} for {@code =} operation. - */ -public class BeamSqlEqualsExpression extends BeamSqlCompareExpression { - - public BeamSqlEqualsExpression(List<BeamSqlExpression> operands) { - super(operands); - } - - @Override - public Boolean compare(CharSequence leftValue, CharSequence rightValue) { - return String.valueOf(leftValue).compareTo(String.valueOf(rightValue)) == 0; - } - - @Override - public Boolean compare(Boolean leftValue, Boolean rightValue) { - return !(leftValue ^ rightValue); - } - - @Override - public Boolean compare(Number leftValue, Number rightValue) { - return (leftValue == null && rightValue == null) - || (leftValue != null && rightValue != null - && leftValue.floatValue() == (rightValue).floatValue()); - } - -} http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/comparison/BeamSqlGreaterThanExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/comparison/BeamSqlGreaterThanExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/comparison/BeamSqlGreaterThanExpression.java deleted file mode 100644 index 5fdf27b..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/comparison/BeamSqlGreaterThanExpression.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.extensions.sql.interpreter.operator.comparison; - -import java.util.List; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression; - -/** - * {@code BeamSqlExpression} for {@code >} operation. - */ -public class BeamSqlGreaterThanExpression extends BeamSqlCompareExpression { - - public BeamSqlGreaterThanExpression(List<BeamSqlExpression> operands) { - super(operands); - } - - @Override - public Boolean compare(CharSequence leftValue, CharSequence rightValue) { - return String.valueOf(leftValue).compareTo(String.valueOf(rightValue)) > 0; - } - - @Override - public Boolean compare(Boolean leftValue, Boolean rightValue) { - throw new IllegalArgumentException("> is not supported for Boolean."); - } - - @Override - public Boolean compare(Number leftValue, Number rightValue) { - return (leftValue == null && rightValue == null) - || (leftValue != null && rightValue != null - && leftValue.floatValue() > (rightValue).floatValue()); - } - -}
