thomasmueller commented on a change in pull request #383:
URL: https://github.com/apache/jackrabbit-oak/pull/383#discussion_r737417571
##########
File path:
oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/TraverseAndSortTask.java
##########
@@ -172,22 +171,28 @@ private boolean registerWithMemoryManager() {
log.info("Completed task {}", taskID);
completedTasks.add(taskID);
DirectoryHelper.markCompleted(sortWorkDir);
- if (MemoryManager.Type.JMX_BASED.equals(memoryManager.getType())) {
- memoryManager.deregisterClient(registrationID);
- }
return sortedFiles;
} catch (IOException e) {
log.error(taskID + " could not complete download ", e);
} finally {
phaser.arriveAndDeregister();
+ log.info("{} entered finally block.", taskID);
+ if (dataDumpNotifyingPhaser != null) {
+ log.info("{} Data dump phaser not null after task completion. Notifying memory listener.", taskID);
+ dataDumpNotifyingPhaser.arriveAndDeregister();
+ }
+ if (MemoryManager.Type.JMX_BASED.equals(memoryManager.getType())) {
+ memoryManager.deregisterClient(registrationID);
+ }
+ try {
+ nodeStates.close();
+ } catch (IOException e) {
+ log.error("{} could not close NodeStateEntryTraverser", taskID);
Review comment:
Could you log the stack trace as well? Just add ", e" to the log.error
call.
##########
File path:
oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/TraverseAndSortTask.java
##########
@@ -263,15 +269,21 @@ private synchronized void reset() {
}
private void sortAndSaveBatch() throws IOException {
- if (entryBatch.isEmpty()) {
- return;
+ synchronized (this) {
+ if (entryBatch.isEmpty()) {
+ return;
+ }
+ entryBatch.sort(comparator);
}
- entryBatch.sort(comparator);
Stopwatch w = Stopwatch.createStarted();
File newtmpfile = File.createTempFile("sortInBatch", "flatfile", sortWorkDir);
long textSize = 0;
+ long size = entryBatch.size();
Review comment:
Here we access entryBatch without synchronization. Again, I feel it's
better to use "safe" data structures (e.g. those in java.util.concurrent) instead
of synchronizing ourselves. If we need to synchronize ourselves, then we need to
do it consistently, and write sufficient test cases that verify all possible
concurrency problems (which is very hard).
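
To illustrate the suggestion, here is a minimal sketch of a batch kept in a java.util.concurrent collection. The class BatchSketch, its methods, and the use of String in place of NodeStateHolder are hypothetical stand-ins for illustration, not the code in this PR. The drain removes entries one at a time with pollFirst(), so the sort runs on a private copy.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedDeque;

// Hypothetical stand-in for the task's batch handling; not the Oak code.
class BatchSketch {
    // Thread-safe deque: add and pollFirst need no external locking.
    private final ConcurrentLinkedDeque<String> entryBatch = new ConcurrentLinkedDeque<>();

    void addEntry(String entry) {
        entryBatch.add(entry);
    }

    // Removes the entries currently in the deque and returns them sorted.
    // Entries added concurrently either land in this result or stay queued
    // for the next call; none are lost or seen twice.
    List<String> drainSorted(Comparator<String> comparator) {
        List<String> drained = new ArrayList<>();
        String entry;
        while ((entry = entryBatch.pollFirst()) != null) {
            drained.add(entry);
        }
        drained.sort(comparator);
        return drained;
    }

    // Note: size() on ConcurrentLinkedDeque traverses the deque (O(n)).
    int pending() {
        return entryBatch.size();
    }
}
```

Whether this fits depends on how the memory listener and the traversal thread are meant to interact, which the diff alone does not show.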
##########
File path:
oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/DefaultMemoryManager.java
##########
@@ -209,11 +211,15 @@ public void handleNotification(Notification notification,
.getType()
.equals(MemoryNotificationInfo.MEMORY_COLLECTION_THRESHOLD_EXCEEDED)) {
if (sufficientMemory.get()) {
- CompositeData cd = (CompositeData) notification
- .getUserData();
- MemoryNotificationInfo info = MemoryNotificationInfo
- .from(cd);
- checkMemory(info.getUsage());
+ synchronized (sufficientMemory) {
+ if (sufficientMemory.get()) {
Review comment:
Hm, this is not clear... I don't currently see an answer to the question
"why would you want to synchronize on it?"
##########
File path:
oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/DefaultMemoryManager.java
##########
@@ -109,7 +110,8 @@ private void configureMemoryListener() {
log.info("Setting up a listener to monitor pool '{}' and trigger batch save " +
"if memory drop below {} GB (max {})", pool.getName(), minMemoryBytes/ONE_GB, humanReadableByteCount(maxMemory));
pool.setCollectionUsageThreshold(minMemoryBytes);
- checkMemory(usage);
+ // todo - should we check and block in the beginning? This creates problem in case of download resume.
Review comment:
Ok, in this case could you remove the TODO and write e.g. "We don't
check memory here, as there is no good way here to free up memory in case it is
low."?
##########
File path:
oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/TraverseAndSortTask.java
##########
@@ -239,7 +246,9 @@ private void addEntry(NodeStateEntry e) throws IOException {
//Here logic differs from NodeStateEntrySorter in sense that
//Holder line consist only of json and not 'path|json'
NodeStateHolder h = new StateInBytesHolder(e.getPath(), jsonText);
- entryBatch.add(h);
+ synchronized (this) {
Review comment:
I don't understand the exact purpose of the synchronization here... which
data structure needs to be protected from concurrent operations?
##########
File path:
oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/TraverseAndSortTask.java
##########
@@ -263,15 +269,21 @@ private synchronized void reset() {
}
private void sortAndSaveBatch() throws IOException {
- if (entryBatch.isEmpty()) {
- return;
+ synchronized (this) {
+ if (entryBatch.isEmpty()) {
+ return;
+ }
+ entryBatch.sort(comparator);
}
- entryBatch.sort(comparator);
Stopwatch w = Stopwatch.createStarted();
File newtmpfile = File.createTempFile("sortInBatch", "flatfile", sortWorkDir);
long textSize = 0;
+ long size = entryBatch.size();
try (BufferedWriter writer = FlatFileStoreUtils.createWriter(newtmpfile, compressionEnabled)) {
- for (NodeStateHolder h : entryBatch) {
+ // no concurrency issue with this traversal because addition to this list is only done in #addEntry which, for
+ // a given TraverseAndSortTask object will only be called from same thread
+ while (!entryBatch.isEmpty()) {
+ NodeStateHolder h = entryBatch.removeFirst();
Review comment:
This is possibly another case of unsynchronized access to entryBatch:
isEmpty() and removeFirst() are called outside the synchronized block.
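
If we do keep our own synchronization, one way to make it consistent is to route every access through a single lock and hand the whole batch over in one step, so the writing loop works on a list no other thread can reach. A rough sketch, with the hypothetical class GuardedBatch and String standing in for NodeStateHolder:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch; not the TraverseAndSortTask implementation.
class GuardedBatch {
    private final Object lock = new Object();
    private List<String> entryBatch = new ArrayList<>();

    void add(String entry) {
        synchronized (lock) {
            entryBatch.add(entry);
        }
    }

    // Hands the current batch to the caller and installs a fresh list.
    // After this returns, the caller owns the returned list exclusively
    // and can sort and iterate it without holding the lock.
    List<String> takeBatch() {
        synchronized (lock) {
            List<String> taken = entryBatch;
            entryBatch = new ArrayList<>();
            return taken;
        }
    }

    int size() {
        synchronized (lock) {
            return entryBatch.size();
        }
    }
}
```

With this shape the size, the sort, and the traversal all use the same snapshot, which avoids reasoning about which thread may call which method.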