This is an automated email from the ASF dual-hosted git repository.
wyk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git
The following commit(s) were added to refs/heads/master by this push:
new 7c5d0a8128 [ASTERIXDB-3393][STO] Refactor Cloud writer
7c5d0a8128 is described below
commit 7c5d0a8128353c00dc26d5c805eef67c8f060d7c
Author: Wail Alkowaileet <[email protected]>
AuthorDate: Mon May 6 12:44:15 2024 -0700
[ASTERIXDB-3393][STO] Refactor Cloud writer
- user model changes: no
- storage format changes: no
- interface changes: yes
Details:
Introduce ICloudWriter, which abstracts the write
operations, in preparation for other cloud providers.
Change-Id: I52124696b50d6dcc1f3c65b7e0fe251df1579ac5
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18259
Integration-Tests: Jenkins <[email protected]>
Reviewed-by: Murtadha Hubail <[email protected]>
Tested-by: Ian Maxon <[email protected]>
---
.../asterix/cloud/AbstractCloudIOManager.java | 31 ++---
.../org/apache/asterix/cloud/CloudFileHandle.java | 21 +--
.../apache/asterix/cloud/CloudOutputStream.java | 16 ++-
.../asterix/cloud/CloudResettableInputStream.java | 80 ++++++++----
.../apache/asterix/cloud/EagerCloudIOManager.java | 2 +-
.../apache/asterix/cloud/LazyCloudIOManager.java | 5 +-
.../apache/asterix/cloud/WriteBufferProvider.java | 12 +-
.../asterix/cloud/WriterSingleBufferProvider.java | 9 +-
.../asterix/cloud/clients/CloudClientProvider.java | 6 +-
.../apache/asterix/cloud/clients/ICloudClient.java | 14 +-
.../apache/asterix/cloud/clients/ICloudWriter.java | 74 +++++++++++
.../cloud/clients/aws/s3/S3ClientConfig.java | 2 +
.../cloud/clients/aws/s3/S3CloudClient.java | 15 ++-
.../cloud/clients/google/gcs/GCSClientConfig.java | 2 +
.../cloud/clients/google/gcs/GCSCloudClient.java | 31 +++--
.../gcs/{GCSBufferedWriter.java => GCSWriter.java} | 42 +++---
.../asterix/cloud/lazy/accessor/ILazyAccessor.java | 4 +-
.../asterix/cloud/lazy/accessor/LocalAccessor.java | 4 +-
.../lazy/accessor/ReplaceableCloudAccessor.java | 4 +-
.../writer/AbstractCloudExternalFileWriter.java | 17 +--
...=> AbstractCloudExternalFileWriterFactory.java} | 96 +++++---------
.../cloud/writer/GCSExternalFileWriterFactory.java | 138 +++-----------------
.../cloud/writer/S3ExternalFileWriterFactory.java | 143 +++------------------
.../cloud/{LSMTest.java => AbstractLSMTest.java} | 23 ++--
.../org/apache/asterix/cloud/gcs/LSMGCSTest.java | 4 +-
.../org/apache/asterix/cloud/s3/LSMS3Test.java | 4 +-
26 files changed, 329 insertions(+), 470 deletions(-)
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
index 368be26316..7ab8a5a99d 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
@@ -34,6 +34,7 @@ import org.apache.asterix.cloud.bulk.DeleteBulkCloudOperation;
import org.apache.asterix.cloud.bulk.NoOpDeleteBulkCallBack;
import org.apache.asterix.cloud.clients.CloudClientProvider;
import org.apache.asterix.cloud.clients.ICloudClient;
+import org.apache.asterix.cloud.clients.ICloudWriter;
import org.apache.asterix.cloud.util.CloudFileUtil;
import org.apache.asterix.common.api.INamespacePathResolver;
import org.apache.asterix.common.cloud.IPartitionBootstrapper;
@@ -71,7 +72,7 @@ public abstract class AbstractCloudIOManager extends
IOManager implements IParti
this.bucket = cloudProperties.getStorageBucket();
cloudClient = CloudClientProvider.getClient(cloudProperties);
int numOfThreads = getIODevices().size() * getIOParallelism();
- writeBufferProvider = new WriteBufferProvider(numOfThreads);
+ writeBufferProvider = new WriteBufferProvider(numOfThreads,
cloudClient.getWriteBufferSize());
partitions = new HashSet<>();
partitionPaths = new ArrayList<>();
this.localIoManager = ioManager;
@@ -165,8 +166,9 @@ public abstract class AbstractCloudIOManager extends
IOManager implements IParti
@Override
public final IFileHandle open(FileReference fileRef, FileReadWriteMode
rwMode, FileSyncMode syncMode)
throws HyracksDataException {
- CloudFileHandle fHandle = new CloudFileHandle(cloudClient, bucket,
fileRef, writeBufferProvider);
- onOpen(fHandle, rwMode, syncMode);
+ ICloudWriter cloudWriter = cloudClient.createdWriter(bucket,
fileRef.getRelativePath(), writeBufferProvider);
+ CloudFileHandle fHandle = new CloudFileHandle(fileRef, cloudWriter);
+ onOpen(fHandle);
try {
fHandle.open(rwMode, syncMode);
} catch (IOException e) {
@@ -180,18 +182,17 @@ public abstract class AbstractCloudIOManager extends
IOManager implements IParti
*
* @param fileHandle file to open
*/
- protected abstract void onOpen(CloudFileHandle fileHandle,
FileReadWriteMode rwMode, FileSyncMode syncMode)
- throws HyracksDataException;
+ protected abstract void onOpen(CloudFileHandle fileHandle) throws
HyracksDataException;
@Override
public final long doSyncWrite(IFileHandle fHandle, long offset,
ByteBuffer[] dataArray)
throws HyracksDataException {
long writtenBytes = localIoManager.doSyncWrite(fHandle, offset,
dataArray);
- CloudResettableInputStream inputStream = ((CloudFileHandle)
fHandle).getInputStream();
+ ICloudWriter cloudWriter = ((CloudFileHandle)
fHandle).getCloudWriter();
try {
- inputStream.write(dataArray[0], dataArray[1]);
+ cloudWriter.write(dataArray[0], dataArray[1]);
} catch (HyracksDataException e) {
- inputStream.abort();
+ cloudWriter.abort();
throw e;
}
return writtenBytes;
@@ -200,11 +201,11 @@ public abstract class AbstractCloudIOManager extends
IOManager implements IParti
@Override
public final int doSyncWrite(IFileHandle fHandle, long offset, ByteBuffer
dataArray) throws HyracksDataException {
int writtenBytes = localIoManager.doSyncWrite(fHandle, offset,
dataArray);
- CloudResettableInputStream inputStream = ((CloudFileHandle)
fHandle).getInputStream();
+ ICloudWriter cloudWriter = ((CloudFileHandle)
fHandle).getCloudWriter();
try {
- inputStream.write(dataArray);
+ cloudWriter.write(dataArray);
} catch (HyracksDataException e) {
- inputStream.abort();
+ cloudWriter.abort();
throw e;
}
return writtenBytes;
@@ -231,16 +232,16 @@ public abstract class AbstractCloudIOManager extends
IOManager implements IParti
if (metadata) {
// only finish writing if metadata == true to prevent write
limiter from finishing the stream and
// completing the upload.
- CloudResettableInputStream stream = ((CloudFileHandle)
fileHandle).getInputStream();
+ ICloudWriter cloudWriter = ((CloudFileHandle)
fileHandle).getCloudWriter();
try {
- stream.finish();
+ cloudWriter.finish();
} catch (HyracksDataException e) {
savedEx = e;
}
if (savedEx != null) {
try {
- stream.abort();
+ cloudWriter.abort();
} catch (HyracksDataException e) {
savedEx.addSuppressed(e);
}
@@ -286,7 +287,7 @@ public abstract class AbstractCloudIOManager extends
IOManager implements IParti
/**
* Writes the bytes to the specified key in the bucket
*
- * @param key the key where the bytes will be written
+ * @param key the key where the bytes will be written
* @param bytes the bytes to write
*/
public final void put(String key, byte[] bytes) {
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudFileHandle.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudFileHandle.java
index 14c44ad5ad..0ae93cfca0 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudFileHandle.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudFileHandle.java
@@ -20,20 +20,17 @@ package org.apache.asterix.cloud;
import java.io.IOException;
-import org.apache.asterix.cloud.clients.ICloudBufferedWriter;
-import org.apache.asterix.cloud.clients.ICloudClient;
+import org.apache.asterix.cloud.clients.ICloudWriter;
import org.apache.hyracks.api.io.FileReference;
import org.apache.hyracks.api.io.IIOManager;
import org.apache.hyracks.control.nc.io.FileHandle;
public class CloudFileHandle extends FileHandle {
- private final CloudResettableInputStream inputStream;
+ private final ICloudWriter cloudWriter;
- public CloudFileHandle(ICloudClient cloudClient, String bucket,
FileReference fileRef,
- IWriteBufferProvider bufferProvider) {
+ public CloudFileHandle(FileReference fileRef, ICloudWriter cloudWriter) {
super(fileRef);
- ICloudBufferedWriter bufferedWriter =
cloudClient.createBufferedWriter(bucket, fileRef.getRelativePath());
- inputStream = new CloudResettableInputStream(bufferedWriter,
bufferProvider);
+ this.cloudWriter = cloudWriter;
}
@Override
@@ -43,13 +40,7 @@ public class CloudFileHandle extends FileHandle {
}
}
- @Override
- public synchronized void close() throws IOException {
- inputStream.close();
- super.close();
- }
-
- public CloudResettableInputStream getInputStream() {
- return inputStream;
+ public ICloudWriter getCloudWriter() {
+ return cloudWriter;
}
}
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudOutputStream.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudOutputStream.java
index 349b1b1baf..bea91fb60e 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudOutputStream.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudOutputStream.java
@@ -21,29 +21,31 @@ package org.apache.asterix.cloud;
import java.io.IOException;
import java.io.OutputStream;
+import org.apache.asterix.cloud.clients.ICloudWriter;
+
public final class CloudOutputStream extends OutputStream {
- private final CloudResettableInputStream inputStream;
+ private final ICloudWriter cloudWriter;
- public CloudOutputStream(CloudResettableInputStream inputStream) {
- this.inputStream = inputStream;
+ public CloudOutputStream(ICloudWriter cloudWriter) {
+ this.cloudWriter = cloudWriter;
}
@Override
public void write(byte[] b, int off, int len) throws IOException {
- inputStream.write(b, off, len);
+ cloudWriter.write(b, off, len);
}
@Override
public void write(int b) throws IOException {
- inputStream.write(b);
+ cloudWriter.write(b);
}
@Override
public void close() throws IOException {
- inputStream.finish();
+ cloudWriter.finish();
}
public void abort() throws IOException {
- inputStream.abort();
+ cloudWriter.abort();
}
}
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudResettableInputStream.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudResettableInputStream.java
index 053318468a..885d612727 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudResettableInputStream.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudResettableInputStream.java
@@ -23,14 +23,13 @@ import java.io.InputStream;
import java.nio.ByteBuffer;
import org.apache.asterix.cloud.clients.ICloudBufferedWriter;
+import org.apache.asterix.cloud.clients.ICloudWriter;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
-public class CloudResettableInputStream extends InputStream {
+public class CloudResettableInputStream extends InputStream implements
ICloudWriter {
private static final Logger LOGGER = LogManager.getLogger();
- // TODO: make configurable
- public static final int MIN_BUFFER_SIZE = 5 * 1024 * 1024;
private final IWriteBufferProvider bufferProvider;
private ByteBuffer writeBuffer;
@@ -41,12 +40,10 @@ public class CloudResettableInputStream extends InputStream
{
this.bufferProvider = bufferProvider;
}
- private void open() {
- if (writeBuffer == null) {
- writeBuffer = bufferProvider.getBuffer();
- writeBuffer.clear();
- }
- }
+ /* ************************************************************
+ * InputStream methods
+ * ************************************************************
+ */
@Override
public void reset() {
@@ -63,16 +60,23 @@ public class CloudResettableInputStream extends InputStream
{
writeBuffer.mark();
}
- public void write(ByteBuffer header, ByteBuffer page) throws
HyracksDataException {
- write(header);
- write(page);
+ /* ************************************************************
+ * ICloudWriter methods
+ * ************************************************************
+ */
+
+ @Override
+ public int write(ByteBuffer header, ByteBuffer page) throws
HyracksDataException {
+ return write(header) + write(page);
}
+ @Override
public int write(ByteBuffer page) throws HyracksDataException {
open();
return write(page.array(), 0, page.limit());
}
+ @Override
public void write(int b) throws HyracksDataException {
if (writeBuffer.remaining() == 0) {
uploadAndWait();
@@ -80,6 +84,7 @@ public class CloudResettableInputStream extends InputStream {
writeBuffer.put((byte) b);
}
+ @Override
public int write(byte[] b, int off, int len) throws HyracksDataException {
open();
@@ -108,6 +113,23 @@ public class CloudResettableInputStream extends
InputStream {
return len;
}
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
+ if (writeBuffer.remaining() == 0) {
+ return -1;
+ }
+
+ int length = Math.min(len, writeBuffer.remaining());
+ writeBuffer.get(b, off, length);
+ return length;
+ }
+
+ @Override
+ public int read() throws IOException {
+ return writeBuffer.get();
+ }
+
+ @Override
public void finish() throws HyracksDataException {
open();
try {
@@ -124,14 +146,32 @@ public class CloudResettableInputStream extends
InputStream {
} finally {
returnBuffer();
}
+ doClose();
}
+ @Override
public void abort() throws HyracksDataException {
try {
bufferedWriter.abort();
} finally {
returnBuffer();
}
+ doClose();
+ }
+
+ private void open() {
+ if (writeBuffer == null) {
+ writeBuffer = bufferProvider.getBuffer();
+ writeBuffer.clear();
+ }
+ }
+
+ private void doClose() throws HyracksDataException {
+ try {
+ close();
+ } catch (IOException e) {
+ throw HyracksDataException.create(e);
+ }
}
private void uploadAndWait() throws HyracksDataException {
@@ -146,22 +186,6 @@ public class CloudResettableInputStream extends
InputStream {
writeBuffer.clear();
}
- @Override
- public int read(byte[] b, int off, int len) throws IOException {
- if (writeBuffer.remaining() == 0) {
- return -1;
- }
-
- int length = Math.min(len, writeBuffer.remaining());
- writeBuffer.get(b, off, length);
- return length;
- }
-
- @Override
- public int read() throws IOException {
- return writeBuffer.get();
- }
-
private void returnBuffer() {
if (writeBuffer != null) {
bufferProvider.recycle(writeBuffer);
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/EagerCloudIOManager.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/EagerCloudIOManager.java
index d0b982c834..0b4200cef0 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/EagerCloudIOManager.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/EagerCloudIOManager.java
@@ -65,7 +65,7 @@ final class EagerCloudIOManager extends
AbstractCloudIOManager {
}
@Override
- protected void onOpen(CloudFileHandle fileHandle, FileReadWriteMode
rwMode, FileSyncMode syncMode) {
+ protected void onOpen(CloudFileHandle fileHandle) {
// NoOp
}
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java
index 6ecd201425..cb47d00a88 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java
@@ -136,9 +136,8 @@ final class LazyCloudIOManager extends
AbstractCloudIOManager {
}
@Override
- protected void onOpen(CloudFileHandle fileHandle, FileReadWriteMode
rwMode, FileSyncMode syncMode)
- throws HyracksDataException {
- accessor.doOnOpen(fileHandle, rwMode, syncMode);
+ protected void onOpen(CloudFileHandle fileHandle) throws
HyracksDataException {
+ accessor.doOnOpen(fileHandle);
}
/*
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/WriteBufferProvider.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/WriteBufferProvider.java
index ee174000e6..d1eb1eef37 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/WriteBufferProvider.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/WriteBufferProvider.java
@@ -18,8 +18,6 @@
*/
package org.apache.asterix.cloud;
-import static
org.apache.asterix.cloud.CloudResettableInputStream.MIN_BUFFER_SIZE;
-
import java.nio.ByteBuffer;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
@@ -27,11 +25,13 @@ import java.util.concurrent.BlockingQueue;
import org.apache.hyracks.util.annotations.ThreadSafe;
@ThreadSafe
-public class WriteBufferProvider implements IWriteBufferProvider {
+public final class WriteBufferProvider implements IWriteBufferProvider {
+ private final int bufferSize;
private final BlockingQueue<ByteBuffer> writeBuffers;
- public WriteBufferProvider(int ioParallelism) {
- writeBuffers = new ArrayBlockingQueue<>(ioParallelism);
+ public WriteBufferProvider(int numberOfBuffers, int bufferSize) {
+ this.bufferSize = bufferSize;
+ writeBuffers = new ArrayBlockingQueue<>(numberOfBuffers);
}
@Override
@@ -43,7 +43,7 @@ public class WriteBufferProvider implements
IWriteBufferProvider {
public ByteBuffer getBuffer() {
ByteBuffer writeBuffer = writeBuffers.poll();
if (writeBuffer == null) {
- return ByteBuffer.allocate(MIN_BUFFER_SIZE);
+ return ByteBuffer.allocate(bufferSize);
}
return writeBuffer;
}
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/WriterSingleBufferProvider.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/WriterSingleBufferProvider.java
index 287900d907..fab1cc2e57 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/WriterSingleBufferProvider.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/WriterSingleBufferProvider.java
@@ -18,19 +18,16 @@
*/
package org.apache.asterix.cloud;
-import static
org.apache.asterix.cloud.CloudResettableInputStream.MIN_BUFFER_SIZE;
-
import java.nio.ByteBuffer;
import org.apache.hyracks.util.annotations.NotThreadSafe;
@NotThreadSafe
-public class WriterSingleBufferProvider implements IWriteBufferProvider {
-
+public final class WriterSingleBufferProvider implements IWriteBufferProvider {
private final ByteBuffer buffer;
- public WriterSingleBufferProvider() {
- buffer = ByteBuffer.allocate(MIN_BUFFER_SIZE);
+ public WriterSingleBufferProvider(int size) {
+ buffer = ByteBuffer.allocate(size);
}
@Override
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/CloudClientProvider.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/CloudClientProvider.java
index cc511c7236..35ab467107 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/CloudClientProvider.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/CloudClientProvider.java
@@ -26,6 +26,8 @@ import org.apache.asterix.common.config.CloudProperties;
import org.apache.hyracks.api.exceptions.HyracksDataException;
public class CloudClientProvider {
+ private static final String S3 = "s3";
+ private static final String GCS = "gcs";
private CloudClientProvider() {
throw new AssertionError("do not instantiate");
@@ -33,10 +35,10 @@ public class CloudClientProvider {
public static ICloudClient getClient(CloudProperties cloudProperties)
throws HyracksDataException {
String storageScheme = cloudProperties.getStorageScheme();
- if ("s3".equalsIgnoreCase(storageScheme)) {
+ if (S3.equalsIgnoreCase(storageScheme)) {
S3ClientConfig config = S3ClientConfig.of(cloudProperties);
return new S3CloudClient(config);
- } else if ("gcs".equalsIgnoreCase(storageScheme)) {
+ } else if (GCS.equalsIgnoreCase(storageScheme)) {
GCSClientConfig config = GCSClientConfig.of(cloudProperties);
return new GCSCloudClient(config);
}
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudClient.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudClient.java
index 2bd08028ca..0307846cac 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudClient.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudClient.java
@@ -24,6 +24,7 @@ import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Set;
+import org.apache.asterix.cloud.IWriteBufferProvider;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileReference;
import org.apache.hyracks.control.nc.io.IOManager;
@@ -35,15 +36,20 @@ import com.fasterxml.jackson.databind.ObjectMapper;
* Interface containing methods to perform IO operation on the Cloud Storage
*/
public interface ICloudClient {
+ /**
+ * @return write buffer size
+ */
+ int getWriteBufferSize();
/**
* Creates a cloud buffered writer
*
- * @param bucket bucket to write to
- * @param path path to write to
- * @return buffered writer
+ * @param bucket bucket to write to
+ * @param path path to write to
+ * @param bufferProvider buffer provider
+ * @return cloud writer
*/
- ICloudBufferedWriter createBufferedWriter(String bucket, String path);
+ ICloudWriter createdWriter(String bucket, String path,
IWriteBufferProvider bufferProvider);
/**
* Lists objects at the specified bucket and path, and applies the file
name filter on the returned objects
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudWriter.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudWriter.java
new file mode 100644
index 0000000000..15822c4d9c
--- /dev/null
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudWriter.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.cloud.clients;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * A cloud-based writer that write bytes sequentially in a cloud blob storage
+ */
+public interface ICloudWriter {
+ /**
+ * Write a header and a page
+ *
+ * @param header to write
+ * @param page to write
+ * @return written bytes
+ */
+ int write(ByteBuffer header, ByteBuffer page) throws HyracksDataException;
+
+ /**
+ * Write a page
+ *
+ * @param page to write
+ * @return written bytes
+ */
+ int write(ByteBuffer page) throws HyracksDataException;
+
+ /**
+ * Write a byte
+ *
+ * @param b to write
+ */
+ void write(int b) throws HyracksDataException;
+
+ /**
+ * Write a byte array
+ *
+ * @param b bytes to write
+ * @param off starting offset
+ * @param len length to write
+ * @return written bytes
+ */
+ int write(byte[] b, int off, int len) throws HyracksDataException;
+
+ /**
+ * Finish the write operation
+ * Note: this should be called upon successful write
+ */
+ void finish() throws HyracksDataException;
+
+ /**
+ * Abort the write operation
+ * Note: should be called instead of {@link #finish()} when the write
operation encountered an error
+ */
+ void abort() throws HyracksDataException;
+}
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java
index bc13078a47..161fb37fc5 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java
@@ -22,12 +22,14 @@ import java.util.Map;
import org.apache.asterix.common.config.CloudProperties;
import org.apache.asterix.external.util.aws.s3.S3Constants;
+import org.apache.hyracks.util.StorageUtil;
import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
public final class S3ClientConfig {
+ static final int WRITE_BUFFER_SIZE = StorageUtil.getIntSizeInBytes(5,
StorageUtil.StorageUnit.MEGABYTE);
// The maximum number of file that can be deleted (AWS restriction)
static final int DELETE_BATCH_SIZE = 1000;
private final String region;
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
index 5cdf971c09..9c31e17000 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
@@ -35,8 +35,11 @@ import java.util.Iterator;
import java.util.List;
import java.util.Set;
+import org.apache.asterix.cloud.CloudResettableInputStream;
+import org.apache.asterix.cloud.IWriteBufferProvider;
import org.apache.asterix.cloud.clients.ICloudBufferedWriter;
import org.apache.asterix.cloud.clients.ICloudClient;
+import org.apache.asterix.cloud.clients.ICloudWriter;
import org.apache.asterix.cloud.clients.IParallelDownloader;
import org.apache.asterix.cloud.clients.profiler.CountRequestProfiler;
import org.apache.asterix.cloud.clients.profiler.IRequestProfiler;
@@ -69,7 +72,7 @@ import
software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.S3Object;
@ThreadSafe
-public class S3CloudClient implements ICloudClient {
+public final class S3CloudClient implements ICloudClient {
private final S3ClientConfig config;
private final S3Client s3Client;
private final IRequestProfiler profiler;
@@ -90,8 +93,14 @@ public class S3CloudClient implements ICloudClient {
}
@Override
- public ICloudBufferedWriter createBufferedWriter(String bucket, String
path) {
- return new S3BufferedWriter(s3Client, profiler, bucket, path);
+ public int getWriteBufferSize() {
+ return S3ClientConfig.WRITE_BUFFER_SIZE;
+ }
+
+ @Override
+ public ICloudWriter createdWriter(String bucket, String path,
IWriteBufferProvider bufferProvider) {
+ ICloudBufferedWriter bufferedWriter = new S3BufferedWriter(s3Client,
profiler, bucket, path);
+ return new CloudResettableInputStream(bufferedWriter, bufferProvider);
}
@Override
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSClientConfig.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSClientConfig.java
index e8e44805bc..4edb7a7175 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSClientConfig.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSClientConfig.java
@@ -25,12 +25,14 @@ import java.util.Map;
import org.apache.asterix.common.config.CloudProperties;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.util.StorageUtil;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.auth.oauth2.OAuth2Credentials;
import com.google.cloud.NoCredentials;
public class GCSClientConfig {
+ public static final int WRITE_BUFFER_SIZE =
StorageUtil.getIntSizeInBytes(1, StorageUtil.StorageUnit.MEGABYTE);
// The maximum number of files that can be deleted (GCS restriction):
https://cloud.google.com/storage/quotas#json-requests
static final int DELETE_BATCH_SIZE = 100;
private final String region;
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSCloudClient.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSCloudClient.java
index 2b7303d382..c725ca5089 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSCloudClient.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSCloudClient.java
@@ -32,8 +32,9 @@ import java.util.Iterator;
import java.util.List;
import java.util.Set;
-import org.apache.asterix.cloud.clients.ICloudBufferedWriter;
+import org.apache.asterix.cloud.IWriteBufferProvider;
import org.apache.asterix.cloud.clients.ICloudClient;
+import org.apache.asterix.cloud.clients.ICloudWriter;
import org.apache.asterix.cloud.clients.IParallelDownloader;
import org.apache.asterix.cloud.clients.profiler.CountRequestProfiler;
import org.apache.asterix.cloud.clients.profiler.IRequestProfiler;
@@ -60,7 +61,6 @@ import com.google.cloud.storage.StorageException;
import com.google.cloud.storage.StorageOptions;
public class GCSCloudClient implements ICloudClient {
-
private final Storage gcsClient;
private final GCSClientConfig config;
private final IRequestProfiler profiler;
@@ -80,18 +80,14 @@ public class GCSCloudClient implements ICloudClient {
this(config, buildClient(config));
}
- private static Storage buildClient(GCSClientConfig config) throws
HyracksDataException {
- StorageOptions.Builder builder =
StorageOptions.newBuilder().setCredentials(config.createCredentialsProvider());
-
- if (config.getEndpoint() != null && !config.getEndpoint().isEmpty()) {
- builder.setHost(config.getEndpoint());
- }
- return builder.build().getService();
+ @Override
+ public int getWriteBufferSize() {
+ return GCSClientConfig.WRITE_BUFFER_SIZE;
}
@Override
- public ICloudBufferedWriter createBufferedWriter(String bucket, String
path) {
- return new GCSBufferedWriter(bucket, path, gcsClient, profiler);
+ public ICloudWriter createdWriter(String bucket, String path,
IWriteBufferProvider bufferProvider) {
+ return new GCSWriter(bucket, path, gcsClient, profiler);
}
@Override
@@ -115,12 +111,10 @@ public class GCSCloudClient implements ICloudClient {
BlobId blobId = BlobId.of(bucket, path);
long readTo = offset + buffer.remaining();
int totalRead = 0;
- int read = 0;
try (ReadChannel from = gcsClient.reader(blobId).limit(readTo)) {
while (buffer.remaining() > 0) {
from.seek(offset + totalRead);
- read = from.read(buffer);
- totalRead += read;
+ totalRead += from.read(buffer);
}
} catch (IOException | StorageException ex) {
throw HyracksDataException.create(ex);
@@ -248,4 +242,13 @@ public class GCSCloudClient implements ICloudClient {
throw HyracksDataException.create(ex);
}
}
+
+ private static Storage buildClient(GCSClientConfig config) throws
HyracksDataException {
+ StorageOptions.Builder builder =
StorageOptions.newBuilder().setCredentials(config.createCredentialsProvider());
+
+ if (config.getEndpoint() != null && !config.getEndpoint().isEmpty()) {
+ builder.setHost(config.getEndpoint());
+ }
+ return builder.build().getService();
+ }
}
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSBufferedWriter.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSWriter.java
similarity index 74%
rename from
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSBufferedWriter.java
rename to
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSWriter.java
index 4f9d437f63..cccd9ec05b 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSBufferedWriter.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSWriter.java
@@ -18,13 +18,12 @@
*/
package org.apache.asterix.cloud.clients.google.gcs;
-import static
org.apache.asterix.cloud.CloudResettableInputStream.MIN_BUFFER_SIZE;
+import static
org.apache.asterix.cloud.clients.google.gcs.GCSClientConfig.WRITE_BUFFER_SIZE;
import java.io.IOException;
-import java.io.InputStream;
import java.nio.ByteBuffer;
-import org.apache.asterix.cloud.clients.ICloudBufferedWriter;
+import org.apache.asterix.cloud.clients.ICloudWriter;
import org.apache.asterix.cloud.clients.profiler.IRequestProfiler;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.logging.log4j.LogManager;
@@ -35,17 +34,16 @@ import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Storage;
-public class GCSBufferedWriter implements ICloudBufferedWriter {
+public class GCSWriter implements ICloudWriter {
private static final Logger LOGGER = LogManager.getLogger();
private final String bucket;
private final String path;
private final IRequestProfiler profiler;
private final Storage gcsClient;
private boolean uploadStarted = false;
- private int partNumber;
private WriteChannel writer = null;
- public GCSBufferedWriter(String bucket, String path, Storage gcsClient,
IRequestProfiler profiler) {
+ public GCSWriter(String bucket, String path, Storage gcsClient,
IRequestProfiler profiler) {
this.bucket = bucket;
this.path = path;
this.profiler = profiler;
@@ -53,30 +51,39 @@ public class GCSBufferedWriter implements
ICloudBufferedWriter {
}
@Override
- public int upload(InputStream stream, int length) throws
HyracksDataException {
+ public int write(ByteBuffer header, ByteBuffer page) throws
HyracksDataException {
+ return write(header) + write(page);
+ }
+
+ @Override
+ public int write(ByteBuffer page) throws HyracksDataException {
profiler.objectMultipartUpload();
setUploadId();
+ int written = 0;
try {
- ByteBuffer buffer = ByteBuffer.wrap(stream.readNBytes(length));
- while (buffer.hasRemaining()) {
- writer.write(buffer);
+ while (page.hasRemaining()) {
+ written += writer.write(page);
}
} catch (IOException e) {
throw HyracksDataException.create(e);
}
- return partNumber++;
+
+ return written;
+ }
+
+ @Override
+ public int write(byte[] b, int off, int len) throws HyracksDataException {
+ return write(ByteBuffer.wrap(b, off, len));
}
@Override
- public boolean isEmpty() {
- return !uploadStarted;
+ public void write(int b) throws HyracksDataException {
+ write(ByteBuffer.wrap(new byte[] { (byte) b }));
}
@Override
public void finish() throws HyracksDataException {
- if (!uploadStarted) {
- throw new IllegalStateException("Cannot finish without writing any
bytes");
- }
+ setUploadId();
profiler.objectMultipartUpload();
try {
writer.close();
@@ -99,9 +106,8 @@ public class GCSBufferedWriter implements
ICloudBufferedWriter {
private void setUploadId() {
if (!uploadStarted) {
uploadStarted = true;
- partNumber = 1;
writer = gcsClient.writer(BlobInfo.newBuilder(BlobId.of(bucket,
path)).build());
- writer.setChunkSize(MIN_BUFFER_SIZE);
+ writer.setChunkSize(WRITE_BUFFER_SIZE);
log("STARTED");
}
}
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/ILazyAccessor.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/ILazyAccessor.java
index 8f803a0792..534ff5ded5 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/ILazyAccessor.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/ILazyAccessor.java
@@ -25,15 +25,13 @@ import org.apache.asterix.cloud.CloudFileHandle;
import org.apache.asterix.cloud.bulk.IBulkOperationCallBack;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileReference;
-import org.apache.hyracks.api.io.IIOManager;
public interface ILazyAccessor {
boolean isLocalAccessor();
IBulkOperationCallBack getBulkOperationCallBack();
- void doOnOpen(CloudFileHandle fileHandle, IIOManager.FileReadWriteMode
rwMode, IIOManager.FileSyncMode syncMode)
- throws HyracksDataException;
+ void doOnOpen(CloudFileHandle fileHandle) throws HyracksDataException;
Set<FileReference> doList(FileReference dir, FilenameFilter filter) throws
HyracksDataException;
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/LocalAccessor.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/LocalAccessor.java
index 378cf0365e..ae324020e0 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/LocalAccessor.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/LocalAccessor.java
@@ -27,7 +27,6 @@ import org.apache.asterix.cloud.bulk.NoOpDeleteBulkCallBack;
import org.apache.asterix.cloud.clients.ICloudClient;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileReference;
-import org.apache.hyracks.api.io.IIOManager;
import org.apache.hyracks.control.nc.io.IOManager;
/**
@@ -50,8 +49,7 @@ public class LocalAccessor extends AbstractLazyAccessor {
}
@Override
- public void doOnOpen(CloudFileHandle fileHandle,
IIOManager.FileReadWriteMode rwMode,
- IIOManager.FileSyncMode syncMode) throws HyracksDataException {
+ public void doOnOpen(CloudFileHandle fileHandle) throws
HyracksDataException {
// NoOp
}
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/ReplaceableCloudAccessor.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/ReplaceableCloudAccessor.java
index 277d42563e..e4e168eb72 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/ReplaceableCloudAccessor.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/ReplaceableCloudAccessor.java
@@ -30,7 +30,6 @@ import org.apache.asterix.common.utils.StorageConstants;
import org.apache.asterix.common.utils.StoragePathUtil;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileReference;
-import org.apache.hyracks.api.io.IIOManager;
import org.apache.hyracks.control.nc.io.IOManager;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -70,8 +69,7 @@ public class ReplaceableCloudAccessor extends
AbstractLazyAccessor {
}
@Override
- public void doOnOpen(CloudFileHandle fileHandle,
IIOManager.FileReadWriteMode rwMode,
- IIOManager.FileSyncMode syncMode) throws HyracksDataException {
+ public void doOnOpen(CloudFileHandle fileHandle) throws
HyracksDataException {
FileReference fileRef = fileHandle.getFileReference();
if (!localIoManager.exists(fileRef) && cloudClient.exists(bucket,
fileRef.getRelativePath())) {
if (cacher.downloadData(fileRef)) {
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/AbstractCloudExternalFileWriter.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/AbstractCloudExternalFileWriter.java
index bbae29a974..4277800797 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/AbstractCloudExternalFileWriter.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/AbstractCloudExternalFileWriter.java
@@ -21,11 +21,10 @@ package org.apache.asterix.cloud.writer;
import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;
import org.apache.asterix.cloud.CloudOutputStream;
-import org.apache.asterix.cloud.CloudResettableInputStream;
import org.apache.asterix.cloud.IWriteBufferProvider;
import org.apache.asterix.cloud.WriterSingleBufferProvider;
-import org.apache.asterix.cloud.clients.ICloudBufferedWriter;
import org.apache.asterix.cloud.clients.ICloudClient;
+import org.apache.asterix.cloud.clients.ICloudWriter;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.exceptions.RuntimeDataException;
import org.apache.asterix.runtime.writer.IExternalFileWriter;
@@ -46,7 +45,7 @@ abstract class AbstractCloudExternalFileWriter implements
IExternalFileWriter {
private final IWarningCollector warningCollector;
private final SourceLocation pathSourceLocation;
private final IWriteBufferProvider bufferProvider;
- private ICloudBufferedWriter bufferedWriter;
+ private ICloudWriter cloudWriter;
AbstractCloudExternalFileWriter(IExternalPrinter printer, ICloudClient
cloudClient, String bucket,
boolean partitionedPath, IWarningCollector warningCollector,
SourceLocation pathSourceLocation) {
@@ -56,7 +55,7 @@ abstract class AbstractCloudExternalFileWriter implements
IExternalFileWriter {
this.partitionedPath = partitionedPath;
this.warningCollector = warningCollector;
this.pathSourceLocation = pathSourceLocation;
- bufferProvider = new WriterSingleBufferProvider();
+ bufferProvider = new
WriterSingleBufferProvider(cloudClient.getWriteBufferSize());
}
@Override
@@ -82,10 +81,8 @@ abstract class AbstractCloudExternalFileWriter implements
IExternalFileWriter {
return false;
}
- bufferedWriter = cloudClient.createBufferedWriter(bucket, fullPath);
- CloudResettableInputStream inputStream = new
CloudResettableInputStream(bufferedWriter, bufferProvider);
-
- CloudOutputStream outputStream = new CloudOutputStream(inputStream);
+ cloudWriter = cloudClient.createdWriter(bucket, fullPath,
bufferProvider);
+ CloudOutputStream outputStream = new CloudOutputStream(cloudWriter);
printer.newStream(outputStream);
return true;
@@ -108,8 +105,8 @@ abstract class AbstractCloudExternalFileWriter implements
IExternalFileWriter {
@Override
public final void abort() throws HyracksDataException {
try {
- if (bufferedWriter != null) {
- bufferedWriter.abort();
+ if (cloudWriter != null) {
+ cloudWriter.abort();
}
printer.close();
} catch (HyracksDataException e) {
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/S3ExternalFileWriterFactory.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/AbstractCloudExternalFileWriterFactory.java
similarity index 60%
copy from
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/S3ExternalFileWriterFactory.java
copy to
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/AbstractCloudExternalFileWriterFactory.java
index cdaa6dcf80..75c4ec5f61 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/S3ExternalFileWriterFactory.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/AbstractCloudExternalFileWriterFactory.java
@@ -26,80 +26,49 @@ import java.util.Collections;
import java.util.Map;
import java.util.Random;
-import org.apache.asterix.cloud.CloudResettableInputStream;
+import org.apache.asterix.cloud.IWriteBufferProvider;
import org.apache.asterix.cloud.WriterSingleBufferProvider;
-import org.apache.asterix.cloud.clients.ICloudBufferedWriter;
import org.apache.asterix.cloud.clients.ICloudClient;
-import org.apache.asterix.cloud.clients.aws.s3.S3ClientConfig;
-import org.apache.asterix.cloud.clients.aws.s3.S3CloudClient;
+import org.apache.asterix.cloud.clients.ICloudWriter;
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.external.util.ExternalDataConstants;
-import org.apache.asterix.external.util.aws.s3.S3Utils;
import org.apache.asterix.runtime.writer.ExternalFileWriterConfiguration;
-import org.apache.asterix.runtime.writer.IExternalFileWriter;
import org.apache.asterix.runtime.writer.IExternalFileWriterFactory;
-import org.apache.asterix.runtime.writer.IExternalFileWriterFactoryProvider;
-import org.apache.asterix.runtime.writer.IExternalPrinter;
-import org.apache.asterix.runtime.writer.IExternalPrinterFactory;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.exceptions.IWarningCollector;
import org.apache.hyracks.api.exceptions.SourceLocation;
import org.apache.hyracks.api.util.ExceptionUtils;
import org.apache.hyracks.data.std.primitive.LongPointable;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
-import software.amazon.awssdk.core.exception.SdkException;
-import software.amazon.awssdk.services.s3.model.NoSuchBucketException;
-
-public final class S3ExternalFileWriterFactory implements
IExternalFileWriterFactory {
- private static final long serialVersionUID = 4551318140901866805L;
+abstract class AbstractCloudExternalFileWriterFactory implements
IExternalFileWriterFactory {
+ private static final long serialVersionUID = -6204498482419719403L;
private static final Logger LOGGER = LogManager.getLogger();
- static final char SEPARATOR = '/';
- public static final IExternalFileWriterFactoryProvider PROVIDER = new
IExternalFileWriterFactoryProvider() {
- @Override
- public IExternalFileWriterFactory
create(ExternalFileWriterConfiguration configuration) {
- return new S3ExternalFileWriterFactory(configuration);
- }
- @Override
- public char getSeparator() {
- return SEPARATOR;
- }
- };
- private final Map<String, String> configuration;
- private final SourceLocation pathSourceLocation;
- private final String staticPath;
- private transient S3CloudClient cloudClient;
+ protected final Map<String, String> configuration;
+ protected final SourceLocation pathSourceLocation;
+ protected final String staticPath;
+ protected transient ICloudClient cloudClient;
- private S3ExternalFileWriterFactory(ExternalFileWriterConfiguration
externalConfig) {
+ AbstractCloudExternalFileWriterFactory(ExternalFileWriterConfiguration
externalConfig) {
configuration = externalConfig.getConfiguration();
pathSourceLocation = externalConfig.getPathSourceLocation();
staticPath = externalConfig.getStaticPath();
- cloudClient = null;
}
- @Override
- public IExternalFileWriter createWriter(IHyracksTaskContext context,
IExternalPrinterFactory printerFactory)
- throws HyracksDataException {
- buildClient();
- String bucket =
configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
- IExternalPrinter printer = printerFactory.createPrinter();
- IWarningCollector warningCollector = context.getWarningCollector();
- return new S3ExternalFileWriter(printer, cloudClient, bucket,
staticPath == null, warningCollector,
- pathSourceLocation);
- }
+ abstract ICloudClient createCloudClient() throws CompilationException;
- private void buildClient() throws HyracksDataException {
+ abstract boolean isNoContainerFoundException(IOException e);
+
+ abstract boolean isSdkException(Throwable e);
+
+ final void buildClient() throws HyracksDataException {
try {
synchronized (this) {
if (cloudClient == null) {
- // only a single client should be built
- S3ClientConfig config = S3ClientConfig.of(configuration);
- cloudClient = new S3CloudClient(config,
S3Utils.buildAwsS3Client(configuration));
+ cloudClient = createCloudClient();
}
}
} catch (CompilationException e) {
@@ -108,14 +77,8 @@ public final class S3ExternalFileWriterFactory implements
IExternalFileWriterFac
}
@Override
- public char getSeparator() {
- return SEPARATOR;
- }
-
- @Override
- public void validate() throws AlgebricksException {
- S3ClientConfig config = S3ClientConfig.of(configuration);
- ICloudClient testClient = new S3CloudClient(config,
S3Utils.buildAwsS3Client(configuration));
+ public final void validate() throws AlgebricksException {
+ ICloudClient testClient = createCloudClient();
String bucket =
configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
if (bucket == null || bucket.isEmpty()) {
@@ -126,14 +89,17 @@ public final class S3ExternalFileWriterFactory implements
IExternalFileWriterFac
try {
doValidate(testClient, bucket);
} catch (IOException e) {
- if (e.getCause() instanceof NoSuchBucketException) {
+ if (isNoContainerFoundException(e)) {
throw
CompilationException.create(ErrorCode.EXTERNAL_SOURCE_CONTAINER_NOT_FOUND,
bucket);
} else {
throw
CompilationException.create(ErrorCode.EXTERNAL_SINK_ERROR,
ExceptionUtils.getMessageOrToString(e));
}
- } catch (SdkException e) {
- throw CompilationException.create(ErrorCode.EXTERNAL_SINK_ERROR,
e, getMessageOrToString(e));
+ } catch (Throwable e) {
+ if (isSdkException(e)) {
+ throw
CompilationException.create(ErrorCode.EXTERNAL_SINK_ERROR, e,
getMessageOrToString(e));
+ }
+ throw e;
}
}
@@ -168,19 +134,17 @@ public final class S3ExternalFileWriterFactory implements
IExternalFileWriterFac
long writeValue = random.nextLong();
byte[] data = new byte[Long.BYTES];
LongPointable.setLong(data, 0, writeValue);
- ICloudBufferedWriter writer = testClient.createBufferedWriter(bucket,
path);
- CloudResettableInputStream stream = null;
+ IWriteBufferProvider bufferProvider = new
WriterSingleBufferProvider(testClient.getWriteBufferSize());
+ ICloudWriter writer = testClient.createdWriter(bucket, path,
bufferProvider);
boolean aborted = false;
try {
- stream = new CloudResettableInputStream(writer, new
WriterSingleBufferProvider());
- stream.write(data, 0, data.length);
+ writer.write(data, 0, data.length);
} catch (HyracksDataException e) {
- stream.abort();
+ writer.abort();
aborted = true;
} finally {
- if (stream != null && !aborted) {
- stream.finish();
- stream.close();
+ if (writer != null && !aborted) {
+ writer.finish();
}
}
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/GCSExternalFileWriterFactory.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/GCSExternalFileWriterFactory.java
index ca93ff6fa4..32458695cf 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/GCSExternalFileWriterFactory.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/GCSExternalFileWriterFactory.java
@@ -18,22 +18,12 @@
*/
package org.apache.asterix.cloud.writer;
-import static
org.apache.asterix.cloud.writer.AbstractCloudExternalFileWriter.isExceedingMaxLength;
-import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;
-
import java.io.IOException;
-import java.util.Collections;
-import java.util.Map;
-import java.util.Random;
-import org.apache.asterix.cloud.CloudResettableInputStream;
-import org.apache.asterix.cloud.WriterSingleBufferProvider;
-import org.apache.asterix.cloud.clients.ICloudBufferedWriter;
import org.apache.asterix.cloud.clients.ICloudClient;
import org.apache.asterix.cloud.clients.google.gcs.GCSClientConfig;
import org.apache.asterix.cloud.clients.google.gcs.GCSCloudClient;
import org.apache.asterix.common.exceptions.CompilationException;
-import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.external.util.google.gcs.GCSUtils;
import org.apache.asterix.runtime.writer.ExternalFileWriterConfiguration;
@@ -42,22 +32,15 @@ import
org.apache.asterix.runtime.writer.IExternalFileWriterFactory;
import org.apache.asterix.runtime.writer.IExternalFileWriterFactoryProvider;
import org.apache.asterix.runtime.writer.IExternalPrinter;
import org.apache.asterix.runtime.writer.IExternalPrinterFactory;
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.IWarningCollector;
-import org.apache.hyracks.api.exceptions.SourceLocation;
-import org.apache.hyracks.api.util.ExceptionUtils;
-import org.apache.hyracks.data.std.primitive.LongPointable;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
import com.google.cloud.BaseServiceException;
import com.google.cloud.storage.StorageException;
-public final class GCSExternalFileWriterFactory implements
IExternalFileWriterFactory {
+public final class GCSExternalFileWriterFactory extends
AbstractCloudExternalFileWriterFactory {
private static final long serialVersionUID = 1L;
- private static final Logger LOGGER = LogManager.getLogger();
static final char SEPARATOR = '/';
public static final IExternalFileWriterFactoryProvider PROVIDER = new
IExternalFileWriterFactoryProvider() {
@Override
@@ -70,18 +53,28 @@ public final class GCSExternalFileWriterFactory implements
IExternalFileWriterFa
return SEPARATOR;
}
};
- private final Map<String, String> configuration;
- private final SourceLocation pathSourceLocation;
- private final String staticPath;
- private transient GCSCloudClient cloudClient;
private GCSExternalFileWriterFactory(ExternalFileWriterConfiguration
externalConfig) {
- configuration = externalConfig.getConfiguration();
- pathSourceLocation = externalConfig.getPathSourceLocation();
- staticPath = externalConfig.getStaticPath();
+ super(externalConfig);
cloudClient = null;
}
+ @Override
+ ICloudClient createCloudClient() throws CompilationException {
+ GCSClientConfig config = GCSClientConfig.of(configuration);
+ return new GCSCloudClient(config, GCSUtils.buildClient(configuration));
+ }
+
+ @Override
+ boolean isNoContainerFoundException(IOException e) {
+ return e.getCause() instanceof StorageException;
+ }
+
+ @Override
+ boolean isSdkException(Throwable e) {
+ return e instanceof BaseServiceException;
+ }
+
@Override
public IExternalFileWriter createWriter(IHyracksTaskContext context,
IExternalPrinterFactory printerFactory)
throws HyracksDataException {
@@ -93,103 +86,8 @@ public final class GCSExternalFileWriterFactory implements
IExternalFileWriterFa
pathSourceLocation);
}
- private void buildClient() throws HyracksDataException {
- try {
- synchronized (this) {
- if (cloudClient == null) {
- GCSClientConfig config = GCSClientConfig.of(configuration);
- cloudClient = new GCSCloudClient(config,
GCSUtils.buildClient(configuration));
- }
- }
- } catch (CompilationException e) {
- throw HyracksDataException.create(e);
- }
- }
-
@Override
public char getSeparator() {
return SEPARATOR;
}
-
- @Override
- public void validate() throws AlgebricksException {
- GCSClientConfig config = GCSClientConfig.of(configuration);
- ICloudClient testClient = new GCSCloudClient(config,
GCSUtils.buildClient(configuration));
- String bucket =
configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
-
- if (bucket == null || bucket.isEmpty()) {
- throw new CompilationException(ErrorCode.PARAMETERS_REQUIRED,
- ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
- }
-
- try {
- doValidate(testClient, bucket);
- } catch (IOException e) {
- if (e.getCause() instanceof StorageException) {
- throw
CompilationException.create(ErrorCode.EXTERNAL_SOURCE_CONTAINER_NOT_FOUND,
bucket);
- } else {
- throw
CompilationException.create(ErrorCode.EXTERNAL_SINK_ERROR,
- ExceptionUtils.getMessageOrToString(e));
- }
- } catch (BaseServiceException e) {
- throw CompilationException.create(ErrorCode.EXTERNAL_SINK_ERROR,
e, getMessageOrToString(e));
- }
- }
-
- private void doValidate(ICloudClient testClient, String bucket) throws
IOException, AlgebricksException {
- if (staticPath != null) {
- if (isExceedingMaxLength(staticPath,
GCSExternalFileWriter.MAX_LENGTH_IN_BYTES)) {
- throw new
CompilationException(ErrorCode.WRITE_PATH_LENGTH_EXCEEDS_MAX_LENGTH,
pathSourceLocation,
- staticPath, GCSExternalFileWriter.MAX_LENGTH_IN_BYTES,
- ExternalDataConstants.KEY_ADAPTER_NAME_GCS);
- }
-
- if (!testClient.isEmptyPrefix(bucket, staticPath)) {
- throw new
CompilationException(ErrorCode.DIRECTORY_IS_NOT_EMPTY, pathSourceLocation,
staticPath);
- }
- }
-
- String validateWritePermissions = configuration
-
.getOrDefault(ExternalDataConstants.KEY_VALIDATE_WRITE_PERMISSION,
Boolean.TRUE.toString());
- if (!Boolean.parseBoolean(validateWritePermissions)) {
- return;
- }
-
- Random random = new Random();
- String pathPrefix = "testFile";
- String path = pathPrefix + random.nextInt();
- while (testClient.exists(bucket, path)) {
- path = pathPrefix + random.nextInt();
- }
-
- long writeValue = random.nextLong();
- byte[] data = new byte[Long.BYTES];
- LongPointable.setLong(data, 0, writeValue);
- ICloudBufferedWriter writer = testClient.createBufferedWriter(bucket,
path);
- CloudResettableInputStream stream = null;
- boolean aborted = false;
- try {
- stream = new CloudResettableInputStream(writer, new
WriterSingleBufferProvider());
- stream.write(data, 0, data.length);
- } catch (HyracksDataException e) {
- stream.abort();
- aborted = true;
- } finally {
- if (stream != null && !aborted) {
- stream.finish();
- stream.close();
- }
- }
-
- try {
- long readValue =
LongPointable.getLong(testClient.readAllBytes(bucket, path), 0);
- if (writeValue != readValue) {
- LOGGER.warn(
- "The writer can write but the written values wasn't
successfully read back (wrote: {}, read:{})",
- writeValue, readValue);
- }
- } finally {
- testClient.deleteObjects(bucket, Collections.singleton(path));
- }
- }
}
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/S3ExternalFileWriterFactory.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/S3ExternalFileWriterFactory.java
index cdaa6dcf80..96aa929b15 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/S3ExternalFileWriterFactory.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/S3ExternalFileWriterFactory.java
@@ -18,22 +18,12 @@
*/
package org.apache.asterix.cloud.writer;
-import static
org.apache.asterix.cloud.writer.AbstractCloudExternalFileWriter.isExceedingMaxLength;
-import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;
-
import java.io.IOException;
-import java.util.Collections;
-import java.util.Map;
-import java.util.Random;
-import org.apache.asterix.cloud.CloudResettableInputStream;
-import org.apache.asterix.cloud.WriterSingleBufferProvider;
-import org.apache.asterix.cloud.clients.ICloudBufferedWriter;
import org.apache.asterix.cloud.clients.ICloudClient;
import org.apache.asterix.cloud.clients.aws.s3.S3ClientConfig;
import org.apache.asterix.cloud.clients.aws.s3.S3CloudClient;
import org.apache.asterix.common.exceptions.CompilationException;
-import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.external.util.aws.s3.S3Utils;
import org.apache.asterix.runtime.writer.ExternalFileWriterConfiguration;
@@ -42,22 +32,15 @@ import
org.apache.asterix.runtime.writer.IExternalFileWriterFactory;
import org.apache.asterix.runtime.writer.IExternalFileWriterFactoryProvider;
import org.apache.asterix.runtime.writer.IExternalPrinter;
import org.apache.asterix.runtime.writer.IExternalPrinterFactory;
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.IWarningCollector;
-import org.apache.hyracks.api.exceptions.SourceLocation;
-import org.apache.hyracks.api.util.ExceptionUtils;
-import org.apache.hyracks.data.std.primitive.LongPointable;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
import software.amazon.awssdk.core.exception.SdkException;
import software.amazon.awssdk.services.s3.model.NoSuchBucketException;
-public final class S3ExternalFileWriterFactory implements
IExternalFileWriterFactory {
+public final class S3ExternalFileWriterFactory extends
AbstractCloudExternalFileWriterFactory {
private static final long serialVersionUID = 4551318140901866805L;
- private static final Logger LOGGER = LogManager.getLogger();
static final char SEPARATOR = '/';
public static final IExternalFileWriterFactoryProvider PROVIDER = new
IExternalFileWriterFactoryProvider() {
@Override
@@ -70,18 +53,28 @@ public final class S3ExternalFileWriterFactory implements
IExternalFileWriterFac
return SEPARATOR;
}
};
- private final Map<String, String> configuration;
- private final SourceLocation pathSourceLocation;
- private final String staticPath;
- private transient S3CloudClient cloudClient;
private S3ExternalFileWriterFactory(ExternalFileWriterConfiguration
externalConfig) {
- configuration = externalConfig.getConfiguration();
- pathSourceLocation = externalConfig.getPathSourceLocation();
- staticPath = externalConfig.getStaticPath();
+ super(externalConfig);
cloudClient = null;
}
+ @Override
+ ICloudClient createCloudClient() throws CompilationException {
+ S3ClientConfig config = S3ClientConfig.of(configuration);
+ return new S3CloudClient(config,
S3Utils.buildAwsS3Client(configuration));
+ }
+
+ @Override
+ boolean isNoContainerFoundException(IOException e) {
+ return e.getCause() instanceof NoSuchBucketException;
+ }
+
+ @Override
+ boolean isSdkException(Throwable e) {
+ return e instanceof SdkException;
+ }
+
@Override
public IExternalFileWriter createWriter(IHyracksTaskContext context,
IExternalPrinterFactory printerFactory)
throws HyracksDataException {
@@ -93,108 +86,8 @@ public final class S3ExternalFileWriterFactory implements
IExternalFileWriterFac
pathSourceLocation);
}
- private void buildClient() throws HyracksDataException {
- try {
- synchronized (this) {
- if (cloudClient == null) {
- // only a single client should be built
- S3ClientConfig config = S3ClientConfig.of(configuration);
- cloudClient = new S3CloudClient(config,
S3Utils.buildAwsS3Client(configuration));
- }
- }
- } catch (CompilationException e) {
- throw HyracksDataException.create(e);
- }
- }
-
@Override
public char getSeparator() {
return SEPARATOR;
}
-
- @Override
- public void validate() throws AlgebricksException {
- S3ClientConfig config = S3ClientConfig.of(configuration);
- ICloudClient testClient = new S3CloudClient(config,
S3Utils.buildAwsS3Client(configuration));
- String bucket =
configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
-
- if (bucket == null || bucket.isEmpty()) {
- throw new CompilationException(ErrorCode.PARAMETERS_REQUIRED,
- ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
- }
-
- try {
- doValidate(testClient, bucket);
- } catch (IOException e) {
- if (e.getCause() instanceof NoSuchBucketException) {
- throw
CompilationException.create(ErrorCode.EXTERNAL_SOURCE_CONTAINER_NOT_FOUND,
bucket);
- } else {
- throw
CompilationException.create(ErrorCode.EXTERNAL_SINK_ERROR,
- ExceptionUtils.getMessageOrToString(e));
- }
- } catch (SdkException e) {
- throw CompilationException.create(ErrorCode.EXTERNAL_SINK_ERROR,
e, getMessageOrToString(e));
- }
- }
-
- private void doValidate(ICloudClient testClient, String bucket) throws
IOException, AlgebricksException {
- if (staticPath != null) {
- if (isExceedingMaxLength(staticPath,
S3ExternalFileWriter.MAX_LENGTH_IN_BYTES)) {
- throw new
CompilationException(ErrorCode.WRITE_PATH_LENGTH_EXCEEDS_MAX_LENGTH,
pathSourceLocation,
- staticPath, S3ExternalFileWriter.MAX_LENGTH_IN_BYTES,
- ExternalDataConstants.KEY_ADAPTER_NAME_AWS_S3);
- }
-
- if (!testClient.isEmptyPrefix(bucket, staticPath)) {
- // Ensure that the static path is empty
- throw new
CompilationException(ErrorCode.DIRECTORY_IS_NOT_EMPTY, pathSourceLocation,
staticPath);
- }
- }
-
- // do not validate write permissions if specified by the user not to
do so
- String validateWritePermissions = configuration
-
.getOrDefault(ExternalDataConstants.KEY_VALIDATE_WRITE_PERMISSION,
Boolean.TRUE.toString());
- if (!Boolean.parseBoolean(validateWritePermissions)) {
- return;
- }
-
- Random random = new Random();
- String pathPrefix = "testFile";
- String path = pathPrefix + random.nextInt();
- while (testClient.exists(bucket, path)) {
- path = pathPrefix + random.nextInt();
- }
-
- long writeValue = random.nextLong();
- byte[] data = new byte[Long.BYTES];
- LongPointable.setLong(data, 0, writeValue);
- ICloudBufferedWriter writer = testClient.createBufferedWriter(bucket,
path);
- CloudResettableInputStream stream = null;
- boolean aborted = false;
- try {
- stream = new CloudResettableInputStream(writer, new
WriterSingleBufferProvider());
- stream.write(data, 0, data.length);
- } catch (HyracksDataException e) {
- stream.abort();
- aborted = true;
- } finally {
- if (stream != null && !aborted) {
- stream.finish();
- stream.close();
- }
- }
-
- try {
- long readValue =
LongPointable.getLong(testClient.readAllBytes(bucket, path), 0);
- if (writeValue != readValue) {
- // This should never happen unless S3 is messed up. But log
for sanity check
- LOGGER.warn(
- "The writer can write but the written values wasn't
successfully read back (wrote: {}, read:{})",
- writeValue, readValue);
- }
- } finally {
- // Delete the written file
- testClient.deleteObjects(bucket, Collections.singleton(path));
- }
- }
}
diff --git
a/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/LSMTest.java
b/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/AbstractLSMTest.java
similarity index 83%
rename from
asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/LSMTest.java
rename to
asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/AbstractLSMTest.java
index 92d7f12ba8..484f372e27 100644
---
a/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/LSMTest.java
+++
b/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/AbstractLSMTest.java
@@ -23,8 +23,8 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collections;
-import org.apache.asterix.cloud.clients.ICloudBufferedWriter;
import org.apache.asterix.cloud.clients.ICloudClient;
+import org.apache.asterix.cloud.clients.ICloudWriter;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.junit.FixMethodOrder;
@@ -32,7 +32,7 @@ import org.junit.Test;
import org.junit.runners.MethodSorters;
@FixMethodOrder(MethodSorters.NAME_ASCENDING)
-public abstract class LSMTest {
+public abstract class AbstractLSMTest {
public static final Logger LOGGER = LogManager.getLogger();
public static final String BTREE_SUFFIX = "b";
@@ -53,28 +53,23 @@ public abstract class LSMTest {
@Test
public void a1writeToS3Test() throws IOException {
- CloudResettableInputStream stream = null;
+ IWriteBufferProvider bufferProvider = new
WriterSingleBufferProvider(CLOUD_CLIENT.getWriteBufferSize());
+ ICloudWriter cloudWriter =
+ CLOUD_CLIENT.createdWriter(PLAYGROUND_CONTAINER,
BUCKET_STORAGE_ROOT + "/0_b", bufferProvider);
try {
- ICloudBufferedWriter s3BufferedWriter =
- CLOUD_CLIENT.createBufferedWriter(PLAYGROUND_CONTAINER,
BUCKET_STORAGE_ROOT + "/0_b");
- stream = new CloudResettableInputStream(s3BufferedWriter, new
WriteBufferProvider(1));
ByteBuffer content = createContent(BUFFER_SIZE);
int size = 0;
for (int i = 0; i < 10; i++) {
content.clear();
- size += stream.write(content);
+ size += cloudWriter.write(content);
}
- stream.finish();
+ cloudWriter.finish();
System.err.println(size);
} catch (Exception e) {
e.printStackTrace();
- if (stream != null) {
- stream.abort();
- }
- } finally {
- if (stream != null) {
- stream.close();
+ if (cloudWriter != null) {
+ cloudWriter.abort();
}
}
}
diff --git
a/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/gcs/LSMGCSTest.java
b/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/gcs/LSMGCSTest.java
index 87a3e2988f..86bb5ad714 100644
---
a/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/gcs/LSMGCSTest.java
+++
b/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/gcs/LSMGCSTest.java
@@ -18,7 +18,7 @@
*/
package org.apache.asterix.cloud.gcs;
-import org.apache.asterix.cloud.LSMTest;
+import org.apache.asterix.cloud.AbstractLSMTest;
import org.apache.asterix.cloud.clients.google.gcs.GCSClientConfig;
import org.apache.asterix.cloud.clients.google.gcs.GCSCloudClient;
import org.junit.AfterClass;
@@ -31,7 +31,7 @@ import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageClass;
import com.google.cloud.storage.StorageOptions;
-public class LSMGCSTest extends LSMTest {
+public class LSMGCSTest extends AbstractLSMTest {
private static Storage client;
private static final int MOCK_SERVER_PORT = 4443;
private static final String MOCK_SERVER_HOSTNAME = "http://127.0.0.1:" +
MOCK_SERVER_PORT;
diff --git
a/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/s3/LSMS3Test.java
b/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/s3/LSMS3Test.java
index 0452da0b05..c785e57e8a 100644
---
a/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/s3/LSMS3Test.java
+++
b/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/s3/LSMS3Test.java
@@ -20,7 +20,7 @@ package org.apache.asterix.cloud.s3;
import java.net.URI;
-import org.apache.asterix.cloud.LSMTest;
+import org.apache.asterix.cloud.AbstractLSMTest;
import org.apache.asterix.cloud.clients.aws.s3.S3ClientConfig;
import org.apache.asterix.cloud.clients.aws.s3.S3CloudClient;
import org.junit.AfterClass;
@@ -34,7 +34,7 @@ import software.amazon.awssdk.services.s3.S3ClientBuilder;
import software.amazon.awssdk.services.s3.model.CreateBucketRequest;
import software.amazon.awssdk.services.s3.model.DeleteBucketRequest;
-public class LSMS3Test extends LSMTest {
+public class LSMS3Test extends AbstractLSMTest {
private static S3Client client;
private static S3Mock s3MockServer;