This is an automated email from the ASF dual-hosted git repository.

wyk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 015240e426 [ASTERIXDB-3421][STO] Multiple fixes in cloud disk caching
015240e426 is described below

commit 015240e42627a3ea65b88826a1a811a9f3dcf135
Author: Wail Alkowaileet <[email protected]>
AuthorDate: Fri Jun 7 15:23:43 2024 -0700

    [ASTERIXDB-3421][STO] Multiple fixes in cloud disk caching
    
    - user model changes: no
    - storage format changes: no
    - interface changes: yes
    
    Details:
    - Metadata partition should not be evicted
    - Buffer cache read context should call onPin(ICachedPage) in a 
synchronized block on the pinning page
    - Written pages should contain the compressed page size and offset
    - Multiple issues when calculating page IDs of the requested columns
    
    Change-Id: Ic94cc1e63ee4618b18c6d4c6e3e74101a7753400
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18352
    Integration-Tests: Jenkins <[email protected]>
    Tested-by: Jenkins <[email protected]>
    Reviewed-by: Wail Alkowaileet <[email protected]>
    Reviewed-by: Murtadha Hubail <[email protected]>
---
 .../apache/asterix/cloud/CloudConfigurator.java    |  6 +-
 .../cloud/clients/aws/s3/S3CloudClient.java        |  4 +-
 .../lazy/accessor/SelectiveCloudAccessor.java      |  4 ++
 .../common/context/DatasetLifecycleManager.java    |  2 +-
 .../CloudDiskResourceCacheLockNotifier.java        | 40 +++++++------
 .../cache/service/DiskCacheSweeperThread.java      |  5 +-
 .../service/IEvictableLocalResourceFilter.java}    | 23 ++++----
 .../org/apache/hyracks/cloud/sweeper/ISweeper.java | 35 ------------
 .../org/apache/hyracks/cloud/sweeper/Sweeper.java  |  3 +-
 .../am/lsm/btree/column/cloud/ColumnRanges.java    | 17 ++++--
 .../buffercache/read/CloudColumnReadContext.java   | 66 ++++++++++++++++------
 .../buffercache/read/CloudMegaPageReadContext.java | 54 +++++++++++-------
 .../lsm/tuples/ColumnMultiBufferProvider.java      | 14 ++---
 .../am/lsm/btree/column/utils/ColumnUtil.java      |  4 +-
 .../buffercache/AbstractBufferedFileIOManager.java |  5 ++
 .../storage/common/buffercache/BufferCache.java    |  5 +-
 .../storage/common/buffercache/CachedPage.java     |  8 +++
 .../disk/IDiskResourceCacheLockNotifier.java       |  4 +-
 .../disk/NoOpDiskResourceCacheLockNotifier.java    |  2 +-
 .../storage/common/file/BufferedFileHandle.java    |  7 ++-
 .../common/file/CompressedBufferedFileHandle.java  |  8 ++-
 21 files changed, 185 insertions(+), 131 deletions(-)

diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudConfigurator.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudConfigurator.java
index 0f9a4de16c..3c0f1df00e 100644
--- 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudConfigurator.java
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudConfigurator.java
@@ -28,6 +28,7 @@ import org.apache.asterix.common.cloud.CloudCachePolicy;
 import org.apache.asterix.common.cloud.IPartitionBootstrapper;
 import org.apache.asterix.common.config.CloudProperties;
 import org.apache.asterix.common.utils.StorageConstants;
+import org.apache.asterix.common.utils.StoragePathUtil;
 import org.apache.hyracks.api.application.INCServiceContext;
 import org.apache.hyracks.api.config.IApplicationConfig;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -37,6 +38,7 @@ import 
org.apache.hyracks.cloud.buffercache.page.CloudDiskCachedPageAllocator;
 import 
org.apache.hyracks.cloud.cache.service.CloudDiskCacheMonitoringAndPrefetchingService;
 import 
org.apache.hyracks.cloud.cache.service.CloudDiskResourceCacheLockNotifier;
 import org.apache.hyracks.cloud.cache.service.DiskCacheSweeperThread;
+import org.apache.hyracks.cloud.cache.service.IEvictableLocalResourceFilter;
 import org.apache.hyracks.cloud.filesystem.PhysicalDrive;
 import org.apache.hyracks.control.common.controllers.NCConfig;
 import org.apache.hyracks.control.nc.io.IOManager;
@@ -55,6 +57,8 @@ import 
org.apache.hyracks.storage.common.disk.NoOpDiskResourceCacheLockNotifier;
 import org.apache.hyracks.storage.common.file.BufferedFileHandle;
 
 public final class CloudConfigurator {
+    private static final IEvictableLocalResourceFilter FILTER =
+            (x -> StoragePathUtil.getPartitionNumFromRelativePath(x.getPath()) 
!= StorageConstants.METADATA_PARTITION);
     private final CloudProperties cloudProperties;
     private final IOManager localIoManager;
     private final AbstractCloudIOManager cloudIOManager;
@@ -157,7 +161,7 @@ public final class CloudConfigurator {
 
     private static IDiskResourceCacheLockNotifier createLockNotifier(boolean 
diskCacheManagerRequired) {
         if (diskCacheManagerRequired) {
-            return new 
CloudDiskResourceCacheLockNotifier(StorageConstants.METADATA_PARTITION);
+            return new CloudDiskResourceCacheLockNotifier(FILTER);
         }
 
         return NoOpDiskResourceCacheLockNotifier.INSTANCE;
diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
index f395362c16..97169db507 100644
--- 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
@@ -120,7 +120,7 @@ public final class S3CloudClient implements ICloudClient {
     public int read(String bucket, String path, long offset, ByteBuffer 
buffer) throws HyracksDataException {
         guardian.checkReadAccess(bucket, path);
         profiler.objectGet();
-        long readTo = offset + buffer.remaining();
+        long readTo = offset + buffer.remaining() - 1;
         GetObjectRequest rangeGetObjectRequest =
                 GetObjectRequest.builder().range("bytes=" + offset + "-" + 
readTo).bucket(bucket).key(path).build();
 
@@ -163,7 +163,7 @@ public final class S3CloudClient implements ICloudClient {
     public InputStream getObjectStream(String bucket, String path, long 
offset, long length) {
         guardian.checkReadAccess(bucket, path);
         profiler.objectGet();
-        long readTo = offset + length;
+        long readTo = offset + length - 1;
         GetObjectRequest getReq =
                 GetObjectRequest.builder().range("bytes=" + offset + "-" + 
readTo).bucket(bucket).key(path).build();
         try {
diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/SelectiveCloudAccessor.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/SelectiveCloudAccessor.java
index 218015d5d4..d8aecc110b 100644
--- 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/SelectiveCloudAccessor.java
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/SelectiveCloudAccessor.java
@@ -48,6 +48,10 @@ public class SelectiveCloudAccessor extends 
ReplaceableCloudAccessor {
 
     @Override
     public void doEvict(FileReference directory) throws HyracksDataException {
+        if (!localIoManager.exists(directory)) {
+            return;
+        }
+
         if (!directory.getFile().isDirectory()) {
             throw new IllegalStateException(directory + " is not a directory");
         }
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
index 3e0fbd43e9..43b5d1b10e 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
@@ -123,11 +123,11 @@ public class DatasetLifecycleManager implements 
IDatasetLifecycleManager, ILifeC
         int did = getDIDfromResourcePath(resourcePath);
         LocalResource resource = resourceRepository.get(resourcePath);
         DatasetResource datasetResource = datasets.get(did);
+        lockNotifier.onRegister(resource, index);
         if (datasetResource == null) {
             datasetResource = getDatasetLifecycle(did);
         }
         datasetResource.register(resource, (ILSMIndex) index);
-        lockNotifier.onRegister(resource, index, 
datasetResource.getIndexInfo(resource.getId()).getPartition());
     }
 
     private int getDIDfromResourcePath(String resourcePath) throws 
HyracksDataException {
diff --git 
a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/service/CloudDiskResourceCacheLockNotifier.java
 
b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/service/CloudDiskResourceCacheLockNotifier.java
index 036a812bb1..fc55c7c6fd 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/service/CloudDiskResourceCacheLockNotifier.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/service/CloudDiskResourceCacheLockNotifier.java
@@ -38,14 +38,14 @@ import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
 
 public final class CloudDiskResourceCacheLockNotifier implements 
IDiskResourceCacheLockNotifier {
     private static final Logger LOGGER = LogManager.getLogger();
-    private final int metadataPartition;
+    private final IEvictableLocalResourceFilter filter;
     private final Long2ObjectMap<LocalResource> inactiveResources;
     private final Long2ObjectMap<UnsweepableIndexUnit> unsweepableIndexes;
     private final Long2ObjectMap<SweepableIndexUnit> sweepableIndexes;
     private final ReentrantReadWriteLock evictionLock;
 
-    public CloudDiskResourceCacheLockNotifier(int metadataPartition) {
-        this.metadataPartition = metadataPartition;
+    public CloudDiskResourceCacheLockNotifier(IEvictableLocalResourceFilter 
filter) {
+        this.filter = filter;
         inactiveResources = Long2ObjectMaps.synchronize(new 
Long2ObjectOpenHashMap<>());
         unsweepableIndexes = Long2ObjectMaps.synchronize(new 
Long2ObjectOpenHashMap<>());
         sweepableIndexes = Long2ObjectMaps.synchronize(new 
Long2ObjectOpenHashMap<>());
@@ -53,19 +53,19 @@ public final class CloudDiskResourceCacheLockNotifier 
implements IDiskResourceCa
     }
 
     @Override
-    public void onRegister(LocalResource localResource, IIndex index, int 
partition) {
+    public void onRegister(LocalResource localResource, IIndex index) {
         ILSMIndex lsmIndex = (ILSMIndex) index;
         evictionLock.readLock().lock();
         try {
-            if (partition != metadataPartition) {
+            if (filter.accept(localResource)) {
                 long resourceId = localResource.getId();
                 if (lsmIndex.getDiskCacheManager().isSweepable()) {
                     sweepableIndexes.put(resourceId, new 
SweepableIndexUnit(localResource, lsmIndex));
                 } else {
                     unsweepableIndexes.put(resourceId, new 
UnsweepableIndexUnit(localResource));
                 }
+                inactiveResources.remove(localResource.getId());
             }
-            inactiveResources.remove(localResource.getId());
         } finally {
             evictionLock.readLock().unlock();
         }
@@ -75,7 +75,7 @@ public final class CloudDiskResourceCacheLockNotifier 
implements IDiskResourceCa
     public void onUnregister(long resourceId) {
         evictionLock.readLock().lock();
         try {
-            AbstractIndexUnit indexUnit = getUnit(resourceId);
+            AbstractIndexUnit indexUnit = removeUnit(resourceId);
             if (indexUnit != null) {
                 indexUnit.drop();
             } else {
@@ -86,14 +86,6 @@ public final class CloudDiskResourceCacheLockNotifier 
implements IDiskResourceCa
         }
     }
 
-    private AbstractIndexUnit getUnit(long resourceId) {
-        AbstractIndexUnit indexUnit = sweepableIndexes.get(resourceId);
-        if (indexUnit == null) {
-            indexUnit = unsweepableIndexes.get(resourceId);
-        }
-        return indexUnit;
-    }
-
     @Override
     public void onOpen(long resourceId) {
         evictionLock.readLock().lock();
@@ -122,7 +114,22 @@ public final class CloudDiskResourceCacheLockNotifier 
implements IDiskResourceCa
         } finally {
             evictionLock.readLock().unlock();
         }
+    }
 
+    private AbstractIndexUnit getUnit(long resourceId) {
+        AbstractIndexUnit indexUnit = sweepableIndexes.get(resourceId);
+        if (indexUnit == null) {
+            indexUnit = unsweepableIndexes.get(resourceId);
+        }
+        return indexUnit;
+    }
+
+    private AbstractIndexUnit removeUnit(long resourceId) {
+        AbstractIndexUnit indexUnit = sweepableIndexes.remove(resourceId);
+        if (indexUnit == null) {
+            indexUnit = unsweepableIndexes.remove(resourceId);
+        }
+        return indexUnit;
     }
 
     ReentrantReadWriteLock getEvictionLock() {
@@ -133,7 +140,8 @@ public final class CloudDiskResourceCacheLockNotifier 
implements IDiskResourceCa
         inactiveResources.clear();
         // First check whatever we had already
         for (LocalResource lr : localResources.values()) {
-            if (unsweepableIndexes.containsKey(lr.getId()) || 
sweepableIndexes.containsKey(lr.getId())) {
+            if (!filter.accept(lr) || 
unsweepableIndexes.containsKey(lr.getId())
+                    || sweepableIndexes.containsKey(lr.getId())) {
                 // We already have this resource
                 continue;
             }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/service/DiskCacheSweeperThread.java
 
b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/service/DiskCacheSweeperThread.java
index f594e06784..e466758c90 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/service/DiskCacheSweeperThread.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/service/DiskCacheSweeperThread.java
@@ -31,7 +31,6 @@ import org.apache.hyracks.api.io.IDiskSpaceMaker;
 import org.apache.hyracks.cloud.cache.unit.SweepableIndexUnit;
 import org.apache.hyracks.cloud.cache.unit.UnsweepableIndexUnit;
 import org.apache.hyracks.cloud.io.ICloudIOManager;
-import org.apache.hyracks.cloud.sweeper.ISweeper;
 import org.apache.hyracks.cloud.sweeper.Sweeper;
 import org.apache.hyracks.storage.common.LocalResource;
 import org.apache.hyracks.storage.common.buffercache.BufferCache;
@@ -48,7 +47,7 @@ public class DiskCacheSweeperThread implements Runnable, 
IDiskSpaceMaker {
     private final IPhysicalDrive physicalDrive;
     private final List<SweepableIndexUnit> indexes;
     private final ICloudIOManager cloudIOManager;
-    private final ISweeper sweeper;
+    private final Sweeper sweeper;
     private final long inactiveTimeThreshold;
 
     public DiskCacheSweeperThread(ExecutorService executorService, long 
waitTime,
@@ -161,7 +160,7 @@ public class DiskCacheSweeperThread implements Runnable, 
IDiskSpaceMaker {
     }
 
     @CriticalPath
-    private static void sweepIndexes(ISweeper sweeper, 
List<SweepableIndexUnit> indexes) {
+    private static void sweepIndexes(Sweeper sweeper, List<SweepableIndexUnit> 
indexes) {
         for (int i = 0; i < indexes.size(); i++) {
             SweepableIndexUnit index = indexes.get(i);
             if (!index.isSweeping()) {
diff --git 
a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/NoOpSweeper.java
 
b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/service/IEvictableLocalResourceFilter.java
similarity index 68%
rename from 
hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/NoOpSweeper.java
rename to 
hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/service/IEvictableLocalResourceFilter.java
index ca103abd15..3712918a09 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/NoOpSweeper.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/service/IEvictableLocalResourceFilter.java
@@ -16,18 +16,17 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.hyracks.cloud.sweeper;
+package org.apache.hyracks.cloud.cache.service;
 
-import org.apache.hyracks.cloud.cache.unit.SweepableIndexUnit;
+import org.apache.hyracks.storage.common.LocalResource;
 
-public final class NoOpSweeper implements ISweeper {
-    public static final ISweeper INSTANCE = new NoOpSweeper();
-
-    private NoOpSweeper() {
-    }
-
-    @Override
-    public void sweep(SweepableIndexUnit indexUnit) {
-        // NoOp
-    }
+@FunctionalInterface
+public interface IEvictableLocalResourceFilter {
+    /**
+     * Whether a local resource is evictable
+     *
+     * @param resource resource to test
+     * @return true if it is evictable, false otherwise
+     */
+    boolean accept(LocalResource resource);
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/ISweeper.java
 
b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/ISweeper.java
deleted file mode 100644
index 9067a3fba6..0000000000
--- 
a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/ISweeper.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.cloud.sweeper;
-
-import org.apache.hyracks.cloud.cache.unit.SweepableIndexUnit;
-import org.apache.hyracks.storage.common.disk.IPhysicalDrive;
-
-/**
- * Sweeps an index to relieve the pressure on a local {@link IPhysicalDrive}
- */
-@FunctionalInterface
-public interface ISweeper {
-    /**
-     * Sweep an index
-     *
-     * @param indexUnit to sweep
-     */
-    void sweep(SweepableIndexUnit indexUnit) throws InterruptedException;
-}
diff --git 
a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/Sweeper.java
 
b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/Sweeper.java
index 245c95793b..6e12b79744 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/Sweeper.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/Sweeper.java
@@ -35,7 +35,7 @@ import 
org.apache.hyracks.storage.common.file.BufferedFileHandle;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
-public final class Sweeper implements ISweeper {
+public final class Sweeper {
     private static final Logger LOGGER = LogManager.getLogger();
     private static final SweepRequest POISON = new SweepRequest();
     private final BlockingQueue<SweepRequest> requests;
@@ -55,7 +55,6 @@ public final class Sweeper implements ISweeper {
         }
     }
 
-    @Override
     public void sweep(SweepableIndexUnit indexUnit) throws 
InterruptedException {
         SweepRequest request = freeRequests.take();
         request.reset(indexUnit);
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/ColumnRanges.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/ColumnRanges.java
index 9fc60fa1a5..5c6fe096e8 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/ColumnRanges.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/ColumnRanges.java
@@ -20,7 +20,8 @@ package org.apache.hyracks.storage.am.lsm.btree.column.cloud;
 
 import static 
org.apache.hyracks.storage.am.lsm.btree.column.cloud.sweep.ColumnSweeperUtil.EMPTY;
 import static 
org.apache.hyracks.storage.am.lsm.btree.column.utils.ColumnUtil.getColumnPageIndex;
-import static 
org.apache.hyracks.storage.am.lsm.btree.column.utils.ColumnUtil.getNumberOfPages;
+import static 
org.apache.hyracks.storage.am.lsm.btree.column.utils.ColumnUtil.getColumnStartOffset;
+import static 
org.apache.hyracks.storage.am.lsm.btree.column.utils.ColumnUtil.getNumberOfRemainingPages;
 
 import java.util.BitSet;
 
@@ -171,15 +172,18 @@ public final class ColumnRanges {
     }
 
     /**
-     * Length of a column in pages
+     * The number of pages the column occupies
      *
      * @param columnIndex column index
      * @return number of pages
      */
     public int getColumnNumberOfPages(int columnIndex) {
         int pageSize = leafFrame.getBuffer().capacity();
-        int numberOfPages = getNumberOfPages(getColumnLength(columnIndex), 
pageSize);
-        return numberOfPages == 0 ? 1 : numberOfPages;
+        int offset = 
getColumnStartOffset(leafFrame.getColumnOffset(columnIndex), pageSize);
+        int firstBufferLength = pageSize - offset;
+        int remainingLength = getColumnLength(columnIndex) - firstBufferLength;
+        // 1 for the first page + the number of remaining pages
+        return 1 + getNumberOfRemainingPages(remainingLength, pageSize);
     }
 
     /**
@@ -231,6 +235,10 @@ public final class ColumnRanges {
         return columnsOrder;
     }
 
+    public int getTotalNumberOfPages() {
+        return leafFrame.getMegaLeafNodeNumberOfPages();
+    }
+
     private void init() {
         int numberOfColumns = leafFrame.getNumberOfColumns();
         offsetColumnIndexPairs = 
LongArrays.ensureCapacity(offsetColumnIndexPairs, numberOfColumns + 1, 0);
@@ -312,5 +320,4 @@ public final class ColumnRanges {
         }
         builder.append('\n');
     }
-
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudColumnReadContext.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudColumnReadContext.java
index 2d6b2fdf00..7623698ca2 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudColumnReadContext.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudColumnReadContext.java
@@ -19,6 +19,7 @@
 package org.apache.hyracks.storage.am.lsm.btree.column.cloud.buffercache.read;
 
 import static 
org.apache.hyracks.cloud.buffercache.context.DefaultCloudReadContext.readAndPersistPage;
+import static 
org.apache.hyracks.storage.am.lsm.btree.column.api.projection.ColumnProjectorType.MERGE;
 import static 
org.apache.hyracks.storage.am.lsm.btree.column.api.projection.ColumnProjectorType.MODIFY;
 import static 
org.apache.hyracks.storage.am.lsm.btree.column.api.projection.ColumnProjectorType.QUERY;
 
@@ -45,9 +46,12 @@ import 
org.apache.hyracks.storage.common.buffercache.context.IBufferCacheReadCon
 import org.apache.hyracks.storage.common.disk.IPhysicalDrive;
 import org.apache.hyracks.storage.common.file.BufferedFileHandle;
 import org.apache.hyracks.util.annotations.NotThreadSafe;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 
 @NotThreadSafe
 public final class CloudColumnReadContext implements IColumnReadContext {
+    private static final Logger LOGGER = LogManager.getLogger();
     private final ColumnProjectorType operation;
     private final IPhysicalDrive drive;
     private final BitSet plan;
@@ -130,10 +134,26 @@ public final class CloudColumnReadContext implements 
IColumnReadContext {
             return;
         }
 
-        // TODO What if every other page is requested. That would do N/2 
request, where N is the number of pages.
-        // TODO This should be optimized in a way that minimizes the number of 
requests
         columnRanges.reset(leafFrame, projectedColumns, plan, 
cloudOnlyColumns);
         int pageZeroId = leafFrame.getPageId();
+
+        if (operation == MERGE) {
+            pinAll(fileId, pageZeroId, 
leafFrame.getMegaLeafNodeNumberOfPages() - 1, bufferCache);
+        } else {
+            pinProjected(fileId, pageZeroId, bufferCache);
+        }
+    }
+
+    private void pinAll(int fileId, int pageZeroId, int numberOfPages, 
IBufferCache bufferCache)
+            throws HyracksDataException {
+        columnCtx.prepare(numberOfPages);
+        pin(bufferCache, fileId, pageZeroId, 1, numberOfPages);
+    }
+
+    private void pinProjected(int fileId, int pageZeroId, IBufferCache 
bufferCache) throws HyracksDataException {
+        // TODO What if every other page is requested. That would do N/2 
request, where N is the number of pages.
+        // TODO This should be optimized in a way that minimizes the number of 
requests
+
         int[] columnsOrders = columnRanges.getColumnsOrder();
         int i = 0;
         int columnIndex = columnsOrders[i];
@@ -143,38 +163,52 @@ public final class CloudColumnReadContext implements 
IColumnReadContext {
                 continue;
             }
 
-            int startPageId = 
columnRanges.getColumnStartPageIndex(columnIndex);
-            // Will increment the number pages if the next column's pages are 
contiguous to this column's pages
-            int numberOfPages = 
columnRanges.getColumnNumberOfPages(columnIndex);
+            int firstPageIdx = 
columnRanges.getColumnStartPageIndex(columnIndex);
+            // last page of the column
+            int lastPageIdx = firstPageIdx + 
columnRanges.getColumnNumberOfPages(columnIndex) - 1;
 
             // Advance to the next column to check if it has contiguous pages
             columnIndex = columnsOrders[++i];
             while (columnIndex > -1) {
+                int sharedPageCount = 0;
                 // Get the next column's start page ID
-                int nextStartPageId = 
columnRanges.getColumnStartPageIndex(columnIndex);
-                if (nextStartPageId > startPageId + numberOfPages + 1) {
-                    // The next startPageId is not contiguous, stop.
+                int nextStartPageIdx = 
columnRanges.getColumnStartPageIndex(columnIndex);
+                if (nextStartPageIdx > lastPageIdx + 1) {
+                    // The nextStartPageIdx is not contiguous, stop.
                     break;
+                } else if (nextStartPageIdx == lastPageIdx) {
+                    // A shared page
+                    sharedPageCount = 1;
                 }
 
-                // Last page of this column
-                int nextLastPage = nextStartPageId + 
columnRanges.getColumnNumberOfPages(columnIndex);
-                // The next column's pages are contiguous. Combine its ranges 
with the previous one.
-                numberOfPages = nextLastPage - startPageId;
+                lastPageIdx += 
columnRanges.getColumnNumberOfPages(columnIndex) - sharedPageCount;
                 // Advance to the next column
                 columnIndex = columnsOrders[++i];
             }
 
+            if (lastPageIdx >= columnRanges.getTotalNumberOfPages()) {
+                throw new IndexOutOfBoundsException("lastPageIdx=" + 
lastPageIdx + ">=" + "megaLeafNodePages="
+                        + columnRanges.getTotalNumberOfPages());
+            }
+
+            int numberOfPages = lastPageIdx - firstPageIdx + 1;
             columnCtx.prepare(numberOfPages);
-            pin(bufferCache, fileId, pageZeroId, startPageId, numberOfPages);
+            pin(bufferCache, fileId, pageZeroId, firstPageIdx, numberOfPages);
         }
     }
 
-    private void pin(IBufferCache bufferCache, int fileId, int pageZeroId, int 
start, int numOfRequestedPages)
+    private void pin(IBufferCache bufferCache, int fileId, int pageZeroId, int 
start, int numberOfPages)
             throws HyracksDataException {
-        for (int i = start; i < start + numOfRequestedPages; i++) {
+        for (int i = start; i < start + numberOfPages; i++) {
             long dpid = BufferedFileHandle.getDiskPageId(fileId, pageZeroId + 
i);
-            pinnedPages.add(bufferCache.pin(dpid, columnCtx));
+            try {
+                pinnedPages.add(bufferCache.pin(dpid, columnCtx));
+            } catch (Throwable e) {
+                LOGGER.error("Error while pinning page number {} with number 
of pages {}. {}\n columnRanges:\n {}", i,
+                        numberOfPages, columnCtx, columnRanges);
+                throw e;
+            }
+
         }
     }
 
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudMegaPageReadContext.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudMegaPageReadContext.java
index f1b2fd40be..21d5ce7fd9 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudMegaPageReadContext.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudMegaPageReadContext.java
@@ -19,7 +19,6 @@
 package org.apache.hyracks.storage.am.lsm.btree.column.cloud.buffercache.read;
 
 import static 
org.apache.hyracks.storage.am.lsm.btree.column.api.projection.ColumnProjectorType.MERGE;
-import static 
org.apache.hyracks.storage.common.buffercache.IBufferCache.RESERVED_HEADER_BYTES;
 import static 
org.apache.hyracks.storage.common.buffercache.context.read.DefaultBufferCacheReadContextProvider.DEFAULT;
 
 import java.io.IOException;
@@ -49,10 +48,15 @@ final class CloudMegaPageReadContext implements 
IBufferCacheReadContext {
     private final ColumnProjectorType operation;
     private final ColumnRanges columnRanges;
     private final IPhysicalDrive drive;
+
     private int numberOfContiguousPages;
     private int pageCounter;
     private InputStream gapStream;
 
+    // For debugging
+    private long streamOffset;
+    private long remainingStreamBytes;
+
     CloudMegaPageReadContext(ColumnProjectorType operation, ColumnRanges 
columnRanges, IPhysicalDrive drive) {
         this.operation = operation;
         this.columnRanges = columnRanges;
@@ -76,7 +80,7 @@ final class CloudMegaPageReadContext implements 
IBufferCacheReadContext {
              * up writing the bytes of this page in the position of another 
page. Therefore, we should skip the bytes
              * for this particular page to avoid placing the bytes of this 
page into another page's position.
              */
-            skipCloudBytes(cachedPage);
+            skipStreamIfOpened(cachedPage);
             pageCounter++;
         }
     }
@@ -102,8 +106,7 @@ final class CloudMegaPageReadContext implements 
IBufferCacheReadContext {
         boolean empty = BufferCacheCloudReadContextUtil.isEmpty(header);
         int pageId = BufferedFileHandle.getPageId(cPage.getDiskPageId());
         boolean cloudOnly = columnRanges.isCloudOnly(pageId);
-        ByteBuffer buffer;
-        if (empty || cloudOnly || gapStream != null) {
+        if (empty || cloudOnly) {
             boolean evictable = columnRanges.isEvictable(pageId);
             /*
              * Persist iff the following conditions are satisfied:
@@ -118,26 +121,30 @@ final class CloudMegaPageReadContext implements 
IBufferCacheReadContext {
              * 'cloudOnly' is true.
              */
             boolean persist = empty && !cloudOnly && !evictable && operation 
!= MERGE && drive.hasSpace();
-            buffer = readFromStream(ioManager, fileHandle, header, cPage, 
persist);
-            buffer.position(RESERVED_HEADER_BYTES);
+            readFromStream(ioManager, fileHandle, header, cPage, persist);
         } else {
             /*
-             * Here we can find a page that is planned for eviction, but it 
has not being evicted yet
-             * (i.e., empty = false). This could happen if the cursor is at a 
point the sweeper hasn't
-             * reached yet (i.e., cloudOnly = false).
             *  Here we can find a page that is planned for eviction, but it 
has not been evicted yet
+             *  (i.e., empty = false). This could happen if the cursor is at a 
point the sweeper hasn't
+             *  reached yet (i.e., cloudOnly = false). Thus, whatever is read 
from the disk is valid.
              */
-            buffer = DEFAULT.processHeader(ioManager, fileHandle, header, 
cPage);
+            skipStreamIfOpened(cPage);
         }
 
         if (++pageCounter == numberOfContiguousPages) {
             close();
         }
 
-        return buffer;
+        // Finally process the header
+        return DEFAULT.processHeader(ioManager, fileHandle, header, cPage);
     }
 
     void close() throws HyracksDataException {
         if (gapStream != null) {
+            if (remainingStreamBytes != 0) {
+                LOGGER.warn("Closed cloud stream with nonzero bytes = {}", 
remainingStreamBytes);
+            }
+
             try {
                 gapStream.close();
                 gapStream = null;
@@ -147,13 +154,14 @@ final class CloudMegaPageReadContext implements 
IBufferCacheReadContext {
         }
     }
 
-    private ByteBuffer readFromStream(IOManager ioManager, BufferedFileHandle 
fileHandle,
-            BufferCacheHeaderHelper header, CachedPage cPage, boolean persist) 
throws HyracksDataException {
+    private void readFromStream(IOManager ioManager, BufferedFileHandle 
fileHandle, BufferCacheHeaderHelper header,
+            CachedPage cPage, boolean persist) throws HyracksDataException {
         InputStream stream = getOrCreateStream(ioManager, fileHandle, cPage);
         ByteBuffer buffer = header.getBuffer();
         buffer.position(0);
+
         try {
-            while (buffer.remaining() != 0) {
+            while (buffer.remaining() > 0) {
                 int length = stream.read(buffer.array(), buffer.position(), 
buffer.remaining());
                 if (length < 0) {
                     throw new IllegalStateException("Stream should not be 
empty!");
@@ -164,6 +172,7 @@ final class CloudMegaPageReadContext implements 
IBufferCacheReadContext {
             throw HyracksDataException.create(e);
         }
 
+        // Flip the buffer after reading to restore the correct position
         buffer.flip();
 
         if (persist) {
@@ -172,7 +181,8 @@ final class CloudMegaPageReadContext implements 
IBufferCacheReadContext {
             BufferCacheCloudReadContextUtil.persist(cloudIOManager, 
fileHandle.getFileHandle(), buffer, offset);
         }
 
-        return buffer;
+        streamOffset += cPage.getCompressedPageSize();
+        remainingStreamBytes -= cPage.getCompressedPageSize();
     }
 
     private InputStream getOrCreateStream(IOManager ioManager, 
BufferedFileHandle fileHandle, CachedPage cPage)
@@ -185,25 +195,31 @@ final class CloudMegaPageReadContext implements 
IBufferCacheReadContext {
         long offset = cPage.getCompressedPageOffset();
         int pageId = BufferedFileHandle.getPageId(cPage.getDiskPageId());
         long length = fileHandle.getPagesTotalSize(pageId, requiredNumOfPages);
+        remainingStreamBytes = length;
+        streamOffset = offset;
+        LOGGER.info(
+                "Cloud stream read for pageId={} starting from pageCounter={} 
out of "
+                        + "numberOfContiguousPages={} (streamOffset = {}, 
remainingStreamBytes = {})",
+                pageId, pageCounter, numberOfContiguousPages, streamOffset, 
remainingStreamBytes);
 
-        LOGGER.info("Cloud stream read for {} pages [{}, {}]", 
numberOfContiguousPages - pageCounter, pageId,
-                pageId + requiredNumOfPages);
         ICloudIOManager cloudIOManager = (ICloudIOManager) ioManager;
         gapStream = cloudIOManager.cloudRead(fileHandle.getFileHandle(), 
offset, length);
 
         return gapStream;
     }
 
-    private void skipCloudBytes(CloudCachedPage cachedPage) throws 
HyracksDataException {
+    private void skipStreamIfOpened(CachedPage cPage) throws 
HyracksDataException {
         if (gapStream == null) {
             return;
         }
 
         try {
-            long remaining = cachedPage.getCompressedPageSize();
+            long remaining = cPage.getCompressedPageSize();
             while (remaining > 0) {
                 remaining -= gapStream.skip(remaining);
             }
+            streamOffset += cPage.getCompressedPageSize();
+            remainingStreamBytes -= cPage.getCompressedPageSize();
         } catch (IOException e) {
             throw HyracksDataException.create(e);
         }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/ColumnMultiBufferProvider.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/ColumnMultiBufferProvider.java
index 48633e7e0c..d7b076478a 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/ColumnMultiBufferProvider.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/ColumnMultiBufferProvider.java
@@ -19,7 +19,7 @@
 package org.apache.hyracks.storage.am.lsm.btree.column.impls.lsm.tuples;
 
 import static 
org.apache.hyracks.storage.am.lsm.btree.column.utils.ColumnUtil.getColumnStartOffset;
-import static 
org.apache.hyracks.storage.am.lsm.btree.column.utils.ColumnUtil.getNumberOfPages;
+import static 
org.apache.hyracks.storage.am.lsm.btree.column.utils.ColumnUtil.getNumberOfRemainingPages;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayDeque;
@@ -40,7 +40,7 @@ public final class ColumnMultiBufferProvider implements 
IColumnBufferProvider {
     private final IColumnReadMultiPageOp multiPageOp;
     private final Queue<ICachedPage> pages;
     private final LongSet pinnedPages;
-    private int numberOfPages;
+    private int numberOfRemainingPages;
     private int startPage;
     private int startOffset;
     private int length;
@@ -55,7 +55,7 @@ public final class ColumnMultiBufferProvider implements 
IColumnBufferProvider {
     @Override
     public void reset(ColumnBTreeReadLeafFrame frame) throws 
HyracksDataException {
         if (columnIndex >= frame.getNumberOfColumns()) {
-            numberOfPages = 0;
+            numberOfRemainingPages = 0;
             length = 0;
             return;
         }
@@ -70,8 +70,8 @@ public final class ColumnMultiBufferProvider implements 
IColumnBufferProvider {
         length = ColumnUtil.readColumnLength(firstPage, startOffset, pageSize);
         // Get the remaining length of the column
         int remainingLength = length - firstPage.remaining();
-        // Get the number of pages this column occupies
-        numberOfPages = getNumberOfPages(remainingLength, pageSize);
+        // Get the number of remaining pages this column occupies
+        numberOfRemainingPages = getNumberOfRemainingPages(remainingLength, 
pageSize);
         //+4-bytes after reading the length
         startOffset += Integer.BYTES;
         //-4-bytes after reading the length
@@ -84,12 +84,12 @@ public final class ColumnMultiBufferProvider implements 
IColumnBufferProvider {
         buffer.clear();
         buffer.position(startOffset);
         buffers.add(buffer);
-        for (int i = 0; i < numberOfPages; i++) {
+        for (int i = 0; i < numberOfRemainingPages; i++) {
             buffer = readNext().duplicate();
             buffer.clear();
             buffers.add(buffer);
         }
-        numberOfPages = 0;
+        numberOfRemainingPages = 0;
     }
 
     @Override
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/utils/ColumnUtil.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/utils/ColumnUtil.java
index 12bb64e76e..fc1e4603ba 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/utils/ColumnUtil.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/utils/ColumnUtil.java
@@ -87,7 +87,7 @@ public class ColumnUtil {
         // Read the length of this column
         int length = firstPage.getInt();
         // Ensure the page limit to at most a full page
-        firstPage.limit(Math.min(length, pageSize));
+        firstPage.limit(Math.min(startOffset + length, pageSize));
         return length;
     }
 
@@ -98,7 +98,7 @@ public class ColumnUtil {
      * @param pageSize        disk buffer cache page size
      * @return number of pages the column occupies
      */
-    public static int getNumberOfPages(int remainingLength, int pageSize) {
+    public static int getNumberOfRemainingPages(int remainingLength, int 
pageSize) {
         return (int) Math.ceil((double) remainingLength / pageSize);
     }
 
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/AbstractBufferedFileIOManager.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/AbstractBufferedFileIOManager.java
index 21cefb6c11..a0ad045038 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/AbstractBufferedFileIOManager.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/AbstractBufferedFileIOManager.java
@@ -321,4 +321,9 @@ public abstract class AbstractBufferedFileIOManager {
         final String path = fileHandle.getFileReference().getAbsolutePath();
         throw new IllegalStateException(String.format(ERROR_MESSAGE, op, 
expected, actual, path));
     }
+
+    @Override
+    public String toString() {
+        return fileHandle != null ? 
fileHandle.getFileReference().getAbsolutePath() : "";
+    }
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
index ab375320dd..271afadae6 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
@@ -203,12 +203,11 @@ public class BufferCache implements IBufferCacheInternal, 
ILifeCycleComponent, I
                 }
             }
 
-            // Notify context page is going to be pinned
-            context.onPin(cPage);
-
             // Resolve race of multiple threads trying to read the page from
             // disk.
             synchronized (cPage) {
+                // Notify context page is going to be pinned
+                context.onPin(cPage);
                 if (!cPage.valid) {
                     try {
                         tryRead(cPage, context);
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/CachedPage.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/CachedPage.java
index 6fd18ffec5..b9d4e5a681 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/CachedPage.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/CachedPage.java
@@ -23,6 +23,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
+import org.apache.hyracks.storage.common.file.BufferedFileHandle;
+
 /**
  * @author yingyib
  */
@@ -209,4 +211,10 @@ public class CachedPage implements ICachedPageInternal {
     public int getCompressedPageSize() {
         return compressedSize;
     }
+
+    @Override
+    public String toString() {
+        return "CachedPage:[page:" + BufferedFileHandle.getPageId(dpid) + ", 
compressedPageOffset:" + compressedOffset
+                + ", compressedSize:" + compressedSize + "]";
+    }
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/IDiskResourceCacheLockNotifier.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/IDiskResourceCacheLockNotifier.java
index 1e986b3e47..6ce985e428 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/IDiskResourceCacheLockNotifier.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/IDiskResourceCacheLockNotifier.java
@@ -24,7 +24,6 @@ import org.apache.hyracks.storage.common.LocalResource;
 /**
  * A proxy to notify a disk-cache (a faster disk that is caching a slower 
resource) about resource lifecycle events.
  * The notifier could block a resource from being operated on if the 
disk-cache manager denying access to a resource
- * TODO Do we care about dataset?
  */
 public interface IDiskResourceCacheLockNotifier {
     /**
@@ -33,9 +32,8 @@ public interface IDiskResourceCacheLockNotifier {
      *
      * @param localResource resource to be registered
      * @param index         of the resource
-     * @param partition     partition
      */
-    void onRegister(LocalResource localResource, IIndex index, int partition);
+    void onRegister(LocalResource localResource, IIndex index);
 
     /**
      * Notify unregistering an existing resource
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/NoOpDiskResourceCacheLockNotifier.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/NoOpDiskResourceCacheLockNotifier.java
index b83c388466..2c590cbb19 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/NoOpDiskResourceCacheLockNotifier.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/NoOpDiskResourceCacheLockNotifier.java
@@ -28,7 +28,7 @@ public final class NoOpDiskResourceCacheLockNotifier 
implements IDiskResourceCac
     }
 
     @Override
-    public void onRegister(LocalResource localResource, IIndex index, int 
partition) {
+    public void onRegister(LocalResource localResource, IIndex index) {
         // NoOp
     }
 
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/BufferedFileHandle.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/BufferedFileHandle.java
index 970a0ae6b0..ff2bd83c46 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/BufferedFileHandle.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/BufferedFileHandle.java
@@ -114,11 +114,13 @@ public class BufferedFileHandle extends 
AbstractBufferedFileIOManager {
         final boolean contiguousLargePages = getPageId(cPage.getDiskPageId()) 
+ 1 == extraBlockPageId;
         IFileHandle handle = getFileHandle();
         long bytesWritten;
+        long offset;
         try {
             buf.limit(contiguousLargePages ? bufferCache.getPageSize() * 
totalPages : bufferCache.getPageSize());
             buf.position(0);
             ByteBuffer[] buffers = header.prepareWrite(cPage);
-            bytesWritten = context.write(ioManager, handle, 
getFirstPageOffset(cPage), buffers);
+            offset = getFirstPageOffset(cPage);
+            bytesWritten = context.write(ioManager, handle, offset, buffers);
         } finally {
             returnHeaderHelper(header);
         }
@@ -130,6 +132,9 @@ public class BufferedFileHandle extends 
AbstractBufferedFileIOManager {
 
         final int expectedWritten = bufferCache.getPageSizeWithHeader() + 
bufferCache.getPageSize() * (totalPages - 1);
         verifyBytesWritten(expectedWritten, bytesWritten);
+
+        cPage.setCompressedPageOffset(offset);
+        cPage.setCompressedPageSize((int) bytesWritten);
     }
 
     @Override
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/CompressedBufferedFileHandle.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/CompressedBufferedFileHandle.java
index 6fe3846725..cd882b5365 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/CompressedBufferedFileHandle.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/CompressedBufferedFileHandle.java
@@ -114,15 +114,16 @@ public class CompressedBufferedFileHandle extends 
BufferedFileHandle {
             } else {
                 uBuffer.position(0);
             }
+            long offset;
             if (compressToWriteBuffer(uBuffer, cBuffer) < 
bufferCache.getPageSize()) {
                 cBuffer.position(0);
-                final long offset = 
compressedFileManager.writePageInfo(pageId, cBuffer.remaining());
+                offset = compressedFileManager.writePageInfo(pageId, 
cBuffer.remaining());
                 expectedBytesWritten = cBuffer.limit();
                 bytesWritten = context.write(ioManager, handle, offset, 
cBuffer);
             } else {
                 //Compression did not gain any savings
                 final ByteBuffer[] buffers = header.prepareWrite(cPage);
-                final long offset = 
compressedFileManager.writePageInfo(pageId, 
bufferCache.getPageSizeWithHeader());
+                offset = compressedFileManager.writePageInfo(pageId, 
bufferCache.getPageSizeWithHeader());
                 expectedBytesWritten = buffers[0].limit() + (long) 
buffers[1].limit();
                 bytesWritten = context.write(ioManager, handle, offset, 
buffers);
             }
@@ -134,6 +135,9 @@ public class CompressedBufferedFileHandle extends 
BufferedFileHandle {
                 writeExtraCompressedPages(cPage, cBuffer, totalPages, 
extraBlockPageId);
             }
 
+            cPage.setCompressedPageOffset(offset);
+            cPage.setCompressedPageSize((int) bytesWritten);
+
         } finally {
             returnHeaderHelper(header);
         }

Reply via email to