http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/package-info.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/package-info.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/package-info.java new file mode 100644 index 0000000..9655ebd --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Defines table schemas that map to Beam IO components. + * + */ +package org.apache.beam.sdk.extensions.sql.schema;
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTable.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTable.java new file mode 100644 index 0000000..c44faab --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTable.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */

package org.apache.beam.sdk.extensions.sql.schema.text;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import org.apache.commons.csv.CSVFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@code BeamTextCSVTable} is a {@code BeamTextTable} whose lines are formatted as CSV.
 *
 * <p>
 * {@link CSVFormat} itself has many dialects, check its javadoc for more info.
 * </p>
 */
public class BeamTextCSVTable extends BeamTextTable {
  private static final Logger LOG = LoggerFactory.getLogger(BeamTextCSVTable.class);

  /** CSV dialect handed to both the reader and the writer transform. */
  private CSVFormat csvFormat;

  /**
   * CSV table with {@link CSVFormat#DEFAULT DEFAULT} format.
   */
  public BeamTextCSVTable(BeamSqlRowType beamSqlRowType, String filePattern) {
    this(beamSqlRowType, filePattern, CSVFormat.DEFAULT);
  }

  /**
   * CSV table with an explicitly chosen format.
   *
   * @param beamSqlRowType row type passed on to the parent table and to the IO transforms
   * @param filePattern file pattern read from and written to through {@link TextIO}
   * @param csvFormat CSV dialect used to parse and to render lines
   */
  public BeamTextCSVTable(
      BeamSqlRowType beamSqlRowType, String filePattern, CSVFormat csvFormat) {
    super(beamSqlRowType, filePattern);
    this.csvFormat = csvFormat;
  }

  /**
   * Reads the text lines matching {@code filePattern} and parses each one into a
   * {@link BeamSqlRow} with {@link BeamTextCSVTableIOReader}.
   */
  @Override
  public PCollection<BeamSqlRow> buildIOReader(Pipeline pipeline) {
    PCollection<String> rawLines =
        PBegin.in(pipeline).apply("decodeRecord", TextIO.read().from(filePattern));
    BeamTextCSVTableIOReader lineParser =
        new BeamTextCSVTableIOReader(beamSqlRowType, filePattern, csvFormat);
    return rawLines.apply("parseCSVLine", lineParser);
  }

  /**
   * Returns a {@link BeamTextCSVTableIOWriter} configured with this table's row type, file
   * pattern and CSV format.
   */
  @Override
  public PTransform<? super PCollection<BeamSqlRow>, PDone> buildIOWriter() {
    return new BeamTextCSVTableIOWriter(beamSqlRowType, filePattern, csvFormat);
  }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTableIOReader.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTableIOReader.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTableIOReader.java new file mode 100644 index 0000000..06109c3 --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTableIOReader.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */

package org.apache.beam.sdk.extensions.sql.schema.text;

import java.io.Serializable;
import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType;
import org.apache.beam.sdk.extensions.sql.schema.BeamTableUtils;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.commons.csv.CSVFormat;

/**
 * IOReader for {@code BeamTextCSVTable}: turns a collection of CSV text lines into a collection
 * of {@link BeamSqlRow}s.
 */
public class BeamTextCSVTableIOReader
    extends PTransform<PCollection<String>, PCollection<BeamSqlRow>>
    implements Serializable {
  // Stored by the constructor; not read anywhere inside this class.
  private String filePattern;
  protected BeamSqlRowType beamSqlRowType;
  protected CSVFormat csvFormat;

  /**
   * @param beamSqlRowType row type handed to the CSV-to-row conversion
   * @param filePattern file pattern of the table this reader belongs to
   * @param csvFormat CSV dialect handed to the CSV-to-row conversion
   */
  public BeamTextCSVTableIOReader(
      BeamSqlRowType beamSqlRowType, String filePattern, CSVFormat csvFormat) {
    this.filePattern = filePattern;
    this.beamSqlRowType = beamSqlRowType;
    this.csvFormat = csvFormat;
  }

  /**
   * Applies a {@link ParDo} that converts every input line with
   * {@link BeamTableUtils#csvLine2BeamSqlRow} and emits the resulting row.
   */
  @Override
  public PCollection<BeamSqlRow> expand(PCollection<String> input) {
    DoFn<String, BeamSqlRow> parseLineFn = new DoFn<String, BeamSqlRow>() {
      @ProcessElement
      public void processElement(ProcessContext ctx) {
        BeamSqlRow parsed =
            BeamTableUtils.csvLine2BeamSqlRow(csvFormat, ctx.element(), beamSqlRowType);
        ctx.output(parsed);
      }
    };
    return input.apply(ParDo.of(parseLineFn));
  }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTableIOWriter.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTableIOWriter.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTableIOWriter.java new file mode 100644 index 0000000..1684b37 --- /dev/null +++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTableIOWriter.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.extensions.sql.schema.text; + +import java.io.Serializable; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType; +import org.apache.beam.sdk.extensions.sql.schema.BeamTableUtils; +import org.apache.beam.sdk.io.TextIO; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PDone; +import org.apache.commons.csv.CSVFormat; + +/** + * IOWriter for {@code BeamTextCSVTable}. 
+ */
public class BeamTextCSVTableIOWriter extends PTransform<PCollection<BeamSqlRow>, PDone>
    implements Serializable {
  private String filePattern;
  protected BeamSqlRowType beamSqlRowType;
  protected CSVFormat csvFormat;

  /**
   * @param beamSqlRowType row type of the table this writer belongs to
   * @param filePattern output location handed to {@link TextIO}
   * @param csvFormat CSV dialect used to render each row
   */
  public BeamTextCSVTableIOWriter(
      BeamSqlRowType beamSqlRowType, String filePattern, CSVFormat csvFormat) {
    this.filePattern = filePattern;
    this.beamSqlRowType = beamSqlRowType;
    this.csvFormat = csvFormat;
  }

  /**
   * Renders every row as a CSV line with {@link BeamTableUtils#beamSqlRow2CsvLine} and writes
   * the lines to {@code filePattern} through {@link TextIO}.
   */
  @Override
  public PDone expand(PCollection<BeamSqlRow> input) {
    DoFn<BeamSqlRow, String> encodeRowFn = new DoFn<BeamSqlRow, String>() {
      @ProcessElement
      public void processElement(ProcessContext ctx) {
        String csvLine = BeamTableUtils.beamSqlRow2CsvLine(ctx.element(), csvFormat);
        ctx.output(csvLine);
      }
    };
    PCollection<String> csvLines = input.apply("encodeRecord", ParDo.of(encodeRowFn));
    return csvLines.apply(TextIO.write().to(filePattern));
  }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextTable.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextTable.java new file mode 100644 index 0000000..e85608d --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextTable.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */

package org.apache.beam.sdk.extensions.sql.schema.text;

import java.io.Serializable;
import org.apache.beam.sdk.extensions.sql.schema.BaseBeamTable;
import org.apache.beam.sdk.extensions.sql.schema.BeamIOType;
import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType;

/**
 * {@code BeamTextTable} represents a text file/directory (backed by {@code TextIO}).
 *
 * <p>Always reports itself as a {@link BeamIOType#BOUNDED} source.
 */
public abstract class BeamTextTable extends BaseBeamTable implements Serializable {
  /** File pattern of the underlying text data; visible to subclasses. */
  protected String filePattern;

  /**
   * @param beamSqlRowType row type forwarded to {@link BaseBeamTable}
   * @param filePattern file pattern of the underlying text data
   */
  protected BeamTextTable(BeamSqlRowType beamSqlRowType, String filePattern) {
    super(beamSqlRowType);
    this.filePattern = filePattern;
  }

  /** Returns {@link BeamIOType#BOUNDED}. */
  @Override
  public BeamIOType getSourceType() {
    return BeamIOType.BOUNDED;
  }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/package-info.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/package-info.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/package-info.java new file mode 100644 index 0000000..f914e2e --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Table schema for text files. + */ +package org.apache.beam.sdk.extensions.sql.schema.text; http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/transform/BeamAggregationTransforms.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/transform/BeamAggregationTransforms.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/transform/BeamAggregationTransforms.java new file mode 100644 index 0000000..6a27da8 --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/transform/BeamAggregationTransforms.java @@ -0,0 +1,300 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.sql.transform; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.Serializable; +import java.math.BigDecimal; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import org.apache.beam.sdk.coders.BigDecimalCoder; +import org.apache.beam.sdk.coders.CannotProvideCoderException; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.CoderRegistry; +import org.apache.beam.sdk.coders.CustomCoder; +import org.apache.beam.sdk.coders.VarIntCoder; +import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression; +import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlInputRefExpression; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlUdaf; +import org.apache.beam.sdk.extensions.sql.utils.CalciteUtils; +import org.apache.beam.sdk.transforms.Combine.CombineFn; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.values.KV; +import org.apache.calcite.rel.core.AggregateCall; +import org.apache.calcite.schema.impl.AggregateFunctionImpl; +import org.apache.calcite.sql.validate.SqlUserDefinedAggFunction; +import org.apache.calcite.util.ImmutableBitSet; +import 
org.joda.time.Instant;

/**
 * Collections of {@code PTransform} and {@code DoFn} used to perform GROUP-BY operation.
 */
public class BeamAggregationTransforms implements Serializable {
  /**
   * Merges a {@code KV} of (group-by key row, aggregated values row) into a single output record.
   */
  public static class MergeAggregationRecord extends DoFn<KV<BeamSqlRow, BeamSqlRow>, BeamSqlRow> {
    private BeamSqlRowType outRowType;
    private List<String> aggFieldNames;
    private int windowStartFieldIdx;

    /**
     * @param outRowType row type of the emitted record
     * @param aggList aggregate calls; their names become the output field names of the
     *     aggregated values, in the same order
     * @param windowStartFieldIdx index of the output field that receives the window start, or
     *     {@code -1} when no such field is written
     */
    public MergeAggregationRecord(BeamSqlRowType outRowType, List<AggregateCall> aggList,
        int windowStartFieldIdx) {
      this.outRowType = outRowType;
      this.aggFieldNames = new ArrayList<>();
      for (AggregateCall ac : aggList) {
        aggFieldNames.add(ac.getName());
      }
      this.windowStartFieldIdx = windowStartFieldIdx;
    }

    /**
     * Copies every key field by name, then every aggregated value by position, into a new row
     * of {@code outRowType} and emits it.
     */
    @ProcessElement
    public void processElement(ProcessContext c, BoundedWindow window) {
      BeamSqlRow outRecord = new BeamSqlRow(outRowType);
      outRecord.updateWindowRange(c.element().getKey(), window);

      KV<BeamSqlRow, BeamSqlRow> kvRecord = c.element();
      for (String f : kvRecord.getKey().getDataType().getFieldsName()) {
        outRecord.addField(f, kvRecord.getKey().getFieldValue(f));
      }
      // Aggregated values are positional: value idx belongs to the idx-th aggregate call.
      for (int idx = 0; idx < aggFieldNames.size(); ++idx) {
        outRecord.addField(aggFieldNames.get(idx), kvRecord.getValue().getFieldValue(idx));
      }
      if (windowStartFieldIdx != -1) {
        outRecord.addField(windowStartFieldIdx, outRecord.getWindowStart().toDate());
      }

      c.output(outRecord);
    }
  }

  /**
   * Extracts the group-by fields of a row into a key row.
   */
  public static class AggregationGroupByKeyFn
      implements SerializableFunction<BeamSqlRow, BeamSqlRow> {
    private List<Integer> groupByKeys;

    /**
     * @param windowFieldIdx index of the window field; it is left out of the key
     * @param groupSet indexes of the fields named in the GROUP BY
     */
    public AggregationGroupByKeyFn(int windowFieldIdx, ImmutableBitSet groupSet) {
      this.groupByKeys = new ArrayList<>();
      for (int i : groupSet.asList()) {
        if (i != windowFieldIdx) {
          groupByKeys.add(i);
        }
      }
    }

    /** Builds the key row holding the input's values at the group-by indexes, in order. */
    @Override
    public BeamSqlRow apply(BeamSqlRow input) {
      BeamSqlRowType typeOfKey = exTypeOfKeyRecord(input.getDataType());
      BeamSqlRow keyOfRecord = new BeamSqlRow(typeOfKey);
      keyOfRecord.updateWindowRange(input, null);

      for (int idx = 0; idx < groupByKeys.size(); ++idx) {
        keyOfRecord.addField(idx, input.getFieldValue(groupByKeys.get(idx)));
      }
      return keyOfRecord;
    }

    /** Derives the key row type: names and types of the group-by fields of {@code dataType}. */
    private BeamSqlRowType exTypeOfKeyRecord(BeamSqlRowType dataType) {
      List<String> fieldNames = new ArrayList<>();
      List<Integer> fieldTypes = new ArrayList<>();
      for (int idx : groupByKeys) {
        fieldNames.add(dataType.getFieldsName().get(idx));
        fieldTypes.add(dataType.getFieldsType().get(idx));
      }
      return BeamSqlRowType.create(fieldNames, fieldTypes);
    }
  }

  /**
   * Assign event timestamp.
   */
  public static class WindowTimestampFn implements SerializableFunction<BeamSqlRow, Instant> {
    private int windowFieldIdx = -1;

    public WindowTimestampFn(int windowFieldIdx) {
      super();
      this.windowFieldIdx = windowFieldIdx;
    }

    /** Returns the date stored at {@code windowFieldIdx} of the row as an {@link Instant}. */
    @Override
    public Instant apply(BeamSqlRow input) {
      return new Instant(input.getDate(windowFieldIdx).getTime());
    }
  }

  /**
   * An adaptor class to invoke Calcite UDAF instances in Beam {@code CombineFn}.
   *
   * <p>One {@link BeamSqlUdaf} is kept per aggregate call; the accumulator holds one element per
   * aggregator, at the same position.
   */
  public static class AggregationAdaptor
      extends CombineFn<BeamSqlRow, AggregationAccumulator, BeamSqlRow> {
    private List<BeamSqlUdaf> aggregators;
    private List<BeamSqlExpression> sourceFieldExps;
    private BeamSqlRowType finalRowType;

    /**
     * @param aggregationCalls aggregate calls to evaluate, in output order
     * @param sourceRowType row type of the input rows
     * @throws UnsupportedOperationException if a call is neither a built-in aggregation handled
     *     here nor a user-defined one
     * @throws IllegalStateException if a user-defined aggregator cannot be instantiated
     */
    public AggregationAdaptor(List<AggregateCall> aggregationCalls,
        BeamSqlRowType sourceRowType) {
      aggregators = new ArrayList<>();
      sourceFieldExps = new ArrayList<>();
      List<String> outFieldsName = new ArrayList<>();
      List<Integer> outFieldsType = new ArrayList<>();
      for (AggregateCall call : aggregationCalls) {
        // A call without arguments reads field 0.
        int refIndex = call.getArgList().isEmpty() ? 0 : call.getArgList().get(0);
        BeamSqlExpression sourceExp = new BeamSqlInputRefExpression(
            CalciteUtils.getFieldType(sourceRowType, refIndex), refIndex);
        sourceFieldExps.add(sourceExp);

        outFieldsName.add(call.name);
        int outFieldType = CalciteUtils.toJavaType(call.type.getSqlTypeName());
        outFieldsType.add(outFieldType);

        switch (call.getAggregation().getName()) {
          case "COUNT":
            aggregators.add(new BeamBuiltinAggregations.Count());
            break;
          case "MAX":
            aggregators.add(BeamBuiltinAggregations.Max.create(call.type.getSqlTypeName()));
            break;
          case "MIN":
            aggregators.add(BeamBuiltinAggregations.Min.create(call.type.getSqlTypeName()));
            break;
          case "SUM":
            aggregators.add(BeamBuiltinAggregations.Sum.create(call.type.getSqlTypeName()));
            break;
          case "AVG":
            aggregators.add(BeamBuiltinAggregations.Avg.create(call.type.getSqlTypeName()));
            break;
          default:
            if (call.getAggregation() instanceof SqlUserDefinedAggFunction) {
              // handle UDAF.
              SqlUserDefinedAggFunction udaf = (SqlUserDefinedAggFunction) call.getAggregation();
              AggregateFunctionImpl fn = (AggregateFunctionImpl) udaf.function;
              try {
                // Class.newInstance() is deprecated and rethrows constructor checked exceptions
                // undeclared; going through the constructor wraps them instead.
                aggregators.add(
                    (BeamSqlUdaf) fn.declaringClass.getDeclaredConstructor().newInstance());
              } catch (Exception e) {
                throw new IllegalStateException(
                    String.format("Failed to instantiate UDAF [%s]", fn.declaringClass.getName()),
                    e);
              }
            } else {
              throw new UnsupportedOperationException(
                  String.format("Aggregator [%s] is not supported",
                  call.getAggregation().getName()));
            }
            break;
        }
      }
      finalRowType = BeamSqlRowType.create(outFieldsName, outFieldsType);
    }

    /** Creates an accumulator holding the {@code init()} value of every aggregator. */
    @Override
    public AggregationAccumulator createAccumulator() {
      AggregationAccumulator initialAccu = new AggregationAccumulator();
      for (BeamSqlUdaf agg : aggregators) {
        initialAccu.accumulatorElements.add(agg.init());
      }
      return initialAccu;
    }

    /**
     * Returns a new accumulator in which every element is the aggregator's {@code add} of the
     * previous element and the value of the aggregator's source field in {@code input}.
     */
    @Override
    public AggregationAccumulator addInput(AggregationAccumulator accumulator, BeamSqlRow input) {
      AggregationAccumulator deltaAcc = new AggregationAccumulator();
      for (int idx = 0; idx < aggregators.size(); ++idx) {
        deltaAcc.accumulatorElements.add(
            aggregators.get(idx).add(accumulator.accumulatorElements.get(idx),
            sourceFieldExps.get(idx).evaluate(input).getValue()));
      }
      return deltaAcc;
    }

    /** Merges the accumulators position by position with each aggregator's {@code merge}. */
    @Override
    public AggregationAccumulator mergeAccumulators(Iterable<AggregationAccumulator> accumulators) {
      AggregationAccumulator deltaAcc = new AggregationAccumulator();
      for (int idx = 0; idx < aggregators.size(); ++idx) {
        List<Object> accs = new ArrayList<>();
        Iterator<AggregationAccumulator> ite = accumulators.iterator();
        while (ite.hasNext()) {
          accs.add(ite.next().accumulatorElements.get(idx));
        }
        deltaAcc.accumulatorElements.add(aggregators.get(idx).merge(accs));
      }
      return deltaAcc;
    }

    /** Builds the output row from each aggregator's {@code result} of its accumulator element. */
    @Override
    public BeamSqlRow extractOutput(AggregationAccumulator accumulator) {
      BeamSqlRow result = new BeamSqlRow(finalRowType);
      for (int idx = 0; idx < aggregators.size(); ++idx) {
        result.addField(idx, aggregators.get(idx).result(accumulator.accumulatorElements.get(idx)));
      }
      return result;
    }

    /**
     * Returns a coder composed of each aggregator's accumulator coder. Registers
     * {@link BigDecimalCoder} for {@link BigDecimal} in {@code registry} first.
     */
    @Override
    public Coder<AggregationAccumulator> getAccumulatorCoder(
        CoderRegistry registry, Coder<BeamSqlRow> inputCoder)
        throws CannotProvideCoderException {
      registry.registerCoderForClass(BigDecimal.class, BigDecimalCoder.of());
      List<Coder> aggAccuCoderList = new ArrayList<>();
      for (BeamSqlUdaf udaf : aggregators) {
        aggAccuCoderList.add(udaf.getAccumulatorCoder(registry));
      }
      return new AggregationAccumulatorCoder(aggAccuCoderList);
    }
  }

  /**
   * A class to hold varied accumulator objects, one per aggregator.
   */
  public static class AggregationAccumulator {
    private List<Object> accumulatorElements = new ArrayList<>();
  }

  /**
   * Coder for {@link AggregationAccumulator}.
   *
   * <p>Writes the number of elements, then each element with the coder at the same position.
   */
  public static class AggregationAccumulatorCoder extends CustomCoder<AggregationAccumulator> {
    private VarIntCoder sizeCoder = VarIntCoder.of();
    private List<Coder> elementCoders;

    public AggregationAccumulatorCoder(List<Coder> elementCoders) {
      this.elementCoders = elementCoders;
    }

    @Override
    public void encode(AggregationAccumulator value, OutputStream outStream)
        throws CoderException, IOException {
      sizeCoder.encode(value.accumulatorElements.size(), outStream);
      for (int idx = 0; idx < value.accumulatorElements.size(); ++idx) {
        elementCoders.get(idx).encode(value.accumulatorElements.get(idx), outStream);
      }
    }

    @Override
    public AggregationAccumulator decode(InputStream inStream) throws CoderException, IOException {
      AggregationAccumulator accu = new AggregationAccumulator();
      int size = sizeCoder.decode(inStream);
      for (int idx = 0; idx < size; ++idx) {
        accu.accumulatorElements.add(elementCoders.get(idx).decode(inStream));
      }
      return accu;
    }
  }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/transform/BeamBuiltinAggregations.java 
---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/transform/BeamBuiltinAggregations.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/transform/BeamBuiltinAggregations.java new file mode 100644 index 0000000..1183668 --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/transform/BeamBuiltinAggregations.java @@ -0,0 +1,412 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.extensions.sql.transform; + +import java.math.BigDecimal; +import java.util.Date; +import java.util.Iterator; +import org.apache.beam.sdk.coders.BigDecimalCoder; +import org.apache.beam.sdk.coders.ByteCoder; +import org.apache.beam.sdk.coders.CannotProvideCoderException; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderRegistry; +import org.apache.beam.sdk.coders.DoubleCoder; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.SerializableCoder; +import org.apache.beam.sdk.coders.VarIntCoder; +import org.apache.beam.sdk.coders.VarLongCoder; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlUdaf; +import org.apache.beam.sdk.values.KV; +import org.apache.calcite.sql.type.SqlTypeName; + +/** + * Built-in aggregations functions for COUNT/MAX/MIN/SUM/AVG. + */ +class BeamBuiltinAggregations { + /** + * Built-in aggregation for COUNT. + */ + public static final class Count<T> extends BeamSqlUdaf<T, Long, Long> { + public Count() {} + + @Override + public Long init() { + return 0L; + } + + @Override + public Long add(Long accumulator, T input) { + return accumulator + 1; + } + + @Override + public Long merge(Iterable<Long> accumulators) { + long v = 0L; + Iterator<Long> ite = accumulators.iterator(); + while (ite.hasNext()) { + v += ite.next(); + } + return v; + } + + @Override + public Long result(Long accumulator) { + return accumulator; + } + } + + /** + * Built-in aggregation for MAX. 
+ */ + public static final class Max<T extends Comparable<T>> extends BeamSqlUdaf<T, T, T> { + public static Max create(SqlTypeName fieldType) { + switch (fieldType) { + case INTEGER: + return new BeamBuiltinAggregations.Max<Integer>(fieldType); + case SMALLINT: + return new BeamBuiltinAggregations.Max<Short>(fieldType); + case TINYINT: + return new BeamBuiltinAggregations.Max<Byte>(fieldType); + case BIGINT: + return new BeamBuiltinAggregations.Max<Long>(fieldType); + case FLOAT: + return new BeamBuiltinAggregations.Max<Float>(fieldType); + case DOUBLE: + return new BeamBuiltinAggregations.Max<Double>(fieldType); + case TIMESTAMP: + return new BeamBuiltinAggregations.Max<Date>(fieldType); + case DECIMAL: + return new BeamBuiltinAggregations.Max<BigDecimal>(fieldType); + default: + throw new UnsupportedOperationException( + String.format("[%s] is not support in MAX", fieldType)); + } + } + + private final SqlTypeName fieldType; + private Max(SqlTypeName fieldType) { + this.fieldType = fieldType; + } + + @Override + public T init() { + return null; + } + + @Override + public T add(T accumulator, T input) { + return (accumulator == null || accumulator.compareTo(input) < 0) ? input : accumulator; + } + + @Override + public T merge(Iterable<T> accumulators) { + Iterator<T> ite = accumulators.iterator(); + T mergedV = ite.next(); + while (ite.hasNext()) { + T v = ite.next(); + mergedV = mergedV.compareTo(v) > 0 ? mergedV : v; + } + return mergedV; + } + + @Override + public T result(T accumulator) { + return accumulator; + } + + @Override + public Coder<T> getAccumulatorCoder(CoderRegistry registry) throws CannotProvideCoderException { + return BeamBuiltinAggregations.getSqlTypeCoder(fieldType); + } + } + + /** + * Built-in aggregation for MIN. 
+ */ + public static final class Min<T extends Comparable<T>> extends BeamSqlUdaf<T, T, T> { + public static Min create(SqlTypeName fieldType) { + switch (fieldType) { + case INTEGER: + return new BeamBuiltinAggregations.Min<Integer>(fieldType); + case SMALLINT: + return new BeamBuiltinAggregations.Min<Short>(fieldType); + case TINYINT: + return new BeamBuiltinAggregations.Min<Byte>(fieldType); + case BIGINT: + return new BeamBuiltinAggregations.Min<Long>(fieldType); + case FLOAT: + return new BeamBuiltinAggregations.Min<Float>(fieldType); + case DOUBLE: + return new BeamBuiltinAggregations.Min<Double>(fieldType); + case TIMESTAMP: + return new BeamBuiltinAggregations.Min<Date>(fieldType); + case DECIMAL: + return new BeamBuiltinAggregations.Min<BigDecimal>(fieldType); + default: + throw new UnsupportedOperationException( + String.format("[%s] is not support in MIN", fieldType)); + } + } + + private final SqlTypeName fieldType; + private Min(SqlTypeName fieldType) { + this.fieldType = fieldType; + } + + @Override + public T init() { + return null; + } + + @Override + public T add(T accumulator, T input) { + return (accumulator == null || accumulator.compareTo(input) > 0) ? input : accumulator; + } + + @Override + public T merge(Iterable<T> accumulators) { + Iterator<T> ite = accumulators.iterator(); + T mergedV = ite.next(); + while (ite.hasNext()) { + T v = ite.next(); + mergedV = mergedV.compareTo(v) < 0 ? mergedV : v; + } + return mergedV; + } + + @Override + public T result(T accumulator) { + return accumulator; + } + + @Override + public Coder<T> getAccumulatorCoder(CoderRegistry registry) throws CannotProvideCoderException { + return BeamBuiltinAggregations.getSqlTypeCoder(fieldType); + } + } + + /** + * Built-in aggregation for SUM. 
+ */ + public static final class Sum<T> extends BeamSqlUdaf<T, BigDecimal, T> { + public static Sum create(SqlTypeName fieldType) { + switch (fieldType) { + case INTEGER: + return new BeamBuiltinAggregations.Sum<Integer>(fieldType); + case SMALLINT: + return new BeamBuiltinAggregations.Sum<Short>(fieldType); + case TINYINT: + return new BeamBuiltinAggregations.Sum<Byte>(fieldType); + case BIGINT: + return new BeamBuiltinAggregations.Sum<Long>(fieldType); + case FLOAT: + return new BeamBuiltinAggregations.Sum<Float>(fieldType); + case DOUBLE: + return new BeamBuiltinAggregations.Sum<Double>(fieldType); + case TIMESTAMP: + return new BeamBuiltinAggregations.Sum<Date>(fieldType); + case DECIMAL: + return new BeamBuiltinAggregations.Sum<BigDecimal>(fieldType); + default: + throw new UnsupportedOperationException( + String.format("[%s] is not support in SUM", fieldType)); + } + } + + private SqlTypeName fieldType; + private Sum(SqlTypeName fieldType) { + this.fieldType = fieldType; + } + + @Override + public BigDecimal init() { + return new BigDecimal(0); + } + + @Override + public BigDecimal add(BigDecimal accumulator, T input) { + return accumulator.add(new BigDecimal(input.toString())); + } + + @Override + public BigDecimal merge(Iterable<BigDecimal> accumulators) { + BigDecimal v = new BigDecimal(0); + Iterator<BigDecimal> ite = accumulators.iterator(); + while (ite.hasNext()) { + v = v.add(ite.next()); + } + return v; + } + + @Override + public T result(BigDecimal accumulator) { + Object result = null; + switch (fieldType) { + case INTEGER: + result = accumulator.intValue(); + break; + case BIGINT: + result = accumulator.longValue(); + break; + case SMALLINT: + result = accumulator.shortValue(); + break; + case TINYINT: + result = accumulator.byteValue(); + break; + case DOUBLE: + result = accumulator.doubleValue(); + break; + case FLOAT: + result = accumulator.floatValue(); + break; + case DECIMAL: + result = accumulator; + break; + default: + break; + } + return 
(T) result; + } + } + + /** + * Built-in aggregation for AVG. + */ + public static final class Avg<T> extends BeamSqlUdaf<T, KV<BigDecimal, Long>, T> { + public static Avg create(SqlTypeName fieldType) { + switch (fieldType) { + case INTEGER: + return new BeamBuiltinAggregations.Avg<Integer>(fieldType); + case SMALLINT: + return new BeamBuiltinAggregations.Avg<Short>(fieldType); + case TINYINT: + return new BeamBuiltinAggregations.Avg<Byte>(fieldType); + case BIGINT: + return new BeamBuiltinAggregations.Avg<Long>(fieldType); + case FLOAT: + return new BeamBuiltinAggregations.Avg<Float>(fieldType); + case DOUBLE: + return new BeamBuiltinAggregations.Avg<Double>(fieldType); + case TIMESTAMP: + return new BeamBuiltinAggregations.Avg<Date>(fieldType); + case DECIMAL: + return new BeamBuiltinAggregations.Avg<BigDecimal>(fieldType); + default: + throw new UnsupportedOperationException( + String.format("[%s] is not support in AVG", fieldType)); + } + } + + private SqlTypeName fieldType; + private Avg(SqlTypeName fieldType) { + this.fieldType = fieldType; + } + + @Override + public KV<BigDecimal, Long> init() { + return KV.of(new BigDecimal(0), 0L); + } + + @Override + public KV<BigDecimal, Long> add(KV<BigDecimal, Long> accumulator, T input) { + return KV.of( + accumulator.getKey().add(new BigDecimal(input.toString())), + accumulator.getValue() + 1); + } + + @Override + public KV<BigDecimal, Long> merge(Iterable<KV<BigDecimal, Long>> accumulators) { + BigDecimal v = new BigDecimal(0); + long s = 0; + Iterator<KV<BigDecimal, Long>> ite = accumulators.iterator(); + while (ite.hasNext()) { + KV<BigDecimal, Long> r = ite.next(); + v = v.add(r.getKey()); + s += r.getValue(); + } + return KV.of(v, s); + } + + @Override + public T result(KV<BigDecimal, Long> accumulator) { + BigDecimal decimalAvg = accumulator.getKey().divide( + new BigDecimal(accumulator.getValue())); + Object result = null; + switch (fieldType) { + case INTEGER: + result = decimalAvg.intValue(); + break; + 
case BIGINT: + result = decimalAvg.longValue(); + break; + case SMALLINT: + result = decimalAvg.shortValue(); + break; + case TINYINT: + result = decimalAvg.byteValue(); + break; + case DOUBLE: + result = decimalAvg.doubleValue(); + break; + case FLOAT: + result = decimalAvg.floatValue(); + break; + case DECIMAL: + result = decimalAvg; + break; + default: + break; + } + return (T) result; + } + + @Override + public Coder<KV<BigDecimal, Long>> getAccumulatorCoder(CoderRegistry registry) + throws CannotProvideCoderException { + return KvCoder.of(BigDecimalCoder.of(), VarLongCoder.of()); + } + } + + /** + * Find {@link Coder} for Beam SQL field types. + */ + private static Coder getSqlTypeCoder(SqlTypeName sqlType) { + switch (sqlType) { + case INTEGER: + return VarIntCoder.of(); + case SMALLINT: + return SerializableCoder.of(Short.class); + case TINYINT: + return ByteCoder.of(); + case BIGINT: + return VarLongCoder.of(); + case FLOAT: + return SerializableCoder.of(Float.class); + case DOUBLE: + return DoubleCoder.of(); + case TIMESTAMP: + return SerializableCoder.of(Date.class); + case DECIMAL: + return BigDecimalCoder.of(); + default: + throw new UnsupportedOperationException( + String.format("Cannot find a Coder for data type [%s]", sqlType)); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/transform/BeamJoinTransforms.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/transform/BeamJoinTransforms.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/transform/BeamJoinTransforms.java new file mode 100644 index 0000000..d819421 --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/transform/BeamJoinTransforms.java @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one 
+ * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.extensions.sql.transform; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.SimpleFunction; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollectionView; +import org.apache.calcite.rel.core.JoinRelType; +import org.apache.calcite.util.Pair; + +/** + * Collections of {@code PTransform} and {@code DoFn} used to perform JOIN operation. + */ +public class BeamJoinTransforms { + + /** + * A {@code SimpleFunction} to extract join fields from the specified row. 
+ */ + public static class ExtractJoinFields + extends SimpleFunction<BeamSqlRow, KV<BeamSqlRow, BeamSqlRow>> { + private final boolean isLeft; + private final List<Pair<Integer, Integer>> joinColumns; + + public ExtractJoinFields(boolean isLeft, List<Pair<Integer, Integer>> joinColumns) { + this.isLeft = isLeft; + this.joinColumns = joinColumns; + } + + @Override public KV<BeamSqlRow, BeamSqlRow> apply(BeamSqlRow input) { + // build the type + // the name of the join field is not important + List<String> names = new ArrayList<>(joinColumns.size()); + List<Integer> types = new ArrayList<>(joinColumns.size()); + for (int i = 0; i < joinColumns.size(); i++) { + names.add("c" + i); + types.add(isLeft + ? input.getDataType().getFieldsType().get(joinColumns.get(i).getKey()) : + input.getDataType().getFieldsType().get(joinColumns.get(i).getValue())); + } + BeamSqlRowType type = BeamSqlRowType.create(names, types); + + // build the row + BeamSqlRow row = new BeamSqlRow(type); + for (int i = 0; i < joinColumns.size(); i++) { + row.addField(i, input + .getFieldValue(isLeft ? joinColumns.get(i).getKey() : joinColumns.get(i).getValue())); + } + return KV.of(row, input); + } + } + + + /** + * A {@code DoFn} which implement the sideInput-JOIN. 
+ */ + public static class SideInputJoinDoFn extends DoFn<KV<BeamSqlRow, BeamSqlRow>, BeamSqlRow> { + private final PCollectionView<Map<BeamSqlRow, Iterable<BeamSqlRow>>> sideInputView; + private final JoinRelType joinType; + private final BeamSqlRow rightNullRow; + private final boolean swap; + + public SideInputJoinDoFn(JoinRelType joinType, BeamSqlRow rightNullRow, + PCollectionView<Map<BeamSqlRow, Iterable<BeamSqlRow>>> sideInputView, + boolean swap) { + this.joinType = joinType; + this.rightNullRow = rightNullRow; + this.sideInputView = sideInputView; + this.swap = swap; + } + + @ProcessElement public void processElement(ProcessContext context) { + BeamSqlRow key = context.element().getKey(); + BeamSqlRow leftRow = context.element().getValue(); + Map<BeamSqlRow, Iterable<BeamSqlRow>> key2Rows = context.sideInput(sideInputView); + Iterable<BeamSqlRow> rightRowsIterable = key2Rows.get(key); + + if (rightRowsIterable != null && rightRowsIterable.iterator().hasNext()) { + Iterator<BeamSqlRow> it = rightRowsIterable.iterator(); + while (it.hasNext()) { + context.output(combineTwoRowsIntoOne(leftRow, it.next(), swap)); + } + } else { + if (joinType == JoinRelType.LEFT) { + context.output(combineTwoRowsIntoOne(leftRow, rightNullRow, swap)); + } + } + } + } + + + /** + * A {@code SimpleFunction} to combine two rows into one. + */ + public static class JoinParts2WholeRow + extends SimpleFunction<KV<BeamSqlRow, KV<BeamSqlRow, BeamSqlRow>>, BeamSqlRow> { + @Override public BeamSqlRow apply(KV<BeamSqlRow, KV<BeamSqlRow, BeamSqlRow>> input) { + KV<BeamSqlRow, BeamSqlRow> parts = input.getValue(); + BeamSqlRow leftRow = parts.getKey(); + BeamSqlRow rightRow = parts.getValue(); + return combineTwoRowsIntoOne(leftRow, rightRow, false); + } + } + + /** + * As the method name suggests: combine two rows into one wide row. 
+ */ + private static BeamSqlRow combineTwoRowsIntoOne(BeamSqlRow leftRow, + BeamSqlRow rightRow, boolean swap) { + if (swap) { + return combineTwoRowsIntoOneHelper(rightRow, leftRow); + } else { + return combineTwoRowsIntoOneHelper(leftRow, rightRow); + } + } + + /** + * As the method name suggests: combine two rows into one wide row. + */ + private static BeamSqlRow combineTwoRowsIntoOneHelper(BeamSqlRow leftRow, + BeamSqlRow rightRow) { + // build the type + List<String> names = new ArrayList<>(leftRow.size() + rightRow.size()); + names.addAll(leftRow.getDataType().getFieldsName()); + names.addAll(rightRow.getDataType().getFieldsName()); + + List<Integer> types = new ArrayList<>(leftRow.size() + rightRow.size()); + types.addAll(leftRow.getDataType().getFieldsType()); + types.addAll(rightRow.getDataType().getFieldsType()); + BeamSqlRowType type = BeamSqlRowType.create(names, types); + + BeamSqlRow row = new BeamSqlRow(type); + // build the row + for (int i = 0; i < leftRow.size(); i++) { + row.addField(i, leftRow.getFieldValue(i)); + } + + for (int i = 0; i < rightRow.size(); i++) { + row.addField(i + leftRow.size(), rightRow.getFieldValue(i)); + } + + return row; + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/transform/BeamSetOperatorsTransforms.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/transform/BeamSetOperatorsTransforms.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/transform/BeamSetOperatorsTransforms.java new file mode 100644 index 0000000..8546160 --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/transform/BeamSetOperatorsTransforms.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.beam.sdk.extensions.sql.transform;

import java.util.Iterator;
import org.apache.beam.sdk.extensions.sql.rel.BeamSetOperatorRelBase;
import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.join.CoGbkResult;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TupleTag;

/**
 * Collection of {@code SimpleFunction} and {@code DoFn} classes used to perform Set operations.
 */
public abstract class BeamSetOperatorsTransforms {
  /**
   * Transform a {@code BeamSqlRow} to a {@code KV<BeamSqlRow, BeamSqlRow>} whose key and value
   * are both the input row.
   */
  public static class BeamSqlRow2KvFn extends
      SimpleFunction<BeamSqlRow, KV<BeamSqlRow, BeamSqlRow>> {
    @Override
    public KV<BeamSqlRow, BeamSqlRow> apply(BeamSqlRow input) {
      return KV.of(input, input);
    }
  }

  /**
   * Filter function used for Set operators. Each element carries one distinct row as key and,
   * in the {@code CoGbkResult}, the occurrences of that row on the left and right side.
   */
  public static class SetOperatorFilteringDoFn extends
      DoFn<KV<BeamSqlRow, CoGbkResult>, BeamSqlRow> {
    private TupleTag<BeamSqlRow> leftTag;
    private TupleTag<BeamSqlRow> rightTag;
    private BeamSetOperatorRelBase.OpType opType;
    // true for the ALL variant of the operator (duplicates are kept)
    private boolean all;

    public SetOperatorFilteringDoFn(TupleTag<BeamSqlRow> leftTag, TupleTag<BeamSqlRow> rightTag,
        BeamSetOperatorRelBase.OpType opType, boolean all) {
      this.leftTag = leftTag;
      this.rightTag = rightTag;
      this.opType = opType;
      this.all = all;
    }

    @ProcessElement
    public void processElement(ProcessContext ctx) {
      CoGbkResult grouped = ctx.element().getValue();
      Iterable<BeamSqlRow> leftRows = grouped.getAll(leftTag);
      Iterable<BeamSqlRow> rightRows = grouped.getAll(rightTag);

      switch (opType) {
        case UNION:
          if (!all) {
            // the key stands for every occurrence of the row
            ctx.output(ctx.element().getKey());
            break;
          }
          // ALL: every occurrence from the left, then every occurrence from the right
          for (BeamSqlRow row : leftRows) {
            ctx.output(row);
          }
          for (BeamSqlRow row : rightRows) {
            ctx.output(row);
          }
          break;
        case INTERSECT:
          if (!leftRows.iterator().hasNext() || !rightRows.iterator().hasNext()) {
            // the row is missing on at least one side
            break;
          }
          if (!all) {
            ctx.output(ctx.element().getKey());
            break;
          }
          for (BeamSqlRow row : leftRows) {
            ctx.output(row);
          }
          break;
        case MINUS:
          if (!leftRows.iterator().hasNext() || rightRows.iterator().hasNext()) {
            // either nothing on the left or the row also exists on the right
            break;
          }
          if (all) {
            for (BeamSqlRow row : leftRows) {
              ctx.output(row);
            }
          } else {
            // only output one
            ctx.output(leftRows.iterator().next());
          }
          break;
        default:
          break;
      }
    }
  }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/transform/BeamSqlFilterFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/transform/BeamSqlFilterFn.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/transform/BeamSqlFilterFn.java
new file mode 100644
index 0000000..372c38c
--- /dev/null
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/transform/BeamSqlFilterFn.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.sql.transform; + +import java.util.List; +import org.apache.beam.sdk.extensions.sql.interpreter.BeamSqlExpressionExecutor; +import org.apache.beam.sdk.extensions.sql.rel.BeamFilterRel; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; +import org.apache.beam.sdk.transforms.DoFn; + +/** + * {@code BeamSqlFilterFn} is the executor for a {@link BeamFilterRel} step. 
+ * + */ +public class BeamSqlFilterFn extends DoFn<BeamSqlRow, BeamSqlRow> { + + private String stepName; + private BeamSqlExpressionExecutor executor; + + public BeamSqlFilterFn(String stepName, BeamSqlExpressionExecutor executor) { + super(); + this.stepName = stepName; + this.executor = executor; + } + + @Setup + public void setup() { + executor.prepare(); + } + + @ProcessElement + public void processElement(ProcessContext c) { + BeamSqlRow in = c.element(); + + List<Object> result = executor.execute(in); + + if ((Boolean) result.get(0)) { + c.output(in); + } + } + + @Teardown + public void close() { + executor.close(); + } + +} http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/transform/BeamSqlOutputToConsoleFn.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/transform/BeamSqlOutputToConsoleFn.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/transform/BeamSqlOutputToConsoleFn.java new file mode 100644 index 0000000..9221947 --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/transform/BeamSqlOutputToConsoleFn.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.sql.transform; + +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; +import org.apache.beam.sdk.transforms.DoFn; + +/** + * A test PTransform to display output in console. + * + */ +public class BeamSqlOutputToConsoleFn extends DoFn<BeamSqlRow, Void> { + + private String stepName; + + public BeamSqlOutputToConsoleFn(String stepName) { + super(); + this.stepName = stepName; + } + + @ProcessElement + public void processElement(ProcessContext c) { + System.out.println("Output: " + c.element().getDataValues()); + } + +} http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/transform/BeamSqlProjectFn.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/transform/BeamSqlProjectFn.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/transform/BeamSqlProjectFn.java new file mode 100644 index 0000000..af398ea --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/transform/BeamSqlProjectFn.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.sql.transform; + +import java.util.List; +import org.apache.beam.sdk.extensions.sql.interpreter.BeamSqlExpressionExecutor; +import org.apache.beam.sdk.extensions.sql.rel.BeamProjectRel; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType; +import org.apache.beam.sdk.extensions.sql.schema.BeamTableUtils; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; + +/** + * + * {@code BeamSqlProjectFn} is the executor for a {@link BeamProjectRel} step. 
+ * + */ +public class BeamSqlProjectFn extends DoFn<BeamSqlRow, BeamSqlRow> { + private String stepName; + private BeamSqlExpressionExecutor executor; + private BeamSqlRowType outputRowType; + + public BeamSqlProjectFn(String stepName, BeamSqlExpressionExecutor executor, + BeamSqlRowType outputRowType) { + super(); + this.stepName = stepName; + this.executor = executor; + this.outputRowType = outputRowType; + } + + @Setup + public void setup() { + executor.prepare(); + } + + @ProcessElement + public void processElement(ProcessContext c, BoundedWindow window) { + BeamSqlRow inputRow = c.element(); + List<Object> results = executor.execute(inputRow); + + BeamSqlRow outRow = new BeamSqlRow(outputRowType); + outRow.updateWindowRange(inputRow, window); + + for (int idx = 0; idx < results.size(); ++idx) { + BeamTableUtils.addFieldWithAutoTypeCasting(outRow, idx, results.get(idx)); + } + + c.output(outRow); + } + + @Teardown + public void close() { + executor.close(); + } + +} http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/transform/package-info.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/transform/package-info.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/transform/package-info.java new file mode 100644 index 0000000..7797ddf --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/transform/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * {@link org.apache.beam.sdk.transforms.PTransform} used in a BeamSql pipeline. + */ +package org.apache.beam.sdk.extensions.sql.transform; http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/utils/CalciteUtils.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/utils/CalciteUtils.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/utils/CalciteUtils.java new file mode 100644 index 0000000..9970955 --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/utils/CalciteUtils.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.extensions.sql.utils; + +import java.sql.Types; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.rel.type.RelDataTypeField; +import org.apache.calcite.rel.type.RelProtoDataType; +import org.apache.calcite.sql.type.SqlTypeName; + +/** + * Utility methods for Calcite related operations. + */ +public class CalciteUtils { + private static final Map<Integer, SqlTypeName> JAVA_TO_CALCITE_MAPPING = new HashMap<>(); + private static final Map<SqlTypeName, Integer> CALCITE_TO_JAVA_MAPPING = new HashMap<>(); + static { + JAVA_TO_CALCITE_MAPPING.put(Types.TINYINT, SqlTypeName.TINYINT); + JAVA_TO_CALCITE_MAPPING.put(Types.SMALLINT, SqlTypeName.SMALLINT); + JAVA_TO_CALCITE_MAPPING.put(Types.INTEGER, SqlTypeName.INTEGER); + JAVA_TO_CALCITE_MAPPING.put(Types.BIGINT, SqlTypeName.BIGINT); + + JAVA_TO_CALCITE_MAPPING.put(Types.FLOAT, SqlTypeName.FLOAT); + JAVA_TO_CALCITE_MAPPING.put(Types.DOUBLE, SqlTypeName.DOUBLE); + + JAVA_TO_CALCITE_MAPPING.put(Types.DECIMAL, SqlTypeName.DECIMAL); + + JAVA_TO_CALCITE_MAPPING.put(Types.CHAR, SqlTypeName.CHAR); + JAVA_TO_CALCITE_MAPPING.put(Types.VARCHAR, SqlTypeName.VARCHAR); + + JAVA_TO_CALCITE_MAPPING.put(Types.DATE, SqlTypeName.DATE); + JAVA_TO_CALCITE_MAPPING.put(Types.TIME, SqlTypeName.TIME); + JAVA_TO_CALCITE_MAPPING.put(Types.TIMESTAMP, 
SqlTypeName.TIMESTAMP); + + JAVA_TO_CALCITE_MAPPING.put(Types.BOOLEAN, SqlTypeName.BOOLEAN); + + for (Map.Entry<Integer, SqlTypeName> pair : JAVA_TO_CALCITE_MAPPING.entrySet()) { + CALCITE_TO_JAVA_MAPPING.put(pair.getValue(), pair.getKey()); + } + } + + /** + * Get the corresponding {@code SqlTypeName} for an integer sql type. + */ + public static SqlTypeName toCalciteType(int type) { + return JAVA_TO_CALCITE_MAPPING.get(type); + } + + /** + * Get the integer sql type from Calcite {@code SqlTypeName}. + */ + public static Integer toJavaType(SqlTypeName typeName) { + return CALCITE_TO_JAVA_MAPPING.get(typeName); + } + + /** + * Get the {@code SqlTypeName} for the specified column of a table. + */ + public static SqlTypeName getFieldType(BeamSqlRowType schema, int index) { + return toCalciteType(schema.getFieldsType().get(index)); + } + + /** + * Generate {@code BeamSqlRowType} from {@code RelDataType} which is used to create table. + */ + public static BeamSqlRowType toBeamRowType(RelDataType tableInfo) { + List<String> fieldNames = new ArrayList<>(); + List<Integer> fieldTypes = new ArrayList<>(); + for (RelDataTypeField f : tableInfo.getFieldList()) { + fieldNames.add(f.getName()); + fieldTypes.add(toJavaType(f.getType().getSqlTypeName())); + } + return BeamSqlRowType.create(fieldNames, fieldTypes); + } + + /** + * Create an instance of {@code RelDataType} so it can be used to create a table. 
+ */ + public static RelProtoDataType toCalciteRowType(final BeamSqlRowType that) { + return new RelProtoDataType() { + @Override + public RelDataType apply(RelDataTypeFactory a) { + RelDataTypeFactory.FieldInfoBuilder builder = a.builder(); + for (int idx = 0; idx < that.getFieldsName().size(); ++idx) { + builder.add(that.getFieldsName().get(idx), toCalciteType(that.getFieldsType().get(idx))); + } + return builder.build(); + } + }; + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/utils/package-info.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/utils/package-info.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/utils/package-info.java new file mode 100644 index 0000000..e4d6148 --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/utils/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Utility classes. 
+ */ +package org.apache.beam.sdk.extensions.sql.utils; http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlApiSurfaceTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlApiSurfaceTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlApiSurfaceTest.java deleted file mode 100644 index 922931c..0000000 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlApiSurfaceTest.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.dsls.sql; - -import static org.apache.beam.sdk.util.ApiSurface.containsOnlyPackages; -import static org.junit.Assert.assertThat; - -import com.google.common.collect.ImmutableSet; -import java.util.Set; -import org.apache.beam.sdk.util.ApiSurface; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** - * Surface test for BeamSql api. 
- */ -@RunWith(JUnit4.class) -public class BeamSqlApiSurfaceTest { - @Test - public void testSdkApiSurface() throws Exception { - - @SuppressWarnings("unchecked") - final Set<String> allowed = - ImmutableSet.of( - "org.apache.beam", - "org.joda.time", - "org.apache.commons.csv"); - - ApiSurface surface = ApiSurface - .ofClass(BeamSqlCli.class) - .includingClass(BeamSql.class) - .includingClass(BeamSqlEnv.class) - .includingPackage("org.apache.beam.dsls.sql.schema", - getClass().getClassLoader()) - .pruningPrefix("java") - .pruningPattern("org[.]apache[.]beam[.]dsls[.]sql[.].*Test") - .pruningPattern("org[.]apache[.]beam[.]dsls[.]sql[.].*TestBase"); - - assertThat(surface, containsOnlyPackages(allowed)); - } - -} http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslAggregationTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslAggregationTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslAggregationTest.java deleted file mode 100644 index a142514..0000000 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslAggregationTest.java +++ /dev/null @@ -1,380 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.dsls.sql; - -import java.sql.Types; -import java.util.Arrays; -import org.apache.beam.dsls.sql.schema.BeamSqlRow; -import org.apache.beam.dsls.sql.schema.BeamSqlRowType; -import org.apache.beam.sdk.testing.PAssert; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionTuple; -import org.apache.beam.sdk.values.TupleTag; -import org.joda.time.Instant; -import org.junit.Test; - -/** - * Tests for GROUP-BY/aggregation, with global_window/fix_time_window/sliding_window/session_window - * with BOUNDED PCollection. - */ -public class BeamSqlDslAggregationTest extends BeamSqlDslBase { - /** - * GROUP-BY with single aggregation function with bounded PCollection. - */ - @Test - public void testAggregationWithoutWindowWithBounded() throws Exception { - runAggregationWithoutWindow(boundedInput1); - } - - /** - * GROUP-BY with single aggregation function with unbounded PCollection. 
- */ - @Test - public void testAggregationWithoutWindowWithUnbounded() throws Exception { - runAggregationWithoutWindow(unboundedInput1); - } - - private void runAggregationWithoutWindow(PCollection<BeamSqlRow> input) throws Exception { - String sql = "SELECT f_int2, COUNT(*) AS `size` FROM PCOLLECTION GROUP BY f_int2"; - - PCollection<BeamSqlRow> result = - input.apply("testAggregationWithoutWindow", BeamSql.simpleQuery(sql)); - - BeamSqlRowType resultType = BeamSqlRowType.create(Arrays.asList("f_int2", "size"), - Arrays.asList(Types.INTEGER, Types.BIGINT)); - - BeamSqlRow record = new BeamSqlRow(resultType); - record.addField("f_int2", 0); - record.addField("size", 4L); - - PAssert.that(result).containsInAnyOrder(record); - - pipeline.run().waitUntilFinish(); - } - - /** - * GROUP-BY with multiple aggregation functions with bounded PCollection. - */ - @Test - public void testAggregationFunctionsWithBounded() throws Exception{ - runAggregationFunctions(boundedInput1); - } - - /** - * GROUP-BY with multiple aggregation functions with unbounded PCollection. 
- */ - @Test - public void testAggregationFunctionsWithUnbounded() throws Exception{ - runAggregationFunctions(unboundedInput1); - } - - private void runAggregationFunctions(PCollection<BeamSqlRow> input) throws Exception{ - String sql = "select f_int2, count(*) as size, " - + "sum(f_long) as sum1, avg(f_long) as avg1, max(f_long) as max1, min(f_long) as min1," - + "sum(f_short) as sum2, avg(f_short) as avg2, max(f_short) as max2, min(f_short) as min2," - + "sum(f_byte) as sum3, avg(f_byte) as avg3, max(f_byte) as max3, min(f_byte) as min3," - + "sum(f_float) as sum4, avg(f_float) as avg4, max(f_float) as max4, min(f_float) as min4," - + "sum(f_double) as sum5, avg(f_double) as avg5, " - + "max(f_double) as max5, min(f_double) as min5," - + "max(f_timestamp) as max6, min(f_timestamp) as min6 " - + "FROM TABLE_A group by f_int2"; - - PCollection<BeamSqlRow> result = - PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), input) - .apply("testAggregationFunctions", BeamSql.query(sql)); - - BeamSqlRowType resultType = BeamSqlRowType.create( - Arrays.asList("f_int2", "size", "sum1", "avg1", "max1", "min1", "sum2", "avg2", "max2", - "min2", "sum3", "avg3", "max3", "min3", "sum4", "avg4", "max4", "min4", "sum5", "avg5", - "max5", "min5", "max6", "min6"), - Arrays.asList(Types.INTEGER, Types.BIGINT, Types.BIGINT, Types.BIGINT, Types.BIGINT, - Types.BIGINT, Types.SMALLINT, Types.SMALLINT, Types.SMALLINT, Types.SMALLINT, - Types.TINYINT, Types.TINYINT, Types.TINYINT, Types.TINYINT, Types.FLOAT, Types.FLOAT, - Types.FLOAT, Types.FLOAT, Types.DOUBLE, Types.DOUBLE, Types.DOUBLE, Types.DOUBLE, - Types.TIMESTAMP, Types.TIMESTAMP)); - - BeamSqlRow record = new BeamSqlRow(resultType); - record.addField("f_int2", 0); - record.addField("size", 4L); - - record.addField("sum1", 10000L); - record.addField("avg1", 2500L); - record.addField("max1", 4000L); - record.addField("min1", 1000L); - - record.addField("sum2", (short) 10); - record.addField("avg2", (short) 2); - 
record.addField("max2", (short) 4); - record.addField("min2", (short) 1); - - record.addField("sum3", (byte) 10); - record.addField("avg3", (byte) 2); - record.addField("max3", (byte) 4); - record.addField("min3", (byte) 1); - - record.addField("sum4", 10.0F); - record.addField("avg4", 2.5F); - record.addField("max4", 4.0F); - record.addField("min4", 1.0F); - - record.addField("sum5", 10.0); - record.addField("avg5", 2.5); - record.addField("max5", 4.0); - record.addField("min5", 1.0); - - record.addField("max6", FORMAT.parse("2017-01-01 02:04:03")); - record.addField("min6", FORMAT.parse("2017-01-01 01:01:03")); - - PAssert.that(result).containsInAnyOrder(record); - - pipeline.run().waitUntilFinish(); - } - - /** - * Implicit GROUP-BY with DISTINCT with bounded PCollection. - */ - @Test - public void testDistinctWithBounded() throws Exception { - runDistinct(boundedInput1); - } - - /** - * Implicit GROUP-BY with DISTINCT with unbounded PCollection. - */ - @Test - public void testDistinctWithUnbounded() throws Exception { - runDistinct(unboundedInput1); - } - - private void runDistinct(PCollection<BeamSqlRow> input) throws Exception { - String sql = "SELECT distinct f_int, f_long FROM PCOLLECTION "; - - PCollection<BeamSqlRow> result = - input.apply("testDistinct", BeamSql.simpleQuery(sql)); - - BeamSqlRowType resultType = BeamSqlRowType.create(Arrays.asList("f_int", "f_long"), - Arrays.asList(Types.INTEGER, Types.BIGINT)); - - BeamSqlRow record1 = new BeamSqlRow(resultType); - record1.addField("f_int", 1); - record1.addField("f_long", 1000L); - - BeamSqlRow record2 = new BeamSqlRow(resultType); - record2.addField("f_int", 2); - record2.addField("f_long", 2000L); - - BeamSqlRow record3 = new BeamSqlRow(resultType); - record3.addField("f_int", 3); - record3.addField("f_long", 3000L); - - BeamSqlRow record4 = new BeamSqlRow(resultType); - record4.addField("f_int", 4); - record4.addField("f_long", 4000L); - - PAssert.that(result).containsInAnyOrder(record1, record2, 
record3, record4); - - pipeline.run().waitUntilFinish(); - } - - /** - * GROUP-BY with TUMBLE window(aka fix_time_window) with bounded PCollection. - */ - @Test - public void testTumbleWindowWithBounded() throws Exception { - runTumbleWindow(boundedInput1); - } - - /** - * GROUP-BY with TUMBLE window(aka fix_time_window) with unbounded PCollection. - */ - @Test - public void testTumbleWindowWithUnbounded() throws Exception { - runTumbleWindow(unboundedInput1); - } - - private void runTumbleWindow(PCollection<BeamSqlRow> input) throws Exception { - String sql = "SELECT f_int2, COUNT(*) AS `size`," - + " TUMBLE_START(f_timestamp, INTERVAL '1' HOUR) AS `window_start`" - + " FROM TABLE_A" - + " GROUP BY f_int2, TUMBLE(f_timestamp, INTERVAL '1' HOUR)"; - PCollection<BeamSqlRow> result = - PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), input) - .apply("testTumbleWindow", BeamSql.query(sql)); - - BeamSqlRowType resultType = BeamSqlRowType.create( - Arrays.asList("f_int2", "size", "window_start"), - Arrays.asList(Types.INTEGER, Types.BIGINT, Types.TIMESTAMP)); - - BeamSqlRow record1 = new BeamSqlRow(resultType); - record1.addField("f_int2", 0); - record1.addField("size", 3L); - record1.addField("window_start", FORMAT.parse("2017-01-01 01:00:00")); - record1.setWindowStart(new Instant(FORMAT.parse("2017-01-01 01:00:00").getTime())); - record1.setWindowEnd(new Instant(FORMAT.parse("2017-01-01 02:00:00").getTime())); - - BeamSqlRow record2 = new BeamSqlRow(resultType); - record2.addField("f_int2", 0); - record2.addField("size", 1L); - record2.addField("window_start", FORMAT.parse("2017-01-01 02:00:00")); - record2.setWindowStart(new Instant(FORMAT.parse("2017-01-01 02:00:00").getTime())); - record2.setWindowEnd(new Instant(FORMAT.parse("2017-01-01 03:00:00").getTime())); - - PAssert.that(result).containsInAnyOrder(record1, record2); - - pipeline.run().waitUntilFinish(); - } - - /** - * GROUP-BY with HOP window(aka sliding_window) with bounded PCollection. 
- */ - @Test - public void testHopWindowWithBounded() throws Exception { - runHopWindow(boundedInput1); - } - - /** - * GROUP-BY with HOP window(aka sliding_window) with unbounded PCollection. - */ - @Test - public void testHopWindowWithUnbounded() throws Exception { - runHopWindow(unboundedInput1); - } - - private void runHopWindow(PCollection<BeamSqlRow> input) throws Exception { - String sql = "SELECT f_int2, COUNT(*) AS `size`," - + " HOP_START(f_timestamp, INTERVAL '1' HOUR, INTERVAL '30' MINUTE) AS `window_start`" - + " FROM PCOLLECTION" - + " GROUP BY f_int2, HOP(f_timestamp, INTERVAL '1' HOUR, INTERVAL '30' MINUTE)"; - PCollection<BeamSqlRow> result = - input.apply("testHopWindow", BeamSql.simpleQuery(sql)); - - BeamSqlRowType resultType = BeamSqlRowType.create( - Arrays.asList("f_int2", "size", "window_start"), - Arrays.asList(Types.INTEGER, Types.BIGINT, Types.TIMESTAMP)); - - BeamSqlRow record1 = new BeamSqlRow(resultType); - record1.addField("f_int2", 0); - record1.addField("size", 3L); - record1.addField("window_start", FORMAT.parse("2017-01-01 00:30:00")); - record1.setWindowStart(new Instant(FORMAT.parse("2017-01-01 00:30:00").getTime())); - record1.setWindowEnd(new Instant(FORMAT.parse("2017-01-01 01:30:00").getTime())); - - BeamSqlRow record2 = new BeamSqlRow(resultType); - record2.addField("f_int2", 0); - record2.addField("size", 3L); - record2.addField("window_start", FORMAT.parse("2017-01-01 01:00:00")); - record2.setWindowStart(new Instant(FORMAT.parse("2017-01-01 01:00:00").getTime())); - record2.setWindowEnd(new Instant(FORMAT.parse("2017-01-01 02:00:00").getTime())); - - BeamSqlRow record3 = new BeamSqlRow(resultType); - record3.addField("f_int2", 0); - record3.addField("size", 1L); - record3.addField("window_start", FORMAT.parse("2017-01-01 01:30:00")); - record3.setWindowStart(new Instant(FORMAT.parse("2017-01-01 01:30:00").getTime())); - record3.setWindowEnd(new Instant(FORMAT.parse("2017-01-01 02:30:00").getTime())); - - BeamSqlRow 
record4 = new BeamSqlRow(resultType); - record4.addField("f_int2", 0); - record4.addField("size", 1L); - record4.addField("window_start", FORMAT.parse("2017-01-01 02:00:00")); - record4.setWindowStart(new Instant(FORMAT.parse("2017-01-01 02:00:00").getTime())); - record4.setWindowEnd(new Instant(FORMAT.parse("2017-01-01 03:00:00").getTime())); - - PAssert.that(result).containsInAnyOrder(record1, record2, record3, record4); - - pipeline.run().waitUntilFinish(); - } - - /** - * GROUP-BY with SESSION window with bounded PCollection. - */ - @Test - public void testSessionWindowWithBounded() throws Exception { - runSessionWindow(boundedInput1); - } - - /** - * GROUP-BY with SESSION window with unbounded PCollection. - */ - @Test - public void testSessionWindowWithUnbounded() throws Exception { - runSessionWindow(unboundedInput1); - } - - private void runSessionWindow(PCollection<BeamSqlRow> input) throws Exception { - String sql = "SELECT f_int2, COUNT(*) AS `size`," - + " SESSION_START(f_timestamp, INTERVAL '5' MINUTE) AS `window_start`" - + " FROM TABLE_A" - + " GROUP BY f_int2, SESSION(f_timestamp, INTERVAL '5' MINUTE)"; - PCollection<BeamSqlRow> result = - PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), input) - .apply("testSessionWindow", BeamSql.query(sql)); - - BeamSqlRowType resultType = BeamSqlRowType.create( - Arrays.asList("f_int2", "size", "window_start"), - Arrays.asList(Types.INTEGER, Types.BIGINT, Types.TIMESTAMP)); - - BeamSqlRow record1 = new BeamSqlRow(resultType); - record1.addField("f_int2", 0); - record1.addField("size", 3L); - record1.addField("window_start", FORMAT.parse("2017-01-01 01:01:03")); - record1.setWindowStart(new Instant(FORMAT.parse("2017-01-01 01:01:03").getTime())); - record1.setWindowEnd(new Instant(FORMAT.parse("2017-01-01 01:11:03").getTime())); - - BeamSqlRow record2 = new BeamSqlRow(resultType); - record2.addField("f_int2", 0); - record2.addField("size", 1L); - record2.addField("window_start", FORMAT.parse("2017-01-01 
02:04:03")); - record2.setWindowStart(new Instant(FORMAT.parse("2017-01-01 02:04:03").getTime())); - record2.setWindowEnd(new Instant(FORMAT.parse("2017-01-01 02:09:03").getTime())); - - PAssert.that(result).containsInAnyOrder(record1, record2); - - pipeline.run().waitUntilFinish(); - } - - @Test - public void testWindowOnNonTimestampField() throws Exception { - exceptions.expect(IllegalStateException.class); - exceptions.expectMessage( - "Cannot apply 'TUMBLE' to arguments of type 'TUMBLE(<BIGINT>, <INTERVAL HOUR>)'"); - pipeline.enableAbandonedNodeEnforcement(false); - - String sql = "SELECT f_int2, COUNT(*) AS `size` FROM TABLE_A " - + "GROUP BY f_int2, TUMBLE(f_long, INTERVAL '1' HOUR)"; - PCollection<BeamSqlRow> result = - PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), boundedInput1) - .apply("testWindowOnNonTimestampField", BeamSql.query(sql)); - - pipeline.run().waitUntilFinish(); - } - - @Test - public void testUnsupportedDistinct() throws Exception { - exceptions.expect(IllegalStateException.class); - exceptions.expectMessage("Encountered \"*\""); - pipeline.enableAbandonedNodeEnforcement(false); - - String sql = "SELECT f_int2, COUNT(DISTINCT *) AS `size` FROM PCOLLECTION GROUP BY f_int2"; - - PCollection<BeamSqlRow> result = - boundedInput1.apply("testUnsupportedDistinct", BeamSql.simpleQuery(sql)); - - pipeline.run().waitUntilFinish(); - } -}
