[ https://issues.apache.org/jira/browse/FLINK-9325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16497882#comment-16497882 ]
ASF GitHub Bot commented on FLINK-9325:
---------------------------------------
Github user StefanRRichter commented on a diff in the pull request:
https://github.com/apache/flink/pull/5982#discussion_r192371034
--- Diff: flink-core/src/main/java/org/apache/flink/core/fs/TwoPhaseFSDataOutputStream.java ---
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.UUID;
+
+/**
+ * Operates the output stream in two phases. Any exception during the operation of {@link TwoPhaseFSDataOutputStream}
+ * prevents the {@link #targetFile} from becoming visible.
+ *
+ * <p>PHASE 1, write the data into the {@link #preparingFile}.
+ * PHASE 2, close the {@link #preparingFile} and rename it to the {@link #targetFile}.
+ */
+@Internal
+public class TwoPhaseFSDataOutputStream extends AtomicCreatingFsDataOutputStream {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TwoPhaseFSDataOutputStream.class);
+
+ /**
+ * the target file system.
+ */
+ private final FileSystem fs;
+
+ /**
+ * the target file which the preparing file will be renamed to in {@link #closeAndPublish()}.
+ */
+ private final Path targetFile;
+
+ /**
+ * the preparing file that stores the in-flight data.
+ */
+ private final Path preparingFile;
+
+ /**
+ * the output stream of the preparing file.
+ */
+ private final FSDataOutputStream preparedOutputStream;
+
+ private volatile boolean closed;
+
+ public TwoPhaseFSDataOutputStream(FileSystem fs, Path f, FileSystem.WriteMode writeMode) throws IOException {
+
+ Preconditions.checkArgument(FileSystem.WriteMode.OVERWRITE != writeMode, "WriteMode.OVERWRITE is not supported yet.");
+
+ this.fs = fs;
+ this.targetFile = f;
+ this.preparingFile = generateTemporaryFilename(f);
+ this.closed = false;
+
+ if (writeMode == FileSystem.WriteMode.NO_OVERWRITE && fs.exists(targetFile)) {
+ throw new IOException("Target file " + targetFile + " already exists.");
+ }
+
+ this.preparedOutputStream = fs.create(this.preparingFile, writeMode);
+ }
+
+ @Override
+ public long getPos() throws IOException {
+ return this.preparedOutputStream.getPos();
+ }
+
+ @Override
+ public void write(int b) throws IOException {
+ this.preparedOutputStream.write(b);
+ }
+
+ @Override
+ public void flush() throws IOException {
+ this.preparedOutputStream.flush();
+ }
+
+ @Override
+ public void sync() throws IOException {
+ this.preparedOutputStream.sync();
+ }
+
+ /**
+ * Performs cleanup: closes the stream and deletes the {@link #preparingFile}.
+ */
+ @Override
+ public void close() throws IOException {
+ if (!closed) {
--- End diff --
Same here.
> generate the _meta file for checkpoint only when the writing is truly successful
> --------------------------------------------------------------------------------
>
> Key: FLINK-9325
> URL: https://issues.apache.org/jira/browse/FLINK-9325
> Project: Flink
> Issue Type: Improvement
> Components: State Backends, Checkpointing
> Affects Versions: 1.5.0
> Reporter: Sihua Zhou
> Assignee: Sihua Zhou
> Priority: Major
>
> We should generate the _meta file for checkpoint only when the writing is
> totally successful. We should write the metadata file first to a temp file
> and then atomically rename it (with an equivalent workaround for S3).
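The write-to-temp-then-rename idea in the issue description (and the PHASE 1 / PHASE 2 split in the reviewed class) can be sketched roughly as follows. This is a minimal local-filesystem sketch, assuming `java.nio.file` instead of Flink's `FileSystem`/`Path` abstraction; the class `TwoPhaseWriteSketch` and the helper `temporarySibling` are hypothetical and are not part of the PR. It does not cover the S3 workaround, since S3 has no atomic rename.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;
import java.util.UUID;

/** Hypothetical sketch of a two-phase (write, then atomic rename) file publish on a local disk. */
public final class TwoPhaseWriteSketch {

    private TwoPhaseWriteSketch() {}

    /** Hypothetical helper: a hidden, uniquely named sibling so the rename stays in one directory. */
    static Path temporarySibling(Path target) {
        return target.resolveSibling("." + target.getFileName() + ".inprogress." + UUID.randomUUID());
    }

    /**
     * PHASE 1 writes and fsyncs all bytes into a temporary sibling file.
     * PHASE 2 atomically renames it to the target. On any failure the temporary
     * file is deleted, so the target never appears half-written.
     */
    public static void writeAtomically(Path target, byte[] data) throws IOException {
        // Mirrors the NO_OVERWRITE check in the reviewed constructor.
        // Note: this check is racy; another writer could create the target before the move.
        if (Files.exists(target)) {
            throw new FileAlreadyExistsException(target.toString());
        }

        Path temp = temporarySibling(target);
        boolean published = false;
        try {
            // PHASE 1: write everything to the temporary file and force it to disk.
            try (FileChannel channel = FileChannel.open(
                    temp, StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE)) {
                ByteBuffer buffer = ByteBuffer.wrap(data);
                while (buffer.hasRemaining()) {
                    channel.write(buffer);
                }
                channel.force(true);
            }
            // PHASE 2: publish by atomic rename. Throws AtomicMoveNotSupportedException
            // if the underlying file system cannot do this atomically.
            Files.move(temp, target, StandardCopyOption.ATOMIC_MOVE);
            published = true;
        } finally {
            if (!published) {
                // Cleanup path: drop the in-progress file so nothing partial is left behind.
                Files.deleteIfExists(temp);
            }
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("checkpoint-sketch");
        // Illustrative file name only.
        Path metadata = dir.resolve("_metadata");
        writeAtomically(metadata, "checkpoint metadata".getBytes(StandardCharsets.UTF_8));
        System.out.println(Files.exists(metadata));
    }
}
```

The temporary file is created next to the target so that the rename stays within a single directory, and therefore a single file system, which is the case in which `ATOMIC_MOVE` can be honored. Readers of the directory see either no target file or the complete one, never a partially written file.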
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)