http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlUpperExpressionTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlUpperExpressionTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlUpperExpressionTest.java deleted file mode 100644 index 1a734bc..0000000 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlUpperExpressionTest.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.dsls.sql.interpreter.operator.string; - -import static org.junit.Assert.assertEquals; - -import java.util.ArrayList; -import java.util.List; - -import org.apache.beam.dsls.sql.interpreter.BeamSqlFnExecutorTestBase; -import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression; -import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive; -import org.apache.calcite.sql.type.SqlTypeName; -import org.junit.Test; - -/** - * Test of BeamSqlUpperExpression. - */ -public class BeamSqlUpperExpressionTest extends BeamSqlFnExecutorTestBase { - - @Test public void evaluate() throws Exception { - List<BeamSqlExpression> operands = new ArrayList<>(); - - operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello")); - assertEquals("HELLO", - new BeamSqlUpperExpression(operands).evaluate(record).getValue()); - } - -}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedBoundedTable.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedBoundedTable.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedBoundedTable.java deleted file mode 100644 index 6c1dcb2..0000000 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedBoundedTable.java +++ /dev/null @@ -1,134 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.dsls.sql.mock; - -import static org.apache.beam.dsls.sql.TestUtils.buildBeamSqlRowType; -import static org.apache.beam.dsls.sql.TestUtils.buildRows; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.concurrent.ConcurrentLinkedQueue; -import org.apache.beam.dsls.sql.schema.BeamIOType; -import org.apache.beam.dsls.sql.schema.BeamSqlRow; -import org.apache.beam.dsls.sql.schema.BeamSqlRowType; -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.values.PBegin; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PDone; - -/** - * Mocked table for bounded data sources. - */ -public class MockedBoundedTable extends MockedTable { - /** rows written to this table. */ - private static final ConcurrentLinkedQueue<BeamSqlRow> CONTENT = new ConcurrentLinkedQueue<>(); - /** rows flow out from this table. */ - private final List<BeamSqlRow> rows = new ArrayList<>(); - - public MockedBoundedTable(BeamSqlRowType beamSqlRowType) { - super(beamSqlRowType); - } - - /** - * Convenient way to build a mocked bounded table. - * - * <p>e.g. - * - * <pre>{@code - * MockedUnboundedTable - * .of(Types.BIGINT, "order_id", - * Types.INTEGER, "site_id", - * Types.DOUBLE, "price", - * Types.TIMESTAMP, "order_time") - * }</pre> - */ - public static MockedBoundedTable of(final Object... args){ - return new MockedBoundedTable(buildBeamSqlRowType(args)); - } - - /** - * Build a mocked bounded table with the specified type. - */ - public static MockedBoundedTable of(final BeamSqlRowType type) { - return new MockedBoundedTable(type); - } - - - /** - * Add rows to the builder. - * - * <p>Sample usage: - * - * <pre>{@code - * addRows( - * 1, 3, "james", -- first row - * 2, 5, "bond" -- second row - * ... - * ) - * }</pre> - */ - public MockedBoundedTable addRows(Object... args) { - List<BeamSqlRow> rows = buildRows(getRowType(), Arrays.asList(args)); - this.rows.addAll(rows); - return this; - } - - @Override - public BeamIOType getSourceType() { - return BeamIOType.BOUNDED; - } - - @Override - public PCollection<BeamSqlRow> buildIOReader(Pipeline pipeline) { - return PBegin.in(pipeline).apply( - "MockedBoundedTable_Reader_" + COUNTER.incrementAndGet(), Create.of(rows)); - } - - @Override public PTransform<? super PCollection<BeamSqlRow>, PDone> buildIOWriter() { - return new OutputStore(); - } - - /** - * Keep output in {@code CONTENT} for validation. - * - */ - public static class OutputStore extends PTransform<PCollection<BeamSqlRow>, PDone> { - - @Override - public PDone expand(PCollection<BeamSqlRow> input) { - input.apply(ParDo.of(new DoFn<BeamSqlRow, Void>() { - @ProcessElement - public void processElement(ProcessContext c) { - CONTENT.add(c.element()); - } - - @Teardown - public void close() { - CONTENT.clear(); - } - - })); - return PDone.in(input.getPipeline()); - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedTable.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedTable.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedTable.java deleted file mode 100644 index 858ae88..0000000 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedTable.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.dsls.sql.mock; - -import java.util.concurrent.atomic.AtomicInteger; -import org.apache.beam.dsls.sql.schema.BaseBeamTable; -import org.apache.beam.dsls.sql.schema.BeamSqlRow; -import org.apache.beam.dsls.sql.schema.BeamSqlRowType; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PDone; - -/** - * Base class for mocked table. - */ -public abstract class MockedTable extends BaseBeamTable { - public static final AtomicInteger COUNTER = new AtomicInteger(); - public MockedTable(BeamSqlRowType beamSqlRowType) { - super(beamSqlRowType); - } - - @Override - public PTransform<? super PCollection<BeamSqlRow>, PDone> buildIOWriter() { - throw new UnsupportedOperationException("buildIOWriter unsupported!"); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedUnboundedTable.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedUnboundedTable.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedUnboundedTable.java deleted file mode 100644 index ee6eb22..0000000 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedUnboundedTable.java +++ /dev/null @@ -1,114 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.dsls.sql.mock; - -import static org.apache.beam.dsls.sql.TestUtils.buildBeamSqlRowType; -import static org.apache.beam.dsls.sql.TestUtils.buildRows; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import org.apache.beam.dsls.sql.schema.BeamIOType; -import org.apache.beam.dsls.sql.schema.BeamSqlRow; -import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder; -import org.apache.beam.dsls.sql.schema.BeamSqlRowType; -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.testing.TestStream; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.TimestampedValue; -import org.apache.calcite.util.Pair; -import org.joda.time.Duration; -import org.joda.time.Instant; - -/** - * A mocked unbounded table. - */ -public class MockedUnboundedTable extends MockedTable { - /** rows flow out from this table with the specified watermark instant. */ - private final List<Pair<Duration, List<BeamSqlRow>>> timestampedRows = new ArrayList<>(); - /** specify the index of column in the row which stands for the event time field. */ - private int timestampField; - private MockedUnboundedTable(BeamSqlRowType beamSqlRowType) { - super(beamSqlRowType); - } - - /** - * Convenient way to build a mocked unbounded table. - * - * <p>e.g. - * - * <pre>{@code - * MockedUnboundedTable - * .of(Types.BIGINT, "order_id", - * Types.INTEGER, "site_id", - * Types.DOUBLE, "price", - * Types.TIMESTAMP, "order_time") - * }</pre> - */ - public static MockedUnboundedTable of(final Object... args){ - return new MockedUnboundedTable(buildBeamSqlRowType(args)); - } - - public MockedUnboundedTable timestampColumnIndex(int idx) { - this.timestampField = idx; - return this; - } - - /** - * Add rows to the builder. - * - * <p>Sample usage: - * - * <pre>{@code - * addRows( - * duration, -- duration which stands for the corresponding watermark instant - * 1, 3, "james", -- first row - * 2, 5, "bond" -- second row - * ... - * ) - * }</pre> - */ - public MockedUnboundedTable addRows(Duration duration, Object... args) { - List<BeamSqlRow> rows = buildRows(getRowType(), Arrays.asList(args)); - // record the watermark + rows - this.timestampedRows.add(Pair.of(duration, rows)); - return this; - } - - @Override public BeamIOType getSourceType() { - return BeamIOType.UNBOUNDED; - } - - @Override public PCollection<BeamSqlRow> buildIOReader(Pipeline pipeline) { - TestStream.Builder<BeamSqlRow> values = TestStream.create( - new BeamSqlRowCoder(beamSqlRowType)); - - for (Pair<Duration, List<BeamSqlRow>> pair : timestampedRows) { - values = values.advanceWatermarkTo(new Instant(0).plus(pair.getKey())); - for (int i = 0; i < pair.getValue().size(); i++) { - values = values.addElements(TimestampedValue.of(pair.getValue().get(i), - new Instant(pair.getValue().get(i).getDate(timestampField)))); - } - } - - return pipeline.begin().apply( - "MockedUnboundedTable_" + COUNTER.incrementAndGet(), - values.advanceWatermarkToInfinity()); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamIntersectRelTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamIntersectRelTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamIntersectRelTest.java deleted file mode 100644 index 3b37143..0000000 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamIntersectRelTest.java +++ /dev/null @@ -1,119 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.dsls.sql.rel; - -import java.sql.Types; -import org.apache.beam.dsls.sql.BeamSqlCli; -import org.apache.beam.dsls.sql.BeamSqlEnv; -import org.apache.beam.dsls.sql.TestUtils; -import org.apache.beam.dsls.sql.mock.MockedBoundedTable; -import org.apache.beam.dsls.sql.schema.BeamSqlRow; -import org.apache.beam.sdk.testing.PAssert; -import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.values.PCollection; -import org.junit.BeforeClass; -import org.junit.Rule; -import org.junit.Test; - -/** - * Test for {@code BeamIntersectRel}. - */ -public class BeamIntersectRelTest { - static BeamSqlEnv sqlEnv = new BeamSqlEnv(); - - @Rule - public final TestPipeline pipeline = TestPipeline.create(); - - @BeforeClass - public static void prepare() { - sqlEnv.registerTable("ORDER_DETAILS1", - MockedBoundedTable.of( - Types.BIGINT, "order_id", - Types.INTEGER, "site_id", - Types.DOUBLE, "price" - ).addRows( - 1L, 1, 1.0, - 1L, 1, 1.0, - 2L, 2, 2.0, - 4L, 4, 4.0 - ) - ); - - sqlEnv.registerTable("ORDER_DETAILS2", - MockedBoundedTable.of( - Types.BIGINT, "order_id", - Types.INTEGER, "site_id", - Types.DOUBLE, "price" - ).addRows( - 1L, 1, 1.0, - 2L, 2, 2.0, - 3L, 3, 3.0 - ) - ); - } - - @Test - public void testIntersect() throws Exception { - String sql = ""; - sql += "SELECT order_id, site_id, price " - + "FROM ORDER_DETAILS1 " - + " INTERSECT " - + "SELECT order_id, site_id, price " - + "FROM ORDER_DETAILS2 "; - - PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); - PAssert.that(rows).containsInAnyOrder( - TestUtils.RowsBuilder.of( - Types.BIGINT, "order_id", - Types.INTEGER, "site_id", - Types.DOUBLE, "price" - ).addRows( - 1L, 1, 1.0, - 2L, 2, 2.0 - ).getRows()); - - pipeline.run().waitUntilFinish(); - } - - @Test - public void testIntersectAll() throws Exception { - String sql = ""; - sql += "SELECT order_id, site_id, price " - + "FROM ORDER_DETAILS1 " - + " INTERSECT ALL " - + "SELECT order_id, site_id, price " - + "FROM ORDER_DETAILS2 "; - - PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); - PAssert.that(rows).satisfies(new CheckSize(3)); - - PAssert.that(rows).containsInAnyOrder( - TestUtils.RowsBuilder.of( - Types.BIGINT, "order_id", - Types.INTEGER, "site_id", - Types.DOUBLE, "price" - ).addRows( - 1L, 1, 1.0, - 1L, 1, 1.0, - 2L, 2, 2.0 - ).getRows()); - - pipeline.run(); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelBoundedVsBoundedTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelBoundedVsBoundedTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelBoundedVsBoundedTest.java deleted file mode 100644 index 24a3256..0000000 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelBoundedVsBoundedTest.java +++ /dev/null @@ -1,204 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.dsls.sql.rel; - -import java.sql.Types; -import org.apache.beam.dsls.sql.BeamSqlCli; -import org.apache.beam.dsls.sql.BeamSqlEnv; -import org.apache.beam.dsls.sql.TestUtils; -import org.apache.beam.dsls.sql.mock.MockedBoundedTable; -import org.apache.beam.dsls.sql.schema.BeamSqlRow; -import org.apache.beam.sdk.testing.PAssert; -import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.values.PCollection; -import org.junit.BeforeClass; -import org.junit.Rule; -import org.junit.Test; - -/** - * Bounded + Bounded Test for {@code BeamJoinRel}. - */ -public class BeamJoinRelBoundedVsBoundedTest { - @Rule - public final TestPipeline pipeline = TestPipeline.create(); - private static final BeamSqlEnv beamSqlEnv = new BeamSqlEnv(); - - public static final MockedBoundedTable ORDER_DETAILS1 = - MockedBoundedTable.of( - Types.INTEGER, "order_id", - Types.INTEGER, "site_id", - Types.INTEGER, "price" - ).addRows( - 1, 2, 3, - 2, 3, 3, - 3, 4, 5 - ); - - public static final MockedBoundedTable ORDER_DETAILS2 = - MockedBoundedTable.of( - Types.INTEGER, "order_id", - Types.INTEGER, "site_id", - Types.INTEGER, "price" - ).addRows( - 1, 2, 3, - 2, 3, 3, - 3, 4, 5 - ); - - @BeforeClass - public static void prepare() { - beamSqlEnv.registerTable("ORDER_DETAILS1", ORDER_DETAILS1); - beamSqlEnv.registerTable("ORDER_DETAILS2", ORDER_DETAILS2); - } - - @Test - public void testInnerJoin() throws Exception { - String sql = - "SELECT * " - + "FROM ORDER_DETAILS1 o1" - + " JOIN ORDER_DETAILS2 o2" - + " on " - + " o1.order_id=o2.site_id AND o2.price=o1.site_id" - ; - - PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); - PAssert.that(rows).containsInAnyOrder( - TestUtils.RowsBuilder.of( - Types.INTEGER, "order_id", - Types.INTEGER, "site_id", - Types.INTEGER, "price", - Types.INTEGER, "order_id0", - Types.INTEGER, "site_id0", - Types.INTEGER, "price0" - ).addRows( - 2, 3, 3, 1, 2, 3 - ).getRows()); - pipeline.run(); - } - - @Test - public void testLeftOuterJoin() throws Exception { - String sql = - "SELECT * " - + "FROM ORDER_DETAILS1 o1" - + " LEFT OUTER JOIN ORDER_DETAILS2 o2" - + " on " - + " o1.order_id=o2.site_id AND o2.price=o1.site_id" - ; - - PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); - pipeline.enableAbandonedNodeEnforcement(false); - PAssert.that(rows).containsInAnyOrder( - TestUtils.RowsBuilder.of( - Types.INTEGER, "order_id", - Types.INTEGER, "site_id", - Types.INTEGER, "price", - Types.INTEGER, "order_id0", - Types.INTEGER, "site_id0", - Types.INTEGER, "price0" - ).addRows( - 1, 2, 3, null, null, null, - 2, 3, 3, 1, 2, 3, - 3, 4, 5, null, null, null - ).getRows()); - pipeline.run(); - } - - @Test - public void testRightOuterJoin() throws Exception { - String sql = - "SELECT * " - + "FROM ORDER_DETAILS1 o1" - + " RIGHT OUTER JOIN ORDER_DETAILS2 o2" - + " on " - + " o1.order_id=o2.site_id AND o2.price=o1.site_id" - ; - - PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); - PAssert.that(rows).containsInAnyOrder( - TestUtils.RowsBuilder.of( - Types.INTEGER, "order_id", - Types.INTEGER, "site_id", - Types.INTEGER, "price", - Types.INTEGER, "order_id0", - Types.INTEGER, "site_id0", - Types.INTEGER, "price0" - ).addRows( - 2, 3, 3, 1, 2, 3, - null, null, null, 2, 3, 3, - null, null, null, 3, 4, 5 - ).getRows()); - pipeline.run(); - } - - @Test - public void testFullOuterJoin() throws Exception { - String sql = - "SELECT * " - + "FROM ORDER_DETAILS1 o1" - + " FULL OUTER JOIN ORDER_DETAILS2 o2" - + " on " - + " o1.order_id=o2.site_id AND o2.price=o1.site_id" - ; - - PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); - PAssert.that(rows).containsInAnyOrder( - TestUtils.RowsBuilder.of( - Types.INTEGER, "order_id", - Types.INTEGER, "site_id", - Types.INTEGER, "price", - Types.INTEGER, "order_id0", - Types.INTEGER, "site_id0", - Types.INTEGER, "price0" - ).addRows( - 2, 3, 3, 1, 2, 3, - 1, 2, 3, null, null, null, - 3, 4, 5, null, null, null, - null, null, null, 2, 3, 3, - null, null, null, 3, 4, 5 - ).getRows()); - pipeline.run(); - } - - @Test(expected = UnsupportedOperationException.class) - public void testException_nonEqualJoin() throws Exception { - String sql = - "SELECT * " - + "FROM ORDER_DETAILS1 o1" - + " JOIN ORDER_DETAILS2 o2" - + " on " - + " o1.order_id>o2.site_id" - ; - - pipeline.enableAbandonedNodeEnforcement(false); - BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); - pipeline.run(); - } - - @Test(expected = UnsupportedOperationException.class) - public void testException_crossJoin() throws Exception { - String sql = - "SELECT * " - + "FROM ORDER_DETAILS1 o1, ORDER_DETAILS2 o2"; - - pipeline.enableAbandonedNodeEnforcement(false); - BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); - pipeline.run(); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelUnboundedVsBoundedTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelUnboundedVsBoundedTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelUnboundedVsBoundedTest.java deleted file mode 100644 index 3f0c98e..0000000 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelUnboundedVsBoundedTest.java +++ /dev/null @@ -1,241 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.dsls.sql.rel; - -import java.sql.Types; -import java.util.Date; -import org.apache.beam.dsls.sql.BeamSqlCli; -import org.apache.beam.dsls.sql.BeamSqlEnv; -import org.apache.beam.dsls.sql.TestUtils; -import org.apache.beam.dsls.sql.mock.MockedBoundedTable; -import org.apache.beam.dsls.sql.mock.MockedUnboundedTable; -import org.apache.beam.dsls.sql.schema.BeamSqlRow; -import org.apache.beam.dsls.sql.transform.BeamSqlOutputToConsoleFn; -import org.apache.beam.sdk.testing.PAssert; -import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.values.PCollection; -import org.joda.time.Duration; -import org.junit.BeforeClass; -import org.junit.Rule; -import org.junit.Test; - -/** - * Unbounded + Unbounded Test for {@code BeamJoinRel}. - */ -public class BeamJoinRelUnboundedVsBoundedTest { - @Rule - public final TestPipeline pipeline = TestPipeline.create(); - private static final BeamSqlEnv beamSqlEnv = new BeamSqlEnv(); - public static final Date FIRST_DATE = new Date(1); - public static final Date SECOND_DATE = new Date(1 + 3600 * 1000); - public static final Date THIRD_DATE = new Date(1 + 3600 * 1000 + 3600 * 1000 + 1); - private static final Duration WINDOW_SIZE = Duration.standardHours(1); - - @BeforeClass - public static void prepare() { - beamSqlEnv.registerTable("ORDER_DETAILS", MockedUnboundedTable - .of( - Types.INTEGER, "order_id", - Types.INTEGER, "site_id", - Types.INTEGER, "price", - Types.TIMESTAMP, "order_time" - ) - .timestampColumnIndex(3) - .addRows( - Duration.ZERO, - 1, 1, 1, FIRST_DATE, - 1, 2, 2, FIRST_DATE - ) - .addRows( - WINDOW_SIZE.plus(Duration.standardSeconds(1)), - 2, 2, 3, SECOND_DATE, - 2, 3, 3, SECOND_DATE, - // this late data is omitted - 1, 2, 3, FIRST_DATE - ) - .addRows( - WINDOW_SIZE.plus(WINDOW_SIZE).plus(Duration.standardSeconds(1)), - 3, 3, 3, THIRD_DATE, - // this late data is omitted - 2, 2, 3, SECOND_DATE - ) - ); - - beamSqlEnv.registerTable("ORDER_DETAILS1", MockedBoundedTable - .of(Types.INTEGER, "order_id", - Types.VARCHAR, "buyer" - ).addRows( - 1, "james", - 2, "bond" - )); - } - - @Test - public void testInnerJoin_unboundedTableOnTheLeftSide() throws Exception { - String sql = "SELECT o1.order_id, o1.sum_site_id, o2.buyer FROM " - + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS " - + " GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 " - + " JOIN " - + " ORDER_DETAILS1 o2 " - + " on " - + " o1.order_id=o2.order_id" - ; - - PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); - PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn()))) - .containsInAnyOrder( - TestUtils.RowsBuilder.of( - Types.INTEGER, "order_id", - Types.INTEGER, "sum_site_id", - Types.VARCHAR, "buyer" - ).addRows( - 1, 3, "james", - 2, 5, "bond" - ).getStringRows() - ); - pipeline.run(); - } - - @Test - public void testInnerJoin_boundedTableOnTheLeftSide() throws Exception { - String sql = "SELECT o1.order_id, o1.sum_site_id, o2.buyer FROM " - + " ORDER_DETAILS1 o2 " - + " JOIN " - + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS " - + " GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 " - + " on " - + " o1.order_id=o2.order_id" - ; - - PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); - PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn()))) - .containsInAnyOrder( - TestUtils.RowsBuilder.of( - Types.INTEGER, "order_id", - Types.INTEGER, "sum_site_id", - Types.VARCHAR, "buyer" - ).addRows( - 1, 3, "james", - 2, 5, "bond" - ).getStringRows() - ); - pipeline.run(); - } - - @Test - public void testLeftOuterJoin() throws Exception { - String sql = "SELECT o1.order_id, o1.sum_site_id, o2.buyer FROM " - + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS " - + " GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 " - + " LEFT OUTER JOIN " - + " ORDER_DETAILS1 o2 " - + " on " - + " o1.order_id=o2.order_id" - ; - - PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); - rows.apply(ParDo.of(new BeamSqlOutputToConsoleFn("helloworld"))); - PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn()))) - .containsInAnyOrder( - TestUtils.RowsBuilder.of( - Types.INTEGER, "order_id", - Types.INTEGER, "sum_site_id", - Types.VARCHAR, "buyer" - ).addRows( - 1, 3, "james", - 2, 5, "bond", - 3, 3, null - ).getStringRows() - ); - pipeline.run(); - } - - @Test(expected = UnsupportedOperationException.class) - public void testLeftOuterJoinError() throws Exception { - String sql = "SELECT o1.order_id, o1.sum_site_id, o2.buyer FROM " - + " ORDER_DETAILS1 o2 " - + " LEFT OUTER JOIN " - + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS " - + " GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 " - + " on " - + " o1.order_id=o2.order_id" - ; - pipeline.enableAbandonedNodeEnforcement(false); - BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); - pipeline.run(); - } - - @Test - public void testRightOuterJoin() throws Exception { - String sql = "SELECT o1.order_id, o1.sum_site_id, o2.buyer FROM " - + " ORDER_DETAILS1 o2 " - + " RIGHT OUTER JOIN " - + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS " - + " GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 " - + " on " - + " o1.order_id=o2.order_id" - ; - PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); - PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn()))) - .containsInAnyOrder( - TestUtils.RowsBuilder.of( - Types.INTEGER, "order_id", - Types.INTEGER, "sum_site_id", - Types.VARCHAR, "buyer" - ).addRows( - 1, 3, "james", - 2, 5, "bond", - 3, 3, null - ).getStringRows() - ); - pipeline.run(); - } - - @Test(expected = UnsupportedOperationException.class) - public void testRightOuterJoinError() throws Exception { - String sql = "SELECT o1.order_id, o1.sum_site_id, o2.buyer FROM " - + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS " - + " GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 " - + " RIGHT OUTER JOIN " - + " ORDER_DETAILS1 o2 " - + " on " - + " o1.order_id=o2.order_id" - ; - - pipeline.enableAbandonedNodeEnforcement(false); - BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); - pipeline.run(); - } - - @Test(expected = UnsupportedOperationException.class) - public void testFullOuterJoinError() throws Exception { - String sql = "SELECT o1.order_id, o1.sum_site_id, o2.buyer FROM " - + " ORDER_DETAILS1 o2 " - + " FULL OUTER JOIN " - + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS " - + " GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 " - + " on " - + " o1.order_id=o2.order_id" - ; - pipeline.enableAbandonedNodeEnforcement(false); - BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); - pipeline.run(); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelUnboundedVsUnboundedTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelUnboundedVsUnboundedTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelUnboundedVsUnboundedTest.java deleted file mode 100644 index d76e875..0000000 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelUnboundedVsUnboundedTest.java +++ /dev/null @@ -1,219 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.dsls.sql.rel; - -import java.sql.Types; -import java.util.Date; -import org.apache.beam.dsls.sql.BeamSqlCli; -import org.apache.beam.dsls.sql.BeamSqlEnv; -import org.apache.beam.dsls.sql.TestUtils; -import org.apache.beam.dsls.sql.mock.MockedUnboundedTable; -import org.apache.beam.dsls.sql.schema.BeamSqlRow; -import org.apache.beam.dsls.sql.transform.BeamSqlOutputToConsoleFn; -import org.apache.beam.sdk.testing.PAssert; -import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.values.PCollection; -import org.joda.time.Duration; -import org.junit.BeforeClass; -import org.junit.Rule; -import org.junit.Test; - -/** - * Unbounded + Unbounded Test for {@code BeamJoinRel}. - */ -public class BeamJoinRelUnboundedVsUnboundedTest { - @Rule - public final TestPipeline pipeline = TestPipeline.create(); - private static final BeamSqlEnv beamSqlEnv = new BeamSqlEnv(); - public static final Date FIRST_DATE = new Date(1); - public static final Date SECOND_DATE = new Date(1 + 3600 * 1000); - - private static final Duration WINDOW_SIZE = Duration.standardHours(1); - - @BeforeClass - public static void prepare() { - beamSqlEnv.registerTable("ORDER_DETAILS", MockedUnboundedTable - .of(Types.INTEGER, "order_id", - Types.INTEGER, "site_id", - Types.INTEGER, "price", - Types.TIMESTAMP, "order_time" - ) - .timestampColumnIndex(3) - .addRows( - Duration.ZERO, - 1, 1, 1, FIRST_DATE, - 1, 2, 6, FIRST_DATE - ) - .addRows( - WINDOW_SIZE.plus(Duration.standardMinutes(1)), - 2, 2, 7, SECOND_DATE, - 2, 3, 8, SECOND_DATE, - // this late record is omitted(First window) - 1, 3, 3, FIRST_DATE - ) - .addRows( - // this late record is omitted(Second window) - WINDOW_SIZE.plus(WINDOW_SIZE).plus(Duration.standardMinutes(1)), - 2, 3, 3, SECOND_DATE - ) - ); - } - - @Test - public void testInnerJoin() throws Exception { - String sql = "SELECT * FROM " - + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS " - + " GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 " - + " JOIN " - + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS " - + " GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o2 " - + " on " - + " o1.order_id=o2.order_id" - ; - - PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); - PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn()))) - .containsInAnyOrder( - TestUtils.RowsBuilder.of( - Types.INTEGER, "order_id", - Types.INTEGER, "sum_site_id", - Types.INTEGER, "order_id0", - Types.INTEGER, "sum_site_id0").addRows( - 1, 3, 1, 3, - 2, 5, 2, 5 - ).getStringRows() - ); - pipeline.run(); - } - - @Test - public void testLeftOuterJoin() throws Exception { - String sql = "SELECT * FROM " - + "(select site_id as order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS " - + " GROUP BY site_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 " - + " LEFT OUTER JOIN " - + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS " - + " GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o2 " - + " on " - + " o1.order_id=o2.order_id" - ; - - // 1, 1 | 1, 3 - // 2, 2 | NULL, NULL - // ---- | ----- - // 2, 2 | 2, 5 - // 3, 3 | NULL, NULL - - PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); - PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn()))) - .containsInAnyOrder( - TestUtils.RowsBuilder.of( - Types.INTEGER, "order_id", - Types.INTEGER, "sum_site_id", - Types.INTEGER, "order_id0", - Types.INTEGER, "sum_site_id0" - ).addRows( - 1, 1, 1, 3, - 2, 2, null, null, - 2, 2, 2, 5, - 3, 3, null, null - ).getStringRows() - ); - pipeline.run(); - } - - @Test - public void testRightOuterJoin() throws Exception { - String sql = "SELECT * FROM " - + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS " - + " GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 " - + " RIGHT OUTER JOIN " - + "(select site_id as order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS " - + " GROUP BY site_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o2 " - + " on " - + " o1.order_id=o2.order_id" - ; - - PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); - PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn()))) - .containsInAnyOrder( - TestUtils.RowsBuilder.of( - Types.INTEGER, "order_id", - Types.INTEGER, "sum_site_id", - Types.INTEGER, "order_id0", - Types.INTEGER, "sum_site_id0" - ).addRows( - 1, 3, 1, 1, - null, null, 2, 2, - 2, 5, 2, 2, - null, null, 3, 3 - ).getStringRows() - ); - pipeline.run(); - } - - @Test - public void testFullOuterJoin() throws Exception { - String sql = "SELECT * FROM " - + "(select price as order_id1, sum(site_id) as sum_site_id FROM ORDER_DETAILS " - + " GROUP BY price, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 " - + " FULL OUTER JOIN " - + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS " - + " GROUP BY order_id , TUMBLE(order_time, INTERVAL '1' HOUR)) o2 " - + " on " - + " o1.order_id1=o2.order_id" - ; - - PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); - rows.apply(ParDo.of(new BeamSqlOutputToConsoleFn("hello"))); - PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn()))) - .containsInAnyOrder( - TestUtils.RowsBuilder.of( - Types.INTEGER, "order_id1", - Types.INTEGER, "sum_site_id", - Types.INTEGER, "order_id", - Types.INTEGER, "sum_site_id0" - ).addRows( - 1, 1, 1, 3, - 6, 2, null, null, - 7, 2, null, null, - 8, 3, null, null, - null, null, 2, 5 - ).getStringRows() - ); - pipeline.run(); - } - - @Test(expected = IllegalArgumentException.class) - public void testWindowsMismatch() throws Exception { - String sql = "SELECT * FROM " - + "(select site_id as order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS " - + " GROUP BY site_id, TUMBLE(order_time, INTERVAL '2' HOUR)) o1 " - + " LEFT OUTER JOIN " - + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS " - + " GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o2 " - + " on " - + " o1.order_id=o2.order_id" - ; - pipeline.enableAbandonedNodeEnforcement(false); - BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); - pipeline.run(); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamMinusRelTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamMinusRelTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamMinusRelTest.java deleted file mode 100644 index 80da8fb..0000000 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamMinusRelTest.java +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.dsls.sql.rel; - -import java.sql.Types; -import org.apache.beam.dsls.sql.BeamSqlCli; -import org.apache.beam.dsls.sql.BeamSqlEnv; -import org.apache.beam.dsls.sql.TestUtils; -import org.apache.beam.dsls.sql.mock.MockedBoundedTable; -import org.apache.beam.dsls.sql.schema.BeamSqlRow; -import org.apache.beam.sdk.testing.PAssert; -import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.values.PCollection; -import org.junit.BeforeClass; -import org.junit.Rule; -import org.junit.Test; - -/** - * Test for {@code BeamMinusRel}. - */ -public class BeamMinusRelTest { - static BeamSqlEnv sqlEnv = new BeamSqlEnv(); - - @Rule - public final TestPipeline pipeline = TestPipeline.create(); - - @BeforeClass - public static void prepare() { - sqlEnv.registerTable("ORDER_DETAILS1", - MockedBoundedTable.of( - Types.BIGINT, "order_id", - Types.INTEGER, "site_id", - Types.DOUBLE, "price" - ).addRows( - 1L, 1, 1.0, - 1L, 1, 1.0, - 2L, 2, 2.0, - 4L, 4, 4.0, - 4L, 4, 4.0 - ) - ); - - sqlEnv.registerTable("ORDER_DETAILS2", - MockedBoundedTable.of( - Types.BIGINT, "order_id", - Types.INTEGER, "site_id", - Types.DOUBLE, "price" - ).addRows( - 1L, 1, 1.0, - 2L, 2, 2.0, - 3L, 3, 3.0 - ) - ); - } - - @Test - public void testExcept() throws Exception { - String sql = ""; - sql += "SELECT order_id, site_id, price " - + "FROM ORDER_DETAILS1 " - + " EXCEPT " - + "SELECT order_id, site_id, price " - + "FROM ORDER_DETAILS2 "; - - PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); - PAssert.that(rows).containsInAnyOrder( - TestUtils.RowsBuilder.of( - Types.BIGINT, "order_id", - Types.INTEGER, "site_id", - Types.DOUBLE, "price" - ).addRows( - 4L, 4, 4.0 - ).getRows()); - - pipeline.run(); - } - - @Test - public void testExceptAll() throws Exception { - String sql = ""; - sql += "SELECT order_id, site_id, price " - + "FROM ORDER_DETAILS1 " - + " EXCEPT ALL " - + "SELECT order_id, site_id, price " - + "FROM ORDER_DETAILS2 "; - - PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); - PAssert.that(rows).satisfies(new CheckSize(2)); - - PAssert.that(rows).containsInAnyOrder( - TestUtils.RowsBuilder.of( - Types.BIGINT, "order_id", - Types.INTEGER, "site_id", - Types.DOUBLE, "price" - ).addRows( - 4L, 4, 4.0, - 4L, 4, 4.0 - ).getRows()); - - pipeline.run(); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSetOperatorRelBaseTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSetOperatorRelBaseTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSetOperatorRelBaseTest.java deleted file mode 100644 index d0b01df..0000000 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSetOperatorRelBaseTest.java +++ /dev/null @@ -1,106 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.dsls.sql.rel; - -import java.sql.Types; -import java.util.Date; -import org.apache.beam.dsls.sql.BeamSqlCli; -import org.apache.beam.dsls.sql.BeamSqlEnv; -import org.apache.beam.dsls.sql.TestUtils; -import org.apache.beam.dsls.sql.mock.MockedBoundedTable; -import org.apache.beam.dsls.sql.schema.BeamSqlRow; -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.testing.PAssert; -import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.values.PCollection; -import org.junit.BeforeClass; -import org.junit.Rule; -import org.junit.Test; - -/** - * Test for {@code BeamSetOperatorRelBase}. - */ -public class BeamSetOperatorRelBaseTest { - static BeamSqlEnv sqlEnv = new BeamSqlEnv(); - - @Rule - public final TestPipeline pipeline = TestPipeline.create(); - public static final Date THE_DATE = new Date(100000); - - @BeforeClass - public static void prepare() { - sqlEnv.registerTable("ORDER_DETAILS", - MockedBoundedTable.of( - Types.BIGINT, "order_id", - Types.INTEGER, "site_id", - Types.DOUBLE, "price", - Types.TIMESTAMP, "order_time" - ).addRows( - 1L, 1, 1.0, THE_DATE, - 2L, 2, 2.0, THE_DATE - ) - ); - } - - @Test - public void testSameWindow() throws Exception { - String sql = "SELECT " - + " order_id, site_id, count(*) as cnt " - + "FROM ORDER_DETAILS GROUP BY order_id, site_id" - + ", TUMBLE(order_time, INTERVAL '1' HOUR) " - + " UNION SELECT " - + " order_id, site_id, count(*) as cnt " - + "FROM ORDER_DETAILS GROUP BY order_id, site_id" - + ", TUMBLE(order_time, INTERVAL '1' HOUR) "; - - PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); - // compare valueInString to ignore the windowStart & windowEnd - PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn()))) - .containsInAnyOrder( - TestUtils.RowsBuilder.of( - Types.BIGINT, "order_id", - Types.INTEGER, "site_id", - Types.BIGINT, "cnt" - ).addRows( - 1L, 1, 1L, - 2L, 2, 1L - ).getStringRows()); - pipeline.run(); - } - - @Test(expected = IllegalArgumentException.class) - public void testDifferentWindows() throws Exception { - String sql = "SELECT " - + " order_id, site_id, count(*) as cnt " - + "FROM ORDER_DETAILS GROUP BY order_id, site_id" - + ", TUMBLE(order_time, INTERVAL '1' HOUR) " - + " UNION SELECT " - + " order_id, site_id, count(*) as cnt " - + "FROM ORDER_DETAILS GROUP BY order_id, site_id" - + ", TUMBLE(order_time, INTERVAL '2' HOUR) "; - - // use a real pipeline rather than the TestPipeline because we are - // testing exceptions, the pipeline will not actually run. - Pipeline pipeline1 = Pipeline.create(PipelineOptionsFactory.create()); - BeamSqlCli.compilePipeline(sql, pipeline1, sqlEnv); - pipeline.run(); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSortRelTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSortRelTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSortRelTest.java deleted file mode 100644 index 1067926..0000000 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSortRelTest.java +++ /dev/null @@ -1,237 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.dsls.sql.rel; - -import java.sql.Types; -import java.util.Date; -import org.apache.beam.dsls.sql.BeamSqlCli; -import org.apache.beam.dsls.sql.BeamSqlEnv; -import org.apache.beam.dsls.sql.TestUtils; -import org.apache.beam.dsls.sql.mock.MockedBoundedTable; -import org.apache.beam.dsls.sql.schema.BeamSqlRow; -import org.apache.beam.sdk.testing.PAssert; -import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.values.PCollection; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; - -/** - * Test for {@code BeamSortRel}. - */ -public class BeamSortRelTest { - static BeamSqlEnv sqlEnv = new BeamSqlEnv(); - - @Rule - public final TestPipeline pipeline = TestPipeline.create(); - - @Before - public void prepare() { - sqlEnv.registerTable("ORDER_DETAILS", - MockedBoundedTable.of( - Types.BIGINT, "order_id", - Types.INTEGER, "site_id", - Types.DOUBLE, "price", - Types.TIMESTAMP, "order_time" - ).addRows( - 1L, 2, 1.0, new Date(), - 1L, 1, 2.0, new Date(), - 2L, 4, 3.0, new Date(), - 2L, 1, 4.0, new Date(), - 5L, 5, 5.0, new Date(), - 6L, 6, 6.0, new Date(), - 7L, 7, 7.0, new Date(), - 8L, 8888, 8.0, new Date(), - 8L, 999, 9.0, new Date(), - 10L, 100, 10.0, new Date() - ) - ); - sqlEnv.registerTable("SUB_ORDER_RAM", - MockedBoundedTable.of( - Types.BIGINT, "order_id", - Types.INTEGER, "site_id", - Types.DOUBLE, "price" - ) - ); - } - - @Test - public void testOrderBy_basic() throws Exception { - String sql = "INSERT INTO SUB_ORDER_RAM(order_id, site_id, price) SELECT " - + " order_id, site_id, price " - + "FROM ORDER_DETAILS " - + "ORDER BY order_id asc, site_id desc limit 4"; - - PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); - PAssert.that(rows).containsInAnyOrder(TestUtils.RowsBuilder.of( - Types.BIGINT, "order_id", - Types.INTEGER, "site_id", - Types.DOUBLE, "price" - ).addRows( - 1L, 2, 1.0, - 1L, 1, 2.0, - 2L, 4, 3.0, - 2L, 1, 4.0 - ).getRows()); - pipeline.run().waitUntilFinish(); - } - - @Test - public void testOrderBy_nullsFirst() throws Exception { - sqlEnv.registerTable("ORDER_DETAILS", - MockedBoundedTable.of( - Types.BIGINT, "order_id", - Types.INTEGER, "site_id", - Types.DOUBLE, "price" - ).addRows( - 1L, 2, 1.0, - 1L, null, 2.0, - 2L, 1, 3.0, - 2L, null, 4.0, - 5L, 5, 5.0 - ) - ); - sqlEnv.registerTable("SUB_ORDER_RAM", MockedBoundedTable - .of(Types.BIGINT, "order_id", - Types.INTEGER, "site_id", - Types.DOUBLE, "price")); - - String sql = "INSERT INTO SUB_ORDER_RAM(order_id, site_id, price) SELECT " - + " order_id, site_id, price " - + "FROM ORDER_DETAILS " - + "ORDER BY order_id asc, site_id desc NULLS FIRST limit 4"; - - PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); - PAssert.that(rows).containsInAnyOrder( - TestUtils.RowsBuilder.of( - Types.BIGINT, "order_id", - Types.INTEGER, "site_id", - Types.DOUBLE, "price" - ).addRows( - 1L, null, 2.0, - 1L, 2, 1.0, - 2L, null, 4.0, - 2L, 1, 3.0 - ).getRows() - ); - pipeline.run().waitUntilFinish(); - } - - @Test - public void testOrderBy_nullsLast() throws Exception { - sqlEnv.registerTable("ORDER_DETAILS", MockedBoundedTable - .of(Types.BIGINT, "order_id", - Types.INTEGER, "site_id", - Types.DOUBLE, "price" - ).addRows( - 1L, 2, 1.0, - 1L, null, 2.0, - 2L, 1, 3.0, - 2L, null, 4.0, - 5L, 5, 5.0)); - sqlEnv.registerTable("SUB_ORDER_RAM", MockedBoundedTable - .of(Types.BIGINT, "order_id", - Types.INTEGER, "site_id", - Types.DOUBLE, "price")); - - String sql = "INSERT INTO SUB_ORDER_RAM(order_id, site_id, price) SELECT " - + " order_id, site_id, price " - + "FROM ORDER_DETAILS " - + "ORDER BY order_id asc, site_id desc NULLS LAST limit 4"; - - PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); - PAssert.that(rows).containsInAnyOrder( - TestUtils.RowsBuilder.of( - Types.BIGINT, "order_id", - Types.INTEGER, "site_id", - Types.DOUBLE, "price" - ).addRows( - 1L, 2, 1.0, - 1L, null, 2.0, - 2L, 1, 3.0, - 2L, null, 4.0 - ).getRows() - ); - pipeline.run().waitUntilFinish(); - } - - @Test - public void testOrderBy_with_offset() throws Exception { - String sql = "INSERT INTO SUB_ORDER_RAM(order_id, site_id, price) SELECT " - + " order_id, site_id, price " - + "FROM ORDER_DETAILS " - + "ORDER BY order_id asc, site_id desc limit 4 offset 4"; - - PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); - PAssert.that(rows).containsInAnyOrder( - TestUtils.RowsBuilder.of( - Types.BIGINT, "order_id", - Types.INTEGER, "site_id", - Types.DOUBLE, "price" - ).addRows( - 5L, 5, 5.0, - 6L, 6, 6.0, - 7L, 7, 7.0, - 8L, 8888, 8.0 - ).getRows() - ); - pipeline.run().waitUntilFinish(); - } - - @Test - public void testOrderBy_bigFetch() throws Exception { - String sql = "INSERT INTO SUB_ORDER_RAM(order_id, site_id, price) SELECT " - + " order_id, site_id, price " - + "FROM ORDER_DETAILS " - + "ORDER BY order_id asc, site_id desc limit 11"; - - PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); - PAssert.that(rows).containsInAnyOrder( - TestUtils.RowsBuilder.of( - Types.BIGINT, "order_id", - Types.INTEGER, "site_id", - Types.DOUBLE, "price" - ).addRows( - 1L, 2, 1.0, - 1L, 1, 2.0, - 2L, 4, 3.0, - 2L, 1, 4.0, - 5L, 5, 5.0, - 6L, 6, 6.0, - 7L, 7, 7.0, - 8L, 8888, 8.0, - 8L, 999, 9.0, - 10L, 100, 10.0 - ).getRows() - ); - pipeline.run().waitUntilFinish(); - } - - @Test(expected = UnsupportedOperationException.class) - public void testOrderBy_exception() throws Exception { - String sql = "INSERT INTO SUB_ORDER_RAM(order_id, site_id) SELECT " - + " order_id, COUNT(*) " - + "FROM ORDER_DETAILS " - + "GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)" - + "ORDER BY order_id asc limit 11"; - - TestPipeline pipeline = TestPipeline.create(); - BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamUnionRelTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamUnionRelTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamUnionRelTest.java deleted file mode 100644 index cad3290..0000000 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamUnionRelTest.java +++ /dev/null @@ -1,104 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.dsls.sql.rel; - -import java.sql.Types; -import org.apache.beam.dsls.sql.BeamSqlCli; -import org.apache.beam.dsls.sql.BeamSqlEnv; -import org.apache.beam.dsls.sql.TestUtils; -import org.apache.beam.dsls.sql.mock.MockedBoundedTable; -import org.apache.beam.dsls.sql.schema.BeamSqlRow; -import org.apache.beam.sdk.testing.PAssert; -import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.values.PCollection; -import org.junit.BeforeClass; -import org.junit.Rule; -import org.junit.Test; - -/** - * Test for {@code BeamUnionRel}. - */ -public class BeamUnionRelTest { - static BeamSqlEnv sqlEnv = new BeamSqlEnv(); - - @Rule - public final TestPipeline pipeline = TestPipeline.create(); - - @BeforeClass - public static void prepare() { - sqlEnv.registerTable("ORDER_DETAILS", - MockedBoundedTable.of( - Types.BIGINT, "order_id", - Types.INTEGER, "site_id", - Types.DOUBLE, "price" - ).addRows( - 1L, 1, 1.0, - 2L, 2, 2.0 - ) - ); - } - - @Test - public void testUnion() throws Exception { - String sql = "SELECT " - + " order_id, site_id, price " - + "FROM ORDER_DETAILS " - + " UNION SELECT " - + " order_id, site_id, price " - + "FROM ORDER_DETAILS "; - - PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); - PAssert.that(rows).containsInAnyOrder( - TestUtils.RowsBuilder.of( - Types.BIGINT, "order_id", - Types.INTEGER, "site_id", - Types.DOUBLE, "price" - ).addRows( - 1L, 1, 1.0, - 2L, 2, 2.0 - ).getRows() - ); - pipeline.run(); - } - - @Test - public void testUnionAll() throws Exception { - String sql = "SELECT " - + " order_id, site_id, price " - + "FROM ORDER_DETAILS" - + " UNION ALL " - + " SELECT order_id, site_id, price " - + "FROM ORDER_DETAILS"; - - PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); - PAssert.that(rows).containsInAnyOrder( - TestUtils.RowsBuilder.of( - Types.BIGINT, "order_id", - Types.INTEGER, "site_id", - Types.DOUBLE, "price" - ).addRows( - 1L, 1, 1.0, - 1L, 1, 1.0, - 2L, 2, 2.0, - 2L, 2, 2.0 - ).getRows() - ); - pipeline.run(); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamValuesRelTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamValuesRelTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamValuesRelTest.java deleted file mode 100644 index 9d13f9b..0000000 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamValuesRelTest.java +++ /dev/null @@ -1,105 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.dsls.sql.rel; - -import java.sql.Types; -import org.apache.beam.dsls.sql.BeamSqlCli; -import org.apache.beam.dsls.sql.BeamSqlEnv; -import org.apache.beam.dsls.sql.TestUtils; -import org.apache.beam.dsls.sql.mock.MockedBoundedTable; -import org.apache.beam.dsls.sql.schema.BeamSqlRow; -import org.apache.beam.sdk.testing.PAssert; -import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.values.PCollection; -import org.junit.BeforeClass; -import org.junit.Rule; -import org.junit.Test; - -/** - * Test for {@code BeamValuesRel}. - */ -public class BeamValuesRelTest { - static BeamSqlEnv sqlEnv = new BeamSqlEnv(); - - @Rule - public final TestPipeline pipeline = TestPipeline.create(); - - @BeforeClass - public static void prepare() { - sqlEnv.registerTable("string_table", - MockedBoundedTable.of( - Types.VARCHAR, "name", - Types.VARCHAR, "description" - ) - ); - sqlEnv.registerTable("int_table", - MockedBoundedTable.of( - Types.INTEGER, "c0", - Types.INTEGER, "c1" - ) - ); - } - - @Test - public void testValues() throws Exception { - String sql = "insert into string_table(name, description) values " - + "('hello', 'world'), ('james', 'bond')"; - PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); - PAssert.that(rows).containsInAnyOrder( - TestUtils.RowsBuilder.of( - Types.VARCHAR, "name", - Types.VARCHAR, "description" - ).addRows( - "hello", "world", - "james", "bond" - ).getRows() - ); - pipeline.run(); - } - - @Test - public void testValues_castInt() throws Exception { - String sql = "insert into int_table (c0, c1) values(cast(1 as int), cast(2 as int))"; - PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); - PAssert.that(rows).containsInAnyOrder( - TestUtils.RowsBuilder.of( - Types.INTEGER, "c0", - Types.INTEGER, "c1" - ).addRows( - 1, 2 - ).getRows() - ); - pipeline.run(); - } - - @Test - public void testValues_onlySelect() throws Exception { - String sql = "select 1, '1'"; - PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); - PAssert.that(rows).containsInAnyOrder( - TestUtils.RowsBuilder.of( - Types.INTEGER, "EXPR$0", - Types.CHAR, "EXPR$1" - ).addRows( - 1, "1" - ).getRows() - ); - pipeline.run(); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/CheckSize.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/CheckSize.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/CheckSize.java deleted file mode 100644 index ce532df..0000000 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/CheckSize.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.dsls.sql.rel; - -import org.apache.beam.dsls.sql.schema.BeamSqlRow; -import org.apache.beam.sdk.transforms.SerializableFunction; -import org.junit.Assert; - -/** - * Utility class to check size of BeamSQLRow iterable. - */ -public class CheckSize implements SerializableFunction<Iterable<BeamSqlRow>, Void> { - private int size; - public CheckSize(int size) { - this.size = size; - } - @Override public Void apply(Iterable<BeamSqlRow> input) { - int count = 0; - for (BeamSqlRow row : input) { - count++; - } - Assert.assertEquals(size, count); - return null; - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoderTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoderTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoderTest.java deleted file mode 100644 index e41e341..0000000 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoderTest.java +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.dsls.sql.schema; - -import java.math.BigDecimal; -import java.util.Date; -import java.util.GregorianCalendar; - -import org.apache.beam.dsls.sql.utils.CalciteUtils; -import org.apache.beam.sdk.testing.CoderProperties; -import org.apache.calcite.jdbc.JavaTypeFactoryImpl; -import org.apache.calcite.rel.type.RelDataType; -import org.apache.calcite.rel.type.RelDataTypeFactory; -import org.apache.calcite.rel.type.RelDataTypeSystem; -import org.apache.calcite.rel.type.RelProtoDataType; -import org.apache.calcite.sql.type.SqlTypeName; -import org.junit.Test; - -/** - * Tests for BeamSqlRowCoder. - */ -public class BeamSqlRowCoderTest { - - @Test - public void encodeAndDecode() throws Exception { - final RelProtoDataType protoRowType = new RelProtoDataType() { - @Override - public RelDataType apply(RelDataTypeFactory a0) { - return a0.builder() - .add("col_tinyint", SqlTypeName.TINYINT) - .add("col_smallint", SqlTypeName.SMALLINT) - .add("col_integer", SqlTypeName.INTEGER) - .add("col_bigint", SqlTypeName.BIGINT) - .add("col_float", SqlTypeName.FLOAT) - .add("col_double", SqlTypeName.DOUBLE) - .add("col_decimal", SqlTypeName.DECIMAL) - .add("col_string_varchar", SqlTypeName.VARCHAR) - .add("col_time", SqlTypeName.TIME) - .add("col_timestamp", SqlTypeName.TIMESTAMP) - .add("col_boolean", SqlTypeName.BOOLEAN) - .build(); - } - }; - - BeamSqlRowType beamSQLRowType = CalciteUtils.toBeamRowType( - protoRowType.apply(new JavaTypeFactoryImpl( - RelDataTypeSystem.DEFAULT))); - BeamSqlRow row = new BeamSqlRow(beamSQLRowType); - row.addField("col_tinyint", Byte.valueOf("1")); - row.addField("col_smallint", Short.valueOf("1")); - row.addField("col_integer", 1); - row.addField("col_bigint", 1L); - row.addField("col_float", 1.1F); - row.addField("col_double", 1.1); - row.addField("col_decimal", BigDecimal.ZERO); - row.addField("col_string_varchar", "hello"); - GregorianCalendar calendar = new GregorianCalendar(); - calendar.setTime(new Date()); - row.addField("col_time", calendar); - row.addField("col_timestamp", new Date()); - row.addField("col_boolean", true); - - - BeamSqlRowCoder coder = new BeamSqlRowCoder(beamSQLRowType); - CoderProperties.coderDecodeEncodeEqual(coder, row); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaCSVTableTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaCSVTableTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaCSVTableTest.java deleted file mode 100644 index 01cd960..0000000 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaCSVTableTest.java +++ /dev/null @@ -1,111 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.dsls.sql.schema.kafka; - -import java.io.Serializable; -import org.apache.beam.dsls.sql.planner.BeamQueryPlanner; -import org.apache.beam.dsls.sql.schema.BeamSqlRow; -import org.apache.beam.dsls.sql.schema.BeamSqlRowType; -import org.apache.beam.dsls.sql.utils.CalciteUtils; -import org.apache.beam.sdk.testing.PAssert; -import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollection; -import org.apache.calcite.rel.type.RelDataType; -import org.apache.calcite.rel.type.RelDataTypeFactory; -import org.apache.calcite.rel.type.RelProtoDataType; -import org.apache.calcite.sql.type.SqlTypeName; -import org.apache.commons.csv.CSVFormat; -import org.junit.BeforeClass; -import org.junit.Rule; -import org.junit.Test; - -/** - * Test for BeamKafkaCSVTable. - */ -public class BeamKafkaCSVTableTest { - @Rule - public TestPipeline pipeline = TestPipeline.create(); - public static BeamSqlRow row1 = new BeamSqlRow(genRowType()); - public static BeamSqlRow row2 = new BeamSqlRow(genRowType()); - - @BeforeClass - public static void setUp() { - row1.addField(0, 1L); - row1.addField(1, 1); - row1.addField(2, 1.0); - - row2.addField(0, 2L); - row2.addField(1, 2); - row2.addField(2, 2.0); - } - - @Test public void testCsvRecorderDecoder() throws Exception { - PCollection<BeamSqlRow> result = pipeline - .apply( - Create.of("1,\"1\",1.0", "2,2,2.0") - ) - .apply(ParDo.of(new String2KvBytes())) - .apply( - new BeamKafkaCSVTable.CsvRecorderDecoder(genRowType(), CSVFormat.DEFAULT) - ); - - PAssert.that(result).containsInAnyOrder(row1, row2); - - pipeline.run(); - } - - @Test public void testCsvRecorderEncoder() throws Exception { - PCollection<BeamSqlRow> result = pipeline - .apply( - Create.of(row1, row2) - ) - .apply( - new BeamKafkaCSVTable.CsvRecorderEncoder(genRowType(), CSVFormat.DEFAULT) - ).apply( - new BeamKafkaCSVTable.CsvRecorderDecoder(genRowType(), CSVFormat.DEFAULT) - ); - - PAssert.that(result).containsInAnyOrder(row1, row2); - - pipeline.run(); - } - - private static BeamSqlRowType genRowType() { - return CalciteUtils.toBeamRowType(new RelProtoDataType() { - - @Override public RelDataType apply(RelDataTypeFactory a0) { - return a0.builder().add("order_id", SqlTypeName.BIGINT) - .add("site_id", SqlTypeName.INTEGER) - .add("price", SqlTypeName.DOUBLE).build(); - } - }.apply(BeamQueryPlanner.TYPE_FACTORY)); - } - - private static class String2KvBytes extends DoFn<String, KV<byte[], byte[]>> - implements Serializable { - @ProcessElement - public void processElement(ProcessContext ctx) { - ctx.output(KV.of(new byte[] {}, ctx.element().getBytes())); - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableTest.java deleted file mode 100644 index b6e11e5..0000000 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableTest.java +++ /dev/null @@ -1,176 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.dsls.sql.schema.text; - -import java.io.File; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.OutputStream; -import java.io.PrintStream; -import java.nio.file.FileVisitResult; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.SimpleFileVisitor; -import java.nio.file.attribute.BasicFileAttributes; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import org.apache.beam.dsls.sql.planner.BeamQueryPlanner; -import org.apache.beam.dsls.sql.schema.BeamSqlRow; -import org.apache.beam.dsls.sql.schema.BeamSqlRowType; -import org.apache.beam.dsls.sql.utils.CalciteUtils; -import org.apache.beam.sdk.testing.PAssert; -import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.values.PCollection; -import org.apache.calcite.rel.type.RelDataType; -import org.apache.calcite.rel.type.RelDataTypeFactory; -import org.apache.calcite.rel.type.RelProtoDataType; -import org.apache.calcite.sql.type.SqlTypeName; -import org.apache.commons.csv.CSVFormat; -import org.apache.commons.csv.CSVPrinter; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Rule; -import org.junit.Test; - -/** - * Tests for {@code BeamTextCSVTable}. - */ -public class BeamTextCSVTableTest { - - @Rule public TestPipeline pipeline = TestPipeline.create(); - @Rule public TestPipeline pipeline2 = TestPipeline.create(); - - /** - * testData. - * - * <p> - * The types of the csv fields are: - * integer,bigint,float,double,string - * </p> - */ - private static Object[] data1 = new Object[] { 1, 1L, 1.1F, 1.1, "james" }; - private static Object[] data2 = new Object[] { 2, 2L, 2.2F, 2.2, "bond" }; - - private static List<Object[]> testData = Arrays.asList(data1, data2); - private static List<BeamSqlRow> testDataRows = new ArrayList<BeamSqlRow>() {{ - for (Object[] data : testData) { - add(buildRow(data)); - } - }}; - - private static Path tempFolder; - private static File readerSourceFile; - private static File writerTargetFile; - - @Test public void testBuildIOReader() { - PCollection<BeamSqlRow> rows = new BeamTextCSVTable(buildBeamSqlRowType(), - readerSourceFile.getAbsolutePath()).buildIOReader(pipeline); - PAssert.that(rows).containsInAnyOrder(testDataRows); - pipeline.run(); - } - - @Test public void testBuildIOWriter() { - new BeamTextCSVTable(buildBeamSqlRowType(), - readerSourceFile.getAbsolutePath()).buildIOReader(pipeline) - .apply(new BeamTextCSVTable(buildBeamSqlRowType(), writerTargetFile.getAbsolutePath()) - .buildIOWriter()); - pipeline.run(); - - PCollection<BeamSqlRow> rows = new BeamTextCSVTable(buildBeamSqlRowType(), - writerTargetFile.getAbsolutePath()).buildIOReader(pipeline2); - - // confirm the two reads match - PAssert.that(rows).containsInAnyOrder(testDataRows); - pipeline2.run(); - } - - @BeforeClass public static void setUp() throws IOException { - tempFolder = Files.createTempDirectory("BeamTextTableTest"); - readerSourceFile = writeToFile(testData, "readerSourceFile.txt"); - writerTargetFile = writeToFile(testData, "writerTargetFile.txt"); - } - - @AfterClass public static void teardownClass() throws IOException { - Files.walkFileTree(tempFolder, new SimpleFileVisitor<Path>() { - - @Override public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) - throws IOException { - Files.delete(file); - return FileVisitResult.CONTINUE; - } - - @Override public FileVisitResult postVisitDirectory(Path dir, IOException exc) - throws IOException { - Files.delete(dir); - return FileVisitResult.CONTINUE; - } - }); - } - - private static File writeToFile(List<Object[]> rows, String filename) throws IOException { - File file = tempFolder.resolve(filename).toFile(); - OutputStream output = new FileOutputStream(file); - writeToStreamAndClose(rows, output); - return file; - } - - /** - * Helper that writes the given lines (adding a newline in between) to a stream, then closes the - * stream. - */ - private static void writeToStreamAndClose(List<Object[]> rows, OutputStream outputStream) { - try (PrintStream writer = new PrintStream(outputStream)) { - CSVPrinter printer = CSVFormat.DEFAULT.print(writer); - for (Object[] row : rows) { - for (Object field : row) { - printer.print(field); - } - printer.println(); - } - } catch (IOException e) { - e.printStackTrace(); - } - } - - private RelProtoDataType buildRowType() { - return new RelProtoDataType() { - - @Override public RelDataType apply(RelDataTypeFactory a0) { - return a0.builder().add("id", SqlTypeName.INTEGER).add("order_id", SqlTypeName.BIGINT) - .add("price", SqlTypeName.FLOAT).add("amount", SqlTypeName.DOUBLE) - .add("user_name", SqlTypeName.VARCHAR).build(); - } - }; - } - - private static RelDataType buildRelDataType() { - return BeamQueryPlanner.TYPE_FACTORY.builder().add("id", SqlTypeName.INTEGER) - .add("order_id", SqlTypeName.BIGINT).add("price", SqlTypeName.FLOAT) - .add("amount", SqlTypeName.DOUBLE).add("user_name", SqlTypeName.VARCHAR).build(); - } - - private static BeamSqlRowType buildBeamSqlRowType() { - return CalciteUtils.toBeamRowType(buildRelDataType()); - } - - private static BeamSqlRow buildRow(Object[] data) { - return new BeamSqlRow(buildBeamSqlRowType(), Arrays.asList(data)); - } -}
