Repository: beam
Updated Branches:
  refs/heads/DSL_SQL 4c5b7584a -> 5c1f2cbc6

upgrade to version 2.1.0-SNAPSHOT

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/03a913a9
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/03a913a9
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/03a913a9

Branch: refs/heads/DSL_SQL
Commit: 03a913a95c99474841a175b727925ba7c1eed4c9
Parents: 4c5b758
Author: mingmxu <[email protected]>
Authored: Wed Jun 7 19:27:32 2017 -0700
Committer: mingmxu <[email protected]>
Committed: Wed Jun 7 19:27:32 2017 -0700

----------------------------------------------------------------------
 dsls/pom.xml                                    |  2 +-
 dsls/sql/pom.xml                                | 43 +++-----------
 .../beam/dsls/sql/schema/BeamSqlRowCoder.java   | 59 ++++++++++----------
 .../dsls/sql/schema/kafka/BeamKafkaTable.java   | 20 +++++--
 .../dsls/sql/schema/text/BeamTextCSVTable.java  |  2 +-
 .../schema/text/BeamTextCSVTableIOWriter.java   |  2 +-
 6 files changed, 52 insertions(+), 76 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/03a913a9/dsls/pom.xml
----------------------------------------------------------------------
diff --git a/dsls/pom.xml b/dsls/pom.xml
index 6f9d635..a741563 100644
--- a/dsls/pom.xml
+++ b/dsls/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-parent</artifactId>
-    <version>0.7.0-SNAPSHOT</version>
+    <version>2.1.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>

http://git-wip-us.apache.org/repos/asf/beam/blob/03a913a9/dsls/sql/pom.xml
----------------------------------------------------------------------
diff --git a/dsls/sql/pom.xml b/dsls/sql/pom.xml
index bc658e6..39e32c4 100644
--- a/dsls/sql/pom.xml
+++ b/dsls/sql/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-dsls-parent</artifactId>
-    <version>0.7.0-SNAPSHOT</version>
+    <version>2.1.0-SNAPSHOT</version>
   </parent>

   <artifactId>beam-dsls-sql</artifactId>
@@ -117,41 +117,6 @@
     </plugins>
   </build>

-  <dependencyManagement>
-    <dependencies>
-      <dependency>
-        <groupId>org.apache.beam</groupId>
-        <artifactId>beam-sdks-java-core</artifactId>
-        <version>0.6.0</version>
-      </dependency>
-      <dependency>
-        <groupId>org.apache.beam</groupId>
-        <artifactId>beam-runners-direct-java</artifactId>
-        <version>0.6.0</version>
-      </dependency>
-      <dependency>
-        <groupId>org.apache.beam</groupId>
-        <artifactId>beam-sdks-java-io-kafka</artifactId>
-        <version>0.6.0</version>
-      </dependency>
-      <dependency>
-        <groupId>org.apache.beam</groupId>
-        <artifactId>beam-runners-core-java</artifactId>
-        <version>0.6.0</version>
-      </dependency>
-      <dependency>
-        <groupId>org.apache.beam</groupId>
-        <artifactId>beam-sdks-common-runner-api</artifactId>
-        <version>0.6.0</version>
-      </dependency>
-      <dependency>
-        <groupId>org.apache.beam</groupId>
-        <artifactId>beam-runners-core-construction-java</artifactId>
-        <version>0.6.0</version>
-      </dependency>
-    </dependencies>
-  </dependencyManagement>
-
   <dependencies>
     <dependency>
       <groupId>junit</groupId>
@@ -213,5 +178,11 @@
       <groupId>joda-time</groupId>
       <artifactId>joda-time</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka-clients</artifactId>
+      <version>0.10.1.0</version>
+      <scope>provided</scope>
+    </dependency>
   </dependencies>
 </project>
http://git-wip-us.apache.org/repos/asf/beam/blob/03a913a9/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java
index f161d27..14a0f31 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java
@@ -54,9 +54,8 @@ public class BeamSqlRowCoder extends CustomCoder<BeamSQLRow> {
   }

   @Override
-  public void encode(BeamSQLRow value, OutputStream outStream,
-      org.apache.beam.sdk.coders.Coder.Context context) throws CoderException, IOException {
-    listCoder.encode(value.getNullFields(), outStream, context.nested());
+  public void encode(BeamSQLRow value, OutputStream outStream) throws CoderException, IOException {
+    listCoder.encode(value.getNullFields(), outStream);

     for (int idx = 0; idx < value.size(); ++idx) {
       if (value.getNullFields().contains(idx)) {
@@ -65,36 +64,35 @@ public class BeamSqlRowCoder extends CustomCoder<BeamSQLRow> {

       switch (value.getDataType().getFieldsType().get(idx)) {
         case INTEGER:
-          intCoder.encode(value.getInteger(idx), outStream, context.nested());
+          intCoder.encode(value.getInteger(idx), outStream);
           break;
         case SMALLINT:
-          intCoder.encode((int) value.getShort(idx), outStream, context.nested());
+          intCoder.encode((int) value.getShort(idx), outStream);
           break;
         case TINYINT:
-          intCoder.encode((int) value.getByte(idx), outStream, context.nested());
+          intCoder.encode((int) value.getByte(idx), outStream);
           break;
         case DOUBLE:
-          doubleCoder.encode(value.getDouble(idx), outStream, context.nested());
+          doubleCoder.encode(value.getDouble(idx), outStream);
           break;
         case FLOAT:
-          doubleCoder.encode((double) value.getFloat(idx), outStream, context.nested());
+          doubleCoder.encode((double) value.getFloat(idx), outStream);
           break;
         case DECIMAL:
-          bigDecimalCoder.encode(value.getBigDecimal(idx), outStream, context.nested());
+          bigDecimalCoder.encode(value.getBigDecimal(idx), outStream);
           break;
         case BIGINT:
-          longCoder.encode(value.getLong(idx), outStream, context.nested());
+          longCoder.encode(value.getLong(idx), outStream);
           break;
         case VARCHAR:
         case CHAR:
-          stringCoder.encode(value.getString(idx), outStream, context.nested());
+          stringCoder.encode(value.getString(idx), outStream);
           break;
         case TIME:
-          longCoder.encode(value.getGregorianCalendar(idx).getTime().getTime(),
-              outStream, context.nested());
+          longCoder.encode(value.getGregorianCalendar(idx).getTime().getTime(), outStream);
           break;
         case TIMESTAMP:
-          longCoder.encode(value.getDate(idx).getTime(), outStream, context.nested());
+          longCoder.encode(value.getDate(idx).getTime(), outStream);
           break;

         default:
@@ -102,14 +100,13 @@ public class BeamSqlRowCoder extends CustomCoder<BeamSQLRow> {
       }
     }

-    instantCoder.encode(value.getWindowStart(), outStream, context.nested());
-    instantCoder.encode(value.getWindowEnd(), outStream, context);
+    instantCoder.encode(value.getWindowStart(), outStream);
+    instantCoder.encode(value.getWindowEnd(), outStream);
   }

   @Override
-  public BeamSQLRow decode(InputStream inStream, org.apache.beam.sdk.coders.Coder.Context context)
-      throws CoderException, IOException {
-    List<Integer> nullFields = listCoder.decode(inStream, context.nested());
+  public BeamSQLRow decode(InputStream inStream) throws CoderException, IOException {
+    List<Integer> nullFields = listCoder.decode(inStream);
     BeamSQLRow record = new BeamSQLRow(tableSchema);
     record.setNullFields(nullFields);

@@ -121,37 +118,37 @@ public class BeamSqlRowCoder extends CustomCoder<BeamSQLRow> {
       switch (tableSchema.getFieldsType().get(idx)) {
         case INTEGER:
-          record.addField(idx, intCoder.decode(inStream, context.nested()));
+          record.addField(idx, intCoder.decode(inStream));
           break;
         case SMALLINT:
-          record.addField(idx, intCoder.decode(inStream, context.nested()).shortValue());
+          record.addField(idx, intCoder.decode(inStream).shortValue());
           break;
         case TINYINT:
-          record.addField(idx, intCoder.decode(inStream, context.nested()).byteValue());
+          record.addField(idx, intCoder.decode(inStream).byteValue());
           break;
         case DOUBLE:
-          record.addField(idx, doubleCoder.decode(inStream, context.nested()));
+          record.addField(idx, doubleCoder.decode(inStream));
           break;
         case FLOAT:
-          record.addField(idx, doubleCoder.decode(inStream, context.nested()).floatValue());
+          record.addField(idx, doubleCoder.decode(inStream).floatValue());
           break;
         case BIGINT:
-          record.addField(idx, longCoder.decode(inStream, context.nested()));
+          record.addField(idx, longCoder.decode(inStream));
           break;
         case DECIMAL:
-          record.addField(idx, bigDecimalCoder.decode(inStream, context.nested()));
+          record.addField(idx, bigDecimalCoder.decode(inStream));
           break;
         case VARCHAR:
         case CHAR:
-          record.addField(idx, stringCoder.decode(inStream, context.nested()));
+          record.addField(idx, stringCoder.decode(inStream));
           break;
         case TIME:
           GregorianCalendar calendar = new GregorianCalendar();
-          calendar.setTime(new Date(longCoder.decode(inStream, context.nested())));
+          calendar.setTime(new Date(longCoder.decode(inStream)));
           record.addField(idx, calendar);
           break;
         case TIMESTAMP:
-          record.addField(idx, new Date(longCoder.decode(inStream, context.nested())));
+          record.addField(idx, new Date(longCoder.decode(inStream)));
           break;

         default:
@@ -159,8 +156,8 @@ public class BeamSqlRowCoder extends CustomCoder<BeamSQLRow> {
       }
     }

-    record.setWindowStart(instantCoder.decode(inStream, context.nested()));
-    record.setWindowEnd(instantCoder.decode(inStream, context));
+    record.setWindowStart(instantCoder.decode(inStream));
+    record.setWindowEnd(instantCoder.decode(inStream));

     return record;
   }
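
A note on the coder change above: the 2.x SDK removes the Coder.Context
argument from encode/decode, and component coders are always invoked in the
single-argument (nested) form; the outer-vs-nested framing that Context used
to signal is handled inside the SDK. Below is a minimal illustrative sketch
of a coder written against the new contract. The Pair type and all names are
hypothetical, not part of this commit:

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;

/** Hypothetical coder for a toy (name, count) pair, using the context-free 2.x contract. */
public class PairCoder extends CustomCoder<PairCoder.Pair> {

  /** Toy value type, defined inline so the sketch is self-contained. */
  public static class Pair {
    final String name;
    final int count;

    public Pair(String name, int count) {
      this.name = name;
      this.count = count;
    }
  }

  private static final StringUtf8Coder STRING_CODER = StringUtf8Coder.of();
  private static final VarIntCoder INT_CODER = VarIntCoder.of();

  @Override
  public void encode(Pair value, OutputStream outStream) throws IOException {
    // Component coders are called with just the stream; no Context threading.
    STRING_CODER.encode(value.name, outStream);
    INT_CODER.encode(value.count, outStream);
  }

  @Override
  public Pair decode(InputStream inStream) throws IOException {
    // Fields must be decoded in the same order they were encoded.
    return new Pair(STRING_CODER.decode(inStream), INT_CODER.decode(inStream));
  }
}

This is why every call site in BeamSqlRowCoder simply drops its
context.nested() argument rather than changing behavior.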
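
For anyone updating call sites against the same KafkaIO change: the old
withKeyCoder/withValueCoder methods are replaced by Kafka (de)serializer
classes, with an optional Beam coder alongside on the read path. A hedged,
self-contained sketch; the broker address, topic names, and the record cap
are made up for illustration:

import java.util.Arrays;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;

public class KafkaBytesRoundTrip {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // Read side: the Kafka deserializer says how to pull bytes off the wire,
    // while the Beam coder says how to materialize them between transforms.
    PCollection<KV<byte[], byte[]>> records = p.apply("read",
        KafkaIO.<byte[], byte[]>read()
            .withBootstrapServers("localhost:9092")   // hypothetical broker
            .withTopics(Arrays.asList("example_in"))  // hypothetical topic
            .withKeyDeserializerAndCoder(ByteArrayDeserializer.class, ByteArrayCoder.of())
            .withValueDeserializerAndCoder(ByteArrayDeserializer.class, ByteArrayCoder.of())
            .withMaxNumRecords(10)                    // bound the demo read
            .withoutMetadata());

    // Write side: only serializer classes are needed; no coder arguments at all.
    records.apply("write",
        KafkaIO.<byte[], byte[]>write()
            .withBootstrapServers("localhost:9092")
            .withTopic("example_out")                 // hypothetical topic
            .withKeySerializer(ByteArraySerializer.class)
            .withValueSerializer(ByteArraySerializer.class));

    p.run().waitUntilFinish();
  }
}

The serializer classes also explain the new provided-scope kafka-clients
dependency in dsls/sql/pom.xml: they live in that artifact, not in the Beam SDK.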
http://git-wip-us.apache.org/repos/asf/beam/blob/03a913a9/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTable.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTable.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTable.java
index 6b21289..41742c7 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTable.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTable.java
@@ -58,7 +58,7 @@ public class BeamTextCSVTable extends BeamTextTable {

   @Override
   public PCollection<BeamSQLRow> buildIOReader(Pipeline pipeline) {
-    return PBegin.in(pipeline).apply("decodeRecord", TextIO.Read.from(filePattern))
+    return PBegin.in(pipeline).apply("decodeRecord", TextIO.read().from(filePattern))
         .apply("parseCSVLine",
             new BeamTextCSVTableIOReader(beamSqlRecordType, filePattern, csvFormat));
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/03a913a9/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOWriter.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOWriter.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOWriter.java
index eade842..9b9cbd2 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOWriter.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOWriter.java
@@ -55,6 +55,6 @@ public class BeamTextCSVTableIOWriter extends PTransform<PCollection<BeamSQLRow>
         BeamSQLRow row = ctx.element();
         ctx.output(beamSQLRow2CsvLine(row, csvFormat));
       }
-    })).apply(TextIO.Write.to(filePattern));
+    })).apply(TextIO.write().to(filePattern));
   }
 }
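
The TextIO change in the last two files is the same 2.x rename applied
everywhere: the nested TextIO.Read / TextIO.Write builders became the
factory methods TextIO.read() and TextIO.write(). A tiny sketch with
hypothetical file paths:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class TextIoRename {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    p.apply("read", TextIO.read().from("/tmp/in/*.csv"))     // was: TextIO.Read.from(...)
        .apply("write", TextIO.write().to("/tmp/out/part")); // was: TextIO.Write.to(...)

    p.run().waitUntilFinish();
  }
}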
