This is an automated email from the ASF dual-hosted git repository.
wyk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git
The following commit(s) were added to refs/heads/master by this push:
new 94964fd46f [ASTERIXDB-3387][STO] Introduce Selective caching policy
94964fd46f is described below
commit 94964fd46f000948c16640718b7a0686a2bc8cc0
Author: Wail Alkowaileet <[email protected]>
AuthorDate: Mon May 13 14:27:16 2024 -0700
[ASTERIXDB-3387][STO] Introduce Selective caching policy
- user model changes: no
- storage format changes: no
- interface changes: yes
Details:
This patch introduces a new 'Selective' caching policy,
where files can be evicted and holes can be punched in files.
Change-Id: I9f6f91f80581078d9622be240bfa01d6b2dbaf2e
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18278
Integration-Tests: Jenkins <[email protected]>
Tested-by: Jenkins <[email protected]>
Reviewed-by: Wail Alkowaileet <[email protected]>
Reviewed-by: Murtadha Hubail <[email protected]>
---
.../org/apache/asterix/cloud/CloudFileHandle.java | 14 ++++
.../apache/asterix/cloud/CloudManagerProvider.java | 2 +-
.../apache/asterix/cloud/LazyCloudIOManager.java | 52 ++++++++----
.../apache/asterix/cloud/lazy/ParallelCacher.java | 1 +
.../cloud/lazy/accessor/AbstractLazyAccessor.java | 11 +++
.../asterix/cloud/lazy/accessor/ILazyAccessor.java | 17 ++++
.../cloud/lazy/accessor/InitialCloudAccessor.java | 17 +++-
.../lazy/accessor/ReplaceableCloudAccessor.java | 13 ++-
.../lazy/accessor/SelectiveCloudAccessor.java | 63 ++++++++++++++
.../cloud/lazy/filesystem/HolePuncherProvider.java | 98 ++++++++++++++++++++++
.../IHolePuncher.java} | 28 +++----
.../asterix/common/cloud/CloudCachePolicy.java | 3 +-
12 files changed, 281 insertions(+), 38 deletions(-)
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudFileHandle.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudFileHandle.java
index 0ae93cfca0..b7eb1c8273 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudFileHandle.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudFileHandle.java
@@ -21,12 +21,16 @@ package org.apache.asterix.cloud;
import java.io.IOException;
import org.apache.asterix.cloud.clients.ICloudWriter;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileReference;
import org.apache.hyracks.api.io.IIOManager;
+import org.apache.hyracks.cloud.filesystem.FileSystemOperationDispatcherUtil;
import org.apache.hyracks.control.nc.io.FileHandle;
public class CloudFileHandle extends FileHandle {
private final ICloudWriter cloudWriter;
+    private int blockSize = -1;
+    private int fileDescriptor = -1;
public CloudFileHandle(FileReference fileRef, ICloudWriter cloudWriter) {
super(fileRef);
@@ -38,9 +42,38 @@ public class CloudFileHandle extends FileHandle {
if (fileRef.getFile().exists()) {
super.open(rwMode, syncMode);
}
+        // The local file may not exist (cloud-only), in which case no channel was opened above. Resolve the
+        // descriptor/block size lazily and drop any values cached by a previous open of this handle.
+        fileDescriptor = -1;
+        blockSize = -1;
}
public ICloudWriter getCloudWriter() {
return cloudWriter;
}
+
+    /**
+     * Returns the file-system block size of the locally opened file, resolving it on first use.
+     */
+    public int getBlockSize() throws HyracksDataException {
+        if (blockSize < 0) {
+            blockSize = FileSystemOperationDispatcherUtil.getBlockSize(getFileDescriptor());
+        }
+        return blockSize;
+    }
+
+    /**
+     * Returns the OS file descriptor of the locally opened file, resolving it on first use.
+     *
+     * @throws IllegalStateException if the file has no local channel (e.g., it exists only in the cloud)
+     */
+    public int getFileDescriptor() throws HyracksDataException {
+        if (fileDescriptor < 0) {
+            if (getFileChannel() == null) {
+                throw new IllegalStateException(getFileReference() + " is not opened locally");
+            }
+            fileDescriptor = FileSystemOperationDispatcherUtil.getFileDescriptor(getFileChannel());
+        }
+        return fileDescriptor;
+    }
}
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudManagerProvider.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudManagerProvider.java
index f325f41065..fb61d7abac 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudManagerProvider.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudManagerProvider.java
@@ -34,7 +34,13 @@ public class CloudManagerProvider {
INamespacePathResolver nsPathResolver) throws HyracksDataException
{
IOManager localIoManager = (IOManager) ioManager;
if (cloudProperties.getCloudCachePolicy() == CloudCachePolicy.LAZY) {
- return new LazyCloudIOManager(localIoManager, cloudProperties,
nsPathResolver);
+        // LAZY: the cloud-backed accessor can be replaced by a local one via the replacer
+        return new LazyCloudIOManager(localIoManager, cloudProperties, nsPathResolver, false);
}
+        if (cloudProperties.getCloudCachePolicy() == CloudCachePolicy.SELECTIVE) {
+            // SELECTIVE: keep the cloud-backed accessor so files can be evicted and hole-punched
+            // (otherwise this policy would fall through to the eager manager below)
+            return new LazyCloudIOManager(localIoManager, cloudProperties, nsPathResolver, true);
+        }
return new EagerCloudIOManager(localIoManager, cloudProperties,
nsPathResolver);
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java
index fa4cd5683a..612237a96b 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java
@@ -40,6 +40,9 @@ import
org.apache.asterix.cloud.lazy.accessor.ILazyAccessorReplacer;
import org.apache.asterix.cloud.lazy.accessor.InitialCloudAccessor;
import org.apache.asterix.cloud.lazy.accessor.LocalAccessor;
import org.apache.asterix.cloud.lazy.accessor.ReplaceableCloudAccessor;
+import org.apache.asterix.cloud.lazy.accessor.SelectiveCloudAccessor;
+import org.apache.asterix.cloud.lazy.filesystem.HolePuncherProvider;
+import org.apache.asterix.cloud.lazy.filesystem.IHolePuncher;
import org.apache.asterix.common.api.INamespacePathResolver;
import org.apache.asterix.common.config.CloudProperties;
import org.apache.asterix.common.utils.StoragePathUtil;
@@ -61,20 +64,26 @@ import org.apache.logging.log4j.Logger;
final class LazyCloudIOManager extends AbstractCloudIOManager {
private static final Logger LOGGER = LogManager.getLogger();
private final ILazyAccessorReplacer replacer;
+    private final IHolePuncher puncher;
private ILazyAccessor accessor;
public LazyCloudIOManager(IOManager ioManager, CloudProperties
cloudProperties,
- INamespacePathResolver nsPathResolver) throws HyracksDataException
{
+            INamespacePathResolver nsPathResolver, boolean selective) throws HyracksDataException {
super(ioManager, cloudProperties, nsPathResolver);
accessor = new InitialCloudAccessor(cloudClient, bucket,
localIoManager);
- replacer = () -> {
- synchronized (this) {
- if (!accessor.isLocalAccessor()) {
- LOGGER.warn("Replacing cloud-accessor to local-accessor");
- accessor = new LocalAccessor(cloudClient, bucket,
localIoManager);
+        puncher = HolePuncherProvider.get(this, cloudProperties, writeBufferProvider);
+        if (selective) { // SELECTIVE: never replace the cloud-backed accessor (files can be evicted any time)
+            replacer = InitialCloudAccessor.NO_OP_REPLACER;
+        } else { // LAZY: once invoked, swap the cloud-backed accessor for a local-only accessor
+            replacer = () -> {
+                synchronized (this) {
+                    if (!accessor.isLocalAccessor()) {
+                        LOGGER.warn("Replacing cloud-accessor to local-accessor");
+                        accessor = new LocalAccessor(cloudClient, bucket, localIoManager);
+                    }
}
- }
- };
+            };
+        }
}
/*
@@ -104,7 +113,11 @@ final class LazyCloudIOManager extends
AbstractCloudIOManager {
// Keep uncached files list (i.e., files exists in cloud only)
cloudFiles.removeAll(localFiles);
int remainingUncachedFiles = cloudFiles.size();
- if (remainingUncachedFiles > 0) {
+        // A NO_OP_REPLACER means the accessor must never be swapped for a local one (SELECTIVE policy, see
+        // createAccessor). In that case the cloud-backed accessor is kept even when nothing is missing locally,
+        // because cached files can later be evicted.
+ boolean canReplaceAccessor = replacer !=
InitialCloudAccessor.NO_OP_REPLACER;
+ if (remainingUncachedFiles == 0 && canReplaceAccessor) {
+ // Everything is cached, no need to invoke cloud-based accessor
for read operations
+ accessor = new LocalAccessor(cloudClient, bucket, localIoManager);
+ } else {
LOGGER.debug("The number of uncached files: {}. Uncached files:
{}", remainingUncachedFiles, cloudFiles);
// Get list of FileReferences from the list of cloud (i.e.,
resolve each path's string to FileReference)
List<FileReference> uncachedFiles = resolve(cloudFiles);
@@ -115,13 +128,17 @@ final class LazyCloudIOManager extends
AbstractCloudIOManager {
// Download all metadata files to avoid (List) calls to the cloud
when listing/reading these files
downloadMetadataFiles(downloader, uncachedFiles);
// Create a parallel cacher which download and monitor all
uncached files
- ParallelCacher cacher = new ParallelCacher(downloader,
uncachedFiles, true);
- // Local cache misses some files, cloud-based accessor is needed
for read operations
- accessor = new ReplaceableCloudAccessor(cloudClient, bucket,
localIoManager, partitions, replacer, cacher);
- } else {
- // Everything is cached, no need to invoke cloud-based accessor
for read operations
- accessor = new LocalAccessor(cloudClient, bucket, localIoManager);
+ ParallelCacher cacher = new ParallelCacher(downloader,
uncachedFiles, canReplaceAccessor);
+ // Local cache misses some files or SELECTIVE policy is used,
cloud-based accessor is needed
+ accessor = createAccessor(cacher, canReplaceAccessor);
+ }
+ }
+
+    /**
+     * Creates the cloud-backed accessor used for read operations.
+     *
+     * @param cacher             maintains and downloads the files that are not cached locally
+     * @param canReplaceAccessor {@code true}: a {@link ReplaceableCloudAccessor} that may invoke the replacer to
+     *                           swap itself out; {@code false}: a {@link SelectiveCloudAccessor} backed by the
+     *                           hole puncher
+     */
+ private ILazyAccessor createAccessor(ParallelCacher cacher, boolean
canReplaceAccessor) {
+ if (canReplaceAccessor) {
+ return new ReplaceableCloudAccessor(cloudClient, bucket,
localIoManager, partitions, replacer, cacher);
}
+ return new SelectiveCloudAccessor(cloudClient, bucket, localIoManager,
partitions, puncher, cacher);
}
private void downloadMetadataPartition(IParallelDownloader downloader,
List<FileReference> uncachedFiles,
@@ -187,13 +204,12 @@ final class LazyCloudIOManager extends
AbstractCloudIOManager {
@Override
public int punchHole(IFileHandle fileHandle, long offset, long length)
throws HyracksDataException {
- // TODO implement for Selective accessor
- return -1;
+        // Delegates to the current accessor. Among the accessors in this change, only SelectiveCloudAccessor
+        // overrides it; those relying on AbstractLazyAccessor's default throw UnsupportedOperationException.
+ return accessor.doPunchHole(fileHandle, offset, length);
}
@Override
public void evict(FileReference directory) throws HyracksDataException {
- // TODO implement for Selective accessor
+        // Delegates to the current accessor; non-selective accessors throw UnsupportedOperationException
+ accessor.doEvict(directory);
}
private List<FileReference> resolve(Set<CloudFile> cloudFiles) throws
HyracksDataException {
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/ParallelCacher.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/ParallelCacher.java
index bd6644c6da..7539aa731d 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/ParallelCacher.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/ParallelCacher.java
@@ -47,6 +47,7 @@ import org.apache.logging.log4j.Logger;
* A parallel cacher that maintains and downloads (in parallel) all uncached
files
*
* @see org.apache.asterix.cloud.lazy.accessor.ReplaceableCloudAccessor
+ * @see org.apache.asterix.cloud.lazy.accessor.SelectiveCloudAccessor
*/
public final class ParallelCacher implements IParallelCacher {
private static final Logger LOGGER = LogManager.getLogger();
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/AbstractLazyAccessor.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/AbstractLazyAccessor.java
index c7ce2224cd..549cc3a678 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/AbstractLazyAccessor.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/AbstractLazyAccessor.java
@@ -28,6 +28,7 @@ import java.util.stream.Collectors;
import org.apache.asterix.cloud.clients.ICloudClient;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.io.IFileHandle;
import org.apache.hyracks.api.util.IoUtil;
import org.apache.hyracks.control.nc.io.IOManager;
@@ -61,4 +62,24 @@ abstract class AbstractLazyAccessor implements ILazyAccessor
{
}
return deletedFiles;
}
+
+    /**
+     * Hole punching is not supported by default; accessors that support it override this method.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public int doPunchHole(IFileHandle fileHandle, long offset, long length) throws HyracksDataException {
+        throw new UnsupportedOperationException("punchHole is not supported");
+    }
+
+    /**
+     * Eviction is not supported by default; accessors that support it override this method.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public void doEvict(FileReference directory) throws HyracksDataException {
+        throw new UnsupportedOperationException("evict is not supported");
+    }
}
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/ILazyAccessor.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/ILazyAccessor.java
index 48f2ec7b6f..e6c06925c7 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/ILazyAccessor.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/ILazyAccessor.java
@@ -25,6 +25,7 @@ import org.apache.asterix.cloud.CloudFileHandle;
import org.apache.asterix.cloud.bulk.IBulkOperationCallBack;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.io.IFileHandle;
/**
* An abstraction for lazy I/O operations
@@ -94,4 +95,20 @@ public interface ILazyAccessor {
* @param bytes to be written
*/
void doOverwrite(FileReference fileReference, byte[] bytes) throws
HyracksDataException;
+
+    /**
+     * Punches a hole covering {@code [offset, offset + length)} in a sweepable file (only)
+     *
+     * @param fileHandle handle of the locally opened file
+     * @param offset     starting offset of the hole
+     * @param length     length of the hole in bytes
+     * @return implementation-specific; the zero-filling puncher from HolePuncherProvider returns the number of
+     * bytes it wrote
+     * @throws UnsupportedOperationException if this accessor does not support hole punching
+     */
+ int doPunchHole(IFileHandle fileHandle, long offset, long length) throws
HyracksDataException;
+
+    /**
+     * Evicts a directory, i.e., deletes it from the local drive only (the cloud copy is left untouched)
+     *
+     * @param directory to evict
+     * @throws UnsupportedOperationException if this accessor does not support eviction
+     */
+ void doEvict(FileReference directory) throws HyracksDataException;
}
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/InitialCloudAccessor.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/InitialCloudAccessor.java
index 798163dc12..8c3321ad86 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/InitialCloudAccessor.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/InitialCloudAccessor.java
@@ -22,6 +22,8 @@ import java.util.Collections;
import org.apache.asterix.cloud.clients.ICloudClient;
import org.apache.asterix.cloud.lazy.NoOpParallelCacher;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
import org.apache.hyracks.control.nc.io.IOManager;
/**
@@ -29,10 +31,23 @@ import org.apache.hyracks.control.nc.io.IOManager;
* initializing the NC's partitions
*/
public class InitialCloudAccessor extends ReplaceableCloudAccessor {
- private static final ILazyAccessorReplacer NO_OP_REPLACER = () -> {
+    // Shared no-op replacer. Public because LazyCloudIOManager and SelectiveCloudAccessor reuse it, and
+    // LazyCloudIOManager compares against this exact instance (reference equality) to decide whether its
+    // accessor may be replaced.
+ public static final ILazyAccessorReplacer NO_OP_REPLACER = () -> {
};
public InitialCloudAccessor(ICloudClient cloudClient, String bucket,
IOManager localIoManager) {
super(cloudClient, bucket, localIoManager, Collections.emptySet(),
NO_OP_REPLACER, NoOpParallelCacher.INSTANCE);
}
+
+    // This accessor tracks no partitions and uses a no-op cacher, so check the local drive first and fall
+    // back to the cloud.
+ @Override
+ public boolean doExists(FileReference fileRef) throws HyracksDataException
{
+ return localIoManager.exists(fileRef) || cloudClient.exists(bucket,
fileRef.getRelativePath());
+ }
+
+    // Prefer the local size when the file exists locally; otherwise ask the cloud for the object size.
+ @Override
+ public long doGetSize(FileReference fileReference) throws
HyracksDataException {
+ if (localIoManager.exists(fileReference)) {
+ return localIoManager.getSize(fileReference);
+ }
+ return cloudClient.getObjectSize(bucket,
fileReference.getRelativePath());
+ }
}
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/ReplaceableCloudAccessor.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/ReplaceableCloudAccessor.java
index c53267488f..1a440e7efe 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/ReplaceableCloudAccessor.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/ReplaceableCloudAccessor.java
@@ -72,8 +72,10 @@ public class ReplaceableCloudAccessor extends
AbstractLazyAccessor {
@Override
public void doOnOpen(CloudFileHandle fileHandle) throws
HyracksDataException {
FileReference fileRef = fileHandle.getFileReference();
- if (!localIoManager.exists(fileRef) && cloudClient.exists(bucket,
fileRef.getRelativePath())) {
- if (cacher.downloadDataFiles(fileRef)) {
+ if (!localIoManager.exists(fileRef) && cacher.isCacheable(fileRef)) {
+            // Files that allow holes go through createEmptyDataFiles instead of a full download (presumably
+            // creating empty local placeholders -- confirm against IParallelCacher). A true result from either
+            // call requests replacing this accessor.
+ boolean shouldReplace = fileRef.areHolesAllowed() ?
cacher.createEmptyDataFiles(fileRef)
+ : cacher.downloadDataFiles(fileRef);
+ if (shouldReplace) {
replace();
}
}
@@ -134,6 +136,11 @@ public class ReplaceableCloudAccessor extends
AbstractLazyAccessor {
}
}
+    // Eviction is not supported by the replaceable (LAZY) accessor; SelectiveCloudAccessor overrides this
+ @Override
+ public void doEvict(FileReference directory) throws HyracksDataException {
+ throw new UnsupportedOperationException("evict is not supported");
+ }
+
private Set<FileReference> cloudBackedList(FileReference dir,
FilenameFilter filter) throws HyracksDataException {
LOGGER.debug("CLOUD LIST: {}", dir);
Set<CloudFile> cloudFiles = cloudClient.listObjects(bucket,
dir.getRelativePath(), filter);
@@ -169,7 +176,7 @@ public class ReplaceableCloudAccessor extends
AbstractLazyAccessor {
return
partitions.contains(StoragePathUtil.getPartitionNumFromRelativePath(path));
}
- private void replace() throws HyracksDataException {
+    // Closes the cacher and invokes the replacer. Protected so SelectiveCloudAccessor can turn it into a no-op
+    // (keeping its cacher open).
+ protected void replace() throws HyracksDataException {
cacher.close();
replacer.replace();
}
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/SelectiveCloudAccessor.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/SelectiveCloudAccessor.java
new file mode 100644
index 0000000000..934468aab8
--- /dev/null
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/SelectiveCloudAccessor.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.cloud.lazy.accessor;
+
+import java.util.Collection;
+import java.util.Set;
+
+import org.apache.asterix.cloud.UncachedFileReference;
+import org.apache.asterix.cloud.clients.ICloudClient;
+import org.apache.asterix.cloud.lazy.IParallelCacher;
+import org.apache.asterix.cloud.lazy.filesystem.IHolePuncher;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.io.IFileHandle;
+import org.apache.hyracks.control.nc.io.IOManager;
+
+public class SelectiveCloudAccessor extends ReplaceableCloudAccessor {
+ private final IHolePuncher puncher;
+
+ public SelectiveCloudAccessor(ICloudClient cloudClient, String bucket,
IOManager localIoManager,
+ Set<Integer> partitions, IHolePuncher puncher, IParallelCacher
cacher) {
+ super(cloudClient, bucket, localIoManager, partitions,
InitialCloudAccessor.NO_OP_REPLACER, cacher);
+ this.puncher = puncher;
+ }
+
+ @Override
+ public int doPunchHole(IFileHandle fileHandle, long offset, long length)
throws HyracksDataException {
+ return puncher.punchHole(fileHandle, offset, length);
+ }
+
+ @Override
+ public void doEvict(FileReference directory) throws HyracksDataException {
+ if (!directory.getFile().isDirectory()) {
+ throw new IllegalStateException(directory + " is not a directory");
+ }
+
+ // TODO only delete data files?
+ Collection<FileReference> uncachedFiles =
UncachedFileReference.toUncached(localIoManager.list(directory));
+ cacher.add(uncachedFiles);
+ localIoManager.delete(directory);
+ }
+
+ @Override
+ protected void replace() {
+ // NoOp
+ }
+}
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/filesystem/HolePuncherProvider.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/filesystem/HolePuncherProvider.java
new file mode 100644
index 0000000000..4ea6c464e3
--- /dev/null
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/filesystem/HolePuncherProvider.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.cloud.lazy.filesystem;
+
+import java.nio.ByteBuffer;
+
+import org.apache.asterix.cloud.AbstractCloudIOManager;
+import org.apache.asterix.cloud.IWriteBufferProvider;
+import org.apache.asterix.common.cloud.CloudCachePolicy;
+import org.apache.asterix.common.config.CloudProperties;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.IFileHandle;
+
+public final class HolePuncherProvider {
+    private static final IHolePuncher UNSUPPORTED = HolePuncherProvider::unsupported;
+
+    private HolePuncherProvider() {
+    }
+
+    /**
+     * Returns the hole puncher matching the configured cloud cache policy.
+     *
+     * @param cloudIOManager  used to write to the local file
+     * @param cloudProperties provides the cache policy
+     * @param bufferProvider  supplies (and takes back) the write buffers
+     * @return a zero-filling puncher for {@link CloudCachePolicy#SELECTIVE}; otherwise a puncher that always
+     *         throws {@link UnsupportedOperationException}
+     */
+    public static IHolePuncher get(AbstractCloudIOManager cloudIOManager, CloudProperties cloudProperties,
+            IWriteBufferProvider bufferProvider) {
+        if (cloudProperties.getCloudCachePolicy() != CloudCachePolicy.SELECTIVE) {
+            return UNSUPPORTED;
+        }
+
+        return new DebugHolePuncher(cloudIOManager, bufferProvider);
+    }
+
+    private static int unsupported(IFileHandle fileHandle, long offset, long length) {
+        throw new UnsupportedOperationException("punchHole is not supported");
+    }
+
+    /**
+     * Portable stand-in for real hole punching: overwrites {@code [offset, offset + length)} with zeros through
+     * the local writer instead of deallocating the underlying file-system blocks.
+     */
+    private static final class DebugHolePuncher implements IHolePuncher {
+        private final AbstractCloudIOManager cloudIOManager;
+        private final IWriteBufferProvider bufferProvider;
+
+        private DebugHolePuncher(AbstractCloudIOManager cloudIOManager, IWriteBufferProvider bufferProvider) {
+            this.cloudIOManager = cloudIOManager;
+            this.bufferProvider = bufferProvider;
+        }
+
+        /**
+         * @return the number of zero bytes written
+         */
+        @Override
+        public int punchHole(IFileHandle fileHandle, long offset, long length) throws HyracksDataException {
+            ByteBuffer buffer = acquireAndPrepareBuffer(length);
+            int totalWritten = 0;
+            try {
+                long remaining = length;
+                long position = offset;
+                while (remaining > 0) {
+                    int written = cloudIOManager.localWriter(fileHandle, position, buffer);
+                    position += written;
+                    remaining -= written;
+                    totalWritten += written;
+                    // The write advanced the buffer's position, so rewind before sizing the next chunk. Setting
+                    // only the limit would leave no remaining bytes: the next write would write nothing and the
+                    // loop would never terminate whenever length exceeds the buffer capacity.
+                    buffer.clear();
+                    buffer.limit((int) Math.min(remaining, buffer.capacity()));
+                }
+            } finally {
+                bufferProvider.recycle(buffer);
+            }
+
+            return totalWritten;
+        }
+
+        /**
+         * Acquires a buffer and zero-fills its first {@code min(length, capacity)} bytes.
+         *
+         * @return a buffer ready to be written (position 0, limit {@code min(length, capacity)})
+         */
+        private ByteBuffer acquireAndPrepareBuffer(long length) {
+            ByteBuffer buffer = bufferProvider.getBuffer();
+            buffer.clear();
+            if (buffer.capacity() >= length) {
+                buffer.limit((int) length);
+            }
+
+            // Zero eight bytes at a time, then the tail byte by byte
+            while (buffer.remaining() >= Long.BYTES) {
+                buffer.putLong(0L);
+            }
+
+            while (buffer.remaining() > 0) {
+                buffer.put((byte) 0);
+            }
+
+            buffer.flip();
+            return buffer;
+        }
+    }
+}
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/InitialCloudAccessor.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/filesystem/IHolePuncher.java
similarity index 51%
copy from
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/InitialCloudAccessor.java
copy to
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/filesystem/IHolePuncher.java
index 798163dc12..369c24732a 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/InitialCloudAccessor.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/filesystem/IHolePuncher.java
@@ -16,23 +16,23 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.asterix.cloud.lazy.accessor;
+package org.apache.asterix.cloud.lazy.filesystem;
-import java.util.Collections;
-
-import org.apache.asterix.cloud.clients.ICloudClient;
-import org.apache.asterix.cloud.lazy.NoOpParallelCacher;
-import org.apache.hyracks.control.nc.io.IOManager;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.IFileHandle;
/**
- * Initial accessor to allow {@link
org.apache.asterix.common.transactions.IGlobalTransactionContext} to work before
- * initializing the NC's partitions
+ * An abstraction of the (OS-dependent) hole-punching operation
*/
-public class InitialCloudAccessor extends ReplaceableCloudAccessor {
- private static final ILazyAccessorReplacer NO_OP_REPLACER = () -> {
- };
+@FunctionalInterface
+public interface IHolePuncher {
- public InitialCloudAccessor(ICloudClient cloudClient, String bucket,
IOManager localIoManager) {
- super(cloudClient, bucket, localIoManager, Collections.emptySet(),
NO_OP_REPLACER, NoOpParallelCacher.INSTANCE);
- }
+    /**
+     * Punches a hole covering {@code [offset, offset + length)} in a sweeper file (only)
+     *
+     * @param file   sweeper file
+     * @param offset starting offset of the hole
+     * @param length length of the hole in bytes
+     * @return implementation-specific result (e.g., the number of bytes written by a zero-filling implementation)
+     */
+ int punchHole(IFileHandle file, long offset, long length) throws
HyracksDataException;
}
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cloud/CloudCachePolicy.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cloud/CloudCachePolicy.java
index c6858c5748..e8e3334857 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cloud/CloudCachePolicy.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cloud/CloudCachePolicy.java
@@ -26,7 +26,8 @@ import java.util.stream.Collectors;
public enum CloudCachePolicy {
EAGER("eager"),
- LAZY("lazy");
+ LAZY("lazy"),
+ SELECTIVE("selective");
private static final Map<String, CloudCachePolicy> partitioningSchemes =
Collections.unmodifiableMap(Arrays.stream(CloudCachePolicy.values())
.collect(Collectors.toMap(CloudCachePolicy::getPolicyName,
Function.identity())));