[
https://issues.apache.org/jira/browse/FLINK-6290?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Dawid Wysakowicz updated FLINK-6290:
------------------------------------
Description:
The test below currently fails:
{code}
@Test
public void testClearingSharedBufferWithMultipleEdgesBetweenEntries() {
SharedBuffer<String, Event> sharedBuffer = new
SharedBuffer<>(Event.createTypeSerializer());
int numberEvents = 8;
Event[] events = new Event[numberEvents];
final long timestamp = 1L;
for (int i = 0; i < numberEvents; i++) {
events[i] = new Event(i + 1, "e" + (i + 1), i);
}
sharedBuffer.put("start", events[1], timestamp,
DeweyNumber.fromString("1"));
sharedBuffer.put("branching", events[2], timestamp, "start",
events[1], timestamp, DeweyNumber.fromString("1.0"));
sharedBuffer.put("branching", events[3], timestamp, "start",
events[1], timestamp, DeweyNumber.fromString("1.1"));
sharedBuffer.put("branching", events[3], timestamp,
"branching", events[2], timestamp, DeweyNumber.fromString("1.0.0"));
sharedBuffer.put("branching", events[4], timestamp,
"branching", events[3], timestamp, DeweyNumber.fromString("1.0.0.0"));
sharedBuffer.put("branching", events[4], timestamp,
"branching", events[3], timestamp, DeweyNumber.fromString("1.1.0"));
//simulate IGNORE (next event can point to events[2])
sharedBuffer.lock("branching", events[2], timestamp);
sharedBuffer.release("branching", events[4], timestamp);
//There should be still events[1] and events[2] in the buffer
assertFalse(sharedBuffer.isEmpty());
}
{code}
The problem is with the {{SharedBuffer#internalRemove}} method:
{code}
private void internalRemove(final SharedBufferEntry<K, V> entry) {
Stack<SharedBufferEntry<K, V>> entriesToRemove = new Stack<>();
entriesToRemove.add(entry);
while (!entriesToRemove.isEmpty()) {
SharedBufferEntry<K, V> currentEntry =
entriesToRemove.pop();
if (currentEntry.getReferenceCounter() == 0) {
currentEntry.remove();
for (SharedBufferEdge<K, V> edge:
currentEntry.getEdges()) {
if (edge.getTarget() != null) {
edge.getTarget().decreaseReferenceCounter();
entriesToRemove.push(edge.getTarget());
}
}
}
}
}
{code}
When currentEntry has multiple edges to the same entry, that entry is added
twice to entriesToRemove and its edges are removed twice.
was:
Below test right now fails:
{code}
@Test
public void testClearingSharedBufferWithMultipleEdgesBetweenEntries() {
SharedBuffer<String, Event> sharedBuffer = new
SharedBuffer<>(Event.createTypeSerializer());
int numberEvents = 8;
Event[] events = new Event[numberEvents];
final long timestamp = 1L;
for (int i = 0; i < numberEvents; i++) {
events[i] = new Event(i + 1, "e" + (i + 1), i);
}
sharedBuffer.put("start", events[1], timestamp,
DeweyNumber.fromString("1"));
sharedBuffer.put("branching", events[2], timestamp, "start",
events[1], timestamp, DeweyNumber.fromString("1.0"));
sharedBuffer.put("branching", events[3], timestamp, "start",
events[1], timestamp, DeweyNumber.fromString("1.1"));
sharedBuffer.put("branching", events[3], timestamp,
"branching", events[2], timestamp, DeweyNumber.fromString("1.0.0"));
sharedBuffer.put("branching", events[4], timestamp,
"branching", events[3], timestamp, DeweyNumber.fromString("1.0.0.0"));
sharedBuffer.put("branching", events[4], timestamp,
"branching", events[3], timestamp, DeweyNumber.fromString("1.1.0"));
//simulate IGNORE (next event can point to events[2])
sharedBuffer.lock("branching", events[2], timestamp);
sharedBuffer.release("branching", events[4], timestamp);
//There should be still events[1] and events[2] in the buffer
assertFalse(sharedBuffer.isEmpty());
}
{code}
The problem is with the {{SharedBuffer#internalRemove}} method:
{{
private void internalRemove(final SharedBufferEntry<K, V> entry) {
Stack<SharedBufferEntry<K, V>> entriesToRemove = new Stack<>();
entriesToRemove.add(entry);
while (!entriesToRemove.isEmpty()) {
SharedBufferEntry<K, V> currentEntry =
entriesToRemove.pop();
if (currentEntry.getReferenceCounter() == 0) {
currentEntry.remove();
for (SharedBufferEdge<K, V> edge:
currentEntry.getEdges()) {
if (edge.getTarget() != null) {
edge.getTarget().decreaseReferenceCounter();
entriesToRemove.push(edge.getTarget());
}
}
}
}
}
}}
When currentEntry has multiple edges to the same entry. The entry will be added
twice to the entriesToRemove and it's edges will be removed twice.
> SharedBuffer is improperly released when multiple edges between entries
> -----------------------------------------------------------------------
>
> Key: FLINK-6290
> URL: https://issues.apache.org/jira/browse/FLINK-6290
> Project: Flink
> Issue Type: Bug
> Components: CEP
> Affects Versions: 1.3.0
> Reporter: Dawid Wysakowicz
> Priority: Critical
> Fix For: 1.3.0
>
>
> The test below currently fails:
> {code}
> @Test
> public void testClearingSharedBufferWithMultipleEdgesBetweenEntries() {
> SharedBuffer<String, Event> sharedBuffer = new
> SharedBuffer<>(Event.createTypeSerializer());
> int numberEvents = 8;
> Event[] events = new Event[numberEvents];
> final long timestamp = 1L;
> for (int i = 0; i < numberEvents; i++) {
> events[i] = new Event(i + 1, "e" + (i + 1), i);
> }
> sharedBuffer.put("start", events[1], timestamp,
> DeweyNumber.fromString("1"));
> sharedBuffer.put("branching", events[2], timestamp, "start",
> events[1], timestamp, DeweyNumber.fromString("1.0"));
> sharedBuffer.put("branching", events[3], timestamp, "start",
> events[1], timestamp, DeweyNumber.fromString("1.1"));
> sharedBuffer.put("branching", events[3], timestamp,
> "branching", events[2], timestamp, DeweyNumber.fromString("1.0.0"));
> sharedBuffer.put("branching", events[4], timestamp,
> "branching", events[3], timestamp, DeweyNumber.fromString("1.0.0.0"));
> sharedBuffer.put("branching", events[4], timestamp,
> "branching", events[3], timestamp, DeweyNumber.fromString("1.1.0"));
> //simulate IGNORE (next event can point to events[2])
> sharedBuffer.lock("branching", events[2], timestamp);
> sharedBuffer.release("branching", events[4], timestamp);
> //There should be still events[1] and events[2] in the buffer
> assertFalse(sharedBuffer.isEmpty());
> }
> {code}
> The problem is with the {{SharedBuffer#internalRemove}} method:
> {code}
> private void internalRemove(final SharedBufferEntry<K, V> entry) {
> Stack<SharedBufferEntry<K, V>> entriesToRemove = new Stack<>();
> entriesToRemove.add(entry);
> while (!entriesToRemove.isEmpty()) {
> SharedBufferEntry<K, V> currentEntry =
> entriesToRemove.pop();
> if (currentEntry.getReferenceCounter() == 0) {
> currentEntry.remove();
> for (SharedBufferEdge<K, V> edge:
> currentEntry.getEdges()) {
> if (edge.getTarget() != null) {
>
> edge.getTarget().decreaseReferenceCounter();
>
> entriesToRemove.push(edge.getTarget());
> }
> }
> }
> }
> }
> {code}
> When currentEntry has multiple edges to the same entry, that entry is
> added twice to entriesToRemove and its edges are removed twice.
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)