This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new a41229b24d8 [FLINK-33694][gs-fs-hadoop] Support overriding GCS root URL
a41229b24d8 is described below

commit a41229b24d82e8c561350c42d8a98dfb865c3f69
Author: Patrick Lucas <m...@patricklucas.com>
AuthorDate: Thu Nov 30 10:27:14 2023 +0100

    [FLINK-33694][gs-fs-hadoop] Support overriding GCS root URL
---
 .../apache/flink/fs/gs/GSFileSystemFactory.java    | 15 ++++++--
 .../org/apache/flink/fs/gs/utils/ConfigUtils.java  | 10 ++++++
 .../flink/fs/gs/GSFileSystemFactoryTest.java       | 41 ++++++++++++++++++++++
 3 files changed, 64 insertions(+), 2 deletions(-)

diff --git a/flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/GSFileSystemFactory.java b/flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/GSFileSystemFactory.java
index 052b6aaddc3..61937dc1551 100644
--- a/flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/GSFileSystemFactory.java
+++ b/flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/GSFileSystemFactory.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.fs.gs;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.FileSystemFactory;
@@ -92,11 +93,16 @@ public class GSFileSystemFactory implements FileSystemFactory {
         this.fileSystemOptions = new GSFileSystemOptions(flinkConfig);
         LOGGER.info("Using file system options {}", fileSystemOptions);
 
-        // get storage credentials and construct Storage instance
+        StorageOptions.Builder storageOptionsBuilder = 
StorageOptions.newBuilder();
+
+        // get storage credentials
         Optional<GoogleCredentials> credentials =
                 ConfigUtils.getStorageCredentials(hadoopConfig, configContext);
-        StorageOptions.Builder storageOptionsBuilder = 
StorageOptions.newBuilder();
         credentials.ifPresent(storageOptionsBuilder::setCredentials);
+
+        // override the GCS root URL only if overridden in the Hadoop config
+        
ConfigUtils.getGcsRootUrl(hadoopConfig).ifPresent(storageOptionsBuilder::setHost);
+
         this.storage = storageOptionsBuilder.build().getService();
     }
 
@@ -123,6 +129,11 @@ public class GSFileSystemFactory implements FileSystemFactory {
         return new GSFileSystem(googleHadoopFileSystem, storage, 
fileSystemOptions);
     }
 
+    /**
+     * Returns the Google Cloud Storage client that this factory built from its configuration.
+     * Intended only for tests that need to inspect the client's options.
+     */
+    @VisibleForTesting
+    Storage getStorage() {
+        return this.storage;
+    }
+
     /** Config context implementation used at runtime. */
     private static class RuntimeConfigContext implements 
ConfigUtils.ConfigContext {
 
diff --git a/flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/utils/ConfigUtils.java b/flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/utils/ConfigUtils.java
index 6670e3d2f1d..cdb9bfffbdd 100644
--- a/flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/utils/ConfigUtils.java
+++ b/flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/utils/ConfigUtils.java
@@ -23,6 +23,7 @@ import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.runtime.util.HadoopConfigLoader;
 
 import com.google.auth.oauth2.GoogleCredentials;
+import com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemConfiguration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -31,6 +32,7 @@ import java.io.StringWriter;
 import java.io.Writer;
 import java.util.Collections;
 import java.util.Optional;
+import java.util.function.BiFunction;
 
 /** Utilities class for configuration of Hadoop and Google Storage. */
 public class ConfigUtils {
@@ -156,6 +158,14 @@ public class ConfigUtils {
         }
     }
 
+    /**
+     * Returns the GCS root URL only if it is explicitly set in the given Hadoop configuration.
+     *
+     * <p>The value is resolved through {@link GoogleHadoopFileSystemConfiguration#GCS_ROOT_URL},
+     * but the property's default is discarded. Callers can therefore tell an explicit override
+     * apart from "not configured".
+     *
+     * @param hadoopConfig the Hadoop configuration to read the root URL from
+     * @return the configured root URL, or {@link Optional#empty()} if none is set
+     */
+    public static Optional<String> getGcsRootUrl(
+            org.apache.hadoop.conf.Configuration hadoopConfig) {
+        // Hand the property a getter that ignores its default argument: an unset key then
+        // resolves to null instead of the connector's built-in root URL.
+        BiFunction<String, String, String> explicitValueOnly =
+                (key, ignoredDefault) -> hadoopConfig.get(key);
+        return Optional.ofNullable(
+                GoogleHadoopFileSystemConfiguration.GCS_ROOT_URL.get(
+                        hadoopConfig, explicitValueOnly));
+    }
+
     /**
      * Helper to serialize a Hadoop config to a string, for logging.
      *
diff --git a/flink-filesystems/flink-gs-fs-hadoop/src/test/java/org/apache/flink/fs/gs/GSFileSystemFactoryTest.java b/flink-filesystems/flink-gs-fs-hadoop/src/test/java/org/apache/flink/fs/gs/GSFileSystemFactoryTest.java
new file mode 100644
index 00000000000..44aa266804f
--- /dev/null
+++ b/flink-filesystems/flink-gs-fs-hadoop/src/test/java/org/apache/flink/fs/gs/GSFileSystemFactoryTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.gs;
+
+import org.apache.flink.configuration.Configuration;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/** Tests for {@link GSFileSystemFactory}. */
+class GSFileSystemFactoryTest {
+
+    @Test
+    void testOverrideStorageRootUrl() {
+        // The root URL is set via the Flink configuration; configuring the factory must make it
+        // the host of the GCS Storage client it creates.
+        String rootUrl = "http://240.0.0.0:12345";
+        Configuration flinkConfig = new Configuration();
+        flinkConfig.setString("gs.storage.root.url", rootUrl);
+
+        GSFileSystemFactory factory = new GSFileSystemFactory();
+        factory.configure(flinkConfig);
+
+        String gsStorageClientHost = factory.getStorage().getOptions().getHost();
+        // JUnit's contract is assertEquals(expected, actual); keep that order so a failure
+        // message reports the values the right way round.
+        assertEquals(rootUrl, gsStorageClientHost);
+    }
+}

Reply via email to