[ https://issues.apache.org/jira/browse/FLINK-4586?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15576379#comment-15576379 ]
ASF GitHub Bot commented on FLINK-4586:
---------------------------------------

GitHub user greghogan opened a pull request:

    https://github.com/apache/flink/pull/2639

    [FLINK-4586] [core] Broken AverageAccumulator

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/greghogan/flink 4586_broken_averageaccumulator

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2639.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2639

----
commit 58d79b40ed837542e727c5f4f3a410ccb5f9ff24
Author: Greg Hogan <c...@greghogan.com>
Date:   2016-10-14T20:18:52Z

    [FLINK-4586] [core] Broken AverageAccumulator

----

> NumberSequenceIterator and Accumulator threading issue
> ------------------------------------------------------
>
>                 Key: FLINK-4586
>                 URL: https://issues.apache.org/jira/browse/FLINK-4586
>             Project: Flink
>          Issue Type: Bug
>          Components: DataSet API
>    Affects Versions: 1.1.2
>            Reporter: Johannes
>            Assignee: Greg Hogan
>            Priority: Minor
>             Fix For: 1.2.0, 1.1.4
>
>         Attachments: FLINK4586Test.scala
>
>
> There is a strange problem when using the NumberSequenceIterator in
> combination with an AverageAccumulator.
> It seems that the individual accumulators are reinitialized and overwrite
> parts of intermediate solutions.
> The following Scala snippet exemplifies the problem.
> Instead of printing the correct average, {{50.5}}, it prints something
> completely different, such as {{8.08}}, depending on the number of
> cores used.
> If the parallelism is set to {{1}}, the result is correct, which indicates a
> likely threading problem.
> The problem occurs with both the Java and the Scala API.
> {code}
> env
>   .fromParallelCollection(new NumberSequenceIterator(1, 100))
>   .map(new RichMapFunction[Long, Long] {
>     var a: AverageAccumulator = _
>
>     override def map(value: Long): Long = {
>       a.add(value)
>       value
>     }
>
>     override def open(parameters: Configuration): Unit = {
>       a = new AverageAccumulator
>       getRuntimeContext.addAccumulator("test", a)
>     }
>   })
>   .reduce((a, b) => a + b)
>   .print()
>
> val lastJobExecutionResult: JobExecutionResult = env.getLastJobExecutionResult
> println(lastJobExecutionResult.getAccumulatorResult("test"))
> {code}
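
For reference, the expected value of {{50.5}} should not depend on the parallelism. The following is a minimal standalone sketch in plain Scala (no Flink dependency) of one way to combine per-partition averages so that the result is independent of how the input is split: each partial result carries a running sum and a count, and merging adds both. The names PartialAverage, AverageMergeDemo and globalAverage are hypothetical and exist only for this illustration. This is not the Flink AverageAccumulator source and makes no claim about what the pull request changes.

```scala
// Illustrative sketch only; NOT the Flink AverageAccumulator implementation.
// PartialAverage, AverageMergeDemo and globalAverage are hypothetical names.

// A partial result that keeps the running sum and the number of elements seen.
final case class PartialAverage(sum: Double = 0.0, count: Long = 0L) {
  // Fold one more value into this partial result.
  def add(value: Long): PartialAverage = PartialAverage(sum + value, count + 1)

  // Combine two partial results by adding sums and counts.
  def merge(other: PartialAverage): PartialAverage =
    PartialAverage(sum + other.sum, count + other.count)

  // The average is derived only when it is read.
  def average: Double = if (count == 0) 0.0 else sum / count
}

object AverageMergeDemo {
  // Split the input into at most `parallelism` chunks, accumulate each chunk
  // separately (as parallel subtasks would), then merge the partial results.
  def globalAverage(values: Seq[Long], parallelism: Int): Double = {
    require(parallelism > 0, "parallelism must be positive")
    val chunkSize = math.max(1, math.ceil(values.size.toDouble / parallelism).toInt)
    values
      .grouped(chunkSize)
      .map(chunk => chunk.foldLeft(PartialAverage())((acc, v) => acc.add(v)))
      .foldLeft(PartialAverage())((acc, partial) => acc.merge(partial))
      .average
  }

  def main(args: Array[String]): Unit = {
    val values = 1L to 100L
    // The same average is expected for every chunking of the input.
    for (p <- Seq(1, 3, 4, 8)) {
      println(s"parallelism=$p average=${globalAverage(values, p)}")
    }
  }
}
```

With the input 1 to 100 the merged sum is 5050 and the merged count is 100 for any chunking, so the sketch yields 50.5 whether the values are processed in one chunk or several.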