This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 713560bbc19 [IOTDB-6062] Pipe: Modify the de/ser method of
PipeRuntimeMeta to support multi-version evolution (#10536)
713560bbc19 is described below
commit 713560bbc19797128cc42cd474001f269c7539aa
Author: Steve Yurong Su <[email protected]>
AuthorDate: Thu Jul 13 09:14:16 2023 +0800
[IOTDB-6062] Pipe: Modify the de/ser method of PipeRuntimeMeta to support
multi-version evolution (#10536)
---
.../PipeRuntimeConnectorCriticalException.java | 30 ++++++--
.../pipe/PipeRuntimeCriticalException.java | 28 ++++++--
.../exception/pipe/PipeRuntimeExceptionType.java | 19 ++---
.../pipe/PipeRuntimeNonCriticalException.java | 28 ++++++--
.../commons/pipe/task/meta/PipeRuntimeMeta.java | 76 ++++++++++++++++++--
.../pipe/task/meta/PipeRuntimeMetaVersion.java | 82 ++++++++++++++++++++++
.../iotdb/commons/pipe/task/meta/PipeTaskMeta.java | 17 +++--
.../exception/pipe/PipeRuntimeExceptionTest.java | 11 ++-
8 files changed, 251 insertions(+), 40 deletions(-)
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeConnectorCriticalException.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeConnectorCriticalException.java
index 6d3315cfbf9..e5a8d71170c 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeConnectorCriticalException.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeConnectorCriticalException.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.commons.exception.pipe;
+import org.apache.iotdb.commons.pipe.task.meta.PipeRuntimeMetaVersion;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import java.io.IOException;
@@ -62,17 +63,32 @@ public class PipeRuntimeConnectorCriticalException extends
PipeRuntimeCriticalEx
ReadWriteIOUtils.write(getTimeStamp(), stream);
}
- public static PipeRuntimeConnectorCriticalException
deserializeFrom(ByteBuffer byteBuffer) {
+ public static PipeRuntimeConnectorCriticalException deserializeFrom(
+ PipeRuntimeMetaVersion version, ByteBuffer byteBuffer) {
final String message = ReadWriteIOUtils.readString(byteBuffer);
- final long timeStamp = ReadWriteIOUtils.readLong(byteBuffer);
- return new PipeRuntimeConnectorCriticalException(message, timeStamp);
+ switch (version) {
+ case VERSION_1:
+ return new PipeRuntimeConnectorCriticalException(message);
+ case VERSION_2:
+ return new PipeRuntimeConnectorCriticalException(
+ message, ReadWriteIOUtils.readLong(byteBuffer));
+ default:
+ throw new UnsupportedOperationException(String.format("Unsupported
version %s", version));
+ }
}
- public static PipeRuntimeConnectorCriticalException
deserializeFrom(InputStream stream)
- throws IOException {
+ public static PipeRuntimeConnectorCriticalException deserializeFrom(
+ PipeRuntimeMetaVersion version, InputStream stream) throws IOException {
final String message = ReadWriteIOUtils.readString(stream);
- final long timeStamp = ReadWriteIOUtils.readLong(stream);
- return new PipeRuntimeConnectorCriticalException(message, timeStamp);
+ switch (version) {
+ case VERSION_1:
+ return new PipeRuntimeConnectorCriticalException(message);
+ case VERSION_2:
+ return new PipeRuntimeConnectorCriticalException(
+ message, ReadWriteIOUtils.readLong(stream));
+ default:
+ throw new UnsupportedOperationException(String.format("Unsupported
version %s", version));
+ }
}
@Override
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeCriticalException.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeCriticalException.java
index b24fe8c0ac3..9f9076be32d 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeCriticalException.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeCriticalException.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.commons.exception.pipe;
+import org.apache.iotdb.commons.pipe.task.meta.PipeRuntimeMetaVersion;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import java.io.IOException;
@@ -63,17 +64,30 @@ public class PipeRuntimeCriticalException extends
PipeRuntimeException {
ReadWriteIOUtils.write(getTimeStamp(), stream);
}
- public static PipeRuntimeCriticalException deserializeFrom(ByteBuffer
byteBuffer) {
+ public static PipeRuntimeCriticalException deserializeFrom(
+ PipeRuntimeMetaVersion version, ByteBuffer byteBuffer) {
final String message = ReadWriteIOUtils.readString(byteBuffer);
- final long timeStamp = ReadWriteIOUtils.readLong(byteBuffer);
- return new PipeRuntimeCriticalException(message, timeStamp);
+ switch (version) {
+ case VERSION_1:
+ return new PipeRuntimeCriticalException(message);
+ case VERSION_2:
+ return new PipeRuntimeCriticalException(message,
ReadWriteIOUtils.readLong(byteBuffer));
+ default:
+ throw new UnsupportedOperationException(String.format("Unsupported
version %s", version));
+ }
}
- public static PipeRuntimeCriticalException deserializeFrom(InputStream
stream)
- throws IOException {
+ public static PipeRuntimeCriticalException deserializeFrom(
+ PipeRuntimeMetaVersion version, InputStream stream) throws IOException {
final String message = ReadWriteIOUtils.readString(stream);
- final long timeStamp = ReadWriteIOUtils.readLong(stream);
- return new PipeRuntimeCriticalException(message, timeStamp);
+ switch (version) {
+ case VERSION_1:
+ return new PipeRuntimeCriticalException(message);
+ case VERSION_2:
+ return new PipeRuntimeCriticalException(message,
ReadWriteIOUtils.readLong(stream));
+ default:
+ throw new UnsupportedOperationException(String.format("Unsupported
version %s", version));
+ }
}
@Override
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeExceptionType.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeExceptionType.java
index 17d6ec37827..4feb7a9c48f 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeExceptionType.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeExceptionType.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.commons.exception.pipe;
+import org.apache.iotdb.commons.pipe.task.meta.PipeRuntimeMetaVersion;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import java.io.IOException;
@@ -50,30 +51,32 @@ public enum PipeRuntimeExceptionType {
ReadWriteIOUtils.write(type, stream);
}
- public static PipeRuntimeException deserializeFrom(ByteBuffer byteBuffer) {
+ public static PipeRuntimeException deserializeFrom(
+ PipeRuntimeMetaVersion version, ByteBuffer byteBuffer) {
final short type = ReadWriteIOUtils.readShort(byteBuffer);
switch (type) {
case 1:
- return PipeRuntimeNonCriticalException.deserializeFrom(byteBuffer);
+ return PipeRuntimeNonCriticalException.deserializeFrom(version,
byteBuffer);
case 2:
- return PipeRuntimeCriticalException.deserializeFrom(byteBuffer);
+ return PipeRuntimeCriticalException.deserializeFrom(version,
byteBuffer);
case 3:
- return
PipeRuntimeConnectorCriticalException.deserializeFrom(byteBuffer);
+ return PipeRuntimeConnectorCriticalException.deserializeFrom(version,
byteBuffer);
default:
throw new UnsupportedOperationException(
String.format("Unsupported PipeRuntimeException type %s.", type));
}
}
- public static PipeRuntimeException deserializeFrom(InputStream stream)
throws IOException {
+ public static PipeRuntimeException deserializeFrom(
+ PipeRuntimeMetaVersion version, InputStream stream) throws IOException {
final short type = ReadWriteIOUtils.readShort(stream);
switch (type) {
case 1:
- return PipeRuntimeNonCriticalException.deserializeFrom(stream);
+ return PipeRuntimeNonCriticalException.deserializeFrom(version,
stream);
case 2:
- return PipeRuntimeCriticalException.deserializeFrom(stream);
+ return PipeRuntimeCriticalException.deserializeFrom(version, stream);
case 3:
- return PipeRuntimeConnectorCriticalException.deserializeFrom(stream);
+ return PipeRuntimeConnectorCriticalException.deserializeFrom(version,
stream);
default:
throw new UnsupportedOperationException(
String.format("Unsupported PipeRuntimeException type %s.", type));
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeNonCriticalException.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeNonCriticalException.java
index 675d1a87f35..64c7a66b2d8 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeNonCriticalException.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeNonCriticalException.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.commons.exception.pipe;
+import org.apache.iotdb.commons.pipe.task.meta.PipeRuntimeMetaVersion;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import java.io.IOException;
@@ -63,17 +64,30 @@ public class PipeRuntimeNonCriticalException extends
PipeRuntimeException {
ReadWriteIOUtils.write(getTimeStamp(), stream);
}
- public static PipeRuntimeNonCriticalException deserializeFrom(ByteBuffer
byteBuffer) {
+ public static PipeRuntimeNonCriticalException deserializeFrom(
+ PipeRuntimeMetaVersion version, ByteBuffer byteBuffer) {
final String message = ReadWriteIOUtils.readString(byteBuffer);
- final long timeStamp = ReadWriteIOUtils.readLong(byteBuffer);
- return new PipeRuntimeNonCriticalException(message, timeStamp);
+ switch (version) {
+ case VERSION_1:
+ return new PipeRuntimeNonCriticalException(message);
+ case VERSION_2:
+ return new PipeRuntimeNonCriticalException(message,
ReadWriteIOUtils.readLong(byteBuffer));
+ default:
+ throw new UnsupportedOperationException(String.format("Unsupported
version %s", version));
+ }
}
- public static PipeRuntimeNonCriticalException deserializeFrom(InputStream
stream)
- throws IOException {
+ public static PipeRuntimeNonCriticalException deserializeFrom(
+ PipeRuntimeMetaVersion version, InputStream stream) throws IOException {
final String message = ReadWriteIOUtils.readString(stream);
- final long timeStamp = ReadWriteIOUtils.readLong(stream);
- return new PipeRuntimeNonCriticalException(message, timeStamp);
+ switch (version) {
+ case VERSION_1:
+ return new PipeRuntimeNonCriticalException(message);
+ case VERSION_2:
+ return new PipeRuntimeNonCriticalException(message,
ReadWriteIOUtils.readLong(stream));
+ default:
+ throw new UnsupportedOperationException(String.format("Unsupported
version %s", version));
+ }
}
@Override
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeRuntimeMeta.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeRuntimeMeta.java
index 056bf3178e6..6de10a08edd 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeRuntimeMeta.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeRuntimeMeta.java
@@ -100,6 +100,8 @@ public class PipeRuntimeMeta {
}
public void serialize(DataOutputStream outputStream) throws IOException {
+ PipeRuntimeMetaVersion.VERSION_2.serialize(outputStream);
+
ReadWriteIOUtils.write(status.get().getType(), outputStream);
// Avoid concurrent modification
@@ -126,6 +128,8 @@ public class PipeRuntimeMeta {
}
public void serialize(FileOutputStream outputStream) throws IOException {
+ PipeRuntimeMetaVersion.VERSION_2.serialize(outputStream);
+
ReadWriteIOUtils.write(status.get().getType(), outputStream);
// Avoid concurrent modification
@@ -152,6 +156,38 @@ public class PipeRuntimeMeta {
}
+  /**
+   * Deserializes a {@code PipeRuntimeMeta} from a stream, dispatching on its leading byte.
+   *
+   * <p>The leading byte is resolved through {@code PipeRuntimeMetaVersion.deserialize(byte)}. For
+   * {@code VERSION_1} the byte is handed on to the version-1 reader, which interprets it as the
+   * pipe status; for {@code VERSION_2} it is only a marker and the status is read from the stream
+   * afterwards.
+   *
+   * @param inputStream stream positioned at the first byte of a serialized meta
+   * @return the reconstructed meta
+   * @throws IOException if reading from the stream fails
+   * @throws UnsupportedOperationException if the leading byte resolves to no supported version
+   */
   public static PipeRuntimeMeta deserialize(InputStream inputStream) throws IOException {
+    final byte pipeRuntimeVersionByte = ReadWriteIOUtils.readByte(inputStream);
+    final PipeRuntimeMetaVersion pipeRuntimeMetaVersion =
+        PipeRuntimeMetaVersion.deserialize(pipeRuntimeVersionByte);
+    // Switching on null throws a bare NullPointerException, so a byte that resolved to no
+    // version is rejected explicitly and the offending value is reported.
+    if (pipeRuntimeMetaVersion == null) {
+      throw new UnsupportedOperationException(
+          "Unknown pipe runtime meta version: " + pipeRuntimeVersionByte);
+    }
+    switch (pipeRuntimeMetaVersion) {
+      case VERSION_1:
+        return deserializeVersion1(inputStream, pipeRuntimeVersionByte);
+      case VERSION_2:
+        return deserializeVersion2(inputStream);
+      default:
+        throw new UnsupportedOperationException(
+            "Unknown pipe runtime meta version: " + pipeRuntimeMetaVersion.getVersion());
+    }
+  }
+ /**
+ * Reads the version-1 layout, in which the byte already consumed by the caller is the pipe
+ * status rather than a version marker.
+ *
+ * <p>Only the status and the consensus-group task metas are assigned by this method.
+ *
+ * @param inputStream stream positioned just after the leading byte
+ * @param pipeStatusByte the leading byte, passed to {@code PipeStatus.getPipeStatus}
+ * @return a meta carrying the status and the task metas read from the stream
+ * @throws IOException declared for the stream reads
+ */
+ private static PipeRuntimeMeta deserializeVersion1(InputStream inputStream,
byte pipeStatusByte)
+ throws IOException {
+ final PipeRuntimeMeta pipeRuntimeMeta = new PipeRuntimeMeta();
+ // The byte the caller used for version detection doubles as the status in this layout.
+ pipeRuntimeMeta.status.set(PipeStatus.getPipeStatus(pipeStatusByte));
+ // An int entry count, then per entry an int group id followed by a version-1 task meta.
+ final int size = ReadWriteIOUtils.readInt(inputStream);
+ for (int i = 0; i < size; ++i) {
+ pipeRuntimeMeta.consensusGroupId2TaskMetaMap.put(
+ new TConsensusGroupId(
+ TConsensusGroupType.DataRegion,
ReadWriteIOUtils.readInt(inputStream)),
+ PipeTaskMeta.deserialize(PipeRuntimeMetaVersion.VERSION_1,
inputStream));
+ }
+
+ return pipeRuntimeMeta;
+ }
+
+ private static PipeRuntimeMeta deserializeVersion2(InputStream inputStream)
throws IOException {
final PipeRuntimeMeta pipeRuntimeMeta = new PipeRuntimeMeta();
pipeRuntimeMeta.status.set(PipeStatus.getPipeStatus(ReadWriteIOUtils.readByte(inputStream)));
@@ -161,14 +197,14 @@ public class PipeRuntimeMeta {
pipeRuntimeMeta.consensusGroupId2TaskMetaMap.put(
new TConsensusGroupId(
TConsensusGroupType.DataRegion,
ReadWriteIOUtils.readInt(inputStream)),
- PipeTaskMeta.deserialize(inputStream));
+ PipeTaskMeta.deserialize(PipeRuntimeMetaVersion.VERSION_2,
inputStream));
}
size = ReadWriteIOUtils.readInt(inputStream);
for (int i = 0; i < size; ++i) {
pipeRuntimeMeta.dataNodeId2PipeRuntimeExceptionMap.put(
ReadWriteIOUtils.readInt(inputStream),
- PipeRuntimeExceptionType.deserializeFrom(inputStream));
+
PipeRuntimeExceptionType.deserializeFrom(PipeRuntimeMetaVersion.VERSION_2,
inputStream));
}
pipeRuntimeMeta.exceptionsClearTime.set(ReadWriteIOUtils.readLong(inputStream));
@@ -177,6 +213,38 @@ public class PipeRuntimeMeta {
}
+  /**
+   * Deserializes a {@code PipeRuntimeMeta} from a buffer, dispatching on its leading byte.
+   *
+   * <p>The leading byte is resolved through {@code PipeRuntimeMetaVersion.deserialize(byte)}. For
+   * {@code VERSION_1} the byte is handed on to the version-1 reader, which interprets it as the
+   * pipe status; for {@code VERSION_2} it is only a marker and the status is read from the buffer
+   * afterwards.
+   *
+   * @param byteBuffer buffer positioned at the first byte of a serialized meta
+   * @return the reconstructed meta
+   * @throws UnsupportedOperationException if the leading byte resolves to no supported version
+   */
   public static PipeRuntimeMeta deserialize(ByteBuffer byteBuffer) {
+    final byte pipeRuntimeVersionByte = ReadWriteIOUtils.readByte(byteBuffer);
+    final PipeRuntimeMetaVersion pipeRuntimeMetaVersion =
+        PipeRuntimeMetaVersion.deserialize(pipeRuntimeVersionByte);
+    // Switching on null throws a bare NullPointerException, so a byte that resolved to no
+    // version is rejected explicitly and the offending value is reported.
+    if (pipeRuntimeMetaVersion == null) {
+      throw new UnsupportedOperationException(
+          "Unknown pipe runtime meta version: " + pipeRuntimeVersionByte);
+    }
+    switch (pipeRuntimeMetaVersion) {
+      case VERSION_1:
+        return deserializeVersion1(byteBuffer, pipeRuntimeVersionByte);
+      case VERSION_2:
+        return deserializeVersion2(byteBuffer);
+      default:
+        throw new UnsupportedOperationException(
+            "Unknown pipe runtime meta version: " + pipeRuntimeMetaVersion.getVersion());
+    }
+  }
+
+  /**
+   * Reads the version-1 layout from a buffer. In that layout the byte already consumed by the
+   * caller is the pipe status rather than a version marker, so it is passed in and decoded here.
+   *
+   * <p>Only the status and the consensus-group task metas are assigned by this method.
+   *
+   * @param byteBuffer buffer positioned just after the leading byte
+   * @param pipeStatusByte the leading byte, passed to {@code PipeStatus.getPipeStatus}
+   * @return a meta carrying the status and the task metas read from the buffer
+   */
+  private static PipeRuntimeMeta deserializeVersion1(ByteBuffer byteBuffer, byte pipeStatusByte) {
+    final PipeRuntimeMeta pipeRuntimeMeta = new PipeRuntimeMeta();
+
+    // The byte the caller used for version detection doubles as the status in this layout.
+    pipeRuntimeMeta.status.set(PipeStatus.getPipeStatus(pipeStatusByte));
+
+    // An int entry count, then per entry an int group id followed by a version-1 task meta.
+    final int size = ReadWriteIOUtils.readInt(byteBuffer);
+    for (int i = 0; i < size; ++i) {
+      pipeRuntimeMeta.consensusGroupId2TaskMetaMap.put(
+          new TConsensusGroupId(
+              TConsensusGroupType.DataRegion, ReadWriteIOUtils.readInt(byteBuffer)),
+          PipeTaskMeta.deserialize(PipeRuntimeMetaVersion.VERSION_1, byteBuffer));
+    }
+
+    return pipeRuntimeMeta;
+  }
+
+ public static PipeRuntimeMeta deserializeVersion2(ByteBuffer byteBuffer) {
final PipeRuntimeMeta pipeRuntimeMeta = new PipeRuntimeMeta();
pipeRuntimeMeta.status.set(PipeStatus.getPipeStatus(ReadWriteIOUtils.readByte(byteBuffer)));
@@ -186,14 +254,14 @@ public class PipeRuntimeMeta {
pipeRuntimeMeta.consensusGroupId2TaskMetaMap.put(
new TConsensusGroupId(
TConsensusGroupType.DataRegion,
ReadWriteIOUtils.readInt(byteBuffer)),
- PipeTaskMeta.deserialize(byteBuffer));
+ PipeTaskMeta.deserialize(PipeRuntimeMetaVersion.VERSION_2,
byteBuffer));
}
size = ReadWriteIOUtils.readInt(byteBuffer);
for (int i = 0; i < size; ++i) {
pipeRuntimeMeta.dataNodeId2PipeRuntimeExceptionMap.put(
ReadWriteIOUtils.readInt(byteBuffer),
- PipeRuntimeExceptionType.deserializeFrom(byteBuffer));
+
PipeRuntimeExceptionType.deserializeFrom(PipeRuntimeMetaVersion.VERSION_2,
byteBuffer));
}
pipeRuntimeMeta.exceptionsClearTime.set(ReadWriteIOUtils.readLong(byteBuffer));
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeRuntimeMetaVersion.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeRuntimeMetaVersion.java
new file mode 100644
index 00000000000..4e92d72c77c
--- /dev/null
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeRuntimeMetaVersion.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.task.meta;
+
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.DataOutputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Version marker stored in the first byte of a serialized {@code PipeRuntimeMeta}.
+ *
+ * <p>The version-1 layout has no marker of its own: its first byte is a {@link PipeStatus} type.
+ * To keep that layout readable, every {@link PipeStatus} type byte is registered as
+ * {@link #VERSION_1}, while {@link #VERSION_2} uses {@link Byte#MAX_VALUE} as its marker.
+ *
+ * <p>NOTE(review): this scheme assumes no {@link PipeStatus} type equals {@link Byte#MAX_VALUE};
+ * such a status byte would be resolved as {@link #VERSION_2}. Confirm against {@code PipeStatus}.
+ */
+public enum PipeRuntimeMetaVersion {
+
+  // Stands for the marker-less layout. Its nominal byte is the RUNNING status type, but any
+  // PipeStatus type byte resolves to this constant (see the static initializer).
+  VERSION_1(PipeStatus.RUNNING.getType()),
+
+  VERSION_2(Byte.MAX_VALUE),
+  ;
+
+  private static final Map<Byte, PipeRuntimeMetaVersion> VERSION_MAP = new HashMap<>();
+
+  static {
+    // A version-1 payload begins with its status byte, so each status type is an alias of
+    // VERSION_1.
+    for (final PipeStatus status : PipeStatus.values()) {
+      VERSION_MAP.put(status.getType(), VERSION_1);
+    }
+
+    // Registered second, so a real version marker wins over a status alias with the same byte.
+    for (final PipeRuntimeMetaVersion version : PipeRuntimeMetaVersion.values()) {
+      VERSION_MAP.put(version.getVersion(), version);
+    }
+  }
+
+  private final byte version;
+
+  PipeRuntimeMetaVersion(byte version) {
+    this.version = version;
+  }
+
+  /** Returns the marker byte written for this version. */
+  public byte getVersion() {
+    return version;
+  }
+
+  /** Writes this version's marker byte to {@code outputStream}. */
+  public void serialize(FileOutputStream outputStream) throws IOException {
+    ReadWriteIOUtils.write(version, outputStream);
+  }
+
+  /** Writes this version's marker byte to {@code outputStream}. */
+  public void serialize(DataOutputStream outputStream) throws IOException {
+    ReadWriteIOUtils.write(version, outputStream);
+  }
+
+  /** Reads one byte from {@code inputStream} and resolves it with {@link #deserialize(byte)}. */
+  public static PipeRuntimeMetaVersion deserialize(InputStream inputStream) throws IOException {
+    return deserialize(ReadWriteIOUtils.readByte(inputStream));
+  }
+
+  /** Reads one byte from {@code byteBuffer} and resolves it with {@link #deserialize(byte)}. */
+  public static PipeRuntimeMetaVersion deserialize(ByteBuffer byteBuffer) {
+    return deserialize(ReadWriteIOUtils.readByte(byteBuffer));
+  }
+
+  /**
+   * Resolves a leading byte to the version it denotes.
+   *
+   * @param version a registered version marker or a {@link PipeStatus} type byte
+   * @return the matching version, never {@code null}
+   * @throws UnsupportedOperationException if the byte is neither a registered marker nor a
+   *     {@link PipeStatus} type
+   */
+  public static PipeRuntimeMetaVersion deserialize(byte version) {
+    final PipeRuntimeMetaVersion metaVersion = VERSION_MAP.get(version);
+    // Callers switch on the result; failing here with the offending byte is clearer than the
+    // NullPointerException a null return would cause there.
+    if (metaVersion == null) {
+      throw new UnsupportedOperationException("Unknown pipe runtime meta version: " + version);
+    }
+    return metaVersion;
+  }
+}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTaskMeta.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTaskMeta.java
index 854ea9e7bef..d24ed912bf6 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTaskMeta.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTaskMeta.java
@@ -93,7 +93,9 @@ public class PipeTaskMeta {
public synchronized void serialize(DataOutputStream outputStream) throws
IOException {
progressIndex.get().serialize(outputStream);
+
ReadWriteIOUtils.write(leaderDataNodeId.get(), outputStream);
+
ReadWriteIOUtils.write(exceptionMessages.size(), outputStream);
for (final PipeRuntimeException pipeRuntimeException : exceptionMessages) {
pipeRuntimeException.serialize(outputStream);
@@ -102,34 +104,41 @@ public class PipeTaskMeta {
public synchronized void serialize(FileOutputStream outputStream) throws
IOException {
progressIndex.get().serialize(outputStream);
+
ReadWriteIOUtils.write(leaderDataNodeId.get(), outputStream);
+
ReadWriteIOUtils.write(exceptionMessages.size(), outputStream);
for (final PipeRuntimeException pipeRuntimeException : exceptionMessages) {
pipeRuntimeException.serialize(outputStream);
}
}
- public static PipeTaskMeta deserialize(ByteBuffer byteBuffer) {
+ public static PipeTaskMeta deserialize(PipeRuntimeMetaVersion version,
ByteBuffer byteBuffer) {
final ProgressIndex progressIndex =
ProgressIndexType.deserializeFrom(byteBuffer);
+
final int leaderDataNodeId = ReadWriteIOUtils.readInt(byteBuffer);
+
final PipeTaskMeta pipeTaskMeta = new PipeTaskMeta(progressIndex,
leaderDataNodeId);
final int size = ReadWriteIOUtils.readInt(byteBuffer);
for (int i = 0; i < size; ++i) {
final PipeRuntimeException pipeRuntimeException =
- PipeRuntimeExceptionType.deserializeFrom(byteBuffer);
+ PipeRuntimeExceptionType.deserializeFrom(version, byteBuffer);
pipeTaskMeta.exceptionMessages.add(pipeRuntimeException);
}
return pipeTaskMeta;
}
- public static PipeTaskMeta deserialize(InputStream inputStream) throws
IOException {
+ public static PipeTaskMeta deserialize(PipeRuntimeMetaVersion version,
InputStream inputStream)
+ throws IOException {
final ProgressIndex progressIndex =
ProgressIndexType.deserializeFrom(inputStream);
+
final int leaderDataNodeId = ReadWriteIOUtils.readInt(inputStream);
+
final PipeTaskMeta pipeTaskMeta = new PipeTaskMeta(progressIndex,
leaderDataNodeId);
final int size = ReadWriteIOUtils.readInt(inputStream);
for (int i = 0; i < size; ++i) {
final PipeRuntimeException pipeRuntimeException =
- PipeRuntimeExceptionType.deserializeFrom(inputStream);
+ PipeRuntimeExceptionType.deserializeFrom(version, inputStream);
pipeTaskMeta.exceptionMessages.add(pipeRuntimeException);
}
return pipeTaskMeta;
diff --git
a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeExceptionTest.java
b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeExceptionTest.java
index 307bdbc1d72..c40903cd1cc 100644
---
a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeExceptionTest.java
+++
b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeExceptionTest.java
@@ -19,6 +19,8 @@
package org.apache.iotdb.commons.exception.pipe;
+import org.apache.iotdb.commons.pipe.task.meta.PipeRuntimeMetaVersion;
+
import org.junit.Assert;
import org.junit.Test;
@@ -35,7 +37,8 @@ public class PipeRuntimeExceptionTest {
buffer.position(0);
try {
PipeRuntimeNonCriticalException e1 =
- (PipeRuntimeNonCriticalException)
PipeRuntimeExceptionType.deserializeFrom(buffer);
+ (PipeRuntimeNonCriticalException)
+
PipeRuntimeExceptionType.deserializeFrom(PipeRuntimeMetaVersion.VERSION_2,
buffer);
Assert.assertEquals(e.hashCode(), e1.hashCode());
} catch (ClassCastException classCastException) {
Assert.fail();
@@ -52,7 +55,8 @@ public class PipeRuntimeExceptionTest {
buffer.position(0);
try {
PipeRuntimeCriticalException e1 =
- (PipeRuntimeCriticalException)
PipeRuntimeExceptionType.deserializeFrom(buffer);
+ (PipeRuntimeCriticalException)
+
PipeRuntimeExceptionType.deserializeFrom(PipeRuntimeMetaVersion.VERSION_2,
buffer);
Assert.assertEquals(e.hashCode(), e1.hashCode());
} catch (ClassCastException classCastException) {
Assert.fail();
@@ -70,7 +74,8 @@ public class PipeRuntimeExceptionTest {
buffer.position(0);
try {
PipeRuntimeConnectorCriticalException e1 =
- (PipeRuntimeConnectorCriticalException)
PipeRuntimeExceptionType.deserializeFrom(buffer);
+ (PipeRuntimeConnectorCriticalException)
+
PipeRuntimeExceptionType.deserializeFrom(PipeRuntimeMetaVersion.VERSION_2,
buffer);
Assert.assertEquals(e.hashCode(), e1.hashCode());
} catch (ClassCastException classCastException) {
Assert.fail();