Repository: beam Updated Branches: refs/heads/DSL_SQL 3e678a75a -> 7e918a76e
[BEAM-2079] Support TextIO as SQL source/sink Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/59abc764 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/59abc764 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/59abc764 Branch: refs/heads/DSL_SQL Commit: 59abc7640866f58427817c026af15df30947f3a6 Parents: 3e678a7 Author: James Xu <xumingmi...@gmail.com> Authored: Thu May 4 11:44:14 2017 +0800 Committer: Jean-Baptiste Onofré <jbono...@apache.org> Committed: Wed May 10 07:24:01 2017 +0200 ---------------------------------------------------------------------- dsls/sql/pom.xml | 5 + .../dsls/sql/schema/text/BeamTextCSVTable.java | 66 +++++++ .../schema/text/BeamTextCSVTableIOReader.java | 123 +++++++++++++ .../schema/text/BeamTextCSVTableIOWriter.java | 76 ++++++++ .../dsls/sql/schema/text/BeamTextTable.java | 45 +++++ .../beam/dsls/sql/schema/text/package-info.java | 22 +++ .../sql/schema/text/BeamTextCSVTableTest.java | 179 +++++++++++++++++++ 7 files changed, 516 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/59abc764/dsls/sql/pom.xml ---------------------------------------------------------------------- diff --git a/dsls/sql/pom.xml b/dsls/sql/pom.xml index 6139ada..15692e9 100644 --- a/dsls/sql/pom.xml +++ b/dsls/sql/pom.xml @@ -204,5 +204,10 @@ <artifactId>hamcrest-all</artifactId> <scope>test</scope> </dependency> + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-csv</artifactId> + <version>1.4</version> + </dependency> </dependencies> </project> http://git-wip-us.apache.org/repos/asf/beam/blob/59abc764/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTable.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTable.java 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTable.java
new file mode 100644
index 0000000..b9e6b81
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTable.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.schema.text;
+
+import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.calcite.rel.type.RelProtoDataType;
+import org.apache.commons.csv.CSVFormat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@code BeamTextTable} whose content is formatted as CSV.
+ *
+ * <p>
+ * {@link CSVFormat} comes in many dialects; consult its javadoc for the available options.
+ * </p>
+ */
+public class BeamTextCSVTable extends BeamTextTable {
+  private static final Logger LOG = LoggerFactory.getLogger(BeamTextCSVTable.class);
+
+  /** CSV dialect handed to the reader and writer transforms built by this table. */
+  private CSVFormat csvFormat;
+
+  /**
+   * Creates a CSV table that uses the {@link CSVFormat#DEFAULT DEFAULT} format.
+   */
+  public BeamTextCSVTable(RelProtoDataType protoDataType, String filePattern) {
+    this(protoDataType, filePattern, CSVFormat.DEFAULT);
+  }
+
+  /**
+   * Creates a CSV table that uses the given {@code csvFormat}.
+   */
+  public BeamTextCSVTable(RelProtoDataType protoDataType, String filePattern,
+      CSVFormat csvFormat) {
+    super(protoDataType, filePattern);
+    this.csvFormat = csvFormat;
+  }
+
+  /**
+   * Builds the transform that reads {@code filePattern} into {@link BeamSQLRow}s.
+   */
+  @Override
+  public PTransform<? super PBegin, PCollection<BeamSQLRow>> buildIOReader() {
+    BeamTextCSVTableIOReader ioReader =
+        new BeamTextCSVTableIOReader(beamSqlRecordType, filePattern, csvFormat);
+    return ioReader;
+  }
+
+  /**
+   * Builds the transform that writes {@link BeamSQLRow}s to {@code filePattern}.
+   */
+  @Override
+  public PTransform<? super PCollection<BeamSQLRow>, PDone> buildIOWriter() {
+    BeamTextCSVTableIOWriter ioWriter =
+        new BeamTextCSVTableIOWriter(beamSqlRecordType, filePattern, csvFormat);
+    return ioWriter;
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/59abc764/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOReader.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOReader.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOReader.java
new file mode 100644
index 0000000..cf7c095
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOReader.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.schema.text;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.io.StringReader;
+import java.util.List;
+
+import org.apache.beam.dsls.sql.exception.BeamSqlUnsupportedException;
+import org.apache.beam.dsls.sql.schema.BeamSQLRecordType;
+import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.commons.csv.CSVFormat;
+import org.apache.commons.csv.CSVParser;
+import org.apache.commons.csv.CSVRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * IOReader for {@code BeamTextCSVTable}.
+ *
+ * <p>Reads the lines matched by {@code filePattern} with {@link TextIO}, parses each line with
+ * the configured {@link CSVFormat} and converts it into a {@link BeamSQLRow} whose fields are
+ * cast to the column types of {@code beamSqlRecordType}. Lines for which the parser yields no
+ * record (e.g. blank lines when the format ignores empty lines) are skipped.
+ */
+public class BeamTextCSVTableIOReader
+    extends PTransform<PBegin, PCollection<BeamSQLRow>>
+    implements Serializable {
+  private static final Logger LOG = LoggerFactory.getLogger(BeamTextCSVTableIOReader.class);
+  private String filePattern;
+  protected BeamSQLRecordType beamSqlRecordType;
+  protected CSVFormat csvFormat;
+
+  /**
+   * @param beamSqlRecordType record type every decoded row must conform to
+   * @param filePattern file pattern handed to {@link TextIO}
+   * @param csvFormat CSV dialect used to parse each line
+   */
+  public BeamTextCSVTableIOReader(BeamSQLRecordType beamSqlRecordType, String filePattern,
+      CSVFormat csvFormat) {
+    this.filePattern = filePattern;
+    this.beamSqlRecordType = beamSqlRecordType;
+    this.csvFormat = csvFormat;
+  }
+
+  /**
+   * Reads text lines and decodes each of them into one {@link BeamSQLRow}.
+   *
+   * @throws IllegalArgumentException (at element-processing time) if a line cannot be parsed or
+   *     its number of fields differs from the number of columns of the record type
+   */
+  @Override
+  public PCollection<BeamSQLRow> expand(PBegin input) {
+    return input.apply("decodeRecord", TextIO.Read.from(filePattern))
+        .apply(ParDo.of(new DoFn<String, BeamSQLRow>() {
+          @ProcessElement
+          public void processElement(ProcessContext ctx) {
+            String str = ctx.element();
+
+            // The parser wraps the reader; both are closed once the records are materialized.
+            List<CSVRecord> records;
+            try (StringReader reader = new StringReader(str);
+                CSVParser parser = csvFormat.parse(reader)) {
+              records = parser.getRecords();
+            } catch (IOException e) {
+              throw new IllegalArgumentException("Invalid text filePattern: " + filePattern, e);
+            }
+
+            // A blank line yields no record when the format ignores empty lines; skip it
+            // instead of failing with an IndexOutOfBoundsException.
+            if (records.isEmpty()) {
+              return;
+            }
+            CSVRecord rawRecord = records.get(0);
+
+            if (rawRecord.size() != beamSqlRecordType.size()) {
+              throw new IllegalArgumentException(String.format(
+                  "Invalid record: %s in filePattern: %s, expect %d fields, but actually %d",
+                  str, filePattern, beamSqlRecordType.size(), rawRecord.size()));
+            }
+
+            BeamSQLRow row = new BeamSQLRow(beamSqlRecordType);
+            for (int idx = 0; idx < beamSqlRecordType.size(); idx++) {
+              String raw = rawRecord.get(idx);
+              addFieldWithAutoTypeCasting(row, idx, raw);
+            }
+            ctx.output(row);
+          }
+        }));
+  }
+
+  /**
+   * Converts {@code raw} to the Java type matching the SQL type of column {@code idx} and stores
+   * it in {@code row}.
+   *
+   * @param row row to populate; its data type supplies the column types
+   * @param idx index of the column being set
+   * @param raw textual value taken from the CSV record
+   * @throws NumberFormatException if {@code raw} is not a valid literal for a numeric column
+   * @throws BeamSqlUnsupportedException if the column type is not one of TINYINT, SMALLINT,
+   *     INTEGER, BIGINT, FLOAT, DOUBLE or VARCHAR
+   */
+  public void addFieldWithAutoTypeCasting(BeamSQLRow row, int idx, String raw) {
+    SqlTypeName columnType = row.getDataType().getFieldsType().get(idx);
+    switch (columnType) {
+      case TINYINT:
+        row.addField(idx, Byte.valueOf(raw));
+        break;
+      case SMALLINT:
+        row.addField(idx, Short.valueOf(raw));
+        break;
+      case INTEGER:
+        row.addField(idx, Integer.valueOf(raw));
+        break;
+      case BIGINT:
+        row.addField(idx, Long.valueOf(raw));
+        break;
+      case FLOAT:
+        row.addField(idx, Float.valueOf(raw));
+        break;
+      case DOUBLE:
+        row.addField(idx, Double.valueOf(raw));
+        break;
+      case VARCHAR:
+        row.addField(idx, raw);
+        break;
+      default:
+        throw new BeamSqlUnsupportedException(String.format(
+            "Column type %s is not supported yet!", columnType));
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/59abc764/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOWriter.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOWriter.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOWriter.java
new file mode 100644
index 0000000..6104cd8
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOWriter.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to
the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.dsls.sql.schema.text; + +import java.io.IOException; +import java.io.Serializable; +import java.io.StringWriter; + +import org.apache.beam.dsls.sql.schema.BeamSQLRecordType; +import org.apache.beam.dsls.sql.schema.BeamSQLRow; +import org.apache.beam.sdk.io.TextIO; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PDone; +import org.apache.commons.csv.CSVFormat; +import org.apache.commons.csv.CSVPrinter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * IOWriter for {@code BeamTextCSVTable}. 
+ */
+public class BeamTextCSVTableIOWriter extends PTransform<PCollection<BeamSQLRow>, PDone>
+    implements Serializable {
+  private static final Logger LOG = LoggerFactory.getLogger(BeamTextCSVTableIOWriter.class);
+
+  private String filePattern;
+  protected BeamSQLRecordType beamSqlRecordType;
+  protected CSVFormat csvFormat;
+
+  /**
+   * @param beamSqlRecordType record type of the rows to write
+   * @param filePattern output location handed to {@link TextIO}
+   * @param csvFormat CSV dialect used to encode each row
+   */
+  public BeamTextCSVTableIOWriter(BeamSQLRecordType beamSqlRecordType, String filePattern,
+      CSVFormat csvFormat) {
+    this.filePattern = filePattern;
+    this.beamSqlRecordType = beamSqlRecordType;
+    this.csvFormat = csvFormat;
+  }
+
+  /**
+   * Encodes every {@link BeamSQLRow} as one CSV record and writes the records with
+   * {@link TextIO}.
+   *
+   * @throws IllegalArgumentException (at element-processing time) if a row cannot be encoded
+   */
+  @Override
+  public PDone expand(PCollection<BeamSQLRow> input) {
+    return input.apply("encodeRecord", ParDo.of(new DoFn<BeamSQLRow, String>() {
+
+      @ProcessElement
+      public void processElement(ProcessContext ctx) {
+        BeamSQLRow row = ctx.element();
+        StringWriter writer = new StringWriter();
+
+        try (CSVPrinter printer = csvFormat.print(writer)) {
+          for (int i = 0; i < row.size(); i++) {
+            // Guard against null field values: CSVPrinter renders null as the format's
+            // null string (or an empty field) instead of this loop failing with an NPE.
+            Object value = row.getFieldValue(i);
+            printer.print(value == null ? null : value.toString());
+          }
+          // NOTE(review): println() appends the format's record separator and TextIO
+          // presumably terminates every element with a newline as well, which would leave
+          // blank lines in the output -- confirm whether the separator is wanted here.
+          printer.println();
+        } catch (IOException e) {
+          throw new IllegalArgumentException("Invalid filePattern: " + filePattern, e);
+        }
+
+        ctx.output(writer.toString());
+      }
+    })).apply(TextIO.Write.to(filePattern));
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/59abc764/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextTable.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextTable.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextTable.java
new file mode 100644
index 0000000..3353761
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextTable.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.schema.text;
+
+import java.io.Serializable;
+
+import org.apache.beam.dsls.sql.schema.BaseBeamTable;
+import org.apache.beam.dsls.sql.schema.BeamIOType;
+import org.apache.calcite.rel.type.RelProtoDataType;
+
+/**
+ * Base class for tables stored as a text file or directory (backed by {@code TextIO}).
+ */
+public abstract class BeamTextTable extends BaseBeamTable implements Serializable {
+  /** File pattern of the underlying text file(s); {@code null} when none was given. */
+  protected String filePattern;
+
+  /**
+   * Creates a text table without a file pattern.
+   */
+  protected BeamTextTable(RelProtoDataType protoRowType) {
+    this(protoRowType, null);
+  }
+
+  /**
+   * Creates a text table stored at {@code filePattern}.
+   */
+  protected BeamTextTable(RelProtoDataType protoDataType, String filePattern) {
+    super(protoDataType);
+    this.filePattern = filePattern;
+  }
+
+  /**
+   * Text tables always report {@link BeamIOType#BOUNDED}.
+   */
+  @Override
+  public BeamIOType getSourceType() {
+    return BeamIOType.BOUNDED;
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/59abc764/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/package-info.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/package-info.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/package-info.java
new file mode 100644
index 0000000..f48f2fe
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor
license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Table schema for text files. + */ +package org.apache.beam.dsls.sql.schema.text; http://git-wip-us.apache.org/repos/asf/beam/blob/59abc764/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableTest.java new file mode 100644 index 0000000..e06f8da --- /dev/null +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableTest.java @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.schema.text;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.PrintStream;
+import java.nio.file.FileVisitResult;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.SimpleFileVisitor;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import org.apache.beam.dsls.sql.planner.BeamQueryPlanner;
+import org.apache.beam.dsls.sql.schema.BeamSQLRecordType;
+import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelProtoDataType;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.commons.csv.CSVFormat;
+import org.apache.commons.csv.CSVPrinter;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+
+/**
+ * Tests for {@code BeamTextCSVTable}.
+ */
+public class BeamTextCSVTableTest {
+
+  @Rule public TestPipeline pipeline = TestPipeline.create();
+  @Rule public TestPipeline pipeline2 = TestPipeline.create();
+
+  /**
+   * testData.
+   *
+   * <p>
+   * The types of the csv fields are:
+   * integer,bigint,float,double,string
+   * </p>
+   */
+  private static Object[] data1 = new Object[] { 1, 1L, 1.1F, 1.1, "james" };
+  private static Object[] data2 = new Object[] { 2, 2L, 2.2F, 2.2, "bond" };
+
+  private static List<Object[]> testData = Arrays.asList(data1, data2);
+  private static List<BeamSQLRow> testDataRows = buildRows(testData);
+  private static ConcurrentLinkedQueue<Object[]> actualData = new ConcurrentLinkedQueue<>();
+
+  private static Path tempFolder;
+  private static File readerSourceFile;
+  private static File writerTargetFile;
+
+  /**
+   * Reads the CSV fixture through the table's reader and expects exactly the test rows.
+   */
+  @Test public void testBuildIOReader() {
+    PCollection<BeamSQLRow> rows = pipeline.apply(
+        new BeamTextCSVTable(buildRowType(), readerSourceFile.getAbsolutePath()).buildIOReader());
+    PAssert.that(rows).containsInAnyOrder(testDataRows);
+    pipeline.run();
+  }
+
+  /**
+   * Pipes the reader into the writer, then reads the target path back.
+   *
+   * <p>NOTE(review): {@code setUp} pre-populates {@code writerTargetFile} with the expected
+   * data, and the writer presumably emits sharded files next to it, so the read-back below may
+   * never see the writer's output -- confirm that this test really exercises the writer.
+   */
+  @Test public void testBuildIOWriter() {
+    // reader from a source file, then write into a target file
+    pipeline.apply(
+        new BeamTextCSVTable(buildRowType(), readerSourceFile.getAbsolutePath()).buildIOReader())
+        .apply(new BeamTextCSVTable(buildRowType(), writerTargetFile.getAbsolutePath())
+            .buildIOWriter());
+    pipeline.run();
+
+    PCollection<BeamSQLRow> rows = pipeline2.apply(
+        new BeamTextCSVTable(buildRowType(), writerTargetFile.getAbsolutePath()).buildIOReader());
+
+    // confirm the two reads match
+    PAssert.that(rows).containsInAnyOrder(testDataRows);
+    pipeline2.run();
+  }
+
+  /**
+   * Creates the temp folder and writes the CSV fixtures into it.
+   */
+  @BeforeClass public static void setUp() throws IOException {
+    tempFolder = Files.createTempDirectory("BeamTextTableTest");
+    readerSourceFile = writeToFile(testData, "readerSourceFile.txt");
+    writerTargetFile = writeToFile(testData, "writerTargetFile.txt");
+  }
+
+  /**
+   * Recursively deletes the temp folder.
+   */
+  @AfterClass public static void teardownClass() throws IOException {
+    Files.walkFileTree(tempFolder, new SimpleFileVisitor<Path>() {
+
+      @Override public FileVisitResult visitFile(Path file, BasicFileAttributes attrs)
+          throws IOException {
+        Files.delete(file);
+        return FileVisitResult.CONTINUE;
+      }
+
+      @Override public FileVisitResult postVisitDirectory(Path dir, IOException exc)
+          throws IOException {
+        Files.delete(dir);
+        return FileVisitResult.CONTINUE;
+      }
+    });
+  }
+
+  /**
+   * Writes {@code rows} as CSV into {@code filename} inside the temp folder.
+   */
+  private static File writeToFile(List<Object[]> rows, String filename) throws IOException {
+    File file = tempFolder.resolve(filename).toFile();
+    OutputStream output = new FileOutputStream(file);
+    writeToStreamAndClose(rows, output);
+    return file;
+  }
+
+  /**
+   * Helper that writes the given rows as CSV records (one record per row) to a stream, then
+   * closes the stream.
+   *
+   * @throws IOException if a record cannot be written; propagated so that a broken fixture fails
+   *     the setup instead of being silently printed to stderr
+   */
+  private static void writeToStreamAndClose(List<Object[]> rows, OutputStream outputStream)
+      throws IOException {
+    try (PrintStream writer = new PrintStream(outputStream)) {
+      CSVPrinter printer = CSVFormat.DEFAULT.print(writer);
+      for (Object[] row : rows) {
+        for (Object field : row) {
+          printer.print(field);
+        }
+        printer.println();
+      }
+    }
+  }
+
+  private RelProtoDataType buildRowType() {
+    return new RelProtoDataType() {
+
+      @Override public RelDataType apply(RelDataTypeFactory a0) {
+        return a0.builder().add("id", SqlTypeName.INTEGER).add("order_id", SqlTypeName.BIGINT)
+            .add("price", SqlTypeName.FLOAT).add("amount", SqlTypeName.DOUBLE)
+            .add("user_name", SqlTypeName.VARCHAR).build();
+      }
+    };
+  }
+
+  private static RelDataType buildRelDataType() {
+    return BeamQueryPlanner.TYPE_FACTORY.builder().add("id", SqlTypeName.INTEGER)
+        .add("order_id", SqlTypeName.BIGINT).add("price", SqlTypeName.FLOAT)
+        .add("amount", SqlTypeName.DOUBLE).add("user_name", SqlTypeName.VARCHAR).build();
+  }
+
+  private static BeamSQLRecordType buildBeamSQLRecordType() {
+    return BeamSQLRecordType.from(buildRelDataType());
+  }
+
+  private static BeamSQLRow buildRow(Object[] data) {
+    return new BeamSQLRow(buildBeamSQLRecordType(), Arrays.asList(data));
+  }
+
+  /**
+   * Converts every field array into a {@link BeamSQLRow}; replaces the former double-brace
+   * initialization, which created an anonymous {@code ArrayList} subclass.
+   */
+  private static List<BeamSQLRow> buildRows(List<Object[]> data) {
+    List<BeamSQLRow> rows = new ArrayList<>(data.size());
+    for (Object[] fields : data) {
+      rows.add(buildRow(fields));
+    }
+    return rows;
+  }
+}