[ https://issues.apache.org/jira/browse/FLINK-4636?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16571359#comment-16571359 ]
ASF GitHub Bot commented on FLINK-4636:
---------------------------------------
dawidwys closed pull request #2568: [FLINK-4636] Add boundary check for
priorityqueue for cep operator
URL: https://github.com/apache/flink/pull/2568
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java
index 64ffa2a0d1c..b428ba62d2d 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java
@@ -126,11 +126,12 @@ public void restoreState(FSDataInputStream state) throws Exception {
         int numberPriorityQueueEntries = ois.readInt();
-        priorityQueue = new PriorityQueue<StreamRecord<IN>>(numberPriorityQueueEntries, new StreamRecordComparator<IN>());
-
-        for (int i = 0; i <numberPriorityQueueEntries; i++) {
-            StreamElement streamElement = streamRecordSerializer.deserialize(new DataInputViewStreamWrapper(ois));
-            priorityQueue.offer(streamElement.<IN>asRecord());
+        if(numberPriorityQueueEntries > 0) {
+            priorityQueue = new PriorityQueue<StreamRecord<IN>>(numberPriorityQueueEntries, new StreamRecordComparator<IN>());
+            for (int i = 0; i <numberPriorityQueueEntries; i++) {
+                StreamElement streamElement = streamRecordSerializer.deserialize(new DataInputViewStreamWrapper(ois));
+                priorityQueue.offer(streamElement.<IN>asRecord());
+            }
         }
     }
 }
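
Note on the diff above: with the guard in place, {{priorityQueue}} is only assigned when at least one entry was stored. The diff does not show whether the field is initialized elsewhere, so the following is only a minimal sketch of an alternative that always produces a usable queue by clamping the capacity to 1. It is not the Flink implementation: the class name, the helper methods, and the use of plain {{Long}} values with java.io streams are assumptions made to keep the example self-contained.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.PriorityQueue;

// Hypothetical stand-in for the operator's snapshot/restore logic.
public class QueueRestoreSketch {

    // Writes the entry count followed by the entries, mirroring the
    // count-prefixed layout that restoreState() reads in the diff.
    static byte[] snapshot(PriorityQueue<Long> queue) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(queue.size());
            for (Long value : queue) {
                out.writeLong(value);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Reads the count, then always creates a queue. Math.max(1, n) avoids
    // the IllegalArgumentException for a stored count of 0.
    static PriorityQueue<Long> restore(byte[] state) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(state))) {
            int numberOfEntries = in.readInt();
            PriorityQueue<Long> queue = new PriorityQueue<Long>(Math.max(1, numberOfEntries));
            for (int i = 0; i < numberOfEntries; i++) {
                queue.offer(in.readLong());
            }
            return queue;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Restoring an empty snapshot yields an empty, non-null queue.
        PriorityQueue<Long> restored = restore(snapshot(new PriorityQueue<Long>()));
        System.out.println(restored.isEmpty());
    }
}
```

Either approach avoids the exception; the clamped variant additionally guarantees that callers never see an unassigned queue after a restore.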
----------------------------------------------------------------
> AbstractCEPPatternOperator fails to restore state
> -------------------------------------------------
>
> Key: FLINK-4636
> URL: https://issues.apache.org/jira/browse/FLINK-4636
> Project: Flink
> Issue Type: Bug
> Components: CEP
> Affects Versions: 1.1.2, 1.2.0
> Reporter: Fabian Hueske
> Assignee: Jagadish Bihani
> Priority: Major
>
> The {{restoreState()}} method of {{AbstractCEPPatternOperator}} restores a
> Java {{PriorityQueue}}. To do so, it first reads the number of elements to
> insert and then creates a {{PriorityQueue}} with that number as its initial
> capacity. However, Java's {{PriorityQueue}} constructor throws an
> {{IllegalArgumentException}} for an initial capacity of {{0}}, and
> {{restoreState()}} does not check for this.
> In case of an empty queue, the {{PriorityQueue}} should be instantiated with
> an initial capacity of {{1}}.
> See
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Problem-with-CEPPatternOperator-when-taskmanager-is-killed-tp9024.html
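
The behaviour described in the issue can be reproduced outside Flink. The snippet below is a small illustration only: the class and method names are made up for the example, and {{Long}} with a natural-order comparator stands in for the operator's {{StreamRecord}} and {{StreamRecordComparator}}.

```java
import java.util.Comparator;
import java.util.PriorityQueue;

// Hypothetical demo class, not part of Flink.
public class PriorityQueueCapacityDemo {

    // Returns true if the (capacity, comparator) constructor rejects the
    // given initial capacity with an IllegalArgumentException.
    static boolean rejectsCapacity(int capacity) {
        try {
            new PriorityQueue<Long>(capacity, Comparator.<Long>naturalOrder());
            return false;
        } catch (IllegalArgumentException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        // Capacity 0 is rejected; this is the failure seen when an empty
        // queue was stored in the snapshot.
        System.out.println("capacity 0 rejected: " + rejectsCapacity(0));
        // Capacity 1 is accepted and still represents an empty queue.
        System.out.println("capacity 1 rejected: " + rejectsCapacity(1));
    }
}
```

The initial capacity is only a sizing hint for the backing array, so a queue created with capacity 1 starts out empty and grows as needed.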
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)