This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new ec7fb62 ISSUE #1544: [DLOG] ConcurrentModificationException with
nonblocking logReader.readNext(true)
ec7fb62 is described below
commit ec7fb62f025a3b03b42fd92b38bc836515310fad
Author: Sijie Guo <[email protected]>
AuthorDate: Mon Jul 23 11:04:33 2018 -0700
ISSUE #1544: [DLOG] ConcurrentModificationException with nonblocking
logReader.readNext(true)
Descriptions of the changes in this PR:
*Motivation*
Fixes #1544. ConcurrentModificationException is thrown when trying to
access a non-thread-safe hashmap from different threads.
*Changes*
Make sure that access to this non-thread-safe hashmap happens inside a
synchronized block.
*Tests*
It is a bit tricky to reproduce this race condition in a unit test or an
integration test, so this change does not add any tests.
Master Issue: #1544
Author: Sijie Guo <[email protected]>
Reviewers: Enrico Olivelli <[email protected]>, Jia Zhai <None>
This closes #1558 from sijie/issue_1544, closes #1544
---
.../distributedlog/impl/ZKLogSegmentMetadataStore.java | 16 ++++++++++++----
1 file changed, 12 insertions(+), 4 deletions(-)
diff --git a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/ZKLogSegmentMetadataStore.java b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/ZKLogSegmentMetadataStore.java
index d78b455..f688f44 100644
--- a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/ZKLogSegmentMetadataStore.java
+++ b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/ZKLogSegmentMetadataStore.java
@@ -476,8 +476,12 @@ public class ZKLogSegmentMetadataStore implements LogSegmentMetadataStore, Watch
return;
}
this.submitTask(logSegmentsPath, () -> {
- for (LogSegmentNamesListener listener : listeners.keySet()) {
- listener.onLogStreamDeleted();
+ // the listener map might be updated in different threads (e.g. unregisterLogSegmentListener)
+ // so access it under a synchronization block
+ synchronized (listeners) {
+ for (LogSegmentNamesListener listener : listeners.keySet()) {
+ listener.onLogStreamDeleted();
+ }
}
});
@@ -490,8 +494,12 @@ public class ZKLogSegmentMetadataStore implements LogSegmentMetadataStore, Watch
return;
}
this.submitTask(logSegmentsPath, () -> {
- for (VersionedLogSegmentNamesListener listener : listeners.values()) {
- listener.onSegmentsUpdated(segments);
+ // the listener map might be updated in different threads (e.g. unregisterLogSegmentListener)
+ // so access it under a synchronization block
+ synchronized (listeners) {
+ for (VersionedLogSegmentNamesListener listener : listeners.values()) {
+ listener.onSegmentsUpdated(segments);
+ }
}
});
}
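
The pattern applied in both hunks can be sketched in isolation. This is a minimal sketch, not the actual ZKLogSegmentMetadataStore code: `ListenerRegistry`, `Listener`, and all method names are hypothetical. It also assumes that every other access to the map (registration and removal) synchronizes on the same map object, which the comment in the diff suggests but this email does not show.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the listener bookkeeping; names are illustrative only.
class ListenerRegistry {

    /** Hypothetical listener type used only in this sketch. */
    interface Listener {
        void onDeleted();
    }

    // Plain HashMap: not thread-safe, so every access below holds the map's monitor.
    private final Map<String, Listener> listeners = new HashMap<>();

    void register(String name, Listener listener) {
        synchronized (listeners) {
            listeners.put(name, listener);
        }
    }

    void unregister(String name) {
        synchronized (listeners) {
            listeners.remove(name);
        }
    }

    // Iterates under the same lock, so another thread cannot change the map
    // while the loop is running. Returns the number of listeners notified.
    int notifyDeleted() {
        int notified = 0;
        synchronized (listeners) {
            for (Listener listener : listeners.values()) {
                listener.onDeleted();
                notified++;
            }
        }
        return notified;
    }
}
```

Because Java monitors are reentrant, this guards only against modification from other threads. A callback that calls `unregister` on the same thread during the loop would still modify the map mid-iteration. The callbacks also run while the lock is held, so a slow listener delays registration and removal on other threads.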