[
https://issues.apache.org/jira/browse/FLINK-5891?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15882017#comment-15882017
]
Xingcan Cui commented on FLINK-5891:
------------------------------------
Thanks for your explanation, [~greghogan]. I'm afraid my PR on
https://issues.apache.org/jira/browse/FLINK-1526 gets the same problem as I
also store values with non-primitive types (anyhow the primitive types will not
be affected, right?) from the received messages. I saw the following code in
Flink's ML lib. To avoid the reference problem, it makes a deep copy of each
{{StreamRecord<IN> element}}.
{code:title=AbstractCEPBasePatternOperator.java | borderStyle=solid}
...
// we have to buffer the elements until we receive the proper watermark
if (getExecutionConfig().isObjectReuseEnabled()) {
// copy the StreamRecord so that it cannot be changed
priorityQueue.offer(new
StreamRecord<IN>(inputSerializer.copy(element.getValue()),
element.getTimestamp()));
} else {
priorityQueue.offer(element);
}
updatePriorityQueue(priorityQueue);
...
{code}
So, what's your suggestions on fixing this? I'd like to work on it (and surely
also the PR of Flink-1526).
> ConnectedComponents is broken when object reuse enabled
> -------------------------------------------------------
>
> Key: FLINK-5891
> URL: https://issues.apache.org/jira/browse/FLINK-5891
> Project: Flink
> Issue Type: Bug
> Components: Gelly
> Affects Versions: 1.3.0
> Reporter: Greg Hogan
>
> {{org.apache.flink.graph.library.ConnectedComponents.CCUpdater#updateVertex}}
> is storing a value from its iterator.
> {{GSAConnectedComponents}} does not have this limitation.
> {code}
> public static final class CCUpdater<K, VV extends Comparable<VV>>
> extends GatherFunction<K, VV, VV> {
> @Override
> public void updateVertex(Vertex<K, VV> vertex,
> MessageIterator<VV> messages) throws Exception {
> VV current = vertex.getValue();
> VV min = current;
> for (VV msg : messages) {
> if (msg.compareTo(min) < 0) {
> min = msg;
> }
> }
> if (!min.equals(current)) {
> setNewVertexValue(min);
> }
> }
> }
> {code}
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)