[
https://issues.apache.org/jira/browse/FLINK-6394?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Jaromir Vanek updated FLINK-6394:
---------------------------------
Description:
I am using a group combiner in the DataSet API with object reuse disabled.
In code, it can be expressed as follows:
{code:java}
tuples.groupBy(1)
    .combineGroup((it, collector) -> {
        // store first item for future use
        Pojo stored = it.next();
        while (it.hasNext()) {
            ....
        }
    })
{code}
Even though object reuse is disabled, the instance I stored appears to be
replaced when {{.next()}} is called on the iterator. This leads to confusing
and incorrect results.
I checked the Flink codebase, and it seems that {{CombiningUnilateralSortMerger}}
reuses object instances even though object reuse is explicitly disabled.
In the spilling phase, the user's combiner is called with an instance of
{{CombineValueIterator}}, which reuses instances without any warning.
See
https://github.com/apache/flink/blob/d7b59d761601baba6765bb4fc407bcd9fd6a9387/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java#L550
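To illustrate the symptom without a Flink cluster, here is a minimal plain-Java sketch. It is not Flink's code: {{ReuseDemo}}, {{ReusingIterator}}, the {{copy()}} method and the shape of {{Pojo}} are hypothetical stand-ins. The iterator only mimics the suspected behaviour, where each call to {{next()}} overwrites and returns one shared instance. Copying the element before advancing is shown as a possible workaround, on the assumption that the element type can be copied.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;

public class ReuseDemo {

    /** Hypothetical mutable record standing in for the Pojo from the report. */
    public static final class Pojo {
        public int value;

        public Pojo() {}

        public Pojo(int value) {
            this.value = value;
        }

        /** Returns an independent copy of this record. */
        public Pojo copy() {
            return new Pojo(value);
        }
    }

    /**
     * Hypothetical iterator that mimics instance reuse: every call to next()
     * overwrites the same shared Pojo and returns it.
     */
    public static final class ReusingIterator implements Iterator<Pojo> {
        private final int[] values;
        private final Pojo reuse = new Pojo();
        private int pos = 0;

        public ReusingIterator(int... values) {
            this.values = values;
        }

        @Override
        public boolean hasNext() {
            return pos < values.length;
        }

        @Override
        public Pojo next() {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            reuse.value = values[pos++];
            return reuse;
        }
    }

    /** Keeps only a reference to the first element, as in the reported snippet. */
    public static int firstValueKeptByReference(Iterator<Pojo> it) {
        Pojo stored = it.next();
        while (it.hasNext()) {
            it.next(); // silently overwrites the object that 'stored' points to
        }
        return stored.value;
    }

    /** Copies the first element before advancing the iterator. */
    public static int firstValueKeptByCopy(Iterator<Pojo> it) {
        Pojo stored = it.next().copy();
        while (it.hasNext()) {
            it.next(); // the copy is unaffected by later calls
        }
        return stored.value;
    }

    public static void main(String[] args) {
        // Prints 3: the stored reference now shows the last element.
        System.out.println(firstValueKeptByReference(new ReusingIterator(1, 2, 3)));
        // Prints 1: the copy still holds the first element.
        System.out.println(firstValueKeptByCopy(new ReusingIterator(1, 2, 3)));
    }
}
```

With a non-reusing iterator both methods would return 1, which is the behaviour I expect when object reuse is disabled.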
When I disable the combiner and use only {{groupReduce}} with the same reduce
function, the results are fine.
Please let me know if you can confirm this as a bug. From my point of view it
is critical, as I am getting unpredictable results.
> GroupCombine reuses instances even though object reuse is disabled
> ------------------------------------------------------------------
>
> Key: FLINK-6394
> URL: https://issues.apache.org/jira/browse/FLINK-6394
> Project: Flink
> Issue Type: Bug
> Affects Versions: 1.2.0
> Reporter: Jaromir Vanek
> Priority: Critical