thomasmueller commented on code in PR #1202:
URL: https://github.com/apache/jackrabbit-oak/pull/1202#discussion_r1393938962


##########
oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/NodeStateEntryBatch.java:
##########
@@ -51,49 +49,47 @@ public static NodeStateEntryBatch createNodeStateEntryBatch(int bufferSizeBytes,
     }
 
     private final ByteBuffer buffer;
-    private final ArrayList<SortKey> sortBuffer;
     private final int maxEntries;
+    private int numberOfEntries;
+    private int sizeOfEntries;

Review Comment:
   sizeOfEntries sounds like bytes. At some point, we might want to go beyond 2 GB. Could we switch to "long"? Maybe better for all of the entry fields: maxEntries, numberOfEntries, but most importantly, sizeOfEntries.
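   
   A sketch of the suggested widening (illustration only, not the actual class; recordEntry is a hypothetical helper):
   
   ```java
   import java.nio.ByteBuffer;
   
   public class NodeStateEntryBatch {
       private final ByteBuffer buffer;
       private final long maxEntries;   // was int
       private long numberOfEntries;    // was int
       private long sizeOfEntries;      // was int; a byte count, may exceed 2 GB
   
       NodeStateEntryBatch(ByteBuffer buffer, long maxEntries) {
           this.buffer = buffer;
           this.maxEntries = maxEntries;
       }
   
       // Accumulate in long arithmetic so the running size cannot overflow int
       void recordEntry(int entryBytes) {
           numberOfEntries++;
           sizeOfEntries += entryBytes;
       }
   }
   ```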
   
   
   



##########
oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedSortBatchTask.java:
##########
@@ -116,48 +136,90 @@ public Result call() throws Exception {
         }
     }
 
+    private void buildSortArray(NodeStateEntryBatch nseb) {
+        Stopwatch startTime = Stopwatch.createStarted();
+        ByteBuffer buffer = nseb.getBuffer();
+        int totalPathSize = 0;
+        while (buffer.hasRemaining()) {
+            int positionInBuffer = buffer.position();
+            // Read the next key from the buffer
+            int pathLength = buffer.getInt();
+            totalPathSize += pathLength;
+            if (pathLength > copyBuffer.length) {
+                LOG.debug("Resizing copy buffer from {} to {}", 
copyBuffer.length, pathLength);
+                copyBuffer = new byte[pathLength];
+            }
+            buffer.get(copyBuffer, 0, pathLength);
+            // Skip the json
+            int entryLength = buffer.getInt();
+            buffer.position(buffer.position() + entryLength);
+            // Create the sort key
+            String path = new String(copyBuffer, 0, pathLength, StandardCharsets.UTF_8);
+            String[] pathSegments = SortKey.genSortKeyPathElements(path);
+            sortBuffer.add(new SortKey(pathSegments, positionInBuffer));
+        }
+        timeCreatingSortArrayMillis += startTime.elapsed().toMillis();
+        LOG.info("Built sort array in {}. Entries: {}, Total size of path 
strings: {}",
+                startTime, sortBuffer.size(), 
humanReadableByteCountBin(totalPathSize));
+    }
+
     private void sortAndSaveBatch(NodeStateEntryBatch nseb) throws Exception {
-        ArrayList<SortKey> sortBuffer = nseb.getSortBuffer();
         ByteBuffer buffer = nseb.getBuffer();
         LOG.info("Going to sort batch in memory. Entries: {}, Size: {}",
-                sortBuffer.size(), humanReadableByteCountBin(buffer.remaining()));
+                nseb.numberOfEntries(), humanReadableByteCountBin(nseb.sizeOfEntries()));
+        sortBuffer.clear();
+        buildSortArray(nseb);
         if (sortBuffer.isEmpty()) {
             return;
         }
         Stopwatch sortClock = Stopwatch.createStarted();
         sortBuffer.sort(pathComparator);
+        timeSortingMillis += sortClock.elapsed().toMillis();
         LOG.info("Sorted batch in {}. Saving to disk", sortClock);
         Stopwatch saveClock = Stopwatch.createStarted();
        Path newtmpfile = Files.createTempFile(sortWorkDir, "sortInBatch", "flatfile");
         long textSize = 0;
         batchesProcessed++;
-        try (BufferedOutputStream writer = createOutputStream(newtmpfile, algorithm)) {
+        try (OutputStream os = IndexStoreUtils.createOutputStream(newtmpfile, algorithm)) {
             for (SortKey entry : sortBuffer) {
                 entriesProcessed++;
                 // Retrieve the entry from the buffer
                 int posInBuffer = entry.getBufferPos();
                 buffer.position(posInBuffer);
-                int entrySize = buffer.getInt();
-
-                // Write the entry to the file without creating intermediate byte[]
-                int bytesRemaining = entrySize;
-                while (bytesRemaining > 0) {
-                    int bytesRead = Math.min(copyBuffer.length, bytesRemaining);
-                    buffer.get(copyBuffer, 0, bytesRead);
-                    writer.write(copyBuffer, 0, bytesRead);
-                    bytesRemaining -= bytesRead;
-                }
-                writer.write(PipelinedStrategy.FLATFILESTORE_LINE_SEPARATOR);
-                textSize += entrySize + 1;
+
+                int pathSize = buffer.getInt();
+                copyField(os, buffer, pathSize);
+                os.write(PipelinedStrategy.FLATFILESTORE_DELIMITER);
+                int jsonSize = buffer.getInt();
+                copyField(os, buffer, jsonSize);
+                os.write(PipelinedStrategy.FLATFILESTORE_LINE_SEPARATOR);
+                textSize += pathSize + jsonSize + 2;
             }
         }
-        LOG.info("Stored batch of size {} (uncompressed {}) with {} entries in 
{}",
-                humanReadableByteCountBin(Files.size(newtmpfile)),
+        timeWritingMillis += saveClock.elapsed().toMillis();
+        long compressedSize = Files.size(newtmpfile);
+        LOG.info("Wrote batch of size {} (uncompressed {}) with {} entries in 
{} at {} MB/s",
+                humanReadableByteCountBin(compressedSize),
                 humanReadableByteCountBin(textSize),
-                sortBuffer.size(), saveClock);
+                sortBuffer.size(), saveClock,
+                String.format("%2.2f", ((1.0 * compressedSize / 
saveClock.elapsed().toMillis()) * 1000) / FileUtils.ONE_MB)

Review Comment:
   There's a risk of division by 0: saveClock.elapsed().toMillis() can be 0 when a small batch is written in under a millisecond.
   
   What about adding another utility method for this division?
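   
   A possible sketch (formatMBps is a hypothetical name, not an existing Oak utility):
   
   ```java
   import org.apache.commons.io.FileUtils;
   
   final class ThroughputFormat {
       // Guards against an elapsed time of 0 ms, which can happen when a
       // small batch is compressed and written in under a millisecond.
       static String formatMBps(long bytes, long elapsedMillis) {
           if (elapsedMillis == 0) {
               return "N/A";
           }
           return String.format("%2.2f", (1.0 * bytes / elapsedMillis) * 1000 / FileUtils.ONE_MB);
       }
   }
   ```
   
   The log call would then use formatMBps(compressedSize, saveClock.elapsed().toMillis()) instead of the inline expression.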



##########
oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedStrategy.java:
##########
@@ -276,21 +276,19 @@ public PipelinedStrategy(MongoDocumentStore documentStore,
         );
        this.mongoDocQueueSize = mongoDocQueueReservedMemoryMB / mongoDocBatchMaxSizeMB;
 
-        // Transform threads <-> merge-sort
+        // Derived values for transform <-> sort-save
+        int nseWorkingMemoryMB = readNSEBuffersReservedMemory();
         this.nseBuffersCount = 1 + numberOfTransformThreads;
-
-        long nseBuffersReservedMemoryBytes = nseBuffersReservedMemoryMB * FileUtils.ONE_MB;
+        long nseWorkingMemoryBytes = nseWorkingMemoryMB * FileUtils.ONE_MB;

Review Comment:
   FileUtils.ONE_MB is a "long", so this is fine: the int operand is promoted to long before the multiplication, so there is no risk of int overflow.
   
   An option would be to cast nseWorkingMemoryMB to long here, so this is very clear.
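   
   For example (a one-line sketch; the value is unchanged, only the intent is made explicit):
   
   ```java
   long nseWorkingMemoryBytes = (long) nseWorkingMemoryMB * FileUtils.ONE_MB;
   ```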



##########
oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedStrategy.java:
##########
@@ -304,11 +302,21 @@ public PipelinedStrategy(MongoDocumentStore documentStore,
                 mongoDocQueueReservedMemoryMB,
                 mongoDocBatchMaxSizeMB,
                 mongoDocQueueSize);
-        LOG.info("NodeStateEntryBuffers: [ workingMemory: {} MB, 
numberOfBuffers: {}, bufferSize: {}, maxEntriesPerBuffer: {} ]",
-                nseBuffersReservedMemoryMB,
+        LOG.info("NodeStateEntryBuffers: [ workingMemory: {} MB, 
numberOfBuffers: {}, bufferSize: {}, sortBufferReservedMemory: {} ]",
+                nseWorkingMemoryMB,
                 nseBuffersCount,
                 IOUtils.humanReadableByteCountBin(nseBuffersSizeBytes),
-                nseBufferMaxEntriesPerBuffer);
+                IOUtils.humanReadableByteCountBin(memoryReservedForSortKeysArray)
+        );
+    }
+
+    static long estimateMaxSizeOfSortKeyArray(long nseWorkingMemoryBytes, long nseBuffersCount, int sortBufferMemoryPercentage) {
+        // We reserve a percentage of the size of a buffer for the sort keys array. That is, we are assuming that for every line
+        // in the sort buffer, the memory needed to store the SortKey of the path section of the line will not be more
+        // than sortBufferMemoryPercentage of the total size of the line on average.
+        // Estimate memory needed by the sort keys array. We assume each entry requires 256 bytes.
+        long approxNseBufferSize = limitToIntegerRange(nseWorkingMemoryBytes / nseBuffersCount);

Review Comment:
   Possibly division by 0, if nseBuffersCount can be 0.
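   
   A fail-fast guard could look like this (sketch; method and class names are illustrative, checkArgument is Guava's Preconditions):
   
   ```java
   import static com.google.common.base.Preconditions.checkArgument;
   
   final class BufferSizing {
       // Reject a zero buffer count with a clear message instead of
       // letting the division throw an ArithmeticException.
       static long approxNseBufferSize(long nseWorkingMemoryBytes, long nseBuffersCount) {
           checkArgument(nseBuffersCount > 0,
                   "nseBuffersCount must be positive, got %s", nseBuffersCount);
           return nseWorkingMemoryBytes / nseBuffersCount;
       }
   }
   ```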



##########
oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedStrategy.java:
##########
@@ -276,21 +276,19 @@ public PipelinedStrategy(MongoDocumentStore documentStore,
         );
        this.mongoDocQueueSize = mongoDocQueueReservedMemoryMB / mongoDocBatchMaxSizeMB;
 
-        // Transform threads <-> merge-sort
+        // Derived values for transform <-> sort-save
+        int nseWorkingMemoryMB = readNSEBuffersReservedMemory();
         this.nseBuffersCount = 1 + numberOfTransformThreads;
-
-        long nseBuffersReservedMemoryBytes = nseBuffersReservedMemoryMB * FileUtils.ONE_MB;
+        long nseWorkingMemoryBytes = nseWorkingMemoryMB * FileUtils.ONE_MB;
         // The working memory is divided in the following regions:
         // - #transforThreads   NSE Binary buffers
-        // - 1x                 Metadata of NSE entries in Binary buffers, list of SortKeys
-        // A ByteBuffer can be at most Integer.MAX_VALUE bytes long
-        this.nseBuffersSizeBytes = limitToIntegerRange(nseBuffersReservedMemoryBytes / (nseBuffersCount + 1));
+        // - x1                 Memory reserved for the array created by the sort-batch thread with the keys of the entries
+        //                      in the batch that is being sorted
+        long memoryReservedForSortKeysArray = estimateMaxSizeOfSortKeyArray(nseWorkingMemoryBytes, nseBuffersCount, sortBufferMemoryPercentage);
+        long memoryReservedForBuffers = nseWorkingMemoryBytes - memoryReservedForSortKeysArray;
 
-        // Assuming 1 instance of SortKey takes around 256 bytes. We have #transformThreads + 1 regions of nseBufferSizeBytes.
-        // The extra region is for the SortKey instances. Below we compute the total number of SortKey instances that
-        // fit in the memory region reserved for them, assuming that each SortKey instance takes 256 bytes. Then we
-        // distribute equally these available entries among the nse buffers
-        this.nseBufferMaxEntriesPerBuffer = (this.nseBuffersSizeBytes / 256) / this.nseBuffersCount;
+        // A ByteBuffer can be at most Integer.MAX_VALUE bytes long
+        this.nseBuffersSizeBytes = limitToIntegerRange(memoryReservedForBuffers / nseBuffersCount);

Review Comment:
   We had "/ (nseBuffersCount + 1)" before, which prevents division by 0.
   
   Maybe another utility method?
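   
   For example (sketch; safeDivide is a hypothetical utility, not an existing Oak method):
   
   ```java
   final class DivisionUtils {
       // Integer division that rejects non-positive divisors with a
       // descriptive message instead of a bare ArithmeticException.
       static long safeDivide(long dividend, long divisor, String divisorName) {
           if (divisor <= 0) {
               throw new IllegalArgumentException(divisorName + " must be positive, got " + divisor);
           }
           return dividend / divisor;
       }
   }
   ```
   
   The call site would then read limitToIntegerRange(safeDivide(memoryReservedForBuffers, nseBuffersCount, "nseBuffersCount")).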


