galenwarren commented on a change in pull request #15599: URL: https://github.com/apache/flink/pull/15599#discussion_r622377812
########## File path: flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/writer/GSRecoverableWriterCommitter.java ##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.gs.writer;
+
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
+import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.fs.gs.GSFileSystemOptions;
+import org.apache.flink.fs.gs.storage.BlobStorage;
+import org.apache.flink.fs.gs.utils.BlobUtils;
+import org.apache.flink.util.Preconditions;
+
+import com.google.cloud.storage.BlobId;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+
+/** The committer for the GS recoverable writer. */
+class GSRecoverableWriterCommitter implements RecoverableFsDataOutputStream.Committer {
+
+    /** The underlying blob storage. */
+    private final BlobStorage storage;
+
+    /** The GS file system options. */
+    private final GSFileSystemOptions options;
+
+    /** The recoverable writer instance. */
+    private final GSRecoverableWriter writer;
+
+    /** The recoverable writer state for the commit operation. */
+    private final GSRecoverableWriterState state;
+
+    GSRecoverableWriterCommitter(
+            BlobStorage storage,
+            GSFileSystemOptions options,
+            GSRecoverableWriter writer,
+            GSRecoverableWriterState state) {
+        this.storage = Preconditions.checkNotNull(storage);
+        this.options = Preconditions.checkNotNull(options);
+        this.writer = Preconditions.checkNotNull(writer);
+        this.state = Preconditions.checkNotNull(state);
+    }
+
+    @Override
+    public void commit() throws IOException {
+
+        // compose all the component blob ids into the final blob id. if the component blob ids are
+        // in the same bucket as the final blob id, this can be done directly. otherwise, we must
+        // compose to a new temporary blob id in the same bucket as the component blob ids and
+        // then copy that blob to the final blob location
+        if (state.finalBlobId.getBucket().equals(state.getTemporaryBucketName(options))) {
+
+            // compose directly to final blob
+            composeBlobs(
+                    state.getComponentBlobIds(options),
+                    state.finalBlobId,
+                    options.writerContentType);
+
+        } else {
+
+            // compose to a temporary blob id, then copy to final blob id
+            BlobId intermediateBlobId = state.createTemporaryBlobId(options);
+            composeBlobs(
+                    state.getComponentBlobIds(options),
+                    intermediateBlobId,
+                    options.writerContentType);
+            storage.copy(intermediateBlobId, state.finalBlobId);
+        }
+
+        // clean up after commit
+        writer.cleanupRecoverableState(state);

Review comment:

Yes, agreed. I've set up a few test cases locally and watched how the various methods (```cleanupRecoverableState```, ```commit```, etc.) get called, using both a [DefaultRollingPolicy](https://ci.apache.org/projects/flink/flink-docs-stable/api/java/org/apache/flink/streaming/api/functions/sink/filesystem/rollingpolicies/DefaultRollingPolicy.html) and a [CheckpointRollingPolicy](https://ci.apache.org/projects/flink/flink-docs-stable/api/java/org/apache/flink/streaming/api/functions/sink/filesystem/rollingpolicies/CheckpointRollingPolicy.html) with a [StreamingFileSink](https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html).
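(For reference, the experiments looked roughly like the minimal sketch below -- this is not the actual test code; the GCS path and the intervals are placeholders, and the logging I wired into the recoverable writer to watch the calls is omitted.)

```
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

public class RecoverableWriterProbe {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // checkpoint every 10s; the policy below rolls every 5s, so some part
        // files roll between checkpoints while others are still in progress
        // when a checkpoint arrives
        env.enableCheckpointing(10_000L);

        StreamingFileSink<Long> sink =
                StreamingFileSink.forRowFormat(
                                new Path("gs://my-test-bucket/probe"), // placeholder path
                                new SimpleStringEncoder<Long>())
                        // swap in OnCheckpointRollingPolicy.build() for the
                        // CheckpointRollingPolicy case
                        .withRollingPolicy(
                                DefaultRollingPolicy.builder()
                                        .withRolloverInterval(5_000L)
                                        .build())
                        .build();

        env.fromSequence(0L, Long.MAX_VALUE).addSink(sink);
        env.execute("recoverable-writer-probe");
    }
}
```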
I think I see what's going on here -- I'll share my findings with you, and please let me know what you think.

First, a class of interest is the [Bucket](https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java) class, which is written to by a streaming file sink and is responsible for interacting with the underlying storage, via a [BucketWriter](https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketWriter.html), applying the appropriate [RollingPolicy](https://ci.apache.org/projects/flink/flink-docs-release-1.9/api/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicy.html). Essentially, a bucket accepts elements via ```write``` and writes them to the currently active (or "in progress") writer. The bucket also decides when to "roll" the file, i.e. when to finish writing to the currently in-progress file and start writing to a new one. To keep track of things, the bucket internally maintains two maps:

* [inProgressFileRecoverablesPerCheckpoint](https://github.com/apache/flink/blob/11f5abc41057cf119548cb03dfcc97d755e0029a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java#L63), a map from checkpoint id to an [InProgressFileRecoverable](https://github.com/apache/flink/blob/11f5abc41057cf119548cb03dfcc97d755e0029a/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/InProgressFileWriter.java#L57). Conceptually, this holds the recoverable for the in-progress writer -- if there is one -- associated with a given checkpoint, and it's populated in [onReceptionOfCheckpoint](https://github.com/apache/flink/blob/11f5abc41057cf119548cb03dfcc97d755e0029a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java#L275):

  ```
  if (inProgressPart != null) {
      inProgressFileRecoverable = inProgressPart.persist();
      inProgressFileCreationTime = inProgressPart.getCreationTime();
      this.inProgressFileRecoverablesPerCheckpoint.put(
              checkpointId, inProgressFileRecoverable);
  }
  ```

  So, recoverables in ```inProgressFileRecoverablesPerCheckpoint``` will result from calls to ```persist```.

* [pendingFileRecoverablesPerCheckpoint](https://github.com/apache/flink/blob/11f5abc41057cf119548cb03dfcc97d755e0029a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java#L67), a map from checkpoint id to a list of [PendingFileRecoverable](https://github.com/apache/flink/blob/11f5abc41057cf119548cb03dfcc97d755e0029a/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/InProgressFileWriter.java#L60). Conceptually, this holds all recoverables for "pending files", meaning files that are no longer being written to, having been rolled.
  This happens in [closePartFile](https://github.com/apache/flink/blob/11f5abc41057cf119548cb03dfcc97d755e0029a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java#L259):

  ```
  if (inProgressPart != null) {
      pendingFileRecoverable = inProgressPart.closeForCommit();
      pendingFileRecoverablesForCurrentCheckpoint.add(pendingFileRecoverable);
      inProgressPart = null;
  }
  ```

  So, recoverables in ```pendingFileRecoverablesPerCheckpoint``` will result from calls to ```closeForCommit```.

Last, consider what happens in [onSuccessfulCompletionOfCheckpoint](https://github.com/apache/flink/blob/11f5abc41057cf119548cb03dfcc97d755e0029a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java#L31):

```
void onSuccessfulCompletionOfCheckpoint(long checkpointId) throws IOException {
    checkNotNull(bucketWriter);

    Iterator<Map.Entry<Long, List<InProgressFileWriter.PendingFileRecoverable>>> it =
            pendingFileRecoverablesPerCheckpoint
                    .headMap(checkpointId, true)
                    .entrySet()
                    .iterator();

    while (it.hasNext()) {
        Map.Entry<Long, List<InProgressFileWriter.PendingFileRecoverable>> entry = it.next();

        for (InProgressFileWriter.PendingFileRecoverable pendingFileRecoverable :
                entry.getValue()) {
            bucketWriter.recoverPendingFile(pendingFileRecoverable).commit();
        }
        it.remove();
    }

    cleanupInProgressFileRecoverables(checkpointId);
}
```

... and [cleanupInProgressFileRecoverables](https://github.com/apache/flink/blob/11f5abc41057cf119548cb03dfcc97d755e0029a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java#L336):

```
private void cleanupInProgressFileRecoverables(long checkpointId) throws IOException {
    Iterator<Map.Entry<Long, InProgressFileWriter.InProgressFileRecoverable>> it =
            inProgressFileRecoverablesPerCheckpoint
                    .headMap(checkpointId, false)
                    .entrySet()
                    .iterator();

    while (it.hasNext()) {
        final InProgressFileWriter.InProgressFileRecoverable inProgressFileRecoverable =
                it.next().getValue();

        // this check is redundant, as we only put entries in the
        // inProgressFileRecoverablesPerCheckpoint map list when the
        // requiresCleanupOfInProgressFileRecoverableState() returns true,
        // but having it makes the code more readable.
        final boolean successfullyDeleted =
                bucketWriter.cleanupInProgressFileRecoverable(inProgressFileRecoverable);

        if (LOG.isDebugEnabled() && successfullyDeleted) {
            LOG.debug(
                    "Subtask {} successfully deleted incomplete part for bucket id={}.",
                    subtaskIndex,
                    bucketId);
        }
        it.remove();
    }
}
```

First, any pending recoverables (meaning ones resulting from calls to ```closeForCommit```, for rolled files) are committed; this is done for the current checkpoint id and all prior checkpoint ids. Then, any in-progress recoverables (meaning ones resulting from calls to ```persist```, for the then in-progress writer) are cleaned up; this is done for all checkpoints prior to the current one. Importantly, recoverables that result from calls to ```closeForCommit``` do not seem to be cleaned up; only the in-progress ones are.

I had been using a ```CheckpointRollingPolicy``` in my tests, and I never saw ```cleanupRecoverableState``` being called on my recoverable writer, even after letting many checkpoints complete. This makes sense to me now -- since the policy I was using closed any in-progress writers on checkpoints, there was never an in-progress writer when the post-checkpoint handling ran, and so that cleanup never occurred.
I tried using a default rolling policy, set to roll faster than the checkpoint interval, and in that case I did see ```cleanupRecoverableState``` get called -- but only for the writers that were in progress at the time of the checkpoint (i.e. not for the ones that had already rolled).

So, this suggests to me that it may be expected that ```commit``` perform its own cleanup, even though I don't see that explicitly stated anywhere -- unless we think this is a bug, and that ```cleanupRecoverableState``` should be called after successful commits ...

In any case, you're absolutely right that ```cleanupRecoverableState``` can get called along the way for an active recoverable write, so we can't delete anything there that we might need later. So my current implementation is wrong. How to fix it depends on how we resolve the commit/cleanup question:

* If we think that ```commit``` should perform its own cleanup, then I could remove the cleanup code from ```cleanupRecoverableState``` and put it somewhere where ```commit``` can call it.
* If we think that ```cleanupRecoverableState``` should be called for the commit recoverables, and we feel we can change the existing behavior, then we could leave the code where it is but only perform the cleanup if the supplied recoverable is, in fact, a *commit* recoverable (not a *resume* recoverable) -- see the sketch at the end of this comment.

Either way, I'd think we could keep the existing cleanup algorithm, which deletes both non-orphaned and orphaned temporary blobs based on the name prefix -- i.e. if it's safe to delete the non-orphaned temporary blobs, it should be safe to delete the orphaned ones, too.

Last, you said:

> This might have a higher priority than supporting manually recover from an early checkpoint.

The conflict you're referring to -- not being able to manually recover from an early checkpoint after a commit, if the commit deletes the temporary blobs -- seems like it would be a problem for *any* cleanup scheme where the commit operation strictly requires the temporary files to be present in order to succeed. It was actually this sort of scenario that made me consider an alternate way of handling commit earlier, which was to compose the final blob from the temporary blobs *only if it didn't already exist*. My thought was that, if we assume idempotency, it should be ok to skip recreating the final blob if it already exists. And this would allow restoring from earlier checkpoints without failure, even after the temporary blobs were deleted on commit. I just thought I'd mention that again in case it could help with this issue ...
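To make that last idea concrete, here's a minimal sketch of what ```commit``` might look like under the compose-only-if-absent scheme. This is just a sketch under assumptions: the ```exists``` check and the two helpers are hypothetical, and the current ```BlobStorage``` interface may not expose an existence check in this form.

```
@Override
public void commit() throws IOException {

    // if the final blob already exists, assume a prior commit succeeded; under
    // the idempotency assumption we can skip recomposing it, which lets a
    // restore from an earlier checkpoint succeed even after the temporary
    // component blobs have been deleted
    if (!storage.exists(state.finalBlobId)) { // hypothetical existence check
        composeFinalBlob(); // hypothetical helper wrapping the compose/copy logic above
    }

    // the final blob is now in place, so the temporary blobs can be deleted
    cleanupTemporaryBlobs(); // hypothetical helper, shared with the cleanup path
}
```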

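And for completeness, here's the sketch of the second option from the list above -- leaving the cleanup in ```cleanupRecoverableState``` but only acting on *commit* recoverables. The ```isCommitRecoverable``` flag and the ```cleanupTemporaryBlobs``` helper are hypothetical, not part of the current PR:

```
@Override
public boolean cleanupRecoverableState(RecoverableWriter.ResumeRecoverable resumable)
        throws IOException {
    GSRecoverableWriterState state = (GSRecoverableWriterState) resumable;

    // never delete temporary blobs for a resume recoverable; they may still be
    // needed to resume an in-progress write from an earlier checkpoint
    if (!state.isCommitRecoverable) { // hypothetical flag set by closeForCommit
        return false;
    }

    // the existing prefix-based cleanup of non-orphaned and orphaned temporary blobs
    return cleanupTemporaryBlobs(state); // hypothetical helper
}
```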