This is an automated email from the ASF dual-hosted git repository.

wyk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 498eb1f073 [ASTERIXDB-3218][*DB] Utilize local accessor when everything is cached
498eb1f073 is described below

commit 498eb1f0739244cea293819f8773ed0ca8b69e4a
Author: Wail Alkowaileet <wael....@gmail.com>
AuthorDate: Thu Jul 13 15:33:35 2023 -0700

    [ASTERIXDB-3218][*DB] Utilize local accessor when everything is cached
    
    - user model changes: no
    - storage format changes: no
    - interface changes: no
    
    Details:
    With the lazy caching policy, every I/O request (e.g., list) has
    to be accompanied by a cloud storage request to reconcile any
    differences between the local cache and what is stored in the
    cloud. However, those requests are unnecessary once the local
    cache and the cloud are in sync. This patch removes them by
    switching to a local accessor, which serves reads from the local
    cache only, once everything is cached.
    
    Change-Id: I0c5c2c09ce62930cec109e465f4fcff01423889c
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17640
    Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
    Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
    Reviewed-by: Wail Alkowaileet <wael....@gmail.com>
    Reviewed-by: Murtadha Hubail <mhub...@apache.org>
---
 .../asterix/cloud/AbstractCloudIOManager.java      |  28 ---
 .../apache/asterix/cloud/EagerCloudIOManager.java  |  33 +++-
 .../apache/asterix/cloud/LazyCloudIOManager.java   | 132 +++++++-------
 .../cloud/lazy/accessor/AbstractLazyAccessor.java  |  59 ++++++
 .../asterix/cloud/lazy/accessor/ILazyAccessor.java |  46 +++++
 .../cloud/lazy/accessor/ILazyAccessorReplacer.java |  23 +++
 .../cloud/lazy/accessor/InitialCloudAccessor.java  |  49 +++++
 .../asterix/cloud/lazy/accessor/LocalAccessor.java |  83 +++++++++
 .../lazy/accessor/ReplaceableCloudAccessor.java    | 198 +++++++++++++++++++++
 9 files changed, 557 insertions(+), 94 deletions(-)

diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
index 49f71f49d2..3288444f9f 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
@@ -26,11 +26,9 @@ import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
-import java.util.stream.Collectors;
 
 import org.apache.asterix.cloud.bulk.DeleteBulkCloudOperation;
 import org.apache.asterix.cloud.clients.CloudClientProvider;
@@ -150,11 +148,6 @@ public abstract class AbstractCloudIOManager extends IOManager implements IParti
      * ******************************************************************
      */
 
-    @Override
-    public boolean exists(FileReference fileRef) throws HyracksDataException {
-        return localIoManager.exists(fileRef) || cloudClient.exists(bucket, 
fileRef.getRelativePath());
-    }
-
     @Override
     public final IFileHandle open(FileReference fileRef, FileReadWriteMode 
rwMode, FileSyncMode syncMode)
             throws HyracksDataException {
@@ -203,20 +196,6 @@ public abstract class AbstractCloudIOManager extends IOManager implements IParti
         return writtenBytes;
     }
 
-    @Override
-    public final void delete(FileReference fileRef) throws 
HyracksDataException {
-        // Never delete the storage dir in cloud storage
-        if 
(!STORAGE_ROOT_DIR_NAME.equals(IoUtil.getFileNameFromPath(fileRef.getAbsolutePath())))
 {
-            File localFile = fileRef.getFile();
-            // if file reference exists,and it is a file, then list is not 
required
-            Set<String> paths =
-                    localFile.exists() && localFile.isFile() ? 
Collections.singleton(fileRef.getRelativePath())
-                            : 
list(fileRef).stream().map(FileReference::getRelativePath).collect(Collectors.toSet());
-            cloudClient.deleteObjects(bucket, paths);
-        }
-        localIoManager.delete(fileRef);
-    }
-
     @Override
     public IIOBulkOperation createDeleteBulkOperation() {
         return new DeleteBulkCloudOperation(localIoManager, bucket, 
cloudClient);
@@ -258,13 +237,6 @@ public abstract class AbstractCloudIOManager extends IOManager implements IParti
         localIoManager.sync(fileHandle, metadata);
     }
 
-    @Override
-    public final void overwrite(FileReference fileRef, byte[] bytes) throws 
HyracksDataException {
-        // Write here will overwrite the older object if exists
-        cloudClient.write(bucket, fileRef.getRelativePath(), bytes);
-        localIoManager.overwrite(fileRef, bytes);
-    }
-
     @Override
     public final void create(FileReference fileRef) throws 
HyracksDataException {
         // We need to delete the local file on create as the cloud storage 
didn't complete the upload
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/EagerCloudIOManager.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/EagerCloudIOManager.java
index 0c61957f34..f869f3706c 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/EagerCloudIOManager.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/EagerCloudIOManager.java
@@ -20,12 +20,17 @@ package org.apache.asterix.cloud;
 
 import static 
org.apache.asterix.common.utils.StorageConstants.STORAGE_ROOT_DIR_NAME;
 
+import java.io.File;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
 
 import org.apache.asterix.common.config.CloudProperties;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.util.IoUtil;
 import org.apache.hyracks.control.nc.io.IOManager;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -37,7 +42,7 @@ import org.apache.logging.log4j.Logger;
  * OR
  * - {@link AbstractCloudIOManager}
  */
-class EagerCloudIOManager extends AbstractCloudIOManager {
+final class EagerCloudIOManager extends AbstractCloudIOManager {
     private static final Logger LOGGER = LogManager.getLogger();
 
     public EagerCloudIOManager(IOManager ioManager, CloudProperties 
cloudProperties) throws HyracksDataException {
@@ -66,4 +71,30 @@ class EagerCloudIOManager extends AbstractCloudIOManager {
     protected void onOpen(CloudFileHandle fileHandle, FileReadWriteMode 
rwMode, FileSyncMode syncMode) {
         // NoOp
     }
+
+    @Override
+    public boolean exists(FileReference fileRef) throws HyracksDataException {
+        return localIoManager.exists(fileRef);
+    }
+
+    @Override
+    public void delete(FileReference fileRef) throws HyracksDataException {
+        // Never delete the storage dir in cloud storage
+        if 
(!STORAGE_ROOT_DIR_NAME.equals(IoUtil.getFileNameFromPath(fileRef.getAbsolutePath())))
 {
+            File localFile = fileRef.getFile();
+            // if file reference exists,and it is a file, then list is not 
required
+            Set<String> paths =
+                    localFile.exists() && localFile.isFile() ? 
Collections.singleton(fileRef.getRelativePath())
+                            : 
list(fileRef).stream().map(FileReference::getRelativePath).collect(Collectors.toSet());
+            cloudClient.deleteObjects(bucket, paths);
+        }
+        localIoManager.delete(fileRef);
+    }
+
+    @Override
+    public void overwrite(FileReference fileRef, byte[] bytes) throws 
HyracksDataException {
+        // Write here will overwrite the older object if exists
+        cloudClient.write(bucket, fileRef.getRelativePath(), bytes);
+        localIoManager.overwrite(fileRef, bytes);
+    }
 }
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java
index 2232dcdeb7..0a83687206 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java
@@ -18,19 +18,24 @@
  */
 package org.apache.asterix.cloud;
 
-import static 
org.apache.asterix.common.utils.StorageConstants.PARTITION_DIR_PREFIX;
+import static 
org.apache.asterix.common.utils.StorageConstants.STORAGE_ROOT_DIR_NAME;
 
-import java.io.File;
 import java.io.FilenameFilter;
-import java.nio.ByteBuffer;
-import java.util.Collections;
 import java.util.HashSet;
 import java.util.Set;
+import java.util.stream.Collectors;
 
-import org.apache.asterix.cloud.util.CloudFileUtil;
+import org.apache.asterix.cloud.lazy.accessor.ILazyAccessor;
+import org.apache.asterix.cloud.lazy.accessor.ILazyAccessorReplacer;
+import org.apache.asterix.cloud.lazy.accessor.InitialCloudAccessor;
+import org.apache.asterix.cloud.lazy.accessor.LocalAccessor;
+import org.apache.asterix.cloud.lazy.accessor.ReplaceableCloudAccessor;
 import org.apache.asterix.common.config.CloudProperties;
+import org.apache.asterix.common.utils.StoragePathUtil;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.io.IODeviceHandle;
+import org.apache.hyracks.api.util.IoUtil;
 import org.apache.hyracks.control.nc.io.IOManager;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -38,12 +43,24 @@ import org.apache.logging.log4j.Logger;
 /**
  * CloudIOManager with lazy caching
  * - Overrides some of {@link IOManager} functions
+ * Note: once everything is cached, this will eventually be similar to {@link 
EagerCloudIOManager}
  */
-class LazyCloudIOManager extends AbstractCloudIOManager {
+final class LazyCloudIOManager extends AbstractCloudIOManager {
     private static final Logger LOGGER = LogManager.getLogger();
+    private final ILazyAccessorReplacer replacer;
+    private ILazyAccessor accessor;
 
     public LazyCloudIOManager(IOManager ioManager, CloudProperties 
cloudProperties) throws HyracksDataException {
         super(ioManager, cloudProperties);
+        accessor = new InitialCloudAccessor(cloudClient, bucket, 
localIoManager, writeBufferProvider);
+        replacer = () -> {
+            synchronized (this) {
+                if (!accessor.isLocalAccessor()) {
+                    LOGGER.warn("Replacing cloud-accessor to local-accessor");
+                    accessor = new LocalAccessor(cloudClient, bucket, 
localIoManager);
+                }
+            }
+        };
     }
 
     /*
@@ -53,28 +70,40 @@ class LazyCloudIOManager extends AbstractCloudIOManager {
      */
 
     @Override
-    protected void downloadPartitions() {
-        // NoOp
+    protected void downloadPartitions() throws HyracksDataException {
+        // Get the files in all relevant partitions from the cloud
+        Set<String> cloudFiles = cloudClient.listObjects(bucket, 
STORAGE_ROOT_DIR_NAME, IoUtil.NO_OP_FILTER).stream()
+                .filter(f -> 
partitions.contains(StoragePathUtil.getPartitionNumFromRelativePath(f)))
+                .collect(Collectors.toSet());
+
+        // Get all files stored locally
+        Set<String> localFiles = new HashSet<>();
+        for (IODeviceHandle deviceHandle : getIODevices()) {
+            FileReference storageRoot = 
deviceHandle.createFileRef(STORAGE_ROOT_DIR_NAME);
+            Set<FileReference> deviceFiles = localIoManager.list(storageRoot, 
IoUtil.NO_OP_FILTER);
+            for (FileReference fileReference : deviceFiles) {
+                localFiles.add(fileReference.getRelativePath());
+            }
+        }
+
+        // Keep uncached files list (i.e., files exists in cloud only)
+        cloudFiles.removeAll(localFiles);
+        int remainingUncachedFiles = cloudFiles.size();
+        if (remainingUncachedFiles > 0) {
+            // Local cache misses some files, cloud-based accessor is needed 
for read operations
+            accessor = new ReplaceableCloudAccessor(cloudClient, bucket, 
localIoManager, partitions,
+                    remainingUncachedFiles, writeBufferProvider, replacer);
+        } else {
+            // Everything is cached, no need to invoke cloud-based accessor 
for read operations
+            accessor = new LocalAccessor(cloudClient, bucket, localIoManager);
+        }
+        LOGGER.info("The number of uncached files: {}", 
remainingUncachedFiles);
     }
 
     @Override
     protected void onOpen(CloudFileHandle fileHandle, FileReadWriteMode 
rwMode, FileSyncMode syncMode)
             throws HyracksDataException {
-        FileReference fileRef = fileHandle.getFileReference();
-        if (!localIoManager.exists(fileRef) && cloudClient.exists(bucket, 
fileRef.getRelativePath())) {
-            // File doesn't exist locally, download it.
-            ByteBuffer writeBuffer = writeBufferProvider.getBuffer();
-            try {
-                // TODO download for all partitions at once
-                LOGGER.info("Downloading {} from S3..", 
fileRef.getRelativePath());
-                CloudFileUtil.downloadFile(localIoManager, cloudClient, 
bucket, fileHandle, rwMode, syncMode,
-                        writeBuffer);
-                localIoManager.close(fileHandle);
-                LOGGER.info("Finished downloading {} from S3..", 
fileRef.getRelativePath());
-            } finally {
-                writeBufferProvider.recycle(writeBuffer);
-            }
-        }
+        accessor.doOnOpen(fileHandle, rwMode, syncMode);
     }
 
     /*
@@ -84,58 +113,31 @@ class LazyCloudIOManager extends AbstractCloudIOManager {
      */
     @Override
     public Set<FileReference> list(FileReference dir, FilenameFilter filter) 
throws HyracksDataException {
-        Set<String> cloudFiles = cloudClient.listObjects(bucket, 
dir.getRelativePath(), filter);
-        if (cloudFiles.isEmpty()) {
-            return Collections.emptySet();
-        }
-
-        // First get the set of local files
-        Set<FileReference> localFiles = localIoManager.list(dir, filter);
-
-        // Reconcile local files and cloud files
-        for (FileReference file : localFiles) {
-            String path = file.getRelativePath();
-            if (!cloudFiles.contains(path)) {
-                throw new IllegalStateException("Local file is not clean");
-            } else {
-                // No need to re-add it in the following loop
-                cloudFiles.remove(path);
-            }
-        }
+        return accessor.doList(dir, filter);
+    }
 
-        // Add the remaining files that are not stored locally in their 
designated partitions (if any)
-        for (String cloudFile : cloudFiles) {
-            FileReference localFile = resolve(cloudFile);
-            if (isInNodePartition(cloudFile) && 
dir.getDeviceHandle().equals(localFile.getDeviceHandle())) {
-                localFiles.add(localFile);
-            }
-        }
-        return new HashSet<>(localFiles);
+    @Override
+    public boolean exists(FileReference fileRef) throws HyracksDataException {
+        return accessor.doExists(fileRef);
     }
 
     @Override
     public long getSize(FileReference fileReference) throws 
HyracksDataException {
-        if (localIoManager.exists(fileReference)) {
-            return localIoManager.getSize(fileReference);
-        }
-        return cloudClient.getObjectSize(bucket, 
fileReference.getRelativePath());
+        return accessor.doGetSize(fileReference);
     }
 
     @Override
     public byte[] readAllBytes(FileReference fileRef) throws 
HyracksDataException {
-        if (!localIoManager.exists(fileRef) && 
isInNodePartition(fileRef.getRelativePath())) {
-            byte[] bytes = cloudClient.readAllBytes(bucket, 
fileRef.getRelativePath());
-            if (bytes != null && !partitions.isEmpty()) {
-                localIoManager.overwrite(fileRef, bytes);
-            }
-            return bytes;
-        }
-        return localIoManager.readAllBytes(fileRef);
+        return accessor.doReadAllBytes(fileRef);
     }
 
-    private boolean isInNodePartition(String path) {
-        int start = path.indexOf(PARTITION_DIR_PREFIX) + 
PARTITION_DIR_PREFIX.length();
-        int length = path.indexOf(File.separatorChar, start);
-        return partitions.contains(Integer.parseInt(path.substring(start, 
length)));
+    @Override
+    public void delete(FileReference fileRef) throws HyracksDataException {
+        accessor.doDelete(fileRef);
+    }
+
+    @Override
+    public void overwrite(FileReference fileRef, byte[] bytes) throws 
HyracksDataException {
+        accessor.doOverwrite(fileRef, bytes);
     }
 }
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/AbstractLazyAccessor.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/AbstractLazyAccessor.java
new file mode 100644
index 0000000000..de7efc188d
--- /dev/null
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/AbstractLazyAccessor.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.cloud.lazy.accessor;
+
+import static 
org.apache.asterix.common.utils.StorageConstants.STORAGE_ROOT_DIR_NAME;
+
+import java.io.File;
+import java.util.Collections;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.apache.asterix.cloud.clients.ICloudClient;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.util.IoUtil;
+import org.apache.hyracks.control.nc.io.IOManager;
+
+abstract class AbstractLazyAccessor implements ILazyAccessor {
+    protected final ICloudClient cloudClient;
+    protected final String bucket;
+    protected final IOManager localIoManager;
+
+    AbstractLazyAccessor(ICloudClient cloudClient, String bucket, IOManager 
localIoManager) {
+        this.cloudClient = cloudClient;
+        this.bucket = bucket;
+        this.localIoManager = localIoManager;
+    }
+
+    int doCloudDelete(FileReference fileReference) throws HyracksDataException 
{
+        int numberOfCloudDeletes = 0;
+        if 
(!STORAGE_ROOT_DIR_NAME.equals(IoUtil.getFileNameFromPath(fileReference.getAbsolutePath())))
 {
+            File localFile = fileReference.getFile();
+            // if file reference exists,and it is a file, then list is not 
required
+            Set<String> paths =
+                    localFile.exists() && localFile.isFile() ? 
Collections.singleton(fileReference.getRelativePath())
+                            : doList(fileReference, 
IoUtil.NO_OP_FILTER).stream().map(FileReference::getRelativePath)
+                                    .collect(Collectors.toSet());
+            cloudClient.deleteObjects(bucket, paths);
+            numberOfCloudDeletes = paths.size();
+        }
+        return numberOfCloudDeletes;
+    }
+}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/ILazyAccessor.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/ILazyAccessor.java
new file mode 100644
index 0000000000..efedd6400a
--- /dev/null
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/ILazyAccessor.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.cloud.lazy.accessor;
+
+import java.io.FilenameFilter;
+import java.util.Set;
+
+import org.apache.asterix.cloud.CloudFileHandle;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.io.IIOManager;
+
+public interface ILazyAccessor {
+    boolean isLocalAccessor();
+
+    void doOnOpen(CloudFileHandle fileHandle, IIOManager.FileReadWriteMode 
rwMode, IIOManager.FileSyncMode syncMode)
+            throws HyracksDataException;
+
+    Set<FileReference> doList(FileReference dir, FilenameFilter filter) throws 
HyracksDataException;
+
+    boolean doExists(FileReference fileRef) throws HyracksDataException;
+
+    long doGetSize(FileReference fileReference) throws HyracksDataException;
+
+    byte[] doReadAllBytes(FileReference fileReference) throws 
HyracksDataException;
+
+    void doDelete(FileReference fileReference) throws HyracksDataException;
+
+    void doOverwrite(FileReference fileReference, byte[] bytes) throws 
HyracksDataException;
+}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/ILazyAccessorReplacer.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/ILazyAccessorReplacer.java
new file mode 100644
index 0000000000..3a4ff8ab09
--- /dev/null
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/ILazyAccessorReplacer.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.cloud.lazy.accessor;
+
+public interface ILazyAccessorReplacer {
+    void replace();
+}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/InitialCloudAccessor.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/InitialCloudAccessor.java
new file mode 100644
index 0000000000..b93da54bfe
--- /dev/null
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/InitialCloudAccessor.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.cloud.lazy.accessor;
+
+import java.util.Collections;
+
+import org.apache.asterix.cloud.WriteBufferProvider;
+import org.apache.asterix.cloud.clients.ICloudClient;
+import org.apache.hyracks.control.nc.io.IOManager;
+
+/**
+ * Initial accessor to allow {@link 
org.apache.asterix.common.transactions.IGlobalTransactionContext} to work before
+ * initializing the NC's partitions
+ */
+public class InitialCloudAccessor extends ReplaceableCloudAccessor {
+    private static final ILazyAccessorReplacer NO_OP_REPLACER = () -> {
+    };
+
+    public InitialCloudAccessor(ICloudClient cloudClient, String bucket, 
IOManager localIoManager,
+            WriteBufferProvider writeBufferProvider) {
+        super(cloudClient, bucket, localIoManager, Collections.emptySet(), 0, 
writeBufferProvider, NO_OP_REPLACER);
+    }
+
+    @Override
+    protected void decrementNumberOfUncachedFiles() {
+        // No Op
+    }
+
+    @Override
+    protected void decrementNumberOfUncachedFiles(int count) {
+        // No Op
+    }
+}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/LocalAccessor.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/LocalAccessor.java
new file mode 100644
index 0000000000..6c480ec777
--- /dev/null
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/LocalAccessor.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.cloud.lazy.accessor;
+
+import java.io.FilenameFilter;
+import java.util.Set;
+
+import org.apache.asterix.cloud.CloudFileHandle;
+import org.apache.asterix.cloud.clients.ICloudClient;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.io.IIOManager;
+import org.apache.hyracks.control.nc.io.IOManager;
+
+/**
+ * LocalAccessor would be used once everything in the cloud storage is cached 
locally
+ */
+public class LocalAccessor extends AbstractLazyAccessor {
+
+    public LocalAccessor(ICloudClient cloudClient, String bucket, IOManager 
localIoManager) {
+        super(cloudClient, bucket, localIoManager);
+    }
+
+    @Override
+    public boolean isLocalAccessor() {
+        return true;
+    }
+
+    @Override
+    public void doOnOpen(CloudFileHandle fileHandle, 
IIOManager.FileReadWriteMode rwMode,
+            IIOManager.FileSyncMode syncMode) throws HyracksDataException {
+        // NoOp
+    }
+
+    @Override
+    public Set<FileReference> doList(FileReference dir, FilenameFilter filter) 
throws HyracksDataException {
+        return localIoManager.list(dir, filter);
+    }
+
+    @Override
+    public boolean doExists(FileReference fileRef) throws HyracksDataException 
{
+        return localIoManager.exists(fileRef);
+    }
+
+    @Override
+    public long doGetSize(FileReference fileReference) throws 
HyracksDataException {
+        return localIoManager.getSize(fileReference);
+    }
+
+    @Override
+    public byte[] doReadAllBytes(FileReference fileReference) throws 
HyracksDataException {
+        return localIoManager.readAllBytes(fileReference);
+    }
+
+    @Override
+    public void doDelete(FileReference fileReference) throws 
HyracksDataException {
+        // Never delete the storage dir in cloud storage
+        doCloudDelete(fileReference);
+        localIoManager.delete(fileReference);
+    }
+
+    @Override
+    public void doOverwrite(FileReference fileReference, byte[] bytes) throws 
HyracksDataException {
+        cloudClient.write(bucket, fileReference.getRelativePath(), bytes);
+        localIoManager.overwrite(fileReference, bytes);
+    }
+}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/ReplaceableCloudAccessor.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/ReplaceableCloudAccessor.java
new file mode 100644
index 0000000000..e36ac66cfd
--- /dev/null
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/ReplaceableCloudAccessor.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.cloud.lazy.accessor;
+
+import java.io.FilenameFilter;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.asterix.cloud.CloudFileHandle;
+import org.apache.asterix.cloud.WriteBufferProvider;
+import org.apache.asterix.cloud.clients.ICloudClient;
+import org.apache.asterix.cloud.util.CloudFileUtil;
+import org.apache.asterix.common.utils.StoragePathUtil;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.io.IIOManager;
+import org.apache.hyracks.control.nc.io.IOManager;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+/**
+ * ReplaceableCloudAccessor will be used when some (or all) of the files in 
the cloud storage are not cached locally.
+ * It will be replaced by {@link LocalAccessor} once everything is cached
+ */
+public class ReplaceableCloudAccessor extends AbstractLazyAccessor {
+    private static final Logger LOGGER = LogManager.getLogger();
+    private final Set<Integer> partitions;
+    private final AtomicInteger numberOfUncachedFiles;
+    private final WriteBufferProvider writeBufferProvider;
+    private final ILazyAccessorReplacer replacer;
+
+    public ReplaceableCloudAccessor(ICloudClient cloudClient, String bucket, 
IOManager localIoManager,
+            Set<Integer> partitions, int numberOfUncachedFiles, 
WriteBufferProvider writeBufferProvider,
+            ILazyAccessorReplacer replacer) {
+        super(cloudClient, bucket, localIoManager);
+        this.partitions = partitions;
+        this.numberOfUncachedFiles = new AtomicInteger(numberOfUncachedFiles);
+        this.writeBufferProvider = writeBufferProvider;
+        this.replacer = replacer;
+    }
+
+    @Override
+    public boolean isLocalAccessor() {
+        return false;
+    }
+
+    @Override
+    public void doOnOpen(CloudFileHandle fileHandle, 
IIOManager.FileReadWriteMode rwMode,
+            IIOManager.FileSyncMode syncMode) throws HyracksDataException {
+        FileReference fileRef = fileHandle.getFileReference();
+        if (!localIoManager.exists(fileRef) && cloudClient.exists(bucket, 
fileRef.getRelativePath())) {
+            // File doesn't exist locally, download it.
+            ByteBuffer writeBuffer = writeBufferProvider.getBuffer();
+            try {
+                // TODO download for all partitions at once
+                LOGGER.info("Downloading {} from S3..", 
fileRef.getRelativePath());
+                CloudFileUtil.downloadFile(localIoManager, cloudClient, 
bucket, fileHandle, rwMode, syncMode,
+                        writeBuffer);
+                localIoManager.close(fileHandle);
+                LOGGER.info("Finished downloading {} from S3..", 
fileRef.getRelativePath());
+            } finally {
+                writeBufferProvider.recycle(writeBuffer);
+            }
+            // TODO decrement by the number of downloaded files in all 
partitions (once the above TODO is fixed)
+            decrementNumberOfUncachedFiles();
+        }
+    }
+
+    @Override
+    public Set<FileReference> doList(FileReference dir, FilenameFilter filter) 
throws HyracksDataException {
+        Set<String> cloudFiles = cloudClient.listObjects(bucket, 
dir.getRelativePath(), filter);
+        if (cloudFiles.isEmpty()) {
+            return Collections.emptySet();
+        }
+
+        // First get the set of local files
+        Set<FileReference> localFiles = localIoManager.list(dir, filter);
+
+        // Reconcile local files and cloud files
+        for (FileReference file : localFiles) {
+            String path = file.getRelativePath();
+            if (!cloudFiles.contains(path)) {
+                throw new IllegalStateException("Local file is not clean");
+            } else {
+                // No need to re-add it in the following loop
+                cloudFiles.remove(path);
+            }
+        }
+
+        // Add the remaining files that are not stored locally in their 
designated partitions (if any)
+        for (String cloudFile : cloudFiles) {
+            FileReference localFile = localIoManager.resolve(cloudFile);
+            if (isInNodePartition(cloudFile) && 
dir.getDeviceHandle().equals(localFile.getDeviceHandle())) {
+                localFiles.add(localFile);
+            }
+        }
+        return localFiles;
+    }
+
+    @Override
+    public boolean doExists(FileReference fileRef) throws HyracksDataException 
{
+        return localIoManager.exists(fileRef) || cloudClient.exists(bucket, 
fileRef.getRelativePath());
+    }
+
+    @Override
+    public long doGetSize(FileReference fileReference) throws 
HyracksDataException {
+        if (localIoManager.exists(fileReference)) {
+            return localIoManager.getSize(fileReference);
+        }
+        return cloudClient.getObjectSize(bucket, 
fileReference.getRelativePath());
+    }
+
+    @Override
+    public byte[] doReadAllBytes(FileReference fileRef) throws 
HyracksDataException {
+        if (!localIoManager.exists(fileRef) && 
isInNodePartition(fileRef.getRelativePath())) {
+            byte[] bytes = cloudClient.readAllBytes(bucket, 
fileRef.getRelativePath());
+            if (bytes != null && !partitions.isEmpty()) {
+                // Download the missing file for subsequent reads
+                localIoManager.overwrite(fileRef, bytes);
+                decrementNumberOfUncachedFiles();
+            }
+            return bytes;
+        }
+        return localIoManager.readAllBytes(fileRef);
+    }
+
+    @Override
+    public void doDelete(FileReference fileReference) throws 
HyracksDataException {
+        // Never delete the storage dir in cloud storage
+        int numberOfCloudDeletes = doCloudDelete(fileReference);
+        // check local
+        if (numberOfCloudDeletes > 0 && localIoManager.exists(fileReference)) {
+            int numberOfLocalDeletes = fileReference.getFile().isFile() ? 1 : 
localIoManager.list(fileReference).size();
+            // Decrement by number of cloud deletes that have no counterparts 
locally
+            decrementNumberOfUncachedFiles(numberOfCloudDeletes - 
numberOfLocalDeletes);
+        }
+
+        // Finally, delete locally
+        localIoManager.delete(fileReference);
+    }
+
+    @Override
+    public void doOverwrite(FileReference fileReference, byte[] bytes) throws 
HyracksDataException {
+        boolean existsLocally = localIoManager.exists(fileReference);
+        cloudClient.write(bucket, fileReference.getRelativePath(), bytes);
+        localIoManager.overwrite(fileReference, bytes);
+
+        if (!existsLocally) {
+            decrementNumberOfUncachedFiles();
+        }
+    }
+
+    protected void decrementNumberOfUncachedFiles() {
+        replaceAccessor(numberOfUncachedFiles.decrementAndGet());
+    }
+
+    protected void decrementNumberOfUncachedFiles(int count) {
+        if (count > 0) {
+            replaceAccessor(numberOfUncachedFiles.addAndGet(-count));
+        }
+    }
+
+    private boolean isInNodePartition(String path) {
+        return 
partitions.contains(StoragePathUtil.getPartitionNumFromRelativePath(path));
+    }
+
+    void replaceAccessor(int remainingUncached) {
+        if (remainingUncached > 0) {
+            // Some files still not cached yet
+            return;
+        }
+
+        if (remainingUncached < 0) {
+            // This should not happen, log in case that happen
+            LOGGER.warn("Some files were downloaded multiple times. Reported 
remaining uncached files = {}",
+                    remainingUncached);
+        }
+        replacer.replace();
+    }
+}


Reply via email to