Repository: flink Updated Branches: refs/heads/release-1.3 353d60045 -> 0f86deed2
[FLINK-6650] [table] Improve the error message for toAppendStream This closes #3958. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0f86deed Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0f86deed Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0f86deed Branch: refs/heads/release-1.3 Commit: 0f86deed28ab326c8cdb886e3b5ea32da76beab7 Parents: 353d600 Author: sunjincheng121 <[email protected]> Authored: Mon May 22 17:04:13 2017 +0800 Committer: twalthr <[email protected]> Committed: Wed May 24 16:05:04 2017 +0200 ---------------------------------------------------------------------- .../org/apache/flink/table/api/StreamTableEnvironment.scala | 2 +- .../plan/nodes/datastream/DataStreamGroupWindowAggregate.scala | 4 ++-- .../table/plan/nodes/datastream/DataStreamOverAggregate.scala | 4 ++-- .../org/apache/flink/table/runtime/aggregate/AggregateUtil.scala | 3 +++ 4 files changed, 8 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/0f86deed/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala index c430b21..bc5038d 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala @@ -630,7 +630,7 @@ abstract class StreamTableEnvironment( if (!withChangeFlag && !isAppendOnly(logicalPlan)) { throw new TableException( "Table is not an append-only table. " + - "Output needs to handle update and delete changes.") + "Use the toRetractStream() in order to handle add and retract messages.") } // get CRow plan http://git-wip-us.apache.org/repos/asf/flink/blob/0f86deed/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala index 1ac013a..d860cbe 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala @@ -126,9 +126,9 @@ class DataStreamGroupWindowAggregate( val physicalNamedProperties = namedProperties .filter(np => !FlinkTypeFactory.isTimeIndicatorType(np.property.resultType)) - val consumeRetraction = DataStreamRetractionRules.isAccRetract(input) + val inputIsAccRetract = DataStreamRetractionRules.isAccRetract(input) - if (consumeRetraction) { + if (inputIsAccRetract) { throw new TableException( "Retraction on windowed GroupBy aggregation is not supported yet. " + "Note: Windowed GroupBy aggregation should not follow a " + http://git-wip-us.apache.org/repos/asf/flink/blob/0f86deed/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala index a9fbf02..08f0356 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala @@ -116,9 +116,9 @@ class DataStreamOverAggregate( val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan(tableEnv, queryConfig) - val consumeRetraction = DataStreamRetractionRules.isAccRetract(input) + val inputIsAccRetract = DataStreamRetractionRules.isAccRetract(input) - if (consumeRetraction) { + if (inputIsAccRetract) { throw new TableException( "Retraction on Over window aggregation is not supported yet. " + "Note: Over window aggregation should not follow a non-windowed GroupBy aggregation.") http://git-wip-us.apache.org/repos/asf/flink/blob/0f86deed/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala index 8073959..2907b99 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala @@ -152,6 +152,9 @@ object AggregateUtil { * @param inputRowType Input row type * @param inputFieldTypes Types of the physical input fields * @param groupings the position (in the input Row) of the grouping keys + * @param queryConfig The configuration of the query to generate. + * @param generateRetraction It is a tag that indicates whether generate retract record. + * @param consumeRetraction It is a tag that indicates whether consume the retract record. * @return [[org.apache.flink.streaming.api.functions.ProcessFunction]] */ private[flink] def createGroupAggregateFunction(
