[ 
https://issues.apache.org/jira/browse/FLINK-6290?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dawid Wysakowicz updated FLINK-6290:
------------------------------------
    Description: 
The test below currently fails:
{code}
        /**
         * Reproduction for FLINK-6290: builds a shared buffer in which one entry
         * (events[4]) is linked twice to the same predecessor (events[3]), locks
         * events[2], releases events[4] and then expects the buffer to be non-empty.
         */
        @Test
        public void testClearingSharedBufferWithMultipleEdgesBetweenEntries() {
                SharedBuffer<String, Event> sharedBuffer = new 
SharedBuffer<>(Event.createTypeSerializer());
                int numberEvents = 8;
                Event[] events = new Event[numberEvents];
                final long timestamp = 1L;

                // All eight events are created, but only events[1]..events[4] are used below.
                for (int i = 0; i < numberEvents; i++) {
                        events[i] = new Event(i + 1, "e" + (i + 1), i);
                }

                // events[1] is put without a predecessor; every following put also names a
                // previous (key, event, timestamp) triple that the new entry points back to.
                sharedBuffer.put("start", events[1], timestamp, 
DeweyNumber.fromString("1"));
                sharedBuffer.put("branching", events[2], timestamp, "start", 
events[1], timestamp, DeweyNumber.fromString("1.0"));
                sharedBuffer.put("branching", events[3], timestamp, "start", 
events[1], timestamp, DeweyNumber.fromString("1.1"));
                sharedBuffer.put("branching", events[3], timestamp, 
"branching", events[2], timestamp, DeweyNumber.fromString("1.0.0"));
                // events[4] is put twice with events[3] as predecessor (versions 1.0.0.0 and
                // 1.1.0): this is the "multiple edges between entries" case under test.
                sharedBuffer.put("branching", events[4], timestamp, 
"branching", events[3], timestamp, DeweyNumber.fromString("1.0.0.0"));
                sharedBuffer.put("branching", events[4], timestamp, 
"branching", events[3], timestamp, DeweyNumber.fromString("1.1.0"));

                //simulate IGNORE (next event can point to events[2])
                sharedBuffer.lock("branching", events[2], timestamp);

                // NOTE(review): per the report, this release is what ends up in
                // SharedBuffer#internalRemove; that call path is not shown here.
                sharedBuffer.release("branching", events[4], timestamp);

                // events[1] and events[2] should still be in the buffer
                assertFalse(sharedBuffer.isEmpty());
        }
{code}

The problem is with the {{SharedBuffer#internalRemove}} method:

{code}
/**
 * Removes {@code entry} if its reference counter is zero and then cascades along
 * its edges: each non-null edge target has its reference counter decreased and is
 * pushed onto a work stack so that it is checked in the same way.
 *
 * <p>Known defect (FLINK-6290): a target is pushed once per edge that points to
 * it. When one entry has several edges to the same target, the target sits on the
 * stack several times; once its counter has reached zero, every copy passes the
 * zero check, so the target is removed and its own edges are walked repeatedly.
 *
 * @param entry the entry at which the removal starts
 */
private void internalRemove(final SharedBufferEntry<K, V> entry) {
                // Work stack of entries still to be checked, seeded with the starting entry.
                Stack<SharedBufferEntry<K, V>> entriesToRemove = new Stack<>();
                entriesToRemove.add(entry);

                while (!entriesToRemove.isEmpty()) {
                        SharedBufferEntry<K, V> currentEntry = 
entriesToRemove.pop();

                        // Entries with a non-zero counter are popped and otherwise left untouched.
                        if (currentEntry.getReferenceCounter() == 0) {
                                currentEntry.remove();

                                for (SharedBufferEdge<K, V> edge: 
currentEntry.getEdges()) {
                                        if (edge.getTarget() != null) {
                                                // FLINK-6290: nothing de-duplicates targets here, so a
                                                // target shared by several edges is pushed several times.
                                                
edge.getTarget().decreaseReferenceCounter();
                                                
entriesToRemove.push(edge.getTarget());
                                        }
                                }
                        }
                }
        }
{code}

When currentEntry has multiple edges to the same entry, that entry is added to 
entriesToRemove twice and its edges are removed twice.


  was:
The test below currently fails:
{code}
        /**
         * Reproduction for FLINK-6290: builds a shared buffer in which one entry
         * (events[4]) is linked twice to the same predecessor (events[3]), locks
         * events[2], releases events[4] and then expects the buffer to be non-empty.
         */
        @Test
        public void testClearingSharedBufferWithMultipleEdgesBetweenEntries() {
                SharedBuffer<String, Event> sharedBuffer = new 
SharedBuffer<>(Event.createTypeSerializer());
                int numberEvents = 8;
                Event[] events = new Event[numberEvents];
                final long timestamp = 1L;

                // All eight events are created, but only events[1]..events[4] are used below.
                for (int i = 0; i < numberEvents; i++) {
                        events[i] = new Event(i + 1, "e" + (i + 1), i);
                }

                // events[1] is put without a predecessor; every following put also names a
                // previous (key, event, timestamp) triple that the new entry points back to.
                sharedBuffer.put("start", events[1], timestamp, 
DeweyNumber.fromString("1"));
                sharedBuffer.put("branching", events[2], timestamp, "start", 
events[1], timestamp, DeweyNumber.fromString("1.0"));
                sharedBuffer.put("branching", events[3], timestamp, "start", 
events[1], timestamp, DeweyNumber.fromString("1.1"));
                sharedBuffer.put("branching", events[3], timestamp, 
"branching", events[2], timestamp, DeweyNumber.fromString("1.0.0"));
                // events[4] is put twice with events[3] as predecessor (versions 1.0.0.0 and
                // 1.1.0): this is the "multiple edges between entries" case under test.
                sharedBuffer.put("branching", events[4], timestamp, 
"branching", events[3], timestamp, DeweyNumber.fromString("1.0.0.0"));
                sharedBuffer.put("branching", events[4], timestamp, 
"branching", events[3], timestamp, DeweyNumber.fromString("1.1.0"));

                //simulate IGNORE (next event can point to events[2])
                sharedBuffer.lock("branching", events[2], timestamp);

                // NOTE(review): per the report, this release is what ends up in
                // SharedBuffer#internalRemove; that call path is not shown here.
                sharedBuffer.release("branching", events[4], timestamp);

                // events[1] and events[2] should still be in the buffer
                assertFalse(sharedBuffer.isEmpty());
        }
{code}

The problem is with the {{SharedBuffer#internalRemove}} method:

{{
/**
 * Removes {@code entry} if its reference counter is zero and then cascades along
 * its edges: each non-null edge target has its reference counter decreased and is
 * pushed onto a work stack so that it is checked in the same way.
 *
 * <p>Known defect (FLINK-6290): a target is pushed once per edge that points to
 * it. When one entry has several edges to the same target, the target sits on the
 * stack several times; once its counter has reached zero, every copy passes the
 * zero check, so the target is removed and its own edges are walked repeatedly.
 *
 * @param entry the entry at which the removal starts
 */
private void internalRemove(final SharedBufferEntry<K, V> entry) {
                // Work stack of entries still to be checked, seeded with the starting entry.
                Stack<SharedBufferEntry<K, V>> entriesToRemove = new Stack<>();
                entriesToRemove.add(entry);

                while (!entriesToRemove.isEmpty()) {
                        SharedBufferEntry<K, V> currentEntry = 
entriesToRemove.pop();

                        // Entries with a non-zero counter are popped and otherwise left untouched.
                        if (currentEntry.getReferenceCounter() == 0) {
                                currentEntry.remove();

                                for (SharedBufferEdge<K, V> edge: 
currentEntry.getEdges()) {
                                        if (edge.getTarget() != null) {
                                                // FLINK-6290: nothing de-duplicates targets here, so a
                                                // target shared by several edges is pushed several times.
                                                
edge.getTarget().decreaseReferenceCounter();
                                                
entriesToRemove.push(edge.getTarget());
                                        }
                                }
                        }
                }
        }
}}

When currentEntry has multiple edges to the same entry, that entry is added to 
entriesToRemove twice and its edges are removed twice.



> SharedBuffer is improperly released when multiple edges between entries
> -----------------------------------------------------------------------
>
>                 Key: FLINK-6290
>                 URL: https://issues.apache.org/jira/browse/FLINK-6290
>             Project: Flink
>          Issue Type: Bug
>          Components: CEP
>    Affects Versions: 1.3.0
>            Reporter: Dawid Wysakowicz
>            Priority: Critical
>             Fix For: 1.3.0
>
>
> The test below currently fails:
> {code}
>       @Test
>       public void testClearingSharedBufferWithMultipleEdgesBetweenEntries() {
>               SharedBuffer<String, Event> sharedBuffer = new 
> SharedBuffer<>(Event.createTypeSerializer());
>               int numberEvents = 8;
>               Event[] events = new Event[numberEvents];
>               final long timestamp = 1L;
>               for (int i = 0; i < numberEvents; i++) {
>                       events[i] = new Event(i + 1, "e" + (i + 1), i);
>               }
>               sharedBuffer.put("start", events[1], timestamp, 
> DeweyNumber.fromString("1"));
>               sharedBuffer.put("branching", events[2], timestamp, "start", 
> events[1], timestamp, DeweyNumber.fromString("1.0"));
>               sharedBuffer.put("branching", events[3], timestamp, "start", 
> events[1], timestamp, DeweyNumber.fromString("1.1"));
>               sharedBuffer.put("branching", events[3], timestamp, 
> "branching", events[2], timestamp, DeweyNumber.fromString("1.0.0"));
>               sharedBuffer.put("branching", events[4], timestamp, 
> "branching", events[3], timestamp, DeweyNumber.fromString("1.0.0.0"));
>               sharedBuffer.put("branching", events[4], timestamp, 
> "branching", events[3], timestamp, DeweyNumber.fromString("1.1.0"));
>               //simulate IGNORE (next event can point to events[2])
>               sharedBuffer.lock("branching", events[2], timestamp);
>               sharedBuffer.release("branching", events[4], timestamp);
>               //There should be still events[1] and events[2] in the buffer
>               assertFalse(sharedBuffer.isEmpty());
>       }
> {code}
> The problem is with the {{SharedBuffer#internalRemove}} method:
> {code}
> private void internalRemove(final SharedBufferEntry<K, V> entry) {
>               Stack<SharedBufferEntry<K, V>> entriesToRemove = new Stack<>();
>               entriesToRemove.add(entry);
>               while (!entriesToRemove.isEmpty()) {
>                       SharedBufferEntry<K, V> currentEntry = 
> entriesToRemove.pop();
>                       if (currentEntry.getReferenceCounter() == 0) {
>                               currentEntry.remove();
>                               for (SharedBufferEdge<K, V> edge: 
> currentEntry.getEdges()) {
>                                       if (edge.getTarget() != null) {
>                                               
> edge.getTarget().decreaseReferenceCounter();
>                                               
> entriesToRemove.push(edge.getTarget());
>                                       }
>                               }
>                       }
>               }
>       }
> {code}
> When currentEntry has multiple edges to the same entry, that entry is added 
> to entriesToRemove twice and its edges are removed twice.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to