This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch branch-4.7
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/branch-4.7 by this push:
new e04020a ISSUE #1544: [DLOG] ConcurrentModificationException with
nonblocking logReader.readNext(true)
e04020a is described below
commit e04020acda245c66351ccf2612f01b2c26ddbdd6
Author: Sijie Guo <[email protected]>
AuthorDate: Mon Jul 23 11:04:33 2018 -0700
ISSUE #1544: [DLOG] ConcurrentModificationException with nonblocking
logReader.readNext(true)
Descriptions of the changes in this PR:
*Motivation*
Fixes #1544. ConcurrentModificationException is thrown when trying to
access a non-thread-safe hashmap from different threads.
*Changes*
Make sure accessing to this non-thread-safe hashmap is under synchronized
block.
*Tests*
It is a bit tricky to reproduce this race condition in a unit test or an
integration test. so not going to attempt adding any tests.
Master Issue: #1544
Author: Sijie Guo <[email protected]>
Reviewers: Enrico Olivelli <[email protected]>, Jia Zhai <None>
This closes #1558 from sijie/issue_1544, closes #1544
(cherry picked from commit ec7fb62f025a3b03b42fd92b38bc836515310fad)
Signed-off-by: Sijie Guo <[email protected]>
---
.../distributedlog/impl/ZKLogSegmentMetadataStore.java | 16 ++++++++++++----
1 file changed, 12 insertions(+), 4 deletions(-)
diff --git
a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/ZKLogSegmentMetadataStore.java
b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/ZKLogSegmentMetadataStore.java
index d78b455..f688f44 100644
---
a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/ZKLogSegmentMetadataStore.java
+++
b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/ZKLogSegmentMetadataStore.java
@@ -476,8 +476,12 @@ public class ZKLogSegmentMetadataStore implements
LogSegmentMetadataStore, Watch
return;
}
this.submitTask(logSegmentsPath, () -> {
- for (LogSegmentNamesListener listener : listeners.keySet()) {
- listener.onLogStreamDeleted();
+ // the listener map might be updated in different threads (e.g.
unregisterLogSegmentListener)
+ // so access it under a synchronization block
+ synchronized (listeners) {
+ for (LogSegmentNamesListener listener : listeners.keySet()) {
+ listener.onLogStreamDeleted();
+ }
}
});
@@ -490,8 +494,12 @@ public class ZKLogSegmentMetadataStore implements
LogSegmentMetadataStore, Watch
return;
}
this.submitTask(logSegmentsPath, () -> {
- for (VersionedLogSegmentNamesListener listener :
listeners.values()) {
- listener.onSegmentsUpdated(segments);
+ // the listener map might be updated in different threads (e.g.
unregisterLogSegmentListener)
+ // so access it under a synchronization block
+ synchronized (listeners) {
+ for (VersionedLogSegmentNamesListener listener :
listeners.values()) {
+ listener.onSegmentsUpdated(segments);
+ }
}
});
}