This is an automated email from the ASF dual-hosted git repository.

mhubail pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git

commit 4dc60abf2e4b3064454bc30fadca2e23de0c4edd
Author: Ritik Raj <[email protected]>
AuthorDate: Fri Jul 4 01:06:35 2025 +0530

    [ASTERIXDB-3601][STO] Unpinning the not required segmentPages
    
    - user model changes: no
    - storage format changes: no
    - interface changes: no
    
    Details:
    In order to calculate the length/size of the columns,
    all the columns' offsets must be known, hence all the
    segment pages need to be pinned, irrespective of whether
    the query asked for them. But after calculating the columns'
    lengths, the unneeded segments are unpinned and their buffers
    are cleared.
    
    Ext-ref: MB-66306
    Change-Id: I915031e39a17ac853a24d78809b7742e2ea60163
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20016
    Reviewed-by: Peeyush Gupta <[email protected]>
    Tested-by: Ritik Raj <[email protected]>
---
 asterixdb/asterix-app/pom.xml                      |   3 +-
 .../test/cloud_storage/CloudStorageSparseTest.java | 130 +++++++++++++++++++++
 .../runtime/SqlppSinglePartitionExecutionTest.java |   8 ++
 ...d-storage.conf => cc-cloud-storage-sparse.conf} |   1 +
 .../src/test/resources/cc-cloud-storage.conf       |   1 +
 .../stream/in/MultiPageZeroByteBuffersReader.java  |  39 ++++++-
 .../column/zero/PageZeroWriterFlavorSelector.java  |   2 +-
 .../zero/readers/DefaultColumnPageZeroReader.java  |  15 ++-
 .../DefaultColumnMultiPageZeroReader.java          |  11 +-
 .../DefaultColumnMultiPageZeroWriter.java          |   8 +-
 .../multipage/SparseColumnMultiPageZeroReader.java |  11 +-
 .../am/lsm/btree/column/cloud/ColumnRanges.java    | 115 +++++++++---------
 .../buffercache/read/CloudColumnReadContext.java   |  18 ++-
 .../impls/btree/ColumnBTreeReadLeafFrame.java      |   4 +
 .../column/impls/btree/IColumnPageZeroReader.java  |   2 +
 .../tuples/ColumnMultiPageZeroBufferProvider.java  |  42 ++++++-
 16 files changed, 318 insertions(+), 92 deletions(-)

diff --git a/asterixdb/asterix-app/pom.xml b/asterixdb/asterix-app/pom.xml
index 9815d0e3ae..4910e8241f 100644
--- a/asterixdb/asterix-app/pom.xml
+++ b/asterixdb/asterix-app/pom.xml
@@ -491,7 +491,7 @@
       <id>asterix-gerrit-asterix-app</id>
       <properties>
         <test.excludes>
-          
**/CloudStorageTest.java,**/CloudStorageCancellationTest.java,**/CloudStorageGCSTest.java,**/CloudStorageUnstableTest.java,
+          
**/CloudStorageTest.java,**/CloudStorageSparseTest.java,**/CloudStorageCancellationTest.java,**/CloudStorageGCSTest.java,**/CloudStorageUnstableTest.java,
           
**/SqlppExecutionWithCancellationTest.java,**/DmlTest.java,**/RepeatedTest.java,**/SqlppExecutionTest.java,
           
**/SqlppExecutionColumnTest.java,**/*StaticPartitioning*Test.java,**/*Ssl*Test.java,**/Podman*.java,
           
**/*AnalyzedExecutionTest.java,**/SqlppProfiledExecutionTest.java,**/CloudPythonTest.java,
@@ -612,6 +612,7 @@
       <properties>
         <test.includes>
           **/CloudStorageTest.java,
+          **/CloudStorageSparseTest.java,
           **/CloudStorageCancellationTest.java,
           **/SqlppSinglePointLookupExecutionTest.java, **/AwsS3*.java
         </test.includes>
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/CloudStorageSparseTest.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/CloudStorageSparseTest.java
new file mode 100644
index 0000000000..b7a264b34b
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/CloudStorageSparseTest.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.cloud_storage;
+
+import java.net.URI;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.asterix.api.common.LocalCloudUtilAdobeMock;
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.config.GlobalConfig;
+import org.apache.asterix.test.common.TestExecutor;
+import org.apache.asterix.test.runtime.ExecutionTestUtil;
+import org.apache.asterix.test.runtime.LangExecutionUtil;
+import org.apache.asterix.testframework.context.TestCaseContext;
+import org.apache.asterix.testframework.xml.Description;
+import org.apache.asterix.testframework.xml.TestCase;
+import org.apache.hyracks.control.nc.NodeControllerService;
+import org.apache.hyracks.storage.common.buffercache.BufferCache;
+import org.apache.hyracks.storage.common.buffercache.IBufferCache;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.BeforeClass;
+import org.junit.FixMethodOrder;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.MethodSorters;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.S3ClientBuilder;
+import software.amazon.awssdk.services.s3.model.CreateBucketRequest;
+
+/**
+ * Run tests in cloud deployment environment
+ */
+@RunWith(Parameterized.class)
+@FixMethodOrder(MethodSorters.NAME_ASCENDING)
+public class CloudStorageSparseTest {
+
+    private static final Logger LOGGER = LogManager.getLogger();
+
+    private final TestCaseContext tcCtx;
+    public static final String SUITE_TESTS = "testsuite_cloud_storage.xml";
+    public static final String ONLY_TESTS = "testsuite_cloud_storage_only.xml";
+    public static final String CONFIG_FILE_NAME = 
"src/test/resources/cc-cloud-storage-sparse.conf";
+    public static final String DELTA_RESULT_PATH = "results_cloud";
+    public static final String EXCLUDED_TESTS = "MP";
+
+    public static final String PLAYGROUND_CONTAINER = "playground";
+    public static final String MOCK_SERVER_REGION = "us-west-2";
+    public static final int MOCK_SERVER_PORT = 8001;
+    public static final String MOCK_SERVER_HOSTNAME = "http://127.0.0.1:" + 
MOCK_SERVER_PORT;
+
+    public CloudStorageSparseTest(TestCaseContext tcCtx) {
+        this.tcCtx = tcCtx;
+    }
+
+    @BeforeClass
+    public static void setUp() throws Exception {
+        TestExecutor testExecutor = new TestExecutor(DELTA_RESULT_PATH);
+        setupEnv(testExecutor);
+    }
+
+    public static void setupEnv(TestExecutor testExecutor) throws Exception {
+        LocalCloudUtilAdobeMock.startS3CloudEnvironment(true);
+        testExecutor.executorId = "cloud";
+        testExecutor.stripSubstring = "//DB:";
+        LangExecutionUtil.setUp(CONFIG_FILE_NAME, testExecutor);
+        System.setProperty(GlobalConfig.CONFIG_FILE_PROPERTY, 
CONFIG_FILE_NAME);
+
+        // create the playground bucket and leave it empty, just for external 
collection-based tests
+        S3ClientBuilder builder = S3Client.builder();
+        URI endpoint = URI.create(MOCK_SERVER_HOSTNAME); // endpoint pointing 
to S3 mock server
+        
builder.region(Region.of(MOCK_SERVER_REGION)).credentialsProvider(AnonymousCredentialsProvider.create())
+                .endpointOverride(endpoint);
+        S3Client client = builder.build();
+        
client.createBucket(CreateBucketRequest.builder().bucket(PLAYGROUND_CONTAINER).build());
+        client.close();
+    }
+
+    @AfterClass
+    public static void tearDown() throws Exception {
+        LangExecutionUtil.tearDown();
+        LocalCloudUtilAdobeMock.shutdownSilently();
+    }
+
+    @Parameters(name = "CloudStorageSparseTest {index}: {0}")
+    public static Collection<Object[]> tests() throws Exception {
+        return LangExecutionUtil.tests(ONLY_TESTS, SUITE_TESTS);
+    }
+
+    @Test
+    public void test() throws Exception {
+        List<TestCase.CompilationUnit> cu = 
tcCtx.getTestCase().getCompilationUnit();
+        Assume.assumeTrue(cu.size() > 1 || 
!EXCLUDED_TESTS.equals(getText(cu.get(0).getDescription())));
+        LangExecutionUtil.test(tcCtx);
+        IBufferCache bufferCache;
+        for (NodeControllerService nc : ExecutionTestUtil.integrationUtil.ncs) 
{
+            bufferCache = ((INcApplicationContext) 
nc.getApplicationContext()).getBufferCache();
+            Assert.assertTrue(((BufferCache) bufferCache).isClean());
+        }
+    }
+
+    private static String getText(Description description) {
+        return description == null ? "" : description.getValue();
+    }
+}
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppSinglePartitionExecutionTest.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppSinglePartitionExecutionTest.java
index d0823c70e5..6f19393a1d 100644
--- 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppSinglePartitionExecutionTest.java
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppSinglePartitionExecutionTest.java
@@ -28,7 +28,10 @@ import org.apache.asterix.common.api.INcApplicationContext;
 import org.apache.asterix.test.common.TestExecutor;
 import org.apache.asterix.testframework.context.TestCaseContext;
 import org.apache.hyracks.control.nc.NodeControllerService;
+import org.apache.hyracks.storage.common.buffercache.BufferCache;
+import org.apache.hyracks.storage.common.buffercache.IBufferCache;
 import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -68,6 +71,11 @@ public class SqlppSinglePartitionExecutionTest {
     @Test
     public void test() throws Exception {
         LangExecutionUtil.test(tcCtx);
+        IBufferCache bufferCache;
+        for (NodeControllerService nc : ExecutionTestUtil.integrationUtil.ncs) 
{
+            bufferCache = ((INcApplicationContext) 
nc.getApplicationContext()).getBufferCache();
+            Assert.assertTrue(((BufferCache) bufferCache).isClean());
+        }
     }
 
     private static void setNcEndpoints(TestExecutor testExecutor) {
diff --git a/asterixdb/asterix-app/src/test/resources/cc-cloud-storage.conf 
b/asterixdb/asterix-app/src/test/resources/cc-cloud-storage-sparse.conf
similarity index 98%
copy from asterixdb/asterix-app/src/test/resources/cc-cloud-storage.conf
copy to asterixdb/asterix-app/src/test/resources/cc-cloud-storage-sparse.conf
index 551d2c4545..0842e0179b 100644
--- a/asterixdb/asterix-app/src/test/resources/cc-cloud-storage.conf
+++ b/asterixdb/asterix-app/src/test/resources/cc-cloud-storage-sparse.conf
@@ -42,6 +42,7 @@ jvm.args=-Xmx4096m 
--add-opens=jdk.management/com.sun.management.internal=ALL-UN
 storage.buffercache.size=128MB
 storage.memorycomponent.globalbudget=512MB
 storage.max.columns.in.zeroth.segment=800
+storage.page.zero.writer=sparse
 
 [cc]
 address = 127.0.0.1
diff --git a/asterixdb/asterix-app/src/test/resources/cc-cloud-storage.conf 
b/asterixdb/asterix-app/src/test/resources/cc-cloud-storage.conf
index 551d2c4545..b3d1e2dc76 100644
--- a/asterixdb/asterix-app/src/test/resources/cc-cloud-storage.conf
+++ b/asterixdb/asterix-app/src/test/resources/cc-cloud-storage.conf
@@ -42,6 +42,7 @@ jvm.args=-Xmx4096m 
--add-opens=jdk.management/com.sun.management.internal=ALL-UN
 storage.buffercache.size=128MB
 storage.memorycomponent.globalbudget=512MB
 storage.max.columns.in.zeroth.segment=800
+storage.page.zero.writer=default
 
 [cc]
 address = 127.0.0.1
diff --git 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/in/MultiPageZeroByteBuffersReader.java
 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/in/MultiPageZeroByteBuffersReader.java
index 4a590730cd..1ab3052036 100644
--- 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/in/MultiPageZeroByteBuffersReader.java
+++ 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/in/MultiPageZeroByteBuffersReader.java
@@ -34,9 +34,12 @@ import 
org.apache.hyracks.storage.am.lsm.btree.column.impls.lsm.tuples.ColumnMul
 
 import it.unimi.dsi.fastutil.ints.Int2IntMap;
 import it.unimi.dsi.fastutil.ints.Int2IntOpenHashMap;
+import it.unimi.dsi.fastutil.ints.IntArrayList;
+import it.unimi.dsi.fastutil.ints.IntList;
 
 public final class MultiPageZeroByteBuffersReader {
     private static final ByteBuffer EMPTY;
+    private final IntList notRequiredSegmentsIndexes;
     private ColumnMultiPageZeroBufferProvider bufferProvider;
     private final Int2IntMap segmentDir; // should I just create a 
buffer[numberOfSegments] instead?
     private int maxBuffersSize;
@@ -51,15 +54,15 @@ public final class MultiPageZeroByteBuffersReader {
     public MultiPageZeroByteBuffersReader() {
         this.buffers = new ArrayList<>();
         segmentDir = new Int2IntOpenHashMap();
+        notRequiredSegmentsIndexes = new IntArrayList();
         segmentDir.defaultReturnValue(-1);
     }
 
-    public void reset(IColumnBufferProvider bufferProvider) throws 
HyracksDataException {
-        ColumnMultiPageZeroBufferProvider pageZeroBufferProvider = 
(ColumnMultiPageZeroBufferProvider) bufferProvider;
+    public void reset(IColumnBufferProvider pageZeroBufferProvider) throws 
HyracksDataException {
         reset();
-        this.bufferProvider = pageZeroBufferProvider;
-        maxBuffersSize = pageZeroBufferProvider.getNumberOfRemainingPages();
-        pageZeroBufferProvider.readAll(buffers, segmentDir);
+        this.bufferProvider = (ColumnMultiPageZeroBufferProvider) 
pageZeroBufferProvider;
+        maxBuffersSize = bufferProvider.getNumberOfRemainingPages();
+        bufferProvider.readAll(buffers, segmentDir);
     }
 
     public void read(int segmentIndex, IPointable pointable, int position, int 
length)
@@ -147,6 +150,32 @@ public final class MultiPageZeroByteBuffersReader {
         }
     }
 
+    public void unPinNotRequiredSegments(BitSet pageZeroSegmentsPages, int 
numberOfPageZeroSegments)
+            throws HyracksDataException {
+        if (numberOfPageZeroSegments <= 1) {
+            // If there is only one segment, it is always pinned.
+            // So no need to unpin the segments.
+            return;
+        }
+        notRequiredSegmentsIndexes.clear();
+        // Start checking from index 1 (0th segment is always pinned)
+        int i = pageZeroSegmentsPages.nextClearBit(1);
+        while (i >= 1 && i < numberOfPageZeroSegments) {
+            int segmentIndex = i - 1; // Adjusted index for segmentDir
+
+            int bufferIndex = segmentDir.get(segmentIndex);
+            if (bufferIndex != -1) {
+                buffers.set(bufferIndex, EMPTY);
+                notRequiredSegmentsIndexes.add(bufferIndex);
+                segmentDir.remove(segmentIndex);
+            }
+
+            i = pageZeroSegmentsPages.nextClearBit(i + 1);
+        }
+        // Unpin the buffers that are not required anymore.
+        bufferProvider.releasePages(notRequiredSegmentsIndexes);
+    }
+
     public int findColumnIndexInSegment(int segmentIndex, int columnIndex, int 
numberOfColumnsInSegment)
             throws HyracksDataException {
         if (segmentIndex < 0 || segmentIndex >= maxBuffersSize) {
diff --git 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/PageZeroWriterFlavorSelector.java
 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/PageZeroWriterFlavorSelector.java
index 831e459ba6..5deead4544 100644
--- 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/PageZeroWriterFlavorSelector.java
+++ 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/PageZeroWriterFlavorSelector.java
@@ -50,7 +50,7 @@ import it.unimi.dsi.fastutil.bytes.Byte2ObjectArrayMap;
  * space requirements of each approach.
  */
 public class PageZeroWriterFlavorSelector implements 
IColumnPageZeroWriterFlavorSelector {
-    protected byte writerFlag = 
IColumnPageZeroWriter.ColumnPageZeroWriterType.ADAPTIVE.getWriterFlag();
+    protected byte writerFlag = 
IColumnPageZeroWriter.ColumnPageZeroWriterType.DEFAULT.getWriterFlag();
 
     // Cache of writer instances to avoid repeated object creation
     private final Byte2ObjectArrayMap<IColumnPageZeroWriter> writers;
diff --git 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/readers/DefaultColumnPageZeroReader.java
 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/readers/DefaultColumnPageZeroReader.java
index 3f729b8c00..d756a6cb5c 100644
--- 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/readers/DefaultColumnPageZeroReader.java
+++ 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/readers/DefaultColumnPageZeroReader.java
@@ -30,6 +30,7 @@ import java.nio.ByteBuffer;
 import java.util.BitSet;
 
 import org.apache.asterix.column.zero.writers.DefaultColumnPageZeroWriter;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 import 
org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnBufferProvider;
 import org.apache.hyracks.storage.am.lsm.btree.column.cloud.IntPairUtil;
 import 
org.apache.hyracks.storage.am.lsm.btree.column.error.ColumnarValueException;
@@ -43,12 +44,11 @@ public class DefaultColumnPageZeroReader implements 
IColumnPageZeroReader {
     protected static Logger LOGGER = LogManager.getLogger();
 
     protected ByteBuffer pageZeroBuf;
-    protected BitSet pageZeroSegmentsPages;
+    protected static final BitSet EMPTY_SEGMENTS = new BitSet();
     protected int numberOfPresentColumns;
     protected int headerSize;
 
     public DefaultColumnPageZeroReader() {
-        this.pageZeroSegmentsPages = new BitSet();
     }
 
     @Override
@@ -172,7 +172,7 @@ public class DefaultColumnPageZeroReader implements 
IColumnPageZeroReader {
 
     @Override
     public BitSet getPageZeroSegmentsPages() {
-        return pageZeroSegmentsPages;
+        return EMPTY_SEGMENTS;
     }
 
     @Override
@@ -187,9 +187,12 @@ public class DefaultColumnPageZeroReader implements 
IColumnPageZeroReader {
 
     @Override
     public BitSet markRequiredPageSegments(BitSet projectedColumns, int 
pageZeroId, boolean markAll) {
-        pageZeroSegmentsPages.clear();
-        pageZeroSegmentsPages.set(0);
-        return pageZeroSegmentsPages;
+        return EMPTY_SEGMENTS;
+    }
+
+    @Override
+    public void unPinNotRequiredPageZeroSegments() throws HyracksDataException 
{
+        // No-OP
     }
 
     @Override
diff --git 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/writers/multipage/DefaultColumnMultiPageZeroReader.java
 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/writers/multipage/DefaultColumnMultiPageZeroReader.java
index d29daa7901..d4ffbb42b5 100644
--- 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/writers/multipage/DefaultColumnMultiPageZeroReader.java
+++ 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/writers/multipage/DefaultColumnMultiPageZeroReader.java
@@ -60,8 +60,8 @@ public class DefaultColumnMultiPageZeroReader extends 
AbstractColumnMultiPageZer
         super();
         zerothSegmentReader = new DefaultColumnPageZeroReader();
         this.pageZeroSegmentsPages = new BitSet();
-        this.maxNumberOfColumnsInAPage = bufferCapacity
-                / (DefaultColumnPageZeroWriter.COLUMN_OFFSET_SIZE + 
DefaultColumnPageZeroWriter.FILTER_SIZE);
+        this.maxNumberOfColumnsInAPage =
+                
DefaultColumnMultiPageZeroWriter.getMaximumNumberOfColumnsInAPage(bufferCapacity);
         this.offsetPointable = new VoidPointable();
     }
 
@@ -256,7 +256,7 @@ public class DefaultColumnMultiPageZeroReader extends 
AbstractColumnMultiPageZer
         // Not marking the zeroth segment
         if (numberOfPageZeroSegments == 1 || markAll) {
             // mark all segments as required
-            pageZeroSegmentsPages.set(0, numberOfPageZeroSegments);
+            pageZeroSegmentsPages.set(1, numberOfPageZeroSegments);
         } else {
             // Iterate over the projected columns and mark the segments that 
contain them
             int currentIndex = 
projectedColumns.nextSetBit(zerothSegmentMaxColumns);
@@ -278,6 +278,11 @@ public class DefaultColumnMultiPageZeroReader extends 
AbstractColumnMultiPageZer
         return pageZeroSegmentsPages;
     }
 
+    @Override
+    public void unPinNotRequiredPageZeroSegments() throws HyracksDataException 
{
+        segmentBuffers.unPinNotRequiredSegments(pageZeroSegmentsPages, 
numberOfPageZeroSegments);
+    }
+
     @Override
     public int getHeaderSize() {
         return EXTENDED_HEADER_SIZE;
diff --git 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/writers/multipage/DefaultColumnMultiPageZeroWriter.java
 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/writers/multipage/DefaultColumnMultiPageZeroWriter.java
index 7f16d5eaed..5c7d383b38 100644
--- 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/writers/multipage/DefaultColumnMultiPageZeroWriter.java
+++ 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/writers/multipage/DefaultColumnMultiPageZeroWriter.java
@@ -95,8 +95,7 @@ public class DefaultColumnMultiPageZeroWriter implements 
IColumnPageZeroWriter {
         segments = new 
MultiPersistentPageZeroBufferBytesOutputStream(multiPageOpRef); // should this 
be populated at reset?
         this.zerothSegmentWriter = new DefaultColumnPageZeroWriter();
         this.zerothSegmentMaxColumns = zerothSegmentMaxColumns;
-        this.maximumNumberOfColumnsInAPage = bufferCapacity
-                / (DefaultColumnPageZeroWriter.COLUMN_OFFSET_SIZE + 
DefaultColumnPageZeroWriter.FILTER_SIZE);
+        this.maximumNumberOfColumnsInAPage = 
getMaximumNumberOfColumnsInAPage(bufferCapacity);
     }
 
     @Override
@@ -263,4 +262,9 @@ public class DefaultColumnMultiPageZeroWriter implements 
IColumnPageZeroWriter {
     public int getHeaderSize() {
         return EXTENDED_HEADER_SIZE;
     }
+
+    public static int getMaximumNumberOfColumnsInAPage(int bufferCapacity) {
+        return bufferCapacity
+                / (DefaultColumnPageZeroWriter.COLUMN_OFFSET_SIZE + 
DefaultColumnPageZeroWriter.FILTER_SIZE);
+    }
 }
diff --git 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/writers/multipage/SparseColumnMultiPageZeroReader.java
 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/writers/multipage/SparseColumnMultiPageZeroReader.java
index 608ff7141e..035db5d6c3 100644
--- 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/writers/multipage/SparseColumnMultiPageZeroReader.java
+++ 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/writers/multipage/SparseColumnMultiPageZeroReader.java
@@ -64,8 +64,8 @@ public class SparseColumnMultiPageZeroReader extends 
AbstractColumnMultiPageZero
         super();
         zerothSegmentReader = new SparseColumnPageZeroReader();
         this.pageZeroSegmentsPages = new BitSet();
-        this.maxNumberOfColumnsInAPage = bufferCapacity
-                / (SparseColumnPageZeroWriter.COLUMN_OFFSET_SIZE + 
SparseColumnPageZeroWriter.FILTER_SIZE);
+        this.maxNumberOfColumnsInAPage =
+                
SparseColumnMultiPageZeroWriter.getMaximumNumberOfColumnsInAPage(bufferCapacity);
         this.offsetPointable = new VoidPointable();
         this.columnIndexToRelativeColumnIndex = new Int2IntOpenHashMap();
         columnIndexToRelativeColumnIndex.defaultReturnValue(-1);
@@ -346,7 +346,7 @@ public class SparseColumnMultiPageZeroReader extends 
AbstractColumnMultiPageZero
         // Not marking the zeroth segment
         if (numberOfPageZeroSegments == 1 || markAll) {
             // mark all segments as required
-            pageZeroSegmentsPages.set(0, numberOfPageZeroSegments);
+            pageZeroSegmentsPages.set(1, numberOfPageZeroSegments);
         } else {
             // Iterate over the projected columns and mark the segments that 
contain them
             int currentIndex = 
projectedColumns.nextSetBit(maxColumnIndexInZerothSegment + 1);
@@ -377,6 +377,11 @@ public class SparseColumnMultiPageZeroReader extends 
AbstractColumnMultiPageZero
         return pageZeroSegmentsPages;
     }
 
+    @Override
+    public void unPinNotRequiredPageZeroSegments() throws HyracksDataException 
{
+        segmentBuffers.unPinNotRequiredSegments(pageZeroSegmentsPages, 
numberOfPageZeroSegments);
+    }
+
     @Override
     public void printPageZeroReaderInfo() {
         ColumnarValueException ex = new ColumnarValueException();
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/ColumnRanges.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/ColumnRanges.java
index ba3cda4154..87b35bdf3b 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/ColumnRanges.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/ColumnRanges.java
@@ -107,64 +107,69 @@ public final class ColumnRanges {
      */
     public void reset(ColumnBTreeReadLeafFrame leafFrame, BitSet 
requestedColumns, BitSet evictableColumns,
             BitSet cloudOnlyColumns) throws HyracksDataException {
-        // Set leafFrame
-        this.leafFrame = leafFrame;
-        // Ensure arrays capacities (given the leafFrame's columns and pages)
-        init();
-
-        // Set the first 32-bits to the offset and the second 32-bits to 
columnIndex
-        int numberOfPresentColumnsInLeaf = 
leafFrame.populateOffsetColumnIndexPairs(offsetColumnIndexPairs);
-
-        // Set artificial offset to determine the last column's length
-        int megaLeafLength = leafFrame.getMegaLeafNodeLengthInBytes();
-        offsetColumnIndexPairs[numberOfPresentColumnsInLeaf] =
-                IntPairUtil.of(megaLeafLength, numberOfPresentColumnsInLeaf);
-
-        // Sort the pairs by offset (i.e., lowest offset first)
-        LongArrays.stableSort(offsetColumnIndexPairs, 0, 
numberOfPresentColumnsInLeaf, OFFSET_COMPARATOR);
-
-        int columnOrdinal = 0;
-        for (int i = 0; i < numberOfPresentColumnsInLeaf; i++) {
-            if (offsetColumnIndexPairs[i] == 0) {
-                //Any requested column's offset can't be zero
-                //In case a column is not being present in the accessed 
pageZero segments, it will be defaulted to 0
-                continue;
-            }
-            int columnIndex = 
getColumnIndexFromPair(offsetColumnIndexPairs[i]);
-            int offset = getOffsetFromPair(offsetColumnIndexPairs[i]);
-            int nextOffset = getOffsetFromPair(offsetColumnIndexPairs[i + 1]);
-
-            // Compute the column's length in bytes (set 0 for PKs)
-            int length = columnIndex < numberOfPrimaryKeys ? 0 : nextOffset - 
offset;
-            // In case of sparse columns, few columnIndexes can be greater 
than the total sparse column count.
-            ensureCapacity(columnIndex);
-            lengths[columnIndex] = length;
-
-            // Get start page ID (given the computed length above)
-            int startPageId = getColumnStartPageIndex(columnIndex);
-            // Get the number of pages (given the computed length above)
-            int numberOfPages = getColumnNumberOfPages(columnIndex);
-
-            if (columnIndex >= numberOfPrimaryKeys && 
requestedColumns.get(columnIndex)) {
-                // Set column index
-                columnsOrder[columnOrdinal++] = columnIndex;
-                // Compute cloud-only and evictable pages
-                setCloudOnlyAndEvictablePages(columnIndex, cloudOnlyColumns, 
evictableColumns, startPageId,
-                        numberOfPages);
-                // A requested column. Keep its pages as requested
-                continue;
-            }
+        try {
+            // Set leafFrame
+            this.leafFrame = leafFrame;
+            // Ensure arrays capacities (given the leafFrame's columns and 
pages)
+            init();
+
+            // Set the first 32-bits to the offset and the second 32-bits to 
columnIndex
+            int numberOfPresentColumnsInLeaf = 
leafFrame.populateOffsetColumnIndexPairs(offsetColumnIndexPairs);
+
+            // Set artificial offset to determine the last column's length
+            int megaLeafLength = leafFrame.getMegaLeafNodeLengthInBytes();
+            offsetColumnIndexPairs[numberOfPresentColumnsInLeaf] =
+                    IntPairUtil.of(megaLeafLength, 
numberOfPresentColumnsInLeaf);
+
+            // Sort the pairs by offset (i.e., lowest offset first)
+            LongArrays.stableSort(offsetColumnIndexPairs, 0, 
numberOfPresentColumnsInLeaf, OFFSET_COMPARATOR);
+
+            int columnOrdinal = 0;
+            for (int i = 0; i < numberOfPresentColumnsInLeaf; i++) {
+                if (offsetColumnIndexPairs[i] == 0) {
+                    //Any requested column's offset can't be zero
+                    //In case a column is not being present in the accessed 
pageZero segments, it will be defaulted to 0
+                    continue;
+                }
+                int columnIndex = 
getColumnIndexFromPair(offsetColumnIndexPairs[i]);
+                int offset = getOffsetFromPair(offsetColumnIndexPairs[i]);
+                int nextOffset = getOffsetFromPair(offsetColumnIndexPairs[i + 
1]);
+
+                // Compute the column's length in bytes (set 0 for PKs)
+                int length = columnIndex < numberOfPrimaryKeys ? 0 : 
nextOffset - offset;
+                // In case of sparse columns, few columnIndexes can be greater 
than the total sparse column count.
+                ensureCapacity(columnIndex);
+                lengths[columnIndex] = length;
+
+                // Get start page ID (given the computed length above)
+                int startPageId = getColumnStartPageIndex(columnIndex);
+                // Get the number of pages (given the computed length above)
+                int numberOfPages = getColumnNumberOfPages(columnIndex);
+
+                if (columnIndex >= numberOfPrimaryKeys && 
requestedColumns.get(columnIndex)) {
+                    // Set column index
+                    columnsOrder[columnOrdinal++] = columnIndex;
+                    // Compute cloud-only and evictable pages
+                    setCloudOnlyAndEvictablePages(columnIndex, 
cloudOnlyColumns, evictableColumns, startPageId,
+                            numberOfPages);
+                    // A requested column. Keep its pages as requested
+                    continue;
+                }
 
-            // Mark the page as non-evictable
-            for (int j = startPageId; j < startPageId + numberOfPages; j++) {
-                nonEvictablePages.set(j);
+                // Mark the page as non-evictable
+                for (int j = startPageId; j < startPageId + numberOfPages; 
j++) {
+                    nonEvictablePages.set(j);
+                }
             }
-        }
 
-        // Bound the nonRequestedPages to the number of pages in the mega leaf 
node
-        nonEvictablePages.set(leafFrame.getMegaLeafNodeNumberOfPages());
-        // to indicate the end
-        columnsOrder[columnOrdinal] = -1;
+            // Bound the nonRequestedPages to the number of pages in the mega 
leaf node
+            nonEvictablePages.set(leafFrame.getMegaLeafNodeNumberOfPages());
+            // to indicate the end
+            columnsOrder[columnOrdinal] = -1;
+        } finally {
+            //Unpin the not required segment pages
+            leafFrame.unPinNotRequiredPageZeroSegments();
+        }
     }
 
     /**
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudColumnReadContext.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudColumnReadContext.java
index 5458807f67..181aa060e1 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudColumnReadContext.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudColumnReadContext.java
@@ -138,19 +138,17 @@ public final class CloudColumnReadContext implements 
IColumnReadContext {
         }
 
         // pin the required page segments
-        mergedPageRanges.clear();
+        //        mergedPageRanges.clear();
         int pageZeroId = leafFrame.getPageId();
-        // Pinning all the segments of the page zero for now,
+        // Pinning all the segments of the page zero
         // as the column eviction logic is based on the length of the columns 
which
         // gets evaluated from the page zero segments.
-
-        //TODO: find a way to pin only the segments that are required for the 
operation
-        // or pin all the segments and then unpin the segments that are not 
required
-        boolean markAll = true || operation == MERGE;
-        BitSet pageZeroSegmentRanges = 
leafFrame.markRequiredPageZeroSegments(projectedColumns, pageZeroId, markAll);
-        // Merge the page zero segments ranges
-        mergePageZeroSegmentRanges(pageZeroSegmentRanges);
-        mergedPageRanges.pin(columnCtx, bufferCache, fileId, pageZeroId);
+        BitSet pageZeroSegmentRanges =
+                leafFrame.markRequiredPageZeroSegments(projectedColumns, 
pageZeroId, operation == MERGE);
+        // The non-required segments are unpinned after columnRanges.reset().
+        // TODO: can this be done lazily?
+        int numberOfPageZeroSegments = leafFrame.getNumberOfPageZeroSegments();
+        pinAll(fileId, pageZeroId, numberOfPageZeroSegments - 1, bufferCache);
     }
 
     @Override
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeReadLeafFrame.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeReadLeafFrame.java
index f01b28c3db..33074238fb 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeReadLeafFrame.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeReadLeafFrame.java
@@ -190,6 +190,10 @@ public final class ColumnBTreeReadLeafFrame extends 
AbstractColumnBTreeLeafFrame
         return columnPageZeroReader.markRequiredPageSegments(projectedColumns, 
pageZeroId, markAll);
     }
 
+    public void unPinNotRequiredPageZeroSegments() throws HyracksDataException 
{
+        columnPageZeroReader.unPinNotRequiredPageZeroSegments();
+    }
+
     public void printPageZeroReaderInfo() {
         columnPageZeroReader.printPageZeroReaderInfo();
     }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/IColumnPageZeroReader.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/IColumnPageZeroReader.java
index fa9b57a796..f57eba3916 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/IColumnPageZeroReader.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/IColumnPageZeroReader.java
@@ -77,4 +77,6 @@ public interface IColumnPageZeroReader {
     BitSet markRequiredPageSegments(BitSet projectedColumns, int pageZeroId, 
boolean markAll);
 
     void printPageZeroReaderInfo();
+
+    void unPinNotRequiredPageZeroSegments() throws HyracksDataException;
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/ColumnMultiPageZeroBufferProvider.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/ColumnMultiPageZeroBufferProvider.java
index 1556beafc5..c60ca79afc 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/ColumnMultiPageZeroBufferProvider.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/ColumnMultiPageZeroBufferProvider.java
@@ -32,6 +32,7 @@ import 
org.apache.hyracks.storage.common.buffercache.CachedPage;
 import org.apache.hyracks.storage.common.buffercache.ICachedPage;
 
 import it.unimi.dsi.fastutil.ints.Int2IntMap;
+import it.unimi.dsi.fastutil.ints.IntList;
 import it.unimi.dsi.fastutil.longs.LongSet;
 
 public class ColumnMultiPageZeroBufferProvider implements 
IColumnBufferProvider {
@@ -75,14 +76,13 @@ public class ColumnMultiPageZeroBufferProvider implements 
IColumnBufferProvider
             // TODO: read on a request basis, or prefetch all the segments?
             return;
         }
-        // traverse the set segments and read the pages
-        int currentIndex = pageZeroSegmentsPages.nextSetBit(0);
-        while (currentIndex != -1) {
-            int segmentIndex = currentIndex - 1; // segmentIndex starts from 1
+        // Since all the page segments are pinned to calculate the lengths of
+        // the columns, read all the segments and store them in the buffers list.
+        // After ColumnRanges.reset(), unpin the segments that are not required.
+        for (int segmentIndex = 0; segmentIndex < numberOfRemainingPages; 
segmentIndex++) {
             ByteBuffer buffer = read(segmentIndex);
             segmentDir.put(segmentIndex, buffers.size());
             buffers.add(buffer);
-            currentIndex = pageZeroSegmentsPages.nextSetBit(currentIndex + 1);
         }
     }
 
@@ -97,11 +97,41 @@ public class ColumnMultiPageZeroBufferProvider implements 
IColumnBufferProvider
     @Override
     public void releaseAll() throws HyracksDataException {
         for (ICachedPage page : pages) {
-            multiPageOp.unpin(page);
+            if (page != null) {
+                multiPageOp.unpin(page);
+            }
         }
         pages.clear();
     }
 
+    public void releasePages(IntList notRequiredSegmentsIndexes) throws 
HyracksDataException {
+        // Unpin and remove the given pages from the list of cached pages.
+        // The pages and buffers lists are in sync, so the same indexes apply to both.
+        Throwable th = null;
+        for (int pageIndex : notRequiredSegmentsIndexes) {
+            if (pageIndex < 0 || pageIndex >= pages.size()) {
+                throw new IndexOutOfBoundsException("Page index out of bounds: 
" + pageIndex);
+            }
+            try {
+                ICachedPage page = pages.get(pageIndex);
+                if (page != null) {
+                    multiPageOp.unpin(page);
+                    pinnedPages.remove(((CachedPage) page).getDiskPageId());
+                    pages.set(pageIndex, null); // Clear the reference
+                }
+            } catch (Exception e) {
+                if (th == null) {
+                    th = e;
+                } else {
+                    th.addSuppressed(e);
+                }
+            }
+        }
+        if (th != null) {
+            throw HyracksDataException.create(th);
+        }
+    }
+
     @Override
     public ByteBuffer getBuffer() {
         throw new UnsupportedOperationException("getBuffer() is not supported 
for multi-page zero buffer provider.");


Reply via email to