Repository: beam Updated Branches: refs/heads/DSL_SQL 868bcbdad -> 0e08c87d7
[BEAM-2149] Improved kafka table implemention. 1. use robust CSV library to parse & print. 2. support different data types rather than just `String`. 3. a little cleanup for TextTable (to extract common methods). Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/8044e59e Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/8044e59e Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/8044e59e Branch: refs/heads/DSL_SQL Commit: 8044e59e1bfa175e9d686a4c332be85be22850f1 Parents: 868bcbd Author: James Xu <[email protected]> Authored: Sat May 13 01:07:01 2017 +0800 Committer: Jean-Baptiste Onofré <[email protected]> Committed: Mon May 15 10:58:04 2017 +0200 ---------------------------------------------------------------------- .../beam/dsls/sql/schema/BeamTableUtils.java | 104 +++++++++++++++++ .../sql/schema/kafka/BeamKafkaCSVTable.java | 53 ++++----- .../schema/text/BeamTextCSVTableIOReader.java | 66 +---------- .../schema/text/BeamTextCSVTableIOWriter.java | 22 +--- .../sql/schema/kafka/BeamKafkaCSVTableTest.java | 113 +++++++++++++++++++ .../sql/schema/text/BeamTextCSVTableTest.java | 2 - 6 files changed, 245 insertions(+), 115 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/8044e59e/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamTableUtils.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamTableUtils.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamTableUtils.java new file mode 100644 index 0000000..bc622c2 --- /dev/null +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamTableUtils.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.dsls.sql.schema; + +import java.io.IOException; +import java.io.StringReader; +import java.io.StringWriter; + +import org.apache.beam.dsls.sql.exception.BeamSqlUnsupportedException; +import org.apache.calcite.sql.type.SqlTypeName; +import org.apache.commons.csv.CSVFormat; +import org.apache.commons.csv.CSVParser; +import org.apache.commons.csv.CSVPrinter; +import org.apache.commons.csv.CSVRecord; + +/** + * Utility methods for working with {@code BeamTable}. 
+ */ +public final class BeamTableUtils { + public static BeamSQLRow csvLine2BeamSQLRow( + CSVFormat csvFormat, + String line, + BeamSQLRecordType beamSqlRecordType) { + BeamSQLRow row = new BeamSQLRow(beamSqlRecordType); + try (StringReader reader = new StringReader(line)) { + CSVParser parser = csvFormat.parse(reader); + CSVRecord rawRecord = parser.getRecords().get(0); + + if (rawRecord.size() != beamSqlRecordType.size()) { + throw new IllegalArgumentException(String.format( + "Expect %d fields, but actually %d, in line: %s", + beamSqlRecordType.size(), rawRecord.size(), line + )); + } else { + for (int idx = 0; idx < beamSqlRecordType.size(); idx++) { + String raw = rawRecord.get(idx); + addFieldWithAutoTypeCasting(row, idx, raw); + } + } + } catch (IOException e) { + throw new IllegalArgumentException("decodeRecord failed!", e); + } + return row; + } + + public static String beamSQLRow2CsvLine(BeamSQLRow row, CSVFormat csvFormat) { + StringWriter writer = new StringWriter(); + try (CSVPrinter printer = csvFormat.print(writer)) { + for (int i = 0; i < row.size(); i++) { + printer.print(row.getFieldValue(i).toString()); + } + printer.println(); + } catch (IOException e) { + throw new IllegalArgumentException("encodeRecord failed!", e); + } + return writer.toString(); + } + + public static void addFieldWithAutoTypeCasting(BeamSQLRow row, int idx, String raw) { + SqlTypeName columnType = row.getDataType().getFieldsType().get(idx); + switch (columnType) { + case TINYINT: + row.addField(idx, Byte.valueOf(raw)); + break; + case SMALLINT: + row.addField(idx, Short.valueOf(raw)); + break; + case INTEGER: + row.addField(idx, Integer.valueOf(raw)); + break; + case BIGINT: + row.addField(idx, Long.valueOf(raw)); + break; + case FLOAT: + row.addField(idx, Float.valueOf(raw)); + break; + case DOUBLE: + row.addField(idx, Double.valueOf(raw)); + break; + case VARCHAR: + row.addField(idx, raw); + break; + default: + throw new BeamSqlUnsupportedException( + String.format("Column type %s is 
not supported yet!", columnType)); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/8044e59e/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaCSVTable.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaCSVTable.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaCSVTable.java index 0f40f33..127870c 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaCSVTable.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaCSVTable.java @@ -17,7 +17,11 @@ */ package org.apache.beam.dsls.sql.schema.kafka; +import static org.apache.beam.dsls.sql.schema.BeamTableUtils.beamSQLRow2CsvLine; +import static org.apache.beam.dsls.sql.schema.BeamTableUtils.csvLine2BeamSQLRow; + import java.util.List; + import org.apache.beam.dsls.sql.schema.BeamSQLRecordType; import org.apache.beam.dsls.sql.schema.BeamSQLRow; import org.apache.beam.sdk.transforms.DoFn; @@ -26,33 +30,35 @@ import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.calcite.rel.type.RelProtoDataType; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.apache.commons.csv.CSVFormat; /** * A Kafka topic that saves records as CSV format. 
* */ public class BeamKafkaCSVTable extends BeamKafkaTable { - - public static final String DELIMITER = ","; - private static final Logger LOG = LoggerFactory.getLogger(BeamKafkaCSVTable.class); - + private CSVFormat csvFormat; public BeamKafkaCSVTable(RelProtoDataType protoRowType, String bootstrapServers, List<String> topics) { + this(protoRowType, bootstrapServers, topics, CSVFormat.DEFAULT); + } + + public BeamKafkaCSVTable(RelProtoDataType protoRowType, String bootstrapServers, + List<String> topics, CSVFormat format) { super(protoRowType, bootstrapServers, topics); + this.csvFormat = format; } @Override public PTransform<PCollection<KV<byte[], byte[]>>, PCollection<BeamSQLRow>> getPTransformForInput() { - return new CsvRecorderDecoder(beamSqlRecordType); + return new CsvRecorderDecoder(beamSqlRecordType, csvFormat); } @Override public PTransform<PCollection<BeamSQLRow>, PCollection<KV<byte[], byte[]>>> getPTransformForOutput() { - return new CsvRecorderEncoder(beamSqlRecordType); + return new CsvRecorderEncoder(beamSqlRecordType, csvFormat); } /** @@ -62,9 +68,10 @@ public class BeamKafkaCSVTable extends BeamKafkaTable { public static class CsvRecorderDecoder extends PTransform<PCollection<KV<byte[], byte[]>>, PCollection<BeamSQLRow>> { private BeamSQLRecordType recordType; - - public CsvRecorderDecoder(BeamSQLRecordType recordType) { + private CSVFormat format; + public CsvRecorderDecoder(BeamSQLRecordType recordType, CSVFormat format) { this.recordType = recordType; + this.format = format; } @Override @@ -73,16 +80,7 @@ public class BeamKafkaCSVTable extends BeamKafkaTable { @ProcessElement public void processElement(ProcessContext c) { String rowInString = new String(c.element().getValue()); - String[] parts = rowInString.split(BeamKafkaCSVTable.DELIMITER); - if (parts.length != recordType.size()) { - LOG.error(String.format("invalid record: ", rowInString)); - } else { - BeamSQLRow sourceRecord = new BeamSQLRow(recordType); - for (int idx = 0; idx < 
parts.length; ++idx) { - sourceRecord.addField(idx, parts[idx]); - } - c.output(sourceRecord); - } + c.output(csvLine2BeamSQLRow(format, rowInString, recordType)); } })); } @@ -95,9 +93,10 @@ public class BeamKafkaCSVTable extends BeamKafkaTable { public static class CsvRecorderEncoder extends PTransform<PCollection<BeamSQLRow>, PCollection<KV<byte[], byte[]>>> { private BeamSQLRecordType recordType; - - public CsvRecorderEncoder(BeamSQLRecordType recordType) { + private CSVFormat format; + public CsvRecorderEncoder(BeamSQLRecordType recordType, CSVFormat format) { this.recordType = recordType; + this.format = format; } @Override @@ -106,17 +105,9 @@ public class BeamKafkaCSVTable extends BeamKafkaTable { @ProcessElement public void processElement(ProcessContext c) { BeamSQLRow in = c.element(); - StringBuffer sb = new StringBuffer(); - for (int idx = 0; idx < in.size(); ++idx) { - sb.append(DELIMITER); - sb.append(in.getFieldValue(idx).toString()); - } - c.output(KV.of(new byte[] {}, sb.substring(1).getBytes())); + c.output(KV.of(new byte[] {}, beamSQLRow2CsvLine(in, format).getBytes())); } })); - } - } - } http://git-wip-us.apache.org/repos/asf/beam/blob/8044e59e/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOReader.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOReader.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOReader.java index cf7c095..3c031ce 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOReader.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOReader.java @@ -18,11 +18,10 @@ package org.apache.beam.dsls.sql.schema.text; -import java.io.IOException; +import static org.apache.beam.dsls.sql.schema.BeamTableUtils.csvLine2BeamSQLRow; + import java.io.Serializable; -import java.io.StringReader; 
-import org.apache.beam.dsls.sql.exception.BeamSqlUnsupportedException; import org.apache.beam.dsls.sql.schema.BeamSQLRecordType; import org.apache.beam.dsls.sql.schema.BeamSQLRow; import org.apache.beam.sdk.io.TextIO; @@ -31,12 +30,7 @@ import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; -import org.apache.calcite.sql.type.SqlTypeName; import org.apache.commons.csv.CSVFormat; -import org.apache.commons.csv.CSVParser; -import org.apache.commons.csv.CSVRecord; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * IOReader for {@code BeamTextCSVTable}. @@ -44,7 +38,6 @@ import org.slf4j.LoggerFactory; public class BeamTextCSVTableIOReader extends PTransform<PBegin, PCollection<BeamSQLRow>> implements Serializable { - private static final Logger LOG = LoggerFactory.getLogger(BeamTextCSVTableIOReader.class); private String filePattern; protected BeamSQLRecordType beamSqlRecordType; protected CSVFormat csvFormat; @@ -63,61 +56,8 @@ public class BeamTextCSVTableIOReader @ProcessElement public void processElement(ProcessContext ctx) { String str = ctx.element(); - - try (StringReader reader = new StringReader(str)) { - CSVRecord rawRecord = null; - try { - CSVParser parser = csvFormat.parse(reader); - rawRecord = parser.getRecords().get(0); - } catch (IOException e) { - throw new IllegalArgumentException("Invalid text filePattern: " + filePattern, e); - } - - BeamSQLRow row = new BeamSQLRow(beamSqlRecordType); - if (rawRecord.size() != beamSqlRecordType.size()) { - throw new IllegalArgumentException(String.format( - "Invalid filePattern: {}, expect %d fields, but actually %d", str, - filePattern, beamSqlRecordType.size(), rawRecord.size() - )); - } else { - for (int idx = 0; idx < beamSqlRecordType.size(); idx++) { - String raw = rawRecord.get(idx); - addFieldWithAutoTypeCasting(row, idx, raw); - } - ctx.output(row); - } - } + 
ctx.output(csvLine2BeamSQLRow(csvFormat, str, beamSqlRecordType)); } })); } - - public void addFieldWithAutoTypeCasting(BeamSQLRow row, int idx, String raw) { - SqlTypeName columnType = row.getDataType().getFieldsType().get(idx); - switch (columnType) { - case TINYINT: - row.addField(idx, Byte.valueOf(raw)); - break; - case SMALLINT: - row.addField(idx, Short.valueOf(raw)); - break; - case INTEGER: - row.addField(idx, Integer.valueOf(raw)); - break; - case BIGINT: - row.addField(idx, Long.valueOf(raw)); - break; - case FLOAT: - row.addField(idx, Float.valueOf(raw)); - break; - case DOUBLE: - row.addField(idx, Double.valueOf(raw)); - break; - case VARCHAR: - row.addField(idx, raw); - break; - default: - throw new BeamSqlUnsupportedException(String.format( - "Column type %s is not supported yet!", columnType)); - } - } } http://git-wip-us.apache.org/repos/asf/beam/blob/8044e59e/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOWriter.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOWriter.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOWriter.java index 6104cd8..eade842 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOWriter.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOWriter.java @@ -18,9 +18,9 @@ package org.apache.beam.dsls.sql.schema.text; -import java.io.IOException; +import static org.apache.beam.dsls.sql.schema.BeamTableUtils.beamSQLRow2CsvLine; + import java.io.Serializable; -import java.io.StringWriter; import org.apache.beam.dsls.sql.schema.BeamSQLRecordType; import org.apache.beam.dsls.sql.schema.BeamSQLRow; @@ -31,17 +31,12 @@ import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PDone; import 
org.apache.commons.csv.CSVFormat; -import org.apache.commons.csv.CSVPrinter; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * IOWriter for {@code BeamTextCSVTable}. */ public class BeamTextCSVTableIOWriter extends PTransform<PCollection<BeamSQLRow>, PDone> implements Serializable { - private static final Logger LOG = LoggerFactory.getLogger(BeamTextCSVTableIOWriter.class); - private String filePattern; protected BeamSQLRecordType beamSqlRecordType; protected CSVFormat csvFormat; @@ -58,18 +53,7 @@ public class BeamTextCSVTableIOWriter extends PTransform<PCollection<BeamSQLRow> @ProcessElement public void processElement(ProcessContext ctx) { BeamSQLRow row = ctx.element(); - StringWriter writer = new StringWriter(); - - try (CSVPrinter printer = csvFormat.print(writer)) { - for (int i = 0; i < row.size(); i++) { - printer.print(row.getFieldValue(i).toString()); - } - printer.println(); - } catch (IOException e) { - throw new IllegalArgumentException("Invalid filePattern: " + filePattern, e); - } - - ctx.output(writer.toString()); + ctx.output(beamSQLRow2CsvLine(row, csvFormat)); } })).apply(TextIO.Write.to(filePattern)); } http://git-wip-us.apache.org/repos/asf/beam/blob/8044e59e/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaCSVTableTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaCSVTableTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaCSVTableTest.java new file mode 100644 index 0000000..d20af0c --- /dev/null +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaCSVTableTest.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.dsls.sql.schema.kafka; + +import java.io.Serializable; + +import org.apache.beam.dsls.sql.planner.BeamQueryPlanner; +import org.apache.beam.dsls.sql.schema.BeamSQLRecordType; +import org.apache.beam.dsls.sql.schema.BeamSQLRow; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.rel.type.RelProtoDataType; +import org.apache.calcite.sql.type.SqlTypeName; +import org.apache.commons.csv.CSVFormat; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; + +/** + * Test for BeamKafkaCSVTable. 
+ */ +public class BeamKafkaCSVTableTest { + @Rule + public TestPipeline pipeline = TestPipeline.create(); + public static BeamSQLRow row1 = new BeamSQLRow(genRowType()); + public static BeamSQLRow row2 = new BeamSQLRow(genRowType()); + + @BeforeClass + public static void setUp() { + row1.addField(0, 1L); + row1.addField(1, 1); + row1.addField(2, 1.0); + + row2.addField(0, 2L); + row2.addField(1, 2); + row2.addField(2, 2.0); + } + + @Test public void testCsvRecorderDecoder() throws Exception { + PCollection<BeamSQLRow> result = pipeline + .apply( + Create.of("1,\"1\",1.0", "2,2,2.0") + ) + .apply(ParDo.of(new String2KvBytes())) + .apply( + new BeamKafkaCSVTable.CsvRecorderDecoder(genRowType(), CSVFormat.DEFAULT) + ); + + PAssert.that(result).containsInAnyOrder(row1, row2); + + pipeline.run(); + } + + @Test public void testCsvRecorderEncoder() throws Exception { + PCollection<BeamSQLRow> result = pipeline + .apply( + Create.of(row1, row2) + ) + .apply( + new BeamKafkaCSVTable.CsvRecorderEncoder(genRowType(), CSVFormat.DEFAULT) + ).apply( + new BeamKafkaCSVTable.CsvRecorderDecoder(genRowType(), CSVFormat.DEFAULT) + ); + + PAssert.that(result).containsInAnyOrder(row1, row2); + + pipeline.run(); + } + + private static BeamSQLRecordType genRowType() { + return BeamSQLRecordType.from( + new RelProtoDataType() { + @Override public RelDataType apply(RelDataTypeFactory a0) { + return a0.builder() + .add("order_id", SqlTypeName.BIGINT) + .add("site_id", SqlTypeName.INTEGER) + .add("price", SqlTypeName.DOUBLE) + .build(); + } + }.apply(BeamQueryPlanner.TYPE_FACTORY)); + } + + private static class String2KvBytes extends DoFn<String, KV<byte[], byte[]>> + implements Serializable { + @ProcessElement + public void processElement(ProcessContext ctx) { + ctx.output(KV.of(new byte[] {}, ctx.element().getBytes())); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/8044e59e/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableTest.java 
---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableTest.java index e06f8da..3bc29e4 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableTest.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableTest.java @@ -31,7 +31,6 @@ import java.nio.file.attribute.BasicFileAttributes; import java.util.ArrayList; import java.util.Arrays; import java.util.List; -import java.util.concurrent.ConcurrentLinkedQueue; import org.apache.beam.dsls.sql.planner.BeamQueryPlanner; import org.apache.beam.dsls.sql.schema.BeamSQLRecordType; @@ -75,7 +74,6 @@ public class BeamTextCSVTableTest { add(buildRow(data)); } }}; - private static ConcurrentLinkedQueue<Object[]> actualData = new ConcurrentLinkedQueue<>(); private static Path tempFolder; private static File readerSourceFile;
