This is an automated email from the ASF dual-hosted git repository.

wyk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git


The following commit(s) were added to refs/heads/master by this push:
     new ea4ae3a1a8 [ASTERIXDB-3314][STO] Reduce buffer cache pressure on 
columnar
ea4ae3a1a8 is described below

commit ea4ae3a1a8df45d6f575400ce7da2d2099eaa416
Author: Wail Alkowaileet <[email protected]>
AuthorDate: Fri Nov 17 12:58:43 2023 -0800

    [ASTERIXDB-3314][STO] Reduce buffer cache pressure on columnar
    
    - user model changes: no
    - storage format changes: no
    - interface changes: no
    
    Details:
    - Make max merging components count to 4 in columnar datasets
    - Fix not unpinning columnar filter pages
    - Allocate initial 32KB buffers for columnar writers
    
    Change-Id: I809109b232bc5a5db0c47a52cb98c838ff55e27f
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17965
    Integration-Tests: Jenkins <[email protected]>
    Tested-by: Jenkins <[email protected]>
    Reviewed-by: Murtadha Hubail <[email protected]>
---
 .../out/MultiTemporaryBufferBytesOutputStream.java | 11 +++++
 .../hyracks-storage-am-lsm-btree-column/pom.xml    |  4 ++
 .../column/impls/btree/ColumnBTreeBulkloader.java  | 47 ++++++++++++++++++++++
 .../impls/btree/ColumnBTreeRangeSearchCursor.java  |  2 +-
 .../lsm/tuples/AbstractColumnTupleReference.java   | 41 ++++++++++++++-----
 .../lsm/tuples/ColumnMultiBufferProvider.java      |  8 +++-
 .../storage/am/lsm/common/impls/LSMHarness.java    |  4 +-
 7 files changed, 104 insertions(+), 13 deletions(-)

diff --git 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/out/MultiTemporaryBufferBytesOutputStream.java
 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/out/MultiTemporaryBufferBytesOutputStream.java
index cf2808e0e2..38f73215d8 100644
--- 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/out/MultiTemporaryBufferBytesOutputStream.java
+++ 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/out/MultiTemporaryBufferBytesOutputStream.java
@@ -25,8 +25,11 @@ import java.nio.ByteBuffer;
 import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import 
org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnWriteMultiPageOp;
+import org.apache.hyracks.util.StorageUtil;
 
 public final class MultiTemporaryBufferBytesOutputStream extends 
AbstractMultiBufferBytesOutputStream {
+    private static final int INITIAL_BUFFER_SIZE = 
StorageUtil.getIntSizeInBytes(32, StorageUtil.StorageUnit.KILOBYTE);
+
     public 
MultiTemporaryBufferBytesOutputStream(Mutable<IColumnWriteMultiPageOp> 
multiPageOpRef) {
         super(multiPageOpRef);
     }
@@ -38,6 +41,14 @@ public final class MultiTemporaryBufferBytesOutputStream 
extends AbstractMultiBu
 
     @Override
     protected ByteBuffer confiscateNewBuffer() throws HyracksDataException {
+        if (buffers.isEmpty()) {
+            /*
+             * One buffer on the house to avoid confiscating a whole page for 
a tiny stream.
+             * This avoids pressuring the buffer cache by confiscating 
pages for small columns. Think of sparse
+             * columns, which may take only a few hundred bytes to write.
+             */
+            return ByteBuffer.allocate(INITIAL_BUFFER_SIZE);
+        }
         return multiPageOpRef.getValue().confiscateTemporary();
     }
 
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/pom.xml 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/pom.xml
index 8e0bc0cb8c..93eca7c711 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/pom.xml
@@ -95,5 +95,9 @@
       <groupId>com.fasterxml.jackson.core</groupId>
       <artifactId>jackson-databind</artifactId>
     </dependency>
+    <dependency>
+      <groupId>it.unimi.dsi</groupId>
+      <artifactId>fastutil-core</artifactId>
+    </dependency>
   </dependencies>
 </project>
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeBulkloader.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeBulkloader.java
index 48bd18094c..3e72584a15 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeBulkloader.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeBulkloader.java
@@ -38,8 +38,11 @@ import 
org.apache.hyracks.storage.common.buffercache.IBufferCache;
 import org.apache.hyracks.storage.common.buffercache.ICachedPage;
 import org.apache.hyracks.storage.common.buffercache.IPageWriteCallback;
 import org.apache.hyracks.storage.common.file.BufferedFileHandle;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 
 public final class ColumnBTreeBulkloader extends BTreeNSMBulkLoader implements 
IColumnWriteMultiPageOp {
+    private static final Logger LOGGER = LogManager.getLogger();
     private final List<CachedPage> columnsPages;
     private final List<CachedPage> tempConfiscatedPages;
     private final ColumnBTreeWriteLeafFrame columnarFrame;
@@ -48,6 +51,12 @@ public final class ColumnBTreeBulkloader extends 
BTreeNSMBulkLoader implements I
     private boolean setLowKey;
     private int tupleCount;
 
+    // For logging
+    private int numberOfLeafNodes;
+    private int numberOfPagesInCurrentLeafNode;
+    private int maxNumberOfPagesForAColumn;
+    private int maxNumberOfPagesInALeafNode;
+
     public ColumnBTreeBulkloader(float fillFactor, boolean verifyInput, 
IPageWriteCallback callback, ITreeIndex index,
             ITreeIndexFrame leafFrame) throws HyracksDataException {
         super(fillFactor, verifyInput, callback, index, leafFrame);
@@ -59,6 +68,12 @@ public final class ColumnBTreeBulkloader extends 
BTreeNSMBulkLoader implements I
         lowKey = new BTreeSplitKey(tupleWriter.createTupleReference());
         lowKey.getTuple().setFieldCount(cmp.getKeyFieldCount());
         setLowKey = true;
+
+        // For logging. Starts with 1 for page0
+        numberOfPagesInCurrentLeafNode = 1;
+        maxNumberOfPagesForAColumn = 0;
+        maxNumberOfPagesInALeafNode = 0;
+        numberOfLeafNodes = 1;
     }
 
     @Override
@@ -118,9 +133,14 @@ public final class ColumnBTreeBulkloader extends 
BTreeNSMBulkLoader implements I
         for (ICachedPage page : tempConfiscatedPages) {
             bufferCache.returnPage(page, false);
         }
+
+        // For logging
+        int numberOfTempConfiscatedPages = tempConfiscatedPages.size();
         tempConfiscatedPages.clear();
         //Where Page0 and columns pages will be written
         super.end();
+
+        log("Finished");
     }
 
     @Override
@@ -156,6 +176,12 @@ public final class ColumnBTreeBulkloader extends 
BTreeNSMBulkLoader implements I
         splitKey.setRightPage(leafFrontier.pageId);
         setLowKey = true;
         tupleCount = 0;
+
+        // For logging
+        maxNumberOfPagesInALeafNode = Math.max(maxNumberOfPagesInALeafNode, 
numberOfPagesInCurrentLeafNode);
+        // Starts with 1 for page0
+        numberOfPagesInCurrentLeafNode = 1;
+        numberOfLeafNodes++;
     }
 
     @Override
@@ -172,6 +198,12 @@ public final class ColumnBTreeBulkloader extends 
BTreeNSMBulkLoader implements I
         for (ICachedPage c : columnsPages) {
             write(c);
         }
+
+        // For logging
+        int numberOfPagesInPersistedColumn = columnsPages.size();
+        maxNumberOfPagesForAColumn = Math.max(maxNumberOfPagesForAColumn, 
numberOfPagesInPersistedColumn);
+        numberOfPagesInCurrentLeafNode += numberOfPagesInPersistedColumn;
+
         columnsPages.clear();
     }
 
@@ -185,6 +217,9 @@ public final class ColumnBTreeBulkloader extends 
BTreeNSMBulkLoader implements I
             bufferCache.returnPage(page, false);
         }
         super.abort();
+
+        // For logging
+        log("Aborted");
     }
 
     private void setSplitKey(ISplitKey splitKey, ITupleReference tuple) {
@@ -193,6 +228,18 @@ public final class ColumnBTreeBulkloader extends 
BTreeNSMBulkLoader implements I
         tupleWriter.writeTupleFields(tuple, 0, cmp.getKeyFieldCount(), 
splitKey.getBuffer().array(), 0);
     }
 
+    private void log(String status) {
+        if (!LOGGER.isDebugEnabled()) {
+            return;
+        }
+
+        int numberOfTempConfiscatedPages = tempConfiscatedPages.size();
+        LOGGER.debug(
+                "{} columnar bulkloader used leafNodes: {}, 
tempPagesAllocated: {}, maxPagesPerColumn: {}, and maxLeafNodePages: {}",
+                status, numberOfLeafNodes, numberOfTempConfiscatedPages, 
maxNumberOfPagesForAColumn,
+                maxNumberOfPagesInALeafNode);
+    }
+
     /*
      * ***********************************************************
      * IColumnWriteMultiPageOp
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeRangeSearchCursor.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeRangeSearchCursor.java
index fd726cdca2..39952dff61 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeRangeSearchCursor.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeRangeSearchCursor.java
@@ -256,8 +256,8 @@ public class ColumnBTreeRangeSearchCursor extends 
EnforcedIndexCursor
 
     @Override
     public void doClose() throws HyracksDataException {
-        frameTuple.close();
         releasePages();
+        frameTuple.close();
         page0 = null;
         pred = null;
     }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/AbstractColumnTupleReference.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/AbstractColumnTupleReference.java
index e638a4ad60..3923025838 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/AbstractColumnTupleReference.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/AbstractColumnTupleReference.java
@@ -32,6 +32,9 @@ import 
org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.ColumnBTreeRea
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import it.unimi.dsi.fastutil.longs.LongOpenHashSet;
+import it.unimi.dsi.fastutil.longs.LongSet;
+
 public abstract class AbstractColumnTupleReference implements 
IColumnTupleIterator {
     private static final Logger LOGGER = LogManager.getLogger();
     private static final String UNSUPPORTED_OPERATION_MSG = "Operation is not 
supported for column tuples";
@@ -41,11 +44,15 @@ public abstract class AbstractColumnTupleReference 
implements IColumnTupleIterat
     private final IColumnBufferProvider[] filterBufferProviders;
     private final IColumnBufferProvider[] buffersProviders;
     private final int numberOfPrimaryKeys;
-    private int totalNumberOfMegaLeafNodes;
-    private int numOfSkippedMegaLeafNodes;
     private int endIndex;
     protected int tupleIndex;
 
+    // For logging
+    private final LongSet pinnedPages;
+    private int totalNumberOfMegaLeafNodes;
+    private int numOfSkippedMegaLeafNodes;
+    private int maxNumberOfPinnedPages;
+
     /**
      * Column tuple reference
      *
@@ -64,6 +71,7 @@ public abstract class AbstractColumnTupleReference implements 
IColumnTupleIterat
             primaryKeyBufferProviders[i] = new ColumnSingleBufferProvider(i);
         }
 
+        pinnedPages = new LongOpenHashSet();
         int numberOfFilteredColumns = info.getNumberOfFilteredColumns();
         filterBufferProviders = new 
IColumnBufferProvider[numberOfFilteredColumns];
         for (int i = 0; i < numberOfFilteredColumns; i++) {
@@ -71,7 +79,7 @@ public abstract class AbstractColumnTupleReference implements 
IColumnTupleIterat
             if (columnIndex < 0) {
                 filterBufferProviders[i] = DummyColumnBufferProvider.INSTANCE;
             } else if (columnIndex >= numberOfPrimaryKeys) {
-                filterBufferProviders[i] = new 
ColumnMultiBufferProvider(columnIndex, multiPageOp);
+                filterBufferProviders[i] = new 
ColumnMultiBufferProvider(columnIndex, multiPageOp, pinnedPages);
             } else {
                 filterBufferProviders[i] = new 
ColumnSingleBufferProvider(columnIndex);
             }
@@ -82,7 +90,7 @@ public abstract class AbstractColumnTupleReference implements 
IColumnTupleIterat
         for (int i = 0; i < numberOfRequestedColumns; i++) {
             int columnIndex = info.getColumnIndex(i);
             if (columnIndex >= numberOfPrimaryKeys) {
-                buffersProviders[i] = new 
ColumnMultiBufferProvider(columnIndex, multiPageOp);
+                buffersProviders[i] = new 
ColumnMultiBufferProvider(columnIndex, multiPageOp, pinnedPages);
             } else {
                 buffersProviders[i] = DummyColumnBufferProvider.INSTANCE;
             }
@@ -116,6 +124,8 @@ public abstract class AbstractColumnTupleReference 
implements IColumnTupleIterat
         int numberOfTuples = frame.getTupleCount();
         //Start new page and check whether we should skip reading non-key 
columns or not
         boolean readColumnPages = startNewPage(pageZero, 
frame.getNumberOfColumns(), numberOfTuples);
+        //Release previous pinned pages if any
+        unpinColumnsPages();
         /*
          * When startIndex = 0, a call to next() is performed to get the 
information of the PK
          * and 0 skips will be performed. If startIndex (for example) is 5, a 
call to next() will be performed
@@ -125,8 +135,6 @@ public abstract class AbstractColumnTupleReference 
implements IColumnTupleIterat
         if (readColumnPages) {
             for (int i = 0; i < filterBufferProviders.length; i++) {
                 IColumnBufferProvider provider = filterBufferProviders[i];
-                //Release previous pinned pages if any
-                provider.releaseAll();
                 provider.reset(frame);
                 startColumnFilter(provider, i, numberOfTuples);
             }
@@ -135,11 +143,10 @@ public abstract class AbstractColumnTupleReference 
implements IColumnTupleIterat
         if (readColumnPages && evaluateFilter()) {
             for (int i = 0; i < buffersProviders.length; i++) {
                 IColumnBufferProvider provider = buffersProviders[i];
-                //Release previous pinned pages if any
-                provider.releaseAll();
                 provider.reset(frame);
                 startColumn(provider, i, numberOfTuples);
             }
+
             /*
              * skipCount can be < 0 for cases when the tuples in the range [0, 
startIndex] are all anti-matters.
              * Consequently, tuples in the range [0, startIndex] do not have 
any non-key columns. Thus, the returned
@@ -150,6 +157,7 @@ public abstract class AbstractColumnTupleReference 
implements IColumnTupleIterat
         } else {
             numOfSkippedMegaLeafNodes++;
         }
+
         totalNumberOfMegaLeafNodes++;
     }
 
@@ -232,17 +240,30 @@ public abstract class AbstractColumnTupleReference 
implements IColumnTupleIterat
 
     @Override
     public final void unpinColumnsPages() throws HyracksDataException {
+        for (int i = 0; i < filterBufferProviders.length; i++) {
+            filterBufferProviders[i].releaseAll();
+        }
+
         for (int i = 0; i < buffersProviders.length; i++) {
             buffersProviders[i].releaseAll();
         }
+
+        maxNumberOfPinnedPages = Math.max(maxNumberOfPinnedPages, 
pinnedPages.size());
+        pinnedPages.clear();
     }
 
     @Override
     public final void close() {
-        if (LOGGER.isInfoEnabled() && numOfSkippedMegaLeafNodes > 0) {
-            LOGGER.info("Filtered {} disk mega-leaf nodes out of {} in total", 
numOfSkippedMegaLeafNodes,
+        if (!LOGGER.isDebugEnabled()) {
+            return;
+        }
+
+        if (numOfSkippedMegaLeafNodes > 0) {
+            LOGGER.debug("Filtered {} disk mega-leaf nodes out of {} in 
total", numOfSkippedMegaLeafNodes,
                     totalNumberOfMegaLeafNodes);
         }
+
+        LOGGER.debug("Max number of pinned pages is {}", 
maxNumberOfPinnedPages + 1);
     }
 
     /* *************************************************************
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/ColumnMultiBufferProvider.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/ColumnMultiBufferProvider.java
index 0c17d6ba9e..34ec856a1e 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/ColumnMultiBufferProvider.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/ColumnMultiBufferProvider.java
@@ -26,20 +26,25 @@ import 
org.apache.hyracks.api.exceptions.HyracksDataException;
 import 
org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnBufferProvider;
 import 
org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnReadMultiPageOp;
 import 
org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.ColumnBTreeReadLeafFrame;
+import org.apache.hyracks.storage.common.buffercache.CachedPage;
 import org.apache.hyracks.storage.common.buffercache.ICachedPage;
 
+import it.unimi.dsi.fastutil.longs.LongSet;
+
 public final class ColumnMultiBufferProvider implements IColumnBufferProvider {
     private final int columnIndex;
     private final IColumnReadMultiPageOp multiPageOp;
     private final Queue<ICachedPage> pages;
+    private final LongSet pinnedPages;
     private int numberOfPages;
     private int startPage;
     private int startOffset;
     private int length;
 
-    public ColumnMultiBufferProvider(int columnIndex, IColumnReadMultiPageOp 
multiPageOp) {
+    public ColumnMultiBufferProvider(int columnIndex, IColumnReadMultiPageOp 
multiPageOp, LongSet pinnedPages) {
         this.columnIndex = columnIndex;
         this.multiPageOp = multiPageOp;
+        this.pinnedPages = pinnedPages;
         pages = new ArrayDeque<>();
     }
 
@@ -107,6 +112,7 @@ public final class ColumnMultiBufferProvider implements 
IColumnBufferProvider {
     private ByteBuffer readNext() throws HyracksDataException {
         ICachedPage columnPage = multiPageOp.pin(startPage++);
         pages.add(columnPage);
+        pinnedPages.add(((CachedPage) columnPage).getDiskPageId());
         return columnPage.getBuffer();
     }
 
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
index 9fcce8b361..461d4167c2 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
@@ -583,7 +583,9 @@ public class LSMHarness implements ILSMHarness {
     @Override
     public void merge(ILSMIOOperation operation) throws HyracksDataException {
         if (LOGGER.isDebugEnabled()) {
-            LOGGER.debug("Started a merge operation for index: {}", lsmIndex);
+            MergeOperation mergeOp = (MergeOperation) operation;
+            LOGGER.debug("Started a merge operation (number of merging 
components {}) for index: {}",
+                    mergeOp.getMergingComponents().size(), lsmIndex);
         }
         synchronized (opTracker) {
             enterComponents(operation.getAccessor().getOpContext(), 
LSMOperationType.MERGE);

Reply via email to