http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlUdaf.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlUdaf.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlUdaf.java deleted file mode 100644 index 9582ffa..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlUdaf.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.dsls.sql.schema; - -import java.io.Serializable; -import java.lang.reflect.ParameterizedType; -import org.apache.beam.sdk.coders.CannotProvideCoderException; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.CoderRegistry; -import org.apache.beam.sdk.transforms.Combine.CombineFn; - -/** - * abstract class of aggregation functions in Beam SQL. - * - * <p>There are several constraints for a UDAF:<br> - * 1. A constructor with an empty argument list is required;<br> - * 2. 
The type of {@code InputT} and {@code OutputT} can only be Integer/Long/Short/Byte/Double - * /Float/Date/BigDecimal, which map to the SQL types INTEGER/BIGINT/SMALLINT/TINYINT/DOUBLE/FLOAT - * /TIMESTAMP/DECIMAL;<br> - * 3. Keep intermediate data in {@code AccumT}, and do not rely on fields of the class;<br> - */ -public abstract class BeamSqlUdaf<InputT, AccumT, OutputT> implements Serializable { - public BeamSqlUdaf(){} - - /** - * create an initial aggregation object, equivalent to {@link CombineFn#createAccumulator()}. - */ - public abstract AccumT init(); - - /** - * add an input value, equivalent to {@link CombineFn#addInput(Object, Object)}. - */ - public abstract AccumT add(AccumT accumulator, InputT input); - - /** - * merge aggregation objects from parallel tasks, equivalent to - * {@link CombineFn#mergeAccumulators(Iterable)}. - */ - public abstract AccumT merge(Iterable<AccumT> accumulators); - - /** - * extract output value from aggregation object, equivalent to - * {@link CombineFn#extractOutput(Object)}. - */ - public abstract OutputT result(AccumT accumulator); - - /** - * get the coder for AccumT which stores the intermediate result. - * By default it's fetched from {@link CoderRegistry}. - */ - public Coder<AccumT> getAccumulatorCoder(CoderRegistry registry) - throws CannotProvideCoderException { - return registry.getCoder( - (Class<AccumT>) ((ParameterizedType) getClass() - .getGenericSuperclass()).getActualTypeArguments()[1]); - } -}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlUdf.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlUdf.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlUdf.java deleted file mode 100644 index 2066353..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlUdf.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.dsls.sql.schema; - -import java.io.Serializable; - -/** - * Interface to create a UDF in Beam SQL. - * - * <p>A static method {@code eval} is required. Here is an example: - * - * <blockquote><pre> - * public static class MyLeftFunction { - * public String eval( - * @Parameter(name = "s") String s, - * @Parameter(name = "n", optional = true) Integer n) { - * return s.substring(0, n == null ? 1 : n); - * } - * }</pre></blockquote> - * - * <p>The first parameter is named "s" and is mandatory, - * and the second parameter is named "n" and is optional. 
- */ -public interface BeamSqlUdf extends Serializable { - String UDF_METHOD = "eval"; -} http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamTableUtils.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamTableUtils.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamTableUtils.java deleted file mode 100644 index 4b7e76b..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamTableUtils.java +++ /dev/null @@ -1,122 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.beam.dsls.sql.schema; - -import java.io.IOException; -import java.io.StringReader; -import java.io.StringWriter; -import java.math.BigDecimal; -import org.apache.beam.dsls.sql.utils.CalciteUtils; -import org.apache.calcite.sql.type.SqlTypeName; -import org.apache.calcite.util.NlsString; -import org.apache.commons.csv.CSVFormat; -import org.apache.commons.csv.CSVParser; -import org.apache.commons.csv.CSVPrinter; -import org.apache.commons.csv.CSVRecord; - -/** - * Utility methods for working with {@code BeamTable}. - */ -public final class BeamTableUtils { - public static BeamSqlRow csvLine2BeamSqlRow( - CSVFormat csvFormat, - String line, - BeamSqlRowType beamSqlRowType) { - BeamSqlRow row = new BeamSqlRow(beamSqlRowType); - try (StringReader reader = new StringReader(line)) { - CSVParser parser = csvFormat.parse(reader); - CSVRecord rawRecord = parser.getRecords().get(0); - - if (rawRecord.size() != beamSqlRowType.size()) { - throw new IllegalArgumentException(String.format( - "Expect %d fields, but actually %d", - beamSqlRowType.size(), rawRecord.size() - )); - } else { - for (int idx = 0; idx < beamSqlRowType.size(); idx++) { - String raw = rawRecord.get(idx); - addFieldWithAutoTypeCasting(row, idx, raw); - } - } - } catch (IOException e) { - throw new IllegalArgumentException("decodeRecord failed!", e); - } - return row; - } - - public static String beamSqlRow2CsvLine(BeamSqlRow row, CSVFormat csvFormat) { - StringWriter writer = new StringWriter(); - try (CSVPrinter printer = csvFormat.print(writer)) { - for (int i = 0; i < row.size(); i++) { - printer.print(row.getFieldValue(i).toString()); - } - printer.println(); - } catch (IOException e) { - throw new IllegalArgumentException("encodeRecord failed!", e); - } - return writer.toString(); - } - - public static void addFieldWithAutoTypeCasting(BeamSqlRow row, int idx, Object rawObj) { - if (rawObj == null) { - row.addField(idx, null); - return; - } - - SqlTypeName columnType = 
CalciteUtils.getFieldType(row.getDataType(), idx); - // auto-casting for numerics - if ((rawObj instanceof String && SqlTypeName.NUMERIC_TYPES.contains(columnType)) - || (rawObj instanceof BigDecimal && columnType != SqlTypeName.DECIMAL)) { - String raw = rawObj.toString(); - switch (columnType) { - case TINYINT: - row.addField(idx, Byte.valueOf(raw)); - break; - case SMALLINT: - row.addField(idx, Short.valueOf(raw)); - break; - case INTEGER: - row.addField(idx, Integer.valueOf(raw)); - break; - case BIGINT: - row.addField(idx, Long.valueOf(raw)); - break; - case FLOAT: - row.addField(idx, Float.valueOf(raw)); - break; - case DOUBLE: - row.addField(idx, Double.valueOf(raw)); - break; - default: - throw new UnsupportedOperationException( - String.format("Column type %s is not supported yet!", columnType)); - } - } else if (SqlTypeName.CHAR_TYPES.contains(columnType)) { - // convert NlsString to String - if (rawObj instanceof NlsString) { - row.addField(idx, ((NlsString) rawObj).getValue()); - } else { - row.addField(idx, rawObj); - } - } else { - // keep the original value - row.addField(idx, rawObj); - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaCSVTable.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaCSVTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaCSVTable.java deleted file mode 100644 index a18f3de..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaCSVTable.java +++ /dev/null @@ -1,111 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.dsls.sql.schema.kafka; - -import static org.apache.beam.dsls.sql.schema.BeamTableUtils.beamSqlRow2CsvLine; -import static org.apache.beam.dsls.sql.schema.BeamTableUtils.csvLine2BeamSqlRow; - -import java.util.List; -import org.apache.beam.dsls.sql.schema.BeamSqlRow; -import org.apache.beam.dsls.sql.schema.BeamSqlRowType; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollection; -import org.apache.commons.csv.CSVFormat; - -/** - * A Kafka topic that saves records as CSV format. 
- * - */ -public class BeamKafkaCSVTable extends BeamKafkaTable { - private CSVFormat csvFormat; - public BeamKafkaCSVTable(BeamSqlRowType beamSqlRowType, String bootstrapServers, - List<String> topics) { - this(beamSqlRowType, bootstrapServers, topics, CSVFormat.DEFAULT); - } - - public BeamKafkaCSVTable(BeamSqlRowType beamSqlRowType, String bootstrapServers, - List<String> topics, CSVFormat format) { - super(beamSqlRowType, bootstrapServers, topics); - this.csvFormat = format; - } - - @Override - public PTransform<PCollection<KV<byte[], byte[]>>, PCollection<BeamSqlRow>> - getPTransformForInput() { - return new CsvRecorderDecoder(beamSqlRowType, csvFormat); - } - - @Override - public PTransform<PCollection<BeamSqlRow>, PCollection<KV<byte[], byte[]>>> - getPTransformForOutput() { - return new CsvRecorderEncoder(beamSqlRowType, csvFormat); - } - - /** - * A PTransform to convert {@code KV<byte[], byte[]>} to {@link BeamSqlRow}. - * - */ - public static class CsvRecorderDecoder - extends PTransform<PCollection<KV<byte[], byte[]>>, PCollection<BeamSqlRow>> { - private BeamSqlRowType rowType; - private CSVFormat format; - public CsvRecorderDecoder(BeamSqlRowType rowType, CSVFormat format) { - this.rowType = rowType; - this.format = format; - } - - @Override - public PCollection<BeamSqlRow> expand(PCollection<KV<byte[], byte[]>> input) { - return input.apply("decodeRecord", ParDo.of(new DoFn<KV<byte[], byte[]>, BeamSqlRow>() { - @ProcessElement - public void processElement(ProcessContext c) { - String rowInString = new String(c.element().getValue()); - c.output(csvLine2BeamSqlRow(format, rowInString, rowType)); - } - })); - } - } - - /** - * A PTransform to convert {@link BeamSqlRow} to {@code KV<byte[], byte[]>}. 
- * - */ - public static class CsvRecorderEncoder - extends PTransform<PCollection<BeamSqlRow>, PCollection<KV<byte[], byte[]>>> { - private BeamSqlRowType rowType; - private CSVFormat format; - public CsvRecorderEncoder(BeamSqlRowType rowType, CSVFormat format) { - this.rowType = rowType; - this.format = format; - } - - @Override - public PCollection<KV<byte[], byte[]>> expand(PCollection<BeamSqlRow> input) { - return input.apply("encodeRecord", ParDo.of(new DoFn<BeamSqlRow, KV<byte[], byte[]>>() { - @ProcessElement - public void processElement(ProcessContext c) { - BeamSqlRow in = c.element(); - c.output(KV.of(new byte[] {}, beamSqlRow2CsvLine(in, format).getBytes())); - } - })); - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaTable.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaTable.java deleted file mode 100644 index faa2706..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaTable.java +++ /dev/null @@ -1,109 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.dsls.sql.schema.kafka; - -import static com.google.common.base.Preconditions.checkArgument; - -import java.io.Serializable; -import java.util.List; -import java.util.Map; -import org.apache.beam.dsls.sql.schema.BaseBeamTable; -import org.apache.beam.dsls.sql.schema.BeamIOType; -import org.apache.beam.dsls.sql.schema.BeamSqlRow; -import org.apache.beam.dsls.sql.schema.BeamSqlRowType; -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.coders.ByteArrayCoder; -import org.apache.beam.sdk.io.kafka.KafkaIO; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PBegin; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PDone; -import org.apache.kafka.common.serialization.ByteArrayDeserializer; -import org.apache.kafka.common.serialization.ByteArraySerializer; - -/** - * {@code BeamKafkaTable} represents a Kafka topic, as source or target. It needs to be - * extended to convert between {@code BeamSqlRow} and {@code KV<byte[], byte[]>}. 
- * - */ -public abstract class BeamKafkaTable extends BaseBeamTable implements Serializable { - - private String bootstrapServers; - private List<String> topics; - private Map<String, Object> configUpdates; - - protected BeamKafkaTable(BeamSqlRowType beamSqlRowType) { - super(beamSqlRowType); - } - - public BeamKafkaTable(BeamSqlRowType beamSqlRowType, String bootstrapServers, - List<String> topics) { - super(beamSqlRowType); - this.bootstrapServers = bootstrapServers; - this.topics = topics; - } - - public BeamKafkaTable updateConsumerProperties(Map<String, Object> configUpdates) { - this.configUpdates = configUpdates; - return this; - } - - @Override - public BeamIOType getSourceType() { - return BeamIOType.UNBOUNDED; - } - - public abstract PTransform<PCollection<KV<byte[], byte[]>>, PCollection<BeamSqlRow>> - getPTransformForInput(); - - public abstract PTransform<PCollection<BeamSqlRow>, PCollection<KV<byte[], byte[]>>> - getPTransformForOutput(); - - @Override - public PCollection<BeamSqlRow> buildIOReader(Pipeline pipeline) { - return PBegin.in(pipeline).apply("read", - KafkaIO.<byte[], byte[]>read() - .withBootstrapServers(bootstrapServers) - .withTopics(topics) - .updateConsumerProperties(configUpdates) - .withKeyDeserializerAndCoder(ByteArrayDeserializer.class, ByteArrayCoder.of()) - .withValueDeserializerAndCoder(ByteArrayDeserializer.class, ByteArrayCoder.of()) - .withoutMetadata()) - .apply("in_format", getPTransformForInput()); - } - - @Override - public PTransform<? 
super PCollection<BeamSqlRow>, PDone> buildIOWriter() { - checkArgument(topics != null && topics.size() == 1, - "Only one topic can be acceptable as output."); - - return new PTransform<PCollection<BeamSqlRow>, PDone>() { - @Override - public PDone expand(PCollection<BeamSqlRow> input) { - return input.apply("out_reformat", getPTransformForOutput()).apply("persistent", - KafkaIO.<byte[], byte[]>write() - .withBootstrapServers(bootstrapServers) - .withTopic(topics.get(0)) - .withKeySerializer(ByteArraySerializer.class) - .withValueSerializer(ByteArraySerializer.class)); - } - }; - } - -} http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/package-info.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/package-info.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/package-info.java deleted file mode 100644 index 0418372..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/package-info.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * table schema for KafkaIO. - */ -package org.apache.beam.dsls.sql.schema.kafka; http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/package-info.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/package-info.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/package-info.java deleted file mode 100644 index 4c41826..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/package-info.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -/** - * define table schema, to map with Beam IO components. 
- * - */ -package org.apache.beam.dsls.sql.schema; http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTable.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTable.java deleted file mode 100644 index 9ed56b4..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTable.java +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.beam.dsls.sql.schema.text; - -import org.apache.beam.dsls.sql.schema.BeamSqlRow; -import org.apache.beam.dsls.sql.schema.BeamSqlRowType; -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.io.TextIO; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.values.PBegin; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PDone; -import org.apache.commons.csv.CSVFormat; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * {@code BeamTextCSVTable} is a {@code BeamTextTable} which is formatted in CSV. - * - * <p> - * {@link CSVFormat} itself has many dialects; check its javadoc for more info. - * </p> - */ -public class BeamTextCSVTable extends BeamTextTable { - private static final Logger LOG = LoggerFactory - .getLogger(BeamTextCSVTable.class); - - private CSVFormat csvFormat; - - /** - * CSV table with {@link CSVFormat#DEFAULT DEFAULT} format. - */ - public BeamTextCSVTable(BeamSqlRowType beamSqlRowType, String filePattern) { - this(beamSqlRowType, filePattern, CSVFormat.DEFAULT); - } - - public BeamTextCSVTable(BeamSqlRowType beamSqlRowType, String filePattern, - CSVFormat csvFormat) { - super(beamSqlRowType, filePattern); - this.csvFormat = csvFormat; - } - - @Override - public PCollection<BeamSqlRow> buildIOReader(Pipeline pipeline) { - return PBegin.in(pipeline).apply("decodeRecord", TextIO.read().from(filePattern)) - .apply("parseCSVLine", - new BeamTextCSVTableIOReader(beamSqlRowType, filePattern, csvFormat)); - } - - @Override - public PTransform<? 
super PCollection<BeamSqlRow>, PDone> buildIOWriter() { - return new BeamTextCSVTableIOWriter(beamSqlRowType, filePattern, csvFormat); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOReader.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOReader.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOReader.java deleted file mode 100644 index 874c3e4..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOReader.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.beam.dsls.sql.schema.text; - -import static org.apache.beam.dsls.sql.schema.BeamTableUtils.csvLine2BeamSqlRow; - -import java.io.Serializable; -import org.apache.beam.dsls.sql.schema.BeamSqlRow; -import org.apache.beam.dsls.sql.schema.BeamSqlRowType; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.values.PCollection; -import org.apache.commons.csv.CSVFormat; - -/** - * IOReader for {@code BeamTextCSVTable}. - */ -public class BeamTextCSVTableIOReader - extends PTransform<PCollection<String>, PCollection<BeamSqlRow>> - implements Serializable { - private String filePattern; - protected BeamSqlRowType beamSqlRowType; - protected CSVFormat csvFormat; - - public BeamTextCSVTableIOReader(BeamSqlRowType beamSqlRowType, String filePattern, - CSVFormat csvFormat) { - this.filePattern = filePattern; - this.beamSqlRowType = beamSqlRowType; - this.csvFormat = csvFormat; - } - - @Override - public PCollection<BeamSqlRow> expand(PCollection<String> input) { - return input.apply(ParDo.of(new DoFn<String, BeamSqlRow>() { - @ProcessElement - public void processElement(ProcessContext ctx) { - String str = ctx.element(); - ctx.output(csvLine2BeamSqlRow(csvFormat, str, beamSqlRowType)); - } - })); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOWriter.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOWriter.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOWriter.java deleted file mode 100644 index f61bb71..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOWriter.java +++ /dev/null @@ -1,59 
+0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.dsls.sql.schema.text; - -import static org.apache.beam.dsls.sql.schema.BeamTableUtils.beamSqlRow2CsvLine; - -import java.io.Serializable; -import org.apache.beam.dsls.sql.schema.BeamSqlRow; -import org.apache.beam.dsls.sql.schema.BeamSqlRowType; -import org.apache.beam.sdk.io.TextIO; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PDone; -import org.apache.commons.csv.CSVFormat; - -/** - * IOWriter for {@code BeamTextCSVTable}. 
- */ -public class BeamTextCSVTableIOWriter extends PTransform<PCollection<BeamSqlRow>, PDone> - implements Serializable { - private String filePattern; - protected BeamSqlRowType beamSqlRowType; - protected CSVFormat csvFormat; - - public BeamTextCSVTableIOWriter(BeamSqlRowType beamSqlRowType, String filePattern, - CSVFormat csvFormat) { - this.filePattern = filePattern; - this.beamSqlRowType = beamSqlRowType; - this.csvFormat = csvFormat; - } - - @Override public PDone expand(PCollection<BeamSqlRow> input) { - return input.apply("encodeRecord", ParDo.of(new DoFn<BeamSqlRow, String>() { - - @ProcessElement public void processElement(ProcessContext ctx) { - BeamSqlRow row = ctx.element(); - ctx.output(beamSqlRow2CsvLine(row, csvFormat)); - } - })).apply(TextIO.write().to(filePattern)); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextTable.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextTable.java deleted file mode 100644 index 6dc6cd0..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextTable.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.dsls.sql.schema.text; - -import java.io.Serializable; - -import org.apache.beam.dsls.sql.schema.BaseBeamTable; -import org.apache.beam.dsls.sql.schema.BeamIOType; -import org.apache.beam.dsls.sql.schema.BeamSqlRowType; - -/** - * {@code BeamTextTable} represents a text file/directory(backed by {@code TextIO}). - */ -public abstract class BeamTextTable extends BaseBeamTable implements Serializable { - protected String filePattern; - - protected BeamTextTable(BeamSqlRowType beamSqlRowType, String filePattern) { - super(beamSqlRowType); - this.filePattern = filePattern; - } - - @Override - public BeamIOType getSourceType() { - return BeamIOType.BOUNDED; - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/package-info.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/package-info.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/package-info.java deleted file mode 100644 index f48f2fe..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/package-info.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * Table schema for text files. - */ -package org.apache.beam.dsls.sql.schema.text; http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamAggregationTransforms.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamAggregationTransforms.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamAggregationTransforms.java deleted file mode 100644 index 5b21765..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamAggregationTransforms.java +++ /dev/null @@ -1,300 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.dsls.sql.transform; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.io.Serializable; -import java.math.BigDecimal; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; -import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression; -import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlInputRefExpression; -import org.apache.beam.dsls.sql.schema.BeamSqlRow; -import org.apache.beam.dsls.sql.schema.BeamSqlRowType; -import org.apache.beam.dsls.sql.schema.BeamSqlUdaf; -import org.apache.beam.dsls.sql.utils.CalciteUtils; -import org.apache.beam.sdk.coders.BigDecimalCoder; -import org.apache.beam.sdk.coders.CannotProvideCoderException; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.CoderException; -import org.apache.beam.sdk.coders.CoderRegistry; -import org.apache.beam.sdk.coders.CustomCoder; -import org.apache.beam.sdk.coders.VarIntCoder; -import org.apache.beam.sdk.transforms.Combine.CombineFn; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.SerializableFunction; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.values.KV; -import org.apache.calcite.rel.core.AggregateCall; -import org.apache.calcite.schema.impl.AggregateFunctionImpl; -import org.apache.calcite.sql.validate.SqlUserDefinedAggFunction; -import org.apache.calcite.util.ImmutableBitSet; -import org.joda.time.Instant; - -/** - * Collections of {@code PTransform} 
and {@code DoFn} used to perform GROUP-BY operation. - */ -public class BeamAggregationTransforms implements Serializable{ - /** - * Merge KV to single record. - */ - public static class MergeAggregationRecord extends DoFn<KV<BeamSqlRow, BeamSqlRow>, BeamSqlRow> { - private BeamSqlRowType outRowType; - private List<String> aggFieldNames; - private int windowStartFieldIdx; - - public MergeAggregationRecord(BeamSqlRowType outRowType, List<AggregateCall> aggList - , int windowStartFieldIdx) { - this.outRowType = outRowType; - this.aggFieldNames = new ArrayList<>(); - for (AggregateCall ac : aggList) { - aggFieldNames.add(ac.getName()); - } - this.windowStartFieldIdx = windowStartFieldIdx; - } - - @ProcessElement - public void processElement(ProcessContext c, BoundedWindow window) { - BeamSqlRow outRecord = new BeamSqlRow(outRowType); - outRecord.updateWindowRange(c.element().getKey(), window); - - KV<BeamSqlRow, BeamSqlRow> kvRecord = c.element(); - for (String f : kvRecord.getKey().getDataType().getFieldsName()) { - outRecord.addField(f, kvRecord.getKey().getFieldValue(f)); - } - for (int idx = 0; idx < aggFieldNames.size(); ++idx) { - outRecord.addField(aggFieldNames.get(idx), kvRecord.getValue().getFieldValue(idx)); - } - if (windowStartFieldIdx != -1) { - outRecord.addField(windowStartFieldIdx, outRecord.getWindowStart().toDate()); - } - - c.output(outRecord); - } - } - - /** - * extract group-by fields. 
- */ - public static class AggregationGroupByKeyFn - implements SerializableFunction<BeamSqlRow, BeamSqlRow> { - private List<Integer> groupByKeys; - - public AggregationGroupByKeyFn(int windowFieldIdx, ImmutableBitSet groupSet) { - this.groupByKeys = new ArrayList<>(); - for (int i : groupSet.asList()) { - if (i != windowFieldIdx) { - groupByKeys.add(i); - } - } - } - - @Override - public BeamSqlRow apply(BeamSqlRow input) { - BeamSqlRowType typeOfKey = exTypeOfKeyRecord(input.getDataType()); - BeamSqlRow keyOfRecord = new BeamSqlRow(typeOfKey); - keyOfRecord.updateWindowRange(input, null); - - for (int idx = 0; idx < groupByKeys.size(); ++idx) { - keyOfRecord.addField(idx, input.getFieldValue(groupByKeys.get(idx))); - } - return keyOfRecord; - } - - private BeamSqlRowType exTypeOfKeyRecord(BeamSqlRowType dataType) { - List<String> fieldNames = new ArrayList<>(); - List<Integer> fieldTypes = new ArrayList<>(); - for (int idx : groupByKeys) { - fieldNames.add(dataType.getFieldsName().get(idx)); - fieldTypes.add(dataType.getFieldsType().get(idx)); - } - return BeamSqlRowType.create(fieldNames, fieldTypes); - } - } - - /** - * Assign event timestamp. - */ - public static class WindowTimestampFn implements SerializableFunction<BeamSqlRow, Instant> { - private int windowFieldIdx = -1; - - public WindowTimestampFn(int windowFieldIdx) { - super(); - this.windowFieldIdx = windowFieldIdx; - } - - @Override - public Instant apply(BeamSqlRow input) { - return new Instant(input.getDate(windowFieldIdx).getTime()); - } - } - - /** - * An adaptor class to invoke Calcite UDAF instances in Beam {@code CombineFn}. 
- */ - public static class AggregationAdaptor - extends CombineFn<BeamSqlRow, AggregationAccumulator, BeamSqlRow> { - private List<BeamSqlUdaf> aggregators; - private List<BeamSqlExpression> sourceFieldExps; - private BeamSqlRowType finalRowType; - - public AggregationAdaptor(List<AggregateCall> aggregationCalls, - BeamSqlRowType sourceRowType) { - aggregators = new ArrayList<>(); - sourceFieldExps = new ArrayList<>(); - List<String> outFieldsName = new ArrayList<>(); - List<Integer> outFieldsType = new ArrayList<>(); - for (AggregateCall call : aggregationCalls) { - int refIndex = call.getArgList().size() > 0 ? call.getArgList().get(0) : 0; - BeamSqlExpression sourceExp = new BeamSqlInputRefExpression( - CalciteUtils.getFieldType(sourceRowType, refIndex), refIndex); - sourceFieldExps.add(sourceExp); - - outFieldsName.add(call.name); - int outFieldType = CalciteUtils.toJavaType(call.type.getSqlTypeName()); - outFieldsType.add(outFieldType); - - switch (call.getAggregation().getName()) { - case "COUNT": - aggregators.add(new BeamBuiltinAggregations.Count()); - break; - case "MAX": - aggregators.add(BeamBuiltinAggregations.Max.create(call.type.getSqlTypeName())); - break; - case "MIN": - aggregators.add(BeamBuiltinAggregations.Min.create(call.type.getSqlTypeName())); - break; - case "SUM": - aggregators.add(BeamBuiltinAggregations.Sum.create(call.type.getSqlTypeName())); - break; - case "AVG": - aggregators.add(BeamBuiltinAggregations.Avg.create(call.type.getSqlTypeName())); - break; - default: - if (call.getAggregation() instanceof SqlUserDefinedAggFunction) { - // handle UDAF. 
- SqlUserDefinedAggFunction udaf = (SqlUserDefinedAggFunction) call.getAggregation(); - AggregateFunctionImpl fn = (AggregateFunctionImpl) udaf.function; - try { - aggregators.add((BeamSqlUdaf) fn.declaringClass.newInstance()); - } catch (Exception e) { - throw new IllegalStateException(e); - } - } else { - throw new UnsupportedOperationException( - String.format("Aggregator [%s] is not supported", - call.getAggregation().getName())); - } - break; - } - } - finalRowType = BeamSqlRowType.create(outFieldsName, outFieldsType); - } - @Override - public AggregationAccumulator createAccumulator() { - AggregationAccumulator initialAccu = new AggregationAccumulator(); - for (BeamSqlUdaf agg : aggregators) { - initialAccu.accumulatorElements.add(agg.init()); - } - return initialAccu; - } - @Override - public AggregationAccumulator addInput(AggregationAccumulator accumulator, BeamSqlRow input) { - AggregationAccumulator deltaAcc = new AggregationAccumulator(); - for (int idx = 0; idx < aggregators.size(); ++idx) { - deltaAcc.accumulatorElements.add( - aggregators.get(idx).add(accumulator.accumulatorElements.get(idx), - sourceFieldExps.get(idx).evaluate(input).getValue())); - } - return deltaAcc; - } - @Override - public AggregationAccumulator mergeAccumulators(Iterable<AggregationAccumulator> accumulators) { - AggregationAccumulator deltaAcc = new AggregationAccumulator(); - for (int idx = 0; idx < aggregators.size(); ++idx) { - List accs = new ArrayList<>(); - Iterator<AggregationAccumulator> ite = accumulators.iterator(); - while (ite.hasNext()) { - accs.add(ite.next().accumulatorElements.get(idx)); - } - deltaAcc.accumulatorElements.add(aggregators.get(idx).merge(accs)); - } - return deltaAcc; - } - @Override - public BeamSqlRow extractOutput(AggregationAccumulator accumulator) { - BeamSqlRow result = new BeamSqlRow(finalRowType); - for (int idx = 0; idx < aggregators.size(); ++idx) { - result.addField(idx, 
aggregators.get(idx).result(accumulator.accumulatorElements.get(idx))); - } - return result; - } - @Override - public Coder<AggregationAccumulator> getAccumulatorCoder( - CoderRegistry registry, Coder<BeamSqlRow> inputCoder) - throws CannotProvideCoderException { - registry.registerCoderForClass(BigDecimal.class, BigDecimalCoder.of()); - List<Coder> aggAccuCoderList = new ArrayList<>(); - for (BeamSqlUdaf udaf : aggregators) { - aggAccuCoderList.add(udaf.getAccumulatorCoder(registry)); - } - return new AggregationAccumulatorCoder(aggAccuCoderList); - } - } - - /** - * A class to holder varied accumulator objects. - */ - public static class AggregationAccumulator{ - private List accumulatorElements = new ArrayList<>(); - } - - /** - * Coder for {@link AggregationAccumulator}. - */ - public static class AggregationAccumulatorCoder extends CustomCoder<AggregationAccumulator>{ - private VarIntCoder sizeCoder = VarIntCoder.of(); - private List<Coder> elementCoders; - - public AggregationAccumulatorCoder(List<Coder> elementCoders) { - this.elementCoders = elementCoders; - } - - @Override - public void encode(AggregationAccumulator value, OutputStream outStream) - throws CoderException, IOException { - sizeCoder.encode(value.accumulatorElements.size(), outStream); - for (int idx = 0; idx < value.accumulatorElements.size(); ++idx) { - elementCoders.get(idx).encode(value.accumulatorElements.get(idx), outStream); - } - } - - @Override - public AggregationAccumulator decode(InputStream inStream) throws CoderException, IOException { - AggregationAccumulator accu = new AggregationAccumulator(); - int size = sizeCoder.decode(inStream); - for (int idx = 0; idx < size; ++idx) { - accu.accumulatorElements.add(elementCoders.get(idx).decode(inStream)); - } - return accu; - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamBuiltinAggregations.java 
---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamBuiltinAggregations.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamBuiltinAggregations.java deleted file mode 100644 index fab2666..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamBuiltinAggregations.java +++ /dev/null @@ -1,412 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.dsls.sql.transform; - -import java.math.BigDecimal; -import java.util.Date; -import java.util.Iterator; -import org.apache.beam.dsls.sql.schema.BeamSqlUdaf; -import org.apache.beam.sdk.coders.BigDecimalCoder; -import org.apache.beam.sdk.coders.ByteCoder; -import org.apache.beam.sdk.coders.CannotProvideCoderException; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.CoderRegistry; -import org.apache.beam.sdk.coders.DoubleCoder; -import org.apache.beam.sdk.coders.KvCoder; -import org.apache.beam.sdk.coders.SerializableCoder; -import org.apache.beam.sdk.coders.VarIntCoder; -import org.apache.beam.sdk.coders.VarLongCoder; -import org.apache.beam.sdk.values.KV; -import org.apache.calcite.sql.type.SqlTypeName; - -/** - * Built-in aggregations functions for COUNT/MAX/MIN/SUM/AVG. - */ -class BeamBuiltinAggregations { - /** - * Built-in aggregation for COUNT. - */ - public static final class Count<T> extends BeamSqlUdaf<T, Long, Long> { - public Count() {} - - @Override - public Long init() { - return 0L; - } - - @Override - public Long add(Long accumulator, T input) { - return accumulator + 1; - } - - @Override - public Long merge(Iterable<Long> accumulators) { - long v = 0L; - Iterator<Long> ite = accumulators.iterator(); - while (ite.hasNext()) { - v += ite.next(); - } - return v; - } - - @Override - public Long result(Long accumulator) { - return accumulator; - } - } - - /** - * Built-in aggregation for MAX. 
- */ - public static final class Max<T extends Comparable<T>> extends BeamSqlUdaf<T, T, T> { - public static Max create(SqlTypeName fieldType) { - switch (fieldType) { - case INTEGER: - return new BeamBuiltinAggregations.Max<Integer>(fieldType); - case SMALLINT: - return new BeamBuiltinAggregations.Max<Short>(fieldType); - case TINYINT: - return new BeamBuiltinAggregations.Max<Byte>(fieldType); - case BIGINT: - return new BeamBuiltinAggregations.Max<Long>(fieldType); - case FLOAT: - return new BeamBuiltinAggregations.Max<Float>(fieldType); - case DOUBLE: - return new BeamBuiltinAggregations.Max<Double>(fieldType); - case TIMESTAMP: - return new BeamBuiltinAggregations.Max<Date>(fieldType); - case DECIMAL: - return new BeamBuiltinAggregations.Max<BigDecimal>(fieldType); - default: - throw new UnsupportedOperationException( - String.format("[%s] is not support in MAX", fieldType)); - } - } - - private final SqlTypeName fieldType; - private Max(SqlTypeName fieldType) { - this.fieldType = fieldType; - } - - @Override - public T init() { - return null; - } - - @Override - public T add(T accumulator, T input) { - return (accumulator == null || accumulator.compareTo(input) < 0) ? input : accumulator; - } - - @Override - public T merge(Iterable<T> accumulators) { - Iterator<T> ite = accumulators.iterator(); - T mergedV = ite.next(); - while (ite.hasNext()) { - T v = ite.next(); - mergedV = mergedV.compareTo(v) > 0 ? mergedV : v; - } - return mergedV; - } - - @Override - public T result(T accumulator) { - return accumulator; - } - - @Override - public Coder<T> getAccumulatorCoder(CoderRegistry registry) throws CannotProvideCoderException { - return BeamBuiltinAggregations.getSqlTypeCoder(fieldType); - } - } - - /** - * Built-in aggregation for MIN. 
- */ - public static final class Min<T extends Comparable<T>> extends BeamSqlUdaf<T, T, T> { - public static Min create(SqlTypeName fieldType) { - switch (fieldType) { - case INTEGER: - return new BeamBuiltinAggregations.Min<Integer>(fieldType); - case SMALLINT: - return new BeamBuiltinAggregations.Min<Short>(fieldType); - case TINYINT: - return new BeamBuiltinAggregations.Min<Byte>(fieldType); - case BIGINT: - return new BeamBuiltinAggregations.Min<Long>(fieldType); - case FLOAT: - return new BeamBuiltinAggregations.Min<Float>(fieldType); - case DOUBLE: - return new BeamBuiltinAggregations.Min<Double>(fieldType); - case TIMESTAMP: - return new BeamBuiltinAggregations.Min<Date>(fieldType); - case DECIMAL: - return new BeamBuiltinAggregations.Min<BigDecimal>(fieldType); - default: - throw new UnsupportedOperationException( - String.format("[%s] is not support in MIN", fieldType)); - } - } - - private final SqlTypeName fieldType; - private Min(SqlTypeName fieldType) { - this.fieldType = fieldType; - } - - @Override - public T init() { - return null; - } - - @Override - public T add(T accumulator, T input) { - return (accumulator == null || accumulator.compareTo(input) > 0) ? input : accumulator; - } - - @Override - public T merge(Iterable<T> accumulators) { - Iterator<T> ite = accumulators.iterator(); - T mergedV = ite.next(); - while (ite.hasNext()) { - T v = ite.next(); - mergedV = mergedV.compareTo(v) < 0 ? mergedV : v; - } - return mergedV; - } - - @Override - public T result(T accumulator) { - return accumulator; - } - - @Override - public Coder<T> getAccumulatorCoder(CoderRegistry registry) throws CannotProvideCoderException { - return BeamBuiltinAggregations.getSqlTypeCoder(fieldType); - } - } - - /** - * Built-in aggregation for SUM. 
- */ - public static final class Sum<T> extends BeamSqlUdaf<T, BigDecimal, T> { - public static Sum create(SqlTypeName fieldType) { - switch (fieldType) { - case INTEGER: - return new BeamBuiltinAggregations.Sum<Integer>(fieldType); - case SMALLINT: - return new BeamBuiltinAggregations.Sum<Short>(fieldType); - case TINYINT: - return new BeamBuiltinAggregations.Sum<Byte>(fieldType); - case BIGINT: - return new BeamBuiltinAggregations.Sum<Long>(fieldType); - case FLOAT: - return new BeamBuiltinAggregations.Sum<Float>(fieldType); - case DOUBLE: - return new BeamBuiltinAggregations.Sum<Double>(fieldType); - case TIMESTAMP: - return new BeamBuiltinAggregations.Sum<Date>(fieldType); - case DECIMAL: - return new BeamBuiltinAggregations.Sum<BigDecimal>(fieldType); - default: - throw new UnsupportedOperationException( - String.format("[%s] is not support in SUM", fieldType)); - } - } - - private SqlTypeName fieldType; - private Sum(SqlTypeName fieldType) { - this.fieldType = fieldType; - } - - @Override - public BigDecimal init() { - return new BigDecimal(0); - } - - @Override - public BigDecimal add(BigDecimal accumulator, T input) { - return accumulator.add(new BigDecimal(input.toString())); - } - - @Override - public BigDecimal merge(Iterable<BigDecimal> accumulators) { - BigDecimal v = new BigDecimal(0); - Iterator<BigDecimal> ite = accumulators.iterator(); - while (ite.hasNext()) { - v = v.add(ite.next()); - } - return v; - } - - @Override - public T result(BigDecimal accumulator) { - Object result = null; - switch (fieldType) { - case INTEGER: - result = accumulator.intValue(); - break; - case BIGINT: - result = accumulator.longValue(); - break; - case SMALLINT: - result = accumulator.shortValue(); - break; - case TINYINT: - result = accumulator.byteValue(); - break; - case DOUBLE: - result = accumulator.doubleValue(); - break; - case FLOAT: - result = accumulator.floatValue(); - break; - case DECIMAL: - result = accumulator; - break; - default: - break; - } - return 
(T) result; - } - } - - /** - * Built-in aggregation for AVG. - */ - public static final class Avg<T> extends BeamSqlUdaf<T, KV<BigDecimal, Long>, T> { - public static Avg create(SqlTypeName fieldType) { - switch (fieldType) { - case INTEGER: - return new BeamBuiltinAggregations.Avg<Integer>(fieldType); - case SMALLINT: - return new BeamBuiltinAggregations.Avg<Short>(fieldType); - case TINYINT: - return new BeamBuiltinAggregations.Avg<Byte>(fieldType); - case BIGINT: - return new BeamBuiltinAggregations.Avg<Long>(fieldType); - case FLOAT: - return new BeamBuiltinAggregations.Avg<Float>(fieldType); - case DOUBLE: - return new BeamBuiltinAggregations.Avg<Double>(fieldType); - case TIMESTAMP: - return new BeamBuiltinAggregations.Avg<Date>(fieldType); - case DECIMAL: - return new BeamBuiltinAggregations.Avg<BigDecimal>(fieldType); - default: - throw new UnsupportedOperationException( - String.format("[%s] is not support in AVG", fieldType)); - } - } - - private SqlTypeName fieldType; - private Avg(SqlTypeName fieldType) { - this.fieldType = fieldType; - } - - @Override - public KV<BigDecimal, Long> init() { - return KV.of(new BigDecimal(0), 0L); - } - - @Override - public KV<BigDecimal, Long> add(KV<BigDecimal, Long> accumulator, T input) { - return KV.of( - accumulator.getKey().add(new BigDecimal(input.toString())), - accumulator.getValue() + 1); - } - - @Override - public KV<BigDecimal, Long> merge(Iterable<KV<BigDecimal, Long>> accumulators) { - BigDecimal v = new BigDecimal(0); - long s = 0; - Iterator<KV<BigDecimal, Long>> ite = accumulators.iterator(); - while (ite.hasNext()) { - KV<BigDecimal, Long> r = ite.next(); - v = v.add(r.getKey()); - s += r.getValue(); - } - return KV.of(v, s); - } - - @Override - public T result(KV<BigDecimal, Long> accumulator) { - BigDecimal decimalAvg = accumulator.getKey().divide( - new BigDecimal(accumulator.getValue())); - Object result = null; - switch (fieldType) { - case INTEGER: - result = decimalAvg.intValue(); - break; - 
case BIGINT: - result = decimalAvg.longValue(); - break; - case SMALLINT: - result = decimalAvg.shortValue(); - break; - case TINYINT: - result = decimalAvg.byteValue(); - break; - case DOUBLE: - result = decimalAvg.doubleValue(); - break; - case FLOAT: - result = decimalAvg.floatValue(); - break; - case DECIMAL: - result = decimalAvg; - break; - default: - break; - } - return (T) result; - } - - @Override - public Coder<KV<BigDecimal, Long>> getAccumulatorCoder(CoderRegistry registry) - throws CannotProvideCoderException { - return KvCoder.of(BigDecimalCoder.of(), VarLongCoder.of()); - } - } - - /** - * Find {@link Coder} for Beam SQL field types. - */ - private static Coder getSqlTypeCoder(SqlTypeName sqlType) { - switch (sqlType) { - case INTEGER: - return VarIntCoder.of(); - case SMALLINT: - return SerializableCoder.of(Short.class); - case TINYINT: - return ByteCoder.of(); - case BIGINT: - return VarLongCoder.of(); - case FLOAT: - return SerializableCoder.of(Float.class); - case DOUBLE: - return DoubleCoder.of(); - case TIMESTAMP: - return SerializableCoder.of(Date.class); - case DECIMAL: - return BigDecimalCoder.of(); - default: - throw new UnsupportedOperationException( - String.format("Cannot find a Coder for data type [%s]", sqlType)); - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamJoinTransforms.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamJoinTransforms.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamJoinTransforms.java deleted file mode 100644 index 9ea4376..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamJoinTransforms.java +++ /dev/null @@ -1,166 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license 
agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.dsls.sql.transform; - -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import org.apache.beam.dsls.sql.schema.BeamSqlRow; -import org.apache.beam.dsls.sql.schema.BeamSqlRowType; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.SimpleFunction; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollectionView; -import org.apache.calcite.rel.core.JoinRelType; -import org.apache.calcite.util.Pair; - -/** - * Collections of {@code PTransform} and {@code DoFn} used to perform JOIN operation. - */ -public class BeamJoinTransforms { - - /** - * A {@code SimpleFunction} to extract join fields from the specified row. 
- */ - public static class ExtractJoinFields - extends SimpleFunction<BeamSqlRow, KV<BeamSqlRow, BeamSqlRow>> { - private final boolean isLeft; - private final List<Pair<Integer, Integer>> joinColumns; - - public ExtractJoinFields(boolean isLeft, List<Pair<Integer, Integer>> joinColumns) { - this.isLeft = isLeft; - this.joinColumns = joinColumns; - } - - @Override public KV<BeamSqlRow, BeamSqlRow> apply(BeamSqlRow input) { - // build the type - // the name of the join field is not important - List<String> names = new ArrayList<>(joinColumns.size()); - List<Integer> types = new ArrayList<>(joinColumns.size()); - for (int i = 0; i < joinColumns.size(); i++) { - names.add("c" + i); - types.add(isLeft - ? input.getDataType().getFieldsType().get(joinColumns.get(i).getKey()) : - input.getDataType().getFieldsType().get(joinColumns.get(i).getValue())); - } - BeamSqlRowType type = BeamSqlRowType.create(names, types); - - // build the row - BeamSqlRow row = new BeamSqlRow(type); - for (int i = 0; i < joinColumns.size(); i++) { - row.addField(i, input - .getFieldValue(isLeft ? joinColumns.get(i).getKey() : joinColumns.get(i).getValue())); - } - return KV.of(row, input); - } - } - - - /** - * A {@code DoFn} which implement the sideInput-JOIN. 
- */ - public static class SideInputJoinDoFn extends DoFn<KV<BeamSqlRow, BeamSqlRow>, BeamSqlRow> { - private final PCollectionView<Map<BeamSqlRow, Iterable<BeamSqlRow>>> sideInputView; - private final JoinRelType joinType; - private final BeamSqlRow rightNullRow; - private final boolean swap; - - public SideInputJoinDoFn(JoinRelType joinType, BeamSqlRow rightNullRow, - PCollectionView<Map<BeamSqlRow, Iterable<BeamSqlRow>>> sideInputView, - boolean swap) { - this.joinType = joinType; - this.rightNullRow = rightNullRow; - this.sideInputView = sideInputView; - this.swap = swap; - } - - @ProcessElement public void processElement(ProcessContext context) { - BeamSqlRow key = context.element().getKey(); - BeamSqlRow leftRow = context.element().getValue(); - Map<BeamSqlRow, Iterable<BeamSqlRow>> key2Rows = context.sideInput(sideInputView); - Iterable<BeamSqlRow> rightRowsIterable = key2Rows.get(key); - - if (rightRowsIterable != null && rightRowsIterable.iterator().hasNext()) { - Iterator<BeamSqlRow> it = rightRowsIterable.iterator(); - while (it.hasNext()) { - context.output(combineTwoRowsIntoOne(leftRow, it.next(), swap)); - } - } else { - if (joinType == JoinRelType.LEFT) { - context.output(combineTwoRowsIntoOne(leftRow, rightNullRow, swap)); - } - } - } - } - - - /** - * A {@code SimpleFunction} to combine two rows into one. - */ - public static class JoinParts2WholeRow - extends SimpleFunction<KV<BeamSqlRow, KV<BeamSqlRow, BeamSqlRow>>, BeamSqlRow> { - @Override public BeamSqlRow apply(KV<BeamSqlRow, KV<BeamSqlRow, BeamSqlRow>> input) { - KV<BeamSqlRow, BeamSqlRow> parts = input.getValue(); - BeamSqlRow leftRow = parts.getKey(); - BeamSqlRow rightRow = parts.getValue(); - return combineTwoRowsIntoOne(leftRow, rightRow, false); - } - } - - /** - * As the method name suggests: combine two rows into one wide row. 
- */ - private static BeamSqlRow combineTwoRowsIntoOne(BeamSqlRow leftRow, - BeamSqlRow rightRow, boolean swap) { - if (swap) { - return combineTwoRowsIntoOneHelper(rightRow, leftRow); - } else { - return combineTwoRowsIntoOneHelper(leftRow, rightRow); - } - } - - /** - * As the method name suggests: combine two rows into one wide row. - */ - private static BeamSqlRow combineTwoRowsIntoOneHelper(BeamSqlRow leftRow, - BeamSqlRow rightRow) { - // build the type - List<String> names = new ArrayList<>(leftRow.size() + rightRow.size()); - names.addAll(leftRow.getDataType().getFieldsName()); - names.addAll(rightRow.getDataType().getFieldsName()); - - List<Integer> types = new ArrayList<>(leftRow.size() + rightRow.size()); - types.addAll(leftRow.getDataType().getFieldsType()); - types.addAll(rightRow.getDataType().getFieldsType()); - BeamSqlRowType type = BeamSqlRowType.create(names, types); - - BeamSqlRow row = new BeamSqlRow(type); - // build the row - for (int i = 0; i < leftRow.size(); i++) { - row.addField(i, leftRow.getFieldValue(i)); - } - - for (int i = 0; i < rightRow.size(); i++) { - row.addField(i + leftRow.size(), rightRow.getFieldValue(i)); - } - - return row; - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSetOperatorsTransforms.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSetOperatorsTransforms.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSetOperatorsTransforms.java deleted file mode 100644 index a983cf5..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSetOperatorsTransforms.java +++ /dev/null @@ -1,112 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.dsls.sql.transform; - -import java.util.Iterator; - -import org.apache.beam.dsls.sql.rel.BeamSetOperatorRelBase; -import org.apache.beam.dsls.sql.schema.BeamSqlRow; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.SimpleFunction; -import org.apache.beam.sdk.transforms.join.CoGbkResult; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.TupleTag; - -/** - * Collections of {@code PTransform} and {@code DoFn} used to perform Set operations. - */ -public abstract class BeamSetOperatorsTransforms { - /** - * Transform a {@code BeamSqlRow} to a {@code KV<BeamSqlRow, BeamSqlRow>}. - */ - public static class BeamSqlRow2KvFn extends - SimpleFunction<BeamSqlRow, KV<BeamSqlRow, BeamSqlRow>> { - @Override public KV<BeamSqlRow, BeamSqlRow> apply(BeamSqlRow input) { - return KV.of(input, input); - } - } - - /** - * Filter function used for Set operators. - */ - public static class SetOperatorFilteringDoFn extends - DoFn<KV<BeamSqlRow, CoGbkResult>, BeamSqlRow> { - private TupleTag<BeamSqlRow> leftTag; - private TupleTag<BeamSqlRow> rightTag; - private BeamSetOperatorRelBase.OpType opType; - // ALL? 
- private boolean all; - - public SetOperatorFilteringDoFn(TupleTag<BeamSqlRow> leftTag, TupleTag<BeamSqlRow> rightTag, - BeamSetOperatorRelBase.OpType opType, boolean all) { - this.leftTag = leftTag; - this.rightTag = rightTag; - this.opType = opType; - this.all = all; - } - - @ProcessElement public void processElement(ProcessContext ctx) { - CoGbkResult coGbkResult = ctx.element().getValue(); - Iterable<BeamSqlRow> leftRows = coGbkResult.getAll(leftTag); - Iterable<BeamSqlRow> rightRows = coGbkResult.getAll(rightTag); - switch (opType) { - case UNION: - if (all) { - // output both left & right - Iterator<BeamSqlRow> iter = leftRows.iterator(); - while (iter.hasNext()) { - ctx.output(iter.next()); - } - iter = rightRows.iterator(); - while (iter.hasNext()) { - ctx.output(iter.next()); - } - } else { - // only output the key - ctx.output(ctx.element().getKey()); - } - break; - case INTERSECT: - if (leftRows.iterator().hasNext() && rightRows.iterator().hasNext()) { - if (all) { - for (BeamSqlRow leftRow : leftRows) { - ctx.output(leftRow); - } - } else { - ctx.output(ctx.element().getKey()); - } - } - break; - case MINUS: - if (leftRows.iterator().hasNext() && !rightRows.iterator().hasNext()) { - Iterator<BeamSqlRow> iter = leftRows.iterator(); - if (all) { - // output all - while (iter.hasNext()) { - ctx.output(iter.next()); - } - } else { - // only output one - ctx.output(iter.next()); - } - } - } - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSqlFilterFn.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSqlFilterFn.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSqlFilterFn.java deleted file mode 100644 index d4dbc6a..0000000 --- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSqlFilterFn.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.dsls.sql.transform; - -import java.util.List; -import org.apache.beam.dsls.sql.interpreter.BeamSqlExpressionExecutor; -import org.apache.beam.dsls.sql.rel.BeamFilterRel; -import org.apache.beam.dsls.sql.schema.BeamSqlRow; -import org.apache.beam.sdk.transforms.DoFn; - -/** - * {@code BeamSqlFilterFn} is the executor for a {@link BeamFilterRel} step. 
- * - */ -public class BeamSqlFilterFn extends DoFn<BeamSqlRow, BeamSqlRow> { - - private String stepName; - private BeamSqlExpressionExecutor executor; - - public BeamSqlFilterFn(String stepName, BeamSqlExpressionExecutor executor) { - super(); - this.stepName = stepName; - this.executor = executor; - } - - @Setup - public void setup() { - executor.prepare(); - } - - @ProcessElement - public void processElement(ProcessContext c) { - BeamSqlRow in = c.element(); - - List<Object> result = executor.execute(in); - - if ((Boolean) result.get(0)) { - c.output(in); - } - } - - @Teardown - public void close() { - executor.close(); - } - -} http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSqlOutputToConsoleFn.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSqlOutputToConsoleFn.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSqlOutputToConsoleFn.java deleted file mode 100644 index d8a2a63..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSqlOutputToConsoleFn.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.dsls.sql.transform; - -import org.apache.beam.dsls.sql.schema.BeamSqlRow; -import org.apache.beam.sdk.transforms.DoFn; - -/** - * A test PTransform to display output in console. - * - */ -public class BeamSqlOutputToConsoleFn extends DoFn<BeamSqlRow, Void> { - - private String stepName; - - public BeamSqlOutputToConsoleFn(String stepName) { - super(); - this.stepName = stepName; - } - - @ProcessElement - public void processElement(ProcessContext c) { - System.out.println("Output: " + c.element().getDataValues()); - } - -} http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSqlProjectFn.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSqlProjectFn.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSqlProjectFn.java deleted file mode 100644 index 886ddcf..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSqlProjectFn.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.dsls.sql.transform; - -import java.util.List; -import org.apache.beam.dsls.sql.interpreter.BeamSqlExpressionExecutor; -import org.apache.beam.dsls.sql.rel.BeamProjectRel; -import org.apache.beam.dsls.sql.schema.BeamSqlRow; -import org.apache.beam.dsls.sql.schema.BeamSqlRowType; -import org.apache.beam.dsls.sql.schema.BeamTableUtils; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; - -/** - * - * {@code BeamSqlProjectFn} is the executor for a {@link BeamProjectRel} step. - * - */ -public class BeamSqlProjectFn extends DoFn<BeamSqlRow, BeamSqlRow> { - private String stepName; - private BeamSqlExpressionExecutor executor; - private BeamSqlRowType outputRowType; - - public BeamSqlProjectFn(String stepName, BeamSqlExpressionExecutor executor, - BeamSqlRowType outputRowType) { - super(); - this.stepName = stepName; - this.executor = executor; - this.outputRowType = outputRowType; - } - - @Setup - public void setup() { - executor.prepare(); - } - - @ProcessElement - public void processElement(ProcessContext c, BoundedWindow window) { - BeamSqlRow inputRow = c.element(); - List<Object> results = executor.execute(inputRow); - - BeamSqlRow outRow = new BeamSqlRow(outputRowType); - outRow.updateWindowRange(inputRow, window); - - for (int idx = 0; idx < results.size(); ++idx) { - BeamTableUtils.addFieldWithAutoTypeCasting(outRow, idx, results.get(idx)); - } - - c.output(outRow); - } - - @Teardown - public void close() { - executor.close(); - } - -} http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/transform/package-info.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/transform/package-info.java 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/transform/package-info.java deleted file mode 100644 index 5169749..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/transform/package-info.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * {@link org.apache.beam.sdk.transforms.PTransform} used in a BeamSql pipeline. - */ -package org.apache.beam.dsls.sql.transform; http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/utils/CalciteUtils.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/utils/CalciteUtils.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/utils/CalciteUtils.java deleted file mode 100644 index 4b8696b..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/utils/CalciteUtils.java +++ /dev/null @@ -1,113 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.dsls.sql.utils; - -import java.sql.Types; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import org.apache.beam.dsls.sql.schema.BeamSqlRowType; -import org.apache.calcite.rel.type.RelDataType; -import org.apache.calcite.rel.type.RelDataTypeFactory; -import org.apache.calcite.rel.type.RelDataTypeField; -import org.apache.calcite.rel.type.RelProtoDataType; -import org.apache.calcite.sql.type.SqlTypeName; - -/** - * Utility methods for Calcite related operations. 
- */ -public class CalciteUtils { - private static final Map<Integer, SqlTypeName> JAVA_TO_CALCITE_MAPPING = new HashMap<>(); - private static final Map<SqlTypeName, Integer> CALCITE_TO_JAVA_MAPPING = new HashMap<>(); - static { - JAVA_TO_CALCITE_MAPPING.put(Types.TINYINT, SqlTypeName.TINYINT); - JAVA_TO_CALCITE_MAPPING.put(Types.SMALLINT, SqlTypeName.SMALLINT); - JAVA_TO_CALCITE_MAPPING.put(Types.INTEGER, SqlTypeName.INTEGER); - JAVA_TO_CALCITE_MAPPING.put(Types.BIGINT, SqlTypeName.BIGINT); - - JAVA_TO_CALCITE_MAPPING.put(Types.FLOAT, SqlTypeName.FLOAT); - JAVA_TO_CALCITE_MAPPING.put(Types.DOUBLE, SqlTypeName.DOUBLE); - - JAVA_TO_CALCITE_MAPPING.put(Types.DECIMAL, SqlTypeName.DECIMAL); - - JAVA_TO_CALCITE_MAPPING.put(Types.CHAR, SqlTypeName.CHAR); - JAVA_TO_CALCITE_MAPPING.put(Types.VARCHAR, SqlTypeName.VARCHAR); - - JAVA_TO_CALCITE_MAPPING.put(Types.DATE, SqlTypeName.DATE); - JAVA_TO_CALCITE_MAPPING.put(Types.TIME, SqlTypeName.TIME); - JAVA_TO_CALCITE_MAPPING.put(Types.TIMESTAMP, SqlTypeName.TIMESTAMP); - - JAVA_TO_CALCITE_MAPPING.put(Types.BOOLEAN, SqlTypeName.BOOLEAN); - - for (Map.Entry<Integer, SqlTypeName> pair : JAVA_TO_CALCITE_MAPPING.entrySet()) { - CALCITE_TO_JAVA_MAPPING.put(pair.getValue(), pair.getKey()); - } - } - - /** - * Get the corresponding {@code SqlTypeName} for an integer sql type. - */ - public static SqlTypeName toCalciteType(int type) { - return JAVA_TO_CALCITE_MAPPING.get(type); - } - - /** - * Get the integer sql type from Calcite {@code SqlTypeName}. - */ - public static Integer toJavaType(SqlTypeName typeName) { - return CALCITE_TO_JAVA_MAPPING.get(typeName); - } - - /** - * Get the {@code SqlTypeName} for the specified column of a table. - */ - public static SqlTypeName getFieldType(BeamSqlRowType schema, int index) { - return toCalciteType(schema.getFieldsType().get(index)); - } - - /** - * Generate {@code BeamSqlRowType} from {@code RelDataType} which is used to create table. 
- */ - public static BeamSqlRowType toBeamRowType(RelDataType tableInfo) { - List<String> fieldNames = new ArrayList<>(); - List<Integer> fieldTypes = new ArrayList<>(); - for (RelDataTypeField f : tableInfo.getFieldList()) { - fieldNames.add(f.getName()); - fieldTypes.add(toJavaType(f.getType().getSqlTypeName())); - } - return BeamSqlRowType.create(fieldNames, fieldTypes); - } - - /** - * Create an instance of {@code RelDataType} so it can be used to create a table. - */ - public static RelProtoDataType toCalciteRowType(final BeamSqlRowType that) { - return new RelProtoDataType() { - @Override - public RelDataType apply(RelDataTypeFactory a) { - RelDataTypeFactory.FieldInfoBuilder builder = a.builder(); - for (int idx = 0; idx < that.getFieldsName().size(); ++idx) { - builder.add(that.getFieldsName().get(idx), toCalciteType(that.getFieldsType().get(idx))); - } - return builder.build(); - } - }; - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/utils/package-info.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/utils/package-info.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/utils/package-info.java deleted file mode 100644 index b5c861a..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/utils/package-info.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * Utility classes. - */ -package org.apache.beam.dsls.sql.utils;