This is an automated email from the ASF dual-hosted git repository.
abhishekrb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 2978d6af699 add support for GCS warehouses in iceberg (#19137)
2978d6af699 is described below
commit 2978d6af699df2cac4ae37d6d0c8e15679b809dd
Author: Ben Smithgall <[email protected]>
AuthorDate: Wed Mar 25 21:03:03 2026 -0400
add support for GCS warehouses in iceberg (#19137)
Add `GoogleCloudStorageInputSourceFactory` to allow reading Iceberg data
files from `gs://` paths with `"warehouseSource": "google"`. Add
`iceberg-gcp` and `google-cloud-storage` dependencies to iceberg
extension.
---
docs/development/extensions-contrib/iceberg.md | 29 +++-
.../druid-iceberg-extensions/pom.xml | 18 +++
.../GoogleCloudStorageInputSourceFactory.java | 73 ++++++++++
.../storage/google/GoogleStorageDruidModule.java | 4 +-
.../GoogleCloudStorageInputSourceFactoryTest.java | 162 +++++++++++++++++++++
website/.spelling | 4 +
6 files changed, 288 insertions(+), 2 deletions(-)
diff --git a/docs/development/extensions-contrib/iceberg.md b/docs/development/extensions-contrib/iceberg.md
index f3eed056422..bbb98101e5f 100644
--- a/docs/development/extensions-contrib/iceberg.md
+++ b/docs/development/extensions-contrib/iceberg.md
@@ -46,7 +46,7 @@ The `druid-iceberg-extensions` extension relies on the existing input source con
For Druid to seamlessly talk to the Hive metastore, ensure that the Hive configuration files such as `hive-site.xml` and `core-site.xml` are available in the Druid classpath for peon processes.
You can also specify Hive properties under the `catalogProperties` object in the ingestion spec.
-The `druid-iceberg-extensions` extension presently only supports HDFS, S3 and local warehouse directories.
+The `druid-iceberg-extensions` extension presently supports HDFS, S3, GCS, and local warehouse directories.
### Read from HDFS warehouse
@@ -106,6 +106,33 @@ The following properties are required in the `catalogProperties`:
```
Since the Hadoop AWS connector uses the `s3a` filesystem client, specify the warehouse path with the `s3a://` protocol instead of `s3://`.
+### Read from GCS warehouse
+
+To read from a GCS warehouse, load the `druid-google-extensions` extension. Druid extracts the
+data file paths from the catalog and uses `GoogleCloudStorageInputSource` to ingest these files.
+Set the `type` property of the `warehouseSource` object to `google` in the ingestion spec:
+
+```json
+"warehouseSource": {
+ "type": "google"
+}
+```
+
+For the Iceberg catalog to read its own metadata files from GCS, set `io-impl` to
+`org.apache.iceberg.gcp.gcs.GCSFileIO` in `catalogProperties`:
+
+```json
+"catalogProperties": {
+ "io-impl": "org.apache.iceberg.gcp.gcs.GCSFileIO",
+ "gcs.project-id": "my-gcp-project",
+ "warehouse": "gs://my-bucket/warehouse"
+}
+```
+
+Authentication uses Application Default Credentials (ADC). Ensure that the Druid processes
+have access to valid GCP credentials (e.g., via a service account key file, workload identity,
+or metadata server).
+
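+A complete `ioConfig` for a GCS-backed Iceberg ingestion might look like the
+following sketch. The bucket, project, table, and namespace names here are
+illustrative placeholders, not values required by the extension:
+
+```json
+"ioConfig": {
+  "type": "index_parallel",
+  "inputSource": {
+    "type": "iceberg",
+    "tableName": "iceberg_table",
+    "namespace": "iceberg_namespace",
+    "icebergCatalog": {
+      "type": "hive",
+      "warehousePath": "gs://my-bucket/warehouse",
+      "catalogProperties": {
+        "io-impl": "org.apache.iceberg.gcp.gcs.GCSFileIO",
+        "gcs.project-id": "my-gcp-project",
+        "warehouse": "gs://my-bucket/warehouse"
+      }
+    },
+    "warehouseSource": {
+      "type": "google"
+    }
+  },
+  "inputFormat": {
+    "type": "parquet"
+  }
+}
+```
+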
## Local catalog
The local catalog type can be used for catalogs configured on the local filesystem. Set the `icebergCatalog` type to `local`. You can use this catalog for demos or localized tests. It is not recommended for production use cases.
diff --git a/extensions-contrib/druid-iceberg-extensions/pom.xml b/extensions-contrib/druid-iceberg-extensions/pom.xml
index 56e4cd917ec..ec4f7c04b7c 100644
--- a/extensions-contrib/druid-iceberg-extensions/pom.xml
+++ b/extensions-contrib/druid-iceberg-extensions/pom.xml
@@ -229,6 +229,10 @@
<groupId>org.xerial.snappy</groupId>
<artifactId>snappy-java</artifactId>
</exclusion>
+ <exclusion>
+ <groupId>com.google.re2j</groupId>
+ <artifactId>re2j</artifactId>
+ </exclusion>
</exclusions>
</dependency>
@@ -297,6 +301,18 @@
<version>${aws.sdk.v2.version}</version>
</dependency>
+ <!-- Iceberg GCSFileIO for reading metadata from GCS -->
+ <dependency>
+ <groupId>org.apache.iceberg</groupId>
+ <artifactId>iceberg-gcp</artifactId>
+ <version>${iceberg.core.version}</version>
+ </dependency>
+ <!-- Required: iceberg-gcp declares google-cloud-storage as compileOnly (not transitive) -->
+ <dependency>
+ <groupId>com.google.cloud</groupId>
+ <artifactId>google-cloud-storage</artifactId>
+ <version>${com.google.cloud.storage.version}</version>
+ </dependency>
<dependency>
<groupId>org.apache.hive</groupId>
@@ -780,6 +796,8 @@
<ignoredUnusedDeclaredDependency>software.amazon.awssdk:glue</ignoredUnusedDeclaredDependency>
<ignoredUnusedDeclaredDependency>software.amazon.awssdk:s3</ignoredUnusedDeclaredDependency>
<ignoredUnusedDeclaredDependency>software.amazon.awssdk:sts</ignoredUnusedDeclaredDependency>
+<ignoredUnusedDeclaredDependency>org.apache.iceberg:iceberg-gcp</ignoredUnusedDeclaredDependency>
+<ignoredUnusedDeclaredDependency>com.google.cloud:google-cloud-storage</ignoredUnusedDeclaredDependency>
</ignoredUnusedDeclaredDependencies>
</configuration>
</plugin>
diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSourceFactory.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSourceFactory.java
new file mode 100644
index 00000000000..162d05747ca
--- /dev/null
+++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSourceFactory.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.google;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.druid.data.input.InputSourceFactory;
+import org.apache.druid.data.input.impl.SplittableInputSource;
+import org.apache.druid.data.input.impl.systemfield.SystemFields;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.storage.google.GoogleInputDataConfig;
+import org.apache.druid.storage.google.GoogleStorage;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class GoogleCloudStorageInputSourceFactory implements InputSourceFactory
+{
+ private final GoogleStorage storage;
+ private final GoogleInputDataConfig inputDataConfig;
+
+ @JsonCreator
+ public GoogleCloudStorageInputSourceFactory(
+ @JacksonInject GoogleStorage storage,
+ @JacksonInject GoogleInputDataConfig inputDataConfig
+ )
+ {
+ this.storage = storage;
+ this.inputDataConfig = inputDataConfig;
+ }
+
+ @Override
+ public SplittableInputSource create(List<String> inputFilePaths)
+ {
+ final List<URI> uris = inputFilePaths.stream().map(chosenPath -> {
+ try {
+ return new URI(chosenPath);
+ }
+ catch (URISyntaxException e) {
+ throw new IAE(e, "Invalid input file path [%s]", chosenPath);
+ }
+ }).collect(Collectors.toList());
+
+ return new GoogleCloudStorageInputSource(
+ storage,
+ inputDataConfig,
+ uris,
+ null,
+ null,
+ null,
+ SystemFields.none()
+ );
+ }
+}
diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleStorageDruidModule.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleStorageDruidModule.java
index 070a2878be9..d951abde6b2 100644
--- a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleStorageDruidModule.java
+++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleStorageDruidModule.java
@@ -32,6 +32,7 @@ import com.google.inject.Provides;
import com.google.inject.multibindings.MapBinder;
import org.apache.druid.data.SearchableVersionedDataFinder;
import org.apache.druid.data.input.google.GoogleCloudStorageInputSource;
+import org.apache.druid.data.input.google.GoogleCloudStorageInputSourceFactory;
import org.apache.druid.guice.Binders;
import org.apache.druid.guice.JsonConfigProvider;
import org.apache.druid.guice.LazySingleton;
@@ -74,7 +75,8 @@ public class GoogleStorageDruidModule implements DruidModule
}
},
new SimpleModule().registerSubtypes(
- new NamedType(GoogleCloudStorageInputSource.class, SCHEME)
+ new NamedType(GoogleCloudStorageInputSource.class, SCHEME),
+ new NamedType(GoogleCloudStorageInputSourceFactory.class, SCHEME)
)
);
}
diff --git a/extensions-core/google-extensions/src/test/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSourceFactoryTest.java b/extensions-core/google-extensions/src/test/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSourceFactoryTest.java
new file mode 100644
index 00000000000..eb012c5d1cc
--- /dev/null
+++ b/extensions-core/google-extensions/src/test/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSourceFactoryTest.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.google;
+
+import com.fasterxml.jackson.databind.InjectableValues;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.druid.data.input.InputSourceFactory;
+import org.apache.druid.data.input.impl.SplittableInputSource;
+import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.storage.google.GoogleInputDataConfig;
+import org.apache.druid.storage.google.GoogleStorage;
+import org.apache.druid.storage.google.GoogleStorageDruidModule;
+import org.easymock.EasyMock;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.net.URI;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+public class GoogleCloudStorageInputSourceFactoryTest
+{
+ private static final GoogleStorage STORAGE = EasyMock.createMock(GoogleStorage.class);
+ private static final GoogleInputDataConfig INPUT_DATA_CONFIG = EasyMock.createMock(GoogleInputDataConfig.class);
+
+ private static ObjectMapper createObjectMapper()
+ {
+ final ObjectMapper mapper = new DefaultObjectMapper();
+ new GoogleStorageDruidModule().getJacksonModules().forEach(mapper::registerModule);
+ mapper.setInjectableValues(
+ new InjectableValues.Std()
+ .addValue(GoogleStorage.class, STORAGE)
+ .addValue(GoogleInputDataConfig.class, INPUT_DATA_CONFIG)
+ );
+ return mapper;
+ }
+
+ @Test
+ public void testCreateReturnsGoogleCloudStorageInputSource()
+ {
+ final GoogleCloudStorageInputSourceFactory factory = new GoogleCloudStorageInputSourceFactory(
+ STORAGE,
+ INPUT_DATA_CONFIG
+ );
+ final List<String> paths = Arrays.asList(
+ "gs://foo/bar/file.csv",
+ "gs://bar/foo/file2.csv"
+ );
+
+ final SplittableInputSource inputSource = factory.create(paths);
+ Assert.assertTrue(inputSource instanceof GoogleCloudStorageInputSource);
+ }
+
+ @Test
+ public void testCreateWithMultiplePaths()
+ {
+ final GoogleCloudStorageInputSourceFactory factory = new GoogleCloudStorageInputSourceFactory(
+ STORAGE,
+ INPUT_DATA_CONFIG
+ );
+ final List<String> paths = Arrays.asList(
+ "gs://foo/bar/file.csv",
+ "gs://bar/foo/file2.csv",
+ "gs://baz/qux/file3.txt"
+ );
+
+ final GoogleCloudStorageInputSource inputSource = (GoogleCloudStorageInputSource) factory.create(paths);
+ Assert.assertNotNull(inputSource.getUris());
+ Assert.assertEquals(3, inputSource.getUris().size());
+ Assert.assertEquals(URI.create("gs://foo/bar/file.csv"), inputSource.getUris().get(0));
+ Assert.assertEquals(URI.create("gs://bar/foo/file2.csv"), inputSource.getUris().get(1));
+ Assert.assertEquals(URI.create("gs://baz/qux/file3.txt"), inputSource.getUris().get(2));
+ }
+
+ @Test
+ public void testCreatePreservesGsScheme()
+ {
+ final GoogleCloudStorageInputSourceFactory factory = new GoogleCloudStorageInputSourceFactory(
+ STORAGE,
+ INPUT_DATA_CONFIG
+ );
+ final List<String> paths = Collections.singletonList("gs://bucket/path/to/file.csv");
+
+ final GoogleCloudStorageInputSource inputSource = (GoogleCloudStorageInputSource) factory.create(paths);
+ final URI uri = inputSource.getUris().get(0);
+ Assert.assertEquals("gs", uri.getScheme());
+ Assert.assertEquals("bucket", uri.getHost());
+ Assert.assertEquals("/path/to/file.csv", uri.getPath());
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testCreateWithEmptyListThrows()
+ {
+ final GoogleCloudStorageInputSourceFactory factory = new GoogleCloudStorageInputSourceFactory(
+ STORAGE,
+ INPUT_DATA_CONFIG
+ );
+ factory.create(Collections.emptyList());
+ }
+
+ @Test
+ public void testCreateWithEncodedPaths()
+ {
+ final GoogleCloudStorageInputSourceFactory factory = new GoogleCloudStorageInputSourceFactory(
+ STORAGE,
+ INPUT_DATA_CONFIG
+ );
+ final List<String> paths = Collections.singletonList("gs://bucket/path%20with%20spaces/file.csv");
+
+ final GoogleCloudStorageInputSource inputSource = (GoogleCloudStorageInputSource) factory.create(paths);
+ final URI uri = inputSource.getUris().get(0);
+ Assert.assertEquals("gs://bucket/path%20with%20spaces/file.csv", uri.toString());
+ }
+
+ @Test
+ public void testJacksonDeserialization() throws Exception
+ {
+ final ObjectMapper mapper = createObjectMapper();
+
+ final String json = "{\"type\": \"google\"}";
+ final InputSourceFactory factory = mapper.readValue(json, InputSourceFactory.class);
+ Assert.assertTrue(factory instanceof GoogleCloudStorageInputSourceFactory);
+ }
+
+ @Test
+ public void testSerializationRoundTrip() throws Exception
+ {
+ final ObjectMapper mapper = createObjectMapper();
+
+ final GoogleCloudStorageInputSourceFactory original = new GoogleCloudStorageInputSourceFactory(
+ STORAGE,
+ INPUT_DATA_CONFIG
+ );
+
+ final String json = mapper.writeValueAsString(original);
+ Assert.assertTrue(json.contains("\"type\":\"google\""));
+
+ final GoogleCloudStorageInputSourceFactory deserialized = mapper.readValue(
+ json,
+ GoogleCloudStorageInputSourceFactory.class
+ );
+ Assert.assertNotNull(deserialized);
+ }
+}
diff --git a/website/.spelling b/website/.spelling
index 703046dc335..f5d3ddf6714 100644
--- a/website/.spelling
+++ b/website/.spelling
@@ -29,6 +29,7 @@ docusaurus
64-bit
ACL
ACLs
+ADC
APIs
apache.org
AvroStorage
@@ -114,6 +115,7 @@ Float.POSITIVE_INFINITY.
ForwardedRequestCustomizer
GC
GPG
+GoogleCloudStorageInputSource
GSSAPI
GUIs
GroupBy
@@ -1137,7 +1139,9 @@ POD_NAMESPACE
ConfigMap
ConfigMaps
PT17S
+GCP
GCS
+GCSFileIO
gcs-connector
hdfs
Aotearoa
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]