[ 
https://issues.apache.org/jira/browse/FLINK-5891?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15882017#comment-15882017
 ] 

Xingcan Cui commented on FLINK-5891:
------------------------------------

Thanks for your explanation, [~greghogan]. I'm afraid my PR on 
https://issues.apache.org/jira/browse/FLINK-1526 gets the same problem as I 
also store values with non-primitive types (anyhow the primitive types will not 
be affected, right?) from the received messages. I saw the following code in 
Flink's ML lib. To avoid the reference problem, it makes a deep copy of each 
{{StreamRecord<IN> element}}.
{code:title=AbstractCEPBasePatternOperator.java | borderStyle=solid}
...
// we have to buffer the elements until we receive the proper watermark
if (getExecutionConfig().isObjectReuseEnabled()) {
    // copy the StreamRecord so that it cannot be changed
    priorityQueue.offer(new 
StreamRecord<IN>(inputSerializer.copy(element.getValue()),     
element.getTimestamp()));
} else {
    priorityQueue.offer(element);
}
updatePriorityQueue(priorityQueue);
...
{code}
So, what's your suggestions on fixing this? I'd like to work on it (and surely 
also the PR of Flink-1526).

> ConnectedComponents is broken when object reuse enabled
> -------------------------------------------------------
>
>                 Key: FLINK-5891
>                 URL: https://issues.apache.org/jira/browse/FLINK-5891
>             Project: Flink
>          Issue Type: Bug
>          Components: Gelly
>    Affects Versions: 1.3.0
>            Reporter: Greg Hogan
>
> {{org.apache.flink.graph.library.ConnectedComponents.CCUpdater#updateVertex}} 
> is storing a value from its iterator.
> {{GSAConnectedComponents}} does not have this limitation.
> {code}
>       public static final class CCUpdater<K, VV extends Comparable<VV>>
>               extends GatherFunction<K, VV, VV> {
>               @Override
>               public void updateVertex(Vertex<K, VV> vertex, 
> MessageIterator<VV> messages) throws Exception {
>                       VV current = vertex.getValue();
>                       VV min = current;
>                       for (VV msg : messages) {
>                               if (msg.compareTo(min) < 0) {
>                                       min = msg;
>                               }
>                       }
>                       if (!min.equals(current)) {
>                               setNewVertexValue(min);
>                       }
>               }
>       }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to