http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest.java index 46cab09..e3c6aec 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest.java @@ -20,11 +20,11 @@ package org.apache.beam.sdk.extensions.sql; import java.sql.Types; import java.util.Arrays; import java.util.Iterator; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType; import org.apache.beam.sdk.extensions.sql.schema.BeamSqlUdaf; import org.apache.beam.sdk.extensions.sql.schema.BeamSqlUdf; import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.values.BeamRecord; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.beam.sdk.values.TupleTag; @@ -39,24 +39,24 @@ public class BeamSqlDslUdfUdafTest extends BeamSqlDslBase { */ @Test public void testUdaf() throws Exception { - BeamSqlRowType resultType = BeamSqlRowType.create(Arrays.asList("f_int2", "squaresum"), + BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("f_int2", "squaresum"), Arrays.asList(Types.INTEGER, Types.INTEGER)); - BeamSqlRow record = new BeamSqlRow(resultType); + BeamRecord record = new BeamRecord(resultType); record.addField("f_int2", 0); record.addField("squaresum", 30); String sql1 = "SELECT f_int2, squaresum1(f_int) AS `squaresum`" + " FROM PCOLLECTION GROUP BY f_int2"; - PCollection<BeamSqlRow> result1 = + PCollection<BeamRecord> result1 = boundedInput1.apply("testUdaf1", BeamSql.simpleQuery(sql1).withUdaf("squaresum1", SquareSum.class)); PAssert.that(result1).containsInAnyOrder(record); String sql2 = "SELECT f_int2, squaresum2(f_int) AS `squaresum`" + " FROM PCOLLECTION GROUP BY f_int2"; - PCollection<BeamSqlRow> result2 = - PCollectionTuple.of(new TupleTag<BeamSqlRow>("PCOLLECTION"), boundedInput1) + PCollection<BeamRecord> result2 = + PCollectionTuple.of(new TupleTag<BeamRecord>("PCOLLECTION"), boundedInput1) .apply("testUdaf2", BeamSql.query(sql2).withUdaf("squaresum2", SquareSum.class)); PAssert.that(result2).containsInAnyOrder(record); @@ -69,22 +69,22 @@ public class BeamSqlDslUdfUdafTest extends BeamSqlDslBase { */ @Test public void testUdf() throws Exception{ - BeamSqlRowType resultType = BeamSqlRowType.create(Arrays.asList("f_int", "cubicvalue"), + BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("f_int", "cubicvalue"), Arrays.asList(Types.INTEGER, Types.INTEGER)); - BeamSqlRow record = new BeamSqlRow(resultType); + BeamRecord record = new BeamRecord(resultType); record.addField("f_int", 2); record.addField("cubicvalue", 8); String sql1 = "SELECT f_int, cubic1(f_int) as cubicvalue FROM PCOLLECTION WHERE f_int = 2"; - PCollection<BeamSqlRow> result1 = + PCollection<BeamRecord> result1 = boundedInput1.apply("testUdf1", BeamSql.simpleQuery(sql1).withUdf("cubic1", CubicInteger.class)); PAssert.that(result1).containsInAnyOrder(record); String sql2 = "SELECT f_int, cubic2(f_int) as cubicvalue FROM PCOLLECTION WHERE f_int = 2"; - PCollection<BeamSqlRow> result2 = - PCollectionTuple.of(new TupleTag<BeamSqlRow>("PCOLLECTION"), boundedInput1) + PCollection<BeamRecord> result2 = + PCollectionTuple.of(new TupleTag<BeamRecord>("PCOLLECTION"), boundedInput1) .apply("testUdf2", BeamSql.query(sql2).withUdf("cubic2", CubicInteger.class)); PAssert.that(result2).containsInAnyOrder(record);
http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/TestUtils.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/TestUtils.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/TestUtils.java index 9995b0a..63b6ca8 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/TestUtils.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/TestUtils.java @@ -21,9 +21,9 @@ package org.apache.beam.sdk.extensions.sql; import java.util.ArrayList; import java.util.Arrays; import java.util.List; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.values.BeamRecord; /** * Test utilities. @@ -32,7 +32,7 @@ public class TestUtils { /** * A {@code DoFn} to convert a {@code BeamSqlRow} to a comparable {@code String}. */ - public static class BeamSqlRow2StringDoFn extends DoFn<BeamSqlRow, String> { + public static class BeamSqlRow2StringDoFn extends DoFn<BeamRecord, String> { @ProcessElement public void processElement(ProcessContext ctx) { ctx.output(ctx.element().valueInString()); @@ -42,9 +42,9 @@ public class TestUtils { /** * Convert list of {@code BeamSqlRow} to list of {@code String}. */ - public static List<String> beamSqlRows2Strings(List<BeamSqlRow> rows) { + public static List<String> beamSqlRows2Strings(List<BeamRecord> rows) { List<String> strs = new ArrayList<>(); - for (BeamSqlRow row : rows) { + for (BeamRecord row : rows) { strs.add(row.valueInString()); } @@ -69,8 +69,8 @@ public class TestUtils { * {@code} */ public static class RowsBuilder { - private BeamSqlRowType type; - private List<BeamSqlRow> rows = new ArrayList<>(); + private BeamSqlRecordType type; + private List<BeamRecord> rows = new ArrayList<>(); /** * Create a RowsBuilder with the specified row type info. @@ -86,7 +86,7 @@ public class TestUtils { * @args pairs of column type and column names. */ public static RowsBuilder of(final Object... args) { - BeamSqlRowType beamSQLRowType = buildBeamSqlRowType(args); + BeamSqlRecordType beamSQLRowType = buildBeamSqlRowType(args); RowsBuilder builder = new RowsBuilder(); builder.type = beamSQLRowType; @@ -103,7 +103,7 @@ public class TestUtils { * )}</pre> * @beamSQLRowType the record type. */ - public static RowsBuilder of(final BeamSqlRowType beamSQLRowType) { + public static RowsBuilder of(final BeamSqlRecordType beamSQLRowType) { RowsBuilder builder = new RowsBuilder(); builder.type = beamSQLRowType; @@ -130,7 +130,7 @@ public class TestUtils { return this; } - public List<BeamSqlRow> getRows() { + public List<BeamRecord> getRows() { return rows; } @@ -153,7 +153,7 @@ public class TestUtils { * ) * }</pre> */ - public static BeamSqlRowType buildBeamSqlRowType(Object... args) { + public static BeamSqlRecordType buildBeamSqlRowType(Object... args) { List<Integer> types = new ArrayList<>(); List<String> names = new ArrayList<>(); @@ -162,7 +162,7 @@ public class TestUtils { names.add((String) args[i + 1]); } - return BeamSqlRowType.create(names, types); + return BeamSqlRecordType.create(names, types); } /** @@ -179,12 +179,12 @@ public class TestUtils { * ) * }</pre> */ - public static List<BeamSqlRow> buildRows(BeamSqlRowType type, List args) { - List<BeamSqlRow> rows = new ArrayList<>(); + public static List<BeamRecord> buildRows(BeamSqlRecordType type, List args) { + List<BeamRecord> rows = new ArrayList<>(); int fieldCount = type.size(); for (int i = 0; i < args.size(); i += fieldCount) { - BeamSqlRow row = new BeamSqlRow(type); + BeamRecord row = new BeamRecord(type); for (int j = 0; j < fieldCount; j++) { row.addField(j, args.get(i + j)); } http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutorTestBase.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutorTestBase.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutorTestBase.java index 388c556..4da7790 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutorTestBase.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutorTestBase.java @@ -24,8 +24,8 @@ import org.apache.beam.sdk.extensions.sql.impl.planner.BeamQueryPlanner; import org.apache.beam.sdk.extensions.sql.impl.planner.BeamRelDataTypeSystem; import org.apache.beam.sdk.extensions.sql.impl.planner.BeamRuleSets; import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType; +import org.apache.beam.sdk.values.BeamRecord; import org.apache.calcite.adapter.java.JavaTypeFactory; import org.apache.calcite.config.Lex; import org.apache.calcite.jdbc.JavaTypeFactoryImpl; @@ -57,8 +57,8 @@ public class BeamSqlFnExecutorTestBase { RelDataTypeSystem.DEFAULT); public static RelDataType relDataType; - public static BeamSqlRowType beamRowType; - public static BeamSqlRow record; + public static BeamSqlRecordType beamRowType; + public static BeamRecord record; public static RelBuilder relBuilder; @@ -71,7 +71,7 @@ public class BeamSqlFnExecutorTestBase { .add("order_time", SqlTypeName.BIGINT).build(); beamRowType = CalciteUtils.toBeamRowType(relDataType); - record = new BeamSqlRow(beamRowType); + record = new BeamRecord(beamRowType); record.addField(0, 1234567L); record.addField(1, 0); http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIntersectRelTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIntersectRelTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIntersectRelTest.java index 5a3f65d..a51cc30 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIntersectRelTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIntersectRelTest.java @@ -23,9 +23,9 @@ import org.apache.beam.sdk.extensions.sql.BeamSqlCli; import org.apache.beam.sdk.extensions.sql.BeamSqlEnv; import org.apache.beam.sdk.extensions.sql.TestUtils; import org.apache.beam.sdk.extensions.sql.mock.MockedBoundedTable; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.values.BeamRecord; import org.apache.beam.sdk.values.PCollection; import org.junit.BeforeClass; import org.junit.Rule; @@ -77,7 +77,7 @@ public class BeamIntersectRelTest { + "SELECT order_id, site_id, price " + "FROM ORDER_DETAILS2 "; - PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); + PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); PAssert.that(rows).containsInAnyOrder( TestUtils.RowsBuilder.of( Types.BIGINT, "order_id", @@ -100,7 +100,7 @@ public class BeamIntersectRelTest { + "SELECT order_id, site_id, price " + "FROM ORDER_DETAILS2 "; - PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); + PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); PAssert.that(rows).satisfies(new CheckSize(3)); PAssert.that(rows).containsInAnyOrder( http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelBoundedVsBoundedTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelBoundedVsBoundedTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelBoundedVsBoundedTest.java index c4f6350..dde1540 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelBoundedVsBoundedTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelBoundedVsBoundedTest.java @@ -23,9 +23,9 @@ import org.apache.beam.sdk.extensions.sql.BeamSqlCli; import org.apache.beam.sdk.extensions.sql.BeamSqlEnv; import org.apache.beam.sdk.extensions.sql.TestUtils; import org.apache.beam.sdk.extensions.sql.mock.MockedBoundedTable; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.values.BeamRecord; import org.apache.beam.sdk.values.PCollection; import org.junit.BeforeClass; import org.junit.Rule; @@ -77,7 +77,7 @@ public class BeamJoinRelBoundedVsBoundedTest { + " o1.order_id=o2.site_id AND o2.price=o1.site_id" ; - PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); + PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); PAssert.that(rows).containsInAnyOrder( TestUtils.RowsBuilder.of( Types.INTEGER, "order_id", @@ -102,7 +102,7 @@ public class BeamJoinRelBoundedVsBoundedTest { + " o1.order_id=o2.site_id AND o2.price=o1.site_id" ; - PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); + PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); pipeline.enableAbandonedNodeEnforcement(false); PAssert.that(rows).containsInAnyOrder( TestUtils.RowsBuilder.of( @@ -130,7 +130,7 @@ public class BeamJoinRelBoundedVsBoundedTest { + " o1.order_id=o2.site_id AND o2.price=o1.site_id" ; - PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); + PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); PAssert.that(rows).containsInAnyOrder( TestUtils.RowsBuilder.of( Types.INTEGER, "order_id", @@ -157,7 +157,7 @@ public class BeamJoinRelBoundedVsBoundedTest { + " o1.order_id=o2.site_id AND o2.price=o1.site_id" ; - PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); + PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); PAssert.that(rows).containsInAnyOrder( TestUtils.RowsBuilder.of( Types.INTEGER, "order_id", http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsBoundedTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsBoundedTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsBoundedTest.java index 1dbd8b4..28ad99c 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsBoundedTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsBoundedTest.java @@ -26,10 +26,10 @@ import org.apache.beam.sdk.extensions.sql.TestUtils; import org.apache.beam.sdk.extensions.sql.impl.transform.BeamSqlOutputToConsoleFn; import org.apache.beam.sdk.extensions.sql.mock.MockedBoundedTable; import org.apache.beam.sdk.extensions.sql.mock.MockedUnboundedTable; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.BeamRecord; import org.apache.beam.sdk.values.PCollection; import org.joda.time.Duration; import org.junit.BeforeClass; @@ -98,7 +98,7 @@ public class BeamJoinRelUnboundedVsBoundedTest { + " o1.order_id=o2.order_id" ; - PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); + PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn()))) .containsInAnyOrder( TestUtils.RowsBuilder.of( @@ -124,7 +124,7 @@ public class BeamJoinRelUnboundedVsBoundedTest { + " o1.order_id=o2.order_id" ; - PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); + PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn()))) .containsInAnyOrder( TestUtils.RowsBuilder.of( @@ -150,7 +150,7 @@ public class BeamJoinRelUnboundedVsBoundedTest { + " o1.order_id=o2.order_id" ; - PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); + PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); rows.apply(ParDo.of(new BeamSqlOutputToConsoleFn("helloworld"))); PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn()))) .containsInAnyOrder( @@ -192,7 +192,7 @@ public class BeamJoinRelUnboundedVsBoundedTest { + " on " + " o1.order_id=o2.order_id" ; - PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); + PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn()))) .containsInAnyOrder( TestUtils.RowsBuilder.of( http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsUnboundedTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsUnboundedTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsUnboundedTest.java index 5e5e416..a5a2e85 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsUnboundedTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsUnboundedTest.java @@ -25,10 +25,10 @@ import org.apache.beam.sdk.extensions.sql.BeamSqlEnv; import org.apache.beam.sdk.extensions.sql.TestUtils; import org.apache.beam.sdk.extensions.sql.impl.transform.BeamSqlOutputToConsoleFn; import org.apache.beam.sdk.extensions.sql.mock.MockedUnboundedTable; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.BeamRecord; import org.apache.beam.sdk.values.PCollection; import org.joda.time.Duration; import org.junit.BeforeClass; @@ -88,7 +88,7 @@ public class BeamJoinRelUnboundedVsUnboundedTest { + " o1.order_id=o2.order_id" ; - PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); + PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn()))) .containsInAnyOrder( TestUtils.RowsBuilder.of( @@ -121,7 +121,7 @@ public class BeamJoinRelUnboundedVsUnboundedTest { // 2, 2 | 2, 5 // 3, 3 | NULL, NULL - PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); + PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn()))) .containsInAnyOrder( TestUtils.RowsBuilder.of( @@ -151,7 +151,7 @@ public class BeamJoinRelUnboundedVsUnboundedTest { + " o1.order_id=o2.order_id" ; - PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); + PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn()))) .containsInAnyOrder( TestUtils.RowsBuilder.of( @@ -181,7 +181,7 @@ public class BeamJoinRelUnboundedVsUnboundedTest { + " o1.order_id1=o2.order_id" ; - PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); + PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); rows.apply(ParDo.of(new BeamSqlOutputToConsoleFn("hello"))); PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn()))) .containsInAnyOrder( http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMinusRelTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMinusRelTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMinusRelTest.java index 9149dd4..425e554 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMinusRelTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMinusRelTest.java @@ -23,9 +23,9 @@ import org.apache.beam.sdk.extensions.sql.BeamSqlCli; import org.apache.beam.sdk.extensions.sql.BeamSqlEnv; import org.apache.beam.sdk.extensions.sql.TestUtils; import org.apache.beam.sdk.extensions.sql.mock.MockedBoundedTable; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.values.BeamRecord; import org.apache.beam.sdk.values.PCollection; import org.junit.BeforeClass; import org.junit.Rule; @@ -78,7 +78,7 @@ public class BeamMinusRelTest { + "SELECT order_id, site_id, price " + "FROM ORDER_DETAILS2 "; - PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); + PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); PAssert.that(rows).containsInAnyOrder( TestUtils.RowsBuilder.of( Types.BIGINT, "order_id", @@ -100,7 +100,7 @@ public class BeamMinusRelTest { + "SELECT order_id, site_id, price " + "FROM ORDER_DETAILS2 "; - PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); + PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); PAssert.that(rows).satisfies(new CheckSize(2)); PAssert.that(rows).containsInAnyOrder( http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSetOperatorRelBaseTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSetOperatorRelBaseTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSetOperatorRelBaseTest.java index 36538c0..4de493a 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSetOperatorRelBaseTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSetOperatorRelBaseTest.java @@ -25,11 +25,11 @@ import org.apache.beam.sdk.extensions.sql.BeamSqlCli; import org.apache.beam.sdk.extensions.sql.BeamSqlEnv; import org.apache.beam.sdk.extensions.sql.TestUtils; import org.apache.beam.sdk.extensions.sql.mock.MockedBoundedTable; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.BeamRecord; import org.apache.beam.sdk.values.PCollection; import org.junit.BeforeClass; import org.junit.Rule; @@ -71,7 +71,7 @@ public class BeamSetOperatorRelBaseTest { + "FROM ORDER_DETAILS GROUP BY order_id, site_id" + ", TUMBLE(order_time, INTERVAL '1' HOUR) "; - PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); + PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); // compare valueInString to ignore the windowStart & windowEnd PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn()))) .containsInAnyOrder( http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRelTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRelTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRelTest.java index 15e3b89..f033fa0 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRelTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRelTest.java @@ -24,9 +24,9 @@ import org.apache.beam.sdk.extensions.sql.BeamSqlCli; import org.apache.beam.sdk.extensions.sql.BeamSqlEnv; import org.apache.beam.sdk.extensions.sql.TestUtils; import org.apache.beam.sdk.extensions.sql.mock.MockedBoundedTable; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.values.BeamRecord; import org.apache.beam.sdk.values.PCollection; import org.junit.Before; import org.junit.Rule; @@ -78,7 +78,7 @@ public class BeamSortRelTest { + "FROM ORDER_DETAILS " + "ORDER BY order_id asc, site_id desc limit 4"; - PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); + PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); PAssert.that(rows).containsInAnyOrder(TestUtils.RowsBuilder.of( Types.BIGINT, "order_id", Types.INTEGER, "site_id", @@ -117,7 +117,7 @@ public class BeamSortRelTest { + "FROM ORDER_DETAILS " + "ORDER BY order_id asc, site_id desc NULLS FIRST limit 4"; - PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); + PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); PAssert.that(rows).containsInAnyOrder( TestUtils.RowsBuilder.of( Types.BIGINT, "order_id", @@ -155,7 +155,7 @@ public class BeamSortRelTest { + "FROM ORDER_DETAILS " + "ORDER BY order_id asc, site_id desc NULLS LAST limit 4"; - PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); + PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); PAssert.that(rows).containsInAnyOrder( TestUtils.RowsBuilder.of( Types.BIGINT, "order_id", @@ -178,7 +178,7 @@ public class BeamSortRelTest { + "FROM ORDER_DETAILS " + "ORDER BY order_id asc, site_id desc limit 4 offset 4"; - PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); + PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); PAssert.that(rows).containsInAnyOrder( TestUtils.RowsBuilder.of( Types.BIGINT, "order_id", @@ -201,7 +201,7 @@ public class BeamSortRelTest { + "FROM ORDER_DETAILS " + "ORDER BY order_id asc, site_id desc limit 11"; - PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); + PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); PAssert.that(rows).containsInAnyOrder( TestUtils.RowsBuilder.of( Types.BIGINT, "order_id", http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnionRelTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnionRelTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnionRelTest.java index c232b30..7cc52da 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnionRelTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnionRelTest.java @@ -23,9 +23,9 @@ import org.apache.beam.sdk.extensions.sql.BeamSqlCli; import org.apache.beam.sdk.extensions.sql.BeamSqlEnv; import org.apache.beam.sdk.extensions.sql.TestUtils; import org.apache.beam.sdk.extensions.sql.mock.MockedBoundedTable; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.values.BeamRecord; import org.apache.beam.sdk.values.PCollection; import org.junit.BeforeClass; import org.junit.Rule; @@ -63,7 +63,7 @@ public class BeamUnionRelTest { + " order_id, site_id, price " + "FROM ORDER_DETAILS "; - PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); + PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); PAssert.that(rows).containsInAnyOrder( TestUtils.RowsBuilder.of( Types.BIGINT, "order_id", @@ -86,7 +86,7 @@ public class BeamUnionRelTest { + " SELECT order_id, site_id, price " + "FROM ORDER_DETAILS"; - PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); + PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); PAssert.that(rows).containsInAnyOrder( TestUtils.RowsBuilder.of( Types.BIGINT, "order_id", http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRelTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRelTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRelTest.java index e5fa864..ff31e55 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRelTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRelTest.java @@ -23,9 +23,9 @@ import org.apache.beam.sdk.extensions.sql.BeamSqlCli; import org.apache.beam.sdk.extensions.sql.BeamSqlEnv; import org.apache.beam.sdk.extensions.sql.TestUtils; import org.apache.beam.sdk.extensions.sql.mock.MockedBoundedTable; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.values.BeamRecord; import org.apache.beam.sdk.values.PCollection; import org.junit.BeforeClass; import org.junit.Rule; @@ -60,7 +60,7 @@ public class BeamValuesRelTest { public void testValues() throws Exception { String sql = "insert into string_table(name, description) values " + "('hello', 'world'), ('james', 'bond')"; - PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); + PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); PAssert.that(rows).containsInAnyOrder( TestUtils.RowsBuilder.of( Types.VARCHAR, "name", @@ -76,7 +76,7 @@ public class BeamValuesRelTest { @Test public void testValues_castInt() throws Exception { String sql = "insert into int_table (c0, c1) values(cast(1 as int), cast(2 as int))"; - PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); + PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); PAssert.that(rows).containsInAnyOrder( TestUtils.RowsBuilder.of( Types.INTEGER, "c0", @@ -91,7 +91,7 @@ public class BeamValuesRelTest { @Test public void testValues_onlySelect() throws Exception { String sql = "select 1, '1'"; - PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); + PCollection<BeamRecord> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); PAssert.that(rows).containsInAnyOrder( TestUtils.RowsBuilder.of( Types.INTEGER, "EXPR$0", http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/CheckSize.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/CheckSize.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/CheckSize.java index 8cdf2cd..7407a76 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/CheckSize.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/CheckSize.java @@ -18,21 +18,21 @@ package org.apache.beam.sdk.extensions.sql.impl.rel; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.values.BeamRecord; import org.junit.Assert; /** * Utility class to check size of BeamSQLRow iterable. */ -public class CheckSize implements SerializableFunction<Iterable<BeamSqlRow>, Void> { +public class CheckSize implements SerializableFunction<Iterable<BeamRecord>, Void> { private int size; public CheckSize(int size) { this.size = size; } - @Override public Void apply(Iterable<BeamSqlRow> input) { + @Override public Void apply(Iterable<BeamRecord> input) { int count = 0; - for (BeamSqlRow row : input) { + for (BeamRecord row : input) { count++; } Assert.assertEquals(size, count); http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java index ffc6833..b58a17f 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java @@ -32,11 +32,10 @@ import java.util.TimeZone; import org.apache.beam.sdk.extensions.sql.BeamSql; import org.apache.beam.sdk.extensions.sql.TestUtils; import org.apache.beam.sdk.extensions.sql.mock.MockedBoundedTable; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowCoder; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.values.BeamRecord; import org.apache.beam.sdk.values.PCollection; import org.apache.calcite.util.Pair; import org.junit.Rule; @@ -62,8 +61,8 @@ public class BeamSqlBuiltinFunctionsIntegrationTestBase { @Rule public final TestPipeline pipeline = TestPipeline.create(); - protected PCollection<BeamSqlRow> getTestPCollection() { - BeamSqlRowType type = BeamSqlRowType.create( + protected PCollection<BeamRecord> getTestPCollection() { + BeamSqlRecordType type = BeamSqlRecordType.create( Arrays.asList("ts", "c_tinyint", "c_smallint", "c_integer", "c_bigint", "c_float", "c_double", "c_decimal", "c_tinyint_max", "c_smallint_max", "c_integer_max", "c_bigint_max"), @@ -89,7 +88,7 @@ public class BeamSqlBuiltinFunctionsIntegrationTestBase { 9223372036854775807L ) .buildIOReader(pipeline) - .setCoder(new BeamSqlRowCoder(type)); + .setCoder(type.getRecordCoder()); } catch (Exception e) { throw new RuntimeException(e); } @@ -140,7 +139,7 @@ public class BeamSqlBuiltinFunctionsIntegrationTestBase { * Build the corresponding SQL, compile to Beam Pipeline, run it, and check the result. */ public void buildRunAndCheck() { - PCollection<BeamSqlRow> inputCollection = getTestPCollection(); + PCollection<BeamRecord> inputCollection = getTestPCollection(); System.out.println("SQL:>\n" + getSql()); try { List<String> names = new ArrayList<>(); @@ -153,10 +152,10 @@ public class BeamSqlBuiltinFunctionsIntegrationTestBase { values.add(pair.getValue()); } - PCollection<BeamSqlRow> rows = inputCollection.apply(BeamSql.simpleQuery(getSql())); + PCollection<BeamRecord> rows = inputCollection.apply(BeamSql.simpleQuery(getSql())); PAssert.that(rows).containsInAnyOrder( TestUtils.RowsBuilder - .of(BeamSqlRowType.create(names, types)) + .of(BeamSqlRecordType.create(names, types)) .addRows(values) .getRows() ); http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlComparisonOperatorsIntegrationTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlComparisonOperatorsIntegrationTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlComparisonOperatorsIntegrationTest.java index 14de5b6..3569e31 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlComparisonOperatorsIntegrationTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlComparisonOperatorsIntegrationTest.java @@ -22,9 +22,8 @@ import java.math.BigDecimal; import java.sql.Types; import java.util.Arrays; import org.apache.beam.sdk.extensions.sql.mock.MockedBoundedTable; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowCoder; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType; +import org.apache.beam.sdk.values.BeamRecord; import org.apache.beam.sdk.values.PCollection; import org.junit.Test; @@ -282,8 +281,8 @@ public class BeamSqlComparisonOperatorsIntegrationTest checker.buildRunAndCheck(); } - @Override protected PCollection<BeamSqlRow> getTestPCollection() { - BeamSqlRowType type = BeamSqlRowType.create( + @Override protected PCollection<BeamRecord> getTestPCollection() { + BeamSqlRecordType type = BeamSqlRecordType.create( Arrays.asList( "c_tinyint_0", "c_tinyint_1", "c_tinyint_2", "c_smallint_0", "c_smallint_1", "c_smallint_2", @@ -322,7 +321,7 @@ public class BeamSqlComparisonOperatorsIntegrationTest false, true ) .buildIOReader(pipeline) - .setCoder(new BeamSqlRowCoder(type)); + .setCoder(type.getRecordCoder()); } catch (Exception e) { throw new RuntimeException(e); } http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlDateFunctionsIntegrationTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlDateFunctionsIntegrationTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlDateFunctionsIntegrationTest.java index 181c991..cda6a3c 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlDateFunctionsIntegrationTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlDateFunctionsIntegrationTest.java @@ -24,9 +24,9 @@ import static org.junit.Assert.assertTrue; import java.util.Date; import java.util.Iterator; import org.apache.beam.sdk.extensions.sql.BeamSql; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.values.BeamRecord; import org.apache.beam.sdk.values.PCollection; import org.junit.Test; @@ -63,17 +63,17 @@ public class BeamSqlDateFunctionsIntegrationTest + "CURRENT_TIMESTAMP as c3" + " FROM PCOLLECTION" ; - PCollection<BeamSqlRow> rows = getTestPCollection().apply( + PCollection<BeamRecord> rows = getTestPCollection().apply( BeamSql.simpleQuery(sql)); PAssert.that(rows).satisfies(new Checker()); pipeline.run(); } - private static class Checker implements SerializableFunction<Iterable<BeamSqlRow>, Void> { - @Override public Void apply(Iterable<BeamSqlRow> input) { - Iterator<BeamSqlRow> iter = input.iterator(); + private static class Checker implements SerializableFunction<Iterable<BeamRecord>, Void> { + @Override public Void apply(Iterable<BeamRecord> input) { + Iterator<BeamRecord> iter = input.iterator(); assertTrue(iter.hasNext()); - BeamSqlRow row = iter.next(); + BeamRecord row = iter.next(); // LOCALTIME Date date = new Date(); assertTrue(date.getTime() - row.getGregorianCalendar(0).getTime().getTime() < 1000); http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedBoundedTable.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedBoundedTable.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedBoundedTable.java index c7c26eb..073ca52 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedBoundedTable.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedBoundedTable.java @@ -26,12 +26,12 @@ import java.util.List; import java.util.concurrent.ConcurrentLinkedQueue; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.extensions.sql.schema.BeamIOType; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.BeamRecord; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PDone; @@ -41,11 +41,11 @@ import org.apache.beam.sdk.values.PDone; */ public class MockedBoundedTable extends MockedTable { /** rows written to this table. */ - private static final ConcurrentLinkedQueue<BeamSqlRow> CONTENT = new ConcurrentLinkedQueue<>(); + private static final ConcurrentLinkedQueue<BeamRecord> CONTENT = new ConcurrentLinkedQueue<>(); /** rows flow out from this table. */ - private final List<BeamSqlRow> rows = new ArrayList<>(); + private final List<BeamRecord> rows = new ArrayList<>(); - public MockedBoundedTable(BeamSqlRowType beamSqlRowType) { + public MockedBoundedTable(BeamSqlRecordType beamSqlRowType) { super(beamSqlRowType); } @@ -69,7 +69,7 @@ public class MockedBoundedTable extends MockedTable { /** * Build a mocked bounded table with the specified type. */ - public static MockedBoundedTable of(final BeamSqlRowType type) { + public static MockedBoundedTable of(final BeamSqlRecordType type) { return new MockedBoundedTable(type); } @@ -88,7 +88,7 @@ public class MockedBoundedTable extends MockedTable { * }</pre> */ public MockedBoundedTable addRows(Object... args) { - List<BeamSqlRow> rows = buildRows(getRowType(), Arrays.asList(args)); + List<BeamRecord> rows = buildRows(getRowType(), Arrays.asList(args)); this.rows.addAll(rows); return this; } @@ -99,12 +99,12 @@ public class MockedBoundedTable extends MockedTable { } @Override - public PCollection<BeamSqlRow> buildIOReader(Pipeline pipeline) { + public PCollection<BeamRecord> buildIOReader(Pipeline pipeline) { return PBegin.in(pipeline).apply( "MockedBoundedTable_Reader_" + COUNTER.incrementAndGet(), Create.of(rows)); } - @Override public PTransform<? super PCollection<BeamSqlRow>, PDone> buildIOWriter() { + @Override public PTransform<? super PCollection<BeamRecord>, PDone> buildIOWriter() { return new OutputStore(); } @@ -112,11 +112,11 @@ public class MockedBoundedTable extends MockedTable { * Keep output in {@code CONTENT} for validation. * */ - public static class OutputStore extends PTransform<PCollection<BeamSqlRow>, PDone> { + public static class OutputStore extends PTransform<PCollection<BeamRecord>, PDone> { @Override - public PDone expand(PCollection<BeamSqlRow> input) { - input.apply(ParDo.of(new DoFn<BeamSqlRow, Void>() { + public PDone expand(PCollection<BeamRecord> input) { + input.apply(ParDo.of(new DoFn<BeamRecord, Void>() { @ProcessElement public void processElement(ProcessContext c) { CONTENT.add(c.element()); http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedTable.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedTable.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedTable.java index 6017ee7..59fc6e1 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedTable.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedTable.java @@ -20,9 +20,9 @@ package org.apache.beam.sdk.extensions.sql.mock; import java.util.concurrent.atomic.AtomicInteger; import org.apache.beam.sdk.extensions.sql.schema.BaseBeamTable; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType; import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.values.BeamRecord; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PDone; @@ -31,12 +31,12 @@ import org.apache.beam.sdk.values.PDone; */ public abstract class MockedTable extends BaseBeamTable { public static final AtomicInteger COUNTER = new AtomicInteger(); - public MockedTable(BeamSqlRowType beamSqlRowType) { + public MockedTable(BeamSqlRecordType beamSqlRowType) { super(beamSqlRowType); } @Override - public PTransform<? super PCollection<BeamSqlRow>, PDone> buildIOWriter() { + public PTransform<? super PCollection<BeamRecord>, PDone> buildIOWriter() { throw new UnsupportedOperationException("buildIOWriter unsupported!"); } } http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedUnboundedTable.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedUnboundedTable.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedUnboundedTable.java index f9ea2ac..6194264 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedUnboundedTable.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedUnboundedTable.java @@ -24,10 +24,9 @@ import java.util.List; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.extensions.sql.TestUtils; import org.apache.beam.sdk.extensions.sql.schema.BeamIOType; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowCoder; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType; import org.apache.beam.sdk.testing.TestStream; +import org.apache.beam.sdk.values.BeamRecord; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.TimestampedValue; import org.apache.calcite.util.Pair; @@ -39,10 +38,10 @@ import org.joda.time.Instant; */ public class MockedUnboundedTable extends MockedTable { /** rows flow out from this table with the specified watermark instant. */ - private final List<Pair<Duration, List<BeamSqlRow>>> timestampedRows = new ArrayList<>(); + private final List<Pair<Duration, List<BeamRecord>>> timestampedRows = new ArrayList<>(); /** specify the index of column in the row which stands for the event time field. */ private int timestampField; - private MockedUnboundedTable(BeamSqlRowType beamSqlRowType) { + private MockedUnboundedTable(BeamSqlRecordType beamSqlRowType) { super(beamSqlRowType); } @@ -83,7 +82,7 @@ public class MockedUnboundedTable extends MockedTable { * }</pre> */ public MockedUnboundedTable addRows(Duration duration, Object... args) { - List<BeamSqlRow> rows = TestUtils.buildRows(getRowType(), Arrays.asList(args)); + List<BeamRecord> rows = TestUtils.buildRows(getRowType(), Arrays.asList(args)); // record the watermark + rows this.timestampedRows.add(Pair.of(duration, rows)); return this; @@ -93,11 +92,10 @@ public class MockedUnboundedTable extends MockedTable { return BeamIOType.UNBOUNDED; } - @Override public PCollection<BeamSqlRow> buildIOReader(Pipeline pipeline) { - TestStream.Builder<BeamSqlRow> values = TestStream.create( - new BeamSqlRowCoder(beamSqlRowType)); + @Override public PCollection<BeamRecord> buildIOReader(Pipeline pipeline) { + TestStream.Builder<BeamRecord> values = TestStream.create(beamSqlRowType.getRecordCoder()); - for (Pair<Duration, List<BeamSqlRow>> pair : timestampedRows) { + for (Pair<Duration, List<BeamRecord>> pair : timestampedRows) { values = values.advanceWatermarkTo(new Instant(0).plus(pair.getKey())); for (int i = 0; i < pair.getValue().size(); i++) { values = values.addElements(TimestampedValue.of(pair.getValue().get(i), http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRowCoderTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRowCoderTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRowCoderTest.java index ddff819..08f98c3 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRowCoderTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRowCoderTest.java @@ -21,8 +21,10 @@ package org.apache.beam.sdk.extensions.sql.schema; import java.math.BigDecimal; import java.util.Date; import java.util.GregorianCalendar; +import org.apache.beam.sdk.coders.BeamRecordCoder; import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils; import org.apache.beam.sdk.testing.CoderProperties; +import org.apache.beam.sdk.values.BeamRecord; import org.apache.calcite.jdbc.JavaTypeFactoryImpl; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelDataTypeFactory; @@ -57,10 +59,10 @@ public class BeamSqlRowCoderTest { } }; - BeamSqlRowType beamSQLRowType = CalciteUtils.toBeamRowType( + BeamSqlRecordType beamSQLRowType = CalciteUtils.toBeamRowType( protoRowType.apply(new JavaTypeFactoryImpl( RelDataTypeSystem.DEFAULT))); - BeamSqlRow row = new BeamSqlRow(beamSQLRowType); + BeamRecord row = new BeamRecord(beamSQLRowType); row.addField("col_tinyint", Byte.valueOf("1")); row.addField("col_smallint", Short.valueOf("1")); row.addField("col_integer", 1); @@ -76,7 +78,7 @@ public class BeamSqlRowCoderTest { row.addField("col_boolean", true); - BeamSqlRowCoder coder = new BeamSqlRowCoder(beamSQLRowType); + BeamRecordCoder coder = beamSQLRowType.getRecordCoder(); CoderProperties.coderDecodeEncodeEqual(coder, row); } } http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaCSVTableTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaCSVTableTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaCSVTableTest.java index 05af36c..2fc013d 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaCSVTableTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaCSVTableTest.java @@ -21,13 +21,13 @@ package org.apache.beam.sdk.extensions.sql.schema.kafka; import java.io.Serializable; import org.apache.beam.sdk.extensions.sql.impl.planner.BeamQueryPlanner; import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.BeamRecord; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.calcite.rel.type.RelDataType; @@ -45,8 +45,8 @@ import org.junit.Test; public class BeamKafkaCSVTableTest { @Rule public TestPipeline pipeline = TestPipeline.create(); - public static BeamSqlRow row1 = new BeamSqlRow(genRowType()); - public static BeamSqlRow row2 = new BeamSqlRow(genRowType()); + public static BeamRecord row1 = new BeamRecord(genRowType()); + public static BeamRecord row2 = new BeamRecord(genRowType()); @BeforeClass public static void setUp() { @@ -60,7 +60,7 @@ public class BeamKafkaCSVTableTest { } @Test public void testCsvRecorderDecoder() throws Exception { - PCollection<BeamSqlRow> result = pipeline + PCollection<BeamRecord> result = pipeline .apply( Create.of("1,\"1\",1.0", "2,2,2.0") ) @@ -75,7 +75,7 @@ public class BeamKafkaCSVTableTest { } @Test public void testCsvRecorderEncoder() throws Exception { - PCollection<BeamSqlRow> result = pipeline + PCollection<BeamRecord> result = pipeline .apply( Create.of(row1, row2) ) @@ -90,7 +90,7 @@ public class BeamKafkaCSVTableTest { pipeline.run(); } - private static BeamSqlRowType genRowType() { + private static BeamSqlRecordType genRowType() { return CalciteUtils.toBeamRowType(new RelProtoDataType() { @Override public RelDataType apply(RelDataTypeFactory a0) { http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTableTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTableTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTableTest.java index 79e3d6d..4a39f7c 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTableTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTableTest.java @@ -33,10 +33,10 @@ import java.util.Arrays; import java.util.List; import org.apache.beam.sdk.extensions.sql.impl.planner.BeamQueryPlanner; import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.values.BeamRecord; import org.apache.beam.sdk.values.PCollection; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelDataTypeFactory; @@ -69,7 +69,7 @@ public class BeamTextCSVTableTest { private static Object[] data2 = new Object[] { 2, 2L, 2.2F, 2.2, "bond" }; private static List<Object[]> testData = Arrays.asList(data1, data2); - private static List<BeamSqlRow> testDataRows = new ArrayList<BeamSqlRow>() {{ + private static List<BeamRecord> testDataRows = new ArrayList<BeamRecord>() {{ for (Object[] data : testData) { add(buildRow(data)); } @@ -80,7 +80,7 @@ public class BeamTextCSVTableTest { private static File writerTargetFile; @Test public void testBuildIOReader() { - PCollection<BeamSqlRow> rows = new BeamTextCSVTable(buildBeamSqlRowType(), + PCollection<BeamRecord> rows = new BeamTextCSVTable(buildBeamSqlRowType(), readerSourceFile.getAbsolutePath()).buildIOReader(pipeline); PAssert.that(rows).containsInAnyOrder(testDataRows); pipeline.run(); @@ -93,7 +93,7 @@ public class BeamTextCSVTableTest { .buildIOWriter()); pipeline.run(); - PCollection<BeamSqlRow> rows = new BeamTextCSVTable(buildBeamSqlRowType(), + PCollection<BeamRecord> rows = new BeamTextCSVTable(buildBeamSqlRowType(), writerTargetFile.getAbsolutePath()).buildIOReader(pipeline2); // confirm the two reads match @@ -166,11 +166,11 @@ public class BeamTextCSVTableTest { .add("amount", SqlTypeName.DOUBLE).add("user_name", SqlTypeName.VARCHAR).build(); } - private static BeamSqlRowType buildBeamSqlRowType() { + private static BeamSqlRecordType buildBeamSqlRowType() { return CalciteUtils.toBeamRowType(buildRelDataType()); } - private static BeamSqlRow buildRow(Object[] data) { - return new BeamSqlRow(buildBeamSqlRowType(), Arrays.asList(data)); + private static BeamRecord buildRow(Object[] data) { + return new BeamRecord(buildBeamSqlRowType(), Arrays.asList(data)); } } http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/transform/BeamAggregationTransformTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/transform/BeamAggregationTransformTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/transform/BeamAggregationTransformTest.java index 821abc9..dca2ad7 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/transform/BeamAggregationTransformTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/transform/BeamAggregationTransformTest.java @@ -21,14 +21,13 @@ import java.text.ParseException; import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import org.apache.beam.sdk.coders.BeamRecordCoder; import org.apache.beam.sdk.coders.IterableCoder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.extensions.sql.impl.planner.BeamQueryPlanner; import org.apache.beam.sdk.extensions.sql.impl.transform.BeamAggregationTransforms; import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowCoder; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Combine; @@ -36,6 +35,7 @@ import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.WithKeys; +import org.apache.beam.sdk.values.BeamRecord; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.calcite.rel.core.AggregateCall; @@ -63,14 +63,14 @@ public class BeamAggregationTransformTest extends BeamTransformBaseTest{ private List<AggregateCall> aggCalls; - private BeamSqlRowType keyType; - private BeamSqlRowType aggPartType; - private BeamSqlRowType outputType; + private BeamSqlRecordType keyType; + private BeamSqlRecordType aggPartType; + private BeamSqlRecordType outputType; - private BeamSqlRowCoder inRecordCoder; - private BeamSqlRowCoder keyCoder; - private BeamSqlRowCoder aggCoder; - private BeamSqlRowCoder outRecordCoder; + private BeamRecordCoder inRecordCoder; + private BeamRecordCoder keyCoder; + private BeamRecordCoder aggCoder; + private BeamRecordCoder outRecordCoder; /** * This step equals to below query. @@ -99,28 +99,28 @@ public class BeamAggregationTransformTest extends BeamTransformBaseTest{ public void testCountPerElementBasic() throws ParseException { setupEnvironment(); - PCollection<BeamSqlRow> input = p.apply(Create.of(inputRows)); + PCollection<BeamRecord> input = p.apply(Create.of(inputRows)); //1. extract fields in group-by key part - PCollection<KV<BeamSqlRow, BeamSqlRow>> exGroupByStream = input.apply("exGroupBy", + PCollection<KV<BeamRecord, BeamRecord>> exGroupByStream = input.apply("exGroupBy", WithKeys .of(new BeamAggregationTransforms.AggregationGroupByKeyFn(-1, ImmutableBitSet.of(0)))) - .setCoder(KvCoder.<BeamSqlRow, BeamSqlRow>of(keyCoder, inRecordCoder)); + .setCoder(KvCoder.<BeamRecord, BeamRecord>of(keyCoder, inRecordCoder)); //2. apply a GroupByKey. - PCollection<KV<BeamSqlRow, Iterable<BeamSqlRow>>> groupedStream = exGroupByStream - .apply("groupBy", GroupByKey.<BeamSqlRow, BeamSqlRow>create()) - .setCoder(KvCoder.<BeamSqlRow, Iterable<BeamSqlRow>>of(keyCoder, - IterableCoder.<BeamSqlRow>of(inRecordCoder))); + PCollection<KV<BeamRecord, Iterable<BeamRecord>>> groupedStream = exGroupByStream + .apply("groupBy", GroupByKey.<BeamRecord, BeamRecord>create()) + .setCoder(KvCoder.<BeamRecord, Iterable<BeamRecord>>of(keyCoder, + IterableCoder.<BeamRecord>of(inRecordCoder))); //3. run aggregation functions - PCollection<KV<BeamSqlRow, BeamSqlRow>> aggregatedStream = groupedStream.apply("aggregation", - Combine.<BeamSqlRow, BeamSqlRow, BeamSqlRow>groupedValues( + PCollection<KV<BeamRecord, BeamRecord>> aggregatedStream = groupedStream.apply("aggregation", + Combine.<BeamRecord, BeamRecord, BeamRecord>groupedValues( new BeamAggregationTransforms.AggregationAdaptor(aggCalls, inputRowType))) - .setCoder(KvCoder.<BeamSqlRow, BeamSqlRow>of(keyCoder, aggCoder)); + .setCoder(KvCoder.<BeamRecord, BeamRecord>of(keyCoder, aggCoder)); //4. flat KV to a single record - PCollection<BeamSqlRow> mergedStream = aggregatedStream.apply("mergeRecord", + PCollection<BeamRecord> mergedStream = aggregatedStream.apply("mergeRecord", ParDo.of(new BeamAggregationTransforms.MergeAggregationRecord(outputType, aggCalls, -1))); mergedStream.setCoder(outRecordCoder); @@ -332,10 +332,10 @@ public class BeamAggregationTransformTest extends BeamTransformBaseTest{ * Coders used in aggregation steps. */ private void prepareTypeAndCoder() { - inRecordCoder = new BeamSqlRowCoder(inputRowType); + inRecordCoder = inputRowType.getRecordCoder(); keyType = initTypeOfSqlRow(Arrays.asList(KV.of("f_int", SqlTypeName.INTEGER))); - keyCoder = new BeamSqlRowCoder(keyType); + keyCoder = keyType.getRecordCoder(); aggPartType = initTypeOfSqlRow( Arrays.asList(KV.of("count", SqlTypeName.BIGINT), @@ -360,35 +360,35 @@ public class BeamAggregationTransformTest extends BeamTransformBaseTest{ KV.of("sum8", SqlTypeName.INTEGER), KV.of("avg8", SqlTypeName.INTEGER), KV.of("max8", SqlTypeName.INTEGER), KV.of("min8", SqlTypeName.INTEGER) )); - aggCoder = new BeamSqlRowCoder(aggPartType); + aggCoder = aggPartType.getRecordCoder(); outputType = prepareFinalRowType(); - outRecordCoder = new BeamSqlRowCoder(outputType); + outRecordCoder = outputType.getRecordCoder(); } /** * expected results after {@link BeamAggregationTransforms.AggregationGroupByKeyFn}. */ - private List<KV<BeamSqlRow, BeamSqlRow>> prepareResultOfAggregationGroupByKeyFn() { + private List<KV<BeamRecord, BeamRecord>> prepareResultOfAggregationGroupByKeyFn() { return Arrays.asList( - KV.of(new BeamSqlRow(keyType, Arrays.<Object>asList(inputRows.get(0).getInteger(0))), + KV.of(new BeamRecord(keyType, Arrays.<Object>asList(inputRows.get(0).getInteger(0))), inputRows.get(0)), - KV.of(new BeamSqlRow(keyType, Arrays.<Object>asList(inputRows.get(1).getInteger(0))), + KV.of(new BeamRecord(keyType, Arrays.<Object>asList(inputRows.get(1).getInteger(0))), inputRows.get(1)), - KV.of(new BeamSqlRow(keyType, Arrays.<Object>asList(inputRows.get(2).getInteger(0))), + KV.of(new BeamRecord(keyType, Arrays.<Object>asList(inputRows.get(2).getInteger(0))), inputRows.get(2)), - KV.of(new BeamSqlRow(keyType, Arrays.<Object>asList(inputRows.get(3).getInteger(0))), + KV.of(new BeamRecord(keyType, Arrays.<Object>asList(inputRows.get(3).getInteger(0))), inputRows.get(3))); } /** * expected results after {@link BeamAggregationTransforms.AggregationCombineFn}. */ - private List<KV<BeamSqlRow, BeamSqlRow>> prepareResultOfAggregationCombineFn() + private List<KV<BeamRecord, BeamRecord>> prepareResultOfAggregationCombineFn() throws ParseException { return Arrays.asList( - KV.of(new BeamSqlRow(keyType, Arrays.<Object>asList(inputRows.get(0).getInteger(0))), - new BeamSqlRow(aggPartType, Arrays.<Object>asList( + KV.of(new BeamRecord(keyType, Arrays.<Object>asList(inputRows.get(0).getInteger(0))), + new BeamRecord(aggPartType, Arrays.<Object>asList( 4L, 10000L, 2500L, 4000L, 1000L, (short) 10, (short) 2, (short) 4, (short) 1, @@ -404,7 +404,7 @@ public class BeamAggregationTransformTest extends BeamTransformBaseTest{ /** * Row type of final output row. */ - private BeamSqlRowType prepareFinalRowType() { + private BeamSqlRecordType prepareFinalRowType() { FieldInfoBuilder builder = BeamQueryPlanner.TYPE_FACTORY.builder(); List<KV<String, SqlTypeName>> columnMetadata = Arrays.asList(KV.of("f_int", SqlTypeName.INTEGER), KV.of("count", SqlTypeName.BIGINT), @@ -438,8 +438,8 @@ public class BeamAggregationTransformTest extends BeamTransformBaseTest{ /** * expected results after {@link BeamAggregationTransforms.MergeAggregationRecord}. */ - private BeamSqlRow prepareResultOfMergeAggregationRecord() throws ParseException { - return new BeamSqlRow(outputType, Arrays.<Object>asList( + private BeamRecord prepareResultOfMergeAggregationRecord() throws ParseException { + return new BeamRecord(outputType, Arrays.<Object>asList( 1, 4L, 10000L, 2500L, 4000L, 1000L, (short) 10, (short) 2, (short) 4, (short) 1, http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/transform/BeamTransformBaseTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/transform/BeamTransformBaseTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/transform/BeamTransformBaseTest.java index af7ec23..e31463b 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/transform/BeamTransformBaseTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/transform/BeamTransformBaseTest.java @@ -24,8 +24,8 @@ import java.util.Arrays; import java.util.List; import org.apache.beam.sdk.extensions.sql.impl.planner.BeamQueryPlanner; import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType; +import org.apache.beam.sdk.values.BeamRecord; import org.apache.beam.sdk.values.KV; import org.apache.calcite.rel.type.RelDataTypeFactory.FieldInfoBuilder; import org.apache.calcite.sql.type.SqlTypeName; @@ -38,8 +38,8 @@ import org.junit.BeforeClass; public class BeamTransformBaseTest { public static DateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); - public static BeamSqlRowType inputRowType; - public static List<BeamSqlRow> inputRows; + public static BeamSqlRecordType inputRowType; + public static List<BeamRecord> inputRows; @BeforeClass public static void prepareInput() throws NumberFormatException, ParseException{ @@ -68,7 +68,7 @@ public class BeamTransformBaseTest { /** * create a {@code BeamSqlRowType} for given column metadata. */ - public static BeamSqlRowType initTypeOfSqlRow(List<KV<String, SqlTypeName>> columnMetadata){ + public static BeamSqlRecordType initTypeOfSqlRow(List<KV<String, SqlTypeName>> columnMetadata){ FieldInfoBuilder builder = BeamQueryPlanner.TYPE_FACTORY.builder(); for (KV<String, SqlTypeName> cm : columnMetadata) { builder.add(cm.getKey(), cm.getValue()); @@ -79,7 +79,7 @@ public class BeamTransformBaseTest { /** * Create an empty row with given column metadata. */ - public static BeamSqlRow initBeamSqlRow(List<KV<String, SqlTypeName>> columnMetadata) { + public static BeamRecord initBeamSqlRow(List<KV<String, SqlTypeName>> columnMetadata) { return initBeamSqlRow(columnMetadata, Arrays.asList()); } @@ -87,11 +87,11 @@ public class BeamTransformBaseTest { * Create a row with given column metadata, and values for each column. * */ - public static BeamSqlRow initBeamSqlRow(List<KV<String, SqlTypeName>> columnMetadata, + public static BeamRecord initBeamSqlRow(List<KV<String, SqlTypeName>> columnMetadata, List<Object> rowValues){ - BeamSqlRowType rowType = initTypeOfSqlRow(columnMetadata); + BeamSqlRecordType rowType = initTypeOfSqlRow(columnMetadata); - return new BeamSqlRow(rowType, rowValues); + return new BeamRecord(rowType, rowValues); } }
