xintongsong commented on a change in pull request #15599:
URL: https://github.com/apache/flink/pull/15599#discussion_r620837563



##########
File path: 
flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/writer/GSRecoverableWriterCommitter.java
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.gs.writer;
+
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
+import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.fs.gs.GSFileSystemOptions;
+import org.apache.flink.fs.gs.storage.BlobStorage;
+import org.apache.flink.fs.gs.utils.BlobUtils;
+import org.apache.flink.util.Preconditions;
+
+import com.google.cloud.storage.BlobId;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+
+/** The committer for the GS recoverable writer. */
+class GSRecoverableWriterCommitter implements 
RecoverableFsDataOutputStream.Committer {
+
+    /** The underlying blob storage. */
+    private final BlobStorage storage;
+
+    /** The GS file system options. */
+    private final GSFileSystemOptions options;
+
+    /** The recoverable writer instance. */
+    private final GSRecoverableWriter writer;
+
+    /** The recoverable writer state for the commit operation. */
+    private final GSRecoverableWriterState state;
+
+    GSRecoverableWriterCommitter(
+            BlobStorage storage,
+            GSFileSystemOptions options,
+            GSRecoverableWriter writer,
+            GSRecoverableWriterState state) {
+        this.storage = Preconditions.checkNotNull(storage);
+        this.options = Preconditions.checkNotNull(options);
+        this.writer = Preconditions.checkNotNull(writer);
+        this.state = Preconditions.checkNotNull(state);
+    }
+
+    @Override
+    public void commit() throws IOException {
+
+        // compose all the component blob ids into the final blob id. if the 
component blob ids are
+        // in the same bucket as the final blob id, this can be done directly. 
otherwise, we must
+        // compose to a new temporary blob id in the same bucket as the 
component blob ids and
+        // then copy that blob to the final blob location
+        if 
(state.finalBlobId.getBucket().equals(state.getTemporaryBucketName(options))) {
+
+            // compose directly to final blob
+            composeBlobs(
+                    state.getComponentBlobIds(options),
+                    state.finalBlobId,
+                    options.writerContentType);
+
+        } else {
+
+            // compose to a temporary blob id, then copy to final blob id
+            BlobId intermediateBlobId = state.createTemporaryBlobId(options);
+            composeBlobs(
+                    state.getComponentBlobIds(options),
+                    intermediateBlobId,
+                    options.writerContentType);
+            storage.copy(intermediateBlobId, state.finalBlobId);
+        }
+
+        // clean up after commit
+        writer.cleanupRecoverableState(state);

Review comment:
       On second thought, if we cannot clean anything up in 
`cleanupRecoverableState`, committing might be our only chance to clean up the 
temporary blobs. This might have a higher priority than supporting manual 
recovery from an earlier checkpoint.
   
   I think the following questions are closely related and really need to be 
answered consistently:
   - What is the relationship between snapshots (resumables) and temporary 
blobs?
   - What actions should `cleanupRecoverableState` take?
   - What actions should `commit` take?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to