Repository: beam Updated Branches: refs/heads/DSL_SQL 2e9253401 -> df5859d44
[BEAM-2442] BeamSql surface API test. The surface API of BeamSql includes the following: - BeamSql - BeamSqlCli - BeamSqlEnv - All the classes in package org.apache.beam.dsls.sql.schema. Calcite-related methods are encapsulated into CalciteUtils (which is not part of the surface API) to avoid exposure. Created a new BeamSqlTable interface which abstracts the Beam table concept. RelDataType and RelProtoDataType are both removed from the surface API; BeamSqlRecordType is the only class which represents the schema of a table. java.sql.Types is used to represent SQL types instead of Calcite SqlTypeName. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7bcbad53 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7bcbad53 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7bcbad53 Branch: refs/heads/DSL_SQL Commit: 7bcbad53ebe6bbad6c1dd5b8f2b6ff8f9fbf6d26 Parents: 2e92534 Author: James Xu <[email protected]> Authored: Wed Jun 14 23:47:10 2017 +0800 Committer: Tyler Akidau <[email protected]> Committed: Thu Jun 15 17:39:48 2017 -0700 ---------------------------------------------------------------------- .../java/org/apache/beam/dsls/sql/BeamSql.java | 19 ++-- .../org/apache/beam/dsls/sql/BeamSqlCli.java | 13 +-- .../org/apache/beam/dsls/sql/BeamSqlEnv.java | 52 ++++++++- .../beam/dsls/sql/example/BeamSqlExample.java | 8 +- .../beam/dsls/sql/rel/BeamAggregationRel.java | 11 +- .../apache/beam/dsls/sql/rel/BeamFilterRel.java | 4 +- .../beam/dsls/sql/rel/BeamProjectRel.java | 7 +- .../apache/beam/dsls/sql/rel/BeamSortRel.java | 6 +- .../apache/beam/dsls/sql/rel/BeamValuesRel.java | 3 +- .../beam/dsls/sql/schema/BaseBeamTable.java | 70 +----------- .../dsls/sql/schema/BeamPCollectionTable.java | 10 +- .../beam/dsls/sql/schema/BeamSqlRecordType.java | 42 +------- .../apache/beam/dsls/sql/schema/BeamSqlRow.java | 5 +- .../beam/dsls/sql/schema/BeamSqlRowCoder.java | 6 +- .../beam/dsls/sql/schema/BeamSqlTable.java | 52 
+++++++++ .../beam/dsls/sql/schema/BeamTableUtils.java | 3 +- .../sql/schema/kafka/BeamKafkaCSVTable.java | 9 +- .../dsls/sql/schema/kafka/BeamKafkaTable.java | 12 ++- .../beam/dsls/sql/schema/package-info.java | 1 - .../dsls/sql/schema/text/BeamTextCSVTable.java | 10 +- .../dsls/sql/schema/text/BeamTextTable.java | 9 +- .../transform/BeamAggregationTransforms.java | 8 +- .../beam/dsls/sql/utils/CalciteUtils.java | 108 +++++++++++++++++++ .../beam/dsls/sql/utils/package-info.java | 22 ++++ .../beam/dsls/sql/BeamSqlApiSurfaceTest.java | 59 ++++++++++ .../interpreter/BeamSqlFnExecutorTestBase.java | 3 +- .../beam/dsls/sql/planner/BasePlanner.java | 22 ++-- .../BeamPlannerAggregationSubmitTest.java | 12 ++- .../dsls/sql/planner/MockedBeamSqlTable.java | 11 +- .../sql/schema/BeamPCollectionTableTest.java | 7 +- .../dsls/sql/schema/BeamSqlRowCoderTest.java | 3 +- .../sql/schema/kafka/BeamKafkaCSVTableTest.java | 19 ++-- .../sql/schema/text/BeamTextCSVTableTest.java | 12 ++- .../transform/BeamAggregationTransformTest.java | 4 +- .../schema/transform/BeamTransformBaseTest.java | 3 +- 35 files changed, 433 insertions(+), 212 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/7bcbad53/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSql.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSql.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSql.java index 0d6454b..04fe055 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSql.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSql.java @@ -17,6 +17,9 @@ */ package org.apache.beam.dsls.sql; +import static org.apache.beam.dsls.sql.BeamSqlEnv.planner; +import static org.apache.beam.dsls.sql.BeamSqlEnv.registerTable; + import org.apache.beam.dsls.sql.rel.BeamRelNode; import 
org.apache.beam.dsls.sql.schema.BeamPCollectionTable; import org.apache.beam.dsls.sql.schema.BeamSqlRow; @@ -71,6 +74,7 @@ p.run().waitUntilFinish(); */ @Experimental public class BeamSql { + /** * Transforms a SQL query into a {@link PTransform} representing an equivalent execution plan. * @@ -101,7 +105,8 @@ public class BeamSql { /** * A {@link PTransform} representing an execution plan for a SQL query. */ - public static class QueryTransform extends PTransform<PCollectionTuple, PCollection<BeamSqlRow>> { + private static class QueryTransform extends + PTransform<PCollectionTuple, PCollection<BeamSqlRow>> { private String sqlQuery; public QueryTransform(String sqlQuery) { this.sqlQuery = sqlQuery; @@ -114,13 +119,13 @@ public class BeamSql { PCollection<BeamSqlRow> sourceStream = (PCollection<BeamSqlRow>) input.get(sourceTag); BeamSqlRowCoder sourceCoder = (BeamSqlRowCoder) sourceStream.getCoder(); - BeamSqlEnv.registerTable(sourceTag.getId(), - new BeamPCollectionTable(sourceStream, sourceCoder.getTableSchema().toRelDataType())); + registerTable(sourceTag.getId(), + new BeamPCollectionTable(sourceStream, sourceCoder.getTableSchema())); } BeamRelNode beamRelNode = null; try { - beamRelNode = BeamSqlEnv.planner.convertToBeamRel(sqlQuery); + beamRelNode = planner.convertToBeamRel(sqlQuery); } catch (ValidationException | RelConversionException | SqlParseException e) { throw new IllegalStateException(e); } @@ -137,7 +142,7 @@ public class BeamSql { * A {@link PTransform} representing an execution plan for a SQL query referencing * a single table. 
*/ - public static class SimpleQueryTransform + private static class SimpleQueryTransform extends PTransform<PCollection<BeamSqlRow>, PCollection<BeamSqlRow>> { private String sqlQuery; public SimpleQueryTransform(String sqlQuery) { @@ -152,8 +157,8 @@ public class BeamSql { public PCollection<BeamSqlRow> expand(PCollection<BeamSqlRow> input) { SqlNode sqlNode; try { - sqlNode = BeamSqlEnv.planner.parseQuery(sqlQuery); - BeamSqlEnv.planner.getPlanner().close(); + sqlNode = planner.parseQuery(sqlQuery); + planner.getPlanner().close(); } catch (SqlParseException e) { throw new IllegalStateException(e); } http://git-wip-us.apache.org/repos/asf/beam/blob/7bcbad53/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlCli.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlCli.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlCli.java index a55f655..dbf9a59 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlCli.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlCli.java @@ -17,6 +17,8 @@ */ package org.apache.beam.dsls.sql; +import static org.apache.beam.dsls.sql.BeamSqlEnv.planner; + import org.apache.beam.dsls.sql.rel.BeamRelNode; import org.apache.beam.dsls.sql.schema.BeamSqlRow; import org.apache.beam.sdk.Pipeline; @@ -25,9 +27,6 @@ import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.values.PCollection; import org.apache.calcite.plan.RelOptUtil; -import org.apache.calcite.sql.parser.SqlParseException; -import org.apache.calcite.tools.RelConversionException; -import org.apache.calcite.tools.ValidationException; /** * {@link BeamSqlCli} provides methods to execute Beam SQL with an interactive client. @@ -38,9 +37,8 @@ public class BeamSqlCli { /** * Returns a human readable representation of the query execution plan. 
*/ - public static String explainQuery(String sqlString) - throws ValidationException, RelConversionException, SqlParseException { - BeamRelNode exeTree = BeamSqlEnv.planner.convertToBeamRel(sqlString); + public static String explainQuery(String sqlString) throws Exception { + BeamRelNode exeTree = planner.convertToBeamRel(sqlString); String beamPlan = RelOptUtil.toString(exeTree); return beamPlan; } @@ -63,8 +61,7 @@ public class BeamSqlCli { public static PCollection<BeamSqlRow> compilePipeline(String sqlStatement, Pipeline basePipeline) throws Exception{ PCollection<BeamSqlRow> resultStream = - BeamSqlEnv.planner.compileBeamPipeline(sqlStatement, basePipeline); + planner.compileBeamPipeline(sqlStatement, basePipeline); return resultStream; } - } http://git-wip-us.apache.org/repos/asf/beam/blob/7bcbad53/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlEnv.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlEnv.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlEnv.java index af6c007..d7715c7 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlEnv.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlEnv.java @@ -17,9 +17,21 @@ */ package org.apache.beam.dsls.sql; +import java.io.Serializable; + import org.apache.beam.dsls.sql.planner.BeamQueryPlanner; import org.apache.beam.dsls.sql.schema.BaseBeamTable; +import org.apache.beam.dsls.sql.schema.BeamSqlRecordType; +import org.apache.beam.dsls.sql.utils.CalciteUtils; +import org.apache.calcite.DataContext; +import org.apache.calcite.linq4j.Enumerable; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.schema.ScannableTable; +import org.apache.calcite.schema.Schema; import org.apache.calcite.schema.SchemaPlus; +import org.apache.calcite.schema.Statistic; +import org.apache.calcite.schema.Statistics; 
import org.apache.calcite.schema.impl.ScalarFunctionImpl; import org.apache.calcite.tools.Frameworks; @@ -30,8 +42,8 @@ import org.apache.calcite.tools.Frameworks; * a {@link BeamQueryPlanner} which parse/validate/optimize/translate input SQL queries. */ public class BeamSqlEnv { - public static SchemaPlus schema; - public static BeamQueryPlanner planner; + static SchemaPlus schema; + static BeamQueryPlanner planner; static { schema = Frameworks.createRootSchema(true); @@ -50,7 +62,7 @@ public class BeamSqlEnv { * */ public static void registerTable(String tableName, BaseBeamTable table) { - schema.add(tableName, table); + schema.add(tableName, new BeamCalciteTable(table.getRecordType())); planner.getSourceTables().put(tableName, table); } @@ -60,4 +72,38 @@ public class BeamSqlEnv { public static BaseBeamTable findTable(String tableName){ return planner.getSourceTables().get(tableName); } + + private static class BeamCalciteTable implements ScannableTable, Serializable { + private BeamSqlRecordType beamSqlRecordType; + public BeamCalciteTable(BeamSqlRecordType beamSqlRecordType) { + this.beamSqlRecordType = beamSqlRecordType; + } + @Override + public RelDataType getRowType(RelDataTypeFactory typeFactory) { + return CalciteUtils.toCalciteRecordType(this.beamSqlRecordType) + .apply(BeamQueryPlanner.TYPE_FACTORY); + } + + @Override + public Enumerable<Object[]> scan(DataContext root) { + // not used as Beam SQL uses its own execution engine + return null; + } + + /** + * Not used {@link Statistic} to optimize the plan. + */ + @Override + public Statistic getStatistic() { + return Statistics.UNKNOWN; + } + + /** + * all sources are treated as TABLE in Beam SQL. 
+ */ + @Override + public Schema.TableType getJdbcTableType() { + return Schema.TableType.TABLE; + } + } } http://git-wip-us.apache.org/repos/asf/beam/blob/7bcbad53/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java index 36e1aa9..8ba785b 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java @@ -17,6 +17,7 @@ */ package org.apache.beam.dsls.sql.example; +import java.sql.Types; import org.apache.beam.dsls.sql.BeamSql; import org.apache.beam.dsls.sql.schema.BeamSqlRecordType; import org.apache.beam.dsls.sql.schema.BeamSqlRow; @@ -31,7 +32,6 @@ import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.beam.sdk.values.TupleTag; -import org.apache.calcite.sql.type.SqlTypeName; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,9 +48,9 @@ public class BeamSqlExample { //define the input row format BeamSqlRecordType type = new BeamSqlRecordType(); - type.addField("c1", SqlTypeName.INTEGER); - type.addField("c2", SqlTypeName.VARCHAR); - type.addField("c3", SqlTypeName.DOUBLE); + type.addField("c1", Types.INTEGER); + type.addField("c2", Types.VARCHAR); + type.addField("c3", Types.DOUBLE); BeamSqlRow row = new BeamSqlRow(type); row.addField(0, 1); row.addField(1, "row"); http://git-wip-us.apache.org/repos/asf/beam/blob/7bcbad53/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java index 9951536..828dcec 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java @@ -24,6 +24,7 @@ import org.apache.beam.dsls.sql.schema.BeamSqlRecordType; import org.apache.beam.dsls.sql.schema.BeamSqlRow; import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder; import org.apache.beam.dsls.sql.transform.BeamAggregationTransforms; +import org.apache.beam.dsls.sql.utils.CalciteUtils; import org.apache.beam.sdk.coders.IterableCoder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.transforms.Combine; @@ -109,13 +110,13 @@ public class BeamAggregationRel extends Aggregate implements BeamRelNode { stageName + "_aggregation", Combine.<BeamSqlRow, BeamSqlRow, BeamSqlRow>groupedValues( new BeamAggregationTransforms.AggregationCombineFn(getAggCallList(), - BeamSqlRecordType.from(input.getRowType())))) + CalciteUtils.toBeamRecordType(input.getRowType())))) .setCoder(KvCoder.<BeamSqlRow, BeamSqlRow>of(keyCoder, aggCoder)); PCollection<BeamSqlRow> mergedStream = aggregatedStream.apply(stageName + "_mergeRecord", ParDo.of(new BeamAggregationTransforms.MergeAggregationRecord( - BeamSqlRecordType.from(getRowType()), getAggCallList()))); - mergedStream.setCoder(new BeamSqlRowCoder(BeamSqlRecordType.from(getRowType()))); + CalciteUtils.toBeamRecordType(getRowType()), getAggCallList()))); + mergedStream.setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRecordType(getRowType()))); return mergedStream; } @@ -124,7 +125,7 @@ public class BeamAggregationRel extends Aggregate implements BeamRelNode { * Type of sub-rowrecord used as Group-By keys. 
*/ private BeamSqlRecordType exKeyFieldsSchema(RelDataType relDataType) { - BeamSqlRecordType inputRecordType = BeamSqlRecordType.from(relDataType); + BeamSqlRecordType inputRecordType = CalciteUtils.toBeamRecordType(relDataType); BeamSqlRecordType typeOfKey = new BeamSqlRecordType(); for (int i : groupSet.asList()) { if (i != windowFieldIdx) { @@ -141,7 +142,7 @@ public class BeamAggregationRel extends Aggregate implements BeamRelNode { private BeamSqlRecordType exAggFieldsSchema() { BeamSqlRecordType typeOfAggFields = new BeamSqlRecordType(); for (AggregateCall ac : getAggCallList()) { - typeOfAggFields.addField(ac.name, ac.type.getSqlTypeName()); + typeOfAggFields.addField(ac.name, CalciteUtils.toJavaType(ac.type.getSqlTypeName())); } return typeOfAggFields; } http://git-wip-us.apache.org/repos/asf/beam/blob/7bcbad53/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamFilterRel.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamFilterRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamFilterRel.java index 4c5e113..dc13646 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamFilterRel.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamFilterRel.java @@ -20,10 +20,10 @@ package org.apache.beam.dsls.sql.rel; import org.apache.beam.dsls.sql.interpreter.BeamSqlExpressionExecutor; import org.apache.beam.dsls.sql.interpreter.BeamSqlFnExecutor; import org.apache.beam.dsls.sql.planner.BeamSqlRelUtils; -import org.apache.beam.dsls.sql.schema.BeamSqlRecordType; import org.apache.beam.dsls.sql.schema.BeamSqlRow; import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder; import org.apache.beam.dsls.sql.transform.BeamSqlFilterFn; +import org.apache.beam.dsls.sql.utils.CalciteUtils; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionTuple; @@ -63,7 
+63,7 @@ public class BeamFilterRel extends Filter implements BeamRelNode { PCollection<BeamSqlRow> filterStream = upstream.apply(stageName, ParDo.of(new BeamSqlFilterFn(getRelTypeName(), executor))); - filterStream.setCoder(new BeamSqlRowCoder(BeamSqlRecordType.from(getRowType()))); + filterStream.setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRecordType(getRowType()))); return filterStream; } http://git-wip-us.apache.org/repos/asf/beam/blob/7bcbad53/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamProjectRel.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamProjectRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamProjectRel.java index 9b7492b..937a834 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamProjectRel.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamProjectRel.java @@ -22,10 +22,10 @@ import java.util.List; import org.apache.beam.dsls.sql.interpreter.BeamSqlExpressionExecutor; import org.apache.beam.dsls.sql.interpreter.BeamSqlFnExecutor; import org.apache.beam.dsls.sql.planner.BeamSqlRelUtils; -import org.apache.beam.dsls.sql.schema.BeamSqlRecordType; import org.apache.beam.dsls.sql.schema.BeamSqlRow; import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder; import org.apache.beam.dsls.sql.transform.BeamSqlProjectFn; +import org.apache.beam.dsls.sql.utils.CalciteUtils; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionTuple; @@ -72,8 +72,9 @@ public class BeamProjectRel extends Project implements BeamRelNode { BeamSqlExpressionExecutor executor = new BeamSqlFnExecutor(this); PCollection<BeamSqlRow> projectStream = upstream.apply(stageName, ParDo - .of(new BeamSqlProjectFn(getRelTypeName(), executor, BeamSqlRecordType.from(rowType)))); - projectStream.setCoder(new 
BeamSqlRowCoder(BeamSqlRecordType.from(getRowType()))); + .of(new BeamSqlProjectFn(getRelTypeName(), executor, + CalciteUtils.toBeamRecordType(rowType)))); + projectStream.setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRecordType(getRowType()))); return projectStream; } http://git-wip-us.apache.org/repos/asf/beam/blob/7bcbad53/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSortRel.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSortRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSortRel.java index ff8bbcf..7632e6a 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSortRel.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSortRel.java @@ -25,9 +25,9 @@ import java.util.ArrayList; import java.util.Comparator; import java.util.List; import org.apache.beam.dsls.sql.planner.BeamSqlRelUtils; -import org.apache.beam.dsls.sql.schema.BeamSqlRecordType; import org.apache.beam.dsls.sql.schema.BeamSqlRow; import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder; +import org.apache.beam.dsls.sql.utils.CalciteUtils; import org.apache.beam.sdk.coders.ListCoder; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.Flatten; @@ -149,7 +149,7 @@ public class BeamSortRel extends Sort implements BeamRelNode { PCollection<BeamSqlRow> orderedStream = rawStream.apply( "flatten", Flatten.<BeamSqlRow>iterables()); - orderedStream.setCoder(new BeamSqlRowCoder(BeamSqlRecordType.from(getRowType()))); + orderedStream.setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRecordType(getRowType()))); return orderedStream; } @@ -191,7 +191,7 @@ public class BeamSortRel extends Sort implements BeamRelNode { for (int i = 0; i < fieldsIndices.size(); i++) { int fieldIndex = fieldsIndices.get(i); int fieldRet = 0; - SqlTypeName fieldType = row1.getDataType().getFieldsType().get(fieldIndex); + SqlTypeName fieldType = 
CalciteUtils.getFieldType(row1.getDataType(), fieldIndex); // whether NULL should be ordered first or last(compared to non-null values) depends on // what user specified in SQL(NULLS FIRST/NULLS LAST) if (row1.isNull(fieldIndex) && row2.isNull(fieldIndex)) { http://git-wip-us.apache.org/repos/asf/beam/blob/7bcbad53/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamValuesRel.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamValuesRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamValuesRel.java index 9a1887f..61d9713 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamValuesRel.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamValuesRel.java @@ -28,6 +28,7 @@ import org.apache.beam.dsls.sql.schema.BeamSqlRecordType; import org.apache.beam.dsls.sql.schema.BeamSqlRow; import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder; import org.apache.beam.dsls.sql.schema.BeamTableUtils; +import org.apache.beam.dsls.sql.utils.CalciteUtils; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionTuple; @@ -65,7 +66,7 @@ public class BeamValuesRel extends Values implements BeamRelNode { throw new IllegalStateException("Values with empty tuples!"); } - BeamSqlRecordType beamSQLRecordType = BeamSqlRecordType.from(this.getRowType()); + BeamSqlRecordType beamSQLRecordType = CalciteUtils.toBeamRecordType(this.getRowType()); for (ImmutableList<RexLiteral> tuple : tuples) { BeamSqlRow row = new BeamSqlRow(beamSQLRecordType); for (int i = 0; i < tuple.size(); i++) { http://git-wip-us.apache.org/repos/asf/beam/blob/7bcbad53/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BaseBeamTable.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BaseBeamTable.java 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BaseBeamTable.java index 333bb10..6d49bcc 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BaseBeamTable.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BaseBeamTable.java @@ -18,77 +18,17 @@ package org.apache.beam.dsls.sql.schema; import java.io.Serializable; -import org.apache.beam.dsls.sql.planner.BeamQueryPlanner; -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PDone; -import org.apache.calcite.DataContext; -import org.apache.calcite.linq4j.Enumerable; -import org.apache.calcite.rel.type.RelDataType; -import org.apache.calcite.rel.type.RelDataTypeFactory; -import org.apache.calcite.rel.type.RelProtoDataType; -import org.apache.calcite.schema.ScannableTable; -import org.apache.calcite.schema.Schema.TableType; -import org.apache.calcite.schema.Statistic; -import org.apache.calcite.schema.Statistics; /** * Each IO in Beam has one table schema, by extending {@link BaseBeamTable}. */ -public abstract class BaseBeamTable implements ScannableTable, Serializable { - private RelDataType relDataType; - +public abstract class BaseBeamTable implements BeamSqlTable, Serializable { protected BeamSqlRecordType beamSqlRecordType; - - public BaseBeamTable(RelProtoDataType protoRowType) { - this.relDataType = protoRowType.apply(BeamQueryPlanner.TYPE_FACTORY); - this.beamSqlRecordType = BeamSqlRecordType.from(relDataType); - } - - /** - * In Beam SQL, there's no difference between a batch query and a streaming - * query. {@link BeamIOType} is used to validate the sources. - */ - public abstract BeamIOType getSourceType(); - - /** - * create a {@code PCollection<BeamSqlRow>} from source. - * - */ - public abstract PCollection<BeamSqlRow> buildIOReader(Pipeline pipeline); - - /** - * create a {@code IO.write()} instance to write to target. 
- * - */ - public abstract PTransform<? super PCollection<BeamSqlRow>, PDone> buildIOWriter(); - - @Override - public Enumerable<Object[]> scan(DataContext root) { - // not used as Beam SQL uses its own execution engine - return null; - } - - @Override - public RelDataType getRowType(RelDataTypeFactory typeFactory) { - return relDataType; + public BaseBeamTable(BeamSqlRecordType beamSqlRecordType) { + this.beamSqlRecordType = beamSqlRecordType; } - /** - * Not used {@link Statistic} to optimize the plan. - */ - @Override - public Statistic getStatistic() { - return Statistics.UNKNOWN; + @Override public BeamSqlRecordType getRecordType() { + return beamSqlRecordType; } - - /** - * all sources are treated as TABLE in Beam SQL. - */ - @Override - public TableType getJdbcTableType() { - return TableType.TABLE; - } - } http://git-wip-us.apache.org/repos/asf/beam/blob/7bcbad53/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamPCollectionTable.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamPCollectionTable.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamPCollectionTable.java index f679ed7..ecd0d67 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamPCollectionTable.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamPCollectionTable.java @@ -22,7 +22,6 @@ import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollection.IsBounded; import org.apache.beam.sdk.values.PDone; -import org.apache.calcite.rel.type.RelProtoDataType; /** * {@code BeamPCollectionTable} converts a {@code PCollection<BeamSqlRow>} as a virtual table, @@ -32,12 +31,13 @@ public class BeamPCollectionTable extends BaseBeamTable { private BeamIOType ioType; private PCollection<BeamSqlRow> upstream; - protected BeamPCollectionTable(RelProtoDataType protoRowType) { - 
super(protoRowType); + protected BeamPCollectionTable(BeamSqlRecordType beamSqlRecordType) { + super(beamSqlRecordType); } - public BeamPCollectionTable(PCollection<BeamSqlRow> upstream, RelProtoDataType protoRowType){ - this(protoRowType); + public BeamPCollectionTable(PCollection<BeamSqlRow> upstream, + BeamSqlRecordType beamSqlRecordType){ + this(beamSqlRecordType); ioType = upstream.isBounded().equals(IsBounded.BOUNDED) ? BeamIOType.BOUNDED : BeamIOType.UNBOUNDED; this.upstream = upstream; http://git-wip-us.apache.org/repos/asf/beam/blob/7bcbad53/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRecordType.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRecordType.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRecordType.java index 7da08cc..08ba39f 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRecordType.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRecordType.java @@ -20,12 +20,6 @@ package org.apache.beam.dsls.sql.schema; import java.io.Serializable; import java.util.ArrayList; import java.util.List; -import org.apache.calcite.rel.type.RelDataType; -import org.apache.calcite.rel.type.RelDataTypeFactory; -import org.apache.calcite.rel.type.RelDataTypeFactory.FieldInfoBuilder; -import org.apache.calcite.rel.type.RelDataTypeField; -import org.apache.calcite.rel.type.RelProtoDataType; -import org.apache.calcite.sql.type.SqlTypeName; /** * Field type information in {@link BeamSqlRow}. @@ -33,41 +27,13 @@ import org.apache.calcite.sql.type.SqlTypeName; */ public class BeamSqlRecordType implements Serializable { private List<String> fieldsName = new ArrayList<>(); - private List<SqlTypeName> fieldsType = new ArrayList<>(); + private List<Integer> fieldsType = new ArrayList<>(); - /** - * Generate from {@link RelDataType} which is used to create table. 
- */ - public static BeamSqlRecordType from(RelDataType tableInfo) { - BeamSqlRecordType record = new BeamSqlRecordType(); - for (RelDataTypeField f : tableInfo.getFieldList()) { - record.fieldsName.add(f.getName()); - record.fieldsType.add(f.getType().getSqlTypeName()); - } - return record; - } - - public void addField(String fieldName, SqlTypeName fieldType) { + public void addField(String fieldName, Integer fieldType) { fieldsName.add(fieldName); fieldsType.add(fieldType); } - /** - * Create an instance of {@link RelDataType} so it can be used to create a table. - */ - public RelProtoDataType toRelDataType() { - return new RelProtoDataType() { - @Override - public RelDataType apply(RelDataTypeFactory a) { - FieldInfoBuilder builder = a.builder(); - for (int idx = 0; idx < fieldsName.size(); ++idx) { - builder.add(fieldsName.get(idx), fieldsType.get(idx)); - } - return builder.build(); - } - }; - } - public int size() { return fieldsName.size(); } @@ -80,11 +46,11 @@ public class BeamSqlRecordType implements Serializable { this.fieldsName = fieldsName; } - public List<SqlTypeName> getFieldsType() { + public List<Integer> getFieldsType() { return fieldsType; } - public void setFieldsType(List<SqlTypeName> fieldsType) { + public void setFieldsType(List<Integer> fieldsType) { this.fieldsType = fieldsType; } http://git-wip-us.apache.org/repos/asf/beam/blob/7bcbad53/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java index eb311cf..3a67303 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java @@ -24,6 +24,7 @@ import java.util.Date; import java.util.GregorianCalendar; import java.util.List; import 
java.util.concurrent.TimeUnit; +import org.apache.beam.dsls.sql.utils.CalciteUtils; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; import org.apache.calcite.sql.type.SqlTypeName; @@ -81,7 +82,7 @@ public class BeamSqlRow implements Serializable { } } - SqlTypeName fieldType = dataType.getFieldsType().get(index); + SqlTypeName fieldType = CalciteUtils.getFieldType(dataType, index); switch (fieldType) { case INTEGER: if (!(fieldValue instanceof Integer)) { @@ -201,7 +202,7 @@ public class BeamSqlRow implements Serializable { } Object fieldValue = dataValues.get(fieldIdx); - SqlTypeName fieldType = dataType.getFieldsType().get(fieldIdx); + SqlTypeName fieldType = CalciteUtils.getFieldType(dataType, fieldIdx); switch (fieldType) { case INTEGER: http://git-wip-us.apache.org/repos/asf/beam/blob/7bcbad53/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java index bcbd481..e86fb3f 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java @@ -23,6 +23,8 @@ import java.io.OutputStream; import java.util.Date; import java.util.GregorianCalendar; import java.util.List; + +import org.apache.beam.dsls.sql.utils.CalciteUtils; import org.apache.beam.sdk.coders.BigDecimalCoder; import org.apache.beam.sdk.coders.BigEndianIntegerCoder; import org.apache.beam.sdk.coders.BigEndianLongCoder; @@ -62,7 +64,7 @@ public class BeamSqlRowCoder extends CustomCoder<BeamSqlRow> { continue; } - switch (value.getDataType().getFieldsType().get(idx)) { + switch (CalciteUtils.getFieldType(value.getDataType(), idx)) { case INTEGER: 
intCoder.encode(value.getInteger(idx), outStream); break; @@ -117,7 +119,7 @@ public class BeamSqlRowCoder extends CustomCoder<BeamSqlRow> { continue; } - switch (tableSchema.getFieldsType().get(idx)) { + switch (CalciteUtils.getFieldType(tableSchema, idx)) { case INTEGER: record.addField(idx, intCoder.decode(inStream)); break; http://git-wip-us.apache.org/repos/asf/beam/blob/7bcbad53/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlTable.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlTable.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlTable.java new file mode 100644 index 0000000..986decb --- /dev/null +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlTable.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.dsls.sql.schema; + +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PDone; + +/** + * This interface defines a Beam Sql Table. 
+ */ +public interface BeamSqlTable { + /** + * In Beam SQL, there's no difference between a batch query and a streaming + * query. {@link BeamIOType} is used to validate the sources. + */ + BeamIOType getSourceType(); + + /** + * Create a {@code PCollection<BeamSqlRow>} from source. + * + */ + PCollection<BeamSqlRow> buildIOReader(Pipeline pipeline); + + /** + * Create an {@code IO.write()} instance to write to target. + * + */ + PTransform<? super PCollection<BeamSqlRow>, PDone> buildIOWriter(); + + /** + * Get the schema info of the table. + */ + BeamSqlRecordType getRecordType(); +} http://git-wip-us.apache.org/repos/asf/beam/blob/7bcbad53/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamTableUtils.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamTableUtils.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamTableUtils.java index 1c1db91..79a9cb2 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamTableUtils.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamTableUtils.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.io.StringReader; import java.io.StringWriter; import java.math.BigDecimal; +import org.apache.beam.dsls.sql.utils.CalciteUtils; import org.apache.calcite.sql.type.SqlTypeName; import org.apache.calcite.util.NlsString; import org.apache.commons.csv.CSVFormat; @@ -78,7 +79,7 @@ public final class BeamTableUtils { return; } - SqlTypeName columnType = row.getDataType().getFieldsType().get(idx); + SqlTypeName columnType = CalciteUtils.getFieldType(row.getDataType(), idx); // auto-casting for numberics if ((rawObj instanceof String && SqlTypeName.NUMERIC_TYPES.contains(columnType)) || (rawObj instanceof BigDecimal && columnType != SqlTypeName.DECIMAL)) {
http://git-wip-us.apache.org/repos/asf/beam/blob/7bcbad53/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaCSVTable.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaCSVTable.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaCSVTable.java index f8c2553..39cf8d8 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaCSVTable.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaCSVTable.java @@ -29,7 +29,6 @@ import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; -import org.apache.calcite.rel.type.RelProtoDataType; import org.apache.commons.csv.CSVFormat; /** @@ -38,14 +37,14 @@ import org.apache.commons.csv.CSVFormat; */ public class BeamKafkaCSVTable extends BeamKafkaTable { private CSVFormat csvFormat; - public BeamKafkaCSVTable(RelProtoDataType protoRowType, String bootstrapServers, + public BeamKafkaCSVTable(BeamSqlRecordType beamSqlRecordType, String bootstrapServers, List<String> topics) { - this(protoRowType, bootstrapServers, topics, CSVFormat.DEFAULT); + this(beamSqlRecordType, bootstrapServers, topics, CSVFormat.DEFAULT); } - public BeamKafkaCSVTable(RelProtoDataType protoRowType, String bootstrapServers, + public BeamKafkaCSVTable(BeamSqlRecordType beamSqlRecordType, String bootstrapServers, List<String> topics, CSVFormat format) { - super(protoRowType, bootstrapServers, topics); + super(beamSqlRecordType, bootstrapServers, topics); this.csvFormat = format; } http://git-wip-us.apache.org/repos/asf/beam/blob/7bcbad53/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaTable.java ---------------------------------------------------------------------- diff --git 
a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaTable.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaTable.java index c43fa2c..f27014e 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaTable.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaTable.java @@ -18,11 +18,14 @@ package org.apache.beam.dsls.sql.schema.kafka; import static com.google.common.base.Preconditions.checkArgument; + import java.io.Serializable; import java.util.List; import java.util.Map; + import org.apache.beam.dsls.sql.schema.BaseBeamTable; import org.apache.beam.dsls.sql.schema.BeamIOType; +import org.apache.beam.dsls.sql.schema.BeamSqlRecordType; import org.apache.beam.dsls.sql.schema.BeamSqlRow; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.ByteArrayCoder; @@ -32,7 +35,6 @@ import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PDone; -import org.apache.calcite.rel.type.RelProtoDataType; import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.apache.kafka.common.serialization.ByteArraySerializer; @@ -47,13 +49,13 @@ public abstract class BeamKafkaTable extends BaseBeamTable implements Serializab private List<String> topics; private Map<String, Object> configUpdates; - protected BeamKafkaTable(RelProtoDataType protoRowType) { - super(protoRowType); + protected BeamKafkaTable(BeamSqlRecordType beamSqlRecordType) { + super(beamSqlRecordType); } - public BeamKafkaTable(RelProtoDataType protoRowType, String bootstrapServers, + public BeamKafkaTable(BeamSqlRecordType beamSqlRecordType, String bootstrapServers, List<String> topics) { - super(protoRowType); + super(beamSqlRecordType); this.bootstrapServers = bootstrapServers; this.topics = topics; } 
http://git-wip-us.apache.org/repos/asf/beam/blob/7bcbad53/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/package-info.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/package-info.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/package-info.java index 47de06f..4c41826 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/package-info.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/package-info.java @@ -15,7 +15,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - /** * define table schema, to map with Beam IO components. * http://git-wip-us.apache.org/repos/asf/beam/blob/7bcbad53/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTable.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTable.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTable.java index e575eee..41a786f 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTable.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTable.java @@ -18,6 +18,7 @@ package org.apache.beam.dsls.sql.schema.text; +import org.apache.beam.dsls.sql.schema.BeamSqlRecordType; import org.apache.beam.dsls.sql.schema.BeamSqlRow; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.io.TextIO; @@ -25,7 +26,6 @@ import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PDone; -import org.apache.calcite.rel.type.RelProtoDataType; import org.apache.commons.csv.CSVFormat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,13 +46,13 @@ public class BeamTextCSVTable extends 
BeamTextTable { /** * CSV table with {@link CSVFormat#DEFAULT DEFAULT} format. */ - public BeamTextCSVTable(RelProtoDataType protoDataType, String filePattern) { - this(protoDataType, filePattern, CSVFormat.DEFAULT); + public BeamTextCSVTable(BeamSqlRecordType beamSqlRecordType, String filePattern) { + this(beamSqlRecordType, filePattern, CSVFormat.DEFAULT); } - public BeamTextCSVTable(RelProtoDataType protoDataType, String filePattern, + public BeamTextCSVTable(BeamSqlRecordType beamSqlRecordType, String filePattern, CSVFormat csvFormat) { - super(protoDataType, filePattern); + super(beamSqlRecordType, filePattern); this.csvFormat = csvFormat; } http://git-wip-us.apache.org/repos/asf/beam/blob/7bcbad53/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextTable.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextTable.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextTable.java index 3353761..525c210 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextTable.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextTable.java @@ -22,19 +22,16 @@ import java.io.Serializable; import org.apache.beam.dsls.sql.schema.BaseBeamTable; import org.apache.beam.dsls.sql.schema.BeamIOType; -import org.apache.calcite.rel.type.RelProtoDataType; +import org.apache.beam.dsls.sql.schema.BeamSqlRecordType; /** * {@code BeamTextTable} represents a text file/directory(backed by {@code TextIO}). 
*/ public abstract class BeamTextTable extends BaseBeamTable implements Serializable { protected String filePattern; - protected BeamTextTable(RelProtoDataType protoRowType) { - super(protoRowType); - } - protected BeamTextTable(RelProtoDataType protoDataType, String filePattern) { - super(protoDataType); + protected BeamTextTable(BeamSqlRecordType beamSqlRecordType, String filePattern) { + super(beamSqlRecordType); this.filePattern = filePattern; } http://git-wip-us.apache.org/repos/asf/beam/blob/7bcbad53/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamAggregationTransforms.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamAggregationTransforms.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamAggregationTransforms.java index 51d3e89..e804b94 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamAggregationTransforms.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamAggregationTransforms.java @@ -27,6 +27,7 @@ import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlInputRefExpression; import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive; import org.apache.beam.dsls.sql.schema.BeamSqlRecordType; import org.apache.beam.dsls.sql.schema.BeamSqlRow; +import org.apache.beam.dsls.sql.utils.CalciteUtils; import org.apache.beam.sdk.transforms.Combine.CombineFn; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.SerializableFunction; @@ -163,7 +164,7 @@ public class BeamAggregationTransforms implements Serializable{ //verify it's supported. 
verifySupportedAggregation(ac); - aggDataType.addField(ac.name, ac.type.getSqlTypeName()); + aggDataType.addField(ac.name, CalciteUtils.toJavaType(ac.type.getSqlTypeName())); SqlAggFunction aggFn = ac.getAggregation(); switch (aggFn.getName()) { @@ -178,7 +179,7 @@ public class BeamAggregationTransforms implements Serializable{ case "AVG": int refIndex = ac.getArgList().get(0); aggElementExpressions.add(new BeamSqlInputRefExpression( - sourceRowRecordType.getFieldsType().get(refIndex), refIndex)); + CalciteUtils.getFieldType(sourceRowRecordType, refIndex), refIndex)); if ("AVG".equals(aggFn.getName())) { hasAvg = true; } @@ -191,7 +192,8 @@ public class BeamAggregationTransforms implements Serializable{ } // add a COUNT holder if only have AVG if (hasAvg && !hasCount) { - aggDataType.addField("__COUNT", SqlTypeName.BIGINT); + aggDataType.addField("__COUNT", + CalciteUtils.toJavaType(SqlTypeName.BIGINT)); aggFunctions.add("COUNT"); aggElementExpressions.add(BeamSqlPrimitive.<Long>of(SqlTypeName.BIGINT, 1L)); http://git-wip-us.apache.org/repos/asf/beam/blob/7bcbad53/dsls/sql/src/main/java/org/apache/beam/dsls/sql/utils/CalciteUtils.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/utils/CalciteUtils.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/utils/CalciteUtils.java new file mode 100644 index 0000000..46b4911 --- /dev/null +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/utils/CalciteUtils.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.dsls.sql.utils; + +import java.sql.Types; +import java.util.HashMap; +import java.util.Map; +import org.apache.beam.dsls.sql.schema.BeamSqlRecordType; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.rel.type.RelDataTypeField; +import org.apache.calcite.rel.type.RelProtoDataType; +import org.apache.calcite.sql.type.SqlTypeName; + +/** + * Utility methods for Calcite related operations. + */ +public class CalciteUtils { + private static final Map<Integer, SqlTypeName> JAVA_TO_CALCITE_MAPPING = new HashMap<>(); + private static final Map<SqlTypeName, Integer> CALCITE_TO_JAVA_MAPPING = new HashMap<>(); + static { + JAVA_TO_CALCITE_MAPPING.put(Types.TINYINT, SqlTypeName.TINYINT); + JAVA_TO_CALCITE_MAPPING.put(Types.SMALLINT, SqlTypeName.SMALLINT); + JAVA_TO_CALCITE_MAPPING.put(Types.INTEGER, SqlTypeName.INTEGER); + JAVA_TO_CALCITE_MAPPING.put(Types.BIGINT, SqlTypeName.BIGINT); + + JAVA_TO_CALCITE_MAPPING.put(Types.FLOAT, SqlTypeName.FLOAT); + JAVA_TO_CALCITE_MAPPING.put(Types.DOUBLE, SqlTypeName.DOUBLE); + + JAVA_TO_CALCITE_MAPPING.put(Types.DECIMAL, SqlTypeName.DECIMAL); + + JAVA_TO_CALCITE_MAPPING.put(Types.CHAR, SqlTypeName.CHAR); + JAVA_TO_CALCITE_MAPPING.put(Types.VARCHAR, SqlTypeName.VARCHAR); + + JAVA_TO_CALCITE_MAPPING.put(Types.TIME, SqlTypeName.TIME); + JAVA_TO_CALCITE_MAPPING.put(Types.TIMESTAMP, SqlTypeName.TIMESTAMP); + + for (Map.Entry<Integer, SqlTypeName> pair : JAVA_TO_CALCITE_MAPPING.entrySet()) { + 
CALCITE_TO_JAVA_MAPPING.put(pair.getValue(), pair.getKey()); + } + } + + /** + * Get the corresponding {@code SqlTypeName} for an integer sql type. + */ + public static SqlTypeName toCalciteType(int type) { + return JAVA_TO_CALCITE_MAPPING.get(type); + } + + /** + * Get the integer sql type from Calcite {@code SqlTypeName}. + */ + public static Integer toJavaType(SqlTypeName typeName) { + return CALCITE_TO_JAVA_MAPPING.get(typeName); + } + + /** + * Get the {@code SqlTypeName} for the specified column of a table. + * @return the {@code SqlTypeName} of the column at {@code index} + */ + public static SqlTypeName getFieldType(BeamSqlRecordType schema, int index) { + return toCalciteType(schema.getFieldsType().get(index)); + } + + /** + * Generate {@code BeamSqlRecordType} from {@code RelDataType} which is used to create a table. + */ + public static BeamSqlRecordType toBeamRecordType(RelDataType tableInfo) { + BeamSqlRecordType record = new BeamSqlRecordType(); + for (RelDataTypeField f : tableInfo.getFieldList()) { + record.getFieldsName().add(f.getName()); + record.getFieldsType().add(toJavaType(f.getType().getSqlTypeName())); + } + return record; + } + + /** + * Create an instance of {@code RelDataType} so it can be used to create a table.
+ */ + public static RelProtoDataType toCalciteRecordType(final BeamSqlRecordType that) { + return new RelProtoDataType() { + @Override + public RelDataType apply(RelDataTypeFactory a) { + RelDataTypeFactory.FieldInfoBuilder builder = a.builder(); + for (int idx = 0; idx < that.getFieldsName().size(); ++idx) { + builder.add(that.getFieldsName().get(idx), toCalciteType(that.getFieldsType().get(idx))); + } + return builder.build(); + } + }; + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/7bcbad53/dsls/sql/src/main/java/org/apache/beam/dsls/sql/utils/package-info.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/utils/package-info.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/utils/package-info.java new file mode 100644 index 0000000..b5c861a --- /dev/null +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/utils/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Utility classes. 
+ */ +package org.apache.beam.dsls.sql.utils; http://git-wip-us.apache.org/repos/asf/beam/blob/7bcbad53/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlApiSurfaceTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlApiSurfaceTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlApiSurfaceTest.java new file mode 100644 index 0000000..922931c --- /dev/null +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlApiSurfaceTest.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.dsls.sql; + +import static org.apache.beam.sdk.util.ApiSurface.containsOnlyPackages; +import static org.junit.Assert.assertThat; + +import com.google.common.collect.ImmutableSet; +import java.util.Set; +import org.apache.beam.sdk.util.ApiSurface; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Surface test for BeamSql api. 
+ */ +@RunWith(JUnit4.class) +public class BeamSqlApiSurfaceTest { + @Test + public void testSdkApiSurface() throws Exception { + + @SuppressWarnings("unchecked") + final Set<String> allowed = + ImmutableSet.of( + "org.apache.beam", + "org.joda.time", + "org.apache.commons.csv"); + + ApiSurface surface = ApiSurface + .ofClass(BeamSqlCli.class) + .includingClass(BeamSql.class) + .includingClass(BeamSqlEnv.class) + .includingPackage("org.apache.beam.dsls.sql.schema", + getClass().getClassLoader()) + .pruningPrefix("java") + .pruningPattern("org[.]apache[.]beam[.]dsls[.]sql[.].*Test") + .pruningPattern("org[.]apache[.]beam[.]dsls[.]sql[.].*TestBase"); + + assertThat(surface, containsOnlyPackages(allowed)); + } + +} http://git-wip-us.apache.org/repos/asf/beam/blob/7bcbad53/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutorTestBase.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutorTestBase.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutorTestBase.java index d83ca8f..739d548 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutorTestBase.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutorTestBase.java @@ -25,6 +25,7 @@ import org.apache.beam.dsls.sql.planner.BeamRelDataTypeSystem; import org.apache.beam.dsls.sql.planner.BeamRuleSets; import org.apache.beam.dsls.sql.schema.BeamSqlRecordType; import org.apache.beam.dsls.sql.schema.BeamSqlRow; +import org.apache.beam.dsls.sql.utils.CalciteUtils; import org.apache.calcite.adapter.java.JavaTypeFactory; import org.apache.calcite.config.Lex; import org.apache.calcite.jdbc.JavaTypeFactoryImpl; @@ -69,7 +70,7 @@ public class BeamSqlFnExecutorTestBase { .add("price", SqlTypeName.DOUBLE) .add("order_time", SqlTypeName.BIGINT).build(); - beamRecordType = 
BeamSqlRecordType.from(relDataType); + beamRecordType = CalciteUtils.toBeamRecordType(relDataType); record = new BeamSqlRow(beamRecordType); record.addField(0, 1234567L); http://git-wip-us.apache.org/repos/asf/beam/blob/7bcbad53/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BasePlanner.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BasePlanner.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BasePlanner.java index 7f69345..2c5b555 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BasePlanner.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BasePlanner.java @@ -17,15 +17,18 @@ */ package org.apache.beam.dsls.sql.planner; +import static org.apache.beam.dsls.sql.BeamSqlEnv.registerTable; + import java.util.Arrays; import java.util.Date; import java.util.HashMap; import java.util.Map; -import org.apache.beam.dsls.sql.BeamSqlEnv; + import org.apache.beam.dsls.sql.schema.BaseBeamTable; import org.apache.beam.dsls.sql.schema.BeamSqlRecordType; import org.apache.beam.dsls.sql.schema.BeamSqlRow; import org.apache.beam.dsls.sql.schema.kafka.BeamKafkaCSVTable; +import org.apache.beam.dsls.sql.utils.CalciteUtils; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelDataTypeFactory; import org.apache.calcite.rel.type.RelProtoDataType; @@ -40,9 +43,9 @@ import org.junit.BeforeClass; public class BasePlanner { @BeforeClass public static void prepareClass() { - BeamSqlEnv.registerTable("ORDER_DETAILS", getTable()); - BeamSqlEnv.registerTable("SUB_ORDER", getTable("127.0.0.1:9092", "sub_orders")); - BeamSqlEnv.registerTable("SUB_ORDER_RAM", getTable()); + registerTable("ORDER_DETAILS", getTable()); + registerTable("SUB_ORDER", getTable("127.0.0.1:9092", "sub_orders")); + registerTable("SUB_ORDER_RAM", getTable()); } private static BaseBeamTable getTable() { @@ -54,8 +57,8 @@ public class 
BasePlanner { } }; - BeamSqlRecordType dataType = BeamSqlRecordType.from( - protoRowType.apply(BeamQueryPlanner.TYPE_FACTORY)); + BeamSqlRecordType dataType = CalciteUtils + .toBeamRecordType(protoRowType.apply(BeamQueryPlanner.TYPE_FACTORY)); BeamSqlRow row1 = new BeamSqlRow(dataType); row1.addField(0, 12345L); row1.addField(1, 0); @@ -80,7 +83,7 @@ public class BasePlanner { row4.addField(2, 20.5); row4.addField(3, new Date()); - return new MockedBeamSqlTable(protoRowType).withInputRecords( + return new MockedBeamSqlTable(dataType).withInputRecords( Arrays.asList(row1, row2, row3, row4)); } @@ -93,10 +96,13 @@ public class BasePlanner { } }; + BeamSqlRecordType dataType = CalciteUtils + .toBeamRecordType(protoRowType.apply(BeamQueryPlanner.TYPE_FACTORY)); + Map<String, Object> consumerPara = new HashMap<String, Object>(); consumerPara.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); - return new BeamKafkaCSVTable(protoRowType, bootstrapServer, Arrays.asList(topic)) + return new BeamKafkaCSVTable(dataType, bootstrapServer, Arrays.asList(topic)) .updateConsumerProperties(consumerPara); } } http://git-wip-us.apache.org/repos/asf/beam/blob/7bcbad53/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerAggregationSubmitTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerAggregationSubmitTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerAggregationSubmitTest.java index e12eca2..f98517b 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerAggregationSubmitTest.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerAggregationSubmitTest.java @@ -27,6 +27,7 @@ import org.apache.beam.dsls.sql.BeamSqlEnv; import org.apache.beam.dsls.sql.schema.BaseBeamTable; import org.apache.beam.dsls.sql.schema.BeamSqlRecordType; import org.apache.beam.dsls.sql.schema.BeamSqlRow; 
+import org.apache.beam.dsls.sql.utils.CalciteUtils; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelDataTypeFactory; @@ -69,8 +70,8 @@ public class BeamPlannerAggregationSubmitTest { } }; - BeamSqlRecordType dataType = BeamSqlRecordType.from( - protoRowType.apply(BeamQueryPlanner.TYPE_FACTORY)); + BeamSqlRecordType dataType = CalciteUtils + .toBeamRecordType(protoRowType.apply(BeamQueryPlanner.TYPE_FACTORY)); BeamSqlRow row1 = new BeamSqlRow(dataType); row1.addField(0, 12345L); row1.addField(1, 1); @@ -91,7 +92,7 @@ public class BeamPlannerAggregationSubmitTest { row4.addField(1, 0); row4.addField(2, format.parse("2017-01-01 03:04:05")); - return new MockedBeamSqlTable(protoRowType).withInputRecords( + return new MockedBeamSqlTable(dataType).withInputRecords( Arrays.asList(row1 , row2, row3, row4 )); @@ -108,7 +109,10 @@ public class BeamPlannerAggregationSubmitTest { .add("size", SqlTypeName.BIGINT).build(); } }; - return new MockedBeamSqlTable(protoRowType); + BeamSqlRecordType dataType = CalciteUtils + .toBeamRecordType(protoRowType.apply(BeamQueryPlanner.TYPE_FACTORY)); + + return new MockedBeamSqlTable(dataType); } http://git-wip-us.apache.org/repos/asf/beam/blob/7bcbad53/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSqlTable.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSqlTable.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSqlTable.java index 185e95a..f651f6a 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSqlTable.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSqlTable.java @@ -26,6 +26,7 @@ import org.apache.beam.dsls.sql.schema.BaseBeamTable; import org.apache.beam.dsls.sql.schema.BeamIOType; import org.apache.beam.dsls.sql.schema.BeamSqlRecordType; import 
org.apache.beam.dsls.sql.schema.BeamSqlRow; +import org.apache.beam.dsls.sql.utils.CalciteUtils; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; @@ -49,8 +50,8 @@ public class MockedBeamSqlTable extends BaseBeamTable { private List<BeamSqlRow> inputRecords; - public MockedBeamSqlTable(RelProtoDataType protoRowType) { - super(protoRowType); + public MockedBeamSqlTable(BeamSqlRecordType beamSqlRecordType) { + super(beamSqlRecordType); } public MockedBeamSqlTable withInputRecords(List<BeamSqlRow> inputRecords){ @@ -102,8 +103,8 @@ public class MockedBeamSqlTable extends BaseBeamTable { }; List<BeamSqlRow> rows = new ArrayList<>(); - BeamSqlRecordType beamSQLRecordType = BeamSqlRecordType.from( - protoRowType.apply(BeamQueryPlanner.TYPE_FACTORY)); + BeamSqlRecordType beamSQLRecordType = CalciteUtils + .toBeamRecordType(protoRowType.apply(BeamQueryPlanner.TYPE_FACTORY)); int fieldCount = beamSQLRecordType.size(); for (int i = fieldCount * 2; i < args.length; i += fieldCount) { @@ -113,7 +114,7 @@ public class MockedBeamSqlTable extends BaseBeamTable { } rows.add(row); } - return new MockedBeamSqlTable(protoRowType).withInputRecords(rows); + return new MockedBeamSqlTable(beamSQLRecordType).withInputRecords(rows); } @Override http://git-wip-us.apache.org/repos/asf/beam/blob/7bcbad53/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamPCollectionTableTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamPCollectionTableTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamPCollectionTableTest.java index a085eae..8dc8439 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamPCollectionTableTest.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamPCollectionTableTest.java @@ -21,6 +21,7 @@ import org.apache.beam.dsls.sql.BeamSqlCli; import 
org.apache.beam.dsls.sql.BeamSqlEnv; import org.apache.beam.dsls.sql.planner.BasePlanner; import org.apache.beam.dsls.sql.planner.BeamQueryPlanner; +import org.apache.beam.dsls.sql.utils.CalciteUtils; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.values.PBegin; @@ -49,14 +50,16 @@ public class BeamPCollectionTableTest extends BasePlanner{ .add("c2", SqlTypeName.VARCHAR).build(); } }; + BeamSqlRecordType dataType = CalciteUtils + .toBeamRecordType(protoRowType.apply(BeamQueryPlanner.TYPE_FACTORY)); - BeamSqlRow row = new BeamSqlRow(BeamSqlRecordType.from( + BeamSqlRow row = new BeamSqlRow(CalciteUtils.toBeamRecordType( protoRowType.apply(BeamQueryPlanner.TYPE_FACTORY))); row.addField(0, 1); row.addField(1, "hello world."); PCollection<BeamSqlRow> inputStream = PBegin.in(pipeline).apply(Create.of(row)); BeamSqlEnv.registerTable("COLLECTION_TABLE", - new BeamPCollectionTable(inputStream, protoRowType)); + new BeamPCollectionTable(inputStream, dataType)); } @Test http://git-wip-us.apache.org/repos/asf/beam/blob/7bcbad53/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoderTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoderTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoderTest.java index c087825..b358fe1 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoderTest.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoderTest.java @@ -22,6 +22,7 @@ import java.math.BigDecimal; import java.util.Date; import java.util.GregorianCalendar; +import org.apache.beam.dsls.sql.utils.CalciteUtils; import org.apache.beam.sdk.testing.CoderProperties; import org.apache.calcite.jdbc.JavaTypeFactoryImpl; import org.apache.calcite.rel.type.RelDataType; @@ -56,7 +57,7 @@ public class BeamSqlRowCoderTest { 
} }; - BeamSqlRecordType beamSQLRecordType = BeamSqlRecordType.from( + BeamSqlRecordType beamSQLRecordType = CalciteUtils.toBeamRecordType( protoRowType.apply(new JavaTypeFactoryImpl( RelDataTypeSystem.DEFAULT))); BeamSqlRow row = new BeamSqlRow(beamSQLRecordType); http://git-wip-us.apache.org/repos/asf/beam/blob/7bcbad53/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaCSVTableTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaCSVTableTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaCSVTableTest.java index fc19d40..9cd0915 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaCSVTableTest.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaCSVTableTest.java @@ -23,6 +23,7 @@ import java.io.Serializable; import org.apache.beam.dsls.sql.planner.BeamQueryPlanner; import org.apache.beam.dsls.sql.schema.BeamSqlRecordType; import org.apache.beam.dsls.sql.schema.BeamSqlRow; +import org.apache.beam.dsls.sql.utils.CalciteUtils; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; @@ -91,16 +92,14 @@ public class BeamKafkaCSVTableTest { } private static BeamSqlRecordType genRowType() { - return BeamSqlRecordType.from( - new RelProtoDataType() { - @Override public RelDataType apply(RelDataTypeFactory a0) { - return a0.builder() - .add("order_id", SqlTypeName.BIGINT) - .add("site_id", SqlTypeName.INTEGER) - .add("price", SqlTypeName.DOUBLE) - .build(); - } - }.apply(BeamQueryPlanner.TYPE_FACTORY)); + return CalciteUtils.toBeamRecordType(new RelProtoDataType() { + + @Override public RelDataType apply(RelDataTypeFactory a0) { + return a0.builder().add("order_id", SqlTypeName.BIGINT) + .add("site_id", SqlTypeName.INTEGER) + .add("price", SqlTypeName.DOUBLE).build(); + } + 
}.apply(BeamQueryPlanner.TYPE_FACTORY)); } private static class String2KvBytes extends DoFn<String, KV<byte[], byte[]>> http://git-wip-us.apache.org/repos/asf/beam/blob/7bcbad53/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableTest.java index d782aad..176df46 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableTest.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableTest.java @@ -35,6 +35,7 @@ import java.util.List; import org.apache.beam.dsls.sql.planner.BeamQueryPlanner; import org.apache.beam.dsls.sql.schema.BeamSqlRecordType; import org.apache.beam.dsls.sql.schema.BeamSqlRow; +import org.apache.beam.dsls.sql.utils.CalciteUtils; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.values.PCollection; @@ -80,19 +81,20 @@ public class BeamTextCSVTableTest { private static File writerTargetFile; @Test public void testBuildIOReader() { - PCollection<BeamSqlRow> rows = new BeamTextCSVTable(buildRowType(), + PCollection<BeamSqlRow> rows = new BeamTextCSVTable(buildBeamSqlRecordType(), readerSourceFile.getAbsolutePath()).buildIOReader(pipeline); PAssert.that(rows).containsInAnyOrder(testDataRows); pipeline.run(); } @Test public void testBuildIOWriter() { - new BeamTextCSVTable(buildRowType(), readerSourceFile.getAbsolutePath()).buildIOReader(pipeline) - .apply(new BeamTextCSVTable(buildRowType(), writerTargetFile.getAbsolutePath()) + new BeamTextCSVTable(buildBeamSqlRecordType(), + readerSourceFile.getAbsolutePath()).buildIOReader(pipeline) + .apply(new BeamTextCSVTable(buildBeamSqlRecordType(), writerTargetFile.getAbsolutePath()) .buildIOWriter()); 
pipeline.run(); - PCollection<BeamSqlRow> rows = new BeamTextCSVTable(buildRowType(), + PCollection<BeamSqlRow> rows = new BeamTextCSVTable(buildBeamSqlRecordType(), writerTargetFile.getAbsolutePath()).buildIOReader(pipeline2); // confirm the two reads match @@ -166,7 +168,7 @@ public class BeamTextCSVTableTest { } private static BeamSqlRecordType buildBeamSqlRecordType() { - return BeamSqlRecordType.from(buildRelDataType()); + return CalciteUtils.toBeamRecordType(buildRelDataType()); } private static BeamSqlRow buildRow(Object[] data) { http://git-wip-us.apache.org/repos/asf/beam/blob/7bcbad53/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamAggregationTransformTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamAggregationTransformTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamAggregationTransformTest.java index 5cbbe41..388a344 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamAggregationTransformTest.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamAggregationTransformTest.java @@ -21,11 +21,13 @@ import java.text.ParseException; import java.util.ArrayList; import java.util.Arrays; import java.util.List; + import org.apache.beam.dsls.sql.planner.BeamQueryPlanner; import org.apache.beam.dsls.sql.schema.BeamSqlRecordType; import org.apache.beam.dsls.sql.schema.BeamSqlRow; import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder; import org.apache.beam.dsls.sql.transform.BeamAggregationTransforms; +import org.apache.beam.dsls.sql.utils.CalciteUtils; import org.apache.beam.sdk.coders.IterableCoder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.testing.PAssert; @@ -431,7 +433,7 @@ public class BeamAggregationTransformTest extends BeamTransformBaseTest{ for (KV<String, SqlTypeName> cm : columnMetadata) { 
builder.add(cm.getKey(), cm.getValue()); } - return BeamSqlRecordType.from(builder.build()); + return CalciteUtils.toBeamRecordType(builder.build()); } /** http://git-wip-us.apache.org/repos/asf/beam/blob/7bcbad53/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamTransformBaseTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamTransformBaseTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamTransformBaseTest.java index ef85347..2e91405 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamTransformBaseTest.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamTransformBaseTest.java @@ -25,6 +25,7 @@ import java.util.List; import org.apache.beam.dsls.sql.planner.BeamQueryPlanner; import org.apache.beam.dsls.sql.schema.BeamSqlRecordType; import org.apache.beam.dsls.sql.schema.BeamSqlRow; +import org.apache.beam.dsls.sql.utils.CalciteUtils; import org.apache.beam.sdk.values.KV; import org.apache.calcite.rel.type.RelDataTypeFactory.FieldInfoBuilder; import org.apache.calcite.sql.type.SqlTypeName; @@ -72,7 +73,7 @@ public class BeamTransformBaseTest { for (KV<String, SqlTypeName> cm : columnMetadata) { builder.add(cm.getKey(), cm.getValue()); } - return BeamSqlRecordType.from(builder.build()); + return CalciteUtils.toBeamRecordType(builder.build()); } /**
