galenwarren opened a new pull request #15599: URL: https://github.com/apache/flink/pull/15599
## What is the purpose of the change

The goal here is to add [RecoverableWriter](https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/core/fs/RecoverableWriter.html) support for Google Storage (GS), to allow writing to GS buckets from Flink's [StreamingFileSink](https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html).

To do this, we must implement and register a [FileSystem](https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/core/fs/FileSystem.html) implementation for GS and then provide a RecoverableWriter implementation via [createRecoverableWriter](https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/core/fs/FileSystem.html#createRecoverableWriter--). Fortunately, Google supplies a Hadoop [FileSystem](https://hadoop.apache.org/docs/stable/api/org/apache/hadoop/fs/FileSystem.html) implementation via [GoogleHadoopFileSystem](https://github.com/GoogleCloudDataproc/hadoop-connectors/blob/master/gcs/src/main/java/com/google/cloud/hadoop/fs/gcs/GoogleHadoopFileSystem.java), which can already be used with Flink, so we can wrap it with Flink's [HadoopFileSystem](https://ci.apache.org/projects/flink/flink-docs-release-1.3/api/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.html) to provide the core file-system functionality.

At a high level, a RecoverableWriter implementation must do the following (a skeleton of this contract is sketched after the list):

* Create an output data stream from a URI (e.g. gs://bucket/foo/bar), i.e. an implementation of [FSDataOutputStream](https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/core/fs/FSDataOutputStream.html)
* Allow writing to the output data stream in multiple chunks
* Allow persisting the state of the output data stream while the stream is open
* Allow recovering the persisted state of the output data stream, so that writes can resume from that point
* Support atomic commit of the final file once the stream is closed
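For orientation, here is a minimal skeleton showing how a GS implementation plugs into Flink's RecoverableWriter contract. The interface methods below come from `org.apache.flink.core.fs.RecoverableWriter`, and the class name GSRecoverableWriter matches this description, but all bodies and comments are illustrative sketches, not the PR's actual code:

```java
import java.io.IOException;

import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
import org.apache.flink.core.fs.RecoverableWriter;
import org.apache.flink.core.io.SimpleVersionedSerializer;

class GSRecoverableWriter implements RecoverableWriter {

    @Override
    public RecoverableFsDataOutputStream open(Path path) throws IOException {
        // Start a fresh recoverable write for e.g. gs://bucket/foo/bar: create empty
        // writer state and return a stream that uploads chunks to temporary blobs.
        throw new UnsupportedOperationException("sketch only");
    }

    @Override
    public RecoverableFsDataOutputStream recover(ResumeRecoverable resumable) throws IOException {
        // Rebuild the output stream from persisted state so writing can resume.
        throw new UnsupportedOperationException("sketch only");
    }

    @Override
    public RecoverableFsDataOutputStream.Committer recoverForCommit(CommitRecoverable resumable)
            throws IOException {
        // Return a committer that combines the temporary blobs into the final
        // blob and deletes the temporaries.
        throw new UnsupportedOperationException("sketch only");
    }

    @Override
    public SimpleVersionedSerializer<CommitRecoverable> getCommitRecoverableSerializer() {
        throw new UnsupportedOperationException("sketch only");
    }

    @Override
    public SimpleVersionedSerializer<ResumeRecoverable> getResumeRecoverableSerializer() {
        throw new UnsupportedOperationException("sketch only");
    }

    @Override
    public boolean requiresCleanupOfRecoverableState() {
        return true; // temporary blobs must be deleted when state is abandoned
    }

    @Override
    public boolean cleanupRecoverableState(ResumeRecoverable resumable) throws IOException {
        // Delete temporary blobs belonging to an abandoned write.
        throw new UnsupportedOperationException("sketch only");
    }

    @Override
    public boolean supportsResume() {
        return true;
    }
}
```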
This implementation satisfies these requirements for GS by writing files to a temporary location as data is written to the output stream, then combining the temporary files upon commit (and deleting them). Each temporary file is written to GS using the [resumable upload](https://cloud.google.com/storage/docs/resumable-uploads) API. The recoverable writer state (GSRecoverableWriterState) keeps track of which temporary files, in what order, should be combined to form the final file. (The state also tracks the final file location, the number of bytes written so far, and whether the output stream has been closed.)

We considered but rejected the idea of using Google's resumable-upload support for the *entire* process of writing a temporary file, i.e. a design in which exactly one temporary file would be uploaded for every final file written. We rejected this approach for two reasons:

* It would have required us to depend on Java serialization to persist the [WriteChannel](https://googleapis.dev/java/google-cloud-clients/0.90.0-alpha/com/google/cloud/WriteChannel.html) associated with the resumable upload
* There is a nonconfigurable two-week limit on the duration of a single resumable upload, and we didn't want to live with that constraint

Instead, our approach (potentially) writes multiple temporary files for each recoverable write: each time [persist](https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/core/fs/RecoverableFsDataOutputStream.html#persist--) is called, any ongoing resumable upload is closed, causing a temporary file to be committed, and a new resumable upload is started if/when more bytes are written. We thus avoid having to persist WriteChannels, and we avoid the two-week limit for a recoverable write as a whole. Note that each individual temporary file must still be written within two weeks, which means checkpoints must occur at least once every two weeks, but that doesn't seem like a problematic limitation in practice.

When a recoverable write is cleaned up, either on commit or after a failure, all of the temporary files associated with that recoverable write are deleted. The naming scheme for the temporary files is such that we can be sure to delete all of them -- even orphaned ones that might result from restarting from an earlier savepoint/checkpoint.

To simplify accessing the Google Storage API and to make it mockable for unit testing, this code includes a BlobStorage abstraction. It is implemented against GS in GSBlobStorage and against an in-memory store in MockBlobStorage; a sketch of the operations such an abstraction needs to cover follows.
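The BlobStorage interface itself isn't spelled out here, but given the operations the writer needs (resumable writes of temporary blobs, combining them into the final blob, and deleting them during cleanup), a minimal version might look like the following. All names and method signatures in this sketch are assumptions for illustration -- the PR's actual interface may differ:

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;

// Hypothetical minimal blob-storage abstraction. It exists so that
// GSBlobStorage (backed by the Google Storage client) and MockBlobStorage
// (backed by an in-memory store) can be swapped under the recoverable writer.
interface BlobStorage {

    /** Handle for an in-progress resumable upload of one temporary blob. */
    interface ResumableWriteChannel {
        int write(ByteBuffer content) throws IOException;

        void close() throws IOException; // commits the temporary blob
    }

    /** Starts a resumable upload to the given blob. */
    ResumableWriteChannel startResumableWrite(String bucket, String name) throws IOException;

    /** Combines the source blobs, in order, into a single target blob. */
    void compose(String bucket, List<String> sourceNames, String targetName) throws IOException;

    /** Deletes blobs, e.g. temporary files during cleanup. */
    void delete(String bucket, List<String> names) throws IOException;

    /** Lists blobs under a prefix, e.g. to find orphaned temporary files. */
    List<String> list(String bucket, String prefix) throws IOException;
}
```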
## Brief change log

Main changes are:

* [fa060e9](https://github.com/apache/flink/commit/fa060e9135ed61f98670b2eb71b0f911a859ed3c) Add flink-gs-fs-hadoop project: Add a new project for the GS file system and recoverable writer, with FileSystemFactory wireup.
* [4b8c0d5](https://github.com/apache/flink/commit/4b8c0d53564357f3a87f36aa172e5255bda645ce) Add BlobStorage abstraction: Add interfaces to abstract away direct access to the Google Storage API, both to simplify that access and to make it mockable.
* [f8bf558](https://github.com/apache/flink/commit/f8bf5581f2a46742a16150a18d764856fd6a5f9f) Implement BlobStorage for Google Storage: Add GSBlobStorage, an implementation of BlobStorage against the Google Storage API.
* [f2399cd](https://github.com/apache/flink/commit/f2399cd725eeadbbab3abe821f4b43a730a9d17e) Add utility functions: Add some utility functions used by the recoverable writer. Includes unit tests.
* [0f20b24](https://github.com/apache/flink/commit/0f20b24ef46b550f36519b5dbcc267bedacf260d) Implement BlobStorage for unit testing: Add MockBlobStorage, an implementation of BlobStorage against an in-memory store.
* [1aac728](https://github.com/apache/flink/commit/1aac728941b3e755f2eb3b9eaac77222a96d6664) Implement recoverable writer: Implement RecoverableWriter against the BlobStorage abstraction. Includes unit tests.

## Verifying this change

This change added tests and can be verified as follows:

* Added unit tests for utility functions
* Added unit tests for the recoverable writer against a mock in-memory BlobStorage implementation (MockBlobStorage), to validate various success and failure recoverable-write scenarios

Note that there are currently no unit tests that validate that GSBlobStorage (the concrete implementation of BlobStorage against the GS API) properly invokes the underlying API. This API is difficult to mock, as many return values are classes that can't be created or extended outside the package. Unit tests would be much easier here if we were to use something like Mockito, but that is discouraged in the coding guidelines, so I'm looking for some guidance here.

Also, I haven't implemented the FileSystemBehaviorTestSuite, since it seems to test the underlying FileSystem behavior -- which is provided directly by Flink's HadoopFileSystem wrapped around Google's GoogleHadoopFileSystem, not by any code in this PR. But if it should be added, I can do that -- just let me know.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (yes / no) Yes, this adds dependencies on [google-cloud-storage](https://mvnrepository.com/artifact/com.google.cloud/google-cloud-storage) and [gcs-connector](https://mvnrepository.com/artifact/com.google.cloud.bigdataoss/gcs-connector) in the flink-gs-fs-hadoop project. These dependencies require a newer version of [guava](https://mvnrepository.com/artifact/com.google.guava/guava) than is present in flink-fs-hadoop-shaded, so this project pulls in a newer version of guava and shades it.
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / no) No
- The serializers: (yes / no / don't know) Yes, adds a new serializer (GsRecoverableWriterStateSerializer) to serialize the persisted state of a recoverable write (a sketch of its likely shape follows this checklist).
- The runtime per-record code paths (performance sensitive): (yes / no / don't know) No
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (yes / no / don't know) No, except that we'll need to document how to properly deploy and use this new file system.
- The S3 file system connector: (yes / no / don't know) No
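For the new serializer: the description above lists what GSRecoverableWriterState tracks (the component temporary files and their order, the final file location, the bytes written so far, and whether the stream is closed), so a [SimpleVersionedSerializer](https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/core/io/SimpleVersionedSerializer.html) for it might look roughly like this. The field names and wire format are assumptions for illustration; only the serializer's class name and the tracked fields come from this PR:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.core.io.SimpleVersionedSerializer;

// Hypothetical state class; only the tracked fields are taken from the PR description.
class GSRecoverableWriterState {
    String finalBucket; // location of the final blob (assumed split into bucket/name)
    String finalName;
    List<String> componentNames; // temporary blobs, in the order they should be combined
    long bytesWritten;
    boolean closed;
}

class GsRecoverableWriterStateSerializer
        implements SimpleVersionedSerializer<GSRecoverableWriterState> {

    @Override
    public int getVersion() {
        return 1;
    }

    @Override
    public byte[] serialize(GSRecoverableWriterState state) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(state.finalBucket);
            out.writeUTF(state.finalName);
            out.writeInt(state.componentNames.size());
            for (String name : state.componentNames) {
                out.writeUTF(name);
            }
            out.writeLong(state.bytesWritten);
            out.writeBoolean(state.closed);
        }
        return bytes.toByteArray();
    }

    @Override
    public GSRecoverableWriterState deserialize(int version, byte[] serialized) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(serialized))) {
            GSRecoverableWriterState state = new GSRecoverableWriterState();
            state.finalBucket = in.readUTF();
            state.finalName = in.readUTF();
            int count = in.readInt();
            state.componentNames = new ArrayList<>(count);
            for (int i = 0; i < count; i++) {
                state.componentNames.add(in.readUTF());
            }
            state.bytesWritten = in.readLong();
            state.closed = in.readBoolean();
            return state;
        }
    }
}
```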
## Documentation

- Does this pull request introduce a new feature? (yes / no) Yes.
- If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) We will need GS documentation similar to [this](https://ci.apache.org/projects/flink/flink-docs-stable/deployment/filesystems/s3.html) documentation for S3. I intend to provide that in a follow-up commit once any changes settle down as part of the code review.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at: [email protected]