Repository: beam Updated Branches: refs/heads/DSL_SQL ed9764124 -> db982cfe1
[BEAM-2309] Implement VALUES and add support for data type CHAR (to be able to test VALUES) Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/433282f5 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/433282f5 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/433282f5 Branch: refs/heads/DSL_SQL Commit: 433282f5eec56802d50f1e05d834f380034f0940 Parents: ed97641 Author: James Xu <[email protected]> Authored: Wed May 17 14:41:18 2017 +0800 Committer: Jean-Baptiste Onofré <[email protected]> Committed: Tue May 23 08:46:24 2017 +0200 ---------------------------------------------------------------------- .../dsls/sql/interpreter/BeamSQLFnExecutor.java | 12 ++- .../interpreter/operator/BeamSqlPrimitive.java | 1 + .../beam/dsls/sql/planner/BeamRuleSets.java | 3 +- .../apache/beam/dsls/sql/rel/BeamValuesRel.java | 78 ++++++++++++++++ .../beam/dsls/sql/rule/BeamValuesRule.java | 48 ++++++++++ .../apache/beam/dsls/sql/schema/BeamSQLRow.java | 2 + .../beam/dsls/sql/schema/BeamSqlRowCoder.java | 2 + .../beam/dsls/sql/schema/BeamTableUtils.java | 71 +++++++++------ .../dsls/sql/transform/BeamSQLProjectFn.java | 3 +- .../beam/dsls/sql/planner/BasePlanner.java | 2 +- .../BeamPlannerAggregationSubmitTest.java | 13 ++- .../dsls/sql/planner/BeamPlannerSubmitTest.java | 8 +- .../dsls/sql/planner/MockedBeamSQLTable.java | 9 +- .../beam/dsls/sql/rel/BeamSortRelTest.java | 22 +++-- .../beam/dsls/sql/rel/BeamValuesRelTest.java | 95 ++++++++++++++++++++ 15 files changed, 318 insertions(+), 51 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/433282f5/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSQLFnExecutor.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSQLFnExecutor.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSQLFnExecutor.java index be388aa..9dcf003 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSQLFnExecutor.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSQLFnExecutor.java @@ -63,7 +63,9 @@ import org.apache.calcite.rex.RexLiteral; import org.apache.calcite.rex.RexNode; import org.apache.calcite.schema.impl.ScalarFunctionImpl; import org.apache.calcite.sql.SqlOperator; +import org.apache.calcite.sql.type.SqlTypeName; import org.apache.calcite.sql.validate.SqlUserDefinedFunction; +import org.apache.calcite.util.NlsString; /** * Executor based on {@link BeamSqlExpression} and {@link BeamSqlPrimitive}. @@ -99,8 +101,14 @@ public class BeamSQLFnExecutor implements BeamSQLExpressionExecutor { static BeamSqlExpression buildExpression(RexNode rexNode) { if (rexNode instanceof RexLiteral) { RexLiteral node = (RexLiteral) rexNode; - - return BeamSqlPrimitive.of(node.getTypeName(), node.getValue()); + // NlsString is not serializable, we need to convert + // it to string explicitly. + if (SqlTypeName.CHAR_TYPES.contains(node.getTypeName()) + && node.getValue() instanceof NlsString) { + return BeamSqlPrimitive.of(node.getTypeName(), ((NlsString) node.getValue()).getValue()); + } else { + return BeamSqlPrimitive.of(node.getTypeName(), node.getValue()); + } } else if (rexNode instanceof RexInputRef) { RexInputRef node = (RexInputRef) rexNode; return new BeamSqlInputRefExpression(node.getType().getSqlTypeName(), node.getIndex()); http://git-wip-us.apache.org/repos/asf/beam/blob/433282f5/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlPrimitive.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlPrimitive.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlPrimitive.java index a5938f3..bc18c5e 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlPrimitive.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlPrimitive.java @@ -21,6 +21,7 @@ import java.math.BigDecimal; import java.util.Date; import java.util.GregorianCalendar; import java.util.List; + import org.apache.beam.dsls.sql.exception.BeamInvalidOperatorException; import org.apache.beam.dsls.sql.exception.BeamSqlUnsupportedException; import org.apache.beam.dsls.sql.schema.BeamSQLRow; http://git-wip-us.apache.org/repos/asf/beam/blob/433282f5/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamRuleSets.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamRuleSets.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamRuleSets.java index 2cac5ae..1ad62bc 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamRuleSets.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamRuleSets.java @@ -28,6 +28,7 @@ import org.apache.beam.dsls.sql.rule.BeamIOSinkRule; import org.apache.beam.dsls.sql.rule.BeamIOSourceRule; import org.apache.beam.dsls.sql.rule.BeamProjectRule; import org.apache.beam.dsls.sql.rule.BeamSortRule; +import org.apache.beam.dsls.sql.rule.BeamValuesRule; import org.apache.calcite.plan.RelOptRule; import org.apache.calcite.rel.RelNode; import org.apache.calcite.tools.RuleSet; @@ -41,7 +42,7 @@ public class BeamRuleSets { private static final ImmutableSet<RelOptRule> calciteToBeamConversionRules = ImmutableSet .<RelOptRule>builder().add(BeamIOSourceRule.INSTANCE, BeamProjectRule.INSTANCE, BeamFilterRule.INSTANCE, BeamIOSinkRule.INSTANCE, - BeamAggregationRule.INSTANCE, BeamSortRule.INSTANCE) + BeamAggregationRule.INSTANCE, BeamSortRule.INSTANCE, BeamValuesRule.INSTANCE) .build(); public static RuleSet[] getRuleSets() { http://git-wip-us.apache.org/repos/asf/beam/blob/433282f5/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamValuesRel.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamValuesRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamValuesRel.java new file mode 100644 index 0000000..4fbe7ec --- /dev/null +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamValuesRel.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.dsls.sql.rel; + +import com.google.common.collect.ImmutableList; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.beam.dsls.sql.planner.BeamPipelineCreator; +import org.apache.beam.dsls.sql.planner.BeamSQLRelUtils; +import org.apache.beam.dsls.sql.schema.BeamSQLRecordType; +import org.apache.beam.dsls.sql.schema.BeamSQLRow; +import org.apache.beam.dsls.sql.schema.BeamTableUtils; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.values.PCollection; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.core.Values; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rex.RexLiteral; + +/** + * {@code BeamRelNode} to replace a {@code Values} node. + * + * <p>{@code BeamValuesRel} will be used in the following SQLs: + * <ul> + * <li>{@code insert into t (name, desc) values ('hello', 'world')}</li> + * <li>{@code select 1, '1', LOCALTIME}</li> + * </ul> + */ +public class BeamValuesRel extends Values implements BeamRelNode { + + public BeamValuesRel( + RelOptCluster cluster, + RelDataType rowType, + ImmutableList<ImmutableList<RexLiteral>> tuples, + RelTraitSet traits) { + super(cluster, rowType, tuples, traits); + + } + + @Override public PCollection<BeamSQLRow> buildBeamPipeline( + BeamPipelineCreator planCreator) throws Exception { + List<BeamSQLRow> rows = new ArrayList<>(tuples.size()); + String stageName = BeamSQLRelUtils.getStageName(this); + if (tuples.isEmpty()) { + throw new IllegalStateException("Values with empty tuples!"); + } + + BeamSQLRecordType beamSQLRecordType = BeamSQLRecordType.from(this.getRowType()); + for (ImmutableList<RexLiteral> tuple : tuples) { + BeamSQLRow row = new BeamSQLRow(beamSQLRecordType); + for (int i = 0; i < tuple.size(); i++) { + BeamTableUtils.addFieldWithAutoTypeCasting(row, i, tuple.get(i).getValue()); + } + rows.add(row); + } + + return planCreator.getPipeline().apply(stageName, Create.of(rows)); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/433282f5/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamValuesRule.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamValuesRule.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamValuesRule.java new file mode 100644 index 0000000..4ea9e60 --- /dev/null +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamValuesRule.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.dsls.sql.rule; + +import org.apache.beam.dsls.sql.rel.BeamLogicalConvention; +import org.apache.beam.dsls.sql.rel.BeamValuesRel; +import org.apache.calcite.plan.Convention; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.convert.ConverterRule; +import org.apache.calcite.rel.core.Values; +import org.apache.calcite.rel.logical.LogicalValues; + +/** + * {@code ConverterRule} to replace {@code Values} with {@code BeamValuesRel}. + */ +public class BeamValuesRule extends ConverterRule { + public static final BeamValuesRule INSTANCE = new BeamValuesRule(); + private BeamValuesRule() { + super(LogicalValues.class, Convention.NONE, + BeamLogicalConvention.INSTANCE, "BeamValuesRule"); + } + + @Override public RelNode convert(RelNode rel) { + Values values = (Values) rel; + return new BeamValuesRel( + values.getCluster(), + values.getRowType(), + values.getTuples(), + values.getTraitSet().replace(BeamLogicalConvention.INSTANCE) + ); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/433282f5/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRow.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRow.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRow.java index 7b6428e..bc75eb1 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRow.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRow.java @@ -121,6 +121,7 @@ public class BeamSQLRow implements Serializable { } break; case VARCHAR: + case CHAR: if (!(fieldValue instanceof String)) { throw new InvalidFieldException( String.format("[%s] doesn't match type [%s]", fieldValue, fieldType)); @@ -232,6 +233,7 @@ public class BeamSQLRow implements Serializable { return fieldValue; } case VARCHAR: + case CHAR: if (!(fieldValue instanceof String)) { throw new InvalidFieldException( String.format("[%s] doesn't match type [%s]", fieldValue, fieldType)); http://git-wip-us.apache.org/repos/asf/beam/blob/433282f5/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java index 0accb9a..bfcb487 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java @@ -85,6 +85,7 @@ public class BeamSqlRowCoder extends StandardCoder<BeamSQLRow>{ longCoder.encode(value.getLong(idx), outStream, context.nested()); break; case VARCHAR: + case CHAR: stringCoder.encode(value.getString(idx), outStream, context.nested()); break; case TIMESTAMP: @@ -134,6 +135,7 @@ public class BeamSqlRowCoder extends StandardCoder<BeamSQLRow>{ record.addField(idx, longCoder.decode(inStream, context.nested())); break; case VARCHAR: + case CHAR: record.addField(idx, stringCoder.decode(inStream, context.nested())); break; case TIMESTAMP: http://git-wip-us.apache.org/repos/asf/beam/blob/433282f5/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamTableUtils.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamTableUtils.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamTableUtils.java index bc622c2..c7397e1 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamTableUtils.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamTableUtils.java @@ -21,9 +21,11 @@ package org.apache.beam.dsls.sql.schema; import java.io.IOException; import java.io.StringReader; import java.io.StringWriter; +import java.math.BigDecimal; import org.apache.beam.dsls.sql.exception.BeamSqlUnsupportedException; import org.apache.calcite.sql.type.SqlTypeName; +import org.apache.calcite.util.NlsString; import org.apache.commons.csv.CSVFormat; import org.apache.commons.csv.CSVParser; import org.apache.commons.csv.CSVPrinter; @@ -72,33 +74,50 @@ public final class BeamTableUtils { return writer.toString(); } - public static void addFieldWithAutoTypeCasting(BeamSQLRow row, int idx, String raw) { + public static void addFieldWithAutoTypeCasting(BeamSQLRow row, int idx, Object rawObj) { + if (rawObj == null) { + row.addField(idx, rawObj); + return; + } + SqlTypeName columnType = row.getDataType().getFieldsType().get(idx); - switch (columnType) { - case TINYINT: - row.addField(idx, Byte.valueOf(raw)); - break; - case SMALLINT: - row.addField(idx, Short.valueOf(raw)); - break; - case INTEGER: - row.addField(idx, Integer.valueOf(raw)); - break; - case BIGINT: - row.addField(idx, Long.valueOf(raw)); - break; - case FLOAT: - row.addField(idx, Float.valueOf(raw)); - break; - case DOUBLE: - row.addField(idx, Double.valueOf(raw)); - break; - case VARCHAR: - row.addField(idx, raw); - break; - default: - throw new BeamSqlUnsupportedException( - String.format("Column type %s is not supported yet!", columnType)); + // auto-casting for numberics + if ((rawObj instanceof String && SqlTypeName.NUMERIC_TYPES.contains(columnType)) + || (rawObj instanceof BigDecimal && columnType != SqlTypeName.DECIMAL)) { + String raw = rawObj.toString(); + switch (columnType) { + case TINYINT: + row.addField(idx, Byte.valueOf(raw)); + break; + case SMALLINT: + row.addField(idx, Short.valueOf(raw)); + break; + case INTEGER: + row.addField(idx, Integer.valueOf(raw)); + break; + case BIGINT: + row.addField(idx, Long.valueOf(raw)); + break; + case FLOAT: + row.addField(idx, Float.valueOf(raw)); + break; + case DOUBLE: + row.addField(idx, Double.valueOf(raw)); + break; + default: + throw new BeamSqlUnsupportedException( + String.format("Column type %s is not supported yet!", columnType)); + } + } else if (SqlTypeName.CHAR_TYPES.contains(columnType)) { + // convert NlsString to String + if (rawObj instanceof NlsString) { + row.addField(idx, ((NlsString) rawObj).getValue()); + } else { + row.addField(idx, rawObj); + } + } else { + // keep the origin + row.addField(idx, rawObj); } } } http://git-wip-us.apache.org/repos/asf/beam/blob/433282f5/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSQLProjectFn.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSQLProjectFn.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSQLProjectFn.java index 79dd67f..ef4dc0f 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSQLProjectFn.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSQLProjectFn.java @@ -22,6 +22,7 @@ import org.apache.beam.dsls.sql.interpreter.BeamSQLExpressionExecutor; import org.apache.beam.dsls.sql.rel.BeamProjectRel; import org.apache.beam.dsls.sql.schema.BeamSQLRecordType; import org.apache.beam.dsls.sql.schema.BeamSQLRow; +import org.apache.beam.dsls.sql.schema.BeamTableUtils; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; @@ -57,7 +58,7 @@ public class BeamSQLProjectFn extends DoFn<BeamSQLRow, BeamSQLRow> { outRow.updateWindowRange(inputRecord, window); for (int idx = 0; idx < results.size(); ++idx) { - outRow.addField(idx, results.get(idx)); + BeamTableUtils.addFieldWithAutoTypeCasting(outRow, idx, results.get(idx)); } c.output(outRow); http://git-wip-us.apache.org/repos/asf/beam/blob/433282f5/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BasePlanner.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BasePlanner.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BasePlanner.java index fe8a236..0d9d147 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BasePlanner.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BasePlanner.java @@ -41,7 +41,7 @@ public class BasePlanner { public static BeamSQLEnvironment runner = BeamSQLEnvironment.create(); @BeforeClass - public static void prepare() { + public static void prepareClass() { runner.addTableMetadata("ORDER_DETAILS", getTable()); runner.addTableMetadata("SUB_ORDER", getTable("127.0.0.1:9092", "sub_orders")); runner.addTableMetadata("SUB_ORDER_RAM", getTable()); http://git-wip-us.apache.org/repos/asf/beam/blob/433282f5/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerAggregationSubmitTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerAggregationSubmitTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerAggregationSubmitTest.java index 22f1848..ffc3e01 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerAggregationSubmitTest.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerAggregationSubmitTest.java @@ -21,6 +21,7 @@ import java.text.DateFormat; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.Arrays; + import org.apache.beam.dsls.sql.BeamSQLEnvironment; import org.apache.beam.dsls.sql.schema.BaseBeamTable; import org.apache.beam.dsls.sql.schema.BeamSQLRecordType; @@ -31,6 +32,7 @@ import org.apache.calcite.rel.type.RelDataTypeFactory; import org.apache.calcite.rel.type.RelProtoDataType; import org.apache.calcite.sql.type.SqlTypeName; import org.junit.Assert; +import org.junit.Before; import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; @@ -47,11 +49,16 @@ public class BeamPlannerAggregationSubmitTest { public final TestPipeline pipeline = TestPipeline.create(); @BeforeClass - public static void prepare() throws ParseException { + public static void prepareClass() throws ParseException { runner.addTableMetadata("ORDER_DETAILS", getOrderTable()); runner.addTableMetadata("ORDER_SUMMARY", getSummaryTable()); } + @Before + public void prepare() throws ParseException { + MockedBeamSQLTable.CONTENT.clear(); + } + private static BaseBeamTable getOrderTable() throws ParseException { final RelProtoDataType protoRowType = new RelProtoDataType() { @Override @@ -118,7 +125,7 @@ public class BeamPlannerAggregationSubmitTest { pipeline.run().waitUntilFinish(); Assert.assertTrue(MockedBeamSQLTable.CONTENT.size() == 1); - BeamSQLRow result = MockedBeamSQLTable.CONTENT.get(0); + BeamSQLRow result = MockedBeamSQLTable.CONTENT.peek(); Assert.assertEquals(1, result.getInteger(0)); Assert.assertEquals(format.parse("2017-01-01 01:00:00"), result.getDate(1)); Assert.assertEquals(1L, result.getLong(2)); @@ -136,6 +143,6 @@ public class BeamPlannerAggregationSubmitTest { Assert.assertTrue(MockedBeamSQLTable.CONTENT.size() == 1); Assert.assertEquals("site_id=0,agg_hour=null,size=3", - MockedBeamSQLTable.CONTENT.get(0).valueInString()); + MockedBeamSQLTable.CONTENT.peek().valueInString()); } } http://git-wip-us.apache.org/repos/asf/beam/blob/433282f5/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerSubmitTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerSubmitTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerSubmitTest.java index 17cea27..7219d11 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerSubmitTest.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerSubmitTest.java @@ -21,6 +21,7 @@ import org.apache.beam.dsls.sql.schema.BeamSQLRow; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.values.PCollection; import org.junit.Assert; +import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -32,6 +33,11 @@ public class BeamPlannerSubmitTest extends BasePlanner { @Rule public final TestPipeline pipeline = TestPipeline.create(); + @Before + public void prepare() { + MockedBeamSQLTable.CONTENT.clear(); + } + @Test public void insertSelectFilter() throws Exception { String sql = "INSERT INTO SUB_ORDER_RAM(order_id, site_id, price) SELECT " @@ -43,7 +49,7 @@ public class BeamPlannerSubmitTest extends BasePlanner { pipeline.run().waitUntilFinish(); Assert.assertTrue(MockedBeamSQLTable.CONTENT.size() == 1); - Assert.assertTrue(MockedBeamSQLTable.CONTENT.get(0).valueInString() + Assert.assertTrue(MockedBeamSQLTable.CONTENT.peek().valueInString() .contains("order_id=12345,site_id=0,price=20.5,order_time=")); } } http://git-wip-us.apache.org/repos/asf/beam/blob/433282f5/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSQLTable.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSQLTable.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSQLTable.java index 78fd055..561f4be 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSQLTable.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSQLTable.java @@ -19,6 +19,7 @@ package org.apache.beam.dsls.sql.planner; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.ConcurrentLinkedQueue; import org.apache.beam.dsls.sql.schema.BaseBeamTable; import org.apache.beam.dsls.sql.schema.BeamIOType; @@ -43,7 +44,7 @@ import org.apache.calcite.sql.type.SqlTypeName; */ public class MockedBeamSQLTable extends BaseBeamTable { - public static final List<BeamSQLRow> CONTENT = new ArrayList<>(); + public static final ConcurrentLinkedQueue<BeamSQLRow> CONTENT = new ConcurrentLinkedQueue<>(); private List<BeamSQLRow> inputRecords; @@ -142,12 +143,6 @@ public class MockedBeamSQLTable extends BaseBeamTable { @Override public PDone expand(PCollection<BeamSQLRow> input) { input.apply(ParDo.of(new DoFn<BeamSQLRow, Void>() { - - @Setup - public void setup() { - CONTENT.clear(); - } - @ProcessElement public void processElement(ProcessContext c) { CONTENT.add(c.element()); http://git-wip-us.apache.org/repos/asf/beam/blob/433282f5/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSortRelTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSortRelTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSortRelTest.java index 864d4b7..4935c3b 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSortRelTest.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSortRelTest.java @@ -18,8 +18,10 @@ package org.apache.beam.dsls.sql.rel; +import java.util.Collection; import java.util.Date; -import java.util.List; +import java.util.Iterator; + import org.apache.beam.dsls.sql.BeamSQLEnvironment; import org.apache.beam.dsls.sql.exception.BeamSqlUnsupportedException; import org.apache.beam.dsls.sql.planner.MockedBeamSQLTable; @@ -27,6 +29,7 @@ import org.apache.beam.dsls.sql.schema.BeamSQLRow; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.calcite.sql.type.SqlTypeName; import org.junit.Assert; +import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -62,7 +65,6 @@ public class BeamSortRelTest { @Test public void testOrderBy_basic() throws Exception { - prepare(); String sql = "INSERT INTO SUB_ORDER_RAM(order_id, site_id, price) SELECT " + " order_id, site_id, price " + "FROM ORDER_DETAILS " @@ -162,7 +164,6 @@ public class BeamSortRelTest { @Test public void testOrderBy_with_offset() throws Exception { - prepare(); String sql = "INSERT INTO SUB_ORDER_RAM(order_id, site_id, price) SELECT " + " order_id, site_id, price " + "FROM ORDER_DETAILS " @@ -186,7 +187,6 @@ public class BeamSortRelTest { @Test public void testOrderBy_bigFetch() throws Exception { - prepare(); String sql = "INSERT INTO SUB_ORDER_RAM(order_id, site_id, price) SELECT " + " order_id, site_id, price " + "FROM ORDER_DETAILS " @@ -216,7 +216,6 @@ public class BeamSortRelTest { @Test(expected = BeamSqlUnsupportedException.class) public void testOrderBy_exception() throws Exception { - prepare(); String sql = "INSERT INTO SUB_ORDER_RAM(order_id, site_id) SELECT " + " order_id, COUNT(*) " + "FROM ORDER_DETAILS " @@ -227,15 +226,20 @@ public class BeamSortRelTest { runner.compileBeamPipeline(sql, pipeline); } - public static void prepare() { + @Before + public void prepare() { runner.addTableMetadata("ORDER_DETAILS", orderDetailTable); runner.addTableMetadata("SUB_ORDER_RAM", subOrderRamTable); + MockedBeamSQLTable.CONTENT.clear(); } - private void assertEquals(List<BeamSQLRow> rows1, List<BeamSQLRow> rows2) { + private void assertEquals(Collection<BeamSQLRow> rows1, Collection<BeamSQLRow> rows2) { Assert.assertEquals(rows1.size(), rows2.size()); - for (int i = 0; i < rows1.size(); i++) { - Assert.assertEquals(rows1.get(i), rows2.get(i)); + + Iterator<BeamSQLRow> it1 = rows1.iterator(); + Iterator<BeamSQLRow> it2 = rows2.iterator(); + while (it1.hasNext()) { + Assert.assertEquals(it1.next(), it2.next()); } } } http://git-wip-us.apache.org/repos/asf/beam/blob/433282f5/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamValuesRelTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamValuesRelTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamValuesRelTest.java new file mode 100644 index 0000000..d4e1db2 --- /dev/null +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamValuesRelTest.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.dsls.sql.rel; + +import org.apache.beam.dsls.sql.BeamSQLEnvironment; +import org.apache.beam.dsls.sql.planner.MockedBeamSQLTable; +import org.apache.beam.dsls.sql.schema.BeamSQLRow; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.values.PCollection; +import org.apache.calcite.sql.type.SqlTypeName; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; + +/** + * Test for {@code BeamValuesRel}. + */ +public class BeamValuesRelTest { + @Rule + public final TestPipeline pipeline = TestPipeline.create(); + public static BeamSQLEnvironment runner = BeamSQLEnvironment.create(); + private static MockedBeamSQLTable stringTable = MockedBeamSQLTable + .of(SqlTypeName.VARCHAR, "name", + SqlTypeName.VARCHAR, "description"); + + private static MockedBeamSQLTable intTable = MockedBeamSQLTable + .of(SqlTypeName.INTEGER, "c0", + SqlTypeName.INTEGER, "c1"); + + @Test + public void testValues() throws Exception { + String sql = "insert into string_table(name, description) values " + + "('hello', 'world'), ('james', 'bond')"; + PCollection<BeamSQLRow> rows = runner.compileBeamPipeline(sql, pipeline); + PAssert.that(rows).containsInAnyOrder(MockedBeamSQLTable.of( + SqlTypeName.VARCHAR, "name", + SqlTypeName.VARCHAR, "description", + "hello", "world", + "james", "bond").getInputRecords()); + pipeline.run(); + } + + @Test + public void testValues_castInt() throws Exception { + String sql = "insert into int_table (c0, c1) values(cast(1 as int), cast(2 as int))"; + PCollection<BeamSQLRow> rows = runner.compileBeamPipeline(sql, pipeline); + PAssert.that(rows).containsInAnyOrder(MockedBeamSQLTable.of( + SqlTypeName.INTEGER, "c0", + SqlTypeName.INTEGER, "c1", + 1, 2 + ).getInputRecords()); + pipeline.run(); + } + + @Test + public void testValues_onlySelect() throws Exception { + String sql = "select 1, '1'"; + PCollection<BeamSQLRow> rows = runner.compileBeamPipeline(sql, pipeline); + PAssert.that(rows).containsInAnyOrder(MockedBeamSQLTable.of( + SqlTypeName.INTEGER, "EXPR$0", + SqlTypeName.CHAR, "EXPR$1", + 1, "1" + ).getInputRecords()); + pipeline.run(); + } + + @BeforeClass + public static void prepareClass() { + runner.addTableMetadata("string_table", stringTable); + runner.addTableMetadata("int_table", intTable); + } + + @Before + public void prepare() { + MockedBeamSQLTable.CONTENT.clear(); + } +}
