AlexYinHan commented on code in PR #23514: URL: https://github.com/apache/flink/pull/23514#discussion_r1403094467
########## flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileMergingCheckpointStateOutputStream.java: ########## @@ -0,0 +1,255 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.filesystem; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.fs.FSDataOutputStream; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager; +import org.apache.flink.runtime.checkpoint.filemerging.SegmentFileStateHandle; +import org.apache.flink.runtime.state.CheckpointStateOutputStream; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.IOException; + +/** + * A {@link CheckpointStateOutputStream} that writes into a segment of a file and returns a {@link + * SegmentFileStateHandle} upon closing. Multiple {@link FileMergingCheckpointStateOutputStream} + * objects can reuse the same underlying file, so that the checkpoint files are merged. + * + * <p><strong>Important</strong>: This implementation is NOT thread-safe. 
Multiple data streams + * multiplexing the same file should NOT write concurrently. Instead, it is expected that only after + * one data stream is closed, will other data streams reuse and write to the same underlying file. + */ +public class FileMergingCheckpointStateOutputStream extends CheckpointStateOutputStream { + + private static final Logger LOG = + LoggerFactory.getLogger(FileMergingCheckpointStateOutputStream.class); + + /** + * A proxy of the {@link FileMergingSnapshotManager} that owns this {@link + * FileMergingCheckpointStateOutputStream}, with the interfaces for dealing with physical files. + */ + public abstract static class FileMergingSnapshotManagerProxy { + /** + * Provide a physical file. + * + * @return Output stream and path of the physical file. + * @throws IOException if the physical file cannot be created or opened. + */ + public abstract Tuple2<FSDataOutputStream, Path> providePhysicalFile() throws IOException; + + /** + * Close the stream and create a {@link SegmentFileStateHandle} for a file segment. + * + * @param filePath Path of the physical file. + * @param startPos Start position of the segment in the physical file. + * @param stateSize Size of the segment. + * @return The state handle of the segment. + * @throws IOException if any exception happens when closing the file. + */ + public abstract SegmentFileStateHandle closeStreamAndCreateStateHandle( + Path filePath, long startPos, long stateSize) throws IOException; + + /** + * Notify the {@link FileMergingSnapshotManager} that the stream is closed exceptionally. + * + * @throws IOException if any exception happens when deleting the file. + */ + public abstract void closeStreamExceptionally() throws IOException; + } + + private final FileMergingSnapshotManagerProxy fileMergingSnapshotManagerProxy; + + private volatile boolean closed; + + /** path of the underlying physical file. */ + private Path filePath; + + /** the stream that writes to the underlying physical file. 
*/ + private @Nullable FSDataOutputStream outputStream; + + /** start position in the physical file. */ + private long startPos; + + /** current position relative to startPos. */ + long curPosRelative = 0; Review Comment: Yes. I have added the above information in the code comments. ########## flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileMergingCheckpointStateOutputStream.java: ########## @@ -0,0 +1,255 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.state.filesystem; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.fs.FSDataOutputStream; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager; +import org.apache.flink.runtime.checkpoint.filemerging.SegmentFileStateHandle; +import org.apache.flink.runtime.state.CheckpointStateOutputStream; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.IOException; + +/** + * A {@link CheckpointStateOutputStream} that writes into a segment of a file and returns a {@link + * SegmentFileStateHandle} upon closing. Multiple {@link FileMergingCheckpointStateOutputStream} + * objects can reuse the same underlying file, so that the checkpoint files are merged. + * + * <p><strong>Important</strong>: This implementation is NOT thread-safe. Multiple data streams + * multiplexing the same file should NOT write concurrently. Instead, it is expected that only after + * one data stream is closed, will other data streams reuse and write to the same underlying file. + */ +public class FileMergingCheckpointStateOutputStream extends CheckpointStateOutputStream { + + private static final Logger LOG = + LoggerFactory.getLogger(FileMergingCheckpointStateOutputStream.class); + + /** + * A proxy of the {@link FileMergingSnapshotManager} that owns this {@link + * FileMergingCheckpointStateOutputStream}, with the interfaces for dealing with physical files. + */ + public abstract static class FileMergingSnapshotManagerProxy { Review Comment: Right. I've changed that into an interface. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: [email protected]. For queries about this service, please contact Infrastructure at: [email protected].
