thomasmueller commented on code in PR #1202: URL: https://github.com/apache/jackrabbit-oak/pull/1202#discussion_r1393938962
########## oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/NodeStateEntryBatch.java: ########## @@ -51,49 +49,47 @@ public static NodeStateEntryBatch createNodeStateEntryBatch(int bufferSizeBytes, } private final ByteBuffer buffer; - private final ArrayList<SortKey> sortBuffer; private final int maxEntries; + private int numberOfEntries; + private int sizeOfEntries; Review Comment: sizeOfEntries sounds like bytes. At some point, we might want to go beyond 2 GB. Could we switch to "long"? Maybe better for all of the entries: maxEntries, numberOfEntries, but most importantly, sizeOfEntries. ########## oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedSortBatchTask.java: ########## @@ -116,48 +136,90 @@ public Result call() throws Exception { } } + private void buildSortArray(NodeStateEntryBatch nseb) { + Stopwatch startTime = Stopwatch.createStarted(); + ByteBuffer buffer = nseb.getBuffer(); + int totalPathSize = 0; + while (buffer.hasRemaining()) { + int positionInBuffer = buffer.position(); + // Read the next key from the buffer + int pathLength = buffer.getInt(); + totalPathSize += pathLength; + if (pathLength > copyBuffer.length) { + LOG.debug("Resizing copy buffer from {} to {}", copyBuffer.length, pathLength); + copyBuffer = new byte[pathLength]; + } + buffer.get(copyBuffer, 0, pathLength); + // Skip the json + int entryLength = buffer.getInt(); + buffer.position(buffer.position() + entryLength); + // Create the sort key + String path = new String(copyBuffer, 0, pathLength, StandardCharsets.UTF_8); + String[] pathSegments = SortKey.genSortKeyPathElements(path); + sortBuffer.add(new SortKey(pathSegments, positionInBuffer)); + } + timeCreatingSortArrayMillis += startTime.elapsed().toMillis(); + LOG.info("Built sort array in {}. 
Entries: {}, Total size of path strings: {}", + startTime, sortBuffer.size(), humanReadableByteCountBin(totalPathSize)); + } + private void sortAndSaveBatch(NodeStateEntryBatch nseb) throws Exception { - ArrayList<SortKey> sortBuffer = nseb.getSortBuffer(); ByteBuffer buffer = nseb.getBuffer(); LOG.info("Going to sort batch in memory. Entries: {}, Size: {}", - sortBuffer.size(), humanReadableByteCountBin(buffer.remaining())); + nseb.numberOfEntries(), humanReadableByteCountBin(nseb.sizeOfEntries())); + sortBuffer.clear(); + buildSortArray(nseb); if (sortBuffer.isEmpty()) { return; } Stopwatch sortClock = Stopwatch.createStarted(); sortBuffer.sort(pathComparator); + timeSortingMillis += sortClock.elapsed().toMillis(); LOG.info("Sorted batch in {}. Saving to disk", sortClock); Stopwatch saveClock = Stopwatch.createStarted(); Path newtmpfile = Files.createTempFile(sortWorkDir, "sortInBatch", "flatfile"); long textSize = 0; batchesProcessed++; - try (BufferedOutputStream writer = createOutputStream(newtmpfile, algorithm)) { + try (OutputStream os = IndexStoreUtils.createOutputStream(newtmpfile, algorithm)) { for (SortKey entry : sortBuffer) { entriesProcessed++; // Retrieve the entry from the buffer int posInBuffer = entry.getBufferPos(); buffer.position(posInBuffer); - int entrySize = buffer.getInt(); - - // Write the entry to the file without creating intermediate byte[] - int bytesRemaining = entrySize; - while (bytesRemaining > 0) { - int bytesRead = Math.min(copyBuffer.length, bytesRemaining); - buffer.get(copyBuffer, 0, bytesRead); - writer.write(copyBuffer, 0, bytesRead); - bytesRemaining -= bytesRead; - } - writer.write(PipelinedStrategy.FLATFILESTORE_LINE_SEPARATOR); - textSize += entrySize + 1; + + int pathSize = buffer.getInt(); + copyField(os, buffer, pathSize); + os.write(PipelinedStrategy.FLATFILESTORE_DELIMITER); + int jsonSize = buffer.getInt(); + copyField(os, buffer, jsonSize); + os.write(PipelinedStrategy.FLATFILESTORE_LINE_SEPARATOR); + textSize += 
pathSize + jsonSize + 2; } } - LOG.info("Stored batch of size {} (uncompressed {}) with {} entries in {}", - humanReadableByteCountBin(Files.size(newtmpfile)), + timeWritingMillis += saveClock.elapsed().toMillis(); + long compressedSize = Files.size(newtmpfile); + LOG.info("Wrote batch of size {} (uncompressed {}) with {} entries in {} at {} MB/s", + humanReadableByteCountBin(compressedSize), humanReadableByteCountBin(textSize), - sortBuffer.size(), saveClock); + sortBuffer.size(), saveClock, + String.format("%2.2f", ((1.0 * compressedSize / saveClock.elapsed().toMillis()) * 1000) / FileUtils.ONE_MB) Review Comment: There's a risk of division by 0... What about adding another utility method for this division? ########## oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedStrategy.java: ########## @@ -276,21 +276,19 @@ public PipelinedStrategy(MongoDocumentStore documentStore, ); this.mongoDocQueueSize = mongoDocQueueReservedMemoryMB / mongoDocBatchMaxSizeMB; - // Transform threads <-> merge-sort + // Derived values for transform <-> sort-save + int nseWorkingMemoryMB = readNSEBuffersReservedMemory(); this.nseBuffersCount = 1 + numberOfTransformThreads; - - long nseBuffersReservedMemoryBytes = nseBuffersReservedMemoryMB * FileUtils.ONE_MB; + long nseWorkingMemoryBytes = nseWorkingMemoryMB * FileUtils.ONE_MB; Review Comment: FileUtils.ONE_MB is a "long", so this is fine... An option would be to cast nseWorkingMemoryMB to long here, so this is very clear. 
########## oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedStrategy.java: ########## @@ -304,11 +302,21 @@ public PipelinedStrategy(MongoDocumentStore documentStore, mongoDocQueueReservedMemoryMB, mongoDocBatchMaxSizeMB, mongoDocQueueSize); - LOG.info("NodeStateEntryBuffers: [ workingMemory: {} MB, numberOfBuffers: {}, bufferSize: {}, maxEntriesPerBuffer: {} ]", - nseBuffersReservedMemoryMB, + LOG.info("NodeStateEntryBuffers: [ workingMemory: {} MB, numberOfBuffers: {}, bufferSize: {}, sortBufferReservedMemory: {} ]", + nseWorkingMemoryMB, nseBuffersCount, IOUtils.humanReadableByteCountBin(nseBuffersSizeBytes), - nseBufferMaxEntriesPerBuffer); + IOUtils.humanReadableByteCountBin(memoryReservedForSortKeysArray) + ); + } + + static long estimateMaxSizeOfSortKeyArray(long nseWorkingMemoryBytes, long nseBuffersCount, int sortBufferMemoryPercentage) { + // We reserve a percentage of the size of a buffer for the sort keys array. That is, we are assuming that for every line + // in the sort buffer, the memory needed to store the SortKey of the path section of the line will not be more + // than sortBufferMemoryPercentage of the total size of the line in average + // Estimate memory needed by the sort keys array. We assume each entry requires 256 bytes. 
+ long approxNseBufferSize = limitToIntegerRange(nseWorkingMemoryBytes / nseBuffersCount); Review Comment: Possibly division by 0 ########## oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedStrategy.java: ########## @@ -276,21 +276,19 @@ public PipelinedStrategy(MongoDocumentStore documentStore, ); this.mongoDocQueueSize = mongoDocQueueReservedMemoryMB / mongoDocBatchMaxSizeMB; - // Transform threads <-> merge-sort + // Derived values for transform <-> sort-save + int nseWorkingMemoryMB = readNSEBuffersReservedMemory(); this.nseBuffersCount = 1 + numberOfTransformThreads; - - long nseBuffersReservedMemoryBytes = nseBuffersReservedMemoryMB * FileUtils.ONE_MB; + long nseWorkingMemoryBytes = nseWorkingMemoryMB * FileUtils.ONE_MB; // The working memory is divided in the following regions: // - #transforThreads NSE Binary buffers - // - 1x Metadata of NSE entries in Binary buffers, list of SortKeys - // A ByteBuffer can be at most Integer.MAX_VALUE bytes long - this.nseBuffersSizeBytes = limitToIntegerRange(nseBuffersReservedMemoryBytes / (nseBuffersCount + 1)); + // - x1 Memory reserved for the array created by the sort-batch thread with the keys of the entries + // in the batch that is being sorted + long memoryReservedForSortKeysArray = estimateMaxSizeOfSortKeyArray(nseWorkingMemoryBytes, nseBuffersCount, sortBufferMemoryPercentage); + long memoryReservedForBuffers = nseWorkingMemoryBytes - memoryReservedForSortKeysArray; - // Assuming 1 instance of SortKey takes around 256 bytes. We have #transformThreads + 1 regions of nseBufferSizeBytes. - // The extra region is for the SortKey instances. Below we compute the total number of SortKey instances that - // fit in the memory region reserved for them, assuming that each SortKey instance takes 256 bytes. 
Then we - // distribute equally these available entries among the nse buffers - this.nseBufferMaxEntriesPerBuffer = (this.nseBuffersSizeBytes / 256) / this.nseBuffersCount; + // A ByteBuffer can be at most Integer.MAX_VALUE bytes long + this.nseBuffersSizeBytes = limitToIntegerRange(memoryReservedForBuffers / nseBuffersCount); Review Comment: We had "/ (nseBuffersCount + 1)" before, which prevents division by 0. Maybe another utility method? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: dev-unsubscribe@jackrabbit.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org