Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2840#discussion_r88978610 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala --- @@ -157,4 +161,41 @@ class DataSetAggregate( case _ => result } } + + /** + * Dummy [[Row]] into a [[DataSet]] for result after map operations. + * @param mapOperator after which insert dummy records + * @param tableEnv [[BatchTableEnvironment]] for getting rel builder and type factory + * @tparam IN mapOperator input type + * @tparam OUT mapOperator output type + * @return DataSet of type Row is contains null literals for columns + */ + private def dummyRow[IN,OUT]( + mapOperator: MapOperator[IN,OUT], + tableEnv: BatchTableEnvironment): DataSet[Row] = { + + val builder: RelDataTypeFactory.FieldInfoBuilder = getCluster.getTypeFactory.builder + val rowInfo = mapOperator.getResultType.asInstanceOf[RowTypeInfo] + + val nullLiterals :ImmutableList[ImmutableList[RexLiteral]] = + ImmutableList.of(ImmutableList.copyOf[RexLiteral]( + for (fieldName <- rowInfo.getFieldNames) + yield { + val columnType = tableEnv.getTypeFactory + .createTypeFromTypeInfo(rowInfo.getTypeAt(fieldName)) + builder.add(fieldName, columnType) + tableEnv.getRelBuilder.getRexBuilder + .makeLiteral(null,columnType,false).asInstanceOf[RexLiteral] + })) + + val dataType = builder.build() + + val relNode = RelFactories.DEFAULT_VALUES_FACTORY + .createValues(getCluster, dataType, nullLiterals) + + DataSetValuesRule.INSTANCE.asInstanceOf[DataSetValuesRule] --- End diff -- Can we directly create a `ValuesInputFormat` as in `DataSetValues`? Using the `DataSetValuesRule` outside of the optimizer does not seem like a clean design. Alternatively, we can think about implementing the whole fix as a RelOptRule that injects a LogicalValues with a null row in front of a LogicalAggregate without a groupingSet. 
In addition, we would need to make sure that a LogicalAggregate is only translated into a DataSetAggregate if the LogicalValues exists.
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes to have it enabled, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---