This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 713560bbc19 [IOTDB-6062] Pipe: Modify the de/ser method of PipeRuntimeMeta to support multi-version evolution (#10536)
713560bbc19 is described below

commit 713560bbc19797128cc42cd474001f269c7539aa
Author: Steve Yurong Su <[email protected]>
AuthorDate: Thu Jul 13 09:14:16 2023 +0800

    [IOTDB-6062] Pipe: Modify the de/ser method of PipeRuntimeMeta to support multi-version evolution (#10536)
---
 .../PipeRuntimeConnectorCriticalException.java     | 30 ++++++--
 .../pipe/PipeRuntimeCriticalException.java         | 28 ++++++--
 .../exception/pipe/PipeRuntimeExceptionType.java   | 19 ++---
 .../pipe/PipeRuntimeNonCriticalException.java      | 28 ++++++--
 .../commons/pipe/task/meta/PipeRuntimeMeta.java    | 76 ++++++++++++++++++--
 .../pipe/task/meta/PipeRuntimeMetaVersion.java     | 82 ++++++++++++++++++++++
 .../iotdb/commons/pipe/task/meta/PipeTaskMeta.java | 17 +++--
 .../exception/pipe/PipeRuntimeExceptionTest.java   | 11 ++-
 8 files changed, 251 insertions(+), 40 deletions(-)

diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeConnectorCriticalException.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeConnectorCriticalException.java
index 6d3315cfbf9..e5a8d71170c 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeConnectorCriticalException.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeConnectorCriticalException.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.commons.exception.pipe;
 
+import org.apache.iotdb.commons.pipe.task.meta.PipeRuntimeMetaVersion;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
 import java.io.IOException;
@@ -62,17 +63,32 @@ public class PipeRuntimeConnectorCriticalException extends PipeRuntimeCriticalEx
     ReadWriteIOUtils.write(getTimeStamp(), stream);
   }
 
-  public static PipeRuntimeConnectorCriticalException deserializeFrom(ByteBuffer byteBuffer) {
+  public static PipeRuntimeConnectorCriticalException deserializeFrom(
+      PipeRuntimeMetaVersion version, ByteBuffer byteBuffer) {
     final String message = ReadWriteIOUtils.readString(byteBuffer);
-    final long timeStamp = ReadWriteIOUtils.readLong(byteBuffer);
-    return new PipeRuntimeConnectorCriticalException(message, timeStamp);
+    switch (version) {
+      case VERSION_1:
+        return new PipeRuntimeConnectorCriticalException(message);
+      case VERSION_2:
+        return new PipeRuntimeConnectorCriticalException(
+            message, ReadWriteIOUtils.readLong(byteBuffer));
+      default:
+        throw new UnsupportedOperationException(String.format("Unsupported version %s", version));
+    }
   }
 
-  public static PipeRuntimeConnectorCriticalException deserializeFrom(InputStream stream)
-      throws IOException {
+  public static PipeRuntimeConnectorCriticalException deserializeFrom(
+      PipeRuntimeMetaVersion version, InputStream stream) throws IOException {
     final String message = ReadWriteIOUtils.readString(stream);
-    final long timeStamp = ReadWriteIOUtils.readLong(stream);
-    return new PipeRuntimeConnectorCriticalException(message, timeStamp);
+    switch (version) {
+      case VERSION_1:
+        return new PipeRuntimeConnectorCriticalException(message);
+      case VERSION_2:
+        return new PipeRuntimeConnectorCriticalException(
+            message, ReadWriteIOUtils.readLong(stream));
+      default:
+        throw new UnsupportedOperationException(String.format("Unsupported version %s", version));
+    }
   }
 
   @Override
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeCriticalException.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeCriticalException.java
index b24fe8c0ac3..9f9076be32d 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeCriticalException.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeCriticalException.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.commons.exception.pipe;
 
+import org.apache.iotdb.commons.pipe.task.meta.PipeRuntimeMetaVersion;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
 import java.io.IOException;
@@ -63,17 +64,30 @@ public class PipeRuntimeCriticalException extends PipeRuntimeException {
     ReadWriteIOUtils.write(getTimeStamp(), stream);
   }
 
-  public static PipeRuntimeCriticalException deserializeFrom(ByteBuffer byteBuffer) {
+  public static PipeRuntimeCriticalException deserializeFrom(
+      PipeRuntimeMetaVersion version, ByteBuffer byteBuffer) {
     final String message = ReadWriteIOUtils.readString(byteBuffer);
-    final long timeStamp = ReadWriteIOUtils.readLong(byteBuffer);
-    return new PipeRuntimeCriticalException(message, timeStamp);
+    switch (version) {
+      case VERSION_1:
+        return new PipeRuntimeCriticalException(message);
+      case VERSION_2:
+        return new PipeRuntimeCriticalException(message, ReadWriteIOUtils.readLong(byteBuffer));
+      default:
+        throw new UnsupportedOperationException(String.format("Unsupported version %s", version));
+    }
   }
 
-  public static PipeRuntimeCriticalException deserializeFrom(InputStream stream)
-      throws IOException {
+  public static PipeRuntimeCriticalException deserializeFrom(
+      PipeRuntimeMetaVersion version, InputStream stream) throws IOException {
     final String message = ReadWriteIOUtils.readString(stream);
-    final long timeStamp = ReadWriteIOUtils.readLong(stream);
-    return new PipeRuntimeCriticalException(message, timeStamp);
+    switch (version) {
+      case VERSION_1:
+        return new PipeRuntimeCriticalException(message);
+      case VERSION_2:
+        return new PipeRuntimeCriticalException(message, ReadWriteIOUtils.readLong(stream));
+      default:
+        throw new UnsupportedOperationException(String.format("Unsupported version %s", version));
+    }
   }
 
   @Override
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeExceptionType.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeExceptionType.java
index 17d6ec37827..4feb7a9c48f 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeExceptionType.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeExceptionType.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.commons.exception.pipe;
 
+import org.apache.iotdb.commons.pipe.task.meta.PipeRuntimeMetaVersion;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
 import java.io.IOException;
@@ -50,30 +51,32 @@ public enum PipeRuntimeExceptionType {
     ReadWriteIOUtils.write(type, stream);
   }
 
-  public static PipeRuntimeException deserializeFrom(ByteBuffer byteBuffer) {
+  public static PipeRuntimeException deserializeFrom(
+      PipeRuntimeMetaVersion version, ByteBuffer byteBuffer) {
     final short type = ReadWriteIOUtils.readShort(byteBuffer);
     switch (type) {
       case 1:
-        return PipeRuntimeNonCriticalException.deserializeFrom(byteBuffer);
+        return PipeRuntimeNonCriticalException.deserializeFrom(version, byteBuffer);
       case 2:
-        return PipeRuntimeCriticalException.deserializeFrom(byteBuffer);
+        return PipeRuntimeCriticalException.deserializeFrom(version, byteBuffer);
       case 3:
-        return PipeRuntimeConnectorCriticalException.deserializeFrom(byteBuffer);
+        return PipeRuntimeConnectorCriticalException.deserializeFrom(version, byteBuffer);
       default:
         throw new UnsupportedOperationException(
             String.format("Unsupported PipeRuntimeException type %s.", type));
     }
   }
 
-  public static PipeRuntimeException deserializeFrom(InputStream stream) throws IOException {
+  public static PipeRuntimeException deserializeFrom(
+      PipeRuntimeMetaVersion version, InputStream stream) throws IOException {
     final short type = ReadWriteIOUtils.readShort(stream);
     switch (type) {
       case 1:
-        return PipeRuntimeNonCriticalException.deserializeFrom(stream);
+        return PipeRuntimeNonCriticalException.deserializeFrom(version, stream);
       case 2:
-        return PipeRuntimeCriticalException.deserializeFrom(stream);
+        return PipeRuntimeCriticalException.deserializeFrom(version, stream);
       case 3:
-        return PipeRuntimeConnectorCriticalException.deserializeFrom(stream);
+        return PipeRuntimeConnectorCriticalException.deserializeFrom(version, stream);
       default:
         throw new UnsupportedOperationException(
             String.format("Unsupported PipeRuntimeException type %s.", type));
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeNonCriticalException.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeNonCriticalException.java
index 675d1a87f35..64c7a66b2d8 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeNonCriticalException.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeNonCriticalException.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.commons.exception.pipe;
 
+import org.apache.iotdb.commons.pipe.task.meta.PipeRuntimeMetaVersion;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
 import java.io.IOException;
@@ -63,17 +64,30 @@ public class PipeRuntimeNonCriticalException extends PipeRuntimeException {
     ReadWriteIOUtils.write(getTimeStamp(), stream);
   }
 
-  public static PipeRuntimeNonCriticalException deserializeFrom(ByteBuffer byteBuffer) {
+  public static PipeRuntimeNonCriticalException deserializeFrom(
+      PipeRuntimeMetaVersion version, ByteBuffer byteBuffer) {
     final String message = ReadWriteIOUtils.readString(byteBuffer);
-    final long timeStamp = ReadWriteIOUtils.readLong(byteBuffer);
-    return new PipeRuntimeNonCriticalException(message, timeStamp);
+    switch (version) {
+      case VERSION_1:
+        return new PipeRuntimeNonCriticalException(message);
+      case VERSION_2:
+        return new PipeRuntimeNonCriticalException(message, ReadWriteIOUtils.readLong(byteBuffer));
+      default:
+        throw new UnsupportedOperationException(String.format("Unsupported version %s", version));
+    }
   }
 
-  public static PipeRuntimeNonCriticalException deserializeFrom(InputStream stream)
-      throws IOException {
+  public static PipeRuntimeNonCriticalException deserializeFrom(
+      PipeRuntimeMetaVersion version, InputStream stream) throws IOException {
     final String message = ReadWriteIOUtils.readString(stream);
-    final long timeStamp = ReadWriteIOUtils.readLong(stream);
-    return new PipeRuntimeNonCriticalException(message, timeStamp);
+    switch (version) {
+      case VERSION_1:
+        return new PipeRuntimeNonCriticalException(message);
+      case VERSION_2:
+        return new PipeRuntimeNonCriticalException(message, ReadWriteIOUtils.readLong(stream));
+      default:
+        throw new UnsupportedOperationException(String.format("Unsupported version %s", version));
+    }
   }
 
   @Override
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeRuntimeMeta.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeRuntimeMeta.java
index 056bf3178e6..6de10a08edd 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeRuntimeMeta.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeRuntimeMeta.java
@@ -100,6 +100,8 @@ public class PipeRuntimeMeta {
   }
 
   public void serialize(DataOutputStream outputStream) throws IOException {
+    PipeRuntimeMetaVersion.VERSION_2.serialize(outputStream);
+
     ReadWriteIOUtils.write(status.get().getType(), outputStream);
 
     // Avoid concurrent modification
@@ -126,6 +128,8 @@ public class PipeRuntimeMeta {
   }
 
   public void serialize(FileOutputStream outputStream) throws IOException {
+    PipeRuntimeMetaVersion.VERSION_2.serialize(outputStream);
+
     ReadWriteIOUtils.write(status.get().getType(), outputStream);
 
     // Avoid concurrent modification
@@ -152,6 +156,38 @@ public class PipeRuntimeMeta {
   }
 
   public static PipeRuntimeMeta deserialize(InputStream inputStream) throws IOException {
+    final byte pipeRuntimeVersionByte = ReadWriteIOUtils.readByte(inputStream);
+    final PipeRuntimeMetaVersion pipeRuntimeMetaVersion =
+        PipeRuntimeMetaVersion.deserialize(pipeRuntimeVersionByte);
+    switch (pipeRuntimeMetaVersion) {
+      case VERSION_1:
+        return deserializeVersion1(inputStream, pipeRuntimeVersionByte);
+      case VERSION_2:
+        return deserializeVersion2(inputStream);
+      default:
+        throw new UnsupportedOperationException(
+            "Unknown pipe runtime meta version: " + pipeRuntimeMetaVersion.getVersion());
+    }
+  }
+
+  private static PipeRuntimeMeta deserializeVersion1(InputStream inputStream, byte pipeStatusByte)
+      throws IOException {
+    final PipeRuntimeMeta pipeRuntimeMeta = new PipeRuntimeMeta();
+
+    pipeRuntimeMeta.status.set(PipeStatus.getPipeStatus(pipeStatusByte));
+
+    final int size = ReadWriteIOUtils.readInt(inputStream);
+    for (int i = 0; i < size; ++i) {
+      pipeRuntimeMeta.consensusGroupId2TaskMetaMap.put(
+          new TConsensusGroupId(
+              TConsensusGroupType.DataRegion, ReadWriteIOUtils.readInt(inputStream)),
+          PipeTaskMeta.deserialize(PipeRuntimeMetaVersion.VERSION_1, inputStream));
+    }
+
+    return pipeRuntimeMeta;
+  }
+
+  private static PipeRuntimeMeta deserializeVersion2(InputStream inputStream) throws IOException {
     final PipeRuntimeMeta pipeRuntimeMeta = new PipeRuntimeMeta();
 
     
pipeRuntimeMeta.status.set(PipeStatus.getPipeStatus(ReadWriteIOUtils.readByte(inputStream)));
@@ -161,14 +197,14 @@ public class PipeRuntimeMeta {
       pipeRuntimeMeta.consensusGroupId2TaskMetaMap.put(
           new TConsensusGroupId(
               TConsensusGroupType.DataRegion, ReadWriteIOUtils.readInt(inputStream)),
-          PipeTaskMeta.deserialize(inputStream));
+          PipeTaskMeta.deserialize(PipeRuntimeMetaVersion.VERSION_2, inputStream));
     }
 
     size = ReadWriteIOUtils.readInt(inputStream);
     for (int i = 0; i < size; ++i) {
       pipeRuntimeMeta.dataNodeId2PipeRuntimeExceptionMap.put(
           ReadWriteIOUtils.readInt(inputStream),
-          PipeRuntimeExceptionType.deserializeFrom(inputStream));
+          PipeRuntimeExceptionType.deserializeFrom(PipeRuntimeMetaVersion.VERSION_2, inputStream));
     }
 
     
pipeRuntimeMeta.exceptionsClearTime.set(ReadWriteIOUtils.readLong(inputStream));
@@ -177,6 +213,38 @@ public class PipeRuntimeMeta {
   }
 
   public static PipeRuntimeMeta deserialize(ByteBuffer byteBuffer) {
+    final byte pipeRuntimeVersionByte = ReadWriteIOUtils.readByte(byteBuffer);
+    final PipeRuntimeMetaVersion pipeRuntimeMetaVersion =
+        PipeRuntimeMetaVersion.deserialize(pipeRuntimeVersionByte);
+    switch (pipeRuntimeMetaVersion) {
+      case VERSION_1:
+        return deserializeVersion1(byteBuffer, pipeRuntimeVersionByte);
+      case VERSION_2:
+        return deserializeVersion2(byteBuffer);
+      default:
+        throw new UnsupportedOperationException(
+            "Unknown pipe runtime meta version: " + pipeRuntimeMetaVersion.getVersion());
+    }
+  }
+
+  private static PipeRuntimeMeta deserializeVersion1(
+      ByteBuffer byteBuffer, byte pipeRuntimeVersionByte) {
+    final PipeRuntimeMeta pipeRuntimeMeta = new PipeRuntimeMeta();
+
+    pipeRuntimeMeta.status.set(PipeStatus.getPipeStatus(pipeRuntimeVersionByte));
+
+    final int size = ReadWriteIOUtils.readInt(byteBuffer);
+    for (int i = 0; i < size; ++i) {
+      pipeRuntimeMeta.consensusGroupId2TaskMetaMap.put(
+          new TConsensusGroupId(
+              TConsensusGroupType.DataRegion, ReadWriteIOUtils.readInt(byteBuffer)),
+          PipeTaskMeta.deserialize(PipeRuntimeMetaVersion.VERSION_1, byteBuffer));
+    }
+
+    return pipeRuntimeMeta;
+  }
+
+  public static PipeRuntimeMeta deserializeVersion2(ByteBuffer byteBuffer) {
     final PipeRuntimeMeta pipeRuntimeMeta = new PipeRuntimeMeta();
 
     
pipeRuntimeMeta.status.set(PipeStatus.getPipeStatus(ReadWriteIOUtils.readByte(byteBuffer)));
@@ -186,14 +254,14 @@ public class PipeRuntimeMeta {
       pipeRuntimeMeta.consensusGroupId2TaskMetaMap.put(
           new TConsensusGroupId(
               TConsensusGroupType.DataRegion, ReadWriteIOUtils.readInt(byteBuffer)),
-          PipeTaskMeta.deserialize(byteBuffer));
+          PipeTaskMeta.deserialize(PipeRuntimeMetaVersion.VERSION_2, byteBuffer));
     }
 
     size = ReadWriteIOUtils.readInt(byteBuffer);
     for (int i = 0; i < size; ++i) {
       pipeRuntimeMeta.dataNodeId2PipeRuntimeExceptionMap.put(
           ReadWriteIOUtils.readInt(byteBuffer),
-          PipeRuntimeExceptionType.deserializeFrom(byteBuffer));
+          PipeRuntimeExceptionType.deserializeFrom(PipeRuntimeMetaVersion.VERSION_2, byteBuffer));
     }
 
     
pipeRuntimeMeta.exceptionsClearTime.set(ReadWriteIOUtils.readLong(byteBuffer));
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeRuntimeMetaVersion.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeRuntimeMetaVersion.java
new file mode 100644
index 00000000000..4e92d72c77c
--- /dev/null
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeRuntimeMetaVersion.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.task.meta;
+
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.DataOutputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+
+public enum PipeRuntimeMetaVersion {
+
+  // for compatibility use
+  VERSION_1(PipeStatus.RUNNING.getType()),
+
+  VERSION_2(Byte.MAX_VALUE),
+  ;
+
+  private static final Map<Byte, PipeRuntimeMetaVersion> VERSION_MAP = new HashMap<>();
+
+  static {
+    // for compatibility use
+    for (final PipeStatus status : PipeStatus.values()) {
+      VERSION_MAP.put(status.getType(), VERSION_1);
+    }
+
+    for (final PipeRuntimeMetaVersion version : PipeRuntimeMetaVersion.values()) {
+      VERSION_MAP.put(version.getVersion(), version);
+    }
+  }
+
+  private final byte version;
+
+  PipeRuntimeMetaVersion(byte version) {
+    this.version = version;
+  }
+
+  public byte getVersion() {
+    return version;
+  }
+
+  public void serialize(FileOutputStream outputStream) throws IOException {
+    ReadWriteIOUtils.write(version, outputStream);
+  }
+
+  public void serialize(DataOutputStream outputStream) throws IOException {
+    ReadWriteIOUtils.write(version, outputStream);
+  }
+
+  public static PipeRuntimeMetaVersion deserialize(InputStream inputStream) throws IOException {
+    return deserialize(ReadWriteIOUtils.readByte(inputStream));
+  }
+
+  public static PipeRuntimeMetaVersion deserialize(ByteBuffer byteBuffer) {
+    return deserialize(ReadWriteIOUtils.readByte(byteBuffer));
+  }
+
+  public static PipeRuntimeMetaVersion deserialize(byte version) {
+    return VERSION_MAP.get(version);
+  }
+}
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTaskMeta.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTaskMeta.java
index 854ea9e7bef..d24ed912bf6 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTaskMeta.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTaskMeta.java
@@ -93,7 +93,9 @@ public class PipeTaskMeta {
 
   public synchronized void serialize(DataOutputStream outputStream) throws IOException {
     progressIndex.get().serialize(outputStream);
+
     ReadWriteIOUtils.write(leaderDataNodeId.get(), outputStream);
+
     ReadWriteIOUtils.write(exceptionMessages.size(), outputStream);
     for (final PipeRuntimeException pipeRuntimeException : exceptionMessages) {
       pipeRuntimeException.serialize(outputStream);
@@ -102,34 +104,41 @@ public class PipeTaskMeta {
 
   public synchronized void serialize(FileOutputStream outputStream) throws IOException {
     progressIndex.get().serialize(outputStream);
+
     ReadWriteIOUtils.write(leaderDataNodeId.get(), outputStream);
+
     ReadWriteIOUtils.write(exceptionMessages.size(), outputStream);
     for (final PipeRuntimeException pipeRuntimeException : exceptionMessages) {
       pipeRuntimeException.serialize(outputStream);
     }
   }
 
-  public static PipeTaskMeta deserialize(ByteBuffer byteBuffer) {
+  public static PipeTaskMeta deserialize(PipeRuntimeMetaVersion version, ByteBuffer byteBuffer) {
     final ProgressIndex progressIndex = ProgressIndexType.deserializeFrom(byteBuffer);
+
     final int leaderDataNodeId = ReadWriteIOUtils.readInt(byteBuffer);
+
     final PipeTaskMeta pipeTaskMeta = new PipeTaskMeta(progressIndex, leaderDataNodeId);
     final int size = ReadWriteIOUtils.readInt(byteBuffer);
     for (int i = 0; i < size; ++i) {
       final PipeRuntimeException pipeRuntimeException =
-          PipeRuntimeExceptionType.deserializeFrom(byteBuffer);
+          PipeRuntimeExceptionType.deserializeFrom(version, byteBuffer);
       pipeTaskMeta.exceptionMessages.add(pipeRuntimeException);
     }
     return pipeTaskMeta;
   }
 
-  public static PipeTaskMeta deserialize(InputStream inputStream) throws IOException {
+  public static PipeTaskMeta deserialize(PipeRuntimeMetaVersion version, InputStream inputStream)
+      throws IOException {
     final ProgressIndex progressIndex = ProgressIndexType.deserializeFrom(inputStream);
+
     final int leaderDataNodeId = ReadWriteIOUtils.readInt(inputStream);
+
     final PipeTaskMeta pipeTaskMeta = new PipeTaskMeta(progressIndex, leaderDataNodeId);
     final int size = ReadWriteIOUtils.readInt(inputStream);
     for (int i = 0; i < size; ++i) {
       final PipeRuntimeException pipeRuntimeException =
-          PipeRuntimeExceptionType.deserializeFrom(inputStream);
+          PipeRuntimeExceptionType.deserializeFrom(version, inputStream);
       pipeTaskMeta.exceptionMessages.add(pipeRuntimeException);
     }
     return pipeTaskMeta;
diff --git a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeExceptionTest.java b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeExceptionTest.java
index 307bdbc1d72..c40903cd1cc 100644
--- a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeExceptionTest.java
+++ b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeExceptionTest.java
@@ -19,6 +19,8 @@
 
 package org.apache.iotdb.commons.exception.pipe;
 
+import org.apache.iotdb.commons.pipe.task.meta.PipeRuntimeMetaVersion;
+
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -35,7 +37,8 @@ public class PipeRuntimeExceptionTest {
     buffer.position(0);
     try {
       PipeRuntimeNonCriticalException e1 =
-          (PipeRuntimeNonCriticalException) PipeRuntimeExceptionType.deserializeFrom(buffer);
+          (PipeRuntimeNonCriticalException)
+              PipeRuntimeExceptionType.deserializeFrom(PipeRuntimeMetaVersion.VERSION_2, buffer);
       Assert.assertEquals(e.hashCode(), e1.hashCode());
     } catch (ClassCastException classCastException) {
       Assert.fail();
@@ -52,7 +55,8 @@ public class PipeRuntimeExceptionTest {
     buffer.position(0);
     try {
       PipeRuntimeCriticalException e1 =
-          (PipeRuntimeCriticalException) PipeRuntimeExceptionType.deserializeFrom(buffer);
+          (PipeRuntimeCriticalException)
+              PipeRuntimeExceptionType.deserializeFrom(PipeRuntimeMetaVersion.VERSION_2, buffer);
       Assert.assertEquals(e.hashCode(), e1.hashCode());
     } catch (ClassCastException classCastException) {
       Assert.fail();
@@ -70,7 +74,8 @@ public class PipeRuntimeExceptionTest {
     buffer.position(0);
     try {
       PipeRuntimeConnectorCriticalException e1 =
-          (PipeRuntimeConnectorCriticalException) PipeRuntimeExceptionType.deserializeFrom(buffer);
+          (PipeRuntimeConnectorCriticalException)
+              PipeRuntimeExceptionType.deserializeFrom(PipeRuntimeMetaVersion.VERSION_2, buffer);
       Assert.assertEquals(e.hashCode(), e1.hashCode());
     } catch (ClassCastException classCastException) {
       Assert.fail();

Reply via email to