This is an automated email from the ASF dual-hosted git repository.
wyk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git
The following commit(s) were added to refs/heads/master by this push:
new 498eb1f073 [ASTERIXDB-3218][*DB] Utilize local accessor when
everything is cached
498eb1f073 is described below
commit 498eb1f0739244cea293819f8773ed0ca8b69e4a
Author: Wail Alkowaileet <[email protected]>
AuthorDate: Thu Jul 13 15:33:35 2023 -0700
[ASTERIXDB-3218][*DB] Utilize local accessor when everything is cached
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
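With the lazy caching policy, every I/O request (e.g., list) has
to be accompanied by a cloud storage request to reconcile any
differences between the local cache and what is stored in the
cloud. However, those requests are unnecessary once the cloud
and the local cache are in sync. This patch tracks the number of
uncached files and, once everything is cached, replaces the
cloud-backed accessor with a local accessor, so that read-side
operations (open, list, exists, getSize, readAllBytes) no longer
issue cloud requests. Deletes and overwrites are still applied
to the cloud as well as to the local cache.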
Change-Id: I0c5c2c09ce62930cec109e465f4fcff01423889c
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17640
Integration-Tests: Jenkins <[email protected]>
Tested-by: Jenkins <[email protected]>
Reviewed-by: Wail Alkowaileet <[email protected]>
Reviewed-by: Murtadha Hubail <[email protected]>
---
.../asterix/cloud/AbstractCloudIOManager.java | 28 ---
.../apache/asterix/cloud/EagerCloudIOManager.java | 33 +++-
.../apache/asterix/cloud/LazyCloudIOManager.java | 132 +++++++-------
.../cloud/lazy/accessor/AbstractLazyAccessor.java | 59 ++++++
.../asterix/cloud/lazy/accessor/ILazyAccessor.java | 46 +++++
.../cloud/lazy/accessor/ILazyAccessorReplacer.java | 23 +++
.../cloud/lazy/accessor/InitialCloudAccessor.java | 49 +++++
.../asterix/cloud/lazy/accessor/LocalAccessor.java | 83 +++++++++
.../lazy/accessor/ReplaceableCloudAccessor.java | 198 +++++++++++++++++++++
9 files changed, 557 insertions(+), 94 deletions(-)
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
index 49f71f49d2..3288444f9f 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
@@ -26,11 +26,9 @@ import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
-import java.util.stream.Collectors;
import org.apache.asterix.cloud.bulk.DeleteBulkCloudOperation;
import org.apache.asterix.cloud.clients.CloudClientProvider;
@@ -150,11 +148,6 @@ public abstract class AbstractCloudIOManager extends
IOManager implements IParti
* ******************************************************************
*/
- @Override
- public boolean exists(FileReference fileRef) throws HyracksDataException {
- return localIoManager.exists(fileRef) || cloudClient.exists(bucket,
fileRef.getRelativePath());
- }
-
@Override
public final IFileHandle open(FileReference fileRef, FileReadWriteMode
rwMode, FileSyncMode syncMode)
throws HyracksDataException {
@@ -203,20 +196,6 @@ public abstract class AbstractCloudIOManager extends
IOManager implements IParti
return writtenBytes;
}
- @Override
- public final void delete(FileReference fileRef) throws
HyracksDataException {
- // Never delete the storage dir in cloud storage
- if
(!STORAGE_ROOT_DIR_NAME.equals(IoUtil.getFileNameFromPath(fileRef.getAbsolutePath())))
{
- File localFile = fileRef.getFile();
- // if file reference exists,and it is a file, then list is not
required
- Set<String> paths =
- localFile.exists() && localFile.isFile() ?
Collections.singleton(fileRef.getRelativePath())
- :
list(fileRef).stream().map(FileReference::getRelativePath).collect(Collectors.toSet());
- cloudClient.deleteObjects(bucket, paths);
- }
- localIoManager.delete(fileRef);
- }
-
@Override
public IIOBulkOperation createDeleteBulkOperation() {
return new DeleteBulkCloudOperation(localIoManager, bucket,
cloudClient);
@@ -258,13 +237,6 @@ public abstract class AbstractCloudIOManager extends
IOManager implements IParti
localIoManager.sync(fileHandle, metadata);
}
- @Override
- public final void overwrite(FileReference fileRef, byte[] bytes) throws
HyracksDataException {
- // Write here will overwrite the older object if exists
- cloudClient.write(bucket, fileRef.getRelativePath(), bytes);
- localIoManager.overwrite(fileRef, bytes);
- }
-
@Override
public final void create(FileReference fileRef) throws
HyracksDataException {
// We need to delete the local file on create as the cloud storage
didn't complete the upload
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/EagerCloudIOManager.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/EagerCloudIOManager.java
index 0c61957f34..f869f3706c 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/EagerCloudIOManager.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/EagerCloudIOManager.java
@@ -20,12 +20,17 @@ package org.apache.asterix.cloud;
import static
org.apache.asterix.common.utils.StorageConstants.STORAGE_ROOT_DIR_NAME;
+import java.io.File;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
import org.apache.asterix.common.config.CloudProperties;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.util.IoUtil;
import org.apache.hyracks.control.nc.io.IOManager;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -37,7 +42,7 @@ import org.apache.logging.log4j.Logger;
* OR
* - {@link AbstractCloudIOManager}
*/
-class EagerCloudIOManager extends AbstractCloudIOManager {
+final class EagerCloudIOManager extends AbstractCloudIOManager {
private static final Logger LOGGER = LogManager.getLogger();
public EagerCloudIOManager(IOManager ioManager, CloudProperties
cloudProperties) throws HyracksDataException {
@@ -66,4 +71,30 @@ class EagerCloudIOManager extends AbstractCloudIOManager {
protected void onOpen(CloudFileHandle fileHandle, FileReadWriteMode
rwMode, FileSyncMode syncMode) {
// NoOp
}
+
+ @Override
+ public boolean exists(FileReference fileRef) throws HyracksDataException {
+ return localIoManager.exists(fileRef);
+ }
+
+ @Override
+ public void delete(FileReference fileRef) throws HyracksDataException {
+ // Never delete the storage dir in cloud storage
+ if
(!STORAGE_ROOT_DIR_NAME.equals(IoUtil.getFileNameFromPath(fileRef.getAbsolutePath())))
{
+ File localFile = fileRef.getFile();
+ // if file reference exists,and it is a file, then list is not
required
+ Set<String> paths =
+ localFile.exists() && localFile.isFile() ?
Collections.singleton(fileRef.getRelativePath())
+ :
list(fileRef).stream().map(FileReference::getRelativePath).collect(Collectors.toSet());
+ cloudClient.deleteObjects(bucket, paths);
+ }
+ localIoManager.delete(fileRef);
+ }
+
+ @Override
+ public void overwrite(FileReference fileRef, byte[] bytes) throws
HyracksDataException {
+ // Write here will overwrite the older object if exists
+ cloudClient.write(bucket, fileRef.getRelativePath(), bytes);
+ localIoManager.overwrite(fileRef, bytes);
+ }
}
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java
index 2232dcdeb7..0a83687206 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java
@@ -18,19 +18,24 @@
*/
package org.apache.asterix.cloud;
-import static
org.apache.asterix.common.utils.StorageConstants.PARTITION_DIR_PREFIX;
+import static
org.apache.asterix.common.utils.StorageConstants.STORAGE_ROOT_DIR_NAME;
-import java.io.File;
import java.io.FilenameFilter;
-import java.nio.ByteBuffer;
-import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
+import java.util.stream.Collectors;
-import org.apache.asterix.cloud.util.CloudFileUtil;
+import org.apache.asterix.cloud.lazy.accessor.ILazyAccessor;
+import org.apache.asterix.cloud.lazy.accessor.ILazyAccessorReplacer;
+import org.apache.asterix.cloud.lazy.accessor.InitialCloudAccessor;
+import org.apache.asterix.cloud.lazy.accessor.LocalAccessor;
+import org.apache.asterix.cloud.lazy.accessor.ReplaceableCloudAccessor;
import org.apache.asterix.common.config.CloudProperties;
+import org.apache.asterix.common.utils.StoragePathUtil;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.io.IODeviceHandle;
+import org.apache.hyracks.api.util.IoUtil;
import org.apache.hyracks.control.nc.io.IOManager;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -38,12 +43,24 @@ import org.apache.logging.log4j.Logger;
/**
* CloudIOManager with lazy caching
* - Overrides some of {@link IOManager} functions
+ * Note: once everything is cached, this will eventually be similar to {@link
EagerCloudIOManager}
*/
-class LazyCloudIOManager extends AbstractCloudIOManager {
+final class LazyCloudIOManager extends AbstractCloudIOManager {
private static final Logger LOGGER = LogManager.getLogger();
+ private final ILazyAccessorReplacer replacer;
+ private ILazyAccessor accessor;
public LazyCloudIOManager(IOManager ioManager, CloudProperties
cloudProperties) throws HyracksDataException {
super(ioManager, cloudProperties);
+ accessor = new InitialCloudAccessor(cloudClient, bucket,
localIoManager, writeBufferProvider);
+ replacer = () -> {
+ synchronized (this) {
+ if (!accessor.isLocalAccessor()) {
+ LOGGER.warn("Replacing cloud-accessor to local-accessor");
+ accessor = new LocalAccessor(cloudClient, bucket,
localIoManager);
+ }
+ }
+ };
}
/*
@@ -53,28 +70,40 @@ class LazyCloudIOManager extends AbstractCloudIOManager {
*/
@Override
- protected void downloadPartitions() {
- // NoOp
+ protected void downloadPartitions() throws HyracksDataException {
+ // Get the files in all relevant partitions from the cloud
+ Set<String> cloudFiles = cloudClient.listObjects(bucket,
STORAGE_ROOT_DIR_NAME, IoUtil.NO_OP_FILTER).stream()
+ .filter(f ->
partitions.contains(StoragePathUtil.getPartitionNumFromRelativePath(f)))
+ .collect(Collectors.toSet());
+
+ // Get all files stored locally
+ Set<String> localFiles = new HashSet<>();
+ for (IODeviceHandle deviceHandle : getIODevices()) {
+ FileReference storageRoot =
deviceHandle.createFileRef(STORAGE_ROOT_DIR_NAME);
+ Set<FileReference> deviceFiles = localIoManager.list(storageRoot,
IoUtil.NO_OP_FILTER);
+ for (FileReference fileReference : deviceFiles) {
+ localFiles.add(fileReference.getRelativePath());
+ }
+ }
+
+ // Keep uncached files list (i.e., files exists in cloud only)
+ cloudFiles.removeAll(localFiles);
+ int remainingUncachedFiles = cloudFiles.size();
+ if (remainingUncachedFiles > 0) {
+ // Local cache misses some files, cloud-based accessor is needed
for read operations
+ accessor = new ReplaceableCloudAccessor(cloudClient, bucket,
localIoManager, partitions,
+ remainingUncachedFiles, writeBufferProvider, replacer);
+ } else {
+ // Everything is cached, no need to invoke cloud-based accessor
for read operations
+ accessor = new LocalAccessor(cloudClient, bucket, localIoManager);
+ }
+ LOGGER.info("The number of uncached files: {}",
remainingUncachedFiles);
}
@Override
protected void onOpen(CloudFileHandle fileHandle, FileReadWriteMode
rwMode, FileSyncMode syncMode)
throws HyracksDataException {
- FileReference fileRef = fileHandle.getFileReference();
- if (!localIoManager.exists(fileRef) && cloudClient.exists(bucket,
fileRef.getRelativePath())) {
- // File doesn't exist locally, download it.
- ByteBuffer writeBuffer = writeBufferProvider.getBuffer();
- try {
- // TODO download for all partitions at once
- LOGGER.info("Downloading {} from S3..",
fileRef.getRelativePath());
- CloudFileUtil.downloadFile(localIoManager, cloudClient,
bucket, fileHandle, rwMode, syncMode,
- writeBuffer);
- localIoManager.close(fileHandle);
- LOGGER.info("Finished downloading {} from S3..",
fileRef.getRelativePath());
- } finally {
- writeBufferProvider.recycle(writeBuffer);
- }
- }
+ accessor.doOnOpen(fileHandle, rwMode, syncMode);
}
/*
@@ -84,58 +113,31 @@ class LazyCloudIOManager extends AbstractCloudIOManager {
*/
@Override
public Set<FileReference> list(FileReference dir, FilenameFilter filter)
throws HyracksDataException {
- Set<String> cloudFiles = cloudClient.listObjects(bucket,
dir.getRelativePath(), filter);
- if (cloudFiles.isEmpty()) {
- return Collections.emptySet();
- }
-
- // First get the set of local files
- Set<FileReference> localFiles = localIoManager.list(dir, filter);
-
- // Reconcile local files and cloud files
- for (FileReference file : localFiles) {
- String path = file.getRelativePath();
- if (!cloudFiles.contains(path)) {
- throw new IllegalStateException("Local file is not clean");
- } else {
- // No need to re-add it in the following loop
- cloudFiles.remove(path);
- }
- }
+ return accessor.doList(dir, filter);
+ }
- // Add the remaining files that are not stored locally in their
designated partitions (if any)
- for (String cloudFile : cloudFiles) {
- FileReference localFile = resolve(cloudFile);
- if (isInNodePartition(cloudFile) &&
dir.getDeviceHandle().equals(localFile.getDeviceHandle())) {
- localFiles.add(localFile);
- }
- }
- return new HashSet<>(localFiles);
+ @Override
+ public boolean exists(FileReference fileRef) throws HyracksDataException {
+ return accessor.doExists(fileRef);
}
@Override
public long getSize(FileReference fileReference) throws
HyracksDataException {
- if (localIoManager.exists(fileReference)) {
- return localIoManager.getSize(fileReference);
- }
- return cloudClient.getObjectSize(bucket,
fileReference.getRelativePath());
+ return accessor.doGetSize(fileReference);
}
@Override
public byte[] readAllBytes(FileReference fileRef) throws
HyracksDataException {
- if (!localIoManager.exists(fileRef) &&
isInNodePartition(fileRef.getRelativePath())) {
- byte[] bytes = cloudClient.readAllBytes(bucket,
fileRef.getRelativePath());
- if (bytes != null && !partitions.isEmpty()) {
- localIoManager.overwrite(fileRef, bytes);
- }
- return bytes;
- }
- return localIoManager.readAllBytes(fileRef);
+ return accessor.doReadAllBytes(fileRef);
}
- private boolean isInNodePartition(String path) {
- int start = path.indexOf(PARTITION_DIR_PREFIX) +
PARTITION_DIR_PREFIX.length();
- int length = path.indexOf(File.separatorChar, start);
- return partitions.contains(Integer.parseInt(path.substring(start,
length)));
+ @Override
+ public void delete(FileReference fileRef) throws HyracksDataException {
+ accessor.doDelete(fileRef);
+ }
+
+ @Override
+ public void overwrite(FileReference fileRef, byte[] bytes) throws
HyracksDataException {
+ accessor.doOverwrite(fileRef, bytes);
}
}
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/AbstractLazyAccessor.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/AbstractLazyAccessor.java
new file mode 100644
index 0000000000..de7efc188d
--- /dev/null
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/AbstractLazyAccessor.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.cloud.lazy.accessor;
+
+import static
org.apache.asterix.common.utils.StorageConstants.STORAGE_ROOT_DIR_NAME;
+
+import java.io.File;
+import java.util.Collections;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.apache.asterix.cloud.clients.ICloudClient;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.util.IoUtil;
+import org.apache.hyracks.control.nc.io.IOManager;
+
+abstract class AbstractLazyAccessor implements ILazyAccessor {
+ protected final ICloudClient cloudClient;
+ protected final String bucket;
+ protected final IOManager localIoManager;
+
+ AbstractLazyAccessor(ICloudClient cloudClient, String bucket, IOManager
localIoManager) {
+ this.cloudClient = cloudClient;
+ this.bucket = bucket;
+ this.localIoManager = localIoManager;
+ }
+
+ int doCloudDelete(FileReference fileReference) throws HyracksDataException
{
+ int numberOfCloudDeletes = 0;
+ if
(!STORAGE_ROOT_DIR_NAME.equals(IoUtil.getFileNameFromPath(fileReference.getAbsolutePath())))
{
+ File localFile = fileReference.getFile();
+ // if file reference exists,and it is a file, then list is not
required
+ Set<String> paths =
+ localFile.exists() && localFile.isFile() ?
Collections.singleton(fileReference.getRelativePath())
+ : doList(fileReference,
IoUtil.NO_OP_FILTER).stream().map(FileReference::getRelativePath)
+ .collect(Collectors.toSet());
+ cloudClient.deleteObjects(bucket, paths);
+ numberOfCloudDeletes = paths.size();
+ }
+ return numberOfCloudDeletes;
+ }
+}
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/ILazyAccessor.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/ILazyAccessor.java
new file mode 100644
index 0000000000..efedd6400a
--- /dev/null
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/ILazyAccessor.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.cloud.lazy.accessor;
+
+import java.io.FilenameFilter;
+import java.util.Set;
+
+import org.apache.asterix.cloud.CloudFileHandle;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.io.IIOManager;
+
+public interface ILazyAccessor {
+ boolean isLocalAccessor();
+
+ void doOnOpen(CloudFileHandle fileHandle, IIOManager.FileReadWriteMode
rwMode, IIOManager.FileSyncMode syncMode)
+ throws HyracksDataException;
+
+ Set<FileReference> doList(FileReference dir, FilenameFilter filter) throws
HyracksDataException;
+
+ boolean doExists(FileReference fileRef) throws HyracksDataException;
+
+ long doGetSize(FileReference fileReference) throws HyracksDataException;
+
+ byte[] doReadAllBytes(FileReference fileReference) throws
HyracksDataException;
+
+ void doDelete(FileReference fileReference) throws HyracksDataException;
+
+ void doOverwrite(FileReference fileReference, byte[] bytes) throws
HyracksDataException;
+}
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/ILazyAccessorReplacer.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/ILazyAccessorReplacer.java
new file mode 100644
index 0000000000..3a4ff8ab09
--- /dev/null
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/ILazyAccessorReplacer.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.cloud.lazy.accessor;
+
+public interface ILazyAccessorReplacer {
+ void replace();
+}
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/InitialCloudAccessor.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/InitialCloudAccessor.java
new file mode 100644
index 0000000000..b93da54bfe
--- /dev/null
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/InitialCloudAccessor.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.cloud.lazy.accessor;
+
+import java.util.Collections;
+
+import org.apache.asterix.cloud.WriteBufferProvider;
+import org.apache.asterix.cloud.clients.ICloudClient;
+import org.apache.hyracks.control.nc.io.IOManager;
+
+/**
+ * Initial accessor to allow {@link
org.apache.asterix.common.transactions.IGlobalTransactionContext} to work before
+ * initializing the NC's partitions
+ */
+public class InitialCloudAccessor extends ReplaceableCloudAccessor {
+ private static final ILazyAccessorReplacer NO_OP_REPLACER = () -> {
+ };
+
+ public InitialCloudAccessor(ICloudClient cloudClient, String bucket,
IOManager localIoManager,
+ WriteBufferProvider writeBufferProvider) {
+ super(cloudClient, bucket, localIoManager, Collections.emptySet(), 0,
writeBufferProvider, NO_OP_REPLACER);
+ }
+
+ @Override
+ protected void decrementNumberOfUncachedFiles() {
+ // No Op
+ }
+
+ @Override
+ protected void decrementNumberOfUncachedFiles(int count) {
+ // No Op
+ }
+}
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/LocalAccessor.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/LocalAccessor.java
new file mode 100644
index 0000000000..6c480ec777
--- /dev/null
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/LocalAccessor.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.cloud.lazy.accessor;
+
+import java.io.FilenameFilter;
+import java.util.Set;
+
+import org.apache.asterix.cloud.CloudFileHandle;
+import org.apache.asterix.cloud.clients.ICloudClient;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.io.IIOManager;
+import org.apache.hyracks.control.nc.io.IOManager;
+
+/**
+ * LocalAccessor would be used once everything in the cloud storage is cached
locally
+ */
+public class LocalAccessor extends AbstractLazyAccessor {
+
+ public LocalAccessor(ICloudClient cloudClient, String bucket, IOManager
localIoManager) {
+ super(cloudClient, bucket, localIoManager);
+ }
+
+ @Override
+ public boolean isLocalAccessor() {
+ return true;
+ }
+
+ @Override
+ public void doOnOpen(CloudFileHandle fileHandle,
IIOManager.FileReadWriteMode rwMode,
+ IIOManager.FileSyncMode syncMode) throws HyracksDataException {
+ // NoOp
+ }
+
+ @Override
+ public Set<FileReference> doList(FileReference dir, FilenameFilter filter)
throws HyracksDataException {
+ return localIoManager.list(dir, filter);
+ }
+
+ @Override
+ public boolean doExists(FileReference fileRef) throws HyracksDataException
{
+ return localIoManager.exists(fileRef);
+ }
+
+ @Override
+ public long doGetSize(FileReference fileReference) throws
HyracksDataException {
+ return localIoManager.getSize(fileReference);
+ }
+
+ @Override
+ public byte[] doReadAllBytes(FileReference fileReference) throws
HyracksDataException {
+ return localIoManager.readAllBytes(fileReference);
+ }
+
+ @Override
+ public void doDelete(FileReference fileReference) throws
HyracksDataException {
+ // Never delete the storage dir in cloud storage
+ doCloudDelete(fileReference);
+ localIoManager.delete(fileReference);
+ }
+
+ @Override
+ public void doOverwrite(FileReference fileReference, byte[] bytes) throws
HyracksDataException {
+ cloudClient.write(bucket, fileReference.getRelativePath(), bytes);
+ localIoManager.overwrite(fileReference, bytes);
+ }
+}
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/ReplaceableCloudAccessor.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/ReplaceableCloudAccessor.java
new file mode 100644
index 0000000000..e36ac66cfd
--- /dev/null
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/ReplaceableCloudAccessor.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.cloud.lazy.accessor;
+
+import java.io.FilenameFilter;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.asterix.cloud.CloudFileHandle;
+import org.apache.asterix.cloud.WriteBufferProvider;
+import org.apache.asterix.cloud.clients.ICloudClient;
+import org.apache.asterix.cloud.util.CloudFileUtil;
+import org.apache.asterix.common.utils.StoragePathUtil;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.io.IIOManager;
+import org.apache.hyracks.control.nc.io.IOManager;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+/**
+ * ReplaceableCloudAccessor will be used when some (or all) of the files in
the cloud storage are not cached locally.
+ * It will be replaced by {@link LocalAccessor} once everything is cached
+ */
+public class ReplaceableCloudAccessor extends AbstractLazyAccessor {
+ private static final Logger LOGGER = LogManager.getLogger();
+ private final Set<Integer> partitions;
+ private final AtomicInteger numberOfUncachedFiles;
+ private final WriteBufferProvider writeBufferProvider;
+ private final ILazyAccessorReplacer replacer;
+
+ public ReplaceableCloudAccessor(ICloudClient cloudClient, String bucket,
IOManager localIoManager,
+ Set<Integer> partitions, int numberOfUncachedFiles,
WriteBufferProvider writeBufferProvider,
+ ILazyAccessorReplacer replacer) {
+ super(cloudClient, bucket, localIoManager);
+ this.partitions = partitions;
+ this.numberOfUncachedFiles = new AtomicInteger(numberOfUncachedFiles);
+ this.writeBufferProvider = writeBufferProvider;
+ this.replacer = replacer;
+ }
+
+ @Override
+ public boolean isLocalAccessor() {
+ return false;
+ }
+
+ @Override
+ public void doOnOpen(CloudFileHandle fileHandle,
IIOManager.FileReadWriteMode rwMode,
+ IIOManager.FileSyncMode syncMode) throws HyracksDataException {
+ FileReference fileRef = fileHandle.getFileReference();
+ if (!localIoManager.exists(fileRef) && cloudClient.exists(bucket,
fileRef.getRelativePath())) {
+ // File doesn't exist locally, download it.
+ ByteBuffer writeBuffer = writeBufferProvider.getBuffer();
+ try {
+ // TODO download for all partitions at once
+ LOGGER.info("Downloading {} from S3..",
fileRef.getRelativePath());
+ CloudFileUtil.downloadFile(localIoManager, cloudClient,
bucket, fileHandle, rwMode, syncMode,
+ writeBuffer);
+ localIoManager.close(fileHandle);
+ LOGGER.info("Finished downloading {} from S3..",
fileRef.getRelativePath());
+ } finally {
+ writeBufferProvider.recycle(writeBuffer);
+ }
+ // TODO decrement by the number of downloaded files in all
partitions (once the above TODO is fixed)
+ decrementNumberOfUncachedFiles();
+ }
+ }
+
+ @Override
+ public Set<FileReference> doList(FileReference dir, FilenameFilter filter)
throws HyracksDataException {
+ Set<String> cloudFiles = cloudClient.listObjects(bucket,
dir.getRelativePath(), filter);
+ if (cloudFiles.isEmpty()) {
+ return Collections.emptySet();
+ }
+
+ // First get the set of local files
+ Set<FileReference> localFiles = localIoManager.list(dir, filter);
+
+ // Reconcile local files and cloud files
+ for (FileReference file : localFiles) {
+ String path = file.getRelativePath();
+ if (!cloudFiles.contains(path)) {
+ throw new IllegalStateException("Local file is not clean");
+ } else {
+ // No need to re-add it in the following loop
+ cloudFiles.remove(path);
+ }
+ }
+
+ // Add the remaining files that are not stored locally in their
designated partitions (if any)
+ for (String cloudFile : cloudFiles) {
+ FileReference localFile = localIoManager.resolve(cloudFile);
+ if (isInNodePartition(cloudFile) &&
dir.getDeviceHandle().equals(localFile.getDeviceHandle())) {
+ localFiles.add(localFile);
+ }
+ }
+ return localFiles;
+ }
+
+ @Override
+ public boolean doExists(FileReference fileRef) throws HyracksDataException
{
+ return localIoManager.exists(fileRef) || cloudClient.exists(bucket,
fileRef.getRelativePath());
+ }
+
+ @Override
+ public long doGetSize(FileReference fileReference) throws
HyracksDataException {
+ if (localIoManager.exists(fileReference)) {
+ return localIoManager.getSize(fileReference);
+ }
+ return cloudClient.getObjectSize(bucket,
fileReference.getRelativePath());
+ }
+
+ @Override
+ public byte[] doReadAllBytes(FileReference fileRef) throws
HyracksDataException {
+ if (!localIoManager.exists(fileRef) &&
isInNodePartition(fileRef.getRelativePath())) {
+ byte[] bytes = cloudClient.readAllBytes(bucket,
fileRef.getRelativePath());
+ if (bytes != null && !partitions.isEmpty()) {
+ // Download the missing file for subsequent reads
+ localIoManager.overwrite(fileRef, bytes);
+ decrementNumberOfUncachedFiles();
+ }
+ return bytes;
+ }
+ return localIoManager.readAllBytes(fileRef);
+ }
+
+ @Override
+ public void doDelete(FileReference fileReference) throws
HyracksDataException {
+ // Never delete the storage dir in cloud storage
+ int numberOfCloudDeletes = doCloudDelete(fileReference);
+ // check local
+ if (numberOfCloudDeletes > 0 && localIoManager.exists(fileReference)) {
+ int numberOfLocalDeletes = fileReference.getFile().isFile() ? 1 :
localIoManager.list(fileReference).size();
+ // Decrement by number of cloud deletes that have no counterparts
locally
+ decrementNumberOfUncachedFiles(numberOfCloudDeletes -
numberOfLocalDeletes);
+ }
+
+ // Finally, delete locally
+ localIoManager.delete(fileReference);
+ }
+
+ @Override
+ public void doOverwrite(FileReference fileReference, byte[] bytes) throws
HyracksDataException {
+ boolean existsLocally = localIoManager.exists(fileReference);
+ cloudClient.write(bucket, fileReference.getRelativePath(), bytes);
+ localIoManager.overwrite(fileReference, bytes);
+
+ if (!existsLocally) {
+ decrementNumberOfUncachedFiles();
+ }
+ }
+
+ protected void decrementNumberOfUncachedFiles() {
+ replaceAccessor(numberOfUncachedFiles.decrementAndGet());
+ }
+
+ protected void decrementNumberOfUncachedFiles(int count) {
+ if (count > 0) {
+ replaceAccessor(numberOfUncachedFiles.addAndGet(-count));
+ }
+ }
+
+ private boolean isInNodePartition(String path) {
+ return
partitions.contains(StoragePathUtil.getPartitionNumFromRelativePath(path));
+ }
+
+ void replaceAccessor(int remainingUncached) {
+ if (remainingUncached > 0) {
+ // Some files still not cached yet
+ return;
+ }
+
+ if (remainingUncached < 0) {
+ // This should not happen, log in case that happen
+ LOGGER.warn("Some files were downloaded multiple times. Reported
remaining uncached files = {}",
+ remainingUncached);
+ }
+ replacer.replace();
+ }
+}