Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5239#discussion_r170238910
  
    --- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/DuplicatingCheckpointOutputStream.java
 ---
    @@ -0,0 +1,287 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.state;
    +
    +import org.apache.flink.annotation.VisibleForTesting;
    +import org.apache.flink.util.ExceptionUtils;
    +import org.apache.flink.util.Preconditions;
    +
    +import java.io.IOException;
    +
    +/**
    + * A CheckpointStateOutputStream that wraps a primary and a secondary 
CheckpointStateOutputStream and duplicates
    + * all writes into both streams. This stream applies buffering to reduce 
the amount of dual-method calling. Furthermore,
    + * exceptions that happen in interactions with the secondary stream are 
not exposed, until the user calls
    + * {@link #closeAndGetSecondaryHandle()}. In contrast to that, exceptions 
from interactions with the primary stream
    + * are immediately returned to the user. This class is used to write state 
for local recovery as a local (secondary)
    + * copy of the (primary) state snapshot that is written to a (slower but 
highly-available) remote filesystem.
    + */
    +public class DuplicatingCheckpointOutputStream extends 
CheckpointStreamFactory.CheckpointStateOutputStream {
    +
    +   /** Flag if the positional alignment of both streams is checked after 
each operation. */
    +   private static final boolean STRICT_ALIGNMENT_CHECKS = false;
    +
    +   /** Default buffer size of 8KB. */
    +   private static final int DEFAULT_BUFFER_SIZER = 8 * 1024;
    +
    +   /** Write buffer. */
    +   private final byte[] buffer;
    +
    +   /** Position in the write buffer. */
    +   private int bufferIdx;
    +
    +   /**
    +    * Primary stream for writing the checkpoint data. Failures from this 
stream are forwarded.
    +    */
    +   private final CheckpointStreamFactory.CheckpointStateOutputStream 
primaryOutputStream;
    +
    +   /**
    +    * Secondary stream for writing the checkpoint data. Failures from this 
stream are not forwarded until
    +    * {@link #closeAndGetSecondaryHandle()}.
    +    */
    +   private final CheckpointStreamFactory.CheckpointStateOutputStream 
secondaryOutputStream;
    +
    +   /**
    +    * Stores a potential exception that occurred while interacting with 
{@link #secondaryOutputStream}
    +    */
    +   private IOException secondaryStreamException;
    +
    +   public DuplicatingCheckpointOutputStream(
    +           CheckpointStreamFactory.CheckpointStateOutputStream 
primaryOutputStream,
    +           CheckpointStreamFactory.CheckpointStateOutputStream 
secondaryOutputStream) throws IOException {
    +           this(primaryOutputStream, secondaryOutputStream, 
DEFAULT_BUFFER_SIZER);
    +   }
    +
    +   public DuplicatingCheckpointOutputStream(
    +           CheckpointStreamFactory.CheckpointStateOutputStream 
primaryOutputStream,
    +           CheckpointStreamFactory.CheckpointStateOutputStream 
secondaryOutputStream,
    +           int bufferSize) throws IOException {
    +
    +           this.primaryOutputStream = 
Preconditions.checkNotNull(primaryOutputStream);
    +           this.secondaryOutputStream = 
Preconditions.checkNotNull(secondaryOutputStream);
    +
    +           this.buffer = new byte[bufferSize];
    +           this.bufferIdx = 0;
    +
    +           this.secondaryStreamException = null;
    +
    +           checkForAlignedStreamPositions();
    +   }
    +
    +   @Override
    +   public void write(int b) throws IOException {
    +
    +           if (buffer.length <= bufferIdx) {
    +                   flushInternalBuffer();
    +           }
    +
    +           buffer[bufferIdx] = (byte) b;
    +           ++bufferIdx;
    +   }
    +
    +   @Override
    +   public void write(byte[] b) throws IOException {
    +
    +           write(b, 0, b.length);
    +   }
    +
    +   @Override
    +   public void write(byte[] b, int off, int len) throws IOException {
    +
    +           if (buffer.length <= len) {
    +
    +                   flushInternalBuffer();
    +                   writeThroughInternal(b, off, len);
    +           } else {
    +
    +                   if (buffer.length < len + bufferIdx) {
    +                           flushInternalBuffer();
    +                   }
    +
    +                   System.arraycopy(b, off, buffer, bufferIdx, len);
    +                   bufferIdx += len;
    +           }
    +   }
    +
    +   @Override
    +   public long getPos() throws IOException {
    +           final long referencePos = primaryOutputStream.getPos();
    +           return referencePos + bufferIdx;
    +   }
    +
    +   @Override
    +   public void flush() throws IOException {
    +
    +           flushInternalBuffer();
    +           primaryOutputStream.flush();
    +
    +           if (secondaryStreamException == null) {
    +                   try {
    +                           secondaryOutputStream.flush();
    +                   } catch (IOException flushEx) {
    +                           handleSecondaryStreamOnException(flushEx);
    --- End diff --
    
    Why not just catch `Throwable` for `secondaryOutputStream`? A 
`RuntimeException` could also be thrown from it.


---

Reply via email to