This is an automated email from the ASF dual-hosted git repository.

wyk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 94964fd46f [ASTERIXDB-3387][STO] Introduce Selective caching policy
94964fd46f is described below

commit 94964fd46f000948c16640718b7a0686a2bc8cc0
Author: Wail Alkowaileet <[email protected]>
AuthorDate: Mon May 13 14:27:16 2024 -0700

    [ASTERIXDB-3387][STO] Introduce Selective caching policy
    
    - user model changes: no
    - storage format changes: no
    - interface changes: yes
    
    Details:
    This patch introduces a new 'Selective' caching policy,
    where files can be evicted and holes can be punched in files.
    
    Change-Id: I9f6f91f80581078d9622be240bfa01d6b2dbaf2e
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18278
    Integration-Tests: Jenkins <[email protected]>
    Tested-by: Jenkins <[email protected]>
    Reviewed-by: Wail Alkowaileet <[email protected]>
    Reviewed-by: Murtadha Hubail <[email protected]>
---
 .../org/apache/asterix/cloud/CloudFileHandle.java  | 14 ++++
 .../apache/asterix/cloud/CloudManagerProvider.java |  2 +-
 .../apache/asterix/cloud/LazyCloudIOManager.java   | 52 ++++++++----
 .../apache/asterix/cloud/lazy/ParallelCacher.java  |  1 +
 .../cloud/lazy/accessor/AbstractLazyAccessor.java  | 11 +++
 .../asterix/cloud/lazy/accessor/ILazyAccessor.java | 17 ++++
 .../cloud/lazy/accessor/InitialCloudAccessor.java  | 17 +++-
 .../lazy/accessor/ReplaceableCloudAccessor.java    | 13 ++-
 .../lazy/accessor/SelectiveCloudAccessor.java      | 63 ++++++++++++++
 .../cloud/lazy/filesystem/HolePuncherProvider.java | 98 ++++++++++++++++++++++
 .../IHolePuncher.java}                             | 28 +++----
 .../asterix/common/cloud/CloudCachePolicy.java     |  3 +-
 12 files changed, 281 insertions(+), 38 deletions(-)

diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudFileHandle.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudFileHandle.java
index 0ae93cfca0..b7eb1c8273 100644
--- 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudFileHandle.java
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudFileHandle.java
@@ -21,12 +21,16 @@ package org.apache.asterix.cloud;
 import java.io.IOException;
 
 import org.apache.asterix.cloud.clients.ICloudWriter;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.api.io.IIOManager;
+import org.apache.hyracks.cloud.filesystem.FileSystemOperationDispatcherUtil;
 import org.apache.hyracks.control.nc.io.FileHandle;
 
 public class CloudFileHandle extends FileHandle {
     private final ICloudWriter cloudWriter;
+    private int blockSize;
+    private int fileDescriptor;
 
     public CloudFileHandle(FileReference fileRef, ICloudWriter cloudWriter) {
         super(fileRef);
@@ -38,9 +42,19 @@ public class CloudFileHandle extends FileHandle {
         if (fileRef.getFile().exists()) {
             super.open(rwMode, syncMode);
         }
+        fileDescriptor = 
FileSystemOperationDispatcherUtil.getFileDescriptor(getFileChannel());
+        blockSize = 
FileSystemOperationDispatcherUtil.getBlockSize(fileDescriptor);
     }
 
     public ICloudWriter getCloudWriter() {
         return cloudWriter;
     }
+
+    public int getBlockSize() throws HyracksDataException {
+        return blockSize;
+    }
+
+    public int getFileDescriptor() throws HyracksDataException {
+        return fileDescriptor;
+    }
 }
diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudManagerProvider.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudManagerProvider.java
index f325f41065..fb61d7abac 100644
--- 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudManagerProvider.java
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudManagerProvider.java
@@ -34,7 +34,7 @@ public class CloudManagerProvider {
             INamespacePathResolver nsPathResolver) throws HyracksDataException 
{
         IOManager localIoManager = (IOManager) ioManager;
         if (cloudProperties.getCloudCachePolicy() == CloudCachePolicy.LAZY) {
-            return new LazyCloudIOManager(localIoManager, cloudProperties, 
nsPathResolver);
+            return new LazyCloudIOManager(localIoManager, cloudProperties, 
nsPathResolver, false);
         }
 
         return new EagerCloudIOManager(localIoManager, cloudProperties, 
nsPathResolver);
diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java
index fa4cd5683a..612237a96b 100644
--- 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java
@@ -40,6 +40,9 @@ import 
org.apache.asterix.cloud.lazy.accessor.ILazyAccessorReplacer;
 import org.apache.asterix.cloud.lazy.accessor.InitialCloudAccessor;
 import org.apache.asterix.cloud.lazy.accessor.LocalAccessor;
 import org.apache.asterix.cloud.lazy.accessor.ReplaceableCloudAccessor;
+import org.apache.asterix.cloud.lazy.accessor.SelectiveCloudAccessor;
+import org.apache.asterix.cloud.lazy.filesystem.HolePuncherProvider;
+import org.apache.asterix.cloud.lazy.filesystem.IHolePuncher;
 import org.apache.asterix.common.api.INamespacePathResolver;
 import org.apache.asterix.common.config.CloudProperties;
 import org.apache.asterix.common.utils.StoragePathUtil;
@@ -61,20 +64,26 @@ import org.apache.logging.log4j.Logger;
 final class LazyCloudIOManager extends AbstractCloudIOManager {
     private static final Logger LOGGER = LogManager.getLogger();
     private final ILazyAccessorReplacer replacer;
+    private final IHolePuncher puncher;
     private ILazyAccessor accessor;
 
     public LazyCloudIOManager(IOManager ioManager, CloudProperties 
cloudProperties,
-            INamespacePathResolver nsPathResolver) throws HyracksDataException 
{
+            INamespacePathResolver nsPathResolver, boolean 
selective) throws HyracksDataException {
         super(ioManager, cloudProperties, nsPathResolver);
         accessor = new InitialCloudAccessor(cloudClient, bucket, 
localIoManager);
-        replacer = () -> {
-            synchronized (this) {
-                if (!accessor.isLocalAccessor()) {
-                    LOGGER.warn("Replacing cloud-accessor to local-accessor");
-                    accessor = new LocalAccessor(cloudClient, bucket, 
localIoManager);
+        puncher = HolePuncherProvider.get(this, cloudProperties, 
writeBufferProvider);
+        if (selective) {
+            replacer = InitialCloudAccessor.NO_OP_REPLACER;
+        } else {
+            replacer = () -> {
+                synchronized (this) {
+                    if (!accessor.isLocalAccessor()) {
+                        LOGGER.warn("Replacing cloud-accessor to 
local-accessor");
+                        accessor = new LocalAccessor(cloudClient, bucket, 
localIoManager);
+                    }
                 }
-            }
-        };
+            };
+        }
     }
 
     /*
@@ -104,7 +113,11 @@ final class LazyCloudIOManager extends 
AbstractCloudIOManager {
         // Keep uncached files list (i.e., files exists in cloud only)
         cloudFiles.removeAll(localFiles);
         int remainingUncachedFiles = cloudFiles.size();
-        if (remainingUncachedFiles > 0) {
+        boolean canReplaceAccessor = replacer != 
InitialCloudAccessor.NO_OP_REPLACER;
+        if (remainingUncachedFiles == 0 && canReplaceAccessor) {
+            // Everything is cached, no need to invoke cloud-based accessor 
for read operations
+            accessor = new LocalAccessor(cloudClient, bucket, localIoManager);
+        } else {
             LOGGER.debug("The number of uncached files: {}. Uncached files: 
{}", remainingUncachedFiles, cloudFiles);
             // Get list of FileReferences from the list of cloud (i.e., 
resolve each path's string to FileReference)
             List<FileReference> uncachedFiles = resolve(cloudFiles);
@@ -115,13 +128,17 @@ final class LazyCloudIOManager extends 
AbstractCloudIOManager {
             // Download all metadata files to avoid (List) calls to the cloud 
when listing/reading these files
             downloadMetadataFiles(downloader, uncachedFiles);
             // Create a parallel cacher which download and monitor all 
uncached files
-            ParallelCacher cacher = new ParallelCacher(downloader, 
uncachedFiles, true);
-            // Local cache misses some files, cloud-based accessor is needed 
for read operations
-            accessor = new ReplaceableCloudAccessor(cloudClient, bucket, 
localIoManager, partitions, replacer, cacher);
-        } else {
-            // Everything is cached, no need to invoke cloud-based accessor 
for read operations
-            accessor = new LocalAccessor(cloudClient, bucket, localIoManager);
+            ParallelCacher cacher = new ParallelCacher(downloader, 
uncachedFiles, canReplaceAccessor);
+            // Local cache misses some files or SELECTIVE policy is used, 
cloud-based accessor is needed
+            accessor = createAccessor(cacher, canReplaceAccessor);
+        }
+    }
+
+    private ILazyAccessor createAccessor(ParallelCacher cacher, boolean 
canReplaceAccessor) {
+        if (canReplaceAccessor) {
+            return new ReplaceableCloudAccessor(cloudClient, bucket, 
localIoManager, partitions, replacer, cacher);
         }
+        return new SelectiveCloudAccessor(cloudClient, bucket, localIoManager, 
partitions, puncher, cacher);
     }
 
     private void downloadMetadataPartition(IParallelDownloader downloader, 
List<FileReference> uncachedFiles,
@@ -187,13 +204,12 @@ final class LazyCloudIOManager extends 
AbstractCloudIOManager {
 
     @Override
     public int punchHole(IFileHandle fileHandle, long offset, long length) 
throws HyracksDataException {
-        // TODO implement for Selective accessor
-        return -1;
+        return accessor.doPunchHole(fileHandle, offset, length);
     }
 
     @Override
     public void evict(FileReference directory) throws HyracksDataException {
-        // TODO implement for Selective accessor
+        accessor.doEvict(directory);
     }
 
     private List<FileReference> resolve(Set<CloudFile> cloudFiles) throws 
HyracksDataException {
diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/ParallelCacher.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/ParallelCacher.java
index bd6644c6da..7539aa731d 100644
--- 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/ParallelCacher.java
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/ParallelCacher.java
@@ -47,6 +47,7 @@ import org.apache.logging.log4j.Logger;
  * A parallel cacher that maintains and downloads (in parallel) all uncached 
files
  *
  * @see org.apache.asterix.cloud.lazy.accessor.ReplaceableCloudAccessor
+ * @see org.apache.asterix.cloud.lazy.accessor.SelectiveCloudAccessor
  */
 public final class ParallelCacher implements IParallelCacher {
     private static final Logger LOGGER = LogManager.getLogger();
diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/AbstractLazyAccessor.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/AbstractLazyAccessor.java
index c7ce2224cd..549cc3a678 100644
--- 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/AbstractLazyAccessor.java
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/AbstractLazyAccessor.java
@@ -28,6 +28,7 @@ import java.util.stream.Collectors;
 import org.apache.asterix.cloud.clients.ICloudClient;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.io.IFileHandle;
 import org.apache.hyracks.api.util.IoUtil;
 import org.apache.hyracks.control.nc.io.IOManager;
 
@@ -61,4 +62,14 @@ abstract class AbstractLazyAccessor implements ILazyAccessor 
{
         }
         return deletedFiles;
     }
+
+    @Override
+    public int doPunchHole(IFileHandle fileHandle, long offset, long length) 
throws HyracksDataException {
+        throw new UnsupportedOperationException("PunchHole is not supported");
+    }
+
+    @Override
+    public void doEvict(FileReference directory) throws HyracksDataException {
+        throw new UnsupportedOperationException("Evict is not supported");
+    }
 }
diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/ILazyAccessor.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/ILazyAccessor.java
index 48f2ec7b6f..e6c06925c7 100644
--- 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/ILazyAccessor.java
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/ILazyAccessor.java
@@ -25,6 +25,7 @@ import org.apache.asterix.cloud.CloudFileHandle;
 import org.apache.asterix.cloud.bulk.IBulkOperationCallBack;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.io.IFileHandle;
 
 /**
  * An abstraction for lazy I/O operations
@@ -94,4 +95,20 @@ public interface ILazyAccessor {
      * @param bytes         to be written
      */
     void doOverwrite(FileReference fileReference, byte[] bytes) throws 
HyracksDataException;
+
+    /**
+     * Punches a hole in a sweepable file (only)
+     *
+     * @param fileHandle file handle
+     * @param offset     starting offset
+     * @param length     number of bytes
+     */
+    int doPunchHole(IFileHandle fileHandle, long offset, long length) throws 
HyracksDataException;
+
+    /**
+     * Evicts a directory by deleting it from the local drive only
+     *
+     * @param directory to evict
+     */
+    void doEvict(FileReference directory) throws HyracksDataException;
 }
diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/InitialCloudAccessor.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/InitialCloudAccessor.java
index 798163dc12..8c3321ad86 100644
--- 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/InitialCloudAccessor.java
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/InitialCloudAccessor.java
@@ -22,6 +22,8 @@ import java.util.Collections;
 
 import org.apache.asterix.cloud.clients.ICloudClient;
 import org.apache.asterix.cloud.lazy.NoOpParallelCacher;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.control.nc.io.IOManager;
 
 /**
@@ -29,10 +31,23 @@ import org.apache.hyracks.control.nc.io.IOManager;
  * initializing the NC's partitions
  */
 public class InitialCloudAccessor extends ReplaceableCloudAccessor {
-    private static final ILazyAccessorReplacer NO_OP_REPLACER = () -> {
+    public static final ILazyAccessorReplacer NO_OP_REPLACER = () -> {
     };
 
     public InitialCloudAccessor(ICloudClient cloudClient, String bucket, 
IOManager localIoManager) {
         super(cloudClient, bucket, localIoManager, Collections.emptySet(), 
NO_OP_REPLACER, NoOpParallelCacher.INSTANCE);
     }
+
+    @Override
+    public boolean doExists(FileReference fileRef) throws HyracksDataException 
{
+        return localIoManager.exists(fileRef) || cloudClient.exists(bucket, 
fileRef.getRelativePath());
+    }
+
+    @Override
+    public long doGetSize(FileReference fileReference) throws 
HyracksDataException {
+        if (localIoManager.exists(fileReference)) {
+            return localIoManager.getSize(fileReference);
+        }
+        return cloudClient.getObjectSize(bucket, 
fileReference.getRelativePath());
+    }
 }
diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/ReplaceableCloudAccessor.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/ReplaceableCloudAccessor.java
index c53267488f..1a440e7efe 100644
--- 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/ReplaceableCloudAccessor.java
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/ReplaceableCloudAccessor.java
@@ -72,8 +72,10 @@ public class ReplaceableCloudAccessor extends 
AbstractLazyAccessor {
     @Override
     public void doOnOpen(CloudFileHandle fileHandle) throws 
HyracksDataException {
         FileReference fileRef = fileHandle.getFileReference();
-        if (!localIoManager.exists(fileRef) && cloudClient.exists(bucket, 
fileRef.getRelativePath())) {
-            if (cacher.downloadDataFiles(fileRef)) {
+        if (!localIoManager.exists(fileRef) && cacher.isCacheable(fileRef)) {
+            boolean shouldReplace = fileRef.areHolesAllowed() ? 
cacher.createEmptyDataFiles(fileRef)
+                    : cacher.downloadDataFiles(fileRef);
+            if (shouldReplace) {
                 replace();
             }
         }
@@ -134,6 +136,11 @@ public class ReplaceableCloudAccessor extends 
AbstractLazyAccessor {
         }
     }
 
+    @Override
+    public void doEvict(FileReference directory) throws HyracksDataException {
+        throw new UnsupportedOperationException("evict is not supported");
+    }
+
     private Set<FileReference> cloudBackedList(FileReference dir, 
FilenameFilter filter) throws HyracksDataException {
         LOGGER.debug("CLOUD LIST: {}", dir);
         Set<CloudFile> cloudFiles = cloudClient.listObjects(bucket, 
dir.getRelativePath(), filter);
@@ -169,7 +176,7 @@ public class ReplaceableCloudAccessor extends 
AbstractLazyAccessor {
         return 
partitions.contains(StoragePathUtil.getPartitionNumFromRelativePath(path));
     }
 
-    private void replace() throws HyracksDataException {
+    protected void replace() throws HyracksDataException {
         cacher.close();
         replacer.replace();
     }
diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/SelectiveCloudAccessor.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/SelectiveCloudAccessor.java
new file mode 100644
index 0000000000..934468aab8
--- /dev/null
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/SelectiveCloudAccessor.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.cloud.lazy.accessor;
+
+import java.util.Collection;
+import java.util.Set;
+
+import org.apache.asterix.cloud.UncachedFileReference;
+import org.apache.asterix.cloud.clients.ICloudClient;
+import org.apache.asterix.cloud.lazy.IParallelCacher;
+import org.apache.asterix.cloud.lazy.filesystem.IHolePuncher;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.io.IFileHandle;
+import org.apache.hyracks.control.nc.io.IOManager;
+
+public class SelectiveCloudAccessor extends ReplaceableCloudAccessor {
+    private final IHolePuncher puncher;
+
+    public SelectiveCloudAccessor(ICloudClient cloudClient, String bucket, 
IOManager localIoManager,
+            Set<Integer> partitions, IHolePuncher puncher, IParallelCacher 
cacher) {
+        super(cloudClient, bucket, localIoManager, partitions, 
InitialCloudAccessor.NO_OP_REPLACER, cacher);
+        this.puncher = puncher;
+    }
+
+    @Override
+    public int doPunchHole(IFileHandle fileHandle, long offset, long length) 
throws HyracksDataException {
+        return puncher.punchHole(fileHandle, offset, length);
+    }
+
+    @Override
+    public void doEvict(FileReference directory) throws HyracksDataException {
+        if (!directory.getFile().isDirectory()) {
+            throw new IllegalStateException(directory + " is not a directory");
+        }
+
+        // TODO only delete data files?
+        Collection<FileReference> uncachedFiles = 
UncachedFileReference.toUncached(localIoManager.list(directory));
+        cacher.add(uncachedFiles);
+        localIoManager.delete(directory);
+    }
+
+    @Override
+    protected void replace() {
+        // NoOp
+    }
+}
diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/filesystem/HolePuncherProvider.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/filesystem/HolePuncherProvider.java
new file mode 100644
index 0000000000..4ea6c464e3
--- /dev/null
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/filesystem/HolePuncherProvider.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.cloud.lazy.filesystem;
+
+import java.nio.ByteBuffer;
+
+import org.apache.asterix.cloud.AbstractCloudIOManager;
+import org.apache.asterix.cloud.IWriteBufferProvider;
+import org.apache.asterix.common.cloud.CloudCachePolicy;
+import org.apache.asterix.common.config.CloudProperties;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.IFileHandle;
+
+public final class HolePuncherProvider {
+    private static final IHolePuncher UNSUPPORTED = 
HolePuncherProvider::unsupported;
+
+    private HolePuncherProvider() {
+    }
+
+    public static IHolePuncher get(AbstractCloudIOManager cloudIOManager, 
CloudProperties cloudProperties,
+            IWriteBufferProvider bufferProvider) {
+        if (cloudProperties.getCloudCachePolicy() != 
CloudCachePolicy.SELECTIVE) {
+            return UNSUPPORTED;
+        }
+
+        return new DebugHolePuncher(cloudIOManager, bufferProvider);
+    }
+
+    private static int unsupported(IFileHandle fileHandle, long offset, long 
length) {
+        throw new UnsupportedOperationException("punchHole is not supported");
+    }
+
+    private static final class DebugHolePuncher implements IHolePuncher {
+        private final AbstractCloudIOManager cloudIOManager;
+        private final IWriteBufferProvider bufferProvider;
+
+        private DebugHolePuncher(AbstractCloudIOManager cloudIOManager, 
IWriteBufferProvider bufferProvider) {
+            this.cloudIOManager = cloudIOManager;
+            this.bufferProvider = bufferProvider;
+        }
+
+        @Override
+        public int punchHole(IFileHandle fileHandle, long offset, long length) 
throws HyracksDataException {
+            ByteBuffer buffer = acquireAndPrepareBuffer(length);
+            int totalWritten = 0;
+            try {
+                long remaining = length;
+                long position = offset;
+                while (remaining > 0) {
+                    int written = cloudIOManager.localWriter(fileHandle, 
position, buffer);
+                    position += written;
+                    remaining -= written;
+                    totalWritten += written;
+                    buffer.clear().limit((int) Math.min(remaining, buffer.capacity())); // rewind: the write advanced position to limit
+                }
+            } finally {
+                bufferProvider.recycle(buffer);
+            }
+
+            return totalWritten;
+        }
+
+        private ByteBuffer acquireAndPrepareBuffer(long length) {
+            ByteBuffer buffer = bufferProvider.getBuffer();
+            buffer.clear();
+            if (buffer.capacity() >= length) {
+                buffer.limit((int) length);
+            }
+
+            while (buffer.remaining() > Long.BYTES) {
+                buffer.putLong(0L);
+            }
+
+            while (buffer.remaining() > 0) {
+                buffer.put((byte) 0);
+            }
+
+            buffer.flip();
+            return buffer;
+        }
+    }
+}
diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/InitialCloudAccessor.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/filesystem/IHolePuncher.java
similarity index 51%
copy from 
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/InitialCloudAccessor.java
copy to 
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/filesystem/IHolePuncher.java
index 798163dc12..369c24732a 100644
--- 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/InitialCloudAccessor.java
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/filesystem/IHolePuncher.java
@@ -16,23 +16,23 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.cloud.lazy.accessor;
+package org.apache.asterix.cloud.lazy.filesystem;
 
-import java.util.Collections;
-
-import org.apache.asterix.cloud.clients.ICloudClient;
-import org.apache.asterix.cloud.lazy.NoOpParallelCacher;
-import org.apache.hyracks.control.nc.io.IOManager;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.IFileHandle;
 
 /**
- * Initial accessor to allow {@link 
org.apache.asterix.common.transactions.IGlobalTransactionContext} to work before
- * initializing the NC's partitions
+ * An interface for OS-dependent implementations of the hole punching 
operation
  */
-public class InitialCloudAccessor extends ReplaceableCloudAccessor {
-    private static final ILazyAccessorReplacer NO_OP_REPLACER = () -> {
-    };
+@FunctionalInterface
+public interface IHolePuncher {
 
-    public InitialCloudAccessor(ICloudClient cloudClient, String bucket, 
IOManager localIoManager) {
-        super(cloudClient, bucket, localIoManager, Collections.emptySet(), 
NO_OP_REPLACER, NoOpParallelCacher.INSTANCE);
-    }
+    /**
+     * Punches a hole in a sweeper file (only)
+     *
+     * @param file   sweeper file
+     * @param offset starting offset
+     * @param length number of bytes
+     */
+    int punchHole(IFileHandle file, long offset, long length) throws 
HyracksDataException;
 }
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cloud/CloudCachePolicy.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cloud/CloudCachePolicy.java
index c6858c5748..e8e3334857 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cloud/CloudCachePolicy.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cloud/CloudCachePolicy.java
@@ -26,7 +26,8 @@ import java.util.stream.Collectors;
 
 public enum CloudCachePolicy {
     EAGER("eager"),
-    LAZY("lazy");
+    LAZY("lazy"),
+    SELECTIVE("selective");
     private static final Map<String, CloudCachePolicy> partitioningSchemes =
             
Collections.unmodifiableMap(Arrays.stream(CloudCachePolicy.values())
                     .collect(Collectors.toMap(CloudCachePolicy::getPolicyName, 
Function.identity())));

Reply via email to