Repository: beam Updated Branches: refs/heads/DSL_SQL 25babc999 -> 127790212
[BEAM-2292] Add BeamPCollectionTable to create table from PCollection<BeamSQLRow> Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1232bf11 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1232bf11 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1232bf11 Branch: refs/heads/DSL_SQL Commit: 1232bf11f2c7585f68f8f87ded30dee6a45b4fa5 Parents: 25babc9 Author: mingmxu <[email protected]> Authored: Sun May 14 13:22:30 2017 -0700 Committer: Jean-Baptiste Onofré <[email protected]> Committed: Sun May 21 16:55:56 2017 +0200 ---------------------------------------------------------------------- .../beam/dsls/sql/rel/BeamIOSourceRel.java | 3 +- .../beam/dsls/sql/schema/BaseBeamTable.java | 6 +- .../dsls/sql/schema/BeamPCollectionTable.java | 62 +++++++++++++++++++ .../dsls/sql/schema/kafka/BeamKafkaTable.java | 12 +--- .../dsls/sql/schema/text/BeamTextCSVTable.java | 8 ++- .../schema/text/BeamTextCSVTableIOReader.java | 9 +-- .../dsls/sql/planner/MockedBeamSQLTable.java | 5 +- .../sql/schema/BeamPCollectionTableTest.java | 64 ++++++++++++++++++++ .../sql/schema/text/BeamTextCSVTableTest.java | 12 ++-- 9 files changed, 150 insertions(+), 31 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/1232bf11/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java index 61f53eb..f4d5001 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java @@ -49,8 +49,7 @@ public class BeamIOSourceRel extends TableScan implements BeamRelNode { String stageName = BeamSQLRelUtils.getStageName(this); - PCollection<BeamSQLRow> sourceStream = planCreator.getPipeline().apply(stageName, - sourceTable.buildIOReader()); + PCollection<BeamSQLRow> sourceStream = sourceTable.buildIOReader(planCreator.getPipeline()); return sourceStream; } http://git-wip-us.apache.org/repos/asf/beam/blob/1232bf11/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BaseBeamTable.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BaseBeamTable.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BaseBeamTable.java index 2ecfa38..52d2bbd 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BaseBeamTable.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BaseBeamTable.java @@ -19,8 +19,8 @@ package org.apache.beam.dsls.sql.schema; import java.io.Serializable; import org.apache.beam.dsls.sql.planner.BeamQueryPlanner; +import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PDone; import org.apache.calcite.DataContext; @@ -53,10 +53,10 @@ public abstract class BaseBeamTable implements ScannableTable, Serializable { public abstract BeamIOType getSourceType(); /** - * create a {@code IO.read()} instance to read from source. + * create a {@code PCollection<BeamSQLRow>} from source. * */ - public abstract PTransform<? super PBegin, PCollection<BeamSQLRow>> buildIOReader(); + public abstract PCollection<BeamSQLRow> buildIOReader(Pipeline pipeline); /** * create a {@code IO.write()} instance to write to target. http://git-wip-us.apache.org/repos/asf/beam/blob/1232bf11/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamPCollectionTable.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamPCollectionTable.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamPCollectionTable.java new file mode 100644 index 0000000..1c3ab5b --- /dev/null +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamPCollectionTable.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.dsls.sql.schema; + +import org.apache.beam.dsls.sql.exception.BeamInvalidOperatorException; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollection.IsBounded; +import org.apache.beam.sdk.values.PDone; +import org.apache.calcite.rel.type.RelProtoDataType; + +/** + * {@code BeamPCollectionTable} converts a {@code PCollection<BeamSQLRow>} as a virtual table, + * then a downstream query can query directly. + */ +public class BeamPCollectionTable extends BaseBeamTable { + private BeamIOType ioType; + private PCollection<BeamSQLRow> upstream; + + protected BeamPCollectionTable(RelProtoDataType protoRowType) { + super(protoRowType); + } + + public BeamPCollectionTable(PCollection<BeamSQLRow> upstream, RelProtoDataType protoRowType){ + this(protoRowType); + ioType = upstream.isBounded().equals(IsBounded.BOUNDED) + ? BeamIOType.BOUNDED : BeamIOType.UNBOUNDED; + this.upstream = upstream; + } + + @Override + public BeamIOType getSourceType() { + return ioType; + } + + @Override + public PCollection<BeamSQLRow> buildIOReader(Pipeline pipeline) { + return upstream; + } + + @Override + public PTransform<? super PCollection<BeamSQLRow>, PDone> buildIOWriter() { + throw new BeamInvalidOperatorException("cannot use [BeamPCollectionTable] as target"); + } + +} http://git-wip-us.apache.org/repos/asf/beam/blob/1232bf11/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaTable.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaTable.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaTable.java index c8c851c..7342cee 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaTable.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaTable.java @@ -24,6 +24,7 @@ import java.util.Map; import org.apache.beam.dsls.sql.schema.BaseBeamTable; import org.apache.beam.dsls.sql.schema.BeamIOType; import org.apache.beam.dsls.sql.schema.BeamSQLRow; +import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.ByteArrayCoder; import org.apache.beam.sdk.io.kafka.KafkaIO; import org.apache.beam.sdk.transforms.PTransform; @@ -72,19 +73,12 @@ public abstract class BeamKafkaTable extends BaseBeamTable implements Serializab getPTransformForOutput(); @Override - public PTransform<? super PBegin, PCollection<BeamSQLRow>> buildIOReader() { - return new PTransform<PBegin, PCollection<BeamSQLRow>>() { - - @Override - public PCollection<BeamSQLRow> expand(PBegin input) { - return input.apply("read", + public PCollection<BeamSQLRow> buildIOReader(Pipeline pipeline) { + return PBegin.in(pipeline).apply("read", KafkaIO.<byte[], byte[]>read().withBootstrapServers(bootstrapServers).withTopics(topics) .updateConsumerProperties(configUpdates).withKeyCoder(ByteArrayCoder.of()) .withValueCoder(ByteArrayCoder.of()).withoutMetadata()) .apply("in_format", getPTransformForInput()); - - } - }; } @Override http://git-wip-us.apache.org/repos/asf/beam/blob/1232bf11/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTable.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTable.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTable.java index b9e6b81..6b21289 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTable.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTable.java @@ -19,6 +19,8 @@ package org.apache.beam.dsls.sql.schema.text; import org.apache.beam.dsls.sql.schema.BeamSQLRow; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.io.TextIO; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; @@ -55,8 +57,10 @@ public class BeamTextCSVTable extends BeamTextTable { } @Override - public PTransform<? super PBegin, PCollection<BeamSQLRow>> buildIOReader() { - return new BeamTextCSVTableIOReader(beamSqlRecordType, filePattern, csvFormat); + public PCollection<BeamSQLRow> buildIOReader(Pipeline pipeline) { + return PBegin.in(pipeline).apply("decodeRecord", TextIO.Read.from(filePattern)) + .apply("parseCSVLine", + new BeamTextCSVTableIOReader(beamSqlRecordType, filePattern, csvFormat)); } @Override http://git-wip-us.apache.org/repos/asf/beam/blob/1232bf11/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOReader.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOReader.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOReader.java index 3c031ce..59d77c0 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOReader.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOReader.java @@ -24,11 +24,9 @@ import java.io.Serializable; import org.apache.beam.dsls.sql.schema.BeamSQLRecordType; import org.apache.beam.dsls.sql.schema.BeamSQLRow; -import org.apache.beam.sdk.io.TextIO; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; import org.apache.commons.csv.CSVFormat; @@ -36,7 +34,7 @@ import org.apache.commons.csv.CSVFormat; * IOReader for {@code BeamTextCSVTable}. */ public class BeamTextCSVTableIOReader - extends PTransform<PBegin, PCollection<BeamSQLRow>> + extends PTransform<PCollection<String>, PCollection<BeamSQLRow>> implements Serializable { private String filePattern; protected BeamSQLRecordType beamSqlRecordType; @@ -50,9 +48,8 @@ public class BeamTextCSVTableIOReader } @Override - public PCollection<BeamSQLRow> expand(PBegin input) { - return input.apply("decodeRecord", TextIO.Read.from(filePattern)) - .apply(ParDo.of(new DoFn<String, BeamSQLRow>() { + public PCollection<BeamSQLRow> expand(PCollection<String> input) { + return input.apply(ParDo.of(new DoFn<String, BeamSQLRow>() { @ProcessElement public void processElement(ProcessContext ctx) { String str = ctx.element(); http://git-wip-us.apache.org/repos/asf/beam/blob/1232bf11/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSQLTable.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSQLTable.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSQLTable.java index 8ccb332..78fd055 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSQLTable.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSQLTable.java @@ -24,6 +24,7 @@ import org.apache.beam.dsls.sql.schema.BaseBeamTable; import org.apache.beam.dsls.sql.schema.BeamIOType; import org.apache.beam.dsls.sql.schema.BeamSQLRecordType; import org.apache.beam.dsls.sql.schema.BeamSQLRow; +import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; @@ -119,8 +120,8 @@ public class MockedBeamSQLTable extends BaseBeamTable { } @Override - public PTransform<? super PBegin, PCollection<BeamSQLRow>> buildIOReader() { - return Create.of(inputRecords); + public PCollection<BeamSQLRow> buildIOReader(Pipeline pipeline) { + return PBegin.in(pipeline).apply(Create.of(inputRecords)); } @Override http://git-wip-us.apache.org/repos/asf/beam/blob/1232bf11/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamPCollectionTableTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamPCollectionTableTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamPCollectionTableTest.java new file mode 100644 index 0000000..6f24e2a --- /dev/null +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamPCollectionTableTest.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.dsls.sql.schema; + +import org.apache.beam.dsls.sql.planner.BasePlanner; +import org.apache.beam.dsls.sql.planner.BeamQueryPlanner; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.values.PBegin; +import org.apache.beam.sdk.values.PCollection; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.rel.type.RelProtoDataType; +import org.apache.calcite.sql.type.SqlTypeName; +import org.junit.BeforeClass; +import org.junit.Test; + +/** + * Test case for BeamPCollectionTable. + */ +public class BeamPCollectionTableTest extends BasePlanner{ + public static TestPipeline pipeline = TestPipeline.create(); + + @BeforeClass + public static void prepareTable(){ + RelProtoDataType protoRowType = new RelProtoDataType() { + @Override + public RelDataType apply(RelDataTypeFactory a0) { + return a0.builder().add("c1", SqlTypeName.INTEGER) + .add("c2", SqlTypeName.VARCHAR).build(); + } + }; + + BeamSQLRow row = new BeamSQLRow(BeamSQLRecordType.from( + protoRowType.apply(BeamQueryPlanner.TYPE_FACTORY))); + row.addField(0, 1); + row.addField(1, "hello world."); + PCollection<BeamSQLRow> inputStream = PBegin.in(pipeline).apply(Create.of(row)); + runner.addTableMetadata("COLLECTION_TABLE", + new BeamPCollectionTable(inputStream, protoRowType)); + } + + @Test + public void testSelectFromPCollectionTable() throws Exception{ + String sql = "select c1, c2 from COLLECTION_TABLE"; + runner.executionPlan(sql); + } + +} http://git-wip-us.apache.org/repos/asf/beam/blob/1232bf11/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableTest.java index 3bc29e4..4c403ac 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableTest.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableTest.java @@ -80,22 +80,20 @@ public class BeamTextCSVTableTest { private static File writerTargetFile; @Test public void testBuildIOReader() { - PCollection<BeamSQLRow> rows = pipeline.apply( - new BeamTextCSVTable(buildRowType(), readerSourceFile.getAbsolutePath()).buildIOReader()); + PCollection<BeamSQLRow> rows = new BeamTextCSVTable(buildRowType(), + readerSourceFile.getAbsolutePath()).buildIOReader(pipeline); PAssert.that(rows).containsInAnyOrder(testDataRows); pipeline.run(); } @Test public void testBuildIOWriter() { - // reader from a source file, then write into a target file - pipeline.apply( - new BeamTextCSVTable(buildRowType(), readerSourceFile.getAbsolutePath()).buildIOReader()) + new BeamTextCSVTable(buildRowType(), readerSourceFile.getAbsolutePath()).buildIOReader(pipeline) .apply(new BeamTextCSVTable(buildRowType(), writerTargetFile.getAbsolutePath()) .buildIOWriter()); pipeline.run(); - PCollection<BeamSQLRow> rows = pipeline2.apply( - new BeamTextCSVTable(buildRowType(), writerTargetFile.getAbsolutePath()).buildIOReader()); + PCollection<BeamSQLRow> rows = new BeamTextCSVTable(buildRowType(), + writerTargetFile.getAbsolutePath()).buildIOReader(pipeline2); // confirm the two reads match PAssert.that(rows).containsInAnyOrder(testDataRows);
