galenwarren opened a new pull request #15599:
URL: https://github.com/apache/flink/pull/15599


   ## What is the purpose of the change
   
   The goal here is to add 
[RecoverableWriter](https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/core/fs/RecoverableWriter.html)
 support for Google Storage (GS), to allow writing to GS buckets from Flink's 
[StreamingFileSink](https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html).
 To do this, we must implement and register a 
[FileSystem](https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/core/fs/FileSystem.html)
 implementation for GS and then provide a RecoverableWriter implementation via 
[createRecoverableWriter](https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/core/fs/FileSystem.html#createRecoverableWriter--).
   
   Fortunately, Google supplies a Hadoop 
[FileSystem](https://hadoop.apache.org/docs/stable/api/org/apache/hadoop/fs/FileSystem.html)
 implementation via 
[GoogleHadoopFileSystem](https://github.com/GoogleCloudDataproc/hadoop-connectors/blob/master/gcs/src/main/java/com/google/cloud/hadoop/fs/gcs/GoogleHadoopFileSystem.java),
 which can already be used with Flink. So we can wrap this with Flink's 
[HadoopFileSystem](https://ci.apache.org/projects/flink/flink-docs-release-1.3/api/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.html)
 to implement the core file-system functionality.
   
   At a high level, to implement a recoverable writer, one must provide a 
RecoverableWriter implementation that does the following (a minimal sketch of 
this lifecycle follows the list):
   * Creates an output data stream from a URI (e.g. gs://bucket/foo/bar), i.e. 
an implementation of 
[FSDataOutputStream](https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/core/fs/FSDataOutputStream.html)
   * Allows writing to the output data stream in multiple chunks
   * Allows persisting the state of the output data stream while the stream is 
open
   * Allows recovering the persisted state of the output data stream, enabling 
writes to resume from that point
   * Supports atomic commit of the final file once the stream is closed
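   
   For reference, here is a minimal sketch of that lifecycle in terms of 
Flink's existing RecoverableWriter API (the gs:// path is just an example):
   
```java
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
import org.apache.flink.core.fs.RecoverableWriter;

public class RecoverableWriteLifecycle {

    public static void main(String[] args) throws Exception {
        Path path = new Path("gs://bucket/foo/bar");
        FileSystem fs = path.getFileSystem();
        RecoverableWriter writer = fs.createRecoverableWriter();

        // Open an output stream to the target URI and write a first chunk.
        RecoverableFsDataOutputStream out = writer.open(path);
        out.write(new byte[] {1, 2, 3});

        // Persist the stream state while the stream is open (e.g. at a
        // checkpoint); the returned recoverable can be serialized and stored.
        RecoverableWriter.ResumeRecoverable resumable = out.persist();

        // After a failure, recover the stream from the persisted state and
        // resume writing from that point...
        RecoverableFsDataOutputStream resumed = writer.recover(resumable);
        resumed.write(new byte[] {4, 5, 6});

        // ...then atomically commit the final file once the stream is closed.
        RecoverableFsDataOutputStream.Committer committer = resumed.closeForCommit();
        committer.commit();
    }
}
```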
   
   This implementation meets these requirements for GS by writing incoming 
data to files in a temporary location as it arrives on the output stream, then 
combining those temporary files upon commit (and deleting them afterward). 
Each temporary file is written to GS using the [resumable 
upload](https://cloud.google.com/storage/docs/resumable-uploads) API. The 
recoverable writer state (GSRecoverableWriterState) keeps track of which 
temporary files, in what order, should be combined together to form the final 
file. (The recoverable writer state also keeps track of the final file 
location, the number of bytes written so far, and whether the output stream has 
been closed.)
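   
   For illustration, the combine step maps naturally onto the storage client's 
compose operation; here is a hedged sketch (the bucket and object names are 
hypothetical, and GCS caps a single compose request at 32 source objects, so 
longer lists must be combined in stages):
   
```java
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;

public class ComposeExample {

    public static void main(String[] args) {
        Storage storage = StorageOptions.getDefaultInstance().getService();

        // Hypothetical temporary object names recorded in the writer state,
        // listed in the order they were written.
        Storage.ComposeRequest request =
                Storage.ComposeRequest.newBuilder()
                        .addSource(".tmp/bar.0", ".tmp/bar.1", ".tmp/bar.2")
                        .setTarget(BlobInfo.newBuilder("bucket", "foo/bar").build())
                        .build();

        // Combine the temporary objects into the final object in one call.
        storage.compose(request);
    }
}
```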
   
   We considered but rejected the idea of using a single resumable upload for 
the *entire* process of writing a temporary file, i.e. a design in which there 
would be exactly one temporary file uploaded for every final file written. We 
rejected this approach for two reasons (the first is illustrated in the sketch 
after this list):
   * Doing that would have required us to depend on Java serialization to 
persist the 
[WriteChannel](https://googleapis.dev/java/google-cloud-clients/0.90.0-alpha/com/google/cloud/WriteChannel.html)
 associated with the resumable upload
   * There is a nonconfigurable two-week limit on the duration of a single 
resumable upload, and we didn't want to have to live with that constraint
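   
   For context on the first point: the client library's hook for persisting an 
in-flight upload is WriteChannel.capture(), whose RestorableState relies on 
Java serialization. A sketch of the rejected approach (the blob name is 
hypothetical):
   
```java
import com.google.cloud.RestorableState;
import com.google.cloud.WriteChannel;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;

import java.nio.ByteBuffer;

public class CapturedUploadExample {

    public static void main(String[] args) throws Exception {
        Storage storage = StorageOptions.getDefaultInstance().getService();

        // Start a resumable upload to a hypothetical temporary blob.
        WriteChannel channel =
                storage.writer(BlobInfo.newBuilder("bucket", ".tmp/bar.0").build());
        channel.write(ByteBuffer.wrap(new byte[] {1, 2, 3}));

        // The only way to persist the in-flight upload is to capture its
        // state, which depends on Java serialization -- the first reason this
        // design was rejected.
        RestorableState<WriteChannel> state = channel.capture();

        // Later (possibly in another process): restore and keep writing.
        WriteChannel restored = state.restore();
        restored.write(ByteBuffer.wrap(new byte[] {4, 5, 6}));
        restored.close();
    }
}
```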
   
   Instead, our approach (potentially) writes multiple temporary files 
associated with each recoverable write; each time 
[persist](https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/core/fs/RecoverableFsDataOutputStream.html#persist--)
 is called, any ongoing resumable upload is closed, causing a temporary file to 
be committed, and a new resumable upload is started if/when more bytes are 
written. We thus avoid having to persist WriteChannels and we avoid the 
two-week limit for a recoverable write. Note that each individual temporary 
file must still be written within two weeks, which means that checkpoints need 
to be taken at least once every two weeks, but that doesn't seem like a 
problematic limitation in practice.
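   
   Here is a hedged sketch of this persist flow (the class and member names 
are hypothetical and do not match the PR):
   
```java
import com.google.cloud.WriteChannel;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Storage;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

// Hypothetical sketch of the persist flow; names do not match the PR.
class SketchRecoverableStream {

    private final Storage storage;
    private final String bucket;

    // Temporary blobs whose uploads have been closed, in write order.
    private final List<String> committedTempBlobs = new ArrayList<>();

    private WriteChannel currentUpload; // null when no upload is in flight
    private String currentTempName;

    SketchRecoverableStream(Storage storage, String bucket) {
        this.storage = storage;
        this.bucket = bucket;
    }

    void write(byte[] bytes) throws IOException {
        if (currentUpload == null) {
            // Lazily start a new resumable upload for the next temp file.
            currentTempName = ".inprogress/bar/" + UUID.randomUUID();
            currentUpload =
                    storage.writer(BlobInfo.newBuilder(bucket, currentTempName).build());
        }
        currentUpload.write(ByteBuffer.wrap(bytes));
    }

    // Close any in-flight upload, committing its temp file; the next write
    // starts a fresh upload. No WriteChannel is ever serialized, and no
    // single upload can run longer than the two-week limit.
    List<String> persist() throws IOException {
        if (currentUpload != null) {
            currentUpload.close();
            committedTempBlobs.add(currentTempName);
            currentUpload = null;
            currentTempName = null;
        }
        // In the real writer, this list (plus the final file location, byte
        // count, and closed flag) would be serialized as the recoverable state.
        return new ArrayList<>(committedTempBlobs);
    }
}
```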
   
   When a recoverable write is cleaned up, either on commit or after a failure, 
all of the temporary files associated with that recoverable write are deleted. 
The naming scheme for the temporary files associated with a recoverable write 
is such that we can be sure to delete all temporary files -- even orphaned ones 
that might result from restarting from an earlier save/checkpoint.
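   
   For illustration, cleanup then reduces to listing and deleting everything 
under a per-write prefix; a sketch (the .inprogress naming convention shown is 
hypothetical):
   
```java
import com.google.api.gax.paging.Page;
import com.google.cloud.storage.Blob;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;

public class CleanupExample {

    public static void main(String[] args) {
        Storage storage = StorageOptions.getDefaultInstance().getService();

        // Hypothetical convention: every temporary blob for a given
        // recoverable write shares this prefix, so listing the prefix finds
        // them all -- including orphans left behind after restarting from an
        // earlier save/checkpoint.
        String prefix = ".inprogress/foo/bar/";

        Page<Blob> blobs = storage.list("bucket", Storage.BlobListOption.prefix(prefix));
        for (Blob blob : blobs.iterateAll()) {
            blob.delete();
        }
    }
}
```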
   
   To simplify accessing the Google Storage API and to make it mockable for 
unit testing, this code includes a BlobStorage abstraction. This is implemented 
against GS in GSBlobStorage and against an in-memory store in MockBlobStorage.
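   
   For a sense of the shape of this abstraction, here is a hypothetical sketch 
(the actual BlobStorage interface in this PR may differ):
   
```java
import java.io.IOException;
import java.nio.channels.WritableByteChannel;
import java.util.List;

// Hypothetical sketch of a storage abstraction; the actual BlobStorage
// interface in the PR may differ.
interface SketchBlobStorage {

    /** Starts a resumable upload to the given blob, returning a channel to write to. */
    WritableByteChannel startUpload(String bucket, String name) throws IOException;

    /** Composes the source blobs, in order, into the target blob. */
    void compose(String bucket, List<String> sourceNames, String targetName)
            throws IOException;

    /** Lists blob names under a prefix (used to find temporary files to delete). */
    List<String> list(String bucket, String prefix) throws IOException;

    /** Deletes the given blobs. */
    void delete(String bucket, List<String> names) throws IOException;
}
```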
   
   ## Brief change log
   
   Main changes are:
   * 
[fa060e9](https://github.com/apache/flink/commit/fa060e9135ed61f98670b2eb71b0f911a859ed3c)
 Add flink-gs-fs-hadoop project: Add new project for GS file system and 
recoverable writer with FileSystemFactory wireup.
   * 
[4b8c0d5](https://github.com/apache/flink/commit/4b8c0d53564357f3a87f36aa172e5255bda645ce)
 Add BlobStorage abstraction: Add interfaces to abstract away direct access to 
the Google Storage API, both to simplify that access and to make it mockable.
   * 
[f8bf558](https://github.com/apache/flink/commit/f8bf5581f2a46742a16150a18d764856fd6a5f9f)
 Implement BlobStorage for Google Storage: Add GSBlobStorage, an implementation 
of BlobStorage against the Google Storage API.
   * 
[f2399cd](https://github.com/apache/flink/commit/f2399cd725eeadbbab3abe821f4b43a730a9d17e)
 Add utility functions: Add some utility functions used by the recoverable 
writer. Includes unit tests.
   * 
[0f20b24](https://github.com/apache/flink/commit/0f20b24ef46b550f36519b5dbcc267bedacf260d)
 Implement BlobStorage for unit testing: Add MockBlobStorage, an implementation 
of BlobStorage against an in-memory store.
   * 
[1aac728](https://github.com/apache/flink/commit/1aac728941b3e755f2eb3b9eaac77222a96d6664)
 Implement recoverable writer: Implements RecoverableWriter against the 
BlobStorage abstraction. Includes unit tests.
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   * Added unit tests for utility functions
   * Added unit tests for the recoverable writer against a mock in-memory 
BlobStorage implementation (MockBlobStorage), to validate various success and 
failure recoverable-write scenarios (a sketch of one such test follows).
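   
   Here is a hedged sketch of what one such test looks like 
(createWriterAgainst and readBlob are hypothetical helpers, and the actual 
MockBlobStorage API may differ):
   
```java
import static org.junit.Assert.assertArrayEquals;

import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
import org.apache.flink.core.fs.RecoverableWriter;
import org.junit.Test;

public class SketchRecoverableWriterTest {

    @Test
    public void persistedWritesSurviveRecovery() throws Exception {
        // Hypothetical helper that wires the recoverable writer to the
        // in-memory store; the actual MockBlobStorage API may differ.
        MockBlobStorage storage = new MockBlobStorage();
        RecoverableWriter writer = createWriterAgainst(storage);

        RecoverableFsDataOutputStream out = writer.open(new Path("gs://bucket/foo/bar"));
        out.write(new byte[] {1, 2, 3});

        // Persist, then simulate a failure by abandoning the stream.
        RecoverableWriter.ResumeRecoverable resumable = out.persist();

        // Recover from the persisted state, finish writing, and commit.
        RecoverableFsDataOutputStream recovered = writer.recover(resumable);
        recovered.write(new byte[] {4, 5, 6});
        recovered.closeForCommit().commit();

        // readBlob is a hypothetical accessor on the in-memory store.
        assertArrayEquals(new byte[] {1, 2, 3, 4, 5, 6},
                storage.readBlob("bucket", "foo/bar"));
    }
}
```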
   
   Note that there are currently no unit tests that validate that GSBlobStorage 
(the concrete implementation of BlobStorage against the GS API) properly 
invokes the underlying API. This API is difficult to mock, as many return 
values are classes that can't be created or extended outside the package. Unit 
tests would be much easier if we were to use something like Mockito, but that 
is discouraged in the coding guidelines, so I'm looking for some guidance here.
   
   Also, I haven't implemented the FileSystemBehaviorTestSuite, since it seems 
to test the underlying FileSystem behavior, which is provided directly by 
Flink's HadoopFileSystem wrapped around Google's GoogleHadoopFileSystem and 
not by any code in this PR. But if this should be added, I can do that -- just 
let me know.
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (yes / no)
     Yes, this adds dependencies on 
[google-cloud-storage](https://mvnrepository.com/artifact/com.google.cloud/google-cloud-storage)
 and 
[gcs-connector](https://mvnrepository.com/artifact/com.google.cloud.bigdataoss/gcs-connector)
 in the flink-gs-fs-hadoop project. These dependencies require a newer version 
of [guava](https://mvnrepository.com/artifact/com.google.guava/guava) than is 
present in flink-fs-hadoop-shaded, so this project pulls in a newer version of 
guava and shades it.  
     - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no)
     No
     - The serializers: (yes / no / don't know)
     Yes, adds a new serializer (GsRecoverableWriterStateSerializer) to 
serialize persisted state of a recoverable write.
     - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know)
     No
     - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (yes / no / don't 
know)
     No, except that we'll need to document how to properly deploy and use this 
new file system.
     - The S3 file system connector: (yes / no / don't know)
     No
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (yes / no)
     Yes.
     - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
     We will need GS documentation similar to 
[this](https://ci.apache.org/projects/flink/flink-docs-stable/deployment/filesystems/s3.html)
 documentation for S3. I intend to provide that in a follow-up commit once the 
code-review changes settle down.
   

