[ 
https://issues.apache.org/jira/browse/FLINK-9325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16497882#comment-16497882
 ] 

ASF GitHub Bot commented on FLINK-9325:
---------------------------------------

Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5982#discussion_r192371034
  
    --- Diff: 
flink-core/src/main/java/org/apache/flink/core/fs/TwoPhaseFSDataOutputStream.java
 ---
    @@ -0,0 +1,163 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.core.fs;
    +
    +import org.apache.flink.annotation.Internal;
    +import org.apache.flink.annotation.VisibleForTesting;
    +import org.apache.flink.util.Preconditions;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.IOException;
    +import java.util.UUID;
    +
    +/**
    + * Operates the output stream in two phases; any exception during the 
operation of {@link TwoPhaseFSDataOutputStream} will
    + * lead the {@link #targetFile} to be invisible.
    + *
    + * <p>PHASE 1, write the data into the {@link #preparingFile}.
    + * PHASE 2, close the {@link #preparingFile} and rename it to the {@link 
#targetFile}.
    + */
    +@Internal
    +public class TwoPhaseFSDataOutputStream extends 
AtomicCreatingFsDataOutputStream {
    +
    +   private static final Logger LOG = 
LoggerFactory.getLogger(TwoPhaseFSDataOutputStream.class);
    +
    +   /**
    +    * the target file system.
    +    */
    +   private final FileSystem fs;
    +
    +   /**
    +    * the target file which the preparing file will be renamed to in the 
{@link #closeAndPublish()}.
    +    */
    +   private final Path targetFile;
    +
    +   /**
    +    * the preparing file to store the in-flight data.
    +    */
    +   private final Path preparingFile;
    +
    +   /**
    +    * the output stream of the preparing file.
    +    */
    +   private final FSDataOutputStream preparedOutputStream;
    +
    +   private volatile boolean closed;
    +
    +   public TwoPhaseFSDataOutputStream(FileSystem fs, Path f, 
FileSystem.WriteMode writeMode) throws IOException {
    +
    +           Preconditions.checkArgument(FileSystem.WriteMode.OVERWRITE != 
writeMode, "WriteMode.OVERWRITE is unsupported yet.");
    +
    +           this.fs = fs;
    +           this.targetFile = f;
    +           this.preparingFile = generateTemporaryFilename(f);
    +           this.closed = false;
    +
    +           if (writeMode == FileSystem.WriteMode.NO_OVERWRITE && 
fs.exists(targetFile)) {
    +                   throw new IOException("Target file " + targetFile + " 
is already exists.");
    +           }
    +
    +           this.preparedOutputStream = fs.create(this.preparingFile, 
writeMode);
    +   }
    +
    +   @Override
    +   public long getPos() throws IOException {
    +           return this.preparedOutputStream.getPos();
    +   }
    +
    +   @Override
    +   public void write(int b) throws IOException {
    +           this.preparedOutputStream.write(b);
    +   }
    +
    +   @Override
    +   public void flush() throws IOException {
    +           this.preparedOutputStream.flush();
    +   }
    +
    +   @Override
    +   public void sync() throws IOException {
    +           this.preparedOutputStream.sync();
    +   }
    +
    +   /**
    +    * Does the cleanup things, close the stream and delete the {@link 
#preparingFile}.
    +    */
    +   @Override
    +   public void close() throws IOException {
    +           if (!closed) {
    --- End diff --
    
    Same here.


> generate the _meta file for checkpoint only when the writing is truly 
> successful
> --------------------------------------------------------------------------------
>
>                 Key: FLINK-9325
>                 URL: https://issues.apache.org/jira/browse/FLINK-9325
>             Project: Flink
>          Issue Type: Improvement
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.5.0
>            Reporter: Sihua Zhou
>            Assignee: Sihua Zhou
>            Priority: Major
>
> We should generate the _meta file for checkpoint only when the writing is 
> totally successful. We should write the metadata file first to a temp file 
> and then atomically rename it (with an equivalent workaround for S3). 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to