This is an automated email from the ASF dual-hosted git repository.

abhishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new f55c9e58a8b add google as external storage for msq export (#16051)
f55c9e58a8b is described below

commit f55c9e58a8b7ead515cc8c6200d9c505853d1c19
Author: Parag Jain <[email protected]>
AuthorDate: Fri Apr 5 12:10:10 2024 +0530

    add google as external storage for msq export (#16051)
    
    Support for exporting msq results to gcs bucket. This is essentially copying the logic of s3 export for gs, originally done by @adarshsanjeev in this PR - #15689
---
 docs/multi-stage-query/reference.md                |  33 +++++
 .../google/GoogleCloudStorageInputSource.java      |   2 +-
 .../storage/google/output/GoogleExportConfig.java  |  72 ++++++++++
 .../google/output/GoogleExportStorageProvider.java | 149 +++++++++++++++++++++
 .../output/GoogleStorageConnectorModule.java       |   8 +-
 .../output/GoogleExportStorageProviderTest.java    |  73 ++++++++++
 website/.spelling                                  |   1 +
 7 files changed, 335 insertions(+), 3 deletions(-)

diff --git a/docs/multi-stage-query/reference.md b/docs/multi-stage-query/reference.md
index 0b10e14b50f..19b1740b9d4 100644
--- a/docs/multi-stage-query/reference.md
+++ b/docs/multi-stage-query/reference.md
@@ -150,6 +150,39 @@ The following runtime parameters must be configured to export into an S3 destina
 | `druid.export.storage.s3.maxRetry`           | No       | Defines the max number times to attempt S3 API calls to avoid failures due to transient errors. | 10  |
 | `druid.export.storage.s3.chunkSize`          | No       | Defines the size of each chunk to temporarily store in `tempDir`. The chunk size must be between 5 MiB and 5 GiB. A large chunk size reduces the API calls to S3, however it requires more disk space to store the temporary chunks. | 100MiB |
 
+
+##### GS
+
+Export results to GCS by passing the function `google()` as an argument to the `EXTERN` function. Note that this requires the `druid-google-extensions`.
+The `google()` function is a Druid function that configures the connection. Arguments for `google()` should be passed as named parameters with the value in single quotes like the following example:
+
+```sql
+INSERT INTO
+  EXTERN(
+    google(bucket => 'your_bucket', prefix => 'prefix/to/files')
+  )
+AS CSV
+SELECT
+  <column>
+FROM <table>
+```
+
+Supported arguments for the function:
+
+| Parameter   | Required | Description | Default |
+|-------------|----------|-------------|---------|
+| `bucket`    | Yes      | The GS bucket to which the files are exported. The bucket and prefix combination should be whitelisted in `druid.export.storage.google.allowedExportPaths`. | n/a     |
+| `prefix`    | Yes      | Path where the exported files would be created. The export query expects the destination to be empty. If the location includes other files, then the query will fail. The bucket and prefix combination should be whitelisted in `druid.export.storage.google.allowedExportPaths`. | n/a     |
+
+The following runtime parameters must be configured to export into a GCS destination:
+
+| Runtime Parameter                                | Required | Description | Default |
+|--------------------------------------------------|----------|-------------|---------|
+| `druid.export.storage.google.tempLocalDir`       | Yes      | Directory used on the local storage of the worker to store temporary files required while uploading the data. | n/a     |
+| `druid.export.storage.google.allowedExportPaths` | Yes      | An array of GS prefixes that are allowed as export destinations. Export queries fail if the export destination does not match any of the configured prefixes. Example: `[\"gs://bucket1/export/\", \"gs://bucket2/export/\"]` | n/a     |
+| `druid.export.storage.google.maxRetry`           | No       | Defines the maximum number of times to attempt GS API calls to avoid failures due to transient errors. | 10      |
+| `druid.export.storage.google.chunkSize`          | No       | Defines the size of each chunk to temporarily store in `tempLocalDir`. A large chunk size reduces the API calls to GS; however, it requires more disk space to store the temporary chunks. | 4MiB    |
+
 ##### LOCAL
 
 You can export to the local storage, which exports the results to the filesystem of the MSQ worker.
diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSource.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSource.java
index b99e8a36df9..c241d02fc42 100644
--- a/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSource.java
+++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSource.java
@@ -50,7 +50,7 @@ import java.util.Set;
 
 public class GoogleCloudStorageInputSource extends CloudObjectInputSource
 {
-  static final String TYPE_KEY = GoogleStorageDruidModule.SCHEME;
+  public static final String TYPE_KEY = GoogleStorageDruidModule.SCHEME;
   private static final Logger LOG = new Logger(GoogleCloudStorageInputSource.class);
 
   private final GoogleStorage storage;
diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleExportConfig.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleExportConfig.java
new file mode 100644
index 00000000000..ccb38851ec4
--- /dev/null
+++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleExportConfig.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.storage.google.output;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.java.util.common.HumanReadableBytes;
+
+import javax.annotation.Nullable;
+import java.util.List;
+
+public class GoogleExportConfig
+{
+  @JsonProperty("tempLocalDir")
+  private final String tempLocalDir;
+  @JsonProperty("chunkSize")
+  private final HumanReadableBytes chunkSize;
+  @JsonProperty("maxRetry")
+  private final Integer maxRetry;
+  @JsonProperty("allowedExportPaths")
+  private final List<String> allowedExportPaths;
+
+  @JsonCreator
+  public GoogleExportConfig(
+      @JsonProperty("tempLocalDir") final String tempLocalDir,
+      @JsonProperty("chunkSize") @Nullable final HumanReadableBytes chunkSize,
+      @JsonProperty("maxRetry") @Nullable final Integer maxRetry,
+      @JsonProperty("allowedExportPaths") final List<String> allowedExportPaths)
+  {
+    this.tempLocalDir = tempLocalDir;
+    this.chunkSize = chunkSize;
+    this.maxRetry = maxRetry;
+    this.allowedExportPaths = allowedExportPaths;
+  }
+
+  public String getTempLocalDir()
+  {
+    return tempLocalDir;
+  }
+
+  public HumanReadableBytes getChunkSize()
+  {
+    return chunkSize;
+  }
+
+  public Integer getMaxRetry()
+  {
+    return maxRetry;
+  }
+
+  public List<String> getAllowedExportPaths()
+  {
+    return allowedExportPaths;
+  }
+}
diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleExportStorageProvider.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleExportStorageProvider.java
new file mode 100644
index 00000000000..480b80e118a
--- /dev/null
+++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleExportStorageProvider.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.storage.google.output;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.druid.data.input.google.GoogleCloudStorageInputSource;
+import org.apache.druid.data.input.impl.CloudObjectLocation;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.storage.ExportStorageProvider;
+import org.apache.druid.storage.StorageConnector;
+import org.apache.druid.storage.google.GoogleInputDataConfig;
+import org.apache.druid.storage.google.GoogleStorage;
+import org.apache.druid.storage.google.GoogleStorageDruidModule;
+
+import javax.validation.constraints.NotNull;
+import java.io.File;
+import java.net.URI;
+import java.util.List;
+
+@JsonTypeName(GoogleExportStorageProvider.TYPE_NAME)
+public class GoogleExportStorageProvider implements ExportStorageProvider
+{
+  public static final String TYPE_NAME = GoogleCloudStorageInputSource.TYPE_KEY;
+  @JsonProperty
+  private final String bucket;
+  @JsonProperty
+  private final String prefix;
+
+  @JacksonInject
+  GoogleExportConfig googleExportConfig;
+  @JacksonInject
+  GoogleStorage googleStorage;
+  @JacksonInject
+  GoogleInputDataConfig googleInputDataConfig;
+
+  @JsonCreator
+  public GoogleExportStorageProvider(
+      @JsonProperty(value = "bucket", required = true) String bucket,
+      @JsonProperty(value = "prefix", required = true) String prefix
+  )
+  {
+    this.bucket = bucket;
+    this.prefix = prefix;
+  }
+
+  @Override
+  public StorageConnector get()
+  {
+    final String tempDir = googleExportConfig.getTempLocalDir();
+    if (tempDir == null) {
+      throw DruidException.forPersona(DruidException.Persona.OPERATOR)
+                          .ofCategory(DruidException.Category.NOT_FOUND)
+                          .build("The runtime property `druid.export.storage.google.tempLocalDir` must be configured for GCS export.");
+    }
+    final List<String> allowedExportPaths = googleExportConfig.getAllowedExportPaths();
+    if (allowedExportPaths == null) {
+      throw DruidException.forPersona(DruidException.Persona.OPERATOR)
+                          .ofCategory(DruidException.Category.NOT_FOUND)
+                          .build(
+                              "The runtime property `druid.export.storage.google.allowedExportPaths` must be configured for GCS export.");
+    }
+    validatePrefix(allowedExportPaths, bucket, prefix);
+    final GoogleOutputConfig googleOutputConfig = new GoogleOutputConfig(
+        bucket,
+        prefix,
+        new File(tempDir),
+        googleExportConfig.getChunkSize(),
+        googleExportConfig.getMaxRetry()
+    );
+    return new GoogleStorageConnector(googleOutputConfig, googleStorage, googleInputDataConfig);
+  }
+
+  @VisibleForTesting
+  static void validatePrefix(@NotNull final List<String> allowedExportPaths, final String bucket, final String prefix)
+  {
+    final URI providedUri = new CloudObjectLocation(bucket, prefix).toUri(GoogleStorageDruidModule.SCHEME_GS);
+    for (final String path : allowedExportPaths) {
+      final URI allowedUri = URI.create(path);
+      if (validateUri(allowedUri, providedUri)) {
+        return;
+      }
+    }
+    throw DruidException.forPersona(DruidException.Persona.USER)
+                        .ofCategory(DruidException.Category.INVALID_INPUT)
+                        .build("None of the allowed prefixes matched the input path [%s]. "
+                               + "Please reach out to the cluster admin for the whitelisted paths for export. "
+                               + "The paths are controlled via the property `druid.export.storage.google.allowedExportPaths`.",
+                               providedUri);
+  }
+
+  private static boolean validateUri(final URI allowedUri, final URI providedUri)
+  {
+    if (!allowedUri.getHost().equals(providedUri.getHost())) {
+      return false;
+    }
+    final String allowedPath = StringUtils.maybeAppendTrailingSlash(allowedUri.getPath());
+    final String providedPath = StringUtils.maybeAppendTrailingSlash(providedUri.getPath());
+    return providedPath.startsWith(allowedPath);
+  }
+
+  @JsonProperty("bucket")
+  public String getBucket()
+  {
+    return bucket;
+  }
+
+  @JsonProperty("prefix")
+  public String getPrefix()
+  {
+    return prefix;
+  }
+
+  @Override
+  @JsonIgnore
+  public String getResourceType()
+  {
+    return TYPE_NAME;
+  }
+
+  @Override
+  @JsonIgnore
+  public String getBasePath()
+  {
+    return new CloudObjectLocation(bucket, prefix).toUri(GoogleStorageDruidModule.SCHEME_GS).toString();
+  }
+}
diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleStorageConnectorModule.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleStorageConnectorModule.java
index cba33b5804c..452ccb1524a 100644
--- a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleStorageConnectorModule.java
+++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleStorageConnectorModule.java
@@ -22,6 +22,7 @@ package org.apache.druid.storage.google.output;
 import com.fasterxml.jackson.databind.Module;
 import com.fasterxml.jackson.databind.module.SimpleModule;
 import com.google.inject.Binder;
+import org.apache.druid.guice.JsonConfigProvider;
 import org.apache.druid.initialization.DruidModule;
 
 import java.util.Collections;
@@ -33,12 +34,15 @@ public class GoogleStorageConnectorModule implements DruidModule
   public List<? extends Module> getJacksonModules()
   {
     return Collections.singletonList(
-        new SimpleModule(this.getClass().getSimpleName()).registerSubtypes(GoogleStorageConnectorProvider.class));
+        new SimpleModule(this.getClass().getSimpleName())
+                .registerSubtypes(GoogleStorageConnectorProvider.class)
+                .registerSubtypes(GoogleExportStorageProvider.class)
+    );
   }
 
   @Override
   public void configure(Binder binder)
   {
-
+    JsonConfigProvider.bind(binder, "druid.export.storage.google", GoogleExportConfig.class);
   }
 }
diff --git a/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/output/GoogleExportStorageProviderTest.java b/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/output/GoogleExportStorageProviderTest.java
new file mode 100644
index 00000000000..e40846a848d
--- /dev/null
+++ b/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/output/GoogleExportStorageProviderTest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.storage.google.output;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.storage.StorageConnector;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.List;
+
+public class GoogleExportStorageProviderTest
+{
+  private final List<String> validPrefixes = ImmutableList.of(
+      "gs://bucket-name/validPath1",
+      "gs://bucket-name/validPath2"
+  );
+
+  @Test
+  public void testGoogleExportStorageProvider()
+  {
+    GoogleExportStorageProvider googleExportStorageProvider = new GoogleExportStorageProvider("bucket-name", "validPath1");
+    googleExportStorageProvider.googleExportConfig = new GoogleExportConfig("tempLocalDir", null, null, validPrefixes);
+    StorageConnector storageConnector = googleExportStorageProvider.get();
+    Assert.assertNotNull(storageConnector);
+    Assert.assertTrue(storageConnector instanceof GoogleStorageConnector);
+
+    Assert.assertEquals("gs://bucket-name/validPath1", googleExportStorageProvider.getBasePath());
+  }
+
+  @Test
+  public void testValidatePaths()
+  {
+    GoogleExportStorageProvider.validatePrefix(validPrefixes, "bucket-name", "validPath1/");
+    GoogleExportStorageProvider.validatePrefix(validPrefixes, "bucket-name", "validPath1");
+    GoogleExportStorageProvider.validatePrefix(validPrefixes, "bucket-name", "validPath1/validSubPath/");
+
+    GoogleExportStorageProvider.validatePrefix(ImmutableList.of("gs://bucket-name"), "bucket-name", "");
+    GoogleExportStorageProvider.validatePrefix(ImmutableList.of("gs://bucket-name"), "bucket-name", "validPath");
+    GoogleExportStorageProvider.validatePrefix(validPrefixes, "bucket-name", "validPath1/../validPath2/");
+
+    Assert.assertThrows(
+        DruidException.class,
+        () -> GoogleExportStorageProvider.validatePrefix(validPrefixes, "incorrect-bucket", "validPath1/")
+    );
+    Assert.assertThrows(
+        DruidException.class,
+        () -> GoogleExportStorageProvider.validatePrefix(validPrefixes, "bucket-name", "invalidPath1")
+    );
+    Assert.assertThrows(
+        DruidException.class,
+        () -> GoogleExportStorageProvider.validatePrefix(validPrefixes, "bucket-name", "validPath123")
+    );
+  }
+}
diff --git a/website/.spelling b/website/.spelling
index 0eaf3b2f0f3..e4c23aa7a44 100644
--- a/website/.spelling
+++ b/website/.spelling
@@ -21,6 +21,7 @@
 1M
 100MiB
 32-bit
+4MiB
 500MiB
 64-bit
 ACL

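The allow-list check in `validatePrefix` above can be read in isolation with a minimal standalone sketch. This is not the committed code: `ExportPathCheck`, `isAllowed`, and `withTrailingSlash` are hypothetical names, `withTrailingSlash` is an assumed stand-in for `StringUtils.maybeAppendTrailingSlash`, and the URI is built by plain string concatenation rather than through `CloudObjectLocation`, so edge-case behavior may differ from Druid's.

```java
import java.net.URI;
import java.util.List;

public class ExportPathCheck
{
  // Hypothetical helper, assumed to behave like StringUtils.maybeAppendTrailingSlash.
  static String withTrailingSlash(final String path)
  {
    return path.endsWith("/") ? path : path + "/";
  }

  // Returns true if gs://bucket/prefix falls under one of the allowed export paths.
  static boolean isAllowed(final List<String> allowedExportPaths, final String bucket, final String prefix)
  {
    // Assumption: simple concatenation instead of CloudObjectLocation.toUri(...).
    final URI providedUri = URI.create("gs://" + bucket + "/" + prefix);
    final String providedPath = withTrailingSlash(providedUri.getPath());
    for (final String allowed : allowedExportPaths) {
      final URI allowedUri = URI.create(allowed);
      // The bucket is the URI host; a different bucket never matches.
      if (!bucket.equals(allowedUri.getHost())) {
        continue;
      }
      // Both paths end in "/", so "validPath123" does not match "validPath1".
      if (providedPath.startsWith(withTrailingSlash(allowedUri.getPath()))) {
        return true;
      }
    }
    return false;
  }

  public static void main(String[] args)
  {
    final List<String> allowed = List.of("gs://bucket-name/validPath1", "gs://bucket-name/validPath2");
    System.out.println(isAllowed(allowed, "bucket-name", "validPath1/validSubPath/")); // true
    System.out.println(isAllowed(allowed, "bucket-name", "validPath123"));             // false
    System.out.println(isAllowed(allowed, "incorrect-bucket", "validPath1/"));         // false
  }
}
```

Appending a trailing slash to both paths before the `startsWith` comparison is what keeps a sibling such as `validPath123` from passing as a child of `validPath1`, which matches the expectations in `testValidatePaths`.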
