This is an automated email from the ASF dual-hosted git repository.
mhubail pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git
The following commit(s) were added to refs/heads/master by this push:
new 81abe06c11 ASTERIXDB-3493: The storage engine should use the cloud
storage prefix whilst writing objects to Google Cloud Storage (GCS)
81abe06c11 is described below
commit 81abe06c116208456c42a33f906b8b32db593ff6
Author: Mohammad Nawazish Khan <[email protected]>
AuthorDate: Wed Aug 28 01:38:52 2024 +0530
ASTERIXDB-3493: The storage engine should use the cloud storage prefix
whilst writing objects to Google Cloud Storage (GCS)
- user model changes: no
- storage format changes: no
- interface changes: no
Ext-ref: MB-63369
Details:
When the backend object store is Google Cloud Storage, the current storage
engine implementation ignores the configured cloud storage prefix; as a
result, database objects are written directly under the bucket root.
With this change, database objects are written under the configured prefix
directory instead.
Change-Id: I7b84bf98272581bc96851855d4bd8663780ab611
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18770
Integration-Tests: Jenkins <[email protected]>
Tested-by: Jenkins <[email protected]>
Reviewed-by: Murtadha Hubail <[email protected]>
---
.../cloud/clients/google/gcs/GCSClientConfig.java | 20 ++++++++++----
.../cloud/clients/google/gcs/GCSCloudClient.java | 32 +++++++++++++---------
.../clients/google/gcs/GCSParallelDownloader.java | 22 +++++++++------
.../org/apache/asterix/cloud/gcs/LSMGCSTest.java | 2 +-
.../external/util/google/gcs/GCSConstants.java | 1 +
5 files changed, 49 insertions(+), 28 deletions(-)
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSClientConfig.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSClientConfig.java
index fe5dd4db8a..e4e471d96a 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSClientConfig.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSClientConfig.java
@@ -19,6 +19,7 @@
package org.apache.asterix.cloud.clients.google.gcs;
import static
org.apache.asterix.external.util.google.gcs.GCSConstants.ENDPOINT_FIELD_NAME;
+import static
org.apache.asterix.external.util.google.gcs.GCSConstants.STORAGE_PREFIX;
import java.io.IOException;
import java.util.Map;
@@ -42,10 +43,11 @@ public class GCSClientConfig {
private final int readMaxRequestsPerSeconds;
private final int writeMaxRequestsPerSeconds;
private final int writeBufferSize;
+ private final String prefix;
private GCSClientConfig(String region, String endpoint, boolean
anonymousAuth, long profilerLogInterval,
long tokenAcquireTimeout, int writeMaxRequestsPerSeconds, int
readMaxRequestsPerSeconds,
- int writeBufferSize) {
+ int writeBufferSize, String prefix) {
this.region = region;
this.endpoint = endpoint;
this.anonymousAuth = anonymousAuth;
@@ -54,18 +56,20 @@ public class GCSClientConfig {
this.writeMaxRequestsPerSeconds = writeMaxRequestsPerSeconds;
this.readMaxRequestsPerSeconds = readMaxRequestsPerSeconds;
this.writeBufferSize = writeBufferSize;
+ this.prefix = prefix;
}
public GCSClientConfig(String region, String endpoint, boolean
anonymousAuth, long profilerLogInterval,
- int writeBufferSize) {
- this(region, endpoint, anonymousAuth, profilerLogInterval, 1, 0, 0,
writeBufferSize);
+ int writeBufferSize, String prefix) {
+ this(region, endpoint, anonymousAuth, profilerLogInterval, 1, 0, 0,
writeBufferSize, prefix);
}
public static GCSClientConfig of(CloudProperties cloudProperties) {
return new GCSClientConfig(cloudProperties.getStorageRegion(),
cloudProperties.getStorageEndpoint(),
cloudProperties.isStorageAnonymousAuth(),
cloudProperties.getProfilerLogInterval(),
cloudProperties.getTokenAcquireTimeout(),
cloudProperties.getWriteMaxRequestsPerSecond(),
- cloudProperties.getReadMaxRequestsPerSecond(),
cloudProperties.getWriteBufferSize());
+ cloudProperties.getReadMaxRequestsPerSecond(),
cloudProperties.getWriteBufferSize(),
+ cloudProperties.getStoragePrefix());
}
public static GCSClientConfig of(Map<String, String> configuration, int
writeBufferSize) {
@@ -73,10 +77,10 @@ public class GCSClientConfig {
long profilerLogInterval = 0;
String region = "";
- String prefix = "";
+ String prefix = configuration.getOrDefault(STORAGE_PREFIX, "");
boolean anonymousAuth = false;
- return new GCSClientConfig(region, endPoint, anonymousAuth,
profilerLogInterval, writeBufferSize);
+ return new GCSClientConfig(region, endPoint, anonymousAuth,
profilerLogInterval, writeBufferSize, prefix);
}
public String getRegion() {
@@ -118,4 +122,8 @@ public class GCSClientConfig {
public int getWriteBufferSize() {
return writeBufferSize;
}
+
+ public String getPrefix() {
+ return prefix;
+ }
}
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSCloudClient.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSCloudClient.java
index 010a6bb185..a74e7d390b 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSCloudClient.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSCloudClient.java
@@ -100,20 +100,20 @@ public class GCSCloudClient implements ICloudClient {
@Override
public ICloudWriter createWriter(String bucket, String path,
IWriteBufferProvider bufferProvider) {
- return new GCSWriter(bucket, path, gcsClient, profilerLimiter,
guardian, writeBufferSize);
+ return new GCSWriter(bucket, config.getPrefix() + path, gcsClient,
profilerLimiter, guardian, writeBufferSize);
}
@Override
public Set<CloudFile> listObjects(String bucket, String path,
FilenameFilter filter) {
guardian.checkReadAccess(bucket, path);
profilerLimiter.objectsList();
- Page<Blob> blobs =
- gcsClient.list(bucket, BlobListOption.prefix(path),
BlobListOption.fields(Storage.BlobField.SIZE));
+ Page<Blob> blobs = gcsClient.list(bucket,
BlobListOption.prefix(config.getPrefix() + path),
+ BlobListOption.fields(Storage.BlobField.SIZE));
Set<CloudFile> files = new HashSet<>();
for (Blob blob : blobs.iterateAll()) {
if (filter.accept(null,
IoUtil.getFileNameFromPath(blob.getName()))) {
- files.add(CloudFile.of(blob.getName(), blob.getSize()));
+ files.add(CloudFile.of(stripCloudPrefix(blob.getName()),
blob.getSize()));
}
}
return files;
@@ -123,7 +123,7 @@ public class GCSCloudClient implements ICloudClient {
public int read(String bucket, String path, long offset, ByteBuffer
buffer) throws HyracksDataException {
guardian.checkReadAccess(bucket, path);
profilerLimiter.objectGet();
- BlobId blobId = BlobId.of(bucket, path);
+ BlobId blobId = BlobId.of(bucket, config.getPrefix() + path);
long readTo = offset + buffer.remaining();
int totalRead = 0;
try (ReadChannel from = gcsClient.reader(blobId).limit(readTo)) {
@@ -145,7 +145,7 @@ public class GCSCloudClient implements ICloudClient {
public byte[] readAllBytes(String bucket, String path) {
guardian.checkReadAccess(bucket, path);
profilerLimiter.objectGet();
- BlobId blobId = BlobId.of(bucket, path);
+ BlobId blobId = BlobId.of(bucket, config.getPrefix() + path);
try {
return gcsClient.readAllBytes(blobId);
} catch (StorageException e) {
@@ -157,7 +157,7 @@ public class GCSCloudClient implements ICloudClient {
public InputStream getObjectStream(String bucket, String path, long
offset, long length) {
guardian.checkReadAccess(bucket, path);
profilerLimiter.objectGet();
- try (ReadChannel reader = gcsClient.reader(bucket, path).limit(offset
+ length)) {
+ try (ReadChannel reader = gcsClient.reader(bucket, config.getPrefix()
+ path).limit(offset + length)) {
reader.seek(offset);
return Channels.newInputStream(reader);
} catch (StorageException | IOException e) {
@@ -169,7 +169,7 @@ public class GCSCloudClient implements ICloudClient {
public void write(String bucket, String path, byte[] data) {
guardian.checkWriteAccess(bucket, path);
profilerLimiter.objectWrite();
- BlobInfo blobInfo = BlobInfo.newBuilder(bucket, path).build();
+ BlobInfo blobInfo = BlobInfo.newBuilder(bucket, config.getPrefix() +
path).build();
gcsClient.create(blobInfo, data);
}
@@ -177,7 +177,7 @@ public class GCSCloudClient implements ICloudClient {
public void copy(String bucket, String srcPath, FileReference destPath) {
guardian.checkReadAccess(bucket, srcPath);
profilerLimiter.objectsList();
- Page<Blob> blobs = gcsClient.list(bucket,
BlobListOption.prefix(srcPath));
+ Page<Blob> blobs = gcsClient.list(bucket,
BlobListOption.prefix(config.getPrefix() + srcPath));
for (Blob blob : blobs.iterateAll()) {
profilerLimiter.objectCopy();
BlobId source = blob.getBlobId();
@@ -200,7 +200,7 @@ public class GCSCloudClient implements ICloudClient {
while (pathIter.hasNext()) {
batchRequest = gcsClient.batch();
for (int i = 0; pathIter.hasNext() && i < DELETE_BATCH_SIZE; i++) {
- BlobId blobId = BlobId.of(bucket, pathIter.next());
+ BlobId blobId = BlobId.of(bucket, config.getPrefix() +
pathIter.next());
guardian.checkWriteAccess(bucket, blobId.getName());
batchRequest.delete(blobId);
}
@@ -214,7 +214,8 @@ public class GCSCloudClient implements ICloudClient {
public long getObjectSize(String bucket, String path) {
guardian.checkReadAccess(bucket, path);
profilerLimiter.objectGet();
- Blob blob = gcsClient.get(bucket, path,
Storage.BlobGetOption.fields(Storage.BlobField.SIZE));
+ Blob blob =
+ gcsClient.get(bucket, config.getPrefix() + path,
Storage.BlobGetOption.fields(Storage.BlobField.SIZE));
if (blob == null) {
return 0;
}
@@ -225,7 +226,8 @@ public class GCSCloudClient implements ICloudClient {
public boolean exists(String bucket, String path) {
guardian.checkReadAccess(bucket, path);
profilerLimiter.objectGet();
- Blob blob = gcsClient.get(bucket, path,
Storage.BlobGetOption.fields(Storage.BlobField.values()));
+ Blob blob = gcsClient.get(bucket, config.getPrefix() + path,
+ Storage.BlobGetOption.fields(Storage.BlobField.values()));
return blob != null && blob.exists();
}
@@ -233,7 +235,7 @@ public class GCSCloudClient implements ICloudClient {
public boolean isEmptyPrefix(String bucket, String path) {
guardian.checkReadAccess(bucket, path);
profilerLimiter.objectsList();
- Page<Blob> blobs = gcsClient.list(bucket, BlobListOption.prefix(path));
+ Page<Blob> blobs = gcsClient.list(bucket,
BlobListOption.prefix(config.getPrefix() + path));
return !blobs.hasNextPage();
}
@@ -278,4 +280,8 @@ public class GCSCloudClient implements ICloudClient {
}
return builder.build().getService();
}
+
+ private String stripCloudPrefix(String objectName) {
+ return objectName.substring(config.getPrefix().length());
+ }
}
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSParallelDownloader.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSParallelDownloader.java
index 0994cea2c7..0d301208c4 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSParallelDownloader.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSParallelDownloader.java
@@ -56,6 +56,7 @@ public class GCSParallelDownloader implements
IParallelDownloader {
private final Storage gcsClient;
private final TransferManager transferManager;
private final IRequestProfilerLimiter profiler;
+ private final GCSClientConfig config;
public GCSParallelDownloader(String bucket, IOManager ioManager,
GCSClientConfig config,
IRequestProfilerLimiter profiler) throws HyracksDataException {
@@ -70,18 +71,21 @@ public class GCSParallelDownloader implements
IParallelDownloader {
this.gcsClient = builder.build().getService();
this.transferManager =
TransferManagerConfig.newBuilder().setStorageOptions(builder.build()).build().getService();
+ this.config = config;
}
@Override
public void downloadFiles(Collection<FileReference> toDownload) throws
HyracksDataException {
- ParallelDownloadConfig.Builder config =
ParallelDownloadConfig.newBuilder().setBucketName(bucket);
+ ParallelDownloadConfig.Builder downConfig =
+
ParallelDownloadConfig.newBuilder().setBucketName(bucket).setStripPrefix(this.config.getPrefix());
+
Map<Path, List<BlobInfo>> pathListMap = new HashMap<>();
try {
for (FileReference fileReference : toDownload) {
profiler.objectGet();
FileUtils.createParentDirectories(fileReference.getFile());
- addToMap(pathListMap,
fileReference.getDeviceHandle().getMount().toPath(),
- BlobInfo.newBuilder(BlobId.of(bucket,
fileReference.getRelativePath())).build());
+ addToMap(pathListMap,
fileReference.getDeviceHandle().getMount().toPath(), BlobInfo
+ .newBuilder(BlobId.of(bucket, config.getPrefix() +
fileReference.getRelativePath())).build());
}
} catch (IOException e) {
throw HyracksDataException.create(e);
@@ -89,7 +93,7 @@ public class GCSParallelDownloader implements
IParallelDownloader {
List<DownloadJob> downloadJobs = new ArrayList<>(pathListMap.size());
for (Map.Entry<Path, List<BlobInfo>> entry : pathListMap.entrySet()) {
downloadJobs.add(transferManager.downloadBlobs(entry.getValue(),
- config.setDownloadDirectory(entry.getKey()).build()));
+ downConfig.setDownloadDirectory(entry.getKey()).build()));
}
downloadJobs.forEach(DownloadJob::getDownloadResults);
}
@@ -98,20 +102,22 @@ public class GCSParallelDownloader implements
IParallelDownloader {
public Collection<FileReference>
downloadDirectories(Collection<FileReference> toDownload)
throws HyracksDataException {
Set<FileReference> failedFiles = new HashSet<>();
- ParallelDownloadConfig.Builder config =
ParallelDownloadConfig.newBuilder().setBucketName(bucket);
+ ParallelDownloadConfig.Builder config =
+
ParallelDownloadConfig.newBuilder().setBucketName(bucket).setStripPrefix(this.config.getPrefix());
Map<Path, List<BlobInfo>> pathListMap = new HashMap<>();
for (FileReference fileReference : toDownload) {
profiler.objectMultipartDownload();
- Page<Blob> blobs = gcsClient.list(bucket,
Storage.BlobListOption.prefix(fileReference.getRelativePath()));
+ Page<Blob> blobs = gcsClient.list(bucket,
+ Storage.BlobListOption.prefix(this.config.getPrefix() +
fileReference.getRelativePath()));
for (Blob blob : blobs.iterateAll()) {
addToMap(pathListMap,
fileReference.getDeviceHandle().getMount().toPath(), blob.asBlobInfo());
}
}
List<DownloadJob> downloadJobs = new ArrayList<>(pathListMap.size());
for (Map.Entry<Path, List<BlobInfo>> entry : pathListMap.entrySet()) {
- downloadJobs.add(transferManager.downloadBlobs(entry.getValue(),
- config.setDownloadDirectory(entry.getKey()).build()));
+ ParallelDownloadConfig parallelDownloadConfig =
config.setDownloadDirectory(entry.getKey()).build();
+ downloadJobs.add(transferManager.downloadBlobs(entry.getValue(),
parallelDownloadConfig));
}
List<DownloadResult> results;
for (DownloadJob job : downloadJobs) {
diff --git
a/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/gcs/LSMGCSTest.java
b/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/gcs/LSMGCSTest.java
index 09cc3f6112..08864acbcb 100644
---
a/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/gcs/LSMGCSTest.java
+++
b/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/gcs/LSMGCSTest.java
@@ -51,7 +51,7 @@ public class LSMGCSTest extends AbstractLSMTest {
LOGGER.info("Client created successfully");
int writeBufferSize = StorageUtil.getIntSizeInBytes(5,
StorageUtil.StorageUnit.MEGABYTE);
GCSClientConfig config =
- new GCSClientConfig(MOCK_SERVER_REGION, MOCK_SERVER_HOSTNAME,
true, 0, writeBufferSize);
+ new GCSClientConfig(MOCK_SERVER_REGION, MOCK_SERVER_HOSTNAME,
true, 0, writeBufferSize, "");
CLOUD_CLIENT = new GCSCloudClient(config,
ICloudGuardian.NoOpCloudGuardian.INSTANCE);
}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSConstants.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSConstants.java
index f2dbde7f80..6314ce8530 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSConstants.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSConstants.java
@@ -26,6 +26,7 @@ public class GCSConstants {
public static final String APPLICATION_DEFAULT_CREDENTIALS_FIELD_NAME =
"applicationDefaultCredentials";
public static final String JSON_CREDENTIALS_FIELD_NAME = "jsonCredentials";
public static final String ENDPOINT_FIELD_NAME = "endpoint";
+ public static final String STORAGE_PREFIX = "prefix";
/*
* Hadoop internal configuration