Repository: beam Updated Branches: refs/heads/DSL_SQL f1c2b6540 -> ca8760373
http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerExplainTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerExplainTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerExplainTest.java new file mode 100644 index 0000000..9dde0f1 --- /dev/null +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerExplainTest.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.dsls.sql.planner; + +import org.junit.Assert; +import org.junit.Test; + +/** + * Tests to explain queries. + * + */ +public class BeamPlannerExplainTest extends BasePlanner { + + @Test + public void selectAll() throws Exception { + String sql = "SELECT * FROM ORDER_DETAILS"; + String plan = runner.explainQuery(sql); + + String expectedPlan = + "BeamProjectRel(order_id=[$0], site_id=[$1], price=[$2], order_time=[$3])\n" + + " BeamIOSourceRel(table=[[ORDER_DETAILS]])\n"; + Assert.assertEquals("explain doesn't match", expectedPlan, plan); + } + + @Test + public void selectWithFilter() throws Exception { + String sql = "SELECT " + " order_id, site_id, price " + "FROM ORDER_DETAILS " + + "WHERE SITE_ID = 0 and price > 20"; + String plan = runner.explainQuery(sql); + + String expectedPlan = "BeamProjectRel(order_id=[$0], site_id=[$1], price=[$2])\n" + + " BeamFilterRel(condition=[AND(=($1, 0), >($2, 20))])\n" + + " BeamIOSourceRel(table=[[ORDER_DETAILS]])\n"; + Assert.assertEquals("explain doesn't match", expectedPlan, plan); + } + + @Test + public void insertSelectFilter() throws Exception { + String sql = "INSERT INTO SUB_ORDER(order_id, site_id, price) " + "SELECT " + + " order_id, site_id, price " + "FROM ORDER_DETAILS " + + "WHERE SITE_ID = 0 and price > 20"; + String plan = runner.explainQuery(sql); + + String expectedPlan = + "BeamIOSinkRel(table=[[SUB_ORDER]], operation=[INSERT], flattened=[true])\n" + + " BeamProjectRel(order_id=[$0], site_id=[$1], price=[$2], order_time=[null])\n" + + " BeamProjectRel(order_id=[$0], site_id=[$1], price=[$2])\n" + + " BeamFilterRel(condition=[AND(=($1, 0), >($2, 20))])\n" + + " BeamIOSourceRel(table=[[ORDER_DETAILS]])\n"; + Assert.assertEquals("explain doesn't match", expectedPlan, plan); + } + +} http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerSubmitTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerSubmitTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerSubmitTest.java new file mode 100644 index 0000000..d32b19b --- /dev/null +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerSubmitTest.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.dsls.sql.planner; + +import org.apache.beam.sdk.Pipeline; +import org.junit.Assert; +import org.junit.Test; + +/** + * Tests to execute a query. + * + */ +public class BeamPlannerSubmitTest extends BasePlanner { + @Test + public void insertSelectFilter() throws Exception { + String sql = "INSERT INTO SUB_ORDER_RAM(order_id, site_id, price) SELECT " + + " order_id, site_id, price " + "FROM ORDER_DETAILS " + "WHERE SITE_ID = 0 and price > 20"; + Pipeline pipeline = runner.getPlanner().compileBeamPipeline(sql); + runner.getPlanner().planner.close(); + + pipeline.run().waitUntilFinish(); + + Assert.assertTrue(MockedBeamSQLTable.CONTENT.size() == 1); + Assert.assertEquals("order_id=12345,site_id=0,price=20.5,order_time=null", + MockedBeamSQLTable.CONTENT.get(0)); + } + +} http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSQLTable.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSQLTable.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSQLTable.java new file mode 100644 index 0000000..8631a6e --- /dev/null +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSQLTable.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.dsls.sql.planner; + +import java.util.ArrayList; +import java.util.Date; +import java.util.List; + +import org.apache.beam.dsls.sql.schema.BaseBeamTable; +import org.apache.beam.dsls.sql.schema.BeamIOType; +import org.apache.beam.dsls.sql.schema.BeamSQLRow; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PBegin; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PDone; +import org.apache.calcite.rel.type.RelProtoDataType; + +/** + * A mock table use to check input/output. + * + */ +public class MockedBeamSQLTable extends BaseBeamTable { + + /** + * + */ + private static final long serialVersionUID = 1373168368414036932L; + + public static final List<String> CONTENT = new ArrayList<>(); + + public MockedBeamSQLTable(RelProtoDataType protoRowType) { + super(protoRowType); + } + + @Override + public BeamIOType getSourceType() { + return BeamIOType.UNBOUNDED; + } + + @Override + public PTransform<? super PBegin, PCollection<BeamSQLRow>> buildIOReader() { + BeamSQLRow row1 = new BeamSQLRow(beamSqlRecordType); + row1.addField(0, 12345L); + row1.addField(1, 0); + row1.addField(2, 10.5); + row1.addField(3, new Date()); + + BeamSQLRow row2 = new BeamSQLRow(beamSqlRecordType); + row2.addField(0, 12345L); + row2.addField(1, 1); + row2.addField(2, 20.5); + row2.addField(3, new Date()); + + BeamSQLRow row3 = new BeamSQLRow(beamSqlRecordType); + row3.addField(0, 12345L); + row3.addField(1, 0); + row3.addField(2, 20.5); + row3.addField(3, new Date()); + + BeamSQLRow row4 = new BeamSQLRow(beamSqlRecordType); + row4.addField(0, null); + row4.addField(1, null); + row4.addField(2, 20.5); + row4.addField(3, new Date()); + + return Create.of(row1, row2, row3); + } + + @Override + public PTransform<? super PCollection<BeamSQLRow>, PDone> buildIOWriter() { + return new OutputStore(); + } + + /** + * Keep output in {@code CONTENT} for validation. + * + */ + public static class OutputStore extends PTransform<PCollection<BeamSQLRow>, PDone> { + + @Override + public PDone expand(PCollection<BeamSQLRow> input) { + input.apply(ParDo.of(new DoFn<BeamSQLRow, Void>() { + + @Setup + public void setup() { + CONTENT.clear(); + } + + @ProcessElement + public void processElement(ProcessContext c) { + CONTENT.add(c.element().valueInString()); + } + + @Teardown + public void close() { + + } + + })); + return PDone.in(input.getPipeline()); + } + + } + +} http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/test/java/org/beam/dsls/sql/planner/BasePlanner.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/beam/dsls/sql/planner/BasePlanner.java b/dsls/sql/src/test/java/org/beam/dsls/sql/planner/BasePlanner.java deleted file mode 100644 index 56e45c4..0000000 --- a/dsls/sql/src/test/java/org/beam/dsls/sql/planner/BasePlanner.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.beam.dsls.sql.planner; - -import java.util.Arrays; -import java.util.HashMap; -import java.util.Map; - -import org.apache.calcite.rel.type.RelDataType; -import org.apache.calcite.rel.type.RelDataTypeFactory; -import org.apache.calcite.rel.type.RelProtoDataType; -import org.apache.calcite.sql.type.SqlTypeName; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.beam.dsls.sql.schema.BaseBeamTable; -import org.beam.dsls.sql.schema.kafka.BeamKafkaCSVTable; -import org.junit.BeforeClass; - -/** - * prepare {@code BeamSqlRunner} for test. - * - */ -public class BasePlanner { - public static BeamSqlRunner runner = new BeamSqlRunner(); - - @BeforeClass - public static void prepare() { - runner.addTable("ORDER_DETAILS", getTable()); - runner.addTable("SUB_ORDER", getTable("127.0.0.1:9092", "sub_orders")); - runner.addTable("SUB_ORDER_RAM", getTable()); - } - - private static BaseBeamTable getTable() { - final RelProtoDataType protoRowType = new RelProtoDataType() { - @Override - public RelDataType apply(RelDataTypeFactory a0) { - return a0.builder().add("order_id", SqlTypeName.BIGINT).add("site_id", SqlTypeName.INTEGER) - .add("price", SqlTypeName.DOUBLE).add("order_time", SqlTypeName.TIMESTAMP).build(); - } - }; - - return new MockedBeamSQLTable(protoRowType); - } - - public static BaseBeamTable getTable(String bootstrapServer, String topic) { - final RelProtoDataType protoRowType = new RelProtoDataType() { - @Override - public RelDataType apply(RelDataTypeFactory a0) { - return a0.builder().add("order_id", SqlTypeName.BIGINT).add("site_id", SqlTypeName.INTEGER) - .add("price", SqlTypeName.DOUBLE).add("order_time", SqlTypeName.TIMESTAMP).build(); - } - }; - - Map<String, Object> consumerPara = new HashMap<String, Object>(); - consumerPara.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); - - return new BeamKafkaCSVTable(protoRowType, bootstrapServer, Arrays.asList(topic)) - .updateConsumerProperties(consumerPara); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/test/java/org/beam/dsls/sql/planner/BeamPlannerExplainTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/beam/dsls/sql/planner/BeamPlannerExplainTest.java b/dsls/sql/src/test/java/org/beam/dsls/sql/planner/BeamPlannerExplainTest.java deleted file mode 100644 index a77878f..0000000 --- a/dsls/sql/src/test/java/org/beam/dsls/sql/planner/BeamPlannerExplainTest.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.beam.dsls.sql.planner; - -import org.junit.Assert; -import org.junit.Test; - -/** - * Tests to explain queries. - * - */ -public class BeamPlannerExplainTest extends BasePlanner { - - @Test - public void selectAll() throws Exception { - String sql = "SELECT * FROM ORDER_DETAILS"; - String plan = runner.explainQuery(sql); - - String expectedPlan = - "BeamProjectRel(order_id=[$0], site_id=[$1], price=[$2], order_time=[$3])\n" - + " BeamIOSourceRel(table=[[ORDER_DETAILS]])\n"; - Assert.assertEquals("explain doesn't match", expectedPlan, plan); - } - - @Test - public void selectWithFilter() throws Exception { - String sql = "SELECT " + " order_id, site_id, price " + "FROM ORDER_DETAILS " - + "WHERE SITE_ID = 0 and price > 20"; - String plan = runner.explainQuery(sql); - - String expectedPlan = "BeamProjectRel(order_id=[$0], site_id=[$1], price=[$2])\n" - + " BeamFilterRel(condition=[AND(=($1, 0), >($2, 20))])\n" - + " BeamIOSourceRel(table=[[ORDER_DETAILS]])\n"; - Assert.assertEquals("explain doesn't match", expectedPlan, plan); - } - - @Test - public void insertSelectFilter() throws Exception { - String sql = "INSERT INTO SUB_ORDER(order_id, site_id, price) " + "SELECT " - + " order_id, site_id, price " + "FROM ORDER_DETAILS " - + "WHERE SITE_ID = 0 and price > 20"; - String plan = runner.explainQuery(sql); - - String expectedPlan = - "BeamIOSinkRel(table=[[SUB_ORDER]], operation=[INSERT], flattened=[true])\n" - + " BeamProjectRel(order_id=[$0], site_id=[$1], price=[$2], order_time=[null])\n" - + " BeamProjectRel(order_id=[$0], site_id=[$1], price=[$2])\n" - + " BeamFilterRel(condition=[AND(=($1, 0), >($2, 20))])\n" - + " BeamIOSourceRel(table=[[ORDER_DETAILS]])\n"; - Assert.assertEquals("explain doesn't match", expectedPlan, plan); - } - -} http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/test/java/org/beam/dsls/sql/planner/BeamPlannerSubmitTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/beam/dsls/sql/planner/BeamPlannerSubmitTest.java b/dsls/sql/src/test/java/org/beam/dsls/sql/planner/BeamPlannerSubmitTest.java deleted file mode 100644 index eb097a9..0000000 --- a/dsls/sql/src/test/java/org/beam/dsls/sql/planner/BeamPlannerSubmitTest.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.beam.dsls.sql.planner; - -import org.apache.beam.sdk.Pipeline; -import org.junit.Assert; -import org.junit.Test; - -/** - * Tests to execute a query. - * - */ -public class BeamPlannerSubmitTest extends BasePlanner { - @Test - public void insertSelectFilter() throws Exception { - String sql = "INSERT INTO SUB_ORDER_RAM(order_id, site_id, price) SELECT " - + " order_id, site_id, price " + "FROM ORDER_DETAILS " + "WHERE SITE_ID = 0 and price > 20"; - Pipeline pipeline = runner.getPlanner().compileBeamPipeline(sql); - runner.getPlanner().planner.close(); - - pipeline.run().waitUntilFinish(); - - Assert.assertTrue(MockedBeamSQLTable.CONTENT.size() == 1); - Assert.assertEquals("order_id=12345,site_id=0,price=20.5,order_time=null", MockedBeamSQLTable.CONTENT.get(0)); - } - -} http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/test/java/org/beam/dsls/sql/planner/MockedBeamSQLTable.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/beam/dsls/sql/planner/MockedBeamSQLTable.java b/dsls/sql/src/test/java/org/beam/dsls/sql/planner/MockedBeamSQLTable.java deleted file mode 100644 index 31f5578..0000000 --- a/dsls/sql/src/test/java/org/beam/dsls/sql/planner/MockedBeamSQLTable.java +++ /dev/null @@ -1,123 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.beam.dsls.sql.planner; - -import java.util.ArrayList; -import java.util.Date; -import java.util.List; - -import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.values.PBegin; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PDone; -import org.apache.calcite.rel.type.RelProtoDataType; -import org.beam.dsls.sql.schema.BaseBeamTable; -import org.beam.dsls.sql.schema.BeamIOType; -import org.beam.dsls.sql.schema.BeamSQLRow; - -/** - * A mock table use to check input/output. - * - */ -public class MockedBeamSQLTable extends BaseBeamTable { - - /** - * - */ - private static final long serialVersionUID = 1373168368414036932L; - - public static final List<String> CONTENT = new ArrayList<>(); - - public MockedBeamSQLTable(RelProtoDataType protoRowType) { - super(protoRowType); - } - - @Override - public BeamIOType getSourceType() { - return BeamIOType.UNBOUNDED; - } - - @Override - public PTransform<? super PBegin, PCollection<BeamSQLRow>> buildIOReader() { - BeamSQLRow row1 = new BeamSQLRow(beamSqlRecordType); - row1.addField(0, 12345L); - row1.addField(1, 0); - row1.addField(2, 10.5); - row1.addField(3, new Date()); - - BeamSQLRow row2 = new BeamSQLRow(beamSqlRecordType); - row2.addField(0, 12345L); - row2.addField(1, 1); - row2.addField(2, 20.5); - row2.addField(3, new Date()); - - BeamSQLRow row3 = new BeamSQLRow(beamSqlRecordType); - row3.addField(0, 12345L); - row3.addField(1, 0); - row3.addField(2, 20.5); - row3.addField(3, new Date()); - - BeamSQLRow row4 = new BeamSQLRow(beamSqlRecordType); - row4.addField(0, null); - row4.addField(1, null); - row4.addField(2, 20.5); - row4.addField(3, new Date()); - - return Create.of(row1, row2, row3); - } - - @Override - public PTransform<? super PCollection<BeamSQLRow>, PDone> buildIOWriter() { - return new OutputStore(); - } - - /** - * Keep output in {@code CONTENT} for validation. - * - */ - public static class OutputStore extends PTransform<PCollection<BeamSQLRow>, PDone> { - - @Override - public PDone expand(PCollection<BeamSQLRow> input) { - input.apply(ParDo.of(new DoFn<BeamSQLRow, Void>() { - - @Setup - public void setup() { - CONTENT.clear(); - } - - @ProcessElement - public void processElement(ProcessContext c) { - CONTENT.add(c.element().valueInString()); - } - - @Teardown - public void close() { - - } - - })); - return PDone.in(input.getPipeline()); - } - - } - -}
