thomasmueller commented on a change in pull request #383:
URL: https://github.com/apache/jackrabbit-oak/pull/383#discussion_r724034175



##########
File path: 
oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/DefaultMemoryManager.java
##########
@@ -209,11 +211,15 @@ public void handleNotification(Notification notification,
                     .getType()
                     
.equals(MemoryNotificationInfo.MEMORY_COLLECTION_THRESHOLD_EXCEEDED)) {
                 if (sufficientMemory.get()) {
-                    CompositeData cd = (CompositeData) notification
-                            .getUserData();
-                    MemoryNotificationInfo info = MemoryNotificationInfo
-                            .from(cd);
-                    checkMemory(info.getUsage());
+                    synchronized (sufficientMemory) {
+                        if (sufficientMemory.get()) {

Review comment:
       sufficientMemory is AtomicBoolean. I don't see why you would want to 
synchronize on it. Is the problem that you want to protect against concurrent 
calls on checkMemory? If yes, then checkMemory should be synchronized instead.

##########
File path: 
oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/TraverseAndSortTask.java
##########
@@ -172,14 +173,24 @@ private boolean registerWithMemoryManager() {
             log.info("Completed task {}", taskID);
             completedTasks.add(taskID);
             DirectoryHelper.markCompleted(sortWorkDir);
-            if (MemoryManager.Type.JMX_BASED.equals(memoryManager.getType())) {
-                memoryManager.deregisterClient(registrationID);
-            }
             return sortedFiles;
         } catch (IOException e) {
             log.error(taskID + " could not complete download ", e);
         } finally {
             phaser.arriveAndDeregister();
+            log.info("{} entered finally block.", taskID);
+            if (dataDumpNotifyingPhaser != null) {
+                log.info("{} Data dump phaser not null after task completion. 
Notifying memory listener.", taskID);
+                dataDumpNotifyingPhaser.arriveAndDeregister();
+            }
+            if (MemoryManager.Type.JMX_BASED.equals(memoryManager.getType())) {
+                memoryManager.deregisterClient(registrationID);

Review comment:
       I wonder why do we even need a callback method from the memory 
manager... What about if each client (of the memory manager) just calls the 
memory manager when needed? That would reduce the need of synchronization.

##########
File path: 
oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/TraverseWithSortStrategy.java
##########
@@ -99,11 +100,15 @@
 
     @Override
     public File createSortedStoreFile() throws IOException {
-        logFlags();
-        configureMemoryListener();
-        sortWorkDir = createdSortWorkDir(storeDir);
-        writeToSortedFiles();
-        return sortStoreFile();
+        try {
+            logFlags();
+            configureMemoryListener();
+            sortWorkDir = createdSortWorkDir(storeDir);
+            writeToSortedFiles();
+            return sortStoreFile();
+        } finally {
+            nodeStates.close();

Review comment:
       Here again, closing the nodeStates as a side effect of another method... 
I find it weird.

##########
File path: 
oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/FlatFileStoreTest.java
##########
@@ -348,11 +349,12 @@ int getTotalProvidedDocCount() {
     }
 
     /**
-     * @return a map with keys denoting timestamp and values denoting paths 
which were created at those timestamps.
+     * @return a map with keys denoting timestamp and values denoting paths 
which were created at those timestamps. An
+     * iterator over the map entries would be in the increasing order of 
timestamps.
      */
-    private Map<Long, List<String>> createPathsWithTimestamps() {
-        Map<Long, List<String>> map = new HashMap<>();
-        for( int i = 1; i <= 10; i++) {
+    private LinkedHashMap<Long, List<String>> createPathsWithTimestamps() {
+        LinkedHashMap<Long, List<String>> map = new LinkedHashMap<>();
+        for( int i = 1; i <= 15; i++) {

Review comment:
       Why 15?

##########
File path: 
oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/TraverseAndSortTask.java
##########
@@ -239,7 +246,9 @@ private void addEntry(NodeStateEntry e) throws IOException {
         //Here logic differs from NodeStateEntrySorter in sense that
         //Holder line consist only of json and not 'path|json'
         NodeStateHolder h = new StateInBytesHolder(e.getPath(), jsonText);
-        entryBatch.add(h);
+        synchronized (this) {

Review comment:
       We now synchronise all access to entryBatch in the caller of entryBatch. 
What about moving all the synchronization to entryBatch itself? That would be 
easier to understand in my view. Or hopefully synchronization isn't needed at 
all (that would even be easier).

##########
File path: 
oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/TraverseAndSortTask.java
##########
@@ -263,14 +274,18 @@ private synchronized void reset() {
     }
 
     private void sortAndSaveBatch() throws IOException {
-        if (entryBatch.isEmpty()) {
-            return;
+        synchronized (this) {
+            if (entryBatch.isEmpty()) {
+                return;
+            }
+            entryBatch.sort(comparator);
         }
-        entryBatch.sort(comparator);
         Stopwatch w = Stopwatch.createStarted();
         File newtmpfile = File.createTempFile("sortInBatch", "flatfile", 
sortWorkDir);
         long textSize = 0;
         try (BufferedWriter writer = 
FlatFileStoreUtils.createWriter(newtmpfile, compressionEnabled)) {
+            // no concurrency issue with this traversal because addition to 
this list is only done in #addEntry which, for
+            // a given TraverseAndSortTask object will only be called from 
same thread
             for (NodeStateHolder h : entryBatch) {

Review comment:
       Ah, no, here we don't synchronize on entryBatch! That's inconsistent 
synchronization, and can cause big problems I think. (The way around it would 
be to clone entryBatch... but again, I would prefer if we don't need any 
synchronization).

##########
File path: 
oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/StoreAndSortStrategy.java
##########
@@ -62,8 +63,12 @@ public StoreAndSortStrategy(Iterable<NodeStateEntry> 
nodeStates, PathElementComp
 
     @Override
     public File createSortedStoreFile() throws IOException {
-        File storeFile = writeToStore(storeDir, getStoreFileName());
-        return sortStoreFile(storeFile);
+        try {
+            File storeFile = writeToStore(storeDir, getStoreFileName());
+            return sortStoreFile(storeFile);
+        } finally {
+            nodeStates.close();

Review comment:
       It sounds weird that createSortedStoreFile has a side effect of closing 
the nodeStates. Why?

##########
File path: 
oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/FlatFileStoreTest.java
##########
@@ -294,7 +295,7 @@ public NodeStateEntryTraverser create(LastModifiedRange 
range) {
                     null, range) {
                 @Override
                 public @NotNull Iterator<NodeStateEntry> iterator() {
-                    Map<String, Long> times = new HashMap<>();
+                    Map<String, Long> times = new LinkedHashMap<>(); // should 
be sorted in increasing order of value i.e. lastModificationTime

Review comment:
       LinkedHashMap doesn't sort in increasing order of the key. To sort, you 
would need to use a TreeMap.

##########
File path: 
oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/DefaultMemoryManager.java
##########
@@ -109,7 +110,8 @@ private void configureMemoryListener() {
         log.info("Setting up a listener to monitor pool '{}' and trigger batch 
save " +
                 "if memory drop below {} GB (max {})", pool.getName(), 
minMemoryBytes/ONE_GB, humanReadableByteCount(maxMemory));
         pool.setCollectionUsageThreshold(minMemoryBytes);
-        checkMemory(usage);
+        // todo - should we check and block in the beginning? This creates 
problem in case of download resume.

Review comment:
       Why does it create problems?

##########
File path: 
oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/TraverseWithSortStrategy.java
##########
@@ -88,7 +89,7 @@
     private ArrayList<NodeStateHolder> entryBatch = new ArrayList<>();

Review comment:
       Java has a synchronized list...

##########
File path: 
oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/TraverseAndSortTask.java
##########
@@ -172,14 +173,24 @@ private boolean registerWithMemoryManager() {
             log.info("Completed task {}", taskID);
             completedTasks.add(taskID);
             DirectoryHelper.markCompleted(sortWorkDir);
-            if (MemoryManager.Type.JMX_BASED.equals(memoryManager.getType())) {
-                memoryManager.deregisterClient(registrationID);
-            }
             return sortedFiles;
         } catch (IOException e) {
             log.error(taskID + " could not complete download ", e);
         } finally {
             phaser.arriveAndDeregister();
+            log.info("{} entered finally block.", taskID);
+            if (dataDumpNotifyingPhaser != null) {
+                log.info("{} Data dump phaser not null after task completion. 
Notifying memory listener.", taskID);
+                dataDumpNotifyingPhaser.arriveAndDeregister();
+            }
+            if (MemoryManager.Type.JMX_BASED.equals(memoryManager.getType())) {
+                memoryManager.deregisterClient(registrationID);
+            }
+            try {
+                nodeStates.close();

Review comment:
       Here again we close the nodeStates.... why here? What about having a 
separate close() method in this class?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to