galenwarren commented on a change in pull request #15599:
URL: https://github.com/apache/flink/pull/15599#discussion_r619674758



##########
File path: 
flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/utils/BlobUtils.java
##########
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.gs.utils;
+
+import com.google.cloud.storage.BlobId;
+
+import java.net.URI;
+
+/** Utility functions related to blobs. */
+public class BlobUtils {
+
+    /** The maximum number of blobs that can be composed in a single operation. */
+    public static final int COMPOSE_MAX_BLOBS = 32;
+
+    /**
+     * Normalizes a blob id, ensuring that the generation is null.
+     *
+     * @param blobId The blob id
+     * @return The blob id with the generation set to null
+     */
+    public static BlobId normalizeBlobId(BlobId blobId) {
+        return BlobId.of(blobId.getBucket(), blobId.getName());
+    }

Review comment:
       A BlobId has three parts -- a bucket name, an object name, and a generation. If the generation is null, that means "latest generation". So the normalization essentially removes the explicit generation from a blob id, setting it to null. This is done so that we can compare "expected" temporary blob ids, which don't have generations specified, with blob ids returned from the API, which do have generations set.
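   
   For illustration, here's a minimal sketch of the mismatch (the bucket/object names and the generation value are made up):
   
   ```java
   BlobId expected = BlobId.of("bucket", "foo/bar");       // generation is null
   BlobId fromApi = BlobId.of("bucket", "foo/bar", 1234L); // generation populated by the API
   boolean equalRaw = expected.equals(fromApi);                                   // false, generations differ
   boolean equalNormalized = expected.equals(BlobUtils.normalizeBlobId(fromApi)); // true after normalization
   ```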
   
   This comparison is done in exactly one place, in ```cleanupRecoverableState```, where we match up the list of expected BlobIds from the recoverable writer state against the list of temporary blobs found in storage, so that the function can return true/false to indicate whether everything that was expected to be deleted was, in fact, deleted.
   
   Temporary blobs should never be overwritten -- the object name contains a UUID that is generated each time one is created -- so only a single generation of any temporary blob should ever exist. So, I figured it would be safe to compare them in this way.
   
   However, as you pointed out in a different comment, I probably shouldn't be 
deleting all of those blobs based on partial name match anyway, so I think the 
need for this normalization goes away. So I'll just plan to remove it.
   

##########
File path: 
flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/storage/BlobStorage.java
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.gs.storage;
+
+import com.google.cloud.storage.BlobId;

Review comment:
       Good suggestions on naming, will do.
   
   Regarding BlobId, I thought about this, too, while writing the code. You're 
right, the only thing truly specific to google storage in this interface is 
BlobId, and the only parts we use of that are bucket name and object name, so 
an abstraction would certainly be possible. 
   
   I'm not sure if this is where you're going with this, but if we did that, in principle the BlobStorage abstraction could be used to implement a recoverable writer over any storage backend that could support it. Looking at the interface, it seems plausible that other bucket-based storages could support that, though I think I'd need to rethink how options are supplied (e.g. ```setChunkSize``` is somewhat google-specific), and I'd probably want to add a method to generate a checksum from a byte array, so that the specifics of how a checksum is generated could be provider-specific rather than fixed (in ChecksumUtils).
   
   One option I considered was making BlobStorage a generic interface, with a BLOBID type parameter, constrained appropriately to extend an interface that exposes bucket and object name. That would be really easy to do; maybe I should go ahead and do at least that part? That would get BlobId out of there. Roughly the shape I have in mind is sketched below.
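   
   This is a sketch only, with hypothetical names (```BlobIdentifier```, ```checksum```) standing in for whatever we'd actually call things:
   
   ```java
   import java.util.List;
   
   /** Hypothetical interface exposing the only parts of a blob id the writer uses. */
   interface BlobIdentifier {
       String getBucket();
       String getName();
   }
   
   /** Sketch of a provider-neutral storage interface, generic over the blob id type. */
   interface BlobStorage<BLOBID extends BlobIdentifier> {
       void copy(BLOBID source, BLOBID target);
       List<Boolean> delete(List<BLOBID> blobIds);
       // provider-specific checksum over a byte range, instead of a fixed ChecksumUtils
       String checksum(byte[] content, int start, int length);
   }
   ```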
   
   Whether it makes sense to abstract away *everything* google-specific (e.g. options, checksum) so that this could be used with non-google storage, I'll leave up to you. That wouldn't be hard to do either, but it's probably not worth it unless it's realistic that this might be used elsewhere.
   
   Honestly, my primary reason for this interface was for testability. But if 
you think it's worthwhile to make the interface more general, it would be 
pretty straightforward and I'd be happy to make that change. 
   

##########
File path: 
flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/GSFileSystemFactory.java
##########
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.gs;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystemFactory;
+import org.apache.flink.runtime.util.HadoopConfigLoader;
+import org.apache.flink.util.Preconditions;
+
+import com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Collections;
+
+/**
+ * Implementation of the Flink {@link org.apache.flink.core.fs.FileSystemFactory} interface for
+ * Google Storage.
+ */
+public class GSFileSystemFactory implements FileSystemFactory {
+
+    private static final String SCHEME = "gs";
+
+    private static final String HADOOP_CONFIG_PREFIX = "fs.gs.";
+
+    private static final String[] FLINK_CONFIG_PREFIXES = {"gs.", HADOOP_CONFIG_PREFIX};
+
+    private static final String[][] MIRRORED_CONFIG_KEYS = {};
+
+    private static final String FLINK_SHADING_PREFIX = "";
+
+    public static final ConfigOption<String> WRITER_TEMPORARY_BUCKET_NAME =
+            ConfigOptions.key("gs.writer.temporary.bucket.name")
+                    .stringType()
+                    .defaultValue(GSFileSystemOptions.DEFAULT_WRITER_TEMPORARY_BUCKET_NAME)
+                    .withDescription(
+                            "This option sets the bucket name used by the recoverable writer to store temporary files. "
+                                    + "If empty, temporary files are stored in the same bucket as the final file being written.");
+
+    public static final ConfigOption<String> WRITER_TEMPORARY_OBJECT_PREFIX =
+            ConfigOptions.key("gs.writer.temporary.object.prefix")
+                    .stringType()
+                    .defaultValue(GSFileSystemOptions.DEFAULT_WRITER_TEMPORARY_OBJECT_PREFIX)
+                    .withDescription(
+                            "This option sets the prefix used by the recoverable writer when writing temporary files. This prefix is applied to the "
+                                    + "final object name to form the base name for temporary files.");

Review comment:
       The main reason to consider adding this knob is to allow one to use 
google storage's [object lifecycle 
management](https://cloud.google.com/storage/docs/lifecycle) to assign TTLs to 
temporary files. In google storage, TTLs can only be assigned at the bucket 
level, not the individual object level. So it would be impossible to assign 
TTLs to temporary objects that reside in the same bucket as the "final" objects 
without also applying the TTL to the final objects.
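   
   For example, with a dedicated temporary bucket, something like this becomes possible (a sketch using the google-cloud-storage client; the bucket name and the 7-day age are placeholders):
   
   ```java
   import com.google.cloud.storage.BucketInfo;
   import com.google.cloud.storage.BucketInfo.LifecycleRule;
   import com.google.cloud.storage.Storage;
   import com.google.cloud.storage.StorageOptions;
   
   import java.util.Collections;
   
   Storage gcs = StorageOptions.getDefaultInstance().getService();
   
   // delete any object in the temporary bucket once it is older than 7 days
   LifecycleRule ttlRule =
           new LifecycleRule(
                   LifecycleRule.LifecycleAction.newDeleteAction(),
                   LifecycleRule.LifecycleCondition.newBuilder().setAge(7).build());
   
   gcs.update(
           BucketInfo.newBuilder("my-temporary-bucket")
                   .setLifecycleRules(Collections.singletonList(ttlRule))
                   .build());
   ```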
   
   I do think this could be useful, especially when I make the changes you've 
suggested to make the deletion of temporary files more focused; the TTL 
mechanism might be the only way orphaned temporary files would ever get 
deleted. What do you think?
   

##########
File path: 
flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/writer/GSRecoverableFsDataOutputStream.java
##########
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.gs.writer;
+
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
+import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.fs.gs.GSFileSystemOptions;
+import org.apache.flink.fs.gs.storage.BlobStorage;
+import org.apache.flink.util.Preconditions;
+
+import com.google.cloud.storage.BlobId;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+
+/** The data output stream implementation for the GS recoverable writer. */
+class GSRecoverableFsDataOutputStream extends RecoverableFsDataOutputStream {
+
+    /** The underlying blob storage. */
+    private final BlobStorage storage;
+
+    /** The GS file system options. */
+    private final GSFileSystemOptions options;
+
+    /** The recoverable writer. */
+    private final GSRecoverableWriter writer;
+
+    /** The recoverable writer state. */
+    private final GSRecoverableWriterState state;
+
+    /**
+     * The current write channel, if one exists. A channel is created when one doesn't exist and
+     * bytes are written, and the channel is closed/destroyed when explicitly closed by the consumer
+     * (via close or closeForCommit) or when the data output stream is persisted (via persist).
+     * Calling persist does not close the data output stream, so it's possible that more bytes will
+     * be written, which will cause another channel to be created. So, multiple write channels may
+     * be created and destroyed during the lifetime of the data output stream.
+     */
+    @Nullable GSChecksumWriteChannel currentWriteChannel;
+
+    GSRecoverableFsDataOutputStream(
+            BlobStorage storage,
+            GSFileSystemOptions options,
+            GSRecoverableWriter writer,
+            GSRecoverableWriterState state) {
+        this.storage = Preconditions.checkNotNull(storage);
+        this.options = Preconditions.checkNotNull(options);
+        this.writer = Preconditions.checkNotNull(writer);
+        this.state = Preconditions.checkNotNull(state);
+    }
+
+    @Override
+    public long getPos() throws IOException {
+        return state.bytesWritten;
+    }
+
+    @Override
+    public void write(int byteValue) throws IOException {
+        byte[] bytes = new byte[] {(byte) byteValue};
+        write(bytes);
+    }
+
+    @Override
+    public void write(@Nonnull byte[] content) throws IOException {
+        Preconditions.checkNotNull(content);
+
+        write(content, 0, content.length);
+    }
+
+    @Override
+    public void write(@Nonnull byte[] content, int start, int length) throws IOException {
+        Preconditions.checkNotNull(content);
+        Preconditions.checkArgument(start >= 0);
+        Preconditions.checkArgument(length >= 0);
+
+        // if the data stream is already closed, throw an exception
+        if (state.closed) {
+            throw new IOException("Illegal attempt to write to closed output 
stream");
+        }
+
+        // if necessary, create a write channel
+        if (currentWriteChannel == null) {
+            currentWriteChannel = createWriteChannel();
+        }
+
+        // write to the stream. the docs say that, in some circumstances, though an attempt will be
+        // made to write all of the requested bytes, there are some cases where only some bytes will
+        // be written. it's not clear whether this could ever happen with a Google storage
+        // WriteChannel; in any case, recoverable writers don't support partial writes, so if this
+        // ever happens, we must fail the write:
+        // https://docs.oracle.com/javase/7/docs/api/java/nio/channels/WritableByteChannel.html#write(java.nio.ByteBuffer)
+        int bytesWritten = currentWriteChannel.write(content, start, length);
+        if (bytesWritten != length) {
+            throw new IOException(
+                    String.format(
+                            "WriteChannel.write wrote %d of %d requested bytes, failing.",
+                            bytesWritten, length));
+        }
+
+        // update count of total bytes written
+        state.bytesWritten += length;
+    }
+
+    @Override
+    public void flush() throws IOException {
+        // not supported for GS, flushing frequency is controlled by the chunk size setting
+        // https://googleapis.dev/java/google-cloud-clients/0.90.0-alpha/com/google/cloud/WriteChannel.html#setChunkSize-int-
+    }
+
+    @Override
+    public void sync() throws IOException {
+        // not supported for GS, flushing frequency is controlled by the chunk size setting
+        // https://googleapis.dev/java/google-cloud-clients/0.90.0-alpha/com/google/cloud/WriteChannel.html#setChunkSize-int-
+    }

Review comment:
       Agreed, these methods don't seem to be called -- I set breakpoints and 
watched it run, and I never saw them get hit.
   
   Now that I look at this again, I realize that the current noop 
implementation is really a holdover from the original implementation we 
considered, where a single ```WriteChannel``` would be used for the entire 
upload. In that mode, there's really no way to flush or sync. 
   
   However, since we've changed it so that we use multiple write channels, I 
think I could implement ```flush``` and ```sync``` by closing the active write 
channel (if it exists). So, basically just implement each of those methods by 
calling the private method ```closeChannelIfExists```.
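   
   i.e., something like:
   
   ```java
   @Override
   public void flush() throws IOException {
       // closing the current channel commits any buffered bytes to the current
       // component blob; a new channel will be created on the next write
       closeChannelIfExists();
   }
   
   @Override
   public void sync() throws IOException {
       closeChannelIfExists();
   }
   ```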
   
   Shall I just do that? That way, if the core recoverable writer code ever 
does start calling one of those methods, we'll have an appropriate 
implementation in place.
   

##########
File path: 
flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/writer/GSRecoverableWriterCommitter.java
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.gs.writer;
+
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
+import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.fs.gs.GSFileSystemOptions;
+import org.apache.flink.fs.gs.storage.BlobStorage;
+import org.apache.flink.fs.gs.utils.BlobUtils;
+import org.apache.flink.util.Preconditions;
+
+import com.google.cloud.storage.BlobId;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+
+/** The committer for the GS recoverable writer. */
+class GSRecoverableWriterCommitter implements RecoverableFsDataOutputStream.Committer {
+
+    /** The underlying blob storage. */
+    private final BlobStorage storage;
+
+    /** The GS file system options. */
+    private final GSFileSystemOptions options;
+
+    /** The recoverable writer instance. */
+    private final GSRecoverableWriter writer;
+
+    /** The recoverable writer state for the commit operation. */
+    private final GSRecoverableWriterState state;
+
+    GSRecoverableWriterCommitter(
+            BlobStorage storage,
+            GSFileSystemOptions options,
+            GSRecoverableWriter writer,
+            GSRecoverableWriterState state) {
+        this.storage = Preconditions.checkNotNull(storage);
+        this.options = Preconditions.checkNotNull(options);
+        this.writer = Preconditions.checkNotNull(writer);
+        this.state = Preconditions.checkNotNull(state);
+    }
+
+    @Override
+    public void commit() throws IOException {
+
+        // compose all the component blob ids into the final blob id. if the component blob ids are
+        // in the same bucket as the final blob id, this can be done directly. otherwise, we must
+        // compose to a new temporary blob id in the same bucket as the component blob ids and
+        // then copy that blob to the final blob location
+        if (state.finalBlobId.getBucket().equals(state.getTemporaryBucketName(options))) {
+
+            // compose directly to final blob
+            composeBlobs(
+                    state.getComponentBlobIds(options),
+                    state.finalBlobId,
+                    options.writerContentType);
+
+        } else {
+
+            // compose to a temporary blob id, then copy to final blob id
+            BlobId intermediateBlobId = state.createTemporaryBlobId(options);
+            composeBlobs(
+                    state.getComponentBlobIds(options),
+                    intermediateBlobId,
+                    options.writerContentType);
+            storage.copy(intermediateBlobId, state.finalBlobId);
+        }
+
+        // clean up after commit
+        writer.cleanupRecoverableState(state);

Review comment:
       I wasn't sure exactly when ```cleanupRecoverableState``` was called in 
the normal flow of things, so I ran a test where I set a breakpoint on that 
method and wrote data through a StreamingFileSink. What I observed was that 
```cleanupRecoverableState``` did not get called as blobs were successfully 
committed, so no cleanup was occurring. 
   
   From the description of the 
[method](https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/core/fs/RecoverableWriter.html#cleanupRecoverableState-org.apache.flink.core.fs.RecoverableWriter.ResumeRecoverable-),
 it seemed plausible to me that this was only called on failure:
   > Frees up any resources that were previously occupied in order to be able 
to recover from a (potential) failure
   
   ... but I wasn't sure.
   
   But yes, I agree, it would make more sense to me if 
```cleanupRecoverableState``` were called on both successes and failures, and 
in that case the call in ```commit``` should be removed.
   
   It's certainly possible that I made a mistake in my testing. Should ```cleanupRecoverableState``` be getting called after successful commits?
   

##########
File path: flink-filesystems/flink-gs-fs-hadoop/pom.xml
##########
@@ -0,0 +1,208 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+       xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd";>
+
+       <modelVersion>4.0.0</modelVersion>
+
+       <parent>
+               <groupId>org.apache.flink</groupId>
+               <artifactId>flink-filesystems</artifactId>
+               <version>1.13-SNAPSHOT</version>

Review comment:
       Will do.

##########
File path: 
flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/GSFileSystemFactory.java
##########
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.gs;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystemFactory;
+import org.apache.flink.runtime.util.HadoopConfigLoader;
+import org.apache.flink.util.Preconditions;
+
+import com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Collections;
+
+/**
+ * Implementation of the Flink {@link org.apache.flink.core.fs.FileSystemFactory} interface for
+ * Google Storage.
+ */
+public class GSFileSystemFactory implements FileSystemFactory {
+
+    private static final String SCHEME = "gs";
+
+    private static final String HADOOP_CONFIG_PREFIX = "fs.gs.";
+
+    private static final String[] FLINK_CONFIG_PREFIXES = {"gs.", HADOOP_CONFIG_PREFIX};
+
+    private static final String[][] MIRRORED_CONFIG_KEYS = {};
+
+    private static final String FLINK_SHADING_PREFIX = "";
+
+    public static final ConfigOption<String> WRITER_TEMPORARY_BUCKET_NAME =
+            ConfigOptions.key("gs.writer.temporary.bucket.name")
+                    .stringType()
+                    .defaultValue(GSFileSystemOptions.DEFAULT_WRITER_TEMPORARY_BUCKET_NAME)

Review comment:
       Good suggestion -- will do.

##########
File path: 
flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/GSFileSystemFactory.java
##########
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.gs;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystemFactory;
+import org.apache.flink.runtime.util.HadoopConfigLoader;
+import org.apache.flink.util.Preconditions;
+
+import com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Collections;
+
+/**
+ * Implementation of the Flink {@link org.apache.flink.core.fs.FileSystemFactory} interface for
+ * Google Storage.
+ */
+public class GSFileSystemFactory implements FileSystemFactory {
+
+    private static final String SCHEME = "gs";
+
+    private static final String HADOOP_CONFIG_PREFIX = "fs.gs.";
+
+    private static final String[] FLINK_CONFIG_PREFIXES = {"gs.", HADOOP_CONFIG_PREFIX};
+
+    private static final String[][] MIRRORED_CONFIG_KEYS = {};
+
+    private static final String FLINK_SHADING_PREFIX = "";
+
+    public static final ConfigOption<String> WRITER_TEMPORARY_BUCKET_NAME =
+            ConfigOptions.key("gs.writer.temporary.bucket.name")
+                    .stringType()
+                    .defaultValue(GSFileSystemOptions.DEFAULT_WRITER_TEMPORARY_BUCKET_NAME)
+                    .withDescription(
+                            "This option sets the bucket name used by the recoverable writer to store temporary files. "
+                                    + "If empty, temporary files are stored in the same bucket as the final file being written.");
+
+    public static final ConfigOption<String> WRITER_TEMPORARY_OBJECT_PREFIX =
+            ConfigOptions.key("gs.writer.temporary.object.prefix")
+                    .stringType()
+                    .defaultValue(GSFileSystemOptions.DEFAULT_WRITER_TEMPORARY_OBJECT_PREFIX)
+                    .withDescription(
+                            "This option sets the prefix used by the recoverable writer when writing temporary files. This prefix is applied to the "
+                                    + "final object name to form the base name for temporary files.");
+
+    public static final ConfigOption<String> WRITER_CONTENT_TYPE =
+            ConfigOptions.key("gs.writer.content.type")
+                    .stringType()
+                    .defaultValue(GSFileSystemOptions.DEFAULT_WRITER_CONTENT_TYPE)
+                    .withDescription(
+                            "This option sets the content type applied to files written by the recoverable writer.");

Review comment:
       Yes, I suppose this isn't really needed as an option. I don't see 
anything similar in the S3 recoverable writer either. I'll remove it.

##########
File path: 
flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/GSFileSystemFactory.java
##########
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.gs;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystemFactory;
+import org.apache.flink.runtime.util.HadoopConfigLoader;
+import org.apache.flink.util.Preconditions;
+
+import com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Collections;
+
+/**
+ * Implementation of the Flink {@link org.apache.flink.core.fs.FileSystemFactory} interface for
+ * Google Storage.
+ */
+public class GSFileSystemFactory implements FileSystemFactory {
+
+    private static final String SCHEME = "gs";
+
+    private static final String HADOOP_CONFIG_PREFIX = "fs.gs.";
+
+    private static final String[] FLINK_CONFIG_PREFIXES = {"gs.", HADOOP_CONFIG_PREFIX};
+
+    private static final String[][] MIRRORED_CONFIG_KEYS = {};
+
+    private static final String FLINK_SHADING_PREFIX = "";
+
+    public static final ConfigOption<String> WRITER_TEMPORARY_BUCKET_NAME =
+            ConfigOptions.key("gs.writer.temporary.bucket.name")
+                    .stringType()
+                    .defaultValue(GSFileSystemOptions.DEFAULT_WRITER_TEMPORARY_BUCKET_NAME)
+                    .withDescription(
+                            "This option sets the bucket name used by the recoverable writer to store temporary files. "
+                                    + "If empty, temporary files are stored in the same bucket as the final file being written.");
+
+    public static final ConfigOption<String> WRITER_TEMPORARY_OBJECT_PREFIX =
+            ConfigOptions.key("gs.writer.temporary.object.prefix")
+                    .stringType()
+                    .defaultValue(GSFileSystemOptions.DEFAULT_WRITER_TEMPORARY_OBJECT_PREFIX)
+                    .withDescription(
+                            "This option sets the prefix used by the recoverable writer when writing temporary files. This prefix is applied to the "
+                                    + "final object name to form the base name for temporary files.");
+
+    public static final ConfigOption<String> WRITER_CONTENT_TYPE =
+            ConfigOptions.key("gs.writer.content.type")
+                    .stringType()
+                    .defaultValue(GSFileSystemOptions.DEFAULT_WRITER_CONTENT_TYPE)
+                    .withDescription(
+                            "This option sets the content type applied to files written by the recoverable writer.");
+
+    public static final ConfigOption<Integer> WRITER_CHUNK_SIZE =
+            ConfigOptions.key("gs.writer.chunk.size")
+                    .intType()
+                    .defaultValue(GSFileSystemOptions.DEFAULT_WRITER_CHUNK_SIZE)

Review comment:
       Will fix.

##########
File path: 
flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/GSFileSystemFactory.java
##########
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.gs;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystemFactory;
+import org.apache.flink.runtime.util.HadoopConfigLoader;
+import org.apache.flink.util.Preconditions;
+
+import com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Collections;
+
+/**
+ * Implementation of the Flink {@link org.apache.flink.core.fs.FileSystemFactory} interface for
+ * Google Storage.
+ */
+public class GSFileSystemFactory implements FileSystemFactory {
+
+    private static final String SCHEME = "gs";
+
+    private static final String HADOOP_CONFIG_PREFIX = "fs.gs.";
+
+    private static final String[] FLINK_CONFIG_PREFIXES = {"gs.", HADOOP_CONFIG_PREFIX};
+
+    private static final String[][] MIRRORED_CONFIG_KEYS = {};
+
+    private static final String FLINK_SHADING_PREFIX = "";
+
+    public static final ConfigOption<String> WRITER_TEMPORARY_BUCKET_NAME =
+            ConfigOptions.key("gs.writer.temporary.bucket.name")
+                    .stringType()
+                    .defaultValue(GSFileSystemOptions.DEFAULT_WRITER_TEMPORARY_BUCKET_NAME)
+                    .withDescription(
+                            "This option sets the bucket name used by the recoverable writer to store temporary files. "
+                                    + "If empty, temporary files are stored in the same bucket as the final file being written.");
+
+    public static final ConfigOption<String> WRITER_TEMPORARY_OBJECT_PREFIX =
+            ConfigOptions.key("gs.writer.temporary.object.prefix")
+                    .stringType()
+                    .defaultValue(GSFileSystemOptions.DEFAULT_WRITER_TEMPORARY_OBJECT_PREFIX)
+                    .withDescription(
+                            "This option sets the prefix used by the recoverable writer when writing temporary files. This prefix is applied to the "
+                                    + "final object name to form the base name for temporary files.");
+
+    public static final ConfigOption<String> WRITER_CONTENT_TYPE =
+            ConfigOptions.key("gs.writer.content.type")
+                    .stringType()
+                    .defaultValue(GSFileSystemOptions.DEFAULT_WRITER_CONTENT_TYPE)
+                    .withDescription(
+                            "This option sets the content type applied to files written by the recoverable writer.");
+
+    public static final ConfigOption<Integer> WRITER_CHUNK_SIZE =
+            ConfigOptions.key("gs.writer.chunk.size")

Review comment:
       Good suggestion, will do.

##########
File path: 
flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/writer/GSRecoverableWriterState.java
##########
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.gs.writer;
+
+import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.fs.gs.GSFileSystemOptions;
+import org.apache.flink.util.Preconditions;
+
+import com.google.cloud.storage.BlobId;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+/** The state of a recoverable write. */
+class GSRecoverableWriterState implements RecoverableWriter.ResumeRecoverable, Cloneable {
+
+    /** The blob id to which the recoverable write operation is writing. */
+    public final BlobId finalBlobId;
+
+    /** The number of bytes that have been written so far. */
+    public long bytesWritten;
+
+    /** Indicates if the write has been closed. */
+    public boolean closed;
+
+    /** The object ids for the temporary objects that should be composed to form the final blob. */
+    public final List<UUID> componentObjectIds;
+
+    GSRecoverableWriterState(
+            BlobId finalBlobId, long bytesWritten, boolean closed, List<UUID> componentObjectIds) {
+        this.finalBlobId = Preconditions.checkNotNull(finalBlobId);
+        Preconditions.checkArgument(bytesWritten >= 0);
+        this.bytesWritten = bytesWritten;
+        this.closed = closed;
+
+        // shallow copy the component object ids to ensure this state object exclusively
+        // manages the list of component object ids
+        this.componentObjectIds = new ArrayList<>(Preconditions.checkNotNull(componentObjectIds));
+    }
+
+    GSRecoverableWriterState(GSRecoverableWriterState state) {
+        this(state.finalBlobId, state.bytesWritten, state.closed, state.componentObjectIds);
+    }
+
+    GSRecoverableWriterState(BlobId finalBlobId) {
+        this(finalBlobId, 0, false, new ArrayList<>());
+    }
+
+    /**
+     * Returns the temporary bucket name. If options specifies a temporary bucket name, we use that
+     * one; otherwise, we use the bucket name of the final blob.
+     *
+     * @param options The GS file system options
+     * @return The temporary bucket name
+     */
+    String getTemporaryBucketName(GSFileSystemOptions options) {
+        return options.writerTemporaryBucketName.isEmpty()
+                ? finalBlobId.getBucket()
+                : options.writerTemporaryBucketName;
+    }
+
+    /**
+     * Returns a temporary object partial name, i.e. .inprogress/foo/bar/ for the final blob with
+     * object name "foo/bar". The included trailing slash is deliberate, so that we can be sure that
+     * object names that start with this partial name are, in fact, temporary files associated with
+     * the upload of the associated final blob.
+     *
+     * @param options The GS file system options
+     * @return The temporary object partial name
+     */
+    String getTemporaryObjectPartialName(GSFileSystemOptions options) {
+        String finalObjectName = finalBlobId.getName();
+        return String.format("%s%s/", options.writerTemporaryObjectPrefix, finalObjectName);
+    }
+
+    /**
+     * Returns a temporary object name, formed by appending the compact string version of the
+     * temporary object id to the temporary object partial name, i.e.
+     * .inprogress/foo/bar/EjgelvANQ525hLUW2S6DBA for the final blob with object name "foo/bar".
+     *
+     * @param temporaryObjectId The temporary object id
+     * @param options The GS file system options
+     * @return The temporary object name
+     */
+    String getTemporaryObjectName(UUID temporaryObjectId, GSFileSystemOptions options) {
+        return getTemporaryObjectPartialName(options) + temporaryObjectId.toString();
+    }
+
+    /**
+     * Creates a temporary blob id for a provided temporary object id.
+     *
+     * @param temporaryObjectId The temporary object id
+     * @param options The GS file system options
+     * @return The temporary blob id
+     */
+    private BlobId createTemporaryBlobId(UUID temporaryObjectId, GSFileSystemOptions options) {
+        String temporaryBucketName = getTemporaryBucketName(options);
+        String temporaryObjectName = getTemporaryObjectName(temporaryObjectId, options);
+        return BlobId.of(temporaryBucketName, temporaryObjectName);
+    }
+
+    /**
+     * Creates a new temporary blob id.
+     *
+     * @param options The GS file system options
+     * @return The new temporary blob id.
+     */
+    BlobId createTemporaryBlobId(GSFileSystemOptions options) {
+        UUID temporaryObjectId = UUID.randomUUID();
+        return createTemporaryBlobId(temporaryObjectId, options);
+    }
+
+    /**
+     * Create a new temporary blob id and add to the list of components.
+     *
+     * @param options The GS file system options
+     * @return The new component blob id.
+     */
+    BlobId createComponentBlobId(GSFileSystemOptions options) {

Review comment:
       Good point -- I'll take a look here and make it more intuitive.

##########
File path: 
flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/writer/GSRecoverableWriterState.java
##########
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.gs.writer;
+
+import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.fs.gs.GSFileSystemOptions;
+import org.apache.flink.util.Preconditions;
+
+import com.google.cloud.storage.BlobId;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+/** The state of a recoverable write. */
+class GSRecoverableWriterState implements RecoverableWriter.ResumeRecoverable, Cloneable {
+
+    /** The blob id to which the recoverable write operation is writing. */
+    public final BlobId finalBlobId;
+
+    /** The number of bytes that have been written so far. */
+    public long bytesWritten;
+
+    /** Indicates if the write has been closed. */
+    public boolean closed;
+
+    /** The object ids for the temporary objects that should be composed to form the final blob. */
+    public final List<UUID> componentObjectIds;

Review comment:
       Good suggestion -- will do.

##########
File path: 
flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/writer/GSRecoverableFsDataOutputStream.java
##########
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.gs.writer;
+
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
+import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.fs.gs.GSFileSystemOptions;
+import org.apache.flink.fs.gs.storage.BlobStorage;
+import org.apache.flink.util.Preconditions;
+
+import com.google.cloud.storage.BlobId;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+
+/** The data output stream implementation for the GS recoverable writer. */
+class GSRecoverableFsDataOutputStream extends RecoverableFsDataOutputStream {
+
+    /** The underlying blob storage. */
+    private final BlobStorage storage;
+
+    /** The GS file system options. */
+    private final GSFileSystemOptions options;
+
+    /** The recoverable writer. */
+    private final GSRecoverableWriter writer;
+
+    /** The recoverable writer state. */
+    private final GSRecoverableWriterState state;
+
+    /**
+     * The current write channel, if one exists. A channel is created when one doesn't exist and
+     * bytes are written, and the channel is closed/destroyed when explicitly closed by the consumer
+     * (via close or closeForCommit) or when the data output stream is persisted (via persist).
+     * Calling persist does not close the data output stream, so it's possible that more bytes will
+     * be written, which will cause another channel to be created. So, multiple write channels may
+     * be created and destroyed during the lifetime of the data output stream.
+     */
+    @Nullable GSChecksumWriteChannel currentWriteChannel;
+
+    GSRecoverableFsDataOutputStream(
+            BlobStorage storage,
+            GSFileSystemOptions options,
+            GSRecoverableWriter writer,
+            GSRecoverableWriterState state) {
+        this.storage = Preconditions.checkNotNull(storage);
+        this.options = Preconditions.checkNotNull(options);
+        this.writer = Preconditions.checkNotNull(writer);
+        this.state = Preconditions.checkNotNull(state);
+    }
+
+    @Override
+    public long getPos() throws IOException {
+        return state.bytesWritten;
+    }
+
+    @Override
+    public void write(int byteValue) throws IOException {
+        byte[] bytes = new byte[] {(byte) byteValue};
+        write(bytes);
+    }
+
+    @Override
+    public void write(@Nonnull byte[] content) throws IOException {
+        Preconditions.checkNotNull(content);
+
+        write(content, 0, content.length);
+    }
+
+    @Override
+    public void write(@Nonnull byte[] content, int start, int length) throws IOException {
+        Preconditions.checkNotNull(content);
+        Preconditions.checkArgument(start >= 0);
+        Preconditions.checkArgument(length >= 0);
+
+        // if the data stream is already closed, throw an exception
+        if (state.closed) {
+            throw new IOException("Illegal attempt to write to closed output 
stream");
+        }
+
+        // if necessary, create a write channel
+        if (currentWriteChannel == null) {
+            currentWriteChannel = createWriteChannel();
+        }
+
+        // write to the stream. the docs say that, in some circumstances, though an attempt will be
+        // made to write all of the requested bytes, there are some cases where only some bytes will
+        // be written. it's not clear whether this could ever happen with a Google storage
+        // WriteChannel; in any case, recoverable writers don't support partial writes, so if this
+        // ever happens, we must fail the write:
+        // https://docs.oracle.com/javase/7/docs/api/java/nio/channels/WritableByteChannel.html#write(java.nio.ByteBuffer)
+        int bytesWritten = currentWriteChannel.write(content, start, length);
+        if (bytesWritten != length) {
+            throw new IOException(
+                    String.format(
+                            "WriteChannel.write wrote %d of %d requested bytes, failing.",
+                            bytesWritten, length));
+        }
+
+        // update count of total bytes written
+        state.bytesWritten += length;
+    }
+
+    @Override
+    public void flush() throws IOException {
+        // not supported for GS, flushing frequency is controlled by the chunk size setting
+        // https://googleapis.dev/java/google-cloud-clients/0.90.0-alpha/com/google/cloud/WriteChannel.html#setChunkSize-int-
+    }
+
+    @Override
+    public void sync() throws IOException {
+        // not supported for GS, flushing frequency is controlled by the chunk size setting
+        // https://googleapis.dev/java/google-cloud-clients/0.90.0-alpha/com/google/cloud/WriteChannel.html#setChunkSize-int-
+    }

Review comment:
       They'd be trivial to implement, so I'll just go ahead and do that.

##########
File path: 
flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/writer/GSRecoverableWriter.java
##########
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.gs.writer;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
+import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.fs.gs.GSFileSystemOptions;
+import org.apache.flink.fs.gs.storage.BlobStorage;
+import org.apache.flink.fs.gs.utils.BlobUtils;
+import org.apache.flink.util.Preconditions;
+
+import com.google.cloud.storage.BlobId;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+
+/** The recoverable writer implementation for Google storage. */
+public class GSRecoverableWriter implements RecoverableWriter {
+
+    /** The underlying blob storage. */
+    private final BlobStorage storage;
+
+    /** The GS file system options. */
+    private final GSFileSystemOptions options;
+
+    /**
+     * Construct a GS recoverable writer.
+     *
+     * @param storage The underlying blob storage instance
+     * @param options The GS file system options
+     */
+    public GSRecoverableWriter(BlobStorage storage, GSFileSystemOptions options) {
+        this.storage = Preconditions.checkNotNull(storage);
+        this.options = Preconditions.checkNotNull(options);
+    }
+
+    @Override
+    public boolean requiresCleanupOfRecoverableState() {
+        return true;
+    }
+
+    @Override
+    public boolean supportsResume() {
+        return true;
+    }
+
+    @Override
+    public RecoverableFsDataOutputStream open(Path path) throws IOException {
+        Preconditions.checkNotNull(path);
+
+        BlobId finalBlobId = BlobUtils.parseUri(path.toUri());
+        GSRecoverableWriterState state = new GSRecoverableWriterState(finalBlobId);
+        return new GSRecoverableFsDataOutputStream(storage, options, this, state);
+    }
+
+    @Override
+    public RecoverableFsDataOutputStream recover(ResumeRecoverable resumable) throws IOException {
+        Preconditions.checkNotNull(resumable);
+
+        GSRecoverableWriterState state = (GSRecoverableWriterState) resumable;
+        return new GSRecoverableFsDataOutputStream(storage, options, this, state);
+    }
+
+    @Override
+    public boolean cleanupRecoverableState(ResumeRecoverable resumable) throws IOException {
+        Preconditions.checkNotNull(resumable);
+
+        // determine the partial name for the temporary objects to be deleted
+        GSRecoverableWriterState state = (GSRecoverableWriterState) resumable;
+        String temporaryBucketName = state.getTemporaryBucketName(options);
+        String temporaryObjectPartialName = state.getTemporaryObjectPartialName(options);
+
+        // this will hold the set of blob ids that were actually deleted
+        HashSet<BlobId> deletedBlobIds = new HashSet<>();
+
+        // find all the temp blobs by looking for anything that starts with the temporary
+        // object partial name. doing it this way finds any orphaned temp blobs that might
+        // have come about when resuming
+        List<BlobId> foundTempBlobIds =
+                storage.list(temporaryBucketName, temporaryObjectPartialName);
+        if (!foundTempBlobIds.isEmpty()) {
+
+            // delete all the temp blobs, and populate the set with ones that were actually deleted
+            // normalize in case the blob came back with a generation populated
+            List<Boolean> deleteResults = storage.delete(foundTempBlobIds);
+            for (int i = 0; i < deleteResults.size(); i++) {
+                if (deleteResults.get(i)) {
+                    deletedBlobIds.add(BlobUtils.normalizeBlobId(foundTempBlobIds.get(i)));
+                }
+            }
+        }

Review comment:
       Oh, I see. I wasn't understanding that ```cleanupRecoverableState``` 
might be called multiple times during a single recoverable write operation, to 
clean up some but not all of the temporary blobs -- rather, I was thinking it 
would just be called once, at the end of a failed recoverable write operation, 
to clean up everything.
   
   I agree, the scenario you describe could result in needed files going missing. So, to make sure I'm on the same page: is the right thing to do here to just delete the temp blobs directly referenced by the supplied ```ResumeRecoverable```? That would actually simplify some things, e.g. there would be no need to support the ```list``` method on ```BlobStorage``` anymore, and the BlobId normalization we discussed elsewhere wouldn't be needed.
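   
   i.e., roughly this (a sketch; it assumes the state exposes the component blob ids via ```getComponentBlobIds```, as it does in ```commit```):
   
   ```java
   @Override
   public boolean cleanupRecoverableState(ResumeRecoverable resumable) throws IOException {
       Preconditions.checkNotNull(resumable);
   
       // delete only the component blobs that this recoverable actually references
       GSRecoverableWriterState state = (GSRecoverableWriterState) resumable;
       List<BlobId> componentBlobIds = state.getComponentBlobIds(options);
       if (componentBlobIds.isEmpty()) {
           return true;
       }
   
       // report success only if every referenced blob was deleted
       List<Boolean> deleteResults = storage.delete(componentBlobIds);
       return deleteResults.stream().allMatch(deleted -> deleted);
   }
   ```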

##########
File path: 
flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/writer/GSRecoverableWriterCommitter.java
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.gs.writer;
+
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
+import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.fs.gs.GSFileSystemOptions;
+import org.apache.flink.fs.gs.storage.BlobStorage;
+import org.apache.flink.fs.gs.utils.BlobUtils;
+import org.apache.flink.util.Preconditions;
+
+import com.google.cloud.storage.BlobId;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+
+/** The committer for the GS recoverable writer. */
+class GSRecoverableWriterCommitter implements RecoverableFsDataOutputStream.Committer {
+
+    /** The underlying blob storage. */
+    private final BlobStorage storage;
+
+    /** The GS file system options. */
+    private final GSFileSystemOptions options;
+
+    /** The recoverable writer instance. */
+    private final GSRecoverableWriter writer;
+
+    /** The recoverable writer state for the commit operation. */
+    private final GSRecoverableWriterState state;
+
+    GSRecoverableWriterCommitter(
+            BlobStorage storage,
+            GSFileSystemOptions options,
+            GSRecoverableWriter writer,
+            GSRecoverableWriterState state) {
+        this.storage = Preconditions.checkNotNull(storage);
+        this.options = Preconditions.checkNotNull(options);
+        this.writer = Preconditions.checkNotNull(writer);
+        this.state = Preconditions.checkNotNull(state);
+    }
+
+    @Override
+    public void commit() throws IOException {
+
+        // compose all the component blob ids into the final blob id. if the component blob ids are
+        // in the same bucket as the final blob id, this can be done directly. otherwise, we must
+        // compose to a new temporary blob id in the same bucket as the component blob ids and
+        // then copy that blob to the final blob location
+        if (state.finalBlobId.getBucket().equals(state.getTemporaryBucketName(options))) {
+
+            // compose directly to final blob
+            composeBlobs(
+                    state.getComponentBlobIds(options),
+                    state.finalBlobId,
+                    options.writerContentType);
+
+        } else {
+
+            // compose to a temporary blob id, then copy to final blob id
+            BlobId intermediateBlobId = state.createTemporaryBlobId(options);
+            composeBlobs(
+                    state.getComponentBlobIds(options),
+                    intermediateBlobId,
+                    options.writerContentType);
+            storage.copy(intermediateBlobId, state.finalBlobId);
+        }
+
+        // clean up after commit
+        writer.cleanupRecoverableState(state);

Review comment:
       I see. I don't actually recall seeing ```cleanupRecoverableState``` called at any point on successful writes, even after the next checkpoint completed, but it's possible I'm wrong about that. I'll run another quick check to make sure ```cleanupRecoverableState``` is called when expected, and if so, I'll remove this call during commit. I'll report back what I find.

##########
File path: 
flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/storage/BlobStorage.java
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.gs.storage;
+
+import com.google.cloud.storage.BlobId;

Review comment:
       Makes sense -- I'll do just the simple abstraction related to BlobId.
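   
   For concreteness, roughly the shape I have in mind -- a sketch only, with method names based on the operations the writer already uses (compose, ```copy```, ```delete```), not a final signature:
   
   ```java
   /** Sketch of a blob storage abstraction expressed purely in terms of BlobId. */
   interface BlobStorage {
   
       /** Composes the source blobs into the target blob. */
       void compose(List<BlobId> sourceBlobIds, BlobId targetBlobId) throws IOException;
   
       /** Copies a blob to another location. */
       void copy(BlobId sourceBlobId, BlobId targetBlobId) throws IOException;
   
       /** Deletes blobs, returning one result per blob indicating whether it was deleted. */
       List<Boolean> delete(List<BlobId> blobIds) throws IOException;
   }
   ```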

##########
File path: 
flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/GSFileSystemFactory.java
##########
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.gs;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystemFactory;
+import org.apache.flink.runtime.util.HadoopConfigLoader;
+import org.apache.flink.util.Preconditions;
+
+import com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Collections;
+
+/**
+ * Implementation of the Flink {@link org.apache.flink.core.fs.FileSystemFactory} interface for
+ * Google Storage.
+ */
+public class GSFileSystemFactory implements FileSystemFactory {
+
+    private static final String SCHEME = "gs";
+
+    private static final String HADOOP_CONFIG_PREFIX = "fs.gs.";
+
+    private static final String[] FLINK_CONFIG_PREFIXES = {"gs.", HADOOP_CONFIG_PREFIX};
+
+    private static final String[][] MIRRORED_CONFIG_KEYS = {};
+
+    private static final String FLINK_SHADING_PREFIX = "";
+
+    public static final ConfigOption<String> WRITER_TEMPORARY_BUCKET_NAME =
+            ConfigOptions.key("gs.writer.temporary.bucket.name")
+                    .stringType()
+                    .defaultValue(GSFileSystemOptions.DEFAULT_WRITER_TEMPORARY_BUCKET_NAME)
+                    .withDescription(
+                            "This option sets the bucket name used by the recoverable writer to store temporary files. "
+                                    + "If empty, temporary files are stored in the same bucket as the final file being written.");
+
+    public static final ConfigOption<String> WRITER_TEMPORARY_OBJECT_PREFIX =
+            ConfigOptions.key("gs.writer.temporary.object.prefix")
+                    .stringType()
+                    .defaultValue(GSFileSystemOptions.DEFAULT_WRITER_TEMPORARY_OBJECT_PREFIX)
+                    .withDescription(
+                            "This option sets the prefix used by the recoverable writer when writing temporary files. This prefix is applied to the "
+                                    + "final object name to form the base name for temporary files.");

Review comment:
       It's not strictly necessary, I suppose. The use case I was envisioning here was this: when you specify a dedicated bucket to hold in-progress temporary blobs, the ".inprogress" prefix is somewhat redundant. This option lets you set the prefix to an empty string and have the files appear directly in the root of the bucket, i.e. the files would look like:
   
   ```/foo/bar/7b342499-6918-48f0-bcf9-11cf2bc18c51```
   
   ... instead of ...
   
   ```/.inprogress/foo/bar/7b342499-6918-48f0-bcf9-11cf2bc18c51```
   
   Admittedly, this isn't really that important to be able to do. So I'm fine 
with removing this option if that's what you'd prefer.
   
   Also, this reminds me of a change I'm planning to submit with the next 
batch, which is to have the generated temporary blob names include both the 
final bucket name and the final object name. Currently, the temporary blob 
names only include the final object name, but not the bucket name. So what is 
now:
   
   ```/.inprogress/foo/bar/7b342499-6918-48f0-bcf9-11cf2bc18c51```
   
   ... would become:
   
   ```/.inprogress/bucket_name/foo/bar/7b342499-6918-48f0-bcf9-11cf2bc18c51```
   
   This keeps temporary blobs properly separated in the event that a) a dedicated bucket for temporary blobs is used and b) a blob with the same name (e.g. /foo/bar) is written into two different buckets by a StreamingFileSink at the same time.
   
   (And it would technically still work *without* adding the bucket name, because the UUIDs would be distinct, but it just seemed potentially confusing to commingle the temporary blobs for different writes in the same storage tree.)
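   
   To make the planned naming change concrete, here's a sketch of the construction (the helper name is hypothetical; the actual change will come with the next batch):
   
   ```java
   // Sketch: temporary object name that also includes the final bucket name.
   static String temporaryObjectName(BlobId finalBlobId, String prefix, UUID temporaryObjectId) {
       return prefix                      // e.g. ".inprogress/"
               + finalBlobId.getBucket()  // proposed addition: the final bucket name
               + "/"
               + finalBlobId.getName()    // e.g. "foo/bar"
               + "/"
               + temporaryObjectId;       // UUID generated per temporary blob
   }
   ```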
   

##########
File path: 
flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/writer/GSRecoverableWriterState.java
##########
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.gs.writer;
+
+import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.fs.gs.GSFileSystemOptions;
+import org.apache.flink.util.Preconditions;
+
+import com.google.cloud.storage.BlobId;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+/** The state of a recoverable write. */
+class GSRecoverableWriterState implements RecoverableWriter.ResumeRecoverable, Cloneable {
+
+    /** The blob id to which the recoverable write operation is writing. */
+    public final BlobId finalBlobId;
+
+    /** The number of bytes that have been written so far. */
+    public long bytesWritten;
+
+    /** Indicates if the write has been closed. */
+    public boolean closed;
+
+    /** The object ids for the temporary objects that should be composed to form the final blob. */
+    public final List<UUID> componentObjectIds;
+
+    GSRecoverableWriterState(
+            BlobId finalBlobId, long bytesWritten, boolean closed, List<UUID> componentObjectIds) {
+        this.finalBlobId = Preconditions.checkNotNull(finalBlobId);
+        Preconditions.checkArgument(bytesWritten >= 0);
+        this.bytesWritten = bytesWritten;
+        this.closed = closed;
+
+        // shallow copy the component object ids to ensure this state object exclusively
+        // manages the list of component object ids
+        this.componentObjectIds = new ArrayList<>(Preconditions.checkNotNull(componentObjectIds));
+    }
+
+    GSRecoverableWriterState(GSRecoverableWriterState state) {
+        this(state.finalBlobId, state.bytesWritten, state.closed, state.componentObjectIds);
+    }
+
+    GSRecoverableWriterState(BlobId finalBlobId) {
+        this(finalBlobId, 0, false, new ArrayList<>());
+    }
+
+    /**
+     * Returns the temporary bucket name. If options specifies a temporary bucket name, we use that
+     * one; otherwise, we use the bucket name of the final blob.
+     *
+     * @param options The GS file system options
+     * @return The temporary bucket name
+     */
+    String getTemporaryBucketName(GSFileSystemOptions options) {
+        return options.writerTemporaryBucketName.isEmpty()
+                ? finalBlobId.getBucket()
+                : options.writerTemporaryBucketName;
+    }

Review comment:
       If a dedicated temp bucket is specified in the options, then, yes, the same temp bucket is used for all writers/states.
   
   But if no dedicated temp bucket is specified, then the bucket used for temp blobs is the same bucket to which the "final" blob is being written. So the temp bucket would differ when streaming file sinks write to different buckets, since it depends on the finalBlobId that is part of the state.
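   
   Concretely, the selection logic behaves like this standalone equivalent (bucket names are made up for illustration):
   
   ```java
   // Mirrors the getTemporaryBucketName logic above, pulled out for illustration.
   static String temporaryBucketName(String configuredTempBucket, String finalBucket) {
       return configuredTempBucket.isEmpty() ? finalBucket : configuredTempBucket;
   }
   
   // temporaryBucketName("temp-bucket", "sink-bucket-a") -> "temp-bucket"    (dedicated bucket always wins)
   // temporaryBucketName("", "sink-bucket-a")            -> "sink-bucket-a"  (falls back to the final blob's bucket)
   // temporaryBucketName("", "sink-bucket-b")            -> "sink-bucket-b"  (so it varies with finalBlobId)
   ```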

##########
File path: 
flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/writer/GSRecoverableWriterState.java
##########
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.gs.writer;
+
+import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.fs.gs.GSFileSystemOptions;
+import org.apache.flink.util.Preconditions;
+
+import com.google.cloud.storage.BlobId;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+/** The state of a recoverable write. */
+class GSRecoverableWriterState implements RecoverableWriter.ResumeRecoverable, Cloneable {
+
+    /** The blob id to which the recoverable write operation is writing. */
+    public final BlobId finalBlobId;
+
+    /** The number of bytes that have been written so far. */
+    public long bytesWritten;
+
+    /** Indicates if the write has been closed. */
+    public boolean closed;
+
+    /** The object ids for the temporary objects that should be composed to form the final blob. */
+    public final List<UUID> componentObjectIds;
+
+    GSRecoverableWriterState(
+            BlobId finalBlobId, long bytesWritten, boolean closed, List<UUID> componentObjectIds) {
+        this.finalBlobId = Preconditions.checkNotNull(finalBlobId);
+        Preconditions.checkArgument(bytesWritten >= 0);
+        this.bytesWritten = bytesWritten;
+        this.closed = closed;
+
+        // shallow copy the component object ids to ensure this state object exclusively
+        // manages the list of component object ids
+        this.componentObjectIds = new ArrayList<>(Preconditions.checkNotNull(componentObjectIds));
+    }
+
+    GSRecoverableWriterState(GSRecoverableWriterState state) {
+        this(state.finalBlobId, state.bytesWritten, state.closed, state.componentObjectIds);
+    }
+
+    GSRecoverableWriterState(BlobId finalBlobId) {
+        this(finalBlobId, 0, false, new ArrayList<>());
+    }
+
+    /**
+     * Returns the temporary bucket name. If options specifies a temporary bucket name, we use that
+     * one; otherwise, we use the bucket name of the final blob.
+     *
+     * @param options The GS file system options
+     * @return The temporary bucket name
+     */
+    String getTemporaryBucketName(GSFileSystemOptions options) {
+        return options.writerTemporaryBucketName.isEmpty()
+                ? finalBlobId.getBucket()
+                : options.writerTemporaryBucketName;
+    }
+
+    /**
+     * Returns a temporary object partial name, i.e. .inprogress/foo/bar/ for the final blob with
+     * object name "foo/bar". The included trailing slash is deliberate, so that we can be sure that
+     * object names that start with this partial name are, in fact, temporary files associated with
+     * the upload of the associated final blob.
+     *
+     * @param options The GS file system options
+     * @return The temporary object partial name
+     */
+    String getTemporaryObjectPartialName(GSFileSystemOptions options) {
+        String finalObjectName = finalBlobId.getName();
+        return String.format("%s%s/", options.writerTemporaryObjectPrefix, finalObjectName);
+    }
+
+    /**
+     * Returns a temporary object name, formed by appending the compact string version of the
+     * temporary object id to the temporary object partial name, i.e.
+     * .inprogress/foo/bar/EjgelvANQ525hLUW2S6DBA for the final blob with object name "foo/bar".
+     *
+     * @param temporaryObjectId The temporary object id
+     * @param options The GS file system options
+     * @return The temporary object name
+     */
+    String getTemporaryObjectName(UUID temporaryObjectId, GSFileSystemOptions options) {
+        return getTemporaryObjectPartialName(options) + temporaryObjectId.toString();
+    }
+
+    /**
+     * Creates a temporary blob id for a provided temporary object id.
+     *
+     * @param temporaryObjectId The temporary object id
+     * @param options The GS file system options
+     * @return The temporary blob id
+     */
+    private BlobId createTemporaryBlobId(UUID temporaryObjectId, GSFileSystemOptions options) {
+        String temporaryBucketName = getTemporaryBucketName(options);
+        String temporaryObjectName = getTemporaryObjectName(temporaryObjectId, options);
+        return BlobId.of(temporaryBucketName, temporaryObjectName);
+    }
+
+    /**
+     * Creates a new temporary blob id.
+     *
+     * @param options The GS file system options
+     * @return The new temporary blob id.
+     */
+    BlobId createTemporaryBlobId(GSFileSystemOptions options) {
+        UUID temporaryObjectId = UUID.randomUUID();
+        return createTemporaryBlobId(temporaryObjectId, options);
+    }

Review comment:
       Good suggestion -- I'll move these out into a utils class.
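   
   For example, something along these lines -- just a sketch, assuming the helpers become static methods on the existing ```BlobUtils``` class and take the final ```BlobId``` explicitly:
   
   ```java
   /** Sketch: state naming helpers extracted as static utils (placement illustrative). */
   public class BlobUtils {
   
       /** Resolves the temporary bucket name, falling back to the final blob's bucket. */
       public static String getTemporaryBucketName(BlobId finalBlobId, GSFileSystemOptions options) {
           return options.writerTemporaryBucketName.isEmpty()
                   ? finalBlobId.getBucket()
                   : options.writerTemporaryBucketName;
       }
   
       /** Builds a temporary blob id for the given temporary object id. */
       public static BlobId createTemporaryBlobId(
               BlobId finalBlobId, UUID temporaryObjectId, GSFileSystemOptions options) {
           String temporaryObjectName =
                   String.format(
                           "%s%s/%s",
                           options.writerTemporaryObjectPrefix, finalBlobId.getName(), temporaryObjectId);
           return BlobId.of(getTemporaryBucketName(finalBlobId, options), temporaryObjectName);
       }
   }
   ```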




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

