galenwarren commented on a change in pull request #15599:
URL: https://github.com/apache/flink/pull/15599#discussion_r633099616



##########
File path: flink-filesystems/flink-gs-fs-hadoop/pom.xml
##########
@@ -0,0 +1,208 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+       <modelVersion>4.0.0</modelVersion>
+
+       <parent>
+               <groupId>org.apache.flink</groupId>
+               <artifactId>flink-filesystems</artifactId>
+               <version>1.13-SNAPSHOT</version>

Review comment:
       Done.

##########
File path: flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/storage/BlobStorage.java
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.gs.storage;
+
+import com.google.cloud.storage.BlobId;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+
+/** Abstract blob storage, used to simplify interface to Google storage and make it mockable. */
+public interface BlobStorage {
+
+    /** Abstract blob metadata. */
+    interface BlobMetadata {
+
+        /**
+         * The CRC32C checksum for the blob.
+         *
+         * @return The checksum
+         */
+        String getChecksum();
+    }
+
+    /** Abstract blob write channel. */
+    interface WriteChannel {
+
+        /**
+         * Sets the chunk size for upload.
+         *
+         * @param chunkSize The chunk size
+         */
+        void setChunkSize(int chunkSize);
+
+        /**
+         * Writes data to the channel.
+         *
+         * @param content The data buffer
+         * @param start Start offset in the data buffer
+         * @param length Number of bytes to write
+         * @return The number of bytes written
+         * @throws IOException On underlying failure
+         */
+        int write(byte[] content, int start, int length) throws IOException;
+
+        /**
+         * Closes the channel.
+         *
+         * @throws IOException On underlying failure
+         */
+        void close() throws IOException;
+    }
+
+    /**
+     * Creates a write channel.
+     *
+     * @param blobId The blob id to write
+     * @param uploadContentType The content type for the upload
+     * @return The WriteChannel helper
+     */
+    WriteChannel write(BlobId blobId, String uploadContentType);

Review comment:
       Done in [df17222](https://github.com/apache/flink/pull/15599/commits/df17222b8b4e1cea268da7589e8cb808535d7dd8).
   

##########
File path: flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/storage/BlobStorage.java
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.gs.storage;
+
+import com.google.cloud.storage.BlobId;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+
+/** Abstract blob storage, used to simplify interface to Google storage and make it mockable. */
+public interface BlobStorage {
+
+    /** Abstract blob metadata. */
+    interface BlobMetadata {

Review comment:
       Done in [df17222](https://github.com/apache/flink/pull/15599/commits/df17222b8b4e1cea268da7589e8cb808535d7dd8).

##########
File path: flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/storage/GSBlobStorage.java
##########
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.gs.storage;
+
+import org.apache.flink.util.Preconditions;
+
+import com.google.api.gax.paging.Page;
+import com.google.cloud.storage.Blob;
+import com.google.cloud.storage.BlobId;
+import com.google.cloud.storage.BlobInfo;
+import com.google.cloud.storage.Storage;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+/** BlobStorage implementation for Google storage. */
+public class GSBlobStorage implements BlobStorage {
+
+    /** Blob metadata, wraps Google storage Blob. */
+    static class BlobMetadata implements BlobStorage.BlobMetadata {
+
+        private final Blob blob;
+
+        private BlobMetadata(Blob blob) {
+            this.blob = Preconditions.checkNotNull(blob);
+        }
+
+        @Override
+        public String getChecksum() {
+            return blob.getCrc32c();
+        }
+    }
+
+    /** Blob write channel, wraps Google storage WriteChannel. */
+    static class WriteChannel implements BlobStorage.WriteChannel {
+
+        final com.google.cloud.WriteChannel writeChannel;
+
+        private WriteChannel(com.google.cloud.WriteChannel writeChannel) {
+            this.writeChannel = Preconditions.checkNotNull(writeChannel);
+        }
+
+        @Override
+        public void setChunkSize(int chunkSize) {
+            Preconditions.checkArgument(chunkSize > 0);
+
+            writeChannel.setChunkSize(chunkSize);
+        }
+
+        @Override
+        public int write(byte[] content, int start, int length) throws IOException {
+            Preconditions.checkNotNull(content);
+            Preconditions.checkArgument(start >= 0);
+            Preconditions.checkArgument(length >= 0);
+
+            ByteBuffer byteBuffer = ByteBuffer.wrap(content, start, length);
+            return writeChannel.write(byteBuffer);
+        }
+
+        @Override
+        public void close() throws IOException {
+            writeChannel.close();
+        }
+    }
+
+    /** The wrapped Google Storage instance. */
+    private final Storage storage;
+
+    /**
+     * Constructs a GSBlobStorage instance.
+     *
+     * @param storage The wrapped Google Storage instance.
+     */
+    public GSBlobStorage(Storage storage) {
+        this.storage = Preconditions.checkNotNull(storage);
+    }
+
+    @Override
+    public BlobStorage.WriteChannel write(BlobId blobId, String contentType) {
+        Preconditions.checkNotNull(blobId);
+        Preconditions.checkNotNull(contentType);
+
+        BlobInfo blobInfo = BlobInfo.newBuilder(blobId).setContentType(contentType).build();
+        com.google.cloud.WriteChannel writeChannel = storage.writer(blobInfo);
+        return new WriteChannel(writeChannel);
+    }
+
+    @Override
+    public Optional<BlobStorage.BlobMetadata> getMetadata(BlobId blobId) {
+        Preconditions.checkNotNull(blobId);
+
+        Blob blob = storage.get(blobId);
+        if (blob == null) {
+            return Optional.empty();
+        } else {
+            return Optional.of(new BlobMetadata(blob));
+        }

Review comment:
       Done in [df17222](https://github.com/apache/flink/pull/15599/commits/df17222b8b4e1cea268da7589e8cb808535d7dd8).

##########
File path: flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/storage/GSBlobStorage.java
##########
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.gs.storage;
+
+import org.apache.flink.util.Preconditions;
+
+import com.google.api.gax.paging.Page;
+import com.google.cloud.storage.Blob;
+import com.google.cloud.storage.BlobId;
+import com.google.cloud.storage.BlobInfo;
+import com.google.cloud.storage.Storage;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+/** BlobStorage implementation for Google storage. */
+public class GSBlobStorage implements BlobStorage {
+
+    /** Blob metadata, wraps Google storage Blob. */
+    static class BlobMetadata implements BlobStorage.BlobMetadata {
+
+        private final Blob blob;
+
+        private BlobMetadata(Blob blob) {
+            this.blob = Preconditions.checkNotNull(blob);
+        }
+
+        @Override
+        public String getChecksum() {
+            return blob.getCrc32c();
+        }
+    }
+
+    /** Blob write channel, wraps Google storage WriteChannel. */
+    static class WriteChannel implements BlobStorage.WriteChannel {
+
+        final com.google.cloud.WriteChannel writeChannel;
+
+        private WriteChannel(com.google.cloud.WriteChannel writeChannel) {
+            this.writeChannel = Preconditions.checkNotNull(writeChannel);
+        }
+
+        @Override
+        public void setChunkSize(int chunkSize) {
+            Preconditions.checkArgument(chunkSize > 0);
+
+            writeChannel.setChunkSize(chunkSize);
+        }
+
+        @Override
+        public int write(byte[] content, int start, int length) throws IOException {
+            Preconditions.checkNotNull(content);
+            Preconditions.checkArgument(start >= 0);
+            Preconditions.checkArgument(length >= 0);
+
+            ByteBuffer byteBuffer = ByteBuffer.wrap(content, start, length);
+            return writeChannel.write(byteBuffer);
+        }
+
+        @Override
+        public void close() throws IOException {
+            writeChannel.close();
+        }
+    }
+
+    /** The wrapped Google Storage instance. */
+    private final Storage storage;
+
+    /**
+     * Constructs a GSBlobStorage instance.
+     *
+     * @param storage The wrapped Google Storage instance.
+     */
+    public GSBlobStorage(Storage storage) {
+        this.storage = Preconditions.checkNotNull(storage);
+    }
+
+    @Override
+    public BlobStorage.WriteChannel write(BlobId blobId, String contentType) {
+        Preconditions.checkNotNull(blobId);
+        Preconditions.checkNotNull(contentType);
+
+        BlobInfo blobInfo = BlobInfo.newBuilder(blobId).setContentType(contentType).build();
+        com.google.cloud.WriteChannel writeChannel = storage.writer(blobInfo);
+        return new WriteChannel(writeChannel);
+    }
+
+    @Override
+    public Optional<BlobStorage.BlobMetadata> getMetadata(BlobId blobId) {
+        Preconditions.checkNotNull(blobId);
+
+        Blob blob = storage.get(blobId);
+        if (blob == null) {
+            return Optional.empty();
+        } else {
+            return Optional.of(new BlobMetadata(blob));
+        }
+    }
+
+    @Override
+    public List<BlobId> list(String bucketName, String objectPrefix) {
+        Preconditions.checkNotNull(bucketName);
+        Preconditions.checkNotNull(objectPrefix);
+
+        Page<Blob> blobs = storage.list(bucketName, Storage.BlobListOption.prefix(objectPrefix));
+        return StreamSupport.stream(blobs.iterateAll().spliterator(), false)
+                .map(BlobInfo::getBlobId)
+                .collect(Collectors.toList());
+    }
+
+    @Override
+    public void copy(BlobId sourceBlobId, BlobId targetBlobId) {
+        Preconditions.checkNotNull(sourceBlobId);
+        Preconditions.checkNotNull(targetBlobId);
+
+        storage.get(sourceBlobId).copyTo(targetBlobId).getResult();
+    }
+
+    @Override
+    public void compose(List<BlobId> sourceBlobIds, BlobId targetBlobId, String contentType) {
+        Preconditions.checkNotNull(sourceBlobIds);
+        Preconditions.checkArgument(sourceBlobIds.size() > 0);

Review comment:
       Done in [df17222](https://github.com/apache/flink/pull/15599/commits/df17222b8b4e1cea268da7589e8cb808535d7dd8).

##########
File path: flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/storage/GSBlobStorage.java
##########
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.gs.storage;
+
+import org.apache.flink.util.Preconditions;
+
+import com.google.api.gax.paging.Page;
+import com.google.cloud.storage.Blob;
+import com.google.cloud.storage.BlobId;
+import com.google.cloud.storage.BlobInfo;
+import com.google.cloud.storage.Storage;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+/** BlobStorage implementation for Google storage. */
+public class GSBlobStorage implements BlobStorage {
+
+    /** Blob metadata, wraps Google storage Blob. */
+    static class BlobMetadata implements BlobStorage.BlobMetadata {

Review comment:
       Done in [df17222](https://github.com/apache/flink/pull/15599/commits/df17222b8b4e1cea268da7589e8cb808535d7dd8).

##########
File path: flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/utils/BlobUtils.java
##########
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.gs.utils;
+
+import com.google.cloud.storage.BlobId;
+
+import java.net.URI;
+
+/** Utility functions related to blobs. */
+public class BlobUtils {
+
+    /** The maximum number of blobs that can be composed in a single operation. */
+    public static final int COMPOSE_MAX_BLOBS = 32;
+
+    /**
+     * Normalizes a blob id, ensuring that the generation is null.
+     *
+     * @param blobId The blob id
+     * @return The blob id with the generation set to null
+     */
+    public static BlobId normalizeBlobId(BlobId blobId) {
+        return BlobId.of(blobId.getBucket(), blobId.getName());
+    }

Review comment:
       Removed in [df17222](https://github.com/apache/flink/pull/15599/commits/df17222b8b4e1cea268da7589e8cb808535d7dd8).

##########
File path: flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/storage/BlobStorage.java
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.gs.storage;
+
+import com.google.cloud.storage.BlobId;

Review comment:
       Done in [df17222](https://github.com/apache/flink/pull/15599/commits/df17222b8b4e1cea268da7589e8cb808535d7dd8).

##########
File path: flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/GSFileSystemFactory.java
##########
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.gs;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystemFactory;
+import org.apache.flink.runtime.util.HadoopConfigLoader;
+import org.apache.flink.util.Preconditions;
+
+import com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Collections;
+
+/**
+ * Implementation of the Flink {@link org.apache.flink.core.fs.FileSystemFactory} interface for
+ * Google Storage.
+ */
+public class GSFileSystemFactory implements FileSystemFactory {
+
+    private static final String SCHEME = "gs";
+
+    private static final String HADOOP_CONFIG_PREFIX = "fs.gs.";
+
+    private static final String[] FLINK_CONFIG_PREFIXES = {"gs.", HADOOP_CONFIG_PREFIX};
+
+    private static final String[][] MIRRORED_CONFIG_KEYS = {};
+
+    private static final String FLINK_SHADING_PREFIX = "";
+
+    public static final ConfigOption<String> WRITER_TEMPORARY_BUCKET_NAME =
+            ConfigOptions.key("gs.writer.temporary.bucket.name")
+                    .stringType()
+                    .defaultValue(GSFileSystemOptions.DEFAULT_WRITER_TEMPORARY_BUCKET_NAME)
+                    .withDescription(
+                            "This option sets the bucket name used by the recoverable writer to store temporary files. "
+                                    + "If empty, temporary files are stored in the same bucket as the final file being written.");
+
+    public static final ConfigOption<String> WRITER_TEMPORARY_OBJECT_PREFIX =
+            ConfigOptions.key("gs.writer.temporary.object.prefix")
+                    .stringType()
+                    .defaultValue(GSFileSystemOptions.DEFAULT_WRITER_TEMPORARY_OBJECT_PREFIX)
+                    .withDescription(
+                            "This option sets the prefix used by the recoverable writer when writing temporary files. This prefix is applied to the "
+                                    + "final object name to form the base name for temporary files.");
+
+    public static final ConfigOption<String> WRITER_CONTENT_TYPE =
+            ConfigOptions.key("gs.writer.content.type")
+                    .stringType()
+                    .defaultValue(GSFileSystemOptions.DEFAULT_WRITER_CONTENT_TYPE)
+                    .withDescription(
+                            "This option sets the content type applied to files written by the recoverable writer.");
+
+    public static final ConfigOption<Integer> WRITER_CHUNK_SIZE =
+            ConfigOptions.key("gs.writer.chunk.size")

Review comment:
       Per your suggestion below, I'm changing this to use `memoryType`.
       Since it no longer uses `intType`, I think this removes the need for
       explicit units, but please let me know if you disagree.
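
       To illustrate the reasoning (a minimal sketch, not code from this PR): a
       memory-typed option accepts values such as `8mb`, so the unit travels with
       the value rather than with the option name. `ChunkSizeSketch` and
       `parseBytes` below are hypothetical stand-ins written against the JDK only;
       they assume binary multiples and are not Flink's `MemorySize` implementation.

```java
import java.util.Locale;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical stand-in for a memory-typed option value; not Flink's MemorySize. */
final class ChunkSizeSketch {

    // A run of digits, optional whitespace, then an optional unit suffix.
    private static final Pattern SIZE = Pattern.compile("(\\d+)\\s*([a-z]*)");

    /** Parses strings such as "512 kb" or "8mb" into a byte count (binary multiples assumed). */
    static long parseBytes(String text) {
        Matcher m = SIZE.matcher(text.trim().toLowerCase(Locale.ROOT));
        if (!m.matches()) {
            throw new IllegalArgumentException("Not a memory size: " + text);
        }
        long value = Long.parseLong(m.group(1));
        switch (m.group(2)) {
            case "":
            case "b":
                return value;
            case "kb":
                return value * 1024L;
            case "mb":
                return value * 1024L * 1024L;
            case "gb":
                return value * 1024L * 1024L * 1024L;
            default:
                throw new IllegalArgumentException("Unknown unit: " + m.group(2));
        }
    }

    public static void main(String[] args) {
        // The unit is part of the configured value, so the key needs no "bytes" suffix.
        System.out.println(parseBytes("8mb")); // prints 8388608
    }
}
```

       With an integer-typed option the reader has to be told whether `8` means
       bytes or megabytes; with a memory-typed option the configured string
       answers that itself, and Flink hands the parsed value back as a
       `MemorySize`.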

##########
File path: flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/GSFileSystemFactory.java
##########
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.gs;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystemFactory;
+import org.apache.flink.runtime.util.HadoopConfigLoader;
+import org.apache.flink.util.Preconditions;
+
+import com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Collections;
+
+/**
+ * Implementation of the Flink {@link org.apache.flink.core.fs.FileSystemFactory} interface for
+ * Google Storage.
+ */
+public class GSFileSystemFactory implements FileSystemFactory {
+
+    private static final String SCHEME = "gs";
+
+    private static final String HADOOP_CONFIG_PREFIX = "fs.gs.";
+
+    private static final String[] FLINK_CONFIG_PREFIXES = {"gs.", HADOOP_CONFIG_PREFIX};
+
+    private static final String[][] MIRRORED_CONFIG_KEYS = {};
+
+    private static final String FLINK_SHADING_PREFIX = "";
+
+    public static final ConfigOption<String> WRITER_TEMPORARY_BUCKET_NAME =
+            ConfigOptions.key("gs.writer.temporary.bucket.name")
+                    .stringType()
+                    .defaultValue(GSFileSystemOptions.DEFAULT_WRITER_TEMPORARY_BUCKET_NAME)

Review comment:
       Done in [2dc286c](https://github.com/apache/flink/pull/15599/commits/2dc286c697fff7ad6adb68c2a0e6de40d1c20bfe).
   

##########
File path: flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/GSFileSystemFactory.java
##########
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.gs;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystemFactory;
+import org.apache.flink.runtime.util.HadoopConfigLoader;
+import org.apache.flink.util.Preconditions;
+
+import com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Collections;
+
+/**
+ * Implementation of the Flink {@link org.apache.flink.core.fs.FileSystemFactory} interface for
+ * Google Storage.
+ */
+public class GSFileSystemFactory implements FileSystemFactory {
+
+    private static final String SCHEME = "gs";
+
+    private static final String HADOOP_CONFIG_PREFIX = "fs.gs.";
+
+    private static final String[] FLINK_CONFIG_PREFIXES = {"gs.", HADOOP_CONFIG_PREFIX};
+
+    private static final String[][] MIRRORED_CONFIG_KEYS = {};
+
+    private static final String FLINK_SHADING_PREFIX = "";
+
+    public static final ConfigOption<String> WRITER_TEMPORARY_BUCKET_NAME =
+            ConfigOptions.key("gs.writer.temporary.bucket.name")
+                    .stringType()
+                    .defaultValue(GSFileSystemOptions.DEFAULT_WRITER_TEMPORARY_BUCKET_NAME)
+                    .withDescription(
+                            "This option sets the bucket name used by the recoverable writer to store temporary files. "
+                                    + "If empty, temporary files are stored in the same bucket as the final file being written.");
+
+    public static final ConfigOption<String> WRITER_TEMPORARY_OBJECT_PREFIX =
+            ConfigOptions.key("gs.writer.temporary.object.prefix")
+                    .stringType()
+                    .defaultValue(GSFileSystemOptions.DEFAULT_WRITER_TEMPORARY_OBJECT_PREFIX)
+                    .withDescription(
+                            "This option sets the prefix used by the recoverable writer when writing temporary files. This prefix is applied to the "
+                                    + "final object name to form the base name for temporary files.");
+
+    public static final ConfigOption<String> WRITER_CONTENT_TYPE =
+            ConfigOptions.key("gs.writer.content.type")
+                    .stringType()
+                    .defaultValue(GSFileSystemOptions.DEFAULT_WRITER_CONTENT_TYPE)
+                    .withDescription(
+                            "This option sets the content type applied to files written by the recoverable writer.");

Review comment:
       Removed in [2dc286c](https://github.com/apache/flink/pull/15599/commits/2dc286c697fff7ad6adb68c2a0e6de40d1c20bfe).
   

##########
File path: flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/GSFileSystemFactory.java
##########
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.gs;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystemFactory;
+import org.apache.flink.runtime.util.HadoopConfigLoader;
+import org.apache.flink.util.Preconditions;
+
+import com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Collections;
+
+/**
+ * Implementation of the Flink {@link org.apache.flink.core.fs.FileSystemFactory} interface for
+ * Google Storage.
+ */
+public class GSFileSystemFactory implements FileSystemFactory {
+
+    private static final String SCHEME = "gs";
+
+    private static final String HADOOP_CONFIG_PREFIX = "fs.gs.";
+
+    private static final String[] FLINK_CONFIG_PREFIXES = {"gs.", HADOOP_CONFIG_PREFIX};
+
+    private static final String[][] MIRRORED_CONFIG_KEYS = {};
+
+    private static final String FLINK_SHADING_PREFIX = "";
+
+    public static final ConfigOption<String> WRITER_TEMPORARY_BUCKET_NAME =
+            ConfigOptions.key("gs.writer.temporary.bucket.name")
+                    .stringType()
+                    .defaultValue(GSFileSystemOptions.DEFAULT_WRITER_TEMPORARY_BUCKET_NAME)
+                    .withDescription(
+                            "This option sets the bucket name used by the recoverable writer to store temporary files. "
+                                    + "If empty, temporary files are stored in the same bucket as the final file being written.");
+
+    public static final ConfigOption<String> WRITER_TEMPORARY_OBJECT_PREFIX =
+            ConfigOptions.key("gs.writer.temporary.object.prefix")
+                    .stringType()
+                    .defaultValue(GSFileSystemOptions.DEFAULT_WRITER_TEMPORARY_OBJECT_PREFIX)
+                    .withDescription(
+                            "This option sets the prefix used by the recoverable writer when writing temporary files. This prefix is applied to the "
+                                    + "final object name to form the base name for temporary files.");
+
+    public static final ConfigOption<String> WRITER_CONTENT_TYPE =
+            ConfigOptions.key("gs.writer.content.type")
+                    .stringType()
+                    .defaultValue(GSFileSystemOptions.DEFAULT_WRITER_CONTENT_TYPE)
+                    .withDescription(
+                            "This option sets the content type applied to files written by the recoverable writer.");
+
+    public static final ConfigOption<Integer> WRITER_CHUNK_SIZE =
+            ConfigOptions.key("gs.writer.chunk.size")
+                    .intType()
+                    .defaultValue(GSFileSystemOptions.DEFAULT_WRITER_CHUNK_SIZE)

Review comment:
       Done in 
[2dc286c](https://github.com/apache/flink/pull/15599/commits/2dc286c697fff7ad6adb68c2a0e6de40d1c20bfe).
   

##########
File path: 
flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/GSFileSystemFactory.java
##########
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.gs;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystemFactory;
+import org.apache.flink.runtime.util.HadoopConfigLoader;
+import org.apache.flink.util.Preconditions;
+
+import com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Collections;
+
+/**
+ * Implementation of the Flink {@link org.apache.flink.core.fs.FileSystemFactory} interface for
+ * Google Storage.
+ */
+public class GSFileSystemFactory implements FileSystemFactory {
+
+    private static final String SCHEME = "gs";
+
+    private static final String HADOOP_CONFIG_PREFIX = "fs.gs.";
+
+    private static final String[] FLINK_CONFIG_PREFIXES = {"gs.", HADOOP_CONFIG_PREFIX};
+
+    private static final String[][] MIRRORED_CONFIG_KEYS = {};
+
+    private static final String FLINK_SHADING_PREFIX = "";
+
+    public static final ConfigOption<String> WRITER_TEMPORARY_BUCKET_NAME =
+            ConfigOptions.key("gs.writer.temporary.bucket.name")
+                    .stringType()
+                    .defaultValue(GSFileSystemOptions.DEFAULT_WRITER_TEMPORARY_BUCKET_NAME)
+                    .withDescription(
+                            "This option sets the bucket name used by the recoverable writer to store temporary files. "
+                                    + "If empty, temporary files are stored in the same bucket as the final file being written.");
+
+    public static final ConfigOption<String> WRITER_TEMPORARY_OBJECT_PREFIX =
+            ConfigOptions.key("gs.writer.temporary.object.prefix")
+                    .stringType()
+                    .defaultValue(GSFileSystemOptions.DEFAULT_WRITER_TEMPORARY_OBJECT_PREFIX)
+                    .withDescription(
+                            "This option sets the prefix used by the recoverable writer when writing temporary files. This prefix is applied to the "
+                                    + "final object name to form the base name for temporary files.");
+
+    public static final ConfigOption<String> WRITER_CONTENT_TYPE =
+            ConfigOptions.key("gs.writer.content.type")
+                    .stringType()
+                    .defaultValue(GSFileSystemOptions.DEFAULT_WRITER_CONTENT_TYPE)
+                    .withDescription(
+                            "This option sets the content type applied to files written by the recoverable writer.");
+
+    public static final ConfigOption<Integer> WRITER_CHUNK_SIZE =
+            ConfigOptions.key("gs.writer.chunk.size")
+                    .intType()
+                    .defaultValue(GSFileSystemOptions.DEFAULT_WRITER_CHUNK_SIZE)
+                    .withDescription(
+                            "This option sets the chunk size for writes by the recoverable writer. This value is passed through to the underlying "
+                                    + "Google WriteChannel; if zero, the default WriteChannel value is used.");

Review comment:
       Done in 
[2dc286c](https://github.com/apache/flink/pull/15599/commits/2dc286c697fff7ad6adb68c2a0e6de40d1c20bfe).
   

##########
File path: 
flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/GSFileSystemFactory.java
##########
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.gs;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystemFactory;
+import org.apache.flink.runtime.util.HadoopConfigLoader;
+import org.apache.flink.util.Preconditions;
+
+import com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Collections;
+
+/**
+ * Implementation of the Flink {@link org.apache.flink.core.fs.FileSystemFactory} interface for
+ * Google Storage.
+ */
+public class GSFileSystemFactory implements FileSystemFactory {
+
+    private static final String SCHEME = "gs";
+
+    private static final String HADOOP_CONFIG_PREFIX = "fs.gs.";
+
+    private static final String[] FLINK_CONFIG_PREFIXES = {"gs.", HADOOP_CONFIG_PREFIX};
+
+    private static final String[][] MIRRORED_CONFIG_KEYS = {};
+
+    private static final String FLINK_SHADING_PREFIX = "";
+
+    public static final ConfigOption<String> WRITER_TEMPORARY_BUCKET_NAME =
+            ConfigOptions.key("gs.writer.temporary.bucket.name")
+                    .stringType()
+                    .defaultValue(GSFileSystemOptions.DEFAULT_WRITER_TEMPORARY_BUCKET_NAME)
+                    .withDescription(
+                            "This option sets the bucket name used by the recoverable writer to store temporary files. "
+                                    + "If empty, temporary files are stored in the same bucket as the final file being written.");
+
+    public static final ConfigOption<String> WRITER_TEMPORARY_OBJECT_PREFIX =
+            ConfigOptions.key("gs.writer.temporary.object.prefix")
+                    .stringType()
+                    .defaultValue(GSFileSystemOptions.DEFAULT_WRITER_TEMPORARY_OBJECT_PREFIX)
+                    .withDescription(
+                            "This option sets the prefix used by the recoverable writer when writing temporary files. This prefix is applied to the "
+                                    + "final object name to form the base name for temporary files.");
+
+    public static final ConfigOption<String> WRITER_CONTENT_TYPE =
+            ConfigOptions.key("gs.writer.content.type")
+                    .stringType()
+                    .defaultValue(GSFileSystemOptions.DEFAULT_WRITER_CONTENT_TYPE)
+                    .withDescription(
+                            "This option sets the content type applied to files written by the recoverable writer.");
+
+    public static final ConfigOption<Integer> WRITER_CHUNK_SIZE =
+            ConfigOptions.key("gs.writer.chunk.size")
+                    .intType()

Review comment:
       Done in 
[2dc286c](https://github.com/apache/flink/pull/15599/commits/2dc286c697fff7ad6adb68c2a0e6de40d1c20bfe).
   

##########
File path: 
flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/GSFileSystemOptions.java
##########
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.gs;
+
+import org.apache.flink.util.Preconditions;
+
+/** The GS file system options. */
+public class GSFileSystemOptions {
+
+    /**
+     * The default value for the temporary bucket, i.e. empty string which means use same bucket as
+     * the final blob being written
+     */
+    public static final String DEFAULT_WRITER_TEMPORARY_BUCKET_NAME = "";
+
+    /** The default value for the temporary object prefix. */
+    public static final String DEFAULT_WRITER_TEMPORARY_OBJECT_PREFIX = ".inprogress/";
+
+    /** The default value for the content type of blobs written by the recoverable writer. */
+    public static final String DEFAULT_WRITER_CONTENT_TYPE = "application/octet-stream";
+
+    /** The default value for the writer chunk size, i.e. zero which means use Google default * */
+    public static final int DEFAULT_WRITER_CHUNK_SIZE = 0;
+
+    /**
+     * The temporary bucket name to use for recoverable writes. If empty, use the same bucket as the
+     * final blob to write.
+     */
+    public final String writerTemporaryBucketName;
+
+    /** The prefix to be applied to the final object name when generating temporary object names. */
+    public final String writerTemporaryObjectPrefix;
+
+    /** The content type used for files written by the recoverable writer. */
+    public final String writerContentType;
+
+    /**
+     * The chunk size to use for writes on the underlying Google WriteChannel. If zero, then the
+     * chunk size is not set on the underlying channel, and the default value is used.
+     */
+    public final int writerChunkSize;
+
+    /**
+     * Constructs an options instance.
+     *
+     * @param writerTemporaryBucketName The temporary bucket name, if empty use same bucket as final
+     *     blob
+     * @param writerTemporaryObjectPrefix The temporary object prefix
+     * @param writerContentType The content type
+     * @param writerChunkSize The chunk size, if zero this means use Google default
+     */
+    public GSFileSystemOptions(
+            String writerTemporaryBucketName,
+            String writerTemporaryObjectPrefix,
+            String writerContentType,
+            int writerChunkSize) {
+        this.writerTemporaryBucketName = Preconditions.checkNotNull(writerTemporaryBucketName);
+        this.writerTemporaryObjectPrefix = Preconditions.checkNotNull(writerTemporaryObjectPrefix);
+        this.writerContentType = Preconditions.checkNotNull(writerContentType);
+        Preconditions.checkArgument(writerChunkSize >= 0);

Review comment:
       Done in 
[2dc286c](https://github.com/apache/flink/pull/15599/commits/2dc286c697fff7ad6adb68c2a0e6de40d1c20bfe).
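
       For readers following the thread, the fallback semantics documented in the Javadoc quoted above (an empty temporary bucket name means "use the final blob's bucket", and a chunk size of zero means "leave the Google default in place") could be sketched roughly as below. This is only an illustration under stated assumptions: `WriterOptionsSketch`, `resolveTemporaryBucket` and `shouldSetChunkSize` are hypothetical names that do not appear in the PR, and the sketch uses plain JDK checks rather than Flink's `Preconditions`.

```java
import java.util.Objects;

/** Hypothetical illustration only; this class is not part of the PR. */
final class WriterOptionsSketch {

    private WriterOptionsSketch() {}

    /**
     * Returns the bucket to use for temporary objects: the configured bucket if one is set,
     * otherwise the bucket of the final blob being written.
     */
    static String resolveTemporaryBucket(String configuredTemporaryBucket, String finalBucket) {
        Objects.requireNonNull(configuredTemporaryBucket);
        Objects.requireNonNull(finalBucket);
        return configuredTemporaryBucket.isEmpty() ? finalBucket : configuredTemporaryBucket;
    }

    /**
     * Returns true when a chunk size should be set explicitly on the write channel.
     * Zero means "do not set it", so the underlying default applies; negative values are rejected,
     * mirroring the checkArgument(writerChunkSize >= 0) call in the quoted constructor.
     */
    static boolean shouldSetChunkSize(int writerChunkSize) {
        if (writerChunkSize < 0) {
            throw new IllegalArgumentException("writerChunkSize must be >= 0");
        }
        return writerChunkSize > 0;
    }

    public static void main(String[] args) {
        // Empty temporary bucket name: fall back to the final blob's bucket.
        System.out.println(resolveTemporaryBucket("", "final-bucket"));
        // Explicit temporary bucket name: use it as configured.
        System.out.println(resolveTemporaryBucket("temp-bucket", "final-bucket"));
        // Zero chunk size: leave the channel's default untouched.
        System.out.println(shouldSetChunkSize(0));
    }
}
```

       Keeping both fallbacks in small pure functions like these is one way to unit-test the defaults without touching Google Storage; whether the PR structures it this way is not something the quoted diff shows.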
   

##########
File path: 
flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/GSFileSystem.java
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.gs;
+
+import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.fs.gs.storage.GSBlobStorage;
+import org.apache.flink.fs.gs.writer.GSRecoverableWriter;
+import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
+import org.apache.flink.util.Preconditions;
+
+import com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem;
+import com.google.cloud.storage.Storage;
+import com.google.cloud.storage.StorageOptions;
+
+import java.io.IOException;
+
+/** Provides recoverable-writer functionality for the standard GoogleHadoopFileSystem. */
+class GSFileSystem extends HadoopFileSystem {

Review comment:
       Done in 
[2dc286c](https://github.com/apache/flink/pull/15599/commits/2dc286c697fff7ad6adb68c2a0e6de40d1c20bfe).
   

##########
File path: 
flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/GSFileSystemFactory.java
##########
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.gs;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystemFactory;
+import org.apache.flink.runtime.util.HadoopConfigLoader;
+import org.apache.flink.util.Preconditions;
+
+import com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Collections;
+
+/**
+ * Implementation of the Flink {@link org.apache.flink.core.fs.FileSystemFactory} interface for
+ * Google Storage.
+ */
+public class GSFileSystemFactory implements FileSystemFactory {
+
+    private static final String SCHEME = "gs";
+
+    private static final String HADOOP_CONFIG_PREFIX = "fs.gs.";
+
+    private static final String[] FLINK_CONFIG_PREFIXES = {"gs.", HADOOP_CONFIG_PREFIX};
+
+    private static final String[][] MIRRORED_CONFIG_KEYS = {};
+
+    private static final String FLINK_SHADING_PREFIX = "";
+
+    public static final ConfigOption<String> WRITER_TEMPORARY_BUCKET_NAME =
+            ConfigOptions.key("gs.writer.temporary.bucket.name")
+                    .stringType()
+                    .defaultValue(GSFileSystemOptions.DEFAULT_WRITER_TEMPORARY_BUCKET_NAME)
+                    .withDescription(
+                            "This option sets the bucket name used by the recoverable writer to store temporary files. "
+                                    + "If empty, temporary files are stored in the same bucket as the final file being written.");
+
+    public static final ConfigOption<String> WRITER_TEMPORARY_OBJECT_PREFIX =
+            ConfigOptions.key("gs.writer.temporary.object.prefix")
+                    .stringType()
+                    .defaultValue(GSFileSystemOptions.DEFAULT_WRITER_TEMPORARY_OBJECT_PREFIX)
+                    .withDescription(
+                            "This option sets the prefix used by the recoverable writer when writing temporary files. This prefix is applied to the "
+                                    + "final object name to form the base name for temporary files.");

Review comment:
       Config for temporary-object prefix removed in 
[2dc286c](https://github.com/apache/flink/pull/15599/commits/2dc286c697fff7ad6adb68c2a0e6de40d1c20bfe).
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

