http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlApiSurfaceTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlApiSurfaceTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlApiSurfaceTest.java index 08678d1..456662f 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlApiSurfaceTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlApiSurfaceTest.java @@ -40,15 +40,13 @@ public class BeamSqlApiSurfaceTest { final Set<String> allowed = ImmutableSet.of( "org.apache.beam", - "org.joda.time", - "org.apache.commons.csv"); + "org.joda.time"); ApiSurface surface = ApiSurface - .ofClass(BeamSqlCli.class) - .includingClass(BeamSql.class) - .includingClass(BeamSqlEnv.class) - .includingPackage("org.apache.beam.sdk.extensions.sql.schema", - getClass().getClassLoader()) + .ofClass(BeamSql.class) + .includingClass(BeamSqlUdf.class) + .includingClass(BeamRecordSqlType.class) + .includingClass(BeamSqlRecordHelper.class) .pruningPrefix("java") .pruningPattern("org[.]apache[.]beam[.]sdk[.]extensions[.]sql[.].*Test") .pruningPattern("org[.]apache[.]beam[.]sdk[.]extensions[.]sql[.].*TestBase");
http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java index db562da..d99ec20 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java @@ -19,7 +19,6 @@ package org.apache.beam.sdk.extensions.sql; import java.sql.Types; import java.util.Arrays; -import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.values.BeamRecord; import org.apache.beam.sdk.values.PCollection; http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslBase.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslBase.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslBase.java index ef75ee2..b27435c 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslBase.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslBase.java @@ -25,7 +25,6 @@ import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Arrays; import java.util.List; -import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.testing.TestStream; import 
org.apache.beam.sdk.transforms.Create; http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslJoinTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslJoinTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslJoinTest.java index 0876dd9..47109e0 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslJoinTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslJoinTest.java @@ -24,7 +24,6 @@ import static org.apache.beam.sdk.extensions.sql.impl.rel.BeamJoinRelBoundedVsBo import java.sql.Types; import java.util.Arrays; import org.apache.beam.sdk.coders.BeamRecordCoder; -import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.values.BeamRecord; http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslProjectTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslProjectTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslProjectTest.java index 46aea99..e36eb2b 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslProjectTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslProjectTest.java @@ -19,7 +19,6 @@ package org.apache.beam.sdk.extensions.sql; import java.sql.Types; import java.util.Arrays; -import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType; import 
org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.values.BeamRecord; import org.apache.beam.sdk.values.PCollection; http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest.java index 1541123..8db9d7a 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest.java @@ -20,8 +20,6 @@ package org.apache.beam.sdk.extensions.sql; import java.sql.Types; import java.util.Arrays; import java.util.Iterator; -import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlUdf; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.transforms.Combine.CombineFn; import org.apache.beam.sdk.transforms.SerializableFunction; http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/TestUtils.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/TestUtils.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/TestUtils.java index 373deb7..4a1f8a0 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/TestUtils.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/TestUtils.java @@ -21,7 +21,6 @@ package org.apache.beam.sdk.extensions.sql; import java.util.ArrayList; import java.util.Arrays; 
import java.util.List; -import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.values.BeamRecord; http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutorTestBase.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutorTestBase.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutorTestBase.java index 97905c5..9d12126 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutorTestBase.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutorTestBase.java @@ -19,12 +19,12 @@ package org.apache.beam.sdk.extensions.sql.impl.interpreter; import java.util.ArrayList; import java.util.List; +import org.apache.beam.sdk.extensions.sql.BeamRecordSqlType; import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression; import org.apache.beam.sdk.extensions.sql.impl.planner.BeamQueryPlanner; import org.apache.beam.sdk.extensions.sql.impl.planner.BeamRelDataTypeSystem; import org.apache.beam.sdk.extensions.sql.impl.planner.BeamRuleSets; import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils; -import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType; import org.apache.beam.sdk.values.BeamRecord; import org.apache.calcite.adapter.java.JavaTypeFactory; import org.apache.calcite.config.Lex; http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BaseRelTest.java ---------------------------------------------------------------------- diff --git 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BaseRelTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BaseRelTest.java new file mode 100644 index 0000000..906ccfd --- /dev/null +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BaseRelTest.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.extensions.sql.impl.rel; + +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv; +import org.apache.beam.sdk.values.BeamRecord; +import org.apache.beam.sdk.values.PCollection; + +/** + * Base class for rel test. 
+ */ +public class BaseRelTest { + public PCollection<BeamRecord> compilePipeline ( + String sql, Pipeline pipeline, BeamSqlEnv sqlEnv) throws Exception { + return sqlEnv.getPlanner().compileBeamPipeline(sql, pipeline, sqlEnv); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIntersectRelTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIntersectRelTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIntersectRelTest.java index a51cc30..8e41d0a 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIntersectRelTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIntersectRelTest.java @@ -19,9 +19,8 @@ package org.apache.beam.sdk.extensions.sql.impl.rel; import java.sql.Types; -import org.apache.beam.sdk.extensions.sql.BeamSqlCli; -import org.apache.beam.sdk.extensions.sql.BeamSqlEnv; import org.apache.beam.sdk.extensions.sql.TestUtils; +import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv; import org.apache.beam.sdk.extensions.sql.mock.MockedBoundedTable; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; @@ -34,7 +33,7 @@ import org.junit.Test; /** * Test for {@code BeamIntersectRel}. 
*/ -public class BeamIntersectRelTest { +public class BeamIntersectRelTest extends BaseRelTest { static BeamSqlEnv sqlEnv = new BeamSqlEnv(); @Rule @@ -77,7 +76,7 @@ public class BeamIntersectRelTest { + "SELECT order_id, site_id, price " + "FROM ORDER_DETAILS2 "; - PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); + PCollection<BeamRecord> rows = compilePipeline(sql, pipeline, sqlEnv); PAssert.that(rows).containsInAnyOrder( TestUtils.RowsBuilder.of( Types.BIGINT, "order_id", @@ -100,7 +99,7 @@ public class BeamIntersectRelTest { + "SELECT order_id, site_id, price " + "FROM ORDER_DETAILS2 "; - PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); + PCollection<BeamRecord> rows = compilePipeline(sql, pipeline, sqlEnv); PAssert.that(rows).satisfies(new CheckSize(3)); PAssert.that(rows).containsInAnyOrder( http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelBoundedVsBoundedTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelBoundedVsBoundedTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelBoundedVsBoundedTest.java index dde1540..e0d691b 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelBoundedVsBoundedTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelBoundedVsBoundedTest.java @@ -19,9 +19,8 @@ package org.apache.beam.sdk.extensions.sql.impl.rel; import java.sql.Types; -import org.apache.beam.sdk.extensions.sql.BeamSqlCli; -import org.apache.beam.sdk.extensions.sql.BeamSqlEnv; import org.apache.beam.sdk.extensions.sql.TestUtils; +import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv; import 
org.apache.beam.sdk.extensions.sql.mock.MockedBoundedTable; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; @@ -34,10 +33,10 @@ import org.junit.Test; /** * Bounded + Bounded Test for {@code BeamJoinRel}. */ -public class BeamJoinRelBoundedVsBoundedTest { +public class BeamJoinRelBoundedVsBoundedTest extends BaseRelTest { @Rule public final TestPipeline pipeline = TestPipeline.create(); - private static final BeamSqlEnv beamSqlEnv = new BeamSqlEnv(); + private static final BeamSqlEnv BEAM_SQL_ENV = new BeamSqlEnv(); public static final MockedBoundedTable ORDER_DETAILS1 = MockedBoundedTable.of( @@ -63,8 +62,8 @@ public class BeamJoinRelBoundedVsBoundedTest { @BeforeClass public static void prepare() { - beamSqlEnv.registerTable("ORDER_DETAILS1", ORDER_DETAILS1); - beamSqlEnv.registerTable("ORDER_DETAILS2", ORDER_DETAILS2); + BEAM_SQL_ENV.registerTable("ORDER_DETAILS1", ORDER_DETAILS1); + BEAM_SQL_ENV.registerTable("ORDER_DETAILS2", ORDER_DETAILS2); } @Test @@ -77,7 +76,7 @@ public class BeamJoinRelBoundedVsBoundedTest { + " o1.order_id=o2.site_id AND o2.price=o1.site_id" ; - PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); + PCollection<BeamRecord> rows = compilePipeline(sql, pipeline, BEAM_SQL_ENV); PAssert.that(rows).containsInAnyOrder( TestUtils.RowsBuilder.of( Types.INTEGER, "order_id", @@ -102,7 +101,7 @@ public class BeamJoinRelBoundedVsBoundedTest { + " o1.order_id=o2.site_id AND o2.price=o1.site_id" ; - PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); + PCollection<BeamRecord> rows = compilePipeline(sql, pipeline, BEAM_SQL_ENV); pipeline.enableAbandonedNodeEnforcement(false); PAssert.that(rows).containsInAnyOrder( TestUtils.RowsBuilder.of( @@ -130,7 +129,7 @@ public class BeamJoinRelBoundedVsBoundedTest { + " o1.order_id=o2.site_id AND o2.price=o1.site_id" ; - PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, 
beamSqlEnv); + PCollection<BeamRecord> rows = compilePipeline(sql, pipeline, BEAM_SQL_ENV); PAssert.that(rows).containsInAnyOrder( TestUtils.RowsBuilder.of( Types.INTEGER, "order_id", @@ -157,7 +156,7 @@ public class BeamJoinRelBoundedVsBoundedTest { + " o1.order_id=o2.site_id AND o2.price=o1.site_id" ; - PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); + PCollection<BeamRecord> rows = compilePipeline(sql, pipeline, BEAM_SQL_ENV); PAssert.that(rows).containsInAnyOrder( TestUtils.RowsBuilder.of( Types.INTEGER, "order_id", @@ -187,7 +186,7 @@ public class BeamJoinRelBoundedVsBoundedTest { ; pipeline.enableAbandonedNodeEnforcement(false); - BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); + compilePipeline(sql, pipeline, BEAM_SQL_ENV); pipeline.run(); } @@ -198,7 +197,7 @@ public class BeamJoinRelBoundedVsBoundedTest { + "FROM ORDER_DETAILS1 o1, ORDER_DETAILS2 o2"; pipeline.enableAbandonedNodeEnforcement(false); - BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); + compilePipeline(sql, pipeline, BEAM_SQL_ENV); pipeline.run(); } } http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsBoundedTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsBoundedTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsBoundedTest.java index 28ad99c..c5145ec 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsBoundedTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsBoundedTest.java @@ -20,9 +20,8 @@ package org.apache.beam.sdk.extensions.sql.impl.rel; import java.sql.Types; import 
java.util.Date; -import org.apache.beam.sdk.extensions.sql.BeamSqlCli; -import org.apache.beam.sdk.extensions.sql.BeamSqlEnv; import org.apache.beam.sdk.extensions.sql.TestUtils; +import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv; import org.apache.beam.sdk.extensions.sql.impl.transform.BeamSqlOutputToConsoleFn; import org.apache.beam.sdk.extensions.sql.mock.MockedBoundedTable; import org.apache.beam.sdk.extensions.sql.mock.MockedUnboundedTable; @@ -39,10 +38,10 @@ import org.junit.Test; /** * Unbounded + Unbounded Test for {@code BeamJoinRel}. */ -public class BeamJoinRelUnboundedVsBoundedTest { +public class BeamJoinRelUnboundedVsBoundedTest extends BaseRelTest { @Rule public final TestPipeline pipeline = TestPipeline.create(); - private static final BeamSqlEnv beamSqlEnv = new BeamSqlEnv(); + private static final BeamSqlEnv BEAM_SQL_ENV = new BeamSqlEnv(); public static final Date FIRST_DATE = new Date(1); public static final Date SECOND_DATE = new Date(1 + 3600 * 1000); public static final Date THIRD_DATE = new Date(1 + 3600 * 1000 + 3600 * 1000 + 1); @@ -50,7 +49,7 @@ public class BeamJoinRelUnboundedVsBoundedTest { @BeforeClass public static void prepare() { - beamSqlEnv.registerTable("ORDER_DETAILS", MockedUnboundedTable + BEAM_SQL_ENV.registerTable("ORDER_DETAILS", MockedUnboundedTable .of( Types.INTEGER, "order_id", Types.INTEGER, "site_id", @@ -78,7 +77,7 @@ public class BeamJoinRelUnboundedVsBoundedTest { ) ); - beamSqlEnv.registerTable("ORDER_DETAILS1", MockedBoundedTable + BEAM_SQL_ENV.registerTable("ORDER_DETAILS1", MockedBoundedTable .of(Types.INTEGER, "order_id", Types.VARCHAR, "buyer" ).addRows( @@ -98,7 +97,7 @@ public class BeamJoinRelUnboundedVsBoundedTest { + " o1.order_id=o2.order_id" ; - PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); + PCollection<BeamRecord> rows = compilePipeline(sql, pipeline, BEAM_SQL_ENV); PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn()))) 
.containsInAnyOrder( TestUtils.RowsBuilder.of( @@ -124,7 +123,7 @@ public class BeamJoinRelUnboundedVsBoundedTest { + " o1.order_id=o2.order_id" ; - PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); + PCollection<BeamRecord> rows = compilePipeline(sql, pipeline, BEAM_SQL_ENV); PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn()))) .containsInAnyOrder( TestUtils.RowsBuilder.of( @@ -150,7 +149,7 @@ public class BeamJoinRelUnboundedVsBoundedTest { + " o1.order_id=o2.order_id" ; - PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); + PCollection<BeamRecord> rows = compilePipeline(sql, pipeline, BEAM_SQL_ENV); rows.apply(ParDo.of(new BeamSqlOutputToConsoleFn("helloworld"))); PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn()))) .containsInAnyOrder( @@ -178,7 +177,7 @@ public class BeamJoinRelUnboundedVsBoundedTest { + " o1.order_id=o2.order_id" ; pipeline.enableAbandonedNodeEnforcement(false); - BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); + compilePipeline(sql, pipeline, BEAM_SQL_ENV); pipeline.run(); } @@ -192,7 +191,7 @@ public class BeamJoinRelUnboundedVsBoundedTest { + " on " + " o1.order_id=o2.order_id" ; - PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); + PCollection<BeamRecord> rows = compilePipeline(sql, pipeline, BEAM_SQL_ENV); PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn()))) .containsInAnyOrder( TestUtils.RowsBuilder.of( @@ -220,7 +219,7 @@ public class BeamJoinRelUnboundedVsBoundedTest { ; pipeline.enableAbandonedNodeEnforcement(false); - BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); + compilePipeline(sql, pipeline, BEAM_SQL_ENV); pipeline.run(); } @@ -235,7 +234,7 @@ public class BeamJoinRelUnboundedVsBoundedTest { + " o1.order_id=o2.order_id" ; pipeline.enableAbandonedNodeEnforcement(false); - BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); + 
compilePipeline(sql, pipeline, BEAM_SQL_ENV); pipeline.run(); } } http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsUnboundedTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsUnboundedTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsUnboundedTest.java index a5a2e85..e5470ca 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsUnboundedTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsUnboundedTest.java @@ -20,9 +20,8 @@ package org.apache.beam.sdk.extensions.sql.impl.rel; import java.sql.Types; import java.util.Date; -import org.apache.beam.sdk.extensions.sql.BeamSqlCli; -import org.apache.beam.sdk.extensions.sql.BeamSqlEnv; import org.apache.beam.sdk.extensions.sql.TestUtils; +import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv; import org.apache.beam.sdk.extensions.sql.impl.transform.BeamSqlOutputToConsoleFn; import org.apache.beam.sdk.extensions.sql.mock.MockedUnboundedTable; import org.apache.beam.sdk.testing.PAssert; @@ -38,10 +37,10 @@ import org.junit.Test; /** * Unbounded + Unbounded Test for {@code BeamJoinRel}. 
*/ -public class BeamJoinRelUnboundedVsUnboundedTest { +public class BeamJoinRelUnboundedVsUnboundedTest extends BaseRelTest { @Rule public final TestPipeline pipeline = TestPipeline.create(); - private static final BeamSqlEnv beamSqlEnv = new BeamSqlEnv(); + private static final BeamSqlEnv BEAM_SQL_ENV = new BeamSqlEnv(); public static final Date FIRST_DATE = new Date(1); public static final Date SECOND_DATE = new Date(1 + 3600 * 1000); @@ -49,7 +48,7 @@ public class BeamJoinRelUnboundedVsUnboundedTest { @BeforeClass public static void prepare() { - beamSqlEnv.registerTable("ORDER_DETAILS", MockedUnboundedTable + BEAM_SQL_ENV.registerTable("ORDER_DETAILS", MockedUnboundedTable .of(Types.INTEGER, "order_id", Types.INTEGER, "site_id", Types.INTEGER, "price", @@ -88,7 +87,7 @@ public class BeamJoinRelUnboundedVsUnboundedTest { + " o1.order_id=o2.order_id" ; - PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); + PCollection<BeamRecord> rows = compilePipeline(sql, pipeline, BEAM_SQL_ENV); PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn()))) .containsInAnyOrder( TestUtils.RowsBuilder.of( @@ -121,7 +120,7 @@ public class BeamJoinRelUnboundedVsUnboundedTest { // 2, 2 | 2, 5 // 3, 3 | NULL, NULL - PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); + PCollection<BeamRecord> rows = compilePipeline(sql, pipeline, BEAM_SQL_ENV); PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn()))) .containsInAnyOrder( TestUtils.RowsBuilder.of( @@ -151,7 +150,7 @@ public class BeamJoinRelUnboundedVsUnboundedTest { + " o1.order_id=o2.order_id" ; - PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); + PCollection<BeamRecord> rows = compilePipeline(sql, pipeline, BEAM_SQL_ENV); PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn()))) .containsInAnyOrder( TestUtils.RowsBuilder.of( @@ -181,7 +180,7 @@ public class 
BeamJoinRelUnboundedVsUnboundedTest { + " o1.order_id1=o2.order_id" ; - PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); + PCollection<BeamRecord> rows = compilePipeline(sql, pipeline, BEAM_SQL_ENV); rows.apply(ParDo.of(new BeamSqlOutputToConsoleFn("hello"))); PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn()))) .containsInAnyOrder( @@ -213,7 +212,7 @@ public class BeamJoinRelUnboundedVsUnboundedTest { + " o1.order_id=o2.order_id" ; pipeline.enableAbandonedNodeEnforcement(false); - BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); + compilePipeline(sql, pipeline, BEAM_SQL_ENV); pipeline.run(); } } http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMinusRelTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMinusRelTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMinusRelTest.java index 425e554..5c4ae2c 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMinusRelTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMinusRelTest.java @@ -19,9 +19,8 @@ package org.apache.beam.sdk.extensions.sql.impl.rel; import java.sql.Types; -import org.apache.beam.sdk.extensions.sql.BeamSqlCli; -import org.apache.beam.sdk.extensions.sql.BeamSqlEnv; import org.apache.beam.sdk.extensions.sql.TestUtils; +import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv; import org.apache.beam.sdk.extensions.sql.mock.MockedBoundedTable; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; @@ -34,7 +33,7 @@ import org.junit.Test; /** * Test for {@code BeamMinusRel}. 
*/ -public class BeamMinusRelTest { +public class BeamMinusRelTest extends BaseRelTest { static BeamSqlEnv sqlEnv = new BeamSqlEnv(); @Rule @@ -78,7 +77,7 @@ public class BeamMinusRelTest { + "SELECT order_id, site_id, price " + "FROM ORDER_DETAILS2 "; - PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); + PCollection<BeamRecord> rows = compilePipeline(sql, pipeline, sqlEnv); PAssert.that(rows).containsInAnyOrder( TestUtils.RowsBuilder.of( Types.BIGINT, "order_id", @@ -100,7 +99,7 @@ public class BeamMinusRelTest { + "SELECT order_id, site_id, price " + "FROM ORDER_DETAILS2 "; - PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); + PCollection<BeamRecord> rows = compilePipeline(sql, pipeline, sqlEnv); PAssert.that(rows).satisfies(new CheckSize(2)); PAssert.that(rows).containsInAnyOrder( http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSetOperatorRelBaseTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSetOperatorRelBaseTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSetOperatorRelBaseTest.java index 4de493a..cd0297a 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSetOperatorRelBaseTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSetOperatorRelBaseTest.java @@ -21,9 +21,8 @@ package org.apache.beam.sdk.extensions.sql.impl.rel; import java.sql.Types; import java.util.Date; import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.extensions.sql.BeamSqlCli; -import org.apache.beam.sdk.extensions.sql.BeamSqlEnv; import org.apache.beam.sdk.extensions.sql.TestUtils; +import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv; 
import org.apache.beam.sdk.extensions.sql.mock.MockedBoundedTable; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.testing.PAssert; @@ -38,7 +37,7 @@ import org.junit.Test; /** * Test for {@code BeamSetOperatorRelBase}. */ -public class BeamSetOperatorRelBaseTest { +public class BeamSetOperatorRelBaseTest extends BaseRelTest { static BeamSqlEnv sqlEnv = new BeamSqlEnv(); @Rule @@ -71,7 +70,7 @@ public class BeamSetOperatorRelBaseTest { + "FROM ORDER_DETAILS GROUP BY order_id, site_id" + ", TUMBLE(order_time, INTERVAL '1' HOUR) "; - PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); + PCollection<BeamRecord> rows = compilePipeline(sql, pipeline, sqlEnv); // compare valueInString to ignore the windowStart & windowEnd PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn()))) .containsInAnyOrder( @@ -100,7 +99,7 @@ public class BeamSetOperatorRelBaseTest { // use a real pipeline rather than the TestPipeline because we are // testing exceptions, the pipeline will not actually run. 
Pipeline pipeline1 = Pipeline.create(PipelineOptionsFactory.create()); - BeamSqlCli.compilePipeline(sql, pipeline1, sqlEnv); + compilePipeline(sql, pipeline1, sqlEnv); pipeline.run(); } } http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRelTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRelTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRelTest.java index f033fa0..19ba0d0 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRelTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRelTest.java @@ -20,9 +20,8 @@ package org.apache.beam.sdk.extensions.sql.impl.rel; import java.sql.Types; import java.util.Date; -import org.apache.beam.sdk.extensions.sql.BeamSqlCli; -import org.apache.beam.sdk.extensions.sql.BeamSqlEnv; import org.apache.beam.sdk.extensions.sql.TestUtils; +import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv; import org.apache.beam.sdk.extensions.sql.mock.MockedBoundedTable; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; @@ -35,7 +34,7 @@ import org.junit.Test; /** * Test for {@code BeamSortRel}. 
*/ -public class BeamSortRelTest { +public class BeamSortRelTest extends BaseRelTest { static BeamSqlEnv sqlEnv = new BeamSqlEnv(); @Rule @@ -78,7 +77,7 @@ public class BeamSortRelTest { + "FROM ORDER_DETAILS " + "ORDER BY order_id asc, site_id desc limit 4"; - PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); + PCollection<BeamRecord> rows = compilePipeline(sql, pipeline, sqlEnv); PAssert.that(rows).containsInAnyOrder(TestUtils.RowsBuilder.of( Types.BIGINT, "order_id", Types.INTEGER, "site_id", @@ -117,7 +116,7 @@ public class BeamSortRelTest { + "FROM ORDER_DETAILS " + "ORDER BY order_id asc, site_id desc NULLS FIRST limit 4"; - PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); + PCollection<BeamRecord> rows = compilePipeline(sql, pipeline, sqlEnv); PAssert.that(rows).containsInAnyOrder( TestUtils.RowsBuilder.of( Types.BIGINT, "order_id", @@ -155,7 +154,7 @@ public class BeamSortRelTest { + "FROM ORDER_DETAILS " + "ORDER BY order_id asc, site_id desc NULLS LAST limit 4"; - PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); + PCollection<BeamRecord> rows = compilePipeline(sql, pipeline, sqlEnv); PAssert.that(rows).containsInAnyOrder( TestUtils.RowsBuilder.of( Types.BIGINT, "order_id", @@ -178,7 +177,7 @@ public class BeamSortRelTest { + "FROM ORDER_DETAILS " + "ORDER BY order_id asc, site_id desc limit 4 offset 4"; - PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); + PCollection<BeamRecord> rows = compilePipeline(sql, pipeline, sqlEnv); PAssert.that(rows).containsInAnyOrder( TestUtils.RowsBuilder.of( Types.BIGINT, "order_id", @@ -201,7 +200,7 @@ public class BeamSortRelTest { + "FROM ORDER_DETAILS " + "ORDER BY order_id asc, site_id desc limit 11"; - PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); + PCollection<BeamRecord> rows = compilePipeline(sql, pipeline, sqlEnv); 
PAssert.that(rows).containsInAnyOrder( TestUtils.RowsBuilder.of( Types.BIGINT, "order_id", @@ -232,6 +231,6 @@ public class BeamSortRelTest { + "ORDER BY order_id asc limit 11"; TestPipeline pipeline = TestPipeline.create(); - BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); + compilePipeline(sql, pipeline, sqlEnv); } } http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnionRelTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnionRelTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnionRelTest.java index 7cc52da..d79a54e 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnionRelTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnionRelTest.java @@ -19,9 +19,8 @@ package org.apache.beam.sdk.extensions.sql.impl.rel; import java.sql.Types; -import org.apache.beam.sdk.extensions.sql.BeamSqlCli; -import org.apache.beam.sdk.extensions.sql.BeamSqlEnv; import org.apache.beam.sdk.extensions.sql.TestUtils; +import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv; import org.apache.beam.sdk.extensions.sql.mock.MockedBoundedTable; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; @@ -34,7 +33,7 @@ import org.junit.Test; /** * Test for {@code BeamUnionRel}. 
*/ -public class BeamUnionRelTest { +public class BeamUnionRelTest extends BaseRelTest { static BeamSqlEnv sqlEnv = new BeamSqlEnv(); @Rule @@ -63,7 +62,7 @@ public class BeamUnionRelTest { + " order_id, site_id, price " + "FROM ORDER_DETAILS "; - PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); + PCollection<BeamRecord> rows = compilePipeline(sql, pipeline, sqlEnv); PAssert.that(rows).containsInAnyOrder( TestUtils.RowsBuilder.of( Types.BIGINT, "order_id", @@ -86,7 +85,7 @@ public class BeamUnionRelTest { + " SELECT order_id, site_id, price " + "FROM ORDER_DETAILS"; - PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); + PCollection<BeamRecord> rows = compilePipeline(sql, pipeline, sqlEnv); PAssert.that(rows).containsInAnyOrder( TestUtils.RowsBuilder.of( Types.BIGINT, "order_id", http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRelTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRelTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRelTest.java index ff31e55..5604e32 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRelTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRelTest.java @@ -19,9 +19,8 @@ package org.apache.beam.sdk.extensions.sql.impl.rel; import java.sql.Types; -import org.apache.beam.sdk.extensions.sql.BeamSqlCli; -import org.apache.beam.sdk.extensions.sql.BeamSqlEnv; import org.apache.beam.sdk.extensions.sql.TestUtils; +import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv; import org.apache.beam.sdk.extensions.sql.mock.MockedBoundedTable; import org.apache.beam.sdk.testing.PAssert; 
import org.apache.beam.sdk.testing.TestPipeline; @@ -34,7 +33,7 @@ import org.junit.Test; /** * Test for {@code BeamValuesRel}. */ -public class BeamValuesRelTest { +public class BeamValuesRelTest extends BaseRelTest { static BeamSqlEnv sqlEnv = new BeamSqlEnv(); @Rule @@ -60,7 +59,7 @@ public class BeamValuesRelTest { public void testValues() throws Exception { String sql = "insert into string_table(name, description) values " + "('hello', 'world'), ('james', 'bond')"; - PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); + PCollection<BeamRecord> rows = compilePipeline(sql, pipeline, sqlEnv); PAssert.that(rows).containsInAnyOrder( TestUtils.RowsBuilder.of( Types.VARCHAR, "name", @@ -76,7 +75,7 @@ public class BeamValuesRelTest { @Test public void testValues_castInt() throws Exception { String sql = "insert into int_table (c0, c1) values(cast(1 as int), cast(2 as int))"; - PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); + PCollection<BeamRecord> rows = compilePipeline(sql, pipeline, sqlEnv); PAssert.that(rows).containsInAnyOrder( TestUtils.RowsBuilder.of( Types.INTEGER, "c0", @@ -91,7 +90,7 @@ public class BeamValuesRelTest { @Test public void testValues_onlySelect() throws Exception { String sql = "select 1, '1'"; - PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); + PCollection<BeamRecord> rows = compilePipeline(sql, pipeline, sqlEnv); PAssert.that(rows).containsInAnyOrder( TestUtils.RowsBuilder.of( Types.INTEGER, "EXPR$0", http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamSqlRowCoderTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamSqlRowCoderTest.java 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamSqlRowCoderTest.java new file mode 100644 index 0000000..0a320db --- /dev/null +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamSqlRowCoderTest.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.extensions.sql.impl.schema; + +import java.math.BigDecimal; +import java.util.Date; +import java.util.GregorianCalendar; +import org.apache.beam.sdk.coders.BeamRecordCoder; +import org.apache.beam.sdk.extensions.sql.BeamRecordSqlType; +import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils; +import org.apache.beam.sdk.testing.CoderProperties; +import org.apache.beam.sdk.values.BeamRecord; +import org.apache.calcite.jdbc.JavaTypeFactoryImpl; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.rel.type.RelDataTypeSystem; +import org.apache.calcite.rel.type.RelProtoDataType; +import org.apache.calcite.sql.type.SqlTypeName; +import org.junit.Test; + +/** + * Tests for BeamSqlRowCoder. 
+ */ +public class BeamSqlRowCoderTest { + + @Test + public void encodeAndDecode() throws Exception { + final RelProtoDataType protoRowType = new RelProtoDataType() { + @Override + public RelDataType apply(RelDataTypeFactory a0) { + return a0.builder() + .add("col_tinyint", SqlTypeName.TINYINT) + .add("col_smallint", SqlTypeName.SMALLINT) + .add("col_integer", SqlTypeName.INTEGER) + .add("col_bigint", SqlTypeName.BIGINT) + .add("col_float", SqlTypeName.FLOAT) + .add("col_double", SqlTypeName.DOUBLE) + .add("col_decimal", SqlTypeName.DECIMAL) + .add("col_string_varchar", SqlTypeName.VARCHAR) + .add("col_time", SqlTypeName.TIME) + .add("col_timestamp", SqlTypeName.TIMESTAMP) + .add("col_boolean", SqlTypeName.BOOLEAN) + .build(); + } + }; + + BeamRecordSqlType beamSQLRowType = CalciteUtils.toBeamRowType( + protoRowType.apply(new JavaTypeFactoryImpl( + RelDataTypeSystem.DEFAULT))); + + GregorianCalendar calendar = new GregorianCalendar(); + calendar.setTime(new Date()); + BeamRecord row = new BeamRecord(beamSQLRowType + , Byte.valueOf("1"), Short.valueOf("1"), 1, 1L, 1.1F, 1.1 + , BigDecimal.ZERO, "hello", calendar, new Date(), true); + + + BeamRecordCoder coder = beamSQLRowType.getRecordCoder(); + CoderProperties.coderDecodeEncodeEqual(coder, row); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/schema/kafka/BeamKafkaCSVTableTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/schema/kafka/BeamKafkaCSVTableTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/schema/kafka/BeamKafkaCSVTableTest.java new file mode 100644 index 0000000..fd88448 --- /dev/null +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/schema/kafka/BeamKafkaCSVTableTest.java @@ -0,0 +1,107 @@ +/* + * Licensed to the 
Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.extensions.sql.impl.schema.kafka; + +import java.io.Serializable; +import org.apache.beam.sdk.extensions.sql.BeamRecordSqlType; +import org.apache.beam.sdk.extensions.sql.impl.planner.BeamQueryPlanner; +import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.BeamRecord; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.rel.type.RelProtoDataType; +import org.apache.calcite.sql.type.SqlTypeName; +import org.apache.commons.csv.CSVFormat; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; + +/** + * Test for BeamKafkaCSVTable. 
+ */ +public class BeamKafkaCSVTableTest { + @Rule + public TestPipeline pipeline = TestPipeline.create(); + public static BeamRecord row1; + public static BeamRecord row2; + + @BeforeClass + public static void setUp() { + row1 = new BeamRecord(genRowType(), 1L, 1, 1.0); + + row2 = new BeamRecord(genRowType(), 2L, 2, 2.0); + } + + @Test public void testCsvRecorderDecoder() throws Exception { + PCollection<BeamRecord> result = pipeline + .apply( + Create.of("1,\"1\",1.0", "2,2,2.0") + ) + .apply(ParDo.of(new String2KvBytes())) + .apply( + new BeamKafkaCSVTable.CsvRecorderDecoder(genRowType(), CSVFormat.DEFAULT) + ); + + PAssert.that(result).containsInAnyOrder(row1, row2); + + pipeline.run(); + } + + @Test public void testCsvRecorderEncoder() throws Exception { + PCollection<BeamRecord> result = pipeline + .apply( + Create.of(row1, row2) + ) + .apply( + new BeamKafkaCSVTable.CsvRecorderEncoder(genRowType(), CSVFormat.DEFAULT) + ).apply( + new BeamKafkaCSVTable.CsvRecorderDecoder(genRowType(), CSVFormat.DEFAULT) + ); + + PAssert.that(result).containsInAnyOrder(row1, row2); + + pipeline.run(); + } + + private static BeamRecordSqlType genRowType() { + return CalciteUtils.toBeamRowType(new RelProtoDataType() { + + @Override public RelDataType apply(RelDataTypeFactory a0) { + return a0.builder().add("order_id", SqlTypeName.BIGINT) + .add("site_id", SqlTypeName.INTEGER) + .add("price", SqlTypeName.DOUBLE).build(); + } + }.apply(BeamQueryPlanner.TYPE_FACTORY)); + } + + private static class String2KvBytes extends DoFn<String, KV<byte[], byte[]>> + implements Serializable { + @ProcessElement + public void processElement(ProcessContext ctx) { + ctx.output(KV.of(new byte[] {}, ctx.element().getBytes())); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/schema/text/BeamTextCSVTableTest.java ---------------------------------------------------------------------- diff --git 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/schema/text/BeamTextCSVTableTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/schema/text/BeamTextCSVTableTest.java new file mode 100644 index 0000000..9a57a5f --- /dev/null +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/schema/text/BeamTextCSVTableTest.java @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.sdk.extensions.sql.impl.schema.text; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.io.PrintStream; +import java.nio.file.FileVisitResult; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.SimpleFileVisitor; +import java.nio.file.attribute.BasicFileAttributes; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import org.apache.beam.sdk.extensions.sql.BeamRecordSqlType; +import org.apache.beam.sdk.extensions.sql.impl.planner.BeamQueryPlanner; +import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.values.BeamRecord; +import org.apache.beam.sdk.values.PCollection; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.rel.type.RelProtoDataType; +import org.apache.calcite.sql.type.SqlTypeName; +import org.apache.commons.csv.CSVFormat; +import org.apache.commons.csv.CSVPrinter; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; + +/** + * Tests for {@code BeamTextCSVTable}. + */ +public class BeamTextCSVTableTest { + + @Rule public TestPipeline pipeline = TestPipeline.create(); + @Rule public TestPipeline pipeline2 = TestPipeline.create(); + + /** + * testData. 
+ * + * <p> + * The types of the csv fields are: + * integer,bigint,float,double,string + * </p> + */ + private static Object[] data1 = new Object[] { 1, 1L, 1.1F, 1.1, "james" }; + private static Object[] data2 = new Object[] { 2, 2L, 2.2F, 2.2, "bond" }; + + private static List<Object[]> testData = Arrays.asList(data1, data2); + private static List<BeamRecord> testDataRows = new ArrayList<BeamRecord>() {{ + for (Object[] data : testData) { + add(buildRow(data)); + } + }}; + + private static Path tempFolder; + private static File readerSourceFile; + private static File writerTargetFile; + + @Test public void testBuildIOReader() { + PCollection<BeamRecord> rows = new BeamTextCSVTable(buildBeamSqlRowType(), + readerSourceFile.getAbsolutePath()).buildIOReader(pipeline); + PAssert.that(rows).containsInAnyOrder(testDataRows); + pipeline.run(); + } + + @Test public void testBuildIOWriter() { + new BeamTextCSVTable(buildBeamSqlRowType(), + readerSourceFile.getAbsolutePath()).buildIOReader(pipeline) + .apply(new BeamTextCSVTable(buildBeamSqlRowType(), writerTargetFile.getAbsolutePath()) + .buildIOWriter()); + pipeline.run(); + + PCollection<BeamRecord> rows = new BeamTextCSVTable(buildBeamSqlRowType(), + writerTargetFile.getAbsolutePath()).buildIOReader(pipeline2); + + // confirm the two reads match + PAssert.that(rows).containsInAnyOrder(testDataRows); + pipeline2.run(); + } + + @BeforeClass public static void setUp() throws IOException { + tempFolder = Files.createTempDirectory("BeamTextTableTest"); + readerSourceFile = writeToFile(testData, "readerSourceFile.txt"); + writerTargetFile = writeToFile(testData, "writerTargetFile.txt"); + } + + @AfterClass public static void teardownClass() throws IOException { + Files.walkFileTree(tempFolder, new SimpleFileVisitor<Path>() { + + @Override public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) + throws IOException { + Files.delete(file); + return FileVisitResult.CONTINUE; + } + + @Override public 
FileVisitResult postVisitDirectory(Path dir, IOException exc) + throws IOException { + Files.delete(dir); + return FileVisitResult.CONTINUE; + } + }); + } + + private static File writeToFile(List<Object[]> rows, String filename) throws IOException { + File file = tempFolder.resolve(filename).toFile(); + OutputStream output = new FileOutputStream(file); + writeToStreamAndClose(rows, output); + return file; + } + + /** + * Helper that writes the given lines (adding a newline in between) to a stream, then closes the + * stream. + */ + private static void writeToStreamAndClose(List<Object[]> rows, OutputStream outputStream) { + try (PrintStream writer = new PrintStream(outputStream)) { + CSVPrinter printer = CSVFormat.DEFAULT.print(writer); + for (Object[] row : rows) { + for (Object field : row) { + printer.print(field); + } + printer.println(); + } + } catch (IOException e) { + e.printStackTrace(); + } + } + + private RelProtoDataType buildRowType() { + return new RelProtoDataType() { + + @Override public RelDataType apply(RelDataTypeFactory a0) { + return a0.builder().add("id", SqlTypeName.INTEGER).add("order_id", SqlTypeName.BIGINT) + .add("price", SqlTypeName.FLOAT).add("amount", SqlTypeName.DOUBLE) + .add("user_name", SqlTypeName.VARCHAR).build(); + } + }; + } + + private static RelDataType buildRelDataType() { + return BeamQueryPlanner.TYPE_FACTORY.builder().add("id", SqlTypeName.INTEGER) + .add("order_id", SqlTypeName.BIGINT).add("price", SqlTypeName.FLOAT) + .add("amount", SqlTypeName.DOUBLE).add("user_name", SqlTypeName.VARCHAR).build(); + } + + private static BeamRecordSqlType buildBeamSqlRowType() { + return CalciteUtils.toBeamRowType(buildRelDataType()); + } + + private static BeamRecord buildRow(Object[] data) { + return new BeamRecord(buildBeamSqlRowType(), Arrays.asList(data)); + } +} 
http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/schema/transform/BeamAggregationTransformTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/schema/transform/BeamAggregationTransformTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/schema/transform/BeamAggregationTransformTest.java new file mode 100644 index 0000000..948e86c --- /dev/null +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/schema/transform/BeamAggregationTransformTest.java @@ -0,0 +1,453 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.extensions.sql.impl.schema.transform; + +import java.text.ParseException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import org.apache.beam.sdk.coders.BeamRecordCoder; +import org.apache.beam.sdk.coders.IterableCoder; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.extensions.sql.BeamRecordSqlType; +import org.apache.beam.sdk.extensions.sql.impl.planner.BeamQueryPlanner; +import org.apache.beam.sdk.extensions.sql.impl.transform.BeamAggregationTransforms; +import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Combine; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.GroupByKey; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.WithKeys; +import org.apache.beam.sdk.values.BeamRecord; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.calcite.rel.core.AggregateCall; +import org.apache.calcite.rel.type.RelDataTypeFactory.FieldInfoBuilder; +import org.apache.calcite.rel.type.RelDataTypeSystem; +import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.sql.fun.SqlAvgAggFunction; +import org.apache.calcite.sql.fun.SqlCountAggFunction; +import org.apache.calcite.sql.fun.SqlMinMaxAggFunction; +import org.apache.calcite.sql.fun.SqlSumAggFunction; +import org.apache.calcite.sql.type.BasicSqlType; +import org.apache.calcite.sql.type.SqlTypeName; +import org.apache.calcite.util.ImmutableBitSet; +import org.junit.Rule; +import org.junit.Test; + +/** + * Unit tests for {@link BeamAggregationTransforms}. 
+ * + */ +public class BeamAggregationTransformTest extends BeamTransformBaseTest{ + + @Rule + public TestPipeline p = TestPipeline.create(); + + private List<AggregateCall> aggCalls; + + private BeamRecordSqlType keyType; + private BeamRecordSqlType aggPartType; + private BeamRecordSqlType outputType; + + private BeamRecordCoder inRecordCoder; + private BeamRecordCoder keyCoder; + private BeamRecordCoder aggCoder; + private BeamRecordCoder outRecordCoder; + + /** + * This step equals to below query. + * <pre> + * SELECT `f_int` + * , COUNT(*) AS `size` + * , SUM(`f_long`) AS `sum1`, AVG(`f_long`) AS `avg1` + * , MAX(`f_long`) AS `max1`, MIN(`f_long`) AS `min1` + * , SUM(`f_short`) AS `sum2`, AVG(`f_short`) AS `avg2` + * , MAX(`f_short`) AS `max2`, MIN(`f_short`) AS `min2` + * , SUM(`f_byte`) AS `sum3`, AVG(`f_byte`) AS `avg3` + * , MAX(`f_byte`) AS `max3`, MIN(`f_byte`) AS `min3` + * , SUM(`f_float`) AS `sum4`, AVG(`f_float`) AS `avg4` + * , MAX(`f_float`) AS `max4`, MIN(`f_float`) AS `min4` + * , SUM(`f_double`) AS `sum5`, AVG(`f_double`) AS `avg5` + * , MAX(`f_double`) AS `max5`, MIN(`f_double`) AS `min5` + * , MAX(`f_timestamp`) AS `max7`, MIN(`f_timestamp`) AS `min7` + * ,SUM(`f_int2`) AS `sum8`, AVG(`f_int2`) AS `avg8` + * , MAX(`f_int2`) AS `max8`, MIN(`f_int2`) AS `min8` + * FROM TABLE_NAME + * GROUP BY `f_int` + * </pre> + * @throws ParseException + */ + @Test + public void testCountPerElementBasic() throws ParseException { + setupEnvironment(); + + PCollection<BeamRecord> input = p.apply(Create.of(inputRows)); + + //1. extract fields in group-by key part + PCollection<KV<BeamRecord, BeamRecord>> exGroupByStream = input.apply("exGroupBy", + WithKeys + .of(new BeamAggregationTransforms.AggregationGroupByKeyFn(-1, ImmutableBitSet.of(0)))) + .setCoder(KvCoder.<BeamRecord, BeamRecord>of(keyCoder, inRecordCoder)); + + //2. apply a GroupByKey. 
+ PCollection<KV<BeamRecord, Iterable<BeamRecord>>> groupedStream = exGroupByStream + .apply("groupBy", GroupByKey.<BeamRecord, BeamRecord>create()) + .setCoder(KvCoder.<BeamRecord, Iterable<BeamRecord>>of(keyCoder, + IterableCoder.<BeamRecord>of(inRecordCoder))); + + //3. run aggregation functions + PCollection<KV<BeamRecord, BeamRecord>> aggregatedStream = groupedStream.apply("aggregation", + Combine.<BeamRecord, BeamRecord, BeamRecord>groupedValues( + new BeamAggregationTransforms.AggregationAdaptor(aggCalls, inputRowType))) + .setCoder(KvCoder.<BeamRecord, BeamRecord>of(keyCoder, aggCoder)); + + //4. flat KV to a single record + PCollection<BeamRecord> mergedStream = aggregatedStream.apply("mergeRecord", + ParDo.of(new BeamAggregationTransforms.MergeAggregationRecord(outputType, aggCalls, -1))); + mergedStream.setCoder(outRecordCoder); + + //assert function BeamAggregationTransform.AggregationGroupByKeyFn + PAssert.that(exGroupByStream).containsInAnyOrder(prepareResultOfAggregationGroupByKeyFn()); + + //assert BeamAggregationTransform.AggregationCombineFn + PAssert.that(aggregatedStream).containsInAnyOrder(prepareResultOfAggregationCombineFn()); + + //assert BeamAggregationTransform.MergeAggregationRecord + PAssert.that(mergedStream).containsInAnyOrder(prepareResultOfMergeAggregationRecord()); + + p.run(); +} + + private void setupEnvironment() { + prepareAggregationCalls(); + prepareTypeAndCoder(); + } + + /** + * create list of all {@link AggregateCall}. 
+ */ + @SuppressWarnings("deprecation") + private void prepareAggregationCalls() { + //aggregations for all data types + aggCalls = new ArrayList<>(); + aggCalls.add( + new AggregateCall(new SqlCountAggFunction(), false, + Arrays.<Integer>asList(), + new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.BIGINT), + "count") + ); + aggCalls.add( + new AggregateCall(new SqlSumAggFunction( + new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.BIGINT)), false, + Arrays.<Integer>asList(1), + new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.BIGINT), + "sum1") + ); + aggCalls.add( + new AggregateCall(new SqlAvgAggFunction(SqlKind.AVG), false, + Arrays.<Integer>asList(1), + new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.BIGINT), + "avg1") + ); + aggCalls.add( + new AggregateCall(new SqlMinMaxAggFunction(SqlKind.MAX), false, + Arrays.<Integer>asList(1), + new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.BIGINT), + "max1") + ); + aggCalls.add( + new AggregateCall(new SqlMinMaxAggFunction(SqlKind.MIN), false, + Arrays.<Integer>asList(1), + new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.BIGINT), + "min1") + ); + + aggCalls.add( + new AggregateCall(new SqlSumAggFunction( + new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.SMALLINT)), false, + Arrays.<Integer>asList(2), + new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.SMALLINT), + "sum2") + ); + aggCalls.add( + new AggregateCall(new SqlAvgAggFunction(SqlKind.AVG), false, + Arrays.<Integer>asList(2), + new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.SMALLINT), + "avg2") + ); + aggCalls.add( + new AggregateCall(new SqlMinMaxAggFunction(SqlKind.MAX), false, + Arrays.<Integer>asList(2), + new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.SMALLINT), + "max2") + ); + aggCalls.add( + new AggregateCall(new SqlMinMaxAggFunction(SqlKind.MIN), false, + Arrays.<Integer>asList(2), + new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.SMALLINT), + "min2") + ); + + 
aggCalls.add( + new AggregateCall( + new SqlSumAggFunction(new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.TINYINT)), + false, + Arrays.<Integer>asList(3), + new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.TINYINT), + "sum3") + ); + aggCalls.add( + new AggregateCall(new SqlAvgAggFunction(SqlKind.AVG), false, + Arrays.<Integer>asList(3), + new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.TINYINT), + "avg3") + ); + aggCalls.add( + new AggregateCall(new SqlMinMaxAggFunction(SqlKind.MAX), false, + Arrays.<Integer>asList(3), + new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.TINYINT), + "max3") + ); + aggCalls.add( + new AggregateCall(new SqlMinMaxAggFunction(SqlKind.MIN), false, + Arrays.<Integer>asList(3), + new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.TINYINT), + "min3") + ); + + aggCalls.add( + new AggregateCall( + new SqlSumAggFunction(new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.FLOAT)), + false, + Arrays.<Integer>asList(4), + new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.FLOAT), + "sum4") + ); + aggCalls.add( + new AggregateCall(new SqlAvgAggFunction(SqlKind.AVG), false, + Arrays.<Integer>asList(4), + new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.FLOAT), + "avg4") + ); + aggCalls.add( + new AggregateCall(new SqlMinMaxAggFunction(SqlKind.MAX), false, + Arrays.<Integer>asList(4), + new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.FLOAT), + "max4") + ); + aggCalls.add( + new AggregateCall(new SqlMinMaxAggFunction(SqlKind.MIN), false, + Arrays.<Integer>asList(4), + new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.FLOAT), + "min4") + ); + + aggCalls.add( + new AggregateCall( + new SqlSumAggFunction(new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.DOUBLE)), + false, + Arrays.<Integer>asList(5), + new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.DOUBLE), + "sum5") + ); + aggCalls.add( + new AggregateCall(new SqlAvgAggFunction(SqlKind.AVG), false, + 
Arrays.<Integer>asList(5), + new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.DOUBLE), + "avg5") + ); + aggCalls.add( + new AggregateCall(new SqlMinMaxAggFunction(SqlKind.MAX), false, + Arrays.<Integer>asList(5), + new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.DOUBLE), + "max5") + ); + aggCalls.add( + new AggregateCall(new SqlMinMaxAggFunction(SqlKind.MIN), false, + Arrays.<Integer>asList(5), + new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.DOUBLE), + "min5") + ); + + aggCalls.add( + new AggregateCall(new SqlMinMaxAggFunction(SqlKind.MAX), false, + Arrays.<Integer>asList(7), + new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.TIMESTAMP), + "max7") + ); + aggCalls.add( + new AggregateCall(new SqlMinMaxAggFunction(SqlKind.MIN), false, + Arrays.<Integer>asList(7), + new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.TIMESTAMP), + "min7") + ); + + aggCalls.add( + new AggregateCall( + new SqlSumAggFunction(new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.INTEGER)), + false, + Arrays.<Integer>asList(8), + new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.INTEGER), + "sum8") + ); + aggCalls.add( + new AggregateCall(new SqlAvgAggFunction(SqlKind.AVG), false, + Arrays.<Integer>asList(8), + new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.INTEGER), + "avg8") + ); + aggCalls.add( + new AggregateCall(new SqlMinMaxAggFunction(SqlKind.MAX), false, + Arrays.<Integer>asList(8), + new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.INTEGER), + "max8") + ); + aggCalls.add( + new AggregateCall(new SqlMinMaxAggFunction(SqlKind.MIN), false, + Arrays.<Integer>asList(8), + new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.INTEGER), + "min8") + ); + } + + /** + * Prepare the row types and coders used in aggregation steps. 
+ */ + private void prepareTypeAndCoder() { + inRecordCoder = inputRowType.getRecordCoder(); + + keyType = initTypeOfSqlRow(Arrays.asList(KV.of("f_int", SqlTypeName.INTEGER))); + keyCoder = keyType.getRecordCoder(); + + aggPartType = initTypeOfSqlRow( + Arrays.asList(KV.of("count", SqlTypeName.BIGINT), + + KV.of("sum1", SqlTypeName.BIGINT), KV.of("avg1", SqlTypeName.BIGINT), + KV.of("max1", SqlTypeName.BIGINT), KV.of("min1", SqlTypeName.BIGINT), + + KV.of("sum2", SqlTypeName.SMALLINT), KV.of("avg2", SqlTypeName.SMALLINT), + KV.of("max2", SqlTypeName.SMALLINT), KV.of("min2", SqlTypeName.SMALLINT), + + KV.of("sum3", SqlTypeName.TINYINT), KV.of("avg3", SqlTypeName.TINYINT), + KV.of("max3", SqlTypeName.TINYINT), KV.of("min3", SqlTypeName.TINYINT), + + KV.of("sum4", SqlTypeName.FLOAT), KV.of("avg4", SqlTypeName.FLOAT), + KV.of("max4", SqlTypeName.FLOAT), KV.of("min4", SqlTypeName.FLOAT), + + KV.of("sum5", SqlTypeName.DOUBLE), KV.of("avg5", SqlTypeName.DOUBLE), + KV.of("max5", SqlTypeName.DOUBLE), KV.of("min5", SqlTypeName.DOUBLE), + + KV.of("max7", SqlTypeName.TIMESTAMP), KV.of("min7", SqlTypeName.TIMESTAMP), + + KV.of("sum8", SqlTypeName.INTEGER), KV.of("avg8", SqlTypeName.INTEGER), + KV.of("max8", SqlTypeName.INTEGER), KV.of("min8", SqlTypeName.INTEGER) + )); + aggCoder = aggPartType.getRecordCoder(); + + outputType = prepareFinalRowType(); + outRecordCoder = outputType.getRecordCoder(); + } + + /** + * Expected results after {@link BeamAggregationTransforms.AggregationGroupByKeyFn}: each input row keyed by its first (integer) column. 
+ */ + private List<KV<BeamRecord, BeamRecord>> prepareResultOfAggregationGroupByKeyFn() { + return Arrays.asList( + KV.of(new BeamRecord(keyType, Arrays.<Object>asList(inputRows.get(0).getInteger(0))), + inputRows.get(0)), + KV.of(new BeamRecord(keyType, Arrays.<Object>asList(inputRows.get(1).getInteger(0))), + inputRows.get(1)), + KV.of(new BeamRecord(keyType, Arrays.<Object>asList(inputRows.get(2).getInteger(0))), + inputRows.get(2)), + KV.of(new BeamRecord(keyType, Arrays.<Object>asList(inputRows.get(3).getInteger(0))), + inputRows.get(3))); + } + + /** + * Expected results after {@link BeamAggregationTransforms.AggregationCombineFn}: a single key paired with its aggregated values. + */ + private List<KV<BeamRecord, BeamRecord>> prepareResultOfAggregationCombineFn() + throws ParseException { + return Arrays.asList( + KV.of(new BeamRecord(keyType, Arrays.<Object>asList(inputRows.get(0).getInteger(0))), + new BeamRecord(aggPartType, Arrays.<Object>asList( + 4L, + 10000L, 2500L, 4000L, 1000L, + (short) 10, (short) 2, (short) 4, (short) 1, + (byte) 10, (byte) 2, (byte) 4, (byte) 1, + 10.0F, 2.5F, 4.0F, 1.0F, + 10.0, 2.5, 4.0, 1.0, + format.parse("2017-01-01 02:04:03"), format.parse("2017-01-01 01:01:03"), + 10, 2, 4, 1 + ))) + ); + } + + /** + * Row type of the final output row: the {@code f_int} key column followed by every aggregation column. 
+ */ + private BeamRecordSqlType prepareFinalRowType() { + FieldInfoBuilder builder = BeamQueryPlanner.TYPE_FACTORY.builder(); + List<KV<String, SqlTypeName>> columnMetadata = + Arrays.asList(KV.of("f_int", SqlTypeName.INTEGER), KV.of("count", SqlTypeName.BIGINT), + + KV.of("sum1", SqlTypeName.BIGINT), KV.of("avg1", SqlTypeName.BIGINT), + KV.of("max1", SqlTypeName.BIGINT), KV.of("min1", SqlTypeName.BIGINT), + + KV.of("sum2", SqlTypeName.SMALLINT), KV.of("avg2", SqlTypeName.SMALLINT), + KV.of("max2", SqlTypeName.SMALLINT), KV.of("min2", SqlTypeName.SMALLINT), + + KV.of("sum3", SqlTypeName.TINYINT), KV.of("avg3", SqlTypeName.TINYINT), + KV.of("max3", SqlTypeName.TINYINT), KV.of("min3", SqlTypeName.TINYINT), + + KV.of("sum4", SqlTypeName.FLOAT), KV.of("avg4", SqlTypeName.FLOAT), + KV.of("max4", SqlTypeName.FLOAT), KV.of("min4", SqlTypeName.FLOAT), + + KV.of("sum5", SqlTypeName.DOUBLE), KV.of("avg5", SqlTypeName.DOUBLE), + KV.of("max5", SqlTypeName.DOUBLE), KV.of("min5", SqlTypeName.DOUBLE), + + KV.of("max7", SqlTypeName.TIMESTAMP), KV.of("min7", SqlTypeName.TIMESTAMP), + + KV.of("sum8", SqlTypeName.INTEGER), KV.of("avg8", SqlTypeName.INTEGER), + KV.of("max8", SqlTypeName.INTEGER), KV.of("min8", SqlTypeName.INTEGER) + ); + for (KV<String, SqlTypeName> cm : columnMetadata) { + builder.add(cm.getKey(), cm.getValue()); + } + return CalciteUtils.toBeamRowType(builder.build()); + } + + /** + * Expected result after {@link BeamAggregationTransforms.MergeAggregationRecord}: the key and all aggregated values in one record. 
+ */ + private BeamRecord prepareResultOfMergeAggregationRecord() throws ParseException { + return new BeamRecord(outputType, Arrays.<Object>asList( + 1, 4L, + 10000L, 2500L, 4000L, 1000L, + (short) 10, (short) 2, (short) 4, (short) 1, + (byte) 10, (byte) 2, (byte) 4, (byte) 1, + 10.0F, 2.5F, 4.0F, 1.0F, + 10.0, 2.5, 4.0, 1.0, + format.parse("2017-01-01 02:04:03"), format.parse("2017-01-01 01:01:03"), + 10, 2, 4, 1 + )); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/schema/transform/BeamTransformBaseTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/schema/transform/BeamTransformBaseTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/schema/transform/BeamTransformBaseTest.java new file mode 100644 index 0000000..3c8f040 --- /dev/null +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/schema/transform/BeamTransformBaseTest.java @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.extensions.sql.impl.schema.transform; + +import java.text.DateFormat; +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.Arrays; +import java.util.List; +import org.apache.beam.sdk.extensions.sql.BeamRecordSqlType; +import org.apache.beam.sdk.extensions.sql.impl.planner.BeamQueryPlanner; +import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils; +import org.apache.beam.sdk.values.BeamRecord; +import org.apache.beam.sdk.values.KV; +import org.apache.calcite.rel.type.RelDataTypeFactory.FieldInfoBuilder; +import org.apache.calcite.sql.type.SqlTypeName; +import org.junit.BeforeClass; + +/** + * Shared input data and helper methods to test PTransforms which execute Beam SQL steps. + * + */ +public class BeamTransformBaseTest { + public static DateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + + public static BeamRecordSqlType inputRowType; + public static List<BeamRecord> inputRows; + + @BeforeClass + public static void prepareInput() throws NumberFormatException, ParseException{ + List<KV<String, SqlTypeName>> columnMetadata = Arrays.asList( + KV.of("f_int", SqlTypeName.INTEGER), KV.of("f_long", SqlTypeName.BIGINT), + KV.of("f_short", SqlTypeName.SMALLINT), KV.of("f_byte", SqlTypeName.TINYINT), + KV.of("f_float", SqlTypeName.FLOAT), KV.of("f_double", SqlTypeName.DOUBLE), + KV.of("f_string", SqlTypeName.VARCHAR), KV.of("f_timestamp", SqlTypeName.TIMESTAMP), + KV.of("f_int2", SqlTypeName.INTEGER) + ); + inputRowType = initTypeOfSqlRow(columnMetadata); + inputRows = Arrays.asList( + initBeamSqlRow(columnMetadata, + Arrays.<Object>asList(1, 1000L, Short.valueOf("1"), Byte.valueOf("1"), 1.0F, 1.0, + "string_row1", format.parse("2017-01-01 01:01:03"), 1)), + initBeamSqlRow(columnMetadata, + Arrays.<Object>asList(1, 2000L, Short.valueOf("2"), Byte.valueOf("2"), 2.0F, 2.0, + "string_row2", format.parse("2017-01-01 01:02:03"), 2)), + initBeamSqlRow(columnMetadata, + Arrays.<Object>asList(1, 3000L, 
Short.valueOf("3"), Byte.valueOf("3"), 3.0F, 3.0, + "string_row3", format.parse("2017-01-01 01:03:03"), 3)), + initBeamSqlRow(columnMetadata, Arrays.<Object>asList(1, 4000L, Short.valueOf("4"), + Byte.valueOf("4"), 4.0F, 4.0, "string_row4", format.parse("2017-01-01 02:04:03"), 4))); + } + + /** + * Create a {@link BeamRecordSqlType} for the given column metadata. + */ + public static BeamRecordSqlType initTypeOfSqlRow(List<KV<String, SqlTypeName>> columnMetadata){ + FieldInfoBuilder builder = BeamQueryPlanner.TYPE_FACTORY.builder(); + for (KV<String, SqlTypeName> cm : columnMetadata) { + builder.add(cm.getKey(), cm.getValue()); + } + return CalciteUtils.toBeamRowType(builder.build()); + } + + /** + * Create a row with the given column metadata and an empty value list. + */ + public static BeamRecord initBeamSqlRow(List<KV<String, SqlTypeName>> columnMetadata) { + return initBeamSqlRow(columnMetadata, Arrays.asList()); + } + + /** + * Create a row with the given column metadata and the given value for each column. + * + */ + public static BeamRecord initBeamSqlRow(List<KV<String, SqlTypeName>> columnMetadata, + List<Object> rowValues){ + BeamRecordSqlType rowType = initTypeOfSqlRow(columnMetadata); + + return new BeamRecord(rowType, rowValues); + } + +} http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java index 5898e2e..a64afa6 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java +++ 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java @@ -29,10 +29,10 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.TimeZone; +import org.apache.beam.sdk.extensions.sql.BeamRecordSqlType; import org.apache.beam.sdk.extensions.sql.BeamSql; import org.apache.beam.sdk.extensions.sql.TestUtils; import org.apache.beam.sdk.extensions.sql.mock.MockedBoundedTable; -import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.values.BeamRecord; http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlComparisonOperatorsIntegrationTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlComparisonOperatorsIntegrationTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlComparisonOperatorsIntegrationTest.java index 4ce2f45..a836f79 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlComparisonOperatorsIntegrationTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlComparisonOperatorsIntegrationTest.java @@ -21,8 +21,8 @@ package org.apache.beam.sdk.extensions.sql.integrationtest; import java.math.BigDecimal; import java.sql.Types; import java.util.Arrays; +import org.apache.beam.sdk.extensions.sql.BeamRecordSqlType; import org.apache.beam.sdk.extensions.sql.mock.MockedBoundedTable; -import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType; import org.apache.beam.sdk.values.BeamRecord; import 
org.apache.beam.sdk.values.PCollection; import org.junit.Test; http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedBoundedTable.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedBoundedTable.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedBoundedTable.java index 60e8211..cf66268 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedBoundedTable.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedBoundedTable.java @@ -25,8 +25,8 @@ import java.util.Arrays; import java.util.List; import java.util.concurrent.ConcurrentLinkedQueue; import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.extensions.sql.schema.BeamIOType; -import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType; +import org.apache.beam.sdk.extensions.sql.BeamRecordSqlType; +import org.apache.beam.sdk.extensions.sql.impl.schema.BeamIOType; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedTable.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedTable.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedTable.java index 426789c..d661866 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedTable.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedTable.java @@ -19,8 +19,8 
@@ package org.apache.beam.sdk.extensions.sql.mock; import java.util.concurrent.atomic.AtomicInteger; -import org.apache.beam.sdk.extensions.sql.schema.BaseBeamTable; -import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType; +import org.apache.beam.sdk.extensions.sql.BeamRecordSqlType; +import org.apache.beam.sdk.extensions.sql.impl.schema.BaseBeamTable; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.values.BeamRecord; import org.apache.beam.sdk.values.PCollection; http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedUnboundedTable.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedUnboundedTable.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedUnboundedTable.java index 465705d..31234e1 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedUnboundedTable.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedUnboundedTable.java @@ -22,9 +22,9 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.extensions.sql.BeamRecordSqlType; import org.apache.beam.sdk.extensions.sql.TestUtils; -import org.apache.beam.sdk.extensions.sql.schema.BeamIOType; -import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType; +import org.apache.beam.sdk.extensions.sql.impl.schema.BeamIOType; import org.apache.beam.sdk.testing.TestStream; import org.apache.beam.sdk.values.BeamRecord; import org.apache.beam.sdk.values.PCollection;
