http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/dsls/sql/src/main/java/org/apache/beam/dsls/sql/utils/package-info.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/utils/package-info.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/utils/package-info.java deleted file mode 100644 index b5c861a..0000000 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/utils/package-info.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * Utility classes. - */ -package org.apache.beam.dsls.sql.utils;
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/dsls/sql/src/main/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/resources/log4j.properties b/dsls/sql/src/main/resources/log4j.properties deleted file mode 100644 index 709484b..0000000 --- a/dsls/sql/src/main/resources/log4j.properties +++ /dev/null @@ -1,23 +0,0 @@ -################################################################################ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -################################################################################ - -log4j.rootLogger=ERROR,console -log4j.appender.console=org.apache.log4j.ConsoleAppender -log4j.appender.console.target=System.err -log4j.appender.console.layout=org.apache.log4j.PatternLayout -log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n \ No newline at end of file http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlApiSurfaceTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlApiSurfaceTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlApiSurfaceTest.java deleted file mode 100644 index 922931c..0000000 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlApiSurfaceTest.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.dsls.sql; - -import static org.apache.beam.sdk.util.ApiSurface.containsOnlyPackages; -import static org.junit.Assert.assertThat; - -import com.google.common.collect.ImmutableSet; -import java.util.Set; -import org.apache.beam.sdk.util.ApiSurface; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** - * Surface test for BeamSql api. - */ -@RunWith(JUnit4.class) -public class BeamSqlApiSurfaceTest { - @Test - public void testSdkApiSurface() throws Exception { - - @SuppressWarnings("unchecked") - final Set<String> allowed = - ImmutableSet.of( - "org.apache.beam", - "org.joda.time", - "org.apache.commons.csv"); - - ApiSurface surface = ApiSurface - .ofClass(BeamSqlCli.class) - .includingClass(BeamSql.class) - .includingClass(BeamSqlEnv.class) - .includingPackage("org.apache.beam.dsls.sql.schema", - getClass().getClassLoader()) - .pruningPrefix("java") - .pruningPattern("org[.]apache[.]beam[.]dsls[.]sql[.].*Test") - .pruningPattern("org[.]apache[.]beam[.]dsls[.]sql[.].*TestBase"); - - assertThat(surface, containsOnlyPackages(allowed)); - } - -} http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslAggregationTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslAggregationTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslAggregationTest.java deleted file mode 100644 index a142514..0000000 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslAggregationTest.java +++ /dev/null @@ -1,380 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.dsls.sql; - -import java.sql.Types; -import java.util.Arrays; -import org.apache.beam.dsls.sql.schema.BeamSqlRow; -import org.apache.beam.dsls.sql.schema.BeamSqlRowType; -import org.apache.beam.sdk.testing.PAssert; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionTuple; -import org.apache.beam.sdk.values.TupleTag; -import org.joda.time.Instant; -import org.junit.Test; - -/** - * Tests for GROUP-BY/aggregation, with global_window/fix_time_window/sliding_window/session_window - * with BOUNDED PCollection. - */ -public class BeamSqlDslAggregationTest extends BeamSqlDslBase { - /** - * GROUP-BY with single aggregation function with bounded PCollection. - */ - @Test - public void testAggregationWithoutWindowWithBounded() throws Exception { - runAggregationWithoutWindow(boundedInput1); - } - - /** - * GROUP-BY with single aggregation function with unbounded PCollection. - */ - @Test - public void testAggregationWithoutWindowWithUnbounded() throws Exception { - runAggregationWithoutWindow(unboundedInput1); - } - - private void runAggregationWithoutWindow(PCollection<BeamSqlRow> input) throws Exception { - String sql = "SELECT f_int2, COUNT(*) AS `size` FROM PCOLLECTION GROUP BY f_int2"; - - PCollection<BeamSqlRow> result = - input.apply("testAggregationWithoutWindow", BeamSql.simpleQuery(sql)); - - BeamSqlRowType resultType = BeamSqlRowType.create(Arrays.asList("f_int2", "size"), - Arrays.asList(Types.INTEGER, Types.BIGINT)); - - BeamSqlRow record = new BeamSqlRow(resultType); - record.addField("f_int2", 0); - record.addField("size", 4L); - - PAssert.that(result).containsInAnyOrder(record); - - pipeline.run().waitUntilFinish(); - } - - /** - * GROUP-BY with multiple aggregation functions with bounded PCollection. - */ - @Test - public void testAggregationFunctionsWithBounded() throws Exception{ - runAggregationFunctions(boundedInput1); - } - - /** - * GROUP-BY with multiple aggregation functions with unbounded PCollection. - */ - @Test - public void testAggregationFunctionsWithUnbounded() throws Exception{ - runAggregationFunctions(unboundedInput1); - } - - private void runAggregationFunctions(PCollection<BeamSqlRow> input) throws Exception{ - String sql = "select f_int2, count(*) as size, " - + "sum(f_long) as sum1, avg(f_long) as avg1, max(f_long) as max1, min(f_long) as min1," - + "sum(f_short) as sum2, avg(f_short) as avg2, max(f_short) as max2, min(f_short) as min2," - + "sum(f_byte) as sum3, avg(f_byte) as avg3, max(f_byte) as max3, min(f_byte) as min3," - + "sum(f_float) as sum4, avg(f_float) as avg4, max(f_float) as max4, min(f_float) as min4," - + "sum(f_double) as sum5, avg(f_double) as avg5, " - + "max(f_double) as max5, min(f_double) as min5," - + "max(f_timestamp) as max6, min(f_timestamp) as min6 " - + "FROM TABLE_A group by f_int2"; - - PCollection<BeamSqlRow> result = - PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), input) - .apply("testAggregationFunctions", BeamSql.query(sql)); - - BeamSqlRowType resultType = BeamSqlRowType.create( - Arrays.asList("f_int2", "size", "sum1", "avg1", "max1", "min1", "sum2", "avg2", "max2", - "min2", "sum3", "avg3", "max3", "min3", "sum4", "avg4", "max4", "min4", "sum5", "avg5", - "max5", "min5", "max6", "min6"), - Arrays.asList(Types.INTEGER, Types.BIGINT, Types.BIGINT, Types.BIGINT, Types.BIGINT, - Types.BIGINT, Types.SMALLINT, Types.SMALLINT, Types.SMALLINT, Types.SMALLINT, - Types.TINYINT, Types.TINYINT, Types.TINYINT, Types.TINYINT, Types.FLOAT, Types.FLOAT, - Types.FLOAT, Types.FLOAT, Types.DOUBLE, Types.DOUBLE, Types.DOUBLE, Types.DOUBLE, - Types.TIMESTAMP, Types.TIMESTAMP)); - - BeamSqlRow record = new BeamSqlRow(resultType); - record.addField("f_int2", 0); - record.addField("size", 4L); - - record.addField("sum1", 10000L); - record.addField("avg1", 2500L); - record.addField("max1", 4000L); - record.addField("min1", 1000L); - - record.addField("sum2", (short) 10); - record.addField("avg2", (short) 2); - record.addField("max2", (short) 4); - record.addField("min2", (short) 1); - - record.addField("sum3", (byte) 10); - record.addField("avg3", (byte) 2); - record.addField("max3", (byte) 4); - record.addField("min3", (byte) 1); - - record.addField("sum4", 10.0F); - record.addField("avg4", 2.5F); - record.addField("max4", 4.0F); - record.addField("min4", 1.0F); - - record.addField("sum5", 10.0); - record.addField("avg5", 2.5); - record.addField("max5", 4.0); - record.addField("min5", 1.0); - - record.addField("max6", FORMAT.parse("2017-01-01 02:04:03")); - record.addField("min6", FORMAT.parse("2017-01-01 01:01:03")); - - PAssert.that(result).containsInAnyOrder(record); - - pipeline.run().waitUntilFinish(); - } - - /** - * Implicit GROUP-BY with DISTINCT with bounded PCollection. - */ - @Test - public void testDistinctWithBounded() throws Exception { - runDistinct(boundedInput1); - } - - /** - * Implicit GROUP-BY with DISTINCT with unbounded PCollection. - */ - @Test - public void testDistinctWithUnbounded() throws Exception { - runDistinct(unboundedInput1); - } - - private void runDistinct(PCollection<BeamSqlRow> input) throws Exception { - String sql = "SELECT distinct f_int, f_long FROM PCOLLECTION "; - - PCollection<BeamSqlRow> result = - input.apply("testDistinct", BeamSql.simpleQuery(sql)); - - BeamSqlRowType resultType = BeamSqlRowType.create(Arrays.asList("f_int", "f_long"), - Arrays.asList(Types.INTEGER, Types.BIGINT)); - - BeamSqlRow record1 = new BeamSqlRow(resultType); - record1.addField("f_int", 1); - record1.addField("f_long", 1000L); - - BeamSqlRow record2 = new BeamSqlRow(resultType); - record2.addField("f_int", 2); - record2.addField("f_long", 2000L); - - BeamSqlRow record3 = new BeamSqlRow(resultType); - record3.addField("f_int", 3); - record3.addField("f_long", 3000L); - - BeamSqlRow record4 = new BeamSqlRow(resultType); - record4.addField("f_int", 4); - record4.addField("f_long", 4000L); - - PAssert.that(result).containsInAnyOrder(record1, record2, record3, record4); - - pipeline.run().waitUntilFinish(); - } - - /** - * GROUP-BY with TUMBLE window(aka fix_time_window) with bounded PCollection. - */ - @Test - public void testTumbleWindowWithBounded() throws Exception { - runTumbleWindow(boundedInput1); - } - - /** - * GROUP-BY with TUMBLE window(aka fix_time_window) with unbounded PCollection. - */ - @Test - public void testTumbleWindowWithUnbounded() throws Exception { - runTumbleWindow(unboundedInput1); - } - - private void runTumbleWindow(PCollection<BeamSqlRow> input) throws Exception { - String sql = "SELECT f_int2, COUNT(*) AS `size`," - + " TUMBLE_START(f_timestamp, INTERVAL '1' HOUR) AS `window_start`" - + " FROM TABLE_A" - + " GROUP BY f_int2, TUMBLE(f_timestamp, INTERVAL '1' HOUR)"; - PCollection<BeamSqlRow> result = - PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), input) - .apply("testTumbleWindow", BeamSql.query(sql)); - - BeamSqlRowType resultType = BeamSqlRowType.create( - Arrays.asList("f_int2", "size", "window_start"), - Arrays.asList(Types.INTEGER, Types.BIGINT, Types.TIMESTAMP)); - - BeamSqlRow record1 = new BeamSqlRow(resultType); - record1.addField("f_int2", 0); - record1.addField("size", 3L); - record1.addField("window_start", FORMAT.parse("2017-01-01 01:00:00")); - record1.setWindowStart(new Instant(FORMAT.parse("2017-01-01 01:00:00").getTime())); - record1.setWindowEnd(new Instant(FORMAT.parse("2017-01-01 02:00:00").getTime())); - - BeamSqlRow record2 = new BeamSqlRow(resultType); - record2.addField("f_int2", 0); - record2.addField("size", 1L); - record2.addField("window_start", FORMAT.parse("2017-01-01 02:00:00")); - record2.setWindowStart(new Instant(FORMAT.parse("2017-01-01 02:00:00").getTime())); - record2.setWindowEnd(new Instant(FORMAT.parse("2017-01-01 03:00:00").getTime())); - - PAssert.that(result).containsInAnyOrder(record1, record2); - - pipeline.run().waitUntilFinish(); - } - - /** - * GROUP-BY with HOP window(aka sliding_window) with bounded PCollection. - */ - @Test - public void testHopWindowWithBounded() throws Exception { - runHopWindow(boundedInput1); - } - - /** - * GROUP-BY with HOP window(aka sliding_window) with unbounded PCollection. - */ - @Test - public void testHopWindowWithUnbounded() throws Exception { - runHopWindow(unboundedInput1); - } - - private void runHopWindow(PCollection<BeamSqlRow> input) throws Exception { - String sql = "SELECT f_int2, COUNT(*) AS `size`," - + " HOP_START(f_timestamp, INTERVAL '1' HOUR, INTERVAL '30' MINUTE) AS `window_start`" - + " FROM PCOLLECTION" - + " GROUP BY f_int2, HOP(f_timestamp, INTERVAL '1' HOUR, INTERVAL '30' MINUTE)"; - PCollection<BeamSqlRow> result = - input.apply("testHopWindow", BeamSql.simpleQuery(sql)); - - BeamSqlRowType resultType = BeamSqlRowType.create( - Arrays.asList("f_int2", "size", "window_start"), - Arrays.asList(Types.INTEGER, Types.BIGINT, Types.TIMESTAMP)); - - BeamSqlRow record1 = new BeamSqlRow(resultType); - record1.addField("f_int2", 0); - record1.addField("size", 3L); - record1.addField("window_start", FORMAT.parse("2017-01-01 00:30:00")); - record1.setWindowStart(new Instant(FORMAT.parse("2017-01-01 00:30:00").getTime())); - record1.setWindowEnd(new Instant(FORMAT.parse("2017-01-01 01:30:00").getTime())); - - BeamSqlRow record2 = new BeamSqlRow(resultType); - record2.addField("f_int2", 0); - record2.addField("size", 3L); - record2.addField("window_start", FORMAT.parse("2017-01-01 01:00:00")); - record2.setWindowStart(new Instant(FORMAT.parse("2017-01-01 01:00:00").getTime())); - record2.setWindowEnd(new Instant(FORMAT.parse("2017-01-01 02:00:00").getTime())); - - BeamSqlRow record3 = new BeamSqlRow(resultType); - record3.addField("f_int2", 0); - record3.addField("size", 1L); - record3.addField("window_start", FORMAT.parse("2017-01-01 01:30:00")); - record3.setWindowStart(new Instant(FORMAT.parse("2017-01-01 01:30:00").getTime())); - record3.setWindowEnd(new Instant(FORMAT.parse("2017-01-01 02:30:00").getTime())); - - BeamSqlRow record4 = new BeamSqlRow(resultType); - record4.addField("f_int2", 0); - record4.addField("size", 1L); - record4.addField("window_start", FORMAT.parse("2017-01-01 02:00:00")); - record4.setWindowStart(new Instant(FORMAT.parse("2017-01-01 02:00:00").getTime())); - record4.setWindowEnd(new Instant(FORMAT.parse("2017-01-01 03:00:00").getTime())); - - PAssert.that(result).containsInAnyOrder(record1, record2, record3, record4); - - pipeline.run().waitUntilFinish(); - } - - /** - * GROUP-BY with SESSION window with bounded PCollection. - */ - @Test - public void testSessionWindowWithBounded() throws Exception { - runSessionWindow(boundedInput1); - } - - /** - * GROUP-BY with SESSION window with unbounded PCollection. - */ - @Test - public void testSessionWindowWithUnbounded() throws Exception { - runSessionWindow(unboundedInput1); - } - - private void runSessionWindow(PCollection<BeamSqlRow> input) throws Exception { - String sql = "SELECT f_int2, COUNT(*) AS `size`," - + " SESSION_START(f_timestamp, INTERVAL '5' MINUTE) AS `window_start`" - + " FROM TABLE_A" - + " GROUP BY f_int2, SESSION(f_timestamp, INTERVAL '5' MINUTE)"; - PCollection<BeamSqlRow> result = - PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), input) - .apply("testSessionWindow", BeamSql.query(sql)); - - BeamSqlRowType resultType = BeamSqlRowType.create( - Arrays.asList("f_int2", "size", "window_start"), - Arrays.asList(Types.INTEGER, Types.BIGINT, Types.TIMESTAMP)); - - BeamSqlRow record1 = new BeamSqlRow(resultType); - record1.addField("f_int2", 0); - record1.addField("size", 3L); - record1.addField("window_start", FORMAT.parse("2017-01-01 01:01:03")); - record1.setWindowStart(new Instant(FORMAT.parse("2017-01-01 01:01:03").getTime())); - record1.setWindowEnd(new Instant(FORMAT.parse("2017-01-01 01:11:03").getTime())); - - BeamSqlRow record2 = new BeamSqlRow(resultType); - record2.addField("f_int2", 0); - record2.addField("size", 1L); - record2.addField("window_start", FORMAT.parse("2017-01-01 02:04:03")); - record2.setWindowStart(new Instant(FORMAT.parse("2017-01-01 02:04:03").getTime())); - record2.setWindowEnd(new Instant(FORMAT.parse("2017-01-01 02:09:03").getTime())); - - PAssert.that(result).containsInAnyOrder(record1, record2); - - pipeline.run().waitUntilFinish(); - } - - @Test - public void testWindowOnNonTimestampField() throws Exception { - exceptions.expect(IllegalStateException.class); - exceptions.expectMessage( - "Cannot apply 'TUMBLE' to arguments of type 'TUMBLE(<BIGINT>, <INTERVAL HOUR>)'"); - pipeline.enableAbandonedNodeEnforcement(false); - - String sql = "SELECT f_int2, COUNT(*) AS `size` FROM TABLE_A " - + "GROUP BY f_int2, TUMBLE(f_long, INTERVAL '1' HOUR)"; - PCollection<BeamSqlRow> result = - PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), boundedInput1) - .apply("testWindowOnNonTimestampField", BeamSql.query(sql)); - - pipeline.run().waitUntilFinish(); - } - - @Test - public void testUnsupportedDistinct() throws Exception { - exceptions.expect(IllegalStateException.class); - exceptions.expectMessage("Encountered \"*\""); - pipeline.enableAbandonedNodeEnforcement(false); - - String sql = "SELECT f_int2, COUNT(DISTINCT *) AS `size` FROM PCOLLECTION GROUP BY f_int2"; - - PCollection<BeamSqlRow> result = - boundedInput1.apply("testUnsupportedDistinct", BeamSql.simpleQuery(sql)); - - pipeline.run().waitUntilFinish(); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslBase.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslBase.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslBase.java deleted file mode 100644 index a5d92e7..0000000 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslBase.java +++ /dev/null @@ -1,170 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.dsls.sql; - -import java.math.BigDecimal; -import java.sql.Types; -import java.text.DateFormat; -import java.text.ParseException; -import java.text.SimpleDateFormat; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import org.apache.beam.dsls.sql.schema.BeamSqlRow; -import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder; -import org.apache.beam.dsls.sql.schema.BeamSqlRowType; -import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.testing.TestStream; -import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.values.PBegin; -import org.apache.beam.sdk.values.PCollection; -import org.joda.time.Instant; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Rule; -import org.junit.rules.ExpectedException; - -/** - * prepare input records to test {@link BeamSql}. - * - * <p>Note that, any change in these records would impact tests in this package. - * - */ -public class BeamSqlDslBase { - public static final DateFormat FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); - - @Rule - public final TestPipeline pipeline = TestPipeline.create(); - @Rule - public ExpectedException exceptions = ExpectedException.none(); - - public static BeamSqlRowType rowTypeInTableA; - public static List<BeamSqlRow> recordsInTableA; - - //bounded PCollections - public PCollection<BeamSqlRow> boundedInput1; - public PCollection<BeamSqlRow> boundedInput2; - - //unbounded PCollections - public PCollection<BeamSqlRow> unboundedInput1; - public PCollection<BeamSqlRow> unboundedInput2; - - @BeforeClass - public static void prepareClass() throws ParseException { - rowTypeInTableA = BeamSqlRowType.create( - Arrays.asList("f_int", "f_long", "f_short", "f_byte", "f_float", "f_double", "f_string", - "f_timestamp", "f_int2", "f_decimal"), - Arrays.asList(Types.INTEGER, Types.BIGINT, Types.SMALLINT, Types.TINYINT, Types.FLOAT, - Types.DOUBLE, Types.VARCHAR, Types.TIMESTAMP, Types.INTEGER, Types.DECIMAL)); - - recordsInTableA = prepareInputRowsInTableA(); - } - - @Before - public void preparePCollections(){ - boundedInput1 = PBegin.in(pipeline).apply("boundedInput1", - Create.of(recordsInTableA).withCoder(new BeamSqlRowCoder(rowTypeInTableA))); - - boundedInput2 = PBegin.in(pipeline).apply("boundedInput2", - Create.of(recordsInTableA.get(0)).withCoder(new BeamSqlRowCoder(rowTypeInTableA))); - - unboundedInput1 = prepareUnboundedPCollection1(); - unboundedInput2 = prepareUnboundedPCollection2(); - } - - private PCollection<BeamSqlRow> prepareUnboundedPCollection1() { - TestStream.Builder<BeamSqlRow> values = TestStream - .create(new BeamSqlRowCoder(rowTypeInTableA)); - - for (BeamSqlRow row : recordsInTableA) { - values = values.advanceWatermarkTo(new Instant(row.getDate("f_timestamp"))); - values = values.addElements(row); - } - - return PBegin.in(pipeline).apply("unboundedInput1", values.advanceWatermarkToInfinity()); - } - - private PCollection<BeamSqlRow> prepareUnboundedPCollection2() { - TestStream.Builder<BeamSqlRow> values = TestStream - .create(new BeamSqlRowCoder(rowTypeInTableA)); - - BeamSqlRow row = recordsInTableA.get(0); - values = values.advanceWatermarkTo(new Instant(row.getDate("f_timestamp"))); - values = values.addElements(row); - - return PBegin.in(pipeline).apply("unboundedInput2", values.advanceWatermarkToInfinity()); - } - - private static List<BeamSqlRow> prepareInputRowsInTableA() throws ParseException{ - List<BeamSqlRow> rows = new ArrayList<>(); - - BeamSqlRow row1 = new BeamSqlRow(rowTypeInTableA); - row1.addField(0, 1); - row1.addField(1, 1000L); - row1.addField(2, Short.valueOf("1")); - row1.addField(3, Byte.valueOf("1")); - row1.addField(4, 1.0f); - row1.addField(5, 1.0); - row1.addField(6, "string_row1"); - row1.addField(7, FORMAT.parse("2017-01-01 01:01:03")); - row1.addField(8, 0); - row1.addField(9, new BigDecimal(1)); - rows.add(row1); - - BeamSqlRow row2 = new BeamSqlRow(rowTypeInTableA); - row2.addField(0, 2); - row2.addField(1, 2000L); - row2.addField(2, Short.valueOf("2")); - row2.addField(3, Byte.valueOf("2")); - row2.addField(4, 2.0f); - row2.addField(5, 2.0); - row2.addField(6, "string_row2"); - row2.addField(7, FORMAT.parse("2017-01-01 01:02:03")); - row2.addField(8, 0); - row2.addField(9, new BigDecimal(2)); - rows.add(row2); - - BeamSqlRow row3 = new BeamSqlRow(rowTypeInTableA); - row3.addField(0, 3); - row3.addField(1, 3000L); - row3.addField(2, Short.valueOf("3")); - row3.addField(3, Byte.valueOf("3")); - row3.addField(4, 3.0f); - row3.addField(5, 3.0); - row3.addField(6, "string_row3"); - row3.addField(7, FORMAT.parse("2017-01-01 01:06:03")); - row3.addField(8, 0); - row3.addField(9, new BigDecimal(3)); - rows.add(row3); - - BeamSqlRow row4 = new BeamSqlRow(rowTypeInTableA); - row4.addField(0, 4); - row4.addField(1, 4000L); - row4.addField(2, Short.valueOf("4")); - row4.addField(3, Byte.valueOf("4")); - row4.addField(4, 4.0f); - row4.addField(5, 4.0); - row4.addField(6, "string_row4"); - row4.addField(7, FORMAT.parse("2017-01-01 02:04:03")); - row4.addField(8, 0); - row4.addField(9, new BigDecimal(4)); - rows.add(row4); - - return rows; - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslFilterTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslFilterTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslFilterTest.java deleted file mode 100644 index b4b50c1..0000000 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslFilterTest.java +++ /dev/null @@ -1,155 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.dsls.sql; - -import org.apache.beam.dsls.sql.schema.BeamSqlRow; -import org.apache.beam.sdk.testing.PAssert; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionTuple; -import org.apache.beam.sdk.values.TupleTag; -import org.junit.Test; - -/** - * Tests for WHERE queries with BOUNDED PCollection. - */ -public class BeamSqlDslFilterTest extends BeamSqlDslBase { - /** - * single filter with bounded PCollection. - */ - @Test - public void testSingleFilterWithBounded() throws Exception { - runSingleFilter(boundedInput1); - } - - /** - * single filter with unbounded PCollection. - */ - @Test - public void testSingleFilterWithUnbounded() throws Exception { - runSingleFilter(unboundedInput1); - } - - private void runSingleFilter(PCollection<BeamSqlRow> input) throws Exception { - String sql = "SELECT * FROM PCOLLECTION WHERE f_int = 1"; - - PCollection<BeamSqlRow> result = - input.apply("testSingleFilter", BeamSql.simpleQuery(sql)); - - PAssert.that(result).containsInAnyOrder(recordsInTableA.get(0)); - - pipeline.run().waitUntilFinish(); - } - - /** - * composite filters with bounded PCollection. - */ - @Test - public void testCompositeFilterWithBounded() throws Exception { - runCompositeFilter(boundedInput1); - } - - /** - * composite filters with unbounded PCollection. - */ - @Test - public void testCompositeFilterWithUnbounded() throws Exception { - runCompositeFilter(unboundedInput1); - } - - private void runCompositeFilter(PCollection<BeamSqlRow> input) throws Exception { - String sql = "SELECT * FROM TABLE_A" - + " WHERE f_int > 1 AND (f_long < 3000 OR f_string = 'string_row3')"; - - PCollection<BeamSqlRow> result = - PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), input) - .apply("testCompositeFilter", BeamSql.query(sql)); - - PAssert.that(result).containsInAnyOrder(recordsInTableA.get(1), recordsInTableA.get(2)); - - pipeline.run().waitUntilFinish(); - } - - /** - * nothing return with filters in bounded PCollection. - */ - @Test - public void testNoReturnFilterWithBounded() throws Exception { - runNoReturnFilter(boundedInput1); - } - - /** - * nothing return with filters in unbounded PCollection. - */ - @Test - public void testNoReturnFilterWithUnbounded() throws Exception { - runNoReturnFilter(unboundedInput1); - } - - private void runNoReturnFilter(PCollection<BeamSqlRow> input) throws Exception { - String sql = "SELECT * FROM TABLE_A WHERE f_int < 1"; - - PCollection<BeamSqlRow> result = - PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), input) - .apply("testNoReturnFilter", BeamSql.query(sql)); - - PAssert.that(result).empty(); - - pipeline.run().waitUntilFinish(); - } - - @Test - public void testFromInvalidTableName1() throws Exception { - exceptions.expect(IllegalStateException.class); - exceptions.expectMessage("Object 'TABLE_B' not found"); - pipeline.enableAbandonedNodeEnforcement(false); - - String sql = "SELECT * FROM TABLE_B WHERE f_int < 1"; - - PCollection<BeamSqlRow> result = - PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), boundedInput1) - .apply("testFromInvalidTableName1", BeamSql.query(sql)); - - pipeline.run().waitUntilFinish(); - } - - @Test - public void testFromInvalidTableName2() throws Exception { - exceptions.expect(IllegalStateException.class); - exceptions.expectMessage("Use fixed table name PCOLLECTION"); - pipeline.enableAbandonedNodeEnforcement(false); - - String sql = "SELECT * FROM PCOLLECTION_NA"; - - PCollection<BeamSqlRow> result = boundedInput1.apply(BeamSql.simpleQuery(sql)); - - pipeline.run().waitUntilFinish(); - } - - @Test - public void testInvalidFilter() throws Exception { - exceptions.expect(IllegalStateException.class); - exceptions.expectMessage("Column 'f_int_na' not found in any table"); - pipeline.enableAbandonedNodeEnforcement(false); - - String sql = "SELECT * FROM PCOLLECTION WHERE f_int_na = 0"; - - PCollection<BeamSqlRow> result = boundedInput1.apply(BeamSql.simpleQuery(sql)); - - pipeline.run().waitUntilFinish(); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslJoinTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslJoinTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslJoinTest.java deleted file mode 100644 index e010915..0000000 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslJoinTest.java +++ /dev/null @@ -1,191 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.dsls.sql; - -import static org.apache.beam.dsls.sql.rel.BeamJoinRelBoundedVsBoundedTest.ORDER_DETAILS1; -import static org.apache.beam.dsls.sql.rel.BeamJoinRelBoundedVsBoundedTest.ORDER_DETAILS2; - -import java.sql.Types; -import java.util.Arrays; -import org.apache.beam.dsls.sql.schema.BeamSqlRow; -import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder; -import org.apache.beam.dsls.sql.schema.BeamSqlRowType; -import org.apache.beam.sdk.testing.PAssert; -import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionTuple; -import org.apache.beam.sdk.values.TupleTag; -import org.junit.Rule; -import org.junit.Test; - -/** - * Tests for joins in queries. - */ -public class BeamSqlDslJoinTest { - @Rule - public final TestPipeline pipeline = TestPipeline.create(); - - private static final BeamSqlRowType SOURCE_RECORD_TYPE = - BeamSqlRowType.create( - Arrays.asList( - "order_id", "site_id", "price" - ), - Arrays.asList( - Types.INTEGER, Types.INTEGER, Types.INTEGER - ) - ); - - private static final BeamSqlRowCoder SOURCE_CODER = - new BeamSqlRowCoder(SOURCE_RECORD_TYPE); - - private static final BeamSqlRowType RESULT_RECORD_TYPE = - BeamSqlRowType.create( - Arrays.asList( - "order_id", "site_id", "price", "order_id0", "site_id0", "price0" - ), - Arrays.asList( - Types.INTEGER, Types.INTEGER, Types.INTEGER, Types.INTEGER - , Types.INTEGER, Types.INTEGER - ) - ); - - private static final BeamSqlRowCoder RESULT_CODER = - new BeamSqlRowCoder(RESULT_RECORD_TYPE); - - @Test - public void testInnerJoin() throws Exception { - String sql = - "SELECT * " - + "FROM ORDER_DETAILS1 o1" - + " JOIN ORDER_DETAILS2 o2" - + " on " - + " o1.order_id=o2.site_id AND o2.price=o1.site_id" - ; - - PAssert.that(queryFromOrderTables(sql)).containsInAnyOrder( - TestUtils.RowsBuilder.of( - RESULT_RECORD_TYPE - ).addRows( - 2, 3, 3, 1, 2, 3 - ).getRows()); - pipeline.run(); - } - - @Test - public void testLeftOuterJoin() throws Exception { - String sql = - "SELECT * " - + "FROM ORDER_DETAILS1 o1" - + " LEFT OUTER JOIN ORDER_DETAILS2 o2" - + " on " - + " o1.order_id=o2.site_id AND o2.price=o1.site_id" - ; - - PAssert.that(queryFromOrderTables(sql)).containsInAnyOrder( - TestUtils.RowsBuilder.of( - RESULT_RECORD_TYPE - ).addRows( - 1, 2, 3, null, null, null, - 2, 3, 3, 1, 2, 3, - 3, 4, 5, null, null, null - ).getRows()); - pipeline.run(); - } - - @Test - public void testRightOuterJoin() throws Exception { - String sql = - "SELECT * " - + "FROM ORDER_DETAILS1 o1" - + " RIGHT OUTER JOIN ORDER_DETAILS2 o2" - + " on " - + " o1.order_id=o2.site_id AND o2.price=o1.site_id" - ; - - PAssert.that(queryFromOrderTables(sql)).containsInAnyOrder( - TestUtils.RowsBuilder.of( - RESULT_RECORD_TYPE - ).addRows( - 2, 3, 3, 1, 2, 3, - null, null, null, 2, 3, 3, - null, null, null, 3, 4, 5 - ).getRows()); - pipeline.run(); - } - - @Test - public void testFullOuterJoin() throws Exception { - String sql = - "SELECT * " - + "FROM ORDER_DETAILS1 o1" - + " FULL OUTER JOIN ORDER_DETAILS2 o2" - + " on " - + " o1.order_id=o2.site_id AND o2.price=o1.site_id" - ; - - PAssert.that(queryFromOrderTables(sql)).containsInAnyOrder( - TestUtils.RowsBuilder.of( - RESULT_RECORD_TYPE - ).addRows( - 2, 3, 3, 1, 2, 3, - 1, 2, 3, null, null, null, - 3, 4, 5, null, null, null, - null, null, null, 2, 3, 3, - null, null, null, 3, 4, 5 - ).getRows()); - pipeline.run(); - } - - @Test(expected = IllegalStateException.class) - public void testException_nonEqualJoin() throws Exception { - String sql = - "SELECT * " - + "FROM ORDER_DETAILS1 o1" - + " JOIN ORDER_DETAILS2 o2" - + " on " - + " o1.order_id>o2.site_id" - ; - - pipeline.enableAbandonedNodeEnforcement(false); - queryFromOrderTables(sql); - pipeline.run(); - } - - @Test(expected = IllegalStateException.class) - public void testException_crossJoin() throws Exception { - String sql = - "SELECT * " - + "FROM ORDER_DETAILS1 o1, ORDER_DETAILS2 o2"; - - pipeline.enableAbandonedNodeEnforcement(false); - queryFromOrderTables(sql); - pipeline.run(); - } - - private PCollection<BeamSqlRow> queryFromOrderTables(String sql) { - return PCollectionTuple - .of( - new TupleTag<BeamSqlRow>("ORDER_DETAILS1"), - ORDER_DETAILS1.buildIOReader(pipeline).setCoder(SOURCE_CODER) - ) - .and(new TupleTag<BeamSqlRow>("ORDER_DETAILS2"), - ORDER_DETAILS2.buildIOReader(pipeline).setCoder(SOURCE_CODER) - ).apply("join", BeamSql.query(sql)).setCoder(RESULT_CODER); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslProjectTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslProjectTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslProjectTest.java deleted file mode 100644 index ab5a639..0000000 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslProjectTest.java +++ /dev/null @@ -1,238 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.dsls.sql; - -import java.sql.Types; -import java.util.Arrays; -import org.apache.beam.dsls.sql.schema.BeamSqlRow; -import org.apache.beam.dsls.sql.schema.BeamSqlRowType; -import org.apache.beam.sdk.testing.PAssert; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionTuple; -import org.apache.beam.sdk.values.TupleTag; -import org.junit.Test; - -/** - * Tests for field-project in queries with BOUNDED PCollection. - */ -public class BeamSqlDslProjectTest extends BeamSqlDslBase { - /** - * select all fields with bounded PCollection. - */ - @Test - public void testSelectAllWithBounded() throws Exception { - runSelectAll(boundedInput2); - } - - /** - * select all fields with unbounded PCollection. - */ - @Test - public void testSelectAllWithUnbounded() throws Exception { - runSelectAll(unboundedInput2); - } - - private void runSelectAll(PCollection<BeamSqlRow> input) throws Exception { - String sql = "SELECT * FROM PCOLLECTION"; - - PCollection<BeamSqlRow> result = - input.apply("testSelectAll", BeamSql.simpleQuery(sql)); - - PAssert.that(result).containsInAnyOrder(recordsInTableA.get(0)); - - pipeline.run().waitUntilFinish(); - } - - /** - * select partial fields with bounded PCollection. - */ - @Test - public void testPartialFieldsWithBounded() throws Exception { - runPartialFields(boundedInput2); - } - - /** - * select partial fields with unbounded PCollection. - */ - @Test - public void testPartialFieldsWithUnbounded() throws Exception { - runPartialFields(unboundedInput2); - } - - private void runPartialFields(PCollection<BeamSqlRow> input) throws Exception { - String sql = "SELECT f_int, f_long FROM TABLE_A"; - - PCollection<BeamSqlRow> result = - PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), input) - .apply("testPartialFields", BeamSql.query(sql)); - - BeamSqlRowType resultType = BeamSqlRowType.create(Arrays.asList("f_int", "f_long"), - Arrays.asList(Types.INTEGER, Types.BIGINT)); - - BeamSqlRow record = new BeamSqlRow(resultType); - record.addField("f_int", recordsInTableA.get(0).getFieldValue(0)); - record.addField("f_long", recordsInTableA.get(0).getFieldValue(1)); - - PAssert.that(result).containsInAnyOrder(record); - - pipeline.run().waitUntilFinish(); - } - - /** - * select partial fields for multiple rows with bounded PCollection. - */ - @Test - public void testPartialFieldsInMultipleRowWithBounded() throws Exception { - runPartialFieldsInMultipleRow(boundedInput1); - } - - /** - * select partial fields for multiple rows with unbounded PCollection. - */ - @Test - public void testPartialFieldsInMultipleRowWithUnbounded() throws Exception { - runPartialFieldsInMultipleRow(unboundedInput1); - } - - private void runPartialFieldsInMultipleRow(PCollection<BeamSqlRow> input) throws Exception { - String sql = "SELECT f_int, f_long FROM TABLE_A"; - - PCollection<BeamSqlRow> result = - PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), input) - .apply("testPartialFieldsInMultipleRow", BeamSql.query(sql)); - - BeamSqlRowType resultType = BeamSqlRowType.create(Arrays.asList("f_int", "f_long"), - Arrays.asList(Types.INTEGER, Types.BIGINT)); - - BeamSqlRow record1 = new BeamSqlRow(resultType); - record1.addField("f_int", recordsInTableA.get(0).getFieldValue(0)); - record1.addField("f_long", recordsInTableA.get(0).getFieldValue(1)); - - BeamSqlRow record2 = new BeamSqlRow(resultType); - record2.addField("f_int", recordsInTableA.get(1).getFieldValue(0)); - record2.addField("f_long", recordsInTableA.get(1).getFieldValue(1)); - - BeamSqlRow record3 = new BeamSqlRow(resultType); - record3.addField("f_int", recordsInTableA.get(2).getFieldValue(0)); - record3.addField("f_long", recordsInTableA.get(2).getFieldValue(1)); - - BeamSqlRow record4 = new BeamSqlRow(resultType); - record4.addField("f_int", recordsInTableA.get(3).getFieldValue(0)); - record4.addField("f_long", recordsInTableA.get(3).getFieldValue(1)); - - PAssert.that(result).containsInAnyOrder(record1, record2, record3, record4); - - pipeline.run().waitUntilFinish(); - } - - /** - * select partial fields with bounded PCollection. - */ - @Test - public void testPartialFieldsInRowsWithBounded() throws Exception { - runPartialFieldsInRows(boundedInput1); - } - - /** - * select partial fields with unbounded PCollection. - */ - @Test - public void testPartialFieldsInRowsWithUnbounded() throws Exception { - runPartialFieldsInRows(unboundedInput1); - } - - private void runPartialFieldsInRows(PCollection<BeamSqlRow> input) throws Exception { - String sql = "SELECT f_int, f_long FROM TABLE_A"; - - PCollection<BeamSqlRow> result = - PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), input) - .apply("testPartialFieldsInRows", BeamSql.query(sql)); - - BeamSqlRowType resultType = BeamSqlRowType.create(Arrays.asList("f_int", "f_long"), - Arrays.asList(Types.INTEGER, Types.BIGINT)); - - BeamSqlRow record1 = new BeamSqlRow(resultType); - record1.addField("f_int", recordsInTableA.get(0).getFieldValue(0)); - record1.addField("f_long", recordsInTableA.get(0).getFieldValue(1)); - - BeamSqlRow record2 = new BeamSqlRow(resultType); - record2.addField("f_int", recordsInTableA.get(1).getFieldValue(0)); - record2.addField("f_long", recordsInTableA.get(1).getFieldValue(1)); - - BeamSqlRow record3 = new BeamSqlRow(resultType); - record3.addField("f_int", recordsInTableA.get(2).getFieldValue(0)); - record3.addField("f_long", recordsInTableA.get(2).getFieldValue(1)); - - BeamSqlRow record4 = new BeamSqlRow(resultType); - record4.addField("f_int", recordsInTableA.get(3).getFieldValue(0)); - record4.addField("f_long", recordsInTableA.get(3).getFieldValue(1)); - - PAssert.that(result).containsInAnyOrder(record1, record2, record3, record4); - - pipeline.run().waitUntilFinish(); - } - - /** - * select literal field with bounded PCollection. - */ - @Test - public void testLiteralFieldWithBounded() throws Exception { - runLiteralField(boundedInput2); - } - - /** - * select literal field with unbounded PCollection. - */ - @Test - public void testLiteralFieldWithUnbounded() throws Exception { - runLiteralField(unboundedInput2); - } - - public void runLiteralField(PCollection<BeamSqlRow> input) throws Exception { - String sql = "SELECT 1 as literal_field FROM TABLE_A"; - - PCollection<BeamSqlRow> result = - PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), input) - .apply("testLiteralField", BeamSql.query(sql)); - - BeamSqlRowType resultType = BeamSqlRowType.create(Arrays.asList("literal_field"), - Arrays.asList(Types.INTEGER)); - - BeamSqlRow record = new BeamSqlRow(resultType); - record.addField("literal_field", 1); - - PAssert.that(result).containsInAnyOrder(record); - - pipeline.run().waitUntilFinish(); - } - - @Test - public void testProjectUnknownField() throws Exception { - exceptions.expect(IllegalStateException.class); - exceptions.expectMessage("Column 'f_int_na' not found in any table"); - pipeline.enableAbandonedNodeEnforcement(false); - - String sql = "SELECT f_int_na FROM TABLE_A"; - - PCollection<BeamSqlRow> result = - PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), boundedInput1) - .apply("testProjectUnknownField", BeamSql.query(sql)); - - pipeline.run().waitUntilFinish(); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslUdfUdafTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslUdfUdafTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslUdfUdafTest.java deleted file mode 100644 index 726f658..0000000 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslUdfUdafTest.java +++ /dev/null @@ -1,138 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.dsls.sql; - -import java.sql.Types; -import java.util.Arrays; -import java.util.Iterator; -import org.apache.beam.dsls.sql.schema.BeamSqlRow; -import org.apache.beam.dsls.sql.schema.BeamSqlRowType; -import org.apache.beam.dsls.sql.schema.BeamSqlUdaf; -import org.apache.beam.dsls.sql.schema.BeamSqlUdf; -import org.apache.beam.sdk.testing.PAssert; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionTuple; -import org.apache.beam.sdk.values.TupleTag; -import org.junit.Test; - -/** - * Tests for UDF/UDAF. - */ -public class BeamSqlDslUdfUdafTest extends BeamSqlDslBase { - /** - * GROUP-BY with UDAF. - */ - @Test - public void testUdaf() throws Exception { - BeamSqlRowType resultType = BeamSqlRowType.create(Arrays.asList("f_int2", "squaresum"), - Arrays.asList(Types.INTEGER, Types.INTEGER)); - - BeamSqlRow record = new BeamSqlRow(resultType); - record.addField("f_int2", 0); - record.addField("squaresum", 30); - - String sql1 = "SELECT f_int2, squaresum1(f_int) AS `squaresum`" - + " FROM PCOLLECTION GROUP BY f_int2"; - PCollection<BeamSqlRow> result1 = - boundedInput1.apply("testUdaf1", - BeamSql.simpleQuery(sql1).withUdaf("squaresum1", SquareSum.class)); - PAssert.that(result1).containsInAnyOrder(record); - - String sql2 = "SELECT f_int2, squaresum2(f_int) AS `squaresum`" - + " FROM PCOLLECTION GROUP BY f_int2"; - PCollection<BeamSqlRow> result2 = - PCollectionTuple.of(new TupleTag<BeamSqlRow>("PCOLLECTION"), boundedInput1) - .apply("testUdaf2", - BeamSql.query(sql2).withUdaf("squaresum2", SquareSum.class)); - PAssert.that(result2).containsInAnyOrder(record); - - pipeline.run().waitUntilFinish(); - } - - /** - * test UDF. - */ - @Test - public void testUdf() throws Exception{ - BeamSqlRowType resultType = BeamSqlRowType.create(Arrays.asList("f_int", "cubicvalue"), - Arrays.asList(Types.INTEGER, Types.INTEGER)); - - BeamSqlRow record = new BeamSqlRow(resultType); - record.addField("f_int", 2); - record.addField("cubicvalue", 8); - - String sql1 = "SELECT f_int, cubic1(f_int) as cubicvalue FROM PCOLLECTION WHERE f_int = 2"; - PCollection<BeamSqlRow> result1 = - boundedInput1.apply("testUdf1", - BeamSql.simpleQuery(sql1).withUdf("cubic1", CubicInteger.class)); - PAssert.that(result1).containsInAnyOrder(record); - - String sql2 = "SELECT f_int, cubic2(f_int) as cubicvalue FROM PCOLLECTION WHERE f_int = 2"; - PCollection<BeamSqlRow> result2 = - PCollectionTuple.of(new TupleTag<BeamSqlRow>("PCOLLECTION"), boundedInput1) - .apply("testUdf2", - BeamSql.query(sql2).withUdf("cubic2", CubicInteger.class)); - PAssert.that(result2).containsInAnyOrder(record); - - pipeline.run().waitUntilFinish(); - } - - /** - * UDAF for test, which returns the sum of square. - */ - public static class SquareSum extends BeamSqlUdaf<Integer, Integer, Integer> { - - public SquareSum() { - } - - @Override - public Integer init() { - return 0; - } - - @Override - public Integer add(Integer accumulator, Integer input) { - return accumulator + input * input; - } - - @Override - public Integer merge(Iterable<Integer> accumulators) { - int v = 0; - Iterator<Integer> ite = accumulators.iterator(); - while (ite.hasNext()) { - v += ite.next(); - } - return v; - } - - @Override - public Integer result(Integer accumulator) { - return accumulator; - } - - } - - /** - * A example UDF for test. - */ - public static class CubicInteger implements BeamSqlUdf{ - public static Integer eval(Integer input){ - return input * input * input; - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/dsls/sql/src/test/java/org/apache/beam/dsls/sql/TestUtils.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/TestUtils.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/TestUtils.java deleted file mode 100644 index a669635..0000000 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/TestUtils.java +++ /dev/null @@ -1,195 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.dsls.sql; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import org.apache.beam.dsls.sql.schema.BeamSqlRow; -import org.apache.beam.dsls.sql.schema.BeamSqlRowType; -import org.apache.beam.sdk.transforms.DoFn; - -/** - * Test utilities. - */ -public class TestUtils { - /** - * A {@code DoFn} to convert a {@code BeamSqlRow} to a comparable {@code String}. - */ - public static class BeamSqlRow2StringDoFn extends DoFn<BeamSqlRow, String> { - @ProcessElement - public void processElement(ProcessContext ctx) { - ctx.output(ctx.element().valueInString()); - } - } - - /** - * Convert list of {@code BeamSqlRow} to list of {@code String}. - */ - public static List<String> beamSqlRows2Strings(List<BeamSqlRow> rows) { - List<String> strs = new ArrayList<>(); - for (BeamSqlRow row : rows) { - strs.add(row.valueInString()); - } - - return strs; - } - - /** - * Convenient way to build a list of {@code BeamSqlRow}s. - * - * <p>You can use it like this: - * - * <pre>{@code - * TestUtils.RowsBuilder.of( - * Types.INTEGER, "order_id", - * Types.INTEGER, "sum_site_id", - * Types.VARCHAR, "buyer" - * ).addRows( - * 1, 3, "james", - * 2, 5, "bond" - * ).getStringRows() - * }</pre> - * {@code} - */ - public static class RowsBuilder { - private BeamSqlRowType type; - private List<BeamSqlRow> rows = new ArrayList<>(); - - /** - * Create a RowsBuilder with the specified row type info. - * - * <p>For example: - * <pre>{@code - * TestUtils.RowsBuilder.of( - * Types.INTEGER, "order_id", - * Types.INTEGER, "sum_site_id", - * Types.VARCHAR, "buyer" - * )}</pre> - * - * @args pairs of column type and column names. - */ - public static RowsBuilder of(final Object... args) { - BeamSqlRowType beamSQLRowType = buildBeamSqlRowType(args); - RowsBuilder builder = new RowsBuilder(); - builder.type = beamSQLRowType; - - return builder; - } - - /** - * Create a RowsBuilder with the specified row type info. - * - * <p>For example: - * <pre>{@code - * TestUtils.RowsBuilder.of( - * beamSqlRowType - * )}</pre> - * @beamSQLRowType the record type. - */ - public static RowsBuilder of(final BeamSqlRowType beamSQLRowType) { - RowsBuilder builder = new RowsBuilder(); - builder.type = beamSQLRowType; - - return builder; - } - - /** - * Add rows to the builder. - * - * <p>Note: check the class javadoc for for detailed example. - */ - public RowsBuilder addRows(final Object... args) { - this.rows.addAll(buildRows(type, Arrays.asList(args))); - return this; - } - - /** - * Add rows to the builder. - * - * <p>Note: check the class javadoc for for detailed example. - */ - public RowsBuilder addRows(final List args) { - this.rows.addAll(buildRows(type, args)); - return this; - } - - public List<BeamSqlRow> getRows() { - return rows; - } - - public List<String> getStringRows() { - return beamSqlRows2Strings(rows); - } - } - - /** - * Convenient way to build a {@code BeamSqlRowType}. - * - * <p>e.g. - * - * <pre>{@code - * buildBeamSqlRowType( - * Types.BIGINT, "order_id", - * Types.INTEGER, "site_id", - * Types.DOUBLE, "price", - * Types.TIMESTAMP, "order_time" - * ) - * }</pre> - */ - public static BeamSqlRowType buildBeamSqlRowType(Object... args) { - List<Integer> types = new ArrayList<>(); - List<String> names = new ArrayList<>(); - - for (int i = 0; i < args.length - 1; i += 2) { - types.add((int) args[i]); - names.add((String) args[i + 1]); - } - - return BeamSqlRowType.create(names, types); - } - - /** - * Convenient way to build a {@code BeamSqlRow}s. - * - * <p>e.g. - * - * <pre>{@code - * buildRows( - * rowType, - * 1, 1, 1, // the first row - * 2, 2, 2, // the second row - * ... - * ) - * }</pre> - */ - public static List<BeamSqlRow> buildRows(BeamSqlRowType type, List args) { - List<BeamSqlRow> rows = new ArrayList<>(); - int fieldCount = type.size(); - - for (int i = 0; i < args.size(); i += fieldCount) { - BeamSqlRow row = new BeamSqlRow(type); - for (int j = 0; j < fieldCount; j++) { - row.addField(j, args.get(i + j)); - } - rows.add(row); - } - return rows; - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/dsls/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlArithmeticOperatorsIntegrationTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlArithmeticOperatorsIntegrationTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlArithmeticOperatorsIntegrationTest.java deleted file mode 100644 index 947660a..0000000 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlArithmeticOperatorsIntegrationTest.java +++ /dev/null @@ -1,165 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.dsls.sql.integrationtest; - -import java.math.BigDecimal; -import java.math.RoundingMode; -import org.junit.Test; - -/** - * Integration test for arithmetic operators. - */ -public class BeamSqlArithmeticOperatorsIntegrationTest - extends BeamSqlBuiltinFunctionsIntegrationTestBase { - - private static final BigDecimal ZERO = BigDecimal.valueOf(0.0); - private static final BigDecimal ONE0 = BigDecimal.valueOf(1); - private static final BigDecimal ONE = BigDecimal.valueOf(1.0); - private static final BigDecimal ONE2 = BigDecimal.valueOf(1.0).multiply(BigDecimal.valueOf(1.0)); - private static final BigDecimal ONE10 = BigDecimal.ONE.divide( - BigDecimal.ONE, 10, RoundingMode.HALF_EVEN); - private static final BigDecimal TWO = BigDecimal.valueOf(2.0); - - @Test - public void testPlus() throws Exception { - ExpressionChecker checker = new ExpressionChecker() - .addExpr("1 + 1", 2) - .addExpr("1.0 + 1", TWO) - .addExpr("1 + 1.0", TWO) - .addExpr("1.0 + 1.0", TWO) - .addExpr("c_tinyint + c_tinyint", (byte) 2) - .addExpr("c_smallint + c_smallint", (short) 2) - .addExpr("c_bigint + c_bigint", 2L) - .addExpr("c_decimal + c_decimal", TWO) - .addExpr("c_tinyint + c_decimal", TWO) - .addExpr("c_float + c_decimal", 2.0) - .addExpr("c_double + c_decimal", 2.0) - .addExpr("c_float + c_float", 2.0f) - .addExpr("c_double + c_float", 2.0) - .addExpr("c_double + c_double", 2.0) - .addExpr("c_float + c_bigint", 2.0f) - .addExpr("c_double + c_bigint", 2.0) - ; - - checker.buildRunAndCheck(); - } - - @Test - public void testPlus_overflow() throws Exception { - ExpressionChecker checker = new ExpressionChecker() - .addExpr("c_tinyint_max + c_tinyint_max", (byte) -2) - .addExpr("c_smallint_max + c_smallint_max", (short) -2) - .addExpr("c_integer_max + c_integer_max", -2) - // yeah, I know 384L is strange, but since it is already overflowed - // what the actualy result is not so important, it is wrong any way. - .addExpr("c_bigint_max + c_bigint_max", 384L) - ; - - checker.buildRunAndCheck(); - } - - @Test - public void testMinus() throws Exception { - ExpressionChecker checker = new ExpressionChecker() - .addExpr("1 - 1", 0) - .addExpr("1.0 - 1", ZERO) - .addExpr("1 - 0.0", ONE) - .addExpr("1.0 - 1.0", ZERO) - .addExpr("c_tinyint - c_tinyint", (byte) 0) - .addExpr("c_smallint - c_smallint", (short) 0) - .addExpr("c_bigint - c_bigint", 0L) - .addExpr("c_decimal - c_decimal", ZERO) - .addExpr("c_tinyint - c_decimal", ZERO) - .addExpr("c_float - c_decimal", 0.0) - .addExpr("c_double - c_decimal", 0.0) - .addExpr("c_float - c_float", 0.0f) - .addExpr("c_double - c_float", 0.0) - .addExpr("c_double - c_double", 0.0) - .addExpr("c_float - c_bigint", 0.0f) - .addExpr("c_double - c_bigint", 0.0) - ; - - checker.buildRunAndCheck(); - } - - @Test - public void testMultiply() throws Exception { - ExpressionChecker checker = new ExpressionChecker() - .addExpr("1 * 1", 1) - .addExpr("1.0 * 1", ONE2) - .addExpr("1 * 1.0", ONE2) - .addExpr("1.0 * 1.0", ONE2) - .addExpr("c_tinyint * c_tinyint", (byte) 1) - .addExpr("c_smallint * c_smallint", (short) 1) - .addExpr("c_bigint * c_bigint", 1L) - .addExpr("c_decimal * c_decimal", ONE2) - .addExpr("c_tinyint * c_decimal", ONE2) - .addExpr("c_float * c_decimal", 1.0) - .addExpr("c_double * c_decimal", 1.0) - .addExpr("c_float * c_float", 1.0f) - .addExpr("c_double * c_float", 1.0) - .addExpr("c_double * c_double", 1.0) - .addExpr("c_float * c_bigint", 1.0f) - .addExpr("c_double * c_bigint", 1.0) - ; - - checker.buildRunAndCheck(); - } - - @Test - public void testDivide() throws Exception { - ExpressionChecker checker = new ExpressionChecker() - .addExpr("1 / 1", 1) - .addExpr("1.0 / 1", ONE10) - .addExpr("1 / 1.0", ONE10) - .addExpr("1.0 / 1.0", ONE10) - .addExpr("c_tinyint / c_tinyint", (byte) 1) - .addExpr("c_smallint / c_smallint", (short) 1) - .addExpr("c_bigint / c_bigint", 1L) - .addExpr("c_decimal / c_decimal", ONE10) - .addExpr("c_tinyint / c_decimal", ONE10) - .addExpr("c_float / c_decimal", 1.0) - .addExpr("c_double / c_decimal", 1.0) - .addExpr("c_float / c_float", 1.0f) - .addExpr("c_double / c_float", 1.0) - .addExpr("c_double / c_double", 1.0) - .addExpr("c_float / c_bigint", 1.0f) - .addExpr("c_double / c_bigint", 1.0) - ; - - checker.buildRunAndCheck(); - } - - @Test - public void testMod() throws Exception { - ExpressionChecker checker = new ExpressionChecker() - .addExpr("mod(1, 1)", 0) - .addExpr("mod(1.0, 1)", 0) - .addExpr("mod(1, 1.0)", ZERO) - .addExpr("mod(1.0, 1.0)", ZERO) - .addExpr("mod(c_tinyint, c_tinyint)", (byte) 0) - .addExpr("mod(c_smallint, c_smallint)", (short) 0) - .addExpr("mod(c_bigint, c_bigint)", 0L) - .addExpr("mod(c_decimal, c_decimal)", ZERO) - .addExpr("mod(c_tinyint, c_decimal)", ZERO) - ; - - checker.buildRunAndCheck(); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/dsls/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java deleted file mode 100644 index b9ce9b4..0000000 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java +++ /dev/null @@ -1,169 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.dsls.sql.integrationtest; - -import com.google.common.base.Joiner; -import java.math.BigDecimal; -import java.sql.Types; -import java.text.SimpleDateFormat; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Date; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.TimeZone; -import org.apache.beam.dsls.sql.BeamSql; -import org.apache.beam.dsls.sql.TestUtils; -import org.apache.beam.dsls.sql.mock.MockedBoundedTable; -import org.apache.beam.dsls.sql.schema.BeamSqlRow; -import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder; -import org.apache.beam.dsls.sql.schema.BeamSqlRowType; -import org.apache.beam.sdk.testing.PAssert; -import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.values.PCollection; -import org.apache.calcite.util.Pair; -import org.junit.Rule; - -/** - * Base class for all built-in functions integration tests. - */ -public class BeamSqlBuiltinFunctionsIntegrationTestBase { - private static final Map<Class, Integer> JAVA_CLASS_TO_SQL_TYPE = new HashMap<>(); - static { - JAVA_CLASS_TO_SQL_TYPE.put(Byte.class, Types.TINYINT); - JAVA_CLASS_TO_SQL_TYPE.put(Short.class, Types.SMALLINT); - JAVA_CLASS_TO_SQL_TYPE.put(Integer.class, Types.INTEGER); - JAVA_CLASS_TO_SQL_TYPE.put(Long.class, Types.BIGINT); - JAVA_CLASS_TO_SQL_TYPE.put(Float.class, Types.FLOAT); - JAVA_CLASS_TO_SQL_TYPE.put(Double.class, Types.DOUBLE); - JAVA_CLASS_TO_SQL_TYPE.put(BigDecimal.class, Types.DECIMAL); - JAVA_CLASS_TO_SQL_TYPE.put(String.class, Types.VARCHAR); - JAVA_CLASS_TO_SQL_TYPE.put(Date.class, Types.DATE); - JAVA_CLASS_TO_SQL_TYPE.put(Boolean.class, Types.BOOLEAN); - } - - @Rule - public final TestPipeline pipeline = TestPipeline.create(); - - protected PCollection<BeamSqlRow> getTestPCollection() { - BeamSqlRowType type = BeamSqlRowType.create( - Arrays.asList("ts", "c_tinyint", "c_smallint", - "c_integer", "c_bigint", "c_float", "c_double", "c_decimal", - "c_tinyint_max", "c_smallint_max", "c_integer_max", "c_bigint_max"), - Arrays.asList(Types.DATE, Types.TINYINT, Types.SMALLINT, - Types.INTEGER, Types.BIGINT, Types.FLOAT, Types.DOUBLE, Types.DECIMAL, - Types.TINYINT, Types.SMALLINT, Types.INTEGER, Types.BIGINT) - ); - try { - return MockedBoundedTable - .of(type) - .addRows( - parseDate("1986-02-15 11:35:26"), - (byte) 1, - (short) 1, - 1, - 1L, - 1.0f, - 1.0, - BigDecimal.ONE, - (byte) 127, - (short) 32767, - 2147483647, - 9223372036854775807L - ) - .buildIOReader(pipeline) - .setCoder(new BeamSqlRowCoder(type)); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - - protected static Date parseDate(String str) { - try { - SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); - sdf.setTimeZone(TimeZone.getTimeZone("GMT")); - return sdf.parse(str); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - - - /** - * Helper class to make write integration test for built-in functions easier. - * - * <p>example usage: - * <pre>{@code - * ExpressionChecker checker = new ExpressionChecker() - * .addExpr("1 + 1", 2) - * .addExpr("1.0 + 1", 2.0) - * .addExpr("1 + 1.0", 2.0) - * .addExpr("1.0 + 1.0", 2.0) - * .addExpr("c_tinyint + c_tinyint", (byte) 2); - * checker.buildRunAndCheck(inputCollections); - * }</pre> - */ - public class ExpressionChecker { - private transient List<Pair<String, Object>> exps = new ArrayList<>(); - - public ExpressionChecker addExpr(String expression, Object expectedValue) { - exps.add(Pair.of(expression, expectedValue)); - return this; - } - - private String getSql() { - List<String> expStrs = new ArrayList<>(); - for (Pair<String, Object> pair : exps) { - expStrs.add(pair.getKey()); - } - return "SELECT " + Joiner.on(",\n ").join(expStrs) + " FROM PCOLLECTION"; - } - - /** - * Build the corresponding SQL, compile to Beam Pipeline, run it, and check the result. - */ - public void buildRunAndCheck() { - PCollection<BeamSqlRow> inputCollection = getTestPCollection(); - System.out.println("SQL:>\n" + getSql()); - try { - List<String> names = new ArrayList<>(); - List<Integer> types = new ArrayList<>(); - List<Object> values = new ArrayList<>(); - - for (Pair<String, Object> pair : exps) { - names.add(pair.getKey()); - types.add(JAVA_CLASS_TO_SQL_TYPE.get(pair.getValue().getClass())); - values.add(pair.getValue()); - } - - PCollection<BeamSqlRow> rows = inputCollection.apply(BeamSql.simpleQuery(getSql())); - PAssert.that(rows).containsInAnyOrder( - TestUtils.RowsBuilder - .of(BeamSqlRowType.create(names, types)) - .addRows(values) - .getRows() - ); - inputCollection.getPipeline().run(); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/dsls/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlComparisonOperatorsIntegrationTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlComparisonOperatorsIntegrationTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlComparisonOperatorsIntegrationTest.java deleted file mode 100644 index 5502ad4..0000000 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlComparisonOperatorsIntegrationTest.java +++ /dev/null @@ -1,330 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.dsls.sql.integrationtest; - -import java.math.BigDecimal; -import java.sql.Types; -import java.util.Arrays; -import org.apache.beam.dsls.sql.mock.MockedBoundedTable; -import org.apache.beam.dsls.sql.schema.BeamSqlRow; -import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder; -import org.apache.beam.dsls.sql.schema.BeamSqlRowType; -import org.apache.beam.sdk.values.PCollection; -import org.junit.Test; - -/** - * Integration test for comparison operators. - */ -public class BeamSqlComparisonOperatorsIntegrationTest - extends BeamSqlBuiltinFunctionsIntegrationTestBase { - - @Test - public void testEquals() { - ExpressionChecker checker = new ExpressionChecker() - .addExpr("c_tinyint_1 = c_tinyint_1", true) - .addExpr("c_tinyint_1 = c_tinyint_2", false) - .addExpr("c_smallint_1 = c_smallint_1", true) - .addExpr("c_smallint_1 = c_smallint_2", false) - .addExpr("c_integer_1 = c_integer_1", true) - .addExpr("c_integer_1 = c_integer_2", false) - .addExpr("c_bigint_1 = c_bigint_1", true) - .addExpr("c_bigint_1 = c_bigint_2", false) - .addExpr("c_float_1 = c_float_1", true) - .addExpr("c_float_1 = c_float_2", false) - .addExpr("c_double_1 = c_double_1", true) - .addExpr("c_double_1 = c_double_2", false) - .addExpr("c_decimal_1 = c_decimal_1", true) - .addExpr("c_decimal_1 = c_decimal_2", false) - .addExpr("c_varchar_1 = c_varchar_1", true) - .addExpr("c_varchar_1 = c_varchar_2", false) - .addExpr("c_boolean_true = c_boolean_true", true) - .addExpr("c_boolean_true = c_boolean_false", false) - - ; - checker.buildRunAndCheck(); - } - - @Test - public void testNotEquals() { - ExpressionChecker checker = new ExpressionChecker() - .addExpr("c_tinyint_1 <> c_tinyint_1", false) - .addExpr("c_tinyint_1 <> c_tinyint_2", true) - .addExpr("c_smallint_1 <> c_smallint_1", false) - .addExpr("c_smallint_1 <> c_smallint_2", true) - .addExpr("c_integer_1 <> c_integer_1", false) - .addExpr("c_integer_1 <> c_integer_2", true) - .addExpr("c_bigint_1 <> c_bigint_1", false) - .addExpr("c_bigint_1 <> c_bigint_2", true) - .addExpr("c_float_1 <> c_float_1", false) - .addExpr("c_float_1 <> c_float_2", true) - .addExpr("c_double_1 <> c_double_1", false) - .addExpr("c_double_1 <> c_double_2", true) - .addExpr("c_decimal_1 <> c_decimal_1", false) - .addExpr("c_decimal_1 <> c_decimal_2", true) - .addExpr("c_varchar_1 <> c_varchar_1", false) - .addExpr("c_varchar_1 <> c_varchar_2", true) - .addExpr("c_boolean_true <> c_boolean_true", false) - .addExpr("c_boolean_true <> c_boolean_false", true) - ; - checker.buildRunAndCheck(); - } - - @Test - public void testGreaterThan() { - ExpressionChecker checker = new ExpressionChecker() - .addExpr("c_tinyint_2 > c_tinyint_1", true) - .addExpr("c_tinyint_1 > c_tinyint_1", false) - .addExpr("c_tinyint_1 > c_tinyint_2", false) - - .addExpr("c_smallint_2 > c_smallint_1", true) - .addExpr("c_smallint_1 > c_smallint_1", false) - .addExpr("c_smallint_1 > c_smallint_2", false) - - .addExpr("c_integer_2 > c_integer_1", true) - .addExpr("c_integer_1 > c_integer_1", false) - .addExpr("c_integer_1 > c_integer_2", false) - - .addExpr("c_bigint_2 > c_bigint_1", true) - .addExpr("c_bigint_1 > c_bigint_1", false) - .addExpr("c_bigint_1 > c_bigint_2", false) - - .addExpr("c_float_2 > c_float_1", true) - .addExpr("c_float_1 > c_float_1", false) - .addExpr("c_float_1 > c_float_2", false) - - .addExpr("c_double_2 > c_double_1", true) - .addExpr("c_double_1 > c_double_1", false) - .addExpr("c_double_1 > c_double_2", false) - - .addExpr("c_decimal_2 > c_decimal_1", true) - .addExpr("c_decimal_1 > c_decimal_1", false) - .addExpr("c_decimal_1 > c_decimal_2", false) - - .addExpr("c_varchar_2 > c_varchar_1", true) - .addExpr("c_varchar_1 > c_varchar_1", false) - .addExpr("c_varchar_1 > c_varchar_2", false) - ; - - checker.buildRunAndCheck(); - } - - @Test(expected = RuntimeException.class) - public void testGreaterThanException() { - ExpressionChecker checker = new ExpressionChecker() - .addExpr("c_boolean_false > c_boolean_true", false); - checker.buildRunAndCheck(); - } - - @Test - public void testGreaterThanOrEquals() { - ExpressionChecker checker = new ExpressionChecker() - .addExpr("c_tinyint_2 >= c_tinyint_1", true) - .addExpr("c_tinyint_1 >= c_tinyint_1", true) - .addExpr("c_tinyint_1 >= c_tinyint_2", false) - - .addExpr("c_smallint_2 >= c_smallint_1", true) - .addExpr("c_smallint_1 >= c_smallint_1", true) - .addExpr("c_smallint_1 >= c_smallint_2", false) - - .addExpr("c_integer_2 >= c_integer_1", true) - .addExpr("c_integer_1 >= c_integer_1", true) - .addExpr("c_integer_1 >= c_integer_2", false) - - .addExpr("c_bigint_2 >= c_bigint_1", true) - .addExpr("c_bigint_1 >= c_bigint_1", true) - .addExpr("c_bigint_1 >= c_bigint_2", false) - - .addExpr("c_float_2 >= c_float_1", true) - .addExpr("c_float_1 >= c_float_1", true) - .addExpr("c_float_1 >= c_float_2", false) - - .addExpr("c_double_2 >= c_double_1", true) - .addExpr("c_double_1 >= c_double_1", true) - .addExpr("c_double_1 >= c_double_2", false) - - .addExpr("c_decimal_2 >= c_decimal_1", true) - .addExpr("c_decimal_1 >= c_decimal_1", true) - .addExpr("c_decimal_1 >= c_decimal_2", false) - - .addExpr("c_varchar_2 >= c_varchar_1", true) - .addExpr("c_varchar_1 >= c_varchar_1", true) - .addExpr("c_varchar_1 >= c_varchar_2", false) - ; - - checker.buildRunAndCheck(); - } - - @Test(expected = RuntimeException.class) - public void testGreaterThanOrEqualsException() { - ExpressionChecker checker = new ExpressionChecker() - .addExpr("c_boolean_false >= c_boolean_true", false); - checker.buildRunAndCheck(); - } - - @Test - public void testLessThan() { - ExpressionChecker checker = new ExpressionChecker() - .addExpr("c_tinyint_2 < c_tinyint_1", false) - .addExpr("c_tinyint_1 < c_tinyint_1", false) - .addExpr("c_tinyint_1 < c_tinyint_2", true) - - .addExpr("c_smallint_2 < c_smallint_1", false) - .addExpr("c_smallint_1 < c_smallint_1", false) - .addExpr("c_smallint_1 < c_smallint_2", true) - - .addExpr("c_integer_2 < c_integer_1", false) - .addExpr("c_integer_1 < c_integer_1", false) - .addExpr("c_integer_1 < c_integer_2", true) - - .addExpr("c_bigint_2 < c_bigint_1", false) - .addExpr("c_bigint_1 < c_bigint_1", false) - .addExpr("c_bigint_1 < c_bigint_2", true) - - .addExpr("c_float_2 < c_float_1", false) - .addExpr("c_float_1 < c_float_1", false) - .addExpr("c_float_1 < c_float_2", true) - - .addExpr("c_double_2 < c_double_1", false) - .addExpr("c_double_1 < c_double_1", false) - .addExpr("c_double_1 < c_double_2", true) - - .addExpr("c_decimal_2 < c_decimal_1", false) - .addExpr("c_decimal_1 < c_decimal_1", false) - .addExpr("c_decimal_1 < c_decimal_2", true) - - .addExpr("c_varchar_2 < c_varchar_1", false) - .addExpr("c_varchar_1 < c_varchar_1", false) - .addExpr("c_varchar_1 < c_varchar_2", true) - ; - - checker.buildRunAndCheck(); - } - - @Test(expected = RuntimeException.class) - public void testLessThanException() { - ExpressionChecker checker = new ExpressionChecker() - .addExpr("c_boolean_false < c_boolean_true", false); - checker.buildRunAndCheck(); - } - - @Test - public void testLessThanOrEquals() { - ExpressionChecker checker = new ExpressionChecker() - .addExpr("c_tinyint_2 <= c_tinyint_1", false) - .addExpr("c_tinyint_1 <= c_tinyint_1", true) - .addExpr("c_tinyint_1 <= c_tinyint_2", true) - - .addExpr("c_smallint_2 <= c_smallint_1", false) - .addExpr("c_smallint_1 <= c_smallint_1", true) - .addExpr("c_smallint_1 <= c_smallint_2", true) - - .addExpr("c_integer_2 <= c_integer_1", false) - .addExpr("c_integer_1 <= c_integer_1", true) - .addExpr("c_integer_1 <= c_integer_2", true) - - .addExpr("c_bigint_2 <= c_bigint_1", false) - .addExpr("c_bigint_1 <= c_bigint_1", true) - .addExpr("c_bigint_1 <= c_bigint_2", true) - - .addExpr("c_float_2 <= c_float_1", false) - .addExpr("c_float_1 <= c_float_1", true) - .addExpr("c_float_1 <= c_float_2", true) - - .addExpr("c_double_2 <= c_double_1", false) - .addExpr("c_double_1 <= c_double_1", true) - .addExpr("c_double_1 <= c_double_2", true) - - .addExpr("c_decimal_2 <= c_decimal_1", false) - .addExpr("c_decimal_1 <= c_decimal_1", true) - .addExpr("c_decimal_1 <= c_decimal_2", true) - - .addExpr("c_varchar_2 <= c_varchar_1", false) - .addExpr("c_varchar_1 <= c_varchar_1", true) - .addExpr("c_varchar_1 <= c_varchar_2", true) - ; - - checker.buildRunAndCheck(); - } - - @Test(expected = RuntimeException.class) - public void testLessThanOrEqualsException() { - ExpressionChecker checker = new ExpressionChecker() - .addExpr("c_boolean_false <= c_boolean_true", false); - checker.buildRunAndCheck(); - } - - @Test - public void testIsNullAndIsNotNull() throws Exception { - ExpressionChecker checker = new ExpressionChecker() - .addExpr("1 IS NOT NULL", true) - .addExpr("NULL IS NOT NULL", false) - - .addExpr("1 IS NULL", false) - .addExpr("NULL IS NULL", true) - ; - - checker.buildRunAndCheck(); - } - - @Override protected PCollection<BeamSqlRow> getTestPCollection() { - BeamSqlRowType type = BeamSqlRowType.create( - Arrays.asList( - "c_tinyint_0", "c_tinyint_1", "c_tinyint_2", - "c_smallint_0", "c_smallint_1", "c_smallint_2", - "c_integer_0", "c_integer_1", "c_integer_2", - "c_bigint_0", "c_bigint_1", "c_bigint_2", - "c_float_0", "c_float_1", "c_float_2", - "c_double_0", "c_double_1", "c_double_2", - "c_decimal_0", "c_decimal_1", "c_decimal_2", - "c_varchar_0", "c_varchar_1", "c_varchar_2", - "c_boolean_false", "c_boolean_true" - ), - Arrays.asList( - Types.TINYINT, Types.TINYINT, Types.TINYINT, - Types.SMALLINT, Types.SMALLINT, Types.SMALLINT, - Types.INTEGER, Types.INTEGER, Types.INTEGER, - Types.BIGINT, Types.BIGINT, Types.BIGINT, - Types.FLOAT, Types.FLOAT, Types.FLOAT, - Types.DOUBLE, Types.DOUBLE, Types.DOUBLE, - Types.DECIMAL, Types.DECIMAL, Types.DECIMAL, - Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, - Types.BOOLEAN, Types.BOOLEAN - ) - ); - try { - return MockedBoundedTable - .of(type) - .addRows( - (byte) 0, (byte) 1, (byte) 2, - (short) 0, (short) 1, (short) 2, - 0, 1, 2, - 0L, 1L, 2L, - 0.0f, 1.0f, 2.0f, - 0.0, 1.0, 2.0, - BigDecimal.ZERO, BigDecimal.ONE, BigDecimal.ONE.add(BigDecimal.ONE), - "a", "b", "c", - false, true - ) - .buildIOReader(pipeline) - .setCoder(new BeamSqlRowCoder(type)); - } catch (Exception e) { - throw new RuntimeException(e); - } - } -}
