thomasmueller commented on a change in pull request #383:
URL: https://github.com/apache/jackrabbit-oak/pull/383#discussion_r724034175
##########
File path:
oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/DefaultMemoryManager.java
##########
@@ -209,11 +211,15 @@ public void handleNotification(Notification notification,
.getType()
.equals(MemoryNotificationInfo.MEMORY_COLLECTION_THRESHOLD_EXCEEDED)) {
if (sufficientMemory.get()) {
- CompositeData cd = (CompositeData) notification
- .getUserData();
- MemoryNotificationInfo info = MemoryNotificationInfo
- .from(cd);
- checkMemory(info.getUsage());
+ synchronized (sufficientMemory) {
+ if (sufficientMemory.get()) {
Review comment:
sufficientMemory is an AtomicBoolean. I don't see why you would want to
synchronize on it. Is the problem that you want to protect against concurrent
calls to checkMemory? If so, then checkMemory should be synchronized instead.
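A minimal sketch of the alternative suggested above, assuming checkMemory receives the current free-memory figure and that a switch to "low memory" should be handled exactly once. The class name, the freeBytes parameter and the lowMemoryEvents counter are hypothetical; only sufficientMemory and checkMemory come from the diff.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical, simplified stand-in for DefaultMemoryManager. Only the names
// sufficientMemory and checkMemory come from the PR; the logic is illustrative.
class SynchronizedCheckMemorySketch {
    // Still atomic so that other threads can read it without taking the lock.
    private final AtomicBoolean sufficientMemory = new AtomicBoolean(true);
    private final long minFreeBytes;
    private int lowMemoryEvents; // guarded by "this"

    SynchronizedCheckMemorySketch(long minFreeBytes) {
        this.minFreeBytes = minFreeBytes;
    }

    // The whole check-then-act sequence runs under the instance lock, so two
    // concurrent notifications cannot both see "sufficient" and both react.
    synchronized void checkMemory(long freeBytes) {
        boolean enough = freeBytes >= minFreeBytes;
        if (!enough && sufficientMemory.get()) {
            lowMemoryEvents++; // where a batch save would be triggered
        }
        sufficientMemory.set(enough);
    }

    boolean isSufficientMemory() {
        return sufficientMemory.get();
    }

    synchronized int getLowMemoryEvents() {
        return lowMemoryEvents;
    }
}
```

With the method itself as the critical section, there is no need for a separate synchronized block on the AtomicBoolean; the atomic field only serves lock-free reads from other threads.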
##########
File path:
oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/TraverseAndSortTask.java
##########
@@ -172,14 +173,24 @@ private boolean registerWithMemoryManager() {
log.info("Completed task {}", taskID);
completedTasks.add(taskID);
DirectoryHelper.markCompleted(sortWorkDir);
- if (MemoryManager.Type.JMX_BASED.equals(memoryManager.getType())) {
- memoryManager.deregisterClient(registrationID);
- }
return sortedFiles;
} catch (IOException e) {
log.error(taskID + " could not complete download ", e);
} finally {
phaser.arriveAndDeregister();
+ log.info("{} entered finally block.", taskID);
+ if (dataDumpNotifyingPhaser != null) {
+ log.info("{} Data dump phaser not null after task completion. Notifying memory listener.", taskID);
+ dataDumpNotifyingPhaser.arriveAndDeregister();
+ }
+ if (MemoryManager.Type.JMX_BASED.equals(memoryManager.getType())) {
+ memoryManager.deregisterClient(registrationID);
Review comment:
I wonder why we even need a callback method from the memory
manager... What if each client (of the memory manager) just calls the
memory manager when needed? That would reduce the need for synchronization.
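A sketch of the pull-style design the comment proposes, assuming a client only needs a yes/no "memory is low" answer before adding more data. PollingMemoryManagerSketch, PollingClientSketch and isMemoryLow() are hypothetical names, not the Oak MemoryManager API; the free-memory estimate uses java.lang.Runtime and is approximate.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongSupplier;

// Hypothetical pull-style memory manager: clients ask it, it never calls them.
class PollingMemoryManagerSketch {
    private final LongSupplier freeBytes; // injectable so tests can fake it
    private final long minFreeBytes;

    PollingMemoryManagerSketch(LongSupplier freeBytes, long minFreeBytes) {
        this.freeBytes = freeBytes;
        this.minFreeBytes = minFreeBytes;
    }

    // Rough estimate of the remaining heap based on java.lang.Runtime.
    static PollingMemoryManagerSketch forCurrentJvm(long minFreeBytes) {
        Runtime rt = Runtime.getRuntime();
        return new PollingMemoryManagerSketch(
                () -> rt.maxMemory() - (rt.totalMemory() - rt.freeMemory()),
                minFreeBytes);
    }

    boolean isMemoryLow() {
        return freeBytes.getAsLong() < minFreeBytes;
    }
}

// Hypothetical client: the batch is touched only by the thread calling
// addEntry(), so it needs no lock at all.
class PollingClientSketch {
    private final PollingMemoryManagerSketch memoryManager;
    private final List<String> batch = new ArrayList<>();
    private int flushes;

    PollingClientSketch(PollingMemoryManagerSketch memoryManager) {
        this.memoryManager = memoryManager;
    }

    void addEntry(String entry) {
        batch.add(entry);
        if (memoryManager.isMemoryLow()) {
            flush();
        }
    }

    private void flush() {
        // The real task would sort the batch and write it to a file here.
        batch.clear();
        flushes++;
    }

    int batchSize() { return batch.size(); }

    int flushes() { return flushes; }
}
```

Checking on every entry is the simplest form; a real implementation might only ask every N entries to keep the overhead low.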
##########
File path:
oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/TraverseWithSortStrategy.java
##########
@@ -99,11 +100,15 @@
@Override
public File createSortedStoreFile() throws IOException {
- logFlags();
- configureMemoryListener();
- sortWorkDir = createdSortWorkDir(storeDir);
- writeToSortedFiles();
- return sortStoreFile();
+ try {
+ logFlags();
+ configureMemoryListener();
+ sortWorkDir = createdSortWorkDir(storeDir);
+ writeToSortedFiles();
+ return sortStoreFile();
+ } finally {
+ nodeStates.close();
Review comment:
Here again, closing the nodeStates as a side effect of another method...
I find it weird.
##########
File path:
oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/FlatFileStoreTest.java
##########
@@ -348,11 +349,12 @@ int getTotalProvidedDocCount() {
}
/**
- * @return a map with keys denoting timestamp and values denoting paths which were created at those timestamps.
+ * @return a map with keys denoting timestamp and values denoting paths which were created at those timestamps. An
+ * iterator over the map entries would be in the increasing order of timestamps.
*/
- private Map<Long, List<String>> createPathsWithTimestamps() {
- Map<Long, List<String>> map = new HashMap<>();
- for( int i = 1; i <= 10; i++) {
+ private LinkedHashMap<Long, List<String>> createPathsWithTimestamps() {
+ LinkedHashMap<Long, List<String>> map = new LinkedHashMap<>();
+ for( int i = 1; i <= 15; i++) {
Review comment:
Why 15?
##########
File path:
oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/TraverseAndSortTask.java
##########
@@ -239,7 +246,9 @@ private void addEntry(NodeStateEntry e) throws IOException {
//Here logic differs from NodeStateEntrySorter in sense that
//Holder line consist only of json and not 'path|json'
NodeStateHolder h = new StateInBytesHolder(e.getPath(), jsonText);
- entryBatch.add(h);
+ synchronized (this) {
Review comment:
We now synchronize all access to entryBatch in the callers of entryBatch.
What about moving all the synchronization into entryBatch itself? That would be
easier to understand, in my view. Or hopefully synchronization isn't needed at
all (that would be even easier).
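A minimal sketch of moving the synchronization into the batch itself, assuming the batch only needs add, isEmpty, sort-and-traverse and clear. EntryBatchSketch and its methods are hypothetical, not classes from the PR.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical wrapper around the batch list: it owns the lock, so callers
// such as addEntry() and sortAndSaveBatch() need no synchronized blocks.
final class EntryBatchSketch<T> {
    private final List<T> entries = new ArrayList<>(); // guarded by "this"

    synchronized void add(T entry) {
        entries.add(entry);
    }

    synchronized boolean isEmpty() {
        return entries.isEmpty();
    }

    synchronized int size() {
        return entries.size();
    }

    // Sort and traversal happen under one lock acquisition, so an add()
    // from another thread cannot interleave with the iteration.
    synchronized void sortAndForEach(Comparator<? super T> comparator,
                                     Consumer<? super T> action) {
        entries.sort(comparator);
        entries.forEach(action);
    }

    synchronized void clear() {
        entries.clear();
    }
}
```

Two caveats for the real code: a Consumer cannot throw IOException, so writing to a file would need a throwing functional interface or UncheckedIOException wrapping, and holding the lock during I/O blocks concurrent add() calls for the duration of the write.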
##########
File path:
oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/TraverseAndSortTask.java
##########
@@ -263,14 +274,18 @@ private synchronized void reset() {
}
private void sortAndSaveBatch() throws IOException {
- if (entryBatch.isEmpty()) {
- return;
+ synchronized (this) {
+ if (entryBatch.isEmpty()) {
+ return;
+ }
+ entryBatch.sort(comparator);
}
- entryBatch.sort(comparator);
Stopwatch w = Stopwatch.createStarted();
File newtmpfile = File.createTempFile("sortInBatch", "flatfile", sortWorkDir);
long textSize = 0;
try (BufferedWriter writer = FlatFileStoreUtils.createWriter(newtmpfile, compressionEnabled)) {
+ // no concurrency issue with this traversal because addition to this list is only done in #addEntry which, for
+ // a given TraverseAndSortTask object will only be called from same thread
for (NodeStateHolder h : entryBatch) {
Review comment:
Ah, no, here we don't synchronize on entryBatch! That's inconsistent
synchronization, and can cause big problems I think. (The way around it would
be to clone entryBatch... but again, I would prefer if we don't need any
synchronization).
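A sketch of the "clone entryBatch" workaround mentioned in the comment, assuming entries can be modeled as strings and that clearing the batch happens elsewhere (the diff has a separate reset()). Every read and write of the shared list uses the same lock, and the loop that writes output iterates over a private copy. SnapshotThenWriteSketch is hypothetical.

```java
import java.io.IOException;
import java.io.Writer;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical sketch: all access to the shared list is guarded by "this",
// and the slow write runs on a snapshot taken under that lock.
class SnapshotThenWriteSketch {
    private final List<String> entryBatch = new ArrayList<>(); // guarded by "this"
    private final Comparator<String> comparator;

    SnapshotThenWriteSketch(Comparator<String> comparator) {
        this.comparator = comparator;
    }

    synchronized void addEntry(String entry) {
        entryBatch.add(entry);
    }

    // Returns the number of entries written.
    int sortAndSaveBatch(Writer writer) throws IOException {
        List<String> snapshot;
        synchronized (this) {
            if (entryBatch.isEmpty()) {
                return 0;
            }
            entryBatch.sort(comparator);
            snapshot = new ArrayList<>(entryBatch); // copy taken under the lock
        }
        // Iterating the copy cannot race with addEntry(), and the lock is
        // not held while doing I/O.
        for (String line : snapshot) {
            writer.write(line);
            writer.write('\n');
        }
        return snapshot.size();
    }
}
```

The cost is one extra list copy per batch; the benefit is that the traversal no longer relies on the single-thread assumption stated in the code comment.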
##########
File path:
oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/StoreAndSortStrategy.java
##########
@@ -62,8 +63,12 @@ public StoreAndSortStrategy(Iterable<NodeStateEntry> nodeStates, PathElementComp
@Override
public File createSortedStoreFile() throws IOException {
- File storeFile = writeToStore(storeDir, getStoreFileName());
- return sortStoreFile(storeFile);
+ try {
+ File storeFile = writeToStore(storeDir, getStoreFileName());
+ return sortStoreFile(storeFile);
+ } finally {
+ nodeStates.close();
Review comment:
It sounds weird that createSortedStoreFile has a side effect of closing
the nodeStates. Why?
##########
File path:
oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/FlatFileStoreTest.java
##########
@@ -294,7 +295,7 @@ public NodeStateEntryTraverser create(LastModifiedRange range) {
null, range) {
@Override
public @NotNull Iterator<NodeStateEntry> iterator() {
- Map<String, Long> times = new HashMap<>();
+ Map<String, Long> times = new LinkedHashMap<>(); // should be sorted in increasing order of value i.e. lastModificationTime
Review comment:
LinkedHashMap doesn't sort in increasing order of the key; it only
preserves insertion order. To sort, you would need to use a TreeMap.
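A small sketch of the three orderings involved, using hypothetical paths and lastModified values. LinkedHashMap iterates in insertion order, TreeMap iterates in key order, and since the code comment asks for ordering by value (lastModificationTime), that case needs an explicit sort of the entries, here with Map.Entry.comparingByValue().

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class MapOrderingSketch {
    // Hypothetical paths with lastModified values, inserted out of order.
    static void fill(Map<String, Long> m) {
        m.put("/c", 30L);
        m.put("/a", 20L);
        m.put("/b", 10L);
    }

    static List<String> keysInIterationOrder(Map<String, Long> m) {
        return new ArrayList<>(m.keySet());
    }

    // Ordering by value needs an explicit sort of the entries.
    static List<String> keysSortedByValue(Map<String, Long> m) {
        List<Map.Entry<String, Long>> entries = new ArrayList<>(m.entrySet());
        entries.sort(Map.Entry.comparingByValue());
        List<String> keys = new ArrayList<>();
        for (Map.Entry<String, Long> e : entries) {
            keys.add(e.getKey());
        }
        return keys;
    }

    public static void main(String[] args) {
        Map<String, Long> linked = new LinkedHashMap<>();
        fill(linked);
        Map<String, Long> tree = new TreeMap<>();
        fill(tree);
        System.out.println(keysInIterationOrder(linked)); // [/c, /a, /b] insertion order
        System.out.println(keysInIterationOrder(tree));   // [/a, /b, /c] key order
        System.out.println(keysSortedByValue(linked));    // [/b, /a, /c] value order
    }
}
```

A LinkedHashMap does iterate in increasing order of timestamps when the entries happen to be inserted in that order, as in the loop in createPathsWithTimestamps, but that is a property of the insertion sequence, not of the map.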
##########
File path:
oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/DefaultMemoryManager.java
##########
@@ -109,7 +110,8 @@ private void configureMemoryListener() {
log.info("Setting up a listener to monitor pool '{}' and trigger batch save " +
"if memory drop below {} GB (max {})", pool.getName(),
minMemoryBytes/ONE_GB, humanReadableByteCount(maxMemory));
pool.setCollectionUsageThreshold(minMemoryBytes);
- checkMemory(usage);
+ // todo - should we check and block in the beginning? This creates problem in case of download resume.
Review comment:
Why does it create problems?
##########
File path:
oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/TraverseWithSortStrategy.java
##########
@@ -88,7 +89,7 @@
private ArrayList<NodeStateHolder> entryBatch = new ArrayList<>();
Review comment:
Java has a synchronized list (Collections.synchronizedList)...
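A sketch of entryBatch as a Collections.synchronizedList, assuming entries can be modeled as strings. Single calls such as add, size and sort are synchronized by the wrapper, but, as the Collections.synchronizedList Javadoc states, traversal still requires the caller to synchronize on the list manually. SynchronizedListSketch and joinSorted() are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

class SynchronizedListSketch {
    // Each single call (add, size, sort) locks the wrapper internally.
    private final List<String> entryBatch =
            Collections.synchronizedList(new ArrayList<>());

    void addEntry(String entry) {
        entryBatch.add(entry);
    }

    int size() {
        return entryBatch.size();
    }

    // Iteration is not covered by the wrapper's internal locking, so the
    // caller must hold the list's own monitor for the whole traversal.
    String joinSorted(Comparator<? super String> comparator) {
        StringBuilder sb = new StringBuilder();
        synchronized (entryBatch) {
            entryBatch.sort(comparator);
            for (String entry : entryBatch) {
                sb.append(entry).append('\n');
            }
        }
        return sb.toString();
    }
}
```

So a synchronized list removes the explicit locks around add(), but the for loop in sortAndSaveBatch would still need a synchronized block on the list.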
##########
File path:
oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/TraverseAndSortTask.java
##########
@@ -172,14 +173,24 @@ private boolean registerWithMemoryManager() {
log.info("Completed task {}", taskID);
completedTasks.add(taskID);
DirectoryHelper.markCompleted(sortWorkDir);
- if (MemoryManager.Type.JMX_BASED.equals(memoryManager.getType())) {
- memoryManager.deregisterClient(registrationID);
- }
return sortedFiles;
} catch (IOException e) {
log.error(taskID + " could not complete download ", e);
} finally {
phaser.arriveAndDeregister();
+ log.info("{} entered finally block.", taskID);
+ if (dataDumpNotifyingPhaser != null) {
+ log.info("{} Data dump phaser not null after task completion. Notifying memory listener.", taskID);
+ dataDumpNotifyingPhaser.arriveAndDeregister();
+ }
+ if (MemoryManager.Type.JMX_BASED.equals(memoryManager.getType())) {
+ memoryManager.deregisterClient(registrationID);
+ }
+ try {
+ nodeStates.close();
Review comment:
Here again we close the nodeStates... why here? What about having a
separate close() method in this class?
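A sketch of the separate close() method suggested here, assuming nodeStates is Closeable (as the nodeStates.close() calls in the diff suggest). The worker method only does the work, and the caller decides when the resource is released, typically with try-with-resources. FakeNodeStates and SortTaskSketch are hypothetical.

```java
import java.io.Closeable;
import java.io.IOException;

// Hypothetical stand-in for the closeable nodeStates iterable.
class FakeNodeStates implements Closeable {
    private boolean closed;

    @Override
    public void close() {
        closed = true;
    }

    boolean isClosed() {
        return closed;
    }
}

// Hypothetical task: call() has no side effect on nodeStates; releasing it
// is the job of an explicit close().
class SortTaskSketch implements Closeable {
    private final FakeNodeStates nodeStates;

    SortTaskSketch(FakeNodeStates nodeStates) {
        this.nodeStates = nodeStates;
    }

    String call() {
        // Traverse nodeStates, sort and write files here (omitted).
        return "sorted";
    }

    @Override
    public void close() throws IOException {
        nodeStates.close();
    }
}
```

Usage: `try (SortTaskSketch task = new SortTaskSketch(states)) { task.call(); }` closes states when the block exits, whether call() returns normally or throws. The same ownership rule would apply to createSortedStoreFile in the strategy classes.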
--
This is an automated message from the Apache Git Service.