http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlPiExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlPiExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlPiExpression.java deleted file mode 100644 index ed89c49..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlPiExpression.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.sdk.extensions.sql.interpreter.operator.math; - -import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlPrimitive; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; -import org.apache.calcite.sql.type.SqlTypeName; - -/** - * Base class for the PI function. - */ -public class BeamSqlPiExpression extends BeamSqlExpression { - - public BeamSqlPiExpression() { - this.outputType = SqlTypeName.DOUBLE; - } - - @Override public boolean accept() { - return true; - } - - @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) { - return BeamSqlPrimitive.of(SqlTypeName.DOUBLE, Math.PI); - } -}
http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlPowerExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlPowerExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlPowerExpression.java deleted file mode 100644 index e2bdd05..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlPowerExpression.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.sdk.extensions.sql.interpreter.operator.math; - -import java.util.List; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlPrimitive; -import org.apache.calcite.runtime.SqlFunctions; -import org.apache.calcite.sql.type.SqlTypeName; - -/** - * {@code BeamSqlMathBinaryExpression} for 'POWER' function. - */ -public class BeamSqlPowerExpression extends BeamSqlMathBinaryExpression { - - public BeamSqlPowerExpression(List<BeamSqlExpression> operands) { - super(operands, SqlTypeName.DOUBLE); - } - - @Override - public BeamSqlPrimitive<? extends Number> calculate(BeamSqlPrimitive leftOp, - BeamSqlPrimitive rightOp) { - return BeamSqlPrimitive.of(SqlTypeName.DOUBLE, SqlFunctions - .power(SqlFunctions.toDouble(leftOp.getValue()), - SqlFunctions.toDouble(rightOp.getValue()))); - } - -} http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlRadiansExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlRadiansExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlRadiansExpression.java deleted file mode 100644 index d2d04c3..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlRadiansExpression.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.sdk.extensions.sql.interpreter.operator.math; - -import java.util.List; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlPrimitive; -import org.apache.calcite.runtime.SqlFunctions; -import org.apache.calcite.sql.type.SqlTypeName; - -/** - * {@code BeamSqlMathUnaryExpression} for 'RADIANS' function. - */ -public class BeamSqlRadiansExpression extends BeamSqlMathUnaryExpression { - - public BeamSqlRadiansExpression(List<BeamSqlExpression> operands) { - super(operands, SqlTypeName.DOUBLE); - } - - @Override public BeamSqlPrimitive calculate(BeamSqlPrimitive op) { - return BeamSqlPrimitive - .of(SqlTypeName.DOUBLE, SqlFunctions.radians(SqlFunctions.toDouble(op.getValue()))); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlRandExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlRandExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlRandExpression.java deleted file mode 100644 index 8df6f67..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlRandExpression.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.sdk.extensions.sql.interpreter.operator.math; - -import java.util.List; -import java.util.Random; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlPrimitive; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; -import org.apache.calcite.sql.type.SqlTypeName; - -/** - * {@code BeamSqlMathUnaryExpression} for 'RAND([seed])' function. - */ -public class BeamSqlRandExpression extends BeamSqlExpression { - private Random rand = new Random(); - private Integer seed = null; - - public BeamSqlRandExpression(List<BeamSqlExpression> subExps) { - super(subExps, SqlTypeName.DOUBLE); - } - - @Override - public boolean accept() { - return true; - } - - @Override - public BeamSqlPrimitive evaluate(BeamSqlRow inputRecord) { - if (operands.size() == 1) { - int rowSeed = opValueEvaluated(0, inputRecord); - if (seed == null || seed != rowSeed) { - rand.setSeed(rowSeed); - } - } - return BeamSqlPrimitive.of(SqlTypeName.DOUBLE, rand.nextDouble()); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlRandIntegerExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlRandIntegerExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlRandIntegerExpression.java deleted file mode 100644 index dfd76b8..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlRandIntegerExpression.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.sdk.extensions.sql.interpreter.operator.math; - -import java.util.List; -import java.util.Random; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlPrimitive; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; -import org.apache.calcite.sql.type.SqlTypeName; - -/** - * {@code BeamSqlMathUnaryExpression} for 'RAND_INTEGER([seed, ] numeric)' - * function. - */ -public class BeamSqlRandIntegerExpression extends BeamSqlExpression { - private Random rand = new Random(); - private Integer seed = null; - - public BeamSqlRandIntegerExpression(List<BeamSqlExpression> subExps) { - super(subExps, SqlTypeName.INTEGER); - } - - @Override - public boolean accept() { - return true; - } - - @Override - public BeamSqlPrimitive evaluate(BeamSqlRow inputRecord) { - int numericIdx = 0; - if (operands.size() == 2) { - int rowSeed = opValueEvaluated(0, inputRecord); - if (seed == null || seed != rowSeed) { - rand.setSeed(rowSeed); - } - numericIdx = 1; - } - return BeamSqlPrimitive.of(SqlTypeName.INTEGER, - rand.nextInt((int) opValueEvaluated(numericIdx, inputRecord))); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlRoundExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlRoundExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlRoundExpression.java deleted file mode 100644 index 9349ce5..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlRoundExpression.java +++ /dev/null @@ -1,107 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.extensions.sql.interpreter.operator.math; - -import java.math.BigDecimal; -import java.util.List; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlPrimitive; -import org.apache.calcite.runtime.SqlFunctions; -import org.apache.calcite.sql.type.SqlTypeName; - -/** - * {@code BeamSqlMathBinaryExpression} for 'ROUND' function. - */ -public class BeamSqlRoundExpression extends BeamSqlMathBinaryExpression { - - private final BeamSqlPrimitive zero = BeamSqlPrimitive.of(SqlTypeName.INTEGER, 0); - - public BeamSqlRoundExpression(List<BeamSqlExpression> operands) { - super(operands, operands.get(0).getOutputType()); - checkForSecondOperand(operands); - } - - private void checkForSecondOperand(List<BeamSqlExpression> operands) { - if (numberOfOperands() == 1) { - operands.add(1, zero); - } - } - - @Override public BeamSqlPrimitive<? extends Number> calculate(BeamSqlPrimitive leftOp, - BeamSqlPrimitive rightOp) { - BeamSqlPrimitive result = null; - switch (leftOp.getOutputType()) { - case SMALLINT: - result = BeamSqlPrimitive.of(SqlTypeName.SMALLINT, - (short) roundInt(toInt(leftOp.getValue()), toInt(rightOp.getValue()))); - break; - case TINYINT: - result = BeamSqlPrimitive.of(SqlTypeName.TINYINT, - (byte) roundInt(toInt(leftOp.getValue()), toInt(rightOp.getValue()))); - break; - case INTEGER: - result = BeamSqlPrimitive - .of(SqlTypeName.INTEGER, roundInt(leftOp.getInteger(), toInt(rightOp.getValue()))); - break; - case BIGINT: - result = BeamSqlPrimitive - .of(SqlTypeName.BIGINT, roundLong(leftOp.getLong(), toInt(rightOp.getValue()))); - break; - case DOUBLE: - result = BeamSqlPrimitive - .of(SqlTypeName.DOUBLE, roundDouble(leftOp.getDouble(), toInt(rightOp.getValue()))); - break; - case FLOAT: - result = BeamSqlPrimitive.of(SqlTypeName.FLOAT, - (float) roundDouble(leftOp.getFloat(), toInt(rightOp.getValue()))); - break; - case DECIMAL: - result = BeamSqlPrimitive.of(SqlTypeName.DECIMAL, - roundBigDecimal(toBigDecimal(leftOp.getValue()), toInt(rightOp.getValue()))); - break; - default: - break; - } - return result; - } - - private int roundInt(int v1, int v2) { - return SqlFunctions.sround(v1, v2); - } - - private double roundDouble(double v1, int v2) { - return SqlFunctions.sround(v1, v2); - } - - private BigDecimal roundBigDecimal(BigDecimal v1, int v2) { - return SqlFunctions.sround(v1, v2); - } - - private long roundLong(long v1, int v2) { - return SqlFunctions.sround(v1, v2); - } - - private int toInt(Object value) { - return SqlFunctions.toInt(value); - } - - private BigDecimal toBigDecimal(Object value) { - return SqlFunctions.toBigDecimal(value); - } - -} http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlSignExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlSignExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlSignExpression.java deleted file mode 100644 index b26ef91..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlSignExpression.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.sdk.extensions.sql.interpreter.operator.math; - -import java.util.List; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlPrimitive; -import org.apache.calcite.runtime.SqlFunctions; -import org.apache.calcite.sql.type.SqlTypeName; - -/** - * {@code BeamSqlMathUnaryExpression} for 'SIGN' function. - */ -public class BeamSqlSignExpression extends BeamSqlMathUnaryExpression { - - public BeamSqlSignExpression(List<BeamSqlExpression> operands) { - super(operands, operands.get(0).getOutputType()); - } - - @Override public BeamSqlPrimitive calculate(BeamSqlPrimitive op) { - BeamSqlPrimitive result = null; - switch (op.getOutputType()) { - case TINYINT: - result = BeamSqlPrimitive - .of(SqlTypeName.TINYINT, (byte) SqlFunctions.sign(SqlFunctions.toByte(op.getValue()))); - break; - case SMALLINT: - result = BeamSqlPrimitive - .of(SqlTypeName.SMALLINT, (short) SqlFunctions.sign(SqlFunctions.toShort(op.getValue()))); - break; - case INTEGER: - result = BeamSqlPrimitive - .of(SqlTypeName.INTEGER, SqlFunctions.sign(SqlFunctions.toInt(op.getValue()))); - break; - case BIGINT: - result = BeamSqlPrimitive - .of(SqlTypeName.BIGINT, SqlFunctions.sign(SqlFunctions.toLong(op.getValue()))); - break; - case FLOAT: - result = BeamSqlPrimitive - .of(SqlTypeName.FLOAT, (float) SqlFunctions.sign(SqlFunctions.toFloat(op.getValue()))); - break; - case DOUBLE: - result = BeamSqlPrimitive - .of(SqlTypeName.DOUBLE, SqlFunctions.sign(SqlFunctions.toDouble(op.getValue()))); - break; - case DECIMAL: - result = BeamSqlPrimitive - .of(SqlTypeName.DECIMAL, SqlFunctions.sign(SqlFunctions.toBigDecimal(op.getValue()))); - break; - default: - break; - } - return result; - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlSinExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlSinExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlSinExpression.java deleted file mode 100644 index 1b8023e..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlSinExpression.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.sdk.extensions.sql.interpreter.operator.math; - -import java.util.List; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlPrimitive; -import org.apache.calcite.runtime.SqlFunctions; -import org.apache.calcite.sql.type.SqlTypeName; - -/** - * {@code BeamSqlMathUnaryExpression} for 'SIN' function. - */ -public class BeamSqlSinExpression extends BeamSqlMathUnaryExpression { - - public BeamSqlSinExpression(List<BeamSqlExpression> operands) { - super(operands, SqlTypeName.DOUBLE); - } - - @Override public BeamSqlPrimitive calculate(BeamSqlPrimitive op) { - return BeamSqlPrimitive - .of(SqlTypeName.DOUBLE, SqlFunctions.sin(SqlFunctions.toDouble(op.getValue()))); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlTanExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlTanExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlTanExpression.java deleted file mode 100644 index c86f8b9..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlTanExpression.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.sdk.extensions.sql.interpreter.operator.math; - -import java.util.List; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlPrimitive; -import org.apache.calcite.runtime.SqlFunctions; -import org.apache.calcite.sql.type.SqlTypeName; - -/** - * {@code BeamSqlMathUnaryExpression} for 'TAN' function. - */ -public class BeamSqlTanExpression extends BeamSqlMathUnaryExpression { - - public BeamSqlTanExpression(List<BeamSqlExpression> operands) { - super(operands, SqlTypeName.DOUBLE); - } - - @Override public BeamSqlPrimitive calculate(BeamSqlPrimitive op) { - return BeamSqlPrimitive - .of(SqlTypeName.DOUBLE, SqlFunctions.tan(SqlFunctions.toDouble(op.getValue()))); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlTruncateExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlTruncateExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlTruncateExpression.java deleted file mode 100644 index 8201360..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlTruncateExpression.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.sdk.extensions.sql.interpreter.operator.math; - -import java.util.List; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlPrimitive; -import org.apache.calcite.runtime.SqlFunctions; -import org.apache.calcite.sql.type.SqlTypeName; - -/** - * {@code BeamSqlMathBinaryExpression} for 'TRUNCATE' function. - */ -public class BeamSqlTruncateExpression extends BeamSqlMathBinaryExpression { - - public BeamSqlTruncateExpression(List<BeamSqlExpression> operands) { - super(operands, operands.get(0).getOutputType()); - } - - @Override public BeamSqlPrimitive<? extends Number> calculate(BeamSqlPrimitive leftOp, - BeamSqlPrimitive rightOp) { - BeamSqlPrimitive result = null; - int rightIntOperand = SqlFunctions.toInt(rightOp.getValue()); - switch (leftOp.getOutputType()) { - case SMALLINT: - result = BeamSqlPrimitive.of(SqlTypeName.SMALLINT, - (short) SqlFunctions.struncate(SqlFunctions.toInt(leftOp.getValue()), rightIntOperand)); - break; - case TINYINT: - result = BeamSqlPrimitive.of(SqlTypeName.TINYINT, - (byte) SqlFunctions.struncate(SqlFunctions.toInt(leftOp.getValue()), rightIntOperand)); - break; - case INTEGER: - result = BeamSqlPrimitive.of(SqlTypeName.INTEGER, - SqlFunctions.struncate(SqlFunctions.toInt(leftOp.getValue()), rightIntOperand)); - break; - case BIGINT: - result = BeamSqlPrimitive - .of(SqlTypeName.BIGINT, SqlFunctions.struncate(leftOp.getLong(), rightIntOperand)); - break; - case FLOAT: - result = BeamSqlPrimitive.of(SqlTypeName.FLOAT, - (float) SqlFunctions.struncate(SqlFunctions.toFloat(leftOp.getValue()), - rightIntOperand)); - break; - case DOUBLE: - result = BeamSqlPrimitive.of(SqlTypeName.DOUBLE, - SqlFunctions.struncate(SqlFunctions.toDouble(leftOp.getValue()), rightIntOperand)); - break; - case DECIMAL: - result = BeamSqlPrimitive - .of(SqlTypeName.DECIMAL, SqlFunctions.struncate(leftOp.getDecimal(), rightIntOperand)); - break; - default: - break; - } - return result; - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/package-info.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/package-info.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/package-info.java deleted file mode 100644 index 09c0780..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/package-info.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * MATH functions/operators. - */ -package org.apache.beam.sdk.extensions.sql.interpreter.operator.math; http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/package-info.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/package-info.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/package-info.java deleted file mode 100644 index f913d7f..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/package-info.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * Implementation for operators in {@link org.apache.calcite.sql.fun.SqlStdOperatorTable}. - */ -package org.apache.beam.sdk.extensions.sql.interpreter.operator; http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlCharLengthExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlCharLengthExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlCharLengthExpression.java deleted file mode 100644 index 44ab804..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlCharLengthExpression.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.sdk.extensions.sql.interpreter.operator.string; - -import java.util.List; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlPrimitive; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; -import org.apache.calcite.sql.type.SqlTypeName; - -/** - * 'CHAR_LENGTH' operator. - */ -public class BeamSqlCharLengthExpression extends BeamSqlStringUnaryExpression { - public BeamSqlCharLengthExpression(List<BeamSqlExpression> operands) { - super(operands, SqlTypeName.INTEGER); - } - - @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) { - String str = opValueEvaluated(0, inputRow); - return BeamSqlPrimitive.of(SqlTypeName.INTEGER, str.length()); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlConcatExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlConcatExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlConcatExpression.java deleted file mode 100644 index bd298fc..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlConcatExpression.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.sdk.extensions.sql.interpreter.operator.string; - -import java.util.List; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlPrimitive; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; -import org.apache.calcite.sql.type.SqlTypeName; - -/** - * String concat operator. - */ -public class BeamSqlConcatExpression extends BeamSqlExpression { - - protected BeamSqlConcatExpression(List<BeamSqlExpression> operands, SqlTypeName outputType) { - super(operands, outputType); - } - - public BeamSqlConcatExpression(List<BeamSqlExpression> operands) { - super(operands, SqlTypeName.VARCHAR); - } - - @Override public boolean accept() { - if (operands.size() != 2) { - return false; - } - - for (BeamSqlExpression exp : getOperands()) { - if (!SqlTypeName.CHAR_TYPES.contains(exp.getOutputType())) { - return false; - } - } - - return true; - } - - @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) { - String left = opValueEvaluated(0, inputRow); - String right = opValueEvaluated(1, inputRow); - - return BeamSqlPrimitive.of(SqlTypeName.VARCHAR, - new StringBuilder(left.length() + right.length()) - .append(left).append(right).toString()); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlInitCapExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlInitCapExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlInitCapExpression.java deleted file mode 100644 index 79cd26f..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlInitCapExpression.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.sdk.extensions.sql.interpreter.operator.string; - -import java.util.List; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlPrimitive; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; -import org.apache.calcite.sql.type.SqlTypeName; - -/** - * 'INITCAP' operator. - */ -public class BeamSqlInitCapExpression extends BeamSqlStringUnaryExpression { - public BeamSqlInitCapExpression(List<BeamSqlExpression> operands) { - super(operands, SqlTypeName.VARCHAR); - } - - @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) { - String str = opValueEvaluated(0, inputRow); - - StringBuilder ret = new StringBuilder(str); - boolean isInit = true; - for (int i = 0; i < str.length(); i++) { - if (Character.isWhitespace(str.charAt(i))) { - isInit = true; - continue; - } - - if (isInit) { - ret.setCharAt(i, Character.toUpperCase(str.charAt(i))); - isInit = false; - } else { - ret.setCharAt(i, Character.toLowerCase(str.charAt(i))); - } - } - return BeamSqlPrimitive.of(SqlTypeName.VARCHAR, ret.toString()); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlLowerExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlLowerExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlLowerExpression.java deleted file mode 100644 index 384c012..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlLowerExpression.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.sdk.extensions.sql.interpreter.operator.string; - -import java.util.List; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlPrimitive; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; -import org.apache.calcite.sql.type.SqlTypeName; - -/** - * 'LOWER' operator. - */ -public class BeamSqlLowerExpression extends BeamSqlStringUnaryExpression { - public BeamSqlLowerExpression(List<BeamSqlExpression> operands) { - super(operands, SqlTypeName.VARCHAR); - } - - @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) { - String str = opValueEvaluated(0, inputRow); - return BeamSqlPrimitive.of(SqlTypeName.VARCHAR, str.toLowerCase()); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlOverlayExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlOverlayExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlOverlayExpression.java deleted file mode 100644 index 44e4624..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlOverlayExpression.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.sdk.extensions.sql.interpreter.operator.string; - -import java.util.List; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlPrimitive; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; -import org.apache.calcite.sql.type.SqlTypeName; - -/** - * 'OVERLAY' operator. - * - * <p> - * OVERLAY(string1 PLACING string2 FROM integer [ FOR integer2 ]) - * </p> - */ -public class BeamSqlOverlayExpression extends BeamSqlExpression { - public BeamSqlOverlayExpression(List<BeamSqlExpression> operands) { - super(operands, SqlTypeName.VARCHAR); - } - - @Override public boolean accept() { - if (operands.size() < 3 || operands.size() > 4) { - return false; - } - - if (!SqlTypeName.CHAR_TYPES.contains(opType(0)) - || !SqlTypeName.CHAR_TYPES.contains(opType(1)) - || !SqlTypeName.INT_TYPES.contains(opType(2))) { - return false; - } - - if (operands.size() == 4 && !SqlTypeName.INT_TYPES.contains(opType(3))) { - return false; - } - - return true; - } - - @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) { - String str = opValueEvaluated(0, inputRow); - String replaceStr = opValueEvaluated(1, inputRow); - int idx = opValueEvaluated(2, inputRow); - // the index is 1 based. - idx -= 1; - int length = replaceStr.length(); - if (operands.size() == 4) { - length = opValueEvaluated(3, inputRow); - } - - StringBuilder result = new StringBuilder( - str.length() + replaceStr.length() - length); - result.append(str.substring(0, idx)) - .append(replaceStr) - .append(str.substring(idx + length)); - - return BeamSqlPrimitive.of(SqlTypeName.VARCHAR, result.toString()); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlPositionExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlPositionExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlPositionExpression.java deleted file mode 100644 index 683902c..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlPositionExpression.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.sdk.extensions.sql.interpreter.operator.string; - -import java.util.List; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlPrimitive; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; -import org.apache.calcite.sql.type.SqlTypeName; - -/** - * String position operator. - * - * <p> - * example: - * POSITION(string1 IN string2) - * POSITION(string1 IN string2 FROM integer) - * </p> - */ -public class BeamSqlPositionExpression extends BeamSqlExpression { - public BeamSqlPositionExpression(List<BeamSqlExpression> operands) { - super(operands, SqlTypeName.INTEGER); - } - - @Override public boolean accept() { - if (operands.size() < 2 || operands.size() > 3) { - return false; - } - - if (!SqlTypeName.CHAR_TYPES.contains(opType(0)) - || !SqlTypeName.CHAR_TYPES.contains(opType(1))) { - return false; - } - - if (operands.size() == 3 - && !SqlTypeName.INT_TYPES.contains(opType(2))) { - return false; - } - - return true; - } - - @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) { - String targetStr = opValueEvaluated(0, inputRow); - String containingStr = opValueEvaluated(1, inputRow); - int from = -1; - if (operands.size() == 3) { - Number tmp = opValueEvaluated(2, inputRow); - from = tmp.intValue(); - } - - int idx = containingStr.indexOf(targetStr, from); - - return BeamSqlPrimitive.of(SqlTypeName.INTEGER, idx); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlStringUnaryExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlStringUnaryExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlStringUnaryExpression.java deleted file mode 100644 index d6099ab..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlStringUnaryExpression.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.sdk.extensions.sql.interpreter.operator.string; - -import java.util.List; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression; -import org.apache.calcite.sql.type.SqlTypeName; - -/** - * Base class for all string unary operators. - */ -public abstract class BeamSqlStringUnaryExpression extends BeamSqlExpression { - public BeamSqlStringUnaryExpression(List<BeamSqlExpression> operands, SqlTypeName outputType) { - super(operands, outputType); - } - - @Override public boolean accept() { - if (operands.size() != 1) { - return false; - } - - if (!SqlTypeName.CHAR_TYPES.contains(opType(0))) { - return false; - } - - return true; - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlSubstringExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlSubstringExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlSubstringExpression.java deleted file mode 100644 index 759bfa3..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlSubstringExpression.java +++ /dev/null @@ -1,82 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.sdk.extensions.sql.interpreter.operator.string; - -import java.util.List; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlPrimitive; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; -import org.apache.calcite.sql.type.SqlTypeName; - -/** - * 'SUBSTRING' operator. - * - * <p> - * SUBSTRING(string FROM integer) - * SUBSTRING(string FROM integer FOR integer) - * </p> - */ -public class BeamSqlSubstringExpression extends BeamSqlExpression { - public BeamSqlSubstringExpression(List<BeamSqlExpression> operands) { - super(operands, SqlTypeName.VARCHAR); - } - - @Override public boolean accept() { - if (operands.size() < 2 || operands.size() > 3) { - return false; - } - - if (!SqlTypeName.CHAR_TYPES.contains(opType(0)) - || !SqlTypeName.INT_TYPES.contains(opType(1))) { - return false; - } - - if (operands.size() == 3 && !SqlTypeName.INT_TYPES.contains(opType(2))) { - return false; - } - - return true; - } - - @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) { - String str = opValueEvaluated(0, inputRow); - int idx = opValueEvaluated(1, inputRow); - int startIdx = idx; - if (startIdx > 0) { - // NOTE: SQL substring is 1 based(rather than 0 based) - startIdx -= 1; - } else if (startIdx < 0) { - // NOTE: SQL also support negative index... - startIdx += str.length(); - } else { - return BeamSqlPrimitive.of(SqlTypeName.VARCHAR, ""); - } - - if (operands.size() == 3) { - int length = opValueEvaluated(2, inputRow); - if (length < 0) { - length = 0; - } - int endIdx = Math.min(startIdx + length, str.length()); - return BeamSqlPrimitive.of(SqlTypeName.VARCHAR, str.substring(startIdx, endIdx)); - } else { - return BeamSqlPrimitive.of(SqlTypeName.VARCHAR, str.substring(startIdx)); - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlTrimExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlTrimExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlTrimExpression.java deleted file mode 100644 index 19d411b..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlTrimExpression.java +++ /dev/null @@ -1,101 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.sdk.extensions.sql.interpreter.operator.string; - -import java.util.List; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlPrimitive; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; -import org.apache.calcite.sql.fun.SqlTrimFunction; -import org.apache.calcite.sql.type.SqlTypeName; - -/** - * Trim operator. - * - * <p> - * TRIM( { BOTH | LEADING | TRAILING } string1 FROM string2) - * </p> - */ -public class BeamSqlTrimExpression extends BeamSqlExpression { - public BeamSqlTrimExpression(List<BeamSqlExpression> operands) { - super(operands, SqlTypeName.VARCHAR); - } - - @Override public boolean accept() { - if (operands.size() != 1 && operands.size() != 3) { - return false; - } - - if (operands.size() == 1 && !SqlTypeName.CHAR_TYPES.contains(opType(0))) { - return false; - } - - if (operands.size() == 3 - && ( - SqlTypeName.SYMBOL != opType(0) - || !SqlTypeName.CHAR_TYPES.contains(opType(1)) - || !SqlTypeName.CHAR_TYPES.contains(opType(2))) - ) { - return false; - } - - return true; - } - - @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) { - if (operands.size() == 1) { - return BeamSqlPrimitive.of(SqlTypeName.VARCHAR, - opValueEvaluated(0, inputRow).toString().trim()); - } else { - SqlTrimFunction.Flag type = opValueEvaluated(0, inputRow); - String targetStr = opValueEvaluated(1, inputRow); - String containingStr = opValueEvaluated(2, inputRow); - - switch (type) { - case LEADING: - return BeamSqlPrimitive.of(SqlTypeName.VARCHAR, leadingTrim(containingStr, targetStr)); - case TRAILING: - return BeamSqlPrimitive.of(SqlTypeName.VARCHAR, trailingTrim(containingStr, targetStr)); - case BOTH: - default: - return BeamSqlPrimitive.of(SqlTypeName.VARCHAR, - trailingTrim(leadingTrim(containingStr, targetStr), targetStr)); - } - } - } - - static String leadingTrim(String containingStr, String targetStr) { - int idx = 0; - while (containingStr.startsWith(targetStr, idx)) { - idx += targetStr.length(); - } - - return containingStr.substring(idx); - } - - static String trailingTrim(String containingStr, String targetStr) { - int idx = containingStr.length() - targetStr.length(); - while (containingStr.startsWith(targetStr, idx)) { - idx -= targetStr.length(); - } - - idx += targetStr.length(); - return containingStr.substring(0, idx); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlUpperExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlUpperExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlUpperExpression.java deleted file mode 100644 index cf27597..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlUpperExpression.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.sdk.extensions.sql.interpreter.operator.string; - -import java.util.List; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlPrimitive; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; -import org.apache.calcite.sql.type.SqlTypeName; - -/** - * 'UPPER' operator. - */ -public class BeamSqlUpperExpression extends BeamSqlStringUnaryExpression { - public BeamSqlUpperExpression(List<BeamSqlExpression> operands) { - super(operands, SqlTypeName.VARCHAR); - } - - @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) { - String str = opValueEvaluated(0, inputRow); - return BeamSqlPrimitive.of(SqlTypeName.VARCHAR, str.toUpperCase()); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/package-info.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/package-info.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/package-info.java deleted file mode 100644 index 8b55034..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/package-info.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * String operators. - */ -package org.apache.beam.sdk.extensions.sql.interpreter.operator.string; http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/package-info.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/package-info.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/package-info.java deleted file mode 100644 index af3634a..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/package-info.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * interpreter generate runnable 'code' to execute SQL relational expressions. - */ -package org.apache.beam.sdk.extensions.sql.interpreter; http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/planner/BeamQueryPlanner.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/planner/BeamQueryPlanner.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/planner/BeamQueryPlanner.java deleted file mode 100644 index ba6235f..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/planner/BeamQueryPlanner.java +++ /dev/null @@ -1,167 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.extensions.sql.planner; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.extensions.sql.BeamSqlEnv; -import org.apache.beam.sdk.extensions.sql.rel.BeamLogicalConvention; -import org.apache.beam.sdk.extensions.sql.rel.BeamRelNode; -import org.apache.beam.sdk.extensions.sql.schema.BaseBeamTable; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionTuple; -import org.apache.calcite.adapter.java.JavaTypeFactory; -import org.apache.calcite.config.Lex; -import org.apache.calcite.jdbc.CalciteSchema; -import org.apache.calcite.jdbc.JavaTypeFactoryImpl; -import org.apache.calcite.plan.Contexts; -import org.apache.calcite.plan.ConventionTraitDef; -import org.apache.calcite.plan.RelOptUtil; -import org.apache.calcite.plan.RelTraitDef; -import org.apache.calcite.plan.RelTraitSet; -import org.apache.calcite.prepare.CalciteCatalogReader; -import org.apache.calcite.rel.RelCollationTraitDef; -import org.apache.calcite.rel.RelNode; -import org.apache.calcite.rel.type.RelDataTypeSystem; -import org.apache.calcite.schema.SchemaPlus; -import org.apache.calcite.sql.SqlNode; -import org.apache.calcite.sql.SqlOperatorTable; -import org.apache.calcite.sql.fun.SqlStdOperatorTable; -import org.apache.calcite.sql.parser.SqlParseException; -import org.apache.calcite.sql.parser.SqlParser; -import org.apache.calcite.sql.util.ChainedSqlOperatorTable; -import org.apache.calcite.tools.FrameworkConfig; -import org.apache.calcite.tools.Frameworks; -import org.apache.calcite.tools.Planner; -import org.apache.calcite.tools.RelConversionException; -import org.apache.calcite.tools.ValidationException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * The core component to handle through a SQL statement, from explain execution plan, - * to generate a Beam pipeline. - * - */ -public class BeamQueryPlanner { - private static final Logger LOG = LoggerFactory.getLogger(BeamQueryPlanner.class); - - protected final Planner planner; - private Map<String, BaseBeamTable> sourceTables = new HashMap<>(); - - public static final JavaTypeFactory TYPE_FACTORY = new JavaTypeFactoryImpl( - RelDataTypeSystem.DEFAULT); - - public BeamQueryPlanner(SchemaPlus schema) { - final List<RelTraitDef> traitDefs = new ArrayList<>(); - traitDefs.add(ConventionTraitDef.INSTANCE); - traitDefs.add(RelCollationTraitDef.INSTANCE); - - List<SqlOperatorTable> sqlOperatorTables = new ArrayList<>(); - sqlOperatorTables.add(SqlStdOperatorTable.instance()); - sqlOperatorTables.add(new CalciteCatalogReader(CalciteSchema.from(schema), false, - Collections.<String>emptyList(), TYPE_FACTORY)); - - FrameworkConfig config = Frameworks.newConfigBuilder() - .parserConfig(SqlParser.configBuilder().setLex(Lex.MYSQL).build()).defaultSchema(schema) - .traitDefs(traitDefs).context(Contexts.EMPTY_CONTEXT).ruleSets(BeamRuleSets.getRuleSets()) - .costFactory(null).typeSystem(BeamRelDataTypeSystem.BEAM_REL_DATATYPE_SYSTEM) - .operatorTable(new ChainedSqlOperatorTable(sqlOperatorTables)) - .build(); - this.planner = Frameworks.getPlanner(config); - - for (String t : schema.getTableNames()) { - sourceTables.put(t, (BaseBeamTable) schema.getTable(t)); - } - } - - /** - * Parse input SQL query, and return a {@link SqlNode} as grammar tree. - */ - public SqlNode parseQuery(String sqlQuery) throws SqlParseException{ - return planner.parse(sqlQuery); - } - - /** - * {@code compileBeamPipeline} translate a SQL statement to executed as Beam data flow, - * which is linked with the given {@code pipeline}. The final output stream is returned as - * {@code PCollection} so more operations can be applied. - */ - public PCollection<BeamSqlRow> compileBeamPipeline(String sqlStatement, Pipeline basePipeline - , BeamSqlEnv sqlEnv) throws Exception { - BeamRelNode relNode = convertToBeamRel(sqlStatement); - - // the input PCollectionTuple is empty, and be rebuilt in BeamIOSourceRel. - return relNode.buildBeamPipeline(PCollectionTuple.empty(basePipeline), sqlEnv); - } - - /** - * It parses and validate the input query, then convert into a - * {@link BeamRelNode} tree. - * - */ - public BeamRelNode convertToBeamRel(String sqlStatement) - throws ValidationException, RelConversionException, SqlParseException { - BeamRelNode beamRelNode; - try { - beamRelNode = (BeamRelNode) validateAndConvert(planner.parse(sqlStatement)); - } finally { - planner.close(); - } - return beamRelNode; - } - - private RelNode validateAndConvert(SqlNode sqlNode) - throws ValidationException, RelConversionException { - SqlNode validated = validateNode(sqlNode); - LOG.info("SQL:\n" + validated); - RelNode relNode = convertToRelNode(validated); - return convertToBeamRel(relNode); - } - - private RelNode convertToBeamRel(RelNode relNode) throws RelConversionException { - RelTraitSet traitSet = relNode.getTraitSet(); - - LOG.info("SQLPlan>\n" + RelOptUtil.toString(relNode)); - - // PlannerImpl.transform() optimizes RelNode with ruleset - return planner.transform(0, traitSet.plus(BeamLogicalConvention.INSTANCE), relNode); - } - - private RelNode convertToRelNode(SqlNode sqlNode) throws RelConversionException { - return planner.rel(sqlNode).rel; - } - - private SqlNode validateNode(SqlNode sqlNode) throws ValidationException { - return planner.validate(sqlNode); - } - - public Map<String, BaseBeamTable> getSourceTables() { - return sourceTables; - } - - public Planner getPlanner() { - return planner; - } - -} http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/planner/BeamRelDataTypeSystem.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/planner/BeamRelDataTypeSystem.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/planner/BeamRelDataTypeSystem.java deleted file mode 100644 index fba4638..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/planner/BeamRelDataTypeSystem.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.extensions.sql.planner; - -import org.apache.calcite.rel.type.RelDataTypeSystem; -import org.apache.calcite.rel.type.RelDataTypeSystemImpl; - -/** - * customized data type in Beam. - * - */ -public class BeamRelDataTypeSystem extends RelDataTypeSystemImpl { - public static final RelDataTypeSystem BEAM_REL_DATATYPE_SYSTEM = new BeamRelDataTypeSystem(); - - @Override - public int getMaxNumericScale() { - return 38; - } - - @Override - public int getMaxNumericPrecision() { - return 38; - } - -} http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/planner/BeamRuleSets.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/planner/BeamRuleSets.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/planner/BeamRuleSets.java deleted file mode 100644 index e907321..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/planner/BeamRuleSets.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.extensions.sql.planner; - -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableSet; -import java.util.Iterator; -import org.apache.beam.sdk.extensions.sql.rel.BeamRelNode; -import org.apache.beam.sdk.extensions.sql.rule.BeamAggregationRule; -import org.apache.beam.sdk.extensions.sql.rule.BeamFilterRule; -import org.apache.beam.sdk.extensions.sql.rule.BeamIOSinkRule; -import org.apache.beam.sdk.extensions.sql.rule.BeamIOSourceRule; -import org.apache.beam.sdk.extensions.sql.rule.BeamIntersectRule; -import org.apache.beam.sdk.extensions.sql.rule.BeamJoinRule; -import org.apache.beam.sdk.extensions.sql.rule.BeamMinusRule; -import org.apache.beam.sdk.extensions.sql.rule.BeamProjectRule; -import org.apache.beam.sdk.extensions.sql.rule.BeamSortRule; -import org.apache.beam.sdk.extensions.sql.rule.BeamUnionRule; -import org.apache.beam.sdk.extensions.sql.rule.BeamValuesRule; -import org.apache.calcite.plan.RelOptRule; -import org.apache.calcite.rel.RelNode; -import org.apache.calcite.tools.RuleSet; - -/** - * {@link RuleSet} used in {@link BeamQueryPlanner}. It translates a standard - * Calcite {@link RelNode} tree, to represent with {@link BeamRelNode} - * - */ -public class BeamRuleSets { - private static final ImmutableSet<RelOptRule> calciteToBeamConversionRules = ImmutableSet - .<RelOptRule>builder().add(BeamIOSourceRule.INSTANCE, BeamProjectRule.INSTANCE, - BeamFilterRule.INSTANCE, BeamIOSinkRule.INSTANCE, - BeamAggregationRule.INSTANCE, BeamSortRule.INSTANCE, BeamValuesRule.INSTANCE, - BeamIntersectRule.INSTANCE, BeamMinusRule.INSTANCE, BeamUnionRule.INSTANCE, - BeamJoinRule.INSTANCE) - .build(); - - public static RuleSet[] getRuleSets() { - return new RuleSet[] { new BeamRuleSet( - ImmutableSet.<RelOptRule>builder().addAll(calciteToBeamConversionRules).build()) }; - } - - private static class BeamRuleSet implements RuleSet { - final ImmutableSet<RelOptRule> rules; - - public BeamRuleSet(ImmutableSet<RelOptRule> rules) { - this.rules = rules; - } - - public BeamRuleSet(ImmutableList<RelOptRule> rules) { - this.rules = ImmutableSet.<RelOptRule>builder().addAll(rules).build(); - } - - @Override - public Iterator<RelOptRule> iterator() { - return rules.iterator(); - } - } - -} http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/planner/package-info.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/planner/package-info.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/planner/package-info.java deleted file mode 100644 index 680ccbd..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/planner/package-info.java +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * {@link org.apache.beam.sdk.extensions.sql.planner.BeamQueryPlanner} is the main interface. - * It defines data sources, validate a SQL statement, and convert it as a Beam - * pipeline. - */ -package org.apache.beam.sdk.extensions.sql.planner; http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamAggregationRel.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamAggregationRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamAggregationRel.java deleted file mode 100644 index 66ab892..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamAggregationRel.java +++ /dev/null @@ -1,182 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.extensions.sql.rel; - -import java.util.ArrayList; -import java.util.List; -import org.apache.beam.sdk.coders.KvCoder; -import org.apache.beam.sdk.extensions.sql.BeamSqlEnv; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowCoder; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType; -import org.apache.beam.sdk.extensions.sql.transform.BeamAggregationTransforms; -import org.apache.beam.sdk.extensions.sql.utils.CalciteUtils; -import org.apache.beam.sdk.transforms.Combine; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.WithKeys; -import org.apache.beam.sdk.transforms.WithTimestamps; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.Trigger; -import org.apache.beam.sdk.transforms.windowing.Window; -import org.apache.beam.sdk.transforms.windowing.WindowFn; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionTuple; -import org.apache.calcite.linq4j.Ord; -import org.apache.calcite.plan.RelOptCluster; -import org.apache.calcite.plan.RelTraitSet; -import org.apache.calcite.rel.RelNode; -import org.apache.calcite.rel.RelWriter; -import org.apache.calcite.rel.core.Aggregate; -import org.apache.calcite.rel.core.AggregateCall; -import org.apache.calcite.rel.type.RelDataType; -import org.apache.calcite.util.ImmutableBitSet; -import org.apache.calcite.util.Util; -import org.joda.time.Duration; - -/** - * {@link BeamRelNode} to replace a {@link Aggregate} node. - * - */ -public class BeamAggregationRel extends Aggregate implements BeamRelNode { - private int windowFieldIdx = -1; - private WindowFn<BeamSqlRow, BoundedWindow> windowFn; - private Trigger trigger; - private Duration allowedLatence = Duration.ZERO; - - public BeamAggregationRel(RelOptCluster cluster, RelTraitSet traits - , RelNode child, boolean indicator, - ImmutableBitSet groupSet, List<ImmutableBitSet> groupSets, List<AggregateCall> aggCalls - , WindowFn windowFn, Trigger trigger, int windowFieldIdx, Duration allowedLatence) { - super(cluster, traits, child, indicator, groupSet, groupSets, aggCalls); - this.windowFn = windowFn; - this.trigger = trigger; - this.windowFieldIdx = windowFieldIdx; - this.allowedLatence = allowedLatence; - } - - @Override - public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections - , BeamSqlEnv sqlEnv) throws Exception { - RelNode input = getInput(); - String stageName = BeamSqlRelUtils.getStageName(this) + "_"; - - PCollection<BeamSqlRow> upstream = - BeamSqlRelUtils.getBeamRelInput(input).buildBeamPipeline(inputPCollections, sqlEnv); - if (windowFieldIdx != -1) { - upstream = upstream.apply(stageName + "assignEventTimestamp", WithTimestamps - .of(new BeamAggregationTransforms.WindowTimestampFn(windowFieldIdx))) - .setCoder(upstream.getCoder()); - } - - PCollection<BeamSqlRow> windowStream = upstream.apply(stageName + "window", - Window.into(windowFn) - .triggering(trigger) - .withAllowedLateness(allowedLatence) - .accumulatingFiredPanes()); - - BeamSqlRowCoder keyCoder = new BeamSqlRowCoder(exKeyFieldsSchema(input.getRowType())); - PCollection<KV<BeamSqlRow, BeamSqlRow>> exCombineByStream = windowStream.apply( - stageName + "exCombineBy", - WithKeys - .of(new BeamAggregationTransforms.AggregationGroupByKeyFn( - windowFieldIdx, groupSet))) - .setCoder(KvCoder.of(keyCoder, upstream.getCoder())); - - - BeamSqlRowCoder aggCoder = new BeamSqlRowCoder(exAggFieldsSchema()); - - PCollection<KV<BeamSqlRow, BeamSqlRow>> aggregatedStream = exCombineByStream.apply( - stageName + "combineBy", - Combine.<BeamSqlRow, BeamSqlRow, BeamSqlRow>perKey( - new BeamAggregationTransforms.AggregationAdaptor(getAggCallList(), - CalciteUtils.toBeamRowType(input.getRowType())))) - .setCoder(KvCoder.of(keyCoder, aggCoder)); - - PCollection<BeamSqlRow> mergedStream = aggregatedStream.apply(stageName + "mergeRecord", - ParDo.of(new BeamAggregationTransforms.MergeAggregationRecord( - CalciteUtils.toBeamRowType(getRowType()), getAggCallList(), windowFieldIdx))); - mergedStream.setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRowType(getRowType()))); - - return mergedStream; - } - - /** - * Type of sub-rowrecord used as Group-By keys. - */ - private BeamSqlRowType exKeyFieldsSchema(RelDataType relDataType) { - BeamSqlRowType inputRowType = CalciteUtils.toBeamRowType(relDataType); - List<String> fieldNames = new ArrayList<>(); - List<Integer> fieldTypes = new ArrayList<>(); - for (int i : groupSet.asList()) { - if (i != windowFieldIdx) { - fieldNames.add(inputRowType.getFieldsName().get(i)); - fieldTypes.add(inputRowType.getFieldsType().get(i)); - } - } - return BeamSqlRowType.create(fieldNames, fieldTypes); - } - - /** - * Type of sub-rowrecord, that represents the list of aggregation fields. - */ - private BeamSqlRowType exAggFieldsSchema() { - List<String> fieldNames = new ArrayList<>(); - List<Integer> fieldTypes = new ArrayList<>(); - for (AggregateCall ac : getAggCallList()) { - fieldNames.add(ac.name); - fieldTypes.add(CalciteUtils.toJavaType(ac.type.getSqlTypeName())); - } - - return BeamSqlRowType.create(fieldNames, fieldTypes); - } - - @Override - public Aggregate copy(RelTraitSet traitSet, RelNode input, boolean indicator - , ImmutableBitSet groupSet, - List<ImmutableBitSet> groupSets, List<AggregateCall> aggCalls) { - return new BeamAggregationRel(getCluster(), traitSet, input, indicator - , groupSet, groupSets, aggCalls, windowFn, trigger, windowFieldIdx, allowedLatence); - } - - public void setWindowFn(WindowFn windowFn) { - this.windowFn = windowFn; - } - - public void setTrigger(Trigger trigger) { - this.trigger = trigger; - } - - public RelWriter explainTerms(RelWriter pw) { - // We skip the "groups" element if it is a singleton of "group". - pw.item("group", groupSet) - .itemIf("window", windowFn, windowFn != null) - .itemIf("trigger", trigger, trigger != null) - .itemIf("event_time", windowFieldIdx, windowFieldIdx != -1) - .itemIf("groups", groupSets, getGroupType() != Group.SIMPLE) - .itemIf("indicator", indicator, indicator) - .itemIf("aggs", aggCalls, pw.nest()); - if (!pw.nest()) { - for (Ord<AggregateCall> ord : Ord.zip(aggCalls)) { - pw.item(Util.first(ord.e.name, "agg#" + ord.i), ord.e); - } - } - return pw; - } - -}
