http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/kafka/BeamKafkaCSVTable.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/kafka/BeamKafkaCSVTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/kafka/BeamKafkaCSVTable.java
new file mode 100644
index 0000000..4bedec1
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/kafka/BeamKafkaCSVTable.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.schema.kafka;
+
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.BeamRecordSqlType;
+import org.apache.beam.sdk.extensions.sql.impl.schema.BeamTableUtils;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.BeamRecord;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.commons.csv.CSVFormat;
+
+/**
+ * A Kafka topic that saves records as CSV format.
+ *
+ * <p>Each message value holds one record, printed as a CSV line and encoded as UTF-8.
+ * Message keys are ignored when reading and written as empty byte arrays.
+ */
+public class BeamKafkaCSVTable extends BeamKafkaTable {
+  private final CSVFormat csvFormat;
+
+  /** Creates a table that uses the {@link CSVFormat#DEFAULT DEFAULT} CSV format. */
+  public BeamKafkaCSVTable(BeamRecordSqlType beamSqlRowType, String bootstrapServers,
+      List<String> topics) {
+    this(beamSqlRowType, bootstrapServers, topics, CSVFormat.DEFAULT);
+  }
+
+  /** Creates a table that parses and prints records with {@code format}. */
+  public BeamKafkaCSVTable(BeamRecordSqlType beamSqlRowType, String bootstrapServers,
+      List<String> topics, CSVFormat format) {
+    super(beamSqlRowType, bootstrapServers, topics);
+    this.csvFormat = format;
+  }
+
+  @Override
+  public PTransform<PCollection<KV<byte[], byte[]>>, PCollection<BeamRecord>>
+      getPTransformForInput() {
+    return new CsvRecorderDecoder(beamSqlRowType, csvFormat);
+  }
+
+  @Override
+  public PTransform<PCollection<BeamRecord>, PCollection<KV<byte[], byte[]>>>
+      getPTransformForOutput() {
+    return new CsvRecorderEncoder(beamSqlRowType, csvFormat);
+  }
+
+  /**
+   * A PTransform to convert {@code KV<byte[], byte[]>} to {@link BeamRecord}.
+   *
+   * <p>Only the value is used: it is decoded as UTF-8 and parsed as one CSV line.
+   */
+  public static class CsvRecorderDecoder
+      extends PTransform<PCollection<KV<byte[], byte[]>>, PCollection<BeamRecord>> {
+    private final BeamRecordSqlType rowType;
+    private final CSVFormat format;
+
+    public CsvRecorderDecoder(BeamRecordSqlType rowType, CSVFormat format) {
+      this.rowType = rowType;
+      this.format = format;
+    }
+
+    @Override
+    public PCollection<BeamRecord> expand(PCollection<KV<byte[], byte[]>> input) {
+      return input.apply("decodeRecord", ParDo.of(new DoFn<KV<byte[], byte[]>, BeamRecord>() {
+        @ProcessElement
+        public void processElement(ProcessContext c) {
+          // Decode explicitly as UTF-8: new String(byte[]) would use the JVM's platform
+          // default charset, which can differ between machines.
+          String rowInString = new String(c.element().getValue(), StandardCharsets.UTF_8);
+          c.output(BeamTableUtils.csvLine2BeamSqlRow(format, rowInString, rowType));
+        }
+      }));
+    }
+  }
+
+  /**
+   * A PTransform to convert {@link BeamRecord} to {@code KV<byte[], byte[]>}.
+   *
+   * <p>The value is the record printed as one CSV line and encoded as UTF-8; the key is an
+   * empty byte array.
+   */
+  public static class CsvRecorderEncoder
+      extends PTransform<PCollection<BeamRecord>, PCollection<KV<byte[], byte[]>>> {
+    private final BeamRecordSqlType rowType;
+    private final CSVFormat format;
+
+    public CsvRecorderEncoder(BeamRecordSqlType rowType, CSVFormat format) {
+      this.rowType = rowType;
+      this.format = format;
+    }
+
+    @Override
+    public PCollection<KV<byte[], byte[]>> expand(PCollection<BeamRecord> input) {
+      return input.apply("encodeRecord", ParDo.of(new DoFn<BeamRecord, KV<byte[], byte[]>>() {
+        @ProcessElement
+        public void processElement(ProcessContext c) {
+          BeamRecord in = c.element();
+          // Encode explicitly as UTF-8 so the bytes match what CsvRecorderDecoder expects.
+          byte[] value = BeamTableUtils.beamSqlRow2CsvLine(in, format)
+              .getBytes(StandardCharsets.UTF_8);
+          c.output(KV.of(new byte[] {}, value));
+        }
+      }));
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/kafka/BeamKafkaTable.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/kafka/BeamKafkaTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/kafka/BeamKafkaTable.java
new file mode 100644
index 0000000..1113abf
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/kafka/BeamKafkaTable.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.schema.kafka;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.extensions.sql.BeamRecordSqlType;
+import org.apache.beam.sdk.extensions.sql.impl.schema.BaseBeamTable;
+import org.apache.beam.sdk.extensions.sql.impl.schema.BeamIOType;
+import org.apache.beam.sdk.io.kafka.KafkaIO;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.BeamRecord;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+
+/**
+ * {@code BeamKafkaTable} represents a Kafka topic, as source or target. Subclasses must
+ * convert between {@link BeamRecord} and {@code KV<byte[], byte[]>}.
+ */
+public abstract class BeamKafkaTable extends BaseBeamTable implements Serializable {
+
+  private String bootstrapServers;
+  private List<String> topics;
+  // Extra consumer properties; stays null unless updateConsumerProperties is called.
+  private Map<String, Object> configUpdates;
+
+  protected BeamKafkaTable(BeamRecordSqlType beamSqlRowType) {
+    super(beamSqlRowType);
+  }
+
+  public BeamKafkaTable(BeamRecordSqlType beamSqlRowType, String bootstrapServers,
+      List<String> topics) {
+    super(beamSqlRowType);
+    this.bootstrapServers = bootstrapServers;
+    this.topics = topics;
+  }
+
+  /**
+   * Sets extra Kafka consumer properties used by {@link #buildIOReader}, replacing any map
+   * set earlier. {@code null} means no extra properties.
+   *
+   * @return this table, for chaining
+   */
+  public BeamKafkaTable updateConsumerProperties(Map<String, Object> configUpdates) {
+    this.configUpdates = configUpdates;
+    return this;
+  }
+
+  @Override
+  public BeamIOType getSourceType() {
+    return BeamIOType.UNBOUNDED;
+  }
+
+  /** Returns the transform that decodes raw Kafka key/value pairs into records. */
+  public abstract PTransform<PCollection<KV<byte[], byte[]>>, PCollection<BeamRecord>>
+      getPTransformForInput();
+
+  /** Returns the transform that encodes records into raw Kafka key/value pairs. */
+  public abstract PTransform<PCollection<BeamRecord>, PCollection<KV<byte[], byte[]>>>
+      getPTransformForOutput();
+
+  @Override
+  public PCollection<BeamRecord> buildIOReader(Pipeline pipeline) {
+    KafkaIO.Read<byte[], byte[]> kafkaRead = KafkaIO.<byte[], byte[]>read()
+        .withBootstrapServers(bootstrapServers)
+        .withTopics(topics)
+        .withKeyDeserializerAndCoder(ByteArrayDeserializer.class, ByteArrayCoder.of())
+        .withValueDeserializerAndCoder(ByteArrayDeserializer.class, ByteArrayCoder.of());
+    // configUpdates is null unless updateConsumerProperties() was called; only forward it
+    // when set, since KafkaIO.Read#updateConsumerProperties iterates the map it is given.
+    if (configUpdates != null) {
+      kafkaRead = kafkaRead.updateConsumerProperties(configUpdates);
+    }
+
+    return PBegin.in(pipeline)
+        .apply("read", kafkaRead.withoutMetadata())
+        .apply("in_format", getPTransformForInput());
+  }
+
+  @Override
+  public PTransform<? super PCollection<BeamRecord>, PDone> buildIOWriter() {
+    checkArgument(topics != null && topics.size() == 1,
+        "Only one topic can be acceptable as output.");
+
+    return new PTransform<PCollection<BeamRecord>, PDone>() {
+      @Override
+      public PDone expand(PCollection<BeamRecord> input) {
+        return input.apply("out_reformat", getPTransformForOutput()).apply("persistent",
+            KafkaIO.<byte[], byte[]>write()
+                .withBootstrapServers(bootstrapServers)
+                .withTopic(topics.get(0))
+                .withKeySerializer(ByteArraySerializer.class)
+                .withValueSerializer(ByteArraySerializer.class));
+      }
+    };
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/kafka/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/kafka/package-info.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/kafka/package-info.java
new file mode 100644
index 0000000..6752e3c
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/kafka/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Table schema for KafkaIO. + */ +package org.apache.beam.sdk.extensions.sql.impl.schema.kafka; http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/package-info.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/package-info.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/package-info.java new file mode 100644 index 0000000..86e7d06 --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Defines table schemas that map to Beam IO components.
+ *
+ */
+package org.apache.beam.sdk.extensions.sql.impl.schema;
http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/text/BeamTextCSVTable.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/text/BeamTextCSVTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/text/BeamTextCSVTable.java
new file mode 100644
index 0000000..a2dd6fb
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/text/BeamTextCSVTable.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.impl.schema.text;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.extensions.sql.BeamRecordSqlType;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.BeamRecord;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.commons.csv.CSVFormat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@code BeamTextCSVTable} is a {@code BeamTextTable} whose lines are formatted as CSV.
+ *
+ * <p>{@link CSVFormat} itself has many dialects, check its javadoc for more info.
+ */
+public class BeamTextCSVTable extends BeamTextTable {
+  private static final Logger LOG = LoggerFactory
+      .getLogger(BeamTextCSVTable.class);
+
+  private final CSVFormat csvFormat;
+
+  /**
+   * CSV table with {@link CSVFormat#DEFAULT DEFAULT} format.
+   */
+  public BeamTextCSVTable(BeamRecordSqlType beamSqlRowType, String filePattern) {
+    this(beamSqlRowType, filePattern, CSVFormat.DEFAULT);
+  }
+
+  /**
+   * CSV table whose lines are parsed and printed with {@code csvFormat}.
+   */
+  public BeamTextCSVTable(BeamRecordSqlType beamSqlRowType, String filePattern,
+      CSVFormat csvFormat) {
+    super(beamSqlRowType, filePattern);
+    this.csvFormat = csvFormat;
+  }
+
+  @Override
+  public PCollection<BeamRecord> buildIOReader(Pipeline pipeline) {
+    return PBegin.in(pipeline).apply("decodeRecord", TextIO.read().from(filePattern))
+        .apply("parseCSVLine",
+            new BeamTextCSVTableIOReader(beamSqlRowType, filePattern, csvFormat));
+  }
+
+  @Override
+  public PTransform<? super PCollection<BeamRecord>, PDone> buildIOWriter() {
+    return new BeamTextCSVTableIOWriter(beamSqlRowType, filePattern, csvFormat);
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/text/BeamTextCSVTableIOReader.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/text/BeamTextCSVTableIOReader.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/text/BeamTextCSVTableIOReader.java
new file mode 100644
index 0000000..95f7063
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/text/BeamTextCSVTableIOReader.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.impl.schema.text;
+
+import java.io.Serializable;
+import org.apache.beam.sdk.extensions.sql.BeamRecordSqlType;
+import org.apache.beam.sdk.extensions.sql.impl.schema.BeamTableUtils;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.BeamRecord;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.commons.csv.CSVFormat;
+
+/**
+ * IOReader for {@code BeamTextCSVTable}: parses each input line as one CSV record.
+ */
+public class BeamTextCSVTableIOReader
+    extends PTransform<PCollection<String>, PCollection<BeamRecord>>
+    implements Serializable {
+  protected BeamRecordSqlType beamSqlRowType;
+  protected CSVFormat csvFormat;
+
+  /**
+   * Creates a reader that parses lines into rows of {@code beamSqlRowType}.
+   *
+   * @param beamSqlRowType row type each line is converted to
+   * @param filePattern not needed for parsing; accepted only for call-site compatibility
+   * @param csvFormat CSV dialect used to parse each line
+   */
+  public BeamTextCSVTableIOReader(BeamRecordSqlType beamSqlRowType, String filePattern,
+      CSVFormat csvFormat) {
+    this.beamSqlRowType = beamSqlRowType;
+    this.csvFormat = csvFormat;
+  }
+
+  @Override
+  public PCollection<BeamRecord> expand(PCollection<String> input) {
+    return input.apply(ParDo.of(new DoFn<String, BeamRecord>() {
+      @ProcessElement
+      public void processElement(ProcessContext ctx) {
+        String str = ctx.element();
+        ctx.output(BeamTableUtils.csvLine2BeamSqlRow(csvFormat, str, beamSqlRowType));
+      }
+    }));
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/text/BeamTextCSVTableIOWriter.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/text/BeamTextCSVTableIOWriter.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/text/BeamTextCSVTableIOWriter.java
new file mode 100644
index 0000000..4660ccb
--- /dev/null
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/text/BeamTextCSVTableIOWriter.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.impl.schema.text;
+
+import java.io.Serializable;
+import org.apache.beam.sdk.extensions.sql.BeamRecordSqlType;
+import org.apache.beam.sdk.extensions.sql.impl.schema.BeamTableUtils;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.BeamRecord;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.commons.csv.CSVFormat;
+
+/**
+ * IOWriter for {@code BeamTextCSVTable}: prints every record as one CSV line and writes
+ * the lines out with {@code TextIO} to {@code filePattern}.
+ */
+public class BeamTextCSVTableIOWriter extends PTransform<PCollection<BeamRecord>, PDone>
+    implements Serializable {
+  private String filePattern;
+  protected BeamRecordSqlType beamSqlRowType;
+  protected CSVFormat csvFormat;
+
+  public BeamTextCSVTableIOWriter(BeamRecordSqlType beamSqlRowType, String filePattern,
+      CSVFormat csvFormat) {
+    this.beamSqlRowType = beamSqlRowType;
+    this.csvFormat = csvFormat;
+    this.filePattern = filePattern;
+  }
+
+  @Override
+  public PDone expand(PCollection<BeamRecord> input) {
+    // Stage 1: render each record as a CSV line. Stage 2: write the lines as text.
+    PCollection<String> csvLines = input.apply("encodeRecord",
+        ParDo.of(new DoFn<BeamRecord, String>() {
+          @ProcessElement
+          public void processElement(ProcessContext context) {
+            context.output(BeamTableUtils.beamSqlRow2CsvLine(context.element(), csvFormat));
+          }
+        }));
+    return csvLines.apply(TextIO.write().to(filePattern));
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/text/BeamTextTable.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/text/BeamTextTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/text/BeamTextTable.java
new file mode 100644
index 0000000..b0d9c11
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/text/BeamTextTable.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.impl.schema.text;
+
+import java.io.Serializable;
+import org.apache.beam.sdk.extensions.sql.BeamRecordSqlType;
+import org.apache.beam.sdk.extensions.sql.impl.schema.BaseBeamTable;
+import org.apache.beam.sdk.extensions.sql.impl.schema.BeamIOType;
+
+/**
+ * Base class for tables stored as a text file or directory, located by a file pattern and
+ * backed by {@code TextIO}.
+ */
+public abstract class BeamTextTable extends BaseBeamTable implements Serializable {
+  /** File pattern of the text file(s) backing this table. */
+  protected String filePattern;
+
+  protected BeamTextTable(BeamRecordSqlType beamSqlRowType, String filePattern) {
+    super(beamSqlRowType);
+    this.filePattern = filePattern;
+  }
+
+  /** Text tables are always {@link BeamIOType#BOUNDED BOUNDED}. */
+  @Override
+  public BeamIOType getSourceType() {
+    return BeamIOType.BOUNDED;
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/text/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/text/package-info.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/text/package-info.java
new file mode 100644
index 0000000..8927dca
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/text/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Table schema for text files. + */ +package org.apache.beam.sdk.extensions.sql.impl.schema.text; http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamAggregationTransforms.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamAggregationTransforms.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamAggregationTransforms.java index 40b7b58..9a50e21 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamAggregationTransforms.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamAggregationTransforms.java @@ -33,11 +33,11 @@ import org.apache.beam.sdk.coders.CoderException; import org.apache.beam.sdk.coders.CoderRegistry; import org.apache.beam.sdk.coders.CustomCoder; import org.apache.beam.sdk.coders.VarIntCoder; +import org.apache.beam.sdk.extensions.sql.BeamRecordSqlType; +import org.apache.beam.sdk.extensions.sql.BeamSqlRecordHelper; import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlInputRefExpression; import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.UdafImpl; import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils; -import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordHelper; import org.apache.beam.sdk.transforms.Combine.CombineFn; import org.apache.beam.sdk.transforms.Count; import org.apache.beam.sdk.transforms.DoFn; http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamJoinTransforms.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamJoinTransforms.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamJoinTransforms.java index 7a8d10d..3c6b20f 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamJoinTransforms.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamJoinTransforms.java @@ -22,8 +22,8 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.Map; -import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordHelper; +import org.apache.beam.sdk.extensions.sql.BeamRecordSqlType; +import org.apache.beam.sdk.extensions.sql.BeamSqlRecordHelper; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.SimpleFunction; import org.apache.beam.sdk.values.BeamRecord; http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSqlProjectFn.java 
---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSqlProjectFn.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSqlProjectFn.java index aac38c7..719fbf3 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSqlProjectFn.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSqlProjectFn.java @@ -19,10 +19,10 @@ package org.apache.beam.sdk.extensions.sql.impl.transform; import java.util.ArrayList; import java.util.List; +import org.apache.beam.sdk.extensions.sql.BeamRecordSqlType; import org.apache.beam.sdk.extensions.sql.impl.interpreter.BeamSqlExpressionExecutor; import org.apache.beam.sdk.extensions.sql.impl.rel.BeamProjectRel; -import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType; -import org.apache.beam.sdk.extensions.sql.schema.BeamTableUtils; +import org.apache.beam.sdk.extensions.sql.impl.schema.BeamTableUtils; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.values.BeamRecord; http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java index 8b6206b..8c44780 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java @@ -23,7 +23,7 @@ 
import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; -import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType; +import org.apache.beam.sdk.extensions.sql.BeamRecordSqlType; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelDataTypeFactory; import org.apache.calcite.rel.type.RelDataTypeField; http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BaseBeamTable.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BaseBeamTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BaseBeamTable.java deleted file mode 100644 index 0564820..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BaseBeamTable.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.extensions.sql.schema; - -import java.io.Serializable; - -/** - * Each IO in Beam has one table schema, by extending {@link BaseBeamTable}. 
- */ -public abstract class BaseBeamTable implements BeamSqlTable, Serializable { - protected BeamRecordSqlType beamSqlRowType; - public BaseBeamTable(BeamRecordSqlType beamSqlRowType) { - this.beamSqlRowType = beamSqlRowType; - } - - @Override public BeamRecordSqlType getRowType() { - return beamSqlRowType; - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamIOType.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamIOType.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamIOType.java deleted file mode 100644 index bda3ca1..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamIOType.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.extensions.sql.schema; - -import java.io.Serializable; - -/** - * Type as a source IO, determined whether it's a STREAMING process, or batch - * process. 
- */ -public enum BeamIOType implements Serializable { - BOUNDED, UNBOUNDED; -} http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamPCollectionTable.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamPCollectionTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamPCollectionTable.java deleted file mode 100644 index 9d9988e..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamPCollectionTable.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.sdk.extensions.sql.schema; - -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.values.BeamRecord; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollection.IsBounded; -import org.apache.beam.sdk.values.PDone; - -/** - * {@code BeamPCollectionTable} converts a {@code PCollection<BeamSqlRow>} as a virtual table, - * then a downstream query can query directly. - */ -public class BeamPCollectionTable extends BaseBeamTable { - private BeamIOType ioType; - private transient PCollection<BeamRecord> upstream; - - protected BeamPCollectionTable(BeamRecordSqlType beamSqlRowType) { - super(beamSqlRowType); - } - - public BeamPCollectionTable(PCollection<BeamRecord> upstream, - BeamRecordSqlType beamSqlRowType){ - this(beamSqlRowType); - ioType = upstream.isBounded().equals(IsBounded.BOUNDED) - ? BeamIOType.BOUNDED : BeamIOType.UNBOUNDED; - this.upstream = upstream; - } - - @Override - public BeamIOType getSourceType() { - return ioType; - } - - @Override - public PCollection<BeamRecord> buildIOReader(Pipeline pipeline) { - return upstream; - } - - @Override - public PTransform<? 
super PCollection<BeamRecord>, PDone> buildIOWriter() { - throw new IllegalArgumentException("cannot use [BeamPCollectionTable] as target"); - } - -} http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamRecordSqlType.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamRecordSqlType.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamRecordSqlType.java deleted file mode 100644 index 1845988..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamRecordSqlType.java +++ /dev/null @@ -1,185 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.sdk.extensions.sql.schema; - -import java.math.BigDecimal; -import java.sql.Types; -import java.util.ArrayList; -import java.util.Date; -import java.util.GregorianCalendar; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import org.apache.beam.sdk.coders.BigDecimalCoder; -import org.apache.beam.sdk.coders.BigEndianIntegerCoder; -import org.apache.beam.sdk.coders.BigEndianLongCoder; -import org.apache.beam.sdk.coders.ByteCoder; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordHelper.BooleanCoder; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordHelper.DateCoder; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordHelper.DoubleCoder; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordHelper.FloatCoder; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordHelper.ShortCoder; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordHelper.TimeCoder; -import org.apache.beam.sdk.values.BeamRecord; -import org.apache.beam.sdk.values.BeamRecordType; - -/** - * Type provider for {@link BeamRecord} with SQL types. - * - * <p>Limited SQL types are supported now, visit - * <a href="https://beam.apache.org/blog/2017/07/21/sql-dsl.html#data-type">data types</a> - * for more details. 
- * - */ -public class BeamRecordSqlType extends BeamRecordType { - private static final Map<Integer, Class> SQL_TYPE_TO_JAVA_CLASS = new HashMap<>(); - static { - SQL_TYPE_TO_JAVA_CLASS.put(Types.TINYINT, Byte.class); - SQL_TYPE_TO_JAVA_CLASS.put(Types.SMALLINT, Short.class); - SQL_TYPE_TO_JAVA_CLASS.put(Types.INTEGER, Integer.class); - SQL_TYPE_TO_JAVA_CLASS.put(Types.BIGINT, Long.class); - SQL_TYPE_TO_JAVA_CLASS.put(Types.FLOAT, Float.class); - SQL_TYPE_TO_JAVA_CLASS.put(Types.DOUBLE, Double.class); - SQL_TYPE_TO_JAVA_CLASS.put(Types.DECIMAL, BigDecimal.class); - - SQL_TYPE_TO_JAVA_CLASS.put(Types.BOOLEAN, Boolean.class); - - SQL_TYPE_TO_JAVA_CLASS.put(Types.CHAR, String.class); - SQL_TYPE_TO_JAVA_CLASS.put(Types.VARCHAR, String.class); - - SQL_TYPE_TO_JAVA_CLASS.put(Types.TIME, GregorianCalendar.class); - - SQL_TYPE_TO_JAVA_CLASS.put(Types.DATE, Date.class); - SQL_TYPE_TO_JAVA_CLASS.put(Types.TIMESTAMP, Date.class); - } - - public List<Integer> fieldTypes; - - protected BeamRecordSqlType(List<String> fieldsName, List<Coder> fieldsCoder) { - super(fieldsName, fieldsCoder); - } - - private BeamRecordSqlType(List<String> fieldsName, List<Integer> fieldTypes - , List<Coder> fieldsCoder) { - super(fieldsName, fieldsCoder); - this.fieldTypes = fieldTypes; - } - - public static BeamRecordSqlType create(List<String> fieldNames, - List<Integer> fieldTypes) { - if (fieldNames.size() != fieldTypes.size()) { - throw new IllegalStateException("the sizes of 'dataType' and 'fieldTypes' must match."); - } - List<Coder> fieldCoders = new ArrayList<>(fieldTypes.size()); - for (int idx = 0; idx < fieldTypes.size(); ++idx) { - switch (fieldTypes.get(idx)) { - case Types.INTEGER: - fieldCoders.add(BigEndianIntegerCoder.of()); - break; - case Types.SMALLINT: - fieldCoders.add(ShortCoder.of()); - break; - case Types.TINYINT: - fieldCoders.add(ByteCoder.of()); - break; - case Types.DOUBLE: - fieldCoders.add(DoubleCoder.of()); - break; - case Types.FLOAT: - 
fieldCoders.add(FloatCoder.of()); - break; - case Types.DECIMAL: - fieldCoders.add(BigDecimalCoder.of()); - break; - case Types.BIGINT: - fieldCoders.add(BigEndianLongCoder.of()); - break; - case Types.VARCHAR: - case Types.CHAR: - fieldCoders.add(StringUtf8Coder.of()); - break; - case Types.TIME: - fieldCoders.add(TimeCoder.of()); - break; - case Types.DATE: - case Types.TIMESTAMP: - fieldCoders.add(DateCoder.of()); - break; - case Types.BOOLEAN: - fieldCoders.add(BooleanCoder.of()); - break; - - default: - throw new UnsupportedOperationException( - "Data type: " + fieldTypes.get(idx) + " not supported yet!"); - } - } - return new BeamRecordSqlType(fieldNames, fieldTypes, fieldCoders); - } - - @Override - public void validateValueType(int index, Object fieldValue) throws IllegalArgumentException { - if (null == fieldValue) {// no need to do type check for NULL value - return; - } - - int fieldType = fieldTypes.get(index); - Class javaClazz = SQL_TYPE_TO_JAVA_CLASS.get(fieldType); - if (javaClazz == null) { - throw new IllegalArgumentException("Data type: " + fieldType + " not supported yet!"); - } - - if (!fieldValue.getClass().equals(javaClazz)) { - throw new IllegalArgumentException( - String.format("[%s](%s) doesn't match type [%s]", - fieldValue, fieldValue.getClass(), fieldType) - ); - } - } - - public List<Integer> getFieldTypes() { - return fieldTypes; - } - - public Integer getFieldTypeByIndex(int index){ - return fieldTypes.get(index); - } - - @Override - public boolean equals(Object obj) { - if (obj != null && obj instanceof BeamRecordSqlType) { - BeamRecordSqlType ins = (BeamRecordSqlType) obj; - return fieldTypes.equals(ins.getFieldTypes()) && getFieldNames().equals(ins.getFieldNames()); - } else { - return false; - } - } - - @Override - public int hashCode() { - return 31 * getFieldNames().hashCode() + getFieldTypes().hashCode(); - } - - @Override - public String toString() { - return "BeamRecordSqlType [fieldNames=" + getFieldNames() - + ", 
fieldTypes=" + fieldTypes + "]"; - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRecordHelper.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRecordHelper.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRecordHelper.java deleted file mode 100644 index 89eefd1..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRecordHelper.java +++ /dev/null @@ -1,217 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.sdk.extensions.sql.schema; - -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.math.BigDecimal; -import java.util.Date; -import java.util.GregorianCalendar; -import org.apache.beam.sdk.annotations.Experimental; -import org.apache.beam.sdk.coders.BigDecimalCoder; -import org.apache.beam.sdk.coders.BigEndianLongCoder; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.CoderException; -import org.apache.beam.sdk.coders.CustomCoder; -import org.apache.beam.sdk.values.BeamRecord; - -/** - * A {@link Coder} encodes {@link BeamRecord}. - */ -@Experimental -public class BeamSqlRecordHelper { - - public static BeamRecordSqlType getSqlRecordType(BeamRecord record) { - return (BeamRecordSqlType) record.getDataType(); - } - - /** - * {@link Coder} for Java type {@link Short}. - */ - public static class ShortCoder extends CustomCoder<Short> { - private static final ShortCoder INSTANCE = new ShortCoder(); - - public static ShortCoder of() { - return INSTANCE; - } - - private ShortCoder() { - } - - @Override - public void encode(Short value, OutputStream outStream) throws CoderException, IOException { - new DataOutputStream(outStream).writeShort(value); - } - - @Override - public Short decode(InputStream inStream) throws CoderException, IOException { - return new DataInputStream(inStream).readShort(); - } - - @Override - public void verifyDeterministic() throws NonDeterministicException { - } - } - /** - * {@link Coder} for Java type {@link Float}, it's stored as {@link BigDecimal}. 
- */ - public static class FloatCoder extends CustomCoder<Float> { - private static final FloatCoder INSTANCE = new FloatCoder(); - private static final BigDecimalCoder CODER = BigDecimalCoder.of(); - - public static FloatCoder of() { - return INSTANCE; - } - - private FloatCoder() { - } - - @Override - public void encode(Float value, OutputStream outStream) throws CoderException, IOException { - CODER.encode(new BigDecimal(value), outStream); - } - - @Override - public Float decode(InputStream inStream) throws CoderException, IOException { - return CODER.decode(inStream).floatValue(); - } - - @Override - public void verifyDeterministic() throws NonDeterministicException { - } - } - /** - * {@link Coder} for Java type {@link Double}, it's stored as {@link BigDecimal}. - */ - public static class DoubleCoder extends CustomCoder<Double> { - private static final DoubleCoder INSTANCE = new DoubleCoder(); - private static final BigDecimalCoder CODER = BigDecimalCoder.of(); - - public static DoubleCoder of() { - return INSTANCE; - } - - private DoubleCoder() { - } - - @Override - public void encode(Double value, OutputStream outStream) throws CoderException, IOException { - CODER.encode(new BigDecimal(value), outStream); - } - - @Override - public Double decode(InputStream inStream) throws CoderException, IOException { - return CODER.decode(inStream).doubleValue(); - } - - @Override - public void verifyDeterministic() throws NonDeterministicException { - } - } - - /** - * {@link Coder} for Java type {@link GregorianCalendar}, it's stored as {@link Long}. 
- */ - public static class TimeCoder extends CustomCoder<GregorianCalendar> { - private static final BigEndianLongCoder longCoder = BigEndianLongCoder.of(); - private static final TimeCoder INSTANCE = new TimeCoder(); - - public static TimeCoder of() { - return INSTANCE; - } - - private TimeCoder() { - } - - @Override - public void encode(GregorianCalendar value, OutputStream outStream) - throws CoderException, IOException { - longCoder.encode(value.getTime().getTime(), outStream); - } - - @Override - public GregorianCalendar decode(InputStream inStream) throws CoderException, IOException { - GregorianCalendar calendar = new GregorianCalendar(); - calendar.setTime(new Date(longCoder.decode(inStream))); - return calendar; - } - - @Override - public void verifyDeterministic() throws NonDeterministicException { - } - } - /** - * {@link Coder} for Java type {@link Date}, it's stored as {@link Long}. - */ - public static class DateCoder extends CustomCoder<Date> { - private static final BigEndianLongCoder longCoder = BigEndianLongCoder.of(); - private static final DateCoder INSTANCE = new DateCoder(); - - public static DateCoder of() { - return INSTANCE; - } - - private DateCoder() { - } - - @Override - public void encode(Date value, OutputStream outStream) throws CoderException, IOException { - longCoder.encode(value.getTime(), outStream); - } - - @Override - public Date decode(InputStream inStream) throws CoderException, IOException { - return new Date(longCoder.decode(inStream)); - } - - @Override - public void verifyDeterministic() throws NonDeterministicException { - } - } - - /** - * {@link Coder} for Java type {@link Boolean}. 
- */ - public static class BooleanCoder extends CustomCoder<Boolean> { - private static final BooleanCoder INSTANCE = new BooleanCoder(); - - public static BooleanCoder of() { - return INSTANCE; - } - - private BooleanCoder() { - } - - @Override - public void encode(Boolean value, OutputStream outStream) throws CoderException, IOException { - new DataOutputStream(outStream).writeBoolean(value); - } - - @Override - public Boolean decode(InputStream inStream) throws CoderException, IOException { - return new DataInputStream(inStream).readBoolean(); - } - - @Override - public void verifyDeterministic() throws NonDeterministicException { - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlTable.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlTable.java deleted file mode 100644 index 828ac43..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlTable.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.sdk.extensions.sql.schema; - -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.values.BeamRecord; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PDone; - -/** - * This interface defines a Beam Sql Table. - */ -public interface BeamSqlTable { - /** - * In Beam SQL, there's no difference between a batch query and a streaming - * query. {@link BeamIOType} is used to validate the sources. - */ - BeamIOType getSourceType(); - - /** - * create a {@code PCollection<BeamSqlRow>} from source. - * - */ - PCollection<BeamRecord> buildIOReader(Pipeline pipeline); - - /** - * create a {@code IO.write()} instance to write to target. - * - */ - PTransform<? super PCollection<BeamRecord>, PDone> buildIOWriter(); - - /** - * Get the schema info of the table. - */ - BeamRecordSqlType getRowType(); -} http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlUdf.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlUdf.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlUdf.java deleted file mode 100644 index 191b78e..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlUdf.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.extensions.sql.schema; - -import java.io.Serializable; - -/** - * Interface to create a UDF in Beam SQL. - * - * <p>A static method {@code eval} is required. Here is an example: - * - * <blockquote><pre> - * public static class MyLeftFunction { - * public String eval( - * @Parameter(name = "s") String s, - * @Parameter(name = "n", optional = true) Integer n) { - * return s.substring(0, n == null ? 1 : n); - * } - * }</pre></blockquote> - * - * <p>The first parameter is named "s" and is mandatory, - * and the second parameter is named "n" and is optional. 
- */ -public interface BeamSqlUdf extends Serializable { - String UDF_METHOD = "eval"; -} http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamTableUtils.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamTableUtils.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamTableUtils.java deleted file mode 100644 index 687a082..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamTableUtils.java +++ /dev/null @@ -1,117 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.beam.sdk.extensions.sql.schema; - -import java.io.IOException; -import java.io.StringReader; -import java.io.StringWriter; -import java.math.BigDecimal; -import java.util.ArrayList; -import java.util.List; -import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils; -import org.apache.beam.sdk.values.BeamRecord; -import org.apache.calcite.sql.type.SqlTypeName; -import org.apache.calcite.util.NlsString; -import org.apache.commons.csv.CSVFormat; -import org.apache.commons.csv.CSVParser; -import org.apache.commons.csv.CSVPrinter; -import org.apache.commons.csv.CSVRecord; - -/** - * Utility methods for working with {@code BeamTable}. - */ -public final class BeamTableUtils { - public static BeamRecord csvLine2BeamSqlRow( - CSVFormat csvFormat, - String line, - BeamRecordSqlType beamRecordSqlType) { - List<Object> fieldsValue = new ArrayList<>(beamRecordSqlType.getFieldCount()); - try (StringReader reader = new StringReader(line)) { - CSVParser parser = csvFormat.parse(reader); - CSVRecord rawRecord = parser.getRecords().get(0); - - if (rawRecord.size() != beamRecordSqlType.getFieldCount()) { - throw new IllegalArgumentException(String.format( - "Expect %d fields, but actually %d", - beamRecordSqlType.getFieldCount(), rawRecord.size() - )); - } else { - for (int idx = 0; idx < beamRecordSqlType.getFieldCount(); idx++) { - String raw = rawRecord.get(idx); - fieldsValue.add(autoCastField(beamRecordSqlType.getFieldTypeByIndex(idx), raw)); - } - } - } catch (IOException e) { - throw new IllegalArgumentException("decodeRecord failed!", e); - } - return new BeamRecord(beamRecordSqlType, fieldsValue); - } - - public static String beamSqlRow2CsvLine(BeamRecord row, CSVFormat csvFormat) { - StringWriter writer = new StringWriter(); - try (CSVPrinter printer = csvFormat.print(writer)) { - for (int i = 0; i < row.getFieldCount(); i++) { - printer.print(row.getFieldValue(i).toString()); - } - printer.println(); - } catch (IOException e) { - throw 
new IllegalArgumentException("encodeRecord failed!", e); - } - return writer.toString(); - } - - public static Object autoCastField(int fieldType, Object rawObj) { - if (rawObj == null) { - return null; - } - - SqlTypeName columnType = CalciteUtils.toCalciteType(fieldType); - // auto-casting for numberics - if ((rawObj instanceof String && SqlTypeName.NUMERIC_TYPES.contains(columnType)) - || (rawObj instanceof BigDecimal && columnType != SqlTypeName.DECIMAL)) { - String raw = rawObj.toString(); - switch (columnType) { - case TINYINT: - return Byte.valueOf(raw); - case SMALLINT: - return Short.valueOf(raw); - case INTEGER: - return Integer.valueOf(raw); - case BIGINT: - return Long.valueOf(raw); - case FLOAT: - return Float.valueOf(raw); - case DOUBLE: - return Double.valueOf(raw); - default: - throw new UnsupportedOperationException( - String.format("Column type %s is not supported yet!", columnType)); - } - } else if (SqlTypeName.CHAR_TYPES.contains(columnType)) { - // convert NlsString to String - if (rawObj instanceof NlsString) { - return ((NlsString) rawObj).getValue(); - } else { - return rawObj; - } - } else { - return rawObj; - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaCSVTable.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaCSVTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaCSVTable.java deleted file mode 100644 index 8c7e6f0..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaCSVTable.java +++ /dev/null @@ -1,109 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.extensions.sql.schema.kafka; - -import java.util.List; -import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType; -import org.apache.beam.sdk.extensions.sql.schema.BeamTableUtils; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.values.BeamRecord; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollection; -import org.apache.commons.csv.CSVFormat; - -/** - * A Kafka topic that saves records as CSV format. 
- * - */ -public class BeamKafkaCSVTable extends BeamKafkaTable { - private CSVFormat csvFormat; - public BeamKafkaCSVTable(BeamRecordSqlType beamSqlRowType, String bootstrapServers, - List<String> topics) { - this(beamSqlRowType, bootstrapServers, topics, CSVFormat.DEFAULT); - } - - public BeamKafkaCSVTable(BeamRecordSqlType beamSqlRowType, String bootstrapServers, - List<String> topics, CSVFormat format) { - super(beamSqlRowType, bootstrapServers, topics); - this.csvFormat = format; - } - - @Override - public PTransform<PCollection<KV<byte[], byte[]>>, PCollection<BeamRecord>> - getPTransformForInput() { - return new CsvRecorderDecoder(beamSqlRowType, csvFormat); - } - - @Override - public PTransform<PCollection<BeamRecord>, PCollection<KV<byte[], byte[]>>> - getPTransformForOutput() { - return new CsvRecorderEncoder(beamSqlRowType, csvFormat); - } - - /** - * A PTransform to convert {@code KV<byte[], byte[]>} to {@link BeamRecord}. - * - */ - public static class CsvRecorderDecoder - extends PTransform<PCollection<KV<byte[], byte[]>>, PCollection<BeamRecord>> { - private BeamRecordSqlType rowType; - private CSVFormat format; - public CsvRecorderDecoder(BeamRecordSqlType rowType, CSVFormat format) { - this.rowType = rowType; - this.format = format; - } - - @Override - public PCollection<BeamRecord> expand(PCollection<KV<byte[], byte[]>> input) { - return input.apply("decodeRecord", ParDo.of(new DoFn<KV<byte[], byte[]>, BeamRecord>() { - @ProcessElement - public void processElement(ProcessContext c) { - String rowInString = new String(c.element().getValue()); - c.output(BeamTableUtils.csvLine2BeamSqlRow(format, rowInString, rowType)); - } - })); - } - } - - /** - * A PTransform to convert {@link BeamRecord} to {@code KV<byte[], byte[]>}. 
- * - */ - public static class CsvRecorderEncoder - extends PTransform<PCollection<BeamRecord>, PCollection<KV<byte[], byte[]>>> { - private BeamRecordSqlType rowType; - private CSVFormat format; - public CsvRecorderEncoder(BeamRecordSqlType rowType, CSVFormat format) { - this.rowType = rowType; - this.format = format; - } - - @Override - public PCollection<KV<byte[], byte[]>> expand(PCollection<BeamRecord> input) { - return input.apply("encodeRecord", ParDo.of(new DoFn<BeamRecord, KV<byte[], byte[]>>() { - @ProcessElement - public void processElement(ProcessContext c) { - BeamRecord in = c.element(); - c.output(KV.of(new byte[] {}, BeamTableUtils.beamSqlRow2CsvLine(in, format).getBytes())); - } - })); - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaTable.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaTable.java deleted file mode 100644 index 1d57839..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaTable.java +++ /dev/null @@ -1,109 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.extensions.sql.schema.kafka; - -import static com.google.common.base.Preconditions.checkArgument; - -import java.io.Serializable; -import java.util.List; -import java.util.Map; -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.coders.ByteArrayCoder; -import org.apache.beam.sdk.extensions.sql.schema.BaseBeamTable; -import org.apache.beam.sdk.extensions.sql.schema.BeamIOType; -import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType; -import org.apache.beam.sdk.io.kafka.KafkaIO; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.values.BeamRecord; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PBegin; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PDone; -import org.apache.kafka.common.serialization.ByteArrayDeserializer; -import org.apache.kafka.common.serialization.ByteArraySerializer; - -/** - * {@code BeamKafkaTable} represent a Kafka topic, as source or target. Need to - * extend to convert between {@code BeamSqlRow} and {@code KV<byte[], byte[]>}. 
- * - */ -public abstract class BeamKafkaTable extends BaseBeamTable implements Serializable { - - private String bootstrapServers; - private List<String> topics; - private Map<String, Object> configUpdates; - - protected BeamKafkaTable(BeamRecordSqlType beamSqlRowType) { - super(beamSqlRowType); - } - - public BeamKafkaTable(BeamRecordSqlType beamSqlRowType, String bootstrapServers, - List<String> topics) { - super(beamSqlRowType); - this.bootstrapServers = bootstrapServers; - this.topics = topics; - } - - public BeamKafkaTable updateConsumerProperties(Map<String, Object> configUpdates) { - this.configUpdates = configUpdates; - return this; - } - - @Override - public BeamIOType getSourceType() { - return BeamIOType.UNBOUNDED; - } - - public abstract PTransform<PCollection<KV<byte[], byte[]>>, PCollection<BeamRecord>> - getPTransformForInput(); - - public abstract PTransform<PCollection<BeamRecord>, PCollection<KV<byte[], byte[]>>> - getPTransformForOutput(); - - @Override - public PCollection<BeamRecord> buildIOReader(Pipeline pipeline) { - return PBegin.in(pipeline).apply("read", - KafkaIO.<byte[], byte[]>read() - .withBootstrapServers(bootstrapServers) - .withTopics(topics) - .updateConsumerProperties(configUpdates) - .withKeyDeserializerAndCoder(ByteArrayDeserializer.class, ByteArrayCoder.of()) - .withValueDeserializerAndCoder(ByteArrayDeserializer.class, ByteArrayCoder.of()) - .withoutMetadata()) - .apply("in_format", getPTransformForInput()); - } - - @Override - public PTransform<? 
super PCollection<BeamRecord>, PDone> buildIOWriter() { - checkArgument(topics != null && topics.size() == 1, - "Only one topic can be acceptable as output."); - - return new PTransform<PCollection<BeamRecord>, PDone>() { - @Override - public PDone expand(PCollection<BeamRecord> input) { - return input.apply("out_reformat", getPTransformForOutput()).apply("persistent", - KafkaIO.<byte[], byte[]>write() - .withBootstrapServers(bootstrapServers) - .withTopic(topics.get(0)) - .withKeySerializer(ByteArraySerializer.class) - .withValueSerializer(ByteArraySerializer.class)); - } - }; - } - -} http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/kafka/package-info.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/kafka/package-info.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/kafka/package-info.java deleted file mode 100644 index f0ddeb6..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/kafka/package-info.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * table schema for KafkaIO. - */ -package org.apache.beam.sdk.extensions.sql.schema.kafka; http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/package-info.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/package-info.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/package-info.java deleted file mode 100644 index 9655ebd..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/package-info.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -/** - * define table schema, to map with Beam IO components. 
- * - */ -package org.apache.beam.sdk.extensions.sql.schema; http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTable.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTable.java deleted file mode 100644 index 79e56e6..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTable.java +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.beam.sdk.extensions.sql.schema.text; - -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType; -import org.apache.beam.sdk.io.TextIO; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.values.BeamRecord; -import org.apache.beam.sdk.values.PBegin; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PDone; -import org.apache.commons.csv.CSVFormat; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * {@code BeamTextCSVTable} is a {@code BeamTextTable} which formatted in CSV. - * - * <p> - * {@link CSVFormat} itself has many dialects, check its javadoc for more info. - * </p> - */ -public class BeamTextCSVTable extends BeamTextTable { - private static final Logger LOG = LoggerFactory - .getLogger(BeamTextCSVTable.class); - - private CSVFormat csvFormat; - - /** - * CSV table with {@link CSVFormat#DEFAULT DEFAULT} format. - */ - public BeamTextCSVTable(BeamRecordSqlType beamSqlRowType, String filePattern) { - this(beamSqlRowType, filePattern, CSVFormat.DEFAULT); - } - - public BeamTextCSVTable(BeamRecordSqlType beamSqlRowType, String filePattern, - CSVFormat csvFormat) { - super(beamSqlRowType, filePattern); - this.csvFormat = csvFormat; - } - - @Override - public PCollection<BeamRecord> buildIOReader(Pipeline pipeline) { - return PBegin.in(pipeline).apply("decodeRecord", TextIO.read().from(filePattern)) - .apply("parseCSVLine", - new BeamTextCSVTableIOReader(beamSqlRowType, filePattern, csvFormat)); - } - - @Override - public PTransform<? 
super PCollection<BeamRecord>, PDone> buildIOWriter() { - return new BeamTextCSVTableIOWriter(beamSqlRowType, filePattern, csvFormat); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTableIOReader.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTableIOReader.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTableIOReader.java deleted file mode 100644 index 018dae5..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTableIOReader.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.beam.sdk.extensions.sql.schema.text; - -import java.io.Serializable; -import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType; -import org.apache.beam.sdk.extensions.sql.schema.BeamTableUtils; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.values.BeamRecord; -import org.apache.beam.sdk.values.PCollection; -import org.apache.commons.csv.CSVFormat; - -/** - * IOReader for {@code BeamTextCSVTable}. - */ -public class BeamTextCSVTableIOReader - extends PTransform<PCollection<String>, PCollection<BeamRecord>> - implements Serializable { - private String filePattern; - protected BeamRecordSqlType beamSqlRowType; - protected CSVFormat csvFormat; - - public BeamTextCSVTableIOReader(BeamRecordSqlType beamSqlRowType, String filePattern, - CSVFormat csvFormat) { - this.filePattern = filePattern; - this.beamSqlRowType = beamSqlRowType; - this.csvFormat = csvFormat; - } - - @Override - public PCollection<BeamRecord> expand(PCollection<String> input) { - return input.apply(ParDo.of(new DoFn<String, BeamRecord>() { - @ProcessElement - public void processElement(ProcessContext ctx) { - String str = ctx.element(); - ctx.output(BeamTableUtils.csvLine2BeamSqlRow(csvFormat, str, beamSqlRowType)); - } - })); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTableIOWriter.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTableIOWriter.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTableIOWriter.java deleted file mode 100644 index 53eb382..0000000 --- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTableIOWriter.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.sdk.extensions.sql.schema.text; - -import java.io.Serializable; -import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType; -import org.apache.beam.sdk.extensions.sql.schema.BeamTableUtils; -import org.apache.beam.sdk.io.TextIO; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.values.BeamRecord; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PDone; -import org.apache.commons.csv.CSVFormat; - -/** - * IOWriter for {@code BeamTextCSVTable}. 
- */ -public class BeamTextCSVTableIOWriter extends PTransform<PCollection<BeamRecord>, PDone> - implements Serializable { - private String filePattern; - protected BeamRecordSqlType beamSqlRowType; - protected CSVFormat csvFormat; - - public BeamTextCSVTableIOWriter(BeamRecordSqlType beamSqlRowType, String filePattern, - CSVFormat csvFormat) { - this.filePattern = filePattern; - this.beamSqlRowType = beamSqlRowType; - this.csvFormat = csvFormat; - } - - @Override public PDone expand(PCollection<BeamRecord> input) { - return input.apply("encodeRecord", ParDo.of(new DoFn<BeamRecord, String>() { - - @ProcessElement public void processElement(ProcessContext ctx) { - BeamRecord row = ctx.element(); - ctx.output(BeamTableUtils.beamSqlRow2CsvLine(row, csvFormat)); - } - })).apply(TextIO.write().to(filePattern)); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextTable.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextTable.java deleted file mode 100644 index 80e81aa..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextTable.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.sdk.extensions.sql.schema.text; - -import java.io.Serializable; -import org.apache.beam.sdk.extensions.sql.schema.BaseBeamTable; -import org.apache.beam.sdk.extensions.sql.schema.BeamIOType; -import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType; - -/** - * {@code BeamTextTable} represents a text file/directory(backed by {@code TextIO}). - */ -public abstract class BeamTextTable extends BaseBeamTable implements Serializable { - protected String filePattern; - - protected BeamTextTable(BeamRecordSqlType beamSqlRowType, String filePattern) { - super(beamSqlRowType); - this.filePattern = filePattern; - } - - @Override - public BeamIOType getSourceType() { - return BeamIOType.BOUNDED; - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/package-info.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/package-info.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/package-info.java deleted file mode 100644 index f914e2e..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/package-info.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * Table schema for text files. - */ -package org.apache.beam.sdk.extensions.sql.schema.text;
