Johannes created FLINK-4586:
-------------------------------
Summary: NumberSequenceIterator and Accumulator threading issue
Key: FLINK-4586
URL: https://issues.apache.org/jira/browse/FLINK-4586
Project: Flink
Issue Type: Bug
Components: DataSet API
Affects Versions: 1.1.2
Reporter: Johannes
Priority: Minor
There is a strange problem when using the NumberSequenceIterator in combination
with an AverageAccumulator.
It seems that the individual accumulators are reinitialized and overwrite parts
of the intermediate result.
The following Scala snippet exemplifies the problem.
The accumulator should report the correct average, {{50.5}}, but instead reports
something completely different, like {{8.08}}, depending on the number of cores
used.
If the parallelism is set to {{1}}, the result is correct, which suggests
a threading problem. The problem occurs with both the Java and Scala
APIs.
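{code}
import org.apache.flink.api.common.JobExecutionResult
import org.apache.flink.api.common.accumulators.AverageAccumulator
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.NumberSequenceIterator

val env = ExecutionEnvironment.getExecutionEnvironment

env
  .fromParallelCollection(new NumberSequenceIterator(1, 100))
  .map(new RichMapFunction[Long, Long] {
    var a : AverageAccumulator = _

    // Feed every element into the accumulator and pass it through unchanged.
    override def map(value: Long): Long = {
      a.add(value)
      value
    }

    // Called once per parallel instance: registers an accumulator named "test".
    override def open(parameters: Configuration): Unit = {
      a = new AverageAccumulator
      getRuntimeContext.addAccumulator("test", a)
    }
  })
  .reduce((a, b) => a + b)
  .print()

val lastJobExecutionResult: JobExecutionResult = env.getLastJobExecutionResult
println(lastJobExecutionResult.getAccumulatorResult("test"))
{code}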
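As a sanity check on the expected value, here is a minimal sketch in plain Scala, without Flink, of how per-subtask averages should combine. The {{AvgAcc}} class and the {{averageWithParallelism}} helper are hypothetical names made up for illustration; this is not Flink's AverageAccumulator implementation. The sketch only assumes that each partial accumulator tracks a sum and a count, and that merging adds both.

```scala
object AverageMergeSketch {

  // Hypothetical stand-in for an average accumulator (NOT Flink's class):
  // it keeps a running sum and a count, and merges by adding both.
  final case class AvgAcc(sum: Double = 0.0, count: Long = 0L) {
    def add(value: Long): AvgAcc = AvgAcc(sum + value, count + 1)
    def merge(other: AvgAcc): AvgAcc = AvgAcc(sum + other.sum, count + other.count)
    def result: Double = if (count == 0) 0.0 else sum / count
  }

  // Split 1..100 into contiguous chunks (one per simulated subtask),
  // accumulate each chunk independently, then merge the partial results.
  def averageWithParallelism(parallelism: Int): Double = {
    val values = (1L to 100L).toVector
    val chunkSize = math.ceil(values.size.toDouble / parallelism).toInt
    values
      .grouped(chunkSize)
      .map(chunk => chunk.foldLeft(AvgAcc())((acc, v) => acc.add(v)))
      .foldLeft(AvgAcc())((left, right) => left.merge(right))
      .result
  }

  def main(args: Array[String]): Unit = {
    // The merged average should not depend on how the input is split.
    Seq(1, 4, 8).foreach { p =>
      println(s"parallelism=$p average=${averageWithParallelism(p)}")
    }
  }
}
```

With this merge rule the average is {{50.5}} for every split, which is the value the accumulator in the snippet above would be expected to report at any parallelism.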