yunfengzhou-hub commented on code in PR #177:
URL: https://github.com/apache/flink-ml/pull/177#discussion_r1026042607


##########
flink-ml-core/src/test/java/org/apache/flink/ml/common/datastream/DataStreamUtilsTest.java:
##########
@@ -86,6 +87,29 @@ public void testAggregate() throws Exception {
         assertEquals("190", stringSum.get(0));
     }
 
+    @Test
+    public void testAggregateWithNonNeutralInitialAccumulator() throws 
Exception {
+        DataStream<Long> dataStream =
+                env.fromParallelCollection(new NumberSequenceIterator(0L, 
19L), Types.LONG);
+        DataStream<String> result =
+                DataStreamUtils.aggregate(
+                        dataStream, new 
TestAggregateFuncWithNonNeutralInitialAccumulator());
+        List<String> stringSumList = 
IteratorUtils.toList(result.executeAndCollect());
+        assertEquals(1, stringSumList.size());
+        String stringSum1 = stringSumList.get(0);
+
+        env.setParallelism(env.getParallelism() + 1);

Review Comment:
   The parallelism of the environment would be reset in the `before()` method, 
which happens before each test case, so there is no need to reset the 
parallelism to the default value here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to