[
https://issues.apache.org/jira/browse/FLINK-4832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15689481#comment-15689481
]
ASF GitHub Bot commented on FLINK-4832:
---------------------------------------
Github user ex00 commented on a diff in the pull request:
https://github.com/apache/flink/pull/2840#discussion_r89277461
--- Diff:
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala
---
@@ -157,4 +161,41 @@ class DataSetAggregate(
case _ => result
}
}
+
+  /**
+    * Creates a [[DataSet]] containing a single dummy [[Row]] to union with the
+    * result after map operations.
+    * @param mapOperator the operator after which the dummy records are inserted
+    * @param tableEnv [[BatchTableEnvironment]] used to obtain the rel builder and type factory
+    * @tparam IN mapOperator input type
+    * @tparam OUT mapOperator output type
+    * @return a DataSet of [[Row]] containing null literals for all columns
+    */
+  private def dummyRow[IN, OUT](
+      mapOperator: MapOperator[IN, OUT],
+      tableEnv: BatchTableEnvironment): DataSet[Row] = {
+
+    val builder: RelDataTypeFactory.FieldInfoBuilder = getCluster.getTypeFactory.builder
+    val rowInfo = mapOperator.getResultType.asInstanceOf[RowTypeInfo]
+
+    val nullLiterals: ImmutableList[ImmutableList[RexLiteral]] =
+      ImmutableList.of(ImmutableList.copyOf[RexLiteral](
+        for (fieldName <- rowInfo.getFieldNames)
+        yield {
+          val columnType = tableEnv.getTypeFactory
+            .createTypeFromTypeInfo(rowInfo.getTypeAt(fieldName))
+          builder.add(fieldName, columnType)
+          tableEnv.getRelBuilder.getRexBuilder
+            .makeLiteral(null, columnType, false).asInstanceOf[RexLiteral]
+        }))
+
+ val dataType = builder.build()
+
+ val relNode = RelFactories.DEFAULT_VALUES_FACTORY
+ .createValues(getCluster, dataType, nullLiterals)
+
+ DataSetValuesRule.INSTANCE.asInstanceOf[DataSetValuesRule]
--- End diff --
Sorry, I don't understand your point. Could you explain, please?
Are you proposing to add a new `DataSetNullValues extends LogicalAggregate with
DataSetRel` so that it can be translated to a `DataSet`, and to create a rule
that initializes the new `DataSetNullValues`? Is that correct?
Or could I perhaps use `DataSetValues` directly? For example:
```scala
val relNode = RelFactories.DEFAULT_VALUES_FACTORY
.createValues(getCluster, dataType, nullLiterals)
new DataSetValues(
cluster,
relNode.getTraitSet.replace(DataSetConvention.INSTANCE),
dataType,
nullLiterals,
"DataSetNullValuesRule"
).translateToPlan(tableEnv, Some(TypeConverter.DEFAULT_ROW_TYPE)).asInstanceOf[DataSet[Row]]
```
> Count/Sum 0 elements
> --------------------
>
> Key: FLINK-4832
> URL: https://issues.apache.org/jira/browse/FLINK-4832
> Project: Flink
> Issue Type: Improvement
> Components: Table API & SQL
> Reporter: Timo Walther
> Assignee: Anton Mushin
>
> Currently, the Table API is unable to count or sum up 0 elements. We should
> improve DataSet aggregations for this, maybe by unioning the original DataSet
> with a dummy record or by using a MapPartition function. Coming up with a
> good design for this is also part of this issue.
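As a side note on the semantics the issue asks for: in SQL, `COUNT` over zero rows must return 0 and `SUM` over zero rows must return NULL, which is why a dummy null-valued row unioned with the input makes the aggregate produce a result even on an empty DataSet. The following is a minimal plain-Scala sketch of those semantics (illustrative only, not Flink API; names like `EmptyAggregateSemantics` are made up for this example):

```scala
// Sketch of the target semantics for aggregates over zero rows.
// Assumption: plain Scala collections stand in for a Flink DataSet.
object EmptyAggregateSemantics {
  // COUNT over an empty input must return 0, not an empty result.
  def count(rows: Seq[Int]): Long = rows.size.toLong

  // SUM over an empty input must return NULL (modeled here as None).
  def sum(rows: Seq[Int]): Option[Int] =
    if (rows.isEmpty) None else Some(rows.sum)

  // The proposed trick: ensure the aggregate always sees at least one
  // record by unioning the input with a single dummy "all nulls" row,
  // which the aggregate functions then treat as absent.
  def withDummy(rows: Seq[Option[Int]]): Seq[Option[Int]] =
    if (rows.isEmpty) Seq(None) else rows
}
```

With this fallback in place, an aggregation over an empty input yields one output row (`count = 0`, `sum = null`) instead of no row at all, which is what the `dummyRow` helper in the diff is building via Calcite's `DEFAULT_VALUES_FACTORY`.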
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)