http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java new file mode 100644 index 0000000..ffc6833 --- /dev/null +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.sdk.extensions.sql.integrationtest; + +import com.google.common.base.Joiner; +import java.math.BigDecimal; +import java.sql.Types; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.TimeZone; +import org.apache.beam.sdk.extensions.sql.BeamSql; +import org.apache.beam.sdk.extensions.sql.TestUtils; +import org.apache.beam.sdk.extensions.sql.mock.MockedBoundedTable; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowCoder; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.values.PCollection; +import org.apache.calcite.util.Pair; +import org.junit.Rule; + +/** + * Base class for all built-in functions integration tests. 
+ */ +public class BeamSqlBuiltinFunctionsIntegrationTestBase { + private static final Map<Class, Integer> JAVA_CLASS_TO_SQL_TYPE = new HashMap<>(); + static { + JAVA_CLASS_TO_SQL_TYPE.put(Byte.class, Types.TINYINT); + JAVA_CLASS_TO_SQL_TYPE.put(Short.class, Types.SMALLINT); + JAVA_CLASS_TO_SQL_TYPE.put(Integer.class, Types.INTEGER); + JAVA_CLASS_TO_SQL_TYPE.put(Long.class, Types.BIGINT); + JAVA_CLASS_TO_SQL_TYPE.put(Float.class, Types.FLOAT); + JAVA_CLASS_TO_SQL_TYPE.put(Double.class, Types.DOUBLE); + JAVA_CLASS_TO_SQL_TYPE.put(BigDecimal.class, Types.DECIMAL); + JAVA_CLASS_TO_SQL_TYPE.put(String.class, Types.VARCHAR); + JAVA_CLASS_TO_SQL_TYPE.put(Date.class, Types.DATE); + JAVA_CLASS_TO_SQL_TYPE.put(Boolean.class, Types.BOOLEAN); + } + + @Rule + public final TestPipeline pipeline = TestPipeline.create(); + + protected PCollection<BeamSqlRow> getTestPCollection() { + BeamSqlRowType type = BeamSqlRowType.create( + Arrays.asList("ts", "c_tinyint", "c_smallint", + "c_integer", "c_bigint", "c_float", "c_double", "c_decimal", + "c_tinyint_max", "c_smallint_max", "c_integer_max", "c_bigint_max"), + Arrays.asList(Types.DATE, Types.TINYINT, Types.SMALLINT, + Types.INTEGER, Types.BIGINT, Types.FLOAT, Types.DOUBLE, Types.DECIMAL, + Types.TINYINT, Types.SMALLINT, Types.INTEGER, Types.BIGINT) + ); + try { + return MockedBoundedTable + .of(type) + .addRows( + parseDate("1986-02-15 11:35:26"), + (byte) 1, + (short) 1, + 1, + 1L, + 1.0f, + 1.0, + BigDecimal.ONE, + (byte) 127, + (short) 32767, + 2147483647, + 9223372036854775807L + ) + .buildIOReader(pipeline) + .setCoder(new BeamSqlRowCoder(type)); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + protected static Date parseDate(String str) { + try { + SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + sdf.setTimeZone(TimeZone.getTimeZone("GMT")); + return sdf.parse(str); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + + /** + * Helper class to make write integration 
test for built-in functions easier. + * + * <p>example usage: + * <pre>{@code + * ExpressionChecker checker = new ExpressionChecker() + * .addExpr("1 + 1", 2) + * .addExpr("1.0 + 1", 2.0) + * .addExpr("1 + 1.0", 2.0) + * .addExpr("1.0 + 1.0", 2.0) + * .addExpr("c_tinyint + c_tinyint", (byte) 2); + * checker.buildRunAndCheck(inputCollections); + * }</pre> + */ + public class ExpressionChecker { + private transient List<Pair<String, Object>> exps = new ArrayList<>(); + + public ExpressionChecker addExpr(String expression, Object expectedValue) { + exps.add(Pair.of(expression, expectedValue)); + return this; + } + + private String getSql() { + List<String> expStrs = new ArrayList<>(); + for (Pair<String, Object> pair : exps) { + expStrs.add(pair.getKey()); + } + return "SELECT " + Joiner.on(",\n ").join(expStrs) + " FROM PCOLLECTION"; + } + + /** + * Build the corresponding SQL, compile to Beam Pipeline, run it, and check the result. + */ + public void buildRunAndCheck() { + PCollection<BeamSqlRow> inputCollection = getTestPCollection(); + System.out.println("SQL:>\n" + getSql()); + try { + List<String> names = new ArrayList<>(); + List<Integer> types = new ArrayList<>(); + List<Object> values = new ArrayList<>(); + + for (Pair<String, Object> pair : exps) { + names.add(pair.getKey()); + types.add(JAVA_CLASS_TO_SQL_TYPE.get(pair.getValue().getClass())); + values.add(pair.getValue()); + } + + PCollection<BeamSqlRow> rows = inputCollection.apply(BeamSql.simpleQuery(getSql())); + PAssert.that(rows).containsInAnyOrder( + TestUtils.RowsBuilder + .of(BeamSqlRowType.create(names, types)) + .addRows(values) + .getRows() + ); + inputCollection.getPipeline().run(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + } +}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlComparisonOperatorsIntegrationTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlComparisonOperatorsIntegrationTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlComparisonOperatorsIntegrationTest.java new file mode 100644 index 0000000..14de5b6 --- /dev/null +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlComparisonOperatorsIntegrationTest.java @@ -0,0 +1,330 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.sdk.extensions.sql.integrationtest; + +import java.math.BigDecimal; +import java.sql.Types; +import java.util.Arrays; +import org.apache.beam.sdk.extensions.sql.mock.MockedBoundedTable; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowCoder; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType; +import org.apache.beam.sdk.values.PCollection; +import org.junit.Test; + +/** + * Integration test for comparison operators. + */ +public class BeamSqlComparisonOperatorsIntegrationTest + extends BeamSqlBuiltinFunctionsIntegrationTestBase { + + @Test + public void testEquals() { + ExpressionChecker checker = new ExpressionChecker() + .addExpr("c_tinyint_1 = c_tinyint_1", true) + .addExpr("c_tinyint_1 = c_tinyint_2", false) + .addExpr("c_smallint_1 = c_smallint_1", true) + .addExpr("c_smallint_1 = c_smallint_2", false) + .addExpr("c_integer_1 = c_integer_1", true) + .addExpr("c_integer_1 = c_integer_2", false) + .addExpr("c_bigint_1 = c_bigint_1", true) + .addExpr("c_bigint_1 = c_bigint_2", false) + .addExpr("c_float_1 = c_float_1", true) + .addExpr("c_float_1 = c_float_2", false) + .addExpr("c_double_1 = c_double_1", true) + .addExpr("c_double_1 = c_double_2", false) + .addExpr("c_decimal_1 = c_decimal_1", true) + .addExpr("c_decimal_1 = c_decimal_2", false) + .addExpr("c_varchar_1 = c_varchar_1", true) + .addExpr("c_varchar_1 = c_varchar_2", false) + .addExpr("c_boolean_true = c_boolean_true", true) + .addExpr("c_boolean_true = c_boolean_false", false) + + ; + checker.buildRunAndCheck(); + } + + @Test + public void testNotEquals() { + ExpressionChecker checker = new ExpressionChecker() + .addExpr("c_tinyint_1 <> c_tinyint_1", false) + .addExpr("c_tinyint_1 <> c_tinyint_2", true) + .addExpr("c_smallint_1 <> c_smallint_1", false) + .addExpr("c_smallint_1 <> c_smallint_2", true) + .addExpr("c_integer_1 <> c_integer_1", false) + .addExpr("c_integer_1 <> 
c_integer_2", true) + .addExpr("c_bigint_1 <> c_bigint_1", false) + .addExpr("c_bigint_1 <> c_bigint_2", true) + .addExpr("c_float_1 <> c_float_1", false) + .addExpr("c_float_1 <> c_float_2", true) + .addExpr("c_double_1 <> c_double_1", false) + .addExpr("c_double_1 <> c_double_2", true) + .addExpr("c_decimal_1 <> c_decimal_1", false) + .addExpr("c_decimal_1 <> c_decimal_2", true) + .addExpr("c_varchar_1 <> c_varchar_1", false) + .addExpr("c_varchar_1 <> c_varchar_2", true) + .addExpr("c_boolean_true <> c_boolean_true", false) + .addExpr("c_boolean_true <> c_boolean_false", true) + ; + checker.buildRunAndCheck(); + } + + @Test + public void testGreaterThan() { + ExpressionChecker checker = new ExpressionChecker() + .addExpr("c_tinyint_2 > c_tinyint_1", true) + .addExpr("c_tinyint_1 > c_tinyint_1", false) + .addExpr("c_tinyint_1 > c_tinyint_2", false) + + .addExpr("c_smallint_2 > c_smallint_1", true) + .addExpr("c_smallint_1 > c_smallint_1", false) + .addExpr("c_smallint_1 > c_smallint_2", false) + + .addExpr("c_integer_2 > c_integer_1", true) + .addExpr("c_integer_1 > c_integer_1", false) + .addExpr("c_integer_1 > c_integer_2", false) + + .addExpr("c_bigint_2 > c_bigint_1", true) + .addExpr("c_bigint_1 > c_bigint_1", false) + .addExpr("c_bigint_1 > c_bigint_2", false) + + .addExpr("c_float_2 > c_float_1", true) + .addExpr("c_float_1 > c_float_1", false) + .addExpr("c_float_1 > c_float_2", false) + + .addExpr("c_double_2 > c_double_1", true) + .addExpr("c_double_1 > c_double_1", false) + .addExpr("c_double_1 > c_double_2", false) + + .addExpr("c_decimal_2 > c_decimal_1", true) + .addExpr("c_decimal_1 > c_decimal_1", false) + .addExpr("c_decimal_1 > c_decimal_2", false) + + .addExpr("c_varchar_2 > c_varchar_1", true) + .addExpr("c_varchar_1 > c_varchar_1", false) + .addExpr("c_varchar_1 > c_varchar_2", false) + ; + + checker.buildRunAndCheck(); + } + + @Test(expected = RuntimeException.class) + public void testGreaterThanException() { + ExpressionChecker checker = 
new ExpressionChecker() + .addExpr("c_boolean_false > c_boolean_true", false); + checker.buildRunAndCheck(); + } + + @Test + public void testGreaterThanOrEquals() { + ExpressionChecker checker = new ExpressionChecker() + .addExpr("c_tinyint_2 >= c_tinyint_1", true) + .addExpr("c_tinyint_1 >= c_tinyint_1", true) + .addExpr("c_tinyint_1 >= c_tinyint_2", false) + + .addExpr("c_smallint_2 >= c_smallint_1", true) + .addExpr("c_smallint_1 >= c_smallint_1", true) + .addExpr("c_smallint_1 >= c_smallint_2", false) + + .addExpr("c_integer_2 >= c_integer_1", true) + .addExpr("c_integer_1 >= c_integer_1", true) + .addExpr("c_integer_1 >= c_integer_2", false) + + .addExpr("c_bigint_2 >= c_bigint_1", true) + .addExpr("c_bigint_1 >= c_bigint_1", true) + .addExpr("c_bigint_1 >= c_bigint_2", false) + + .addExpr("c_float_2 >= c_float_1", true) + .addExpr("c_float_1 >= c_float_1", true) + .addExpr("c_float_1 >= c_float_2", false) + + .addExpr("c_double_2 >= c_double_1", true) + .addExpr("c_double_1 >= c_double_1", true) + .addExpr("c_double_1 >= c_double_2", false) + + .addExpr("c_decimal_2 >= c_decimal_1", true) + .addExpr("c_decimal_1 >= c_decimal_1", true) + .addExpr("c_decimal_1 >= c_decimal_2", false) + + .addExpr("c_varchar_2 >= c_varchar_1", true) + .addExpr("c_varchar_1 >= c_varchar_1", true) + .addExpr("c_varchar_1 >= c_varchar_2", false) + ; + + checker.buildRunAndCheck(); + } + + @Test(expected = RuntimeException.class) + public void testGreaterThanOrEqualsException() { + ExpressionChecker checker = new ExpressionChecker() + .addExpr("c_boolean_false >= c_boolean_true", false); + checker.buildRunAndCheck(); + } + + @Test + public void testLessThan() { + ExpressionChecker checker = new ExpressionChecker() + .addExpr("c_tinyint_2 < c_tinyint_1", false) + .addExpr("c_tinyint_1 < c_tinyint_1", false) + .addExpr("c_tinyint_1 < c_tinyint_2", true) + + .addExpr("c_smallint_2 < c_smallint_1", false) + .addExpr("c_smallint_1 < c_smallint_1", false) + .addExpr("c_smallint_1 < 
c_smallint_2", true) + + .addExpr("c_integer_2 < c_integer_1", false) + .addExpr("c_integer_1 < c_integer_1", false) + .addExpr("c_integer_1 < c_integer_2", true) + + .addExpr("c_bigint_2 < c_bigint_1", false) + .addExpr("c_bigint_1 < c_bigint_1", false) + .addExpr("c_bigint_1 < c_bigint_2", true) + + .addExpr("c_float_2 < c_float_1", false) + .addExpr("c_float_1 < c_float_1", false) + .addExpr("c_float_1 < c_float_2", true) + + .addExpr("c_double_2 < c_double_1", false) + .addExpr("c_double_1 < c_double_1", false) + .addExpr("c_double_1 < c_double_2", true) + + .addExpr("c_decimal_2 < c_decimal_1", false) + .addExpr("c_decimal_1 < c_decimal_1", false) + .addExpr("c_decimal_1 < c_decimal_2", true) + + .addExpr("c_varchar_2 < c_varchar_1", false) + .addExpr("c_varchar_1 < c_varchar_1", false) + .addExpr("c_varchar_1 < c_varchar_2", true) + ; + + checker.buildRunAndCheck(); + } + + @Test(expected = RuntimeException.class) + public void testLessThanException() { + ExpressionChecker checker = new ExpressionChecker() + .addExpr("c_boolean_false < c_boolean_true", false); + checker.buildRunAndCheck(); + } + + @Test + public void testLessThanOrEquals() { + ExpressionChecker checker = new ExpressionChecker() + .addExpr("c_tinyint_2 <= c_tinyint_1", false) + .addExpr("c_tinyint_1 <= c_tinyint_1", true) + .addExpr("c_tinyint_1 <= c_tinyint_2", true) + + .addExpr("c_smallint_2 <= c_smallint_1", false) + .addExpr("c_smallint_1 <= c_smallint_1", true) + .addExpr("c_smallint_1 <= c_smallint_2", true) + + .addExpr("c_integer_2 <= c_integer_1", false) + .addExpr("c_integer_1 <= c_integer_1", true) + .addExpr("c_integer_1 <= c_integer_2", true) + + .addExpr("c_bigint_2 <= c_bigint_1", false) + .addExpr("c_bigint_1 <= c_bigint_1", true) + .addExpr("c_bigint_1 <= c_bigint_2", true) + + .addExpr("c_float_2 <= c_float_1", false) + .addExpr("c_float_1 <= c_float_1", true) + .addExpr("c_float_1 <= c_float_2", true) + + .addExpr("c_double_2 <= c_double_1", false) + .addExpr("c_double_1 <= 
c_double_1", true) + .addExpr("c_double_1 <= c_double_2", true) + + .addExpr("c_decimal_2 <= c_decimal_1", false) + .addExpr("c_decimal_1 <= c_decimal_1", true) + .addExpr("c_decimal_1 <= c_decimal_2", true) + + .addExpr("c_varchar_2 <= c_varchar_1", false) + .addExpr("c_varchar_1 <= c_varchar_1", true) + .addExpr("c_varchar_1 <= c_varchar_2", true) + ; + + checker.buildRunAndCheck(); + } + + @Test(expected = RuntimeException.class) + public void testLessThanOrEqualsException() { + ExpressionChecker checker = new ExpressionChecker() + .addExpr("c_boolean_false <= c_boolean_true", false); + checker.buildRunAndCheck(); + } + + @Test + public void testIsNullAndIsNotNull() throws Exception { + ExpressionChecker checker = new ExpressionChecker() + .addExpr("1 IS NOT NULL", true) + .addExpr("NULL IS NOT NULL", false) + + .addExpr("1 IS NULL", false) + .addExpr("NULL IS NULL", true) + ; + + checker.buildRunAndCheck(); + } + + @Override protected PCollection<BeamSqlRow> getTestPCollection() { + BeamSqlRowType type = BeamSqlRowType.create( + Arrays.asList( + "c_tinyint_0", "c_tinyint_1", "c_tinyint_2", + "c_smallint_0", "c_smallint_1", "c_smallint_2", + "c_integer_0", "c_integer_1", "c_integer_2", + "c_bigint_0", "c_bigint_1", "c_bigint_2", + "c_float_0", "c_float_1", "c_float_2", + "c_double_0", "c_double_1", "c_double_2", + "c_decimal_0", "c_decimal_1", "c_decimal_2", + "c_varchar_0", "c_varchar_1", "c_varchar_2", + "c_boolean_false", "c_boolean_true" + ), + Arrays.asList( + Types.TINYINT, Types.TINYINT, Types.TINYINT, + Types.SMALLINT, Types.SMALLINT, Types.SMALLINT, + Types.INTEGER, Types.INTEGER, Types.INTEGER, + Types.BIGINT, Types.BIGINT, Types.BIGINT, + Types.FLOAT, Types.FLOAT, Types.FLOAT, + Types.DOUBLE, Types.DOUBLE, Types.DOUBLE, + Types.DECIMAL, Types.DECIMAL, Types.DECIMAL, + Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, + Types.BOOLEAN, Types.BOOLEAN + ) + ); + try { + return MockedBoundedTable + .of(type) + .addRows( + (byte) 0, (byte) 1, (byte) 2, + (short) 
0, (short) 1, (short) 2, + 0, 1, 2, + 0L, 1L, 2L, + 0.0f, 1.0f, 2.0f, + 0.0, 1.0, 2.0, + BigDecimal.ZERO, BigDecimal.ONE, BigDecimal.ONE.add(BigDecimal.ONE), + "a", "b", "c", + false, true + ) + .buildIOReader(pipeline) + .setCoder(new BeamSqlRowCoder(type)); + } catch (Exception e) { + throw new RuntimeException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlConditionalFunctionsIntegrationTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlConditionalFunctionsIntegrationTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlConditionalFunctionsIntegrationTest.java new file mode 100644 index 0000000..f4416ce --- /dev/null +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlConditionalFunctionsIntegrationTest.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.extensions.sql.integrationtest; + +import org.junit.Test; + +/** + * Integration test for conditional functions. + */ +public class BeamSqlConditionalFunctionsIntegrationTest + extends BeamSqlBuiltinFunctionsIntegrationTestBase { + @Test + public void testConditionalFunctions() throws Exception { + ExpressionChecker checker = new ExpressionChecker() + .addExpr( + "CASE 1 WHEN 1 THEN 'hello' ELSE 'world' END", + "hello" + ) + .addExpr( + "CASE 2 " + + "WHEN 1 THEN 'hello' " + + "WHEN 3 THEN 'bond' " + + "ELSE 'world' END", + "world" + ) + .addExpr( + "CASE " + + "WHEN 1 = 1 THEN 'hello' " + + "ELSE 'world' END", + "hello" + ) + .addExpr( + "CASE " + + "WHEN 1 > 1 THEN 'hello' " + + "ELSE 'world' END", + "world" + ) + .addExpr("NULLIF(5, 4) ", 5) + .addExpr("COALESCE(1, 5) ", 1) + .addExpr("COALESCE(NULL, 5) ", 5) + ; + + checker.buildRunAndCheck(); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlDateFunctionsIntegrationTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlDateFunctionsIntegrationTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlDateFunctionsIntegrationTest.java new file mode 100644 index 0000000..181c991 --- /dev/null +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlDateFunctionsIntegrationTest.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.extensions.sql.integrationtest; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.util.Date; +import java.util.Iterator; +import org.apache.beam.sdk.extensions.sql.BeamSql; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.values.PCollection; +import org.junit.Test; + +/** + * Integration test for date functions. 
+ */ +public class BeamSqlDateFunctionsIntegrationTest + extends BeamSqlBuiltinFunctionsIntegrationTestBase { + @Test public void testDateTimeFunctions() throws Exception { + ExpressionChecker checker = new ExpressionChecker() + .addExpr("EXTRACT(YEAR FROM ts)", 1986L) + .addExpr("YEAR(ts)", 1986L) + .addExpr("QUARTER(ts)", 1L) + .addExpr("MONTH(ts)", 2L) + .addExpr("WEEK(ts)", 7L) + .addExpr("DAYOFMONTH(ts)", 15L) + .addExpr("DAYOFYEAR(ts)", 46L) + .addExpr("DAYOFWEEK(ts)", 7L) + .addExpr("HOUR(ts)", 11L) + .addExpr("MINUTE(ts)", 35L) + .addExpr("SECOND(ts)", 26L) + .addExpr("FLOOR(ts TO YEAR)", parseDate("1986-01-01 00:00:00")) + .addExpr("CEIL(ts TO YEAR)", parseDate("1987-01-01 00:00:00")) + ; + checker.buildRunAndCheck(); + } + + @Test public void testDateTimeFunctions_currentTime() throws Exception { + String sql = "SELECT " + + "LOCALTIME as l," + + "LOCALTIMESTAMP as l1," + + "CURRENT_DATE as c1," + + "CURRENT_TIME as c2," + + "CURRENT_TIMESTAMP as c3" + + " FROM PCOLLECTION" + ; + PCollection<BeamSqlRow> rows = getTestPCollection().apply( + BeamSql.simpleQuery(sql)); + PAssert.that(rows).satisfies(new Checker()); + pipeline.run(); + } + + private static class Checker implements SerializableFunction<Iterable<BeamSqlRow>, Void> { + @Override public Void apply(Iterable<BeamSqlRow> input) { + Iterator<BeamSqlRow> iter = input.iterator(); + assertTrue(iter.hasNext()); + BeamSqlRow row = iter.next(); + // LOCALTIME + Date date = new Date(); + assertTrue(date.getTime() - row.getGregorianCalendar(0).getTime().getTime() < 1000); + assertTrue(date.getTime() - row.getDate(1).getTime() < 1000); + assertTrue(date.getTime() - row.getDate(2).getTime() < 1000); + assertTrue(date.getTime() - row.getGregorianCalendar(3).getTime().getTime() < 1000); + assertTrue(date.getTime() - row.getDate(4).getTime() < 1000); + assertFalse(iter.hasNext()); + return null; + } + } +} 
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlLogicalFunctionsIntegrationTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlLogicalFunctionsIntegrationTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlLogicalFunctionsIntegrationTest.java new file mode 100644 index 0000000..b408d78 --- /dev/null +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlLogicalFunctionsIntegrationTest.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.sql.integrationtest; + +import org.junit.Test; + +/** + * Integration test for logical functions. 
+ */ +public class BeamSqlLogicalFunctionsIntegrationTest + extends BeamSqlBuiltinFunctionsIntegrationTestBase { + @Test + public void testStringFunctions() throws Exception { + ExpressionChecker checker = new ExpressionChecker() + .addExpr("c_integer = 1 AND c_bigint = 1", true) + .addExpr("c_integer = 1 OR c_bigint = 2", true) + .addExpr("NOT c_bigint = 2", true) + .addExpr("(NOT c_bigint = 2) AND (c_integer = 1 OR c_bigint = 3)", true) + .addExpr("c_integer = 2 AND c_bigint = 1", false) + .addExpr("c_integer = 2 OR c_bigint = 2", false) + .addExpr("NOT c_bigint = 1", false) + .addExpr("(NOT c_bigint = 2) AND (c_integer = 2 OR c_bigint = 3)", false) + ; + + checker.buildRunAndCheck(); + } + +} http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlMathFunctionsIntegrationTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlMathFunctionsIntegrationTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlMathFunctionsIntegrationTest.java new file mode 100644 index 0000000..995caaf --- /dev/null +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlMathFunctionsIntegrationTest.java @@ -0,0 +1,351 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */
package org.apache.beam.sdk.extensions.sql.integrationtest;

import java.math.BigDecimal;
import java.util.Random;
import org.apache.calcite.runtime.SqlFunctions;
import org.junit.Test;

/**
 * Integration test for built-in MATH functions.
 *
 * <p>Every test registers one SQL expression per numeric column ({@code c_integer},
 * {@code c_bigint}, {@code c_smallint}, {@code c_tinyint}, {@code c_double}, {@code c_float},
 * {@code c_decimal}) together with the value expected for it, then lets the
 * {@code ExpressionChecker} build, run and verify the query. Expected values are computed with
 * {@link Math}, the boxed-type helpers or Calcite's {@link SqlFunctions} from the constants
 * below.
 */
public class BeamSqlMathFunctionsIntegrationTest
    extends BeamSqlBuiltinFunctionsIntegrationTestBase {
  // Operands used to compute the expected results.
  // NOTE(review): presumably these mirror the values stored in the matching c_* columns of the
  // table supplied by the base class - confirm there before changing any of them.
  private static final int INTEGER_VALUE = 1;
  private static final long LONG_VALUE = 1L;
  private static final short SHORT_VALUE = 1;
  private static final byte BYTE_VALUE = 1;
  private static final double DOUBLE_VALUE = 1.0;
  private static final float FLOAT_VALUE = 1.0f;
  private static final BigDecimal DECIMAL_VALUE = new BigDecimal(1);

  /** ABS is expected to keep the operand type, hence the narrowing casts. */
  @Test
  public void testAbs() throws Exception {
    new ExpressionChecker()
        .addExpr("ABS(c_integer)", Math.abs(INTEGER_VALUE))
        .addExpr("ABS(c_bigint)", Math.abs(LONG_VALUE))
        .addExpr("ABS(c_smallint)", (short) Math.abs(SHORT_VALUE))
        .addExpr("ABS(c_tinyint)", (byte) Math.abs(BYTE_VALUE))
        .addExpr("ABS(c_double)", Math.abs(DOUBLE_VALUE))
        .addExpr("ABS(c_float)", Math.abs(FLOAT_VALUE))
        // Stay in exact decimal arithmetic instead of round-tripping through double.
        .addExpr("ABS(c_decimal)", DECIMAL_VALUE.abs())
        .buildRunAndCheck();
  }

  /** SQRT is expected to yield a double for every operand type. */
  @Test
  public void testSqrt() throws Exception {
    new ExpressionChecker()
        .addExpr("SQRT(c_integer)", Math.sqrt(INTEGER_VALUE))
        .addExpr("SQRT(c_bigint)", Math.sqrt(LONG_VALUE))
        .addExpr("SQRT(c_smallint)", Math.sqrt(SHORT_VALUE))
        .addExpr("SQRT(c_tinyint)", Math.sqrt(BYTE_VALUE))
        .addExpr("SQRT(c_double)", Math.sqrt(DOUBLE_VALUE))
        .addExpr("SQRT(c_float)", Math.sqrt(FLOAT_VALUE))
        .addExpr("SQRT(c_decimal)", Math.sqrt(DECIMAL_VALUE.doubleValue()))
        .buildRunAndCheck();
  }

  /** ROUND to zero digits; the expected value is cast back to the operand type. */
  @Test
  public void testRound() throws Exception {
    new ExpressionChecker()
        .addExpr("ROUND(c_integer, 0)", SqlFunctions.sround(INTEGER_VALUE, 0))
        .addExpr("ROUND(c_bigint, 0)", SqlFunctions.sround(LONG_VALUE, 0))
        .addExpr("ROUND(c_smallint, 0)", (short) SqlFunctions.sround(SHORT_VALUE, 0))
        .addExpr("ROUND(c_tinyint, 0)", (byte) SqlFunctions.sround(BYTE_VALUE, 0))
        .addExpr("ROUND(c_double, 0)", SqlFunctions.sround(DOUBLE_VALUE, 0))
        .addExpr("ROUND(c_float, 0)", (float) SqlFunctions.sround(FLOAT_VALUE, 0))
        .addExpr("ROUND(c_decimal, 0)",
            new BigDecimal(SqlFunctions.sround(DECIMAL_VALUE.doubleValue(), 0)))
        .buildRunAndCheck();
  }

  /** LN is checked against {@link Math#log(double)}. */
  @Test
  public void testLn() throws Exception {
    new ExpressionChecker()
        .addExpr("LN(c_integer)", Math.log(INTEGER_VALUE))
        .addExpr("LN(c_bigint)", Math.log(LONG_VALUE))
        .addExpr("LN(c_smallint)", Math.log(SHORT_VALUE))
        .addExpr("LN(c_tinyint)", Math.log(BYTE_VALUE))
        .addExpr("LN(c_double)", Math.log(DOUBLE_VALUE))
        .addExpr("LN(c_float)", Math.log(FLOAT_VALUE))
        .addExpr("LN(c_decimal)", Math.log(DECIMAL_VALUE.doubleValue()))
        .buildRunAndCheck();
  }

  /** LOG10 is checked against {@link Math#log10(double)}. */
  @Test
  public void testLog10() throws Exception {
    new ExpressionChecker()
        .addExpr("LOG10(c_integer)", Math.log10(INTEGER_VALUE))
        .addExpr("LOG10(c_bigint)", Math.log10(LONG_VALUE))
        .addExpr("LOG10(c_smallint)", Math.log10(SHORT_VALUE))
        .addExpr("LOG10(c_tinyint)", Math.log10(BYTE_VALUE))
        .addExpr("LOG10(c_double)", Math.log10(DOUBLE_VALUE))
        .addExpr("LOG10(c_float)", Math.log10(FLOAT_VALUE))
        .addExpr("LOG10(c_decimal)", Math.log10(DECIMAL_VALUE.doubleValue()))
        .buildRunAndCheck();
  }

  /** EXP is checked against {@link Math#exp(double)}. */
  @Test
  public void testExp() throws Exception {
    new ExpressionChecker()
        .addExpr("EXP(c_integer)", Math.exp(INTEGER_VALUE))
        .addExpr("EXP(c_bigint)", Math.exp(LONG_VALUE))
        .addExpr("EXP(c_smallint)", Math.exp(SHORT_VALUE))
        .addExpr("EXP(c_tinyint)", Math.exp(BYTE_VALUE))
        .addExpr("EXP(c_double)", Math.exp(DOUBLE_VALUE))
        .addExpr("EXP(c_float)", Math.exp(FLOAT_VALUE))
        .addExpr("EXP(c_decimal)", Math.exp(DECIMAL_VALUE.doubleValue()))
        .buildRunAndCheck();
  }

  /** ACOS is checked against {@link Math#acos(double)}. */
  @Test
  public void testAcos() throws Exception {
    new ExpressionChecker()
        .addExpr("ACOS(c_integer)", Math.acos(INTEGER_VALUE))
        .addExpr("ACOS(c_bigint)", Math.acos(LONG_VALUE))
        .addExpr("ACOS(c_smallint)", Math.acos(SHORT_VALUE))
        .addExpr("ACOS(c_tinyint)", Math.acos(BYTE_VALUE))
        .addExpr("ACOS(c_double)", Math.acos(DOUBLE_VALUE))
        .addExpr("ACOS(c_float)", Math.acos(FLOAT_VALUE))
        .addExpr("ACOS(c_decimal)", Math.acos(DECIMAL_VALUE.doubleValue()))
        .buildRunAndCheck();
  }

  /** ASIN is checked against {@link Math#asin(double)}. */
  @Test
  public void testAsin() throws Exception {
    new ExpressionChecker()
        .addExpr("ASIN(c_integer)", Math.asin(INTEGER_VALUE))
        .addExpr("ASIN(c_bigint)", Math.asin(LONG_VALUE))
        .addExpr("ASIN(c_smallint)", Math.asin(SHORT_VALUE))
        .addExpr("ASIN(c_tinyint)", Math.asin(BYTE_VALUE))
        .addExpr("ASIN(c_double)", Math.asin(DOUBLE_VALUE))
        .addExpr("ASIN(c_float)", Math.asin(FLOAT_VALUE))
        .addExpr("ASIN(c_decimal)", Math.asin(DECIMAL_VALUE.doubleValue()))
        .buildRunAndCheck();
  }

  /** ATAN is checked against {@link Math#atan(double)}. */
  @Test
  public void testAtan() throws Exception {
    new ExpressionChecker()
        .addExpr("ATAN(c_integer)", Math.atan(INTEGER_VALUE))
        .addExpr("ATAN(c_bigint)", Math.atan(LONG_VALUE))
        .addExpr("ATAN(c_smallint)", Math.atan(SHORT_VALUE))
        .addExpr("ATAN(c_tinyint)", Math.atan(BYTE_VALUE))
        .addExpr("ATAN(c_double)", Math.atan(DOUBLE_VALUE))
        .addExpr("ATAN(c_float)", Math.atan(FLOAT_VALUE))
        .addExpr("ATAN(c_decimal)", Math.atan(DECIMAL_VALUE.doubleValue()))
        .buildRunAndCheck();
  }

  /** COT is checked against the reciprocal of {@link Math#tan(double)}. */
  @Test
  public void testCot() throws Exception {
    new ExpressionChecker()
        .addExpr("COT(c_integer)", 1.0d / Math.tan(INTEGER_VALUE))
        .addExpr("COT(c_bigint)", 1.0d / Math.tan(LONG_VALUE))
        .addExpr("COT(c_smallint)", 1.0d / Math.tan(SHORT_VALUE))
        .addExpr("COT(c_tinyint)", 1.0d / Math.tan(BYTE_VALUE))
        .addExpr("COT(c_double)", 1.0d / Math.tan(DOUBLE_VALUE))
        .addExpr("COT(c_float)", 1.0d / Math.tan(FLOAT_VALUE))
        .addExpr("COT(c_decimal)", 1.0d / Math.tan(DECIMAL_VALUE.doubleValue()))
        .buildRunAndCheck();
  }

  /** DEGREES is checked against {@link Math#toDegrees(double)}. */
  @Test
  public void testDegrees() throws Exception {
    new ExpressionChecker()
        .addExpr("DEGREES(c_integer)", Math.toDegrees(INTEGER_VALUE))
        .addExpr("DEGREES(c_bigint)", Math.toDegrees(LONG_VALUE))
        .addExpr("DEGREES(c_smallint)", Math.toDegrees(SHORT_VALUE))
        .addExpr("DEGREES(c_tinyint)", Math.toDegrees(BYTE_VALUE))
        .addExpr("DEGREES(c_double)", Math.toDegrees(DOUBLE_VALUE))
        .addExpr("DEGREES(c_float)", Math.toDegrees(FLOAT_VALUE))
        .addExpr("DEGREES(c_decimal)", Math.toDegrees(DECIMAL_VALUE.doubleValue()))
        .buildRunAndCheck();
  }

  /** RADIANS is checked against {@link Math#toRadians(double)}. */
  @Test
  public void testRadians() throws Exception {
    new ExpressionChecker()
        .addExpr("RADIANS(c_integer)", Math.toRadians(INTEGER_VALUE))
        .addExpr("RADIANS(c_bigint)", Math.toRadians(LONG_VALUE))
        .addExpr("RADIANS(c_smallint)", Math.toRadians(SHORT_VALUE))
        .addExpr("RADIANS(c_tinyint)", Math.toRadians(BYTE_VALUE))
        .addExpr("RADIANS(c_double)", Math.toRadians(DOUBLE_VALUE))
        .addExpr("RADIANS(c_float)", Math.toRadians(FLOAT_VALUE))
        .addExpr("RADIANS(c_decimal)", Math.toRadians(DECIMAL_VALUE.doubleValue()))
        .buildRunAndCheck();
  }

  /** COS is checked against {@link Math#cos(double)}. */
  @Test
  public void testCos() throws Exception {
    new ExpressionChecker()
        .addExpr("COS(c_integer)", Math.cos(INTEGER_VALUE))
        .addExpr("COS(c_bigint)", Math.cos(LONG_VALUE))
        .addExpr("COS(c_smallint)", Math.cos(SHORT_VALUE))
        .addExpr("COS(c_tinyint)", Math.cos(BYTE_VALUE))
        .addExpr("COS(c_double)", Math.cos(DOUBLE_VALUE))
        .addExpr("COS(c_float)", Math.cos(FLOAT_VALUE))
        .addExpr("COS(c_decimal)", Math.cos(DECIMAL_VALUE.doubleValue()))
        .buildRunAndCheck();
  }

  /** SIN is checked against {@link Math#sin(double)}. */
  @Test
  public void testSin() throws Exception {
    new ExpressionChecker()
        .addExpr("SIN(c_integer)", Math.sin(INTEGER_VALUE))
        .addExpr("SIN(c_bigint)", Math.sin(LONG_VALUE))
        .addExpr("SIN(c_smallint)", Math.sin(SHORT_VALUE))
        .addExpr("SIN(c_tinyint)", Math.sin(BYTE_VALUE))
        .addExpr("SIN(c_double)", Math.sin(DOUBLE_VALUE))
        .addExpr("SIN(c_float)", Math.sin(FLOAT_VALUE))
        .addExpr("SIN(c_decimal)", Math.sin(DECIMAL_VALUE.doubleValue()))
        .buildRunAndCheck();
  }

  /** TAN is checked against {@link Math#tan(double)}. */
  @Test
  public void testTan() throws Exception {
    new ExpressionChecker()
        .addExpr("TAN(c_integer)", Math.tan(INTEGER_VALUE))
        .addExpr("TAN(c_bigint)", Math.tan(LONG_VALUE))
        .addExpr("TAN(c_smallint)", Math.tan(SHORT_VALUE))
        .addExpr("TAN(c_tinyint)", Math.tan(BYTE_VALUE))
        .addExpr("TAN(c_double)", Math.tan(DOUBLE_VALUE))
        .addExpr("TAN(c_float)", Math.tan(FLOAT_VALUE))
        .addExpr("TAN(c_decimal)", Math.tan(DECIMAL_VALUE.doubleValue()))
        .buildRunAndCheck();
  }

  /** SIGN is expected to keep the operand type, hence the per-type signum calls and casts. */
  @Test
  public void testSign() throws Exception {
    new ExpressionChecker()
        .addExpr("SIGN(c_integer)", Integer.signum(INTEGER_VALUE))
        .addExpr("SIGN(c_bigint)", (long) (Long.signum(LONG_VALUE)))
        .addExpr("SIGN(c_smallint)", (short) (Integer.signum(SHORT_VALUE)))
        .addExpr("SIGN(c_tinyint)", (byte) Integer.signum(BYTE_VALUE))
        .addExpr("SIGN(c_double)", Math.signum(DOUBLE_VALUE))
        .addExpr("SIGN(c_float)", Math.signum(FLOAT_VALUE))
        .addExpr("SIGN(c_decimal)", BigDecimal.valueOf(DECIMAL_VALUE.signum()))
        .buildRunAndCheck();
  }

  /** POWER with exponent 2 is checked against {@link Math#pow(double, double)}. */
  @Test
  public void testPower() throws Exception {
    new ExpressionChecker()
        .addExpr("POWER(c_integer, 2)", Math.pow(INTEGER_VALUE, 2))
        .addExpr("POWER(c_bigint, 2)", Math.pow(LONG_VALUE, 2))
        .addExpr("POWER(c_smallint, 2)", Math.pow(SHORT_VALUE, 2))
        .addExpr("POWER(c_tinyint, 2)", Math.pow(BYTE_VALUE, 2))
        .addExpr("POWER(c_double, 2)", Math.pow(DOUBLE_VALUE, 2))
        .addExpr("POWER(c_float, 2)", Math.pow(FLOAT_VALUE, 2))
        .addExpr("POWER(c_decimal, 2)", Math.pow(DECIMAL_VALUE.doubleValue(), 2))
        .buildRunAndCheck();
  }

  /** PI is checked against {@link Math#PI}. */
  @Test
  public void testPi() throws Exception {
    new ExpressionChecker()
        .addExpr("PI", Math.PI)
        .buildRunAndCheck();
  }

  /** ATAN2 with second argument 2 is checked against {@link Math#atan2(double, double)}. */
  @Test
  public void testAtan2() throws Exception {
    new ExpressionChecker()
        .addExpr("ATAN2(c_integer, 2)", Math.atan2(INTEGER_VALUE, 2))
        .addExpr("ATAN2(c_bigint, 2)", Math.atan2(LONG_VALUE, 2))
        .addExpr("ATAN2(c_smallint, 2)", Math.atan2(SHORT_VALUE, 2))
        .addExpr("ATAN2(c_tinyint, 2)", Math.atan2(BYTE_VALUE, 2))
        .addExpr("ATAN2(c_double, 2)", Math.atan2(DOUBLE_VALUE, 2))
        .addExpr("ATAN2(c_float, 2)", Math.atan2(FLOAT_VALUE, 2))
        .addExpr("ATAN2(c_decimal, 2)", Math.atan2(DECIMAL_VALUE.doubleValue(), 2))
        .buildRunAndCheck();
  }

  /** TRUNCATE to two digits; the expected value is cast back to the operand type. */
  @Test
  public void testTruncate() throws Exception {
    new ExpressionChecker()
        .addExpr("TRUNCATE(c_integer, 2)", SqlFunctions.struncate(INTEGER_VALUE, 2))
        .addExpr("TRUNCATE(c_bigint, 2)", SqlFunctions.struncate(LONG_VALUE, 2))
        .addExpr("TRUNCATE(c_smallint, 2)", (short) SqlFunctions.struncate(SHORT_VALUE, 2))
        .addExpr("TRUNCATE(c_tinyint, 2)", (byte) SqlFunctions.struncate(BYTE_VALUE, 2))
        .addExpr("TRUNCATE(c_double, 2)", SqlFunctions.struncate(DOUBLE_VALUE, 2))
        .addExpr("TRUNCATE(c_float, 2)", (float) SqlFunctions.struncate(FLOAT_VALUE, 2))
        .addExpr("TRUNCATE(c_decimal, 2)", SqlFunctions.struncate(DECIMAL_VALUE, 2))
        .buildRunAndCheck();
  }

  /** Seeded RAND is checked against the first double of an identically seeded {@link Random}. */
  @Test
  public void testRand() throws Exception {
    new ExpressionChecker()
        .addExpr("RAND(c_integer)", new Random(INTEGER_VALUE).nextDouble())
        .buildRunAndCheck();
  }

  /** Seeded RAND_INTEGER is checked against {@link Random#nextInt(int)} with the same seed. */
  @Test
  public void testRandInteger() throws Exception {
    new ExpressionChecker()
        .addExpr("RAND_INTEGER(c_integer, c_integer)",
            new Random(INTEGER_VALUE).nextInt(INTEGER_VALUE))
        .buildRunAndCheck();
  }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlStringFunctionsIntegrationTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlStringFunctionsIntegrationTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlStringFunctionsIntegrationTest.java new file mode 100644 index 0000000..7a51a95 --- /dev/null +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlStringFunctionsIntegrationTest.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */
package org.apache.beam.sdk.extensions.sql.integrationtest;

import org.junit.Test;

/**
 * Integration test for string functions.
 *
 * <p>All expressions operate on string literals only; each one is paired with the literal result
 * expected from the query and verified through the {@code ExpressionChecker}.
 */
public class BeamSqlStringFunctionsIntegrationTest
    extends BeamSqlBuiltinFunctionsIntegrationTestBase {

  /** Runs every supported string expression in a single query and checks all results. */
  @Test
  public void testStringFunctions() throws Exception {
    new ExpressionChecker()
        // Concatenation, length and case conversion.
        .addExpr("'hello' || ' world'", "hello world")
        .addExpr("CHAR_LENGTH('hello')", 5)
        .addExpr("CHARACTER_LENGTH('hello')", 5)
        .addExpr("UPPER('hello')", "HELLO")
        .addExpr("LOWER('HELLO')", "hello")
        // NOTE(review): 5 and 10 are the zero-based offsets of 'world' in the searched
        // strings, whereas SQL POSITION is conventionally one-based - confirm this matches
        // the intended semantics of the implementation under test.
        .addExpr("POSITION('world' IN 'helloworld')", 5)
        .addExpr("POSITION('world' IN 'helloworldworld' FROM 7)", 10)
        // TRIM in its default, LEADING, TRAILING and BOTH forms.
        .addExpr("TRIM(' hello ')", "hello")
        .addExpr("TRIM(LEADING ' ' FROM ' hello ')", "hello ")
        .addExpr("TRIM(TRAILING ' ' FROM ' hello ')", " hello")
        .addExpr("TRIM(BOTH ' ' FROM ' hello ')", "hello")
        // Replacement, extraction and capitalisation.
        .addExpr("OVERLAY('w3333333rce' PLACING 'resou' FROM 3)", "w3resou3rce")
        .addExpr("SUBSTRING('hello' FROM 2)", "ello")
        .addExpr("SUBSTRING('hello' FROM 2 FOR 2)", "el")
        .addExpr("INITCAP('hello world')", "Hello World")
        .buildRunAndCheck();
  }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/BeamSqlFnExecutorTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/BeamSqlFnExecutorTest.java 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/BeamSqlFnExecutorTest.java new file mode 100644 index 0000000..2843e41 --- /dev/null +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/BeamSqlFnExecutorTest.java @@ -0,0 +1,416 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.extensions.sql.interpreter; + +import static org.junit.Assert.assertTrue; + +import java.math.BigDecimal; +import java.util.Arrays; +import java.util.Calendar; +import java.util.Date; +import java.util.TimeZone; +import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlCaseExpression; +import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression; +import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlInputRefExpression; +import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlPrimitive; +import org.apache.beam.sdk.extensions.sql.interpreter.operator.arithmetic.BeamSqlDivideExpression; +import org.apache.beam.sdk.extensions.sql.interpreter.operator.arithmetic.BeamSqlMinusExpression; +import org.apache.beam.sdk.extensions.sql.interpreter.operator.arithmetic.BeamSqlModExpression; +import org.apache.beam.sdk.extensions.sql.interpreter.operator.arithmetic.BeamSqlMultiplyExpression; +import org.apache.beam.sdk.extensions.sql.interpreter.operator.arithmetic.BeamSqlPlusExpression; +import org.apache.beam.sdk.extensions.sql.interpreter.operator.comparison.BeamSqlEqualsExpression; +import org.apache.beam.sdk.extensions.sql.interpreter.operator.comparison.BeamSqlLessThanOrEqualsExpression; +import org.apache.beam.sdk.extensions.sql.interpreter.operator.date.BeamSqlCurrentDateExpression; +import org.apache.beam.sdk.extensions.sql.interpreter.operator.date.BeamSqlCurrentTimeExpression; +import org.apache.beam.sdk.extensions.sql.interpreter.operator.date.BeamSqlCurrentTimestampExpression; +import org.apache.beam.sdk.extensions.sql.interpreter.operator.date.BeamSqlDateCeilExpression; +import org.apache.beam.sdk.extensions.sql.interpreter.operator.date.BeamSqlDateFloorExpression; +import org.apache.beam.sdk.extensions.sql.interpreter.operator.date.BeamSqlExtractExpression; +import org.apache.beam.sdk.extensions.sql.interpreter.operator.logical.BeamSqlAndExpression; +import 
org.apache.beam.sdk.extensions.sql.interpreter.operator.logical.BeamSqlNotExpression; +import org.apache.beam.sdk.extensions.sql.interpreter.operator.logical.BeamSqlOrExpression; +import org.apache.beam.sdk.extensions.sql.interpreter.operator.string.BeamSqlCharLengthExpression; +import org.apache.beam.sdk.extensions.sql.interpreter.operator.string.BeamSqlConcatExpression; +import org.apache.beam.sdk.extensions.sql.interpreter.operator.string.BeamSqlInitCapExpression; +import org.apache.beam.sdk.extensions.sql.interpreter.operator.string.BeamSqlLowerExpression; +import org.apache.beam.sdk.extensions.sql.interpreter.operator.string.BeamSqlOverlayExpression; +import org.apache.beam.sdk.extensions.sql.interpreter.operator.string.BeamSqlPositionExpression; +import org.apache.beam.sdk.extensions.sql.interpreter.operator.string.BeamSqlSubstringExpression; +import org.apache.beam.sdk.extensions.sql.interpreter.operator.string.BeamSqlTrimExpression; +import org.apache.beam.sdk.extensions.sql.interpreter.operator.string.BeamSqlUpperExpression; +import org.apache.beam.sdk.extensions.sql.planner.BeamQueryPlanner; +import org.apache.beam.sdk.extensions.sql.rel.BeamFilterRel; +import org.apache.beam.sdk.extensions.sql.rel.BeamProjectRel; +import org.apache.beam.sdk.extensions.sql.rel.BeamRelNode; +import org.apache.calcite.avatica.util.TimeUnitRange; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.sql.SqlOperator; +import org.apache.calcite.sql.fun.SqlStdOperatorTable; +import org.apache.calcite.sql.fun.SqlTrimFunction; +import org.apache.calcite.sql.type.SqlTypeName; +import org.junit.Assert; +import org.junit.Test; + +/** + * Unit test cases for {@link BeamSqlFnExecutor}. 
+ */
public class BeamSqlFnExecutorTest extends BeamSqlFnExecutorTestBase {

  /**
   * A filter on {@code field0 <= 1000 AND field1 = 0} must be prepared into a single AND
   * expression whose two comparison operands each consist of an input reference and a
   * primitive.
   */
  @Test
  public void testBeamFilterRel() {
    RexNode condition = rexBuilder.makeCall(SqlStdOperatorTable.AND,
        Arrays.asList(
            rexBuilder.makeCall(SqlStdOperatorTable.LESS_THAN_OR_EQUAL,
                Arrays.asList(rexBuilder.makeInputRef(relDataType, 0),
                    rexBuilder.makeBigintLiteral(new BigDecimal(1000L)))),
            rexBuilder.makeCall(SqlStdOperatorTable.EQUALS,
                Arrays.asList(rexBuilder.makeInputRef(relDataType, 1),
                    rexBuilder.makeExactLiteral(new BigDecimal(0))))));

    BeamFilterRel beamFilterRel = new BeamFilterRel(cluster, RelTraitSet.createEmpty(), null,
        condition);

    BeamSqlFnExecutor executor = new BeamSqlFnExecutor(beamFilterRel);
    executor.prepare();

    Assert.assertEquals(1, executor.exps.size());

    // Level 1: the AND node itself.
    BeamSqlExpression l1Exp = executor.exps.get(0);
    assertTrue(l1Exp instanceof BeamSqlAndExpression);
    Assert.assertEquals(SqlTypeName.BOOLEAN, l1Exp.getOutputType());

    // Level 2: the two comparisons.
    Assert.assertEquals(2, l1Exp.getOperands().size());
    BeamSqlExpression l1Left = (BeamSqlExpression) l1Exp.getOperands().get(0);
    BeamSqlExpression l1Right = (BeamSqlExpression) l1Exp.getOperands().get(1);

    assertTrue(l1Left instanceof BeamSqlLessThanOrEqualsExpression);
    assertTrue(l1Right instanceof BeamSqlEqualsExpression);

    // Level 3: operands of "<=".
    Assert.assertEquals(2, l1Left.getOperands().size());
    BeamSqlExpression l1LeftLeft = (BeamSqlExpression) l1Left.getOperands().get(0);
    BeamSqlExpression l1LeftRight = (BeamSqlExpression) l1Left.getOperands().get(1);
    assertTrue(l1LeftLeft instanceof BeamSqlInputRefExpression);
    assertTrue(l1LeftRight instanceof BeamSqlPrimitive);

    // Level 3: operands of "=".
    Assert.assertEquals(2, l1Right.getOperands().size());
    BeamSqlExpression l1RightLeft = (BeamSqlExpression) l1Right.getOperands().get(0);
    BeamSqlExpression l1RightRight = (BeamSqlExpression) l1Right.getOperands().get(1);
    assertTrue(l1RightLeft instanceof BeamSqlInputRefExpression);
    assertTrue(l1RightRight instanceof BeamSqlPrimitive);
  }

  /** An identity projection over four fields must be prepared into four input references. */
  @Test
  public void testBeamProjectRel() {
    BeamRelNode relNode = new BeamProjectRel(cluster, RelTraitSet.createEmpty(),
        relBuilder.values(relDataType, 1234567L, 0, 8.9, null).build(),
        rexBuilder.identityProjects(relDataType), relDataType);
    BeamSqlFnExecutor executor = new BeamSqlFnExecutor(relNode);

    executor.prepare();
    Assert.assertEquals(4, executor.exps.size());
    assertTrue(executor.exps.get(0) instanceof BeamSqlInputRefExpression);
    assertTrue(executor.exps.get(1) instanceof BeamSqlInputRefExpression);
    assertTrue(executor.exps.get(2) instanceof BeamSqlInputRefExpression);
    assertTrue(executor.exps.get(3) instanceof BeamSqlInputRefExpression);
  }

  /** AND, OR and NOT calls must be built into their dedicated expression classes. */
  @Test
  public void testBuildExpression_logical() {
    assertBuilds(BeamSqlAndExpression.class,
        rexBuilder.makeCall(SqlStdOperatorTable.AND,
            Arrays.asList(
                rexBuilder.makeLiteral(true),
                rexBuilder.makeLiteral(false))));

    assertBuilds(BeamSqlOrExpression.class,
        rexBuilder.makeCall(SqlStdOperatorTable.OR,
            Arrays.asList(
                rexBuilder.makeLiteral(true),
                rexBuilder.makeLiteral(false))));

    assertBuilds(BeamSqlNotExpression.class,
        rexBuilder.makeCall(SqlStdOperatorTable.NOT,
            Arrays.asList(
                rexBuilder.makeLiteral(true))));
  }

  /** AND with a non-boolean operand must be rejected while building. */
  @Test(expected = IllegalStateException.class)
  public void testBuildExpression_logical_andOr_invalidOperand() {
    RexNode rexNode = rexBuilder.makeCall(SqlStdOperatorTable.AND,
        Arrays.asList(
            rexBuilder.makeLiteral(true),
            rexBuilder.makeLiteral("hello")));
    BeamSqlFnExecutor.buildExpression(rexNode);
  }

  /** NOT with a non-boolean operand must be rejected while building. */
  @Test(expected = IllegalStateException.class)
  public void testBuildExpression_logical_not_invalidOperand() {
    RexNode rexNode = rexBuilder.makeCall(SqlStdOperatorTable.NOT,
        Arrays.asList(
            rexBuilder.makeLiteral("hello")));
    BeamSqlFnExecutor.buildExpression(rexNode);
  }

  /** NOT with more than one operand must be rejected while building. */
  @Test(expected = IllegalStateException.class)
  public void testBuildExpression_logical_not_invalidOperandCount() {
    RexNode rexNode = rexBuilder.makeCall(SqlStdOperatorTable.NOT,
        Arrays.asList(
            rexBuilder.makeLiteral(true),
            rexBuilder.makeLiteral(true)));
    BeamSqlFnExecutor.buildExpression(rexNode);
  }

  /** Each arithmetic operator must be built into exactly its dedicated expression class. */
  @Test
  public void testBuildExpression_arithmetic() {
    testBuildArithmeticExpression(SqlStdOperatorTable.PLUS, BeamSqlPlusExpression.class);
    testBuildArithmeticExpression(SqlStdOperatorTable.MINUS, BeamSqlMinusExpression.class);
    testBuildArithmeticExpression(SqlStdOperatorTable.MULTIPLY, BeamSqlMultiplyExpression.class);
    testBuildArithmeticExpression(SqlStdOperatorTable.DIVIDE, BeamSqlDivideExpression.class);
    testBuildArithmeticExpression(SqlStdOperatorTable.MOD, BeamSqlModExpression.class);
  }

  /**
   * Builds {@code fn(1, 1)} over two BIGINT literals and checks the exact class of the result.
   *
   * @param fn the arithmetic operator to build a call for
   * @param clazz the expression class the executor is expected to instantiate
   */
  private void testBuildArithmeticExpression(SqlOperator fn,
      Class<? extends BeamSqlExpression> clazz) {
    RexNode rexNode = rexBuilder.makeCall(fn, Arrays.asList(
        rexBuilder.makeBigintLiteral(new BigDecimal(1L)),
        rexBuilder.makeBigintLiteral(new BigDecimal(1L))));
    BeamSqlExpression exp = BeamSqlFnExecutor.buildExpression(rexNode);

    // assertEquals reports the actual class on failure, unlike assertTrue(a.equals(b)).
    Assert.assertEquals(clazz, exp.getClass());
  }

  /**
   * Builds {@code rexNode} and asserts that the resulting expression is an instance of
   * {@code expected}, naming the type that was actually built when it is not.
   *
   * @param expected the expression type the executor should produce
   * @param rexNode the Calcite call to translate
   */
  private static void assertBuilds(Class<? extends BeamSqlExpression> expected,
      RexNode rexNode) {
    BeamSqlExpression exp = BeamSqlFnExecutor.buildExpression(rexNode);
    String actual = exp == null ? "null" : exp.getClass().getName();
    assertTrue("expected " + expected.getName() + " but built " + actual,
        expected.isInstance(exp));
  }

  /**
   * String operators, in their different arities, must be built into their dedicated expression
   * classes. The final check covers CASE, which is exercised here with string results.
   */
  @Test
  public void testBuildExpression_string() {
    assertBuilds(BeamSqlConcatExpression.class,
        rexBuilder.makeCall(SqlStdOperatorTable.CONCAT,
            Arrays.asList(
                rexBuilder.makeLiteral("hello "),
                rexBuilder.makeLiteral("world"))));

    // POSITION with two operands.
    assertBuilds(BeamSqlPositionExpression.class,
        rexBuilder.makeCall(SqlStdOperatorTable.POSITION,
            Arrays.asList(
                rexBuilder.makeLiteral("hello"),
                rexBuilder.makeLiteral("worldhello"))));

    // POSITION with an additional integer third operand.
    assertBuilds(BeamSqlPositionExpression.class,
        rexBuilder.makeCall(SqlStdOperatorTable.POSITION,
            Arrays.asList(
                rexBuilder.makeLiteral("hello"),
                rexBuilder.makeLiteral("worldhello"),
                rexBuilder.makeCast(
                    BeamQueryPlanner.TYPE_FACTORY.createSqlType(SqlTypeName.INTEGER),
                    rexBuilder.makeBigintLiteral(BigDecimal.ONE)))));

    assertBuilds(BeamSqlCharLengthExpression.class,
        rexBuilder.makeCall(SqlStdOperatorTable.CHAR_LENGTH,
            Arrays.asList(
                rexBuilder.makeLiteral("hello"))));

    assertBuilds(BeamSqlUpperExpression.class,
        rexBuilder.makeCall(SqlStdOperatorTable.UPPER,
            Arrays.asList(
                rexBuilder.makeLiteral("hello"))));

    assertBuilds(BeamSqlLowerExpression.class,
        rexBuilder.makeCall(SqlStdOperatorTable.LOWER,
            Arrays.asList(
                rexBuilder.makeLiteral("HELLO"))));

    assertBuilds(BeamSqlInitCapExpression.class,
        rexBuilder.makeCall(SqlStdOperatorTable.INITCAP,
            Arrays.asList(
                rexBuilder.makeLiteral("hello"))));

    assertBuilds(BeamSqlTrimExpression.class,
        rexBuilder.makeCall(SqlStdOperatorTable.TRIM,
            Arrays.asList(
                rexBuilder.makeFlag(SqlTrimFunction.Flag.BOTH),
                rexBuilder.makeLiteral("HELLO"),
                rexBuilder.makeLiteral("HELLO"))));

    // SUBSTRING with two operands.
    assertBuilds(BeamSqlSubstringExpression.class,
        rexBuilder.makeCall(SqlStdOperatorTable.SUBSTRING,
            Arrays.asList(
                rexBuilder.makeLiteral("HELLO"),
                rexBuilder.makeBigintLiteral(BigDecimal.ZERO))));

    // SUBSTRING with three operands.
    assertBuilds(BeamSqlSubstringExpression.class,
        rexBuilder.makeCall(SqlStdOperatorTable.SUBSTRING,
            Arrays.asList(
                rexBuilder.makeLiteral("HELLO"),
                rexBuilder.makeBigintLiteral(BigDecimal.ZERO),
                rexBuilder.makeBigintLiteral(BigDecimal.ZERO))));

    // OVERLAY with three operands.
    assertBuilds(BeamSqlOverlayExpression.class,
        rexBuilder.makeCall(SqlStdOperatorTable.OVERLAY,
            Arrays.asList(
                rexBuilder.makeLiteral("HELLO"),
                rexBuilder.makeLiteral("HELLO"),
                rexBuilder.makeBigintLiteral(BigDecimal.ZERO))));

    // OVERLAY with four operands.
    assertBuilds(BeamSqlOverlayExpression.class,
        rexBuilder.makeCall(SqlStdOperatorTable.OVERLAY,
            Arrays.asList(
                rexBuilder.makeLiteral("HELLO"),
                rexBuilder.makeLiteral("HELLO"),
                rexBuilder.makeBigintLiteral(BigDecimal.ZERO),
                rexBuilder.makeBigintLiteral(BigDecimal.ZERO))));

    assertBuilds(BeamSqlCaseExpression.class,
        rexBuilder.makeCall(SqlStdOperatorTable.CASE,
            Arrays.asList(
                rexBuilder.makeLiteral(true),
                rexBuilder.makeLiteral("HELLO"),
                rexBuilder.makeLiteral("HELLO"))));
  }

  /** Date/time operators must be built into their dedicated expression classes. */
  @Test
  public void testBuildExpression_date() {
    // A GMT calendar set to the current time serves as the date literal for every call below.
    Calendar calendar = Calendar.getInstance();
    calendar.setTimeZone(TimeZone.getTimeZone("GMT"));
    calendar.setTime(new Date());

    // CEIL
    assertBuilds(BeamSqlDateCeilExpression.class,
        rexBuilder.makeCall(SqlStdOperatorTable.CEIL,
            Arrays.asList(
                rexBuilder.makeDateLiteral(calendar),
                rexBuilder.makeFlag(TimeUnitRange.MONTH))));

    // FLOOR
    assertBuilds(BeamSqlDateFloorExpression.class,
        rexBuilder.makeCall(SqlStdOperatorTable.FLOOR,
            Arrays.asList(
                rexBuilder.makeDateLiteral(calendar),
                rexBuilder.makeFlag(TimeUnitRange.MONTH))));

    // EXTRACT == EXTRACT_DATE?
    assertBuilds(BeamSqlExtractExpression.class,
        rexBuilder.makeCall(SqlStdOperatorTable.EXTRACT,
            Arrays.asList(
                rexBuilder.makeFlag(TimeUnitRange.MONTH),
                rexBuilder.makeDateLiteral(calendar))));

    // CURRENT_DATE
    assertBuilds(BeamSqlCurrentDateExpression.class,
        rexBuilder.makeCall(SqlStdOperatorTable.CURRENT_DATE,
            Arrays.<RexNode>asList()));

    // LOCALTIME
    assertBuilds(BeamSqlCurrentTimeExpression.class,
        rexBuilder.makeCall(SqlStdOperatorTable.LOCALTIME,
            Arrays.<RexNode>asList()));

    // LOCALTIMESTAMP
    assertBuilds(BeamSqlCurrentTimestampExpression.class,
        rexBuilder.makeCall(SqlStdOperatorTable.LOCALTIMESTAMP,
            Arrays.<RexNode>asList()));
  }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/BeamSqlFnExecutorTestBase.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/BeamSqlFnExecutorTestBase.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/BeamSqlFnExecutorTestBase.java new file mode 100644 index 0000000..c6478a6 --- /dev/null +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/BeamSqlFnExecutorTestBase.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.sql.interpreter; + +import java.util.ArrayList; +import java.util.List; +import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression; +import org.apache.beam.sdk.extensions.sql.planner.BeamQueryPlanner; +import org.apache.beam.sdk.extensions.sql.planner.BeamRelDataTypeSystem; +import org.apache.beam.sdk.extensions.sql.planner.BeamRuleSets; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType; +import org.apache.beam.sdk.extensions.sql.utils.CalciteUtils; +import org.apache.calcite.adapter.java.JavaTypeFactory; +import org.apache.calcite.config.Lex; +import org.apache.calcite.jdbc.JavaTypeFactoryImpl; +import org.apache.calcite.plan.Contexts; +import org.apache.calcite.plan.ConventionTraitDef; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelTraitDef; +import org.apache.calcite.plan.volcano.VolcanoPlanner; +import org.apache.calcite.rel.RelCollationTraitDef; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeSystem; +import org.apache.calcite.rex.RexBuilder; +import org.apache.calcite.schema.SchemaPlus; +import org.apache.calcite.sql.parser.SqlParser; +import org.apache.calcite.sql.type.SqlTypeName; +import org.apache.calcite.tools.FrameworkConfig; +import 
org.apache.calcite.tools.Frameworks;
import org.apache.calcite.tools.RelBuilder;
import org.junit.BeforeClass;

/**
 * Base class for tests of {@link BeamSqlFnExecutor} and of subclasses of
 * {@link BeamSqlExpression}.
 *
 * <p>It exposes shared static fixtures: Calcite builders, a four-column row type
 * ({@code order_id}, {@code site_id}, {@code price}, {@code order_time}) and one populated
 * {@link BeamSqlRow}. The mutable fixtures are (re)assigned once per test class by
 * {@link #prepare()}.
 */
public class BeamSqlFnExecutorTestBase {
  // Note: the RexBuilder is backed by BeamQueryPlanner.TYPE_FACTORY, while relDataType below is
  // created from this class's own TYPE_FACTORY.
  public static RexBuilder rexBuilder = new RexBuilder(BeamQueryPlanner.TYPE_FACTORY);
  public static RelOptCluster cluster = RelOptCluster.create(new VolcanoPlanner(), rexBuilder);

  public static final JavaTypeFactory TYPE_FACTORY = new JavaTypeFactoryImpl(
      RelDataTypeSystem.DEFAULT);
  public static RelDataType relDataType;

  public static BeamSqlRowType beamRowType;
  public static BeamSqlRow record;

  public static RelBuilder relBuilder;

  /** Creates the row type, the sample record and the {@link RelBuilder} shared by all tests. */
  @BeforeClass
  public static void prepare() {
    relDataType = TYPE_FACTORY.builder()
        .add("order_id", SqlTypeName.BIGINT)
        .add("site_id", SqlTypeName.INTEGER)
        .add("price", SqlTypeName.DOUBLE)
        .add("order_time", SqlTypeName.BIGINT)
        .build();
    beamRowType = CalciteUtils.toBeamRowType(relDataType);

    // Fill the row completely before publishing it through the static field.
    BeamSqlRow sampleRow = new BeamSqlRow(beamRowType);
    sampleRow.addField(0, 1234567L);
    sampleRow.addField(1, 0);
    sampleRow.addField(2, 8.9);
    sampleRow.addField(3, 1234567L);
    record = sampleRow;

    relBuilder = RelBuilder.create(newFrameworkConfig());
  }

  /** Assembles the Calcite framework configuration from which the RelBuilder is created. */
  private static FrameworkConfig newFrameworkConfig() {
    SchemaPlus rootSchema = Frameworks.createRootSchema(true);

    final List<RelTraitDef> traits = new ArrayList<>();
    traits.add(ConventionTraitDef.INSTANCE);
    traits.add(RelCollationTraitDef.INSTANCE);

    return Frameworks.newConfigBuilder()
        .parserConfig(SqlParser.configBuilder().setLex(Lex.MYSQL).build())
        .defaultSchema(rootSchema)
        .traitDefs(traits)
        .context(Contexts.EMPTY_CONTEXT)
        .ruleSets(BeamRuleSets.getRuleSets())
        .costFactory(null)
        .typeSystem(BeamRelDataTypeSystem.BEAM_REL_DATATYPE_SYSTEM)
        .build();
  }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamNullExperssionTest.java 
---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamNullExperssionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamNullExperssionTest.java new file mode 100644 index 0000000..7bfbe20 --- /dev/null +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamNullExperssionTest.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.sql.interpreter.operator; + +import org.apache.beam.sdk.extensions.sql.interpreter.BeamSqlFnExecutorTestBase; +import org.apache.beam.sdk.extensions.sql.interpreter.operator.comparison.BeamSqlIsNotNullExpression; +import org.apache.beam.sdk.extensions.sql.interpreter.operator.comparison.BeamSqlIsNullExpression; +import org.apache.calcite.sql.type.SqlTypeName; +import org.junit.Assert; +import org.junit.Test; + +/** + * Test cases for {@link BeamSqlIsNullExpression} and + * {@link BeamSqlIsNotNullExpression}. 
+ */ +public class BeamNullExperssionTest extends BeamSqlFnExecutorTestBase { + + @Test + public void testIsNull() { + BeamSqlIsNullExpression exp1 = new BeamSqlIsNullExpression( + new BeamSqlInputRefExpression(SqlTypeName.BIGINT, 0)); + Assert.assertEquals(false, exp1.evaluate(record).getValue()); + + BeamSqlIsNullExpression exp2 = new BeamSqlIsNullExpression( + BeamSqlPrimitive.of(SqlTypeName.BIGINT, null)); + Assert.assertEquals(true, exp2.evaluate(record).getValue()); + } + + @Test + public void testIsNotNull() { + BeamSqlIsNotNullExpression exp1 = new BeamSqlIsNotNullExpression( + new BeamSqlInputRefExpression(SqlTypeName.BIGINT, 0)); + Assert.assertEquals(true, exp1.evaluate(record).getValue()); + + BeamSqlIsNotNullExpression exp2 = new BeamSqlIsNotNullExpression( + BeamSqlPrimitive.of(SqlTypeName.BIGINT, null)); + Assert.assertEquals(false, exp2.evaluate(record).getValue()); + } + +} http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlAndOrExpressionTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlAndOrExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlAndOrExpressionTest.java new file mode 100644 index 0000000..b6f65a1 --- /dev/null +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlAndOrExpressionTest.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.sql.interpreter.operator; + +import java.util.ArrayList; +import java.util.List; +import org.apache.beam.sdk.extensions.sql.interpreter.BeamSqlFnExecutorTestBase; +import org.apache.beam.sdk.extensions.sql.interpreter.operator.logical.BeamSqlAndExpression; +import org.apache.beam.sdk.extensions.sql.interpreter.operator.logical.BeamSqlOrExpression; +import org.apache.calcite.sql.type.SqlTypeName; +import org.junit.Assert; +import org.junit.Test; + +/** + * Test cases for {@link BeamSqlAndExpression}, {@link BeamSqlOrExpression}. 
+ */ +public class BeamSqlAndOrExpressionTest extends BeamSqlFnExecutorTestBase { + + @Test + public void testAnd() { + List<BeamSqlExpression> operands = new ArrayList<>(); + operands.add(BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, true)); + operands.add(BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, true)); + + Assert.assertTrue(new BeamSqlAndExpression(operands).evaluate(record).getValue()); + + operands.add(BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, false)); + + Assert.assertFalse(new BeamSqlAndExpression(operands).evaluate(record).getValue()); + } + + @Test + public void testOr() { + List<BeamSqlExpression> operands = new ArrayList<>(); + operands.add(BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, false)); + operands.add(BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, false)); + + Assert.assertFalse(new BeamSqlOrExpression(operands).evaluate(record).getValue()); + + operands.add(BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, true)); + + Assert.assertTrue(new BeamSqlOrExpression(operands).evaluate(record).getValue()); + + } + +} http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlCaseExpressionTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlCaseExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlCaseExpressionTest.java new file mode 100644 index 0000000..28ed920 --- /dev/null +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlCaseExpressionTest.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.extensions.sql.interpreter.operator; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.util.ArrayList; +import java.util.List; +import org.apache.beam.sdk.extensions.sql.interpreter.BeamSqlFnExecutorTestBase; +import org.apache.calcite.sql.type.SqlTypeName; +import org.junit.Test; + +/** + * Test for BeamSqlCaseExpression. 
+ */ +public class BeamSqlCaseExpressionTest extends BeamSqlFnExecutorTestBase { + + @Test public void accept() throws Exception { + List<BeamSqlExpression> operands = new ArrayList<>(); + + operands.add(BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, true)); + operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello")); + operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "world")); + assertTrue(new BeamSqlCaseExpression(operands).accept()); + + // even param count + operands.clear(); + operands.add(BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, true)); + operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "world")); + assertFalse(new BeamSqlCaseExpression(operands).accept()); + + // `when` type error + operands.clear(); + operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "error")); + operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello")); + operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "world")); + assertFalse(new BeamSqlCaseExpression(operands).accept()); + + // `then` type mixing + operands.clear(); + operands.add(BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, true)); + operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello")); + operands.add(BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, true)); + operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 10)); + operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello")); + assertFalse(new BeamSqlCaseExpression(operands).accept()); + + } + + @Test public void evaluate() throws Exception { + List<BeamSqlExpression> operands = new ArrayList<>(); + + operands.add(BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, true)); + operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello")); + operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "world")); + assertEquals("hello", new BeamSqlCaseExpression(operands) + .evaluate(record).getValue()); + + operands.clear(); + operands.add(BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, false)); + operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello")); + 
operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "world")); + assertEquals("world", new BeamSqlCaseExpression(operands) + .evaluate(record).getValue()); + + operands.clear(); + operands.add(BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, false)); + operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello")); + operands.add(BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, true)); + operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello1")); + operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "world")); + assertEquals("hello1", new BeamSqlCaseExpression(operands) + .evaluate(record).getValue()); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlCastExpressionTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlCastExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlCastExpressionTest.java new file mode 100644 index 0000000..feefc45 --- /dev/null +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlCastExpressionTest.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.extensions.sql.interpreter.operator; + +import java.sql.Date; +import java.sql.Timestamp; +import java.util.ArrayList; +import java.util.Calendar; +import java.util.List; +import org.apache.beam.sdk.extensions.sql.interpreter.BeamSqlFnExecutorTestBase; +import org.apache.calcite.sql.type.SqlTypeName; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +/** + * Test for {@link BeamSqlCastExpression}. + */ +public class BeamSqlCastExpressionTest extends BeamSqlFnExecutorTestBase { + + private List<BeamSqlExpression> operands; + + @Before + public void setup() { + operands = new ArrayList<>(); + } + + @Test + public void testForOperands() { + operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1)); + operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "aaa")); + Assert.assertFalse(new BeamSqlCastExpression(operands, SqlTypeName.BIGINT).accept()); + } + + @Test + public void testForIntegerToBigintTypeCasting() { + operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 5)); + Assert.assertEquals(5L, + new BeamSqlCastExpression(operands, SqlTypeName.BIGINT).evaluate(record).getLong()); + } + + @Test + public void testForDoubleToBigIntCasting() { + operands.add(BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 5.45)); + Assert.assertEquals(5L, + new BeamSqlCastExpression(operands, SqlTypeName.BIGINT).evaluate(record).getLong()); + } + + @Test + public void testForIntegerToDateCast() { + // test for yyyyMMdd format + operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 20170521)); + 
Assert.assertEquals(Date.valueOf("2017-05-21"), + new BeamSqlCastExpression(operands, SqlTypeName.DATE).evaluate(record).getValue()); + } + + @Test + public void testyyyyMMddDateFormat() { + //test for yyyy-MM-dd format + operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "2017-05-21")); + Assert.assertEquals(Date.valueOf("2017-05-21"), + new BeamSqlCastExpression(operands, SqlTypeName.DATE).evaluate(record).getValue()); + } + + @Test + public void testyyMMddDateFormat() { + // test for yy.MM.dd format + operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "17.05.21")); + Assert.assertEquals(Date.valueOf("2017-05-21"), + new BeamSqlCastExpression(operands, SqlTypeName.DATE).evaluate(record).getValue()); + } + + @Test + public void testForTimestampCastExpression() { + operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "17-05-21 23:59:59.989")); + Assert.assertEquals(SqlTypeName.TIMESTAMP, + new BeamSqlCastExpression(operands, SqlTypeName.TIMESTAMP).evaluate(record) + .getOutputType()); + } + + @Test + public void testDateTimeFormatWithMillis() { + operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "2017-05-21 23:59:59.989")); + Assert.assertEquals(Timestamp.valueOf("2017-05-22 00:00:00.0"), + new BeamSqlCastExpression(operands, SqlTypeName.TIMESTAMP).evaluate(record).getValue()); + } + + @Test + public void testDateTimeFormatWithTimezone() { + operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "2017-05-21 23:59:59.89079 PST")); + Assert.assertEquals(Timestamp.valueOf("2017-05-22 00:00:00.0"), + new BeamSqlCastExpression(operands, SqlTypeName.TIMESTAMP).evaluate(record).getValue()); + } + + @Test + public void testDateTimeFormat() { + operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "2017-05-21 23:59:59")); + Assert.assertEquals(Timestamp.valueOf("2017-05-21 23:59:59"), + new BeamSqlCastExpression(operands, SqlTypeName.TIMESTAMP).evaluate(record).getValue()); + } + + @Test(expected = RuntimeException.class) + public void 
testForCastTypeNotSupported() { + operands.add(BeamSqlPrimitive.of(SqlTypeName.TIME, Calendar.getInstance().getTime())); + Assert.assertEquals(Timestamp.valueOf("2017-05-22 00:00:00.0"), + new BeamSqlCastExpression(operands, SqlTypeName.TIMESTAMP).evaluate(record).getValue()); + } + +}
