This is an automated email from the ASF dual-hosted git repository.

wyk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 7c5d0a8128 [ASTERIXDB-3393][STO] Refactor Cloud writer
7c5d0a8128 is described below

commit 7c5d0a8128353c00dc26d5c805eef67c8f060d7c
Author: Wail Alkowaileet <[email protected]>
AuthorDate: Mon May 6 12:44:15 2024 -0700

    [ASTERIXDB-3393][STO] Refactor Cloud writer
    
    - user model changes: no
    - storage format changes: no
    - interface changes: yes
    
    Details:
    Introduce ICloudWriter, which abstracts the write
    operations, in preparation for other cloud providers.
    
    Change-Id: I52124696b50d6dcc1f3c65b7e0fe251df1579ac5
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18259
    Integration-Tests: Jenkins <[email protected]>
    Reviewed-by: Murtadha Hubail <[email protected]>
    Tested-by: Ian Maxon <[email protected]>
---
 .../asterix/cloud/AbstractCloudIOManager.java      |  31 ++---
 .../org/apache/asterix/cloud/CloudFileHandle.java  |  21 +--
 .../apache/asterix/cloud/CloudOutputStream.java    |  16 ++-
 .../asterix/cloud/CloudResettableInputStream.java  |  80 ++++++++----
 .../apache/asterix/cloud/EagerCloudIOManager.java  |   2 +-
 .../apache/asterix/cloud/LazyCloudIOManager.java   |   5 +-
 .../apache/asterix/cloud/WriteBufferProvider.java  |  12 +-
 .../asterix/cloud/WriterSingleBufferProvider.java  |   9 +-
 .../asterix/cloud/clients/CloudClientProvider.java |   6 +-
 .../apache/asterix/cloud/clients/ICloudClient.java |  14 +-
 .../apache/asterix/cloud/clients/ICloudWriter.java |  74 +++++++++++
 .../cloud/clients/aws/s3/S3ClientConfig.java       |   2 +
 .../cloud/clients/aws/s3/S3CloudClient.java        |  15 ++-
 .../cloud/clients/google/gcs/GCSClientConfig.java  |   2 +
 .../cloud/clients/google/gcs/GCSCloudClient.java   |  31 +++--
 .../gcs/{GCSBufferedWriter.java => GCSWriter.java} |  42 +++---
 .../asterix/cloud/lazy/accessor/ILazyAccessor.java |   4 +-
 .../asterix/cloud/lazy/accessor/LocalAccessor.java |   4 +-
 .../lazy/accessor/ReplaceableCloudAccessor.java    |   4 +-
 .../writer/AbstractCloudExternalFileWriter.java    |  17 +--
 ...=> AbstractCloudExternalFileWriterFactory.java} |  96 +++++---------
 .../cloud/writer/GCSExternalFileWriterFactory.java | 138 +++-----------------
 .../cloud/writer/S3ExternalFileWriterFactory.java  | 143 +++------------------
 .../cloud/{LSMTest.java => AbstractLSMTest.java}   |  23 ++--
 .../org/apache/asterix/cloud/gcs/LSMGCSTest.java   |   4 +-
 .../org/apache/asterix/cloud/s3/LSMS3Test.java     |   4 +-
 26 files changed, 329 insertions(+), 470 deletions(-)

diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
index 368be26316..7ab8a5a99d 100644
--- 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
@@ -34,6 +34,7 @@ import org.apache.asterix.cloud.bulk.DeleteBulkCloudOperation;
 import org.apache.asterix.cloud.bulk.NoOpDeleteBulkCallBack;
 import org.apache.asterix.cloud.clients.CloudClientProvider;
 import org.apache.asterix.cloud.clients.ICloudClient;
+import org.apache.asterix.cloud.clients.ICloudWriter;
 import org.apache.asterix.cloud.util.CloudFileUtil;
 import org.apache.asterix.common.api.INamespacePathResolver;
 import org.apache.asterix.common.cloud.IPartitionBootstrapper;
@@ -71,7 +72,7 @@ public abstract class AbstractCloudIOManager extends 
IOManager implements IParti
         this.bucket = cloudProperties.getStorageBucket();
         cloudClient = CloudClientProvider.getClient(cloudProperties);
         int numOfThreads = getIODevices().size() * getIOParallelism();
-        writeBufferProvider = new WriteBufferProvider(numOfThreads);
+        writeBufferProvider = new WriteBufferProvider(numOfThreads, 
cloudClient.getWriteBufferSize());
         partitions = new HashSet<>();
         partitionPaths = new ArrayList<>();
         this.localIoManager = ioManager;
@@ -165,8 +166,9 @@ public abstract class AbstractCloudIOManager extends 
IOManager implements IParti
     @Override
     public final IFileHandle open(FileReference fileRef, FileReadWriteMode 
rwMode, FileSyncMode syncMode)
             throws HyracksDataException {
-        CloudFileHandle fHandle = new CloudFileHandle(cloudClient, bucket, 
fileRef, writeBufferProvider);
-        onOpen(fHandle, rwMode, syncMode);
+        ICloudWriter cloudWriter = cloudClient.createdWriter(bucket, 
fileRef.getRelativePath(), writeBufferProvider);
+        CloudFileHandle fHandle = new CloudFileHandle(fileRef, cloudWriter);
+        onOpen(fHandle);
         try {
             fHandle.open(rwMode, syncMode);
         } catch (IOException e) {
@@ -180,18 +182,17 @@ public abstract class AbstractCloudIOManager extends 
IOManager implements IParti
      *
      * @param fileHandle file to open
      */
-    protected abstract void onOpen(CloudFileHandle fileHandle, 
FileReadWriteMode rwMode, FileSyncMode syncMode)
-            throws HyracksDataException;
+    protected abstract void onOpen(CloudFileHandle fileHandle) throws 
HyracksDataException;
 
     @Override
     public final long doSyncWrite(IFileHandle fHandle, long offset, 
ByteBuffer[] dataArray)
             throws HyracksDataException {
         long writtenBytes = localIoManager.doSyncWrite(fHandle, offset, 
dataArray);
-        CloudResettableInputStream inputStream = ((CloudFileHandle) 
fHandle).getInputStream();
+        ICloudWriter cloudWriter = ((CloudFileHandle) 
fHandle).getCloudWriter();
         try {
-            inputStream.write(dataArray[0], dataArray[1]);
+            cloudWriter.write(dataArray[0], dataArray[1]);
         } catch (HyracksDataException e) {
-            inputStream.abort();
+            cloudWriter.abort();
             throw e;
         }
         return writtenBytes;
@@ -200,11 +201,11 @@ public abstract class AbstractCloudIOManager extends 
IOManager implements IParti
     @Override
     public final int doSyncWrite(IFileHandle fHandle, long offset, ByteBuffer 
dataArray) throws HyracksDataException {
         int writtenBytes = localIoManager.doSyncWrite(fHandle, offset, 
dataArray);
-        CloudResettableInputStream inputStream = ((CloudFileHandle) 
fHandle).getInputStream();
+        ICloudWriter cloudWriter = ((CloudFileHandle) 
fHandle).getCloudWriter();
         try {
-            inputStream.write(dataArray);
+            cloudWriter.write(dataArray);
         } catch (HyracksDataException e) {
-            inputStream.abort();
+            cloudWriter.abort();
             throw e;
         }
         return writtenBytes;
@@ -231,16 +232,16 @@ public abstract class AbstractCloudIOManager extends 
IOManager implements IParti
         if (metadata) {
             // only finish writing if metadata == true to prevent write 
limiter from finishing the stream and
             // completing the upload.
-            CloudResettableInputStream stream = ((CloudFileHandle) 
fileHandle).getInputStream();
+            ICloudWriter cloudWriter = ((CloudFileHandle) 
fileHandle).getCloudWriter();
             try {
-                stream.finish();
+                cloudWriter.finish();
             } catch (HyracksDataException e) {
                 savedEx = e;
             }
 
             if (savedEx != null) {
                 try {
-                    stream.abort();
+                    cloudWriter.abort();
                 } catch (HyracksDataException e) {
                     savedEx.addSuppressed(e);
                 }
@@ -286,7 +287,7 @@ public abstract class AbstractCloudIOManager extends 
IOManager implements IParti
     /**
      * Writes the bytes to the specified key in the bucket
      *
-     * @param key the key where the bytes will be written
+     * @param key   the key where the bytes will be written
      * @param bytes the bytes to write
      */
     public final void put(String key, byte[] bytes) {
diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudFileHandle.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudFileHandle.java
index 14c44ad5ad..0ae93cfca0 100644
--- 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudFileHandle.java
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudFileHandle.java
@@ -20,20 +20,17 @@ package org.apache.asterix.cloud;
 
 import java.io.IOException;
 
-import org.apache.asterix.cloud.clients.ICloudBufferedWriter;
-import org.apache.asterix.cloud.clients.ICloudClient;
+import org.apache.asterix.cloud.clients.ICloudWriter;
 import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.api.io.IIOManager;
 import org.apache.hyracks.control.nc.io.FileHandle;
 
 public class CloudFileHandle extends FileHandle {
-    private final CloudResettableInputStream inputStream;
+    private final ICloudWriter cloudWriter;
 
-    public CloudFileHandle(ICloudClient cloudClient, String bucket, 
FileReference fileRef,
-            IWriteBufferProvider bufferProvider) {
+    public CloudFileHandle(FileReference fileRef, ICloudWriter cloudWriter) {
         super(fileRef);
-        ICloudBufferedWriter bufferedWriter = 
cloudClient.createBufferedWriter(bucket, fileRef.getRelativePath());
-        inputStream = new CloudResettableInputStream(bufferedWriter, 
bufferProvider);
+        this.cloudWriter = cloudWriter;
     }
 
     @Override
@@ -43,13 +40,7 @@ public class CloudFileHandle extends FileHandle {
         }
     }
 
-    @Override
-    public synchronized void close() throws IOException {
-        inputStream.close();
-        super.close();
-    }
-
-    public CloudResettableInputStream getInputStream() {
-        return inputStream;
+    public ICloudWriter getCloudWriter() {
+        return cloudWriter;
     }
 }
diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudOutputStream.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudOutputStream.java
index 349b1b1baf..bea91fb60e 100644
--- 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudOutputStream.java
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudOutputStream.java
@@ -21,29 +21,31 @@ package org.apache.asterix.cloud;
 import java.io.IOException;
 import java.io.OutputStream;
 
+import org.apache.asterix.cloud.clients.ICloudWriter;
+
 public final class CloudOutputStream extends OutputStream {
-    private final CloudResettableInputStream inputStream;
+    private final ICloudWriter cloudWriter;
 
-    public CloudOutputStream(CloudResettableInputStream inputStream) {
-        this.inputStream = inputStream;
+    public CloudOutputStream(ICloudWriter cloudWriter) {
+        this.cloudWriter = cloudWriter;
     }
 
     @Override
     public void write(byte[] b, int off, int len) throws IOException {
-        inputStream.write(b, off, len);
+        cloudWriter.write(b, off, len);
     }
 
     @Override
     public void write(int b) throws IOException {
-        inputStream.write(b);
+        cloudWriter.write(b);
     }
 
     @Override
     public void close() throws IOException {
-        inputStream.finish();
+        cloudWriter.finish();
     }
 
     public void abort() throws IOException {
-        inputStream.abort();
+        cloudWriter.abort();
     }
 }
diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudResettableInputStream.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudResettableInputStream.java
index 053318468a..885d612727 100644
--- 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudResettableInputStream.java
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudResettableInputStream.java
@@ -23,14 +23,13 @@ import java.io.InputStream;
 import java.nio.ByteBuffer;
 
 import org.apache.asterix.cloud.clients.ICloudBufferedWriter;
+import org.apache.asterix.cloud.clients.ICloudWriter;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
-public class CloudResettableInputStream extends InputStream {
+public class CloudResettableInputStream extends InputStream implements 
ICloudWriter {
     private static final Logger LOGGER = LogManager.getLogger();
-    // TODO: make configurable
-    public static final int MIN_BUFFER_SIZE = 5 * 1024 * 1024;
     private final IWriteBufferProvider bufferProvider;
     private ByteBuffer writeBuffer;
 
@@ -41,12 +40,10 @@ public class CloudResettableInputStream extends InputStream 
{
         this.bufferProvider = bufferProvider;
     }
 
-    private void open() {
-        if (writeBuffer == null) {
-            writeBuffer = bufferProvider.getBuffer();
-            writeBuffer.clear();
-        }
-    }
+    /* ************************************************************
+     * InputStream methods
+     * ************************************************************
+     */
 
     @Override
     public void reset() {
@@ -63,16 +60,23 @@ public class CloudResettableInputStream extends InputStream 
{
         writeBuffer.mark();
     }
 
-    public void write(ByteBuffer header, ByteBuffer page) throws 
HyracksDataException {
-        write(header);
-        write(page);
+    /* ************************************************************
+     * ICloudWriter methods
+     * ************************************************************
+     */
+
+    @Override
+    public int write(ByteBuffer header, ByteBuffer page) throws 
HyracksDataException {
+        return write(header) + write(page);
     }
 
+    @Override
     public int write(ByteBuffer page) throws HyracksDataException {
         open();
         return write(page.array(), 0, page.limit());
     }
 
+    @Override
     public void write(int b) throws HyracksDataException {
         if (writeBuffer.remaining() == 0) {
             uploadAndWait();
@@ -80,6 +84,7 @@ public class CloudResettableInputStream extends InputStream {
         writeBuffer.put((byte) b);
     }
 
+    @Override
     public int write(byte[] b, int off, int len) throws HyracksDataException {
         open();
 
@@ -108,6 +113,23 @@ public class CloudResettableInputStream extends 
InputStream {
         return len;
     }
 
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+        if (writeBuffer.remaining() == 0) {
+            return -1;
+        }
+
+        int length = Math.min(len, writeBuffer.remaining());
+        writeBuffer.get(b, off, length);
+        return length;
+    }
+
+    @Override
+    public int read() throws IOException {
+        return writeBuffer.get();
+    }
+
+    @Override
     public void finish() throws HyracksDataException {
         open();
         try {
@@ -124,14 +146,32 @@ public class CloudResettableInputStream extends 
InputStream {
         } finally {
             returnBuffer();
         }
+        doClose();
     }
 
+    @Override
     public void abort() throws HyracksDataException {
         try {
             bufferedWriter.abort();
         } finally {
             returnBuffer();
         }
+        doClose();
+    }
+
+    private void open() {
+        if (writeBuffer == null) {
+            writeBuffer = bufferProvider.getBuffer();
+            writeBuffer.clear();
+        }
+    }
+
+    private void doClose() throws HyracksDataException {
+        try {
+            close();
+        } catch (IOException e) {
+            throw HyracksDataException.create(e);
+        }
     }
 
     private void uploadAndWait() throws HyracksDataException {
@@ -146,22 +186,6 @@ public class CloudResettableInputStream extends 
InputStream {
         writeBuffer.clear();
     }
 
-    @Override
-    public int read(byte[] b, int off, int len) throws IOException {
-        if (writeBuffer.remaining() == 0) {
-            return -1;
-        }
-
-        int length = Math.min(len, writeBuffer.remaining());
-        writeBuffer.get(b, off, length);
-        return length;
-    }
-
-    @Override
-    public int read() throws IOException {
-        return writeBuffer.get();
-    }
-
     private void returnBuffer() {
         if (writeBuffer != null) {
             bufferProvider.recycle(writeBuffer);
diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/EagerCloudIOManager.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/EagerCloudIOManager.java
index d0b982c834..0b4200cef0 100644
--- 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/EagerCloudIOManager.java
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/EagerCloudIOManager.java
@@ -65,7 +65,7 @@ final class EagerCloudIOManager extends 
AbstractCloudIOManager {
     }
 
     @Override
-    protected void onOpen(CloudFileHandle fileHandle, FileReadWriteMode 
rwMode, FileSyncMode syncMode) {
+    protected void onOpen(CloudFileHandle fileHandle) {
         // NoOp
     }
 
diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java
index 6ecd201425..cb47d00a88 100644
--- 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java
@@ -136,9 +136,8 @@ final class LazyCloudIOManager extends 
AbstractCloudIOManager {
     }
 
     @Override
-    protected void onOpen(CloudFileHandle fileHandle, FileReadWriteMode 
rwMode, FileSyncMode syncMode)
-            throws HyracksDataException {
-        accessor.doOnOpen(fileHandle, rwMode, syncMode);
+    protected void onOpen(CloudFileHandle fileHandle) throws 
HyracksDataException {
+        accessor.doOnOpen(fileHandle);
     }
 
     /*
diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/WriteBufferProvider.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/WriteBufferProvider.java
index ee174000e6..d1eb1eef37 100644
--- 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/WriteBufferProvider.java
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/WriteBufferProvider.java
@@ -18,8 +18,6 @@
  */
 package org.apache.asterix.cloud;
 
-import static 
org.apache.asterix.cloud.CloudResettableInputStream.MIN_BUFFER_SIZE;
-
 import java.nio.ByteBuffer;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
@@ -27,11 +25,13 @@ import java.util.concurrent.BlockingQueue;
 import org.apache.hyracks.util.annotations.ThreadSafe;
 
 @ThreadSafe
-public class WriteBufferProvider implements IWriteBufferProvider {
+public final class WriteBufferProvider implements IWriteBufferProvider {
+    private final int bufferSize;
     private final BlockingQueue<ByteBuffer> writeBuffers;
 
-    public WriteBufferProvider(int ioParallelism) {
-        writeBuffers = new ArrayBlockingQueue<>(ioParallelism);
+    public WriteBufferProvider(int numberOfBuffers, int bufferSize) {
+        this.bufferSize = bufferSize;
+        writeBuffers = new ArrayBlockingQueue<>(numberOfBuffers);
     }
 
     @Override
@@ -43,7 +43,7 @@ public class WriteBufferProvider implements 
IWriteBufferProvider {
     public ByteBuffer getBuffer() {
         ByteBuffer writeBuffer = writeBuffers.poll();
         if (writeBuffer == null) {
-            return ByteBuffer.allocate(MIN_BUFFER_SIZE);
+            return ByteBuffer.allocate(bufferSize);
         }
         return writeBuffer;
     }
diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/WriterSingleBufferProvider.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/WriterSingleBufferProvider.java
index 287900d907..fab1cc2e57 100644
--- 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/WriterSingleBufferProvider.java
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/WriterSingleBufferProvider.java
@@ -18,19 +18,16 @@
  */
 package org.apache.asterix.cloud;
 
-import static 
org.apache.asterix.cloud.CloudResettableInputStream.MIN_BUFFER_SIZE;
-
 import java.nio.ByteBuffer;
 
 import org.apache.hyracks.util.annotations.NotThreadSafe;
 
 @NotThreadSafe
-public class WriterSingleBufferProvider implements IWriteBufferProvider {
-
+public final class WriterSingleBufferProvider implements IWriteBufferProvider {
     private final ByteBuffer buffer;
 
-    public WriterSingleBufferProvider() {
-        buffer = ByteBuffer.allocate(MIN_BUFFER_SIZE);
+    public WriterSingleBufferProvider(int size) {
+        buffer = ByteBuffer.allocate(size);
     }
 
     @Override
diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/CloudClientProvider.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/CloudClientProvider.java
index cc511c7236..35ab467107 100644
--- 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/CloudClientProvider.java
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/CloudClientProvider.java
@@ -26,6 +26,8 @@ import org.apache.asterix.common.config.CloudProperties;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 public class CloudClientProvider {
+    private static final String S3 = "s3";
+    private static final String GCS = "gcs";
 
     private CloudClientProvider() {
         throw new AssertionError("do not instantiate");
@@ -33,10 +35,10 @@ public class CloudClientProvider {
 
     public static ICloudClient getClient(CloudProperties cloudProperties) 
throws HyracksDataException {
         String storageScheme = cloudProperties.getStorageScheme();
-        if ("s3".equalsIgnoreCase(storageScheme)) {
+        if (S3.equalsIgnoreCase(storageScheme)) {
             S3ClientConfig config = S3ClientConfig.of(cloudProperties);
             return new S3CloudClient(config);
-        } else if ("gcs".equalsIgnoreCase(storageScheme)) {
+        } else if (GCS.equalsIgnoreCase(storageScheme)) {
             GCSClientConfig config = GCSClientConfig.of(cloudProperties);
             return new GCSCloudClient(config);
         }
diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudClient.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudClient.java
index 2bd08028ca..0307846cac 100644
--- 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudClient.java
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudClient.java
@@ -24,6 +24,7 @@ import java.nio.ByteBuffer;
 import java.util.Collection;
 import java.util.Set;
 
+import org.apache.asterix.cloud.IWriteBufferProvider;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.control.nc.io.IOManager;
@@ -35,15 +36,20 @@ import com.fasterxml.jackson.databind.ObjectMapper;
  * Interface containing methods to perform IO operation on the Cloud Storage
  */
 public interface ICloudClient {
+    /**
+     * @return write buffer size
+     */
+    int getWriteBufferSize();
 
     /**
      * Creates a cloud buffered writer
      *
-     * @param bucket bucket to write to
-     * @param path   path to write to
-     * @return buffered writer
+     * @param bucket         bucket to write to
+     * @param path           path to write to
+     * @param bufferProvider buffer provider
+     * @return cloud writer
      */
-    ICloudBufferedWriter createBufferedWriter(String bucket, String path);
+    ICloudWriter createdWriter(String bucket, String path, 
IWriteBufferProvider bufferProvider);
 
     /**
      * Lists objects at the specified bucket and path, and applies the file 
name filter on the returned objects
diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudWriter.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudWriter.java
new file mode 100644
index 0000000000..15822c4d9c
--- /dev/null
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudWriter.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.cloud.clients;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * A cloud-based writer that writes bytes sequentially in a cloud blob storage
+ */
+public interface ICloudWriter {
+    /**
+     * Write a header and a page
+     *
+     * @param header to write
+     * @param page   to write
+     * @return written bytes
+     */
+    int write(ByteBuffer header, ByteBuffer page) throws HyracksDataException;
+
+    /**
+     * Write a page
+     *
+     * @param page to write
+     * @return written bytes
+     */
+    int write(ByteBuffer page) throws HyracksDataException;
+
+    /**
+     * Write a byte
+     *
+     * @param b to write
+     */
+    void write(int b) throws HyracksDataException;
+
+    /**
+     * Write a byte array
+     *
+     * @param b   bytes to write
+     * @param off starting offset
+     * @param len length to write
+     * @return written bytes
+     */
+    int write(byte[] b, int off, int len) throws HyracksDataException;
+
+    /**
+     * Finish the write operation
+     * Note: this should be called upon successful write
+     */
+    void finish() throws HyracksDataException;
+
+    /**
+     * Abort the write operation
+     * Note: should be called instead of {@link #finish()} when the write 
operation encountered an error
+     */
+    void abort() throws HyracksDataException;
+}
diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java
index bc13078a47..161fb37fc5 100644
--- 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java
@@ -22,12 +22,14 @@ import java.util.Map;
 
 import org.apache.asterix.common.config.CloudProperties;
 import org.apache.asterix.external.util.aws.s3.S3Constants;
+import org.apache.hyracks.util.StorageUtil;
 
 import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider;
 import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
 import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
 
 public final class S3ClientConfig {
+    static final int WRITE_BUFFER_SIZE = StorageUtil.getIntSizeInBytes(5, 
StorageUtil.StorageUnit.MEGABYTE);
     // The maximum number of file that can be deleted (AWS restriction)
     static final int DELETE_BATCH_SIZE = 1000;
     private final String region;
diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
index 5cdf971c09..9c31e17000 100644
--- 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
@@ -35,8 +35,11 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
 
+import org.apache.asterix.cloud.CloudResettableInputStream;
+import org.apache.asterix.cloud.IWriteBufferProvider;
 import org.apache.asterix.cloud.clients.ICloudBufferedWriter;
 import org.apache.asterix.cloud.clients.ICloudClient;
+import org.apache.asterix.cloud.clients.ICloudWriter;
 import org.apache.asterix.cloud.clients.IParallelDownloader;
 import org.apache.asterix.cloud.clients.profiler.CountRequestProfiler;
 import org.apache.asterix.cloud.clients.profiler.IRequestProfiler;
@@ -69,7 +72,7 @@ import 
software.amazon.awssdk.services.s3.model.PutObjectRequest;
 import software.amazon.awssdk.services.s3.model.S3Object;
 
 @ThreadSafe
-public class S3CloudClient implements ICloudClient {
+public final class S3CloudClient implements ICloudClient {
     private final S3ClientConfig config;
     private final S3Client s3Client;
     private final IRequestProfiler profiler;
@@ -90,8 +93,14 @@ public class S3CloudClient implements ICloudClient {
     }
 
     @Override
-    public ICloudBufferedWriter createBufferedWriter(String bucket, String 
path) {
-        return new S3BufferedWriter(s3Client, profiler, bucket, path);
+    public int getWriteBufferSize() {
+        return S3ClientConfig.WRITE_BUFFER_SIZE;
+    }
+
+    @Override
+    public ICloudWriter createdWriter(String bucket, String path, 
IWriteBufferProvider bufferProvider) {
+        ICloudBufferedWriter bufferedWriter = new S3BufferedWriter(s3Client, 
profiler, bucket, path);
+        return new CloudResettableInputStream(bufferedWriter, bufferProvider);
     }
 
     @Override
diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSClientConfig.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSClientConfig.java
index e8e44805bc..4edb7a7175 100644
--- 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSClientConfig.java
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSClientConfig.java
@@ -25,12 +25,14 @@ import java.util.Map;
 
 import org.apache.asterix.common.config.CloudProperties;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.util.StorageUtil;
 
 import com.google.auth.oauth2.GoogleCredentials;
 import com.google.auth.oauth2.OAuth2Credentials;
 import com.google.cloud.NoCredentials;
 
 public class GCSClientConfig {
+    public static final int WRITE_BUFFER_SIZE = 
StorageUtil.getIntSizeInBytes(1, StorageUtil.StorageUnit.MEGABYTE);
     // The maximum number of files that can be deleted (GCS restriction): 
https://cloud.google.com/storage/quotas#json-requests
     static final int DELETE_BATCH_SIZE = 100;
     private final String region;
diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSCloudClient.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSCloudClient.java
index 2b7303d382..c725ca5089 100644
--- 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSCloudClient.java
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSCloudClient.java
@@ -32,8 +32,9 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
 
-import org.apache.asterix.cloud.clients.ICloudBufferedWriter;
+import org.apache.asterix.cloud.IWriteBufferProvider;
 import org.apache.asterix.cloud.clients.ICloudClient;
+import org.apache.asterix.cloud.clients.ICloudWriter;
 import org.apache.asterix.cloud.clients.IParallelDownloader;
 import org.apache.asterix.cloud.clients.profiler.CountRequestProfiler;
 import org.apache.asterix.cloud.clients.profiler.IRequestProfiler;
@@ -60,7 +61,6 @@ import com.google.cloud.storage.StorageException;
 import com.google.cloud.storage.StorageOptions;
 
 public class GCSCloudClient implements ICloudClient {
-
     private final Storage gcsClient;
     private final GCSClientConfig config;
     private final IRequestProfiler profiler;
@@ -80,18 +80,14 @@ public class GCSCloudClient implements ICloudClient {
         this(config, buildClient(config));
     }
 
-    private static Storage buildClient(GCSClientConfig config) throws 
HyracksDataException {
-        StorageOptions.Builder builder = 
StorageOptions.newBuilder().setCredentials(config.createCredentialsProvider());
-
-        if (config.getEndpoint() != null && !config.getEndpoint().isEmpty()) {
-            builder.setHost(config.getEndpoint());
-        }
-        return builder.build().getService();
+    @Override
+    public int getWriteBufferSize() {
+        return GCSClientConfig.WRITE_BUFFER_SIZE;
     }
 
     @Override
-    public ICloudBufferedWriter createBufferedWriter(String bucket, String 
path) {
-        return new GCSBufferedWriter(bucket, path, gcsClient, profiler);
+    public ICloudWriter createdWriter(String bucket, String path, 
IWriteBufferProvider bufferProvider) {
+        return new GCSWriter(bucket, path, gcsClient, profiler);
     }
 
     @Override
@@ -115,12 +111,10 @@ public class GCSCloudClient implements ICloudClient {
         BlobId blobId = BlobId.of(bucket, path);
         long readTo = offset + buffer.remaining();
         int totalRead = 0;
-        int read = 0;
         try (ReadChannel from = gcsClient.reader(blobId).limit(readTo)) {
             while (buffer.remaining() > 0) {
                 from.seek(offset + totalRead);
-                read = from.read(buffer);
-                totalRead += read;
+                totalRead += from.read(buffer);
             }
         } catch (IOException | StorageException ex) {
             throw HyracksDataException.create(ex);
@@ -248,4 +242,13 @@ public class GCSCloudClient implements ICloudClient {
             throw HyracksDataException.create(ex);
         }
     }
+
+    private static Storage buildClient(GCSClientConfig config) throws 
HyracksDataException {
+        StorageOptions.Builder builder = 
StorageOptions.newBuilder().setCredentials(config.createCredentialsProvider());
+
+        if (config.getEndpoint() != null && !config.getEndpoint().isEmpty()) {
+            builder.setHost(config.getEndpoint());
+        }
+        return builder.build().getService();
+    }
 }
diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSBufferedWriter.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSWriter.java
similarity index 74%
rename from 
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSBufferedWriter.java
rename to 
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSWriter.java
index 4f9d437f63..cccd9ec05b 100644
--- 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSBufferedWriter.java
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSWriter.java
@@ -18,13 +18,12 @@
  */
 package org.apache.asterix.cloud.clients.google.gcs;
 
-import static 
org.apache.asterix.cloud.CloudResettableInputStream.MIN_BUFFER_SIZE;
+import static 
org.apache.asterix.cloud.clients.google.gcs.GCSClientConfig.WRITE_BUFFER_SIZE;
 
 import java.io.IOException;
-import java.io.InputStream;
 import java.nio.ByteBuffer;
 
-import org.apache.asterix.cloud.clients.ICloudBufferedWriter;
+import org.apache.asterix.cloud.clients.ICloudWriter;
 import org.apache.asterix.cloud.clients.profiler.IRequestProfiler;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.logging.log4j.LogManager;
@@ -35,17 +34,16 @@ import com.google.cloud.storage.BlobId;
 import com.google.cloud.storage.BlobInfo;
 import com.google.cloud.storage.Storage;
 
-public class GCSBufferedWriter implements ICloudBufferedWriter {
+public class GCSWriter implements ICloudWriter {
     private static final Logger LOGGER = LogManager.getLogger();
     private final String bucket;
     private final String path;
     private final IRequestProfiler profiler;
     private final Storage gcsClient;
     private boolean uploadStarted = false;
-    private int partNumber;
     private WriteChannel writer = null;
 
-    public GCSBufferedWriter(String bucket, String path, Storage gcsClient, 
IRequestProfiler profiler) {
+    public GCSWriter(String bucket, String path, Storage gcsClient, 
IRequestProfiler profiler) {
         this.bucket = bucket;
         this.path = path;
         this.profiler = profiler;
@@ -53,30 +51,39 @@ public class GCSBufferedWriter implements 
ICloudBufferedWriter {
     }
 
     @Override
-    public int upload(InputStream stream, int length) throws 
HyracksDataException {
+    public int write(ByteBuffer header, ByteBuffer page) throws 
HyracksDataException {
+        return write(header) + write(page);
+    }
+
+    @Override
+    public int write(ByteBuffer page) throws HyracksDataException {
         profiler.objectMultipartUpload();
         setUploadId();
+        int written = 0;
         try {
-            ByteBuffer buffer = ByteBuffer.wrap(stream.readNBytes(length));
-            while (buffer.hasRemaining()) {
-                writer.write(buffer);
+            while (page.hasRemaining()) {
+                written += writer.write(page);
             }
         } catch (IOException e) {
             throw HyracksDataException.create(e);
         }
-        return partNumber++;
+
+        return written;
+    }
+
+    @Override
+    public int write(byte[] b, int off, int len) throws HyracksDataException {
+        return write(ByteBuffer.wrap(b, off, len));
     }
 
     @Override
-    public boolean isEmpty() {
-        return !uploadStarted;
+    public void write(int b) throws HyracksDataException {
+        write(ByteBuffer.wrap(new byte[] { (byte) b }));
     }
 
     @Override
     public void finish() throws HyracksDataException {
-        if (!uploadStarted) {
-            throw new IllegalStateException("Cannot finish without writing any 
bytes");
-        }
+        setUploadId();
         profiler.objectMultipartUpload();
         try {
             writer.close();
@@ -99,9 +106,8 @@ public class GCSBufferedWriter implements 
ICloudBufferedWriter {
     private void setUploadId() {
         if (!uploadStarted) {
             uploadStarted = true;
-            partNumber = 1;
             writer = gcsClient.writer(BlobInfo.newBuilder(BlobId.of(bucket, 
path)).build());
-            writer.setChunkSize(MIN_BUFFER_SIZE);
+            writer.setChunkSize(WRITE_BUFFER_SIZE);
             log("STARTED");
         }
     }
diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/ILazyAccessor.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/ILazyAccessor.java
index 8f803a0792..534ff5ded5 100644
--- 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/ILazyAccessor.java
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/ILazyAccessor.java
@@ -25,15 +25,13 @@ import org.apache.asterix.cloud.CloudFileHandle;
 import org.apache.asterix.cloud.bulk.IBulkOperationCallBack;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
-import org.apache.hyracks.api.io.IIOManager;
 
 public interface ILazyAccessor {
     boolean isLocalAccessor();
 
     IBulkOperationCallBack getBulkOperationCallBack();
 
-    void doOnOpen(CloudFileHandle fileHandle, IIOManager.FileReadWriteMode 
rwMode, IIOManager.FileSyncMode syncMode)
-            throws HyracksDataException;
+    void doOnOpen(CloudFileHandle fileHandle) throws HyracksDataException;
 
     Set<FileReference> doList(FileReference dir, FilenameFilter filter) throws 
HyracksDataException;
 
diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/LocalAccessor.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/LocalAccessor.java
index 378cf0365e..ae324020e0 100644
--- 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/LocalAccessor.java
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/LocalAccessor.java
@@ -27,7 +27,6 @@ import org.apache.asterix.cloud.bulk.NoOpDeleteBulkCallBack;
 import org.apache.asterix.cloud.clients.ICloudClient;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
-import org.apache.hyracks.api.io.IIOManager;
 import org.apache.hyracks.control.nc.io.IOManager;
 
 /**
@@ -50,8 +49,7 @@ public class LocalAccessor extends AbstractLazyAccessor {
     }
 
     @Override
-    public void doOnOpen(CloudFileHandle fileHandle, 
IIOManager.FileReadWriteMode rwMode,
-            IIOManager.FileSyncMode syncMode) throws HyracksDataException {
+    public void doOnOpen(CloudFileHandle fileHandle) throws 
HyracksDataException {
         // NoOp
     }
 
diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/ReplaceableCloudAccessor.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/ReplaceableCloudAccessor.java
index 277d42563e..e4e168eb72 100644
--- 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/ReplaceableCloudAccessor.java
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/ReplaceableCloudAccessor.java
@@ -30,7 +30,6 @@ import org.apache.asterix.common.utils.StorageConstants;
 import org.apache.asterix.common.utils.StoragePathUtil;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
-import org.apache.hyracks.api.io.IIOManager;
 import org.apache.hyracks.control.nc.io.IOManager;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -70,8 +69,7 @@ public class ReplaceableCloudAccessor extends 
AbstractLazyAccessor {
     }
 
     @Override
-    public void doOnOpen(CloudFileHandle fileHandle, 
IIOManager.FileReadWriteMode rwMode,
-            IIOManager.FileSyncMode syncMode) throws HyracksDataException {
+    public void doOnOpen(CloudFileHandle fileHandle) throws 
HyracksDataException {
         FileReference fileRef = fileHandle.getFileReference();
         if (!localIoManager.exists(fileRef) && cloudClient.exists(bucket, 
fileRef.getRelativePath())) {
             if (cacher.downloadData(fileRef)) {
diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/AbstractCloudExternalFileWriter.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/AbstractCloudExternalFileWriter.java
index bbae29a974..4277800797 100644
--- 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/AbstractCloudExternalFileWriter.java
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/AbstractCloudExternalFileWriter.java
@@ -21,11 +21,10 @@ package org.apache.asterix.cloud.writer;
 import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;
 
 import org.apache.asterix.cloud.CloudOutputStream;
-import org.apache.asterix.cloud.CloudResettableInputStream;
 import org.apache.asterix.cloud.IWriteBufferProvider;
 import org.apache.asterix.cloud.WriterSingleBufferProvider;
-import org.apache.asterix.cloud.clients.ICloudBufferedWriter;
 import org.apache.asterix.cloud.clients.ICloudClient;
+import org.apache.asterix.cloud.clients.ICloudWriter;
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.exceptions.RuntimeDataException;
 import org.apache.asterix.runtime.writer.IExternalFileWriter;
@@ -46,7 +45,7 @@ abstract class AbstractCloudExternalFileWriter implements 
IExternalFileWriter {
     private final IWarningCollector warningCollector;
     private final SourceLocation pathSourceLocation;
     private final IWriteBufferProvider bufferProvider;
-    private ICloudBufferedWriter bufferedWriter;
+    private ICloudWriter cloudWriter;
 
     AbstractCloudExternalFileWriter(IExternalPrinter printer, ICloudClient 
cloudClient, String bucket,
             boolean partitionedPath, IWarningCollector warningCollector, 
SourceLocation pathSourceLocation) {
@@ -56,7 +55,7 @@ abstract class AbstractCloudExternalFileWriter implements 
IExternalFileWriter {
         this.partitionedPath = partitionedPath;
         this.warningCollector = warningCollector;
         this.pathSourceLocation = pathSourceLocation;
-        bufferProvider = new WriterSingleBufferProvider();
+        bufferProvider = new 
WriterSingleBufferProvider(cloudClient.getWriteBufferSize());
     }
 
     @Override
@@ -82,10 +81,8 @@ abstract class AbstractCloudExternalFileWriter implements 
IExternalFileWriter {
             return false;
         }
 
-        bufferedWriter = cloudClient.createBufferedWriter(bucket, fullPath);
-        CloudResettableInputStream inputStream = new 
CloudResettableInputStream(bufferedWriter, bufferProvider);
-
-        CloudOutputStream outputStream = new CloudOutputStream(inputStream);
+        cloudWriter = cloudClient.createdWriter(bucket, fullPath, 
bufferProvider);
+        CloudOutputStream outputStream = new CloudOutputStream(cloudWriter);
         printer.newStream(outputStream);
 
         return true;
@@ -108,8 +105,8 @@ abstract class AbstractCloudExternalFileWriter implements 
IExternalFileWriter {
     @Override
     public final void abort() throws HyracksDataException {
         try {
-            if (bufferedWriter != null) {
-                bufferedWriter.abort();
+            if (cloudWriter != null) {
+                cloudWriter.abort();
             }
             printer.close();
         } catch (HyracksDataException e) {
diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/S3ExternalFileWriterFactory.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/AbstractCloudExternalFileWriterFactory.java
similarity index 60%
copy from 
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/S3ExternalFileWriterFactory.java
copy to 
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/AbstractCloudExternalFileWriterFactory.java
index cdaa6dcf80..75c4ec5f61 100644
--- 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/S3ExternalFileWriterFactory.java
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/AbstractCloudExternalFileWriterFactory.java
@@ -26,80 +26,49 @@ import java.util.Collections;
 import java.util.Map;
 import java.util.Random;
 
-import org.apache.asterix.cloud.CloudResettableInputStream;
+import org.apache.asterix.cloud.IWriteBufferProvider;
 import org.apache.asterix.cloud.WriterSingleBufferProvider;
-import org.apache.asterix.cloud.clients.ICloudBufferedWriter;
 import org.apache.asterix.cloud.clients.ICloudClient;
-import org.apache.asterix.cloud.clients.aws.s3.S3ClientConfig;
-import org.apache.asterix.cloud.clients.aws.s3.S3CloudClient;
+import org.apache.asterix.cloud.clients.ICloudWriter;
 import org.apache.asterix.common.exceptions.CompilationException;
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.external.util.ExternalDataConstants;
-import org.apache.asterix.external.util.aws.s3.S3Utils;
 import org.apache.asterix.runtime.writer.ExternalFileWriterConfiguration;
-import org.apache.asterix.runtime.writer.IExternalFileWriter;
 import org.apache.asterix.runtime.writer.IExternalFileWriterFactory;
-import org.apache.asterix.runtime.writer.IExternalFileWriterFactoryProvider;
-import org.apache.asterix.runtime.writer.IExternalPrinter;
-import org.apache.asterix.runtime.writer.IExternalPrinterFactory;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.exceptions.IWarningCollector;
 import org.apache.hyracks.api.exceptions.SourceLocation;
 import org.apache.hyracks.api.util.ExceptionUtils;
 import org.apache.hyracks.data.std.primitive.LongPointable;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
-import software.amazon.awssdk.core.exception.SdkException;
-import software.amazon.awssdk.services.s3.model.NoSuchBucketException;
-
-public final class S3ExternalFileWriterFactory implements 
IExternalFileWriterFactory {
-    private static final long serialVersionUID = 4551318140901866805L;
+abstract class AbstractCloudExternalFileWriterFactory implements 
IExternalFileWriterFactory {
+    private static final long serialVersionUID = -6204498482419719403L;
     private static final Logger LOGGER = LogManager.getLogger();
-    static final char SEPARATOR = '/';
-    public static final IExternalFileWriterFactoryProvider PROVIDER = new 
IExternalFileWriterFactoryProvider() {
-        @Override
-        public IExternalFileWriterFactory 
create(ExternalFileWriterConfiguration configuration) {
-            return new S3ExternalFileWriterFactory(configuration);
-        }
 
-        @Override
-        public char getSeparator() {
-            return SEPARATOR;
-        }
-    };
-    private final Map<String, String> configuration;
-    private final SourceLocation pathSourceLocation;
-    private final String staticPath;
-    private transient S3CloudClient cloudClient;
+    protected final Map<String, String> configuration;
+    protected final SourceLocation pathSourceLocation;
+    protected final String staticPath;
+    protected transient ICloudClient cloudClient;
 
-    private S3ExternalFileWriterFactory(ExternalFileWriterConfiguration 
externalConfig) {
+    AbstractCloudExternalFileWriterFactory(ExternalFileWriterConfiguration 
externalConfig) {
         configuration = externalConfig.getConfiguration();
         pathSourceLocation = externalConfig.getPathSourceLocation();
         staticPath = externalConfig.getStaticPath();
-        cloudClient = null;
     }
 
-    @Override
-    public IExternalFileWriter createWriter(IHyracksTaskContext context, 
IExternalPrinterFactory printerFactory)
-            throws HyracksDataException {
-        buildClient();
-        String bucket = 
configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
-        IExternalPrinter printer = printerFactory.createPrinter();
-        IWarningCollector warningCollector = context.getWarningCollector();
-        return new S3ExternalFileWriter(printer, cloudClient, bucket, 
staticPath == null, warningCollector,
-                pathSourceLocation);
-    }
+    abstract ICloudClient createCloudClient() throws CompilationException;
 
-    private void buildClient() throws HyracksDataException {
+    abstract boolean isNoContainerFoundException(IOException e);
+
+    abstract boolean isSdkException(Throwable e);
+
+    final void buildClient() throws HyracksDataException {
         try {
             synchronized (this) {
                 if (cloudClient == null) {
-                    // only a single client should be built
-                    S3ClientConfig config = S3ClientConfig.of(configuration);
-                    cloudClient = new S3CloudClient(config, 
S3Utils.buildAwsS3Client(configuration));
+                    cloudClient = createCloudClient();
                 }
             }
         } catch (CompilationException e) {
@@ -108,14 +77,8 @@ public final class S3ExternalFileWriterFactory implements 
IExternalFileWriterFac
     }
 
     @Override
-    public char getSeparator() {
-        return SEPARATOR;
-    }
-
-    @Override
-    public void validate() throws AlgebricksException {
-        S3ClientConfig config = S3ClientConfig.of(configuration);
-        ICloudClient testClient = new S3CloudClient(config, 
S3Utils.buildAwsS3Client(configuration));
+    public final void validate() throws AlgebricksException {
+        ICloudClient testClient = createCloudClient();
         String bucket = 
configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
 
         if (bucket == null || bucket.isEmpty()) {
@@ -126,14 +89,17 @@ public final class S3ExternalFileWriterFactory implements 
IExternalFileWriterFac
         try {
             doValidate(testClient, bucket);
         } catch (IOException e) {
-            if (e.getCause() instanceof NoSuchBucketException) {
+            if (isNoContainerFoundException(e)) {
                 throw 
CompilationException.create(ErrorCode.EXTERNAL_SOURCE_CONTAINER_NOT_FOUND, 
bucket);
             } else {
                 throw 
CompilationException.create(ErrorCode.EXTERNAL_SINK_ERROR,
                         ExceptionUtils.getMessageOrToString(e));
             }
-        } catch (SdkException e) {
-            throw CompilationException.create(ErrorCode.EXTERNAL_SINK_ERROR, 
e, getMessageOrToString(e));
+        } catch (Throwable e) {
+            if (isSdkException(e)) {
+                throw 
CompilationException.create(ErrorCode.EXTERNAL_SINK_ERROR, e, 
getMessageOrToString(e));
+            }
+            throw e;
         }
     }
 
@@ -168,19 +134,17 @@ public final class S3ExternalFileWriterFactory implements 
IExternalFileWriterFac
         long writeValue = random.nextLong();
         byte[] data = new byte[Long.BYTES];
         LongPointable.setLong(data, 0, writeValue);
-        ICloudBufferedWriter writer = testClient.createBufferedWriter(bucket, 
path);
-        CloudResettableInputStream stream = null;
+        IWriteBufferProvider bufferProvider = new 
WriterSingleBufferProvider(testClient.getWriteBufferSize());
+        ICloudWriter writer = testClient.createdWriter(bucket, path, 
bufferProvider);
         boolean aborted = false;
         try {
-            stream = new CloudResettableInputStream(writer, new 
WriterSingleBufferProvider());
-            stream.write(data, 0, data.length);
+            writer.write(data, 0, data.length);
         } catch (HyracksDataException e) {
-            stream.abort();
+            writer.abort();
             aborted = true;
         } finally {
-            if (stream != null && !aborted) {
-                stream.finish();
-                stream.close();
+            if (writer != null && !aborted) {
+                writer.finish();
             }
         }
 
diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/GCSExternalFileWriterFactory.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/GCSExternalFileWriterFactory.java
index ca93ff6fa4..32458695cf 100644
--- 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/GCSExternalFileWriterFactory.java
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/GCSExternalFileWriterFactory.java
@@ -18,22 +18,12 @@
  */
 package org.apache.asterix.cloud.writer;
 
-import static 
org.apache.asterix.cloud.writer.AbstractCloudExternalFileWriter.isExceedingMaxLength;
-import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;
-
 import java.io.IOException;
-import java.util.Collections;
-import java.util.Map;
-import java.util.Random;
 
-import org.apache.asterix.cloud.CloudResettableInputStream;
-import org.apache.asterix.cloud.WriterSingleBufferProvider;
-import org.apache.asterix.cloud.clients.ICloudBufferedWriter;
 import org.apache.asterix.cloud.clients.ICloudClient;
 import org.apache.asterix.cloud.clients.google.gcs.GCSClientConfig;
 import org.apache.asterix.cloud.clients.google.gcs.GCSCloudClient;
 import org.apache.asterix.common.exceptions.CompilationException;
-import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.google.gcs.GCSUtils;
 import org.apache.asterix.runtime.writer.ExternalFileWriterConfiguration;
@@ -42,22 +32,15 @@ import 
org.apache.asterix.runtime.writer.IExternalFileWriterFactory;
 import org.apache.asterix.runtime.writer.IExternalFileWriterFactoryProvider;
 import org.apache.asterix.runtime.writer.IExternalPrinter;
 import org.apache.asterix.runtime.writer.IExternalPrinterFactory;
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.exceptions.IWarningCollector;
-import org.apache.hyracks.api.exceptions.SourceLocation;
-import org.apache.hyracks.api.util.ExceptionUtils;
-import org.apache.hyracks.data.std.primitive.LongPointable;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
 
 import com.google.cloud.BaseServiceException;
 import com.google.cloud.storage.StorageException;
 
-public final class GCSExternalFileWriterFactory implements 
IExternalFileWriterFactory {
+public final class GCSExternalFileWriterFactory extends 
AbstractCloudExternalFileWriterFactory {
     private static final long serialVersionUID = 1L;
-    private static final Logger LOGGER = LogManager.getLogger();
     static final char SEPARATOR = '/';
     public static final IExternalFileWriterFactoryProvider PROVIDER = new 
IExternalFileWriterFactoryProvider() {
         @Override
@@ -70,18 +53,28 @@ public final class GCSExternalFileWriterFactory implements 
IExternalFileWriterFa
             return SEPARATOR;
         }
     };
-    private final Map<String, String> configuration;
-    private final SourceLocation pathSourceLocation;
-    private final String staticPath;
-    private transient GCSCloudClient cloudClient;
 
     private GCSExternalFileWriterFactory(ExternalFileWriterConfiguration 
externalConfig) {
-        configuration = externalConfig.getConfiguration();
-        pathSourceLocation = externalConfig.getPathSourceLocation();
-        staticPath = externalConfig.getStaticPath();
+        super(externalConfig);
         cloudClient = null;
     }
 
+    @Override
+    ICloudClient createCloudClient() throws CompilationException {
+        GCSClientConfig config = GCSClientConfig.of(configuration);
+        return new GCSCloudClient(config, GCSUtils.buildClient(configuration));
+    }
+
+    @Override
+    boolean isNoContainerFoundException(IOException e) {
+        return e.getCause() instanceof StorageException;
+    }
+
+    @Override
+    boolean isSdkException(Throwable e) {
+        return e instanceof BaseServiceException;
+    }
+
     @Override
     public IExternalFileWriter createWriter(IHyracksTaskContext context, 
IExternalPrinterFactory printerFactory)
             throws HyracksDataException {
@@ -93,103 +86,8 @@ public final class GCSExternalFileWriterFactory implements 
IExternalFileWriterFa
                 pathSourceLocation);
     }
 
-    private void buildClient() throws HyracksDataException {
-        try {
-            synchronized (this) {
-                if (cloudClient == null) {
-                    GCSClientConfig config = GCSClientConfig.of(configuration);
-                    cloudClient = new GCSCloudClient(config, 
GCSUtils.buildClient(configuration));
-                }
-            }
-        } catch (CompilationException e) {
-            throw HyracksDataException.create(e);
-        }
-    }
-
     @Override
     public char getSeparator() {
         return SEPARATOR;
     }
-
-    @Override
-    public void validate() throws AlgebricksException {
-        GCSClientConfig config = GCSClientConfig.of(configuration);
-        ICloudClient testClient = new GCSCloudClient(config, 
GCSUtils.buildClient(configuration));
-        String bucket = 
configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
-
-        if (bucket == null || bucket.isEmpty()) {
-            throw new CompilationException(ErrorCode.PARAMETERS_REQUIRED,
-                    ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
-        }
-
-        try {
-            doValidate(testClient, bucket);
-        } catch (IOException e) {
-            if (e.getCause() instanceof StorageException) {
-                throw 
CompilationException.create(ErrorCode.EXTERNAL_SOURCE_CONTAINER_NOT_FOUND, 
bucket);
-            } else {
-                throw 
CompilationException.create(ErrorCode.EXTERNAL_SINK_ERROR,
-                        ExceptionUtils.getMessageOrToString(e));
-            }
-        } catch (BaseServiceException e) {
-            throw CompilationException.create(ErrorCode.EXTERNAL_SINK_ERROR, 
e, getMessageOrToString(e));
-        }
-    }
-
-    private void doValidate(ICloudClient testClient, String bucket) throws 
IOException, AlgebricksException {
-        if (staticPath != null) {
-            if (isExceedingMaxLength(staticPath, 
GCSExternalFileWriter.MAX_LENGTH_IN_BYTES)) {
-                throw new 
CompilationException(ErrorCode.WRITE_PATH_LENGTH_EXCEEDS_MAX_LENGTH, 
pathSourceLocation,
-                        staticPath, GCSExternalFileWriter.MAX_LENGTH_IN_BYTES,
-                        ExternalDataConstants.KEY_ADAPTER_NAME_GCS);
-            }
-
-            if (!testClient.isEmptyPrefix(bucket, staticPath)) {
-                throw new 
CompilationException(ErrorCode.DIRECTORY_IS_NOT_EMPTY, pathSourceLocation, 
staticPath);
-            }
-        }
-
-        String validateWritePermissions = configuration
-                
.getOrDefault(ExternalDataConstants.KEY_VALIDATE_WRITE_PERMISSION, 
Boolean.TRUE.toString());
-        if (!Boolean.parseBoolean(validateWritePermissions)) {
-            return;
-        }
-
-        Random random = new Random();
-        String pathPrefix = "testFile";
-        String path = pathPrefix + random.nextInt();
-        while (testClient.exists(bucket, path)) {
-            path = pathPrefix + random.nextInt();
-        }
-
-        long writeValue = random.nextLong();
-        byte[] data = new byte[Long.BYTES];
-        LongPointable.setLong(data, 0, writeValue);
-        ICloudBufferedWriter writer = testClient.createBufferedWriter(bucket, 
path);
-        CloudResettableInputStream stream = null;
-        boolean aborted = false;
-        try {
-            stream = new CloudResettableInputStream(writer, new 
WriterSingleBufferProvider());
-            stream.write(data, 0, data.length);
-        } catch (HyracksDataException e) {
-            stream.abort();
-            aborted = true;
-        } finally {
-            if (stream != null && !aborted) {
-                stream.finish();
-                stream.close();
-            }
-        }
-
-        try {
-            long readValue = 
LongPointable.getLong(testClient.readAllBytes(bucket, path), 0);
-            if (writeValue != readValue) {
-                LOGGER.warn(
-                        "The writer can write but the written values wasn't 
successfully read back (wrote: {}, read:{})",
-                        writeValue, readValue);
-            }
-        } finally {
-            testClient.deleteObjects(bucket, Collections.singleton(path));
-        }
-    }
 }
diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/S3ExternalFileWriterFactory.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/S3ExternalFileWriterFactory.java
index cdaa6dcf80..96aa929b15 100644
--- 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/S3ExternalFileWriterFactory.java
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/S3ExternalFileWriterFactory.java
@@ -18,22 +18,12 @@
  */
 package org.apache.asterix.cloud.writer;
 
-import static org.apache.asterix.cloud.writer.AbstractCloudExternalFileWriter.isExceedingMaxLength;
-import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;
-
 import java.io.IOException;
-import java.util.Collections;
-import java.util.Map;
-import java.util.Random;
 
-import org.apache.asterix.cloud.CloudResettableInputStream;
-import org.apache.asterix.cloud.WriterSingleBufferProvider;
-import org.apache.asterix.cloud.clients.ICloudBufferedWriter;
 import org.apache.asterix.cloud.clients.ICloudClient;
 import org.apache.asterix.cloud.clients.aws.s3.S3ClientConfig;
 import org.apache.asterix.cloud.clients.aws.s3.S3CloudClient;
 import org.apache.asterix.common.exceptions.CompilationException;
-import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.aws.s3.S3Utils;
 import org.apache.asterix.runtime.writer.ExternalFileWriterConfiguration;
@@ -42,22 +32,15 @@ import org.apache.asterix.runtime.writer.IExternalFileWriterFactory;
 import org.apache.asterix.runtime.writer.IExternalFileWriterFactoryProvider;
 import org.apache.asterix.runtime.writer.IExternalPrinter;
 import org.apache.asterix.runtime.writer.IExternalPrinterFactory;
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.exceptions.IWarningCollector;
-import org.apache.hyracks.api.exceptions.SourceLocation;
-import org.apache.hyracks.api.util.ExceptionUtils;
-import org.apache.hyracks.data.std.primitive.LongPointable;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
 
 import software.amazon.awssdk.core.exception.SdkException;
 import software.amazon.awssdk.services.s3.model.NoSuchBucketException;
 
-public final class S3ExternalFileWriterFactory implements IExternalFileWriterFactory {
+public final class S3ExternalFileWriterFactory extends AbstractCloudExternalFileWriterFactory {
     private static final long serialVersionUID = 4551318140901866805L;
-    private static final Logger LOGGER = LogManager.getLogger();
     static final char SEPARATOR = '/';
     public static final IExternalFileWriterFactoryProvider PROVIDER = new IExternalFileWriterFactoryProvider() {
         @Override
@@ -70,18 +53,28 @@ public final class S3ExternalFileWriterFactory implements IExternalFileWriterFac
             return SEPARATOR;
         }
     };
-    private final Map<String, String> configuration;
-    private final SourceLocation pathSourceLocation;
-    private final String staticPath;
-    private transient S3CloudClient cloudClient;
 
     private S3ExternalFileWriterFactory(ExternalFileWriterConfiguration externalConfig) {
-        configuration = externalConfig.getConfiguration();
-        pathSourceLocation = externalConfig.getPathSourceLocation();
-        staticPath = externalConfig.getStaticPath();
+        super(externalConfig);
         cloudClient = null;
     }
 
+    @Override
+    ICloudClient createCloudClient() throws CompilationException {
+        S3ClientConfig config = S3ClientConfig.of(configuration);
+        return new S3CloudClient(config, S3Utils.buildAwsS3Client(configuration));
+    }
+
+    @Override
+    boolean isNoContainerFoundException(IOException e) {
+        return e.getCause() instanceof NoSuchBucketException;
+    }
+
+    @Override
+    boolean isSdkException(Throwable e) {
+        return e instanceof SdkException;
+    }
+
     @Override
     public IExternalFileWriter createWriter(IHyracksTaskContext context, IExternalPrinterFactory printerFactory)
             throws HyracksDataException {
@@ -93,108 +86,8 @@ public final class S3ExternalFileWriterFactory implements IExternalFileWriterFac
                 pathSourceLocation);
     }
 
-    private void buildClient() throws HyracksDataException {
-        try {
-            synchronized (this) {
-                if (cloudClient == null) {
-                    // only a single client should be built
-                    S3ClientConfig config = S3ClientConfig.of(configuration);
-                    cloudClient = new S3CloudClient(config, S3Utils.buildAwsS3Client(configuration));
-                }
-            }
-        } catch (CompilationException e) {
-            throw HyracksDataException.create(e);
-        }
-    }
-
     @Override
     public char getSeparator() {
         return SEPARATOR;
     }
-
-    @Override
-    public void validate() throws AlgebricksException {
-        S3ClientConfig config = S3ClientConfig.of(configuration);
-        ICloudClient testClient = new S3CloudClient(config, S3Utils.buildAwsS3Client(configuration));
-        String bucket = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
-
-        if (bucket == null || bucket.isEmpty()) {
-            throw new CompilationException(ErrorCode.PARAMETERS_REQUIRED,
-                    ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
-        }
-
-        try {
-            doValidate(testClient, bucket);
-        } catch (IOException e) {
-            if (e.getCause() instanceof NoSuchBucketException) {
-                throw CompilationException.create(ErrorCode.EXTERNAL_SOURCE_CONTAINER_NOT_FOUND, bucket);
-            } else {
-                throw CompilationException.create(ErrorCode.EXTERNAL_SINK_ERROR,
-                        ExceptionUtils.getMessageOrToString(e));
-            }
-        } catch (SdkException e) {
-            throw CompilationException.create(ErrorCode.EXTERNAL_SINK_ERROR, e, getMessageOrToString(e));
-        }
-    }
-
-    private void doValidate(ICloudClient testClient, String bucket) throws IOException, AlgebricksException {
-        if (staticPath != null) {
-            if (isExceedingMaxLength(staticPath, S3ExternalFileWriter.MAX_LENGTH_IN_BYTES)) {
-                throw new CompilationException(ErrorCode.WRITE_PATH_LENGTH_EXCEEDS_MAX_LENGTH, pathSourceLocation,
-                        staticPath, S3ExternalFileWriter.MAX_LENGTH_IN_BYTES,
-                        ExternalDataConstants.KEY_ADAPTER_NAME_AWS_S3);
-            }
-
-            if (!testClient.isEmptyPrefix(bucket, staticPath)) {
-                // Ensure that the static path is empty
-                throw new CompilationException(ErrorCode.DIRECTORY_IS_NOT_EMPTY, pathSourceLocation, staticPath);
-            }
-        }
-
-        // do not validate write permissions if specified by the user not to do so
-        String validateWritePermissions = configuration
-                .getOrDefault(ExternalDataConstants.KEY_VALIDATE_WRITE_PERMISSION, Boolean.TRUE.toString());
-        if (!Boolean.parseBoolean(validateWritePermissions)) {
-            return;
-        }
-
-        Random random = new Random();
-        String pathPrefix = "testFile";
-        String path = pathPrefix + random.nextInt();
-        while (testClient.exists(bucket, path)) {
-            path = pathPrefix + random.nextInt();
-        }
-
-        long writeValue = random.nextLong();
-        byte[] data = new byte[Long.BYTES];
-        LongPointable.setLong(data, 0, writeValue);
-        ICloudBufferedWriter writer = testClient.createBufferedWriter(bucket, path);
-        CloudResettableInputStream stream = null;
-        boolean aborted = false;
-        try {
-            stream = new CloudResettableInputStream(writer, new WriterSingleBufferProvider());
-            stream.write(data, 0, data.length);
-        } catch (HyracksDataException e) {
-            stream.abort();
-            aborted = true;
-        } finally {
-            if (stream != null && !aborted) {
-                stream.finish();
-                stream.close();
-            }
-        }
-
-        try {
-            long readValue = LongPointable.getLong(testClient.readAllBytes(bucket, path), 0);
-            if (writeValue != readValue) {
-                // This should never happen unless S3 is messed up. But log for sanity check
-                LOGGER.warn(
-                        "The writer can write but the written values wasn't successfully read back (wrote: {}, read:{})",
-                        writeValue, readValue);
-            }
-        } finally {
-            // Delete the written file
-            testClient.deleteObjects(bucket, Collections.singleton(path));
-        }
-    }
 }
diff --git a/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/LSMTest.java b/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/AbstractLSMTest.java
similarity index 83%
rename from asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/LSMTest.java
rename to asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/AbstractLSMTest.java
index 92d7f12ba8..484f372e27 100644
--- a/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/LSMTest.java
+++ b/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/AbstractLSMTest.java
@@ -23,8 +23,8 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Collections;
 
-import org.apache.asterix.cloud.clients.ICloudBufferedWriter;
 import org.apache.asterix.cloud.clients.ICloudClient;
+import org.apache.asterix.cloud.clients.ICloudWriter;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.junit.FixMethodOrder;
@@ -32,7 +32,7 @@ import org.junit.Test;
 import org.junit.runners.MethodSorters;
 
 @FixMethodOrder(MethodSorters.NAME_ASCENDING)
-public abstract class LSMTest {
+public abstract class AbstractLSMTest {
     public static final Logger LOGGER = LogManager.getLogger();
 
     public static final String BTREE_SUFFIX = "b";
@@ -53,28 +53,23 @@ public abstract class LSMTest {
 
     @Test
     public void a1writeToS3Test() throws IOException {
-        CloudResettableInputStream stream = null;
+        IWriteBufferProvider bufferProvider = new WriterSingleBufferProvider(CLOUD_CLIENT.getWriteBufferSize());
+        ICloudWriter cloudWriter =
+                CLOUD_CLIENT.createdWriter(PLAYGROUND_CONTAINER, BUCKET_STORAGE_ROOT + "/0_b", bufferProvider);
 
         try {
-            ICloudBufferedWriter s3BufferedWriter =
-                    CLOUD_CLIENT.createBufferedWriter(PLAYGROUND_CONTAINER, BUCKET_STORAGE_ROOT + "/0_b");
-            stream = new CloudResettableInputStream(s3BufferedWriter, new WriteBufferProvider(1));
             ByteBuffer content = createContent(BUFFER_SIZE);
             int size = 0;
             for (int i = 0; i < 10; i++) {
                 content.clear();
-                size += stream.write(content);
+                size += cloudWriter.write(content);
             }
-            stream.finish();
+            cloudWriter.finish();
             System.err.println(size);
         } catch (Exception e) {
             e.printStackTrace();
-            if (stream != null) {
-                stream.abort();
-            }
-        } finally {
-            if (stream != null) {
-                stream.close();
+            if (cloudWriter != null) {
+                cloudWriter.abort();
             }
         }
     }
diff --git a/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/gcs/LSMGCSTest.java b/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/gcs/LSMGCSTest.java
index 87a3e2988f..86bb5ad714 100644
--- a/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/gcs/LSMGCSTest.java
+++ b/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/gcs/LSMGCSTest.java
@@ -18,7 +18,7 @@
  */
 package org.apache.asterix.cloud.gcs;
 
-import org.apache.asterix.cloud.LSMTest;
+import org.apache.asterix.cloud.AbstractLSMTest;
 import org.apache.asterix.cloud.clients.google.gcs.GCSClientConfig;
 import org.apache.asterix.cloud.clients.google.gcs.GCSCloudClient;
 import org.junit.AfterClass;
@@ -31,7 +31,7 @@ import com.google.cloud.storage.Storage;
 import com.google.cloud.storage.StorageClass;
 import com.google.cloud.storage.StorageOptions;
 
-public class LSMGCSTest extends LSMTest {
+public class LSMGCSTest extends AbstractLSMTest {
     private static Storage client;
     private static final int MOCK_SERVER_PORT = 4443;
     private static final String MOCK_SERVER_HOSTNAME = "http://127.0.0.1:" + MOCK_SERVER_PORT;
diff --git a/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/s3/LSMS3Test.java b/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/s3/LSMS3Test.java
index 0452da0b05..c785e57e8a 100644
--- a/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/s3/LSMS3Test.java
+++ b/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/s3/LSMS3Test.java
@@ -20,7 +20,7 @@ package org.apache.asterix.cloud.s3;
 
 import java.net.URI;
 
-import org.apache.asterix.cloud.LSMTest;
+import org.apache.asterix.cloud.AbstractLSMTest;
 import org.apache.asterix.cloud.clients.aws.s3.S3ClientConfig;
 import org.apache.asterix.cloud.clients.aws.s3.S3CloudClient;
 import org.junit.AfterClass;
@@ -34,7 +34,7 @@ import software.amazon.awssdk.services.s3.S3ClientBuilder;
 import software.amazon.awssdk.services.s3.model.CreateBucketRequest;
 import software.amazon.awssdk.services.s3.model.DeleteBucketRequest;
 
-public class LSMS3Test extends LSMTest {
+public class LSMS3Test extends AbstractLSMTest {
 
     private static S3Client client;
     private static S3Mock s3MockServer;

Reply via email to