[
https://issues.apache.org/jira/browse/FLINK-4692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15817983#comment-15817983
]
ASF GitHub Bot commented on FLINK-4692:
---------------------------------------
Github user twalthr commented on a diff in the pull request:
https://github.com/apache/flink/pull/2938#discussion_r94944955
--- Diff:
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetWindowAggregate.scala
---
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.plan.nodes.dataset
+
+import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner,
RelTraitSet}
+import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.AggregateCall
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.flink.api.common.operators.Order
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
+import org.apache.flink.api.table.FlinkRelBuilder.NamedWindowProperty
+import org.apache.flink.api.table.{BatchTableEnvironment,
FlinkTypeFactory, Row}
+import org.apache.flink.api.table.plan.logical._
+import org.apache.flink.api.table.plan.nodes.FlinkAggregate
+import org.apache.flink.api.table.runtime.aggregate.AggregateUtil._
+import org.apache.flink.api.table.typeutils.TypeCheckUtils._
+import org.apache.flink.api.table.typeutils._
+
+import scala.collection.JavaConversions._
+
+/**
+ * Flink RelNode which matches along with a LogicalWindowAggregate.
+ */
+class DataSetWindowAggregate(
+ window: LogicalWindow,
+ namedProperties: Seq[NamedWindowProperty],
+ cluster: RelOptCluster,
+ traitSet: RelTraitSet,
+ inputNode: RelNode,
+ namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+ rowRelDataType: RelDataType,
+ inputType: RelDataType,
+ grouping: Array[Int])
+ extends SingleRel(cluster, traitSet, inputNode)
+ with FlinkAggregate
+ with DataSetRel {
+
+ override def deriveRowType() = rowRelDataType
+
+ override def copy(traitSet: RelTraitSet, inputs:
java.util.List[RelNode]): RelNode = {
+ new DataSetWindowAggregate(
+ window,
+ namedProperties,
+ cluster,
+ traitSet,
+ inputs.get(0),
+ namedAggregates,
+ getRowType,
+ inputType,
+ grouping)
+ }
+
+ override def toString: String = {
+ s"Aggregate(${
+ if (!grouping.isEmpty) {
+ s"groupBy: (${groupingToString(inputType, grouping)}), "
+ } else {
+ ""
+ }
+ }window: ($window), " +
+ s"select: (${
+ aggregationToString(
+ inputType,
+ grouping,
+ getRowType,
+ namedAggregates,
+ namedProperties)
+ }))"
+ }
+
+ override def explainTerms(pw: RelWriter): RelWriter = {
+ super.explainTerms(pw)
+ .itemIf("groupBy", groupingToString(inputType, grouping),
!grouping.isEmpty)
+ .item("window", window)
+ .item(
+ "select", aggregationToString(
+ inputType,
+ grouping,
+ getRowType,
+ namedAggregates,
+ namedProperties))
+ }
+
+ override def computeSelfCost (planner: RelOptPlanner, metadata:
RelMetadataQuery): RelOptCost = {
+ val child = this.getInput
+ val rowCnt = metadata.getRowCount(child)
+ val rowSize = this.estimateRowSize(child.getRowType)
+ val aggCnt = this.namedAggregates.size
+ planner.getCostFactory.makeCost(rowCnt, rowCnt * aggCnt, rowCnt *
rowSize)
+ }
+
+ override def translateToPlan(
+ tableEnv: BatchTableEnvironment,
+ expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
+
+ val config = tableEnv.getConfig
+
+ val inputDS = getInput.asInstanceOf[DataSetRel].translateToPlan(
+ tableEnv,
+ // tell the input operator that this operator currently only
supports Rows as input
+ Some(TypeConverter.DEFAULT_ROW_TYPE))
+
+ val result = window match {
+ case EventTimeTumblingGroupWindow(_, _, size) =>
+ createEventTimeTumblingWindowDataSet(inputDS,
isTimeInterval(size.resultType))
+ case EventTimeSessionGroupWindow(_, _, _) =>
+ throw new UnsupportedOperationException(
+ "Event-time session windows on batch are currently not
supported")
--- End diff --
In general, I would write "in a batch environment" instead of "on batch" in messages like this one.
> Add tumbling group-windows for batch tables
> -------------------------------------------
>
> Key: FLINK-4692
> URL: https://issues.apache.org/jira/browse/FLINK-4692
> Project: Flink
> Issue Type: Sub-task
> Components: Table API & SQL
> Reporter: Timo Walther
> Assignee: Jark Wu
>
> Add Tumble group-windows for batch tables as described in
> [FLIP-11|https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations].
>
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)