[Flink-3226] Translate logical plan FlinkRels into physical plan DataSetRels.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4f4689da
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4f4689da
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4f4689da

Branch: refs/heads/tableOnCalcite
Commit: 4f4689da2ec55b7fb07d3ea8e79a139ee49246c2
Parents: 0a628d9
Author: chengxiang li <[email protected]>
Authored: Mon Feb 1 15:18:14 2016 +0800
Committer: vasia <[email protected]>
Committed: Thu Feb 11 16:48:08 2016 +0100

----------------------------------------------------------------------
 .../flink/api/table/plan/PlanGenException.scala |  26 ++++
 .../flink/api/table/plan/TypeConverter.scala    |  13 +-
 .../plan/functions/AggregateFunction.scala      |  71 +++++++++
 .../table/plan/functions/FunctionUtils.scala    |  37 +++++
 .../plan/functions/aggregate/Aggregate.scala    |  42 ++++++
 .../functions/aggregate/AggregateFactory.scala  | 135 +++++++++++++++++
 .../plan/functions/aggregate/AvgAggregate.scala | 148 +++++++++++++++++++
 .../functions/aggregate/CountAggregate.scala    |  34 +++++
 .../plan/functions/aggregate/MaxAggregate.scala | 136 +++++++++++++++++
 .../plan/functions/aggregate/MinAggregate.scala | 136 +++++++++++++++++
 .../plan/functions/aggregate/SumAggregate.scala | 130 ++++++++++++++++
 .../plan/nodes/dataset/DataSetGroupReduce.scala |   6 +-
 .../table/plan/nodes/dataset/DataSetJoin.scala  |   6 +-
 .../plan/nodes/dataset/DataSetReduce.scala      |   6 +-
 .../rules/dataset/DataSetAggregateRule.scala    |  17 ++-
 .../plan/rules/dataset/DataSetJoinRule.scala    | 102 ++++++++++++-
 16 files changed, 1025 insertions(+), 20 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/4f4689da/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/PlanGenException.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/PlanGenException.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/PlanGenException.scala
new file mode 100644
index 0000000..2fd400d
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/PlanGenException.scala
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.plan
+
+class PlanGenException(message: String, exception: Exception)
+  extends RuntimeException(message, exception) {
+
+  def this(message: String) {
+    this(message, null)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4f4689da/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TypeConverter.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TypeConverter.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TypeConverter.scala
index 227b3e8..b7cb200 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TypeConverter.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TypeConverter.scala
@@ -19,7 +19,7 @@ package org.apache.flink.api.table.plan
 
 import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.sql.`type`.SqlTypeName
+import org.apache.calcite.rel.core.JoinRelType._
 import org.apache.calcite.sql.`type`.SqlTypeName._
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
 import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
@@ -29,8 +29,11 @@ import org.apache.flink.api.java.typeutils.TupleTypeInfo
 import org.apache.flink.api.java.typeutils.ValueTypeInfo._
 import org.apache.flink.api.table.typeinfo.RowTypeInfo
 import org.apache.flink.api.table.{Row, TableException}
-
 import scala.collection.JavaConversions._
+import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
+import org.apache.flink.api.java.operators.join.JoinType
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.sql.`type`.SqlTypeName
 
 object TypeConverter {
 
@@ -139,4 +142,10 @@ object TypeConverter {
     returnType.asInstanceOf[TypeInformation[Any]]
   }
 
+  def sqlJoinTypeToFlinkJoinType(sqlJoinType: JoinRelType): JoinType = sqlJoinType match {
+    case INNER => JoinType.INNER
+    case LEFT => JoinType.LEFT_OUTER
+    case RIGHT => JoinType.RIGHT_OUTER
+    case FULL => JoinType.FULL_OUTER
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/4f4689da/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/AggregateFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/AggregateFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/AggregateFunction.scala
new file mode 100644
index 0000000..4abf2d2
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/AggregateFunction.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.plan.functions
+
+import java.lang.Iterable
+
+import com.google.common.base.Preconditions
+import org.apache.flink.api.common.functions.RichGroupReduceFunction
+import org.apache.flink.api.table.plan.functions.aggregate.Aggregate
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.util.Collector
+
+import scala.collection.JavaConversions._
+
+/**
+ * A wrapper GroupReduce UDF around SQL aggregates: it takes the grouped records as
+ * input, feeds them to the aggregates, and emits the last record of the group with
+ * the aggregated values written into the aggregated fields.
+ *
+ * @param aggregates SQL aggregate functions.
+ * @param fields The indexes of the fields the aggregates are applied to.
+ */
+class AggregateFunction(
+    private val aggregates: Array[Aggregate[_ <: Any]],
+    private val fields: Array[Int]) extends RichGroupReduceFunction[Any, Any] {
+
+  override def open(config: Configuration) {
+    Preconditions.checkNotNull(aggregates)
+    Preconditions.checkNotNull(fields)
+    Preconditions.checkArgument(aggregates.size == fields.size)
+
+    aggregates.foreach(_.initiateAggregate)
+  }
+
+  override def reduce(records: Iterable[Any], out: Collector[Any]): Unit = {
+    var currentValue: Any = null
+
+    // iterate over all input records and feed each aggregate its field value.
+    val aggregateAndField = aggregates.zip(fields)
+    records.foreach {
+      value =>
+        currentValue = value
+        aggregateAndField.foreach {
+          case (aggregate, field) =>
+            aggregate.aggregate(FunctionUtils.getFieldValue(value, field))
+        }
+    }
+
+    // reuse the latest record and set all the aggregated values.
+    aggregateAndField.foreach {
+      case (aggregate, field) =>
+        FunctionUtils.putFieldValue(currentValue, field, aggregate.getAggregated())
+    }
+
+    out.collect(currentValue)
+  }
+}
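
As a usage sketch (illustrative only, not part of the commit; it assumes the
LongSumAggregate and Row classes introduced in this change, with Row(arity) as the
row constructor), summing field 1 over a group of two-field rows:

  val aggs: Array[Aggregate[_ <: Any]] = Array(new LongSumAggregate)
  val func = new AggregateFunction(aggs, Array(1))
  func.open(new Configuration)

  val row1 = new Row(2); row1.setField(0, "a"); row1.setField(1, 3L)
  val row2 = new Row(2); row2.setField(0, "a"); row2.setField(1, 4L)

  func.reduce(java.util.Arrays.asList[Any](row1, row2), new Collector[Any] {
    override def collect(record: Any): Unit = println(record) // field 1 is now 7L
    override def close(): Unit = {}
  })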

http://git-wip-us.apache.org/repos/asf/flink/blob/4f4689da/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/FunctionUtils.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/FunctionUtils.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/FunctionUtils.scala
new file mode 100644
index 0000000..9d62b7c
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/FunctionUtils.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.plan.functions
+
+import org.apache.flink.api.table.Row
+
+object FunctionUtils {
+
+  def getFieldValue(record: Any, fieldIndex: Int): Any = {
+    record match {
+      case row: Row => row.productElement(fieldIndex)
+      case _ => throw new UnsupportedOperationException("Only Row is supported for now.")
+    }
+  }
+
+  def putFieldValue(record: Any, fieldIndex: Int, fieldValue: Any): Unit = {
+    record match {
+      case row: Row => row.setField(fieldIndex, fieldValue)
+      case _ => throw new UnsupportedOperationException("Only Row is supported for now.")
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4f4689da/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/aggregate/Aggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/aggregate/Aggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/aggregate/Aggregate.scala
new file mode 100644
index 0000000..5800d5f
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/aggregate/Aggregate.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.plan.functions.aggregate
+
+/**
+ * Represents a SQL aggregate function. Users should first initiate the aggregate,
+ * then feed it the grouped aggregate field values, and finally read the aggregated
+ * value.
+ *
+ * @tparam T the type of the aggregated value
+ */
+trait Aggregate[T] {
+  /**
+   * Initiate the current aggregate state.
+   */
+  def initiateAggregate
+
+  /**
+   * Feed an aggregate field value.
+   *
+   * @param value the field value to aggregate
+   */
+  def aggregate(value: Any)
+
+  /**
+   * Return the final aggregated value.
+   *
+   * @return the aggregated value
+   */
+  def getAggregated(): T
+}
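
A hypothetical implementation sketch (not part of this commit) showing the intended
lifecycle of the trait: initiate once, feed the grouped values, then read the result.

  class DoubleProductAggregate extends Aggregate[Double] {
    private var product: Double = 1

    override def initiateAggregate: Unit = {
      product = 1
    }

    override def aggregate(value: Any): Unit = {
      product *= value.asInstanceOf[Double]
    }

    override def getAggregated(): Double = {
      product
    }
  }

  val agg = new DoubleProductAggregate
  agg.initiateAggregate
  Seq(2.0, 3.0, 4.0).foreach(agg.aggregate)
  val result = agg.getAggregated() // 24.0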

http://git-wip-us.apache.org/repos/asf/flink/blob/4f4689da/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/aggregate/AggregateFactory.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/aggregate/AggregateFactory.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/aggregate/AggregateFactory.scala
new file mode 100644
index 0000000..a95a163
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/aggregate/AggregateFactory.scala
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.plan.functions.aggregate
+
+import java.util
+
+import org.apache.calcite.rel.core.AggregateCall
+import org.apache.calcite.sql.SqlAggFunction
+import org.apache.calcite.sql.`type`.SqlTypeName
+import org.apache.calcite.sql.`type`.SqlTypeName._
+import org.apache.calcite.sql.fun._
+import org.apache.flink.api.common.functions.RichGroupReduceFunction
+import org.apache.flink.api.table.plan.PlanGenException
+import org.apache.flink.api.table.plan.functions.AggregateFunction
+
+object AggregateFactory {
+
+  def createAggregateInstance(aggregateCalls: Seq[AggregateCall]):
+      RichGroupReduceFunction[Any, Any] = {
+
+    val fieldIndexes = new Array[Int](aggregateCalls.size)
+    val aggregates = new Array[Aggregate[_ <: Any]](aggregateCalls.size)
+    aggregateCalls.zipWithIndex.map { case (aggregateCall, index) =>
+      val sqlType = aggregateCall.getType
+      val argList: util.List[Integer] = aggregateCall.getArgList
+      // currently we assume aggregates operate on a single field only.
+      if (argList.isEmpty) {
+        if (aggregateCall.getAggregation.isInstanceOf[SqlCountAggFunction]) {
+          fieldIndexes(index) = 0
+        } else {
+          throw new PlanGenException("Aggregate fields should not be empty.")
+        }
+      } else {
+        fieldIndexes(index) = argList.get(0)
+      }
+      aggregateCall.getAggregation match {
+        case _: SqlSumAggFunction | _: SqlSumEmptyIsZeroAggFunction => {
+          sqlType.getSqlTypeName match {
+            case TINYINT =>
+              aggregates(index) = new TinyIntSumAggregate
+            case SMALLINT =>
+              aggregates(index) = new SmallIntSumAggregate
+            case INTEGER =>
+              aggregates(index) = new IntSumAggregate
+            case BIGINT =>
+              aggregates(index) = new LongSumAggregate
+            case FLOAT =>
+              aggregates(index) = new FloatSumAggregate
+            case DOUBLE =>
+              aggregates(index) = new DoubleSumAggregate
+            case sqlType: SqlTypeName =>
+              throw new PlanGenException("Sum aggregate does not support type: " + sqlType)
+          }
+        }
+        case _: SqlAvgAggFunction => {
+          sqlType.getSqlTypeName match {
+            case TINYINT =>
+              aggregates(index) = new TinyIntAvgAggregate
+            case SMALLINT =>
+              aggregates(index) = new SmallIntAvgAggregate
+            case INTEGER =>
+              aggregates(index) = new IntAvgAggregate
+            case BIGINT =>
+              aggregates(index) = new LongAvgAggregate
+            case FLOAT =>
+              aggregates(index) = new FloatAvgAggregate
+            case DOUBLE =>
+              aggregates(index) = new DoubleAvgAggregate
+            case sqlType: SqlTypeName =>
+              throw new PlanGenException("Avg aggregate does not support type: " + sqlType)
+          }
+        }
+        case sqlMinMaxFunction: SqlMinMaxAggFunction => {
+          if (sqlMinMaxFunction.isMin) {
+            sqlType.getSqlTypeName match {
+              case TINYINT =>
+                aggregates(index) = new TinyIntMinAggregate
+              case SMALLINT =>
+                aggregates(index) = new SmallIntMinAggregate
+              case INTEGER =>
+                aggregates(index) = new IntMinAggregate
+              case BIGINT =>
+                aggregates(index) = new LongMinAggregate
+              case FLOAT =>
+                aggregates(index) = new FloatMinAggregate
+              case DOUBLE =>
+                aggregates(index) = new DoubleMinAggregate
+              case sqlType: SqlTypeName =>
+                throw new PlanGenException("Min aggregate does not support type: " + sqlType)
+            }
+          } else {
+            sqlType.getSqlTypeName match {
+              case TINYINT =>
+                aggregates(index) = new TinyIntMaxAggregate
+              case SMALLINT =>
+                aggregates(index) = new SmallIntMaxAggregate
+              case INTEGER =>
+                aggregates(index) = new IntMaxAggregate
+              case BIGINT =>
+                aggregates(index) = new LongMaxAggregate
+              case FLOAT =>
+                aggregates(index) = new FloatMaxAggregate
+              case DOUBLE =>
+                aggregates(index) = new DoubleMaxAggregate
+              case sqlType: SqlTypeName =>
+                throw new PlanGenException("Max aggregate does not support type: " + sqlType)
+            }
+          }
+        }
+        case _: SqlCountAggFunction =>
+          aggregates(index) = new CountAggregate
+        case unSupported: SqlAggFunction =>
+          throw new PlanGenException("Unsupported aggregate function: " + unSupported.getName)
+      }
+    }
+
+    new AggregateFunction(aggregates, fieldIndexes)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4f4689da/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/aggregate/AvgAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/aggregate/AvgAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/aggregate/AvgAggregate.scala
new file mode 100644
index 0000000..e9c5f8f
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/aggregate/AvgAggregate.scala
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.plan.functions.aggregate
+
+abstract class AvgAggregate[T] extends Aggregate[T] {
+
+}
+
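+// All concrete average aggregates below share one incremental update: after feeding
+// the n-th value x, the running mean satisfies avg = avg + (x - avg) / n, which
+// avoids keeping a sum that could overflow. For example, for the values 3, 4, 8 the
+// mean becomes 3.0, then 3.0 + (4 - 3.0) / 2 = 3.5, then 3.5 + (8 - 3.5) / 3 = 5.0.
+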
+// TinyInt average aggregate returns Int as the aggregated value.
+class TinyIntAvgAggregate extends AvgAggregate[Int] {
+  private var avgValue: Double = 0
+  private var count: Int = 0
+
+  override def initiateAggregate: Unit = {
+    avgValue = 0
+    count = 0
+  }
+
+  override def aggregate(value: Any): Unit = {
+    count += 1
+    val current = value.asInstanceOf[Byte]
+    avgValue += (current - avgValue) / count
+  }
+
+  override def getAggregated(): Int = {
+    avgValue.toInt
+  }
+}
+
+// SmallInt average aggregate returns Int as the aggregated value.
+class SmallIntAvgAggregate extends AvgAggregate[Int] {
+  private var avgValue: Double = 0
+  private var count: Int = 0
+
+  override def initiateAggregate: Unit = {
+    avgValue = 0
+    count = 0
+  }
+
+  override def aggregate(value: Any): Unit = {
+    count += 1
+    val current = value.asInstanceOf[Short]
+    avgValue += (current - avgValue) / count
+  }
+
+  override def getAggregated(): Int = {
+    avgValue.toInt
+  }
+}
+
+// Int average aggregate returns Int as the aggregated value.
+class IntAvgAggregate extends AvgAggregate[Int] {
+  private var avgValue: Double = 0
+  private var count: Int = 0
+
+  override def initiateAggregate: Unit = {
+    avgValue = 0
+    count = 0
+  }
+
+  override def aggregate(value: Any): Unit = {
+    count += 1
+    val current = value.asInstanceOf[Int]
+    avgValue += (current - avgValue) / count
+  }
+
+  override def getAggregated(): Int = {
+    avgValue.toInt
+  }
+}
+
+// Long average aggregate returns Long as the aggregated value.
+class LongAvgAggregate extends AvgAggregate[Long] {
+  private var avgValue: Double = 0
+  private var count: Int = 0
+
+  override def initiateAggregate: Unit = {
+    avgValue = 0
+    count = 0
+  }
+
+  override def aggregate(value: Any): Unit = {
+    count += 1
+    val current = value.asInstanceOf[Long]
+    avgValue += (current - avgValue) / count
+  }
+
+  override def getAggregated(): Long = {
+    avgValue.toLong
+  }
+}
+
+// Float average aggregate returns Float as the aggregated value.
+class FloatAvgAggregate extends AvgAggregate[Float] {
+  private var avgValue: Double = 0
+  private var count: Int = 0
+
+  override def initiateAggregate: Unit = {
+    avgValue = 0
+    count = 0
+  }
+
+  override def aggregate(value: Any): Unit = {
+    count += 1
+    val current = value.asInstanceOf[Float]
+    avgValue += (current - avgValue) / count
+  }
+
+  override def getAggregated(): Float = {
+    avgValue.toFloat
+  }
+}
+
+// Double average aggregate returns Double as the aggregated value.
+class DoubleAvgAggregate extends AvgAggregate[Double] {
+  private var avgValue: Double = 0
+  private var count: Int = 0
+
+  override def initiateAggregate: Unit = {
+    avgValue = 0
+    count = 0
+  }
+
+  override def aggregate(value: Any): Unit = {
+    count += 1
+    val current = value.asInstanceOf[Double]
+    avgValue += (current - avgValue) / count
+  }
+
+  override def getAggregated(): Double = {
+    avgValue
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4f4689da/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/aggregate/CountAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/aggregate/CountAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/aggregate/CountAggregate.scala
new file mode 100644
index 0000000..ab6b170
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/aggregate/CountAggregate.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.plan.functions.aggregate
+
+class CountAggregate extends Aggregate[Long] {
+  private var count: Long = 0L
+
+  override def initiateAggregate: Unit = {
+    count = 0
+  }
+
+  override def aggregate(value: Any): Unit = {
+    count += 1
+  }
+
+  override def getAggregated(): Long = {
+    count
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4f4689da/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/aggregate/MaxAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/aggregate/MaxAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/aggregate/MaxAggregate.scala
new file mode 100644
index 0000000..072eb3f
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/aggregate/MaxAggregate.scala
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.plan.functions.aggregate
+
+abstract class MaxAggregate[T] extends Aggregate[T] {
+
+}
+
+class TinyIntMaxAggregate extends MaxAggregate[Byte] {
+  private var max = Byte.MinValue
+
+  override def initiateAggregate: Unit = {
+    max = Byte.MinValue
+  }
+
+  override def aggregate(value: Any): Unit = {
+    val current = value.asInstanceOf[Byte]
+    if (current > max) {
+      max = current
+    }
+  }
+
+  override def getAggregated(): Byte = {
+    max
+  }
+}
+
+class SmallIntMaxAggregate extends MaxAggregate[Short] {
+  private var max = Short.MinValue
+
+  override def initiateAggregate: Unit = {
+    max = Short.MinValue
+  }
+
+  override def aggregate(value: Any): Unit = {
+    val current = value.asInstanceOf[Short]
+    if (current > max) {
+      max = current
+    }
+  }
+
+  override def getAggregated(): Short = {
+    max
+  }
+}
+
+class IntMaxAggregate extends MaxAggregate[Int] {
+  private var max = Int.MinValue
+
+  override def initiateAggregate: Unit = {
+    max = Int.MinValue
+  }
+
+  override def aggregate(value: Any): Unit = {
+    val current = value.asInstanceOf[Int]
+    if (current > max) {
+      max = current
+    }
+  }
+
+  override def getAggregated(): Int = {
+    max
+  }
+}
+
+class LongMaxAggregate extends MaxAggregate[Long] {
+  private var max = Long.MinValue
+
+  override def initiateAggregate: Unit = {
+    max = Long.MinValue
+  }
+
+  override def aggregate(value: Any): Unit = {
+    val current = value.asInstanceOf[Long]
+    if (current > max) {
+      max = current
+    }
+  }
+
+  override def getAggregated(): Long = {
+    max
+  }
+}
+
+class FloatMaxAggregate extends MaxAggregate[Float] {
+  private var max = Float.MinValue
+
+  override def initiateAggregate: Unit = {
+    max = Float.MinValue
+  }
+
+  override def aggregate(value: Any): Unit = {
+    val current = value.asInstanceOf[Float]
+    if (current > max) {
+      max = current
+    }
+  }
+
+  override def getAggregated(): Float = {
+    max
+  }
+}
+
+class DoubleMaxAggregate extends MaxAggregate[Double] {
+  private var max = Double.MinValue
+
+  override def initiateAggregate: Unit = {
+    max = Double.MinValue
+  }
+
+  override def aggregate(value: Any): Unit = {
+    val current = value.asInstanceOf[Double]
+    if (current > max) {
+      max = current
+    }
+  }
+
+  override def getAggregated(): Double = {
+    max
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4f4689da/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/aggregate/MinAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/aggregate/MinAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/aggregate/MinAggregate.scala
new file mode 100644
index 0000000..c233b8e
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/aggregate/MinAggregate.scala
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.plan.functions.aggregate
+
+abstract class MinAggregate[T] extends Aggregate[T] {
+
+}
+
+class TinyIntMinAggregate extends MinAggregate[Byte] {
+  private var min = Byte.MaxValue
+
+  override def initiateAggregate: Unit = {
+    min = Byte.MaxValue
+  }
+
+  override def aggregate(value: Any): Unit = {
+    val current = value.asInstanceOf[Byte]
+    if (current < min) {
+      min = current
+    }
+  }
+
+  override def getAggregated(): Byte = {
+    min
+  }
+}
+
+class SmallIntMinAggregate extends MinAggregate[Short] {
+  private var min = Short.MaxValue
+
+  override def initiateAggregate: Unit = {
+    min = Short.MaxValue
+  }
+
+  override def aggregate(value: Any): Unit = {
+    val current = value.asInstanceOf[Short]
+    if (current < min) {
+      min = current
+    }
+  }
+
+  override def getAggregated(): Short = {
+    min
+  }
+}
+
+class IntMinAggregate extends MinAggregate[Int] {
+  private var min = Int.MaxValue
+
+  override def initiateAggregate: Unit = {
+    min = Int.MaxValue
+  }
+
+  override def aggregate(value: Any): Unit = {
+    val current = value.asInstanceOf[Int]
+    if (current < min) {
+      min = current
+    }
+  }
+
+  override def getAggregated(): Int = {
+    min
+  }
+}
+
+class LongMinAggregate extends MinAggregate[Long] {
+  private var min = Long.MaxValue
+
+  override def initiateAggregate: Unit = {
+    min = Long.MaxValue
+  }
+
+  override def aggregate(value: Any): Unit = {
+    val current = value.asInstanceOf[Long]
+    if (current < min) {
+      min = current
+    }
+  }
+
+  override def getAggregated(): Long = {
+    min
+  }
+}
+
+class FloatMinAggregate extends MinAggregate[Float] {
+  private var min = Float.MaxValue
+
+  override def initiateAggregate: Unit = {
+    min = Float.MaxValue
+  }
+
+  override def aggregate(value: Any): Unit = {
+    val current = value.asInstanceOf[Float]
+    if (current < min) {
+      min = current
+    }
+  }
+
+  override def getAggregated(): Float = {
+    min
+  }
+}
+
+class DoubleMinAggregate extends MinAggregate[Double] {
+  private var min = Double.MaxValue
+
+  override def initiateAggregate: Unit = {
+    min = Double.MaxValue
+  }
+
+  override def aggregate(value: Any): Unit = {
+    val current = value.asInstanceOf[Double]
+    if (current < min) {
+      min = current
+    }
+  }
+
+  override def getAggregated(): Double = {
+    min
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4f4689da/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/aggregate/SumAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/aggregate/SumAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/aggregate/SumAggregate.scala
new file mode 100644
index 0000000..14d1a73
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/aggregate/SumAggregate.scala
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.plan.functions.aggregate
+
+abstract class SumAggregate[T] extends Aggregate[T] {
+
+}
+
+// TinyInt sum aggregate returns Int as the aggregated value.
+class TinyIntSumAggregate extends SumAggregate[Int] {
+
+  private var sumValue: Int = 0
+
+  override def initiateAggregate: Unit = {
+    sumValue = 0
+  }
+
+  override def getAggregated(): Int = {
+    sumValue
+  }
+
+  override def aggregate(value: Any): Unit = {
+    sumValue += value.asInstanceOf[Byte]
+  }
+}
+
+// SmallInt sum aggregate returns Int as the aggregated value.
+class SmallIntSumAggregate extends SumAggregate[Int] {
+
+  private var sumValue: Int = 0
+
+  override def initiateAggregate: Unit = {
+    sumValue = 0
+  }
+
+  override def getAggregated(): Int = {
+    sumValue
+  }
+
+  override def aggregate(value: Any): Unit = {
+    sumValue += value.asInstanceOf[Short]
+  }
+}
+
+// Int sum aggregate returns Int as the aggregated value.
+class IntSumAggregate extends SumAggregate[Int] {
+
+  private var sumValue: Int = 0
+
+  override def initiateAggregate: Unit = {
+    sumValue = 0
+  }
+
+  override def getAggregated(): Int = {
+    sumValue
+  }
+
+  override def aggregate(value: Any): Unit = {
+    sumValue += value.asInstanceOf[Int]
+  }
+}
+
+// Long sum aggregate returns Long as the aggregated value.
+class LongSumAggregate extends SumAggregate[Long] {
+
+  private var sumValue: Long = 0L
+
+  override def initiateAggregate: Unit = {
+    sumValue = 0
+  }
+
+  override def aggregate(value: Any): Unit = {
+    sumValue += value.asInstanceOf[Long]
+  }
+
+  override def getAggregated(): Long = {
+    sumValue
+  }
+}
+
+// Float sum aggregate returns Float as the aggregated value.
+class FloatSumAggregate extends SumAggregate[Float] {
+  private var sumValue: Float = 0
+
+  override def initiateAggregate: Unit = {
+    sumValue = 0
+  }
+
+  override def aggregate(value: Any): Unit = {
+    sumValue += value.asInstanceOf[Float]
+  }
+
+  override def getAggregated(): Float = {
+    sumValue
+  }
+}
+
+// Double sum aggregate returns Double as the aggregated value.
+class DoubleSumAggregate extends SumAggregate[Double] {
+  private var sumValue: Double = 0
+
+  override def initiateAggregate: Unit = {
+    sumValue = 0
+  }
+
+  override def aggregate(value: Any): Unit = {
+    sumValue += value.asInstanceOf[Double]
+  }
+
+  override def getAggregated(): Double = {
+    sumValue
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4f4689da/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetGroupReduce.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetGroupReduce.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetGroupReduce.scala
index ae76d29..70810c8 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetGroupReduce.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetGroupReduce.scala
@@ -18,9 +18,9 @@
 
 package org.apache.flink.api.table.plan.nodes.dataset
 
-import org.apache.calcite.plan.{RelTraitSet, RelOptCluster}
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
 import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rel.{RelWriter, RelNode, SingleRel}
+import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
 import org.apache.flink.api.common.functions.GroupReduceFunction
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.DataSet
@@ -36,7 +36,7 @@ class DataSetGroupReduce(
     rowType: RelDataType,
     opName: String,
     groupingKeys: Array[Int],
-    func: GroupReduceFunction[Row, Row])
+    func: GroupReduceFunction[Any, Any])
   extends SingleRel(cluster, traitSet, input)
   with DataSetRel {

http://git-wip-us.apache.org/repos/asf/flink/blob/4f4689da/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala
index de436be..6f988be 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala
@@ -18,9 +18,9 @@
 
 package org.apache.flink.api.table.plan.nodes.dataset
 
-import org.apache.calcite.plan.{RelTraitSet, RelOptCluster}
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
 import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rel.{RelWriter, BiRel, RelNode}
+import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
 import org.apache.flink.api.common.functions.JoinFunction
 import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint
 import org.apache.flink.api.common.typeinfo.TypeInformation
@@ -42,7 +42,7 @@ class DataSetJoin(
     joinKeysRight: Array[Int],
     joinType: JoinType,
     joinHint: JoinHint,
-    func: JoinFunction[Row, Row, Row])
+    func: JoinFunction[Any, Any, Any])
   extends BiRel(cluster, traitSet, left, right)
   with DataSetRel {

http://git-wip-us.apache.org/repos/asf/flink/blob/4f4689da/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetReduce.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetReduce.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetReduce.scala
index e6fc0f9..361f869 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetReduce.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetReduce.scala
@@ -18,9 +18,9 @@
 
 package org.apache.flink.api.table.plan.nodes.dataset
 
-import org.apache.calcite.plan.{RelTraitSet, RelOptCluster}
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
 import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rel.{RelWriter, RelNode, SingleRel}
+import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
 import org.apache.flink.api.common.functions.ReduceFunction
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.DataSet
@@ -36,7 +36,7 @@ class DataSetReduce(
     rowType: RelDataType,
     opName: String,
     groupingKeys: Array[Int],
-    func: ReduceFunction[Row])
+    func: ReduceFunction[Any])
   extends SingleRel(cluster, traits, input)
   with DataSetRel {

http://git-wip-us.apache.org/repos/asf/flink/blob/4f4689da/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetAggregateRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetAggregateRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetAggregateRule.scala
index 1d17d63..9ecd9d0 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetAggregateRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetAggregateRule.scala
@@ -21,9 +21,12 @@ package org.apache.flink.api.table.plan.rules.dataset
 import org.apache.calcite.plan.{RelOptRule, RelTraitSet}
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.convert.ConverterRule
-import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, DataSetReduce}
+import org.apache.flink.api.table.plan.functions.aggregate.AggregateFactory
+import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, DataSetGroupReduce}
 import org.apache.flink.api.table.plan.nodes.logical.{FlinkAggregate, FlinkConvention}
 
+import scala.collection.JavaConversions._
+
 class DataSetAggregateRule
   extends ConverterRule(
     classOf[FlinkAggregate],
@@ -37,14 +40,20 @@ class DataSetAggregateRule
     val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE)
     val convInput: RelNode = RelOptRule.convert(agg.getInput, DataSetConvention.INSTANCE)
 
-    new DataSetReduce(
+    val grouping = agg.getGroupSet.asList().map {
+      case a: java.lang.Integer => a.intValue
+    }.toArray
+
+    val aggregateFunction = AggregateFactory.createAggregateInstance(agg.getAggCallList)
+
+    new DataSetGroupReduce(
      rel.getCluster,
      traitSet,
      convInput,
      rel.getRowType,
      agg.toString,
-      Array[Int](),
-      null)
+      grouping,
+      aggregateFunction)
   }
 }
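
As a hypothetical illustration of what this rule now produces (the query and schema
are examples, not part of the commit): for SELECT name, SUM(cnt) FROM t GROUP BY name
over a row type (name: VARCHAR, cnt: BIGINT), the conversion yields

  grouping          = Array(0)  // from agg.getGroupSet = {0}
  aggregateFunction = AggregateFactory.createAggregateInstance(agg.getAggCallList)
                      // a single SUM call on a BIGINT field, so the factory picks
                      // a LongSumAggregate with field index 1

and the resulting DataSetGroupReduce groups on field 0 and sums field 1.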
http://git-wip-us.apache.org/repos/asf/flink/blob/4f4689da/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetJoinRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetJoinRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetJoinRule.scala
index 3d2117d..69c86c8 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetJoinRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetJoinRule.scala
@@ -20,10 +20,17 @@ package org.apache.flink.api.table.plan.rules.dataset
 
 import org.apache.calcite.plan.{RelOptRule, RelTraitSet}
 import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.`type`.{RelDataTypeField, RelDataType}
 import org.apache.calcite.rel.convert.ConverterRule
-import org.apache.flink.api.java.operators.join.JoinType
+import org.apache.calcite.rex.{RexCall, RexInputRef}
+import org.apache.calcite.sql.SqlKind
+import org.apache.flink.api.table.plan.PlanGenException
 import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, DataSetJoin}
-import org.apache.flink.api.table.plan.nodes.logical.{FlinkJoin, FlinkConvention}
+import org.apache.flink.api.table.plan.nodes.logical.{FlinkConvention, FlinkJoin}
+import org.apache.flink.api.table.plan.TypeConverter._
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable.ArrayBuffer
 
 class DataSetJoinRule
   extends ConverterRule(
@@ -39,6 +46,10 @@ class DataSetJoinRule
     val convLeft: RelNode = RelOptRule.convert(join.getInput(0), DataSetConvention.INSTANCE)
     val convRight: RelNode = RelOptRule.convert(join.getInput(1), DataSetConvention.INSTANCE)
 
+    val joinKeys = getJoinKeys(join)
+
+    // A FlinkProject follows the FlinkJoin to handle the output fields of the join,
+    // so we do not need a JoinFunction here for now.
     new DataSetJoin(
       rel.getCluster,
       traitSet,
@@ -46,12 +57,93 @@ class DataSetJoinRule
       convRight,
       rel.getRowType,
       join.toString,
-      Array[Int](),
-      Array[Int](),
-      JoinType.INNER,
+      joinKeys._1,
+      joinKeys._2,
+      sqlJoinTypeToFlinkJoinType(join.getJoinType),
       null,
       null)
   }
+
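+  // For illustration (hypothetical schemas): joining inputs l(a, b) and r(c, d) on
+  // l.b = r.c yields a condition over the joined row type (a, b, c, d), e.g.
+  // =($1, $2). parseJoinRexNode collects the pair (1, 2), and findIndexInSingleInput
+  // (which matches fields by name and type) rebases it to index 1 of the left input
+  // and index 0 of the right input, so getJoinKeys returns (Array(1), Array(0)).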
+  private def getJoinKeys(join: FlinkJoin): (Array[Int], Array[Int]) = {
+    val joinKeys = ArrayBuffer.empty[(Int, Int)]
+    parseJoinRexNode(join.getCondition.asInstanceOf[RexCall], joinKeys)
+
+    val joinedRowType = join.getRowType
+    val leftRowType = join.getLeft.getRowType
+    val rightRowType = join.getRight.getRowType
+
+    // The join key indexes fetched from Calcite are based on the joined row type; we
+    // need the join key indexes based on the left/right input row types.
+    val joinKeyPairs: ArrayBuffer[(Int, Int)] = joinKeys.map {
+      case (first, second) =>
+        var leftIndex = findIndexInSingleInput(first, joinedRowType, leftRowType)
+        if (leftIndex == -1) {
+          leftIndex = findIndexInSingleInput(second, joinedRowType, leftRowType)
+          if (leftIndex == -1) {
+            throw new PlanGenException("Invalid join condition, could not find " +
+              joinedRowType.getFieldNames.get(first) + " and " +
+              joinedRowType.getFieldNames.get(second) + " in left table")
+          }
+          val rightIndex = findIndexInSingleInput(first, joinedRowType, rightRowType)
+          if (rightIndex == -1) {
+            throw new PlanGenException("Invalid join condition, could not find " +
+              joinedRowType.getFieldNames.get(first) + " in right table")
+          }
+          (leftIndex, rightIndex)
+        } else {
+          val rightIndex = findIndexInSingleInput(second, joinedRowType, rightRowType)
+          if (rightIndex == -1) {
+            throw new PlanGenException("Invalid join condition, could not find " +
+              joinedRowType.getFieldNames.get(second) + " in right table")
+          }
+          (leftIndex, rightIndex)
+        }
+    }
+
+    val joinKeysPair = joinKeyPairs.unzip
+
+    (joinKeysPair._1.toArray, joinKeysPair._2.toArray)
+  }
+
+  // Parse the join condition recursively and collect all join key index pairs.
+  private def parseJoinRexNode(condition: RexCall, joinKeys: ArrayBuffer[(Int, Int)]): Unit = {
+    condition.getOperator.getKind match {
+      case SqlKind.AND =>
+        condition.getOperands.foreach {
+          operand => parseJoinRexNode(operand.asInstanceOf[RexCall], joinKeys)
+        }
+      case SqlKind.EQUALS =>
+        val operands = condition.getOperands
+        val leftIndex = operands(0).asInstanceOf[RexInputRef].getIndex
+        val rightIndex = operands(1).asInstanceOf[RexInputRef].getIndex
+        joinKeys += (leftIndex -> rightIndex)
+      case _ =>
+        // Operands such as OR are not supported in the join condition due to the
+        // limitations of the current Flink JoinOperator implementation.
+        throw new PlanGenException("Only AND and EQUALS operands are supported " +
+          "in the join condition for now.")
+    }
+  }
+
+  // Find the index of a joined-row field within a single input's row type.
+  private def findIndexInSingleInput(
+      globalIndex: Int,
+      joinedRowType: RelDataType,
+      inputRowType: RelDataType): Int = {
+
+    val fieldType: RelDataTypeField = joinedRowType.getFieldList.get(globalIndex)
+    inputRowType.getFieldList.zipWithIndex.foreach {
+      case (inputFieldType, index) =>
+        if (inputFieldType.getName.equals(fieldType.getName)
+          && inputFieldType.getType.equals(fieldType.getType)) {
+
+          return index
+        }
+    }
+
+    // Return -1 if the field matches none of the input row type's fields.
+    -1
+  }
 }
 
 object DataSetJoinRule {
