galenwarren commented on a change in pull request #15599:
URL: https://github.com/apache/flink/pull/15599#discussion_r622377812



##########
File path: 
flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/writer/GSRecoverableWriterCommitter.java
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.gs.writer;
+
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
+import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.fs.gs.GSFileSystemOptions;
+import org.apache.flink.fs.gs.storage.BlobStorage;
+import org.apache.flink.fs.gs.utils.BlobUtils;
+import org.apache.flink.util.Preconditions;
+
+import com.google.cloud.storage.BlobId;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+
+/** The committer for the GS recoverable writer. */
+class GSRecoverableWriterCommitter implements RecoverableFsDataOutputStream.Committer {
+
+    /** The underlying blob storage. */
+    private final BlobStorage storage;
+
+    /** The GS file system options. */
+    private final GSFileSystemOptions options;
+
+    /** The recoverable writer instance. */
+    private final GSRecoverableWriter writer;
+
+    /** The recoverable writer state for the commit operation. */
+    private final GSRecoverableWriterState state;
+
+    GSRecoverableWriterCommitter(
+            BlobStorage storage,
+            GSFileSystemOptions options,
+            GSRecoverableWriter writer,
+            GSRecoverableWriterState state) {
+        this.storage = Preconditions.checkNotNull(storage);
+        this.options = Preconditions.checkNotNull(options);
+        this.writer = Preconditions.checkNotNull(writer);
+        this.state = Preconditions.checkNotNull(state);
+    }
+
+    @Override
+    public void commit() throws IOException {
+
+        // compose all the component blob ids into the final blob id. if the component blob
+        // ids are in the same bucket as the final blob id, this can be done directly.
+        // otherwise, we must compose to a new temporary blob id in the same bucket as the
+        // component blob ids and then copy that blob to the final blob location
+        if (state.finalBlobId.getBucket().equals(state.getTemporaryBucketName(options))) {
+
+            // compose directly to final blob
+            composeBlobs(
+                    state.getComponentBlobIds(options),
+                    state.finalBlobId,
+                    options.writerContentType);
+
+        } else {
+
+            // compose to a temporary blob id, then copy to final blob id
+            BlobId intermediateBlobId = state.createTemporaryBlobId(options);
+            composeBlobs(
+                    state.getComponentBlobIds(options),
+                    intermediateBlobId,
+                    options.writerContentType);
+            storage.copy(intermediateBlobId, state.finalBlobId);
+        }
+
+        // clean up after commit
+        writer.cleanupRecoverableState(state);

Review comment:
       Yes, agreed. I've set up a few test cases locally and watched how the various methods (```cleanupRecoverableState```, ```commit```, etc.) get called, using both a [DefaultRollingPolicy](https://ci.apache.org/projects/flink/flink-docs-stable/api/java/org/apache/flink/streaming/api/functions/sink/filesystem/rollingpolicies/DefaultRollingPolicy.html) and a [CheckpointRollingPolicy](https://ci.apache.org/projects/flink/flink-docs-stable/api/java/org/apache/flink/streaming/api/functions/sink/filesystem/rollingpolicies/CheckpointRollingPolicy.html) with a [StreamingFileSink](https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html). I think I see what's going on here -- I'll share my findings below; please let me know what you think.
   
   First, a class of interest is the [Bucket](https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java) class, which is written to by a streaming file sink and is responsible for interacting with the underlying storage via a [BucketWriter](https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketWriter.html), applying the appropriate [RollingPolicy](https://ci.apache.org/projects/flink/flink-docs-release-1.9/api/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicy.html).
   
   Essentially, a bucket accepts elements via ```write``` and writes them to the currently active (or "in progress") writer. The bucket also decides when to "roll" the file, i.e. when to finish writing to the currently in-progress file and start writing to a new one. To keep track of things, internally the bucket maintains two maps:
   
   * [inProgressFileRecoverablesPerCheckpoint](https://github.com/apache/flink/blob/11f5abc41057cf119548cb03dfcc97d755e0029a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java#L63), a map from checkpoint id to an [InProgressFileRecoverable](https://github.com/apache/flink/blob/11f5abc41057cf119548cb03dfcc97d755e0029a/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/InProgressFileWriter.java#L57). Conceptually, this holds the recoverable for the in-progress writer -- if there is one -- associated with a given checkpoint, and it's populated here, in [onReceptionOfCheckpoint](https://github.com/apache/flink/blob/11f5abc41057cf119548cb03dfcc97d755e0029a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java#L275):
   
   ```
           if (inProgressPart != null) {
               inProgressFileRecoverable = inProgressPart.persist();
               inProgressFileCreationTime = inProgressPart.getCreationTime();
               this.inProgressFileRecoverablesPerCheckpoint.put(
                       checkpointId, inProgressFileRecoverable);
           }
   ```
   
   So, recoverables in ```inProgressFileRecoverablesPerCheckpoint``` will result from calls to ```persist```.
   
   * [pendingFileRecoverablesPerCheckpoint](https://github.com/apache/flink/blob/11f5abc41057cf119548cb03dfcc97d755e0029a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java#L67), a map from checkpoint id to a list of [PendingFileRecoverable](https://github.com/apache/flink/blob/11f5abc41057cf119548cb03dfcc97d755e0029a/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/InProgressFileWriter.java#L60). Conceptually, this holds all recoverables for "pending files", meaning files that are no longer being written to, having been rolled. This happens in [closePartFile](https://github.com/apache/flink/blob/11f5abc41057cf119548cb03dfcc97d755e0029a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java#L259):
   
   ```
           if (inProgressPart != null) {
               pendingFileRecoverable = inProgressPart.closeForCommit();
               pendingFileRecoverablesForCurrentCheckpoint.add(pendingFileRecoverable);
               inProgressPart = null;
           }
   ```
   
   So, recoverables in ```pendingFileRecoverablesPerCheckpoint``` will result from calls to ```closeForCommit```.
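   
   To make the two flavors concrete, here's a minimal sketch of how they map onto the recoverable-writer API (these are the real Flink interfaces; ```writer```, ```path```, and ```bytes``` are placeholders):
   
   ```
       RecoverableFsDataOutputStream stream = writer.open(path);
       stream.write(bytes);
   
       // checkpoint while a part file is still in progress: persist() yields a
       // ResumeRecoverable, which lands in inProgressFileRecoverablesPerCheckpoint
       RecoverableWriter.ResumeRecoverable resumable = stream.persist();
   
       // rolling the part file: closeForCommit() yields a Committer, whose
       // CommitRecoverable lands in pendingFileRecoverablesPerCheckpoint
       RecoverableFsDataOutputStream.Committer committer = stream.closeForCommit();
       RecoverableWriter.CommitRecoverable committable = committer.getRecoverable();
   
       // on successful completion of the checkpoint, pending files are committed ...
       writer.recoverForCommit(committable).commit();
       // ... and in-progress state from earlier checkpoints is cleaned up
       writer.cleanupRecoverableState(resumable);
   ```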
   
   Last, consider what happens in [onSuccessfulCompletionOfCheckpoint](https://github.com/apache/flink/blob/11f5abc41057cf119548cb03dfcc97d755e0029a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java#L31):
   
   ```
       void onSuccessfulCompletionOfCheckpoint(long checkpointId) throws IOException {
           checkNotNull(bucketWriter);
   
           Iterator<Map.Entry<Long, List<InProgressFileWriter.PendingFileRecoverable>>> it =
                   pendingFileRecoverablesPerCheckpoint
                           .headMap(checkpointId, true)
                           .entrySet()
                           .iterator();
   
           while (it.hasNext()) {
               Map.Entry<Long, List<InProgressFileWriter.PendingFileRecoverable>> entry = it.next();
   
               for (InProgressFileWriter.PendingFileRecoverable pendingFileRecoverable :
                       entry.getValue()) {
                   bucketWriter.recoverPendingFile(pendingFileRecoverable).commit();
               }
               it.remove();
           }
   
           cleanupInProgressFileRecoverables(checkpointId);
       }
   ```
   
   ... and [cleanupInProgressFileRecoverables](https://github.com/apache/flink/blob/11f5abc41057cf119548cb03dfcc97d755e0029a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java#L336):
   
   ```
       private void cleanupInProgressFileRecoverables(long checkpointId) throws IOException {
           Iterator<Map.Entry<Long, InProgressFileWriter.InProgressFileRecoverable>> it =
                   inProgressFileRecoverablesPerCheckpoint
                           .headMap(checkpointId, false)
                           .entrySet()
                           .iterator();
   
           while (it.hasNext()) {
               final InProgressFileWriter.InProgressFileRecoverable inProgressFileRecoverable =
                       it.next().getValue();
   
               // this check is redundant, as we only put entries in the
               // inProgressFileRecoverablesPerCheckpoint map when
               // requiresCleanupOfInProgressFileRecoverableState() returns true,
               // but having it makes the code more readable.
   
               final boolean successfullyDeleted =
                       bucketWriter.cleanupInProgressFileRecoverable(inProgressFileRecoverable);
               if (LOG.isDebugEnabled() && successfullyDeleted) {
                   LOG.debug(
                           "Subtask {} successfully deleted incomplete part for bucket id={}.",
                           subtaskIndex,
                           bucketId);
               }
               it.remove();
           }
       }
   ```
   
   First, any pending recoverables (meaning ones resulting from calls to ```closeForCommit```, for rolled files) are committed. This is done for the current checkpoint id and any prior checkpoint ids. Then, any in-progress recoverables (meaning ones resulting from calls to ```persist```, for the then in-progress writer) are cleaned up. This is done for any checkpoints prior to the current one. Importantly, pending recoverables -- i.e. ones that result from calls to ```closeForCommit``` -- do not seem to be cleaned up; only the in-progress ones are.
   
   I had been using a ```CheckpointRollingPolicy``` in my tests, and I never saw ```cleanupRecoverableState``` being called on my recoverable writer, even after letting many checkpoints complete. This makes sense to me now -- since the policy I was using closed any in-progress writers on checkpoints, there was never an in-progress writer when the post-checkpoint handling ran, and so that cleanup didn't occur.
   
   I tried using a default rolling policy, set to roll faster than the checkpoint interval, and in that case I did see ```cleanupRecoverableState``` get called, but only for the writers that were in progress at the time of the checkpoint (i.e. not for the ones that had already rolled).
   
   So, this suggests to me that it may be expected that ```commit``` perform its own cleanup, even though I don't see that explicitly stated anywhere. Unless we think this is a bug and that ```cleanupRecoverableState``` should be called after successful commits ...
   
   In any case, you're absolutely right that ```cleanupRecoverableState``` can get called along the way for an active recoverable write, so we can't delete anything there that we might need later. So my current implementation is wrong. How to fix it depends on how we resolve the commit/cleanup issue:
   * If we think that ```commit``` should perform its own cleanup, then I could remove the cleanup code from ```cleanupRecoverableState``` and put it somewhere where ```commit``` can call it (see the sketch below).
   * If we think that ```cleanupRecoverableState``` should be called for the commit recoverables, and we feel we can change the existing behavior, then we could leave the code where it is but only perform the cleanup if the supplied recoverable is, in fact, a *commit* recoverable (not a *resume* recoverable).
   
   Either way, I'd think we could keep the existing cleanup algorithm, which deletes both non-orphaned and orphaned temporary blobs based on the name prefix -- i.e. if it's safe to delete the non-orphaned temporary blobs, it should be safe to delete the orphaned ones, too.
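   
   To make the first option concrete, here's a rough sketch of what I have in mind for ```GSRecoverableWriterCommitter``` (the helper name is a placeholder, and I'm assuming a ```delete``` method on ```BlobStorage``` -- I'd adapt this to whatever it actually exposes):
   
   ```
       /** Delete the temporary blobs for this write; called from commit(), not cleanupRecoverableState(). */
       private void cleanupTemporaryBlobs() throws IOException {
           // deleting by component blob id covers the non-orphaned temporary blobs;
           // a prefix-based scan would sweep up orphaned ones the same way
           for (BlobId componentBlobId : state.getComponentBlobIds(options)) {
               storage.delete(componentBlobId); // hypothetical BlobStorage method
           }
       }
   
       @Override
       public void commit() throws IOException {
           // ... compose/copy the component blobs into state.finalBlobId, as above ...
           cleanupTemporaryBlobs();
       }
   ```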
   
   Last, you said:
   
   > This might have a higher priority than supporting manually recover from an early checkpoint.
   
   The conflict you're referring to -- not being able to manually recover from an early checkpoint after a commit if the commit deletes the temporary blobs -- seems like it would be a problem for *any* cleanup scheme where the commit operation strictly requires the temporary files to be present in order to succeed.
   
   It was actually this sort of scenario that earlier made me consider an alternate way of handling commit, which was to compose the final blob from the temporary blobs *only if the final blob didn't already exist*. My thought was that, if we assume idempotency, it should be ok to skip recreating the final blob if it already exists. And this would allow restoring from earlier checkpoints without failure, even after the temporary blobs were deleted on commit.
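   
   As a sketch (```storage.exists``` is a placeholder for whatever existence check ```BlobStorage``` would provide; the intermediate-blob branch is elided, and ```cleanupTemporaryBlobs``` is the hypothetical helper from the sketch above):
   
   ```
       @Override
       public void commit() throws IOException {
           // if a previous commit already produced the final blob, assume idempotency
           // and skip recomposing it from the (possibly already deleted) temporary blobs
           if (!storage.exists(state.finalBlobId)) { // hypothetical BlobStorage method
               composeBlobs(
                       state.getComponentBlobIds(options),
                       state.finalBlobId,
                       options.writerContentType);
           }
           cleanupTemporaryBlobs();
       }
   ```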
   
   I just thought I'd mention that again in case it could help with this issue ...



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

