This is an automated email from the ASF dual-hosted git repository.

hangxiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ea4e4981e4e49a59d8013743ea7740909a03f537
Author: Hangxiang Yu <[email protected]>
AuthorDate: Mon Mar 11 21:06:41 2024 +0800

    [FLINK-32079][checkpoint] Support to read/write checkpoint metadata of merged files (#24480)
---
 .../metadata/MetadataV2V3SerializerBase.java       |  85 +++++++++++++-
 .../FileMergingOperatorStreamStateHandle.java      |   8 ++
 .../state/filemerging/SegmentFileStateHandle.java  |   4 +-
 .../checkpoint/metadata/CheckpointTestUtils.java   | 128 ++++++++++++++++-----
 4 files changed, 192 insertions(+), 33 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java
index bd91e8e1fbb..3867ceed0bc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.checkpoint.MasterState;
 import org.apache.flink.runtime.checkpoint.OperatorState;
 import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.runtime.checkpoint.StateObjectCollection;
+import org.apache.flink.runtime.state.CheckpointedStateScope;
 import 
org.apache.flink.runtime.state.IncrementalKeyedStateHandle.HandleAndLocalPath;
 import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle;
 import org.apache.flink.runtime.state.InputChannelStateHandle;
@@ -47,12 +48,18 @@ import 
org.apache.flink.runtime.state.changelog.ChangelogStateHandleStreamImpl;
 import org.apache.flink.runtime.state.changelog.SequenceNumber;
 import org.apache.flink.runtime.state.changelog.StateChange;
 import 
org.apache.flink.runtime.state.changelog.inmemory.InMemoryChangelogStateHandle;
+import org.apache.flink.runtime.state.filemerging.DirectoryStreamStateHandle;
+import 
org.apache.flink.runtime.state.filemerging.EmptyFileMergingOperatorStreamStateHandle;
+import org.apache.flink.runtime.state.filemerging.EmptySegmentFileStateHandle;
+import 
org.apache.flink.runtime.state.filemerging.FileMergingOperatorStreamStateHandle;
+import org.apache.flink.runtime.state.filemerging.SegmentFileStateHandle;
 import 
org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorageAccess;
 import org.apache.flink.runtime.state.filesystem.FileStateHandle;
 import org.apache.flink.runtime.state.filesystem.RelativeFileStateHandle;
 import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
 import org.apache.flink.util.CollectionUtil;
 import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.function.BiConsumerWithException;
 import org.apache.flink.util.function.BiFunctionWithException;
 
@@ -65,6 +72,7 @@ import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
+import java.io.File;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
@@ -124,6 +132,15 @@ public abstract class MetadataV2V3SerializerBase {
     // CHANGELOG_HANDLE_V2 is introduced to add new field of checkpointId.
     private static final byte CHANGELOG_HANDLE_V2 = 14;
 
+    // SEGMENT_FILE_HANDLE is introduced to support file merging.
+    private static final byte SEGMENT_FILE_HANDLE = 15;
+
+    // EMPTY_SEGMENT_FILE_HANDLE is introduced as a placeholder for file merging.
+    private static final byte EMPTY_SEGMENT_FILE_HANDLE = 16;
+
+    // SEGMENT_PARTITIONABLE_OPERATOR_STATE_HANDLE is introduced for file merging of operator state.
+    private static final byte SEGMENT_PARTITIONABLE_OPERATOR_STATE_HANDLE = 17;
+
     // ------------------------------------------------------------------------
     //  (De)serialization entry points
     // ------------------------------------------------------------------------
@@ -582,7 +599,10 @@ public abstract class MetadataV2V3SerializerBase {
     void serializeOperatorStateHandle(OperatorStateHandle stateHandle, 
DataOutputStream dos)
             throws IOException {
         if (stateHandle != null) {
-            dos.writeByte(PARTITIONABLE_OPERATOR_STATE_HANDLE);
+            dos.writeByte(
+                    stateHandle instanceof FileMergingOperatorStreamStateHandle
+                            ? SEGMENT_PARTITIONABLE_OPERATOR_STATE_HANDLE
+                            : PARTITIONABLE_OPERATOR_STATE_HANDLE);
             Map<String, OperatorStateHandle.StateMetaInfo> partitionOffsetsMap 
=
                     stateHandle.getStateNameToPartitionOffsets();
             dos.writeInt(partitionOffsetsMap.size());
@@ -601,6 +621,19 @@ public abstract class MetadataV2V3SerializerBase {
                     dos.writeLong(offset);
                 }
             }
+            if (stateHandle instanceof FileMergingOperatorStreamStateHandle) {
+                dos.writeUTF(
+                        ((FileMergingOperatorStreamStateHandle) stateHandle)
+                                .getTaskOwnedDirHandle()
+                                .getDirectory()
+                                .toString());
+                dos.writeUTF(
+                        ((FileMergingOperatorStreamStateHandle) stateHandle)
+                                .getSharedDirHandle()
+                                .getDirectory()
+                                .toString());
+                dos.writeBoolean(stateHandle instanceof 
EmptyFileMergingOperatorStreamStateHandle);
+            }
             serializeStreamStateHandle(stateHandle.getDelegateStateHandle(), 
dos);
         } else {
             dos.writeByte(NULL_HANDLE);
@@ -613,7 +646,8 @@ public abstract class MetadataV2V3SerializerBase {
         final int type = dis.readByte();
         if (NULL_HANDLE == type) {
             return null;
-        } else if (PARTITIONABLE_OPERATOR_STATE_HANDLE == type) {
+        } else if (PARTITIONABLE_OPERATOR_STATE_HANDLE == type
+                || SEGMENT_PARTITIONABLE_OPERATOR_STATE_HANDLE == type) {
             int mapSize = dis.readInt();
             Map<String, OperatorStateHandle.StateMetaInfo> offsetsMap =
                     CollectionUtil.newHashMapWithExpectedSize(mapSize);
@@ -632,8 +666,31 @@ public abstract class MetadataV2V3SerializerBase {
                         new OperatorStateHandle.StateMetaInfo(offsets, mode);
                 offsetsMap.put(key, metaInfo);
             }
-            StreamStateHandle stateHandle = deserializeStreamStateHandle(dis, 
context);
-            return new OperatorStreamStateHandle(offsetsMap, stateHandle);
+            if (SEGMENT_PARTITIONABLE_OPERATOR_STATE_HANDLE == type) {
+                String taskOwnedDirPathStr = dis.readUTF();
+                String sharedDirPathStr = dis.readUTF();
+                boolean isEmpty = dis.readBoolean();
+                StreamStateHandle stateHandle = 
deserializeStreamStateHandle(dis, context);
+                Preconditions.checkArgument(stateHandle instanceof 
SegmentFileStateHandle);
+                return isEmpty
+                        ? new EmptyFileMergingOperatorStreamStateHandle(
+                                DirectoryStreamStateHandle.forPathWithZeroSize(
+                                        new 
File(taskOwnedDirPathStr).toPath()),
+                                DirectoryStreamStateHandle.forPathWithZeroSize(
+                                        new File(sharedDirPathStr).toPath()),
+                                offsetsMap,
+                                stateHandle)
+                        : new FileMergingOperatorStreamStateHandle(
+                                DirectoryStreamStateHandle.forPathWithZeroSize(
+                                        new 
File(taskOwnedDirPathStr).toPath()),
+                                DirectoryStreamStateHandle.forPathWithZeroSize(
+                                        new File(sharedDirPathStr).toPath()),
+                                offsetsMap,
+                                stateHandle);
+            } else {
+                StreamStateHandle stateHandle = 
deserializeStreamStateHandle(dis, context);
+                return new OperatorStreamStateHandle(offsetsMap, stateHandle);
+            }
         } else {
             throw new IllegalStateException("Reading invalid 
OperatorStateHandle, type: " + type);
         }
@@ -677,6 +734,18 @@ public abstract class MetadataV2V3SerializerBase {
             RelativeFileStateHandle relativeFileStateHandle = 
(RelativeFileStateHandle) stateHandle;
             dos.writeUTF(relativeFileStateHandle.getRelativePath());
             dos.writeLong(relativeFileStateHandle.getStateSize());
+        } else if (stateHandle instanceof SegmentFileStateHandle) {
+            if (stateHandle instanceof EmptySegmentFileStateHandle) {
+                dos.writeByte(EMPTY_SEGMENT_FILE_HANDLE);
+            } else {
+                dos.writeByte(SEGMENT_FILE_HANDLE);
+                SegmentFileStateHandle segmentFileStateHandle =
+                        (SegmentFileStateHandle) stateHandle;
+                dos.writeLong(segmentFileStateHandle.getStartPos());
+                dos.writeLong(segmentFileStateHandle.getStateSize());
+                dos.writeInt(segmentFileStateHandle.getScope().ordinal());
+                dos.writeUTF(segmentFileStateHandle.getFilePath().toString());
+            }
         } else if (stateHandle instanceof FileStateHandle) {
             dos.writeByte(FILE_STREAM_STATE_HANDLE);
             FileStateHandle fileStateHandle = (FileStateHandle) stateHandle;
@@ -745,6 +814,14 @@ public abstract class MetadataV2V3SerializerBase {
                     new KeyGroupRangeOffsets(keyGroupRange, offsets);
             StreamStateHandle stateHandle = deserializeStreamStateHandle(dis, 
context);
             return new KeyGroupsStateHandle(keyGroupRangeOffsets, stateHandle);
+        } else if (SEGMENT_FILE_HANDLE == type) {
+            long startPos = dis.readLong();
+            long stateSize = dis.readLong();
+            CheckpointedStateScope scope = 
CheckpointedStateScope.values()[dis.readInt()];
+            Path physicalFilePath = new Path(dis.readUTF());
+            return new SegmentFileStateHandle(physicalFilePath, startPos, 
stateSize, scope);
+        } else if (EMPTY_SEGMENT_FILE_HANDLE == type) {
+            return EmptySegmentFileStateHandle.INSTANCE;
         } else {
             throw new IOException("Unknown implementation of 
StreamStateHandle, code: " + type);
         }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filemerging/FileMergingOperatorStreamStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filemerging/FileMergingOperatorStreamStateHandle.java
index ada378f2d03..11cbefb664d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filemerging/FileMergingOperatorStreamStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filemerging/FileMergingOperatorStreamStateHandle.java
@@ -116,6 +116,14 @@ public class FileMergingOperatorStreamStateHandle extends 
OperatorStreamStateHan
         return getDelegateStateHandle().getStateSize();
     }
 
+    public DirectoryStreamStateHandle getSharedDirHandle() {
+        return sharedDirHandle;
+    }
+
+    public DirectoryStreamStateHandle getTaskOwnedDirHandle() {
+        return taskOwnedDirHandle;
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filemerging/SegmentFileStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filemerging/SegmentFileStateHandle.java
index 2a3aa2f49fd..9c5d5a47caf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filemerging/SegmentFileStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filemerging/SegmentFileStateHandle.java
@@ -35,8 +35,6 @@ import java.util.Optional;
  * {@link FileStateHandle} for state that was written to a file segment. A {@link
  * SegmentFileStateHandle} represents a {@link LogicalFile}, which has already been written to a
  * segment in a physical file.
- *
- * <p>TODO (FLINK-32079): serialization and deserialization of {@link SegmentFileStateHandle}.
  */
 public class SegmentFileStateHandle implements StreamStateHandle {
 
@@ -133,7 +131,7 @@ public class SegmentFileStateHandle implements 
StreamStateHandle {
             return true;
         }
 
-        if (o == null || getClass() != o.getClass()) {
+        if (!(o instanceof SegmentFileStateHandle)) {
             return false;
         }
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/metadata/CheckpointTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/metadata/CheckpointTestUtils.java
index 8cb160310fe..079fd16c2d7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/metadata/CheckpointTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/metadata/CheckpointTestUtils.java
@@ -26,6 +26,8 @@ import org.apache.flink.runtime.checkpoint.MasterState;
 import org.apache.flink.runtime.checkpoint.OperatorState;
 import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.state.CheckpointedStateScope;
+import org.apache.flink.runtime.state.DiscardRecordedStateObject;
 import 
org.apache.flink.runtime.state.IncrementalKeyedStateHandle.HandleAndLocalPath;
 import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle;
 import org.apache.flink.runtime.state.KeyGroupRange;
@@ -38,11 +40,16 @@ import 
org.apache.flink.runtime.state.OperatorStreamStateHandle;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.TestingRelativeFileStateHandle;
 import org.apache.flink.runtime.state.TestingStreamStateHandle;
+import org.apache.flink.runtime.state.filemerging.DirectoryStreamStateHandle;
+import 
org.apache.flink.runtime.state.filemerging.EmptyFileMergingOperatorStreamStateHandle;
+import 
org.apache.flink.runtime.state.filemerging.FileMergingOperatorStreamStateHandle;
+import org.apache.flink.runtime.state.filemerging.SegmentFileStateHandle;
 import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
 import org.apache.flink.util.StringUtils;
 
 import javax.annotation.Nullable;
 
+import java.io.File;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -140,36 +147,13 @@ public class CheckpointTestUtils {
         boolean isIncremental = random.nextInt(3) == 0;
 
         for (int subtaskIdx : subtasksToSet) {
-            StreamStateHandle operatorStateBackend =
-                    new ByteStreamStateHandle(
-                            "b", 
("Beautiful").getBytes(ConfigConstants.DEFAULT_CHARSET));
-            StreamStateHandle operatorStateStream =
-                    new ByteStreamStateHandle(
-                            "b", 
("Beautiful").getBytes(ConfigConstants.DEFAULT_CHARSET));
-
-            Map<String, OperatorStateHandle.StateMetaInfo> offsetsMap = new 
HashMap<>();
-            offsetsMap.put(
-                    "A",
-                    new OperatorStateHandle.StateMetaInfo(
-                            new long[] {0, 10, 20}, 
OperatorStateHandle.Mode.SPLIT_DISTRIBUTE));
-            offsetsMap.put(
-                    "B",
-                    new OperatorStateHandle.StateMetaInfo(
-                            new long[] {30, 40, 50}, 
OperatorStateHandle.Mode.SPLIT_DISTRIBUTE));
-            offsetsMap.put(
-                    "C",
-                    new OperatorStateHandle.StateMetaInfo(
-                            new long[] {60, 70, 80}, 
OperatorStateHandle.Mode.UNION));
-
             final OperatorSubtaskState.Builder state = 
OperatorSubtaskState.builder();
             if (hasOperatorStateBackend) {
-                state.setManagedOperatorState(
-                        new OperatorStreamStateHandle(offsetsMap, 
operatorStateBackend));
+                
state.setManagedOperatorState(createDummyOperatorStreamStateHandle(random));
             }
 
             if (hasOperatorStateStream) {
-                state.setRawOperatorState(
-                        new OperatorStreamStateHandle(offsetsMap, 
operatorStateStream));
+                
state.setRawOperatorState(createDummyOperatorStreamStateHandle(random));
             }
 
             if (hasKeyedBackend) {
@@ -209,6 +193,45 @@ public class CheckpointTestUtils {
         }
     }
 
+    private static OperatorStreamStateHandle 
createDummyOperatorStreamStateHandle(Random rnd) {
+        Map<String, OperatorStateHandle.StateMetaInfo> offsetsMap = new 
HashMap<>();
+        offsetsMap.put(
+                "A",
+                new OperatorStateHandle.StateMetaInfo(
+                        new long[] {0, 10, 20}, 
OperatorStateHandle.Mode.SPLIT_DISTRIBUTE));
+        offsetsMap.put(
+                "B",
+                new OperatorStateHandle.StateMetaInfo(
+                        new long[] {30, 40, 50}, 
OperatorStateHandle.Mode.SPLIT_DISTRIBUTE));
+        offsetsMap.put(
+                "C",
+                new OperatorStateHandle.StateMetaInfo(
+                        new long[] {60, 70, 80}, 
OperatorStateHandle.Mode.UNION));
+
+        boolean enableFileMerging = rnd.nextBoolean();
+        if (enableFileMerging) {
+            DirectoryStreamStateHandle taskOwnedDirHandle =
+                    DirectoryStreamStateHandle.forPathWithZeroSize(
+                            new 
File(String.valueOf(createRandomUUID(rnd))).toPath());
+            DirectoryStreamStateHandle sharedDirHandle =
+                    DirectoryStreamStateHandle.forPathWithZeroSize(
+                            new 
File(String.valueOf(createRandomUUID(rnd))).toPath());
+            return rnd.nextBoolean()
+                    ? new FileMergingOperatorStreamStateHandle(
+                            taskOwnedDirHandle,
+                            sharedDirHandle,
+                            offsetsMap,
+                            createDummySegmentFileStateHandle(rnd, false))
+                    : EmptyFileMergingOperatorStreamStateHandle.create(
+                            taskOwnedDirHandle, sharedDirHandle);
+        } else {
+            StreamStateHandle operatorStateStream =
+                    new ByteStreamStateHandle(
+                            "b", 
("Beautiful").getBytes(ConfigConstants.DEFAULT_CHARSET));
+            return new OperatorStreamStateHandle(offsetsMap, 
operatorStateStream);
+        }
+    }
+
     private static boolean isSavepoint(String basePath) {
         return basePath != null;
     }
@@ -263,11 +286,15 @@ public class CheckpointTestUtils {
     }
 
     public static List<HandleAndLocalPath> 
createRandomHandleAndLocalPathList(Random rnd) {
+        boolean enableFileMerging = rnd.nextBoolean();
         final int size = rnd.nextInt(4);
         List<HandleAndLocalPath> result = new ArrayList<>(size);
         for (int i = 0; i < size; ++i) {
             String localPath = createRandomUUID(rnd).toString();
-            StreamStateHandle stateHandle = createDummyStreamStateHandle(rnd, 
null);
+            StreamStateHandle stateHandle =
+                    enableFileMerging
+                            ? createDummySegmentFileStateHandle(rnd)
+                            : createDummyStreamStateHandle(rnd, null);
             result.add(HandleAndLocalPath.of(stateHandle, localPath));
         }
 
@@ -308,6 +335,55 @@ public class CheckpointTestUtils {
         }
     }
 
+    private static StreamStateHandle createDummySegmentFileStateHandle(Random 
rnd) {
+        return createDummySegmentFileStateHandle(rnd, rnd.nextBoolean());
+    }
+
+    private static StreamStateHandle createDummySegmentFileStateHandle(
+            Random rnd, boolean isEmpty) {
+        return isEmpty
+                ? TestingSegmentFileStateHandle.EMPTY_INSTANCE
+                : new TestingSegmentFileStateHandle(
+                        new Path(String.valueOf(createRandomUUID(rnd))),
+                        0,
+                        1,
+                        CheckpointedStateScope.SHARED);
+    }
+
+    private static class TestingSegmentFileStateHandle extends 
SegmentFileStateHandle
+            implements DiscardRecordedStateObject {
+
+        private static final long serialVersionUID = 1L;
+
+        private static final TestingSegmentFileStateHandle EMPTY_INSTANCE =
+                new TestingSegmentFileStateHandle(
+                        new Path("empty"), 0, 0, 
CheckpointedStateScope.EXCLUSIVE);
+
+        private boolean disposed;
+
+        public TestingSegmentFileStateHandle(
+                Path filePath, long startPos, long stateSize, 
CheckpointedStateScope scope) {
+            super(filePath, startPos, stateSize, scope);
+        }
+
+        @Override
+        public void collectSizeStats(StateObjectSizeStatsCollector collector) {
+            // Collect to LOCAL_MEMORY for test
+            collector.add(StateObjectLocation.LOCAL_MEMORY, getStateSize());
+        }
+
+        @Override
+        public void discardState() {
+            super.discardState();
+            disposed = true;
+        }
+
+        @Override
+        public boolean isDisposed() {
+            return disposed;
+        }
+    }
+
     private static UUID createRandomUUID(Random rnd) {
         return new UUID(rnd.nextLong(), rnd.nextLong());
     }

Reply via email to