Repository: beam Updated Branches: refs/heads/DSL_SQL 9eec6a030 -> a1cc5518e
http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRowCoderTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRowCoderTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRowCoderTest.java deleted file mode 100644 index 8751bbb..0000000 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRowCoderTest.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.sdk.extensions.sql.schema; - -import java.math.BigDecimal; -import java.util.Date; -import java.util.GregorianCalendar; -import org.apache.beam.sdk.coders.BeamRecordCoder; -import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils; -import org.apache.beam.sdk.testing.CoderProperties; -import org.apache.beam.sdk.values.BeamRecord; -import org.apache.calcite.jdbc.JavaTypeFactoryImpl; -import org.apache.calcite.rel.type.RelDataType; -import org.apache.calcite.rel.type.RelDataTypeFactory; -import org.apache.calcite.rel.type.RelDataTypeSystem; -import org.apache.calcite.rel.type.RelProtoDataType; -import org.apache.calcite.sql.type.SqlTypeName; -import org.junit.Test; - -/** - * Tests for BeamSqlRowCoder. - */ -public class BeamSqlRowCoderTest { - - @Test - public void encodeAndDecode() throws Exception { - final RelProtoDataType protoRowType = new RelProtoDataType() { - @Override - public RelDataType apply(RelDataTypeFactory a0) { - return a0.builder() - .add("col_tinyint", SqlTypeName.TINYINT) - .add("col_smallint", SqlTypeName.SMALLINT) - .add("col_integer", SqlTypeName.INTEGER) - .add("col_bigint", SqlTypeName.BIGINT) - .add("col_float", SqlTypeName.FLOAT) - .add("col_double", SqlTypeName.DOUBLE) - .add("col_decimal", SqlTypeName.DECIMAL) - .add("col_string_varchar", SqlTypeName.VARCHAR) - .add("col_time", SqlTypeName.TIME) - .add("col_timestamp", SqlTypeName.TIMESTAMP) - .add("col_boolean", SqlTypeName.BOOLEAN) - .build(); - } - }; - - BeamRecordSqlType beamSQLRowType = CalciteUtils.toBeamRowType( - protoRowType.apply(new JavaTypeFactoryImpl( - RelDataTypeSystem.DEFAULT))); - - GregorianCalendar calendar = new GregorianCalendar(); - calendar.setTime(new Date()); - BeamRecord row = new BeamRecord(beamSQLRowType - , Byte.valueOf("1"), Short.valueOf("1"), 1, 1L, 1.1F, 1.1 - , BigDecimal.ZERO, "hello", calendar, new Date(), true); - - - BeamRecordCoder coder = beamSQLRowType.getRecordCoder(); - CoderProperties.coderDecodeEncodeEqual(coder, row); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaCSVTableTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaCSVTableTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaCSVTableTest.java deleted file mode 100644 index e5d81fa..0000000 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaCSVTableTest.java +++ /dev/null @@ -1,107 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.sdk.extensions.sql.schema.kafka; - -import java.io.Serializable; -import org.apache.beam.sdk.extensions.sql.impl.planner.BeamQueryPlanner; -import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils; -import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType; -import org.apache.beam.sdk.testing.PAssert; -import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.values.BeamRecord; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollection; -import org.apache.calcite.rel.type.RelDataType; -import org.apache.calcite.rel.type.RelDataTypeFactory; -import org.apache.calcite.rel.type.RelProtoDataType; -import org.apache.calcite.sql.type.SqlTypeName; -import org.apache.commons.csv.CSVFormat; -import org.junit.BeforeClass; -import org.junit.Rule; -import org.junit.Test; - -/** - * Test for BeamKafkaCSVTable. - */ -public class BeamKafkaCSVTableTest { - @Rule - public TestPipeline pipeline = TestPipeline.create(); - public static BeamRecord row1; - public static BeamRecord row2; - - @BeforeClass - public static void setUp() { - row1 = new BeamRecord(genRowType(), 1L, 1, 1.0); - - row2 = new BeamRecord(genRowType(), 2L, 2, 2.0); - } - - @Test public void testCsvRecorderDecoder() throws Exception { - PCollection<BeamRecord> result = pipeline - .apply( - Create.of("1,\"1\",1.0", "2,2,2.0") - ) - .apply(ParDo.of(new String2KvBytes())) - .apply( - new BeamKafkaCSVTable.CsvRecorderDecoder(genRowType(), CSVFormat.DEFAULT) - ); - - PAssert.that(result).containsInAnyOrder(row1, row2); - - pipeline.run(); - } - - @Test public void testCsvRecorderEncoder() throws Exception { - PCollection<BeamRecord> result = pipeline - .apply( - Create.of(row1, row2) - ) - .apply( - new BeamKafkaCSVTable.CsvRecorderEncoder(genRowType(), CSVFormat.DEFAULT) - ).apply( - new BeamKafkaCSVTable.CsvRecorderDecoder(genRowType(), CSVFormat.DEFAULT) - ); - - PAssert.that(result).containsInAnyOrder(row1, row2); - - pipeline.run(); - } - - private static BeamRecordSqlType genRowType() { - return CalciteUtils.toBeamRowType(new RelProtoDataType() { - - @Override public RelDataType apply(RelDataTypeFactory a0) { - return a0.builder().add("order_id", SqlTypeName.BIGINT) - .add("site_id", SqlTypeName.INTEGER) - .add("price", SqlTypeName.DOUBLE).build(); - } - }.apply(BeamQueryPlanner.TYPE_FACTORY)); - } - - private static class String2KvBytes extends DoFn<String, KV<byte[], byte[]>> - implements Serializable { - @ProcessElement - public void processElement(ProcessContext ctx) { - ctx.output(KV.of(new byte[] {}, ctx.element().getBytes())); - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTableTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTableTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTableTest.java deleted file mode 100644 index 8935c46..0000000 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTableTest.java +++ /dev/null @@ -1,176 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.sdk.extensions.sql.schema.text; - -import java.io.File; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.OutputStream; -import java.io.PrintStream; -import java.nio.file.FileVisitResult; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.SimpleFileVisitor; -import java.nio.file.attribute.BasicFileAttributes; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import org.apache.beam.sdk.extensions.sql.impl.planner.BeamQueryPlanner; -import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils; -import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType; -import org.apache.beam.sdk.testing.PAssert; -import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.values.BeamRecord; -import org.apache.beam.sdk.values.PCollection; -import org.apache.calcite.rel.type.RelDataType; -import org.apache.calcite.rel.type.RelDataTypeFactory; -import org.apache.calcite.rel.type.RelProtoDataType; -import org.apache.calcite.sql.type.SqlTypeName; -import org.apache.commons.csv.CSVFormat; -import org.apache.commons.csv.CSVPrinter; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Rule; -import org.junit.Test; - -/** - * Tests for {@code BeamTextCSVTable}. - */ -public class BeamTextCSVTableTest { - - @Rule public TestPipeline pipeline = TestPipeline.create(); - @Rule public TestPipeline pipeline2 = TestPipeline.create(); - - /** - * testData. - * - * <p> - * The types of the csv fields are: - * integer,bigint,float,double,string - * </p> - */ - private static Object[] data1 = new Object[] { 1, 1L, 1.1F, 1.1, "james" }; - private static Object[] data2 = new Object[] { 2, 2L, 2.2F, 2.2, "bond" }; - - private static List<Object[]> testData = Arrays.asList(data1, data2); - private static List<BeamRecord> testDataRows = new ArrayList<BeamRecord>() {{ - for (Object[] data : testData) { - add(buildRow(data)); - } - }}; - - private static Path tempFolder; - private static File readerSourceFile; - private static File writerTargetFile; - - @Test public void testBuildIOReader() { - PCollection<BeamRecord> rows = new BeamTextCSVTable(buildBeamSqlRowType(), - readerSourceFile.getAbsolutePath()).buildIOReader(pipeline); - PAssert.that(rows).containsInAnyOrder(testDataRows); - pipeline.run(); - } - - @Test public void testBuildIOWriter() { - new BeamTextCSVTable(buildBeamSqlRowType(), - readerSourceFile.getAbsolutePath()).buildIOReader(pipeline) - .apply(new BeamTextCSVTable(buildBeamSqlRowType(), writerTargetFile.getAbsolutePath()) - .buildIOWriter()); - pipeline.run(); - - PCollection<BeamRecord> rows = new BeamTextCSVTable(buildBeamSqlRowType(), - writerTargetFile.getAbsolutePath()).buildIOReader(pipeline2); - - // confirm the two reads match - PAssert.that(rows).containsInAnyOrder(testDataRows); - pipeline2.run(); - } - - @BeforeClass public static void setUp() throws IOException { - tempFolder = Files.createTempDirectory("BeamTextTableTest"); - readerSourceFile = writeToFile(testData, "readerSourceFile.txt"); - writerTargetFile = writeToFile(testData, "writerTargetFile.txt"); - } - - @AfterClass public static void teardownClass() throws IOException { - Files.walkFileTree(tempFolder, new SimpleFileVisitor<Path>() { - - @Override public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) - throws IOException { - Files.delete(file); - return FileVisitResult.CONTINUE; - } - - @Override public FileVisitResult postVisitDirectory(Path dir, IOException exc) - throws IOException { - Files.delete(dir); - return FileVisitResult.CONTINUE; - } - }); - } - - private static File writeToFile(List<Object[]> rows, String filename) throws IOException { - File file = tempFolder.resolve(filename).toFile(); - OutputStream output = new FileOutputStream(file); - writeToStreamAndClose(rows, output); - return file; - } - - /** - * Helper that writes the given lines (adding a newline in between) to a stream, then closes the - * stream. - */ - private static void writeToStreamAndClose(List<Object[]> rows, OutputStream outputStream) { - try (PrintStream writer = new PrintStream(outputStream)) { - CSVPrinter printer = CSVFormat.DEFAULT.print(writer); - for (Object[] row : rows) { - for (Object field : row) { - printer.print(field); - } - printer.println(); - } - } catch (IOException e) { - e.printStackTrace(); - } - } - - private RelProtoDataType buildRowType() { - return new RelProtoDataType() { - - @Override public RelDataType apply(RelDataTypeFactory a0) { - return a0.builder().add("id", SqlTypeName.INTEGER).add("order_id", SqlTypeName.BIGINT) - .add("price", SqlTypeName.FLOAT).add("amount", SqlTypeName.DOUBLE) - .add("user_name", SqlTypeName.VARCHAR).build(); - } - }; - } - - private static RelDataType buildRelDataType() { - return BeamQueryPlanner.TYPE_FACTORY.builder().add("id", SqlTypeName.INTEGER) - .add("order_id", SqlTypeName.BIGINT).add("price", SqlTypeName.FLOAT) - .add("amount", SqlTypeName.DOUBLE).add("user_name", SqlTypeName.VARCHAR).build(); - } - - private static BeamRecordSqlType buildBeamSqlRowType() { - return CalciteUtils.toBeamRowType(buildRelDataType()); - } - - private static BeamRecord buildRow(Object[] data) { - return new BeamRecord(buildBeamSqlRowType(), Arrays.asList(data)); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/transform/BeamAggregationTransformTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/transform/BeamAggregationTransformTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/transform/BeamAggregationTransformTest.java deleted file mode 100644 index 64f2ccd..0000000 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/transform/BeamAggregationTransformTest.java +++ /dev/null @@ -1,453 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.extensions.sql.schema.transform; - -import java.text.ParseException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import org.apache.beam.sdk.coders.BeamRecordCoder; -import org.apache.beam.sdk.coders.IterableCoder; -import org.apache.beam.sdk.coders.KvCoder; -import org.apache.beam.sdk.extensions.sql.impl.planner.BeamQueryPlanner; -import org.apache.beam.sdk.extensions.sql.impl.transform.BeamAggregationTransforms; -import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils; -import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType; -import org.apache.beam.sdk.testing.PAssert; -import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.transforms.Combine; -import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.GroupByKey; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.WithKeys; -import org.apache.beam.sdk.values.BeamRecord; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollection; -import org.apache.calcite.rel.core.AggregateCall; -import org.apache.calcite.rel.type.RelDataTypeFactory.FieldInfoBuilder; -import org.apache.calcite.rel.type.RelDataTypeSystem; -import org.apache.calcite.sql.SqlKind; -import org.apache.calcite.sql.fun.SqlAvgAggFunction; -import org.apache.calcite.sql.fun.SqlCountAggFunction; -import org.apache.calcite.sql.fun.SqlMinMaxAggFunction; -import org.apache.calcite.sql.fun.SqlSumAggFunction; -import org.apache.calcite.sql.type.BasicSqlType; -import org.apache.calcite.sql.type.SqlTypeName; -import org.apache.calcite.util.ImmutableBitSet; -import org.junit.Rule; -import org.junit.Test; - -/** - * Unit tests for {@link BeamAggregationTransforms}. - * - */ -public class BeamAggregationTransformTest extends BeamTransformBaseTest{ - - @Rule - public TestPipeline p = TestPipeline.create(); - - private List<AggregateCall> aggCalls; - - private BeamRecordSqlType keyType; - private BeamRecordSqlType aggPartType; - private BeamRecordSqlType outputType; - - private BeamRecordCoder inRecordCoder; - private BeamRecordCoder keyCoder; - private BeamRecordCoder aggCoder; - private BeamRecordCoder outRecordCoder; - - /** - * This step equals to below query. - * <pre> - * SELECT `f_int` - * , COUNT(*) AS `size` - * , SUM(`f_long`) AS `sum1`, AVG(`f_long`) AS `avg1` - * , MAX(`f_long`) AS `max1`, MIN(`f_long`) AS `min1` - * , SUM(`f_short`) AS `sum2`, AVG(`f_short`) AS `avg2` - * , MAX(`f_short`) AS `max2`, MIN(`f_short`) AS `min2` - * , SUM(`f_byte`) AS `sum3`, AVG(`f_byte`) AS `avg3` - * , MAX(`f_byte`) AS `max3`, MIN(`f_byte`) AS `min3` - * , SUM(`f_float`) AS `sum4`, AVG(`f_float`) AS `avg4` - * , MAX(`f_float`) AS `max4`, MIN(`f_float`) AS `min4` - * , SUM(`f_double`) AS `sum5`, AVG(`f_double`) AS `avg5` - * , MAX(`f_double`) AS `max5`, MIN(`f_double`) AS `min5` - * , MAX(`f_timestamp`) AS `max7`, MIN(`f_timestamp`) AS `min7` - * ,SUM(`f_int2`) AS `sum8`, AVG(`f_int2`) AS `avg8` - * , MAX(`f_int2`) AS `max8`, MIN(`f_int2`) AS `min8` - * FROM TABLE_NAME - * GROUP BY `f_int` - * </pre> - * @throws ParseException - */ - @Test - public void testCountPerElementBasic() throws ParseException { - setupEnvironment(); - - PCollection<BeamRecord> input = p.apply(Create.of(inputRows)); - - //1. extract fields in group-by key part - PCollection<KV<BeamRecord, BeamRecord>> exGroupByStream = input.apply("exGroupBy", - WithKeys - .of(new BeamAggregationTransforms.AggregationGroupByKeyFn(-1, ImmutableBitSet.of(0)))) - .setCoder(KvCoder.<BeamRecord, BeamRecord>of(keyCoder, inRecordCoder)); - - //2. apply a GroupByKey. - PCollection<KV<BeamRecord, Iterable<BeamRecord>>> groupedStream = exGroupByStream - .apply("groupBy", GroupByKey.<BeamRecord, BeamRecord>create()) - .setCoder(KvCoder.<BeamRecord, Iterable<BeamRecord>>of(keyCoder, - IterableCoder.<BeamRecord>of(inRecordCoder))); - - //3. run aggregation functions - PCollection<KV<BeamRecord, BeamRecord>> aggregatedStream = groupedStream.apply("aggregation", - Combine.<BeamRecord, BeamRecord, BeamRecord>groupedValues( - new BeamAggregationTransforms.AggregationAdaptor(aggCalls, inputRowType))) - .setCoder(KvCoder.<BeamRecord, BeamRecord>of(keyCoder, aggCoder)); - - //4. flat KV to a single record - PCollection<BeamRecord> mergedStream = aggregatedStream.apply("mergeRecord", - ParDo.of(new BeamAggregationTransforms.MergeAggregationRecord(outputType, aggCalls, -1))); - mergedStream.setCoder(outRecordCoder); - - //assert function BeamAggregationTransform.AggregationGroupByKeyFn - PAssert.that(exGroupByStream).containsInAnyOrder(prepareResultOfAggregationGroupByKeyFn()); - - //assert BeamAggregationTransform.AggregationCombineFn - PAssert.that(aggregatedStream).containsInAnyOrder(prepareResultOfAggregationCombineFn()); - - //assert BeamAggregationTransform.MergeAggregationRecord - PAssert.that(mergedStream).containsInAnyOrder(prepareResultOfMergeAggregationRecord()); - - p.run(); -} - - private void setupEnvironment() { - prepareAggregationCalls(); - prepareTypeAndCoder(); - } - - /** - * create list of all {@link AggregateCall}. - */ - @SuppressWarnings("deprecation") - private void prepareAggregationCalls() { - //aggregations for all data type - aggCalls = new ArrayList<>(); - aggCalls.add( - new AggregateCall(new SqlCountAggFunction(), false, - Arrays.<Integer>asList(), - new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.BIGINT), - "count") - ); - aggCalls.add( - new AggregateCall(new SqlSumAggFunction( - new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.BIGINT)), false, - Arrays.<Integer>asList(1), - new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.BIGINT), - "sum1") - ); - aggCalls.add( - new AggregateCall(new SqlAvgAggFunction(SqlKind.AVG), false, - Arrays.<Integer>asList(1), - new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.BIGINT), - "avg1") - ); - aggCalls.add( - new AggregateCall(new SqlMinMaxAggFunction(SqlKind.MAX), false, - Arrays.<Integer>asList(1), - new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.BIGINT), - "max1") - ); - aggCalls.add( - new AggregateCall(new SqlMinMaxAggFunction(SqlKind.MIN), false, - Arrays.<Integer>asList(1), - new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.BIGINT), - "min1") - ); - - aggCalls.add( - new AggregateCall(new SqlSumAggFunction( - new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.SMALLINT)), false, - Arrays.<Integer>asList(2), - new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.SMALLINT), - "sum2") - ); - aggCalls.add( - new AggregateCall(new SqlAvgAggFunction(SqlKind.AVG), false, - Arrays.<Integer>asList(2), - new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.SMALLINT), - "avg2") - ); - aggCalls.add( - new AggregateCall(new SqlMinMaxAggFunction(SqlKind.MAX), false, - Arrays.<Integer>asList(2), - new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.SMALLINT), - "max2") - ); - aggCalls.add( - new AggregateCall(new SqlMinMaxAggFunction(SqlKind.MIN), false, - Arrays.<Integer>asList(2), - new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.SMALLINT), - "min2") - ); - - aggCalls.add( - new AggregateCall( - new SqlSumAggFunction(new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.TINYINT)), - false, - Arrays.<Integer>asList(3), - new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.TINYINT), - "sum3") - ); - aggCalls.add( - new AggregateCall(new SqlAvgAggFunction(SqlKind.AVG), false, - Arrays.<Integer>asList(3), - new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.TINYINT), - "avg3") - ); - aggCalls.add( - new AggregateCall(new SqlMinMaxAggFunction(SqlKind.MAX), false, - Arrays.<Integer>asList(3), - new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.TINYINT), - "max3") - ); - aggCalls.add( - new AggregateCall(new SqlMinMaxAggFunction(SqlKind.MIN), false, - Arrays.<Integer>asList(3), - new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.TINYINT), - "min3") - ); - - aggCalls.add( - new AggregateCall( - new SqlSumAggFunction(new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.FLOAT)), - false, - Arrays.<Integer>asList(4), - new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.FLOAT), - "sum4") - ); - aggCalls.add( - new AggregateCall(new SqlAvgAggFunction(SqlKind.AVG), false, - Arrays.<Integer>asList(4), - new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.FLOAT), - "avg4") - ); - aggCalls.add( - new AggregateCall(new SqlMinMaxAggFunction(SqlKind.MAX), false, - Arrays.<Integer>asList(4), - new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.FLOAT), - "max4") - ); - aggCalls.add( - new AggregateCall(new SqlMinMaxAggFunction(SqlKind.MIN), false, - Arrays.<Integer>asList(4), - new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.FLOAT), - "min4") - ); - - aggCalls.add( - new AggregateCall( - new SqlSumAggFunction(new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.DOUBLE)), - false, - Arrays.<Integer>asList(5), - new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.DOUBLE), - "sum5") - ); - aggCalls.add( - new AggregateCall(new SqlAvgAggFunction(SqlKind.AVG), false, - Arrays.<Integer>asList(5), - new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.DOUBLE), - "avg5") - ); - aggCalls.add( - new AggregateCall(new SqlMinMaxAggFunction(SqlKind.MAX), false, - Arrays.<Integer>asList(5), - new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.DOUBLE), - "max5") - ); - aggCalls.add( - new AggregateCall(new SqlMinMaxAggFunction(SqlKind.MIN), false, - Arrays.<Integer>asList(5), - new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.DOUBLE), - "min5") - ); - - aggCalls.add( - new AggregateCall(new SqlMinMaxAggFunction(SqlKind.MAX), false, - Arrays.<Integer>asList(7), - new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.TIMESTAMP), - "max7") - ); - aggCalls.add( - new AggregateCall(new SqlMinMaxAggFunction(SqlKind.MIN), false, - Arrays.<Integer>asList(7), - new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.TIMESTAMP), - "min7") - ); - - aggCalls.add( - new AggregateCall( - new SqlSumAggFunction(new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.INTEGER)), - false, - Arrays.<Integer>asList(8), - new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.INTEGER), - "sum8") - ); - aggCalls.add( - new AggregateCall(new SqlAvgAggFunction(SqlKind.AVG), false, - Arrays.<Integer>asList(8), - new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.INTEGER), - "avg8") - ); - aggCalls.add( - new AggregateCall(new SqlMinMaxAggFunction(SqlKind.MAX), false, - Arrays.<Integer>asList(8), - new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.INTEGER), - "max8") - ); - aggCalls.add( - new AggregateCall(new SqlMinMaxAggFunction(SqlKind.MIN), false, - Arrays.<Integer>asList(8), - new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.INTEGER), - "min8") - ); - } - - /** - * Coders used in aggregation steps. - */ - private void prepareTypeAndCoder() { - inRecordCoder = inputRowType.getRecordCoder(); - - keyType = initTypeOfSqlRow(Arrays.asList(KV.of("f_int", SqlTypeName.INTEGER))); - keyCoder = keyType.getRecordCoder(); - - aggPartType = initTypeOfSqlRow( - Arrays.asList(KV.of("count", SqlTypeName.BIGINT), - - KV.of("sum1", SqlTypeName.BIGINT), KV.of("avg1", SqlTypeName.BIGINT), - KV.of("max1", SqlTypeName.BIGINT), KV.of("min1", SqlTypeName.BIGINT), - - KV.of("sum2", SqlTypeName.SMALLINT), KV.of("avg2", SqlTypeName.SMALLINT), - KV.of("max2", SqlTypeName.SMALLINT), KV.of("min2", SqlTypeName.SMALLINT), - - KV.of("sum3", SqlTypeName.TINYINT), KV.of("avg3", SqlTypeName.TINYINT), - KV.of("max3", SqlTypeName.TINYINT), KV.of("min3", SqlTypeName.TINYINT), - - KV.of("sum4", SqlTypeName.FLOAT), KV.of("avg4", SqlTypeName.FLOAT), - KV.of("max4", SqlTypeName.FLOAT), KV.of("min4", SqlTypeName.FLOAT), - - KV.of("sum5", SqlTypeName.DOUBLE), KV.of("avg5", SqlTypeName.DOUBLE), - KV.of("max5", SqlTypeName.DOUBLE), KV.of("min5", SqlTypeName.DOUBLE), - - KV.of("max7", SqlTypeName.TIMESTAMP), KV.of("min7", SqlTypeName.TIMESTAMP), - - KV.of("sum8", SqlTypeName.INTEGER), KV.of("avg8", SqlTypeName.INTEGER), - KV.of("max8", SqlTypeName.INTEGER), KV.of("min8", SqlTypeName.INTEGER) - )); - aggCoder = aggPartType.getRecordCoder(); - - outputType = prepareFinalRowType(); - outRecordCoder = outputType.getRecordCoder(); - } - - /** - * expected results after {@link BeamAggregationTransforms.AggregationGroupByKeyFn}. - */ - private List<KV<BeamRecord, BeamRecord>> prepareResultOfAggregationGroupByKeyFn() { - return Arrays.asList( - KV.of(new BeamRecord(keyType, Arrays.<Object>asList(inputRows.get(0).getInteger(0))), - inputRows.get(0)), - KV.of(new BeamRecord(keyType, Arrays.<Object>asList(inputRows.get(1).getInteger(0))), - inputRows.get(1)), - KV.of(new BeamRecord(keyType, Arrays.<Object>asList(inputRows.get(2).getInteger(0))), - inputRows.get(2)), - KV.of(new BeamRecord(keyType, Arrays.<Object>asList(inputRows.get(3).getInteger(0))), - inputRows.get(3))); - } - - /** - * expected results after {@link BeamAggregationTransforms.AggregationCombineFn}. - */ - private List<KV<BeamRecord, BeamRecord>> prepareResultOfAggregationCombineFn() - throws ParseException { - return Arrays.asList( - KV.of(new BeamRecord(keyType, Arrays.<Object>asList(inputRows.get(0).getInteger(0))), - new BeamRecord(aggPartType, Arrays.<Object>asList( - 4L, - 10000L, 2500L, 4000L, 1000L, - (short) 10, (short) 2, (short) 4, (short) 1, - (byte) 10, (byte) 2, (byte) 4, (byte) 1, - 10.0F, 2.5F, 4.0F, 1.0F, - 10.0, 2.5, 4.0, 1.0, - format.parse("2017-01-01 02:04:03"), format.parse("2017-01-01 01:01:03"), - 10, 2, 4, 1 - ))) - ); - } - - /** - * Row type of final output row. - */ - private BeamRecordSqlType prepareFinalRowType() { - FieldInfoBuilder builder = BeamQueryPlanner.TYPE_FACTORY.builder(); - List<KV<String, SqlTypeName>> columnMetadata = - Arrays.asList(KV.of("f_int", SqlTypeName.INTEGER), KV.of("count", SqlTypeName.BIGINT), - - KV.of("sum1", SqlTypeName.BIGINT), KV.of("avg1", SqlTypeName.BIGINT), - KV.of("max1", SqlTypeName.BIGINT), KV.of("min1", SqlTypeName.BIGINT), - - KV.of("sum2", SqlTypeName.SMALLINT), KV.of("avg2", SqlTypeName.SMALLINT), - KV.of("max2", SqlTypeName.SMALLINT), KV.of("min2", SqlTypeName.SMALLINT), - - KV.of("sum3", SqlTypeName.TINYINT), KV.of("avg3", SqlTypeName.TINYINT), - KV.of("max3", SqlTypeName.TINYINT), KV.of("min3", SqlTypeName.TINYINT), - - KV.of("sum4", SqlTypeName.FLOAT), KV.of("avg4", SqlTypeName.FLOAT), - KV.of("max4", SqlTypeName.FLOAT), KV.of("min4", SqlTypeName.FLOAT), - - KV.of("sum5", SqlTypeName.DOUBLE), KV.of("avg5", SqlTypeName.DOUBLE), - KV.of("max5", SqlTypeName.DOUBLE), KV.of("min5", SqlTypeName.DOUBLE), - - KV.of("max7", SqlTypeName.TIMESTAMP), KV.of("min7", SqlTypeName.TIMESTAMP), - - KV.of("sum8", SqlTypeName.INTEGER), KV.of("avg8", SqlTypeName.INTEGER), - KV.of("max8", SqlTypeName.INTEGER), KV.of("min8", SqlTypeName.INTEGER) - ); - for (KV<String, SqlTypeName> cm : columnMetadata) { - builder.add(cm.getKey(), cm.getValue()); - } - return CalciteUtils.toBeamRowType(builder.build()); - } - - /** - * expected results after {@link BeamAggregationTransforms.MergeAggregationRecord}. - */ - private BeamRecord prepareResultOfMergeAggregationRecord() throws ParseException { - return new BeamRecord(outputType, Arrays.<Object>asList( - 1, 4L, - 10000L, 2500L, 4000L, 1000L, - (short) 10, (short) 2, (short) 4, (short) 1, - (byte) 10, (byte) 2, (byte) 4, (byte) 1, - 10.0F, 2.5F, 4.0F, 1.0F, - 10.0, 2.5, 4.0, 1.0, - format.parse("2017-01-01 02:04:03"), format.parse("2017-01-01 01:01:03"), - 10, 2, 4, 1 - )); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/transform/BeamTransformBaseTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/transform/BeamTransformBaseTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/transform/BeamTransformBaseTest.java deleted file mode 100644 index da6e95b..0000000 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/transform/BeamTransformBaseTest.java +++ /dev/null @@ -1,97 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.extensions.sql.schema.transform; - -import java.text.DateFormat; -import java.text.ParseException; -import java.text.SimpleDateFormat; -import java.util.Arrays; -import java.util.List; -import org.apache.beam.sdk.extensions.sql.impl.planner.BeamQueryPlanner; -import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils; -import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType; -import org.apache.beam.sdk.values.BeamRecord; -import org.apache.beam.sdk.values.KV; -import org.apache.calcite.rel.type.RelDataTypeFactory.FieldInfoBuilder; -import org.apache.calcite.sql.type.SqlTypeName; -import org.junit.BeforeClass; - -/** - * shared methods to test PTransforms which execute Beam SQL steps. - * - */ -public class BeamTransformBaseTest { - public static DateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); - - public static BeamRecordSqlType inputRowType; - public static List<BeamRecord> inputRows; - - @BeforeClass - public static void prepareInput() throws NumberFormatException, ParseException{ - List<KV<String, SqlTypeName>> columnMetadata = Arrays.asList( - KV.of("f_int", SqlTypeName.INTEGER), KV.of("f_long", SqlTypeName.BIGINT), - KV.of("f_short", SqlTypeName.SMALLINT), KV.of("f_byte", SqlTypeName.TINYINT), - KV.of("f_float", SqlTypeName.FLOAT), KV.of("f_double", SqlTypeName.DOUBLE), - KV.of("f_string", SqlTypeName.VARCHAR), KV.of("f_timestamp", SqlTypeName.TIMESTAMP), - KV.of("f_int2", SqlTypeName.INTEGER) - ); - inputRowType = initTypeOfSqlRow(columnMetadata); - inputRows = Arrays.asList( - initBeamSqlRow(columnMetadata, - Arrays.<Object>asList(1, 1000L, Short.valueOf("1"), Byte.valueOf("1"), 1.0F, 1.0, - "string_row1", format.parse("2017-01-01 01:01:03"), 1)), - initBeamSqlRow(columnMetadata, - Arrays.<Object>asList(1, 2000L, Short.valueOf("2"), Byte.valueOf("2"), 2.0F, 2.0, - "string_row2", format.parse("2017-01-01 01:02:03"), 2)), - initBeamSqlRow(columnMetadata, - Arrays.<Object>asList(1, 3000L, Short.valueOf("3"), Byte.valueOf("3"), 3.0F, 3.0, - "string_row3", format.parse("2017-01-01 01:03:03"), 3)), - initBeamSqlRow(columnMetadata, Arrays.<Object>asList(1, 4000L, Short.valueOf("4"), - Byte.valueOf("4"), 4.0F, 4.0, "string_row4", format.parse("2017-01-01 02:04:03"), 4))); - } - - /** - * create a {@code BeamSqlRowType} for given column metadata. - */ - public static BeamRecordSqlType initTypeOfSqlRow(List<KV<String, SqlTypeName>> columnMetadata){ - FieldInfoBuilder builder = BeamQueryPlanner.TYPE_FACTORY.builder(); - for (KV<String, SqlTypeName> cm : columnMetadata) { - builder.add(cm.getKey(), cm.getValue()); - } - return CalciteUtils.toBeamRowType(builder.build()); - } - - /** - * Create an empty row with given column metadata. - */ - public static BeamRecord initBeamSqlRow(List<KV<String, SqlTypeName>> columnMetadata) { - return initBeamSqlRow(columnMetadata, Arrays.asList()); - } - - /** - * Create a row with given column metadata, and values for each column. - * - */ - public static BeamRecord initBeamSqlRow(List<KV<String, SqlTypeName>> columnMetadata, - List<Object> rowValues){ - BeamRecordSqlType rowType = initTypeOfSqlRow(columnMetadata); - - return new BeamRecord(rowType, rowValues); - } - -}
