http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsBoundedTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsBoundedTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsBoundedTest.java new file mode 100644 index 0000000..1dbd8b4 --- /dev/null +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsBoundedTest.java @@ -0,0 +1,241 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.sdk.extensions.sql.impl.rel; + +import java.sql.Types; +import java.util.Date; +import org.apache.beam.sdk.extensions.sql.BeamSqlCli; +import org.apache.beam.sdk.extensions.sql.BeamSqlEnv; +import org.apache.beam.sdk.extensions.sql.TestUtils; +import org.apache.beam.sdk.extensions.sql.impl.transform.BeamSqlOutputToConsoleFn; +import org.apache.beam.sdk.extensions.sql.mock.MockedBoundedTable; +import org.apache.beam.sdk.extensions.sql.mock.MockedUnboundedTable; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PCollection; +import org.joda.time.Duration; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; + +/** + * Unbounded + Bounded Test for {@code BeamJoinRel}. + * + * <p>Every query joins an hourly {@code TUMBLE} aggregate over {@code ORDER_DETAILS} + * (a {@code MockedUnboundedTable}) with {@code ORDER_DETAILS1} (a {@code MockedBoundedTable}) + * on {@code order_id}. + * + * <p>The inner joins and the outer joins that preserve the windowed {@code ORDER_DETAILS} side + * assert on the joined rows. The outer joins that preserve the bounded {@code ORDER_DETAILS1} + * side, and the full outer join, are expected to throw {@link UnsupportedOperationException}. + */ +public class BeamJoinRelUnboundedVsBoundedTest { + @Rule + public final TestPipeline pipeline = TestPipeline.create(); + private static final BeamSqlEnv beamSqlEnv = new BeamSqlEnv(); + /** Timestamp 1 ms after the epoch. */ + public static final Date FIRST_DATE = new Date(1); + /** One hour after {@link #FIRST_DATE}. */ + public static final Date SECOND_DATE = new Date(1 + 3600 * 1000); + /** Two hours and 1 ms after {@link #FIRST_DATE}. */ + public static final Date THIRD_DATE = new Date(1 + 3600 * 1000 + 3600 * 1000 + 1); + private static final Duration WINDOW_SIZE = Duration.standardHours(1); + + /** Registers the two mock tables shared by every test in this class. */ + @BeforeClass + public static void prepare() { + beamSqlEnv.registerTable("ORDER_DETAILS", MockedUnboundedTable + .of( + Types.INTEGER, "order_id", + Types.INTEGER, "site_id", + Types.INTEGER, "price", + Types.TIMESTAMP, "order_time" + ) + .timestampColumnIndex(3) + .addRows( + Duration.ZERO, + 1, 1, 1, FIRST_DATE, + 1, 2, 2, FIRST_DATE + ) + .addRows( + WINDOW_SIZE.plus(Duration.standardSeconds(1)), + 2, 2, 3, SECOND_DATE, + 2, 3, 3, SECOND_DATE, + // this late data is omitted + 1, 2, 3, FIRST_DATE + ) + .addRows( + WINDOW_SIZE.plus(WINDOW_SIZE).plus(Duration.standardSeconds(1)), + 3, 3, 3, 
THIRD_DATE, + // this late data is omitted + 2, 2, 3, SECOND_DATE + ) + ); + + beamSqlEnv.registerTable("ORDER_DETAILS1", MockedBoundedTable + .of(Types.INTEGER, "order_id", + Types.VARCHAR, "buyer" + ).addRows( + 1, "james", + 2, "bond" + )); + } + + /** + * INNER JOIN with the windowed {@code ORDER_DETAILS} aggregate on the left and + * {@code ORDER_DETAILS1} on the right; expects one joined row for each of order ids 1 and 2. + */ + @Test + public void testInnerJoin_unboundedTableOnTheLeftSide() throws Exception { + String sql = "SELECT o1.order_id, o1.sum_site_id, o2.buyer FROM " + + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS " + + " GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 " + + " JOIN " + + " ORDER_DETAILS1 o2 " + + " on " + + " o1.order_id=o2.order_id" + ; + + PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); + PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn()))) + .containsInAnyOrder( + TestUtils.RowsBuilder.of( + Types.INTEGER, "order_id", + Types.INTEGER, "sum_site_id", + Types.VARCHAR, "buyer" + ).addRows( + 1, 3, "james", + 2, 5, "bond" + ).getStringRows() + ); + pipeline.run(); + } + + /** + * Same INNER JOIN with the operands swapped ({@code ORDER_DETAILS1} on the left); the same + * two joined rows are expected. + */ + @Test + public void testInnerJoin_boundedTableOnTheLeftSide() throws Exception { + String sql = "SELECT o1.order_id, o1.sum_site_id, o2.buyer FROM " + + " ORDER_DETAILS1 o2 " + + " JOIN " + + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS " + + " GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 " + + " on " + + " o1.order_id=o2.order_id" + ; + + PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); + PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn()))) + .containsInAnyOrder( + TestUtils.RowsBuilder.of( + Types.INTEGER, "order_id", + Types.INTEGER, "sum_site_id", + Types.VARCHAR, "buyer" + ).addRows( + 1, 3, "james", + 2, 5, "bond" + ).getStringRows() + ); + pipeline.run(); + } + + /** + * LEFT OUTER JOIN with the windowed aggregate on the left; in addition to the two matched + * rows, order id 3 is expected with a null {@code buyer}. + * + * <p>NOTE(review): the result is also fed to a {@code BeamSqlOutputToConsoleFn}, which looks + * like leftover debug output - confirm whether it is still needed. + */ + @Test + public void testLeftOuterJoin() throws Exception { + String sql = "SELECT o1.order_id, o1.sum_site_id, o2.buyer FROM " + + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS 
" + + " GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 " + + " LEFT OUTER JOIN " + + " ORDER_DETAILS1 o2 " + + " on " + + " o1.order_id=o2.order_id" + ; + + PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); + rows.apply(ParDo.of(new BeamSqlOutputToConsoleFn("helloworld"))); + PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn()))) + .containsInAnyOrder( + TestUtils.RowsBuilder.of( + Types.INTEGER, "order_id", + Types.INTEGER, "sum_site_id", + Types.VARCHAR, "buyer" + ).addRows( + 1, 3, "james", + 2, 5, "bond", + 3, 3, null + ).getStringRows() + ); + pipeline.run(); + } + + @Test(expected = UnsupportedOperationException.class) + public void testLeftOuterJoinError() throws Exception { + String sql = "SELECT o1.order_id, o1.sum_site_id, o2.buyer FROM " + + " ORDER_DETAILS1 o2 " + + " LEFT OUTER JOIN " + + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS " + + " GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 " + + " on " + + " o1.order_id=o2.order_id" + ; + pipeline.enableAbandonedNodeEnforcement(false); + BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); + pipeline.run(); + } + + @Test + public void testRightOuterJoin() throws Exception { + String sql = "SELECT o1.order_id, o1.sum_site_id, o2.buyer FROM " + + " ORDER_DETAILS1 o2 " + + " RIGHT OUTER JOIN " + + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS " + + " GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 " + + " on " + + " o1.order_id=o2.order_id" + ; + PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); + PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn()))) + .containsInAnyOrder( + TestUtils.RowsBuilder.of( + Types.INTEGER, "order_id", + Types.INTEGER, "sum_site_id", + Types.VARCHAR, "buyer" + ).addRows( + 1, 3, "james", + 2, 5, "bond", + 3, 3, null + ).getStringRows() + ); + pipeline.run(); + } + + @Test(expected = 
UnsupportedOperationException.class) + public void testRightOuterJoinError() throws Exception { + String sql = "SELECT o1.order_id, o1.sum_site_id, o2.buyer FROM " + + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS " + + " GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 " + + " RIGHT OUTER JOIN " + + " ORDER_DETAILS1 o2 " + + " on " + + " o1.order_id=o2.order_id" + ; + + pipeline.enableAbandonedNodeEnforcement(false); + BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); + pipeline.run(); + } + + /** + * FULL OUTER JOIN between {@code ORDER_DETAILS1} and the windowed aggregate; expected to fail + * with {@link UnsupportedOperationException}. + */ + @Test(expected = UnsupportedOperationException.class) + public void testFullOuterJoinError() throws Exception { + String sql = "SELECT o1.order_id, o1.sum_site_id, o2.buyer FROM " + + " ORDER_DETAILS1 o2 " + + " FULL OUTER JOIN " + + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS " + + " GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 " + + " on " + + " o1.order_id=o2.order_id" + ; + pipeline.enableAbandonedNodeEnforcement(false); + BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); + pipeline.run(); + } +}
http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsUnboundedTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsUnboundedTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsUnboundedTest.java new file mode 100644 index 0000000..5e5e416 --- /dev/null +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsUnboundedTest.java @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.sdk.extensions.sql.impl.rel; + +import java.sql.Types; +import java.util.Date; +import org.apache.beam.sdk.extensions.sql.BeamSqlCli; +import org.apache.beam.sdk.extensions.sql.BeamSqlEnv; +import org.apache.beam.sdk.extensions.sql.TestUtils; +import org.apache.beam.sdk.extensions.sql.impl.transform.BeamSqlOutputToConsoleFn; +import org.apache.beam.sdk.extensions.sql.mock.MockedUnboundedTable; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PCollection; +import org.joda.time.Duration; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; + +/** + * Unbounded + Unbounded Test for {@code BeamJoinRel}. + * + * <p>Both join inputs are {@code TUMBLE} aggregates over the same {@code ORDER_DETAILS} + * table, which is a {@code MockedUnboundedTable}. The inner, left, right and full outer join + * tests use one-hour windows on both sides and assert on the joined rows; + * {@code testWindowsMismatch} pairs a two-hour window with a one-hour window and expects an + * {@link IllegalArgumentException}. + * + * <p>NOTE(review): {@code testFullOuterJoin} also feeds its result to a + * {@code BeamSqlOutputToConsoleFn}, which looks like leftover debug output - confirm whether + * it is still needed. + */ +public class BeamJoinRelUnboundedVsUnboundedTest { + @Rule + public final TestPipeline pipeline = TestPipeline.create(); + private static final BeamSqlEnv beamSqlEnv = new BeamSqlEnv(); + /** Timestamp 1 ms after the epoch. */ + public static final Date FIRST_DATE = new Date(1); + /** One hour after {@link #FIRST_DATE}. */ + public static final Date SECOND_DATE = new Date(1 + 3600 * 1000); + + private static final Duration WINDOW_SIZE = Duration.standardHours(1); + + /** Registers the mock {@code ORDER_DETAILS} table shared by every test in this class. */ + @BeforeClass + public static void prepare() { + beamSqlEnv.registerTable("ORDER_DETAILS", MockedUnboundedTable + .of(Types.INTEGER, "order_id", + Types.INTEGER, "site_id", + Types.INTEGER, "price", + Types.TIMESTAMP, "order_time" + ) + .timestampColumnIndex(3) + .addRows( + Duration.ZERO, + 1, 1, 1, FIRST_DATE, + 1, 2, 6, FIRST_DATE + ) + .addRows( + WINDOW_SIZE.plus(Duration.standardMinutes(1)), + 2, 2, 7, SECOND_DATE, + 2, 3, 8, SECOND_DATE, + // this late record is omitted(First window) + 1, 3, 3, FIRST_DATE + ) + .addRows( + // this late record is omitted(Second window) + WINDOW_SIZE.plus(WINDOW_SIZE).plus(Duration.standardMinutes(1)), + 2, 3, 3, SECOND_DATE + ) + ); + } + + /** + * INNER JOIN of the per-order aggregate with itself on {@code order_id}; expects one row for + * each of order ids 1 and 2, with both sides carrying the same sum. + */ + @Test + public void testInnerJoin() throws Exception { + String 
sql = "SELECT * FROM " + + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS " + + " GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 " + + " JOIN " + + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS " + + " GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o2 " + + " on " + + " o1.order_id=o2.order_id" + ; + + PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); + PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn()))) + .containsInAnyOrder( + TestUtils.RowsBuilder.of( + Types.INTEGER, "order_id", + Types.INTEGER, "sum_site_id", + Types.INTEGER, "order_id0", + Types.INTEGER, "sum_site_id0").addRows( + 1, 3, 1, 3, + 2, 5, 2, 5 + ).getStringRows() + ); + pipeline.run(); + } + + /** + * LEFT OUTER JOIN whose left side groups by {@code site_id} (aliased to {@code order_id}) and + * whose right side groups by {@code order_id}; left rows without a match are expected with + * null right-hand columns. + */ + @Test + public void testLeftOuterJoin() throws Exception { + String sql = "SELECT * FROM " + + "(select site_id as order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS " + + " GROUP BY site_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 " + + " LEFT OUTER JOIN " + + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS " + + " GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o2 " + + " on " + + " o1.order_id=o2.order_id" + ; + + // 1, 1 | 1, 3 + // 2, 2 | NULL, NULL + // ---- | ----- + // 2, 2 | 2, 5 + // 3, 3 | NULL, NULL + + PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); + PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn()))) + .containsInAnyOrder( + TestUtils.RowsBuilder.of( + Types.INTEGER, "order_id", + Types.INTEGER, "sum_site_id", + Types.INTEGER, "order_id0", + Types.INTEGER, "sum_site_id0" + ).addRows( + 1, 1, 1, 3, + 2, 2, null, null, + 2, 2, 2, 5, + 3, 3, null, null + ).getStringRows() + ); + pipeline.run(); + } + + /** + * RIGHT OUTER JOIN mirroring {@code testLeftOuterJoin}: the side grouped by {@code site_id} + * is on the right, and its unmatched rows are expected with null left-hand columns. + */ + @Test + public void testRightOuterJoin() throws Exception { + String sql = "SELECT * FROM " + + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS " + + " GROUP BY 
order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 " + + " RIGHT OUTER JOIN " + + "(select site_id as order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS " + + " GROUP BY site_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o2 " + + " on " + + " o1.order_id=o2.order_id" + ; + + PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); + PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn()))) + .containsInAnyOrder( + TestUtils.RowsBuilder.of( + Types.INTEGER, "order_id", + Types.INTEGER, "sum_site_id", + Types.INTEGER, "order_id0", + Types.INTEGER, "sum_site_id0" + ).addRows( + 1, 3, 1, 1, + null, null, 2, 2, + 2, 5, 2, 2, + null, null, 3, 3 + ).getStringRows() + ); + pipeline.run(); + } + + @Test + public void testFullOuterJoin() throws Exception { + String sql = "SELECT * FROM " + + "(select price as order_id1, sum(site_id) as sum_site_id FROM ORDER_DETAILS " + + " GROUP BY price, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 " + + " FULL OUTER JOIN " + + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS " + + " GROUP BY order_id , TUMBLE(order_time, INTERVAL '1' HOUR)) o2 " + + " on " + + " o1.order_id1=o2.order_id" + ; + + PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); + rows.apply(ParDo.of(new BeamSqlOutputToConsoleFn("hello"))); + PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn()))) + .containsInAnyOrder( + TestUtils.RowsBuilder.of( + Types.INTEGER, "order_id1", + Types.INTEGER, "sum_site_id", + Types.INTEGER, "order_id", + Types.INTEGER, "sum_site_id0" + ).addRows( + 1, 1, 1, 3, + 6, 2, null, null, + 7, 2, null, null, + 8, 3, null, null, + null, null, 2, 5 + ).getStringRows() + ); + pipeline.run(); + } + + @Test(expected = IllegalArgumentException.class) + public void testWindowsMismatch() throws Exception { + String sql = "SELECT * FROM " + + "(select site_id as order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS " + + " GROUP BY 
site_id, TUMBLE(order_time, INTERVAL '2' HOUR)) o1 " + + " LEFT OUTER JOIN " + + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS " + + " GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o2 " + + " on " + + " o1.order_id=o2.order_id" + ; + pipeline.enableAbandonedNodeEnforcement(false); + BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); + pipeline.run(); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMinusRelTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMinusRelTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMinusRelTest.java new file mode 100644 index 0000000..9149dd4 --- /dev/null +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMinusRelTest.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.sdk.extensions.sql.impl.rel; + +import java.sql.Types; +import org.apache.beam.sdk.extensions.sql.BeamSqlCli; +import org.apache.beam.sdk.extensions.sql.BeamSqlEnv; +import org.apache.beam.sdk.extensions.sql.TestUtils; +import org.apache.beam.sdk.extensions.sql.mock.MockedBoundedTable; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.values.PCollection; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; + +/** + * Test for {@code BeamMinusRel}. + * + * <p>Runs {@code EXCEPT} and {@code EXCEPT ALL} between the bounded mock tables + * {@code ORDER_DETAILS1} and {@code ORDER_DETAILS2}. + */ +public class BeamMinusRelTest { + static BeamSqlEnv sqlEnv = new BeamSqlEnv(); + + @Rule + public final TestPipeline pipeline = TestPipeline.create(); + + /** + * Registers {@code ORDER_DETAILS1}, which holds the rows for order ids 1 and 4 twice each, + * and {@code ORDER_DETAILS2}, which holds order ids 1, 2 and 3 once each. + */ + @BeforeClass + public static void prepare() { + sqlEnv.registerTable("ORDER_DETAILS1", + MockedBoundedTable.of( + Types.BIGINT, "order_id", + Types.INTEGER, "site_id", + Types.DOUBLE, "price" + ).addRows( + 1L, 1, 1.0, + 1L, 1, 1.0, + 2L, 2, 2.0, + 4L, 4, 4.0, + 4L, 4, 4.0 + ) + ); + + sqlEnv.registerTable("ORDER_DETAILS2", + MockedBoundedTable.of( + Types.BIGINT, "order_id", + Types.INTEGER, "site_id", + Types.DOUBLE, "price" + ).addRows( + 1L, 1, 1.0, + 2L, 2, 2.0, + 3L, 3, 3.0 + ) + ); + } + + /** {@code EXCEPT}: only the order id 4 row is expected, and only once. */ + @Test + public void testExcept() throws Exception { + String sql = ""; + sql += "SELECT order_id, site_id, price " + + "FROM ORDER_DETAILS1 " + + " EXCEPT " + + "SELECT order_id, site_id, price " + + "FROM ORDER_DETAILS2 "; + + PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); + PAssert.that(rows).containsInAnyOrder( + TestUtils.RowsBuilder.of( + Types.BIGINT, "order_id", + Types.INTEGER, "site_id", + Types.DOUBLE, "price" + ).addRows( + 4L, 4, 4.0 + ).getRows()); + + pipeline.run(); + } + + /** + * {@code EXCEPT ALL}: both copies of the order id 4 row are expected, and the result size is + * checked to be 2. + * + * <p>NOTE(review): under SQL multiset semantics {@code EXCEPT ALL} would also keep one of the + * two order id 1 rows, because the right-hand table holds that row only once. The expectation + * here omits it - confirm this is the intended behaviour. + * + * <p>NOTE(review): {@code CheckSize} is neither imported nor declared in the visible part of + * this file; presumably it is a helper in the same package - verify. + */ + @Test + public void testExceptAll() throws Exception { + String sql = ""; + sql += "SELECT order_id, site_id, price " + + "FROM ORDER_DETAILS1 " + + " EXCEPT ALL 
" + + "SELECT order_id, site_id, price " + + "FROM ORDER_DETAILS2 "; + + PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); + PAssert.that(rows).satisfies(new CheckSize(2)); + + PAssert.that(rows).containsInAnyOrder( + TestUtils.RowsBuilder.of( + Types.BIGINT, "order_id", + Types.INTEGER, "site_id", + Types.DOUBLE, "price" + ).addRows( + 4L, 4, 4.0, + 4L, 4, 4.0 + ).getRows()); + + pipeline.run(); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSetOperatorRelBaseTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSetOperatorRelBaseTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSetOperatorRelBaseTest.java new file mode 100644 index 0000000..36538c0 --- /dev/null +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSetOperatorRelBaseTest.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.sdk.extensions.sql.impl.rel; + +import java.sql.Types; +import java.util.Date; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.extensions.sql.BeamSqlCli; +import org.apache.beam.sdk.extensions.sql.BeamSqlEnv; +import org.apache.beam.sdk.extensions.sql.TestUtils; +import org.apache.beam.sdk.extensions.sql.mock.MockedBoundedTable; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PCollection; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; + +/** + * Test for {@code BeamSetOperatorRelBase}. + * + * <p>Checks that a {@code UNION} of two windowed aggregates is accepted when both sides use + * the same {@code TUMBLE} interval, and fails with {@link IllegalArgumentException} when the + * intervals differ. + */ +public class BeamSetOperatorRelBaseTest { + static BeamSqlEnv sqlEnv = new BeamSqlEnv(); + + @Rule + public final TestPipeline pipeline = TestPipeline.create(); + /** Timestamp shared by every mock row: 100000 ms after the epoch. */ + public static final Date THE_DATE = new Date(100000); + + /** Registers the two-row bounded {@code ORDER_DETAILS} table used by both tests. */ + @BeforeClass + public static void prepare() { + sqlEnv.registerTable("ORDER_DETAILS", + MockedBoundedTable.of( + Types.BIGINT, "order_id", + Types.INTEGER, "site_id", + Types.DOUBLE, "price", + Types.TIMESTAMP, "order_time" + ).addRows( + 1L, 1, 1.0, THE_DATE, + 2L, 2, 2.0, THE_DATE + ) + ); + } + + /** + * {@code UNION} of two identical one-hour {@code TUMBLE} aggregates; expects one row per + * (order_id, site_id) pair, each with a count of 1. + */ + @Test + public void testSameWindow() throws Exception { + String sql = "SELECT " + + " order_id, site_id, count(*) as cnt " + + "FROM ORDER_DETAILS GROUP BY order_id, site_id" + + ", TUMBLE(order_time, INTERVAL '1' HOUR) " + + " UNION SELECT " + + " order_id, site_id, count(*) as cnt " + + "FROM ORDER_DETAILS GROUP BY order_id, site_id" + + ", TUMBLE(order_time, INTERVAL '1' HOUR) "; + + PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); + // compare valueInString to ignore the windowStart & windowEnd + PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn()))) + .containsInAnyOrder( + 
TestUtils.RowsBuilder.of( + Types.BIGINT, "order_id", + Types.INTEGER, "site_id", + Types.BIGINT, "cnt" + ).addRows( + 1L, 1, 1L, + 2L, 2, 1L + ).getStringRows()); + pipeline.run(); + } + + /** + * {@code UNION} of a one-hour and a two-hour {@code TUMBLE} aggregate; expected to fail with + * {@link IllegalArgumentException}. The query is compiled against a locally created + * {@code Pipeline}, and that same pipeline is the one run if compilation returns. + */ + @Test(expected = IllegalArgumentException.class) + public void testDifferentWindows() throws Exception { + String sql = "SELECT " + + " order_id, site_id, count(*) as cnt " + + "FROM ORDER_DETAILS GROUP BY order_id, site_id" + + ", TUMBLE(order_time, INTERVAL '1' HOUR) " + + " UNION SELECT " + + " order_id, site_id, count(*) as cnt " + + "FROM ORDER_DETAILS GROUP BY order_id, site_id" + + ", TUMBLE(order_time, INTERVAL '2' HOUR) "; + + // use a real pipeline rather than the TestPipeline because we are + // testing exceptions, the pipeline will not actually run. + Pipeline pipeline1 = Pipeline.create(PipelineOptionsFactory.create()); + BeamSqlCli.compilePipeline(sql, pipeline1, sqlEnv); + pipeline1.run(); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRelTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRelTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRelTest.java new file mode 100644 index 0000000..15e3b89 --- /dev/null +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRelTest.java @@ -0,0 +1,237 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.extensions.sql.impl.rel; + +import java.sql.Types; +import java.util.Date; +import org.apache.beam.sdk.extensions.sql.BeamSqlCli; +import org.apache.beam.sdk.extensions.sql.BeamSqlEnv; +import org.apache.beam.sdk.extensions.sql.TestUtils; +import org.apache.beam.sdk.extensions.sql.mock.MockedBoundedTable; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.values.PCollection; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; + +/** + * Test for {@code BeamSortRel}. 
+ */ +public class BeamSortRelTest { + static BeamSqlEnv sqlEnv = new BeamSqlEnv(); + + @Rule + public final TestPipeline pipeline = TestPipeline.create(); + + /** + * Runs before every test and registers the ten-row {@code ORDER_DETAILS} source table and the + * empty {@code SUB_ORDER_RAM} target table in the shared static environment. The nulls-first + * and nulls-last tests register tables under the same two names again with their own rows. + */ + @Before + public void prepare() { + sqlEnv.registerTable("ORDER_DETAILS", + MockedBoundedTable.of( + Types.BIGINT, "order_id", + Types.INTEGER, "site_id", + Types.DOUBLE, "price", + Types.TIMESTAMP, "order_time" + ).addRows( + 1L, 2, 1.0, new Date(), + 1L, 1, 2.0, new Date(), + 2L, 4, 3.0, new Date(), + 2L, 1, 4.0, new Date(), + 5L, 5, 5.0, new Date(), + 6L, 6, 6.0, new Date(), + 7L, 7, 7.0, new Date(), + 8L, 8888, 8.0, new Date(), + 8L, 999, 9.0, new Date(), + 10L, 100, 10.0, new Date() + ) + ); + sqlEnv.registerTable("SUB_ORDER_RAM", + MockedBoundedTable.of( + Types.BIGINT, "order_id", + Types.INTEGER, "site_id", + Types.DOUBLE, "price" + ) + ); + } + + /** + * {@code ORDER BY order_id asc, site_id desc limit 4}: expects the four rows with order ids + * 1 and 2. The assertion uses {@code containsInAnyOrder}, so it checks which rows are kept, + * not the order in which they are emitted. + */ + @Test + public void testOrderBy_basic() throws Exception { + String sql = "INSERT INTO SUB_ORDER_RAM(order_id, site_id, price) SELECT " + + " order_id, site_id, price " + + "FROM ORDER_DETAILS " + + "ORDER BY order_id asc, site_id desc limit 4"; + + PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); + PAssert.that(rows).containsInAnyOrder(TestUtils.RowsBuilder.of( + Types.BIGINT, "order_id", + Types.INTEGER, "site_id", + Types.DOUBLE, "price" + ).addRows( + 1L, 2, 1.0, + 1L, 1, 2.0, + 2L, 4, 3.0, + 2L, 1, 4.0 + ).getRows()); + pipeline.run().waitUntilFinish(); + } + + /** + * {@code NULLS FIRST} with a limit of 4 over a five-row table that has null + * {@code site_id} values; expects the four rows with order ids 1 and 2. + */ + @Test + public void testOrderBy_nullsFirst() throws Exception { + sqlEnv.registerTable("ORDER_DETAILS", + MockedBoundedTable.of( + Types.BIGINT, "order_id", + Types.INTEGER, "site_id", + Types.DOUBLE, "price" + ).addRows( + 1L, 2, 1.0, + 1L, null, 2.0, + 2L, 1, 3.0, + 2L, null, 4.0, + 5L, 5, 5.0 + ) + ); + sqlEnv.registerTable("SUB_ORDER_RAM", MockedBoundedTable + .of(Types.BIGINT, "order_id", + Types.INTEGER, "site_id", + Types.DOUBLE, "price")); + + String sql = "INSERT INTO SUB_ORDER_RAM(order_id, site_id, price) SELECT " + + " order_id, site_id, price " + + 
"FROM ORDER_DETAILS " + + "ORDER BY order_id asc, site_id desc NULLS FIRST limit 4"; + + PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); + PAssert.that(rows).containsInAnyOrder( + TestUtils.RowsBuilder.of( + Types.BIGINT, "order_id", + Types.INTEGER, "site_id", + Types.DOUBLE, "price" + ).addRows( + 1L, null, 2.0, + 1L, 2, 1.0, + 2L, null, 4.0, + 2L, 1, 3.0 + ).getRows() + ); + pipeline.run().waitUntilFinish(); + } + + /** + * {@code NULLS LAST} counterpart of {@code testOrderBy_nullsFirst}, using the same five-row + * table; the same four rows are expected. + */ + @Test + public void testOrderBy_nullsLast() throws Exception { + sqlEnv.registerTable("ORDER_DETAILS", MockedBoundedTable + .of(Types.BIGINT, "order_id", + Types.INTEGER, "site_id", + Types.DOUBLE, "price" + ).addRows( + 1L, 2, 1.0, + 1L, null, 2.0, + 2L, 1, 3.0, + 2L, null, 4.0, + 5L, 5, 5.0)); + sqlEnv.registerTable("SUB_ORDER_RAM", MockedBoundedTable + .of(Types.BIGINT, "order_id", + Types.INTEGER, "site_id", + Types.DOUBLE, "price")); + + String sql = "INSERT INTO SUB_ORDER_RAM(order_id, site_id, price) SELECT " + + " order_id, site_id, price " + + "FROM ORDER_DETAILS " + + "ORDER BY order_id asc, site_id desc NULLS LAST limit 4"; + + PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); + PAssert.that(rows).containsInAnyOrder( + TestUtils.RowsBuilder.of( + Types.BIGINT, "order_id", + Types.INTEGER, "site_id", + Types.DOUBLE, "price" + ).addRows( + 1L, 2, 1.0, + 1L, null, 2.0, + 2L, 1, 3.0, + 2L, null, 4.0 + ).getRows() + ); + pipeline.run().waitUntilFinish(); + } + + /** + * {@code limit 4 offset 4}: expects the fifth to eighth rows of the sorted ten-row table + * (order ids 5, 6, 7 and the order id 8 row with site id 8888). + */ + @Test + public void testOrderBy_with_offset() throws Exception { + String sql = "INSERT INTO SUB_ORDER_RAM(order_id, site_id, price) SELECT " + + " order_id, site_id, price " + + "FROM ORDER_DETAILS " + + "ORDER BY order_id asc, site_id desc limit 4 offset 4"; + + PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); + PAssert.that(rows).containsInAnyOrder( + TestUtils.RowsBuilder.of( + Types.BIGINT, "order_id", + Types.INTEGER, "site_id", + Types.DOUBLE, "price" + ).addRows( + 5L, 5, 5.0, + 
6L, 6, 6.0, + 7L, 7, 7.0, + 8L, 8888, 8.0 + ).getRows() + ); + pipeline.run().waitUntilFinish(); + } + + /** + * {@code limit 11} against the ten-row table: the limit exceeds the row count, so all ten + * rows are expected. + */ + @Test + public void testOrderBy_bigFetch() throws Exception { + String sql = "INSERT INTO SUB_ORDER_RAM(order_id, site_id, price) SELECT " + + " order_id, site_id, price " + + "FROM ORDER_DETAILS " + + "ORDER BY order_id asc, site_id desc limit 11"; + + PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); + PAssert.that(rows).containsInAnyOrder( + TestUtils.RowsBuilder.of( + Types.BIGINT, "order_id", + Types.INTEGER, "site_id", + Types.DOUBLE, "price" + ).addRows( + 1L, 2, 1.0, + 1L, 1, 2.0, + 2L, 4, 3.0, + 2L, 1, 4.0, + 5L, 5, 5.0, + 6L, 6, 6.0, + 7L, 7, 7.0, + 8L, 8888, 8.0, + 8L, 999, 9.0, + 10L, 100, 10.0 + ).getRows() + ); + pipeline.run().waitUntilFinish(); + } + + /** + * {@code ORDER BY} applied to a {@code TUMBLE}-windowed aggregation; compiling the query is + * expected to throw {@link UnsupportedOperationException}. + * + * <p>The local {@code pipeline} variable shadows the {@code @Rule} field of the same name and + * is never run. + */ + @Test(expected = UnsupportedOperationException.class) + public void testOrderBy_exception() throws Exception { + String sql = "INSERT INTO SUB_ORDER_RAM(order_id, site_id) SELECT " + + " order_id, COUNT(*) " + + "FROM ORDER_DETAILS " + + "GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)" + + "ORDER BY order_id asc limit 11"; + + TestPipeline pipeline = TestPipeline.create(); + BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnionRelTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnionRelTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnionRelTest.java new file mode 100644 index 0000000..c232b30 --- /dev/null +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnionRelTest.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor 
license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.extensions.sql.impl.rel; + +import java.sql.Types; +import org.apache.beam.sdk.extensions.sql.BeamSqlCli; +import org.apache.beam.sdk.extensions.sql.BeamSqlEnv; +import org.apache.beam.sdk.extensions.sql.TestUtils; +import org.apache.beam.sdk.extensions.sql.mock.MockedBoundedTable; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.values.PCollection; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; + +/** + * Test for {@code BeamUnionRel}. 
+ */ +public class BeamUnionRelTest { + static BeamSqlEnv sqlEnv = new BeamSqlEnv(); + + @Rule + public final TestPipeline pipeline = TestPipeline.create(); + + @BeforeClass + public static void prepare() { + sqlEnv.registerTable("ORDER_DETAILS", + MockedBoundedTable.of( + Types.BIGINT, "order_id", + Types.INTEGER, "site_id", + Types.DOUBLE, "price" + ).addRows( + 1L, 1, 1.0, + 2L, 2, 2.0 + ) + ); + } + + @Test + public void testUnion() throws Exception { + String sql = "SELECT " + + " order_id, site_id, price " + + "FROM ORDER_DETAILS " + + " UNION SELECT " + + " order_id, site_id, price " + + "FROM ORDER_DETAILS "; + + PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); + PAssert.that(rows).containsInAnyOrder( + TestUtils.RowsBuilder.of( + Types.BIGINT, "order_id", + Types.INTEGER, "site_id", + Types.DOUBLE, "price" + ).addRows( + 1L, 1, 1.0, + 2L, 2, 2.0 + ).getRows() + ); + pipeline.run(); + } + + @Test + public void testUnionAll() throws Exception { + String sql = "SELECT " + + " order_id, site_id, price " + + "FROM ORDER_DETAILS" + + " UNION ALL " + + " SELECT order_id, site_id, price " + + "FROM ORDER_DETAILS"; + + PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); + PAssert.that(rows).containsInAnyOrder( + TestUtils.RowsBuilder.of( + Types.BIGINT, "order_id", + Types.INTEGER, "site_id", + Types.DOUBLE, "price" + ).addRows( + 1L, 1, 1.0, + 1L, 1, 1.0, + 2L, 2, 2.0, + 2L, 2, 2.0 + ).getRows() + ); + pipeline.run(); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRelTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRelTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRelTest.java new file mode 100644 
index 0000000..e5fa864 --- /dev/null +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRelTest.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.extensions.sql.impl.rel; + +import java.sql.Types; +import org.apache.beam.sdk.extensions.sql.BeamSqlCli; +import org.apache.beam.sdk.extensions.sql.BeamSqlEnv; +import org.apache.beam.sdk.extensions.sql.TestUtils; +import org.apache.beam.sdk.extensions.sql.mock.MockedBoundedTable; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.values.PCollection; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; + +/** + * Test for {@code BeamValuesRel}. 
+ */ +public class BeamValuesRelTest { + static BeamSqlEnv sqlEnv = new BeamSqlEnv(); + + @Rule + public final TestPipeline pipeline = TestPipeline.create(); + + @BeforeClass + public static void prepare() { + sqlEnv.registerTable("string_table", + MockedBoundedTable.of( + Types.VARCHAR, "name", + Types.VARCHAR, "description" + ) + ); + sqlEnv.registerTable("int_table", + MockedBoundedTable.of( + Types.INTEGER, "c0", + Types.INTEGER, "c1" + ) + ); + } + + @Test + public void testValues() throws Exception { + String sql = "insert into string_table(name, description) values " + + "('hello', 'world'), ('james', 'bond')"; + PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); + PAssert.that(rows).containsInAnyOrder( + TestUtils.RowsBuilder.of( + Types.VARCHAR, "name", + Types.VARCHAR, "description" + ).addRows( + "hello", "world", + "james", "bond" + ).getRows() + ); + pipeline.run(); + } + + @Test + public void testValues_castInt() throws Exception { + String sql = "insert into int_table (c0, c1) values(cast(1 as int), cast(2 as int))"; + PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); + PAssert.that(rows).containsInAnyOrder( + TestUtils.RowsBuilder.of( + Types.INTEGER, "c0", + Types.INTEGER, "c1" + ).addRows( + 1, 2 + ).getRows() + ); + pipeline.run(); + } + + @Test + public void testValues_onlySelect() throws Exception { + String sql = "select 1, '1'"; + PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); + PAssert.that(rows).containsInAnyOrder( + TestUtils.RowsBuilder.of( + Types.INTEGER, "EXPR$0", + Types.CHAR, "EXPR$1" + ).addRows( + 1, "1" + ).getRows() + ); + pipeline.run(); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/CheckSize.java ---------------------------------------------------------------------- diff --git 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/CheckSize.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/CheckSize.java new file mode 100644 index 0000000..8cdf2cd --- /dev/null +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/CheckSize.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.extensions.sql.impl.rel; + +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.junit.Assert; + +/** + * Utility class to check size of BeamSQLRow iterable. 
+ */ +public class CheckSize implements SerializableFunction<Iterable<BeamSqlRow>, Void> { + private int size; + public CheckSize(int size) { + this.size = size; + } + @Override public Void apply(Iterable<BeamSqlRow> input) { + int count = 0; + for (BeamSqlRow row : input) { + count++; + } + Assert.assertEquals(size, count); + return null; + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/BeamSqlFnExecutorTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/BeamSqlFnExecutorTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/BeamSqlFnExecutorTest.java deleted file mode 100644 index 2843e41..0000000 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/BeamSqlFnExecutorTest.java +++ /dev/null @@ -1,416 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.sdk.extensions.sql.interpreter; - -import static org.junit.Assert.assertTrue; - -import java.math.BigDecimal; -import java.util.Arrays; -import java.util.Calendar; -import java.util.Date; -import java.util.TimeZone; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlCaseExpression; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlInputRefExpression; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlPrimitive; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.arithmetic.BeamSqlDivideExpression; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.arithmetic.BeamSqlMinusExpression; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.arithmetic.BeamSqlModExpression; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.arithmetic.BeamSqlMultiplyExpression; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.arithmetic.BeamSqlPlusExpression; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.comparison.BeamSqlEqualsExpression; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.comparison.BeamSqlLessThanOrEqualsExpression; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.date.BeamSqlCurrentDateExpression; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.date.BeamSqlCurrentTimeExpression; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.date.BeamSqlCurrentTimestampExpression; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.date.BeamSqlDateCeilExpression; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.date.BeamSqlDateFloorExpression; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.date.BeamSqlExtractExpression; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.logical.BeamSqlAndExpression; -import 
org.apache.beam.sdk.extensions.sql.interpreter.operator.logical.BeamSqlNotExpression; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.logical.BeamSqlOrExpression; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.string.BeamSqlCharLengthExpression; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.string.BeamSqlConcatExpression; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.string.BeamSqlInitCapExpression; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.string.BeamSqlLowerExpression; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.string.BeamSqlOverlayExpression; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.string.BeamSqlPositionExpression; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.string.BeamSqlSubstringExpression; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.string.BeamSqlTrimExpression; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.string.BeamSqlUpperExpression; -import org.apache.beam.sdk.extensions.sql.planner.BeamQueryPlanner; -import org.apache.beam.sdk.extensions.sql.rel.BeamFilterRel; -import org.apache.beam.sdk.extensions.sql.rel.BeamProjectRel; -import org.apache.beam.sdk.extensions.sql.rel.BeamRelNode; -import org.apache.calcite.avatica.util.TimeUnitRange; -import org.apache.calcite.plan.RelTraitSet; -import org.apache.calcite.rex.RexNode; -import org.apache.calcite.sql.SqlOperator; -import org.apache.calcite.sql.fun.SqlStdOperatorTable; -import org.apache.calcite.sql.fun.SqlTrimFunction; -import org.apache.calcite.sql.type.SqlTypeName; -import org.junit.Assert; -import org.junit.Test; - -/** - * Unit test cases for {@link BeamSqlFnExecutor}. 
- */ -public class BeamSqlFnExecutorTest extends BeamSqlFnExecutorTestBase { - - @Test - public void testBeamFilterRel() { - RexNode condition = rexBuilder.makeCall(SqlStdOperatorTable.AND, - Arrays.asList( - rexBuilder.makeCall(SqlStdOperatorTable.LESS_THAN_OR_EQUAL, - Arrays.asList(rexBuilder.makeInputRef(relDataType, 0), - rexBuilder.makeBigintLiteral(new BigDecimal(1000L)))), - rexBuilder.makeCall(SqlStdOperatorTable.EQUALS, - Arrays.asList(rexBuilder.makeInputRef(relDataType, 1), - rexBuilder.makeExactLiteral(new BigDecimal(0)))))); - - BeamFilterRel beamFilterRel = new BeamFilterRel(cluster, RelTraitSet.createEmpty(), null, - condition); - - BeamSqlFnExecutor executor = new BeamSqlFnExecutor(beamFilterRel); - executor.prepare(); - - Assert.assertEquals(1, executor.exps.size()); - - BeamSqlExpression l1Exp = executor.exps.get(0); - assertTrue(l1Exp instanceof BeamSqlAndExpression); - Assert.assertEquals(SqlTypeName.BOOLEAN, l1Exp.getOutputType()); - - Assert.assertEquals(2, l1Exp.getOperands().size()); - BeamSqlExpression l1Left = (BeamSqlExpression) l1Exp.getOperands().get(0); - BeamSqlExpression l1Right = (BeamSqlExpression) l1Exp.getOperands().get(1); - - assertTrue(l1Left instanceof BeamSqlLessThanOrEqualsExpression); - assertTrue(l1Right instanceof BeamSqlEqualsExpression); - - Assert.assertEquals(2, l1Left.getOperands().size()); - BeamSqlExpression l1LeftLeft = (BeamSqlExpression) l1Left.getOperands().get(0); - BeamSqlExpression l1LeftRight = (BeamSqlExpression) l1Left.getOperands().get(1); - assertTrue(l1LeftLeft instanceof BeamSqlInputRefExpression); - assertTrue(l1LeftRight instanceof BeamSqlPrimitive); - - Assert.assertEquals(2, l1Right.getOperands().size()); - BeamSqlExpression l1RightLeft = (BeamSqlExpression) l1Right.getOperands().get(0); - BeamSqlExpression l1RightRight = (BeamSqlExpression) l1Right.getOperands().get(1); - assertTrue(l1RightLeft instanceof BeamSqlInputRefExpression); - assertTrue(l1RightRight instanceof BeamSqlPrimitive); - } - - 
@Test - public void testBeamProjectRel() { - BeamRelNode relNode = new BeamProjectRel(cluster, RelTraitSet.createEmpty(), - relBuilder.values(relDataType, 1234567L, 0, 8.9, null).build(), - rexBuilder.identityProjects(relDataType), relDataType); - BeamSqlFnExecutor executor = new BeamSqlFnExecutor(relNode); - - executor.prepare(); - Assert.assertEquals(4, executor.exps.size()); - assertTrue(executor.exps.get(0) instanceof BeamSqlInputRefExpression); - assertTrue(executor.exps.get(1) instanceof BeamSqlInputRefExpression); - assertTrue(executor.exps.get(2) instanceof BeamSqlInputRefExpression); - assertTrue(executor.exps.get(3) instanceof BeamSqlInputRefExpression); - } - - - @Test - public void testBuildExpression_logical() { - RexNode rexNode; - BeamSqlExpression exp; - rexNode = rexBuilder.makeCall(SqlStdOperatorTable.AND, - Arrays.asList( - rexBuilder.makeLiteral(true), - rexBuilder.makeLiteral(false) - ) - ); - exp = BeamSqlFnExecutor.buildExpression(rexNode); - assertTrue(exp instanceof BeamSqlAndExpression); - - rexNode = rexBuilder.makeCall(SqlStdOperatorTable.OR, - Arrays.asList( - rexBuilder.makeLiteral(true), - rexBuilder.makeLiteral(false) - ) - ); - exp = BeamSqlFnExecutor.buildExpression(rexNode); - assertTrue(exp instanceof BeamSqlOrExpression); - - rexNode = rexBuilder.makeCall(SqlStdOperatorTable.NOT, - Arrays.asList( - rexBuilder.makeLiteral(true) - ) - ); - exp = BeamSqlFnExecutor.buildExpression(rexNode); - assertTrue(exp instanceof BeamSqlNotExpression); - } - - @Test(expected = IllegalStateException.class) - public void testBuildExpression_logical_andOr_invalidOperand() { - RexNode rexNode; - BeamSqlExpression exp; - rexNode = rexBuilder.makeCall(SqlStdOperatorTable.AND, - Arrays.asList( - rexBuilder.makeLiteral(true), - rexBuilder.makeLiteral("hello") - ) - ); - BeamSqlFnExecutor.buildExpression(rexNode); - } - - @Test(expected = IllegalStateException.class) - public void testBuildExpression_logical_not_invalidOperand() { - RexNode rexNode; - 
BeamSqlExpression exp; - rexNode = rexBuilder.makeCall(SqlStdOperatorTable.NOT, - Arrays.asList( - rexBuilder.makeLiteral("hello") - ) - ); - BeamSqlFnExecutor.buildExpression(rexNode); - } - - - @Test(expected = IllegalStateException.class) - public void testBuildExpression_logical_not_invalidOperandCount() { - RexNode rexNode; - BeamSqlExpression exp; - rexNode = rexBuilder.makeCall(SqlStdOperatorTable.NOT, - Arrays.asList( - rexBuilder.makeLiteral(true), - rexBuilder.makeLiteral(true) - ) - ); - BeamSqlFnExecutor.buildExpression(rexNode); - } - - @Test - public void testBuildExpression_arithmetic() { - testBuildArithmeticExpression(SqlStdOperatorTable.PLUS, BeamSqlPlusExpression.class); - testBuildArithmeticExpression(SqlStdOperatorTable.MINUS, BeamSqlMinusExpression.class); - testBuildArithmeticExpression(SqlStdOperatorTable.MULTIPLY, BeamSqlMultiplyExpression.class); - testBuildArithmeticExpression(SqlStdOperatorTable.DIVIDE, BeamSqlDivideExpression.class); - testBuildArithmeticExpression(SqlStdOperatorTable.MOD, BeamSqlModExpression.class); - } - - private void testBuildArithmeticExpression(SqlOperator fn, - Class<? 
extends BeamSqlExpression> clazz) { - RexNode rexNode; - BeamSqlExpression exp; - rexNode = rexBuilder.makeCall(fn, Arrays.asList( - rexBuilder.makeBigintLiteral(new BigDecimal(1L)), - rexBuilder.makeBigintLiteral(new BigDecimal(1L)) - )); - exp = BeamSqlFnExecutor.buildExpression(rexNode); - - assertTrue(exp.getClass().equals(clazz)); - } - - @Test - public void testBuildExpression_string() { - RexNode rexNode; - BeamSqlExpression exp; - rexNode = rexBuilder.makeCall(SqlStdOperatorTable.CONCAT, - Arrays.asList( - rexBuilder.makeLiteral("hello "), - rexBuilder.makeLiteral("world") - ) - ); - exp = BeamSqlFnExecutor.buildExpression(rexNode); - assertTrue(exp instanceof BeamSqlConcatExpression); - - rexNode = rexBuilder.makeCall(SqlStdOperatorTable.POSITION, - Arrays.asList( - rexBuilder.makeLiteral("hello"), - rexBuilder.makeLiteral("worldhello") - ) - ); - exp = BeamSqlFnExecutor.buildExpression(rexNode); - assertTrue(exp instanceof BeamSqlPositionExpression); - - rexNode = rexBuilder.makeCall(SqlStdOperatorTable.POSITION, - Arrays.asList( - rexBuilder.makeLiteral("hello"), - rexBuilder.makeLiteral("worldhello"), - rexBuilder.makeCast(BeamQueryPlanner.TYPE_FACTORY.createSqlType(SqlTypeName.INTEGER), - rexBuilder.makeBigintLiteral(BigDecimal.ONE)) - ) - ); - exp = BeamSqlFnExecutor.buildExpression(rexNode); - assertTrue(exp instanceof BeamSqlPositionExpression); - - rexNode = rexBuilder.makeCall(SqlStdOperatorTable.CHAR_LENGTH, - Arrays.asList( - rexBuilder.makeLiteral("hello") - ) - ); - exp = BeamSqlFnExecutor.buildExpression(rexNode); - assertTrue(exp instanceof BeamSqlCharLengthExpression); - - rexNode = rexBuilder.makeCall(SqlStdOperatorTable.UPPER, - Arrays.asList( - rexBuilder.makeLiteral("hello") - ) - ); - exp = BeamSqlFnExecutor.buildExpression(rexNode); - assertTrue(exp instanceof BeamSqlUpperExpression); - - rexNode = rexBuilder.makeCall(SqlStdOperatorTable.LOWER, - Arrays.asList( - rexBuilder.makeLiteral("HELLO") - ) - ); - exp = 
BeamSqlFnExecutor.buildExpression(rexNode); - assertTrue(exp instanceof BeamSqlLowerExpression); - - - rexNode = rexBuilder.makeCall(SqlStdOperatorTable.INITCAP, - Arrays.asList( - rexBuilder.makeLiteral("hello") - ) - ); - exp = BeamSqlFnExecutor.buildExpression(rexNode); - assertTrue(exp instanceof BeamSqlInitCapExpression); - - rexNode = rexBuilder.makeCall(SqlStdOperatorTable.TRIM, - Arrays.asList( - rexBuilder.makeFlag(SqlTrimFunction.Flag.BOTH), - rexBuilder.makeLiteral("HELLO"), - rexBuilder.makeLiteral("HELLO") - ) - ); - exp = BeamSqlFnExecutor.buildExpression(rexNode); - assertTrue(exp instanceof BeamSqlTrimExpression); - - rexNode = rexBuilder.makeCall(SqlStdOperatorTable.SUBSTRING, - Arrays.asList( - rexBuilder.makeLiteral("HELLO"), - rexBuilder.makeBigintLiteral(BigDecimal.ZERO) - ) - ); - exp = BeamSqlFnExecutor.buildExpression(rexNode); - assertTrue(exp instanceof BeamSqlSubstringExpression); - - rexNode = rexBuilder.makeCall(SqlStdOperatorTable.SUBSTRING, - Arrays.asList( - rexBuilder.makeLiteral("HELLO"), - rexBuilder.makeBigintLiteral(BigDecimal.ZERO), - rexBuilder.makeBigintLiteral(BigDecimal.ZERO) - ) - ); - exp = BeamSqlFnExecutor.buildExpression(rexNode); - assertTrue(exp instanceof BeamSqlSubstringExpression); - - - rexNode = rexBuilder.makeCall(SqlStdOperatorTable.OVERLAY, - Arrays.asList( - rexBuilder.makeLiteral("HELLO"), - rexBuilder.makeLiteral("HELLO"), - rexBuilder.makeBigintLiteral(BigDecimal.ZERO) - ) - ); - exp = BeamSqlFnExecutor.buildExpression(rexNode); - assertTrue(exp instanceof BeamSqlOverlayExpression); - - rexNode = rexBuilder.makeCall(SqlStdOperatorTable.OVERLAY, - Arrays.asList( - rexBuilder.makeLiteral("HELLO"), - rexBuilder.makeLiteral("HELLO"), - rexBuilder.makeBigintLiteral(BigDecimal.ZERO), - rexBuilder.makeBigintLiteral(BigDecimal.ZERO) - ) - ); - exp = BeamSqlFnExecutor.buildExpression(rexNode); - assertTrue(exp instanceof BeamSqlOverlayExpression); - - rexNode = rexBuilder.makeCall(SqlStdOperatorTable.CASE, - 
Arrays.asList( - rexBuilder.makeLiteral(true), - rexBuilder.makeLiteral("HELLO"), - rexBuilder.makeLiteral("HELLO") - ) - ); - exp = BeamSqlFnExecutor.buildExpression(rexNode); - assertTrue(exp instanceof BeamSqlCaseExpression); - } - - @Test - public void testBuildExpression_date() { - RexNode rexNode; - BeamSqlExpression exp; - Calendar calendar = Calendar.getInstance(); - calendar.setTimeZone(TimeZone.getTimeZone("GMT")); - calendar.setTime(new Date()); - - // CEIL - rexNode = rexBuilder.makeCall(SqlStdOperatorTable.CEIL, - Arrays.asList( - rexBuilder.makeDateLiteral(calendar), - rexBuilder.makeFlag(TimeUnitRange.MONTH) - ) - ); - exp = BeamSqlFnExecutor.buildExpression(rexNode); - assertTrue(exp instanceof BeamSqlDateCeilExpression); - - // FLOOR - rexNode = rexBuilder.makeCall(SqlStdOperatorTable.FLOOR, - Arrays.asList( - rexBuilder.makeDateLiteral(calendar), - rexBuilder.makeFlag(TimeUnitRange.MONTH) - ) - ); - exp = BeamSqlFnExecutor.buildExpression(rexNode); - assertTrue(exp instanceof BeamSqlDateFloorExpression); - - // EXTRACT == EXTRACT_DATE? 
- rexNode = rexBuilder.makeCall(SqlStdOperatorTable.EXTRACT, - Arrays.asList( - rexBuilder.makeFlag(TimeUnitRange.MONTH), - rexBuilder.makeDateLiteral(calendar) - ) - ); - exp = BeamSqlFnExecutor.buildExpression(rexNode); - assertTrue(exp instanceof BeamSqlExtractExpression); - - // CURRENT_DATE - rexNode = rexBuilder.makeCall(SqlStdOperatorTable.CURRENT_DATE, - Arrays.<RexNode>asList( - ) - ); - exp = BeamSqlFnExecutor.buildExpression(rexNode); - assertTrue(exp instanceof BeamSqlCurrentDateExpression); - - // LOCALTIME - rexNode = rexBuilder.makeCall(SqlStdOperatorTable.LOCALTIME, - Arrays.<RexNode>asList( - ) - ); - exp = BeamSqlFnExecutor.buildExpression(rexNode); - assertTrue(exp instanceof BeamSqlCurrentTimeExpression); - - // LOCALTIMESTAMP - rexNode = rexBuilder.makeCall(SqlStdOperatorTable.LOCALTIMESTAMP, - Arrays.<RexNode>asList( - ) - ); - exp = BeamSqlFnExecutor.buildExpression(rexNode); - assertTrue(exp instanceof BeamSqlCurrentTimestampExpression); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/BeamSqlFnExecutorTestBase.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/BeamSqlFnExecutorTestBase.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/BeamSqlFnExecutorTestBase.java deleted file mode 100644 index c6478a6..0000000 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/BeamSqlFnExecutorTestBase.java +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.extensions.sql.interpreter; - -import java.util.ArrayList; -import java.util.List; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression; -import org.apache.beam.sdk.extensions.sql.planner.BeamQueryPlanner; -import org.apache.beam.sdk.extensions.sql.planner.BeamRelDataTypeSystem; -import org.apache.beam.sdk.extensions.sql.planner.BeamRuleSets; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType; -import org.apache.beam.sdk.extensions.sql.utils.CalciteUtils; -import org.apache.calcite.adapter.java.JavaTypeFactory; -import org.apache.calcite.config.Lex; -import org.apache.calcite.jdbc.JavaTypeFactoryImpl; -import org.apache.calcite.plan.Contexts; -import org.apache.calcite.plan.ConventionTraitDef; -import org.apache.calcite.plan.RelOptCluster; -import org.apache.calcite.plan.RelTraitDef; -import org.apache.calcite.plan.volcano.VolcanoPlanner; -import org.apache.calcite.rel.RelCollationTraitDef; -import org.apache.calcite.rel.type.RelDataType; -import org.apache.calcite.rel.type.RelDataTypeSystem; -import org.apache.calcite.rex.RexBuilder; -import org.apache.calcite.schema.SchemaPlus; -import org.apache.calcite.sql.parser.SqlParser; -import org.apache.calcite.sql.type.SqlTypeName; -import org.apache.calcite.tools.FrameworkConfig; -import 
org.apache.calcite.tools.Frameworks; -import org.apache.calcite.tools.RelBuilder; -import org.junit.BeforeClass; - -/** - * base class to test {@link BeamSqlFnExecutor} and subclasses of {@link BeamSqlExpression}. - */ -public class BeamSqlFnExecutorTestBase { - public static RexBuilder rexBuilder = new RexBuilder(BeamQueryPlanner.TYPE_FACTORY); - public static RelOptCluster cluster = RelOptCluster.create(new VolcanoPlanner(), rexBuilder); - - public static final JavaTypeFactory TYPE_FACTORY = new JavaTypeFactoryImpl( - RelDataTypeSystem.DEFAULT); - public static RelDataType relDataType; - - public static BeamSqlRowType beamRowType; - public static BeamSqlRow record; - - public static RelBuilder relBuilder; - - @BeforeClass - public static void prepare() { - relDataType = TYPE_FACTORY.builder() - .add("order_id", SqlTypeName.BIGINT) - .add("site_id", SqlTypeName.INTEGER) - .add("price", SqlTypeName.DOUBLE) - .add("order_time", SqlTypeName.BIGINT).build(); - - beamRowType = CalciteUtils.toBeamRowType(relDataType); - record = new BeamSqlRow(beamRowType); - - record.addField(0, 1234567L); - record.addField(1, 0); - record.addField(2, 8.9); - record.addField(3, 1234567L); - - SchemaPlus schema = Frameworks.createRootSchema(true); - final List<RelTraitDef> traitDefs = new ArrayList<>(); - traitDefs.add(ConventionTraitDef.INSTANCE); - traitDefs.add(RelCollationTraitDef.INSTANCE); - FrameworkConfig config = Frameworks.newConfigBuilder() - .parserConfig(SqlParser.configBuilder().setLex(Lex.MYSQL).build()).defaultSchema(schema) - .traitDefs(traitDefs).context(Contexts.EMPTY_CONTEXT).ruleSets(BeamRuleSets.getRuleSets()) - .costFactory(null).typeSystem(BeamRelDataTypeSystem.BEAM_REL_DATATYPE_SYSTEM).build(); - - relBuilder = RelBuilder.create(config); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamNullExperssionTest.java 
---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamNullExperssionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamNullExperssionTest.java deleted file mode 100644 index 7bfbe20..0000000 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamNullExperssionTest.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.extensions.sql.interpreter.operator; - -import org.apache.beam.sdk.extensions.sql.interpreter.BeamSqlFnExecutorTestBase; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.comparison.BeamSqlIsNotNullExpression; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.comparison.BeamSqlIsNullExpression; -import org.apache.calcite.sql.type.SqlTypeName; -import org.junit.Assert; -import org.junit.Test; - -/** - * Test cases for {@link BeamSqlIsNullExpression} and - * {@link BeamSqlIsNotNullExpression}. 
- */ -public class BeamNullExperssionTest extends BeamSqlFnExecutorTestBase { - - @Test - public void testIsNull() { - BeamSqlIsNullExpression exp1 = new BeamSqlIsNullExpression( - new BeamSqlInputRefExpression(SqlTypeName.BIGINT, 0)); - Assert.assertEquals(false, exp1.evaluate(record).getValue()); - - BeamSqlIsNullExpression exp2 = new BeamSqlIsNullExpression( - BeamSqlPrimitive.of(SqlTypeName.BIGINT, null)); - Assert.assertEquals(true, exp2.evaluate(record).getValue()); - } - - @Test - public void testIsNotNull() { - BeamSqlIsNotNullExpression exp1 = new BeamSqlIsNotNullExpression( - new BeamSqlInputRefExpression(SqlTypeName.BIGINT, 0)); - Assert.assertEquals(true, exp1.evaluate(record).getValue()); - - BeamSqlIsNotNullExpression exp2 = new BeamSqlIsNotNullExpression( - BeamSqlPrimitive.of(SqlTypeName.BIGINT, null)); - Assert.assertEquals(false, exp2.evaluate(record).getValue()); - } - -} http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlAndOrExpressionTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlAndOrExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlAndOrExpressionTest.java deleted file mode 100644 index b6f65a1..0000000 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlAndOrExpressionTest.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.extensions.sql.interpreter.operator; - -import java.util.ArrayList; -import java.util.List; -import org.apache.beam.sdk.extensions.sql.interpreter.BeamSqlFnExecutorTestBase; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.logical.BeamSqlAndExpression; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.logical.BeamSqlOrExpression; -import org.apache.calcite.sql.type.SqlTypeName; -import org.junit.Assert; -import org.junit.Test; - -/** - * Test cases for {@link BeamSqlAndExpression}, {@link BeamSqlOrExpression}. 
- */ -public class BeamSqlAndOrExpressionTest extends BeamSqlFnExecutorTestBase { - - @Test - public void testAnd() { - List<BeamSqlExpression> operands = new ArrayList<>(); - operands.add(BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, true)); - operands.add(BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, true)); - - Assert.assertTrue(new BeamSqlAndExpression(operands).evaluate(record).getValue()); - - operands.add(BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, false)); - - Assert.assertFalse(new BeamSqlAndExpression(operands).evaluate(record).getValue()); - } - - @Test - public void testOr() { - List<BeamSqlExpression> operands = new ArrayList<>(); - operands.add(BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, false)); - operands.add(BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, false)); - - Assert.assertFalse(new BeamSqlOrExpression(operands).evaluate(record).getValue()); - - operands.add(BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, true)); - - Assert.assertTrue(new BeamSqlOrExpression(operands).evaluate(record).getValue()); - - } - -} http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlCaseExpressionTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlCaseExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlCaseExpressionTest.java deleted file mode 100644 index 28ed920..0000000 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlCaseExpressionTest.java +++ /dev/null @@ -1,93 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.sdk.extensions.sql.interpreter.operator; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - -import java.util.ArrayList; -import java.util.List; -import org.apache.beam.sdk.extensions.sql.interpreter.BeamSqlFnExecutorTestBase; -import org.apache.calcite.sql.type.SqlTypeName; -import org.junit.Test; - -/** - * Test for BeamSqlCaseExpression. 
- */ -public class BeamSqlCaseExpressionTest extends BeamSqlFnExecutorTestBase { - - @Test public void accept() throws Exception { - List<BeamSqlExpression> operands = new ArrayList<>(); - - operands.add(BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, true)); - operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello")); - operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "world")); - assertTrue(new BeamSqlCaseExpression(operands).accept()); - - // even param count - operands.clear(); - operands.add(BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, true)); - operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "world")); - assertFalse(new BeamSqlCaseExpression(operands).accept()); - - // `when` type error - operands.clear(); - operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "error")); - operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello")); - operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "world")); - assertFalse(new BeamSqlCaseExpression(operands).accept()); - - // `then` type mixing - operands.clear(); - operands.add(BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, true)); - operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello")); - operands.add(BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, true)); - operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 10)); - operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello")); - assertFalse(new BeamSqlCaseExpression(operands).accept()); - - } - - @Test public void evaluate() throws Exception { - List<BeamSqlExpression> operands = new ArrayList<>(); - - operands.add(BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, true)); - operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello")); - operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "world")); - assertEquals("hello", new BeamSqlCaseExpression(operands) - .evaluate(record).getValue()); - - operands.clear(); - operands.add(BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, false)); - operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello")); - 
operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "world")); - assertEquals("world", new BeamSqlCaseExpression(operands) - .evaluate(record).getValue()); - - operands.clear(); - operands.add(BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, false)); - operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello")); - operands.add(BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, true)); - operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello1")); - operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "world")); - assertEquals("hello1", new BeamSqlCaseExpression(operands) - .evaluate(record).getValue()); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlCastExpressionTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlCastExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlCastExpressionTest.java deleted file mode 100644 index feefc45..0000000 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlCastExpressionTest.java +++ /dev/null @@ -1,125 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.sdk.extensions.sql.interpreter.operator; - -import java.sql.Date; -import java.sql.Timestamp; -import java.util.ArrayList; -import java.util.Calendar; -import java.util.List; -import org.apache.beam.sdk.extensions.sql.interpreter.BeamSqlFnExecutorTestBase; -import org.apache.calcite.sql.type.SqlTypeName; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - -/** - * Test for {@link BeamSqlCastExpression}. - */ -public class BeamSqlCastExpressionTest extends BeamSqlFnExecutorTestBase { - - private List<BeamSqlExpression> operands; - - @Before - public void setup() { - operands = new ArrayList<>(); - } - - @Test - public void testForOperands() { - operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1)); - operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "aaa")); - Assert.assertFalse(new BeamSqlCastExpression(operands, SqlTypeName.BIGINT).accept()); - } - - @Test - public void testForIntegerToBigintTypeCasting() { - operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 5)); - Assert.assertEquals(5L, - new BeamSqlCastExpression(operands, SqlTypeName.BIGINT).evaluate(record).getLong()); - } - - @Test - public void testForDoubleToBigIntCasting() { - operands.add(BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 5.45)); - Assert.assertEquals(5L, - new BeamSqlCastExpression(operands, SqlTypeName.BIGINT).evaluate(record).getLong()); - } - - @Test - public void testForIntegerToDateCast() { - // test for yyyyMMdd format - operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 20170521)); - 
Assert.assertEquals(Date.valueOf("2017-05-21"), - new BeamSqlCastExpression(operands, SqlTypeName.DATE).evaluate(record).getValue()); - } - - @Test - public void testyyyyMMddDateFormat() { - //test for yyyy-MM-dd format - operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "2017-05-21")); - Assert.assertEquals(Date.valueOf("2017-05-21"), - new BeamSqlCastExpression(operands, SqlTypeName.DATE).evaluate(record).getValue()); - } - - @Test - public void testyyMMddDateFormat() { - // test for yy.MM.dd format - operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "17.05.21")); - Assert.assertEquals(Date.valueOf("2017-05-21"), - new BeamSqlCastExpression(operands, SqlTypeName.DATE).evaluate(record).getValue()); - } - - @Test - public void testForTimestampCastExpression() { - operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "17-05-21 23:59:59.989")); - Assert.assertEquals(SqlTypeName.TIMESTAMP, - new BeamSqlCastExpression(operands, SqlTypeName.TIMESTAMP).evaluate(record) - .getOutputType()); - } - - @Test - public void testDateTimeFormatWithMillis() { - operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "2017-05-21 23:59:59.989")); - Assert.assertEquals(Timestamp.valueOf("2017-05-22 00:00:00.0"), - new BeamSqlCastExpression(operands, SqlTypeName.TIMESTAMP).evaluate(record).getValue()); - } - - @Test - public void testDateTimeFormatWithTimezone() { - operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "2017-05-21 23:59:59.89079 PST")); - Assert.assertEquals(Timestamp.valueOf("2017-05-22 00:00:00.0"), - new BeamSqlCastExpression(operands, SqlTypeName.TIMESTAMP).evaluate(record).getValue()); - } - - @Test - public void testDateTimeFormat() { - operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "2017-05-21 23:59:59")); - Assert.assertEquals(Timestamp.valueOf("2017-05-21 23:59:59"), - new BeamSqlCastExpression(operands, SqlTypeName.TIMESTAMP).evaluate(record).getValue()); - } - - @Test(expected = RuntimeException.class) - public void 
testForCastTypeNotSupported() { - operands.add(BeamSqlPrimitive.of(SqlTypeName.TIME, Calendar.getInstance().getTime())); - Assert.assertEquals(Timestamp.valueOf("2017-05-22 00:00:00.0"), - new BeamSqlCastExpression(operands, SqlTypeName.TIMESTAMP).evaluate(record).getValue()); - } - -}
