galenwarren commented on a change in pull request #15599: URL: https://github.com/apache/flink/pull/15599#discussion_r619674758
########## File path: flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/utils/BlobUtils.java ########## @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.fs.gs.utils; + +import com.google.cloud.storage.BlobId; + +import java.net.URI; + +/** Utility functions related to blobs. */ +public class BlobUtils { + + /** The maximum number of blobs that can be composed in a single operation. */ + public static final int COMPOSE_MAX_BLOBS = 32; + + /** + * Normalizes a blob id, ensuring that the generation is null. + * + * @param blobId The blob id + * @return The blob id with the generation set to null + */ + public static BlobId normalizeBlobId(BlobId blobId) { + return BlobId.of(blobId.getBucket(), blobId.getName()); + } Review comment: A BlobId has three parts -- a bucket name, an object name, and a generation. If the generation is null, that means "latest generation". So the normalization essentially removes the explicit generation from a blob id by setting it to null. This is done so that we can compare "expected" temporary blob ids, which don't have generations specified, with blob ids returned from the API, which do have generations set. This comparison is done in exactly one place, in ```cleanupRecoverableState```, where we're matching up the list of expected BlobIds from the recoverable writer state with the list of temporary blobs that were found in storage, in order to be able to return true/false from that function to indicate whether everything that was expected to be deleted was, in fact, deleted. Temporary blobs should never be overwritten -- the object name contains a UUID that is generated each time one is created -- so there should only ever be a single generation of any temporary blob. So, I figured it would be safe to compare them in this way. However, as you pointed out in a different comment, I probably shouldn't be deleting all of those blobs based on partial name match anyway, so I think the need for this normalization goes away. So I'll just plan to remove it. ########## File path: flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/storage/BlobStorage.java ########## @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.fs.gs.storage; + +import com.google.cloud.storage.BlobId; Review comment: Good suggestions on naming, will do. Regarding BlobId, I thought about this, too, while writing the code. You're right, the only thing truly specific to google storage in this interface is BlobId, and the only parts we use of that are bucket name and object name, so an abstraction would certainly be possible. I'm not sure if this is where you're going with this, but if we did that, in principle the BlobStorage abstraction could be used to implement a recoverable writer over any storage that could support BlobStorage. Looking at the interface, it seems plausible that other bucket-based storages could support that, though I think I'd need to rethink how options are supplied (i.e. ```setChunkSize``` is somewhat google specific), and I'd probably want to add a method to generate a checksum from a byte array, so that the specifics of how a checksum is generated could be provider specific and not fixed (in ChecksumUtils). One option I considered was making BlobStorage a generic interface, with a BLOBID type parameter, constrained appropriately to extend an interface that exposes bucket and object name. That would be really easy to do, maybe I should go ahead and do at least that part? That would get BlobId out of there. Whether it makes sense to abstract away *everything* google-specific (i.e. options, checksum) such that this could be used with non-google storage, I'll leave that up to you. That wouldn't be hard to do either, but probably not worth it unless it's realistic that this might be used elsewhere. Honestly, my primary reason for this interface was for testability. But if you think it's worthwhile to make the interface more general, it would be pretty straightforward and I'd be happy to make that change. ########## File path: flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/GSFileSystemFactory.java ########## @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.fs.gs; + +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.FileSystemFactory; +import org.apache.flink.runtime.util.HadoopConfigLoader; +import org.apache.flink.util.Preconditions; + +import com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem; + +import java.io.IOException; +import java.net.URI; +import java.util.Collections; + +/** + * Implementation of the Flink {@link org.apache.flink.core.fs.FileSystemFactory} interface for + * Google Storage. + */ +public class GSFileSystemFactory implements FileSystemFactory { + + private static final String SCHEME = "gs"; + + private static final String HADOOP_CONFIG_PREFIX = "fs.gs."; + + private static final String[] FLINK_CONFIG_PREFIXES = {"gs.", HADOOP_CONFIG_PREFIX}; + + private static final String[][] MIRRORED_CONFIG_KEYS = {}; + + private static final String FLINK_SHADING_PREFIX = ""; + + public static final ConfigOption<String> WRITER_TEMPORARY_BUCKET_NAME = + ConfigOptions.key("gs.writer.temporary.bucket.name") + .stringType() + .defaultValue(GSFileSystemOptions.DEFAULT_WRITER_TEMPORARY_BUCKET_NAME) + .withDescription( + "This option sets the bucket name used by the recoverable writer to store temporary files. " + + "If empty, temporary files are stored in the same bucket as the final file being written."); + + public static final ConfigOption<String> WRITER_TEMPORARY_OBJECT_PREFIX = + ConfigOptions.key("gs.writer.temporary.object.prefix") + .stringType() + .defaultValue(GSFileSystemOptions.DEFAULT_WRITER_TEMPORARY_OBJECT_PREFIX) + .withDescription( + "This option sets the prefix used by the recoverable writer when writing temporary files. This prefix is applied to the " + + "final object name to form the base name for temporary files."); Review comment: The main reason to consider adding this knob is to allow one to use google storage's [object lifecycle management](https://cloud.google.com/storage/docs/lifecycle) to assign TTLs to temporary files. In google storage, TTLs can only be assigned at the bucket level, not the individual object level. So it would be impossible to assign TTLs to temporary objects that reside in the same bucket as the "final" objects without also applying the TTL to the final objects. I do think this could be useful, especially when I make the changes you've suggested to make the deletion of temporary files more focused; the TTL mechanism might be the only way orphaned temporary files would ever get deleted. What do you think? ########## File path: flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/writer/GSRecoverableFsDataOutputStream.java ########## @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.fs.gs.writer; + +import org.apache.flink.core.fs.RecoverableFsDataOutputStream; +import org.apache.flink.core.fs.RecoverableWriter; +import org.apache.flink.fs.gs.GSFileSystemOptions; +import org.apache.flink.fs.gs.storage.BlobStorage; +import org.apache.flink.util.Preconditions; + +import com.google.cloud.storage.BlobId; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.io.IOException; + +/** The data output stream implementation for the GS recoverable writer. */ +class GSRecoverableFsDataOutputStream extends RecoverableFsDataOutputStream { + + /** The underlying blob storage. */ + private final BlobStorage storage; + + /** The GS file system options. */ + private final GSFileSystemOptions options; + + /** The recoverable writer. */ + private final GSRecoverableWriter writer; + + /** The recoverable writer state. */ + private final GSRecoverableWriterState state; + + /** + * The current write channel, if one exists. A channel is created when one doesn't exist and + * bytes are written, and the channel is closed/destroyed when explicitly closed by the consumer + * (via close or closeForCommit) or when the data output stream is persisted (via persist). + * Calling persist does not close the data output stream, so it's possible that more bytes will + * be written, which will cause another channel to be created. So, multiple write channels may + * be created and destroyed during the lifetime of the data output stream. + */ + @Nullable GSChecksumWriteChannel currentWriteChannel; + + GSRecoverableFsDataOutputStream( + BlobStorage storage, + GSFileSystemOptions options, + GSRecoverableWriter writer, + GSRecoverableWriterState state) { + this.storage = Preconditions.checkNotNull(storage); + this.options = Preconditions.checkNotNull(options); + this.writer = Preconditions.checkNotNull(writer); + this.state = Preconditions.checkNotNull(state); + } + + @Override + public long getPos() throws IOException { + return state.bytesWritten; + } + + @Override + public void write(int byteValue) throws IOException { + byte[] bytes = new byte[] {(byte) byteValue}; + write(bytes); + } + + @Override + public void write(@Nonnull byte[] content) throws IOException { + Preconditions.checkNotNull(content); + + write(content, 0, content.length); + } + + @Override + public void write(@Nonnull byte[] content, int start, int length) throws IOException { + Preconditions.checkNotNull(content); + Preconditions.checkArgument(start >= 0); + Preconditions.checkArgument(length >= 0); + + // if the data stream is already closed, throw an exception + if (state.closed) { + throw new IOException("Illegal attempt to write to closed output stream"); + } + + // if necessary, create a write channel + if (currentWriteChannel == null) { + currentWriteChannel = createWriteChannel(); + } + + // write to the stream. the docs say that, in some circumstances, though an attempt will be + // made to write all of the requested bytes, there are some cases where only some bytes will + // be written. 
it's not clear whether this could ever happen with a Google storage + // WriteChannel; in any case, recoverable writers don't support partial writes, so if this + // ever happens, we must fail the write: + // https://docs.oracle.com/javase/7/docs/api/java/nio/channels/WritableByteChannel.html#write(java.nio.ByteBuffer) + int bytesWritten = currentWriteChannel.write(content, start, length); + if (bytesWritten != length) { + throw new IOException( + String.format( + "WriteChannel.write wrote %d of %d requested bytes, failing.", + bytesWritten, length)); + } + + // update count of total bytes written + state.bytesWritten += length; + } + + @Override + public void flush() throws IOException { + // not supported for GS, flushing frequency is controlled by the chunk size setting + // https://googleapis.dev/java/google-cloud-clients/0.90.0-alpha/com/google/cloud/WriteChannel.html#setChunkSize-int- + } + + @Override + public void sync() throws IOException { + // not supported for GS, flushing frequency is controlled by the chunk size setting + // https://googleapis.dev/java/google-cloud-clients/0.90.0-alpha/com/google/cloud/WriteChannel.html#setChunkSize-int- + } Review comment: Agreed, these methods don't seem to be called -- I set breakpoints and watched it run, and I never saw them get hit. Now that I look at this again, I realize that the current noop implementation is really a holdover from the original implementation we considered, where a single ```WriteChannel``` would be used for the entire upload. In that mode, there's really no way to flush or sync. However, since we've changed it so that we use multiple write channels, I think I could implement ```flush``` and ```sync``` by closing the active write channel (if it exists). So, basically just implement each of those methods by calling the private method ```closeChannelIfExists```. Shall I just do that? That way, if the core recoverable writer code ever does start calling one of those methods, we'll have an appropriate implementation in place. ########## File path: flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/writer/GSRecoverableWriterCommitter.java ########## @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.fs.gs.writer; + +import org.apache.flink.core.fs.RecoverableFsDataOutputStream; +import org.apache.flink.core.fs.RecoverableWriter; +import org.apache.flink.fs.gs.GSFileSystemOptions; +import org.apache.flink.fs.gs.storage.BlobStorage; +import org.apache.flink.fs.gs.utils.BlobUtils; +import org.apache.flink.util.Preconditions; + +import com.google.cloud.storage.BlobId; + +import java.io.IOException; +import java.util.List; +import java.util.Optional; + +/** The committer for the GS recoverable writer.
*/ +class GSRecoverableWriterCommitter implements RecoverableFsDataOutputStream.Committer { + + /** The underlying blob storage. */ + private final BlobStorage storage; + + /** The GS file system options. */ + private final GSFileSystemOptions options; + + /** The recoverable writer instance. */ + private final GSRecoverableWriter writer; + + /** The recoverable writer state for the commit operation. */ + private final GSRecoverableWriterState state; + + GSRecoverableWriterCommitter( + BlobStorage storage, + GSFileSystemOptions options, + GSRecoverableWriter writer, + GSRecoverableWriterState state) { + this.storage = Preconditions.checkNotNull(storage); + this.options = Preconditions.checkNotNull(options); + this.writer = Preconditions.checkNotNull(writer); + this.state = Preconditions.checkNotNull(state); + } + + @Override + public void commit() throws IOException { + + // compose all the component blob ids into the final blob id. if the component blob ids are + // in the same bucket as the final blob id, this can be done directly. otherwise, we must + // compose to a new temporary blob id in the same bucket as the component blob ids and + // then copy that blob to the final blob location + if (state.finalBlobId.getBucket().equals(state.getTemporaryBucketName(options))) { + + // compose directly to final blob + composeBlobs( + state.getComponentBlobIds(options), + state.finalBlobId, + options.writerContentType); + + } else { + + // compose to a temporary blob id, then copy to final blob id + BlobId intermediateBlobId = state.createTemporaryBlobId(options); + composeBlobs( + state.getComponentBlobIds(options), + intermediateBlobId, + options.writerContentType); + storage.copy(intermediateBlobId, state.finalBlobId); + } + + // clean up after commit + writer.cleanupRecoverableState(state); Review comment: I wasn't sure exactly when ```cleanupRecoverableState``` was called in the normal flow of things, so I ran a test where I set a breakpoint on that method and wrote data through a StreamingFileSink. What I observed was that ```cleanupRecoverableState``` did not get called as blobs were successfully committed, so no cleanup was occurring. From the description of the [method](https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/core/fs/RecoverableWriter.html#cleanupRecoverableState-org.apache.flink.core.fs.RecoverableWriter.ResumeRecoverable-), it seemed plausible to me that this was only called on failure: > Frees up any resources that were previously occupied in order to be able to recover from a (potential) failure ... but I wasn't sure. But yes, I agree, it would make more sense to me if ```cleanupRecoverableState``` were called on both successes and failures, and in that case the call in ```commit``` should be removed. It's certainly possible that I made a mistake in my testing. Should ```cleanupRecoverableState``` be getting called after successful commits? ########## File path: flink-filesystems/flink-gs-fs-hadoop/pom.xml ########## @@ -0,0 +1,208 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License.
You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.flink</groupId> + <artifactId>flink-filesystems</artifactId> + <version>1.13-SNAPSHOT</version> Review comment: Will do. ########## File path: flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/GSFileSystemFactory.java ########## @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.fs.gs; + +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.FileSystemFactory; +import org.apache.flink.runtime.util.HadoopConfigLoader; +import org.apache.flink.util.Preconditions; + +import com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem; + +import java.io.IOException; +import java.net.URI; +import java.util.Collections; + +/** + * Implementation of the Flink {@link org.apache.flink.core.fs.FileSystemFactory} interface for + * Google Storage. + */ +public class GSFileSystemFactory implements FileSystemFactory { + + private static final String SCHEME = "gs"; + + private static final String HADOOP_CONFIG_PREFIX = "fs.gs."; + + private static final String[] FLINK_CONFIG_PREFIXES = {"gs.", HADOOP_CONFIG_PREFIX}; + + private static final String[][] MIRRORED_CONFIG_KEYS = {}; + + private static final String FLINK_SHADING_PREFIX = ""; + + public static final ConfigOption<String> WRITER_TEMPORARY_BUCKET_NAME = + ConfigOptions.key("gs.writer.temporary.bucket.name") + .stringType() + .defaultValue(GSFileSystemOptions.DEFAULT_WRITER_TEMPORARY_BUCKET_NAME) Review comment: Good suggestion -- will do. ########## File path: flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/GSFileSystemFactory.java ########## @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.fs.gs; + +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.FileSystemFactory; +import org.apache.flink.runtime.util.HadoopConfigLoader; +import org.apache.flink.util.Preconditions; + +import com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem; + +import java.io.IOException; +import java.net.URI; +import java.util.Collections; + +/** + * Implementation of the Flink {@link org.apache.flink.core.fs.FileSystemFactory} interface for + * Google Storage. + */ +public class GSFileSystemFactory implements FileSystemFactory { + + private static final String SCHEME = "gs"; + + private static final String HADOOP_CONFIG_PREFIX = "fs.gs."; + + private static final String[] FLINK_CONFIG_PREFIXES = {"gs.", HADOOP_CONFIG_PREFIX}; + + private static final String[][] MIRRORED_CONFIG_KEYS = {}; + + private static final String FLINK_SHADING_PREFIX = ""; + + public static final ConfigOption<String> WRITER_TEMPORARY_BUCKET_NAME = + ConfigOptions.key("gs.writer.temporary.bucket.name") + .stringType() + .defaultValue(GSFileSystemOptions.DEFAULT_WRITER_TEMPORARY_BUCKET_NAME) + .withDescription( + "This option sets the bucket name used by the recoverable writer to store temporary files. " + + "If empty, temporary files are stored in the same bucket as the final file being written."); + + public static final ConfigOption<String> WRITER_TEMPORARY_OBJECT_PREFIX = + ConfigOptions.key("gs.writer.temporary.object.prefix") + .stringType() + .defaultValue(GSFileSystemOptions.DEFAULT_WRITER_TEMPORARY_OBJECT_PREFIX) + .withDescription( + "This option sets the prefix used by the recoverable writer when writing temporary files. This prefix is applied to the " + + "final object name to form the base name for temporary files."); + + public static final ConfigOption<String> WRITER_CONTENT_TYPE = + ConfigOptions.key("gs.writer.content.type") + .stringType() + .defaultValue(GSFileSystemOptions.DEFAULT_WRITER_CONTENT_TYPE) + .withDescription( + "This option sets the content type applied to files written by the recoverable writer."); Review comment: Yes, I suppose this isn't really needed as an option. I don't see anything similar in the S3 recoverable writer either. I'll remove it. ########## File path: flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/GSFileSystemFactory.java ########## @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.fs.gs; + +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.FileSystemFactory; +import org.apache.flink.runtime.util.HadoopConfigLoader; +import org.apache.flink.util.Preconditions; + +import com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem; + +import java.io.IOException; +import java.net.URI; +import java.util.Collections; + +/** + * Implementation of the Flink {@link org.apache.flink.core.fs.FileSystemFactory} interface for + * Google Storage. + */ +public class GSFileSystemFactory implements FileSystemFactory { + + private static final String SCHEME = "gs"; + + private static final String HADOOP_CONFIG_PREFIX = "fs.gs."; + + private static final String[] FLINK_CONFIG_PREFIXES = {"gs.", HADOOP_CONFIG_PREFIX}; + + private static final String[][] MIRRORED_CONFIG_KEYS = {}; + + private static final String FLINK_SHADING_PREFIX = ""; + + public static final ConfigOption<String> WRITER_TEMPORARY_BUCKET_NAME = + ConfigOptions.key("gs.writer.temporary.bucket.name") + .stringType() + .defaultValue(GSFileSystemOptions.DEFAULT_WRITER_TEMPORARY_BUCKET_NAME) + .withDescription( + "This option sets the bucket name used by the recoverable writer to store temporary files. " + + "If empty, temporary files are stored in the same bucket as the final file being written."); + + public static final ConfigOption<String> WRITER_TEMPORARY_OBJECT_PREFIX = + ConfigOptions.key("gs.writer.temporary.object.prefix") + .stringType() + .defaultValue(GSFileSystemOptions.DEFAULT_WRITER_TEMPORARY_OBJECT_PREFIX) + .withDescription( + "This option sets the prefix used by the recoverable writer when writing temporary files. This prefix is applied to the " + + "final object name to form the base name for temporary files."); + + public static final ConfigOption<String> WRITER_CONTENT_TYPE = + ConfigOptions.key("gs.writer.content.type") + .stringType() + .defaultValue(GSFileSystemOptions.DEFAULT_WRITER_CONTENT_TYPE) + .withDescription( + "This option sets the content type applied to files written by the recoverable writer."); + + public static final ConfigOption<Integer> WRITER_CHUNK_SIZE = + ConfigOptions.key("gs.writer.chunk.size") + .intType() + .defaultValue(GSFileSystemOptions.DEFAULT_WRITER_CHUNK_SIZE) Review comment: Will fix. ########## File path: flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/GSFileSystemFactory.java ########## @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.fs.gs; + +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.FileSystemFactory; +import org.apache.flink.runtime.util.HadoopConfigLoader; +import org.apache.flink.util.Preconditions; + +import com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem; + +import java.io.IOException; +import java.net.URI; +import java.util.Collections; + +/** + * Implementation of the Flink {@link org.apache.flink.core.fs.FileSystemFactory} interface for + * Google Storage. + */ +public class GSFileSystemFactory implements FileSystemFactory { + + private static final String SCHEME = "gs"; + + private static final String HADOOP_CONFIG_PREFIX = "fs.gs."; + + private static final String[] FLINK_CONFIG_PREFIXES = {"gs.", HADOOP_CONFIG_PREFIX}; + + private static final String[][] MIRRORED_CONFIG_KEYS = {}; + + private static final String FLINK_SHADING_PREFIX = ""; + + public static final ConfigOption<String> WRITER_TEMPORARY_BUCKET_NAME = + ConfigOptions.key("gs.writer.temporary.bucket.name") + .stringType() + .defaultValue(GSFileSystemOptions.DEFAULT_WRITER_TEMPORARY_BUCKET_NAME) + .withDescription( + "This option sets the bucket name used by the recoverable writer to store temporary files. " + + "If empty, temporary files are stored in the same bucket as the final file being written."); + + public static final ConfigOption<String> WRITER_TEMPORARY_OBJECT_PREFIX = + ConfigOptions.key("gs.writer.temporary.object.prefix") + .stringType() + .defaultValue(GSFileSystemOptions.DEFAULT_WRITER_TEMPORARY_OBJECT_PREFIX) + .withDescription( + "This option sets the prefix used by the recoverable writer when writing temporary files. This prefix is applied to the " + + "final object name to form the base name for temporary files."); + + public static final ConfigOption<String> WRITER_CONTENT_TYPE = + ConfigOptions.key("gs.writer.content.type") + .stringType() + .defaultValue(GSFileSystemOptions.DEFAULT_WRITER_CONTENT_TYPE) + .withDescription( + "This option sets the content type applied to files written by the recoverable writer."); + + public static final ConfigOption<Integer> WRITER_CHUNK_SIZE = + ConfigOptions.key("gs.writer.chunk.size") Review comment: Good suggestion, will do. ########## File path: flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/writer/GSRecoverableWriterState.java ########## @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.fs.gs.writer; + +import org.apache.flink.core.fs.RecoverableWriter; +import org.apache.flink.fs.gs.GSFileSystemOptions; +import org.apache.flink.util.Preconditions; + +import com.google.cloud.storage.BlobId; + +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import java.util.stream.Collectors; + +/** The state of a recoverable write. */ +class GSRecoverableWriterState implements RecoverableWriter.ResumeRecoverable, Cloneable { + + /** The blob id to which the recoverable write operation is writing. */ + public final BlobId finalBlobId; + + /** The number of bytes that have been written so far. */ + public long bytesWritten; + + /** Indicates if the write has been closed. */ + public boolean closed; + + /** The object ids for the temporary objects that should be composed to form the final blob. */ + public final List<UUID> componentObjectIds; + + GSRecoverableWriterState( + BlobId finalBlobId, long bytesWritten, boolean closed, List<UUID> componentObjectIds) { + this.finalBlobId = Preconditions.checkNotNull(finalBlobId); + Preconditions.checkArgument(bytesWritten >= 0); + this.bytesWritten = bytesWritten; + this.closed = closed; + + // shallow copy the component object ids to ensure this state object exclusively + // manages the list of component object ids + this.componentObjectIds = new ArrayList<>(Preconditions.checkNotNull(componentObjectIds)); + } + + GSRecoverableWriterState(GSRecoverableWriterState state) { + this(state.finalBlobId, state.bytesWritten, state.closed, state.componentObjectIds); + } + + GSRecoverableWriterState(BlobId finalBlobId) { + this(finalBlobId, 0, false, new ArrayList<>()); + } + + /** + * Returns the temporary bucket name. If options specifies a temporary bucket name, we use that + * one; otherwise, we use the bucket name of the final blob. + * + * @param options The GS file system options + * @return The temporary bucket name + */ + String getTemporaryBucketName(GSFileSystemOptions options) { + return options.writerTemporaryBucketName.isEmpty() + ? finalBlobId.getBucket() + : options.writerTemporaryBucketName; + } + + /** + * Returns a temporary object partial name, i.e. .inprogress/foo/bar/ for the final blob with + * object name "foo/bar". The included trailing slash is deliberate, so that we can be sure that + * object names that start with this partial name are, in fact, temporary files associated with + * the upload of the associated final blob. + * + * @param options The GS file system options + * @return The temporary object partial name + */ + String getTemporaryObjectPartialName(GSFileSystemOptions options) { + String finalObjectName = finalBlobId.getName(); + return String.format("%s%s/", options.writerTemporaryObjectPrefix, finalObjectName); + } + + /** + * Returns a temporary object name, formed by appending the compact string version of the + * temporary object id to the temporary object partial name, i.e. + * .inprogress/foo/bar/EjgelvANQ525hLUW2S6DBA for the final blob with object name "foo/bar". 
+ * + * @param temporaryObjectId The temporary object id + * @param options The GS file system options + * @return The temporary object name + */ + String getTemporaryObjectName(UUID temporaryObjectId, GSFileSystemOptions options) { + return getTemporaryObjectPartialName(options) + temporaryObjectId.toString(); + } + + /** + * Creates a temporary blob id for a provided temporary object id. + * + * @param temporaryObjectId The temporary object id + * @param options The GS file system options + * @return The temporary blob id + */ + private BlobId createTemporaryBlobId(UUID temporaryObjectId, GSFileSystemOptions options) { + String temporaryBucketName = getTemporaryBucketName(options); + String temporaryObjectName = getTemporaryObjectName(temporaryObjectId, options); + return BlobId.of(temporaryBucketName, temporaryObjectName); + } + + /** + * Creates a new temporary blob id. + * + * @param options The GS file system options + * @return The new temporary blob id. + */ + BlobId createTemporaryBlobId(GSFileSystemOptions options) { + UUID temporaryObjectId = UUID.randomUUID(); + return createTemporaryBlobId(temporaryObjectId, options); + } + + /** + * Create a new temporary blob id and add to the list of components. + * + * @param options The GS file system options + * @return The new component blob id. + */ + BlobId createComponentBlobId(GSFileSystemOptions options) { Review comment: Good point -- I'll take a look here and make it more intuitive. ########## File path: flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/writer/GSRecoverableWriterState.java ########## @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.fs.gs.writer; + +import org.apache.flink.core.fs.RecoverableWriter; +import org.apache.flink.fs.gs.GSFileSystemOptions; +import org.apache.flink.util.Preconditions; + +import com.google.cloud.storage.BlobId; + +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import java.util.stream.Collectors; + +/** The state of a recoverable write. */ +class GSRecoverableWriterState implements RecoverableWriter.ResumeRecoverable, Cloneable { + + /** The blob id to which the recoverable write operation is writing. */ + public final BlobId finalBlobId; + + /** The number of bytes that have been written so far. */ + public long bytesWritten; + + /** Indicates if the write has been closed. */ + public boolean closed; + + /** The object ids for the temporary objects that should be composed to form the final blob. */ + public final List<UUID> componentObjectIds; Review comment: Good suggestion -- will do.
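To make the temporary naming scheme discussed above concrete, here is a minimal standalone sketch that mirrors the logic of ```getTemporaryObjectPartialName``` and ```getTemporaryObjectName```. It assumes the default prefix is ```.inprogress/```; the class and variable names are illustrative only, not part of the PR:

```java
import java.util.UUID;

// Illustrative sketch only: mirrors the naming logic quoted above.
public class TemporaryNameSketch {
    public static void main(String[] args) {
        String writerTemporaryObjectPrefix = ".inprogress/"; // assumed default value
        String finalObjectName = "foo/bar";

        // The trailing slash is deliberate: any object name under this prefix is
        // guaranteed to be a temporary file for this particular final blob.
        String partialName =
                String.format("%s%s/", writerTemporaryObjectPrefix, finalObjectName);
        // partialName == ".inprogress/foo/bar/"

        // Each component object gets a fresh UUID, so temporary blobs are never overwritten.
        String temporaryObjectName = partialName + UUID.randomUUID();
        // e.g. ".inprogress/foo/bar/7b342499-6918-48f0-bcf9-11cf2bc18c51"
        System.out.println(temporaryObjectName);
    }
}
```

When a dedicated temporary bucket is configured, the same names are simply created in that bucket instead of the final blob's bucket, per ```getTemporaryBucketName``` above.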
########## File path: flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/writer/GSRecoverableFsDataOutputStream.java ########## @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.fs.gs.writer; + +import org.apache.flink.core.fs.RecoverableFsDataOutputStream; +import org.apache.flink.core.fs.RecoverableWriter; +import org.apache.flink.fs.gs.GSFileSystemOptions; +import org.apache.flink.fs.gs.storage.BlobStorage; +import org.apache.flink.util.Preconditions; + +import com.google.cloud.storage.BlobId; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.io.IOException; + +/** The data output stream implementation for the GS recoverable writer. */ +class GSRecoverableFsDataOutputStream extends RecoverableFsDataOutputStream { + + /** The underlying blob storage. */ + private final BlobStorage storage; + + /** The GS file system options. */ + private final GSFileSystemOptions options; + + /** The recoverable writer. */ + private final GSRecoverableWriter writer; + + /** The recoverable writer state. */ + private final GSRecoverableWriterState state; + + /** + * The current write channel, if one exists. A channel is created when one doesn't exist and + * bytes are written, and the channel is closed/destroyed when explicitly closed by the consumer + * (via close or closeForCommit) or when the data output stream is persisted (via persist). + * Calling persist does not close the data output stream, so it's possible that more bytes will + * be written, which will cause another channel to be created. So, multiple write channels may + * be created and destroyed during the lifetime of the data output stream. 
+ */ + @Nullable GSChecksumWriteChannel currentWriteChannel; + + GSRecoverableFsDataOutputStream( + BlobStorage storage, + GSFileSystemOptions options, + GSRecoverableWriter writer, + GSRecoverableWriterState state) { + this.storage = Preconditions.checkNotNull(storage); + this.options = Preconditions.checkNotNull(options); + this.writer = Preconditions.checkNotNull(writer); + this.state = Preconditions.checkNotNull(state); + } + + @Override + public long getPos() throws IOException { + return state.bytesWritten; + } + + @Override + public void write(int byteValue) throws IOException { + byte[] bytes = new byte[] {(byte) byteValue}; + write(bytes); + } + + @Override + public void write(@Nonnull byte[] content) throws IOException { + Preconditions.checkNotNull(content); + + write(content, 0, content.length); + } + + @Override + public void write(@Nonnull byte[] content, int start, int length) throws IOException { + Preconditions.checkNotNull(content); + Preconditions.checkArgument(start >= 0); + Preconditions.checkArgument(length >= 0); + + // if the data stream is already closed, throw an exception + if (state.closed) { + throw new IOException("Illegal attempt to write to closed output stream"); + } + + // if necessary, create a write channel + if (currentWriteChannel == null) { + currentWriteChannel = createWriteChannel(); + } + + // write to the stream. the docs say that, in some circumstances, though an attempt will be + // made to write all of the requested bytes, there are some cases where only some bytes will + // be written. it's not clear whether this could ever happen with a Google storage + // WriteChannel; in any case, recoverable writers don't support partial writes, so if this + // ever happens, we must fail the write: + // https://docs.oracle.com/javase/7/docs/api/java/nio/channels/WritableByteChannel.html#write(java.nio.ByteBuffer) + int bytesWritten = currentWriteChannel.write(content, start, length); + if (bytesWritten != length) { + throw new IOException( + String.format( + "WriteChannel.write wrote %d of %d requested bytes, failing.", + bytesWritten, length)); + } + + // update count of total bytes written + state.bytesWritten += length; + } + + @Override + public void flush() throws IOException { + // not supported for GS, flushing frequency is controlled by the chunk size setting + // https://googleapis.dev/java/google-cloud-clients/0.90.0-alpha/com/google/cloud/WriteChannel.html#setChunkSize-int- + } + + @Override + public void sync() throws IOException { + // not supported for GS, flushing frequency is controlled by the chunk size setting + // https://googleapis.dev/java/google-cloud-clients/0.90.0-alpha/com/google/cloud/WriteChannel.html#setChunkSize-int- + } Review comment: They'd be trivial to implement, so I'll just go ahead and do that. ########## File path: flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/writer/GSRecoverableWriter.java ########## @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.fs.gs.writer; + +import org.apache.flink.core.fs.Path; +import org.apache.flink.core.fs.RecoverableFsDataOutputStream; +import org.apache.flink.core.fs.RecoverableWriter; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.fs.gs.GSFileSystemOptions; +import org.apache.flink.fs.gs.storage.BlobStorage; +import org.apache.flink.fs.gs.utils.BlobUtils; +import org.apache.flink.util.Preconditions; + +import com.google.cloud.storage.BlobId; + +import java.io.IOException; +import java.util.HashSet; +import java.util.List; + +/** The recoverable writer implementation for Google storage. */ +public class GSRecoverableWriter implements RecoverableWriter { + + /** The underlying blob storage. */ + private final BlobStorage storage; + + /** The GS file system options. */ + private final GSFileSystemOptions options; + + /** + * Construct a GS recoverable writer. + * + * @param storage The underlying blob storage instance + * @param options The GS file system options + */ + public GSRecoverableWriter(BlobStorage storage, GSFileSystemOptions options) { + this.storage = Preconditions.checkNotNull(storage); + this.options = Preconditions.checkNotNull(options); + } + + @Override + public boolean requiresCleanupOfRecoverableState() { + return true; + } + + @Override + public boolean supportsResume() { + return true; + } + + @Override + public RecoverableFsDataOutputStream open(Path path) throws IOException { + Preconditions.checkNotNull(path); + + BlobId finalBlobId = BlobUtils.parseUri(path.toUri()); + GSRecoverableWriterState state = new GSRecoverableWriterState(finalBlobId); + return new GSRecoverableFsDataOutputStream(storage, options, this, state); + } + + @Override + public RecoverableFsDataOutputStream recover(ResumeRecoverable resumable) throws IOException { + Preconditions.checkNotNull(resumable); + + GSRecoverableWriterState state = (GSRecoverableWriterState) resumable; + return new GSRecoverableFsDataOutputStream(storage, options, this, state); + } + + @Override + public boolean cleanupRecoverableState(ResumeRecoverable resumable) throws IOException { + Preconditions.checkNotNull(resumable); + + // determine the partial name for the temporary objects to be deleted + GSRecoverableWriterState state = (GSRecoverableWriterState) resumable; + String temporaryBucketName = state.getTemporaryBucketName(options); + String temporaryObjectPartialName = state.getTemporaryObjectPartialName(options); + + // this will hold the set of blob ids that were actually deleted + HashSet<BlobId> deletedBlobIds = new HashSet<>(); + + // find all the temp blobs by looking for anything that starts with the temporary + // object partial name. 
doing it this way finds any orphaned temp blobs that might + // have come about when resuming + List<BlobId> foundTempBlobIds = + storage.list(temporaryBucketName, temporaryObjectPartialName); + if (!foundTempBlobIds.isEmpty()) { + + // delete all the temp blobs, and populate the set with ones that were actually deleted + // normalize in case the blob came back with a generation populated + List<Boolean> deleteResults = storage.delete(foundTempBlobIds); + for (int i = 0; i < deleteResults.size(); i++) { + if (deleteResults.get(i)) { + deletedBlobIds.add(BlobUtils.normalizeBlobId(foundTempBlobIds.get(i))); + } + } + } Review comment: Oh, I see. I wasn't understanding that ```cleanupRecoverableState``` might be called multiple times during a single recoverable write operation, to clean up some but not all of the temporary blobs -- rather, I was thinking it would just be called once, at the end of a failed recoverable write operation, to clean up everything. I agree, the scenario you describe could result in needed files being missing. So, to make sure I'm on the same page, is the right thing to do here to just delete those temp blobs directly referenced by the supplied ```ResumeRecoverable```? That would actually simplify some things, e.g. there would be no need to support the ```list``` method on ```BlobStorage``` anymore, the BlobId normalization we discussed elsewhere wouldn't be needed ... ########## File path: flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/writer/GSRecoverableWriterCommitter.java ########## @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.fs.gs.writer; + +import org.apache.flink.core.fs.RecoverableFsDataOutputStream; +import org.apache.flink.core.fs.RecoverableWriter; +import org.apache.flink.fs.gs.GSFileSystemOptions; +import org.apache.flink.fs.gs.storage.BlobStorage; +import org.apache.flink.fs.gs.utils.BlobUtils; +import org.apache.flink.util.Preconditions; + +import com.google.cloud.storage.BlobId; + +import java.io.IOException; +import java.util.List; +import java.util.Optional; + +/** The committer for the GS recoverable writer. */ +class GSRecoverableWriterCommitter implements RecoverableFsDataOutputStream.Committer { + + /** The underlying blob storage. */ + private final BlobStorage storage; + + /** The GS file system options. */ + private final GSFileSystemOptions options; + + /** The recoverable writer instance. */ + private final GSRecoverableWriter writer; + + /** The recoverable writer state for the commit operation. 
*/ + private final GSRecoverableWriterState state; + + GSRecoverableWriterCommitter( + BlobStorage storage, + GSFileSystemOptions options, + GSRecoverableWriter writer, + GSRecoverableWriterState state) { + this.storage = Preconditions.checkNotNull(storage); + this.options = Preconditions.checkNotNull(options); + this.writer = Preconditions.checkNotNull(writer); + this.state = Preconditions.checkNotNull(state); + } + + @Override + public void commit() throws IOException { + + // compose all the component blob ids into the final blob id. if the component blob ids are + // in the same bucket as the final blob id, this can be done directly. otherwise, we must + // compose to a new temporary blob id in the same bucket as the component blob ids and + // then copy that blob to the final blob location + if (state.finalBlobId.getBucket().equals(state.getTemporaryBucketName(options))) { + + // compose directly to final blob + composeBlobs( + state.getComponentBlobIds(options), + state.finalBlobId, + options.writerContentType); + + } else { + + // compose to a temporary blob id, then copy to final blob id + BlobId intermediateBlobId = state.createTemporaryBlobId(options); + composeBlobs( + state.getComponentBlobIds(options), + intermediateBlobId, + options.writerContentType); + storage.copy(intermediateBlobId, state.finalBlobId); + } + + // clean up after commit + writer.cleanupRecoverableState(state); Review comment: I see. I don't actually recall seeing ```cleanupRecoverableState``` being called at any point on successful writes, even after the next checkpoint completed, but it's possible I'm wrong about that. I'll run another quick check to make sure I do see ```cleanupRecoverableState``` being called when expected, and if so, I'll remove that call during commit. I'll report back what I find. ########## File path: flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/storage/BlobStorage.java ########## @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.fs.gs.storage; + +import com.google.cloud.storage.BlobId; Review comment: Makes sense, I'll do just the simple abstraction related to BlobId. ########## File path: flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/GSFileSystemFactory.java ########## @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.fs.gs; + +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.FileSystemFactory; +import org.apache.flink.runtime.util.HadoopConfigLoader; +import org.apache.flink.util.Preconditions; + +import com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem; + +import java.io.IOException; +import java.net.URI; +import java.util.Collections; + +/** + * Implementation of the Flink {@link org.apache.flink.core.fs.FileSystemFactory} interface for + * Google Storage. + */ +public class GSFileSystemFactory implements FileSystemFactory { + + private static final String SCHEME = "gs"; + + private static final String HADOOP_CONFIG_PREFIX = "fs.gs."; + + private static final String[] FLINK_CONFIG_PREFIXES = {"gs.", HADOOP_CONFIG_PREFIX}; + + private static final String[][] MIRRORED_CONFIG_KEYS = {}; + + private static final String FLINK_SHADING_PREFIX = ""; + + public static final ConfigOption<String> WRITER_TEMPORARY_BUCKET_NAME = + ConfigOptions.key("gs.writer.temporary.bucket.name") + .stringType() + .defaultValue(GSFileSystemOptions.DEFAULT_WRITER_TEMPORARY_BUCKET_NAME) + .withDescription( + "This option sets the bucket name used by the recoverable writer to store temporary files. " + + "If empty, temporary files are stored in the same bucket as the final file being written."); + + public static final ConfigOption<String> WRITER_TEMPORARY_OBJECT_PREFIX = + ConfigOptions.key("gs.writer.temporary.object.prefix") + .stringType() + .defaultValue(GSFileSystemOptions.DEFAULT_WRITER_TEMPORARY_OBJECT_PREFIX) + .withDescription( + "This option sets the prefix used by the recoverable writer when writing temporary files. This prefix is applied to the " + + "final object name to form the base name for temporary files."); Review comment: It's not really necessary, I suppose. The use case I was envisioning here was this: When you specify a dedicated bucket to hold in-progress temporary blobs, the ".inprogress" prefix is sort of redundant. So this lets you set it to an empty string and have the files just appear directly in the root of the bucket, i.e. the files would look like: ```/foo/bar/7b342499-6918-48f0-bcf9-11cf2bc18c51``` ... instead of ... ```/.inprogress/foo/bar/7b342499-6918-48f0-bcf9-11cf2bc18c51``` Admittedly, this isn't really that important to be able to do. So I'm fine with removing this option if that's what you'd prefer. Also, this reminds me of a change I'm planning to submit with the next batch, which is to have the generated temporary blob names include both the final bucket name and the final object name. Currently, the temporary blob names only include the final object name, but not the bucket name. So what is now: ```/.inprogress/foo/bar/7b342499-6918-48f0-bcf9-11cf2bc18c51``` ...
would become: ```/.inprogress/bucket_name/foo/bar/7b342499-6918-48f0-bcf9-11cf2bc18c51``` This keeps temporary blobs properly separated in the event that a) a dedicated bucket for temporary blobs is used and b) a blob with the same name (i.e. /foo/bar) is written into two different buckets by a StreamingFileSink at the same time. (And, it would actually technically still work *without* adding the bucket name, because the UUIDs would be distinct, but it just seemed potentially confusing to commingle the temporary blobs for different writes in the same storage tree.) The naming sketch at the end of this thread folds in this change.
########## File path: flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/writer/GSRecoverableWriterState.java ########## @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.fs.gs.writer; + +import org.apache.flink.core.fs.RecoverableWriter; +import org.apache.flink.fs.gs.GSFileSystemOptions; +import org.apache.flink.util.Preconditions; + +import com.google.cloud.storage.BlobId; + +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import java.util.stream.Collectors; + +/** The state of a recoverable write. */ +class GSRecoverableWriterState implements RecoverableWriter.ResumeRecoverable, Cloneable { + + /** The blob id to which the recoverable write operation is writing. */ + public final BlobId finalBlobId; + + /** The number of bytes that have been written so far. */ + public long bytesWritten; + + /** Indicates if the write has been closed. */ + public boolean closed; + + /** The object ids for the temporary objects that should be composed to form the final blob.
*/ + public final List<UUID> componentObjectIds; + + GSRecoverableWriterState( + BlobId finalBlobId, long bytesWritten, boolean closed, List<UUID> componentObjectIds) { + this.finalBlobId = Preconditions.checkNotNull(finalBlobId); + Preconditions.checkArgument(bytesWritten >= 0); + this.bytesWritten = bytesWritten; + this.closed = closed; + + // shallow copy the component object ids to ensure this state object exclusively + // manages the list of component object ids + this.componentObjectIds = new ArrayList<>(Preconditions.checkNotNull(componentObjectIds)); + } + + GSRecoverableWriterState(GSRecoverableWriterState state) { + this(state.finalBlobId, state.bytesWritten, state.closed, state.componentObjectIds); + } + + GSRecoverableWriterState(BlobId finalBlobId) { + this(finalBlobId, 0, false, new ArrayList<>()); + } + + /** + * Returns the temporary bucket name. If options specifies a temporary bucket name, we use that + * one; otherwise, we use the bucket name of the final blob. + * + * @param options The GS file system options + * @return The temporary bucket name + */ + String getTemporaryBucketName(GSFileSystemOptions options) { + return options.writerTemporaryBucketName.isEmpty() + ? finalBlobId.getBucket() + : options.writerTemporaryBucketName; + } Review comment: If a dedicated temp bucket is specified in the options, then, yes, the same temp bucket is used for all writers/states. But if no dedicated temp bucket is specified, then the bucket that is used for temp blobs is the same bucket to which the "final" blob is being written, so that would be different when different buckets are being written to by streaming file sinks, and it would depend on the finalBlobId that is part of the state. ########## File path: flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/writer/GSRecoverableWriterState.java ########## @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.fs.gs.writer; + +import org.apache.flink.core.fs.RecoverableWriter; +import org.apache.flink.fs.gs.GSFileSystemOptions; +import org.apache.flink.util.Preconditions; + +import com.google.cloud.storage.BlobId; + +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import java.util.stream.Collectors; + +/** The state of a recoverable write. */ +class GSRecoverableWriterState implements RecoverableWriter.ResumeRecoverable, Cloneable { + + /** The blob id to which the recoverable write operation is writing. */ + public final BlobId finalBlobId; + + /** The number of bytes that have been written so far. */ + public long bytesWritten; + + /** Indicates if the write has been closed. 
*/ + public boolean closed; + + /** The object ids for the temporary objects that should be composed to form the final blob. */ + public final List<UUID> componentObjectIds; + + GSRecoverableWriterState( + BlobId finalBlobId, long bytesWritten, boolean closed, List<UUID> componentObjectIds) { + this.finalBlobId = Preconditions.checkNotNull(finalBlobId); + Preconditions.checkArgument(bytesWritten >= 0); + this.bytesWritten = bytesWritten; + this.closed = closed; + + // shallow copy the component object ids to ensure this state object exclusively + // manages the list of component object ids + this.componentObjectIds = new ArrayList<>(Preconditions.checkNotNull(componentObjectIds)); + } + + GSRecoverableWriterState(GSRecoverableWriterState state) { + this(state.finalBlobId, state.bytesWritten, state.closed, state.componentObjectIds); + } + + GSRecoverableWriterState(BlobId finalBlobId) { + this(finalBlobId, 0, false, new ArrayList<>()); + } + + /** + * Returns the temporary bucket name. If options specifies a temporary bucket name, we use that + * one; otherwise, we use the bucket name of the final blob. + * + * @param options The GS file system options + * @return The temporary bucket name + */ + String getTemporaryBucketName(GSFileSystemOptions options) { + return options.writerTemporaryBucketName.isEmpty() + ? finalBlobId.getBucket() + : options.writerTemporaryBucketName; + } + + /** + * Returns a temporary object partial name, i.e. .inprogress/foo/bar/ for the final blob with + * object name "foo/bar". The included trailing slash is deliberate, so that we can be sure that + * object names that start with this partial name are, in fact, temporary files associated with + * the upload of the associated final blob. + * + * @param options The GS file system options + * @return The temporary object partial name + */ + String getTemporaryObjectPartialName(GSFileSystemOptions options) { + String finalObjectName = finalBlobId.getName(); + return String.format("%s%s/", options.writerTemporaryObjectPrefix, finalObjectName); + } + + /** + * Returns a temporary object name, formed by appending the compact string version of the + * temporary object id to the temporary object partial name, i.e. + * .inprogress/foo/bar/EjgelvANQ525hLUW2S6DBA for the final blob with object name "foo/bar". + * + * @param temporaryObjectId The temporary object id + * @param options The GS file system options + * @return The temporary object name + */ + String getTemporaryObjectName(UUID temporaryObjectId, GSFileSystemOptions options) { + return getTemporaryObjectPartialName(options) + temporaryObjectId.toString(); + } + + /** + * Creates a temporary blob id for a provided temporary object id. + * + * @param temporaryObjectId The temporary object id + * @param options The GS file system options + * @return + */ + private BlobId createTemporaryBlobId(UUID temporaryObjectId, GSFileSystemOptions options) { + String temporaryBucketName = getTemporaryBucketName(options); + String temporaryObjectName = getTemporaryObjectName(temporaryObjectId, options); + return BlobId.of(temporaryBucketName, temporaryObjectName); + } + + /** + * Creates a new temporary blob id. + * + * @param options The GS file system options + * @return The new temporary blob id. + */ + BlobId createTemporaryBlobId(GSFileSystemOptions options) { + UUID temporaryObjectId = UUID.randomUUID(); + return createTemporaryBlobId(temporaryObjectId, options); + } Review comment: Good suggestion -- I'll move these out to be utils. 
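For concreteness, here's roughly what I have in mind for those utils -- presumably as statics in ```BlobUtils```, taking the final ```BlobId``` and the options as parameters. Treat it as a sketch only: the exact signatures are up for discussion, and it folds in the bucket-name-in-path change from my earlier comment, so the format string is provisional.

```java
package org.apache.flink.fs.gs.utils;

import org.apache.flink.fs.gs.GSFileSystemOptions;

import com.google.cloud.storage.BlobId;

import java.util.UUID;

/** Sketch: temporary-blob naming helpers moved out of GSRecoverableWriterState. */
public class BlobUtils {

    /** Returns the dedicated temporary bucket if one is configured, else the final blob's bucket. */
    public static String getTemporaryBucketName(BlobId finalBlobId, GSFileSystemOptions options) {
        return options.writerTemporaryBucketName.isEmpty()
                ? finalBlobId.getBucket()
                : options.writerTemporaryBucketName;
    }

    /**
     * Returns the temporary object partial name, now including the final bucket name, e.g.
     * ".inprogress/bucket_name/foo/bar/" for the final blob "foo/bar" in bucket "bucket_name".
     * The trailing slash remains deliberate, as before.
     */
    public static String getTemporaryObjectPartialName(
            BlobId finalBlobId, GSFileSystemOptions options) {
        return String.format(
                "%s%s/%s/",
                options.writerTemporaryObjectPrefix,
                finalBlobId.getBucket(),
                finalBlobId.getName());
    }

    /** Creates a blob id for a new temporary object, with a fresh UUID as the last path segment. */
    public static BlobId createTemporaryBlobId(BlobId finalBlobId, GSFileSystemOptions options) {
        String temporaryObjectName =
                getTemporaryObjectPartialName(finalBlobId, options) + UUID.randomUUID();
        return BlobId.of(getTemporaryBucketName(finalBlobId, options), temporaryObjectName);
    }
}
```

With these as statics, ```GSRecoverableWriterState``` keeps only the write-progress fields (```finalBlobId```, ```bytesWritten```, ```closed```, ```componentObjectIds```) and delegates all naming.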
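And, going back to the ```cleanupRecoverableState``` discussion above, here's the direct-deletion version I was describing. Again, just a sketch: it assumes the existing ```storage``` and ```options``` fields on ```GSRecoverableWriter``` and the ```getComponentBlobIds``` helper on the state, and it drops the ```list```-based lookup and the BlobId normalization entirely.

```java
// in GSRecoverableWriter (sketch); storage and options are the existing fields
@Override
public boolean cleanupRecoverableState(RecoverableWriter.ResumeRecoverable resumable)
        throws IOException {
    GSRecoverableWriterState state = (GSRecoverableWriterState) resumable;

    // the state records exactly the temp blobs this write created, so delete
    // those directly rather than listing blobs by partial name
    List<BlobId> componentBlobIds = state.getComponentBlobIds(options);
    if (componentBlobIds.isEmpty()) {
        return true;
    }

    // storage.delete returns one result per blob id, in order; report success
    // only if every referenced temp blob was actually deleted
    List<Boolean> deleteResults = storage.delete(componentBlobIds);
    return deleteResults.stream().allMatch(deleted -> deleted);
}
```

Since the state references exactly the temp blobs created by this write, deleting just those can't remove blobs that a later resume of this or any other write still needs.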
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected]
