[ 
https://issues.apache.org/jira/browse/FLINK-4586?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Johannes updated FLINK-4586:
----------------------------
    Description: 
There is a strange problem when using the NumberSequenceIterator in combination 
with an AverageAccumulator.

It seems like the individual accumulators are reinitialized and overwrite parts 
of the intermediate results.
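
For reference, per-subtask accumulators are expected to be combined via 
{{merge}} rather than overwrite each other. A minimal sketch of that expected 
behaviour, using only the AverageAccumulator API in isolation (this is not 
taken from the attached test):

{code}
import org.apache.flink.api.common.accumulators.AverageAccumulator

// Two accumulators standing in for two parallel subtasks.
val a1 = new AverageAccumulator
val a2 = new AverageAccumulator
(1L to 50L).foreach(v => a1.add(v))
(51L to 100L).foreach(v => a2.add(v))

// Merging should yield the overall average of 1..100.
a1.merge(a2)
println(a1.getLocalValue) // expected: 50.5
{code}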

The following Scala snippet exemplifies the problem.

Instead of the correct average of {{50.5}}, the snippet prints something 
completely different, for example {{8.08}}, depending on the number of cores 
used.

If the parallelism is set to {{1}} the result is correct, which indicates a 
likely threading problem.

The problem occurs with both the Java and Scala APIs.

{code}
import org.apache.flink.api.common.JobExecutionResult
import org.apache.flink.api.common.accumulators.AverageAccumulator
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.NumberSequenceIterator

val env = ExecutionEnvironment.getExecutionEnvironment

env
  .fromParallelCollection(new NumberSequenceIterator(1, 100))
  .map(new RichMapFunction[Long, Long] {
        var a: AverageAccumulator = _

        override def map(value: Long): Long = {
          a.add(value)
          value
        }

        override def open(parameters: Configuration): Unit = {
          // One accumulator instance is created and registered per subtask.
          a = new AverageAccumulator
          getRuntimeContext.addAccumulator("test", a)
        }
  })
  .reduce((a, b) => a + b)
  .print() // print() triggers execution of the job

val lastJobExecutionResult: JobExecutionResult = env.getLastJobExecutionResult

// Expected: 50.5; actual: varies with the parallelism, e.g. 8.08
println(lastJobExecutionResult.getAccumulatorResult("test"))
{code}
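
As a quick cross-check of the parallelism observation above, pinning the 
job-wide parallelism to a single subtask before building the same pipeline 
makes the {{test}} accumulator report the expected {{50.5}}. A minimal sketch, 
assuming a local execution environment:

{code}
import org.apache.flink.api.scala.ExecutionEnvironment

val env = ExecutionEnvironment.getExecutionEnvironment
// With a single subtask only one accumulator instance exists, so the
// reported average matches the expected 50.5.
env.setParallelism(1)
// ... build and run the same pipeline as above ...
{code}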

> NumberSequenceIterator and Accumulator threading issue
> ------------------------------------------------------
>
>                 Key: FLINK-4586
>                 URL: https://issues.apache.org/jira/browse/FLINK-4586
>             Project: Flink
>          Issue Type: Bug
>          Components: DataSet API
>    Affects Versions: 1.1.2
>            Reporter: Johannes
>            Priority: Minor
>         Attachments: FLINK4586Test.scala
>



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
