This is an automated email from the ASF dual-hosted git repository.
wyk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git
The following commit(s) were added to refs/heads/master by this push:
new ea4ae3a1a8 [ASTERIXDB-3314][STO] Reduce buffer cache pressure on
columnar
ea4ae3a1a8 is described below
commit ea4ae3a1a8df45d6f575400ce7da2d2099eaa416
Author: Wail Alkowaileet <[email protected]>
AuthorDate: Fri Nov 17 12:58:43 2023 -0800
[ASTERIXDB-3314][STO] Reduce buffer cache pressure on columnar
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- Set the max merging components count to 4 in columnar datasets
- Fix columnar filter pages not being unpinned
- Allocate initial 32KB buffers for columnar writers
Change-Id: I809109b232bc5a5db0c47a52cb98c838ff55e27f
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17965
Integration-Tests: Jenkins <[email protected]>
Tested-by: Jenkins <[email protected]>
Reviewed-by: Murtadha Hubail <[email protected]>
---
.../out/MultiTemporaryBufferBytesOutputStream.java | 11 +++++
.../hyracks-storage-am-lsm-btree-column/pom.xml | 4 ++
.../column/impls/btree/ColumnBTreeBulkloader.java | 47 ++++++++++++++++++++++
.../impls/btree/ColumnBTreeRangeSearchCursor.java | 2 +-
.../lsm/tuples/AbstractColumnTupleReference.java | 41 ++++++++++++++-----
.../lsm/tuples/ColumnMultiBufferProvider.java | 8 +++-
.../storage/am/lsm/common/impls/LSMHarness.java | 4 +-
7 files changed, 104 insertions(+), 13 deletions(-)
diff --git
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/out/MultiTemporaryBufferBytesOutputStream.java
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/out/MultiTemporaryBufferBytesOutputStream.java
index cf2808e0e2..38f73215d8 100644
---
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/out/MultiTemporaryBufferBytesOutputStream.java
+++
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/out/MultiTemporaryBufferBytesOutputStream.java
@@ -25,8 +25,11 @@ import java.nio.ByteBuffer;
import org.apache.commons.lang3.mutable.Mutable;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import
org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnWriteMultiPageOp;
+import org.apache.hyracks.util.StorageUtil;
public final class MultiTemporaryBufferBytesOutputStream extends
AbstractMultiBufferBytesOutputStream {
+ private static final int INITIAL_BUFFER_SIZE =
StorageUtil.getIntSizeInBytes(32, StorageUtil.StorageUnit.KILOBYTE);
+
public
MultiTemporaryBufferBytesOutputStream(Mutable<IColumnWriteMultiPageOp>
multiPageOpRef) {
super(multiPageOpRef);
}
@@ -38,6 +41,14 @@ public final class MultiTemporaryBufferBytesOutputStream
extends AbstractMultiBu
@Override
protected ByteBuffer confiscateNewBuffer() throws HyracksDataException {
+ if (buffers.isEmpty()) {
+ /*
+ * One buffer on the house to avoid confiscating a whole page for
a tiny stream.
+ * This protects the buffer cache from the pressure of confiscating
pages for small columns. Think sparse
+ * columns, which may take only a few hundred bytes to write.
+ */
+ return ByteBuffer.allocate(INITIAL_BUFFER_SIZE);
+ }
return multiPageOpRef.getValue().confiscateTemporary();
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/pom.xml
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/pom.xml
index 8e0bc0cb8c..93eca7c711 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/pom.xml
@@ -95,5 +95,9 @@
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
+ <dependency>
+ <groupId>it.unimi.dsi</groupId>
+ <artifactId>fastutil-core</artifactId>
+ </dependency>
</dependencies>
</project>
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeBulkloader.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeBulkloader.java
index 48bd18094c..3e72584a15 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeBulkloader.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeBulkloader.java
@@ -38,8 +38,11 @@ import
org.apache.hyracks.storage.common.buffercache.IBufferCache;
import org.apache.hyracks.storage.common.buffercache.ICachedPage;
import org.apache.hyracks.storage.common.buffercache.IPageWriteCallback;
import org.apache.hyracks.storage.common.file.BufferedFileHandle;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
public final class ColumnBTreeBulkloader extends BTreeNSMBulkLoader implements
IColumnWriteMultiPageOp {
+ private static final Logger LOGGER = LogManager.getLogger();
private final List<CachedPage> columnsPages;
private final List<CachedPage> tempConfiscatedPages;
private final ColumnBTreeWriteLeafFrame columnarFrame;
@@ -48,6 +51,12 @@ public final class ColumnBTreeBulkloader extends
BTreeNSMBulkLoader implements I
private boolean setLowKey;
private int tupleCount;
+ // For logging
+ private int numberOfLeafNodes;
+ private int numberOfPagesInCurrentLeafNode;
+ private int maxNumberOfPagesForAColumn;
+ private int maxNumberOfPagesInALeafNode;
+
public ColumnBTreeBulkloader(float fillFactor, boolean verifyInput,
IPageWriteCallback callback, ITreeIndex index,
ITreeIndexFrame leafFrame) throws HyracksDataException {
super(fillFactor, verifyInput, callback, index, leafFrame);
@@ -59,6 +68,12 @@ public final class ColumnBTreeBulkloader extends
BTreeNSMBulkLoader implements I
lowKey = new BTreeSplitKey(tupleWriter.createTupleReference());
lowKey.getTuple().setFieldCount(cmp.getKeyFieldCount());
setLowKey = true;
+
+ // For logging. Starts with 1 for page0
+ numberOfPagesInCurrentLeafNode = 1;
+ maxNumberOfPagesForAColumn = 0;
+ maxNumberOfPagesInALeafNode = 0;
+ numberOfLeafNodes = 1;
}
@Override
@@ -118,9 +133,14 @@ public final class ColumnBTreeBulkloader extends
BTreeNSMBulkLoader implements I
for (ICachedPage page : tempConfiscatedPages) {
bufferCache.returnPage(page, false);
}
+
+ // For logging
+ int numberOfTempConfiscatedPages = tempConfiscatedPages.size();
tempConfiscatedPages.clear();
//Where Page0 and columns pages will be written
super.end();
+
+ log("Finished");
}
@Override
@@ -156,6 +176,12 @@ public final class ColumnBTreeBulkloader extends
BTreeNSMBulkLoader implements I
splitKey.setRightPage(leafFrontier.pageId);
setLowKey = true;
tupleCount = 0;
+
+ // For logging
+ maxNumberOfPagesInALeafNode = Math.max(maxNumberOfPagesInALeafNode,
numberOfPagesInCurrentLeafNode);
+ // Starts with 1 for page0
+ numberOfPagesInCurrentLeafNode = 1;
+ numberOfLeafNodes++;
}
@Override
@@ -172,6 +198,12 @@ public final class ColumnBTreeBulkloader extends
BTreeNSMBulkLoader implements I
for (ICachedPage c : columnsPages) {
write(c);
}
+
+ // For logging
+ int numberOfPagesInPersistedColumn = columnsPages.size();
+ maxNumberOfPagesForAColumn = Math.max(maxNumberOfPagesForAColumn,
numberOfPagesInPersistedColumn);
+ numberOfPagesInCurrentLeafNode += numberOfPagesInPersistedColumn;
+
columnsPages.clear();
}
@@ -185,6 +217,9 @@ public final class ColumnBTreeBulkloader extends
BTreeNSMBulkLoader implements I
bufferCache.returnPage(page, false);
}
super.abort();
+
+ // For logging
+ log("Aborted");
}
private void setSplitKey(ISplitKey splitKey, ITupleReference tuple) {
@@ -193,6 +228,18 @@ public final class ColumnBTreeBulkloader extends
BTreeNSMBulkLoader implements I
tupleWriter.writeTupleFields(tuple, 0, cmp.getKeyFieldCount(),
splitKey.getBuffer().array(), 0);
}
+ private void log(String status) {
+ if (!LOGGER.isDebugEnabled()) {
+ return;
+ }
+
+ int numberOfTempConfiscatedPages = tempConfiscatedPages.size();
+ LOGGER.debug(
+ "{} columnar bulkloader used leafNodes: {},
tempPagesAllocated: {}, maxPagesPerColumn: {}, and maxLeafNodePages: {}",
+ status, numberOfLeafNodes, numberOfTempConfiscatedPages,
maxNumberOfPagesForAColumn,
+ maxNumberOfPagesInALeafNode);
+ }
+
/*
* ***********************************************************
* IColumnWriteMultiPageOp
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeRangeSearchCursor.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeRangeSearchCursor.java
index fd726cdca2..39952dff61 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeRangeSearchCursor.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeRangeSearchCursor.java
@@ -256,8 +256,8 @@ public class ColumnBTreeRangeSearchCursor extends
EnforcedIndexCursor
@Override
public void doClose() throws HyracksDataException {
- frameTuple.close();
releasePages();
+ frameTuple.close();
page0 = null;
pred = null;
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/AbstractColumnTupleReference.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/AbstractColumnTupleReference.java
index e638a4ad60..3923025838 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/AbstractColumnTupleReference.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/AbstractColumnTupleReference.java
@@ -32,6 +32,9 @@ import
org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.ColumnBTreeRea
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import it.unimi.dsi.fastutil.longs.LongOpenHashSet;
+import it.unimi.dsi.fastutil.longs.LongSet;
+
public abstract class AbstractColumnTupleReference implements
IColumnTupleIterator {
private static final Logger LOGGER = LogManager.getLogger();
private static final String UNSUPPORTED_OPERATION_MSG = "Operation is not
supported for column tuples";
@@ -41,11 +44,15 @@ public abstract class AbstractColumnTupleReference
implements IColumnTupleIterat
private final IColumnBufferProvider[] filterBufferProviders;
private final IColumnBufferProvider[] buffersProviders;
private final int numberOfPrimaryKeys;
- private int totalNumberOfMegaLeafNodes;
- private int numOfSkippedMegaLeafNodes;
private int endIndex;
protected int tupleIndex;
+ // For logging
+ private final LongSet pinnedPages;
+ private int totalNumberOfMegaLeafNodes;
+ private int numOfSkippedMegaLeafNodes;
+ private int maxNumberOfPinnedPages;
+
/**
* Column tuple reference
*
@@ -64,6 +71,7 @@ public abstract class AbstractColumnTupleReference implements
IColumnTupleIterat
primaryKeyBufferProviders[i] = new ColumnSingleBufferProvider(i);
}
+ pinnedPages = new LongOpenHashSet();
int numberOfFilteredColumns = info.getNumberOfFilteredColumns();
filterBufferProviders = new
IColumnBufferProvider[numberOfFilteredColumns];
for (int i = 0; i < numberOfFilteredColumns; i++) {
@@ -71,7 +79,7 @@ public abstract class AbstractColumnTupleReference implements
IColumnTupleIterat
if (columnIndex < 0) {
filterBufferProviders[i] = DummyColumnBufferProvider.INSTANCE;
} else if (columnIndex >= numberOfPrimaryKeys) {
- filterBufferProviders[i] = new
ColumnMultiBufferProvider(columnIndex, multiPageOp);
+ filterBufferProviders[i] = new
ColumnMultiBufferProvider(columnIndex, multiPageOp, pinnedPages);
} else {
filterBufferProviders[i] = new
ColumnSingleBufferProvider(columnIndex);
}
@@ -82,7 +90,7 @@ public abstract class AbstractColumnTupleReference implements
IColumnTupleIterat
for (int i = 0; i < numberOfRequestedColumns; i++) {
int columnIndex = info.getColumnIndex(i);
if (columnIndex >= numberOfPrimaryKeys) {
- buffersProviders[i] = new
ColumnMultiBufferProvider(columnIndex, multiPageOp);
+ buffersProviders[i] = new
ColumnMultiBufferProvider(columnIndex, multiPageOp, pinnedPages);
} else {
buffersProviders[i] = DummyColumnBufferProvider.INSTANCE;
}
@@ -116,6 +124,8 @@ public abstract class AbstractColumnTupleReference
implements IColumnTupleIterat
int numberOfTuples = frame.getTupleCount();
//Start new page and check whether we should skip reading non-key
columns or not
boolean readColumnPages = startNewPage(pageZero,
frame.getNumberOfColumns(), numberOfTuples);
+ //Release previous pinned pages if any
+ unpinColumnsPages();
/*
* When startIndex = 0, a call to next() is performed to get the
information of the PK
* and 0 skips will be performed. If startIndex (for example) is 5, a
call to next() will be performed
@@ -125,8 +135,6 @@ public abstract class AbstractColumnTupleReference
implements IColumnTupleIterat
if (readColumnPages) {
for (int i = 0; i < filterBufferProviders.length; i++) {
IColumnBufferProvider provider = filterBufferProviders[i];
- //Release previous pinned pages if any
- provider.releaseAll();
provider.reset(frame);
startColumnFilter(provider, i, numberOfTuples);
}
@@ -135,11 +143,10 @@ public abstract class AbstractColumnTupleReference
implements IColumnTupleIterat
if (readColumnPages && evaluateFilter()) {
for (int i = 0; i < buffersProviders.length; i++) {
IColumnBufferProvider provider = buffersProviders[i];
- //Release previous pinned pages if any
- provider.releaseAll();
provider.reset(frame);
startColumn(provider, i, numberOfTuples);
}
+
/*
* skipCount can be < 0 for cases when the tuples in the range [0,
startIndex] are all anti-matters.
* Consequently, tuples in the range [0, startIndex] do not have
any non-key columns. Thus, the returned
@@ -150,6 +157,7 @@ public abstract class AbstractColumnTupleReference
implements IColumnTupleIterat
} else {
numOfSkippedMegaLeafNodes++;
}
+
totalNumberOfMegaLeafNodes++;
}
@@ -232,17 +240,30 @@ public abstract class AbstractColumnTupleReference
implements IColumnTupleIterat
@Override
public final void unpinColumnsPages() throws HyracksDataException {
+ for (int i = 0; i < filterBufferProviders.length; i++) {
+ filterBufferProviders[i].releaseAll();
+ }
+
for (int i = 0; i < buffersProviders.length; i++) {
buffersProviders[i].releaseAll();
}
+
+ maxNumberOfPinnedPages = Math.max(maxNumberOfPinnedPages,
pinnedPages.size());
+ pinnedPages.clear();
}
@Override
public final void close() {
- if (LOGGER.isInfoEnabled() && numOfSkippedMegaLeafNodes > 0) {
- LOGGER.info("Filtered {} disk mega-leaf nodes out of {} in total",
numOfSkippedMegaLeafNodes,
+ if (!LOGGER.isDebugEnabled()) {
+ return;
+ }
+
+ if (numOfSkippedMegaLeafNodes > 0) {
+ LOGGER.debug("Filtered {} disk mega-leaf nodes out of {} in
total", numOfSkippedMegaLeafNodes,
totalNumberOfMegaLeafNodes);
}
+
+ LOGGER.debug("Max number of pinned pages is {}",
maxNumberOfPinnedPages + 1);
}
/* *************************************************************
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/ColumnMultiBufferProvider.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/ColumnMultiBufferProvider.java
index 0c17d6ba9e..34ec856a1e 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/ColumnMultiBufferProvider.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/ColumnMultiBufferProvider.java
@@ -26,20 +26,25 @@ import
org.apache.hyracks.api.exceptions.HyracksDataException;
import
org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnBufferProvider;
import
org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnReadMultiPageOp;
import
org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.ColumnBTreeReadLeafFrame;
+import org.apache.hyracks.storage.common.buffercache.CachedPage;
import org.apache.hyracks.storage.common.buffercache.ICachedPage;
+import it.unimi.dsi.fastutil.longs.LongSet;
+
public final class ColumnMultiBufferProvider implements IColumnBufferProvider {
private final int columnIndex;
private final IColumnReadMultiPageOp multiPageOp;
private final Queue<ICachedPage> pages;
+ private final LongSet pinnedPages;
private int numberOfPages;
private int startPage;
private int startOffset;
private int length;
- public ColumnMultiBufferProvider(int columnIndex, IColumnReadMultiPageOp
multiPageOp) {
+ public ColumnMultiBufferProvider(int columnIndex, IColumnReadMultiPageOp
multiPageOp, LongSet pinnedPages) {
this.columnIndex = columnIndex;
this.multiPageOp = multiPageOp;
+ this.pinnedPages = pinnedPages;
pages = new ArrayDeque<>();
}
@@ -107,6 +112,7 @@ public final class ColumnMultiBufferProvider implements
IColumnBufferProvider {
private ByteBuffer readNext() throws HyracksDataException {
ICachedPage columnPage = multiPageOp.pin(startPage++);
pages.add(columnPage);
+ pinnedPages.add(((CachedPage) columnPage).getDiskPageId());
return columnPage.getBuffer();
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
index 9fcce8b361..461d4167c2 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
@@ -583,7 +583,9 @@ public class LSMHarness implements ILSMHarness {
@Override
public void merge(ILSMIOOperation operation) throws HyracksDataException {
if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("Started a merge operation for index: {}", lsmIndex);
+ MergeOperation mergeOp = (MergeOperation) operation;
+ LOGGER.debug("Started a merge operation (number of merging
components {}) for index: {}",
+ mergeOp.getMergingComponents().size(), lsmIndex);
}
synchronized (opTracker) {
enterComponents(operation.getAccessor().getOpContext(),
LSMOperationType.MERGE);