This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new b1224722d6 [IOTDB-4488] Implement ClusterSyncInfo snapshot by SyncLogWritter and SyncLogReader (#7452)
b1224722d6 is described below
commit b1224722d6aaf72b9ab855cab46d3138f72190ed
Author: Chen YZ <[email protected]>
AuthorDate: Wed Sep 28 16:44:36 2022 +0800
[IOTDB-4488] Implement ClusterSyncInfo snapshot by SyncLogWritter and SyncLogReader (#7452)
---
.../persistence/sync/ClusterSyncInfo.java | 13 +-
.../persistence/ClusterSyncInfoTest.java | 116 ++++++++++++++++++
.../iotdb/commons/sync/metadata/SyncMetadata.java | 65 +++++++++-
.../commons/sync/persistence/SyncLogReader.java | 119 ++++++++++++++++++
.../commons/sync/persistence/SyncLogWriter.java | 95 ++++++++++++++
.../apache/iotdb/commons/sync/pipe/PipeInfo.java | 50 ++++++++
.../iotdb/commons/sync/pipe/SyncOperation.java | 3 +
.../iotdb/commons/sync/pipe/TsFilePipeInfo.java | 27 ++++
.../iotdb/commons/sync/pipesink/IoTDBPipeSink.java | 29 ++++-
.../iotdb/commons/sync/pipesink/PipeSink.java | 25 ++++
.../apache/iotdb/db/sync/common/LocalSyncInfo.java | 21 ++--
.../iotdb/db/sync/common/LocalSyncInfoFetcher.java | 5 +
.../db/sync/common/persistence/SyncLogReader.java | 128 -------------------
.../db/sync/common/persistence/SyncLogWriter.java | 136 ---------------------
.../db/sync/sender/pipe/ExternalPipeSink.java | 25 +++-
.../db/sync/receiver/recovery/SyncLogTest.java | 45 ++++---
.../apache/iotdb/db/utils/EnvironmentUtils.java | 4 +-
.../iotdb/tsfile/utils/ReadWriteIOUtils.java | 16 +++
18 files changed, 615 insertions(+), 307 deletions(-)
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/sync/ClusterSyncInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/sync/ClusterSyncInfo.java
index 5838fb28a3..c42dd0d75f 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/sync/ClusterSyncInfo.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/sync/ClusterSyncInfo.java
@@ -101,17 +101,20 @@ public class ClusterSyncInfo implements SnapshotProcessor {
return resp;
}
+ // endregion
+
+ // ======================================================
+ // region Implement of Snapshot
+ // ======================================================
+
@Override
public boolean processTakeSnapshot(File snapshotDir) throws TException, IOException {
- // TODO: merge snapshot logic into SyncLogWritter and SyncLogReader
- // TODO: add ClusterSyncInfoTest later
- return true;
+ return syncMetadata.processTakeSnapshot(snapshotDir);
}
@Override
public void processLoadSnapshot(File snapshotDir) throws TException, IOException {
- // TODO: merge snapshot logic into SyncLogWritter and SyncLogReader
- // TODO: add ClusterSyncInfoTest later
+ syncMetadata.processLoadSnapshot(snapshotDir);
}
// endregion
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/persistence/ClusterSyncInfoTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/ClusterSyncInfoTest.java
new file mode 100644
index 0000000000..dd9abc5222
--- /dev/null
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/ClusterSyncInfoTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.persistence;
+
+import org.apache.iotdb.commons.exception.sync.PipeSinkException;
+import org.apache.iotdb.commons.sync.pipesink.PipeSink;
+import org.apache.iotdb.confignode.consensus.request.write.sync.CreatePipeSinkPlan;
+import org.apache.iotdb.confignode.consensus.request.write.sync.GetPipeSinkPlan;
+import org.apache.iotdb.confignode.persistence.sync.ClusterSyncInfo;
+import org.apache.iotdb.confignode.rpc.thrift.TPipeSinkInfo;
+
+import org.apache.commons.io.FileUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.iotdb.db.constant.TestConstant.BASE_OUTPUT_PATH;
+
+public class ClusterSyncInfoTest {
+
+ private ClusterSyncInfo clusterSyncInfo;
+ private static final File snapshotDir = new File(BASE_OUTPUT_PATH, "snapshot");
+
+ @Before
+ public void setup() throws IOException {
+ clusterSyncInfo = new ClusterSyncInfo();
+ if (!snapshotDir.exists()) {
+ snapshotDir.mkdirs();
+ }
+ }
+
+ @After
+ public void cleanup() throws IOException {
+ if (snapshotDir.exists()) {
+ FileUtils.deleteDirectory(snapshotDir);
+ }
+ }
+
+ private void prepareClusterSyncInfo() {
+ Map<String, String> attributes1 = new HashMap<>();
+ attributes1.put("ip", "192.168.11.11");
+ attributes1.put("port", "7766");
+ TPipeSinkInfo pipeSinkInfo1 =
+ new TPipeSinkInfo()
+ .setPipeSinkName("demo1")
+ .setPipeSinkType("IoTDB")
+ .setAttributes(attributes1);
+ Map<String, String> attributes2 = new HashMap<>();
+ attributes2.put("ip", "192.168.22.2");
+ attributes2.put("port", "7777");
+ TPipeSinkInfo pipeSinkInfo2 =
+ new TPipeSinkInfo()
+ .setPipeSinkName("demo2")
+ .setPipeSinkType("IoTDB")
+ .setAttributes(attributes2);
+
+ clusterSyncInfo.addPipeSink(new CreatePipeSinkPlan(pipeSinkInfo1));
+ clusterSyncInfo.addPipeSink(new CreatePipeSinkPlan(pipeSinkInfo2));
+ }
+
+ @Test
+ public void testSnapshot() throws Exception {
+ prepareClusterSyncInfo();
+
+ clusterSyncInfo.processTakeSnapshot(snapshotDir);
+ ClusterSyncInfo clusterSyncInfo2 = new ClusterSyncInfo();
+ clusterSyncInfo2.processLoadSnapshot(snapshotDir);
+
+ List<PipeSink> expectedPipeSink =
+ clusterSyncInfo.getPipeSink(new GetPipeSinkPlan()).getPipeSinkList();
+ List<PipeSink> actualPipeSink =
+ clusterSyncInfo2.getPipeSink(new GetPipeSinkPlan()).getPipeSinkList();
+ Assert.assertEquals(expectedPipeSink, actualPipeSink);
+ }
+
+ @Test
+ public void testPipeSinkOperation() {
+ prepareClusterSyncInfo();
+ try {
+ clusterSyncInfo.checkAddPipeSink("demo1");
+ clusterSyncInfo.checkDropPipeSink("demo3");
+ Assert.fail("checkOperatePipeSink ignore failure.");
+ } catch (PipeSinkException e) {
+ // nothing
+ }
+ try {
+ clusterSyncInfo.checkAddPipeSink("demo3");
+ clusterSyncInfo.checkDropPipeSink("demo1");
+ } catch (PipeSinkException e) {
+ Assert.fail("checkOperatePipeSink should not throw exception.");
+ }
+ }
+}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/sync/metadata/SyncMetadata.java b/node-commons/src/main/java/org/apache/iotdb/commons/sync/metadata/SyncMetadata.java
index fee6963be9..57043817f7 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/sync/metadata/SyncMetadata.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/sync/metadata/SyncMetadata.java
@@ -20,18 +20,31 @@ package org.apache.iotdb.commons.sync.metadata;
import org.apache.iotdb.commons.exception.sync.PipeException;
import org.apache.iotdb.commons.exception.sync.PipeSinkException;
+import org.apache.iotdb.commons.snapshot.SnapshotProcessor;
+import org.apache.iotdb.commons.sync.persistence.SyncLogReader;
+import org.apache.iotdb.commons.sync.persistence.SyncLogWriter;
import org.apache.iotdb.commons.sync.pipe.PipeInfo;
import org.apache.iotdb.commons.sync.pipe.PipeMessage;
import org.apache.iotdb.commons.sync.pipe.PipeStatus;
import org.apache.iotdb.commons.sync.pipe.SyncOperation;
import org.apache.iotdb.commons.sync.pipesink.PipeSink;
+import org.apache.iotdb.commons.sync.utils.SyncConstant;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
-public class SyncMetadata {
+public class SyncMetadata implements SnapshotProcessor {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(SyncMetadata.class);
// <PipeSinkName, PipeSink>
private Map<String, PipeSink> pipeSinks;
@@ -207,6 +220,56 @@ public class SyncMetadata {
}
}
+ @Override
+ public boolean processTakeSnapshot(File snapshotDir) throws TException, IOException {
+ File snapshotFile = new File(snapshotDir, SyncConstant.SYNC_LOG_NAME);
+ if (snapshotFile.exists() && snapshotFile.isFile()) {
+ LOGGER.error(
+ "Failed to take snapshot, because snapshot file [{}] is already exist.",
+ snapshotFile.getAbsolutePath());
+ return false;
+ }
+ File tmpFile = new File(snapshotFile.getAbsolutePath() + "-" + UUID.randomUUID());
+ try (SyncLogWriter writer = new SyncLogWriter(snapshotDir, tmpFile.getName())) {
+ for (PipeSink pipeSink : pipeSinks.values()) {
+ writer.addPipeSink(pipeSink);
+ }
+ for (Map<Long, PipeInfo> map : pipes.values()) {
+ for (PipeInfo pipeInfo : map.values()) {
+ writer.addPipe(pipeInfo);
+ switch (pipeInfo.getStatus()) {
+ case RUNNING:
+ writer.operatePipe(pipeInfo.getPipeName(), SyncOperation.START_PIPE);
+ break;
+ case STOP:
+ writer.operatePipe(pipeInfo.getPipeName(), SyncOperation.STOP_PIPE);
+ break;
+ case DROP:
+ writer.operatePipe(pipeInfo.getPipeName(), SyncOperation.DROP_PIPE);
+ break;
+ }
+ }
+ }
+ }
+ return tmpFile.renameTo(snapshotFile);
+ }
+
+ @Override
+ public void processLoadSnapshot(File snapshotDir) throws TException, IOException {
+ File snapshotFile = new File(snapshotDir, SyncConstant.SYNC_LOG_NAME);
+ if (!snapshotFile.exists() || !snapshotFile.isFile()) {
+ LOGGER.error(
+ "Failed to load snapshot,snapshot file [{}] is not exist.",
+ snapshotFile.getAbsolutePath());
+ return;
+ }
+ SyncLogReader reader = new SyncLogReader(snapshotDir);
+ reader.recover();
+ setPipes(reader.getAllPipeInfos());
+ setPipeSinks(reader.getAllPipeSinks());
+ setRunningPipe(reader.getRunningPipeInfo());
+ }
+
// endregion
}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/sync/persistence/SyncLogReader.java b/node-commons/src/main/java/org/apache/iotdb/commons/sync/persistence/SyncLogReader.java
new file mode 100644
index 0000000000..2f402caf6d
--- /dev/null
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/sync/persistence/SyncLogReader.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.commons.sync.persistence;
+
+import org.apache.iotdb.commons.sync.pipe.PipeInfo;
+import org.apache.iotdb.commons.sync.pipe.SyncOperation;
+import org.apache.iotdb.commons.sync.pipesink.PipeSink;
+import org.apache.iotdb.commons.sync.utils.SyncConstant;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class SyncLogReader {
+ private static final Logger logger = LoggerFactory.getLogger(SyncLogReader.class);
+ // <pipeSinkName, PipeSink>
+ private final Map<String, PipeSink> pipeSinks = new ConcurrentHashMap<>();
+ private final Map<String, Map<Long, PipeInfo>> pipes = new ConcurrentHashMap<>();
+ private PipeInfo runningPipe;
+ private final File dir;
+ private final String fileName;
+
+ public SyncLogReader(File dir) {
+ this.dir = dir;
+ this.fileName = SyncConstant.SYNC_LOG_NAME;
+ }
+
+ public SyncLogReader(File dir, String fileName) {
+ this.dir = dir;
+ this.fileName = fileName;
+ }
+
+ public void recover() throws IOException {
+ logger.info("Start to recover all sync state for sync.");
+ File serviceLogFile = new File(dir, fileName);
+ if (!serviceLogFile.exists()) {
+ logger.warn("Sync service log file not found");
+ } else {
+ try (InputStream inputStream = new FileInputStream(serviceLogFile)) {
+ recoverPipe(inputStream);
+ }
+ }
+ }
+
+ public Map<String, PipeSink> getAllPipeSinks() {
+ return pipeSinks;
+ }
+
+ public Map<String, Map<Long, PipeInfo>> getAllPipeInfos() {
+ return pipes;
+ }
+
+ public PipeInfo getRunningPipeInfo() {
+ return runningPipe;
+ }
+
+ private void recoverPipe(InputStream inputStream) throws IOException {
+ byte nextByte;
+ while ((nextByte = ReadWriteIOUtils.readByte(inputStream)) != -1) {
+ SyncOperation operationType = SyncOperation.values()[nextByte];
+ switch (operationType) {
+ case CREATE_PIPESINK:
+ PipeSink pipeSink = PipeSink.deserializePipeSink(inputStream);
+ pipeSinks.put(pipeSink.getPipeSinkName(), pipeSink);
+ break;
+ case DROP_PIPESINK:
+ pipeSinks.remove(ReadWriteIOUtils.readString(inputStream));
+ break;
+ case CREATE_PIPE:
+ runningPipe = PipeInfo.deserializePipeInfo(inputStream);
+ pipes
+ .computeIfAbsent(runningPipe.getPipeName(), i -> new ConcurrentHashMap<>())
+ .computeIfAbsent(runningPipe.getCreateTime(), i -> runningPipe);
+ break;
+ case STOP_PIPE:
+ // TODO: support multiple pipe
+ ReadWriteIOUtils.readString(inputStream);
+ runningPipe.stop();
+ break;
+ case START_PIPE:
+ // TODO: support multiple pipe
+ ReadWriteIOUtils.readString(inputStream);
+ runningPipe.start();
+ break;
+ case DROP_PIPE:
+ // TODO: support multiple pipe
+ ReadWriteIOUtils.readString(inputStream);
+ runningPipe.drop();
+ break;
+ default:
+ throw new UnsupportedOperationException(
+ String.format("Can not recognize SyncOperation %s.", operationType.name()));
+ }
+ }
+ }
+}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/sync/persistence/SyncLogWriter.java b/node-commons/src/main/java/org/apache/iotdb/commons/sync/persistence/SyncLogWriter.java
new file mode 100644
index 0000000000..84fe42f840
--- /dev/null
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/sync/persistence/SyncLogWriter.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.commons.sync.persistence;
+
+import org.apache.iotdb.commons.sync.pipe.PipeInfo;
+import org.apache.iotdb.commons.sync.pipe.SyncOperation;
+import org.apache.iotdb.commons.sync.pipesink.PipeSink;
+import org.apache.iotdb.commons.sync.utils.SyncConstant;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * SyncLogger is used to manage the persistent information in the sync module. Persistent
+ * information can be recovered on reboot via {@linkplain SyncLogReader}.
+ */
+public class SyncLogWriter implements AutoCloseable {
+ // record pipe meta info
+ private OutputStream outputStream;
+ private final File dir;
+ private final String fileName;
+
+ public SyncLogWriter(File dir) {
+ this.dir = dir;
+ this.fileName = SyncConstant.SYNC_LOG_NAME;
+ }
+
+ public SyncLogWriter(File dir, String fileName) {
+ this.dir = dir;
+ this.fileName = fileName;
+ }
+
+ public void getOutputStream() throws IOException {
+ if (outputStream == null) {
+ // File logFile = new File(SyncPathUtil.getSysDir(), SyncConstant.SYNC_LOG_NAME);
+ File logFile = new File(dir, fileName);
+ if (!logFile.getParentFile().exists()) {
+ logFile.getParentFile().mkdirs();
+ }
+ outputStream = new FileOutputStream(logFile, true);
+ }
+ }
+
+ public synchronized void addPipeSink(PipeSink pipeSink) throws IOException {
+ getOutputStream();
+ ReadWriteIOUtils.write((byte) SyncOperation.CREATE_PIPESINK.ordinal(), outputStream);
+ pipeSink.serialize(outputStream);
+ }
+
+ public synchronized void dropPipeSink(String pipeSinkName) throws IOException {
+ getOutputStream();
+ ReadWriteIOUtils.write((byte) SyncOperation.DROP_PIPESINK.ordinal(), outputStream);
+ ReadWriteIOUtils.write(pipeSinkName, outputStream);
+ }
+
+ public synchronized void addPipe(PipeInfo pipeInfo) throws IOException {
+ getOutputStream();
+ ReadWriteIOUtils.write((byte) SyncOperation.CREATE_PIPE.ordinal(), outputStream);
+ pipeInfo.serialize(outputStream);
+ }
+
+ public synchronized void operatePipe(String pipeName, SyncOperation syncOperation)
+ throws IOException {
+ getOutputStream();
+ ReadWriteIOUtils.write((byte) syncOperation.ordinal(), outputStream);
+ ReadWriteIOUtils.write(pipeName, outputStream);
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (outputStream != null) {
+ outputStream.close();
+ outputStream = null;
+ }
+ }
+}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipe/PipeInfo.java b/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipe/PipeInfo.java
index 206714563c..b93da6a86a 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipe/PipeInfo.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipe/PipeInfo.java
@@ -18,13 +18,23 @@
*/
package org.apache.iotdb.commons.sync.pipe;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
public abstract class PipeInfo {
+
protected String pipeName;
protected String pipeSinkName;
protected PipeStatus status;
protected long createTime;
protected PipeMessage.PipeMessageType messageType;
+ // only used for serialization
+ protected PipeInfo() {}
+
public PipeInfo(String pipeName, String pipeSinkName, long createTime) {
this.pipeName = pipeName;
this.pipeSinkName = pipeSinkName;
@@ -41,6 +51,8 @@ public abstract class PipeInfo {
this.messageType = PipeMessage.PipeMessageType.NORMAL;
}
+ abstract PipeType getType();
+
public String getPipeName() {
return pipeName;
}
@@ -92,4 +104,42 @@ public abstract class PipeInfo {
public void setCreateTime(long createTime) {
this.createTime = createTime;
}
+
+ public void serialize(OutputStream outputStream) throws IOException {
+ ReadWriteIOUtils.write((byte) getType().ordinal(), outputStream);
+ ReadWriteIOUtils.write(pipeName, outputStream);
+ ReadWriteIOUtils.write(pipeSinkName, outputStream);
+ ReadWriteIOUtils.write((byte) status.ordinal(), outputStream);
+ ReadWriteIOUtils.write(createTime, outputStream);
+ ReadWriteIOUtils.write((byte) messageType.ordinal(), outputStream);
+ }
+
+ protected void deserialize(InputStream inputStream) throws IOException {
+ pipeName = ReadWriteIOUtils.readString(inputStream);
+ pipeSinkName = ReadWriteIOUtils.readString(inputStream);
+ status = PipeStatus.values()[ReadWriteIOUtils.readByte(inputStream)];
+ createTime = ReadWriteIOUtils.readLong(inputStream);
+ messageType = PipeMessage.PipeMessageType.values()[ReadWriteIOUtils.readByte(inputStream)];
+ }
+
+ public static PipeInfo deserializePipeInfo(InputStream inputStream) throws IOException {
+ PipeType pipeType = PipeType.values()[ReadWriteIOUtils.readByte(inputStream)];
+ PipeInfo pipeInfo;
+ switch (pipeType) {
+ case TsFilePipe:
+ pipeInfo = new TsFilePipeInfo();
+ pipeInfo.deserialize(inputStream);
+ break;
+ case WALPipe:
+ default:
+ throw new UnsupportedOperationException(
+ String.format("Can not recognize PipeType %s.", pipeType.name()));
+ }
+ return pipeInfo;
+ }
+
+ enum PipeType {
+ TsFilePipe,
+ WALPipe
+ }
}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipe/SyncOperation.java b/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipe/SyncOperation.java
index 618e0ebfe8..d2bfdf743b 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipe/SyncOperation.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipe/SyncOperation.java
@@ -19,6 +19,9 @@
package org.apache.iotdb.commons.sync.pipe;
public enum SyncOperation {
+ CREATE_PIPESINK,
+ DROP_PIPESINK,
+ CREATE_PIPE,
START_PIPE,
STOP_PIPE,
DROP_PIPE
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipe/TsFilePipeInfo.java b/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipe/TsFilePipeInfo.java
index 81feed0cc5..391d4c65fd 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipe/TsFilePipeInfo.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipe/TsFilePipeInfo.java
@@ -18,12 +18,20 @@
*/
package org.apache.iotdb.commons.sync.pipe;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
import java.util.Objects;
public class TsFilePipeInfo extends PipeInfo {
private boolean syncDelOp;
private long dataStartTimestamp;
+ // only used for serialization
+ protected TsFilePipeInfo() {}
+
public TsFilePipeInfo(
String pipeName,
String pipeSinkName,
@@ -63,6 +71,25 @@ public class TsFilePipeInfo extends PipeInfo {
this.dataStartTimestamp = dataStartTimestamp;
}
+ @Override
+ PipeType getType() {
+ return PipeType.TsFilePipe;
+ }
+
+ @Override
+ public void serialize(OutputStream outputStream) throws IOException {
+ super.serialize(outputStream);
+ ReadWriteIOUtils.write(syncDelOp, outputStream);
+ ReadWriteIOUtils.write(dataStartTimestamp, outputStream);
+ }
+
+ @Override
+ protected void deserialize(InputStream inputStream) throws IOException {
+ super.deserialize(inputStream);
+ syncDelOp = ReadWriteIOUtils.readBool(inputStream);
+ dataStartTimestamp = ReadWriteIOUtils.readLong(inputStream);
+ }
+
@Override
public String toString() {
return "TsFilePipeInfo{"
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipesink/IoTDBPipeSink.java b/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipesink/IoTDBPipeSink.java
index 56736a46d0..b0cbf2aef6 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipesink/IoTDBPipeSink.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipesink/IoTDBPipeSink.java
@@ -24,14 +24,18 @@ import org.apache.iotdb.commons.sync.utils.SyncConstant;
import org.apache.iotdb.confignode.rpc.thrift.TPipeSinkInfo;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
public class IoTDBPipeSink implements PipeSink {
- private final PipeSinkType pipeSinkType;
+ private final PipeSinkType pipeSinkType = PipeSinkType.IoTDB;
private String name;
private String ip;
@@ -40,11 +44,13 @@ public class IoTDBPipeSink implements PipeSink {
private static final String ATTRIBUTE_IP_KEY = "ip";
private static final String ATTRIBUTE_PORT_KEY = "port";
+ public IoTDBPipeSink() {}
+
public IoTDBPipeSink(String name) {
- ip = SyncConstant.DEFAULT_PIPE_SINK_IP;
- port = SyncConstant.DEFAULT_PIPE_SINK_PORT;
+ this();
+ this.ip = SyncConstant.DEFAULT_PIPE_SINK_IP;
+ this.port = SyncConstant.DEFAULT_PIPE_SINK_PORT;
this.name = name;
- pipeSinkType = PipeSinkType.IoTDB;
}
@Override
@@ -122,6 +128,21 @@ public class IoTDBPipeSink implements PipeSink {
return new TPipeSinkInfo(this.name,
this.pipeSinkType.name()).setAttributes(attributes);
}
+ @Override
+ public void serialize(OutputStream outputStream) throws IOException {
+ ReadWriteIOUtils.write((byte) pipeSinkType.ordinal(), outputStream);
+ ReadWriteIOUtils.write(name, outputStream);
+ ReadWriteIOUtils.write(ip, outputStream);
+ ReadWriteIOUtils.write(port, outputStream);
+ }
+
+ @Override
+ public void deserialize(InputStream inputStream) throws IOException {
+ name = ReadWriteIOUtils.readString(inputStream);
+ ip = ReadWriteIOUtils.readString(inputStream);
+ port = ReadWriteIOUtils.readInt(inputStream);
+ }
+
@Override
public String toString() {
return "IoTDBPipeSink{"
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipesink/PipeSink.java b/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipesink/PipeSink.java
index d68a35dc95..588618d96e 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipesink/PipeSink.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipesink/PipeSink.java
@@ -22,7 +22,11 @@ package org.apache.iotdb.commons.sync.pipesink;
import org.apache.iotdb.commons.exception.sync.PipeSinkException;
import org.apache.iotdb.confignode.rpc.thrift.TPipeSinkInfo;
import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
import java.util.List;
import java.util.Map;
@@ -41,6 +45,27 @@ public interface PipeSink {
TPipeSinkInfo getTPipeSinkInfo();
+ void serialize(OutputStream outputStream) throws IOException;
+
+ void deserialize(InputStream inputStream) throws IOException;
+
+ static PipeSink deserializePipeSink(InputStream inputStream) throws IOException {
+ PipeSinkType pipeSinkType = PipeSinkType.values()[ReadWriteIOUtils.readByte(inputStream)];
+ PipeSink pipeSink;
+ switch (pipeSinkType) {
+ case IoTDB:
+ pipeSink = new IoTDBPipeSink();
+ pipeSink.deserialize(inputStream);
+ break;
+ case ExternalPipe:
+ // TODO(ext-pipe): deserialize external pipesink here
+ default:
+ throw new UnsupportedOperationException(
+ String.format("Can not recognize PipeSinkType %s.", pipeSinkType.name()));
+ }
+ return pipeSink;
+ }
+
enum PipeSinkType {
IoTDB,
ExternalPipe
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/common/LocalSyncInfo.java b/server/src/main/java/org/apache/iotdb/db/sync/common/LocalSyncInfo.java
index 6f44f50032..a90a67ba9a 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/common/LocalSyncInfo.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/common/LocalSyncInfo.java
@@ -18,25 +18,26 @@
*/
package org.apache.iotdb.db.sync.common;
-import org.apache.iotdb.commons.exception.StartupException;
import org.apache.iotdb.commons.exception.sync.PipeException;
import org.apache.iotdb.commons.exception.sync.PipeSinkException;
import org.apache.iotdb.commons.sync.metadata.SyncMetadata;
+import org.apache.iotdb.commons.sync.persistence.SyncLogReader;
+import org.apache.iotdb.commons.sync.persistence.SyncLogWriter;
import org.apache.iotdb.commons.sync.pipe.PipeInfo;
import org.apache.iotdb.commons.sync.pipe.PipeMessage;
import org.apache.iotdb.commons.sync.pipe.SyncOperation;
import org.apache.iotdb.commons.sync.pipesink.PipeSink;
+import org.apache.iotdb.commons.sync.utils.SyncPathUtil;
import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeSinkStatement;
import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeStatement;
import org.apache.iotdb.db.qp.physical.sys.CreatePipePlan;
import org.apache.iotdb.db.qp.physical.sys.CreatePipeSinkPlan;
-import org.apache.iotdb.db.sync.common.persistence.SyncLogReader;
-import org.apache.iotdb.db.sync.common.persistence.SyncLogWriter;
import org.apache.iotdb.db.utils.sync.SyncPipeUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.File;
import java.io.IOException;
import java.util.List;
@@ -49,15 +50,15 @@ public class LocalSyncInfo {
private final SyncMetadata syncMetadata;
public LocalSyncInfo() {
- syncLogWriter = SyncLogWriter.getInstance();
+ syncLogWriter = new SyncLogWriter(new File(SyncPathUtil.getSysDir()));
syncMetadata = new SyncMetadata();
- SyncLogReader logReader = new SyncLogReader();
+ SyncLogReader logReader = new SyncLogReader(new File(SyncPathUtil.getSysDir()));
try {
logReader.recover();
syncMetadata.setPipes(logReader.getAllPipeInfos());
syncMetadata.setPipeSinks(logReader.getAllPipeSinks());
syncMetadata.setRunningPipe(logReader.getRunningPipeInfo());
- } catch (StartupException e) {
+ } catch (IOException e) {
LOGGER.error(
"Cannot recover ReceiverInfo because {}. Use default info values.",
e.getMessage());
}
@@ -75,7 +76,7 @@ public class LocalSyncInfo {
PipeSink pipeSink = SyncPipeUtil.parseCreatePipeSinkPlan(plan);
// should guarantee the adding pipesink is not exist.
syncMetadata.addPipeSink(pipeSink);
- syncLogWriter.addPipeSink(plan);
+ syncLogWriter.addPipeSink(pipeSink);
}
public void addPipeSink(CreatePipeSinkStatement createPipeSinkStatement)
@@ -84,7 +85,7 @@ public class LocalSyncInfo {
PipeSink pipeSink =
SyncPipeUtil.parseCreatePipeSinkStatement(createPipeSinkStatement);
// should guarantee the adding pipesink is not exist.
syncMetadata.addPipeSink(pipeSink);
- syncLogWriter.addPipeSink(createPipeSinkStatement);
+ syncLogWriter.addPipeSink(pipeSink);
}
public void dropPipeSink(String name) throws PipeSinkException, IOException {
@@ -112,7 +113,7 @@ public class LocalSyncInfo {
PipeSink pipeSink = getPipeSink(plan.getPipeSinkName());
PipeInfo pipeInfo = SyncPipeUtil.parseCreatePipePlanAsPipeInfo(plan,
pipeSink, createTime);
syncMetadata.addPipe(pipeInfo, pipeSink);
- syncLogWriter.addPipe(plan, createTime);
+ syncLogWriter.addPipe(pipeInfo);
}
public void addPipe(CreatePipeStatement createPipeStatement, long createTime)
@@ -125,7 +126,7 @@ public class LocalSyncInfo {
PipeInfo pipeInfo =
SyncPipeUtil.parseCreatePipePlanAsPipeInfo(createPipeStatement,
pipeSink, createTime);
syncMetadata.addPipe(pipeInfo, pipeSink);
- syncLogWriter.addPipe(createPipeStatement, createTime);
+ syncLogWriter.addPipe(pipeInfo);
}
public void operatePipe(String pipeName, SyncOperation syncOperation)
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/common/LocalSyncInfoFetcher.java b/server/src/main/java/org/apache/iotdb/db/sync/common/LocalSyncInfoFetcher.java
index 46f23e39ea..0a6700f5bc 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/common/LocalSyncInfoFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/common/LocalSyncInfoFetcher.java
@@ -185,4 +185,9 @@ public class LocalSyncInfoFetcher implements ISyncInfoFetcher {
public void reset() {
localSyncInfo = new LocalSyncInfo();
}
+
+ @TestOnly
+ public void close() throws IOException {
+ localSyncInfo.close();
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/common/persistence/SyncLogReader.java b/server/src/main/java/org/apache/iotdb/db/sync/common/persistence/SyncLogReader.java
deleted file mode 100644
index eb5de298a6..0000000000
--- a/server/src/main/java/org/apache/iotdb/db/sync/common/persistence/SyncLogReader.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.sync.common.persistence;
-
-import org.apache.iotdb.commons.exception.StartupException;
-import org.apache.iotdb.commons.sync.pipe.PipeInfo;
-import org.apache.iotdb.commons.sync.pipesink.PipeSink;
-import org.apache.iotdb.commons.sync.utils.SyncConstant;
-import org.apache.iotdb.commons.sync.utils.SyncPathUtil;
-import org.apache.iotdb.db.mpp.plan.constant.StatementType;
-import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeSinkStatement;
-import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeStatement;
-import org.apache.iotdb.db.utils.sync.SyncPipeUtil;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileReader;
-import java.io.IOException;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-public class SyncLogReader {
- private static final Logger logger =
LoggerFactory.getLogger(SyncLogReader.class);
- // <pipeSinkName, PipeSink>
- private Map<String, PipeSink> pipeSinks = new ConcurrentHashMap<>();
- private Map<String, Map<Long, PipeInfo>> pipes = new ConcurrentHashMap<>();
- private PipeInfo runningPipe;
-
- public void recover() throws StartupException {
- logger.info("Start to recover all sync state for sync.");
- File serviceLogFile = new File(SyncPathUtil.getSysDir(),
SyncConstant.SYNC_LOG_NAME);
- try (BufferedReader br = new BufferedReader(new
FileReader(serviceLogFile))) {
- recoverPipe(br);
- } catch (IOException e) {
- logger.warn("Sync service log file not found");
- }
- }
-
- public Map<String, PipeSink> getAllPipeSinks() {
- return pipeSinks;
- }
-
- public Map<String, Map<Long, PipeInfo>> getAllPipeInfos() {
- return pipes;
- }
-
- public PipeInfo getRunningPipeInfo() {
- return runningPipe;
- }
-
- private void recoverPipe(BufferedReader br) throws IOException {
- int lineNumber =
- 0; // line index shown in sender log starts from 1, so lineNumber
starts from 0.
- String readLine = "";
- String[] parseStrings;
-
- try {
- while ((readLine = br.readLine()) != null) {
- lineNumber += 1;
- parseStrings = readLine.split(SyncConstant.SENDER_LOG_SPLIT_CHARACTER);
-
- StatementType type = StatementType.valueOf(parseStrings[0]);
-
- switch (type) {
- case CREATE_PIPESINK:
- readLine = br.readLine();
- lineNumber += 1;
- CreatePipeSinkStatement createPipeSinkStatement =
- CreatePipeSinkStatement.parseString(readLine);
- pipeSinks.put(
- createPipeSinkStatement.getPipeSinkName(),
-
SyncPipeUtil.parseCreatePipeSinkStatement(createPipeSinkStatement));
- break;
- case DROP_PIPESINK:
- pipeSinks.remove(parseStrings[1]);
- break;
- case CREATE_PIPE:
- readLine = br.readLine();
- lineNumber += 1;
- CreatePipeStatement createPipeStatement =
CreatePipeStatement.parseString(readLine);
- runningPipe =
- SyncPipeUtil.parseCreatePipePlanAsPipeInfo(
- createPipeStatement,
- pipeSinks.get(createPipeStatement.getPipeSinkName()),
- Long.parseLong(parseStrings[1]));
- pipes
- .computeIfAbsent(runningPipe.getPipeName(), i -> new
ConcurrentHashMap<>())
- .computeIfAbsent(runningPipe.getCreateTime(), i ->
runningPipe);
- break;
- case STOP_PIPE:
- runningPipe.stop();
- break;
- case START_PIPE:
- runningPipe.start();
- break;
- case DROP_PIPE:
- runningPipe.drop();
- break;
- default:
- throw new UnsupportedOperationException(
- String.format("Can not recognize type %s.", type.name()));
- }
- }
- } catch (Exception e) {
- throw new IOException(
- String.format("Recover error in line %d : %s, because %s",
lineNumber, readLine, e));
- }
- }
-}
diff --git
a/server/src/main/java/org/apache/iotdb/db/sync/common/persistence/SyncLogWriter.java
b/server/src/main/java/org/apache/iotdb/db/sync/common/persistence/SyncLogWriter.java
deleted file mode 100644
index e6920e9e38..0000000000
---
a/server/src/main/java/org/apache/iotdb/db/sync/common/persistence/SyncLogWriter.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.sync.common.persistence;
-
-import org.apache.iotdb.commons.sync.pipe.SyncOperation;
-import org.apache.iotdb.commons.sync.utils.SyncConstant;
-import org.apache.iotdb.commons.sync.utils.SyncPathUtil;
-import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeSinkStatement;
-import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeStatement;
-import org.apache.iotdb.db.qp.logical.Operator;
-import org.apache.iotdb.db.qp.physical.sys.CreatePipePlan;
-import org.apache.iotdb.db.qp.physical.sys.CreatePipeSinkPlan;
-
-import java.io.BufferedWriter;
-import java.io.File;
-import java.io.FileWriter;
-import java.io.IOException;
-
-/**
- * SyncLogger is used to manage the persistent information in the sync module.
Persistent
- * information can be recovered on reboot via {@linkplain SyncLogReader}.
- */
-public class SyncLogWriter {
- // record pipe meta info
- private BufferedWriter pipeInfoWriter;
-
- private SyncLogWriter() {}
-
- public void getBufferedWriter() throws IOException {
- if (pipeInfoWriter == null) {
- File logFile = new File(SyncPathUtil.getSysDir(),
SyncConstant.SYNC_LOG_NAME);
- if (!logFile.getParentFile().exists()) {
- logFile.getParentFile().mkdirs();
- }
- pipeInfoWriter = new BufferedWriter(new FileWriter(logFile, true));
- }
- }
-
- // TODO(sync): delete this in new-standalone version
- public synchronized void addPipeSink(CreatePipeSinkPlan plan) throws
IOException {
- getBufferedWriter();
- pipeInfoWriter.write(Operator.OperatorType.CREATE_PIPESINK.name());
- pipeInfoWriter.newLine();
- pipeInfoWriter.write(plan.toString());
- pipeInfoWriter.newLine();
- pipeInfoWriter.flush();
- }
-
- public synchronized void addPipeSink(CreatePipeSinkStatement
createPipeSinkStatement)
- throws IOException {
- getBufferedWriter();
- pipeInfoWriter.write(createPipeSinkStatement.getType().name());
- pipeInfoWriter.newLine();
- pipeInfoWriter.write(createPipeSinkStatement.toString());
- pipeInfoWriter.newLine();
- pipeInfoWriter.flush();
- }
-
- public synchronized void dropPipeSink(String pipeSinkName) throws
IOException {
- getBufferedWriter();
- pipeInfoWriter.write(Operator.OperatorType.DROP_PIPESINK.name());
- pipeInfoWriter.write(SyncConstant.SENDER_LOG_SPLIT_CHARACTER);
- pipeInfoWriter.write(pipeSinkName);
- pipeInfoWriter.newLine();
- pipeInfoWriter.flush();
- }
-
- // TODO(sync): delete this in new-standalone version
- public synchronized void addPipe(CreatePipePlan plan, long pipeCreateTime)
throws IOException {
- getBufferedWriter();
- pipeInfoWriter.write(Operator.OperatorType.CREATE_PIPE.name());
- pipeInfoWriter.write(SyncConstant.SENDER_LOG_SPLIT_CHARACTER);
- pipeInfoWriter.write(String.valueOf(pipeCreateTime));
- pipeInfoWriter.newLine();
- pipeInfoWriter.write(plan.toString());
- pipeInfoWriter.newLine();
- pipeInfoWriter.flush();
- }
-
- public synchronized void addPipe(CreatePipeStatement createPipeStatement,
long pipeCreateTime)
- throws IOException {
- getBufferedWriter();
- pipeInfoWriter.write(createPipeStatement.getType().name());
- pipeInfoWriter.write(SyncConstant.SENDER_LOG_SPLIT_CHARACTER);
- pipeInfoWriter.write(String.valueOf(pipeCreateTime));
- pipeInfoWriter.newLine();
- pipeInfoWriter.write(createPipeStatement.toString());
- pipeInfoWriter.newLine();
- pipeInfoWriter.flush();
- }
-
- public synchronized void operatePipe(String pipeName, SyncOperation
syncOperation)
- throws IOException {
- getBufferedWriter();
- pipeInfoWriter.write(syncOperation.name());
- pipeInfoWriter.write(SyncConstant.SENDER_LOG_SPLIT_CHARACTER);
- pipeInfoWriter.write(pipeName);
- pipeInfoWriter.newLine();
- pipeInfoWriter.flush();
- }
-
- public void close() throws IOException {
- if (pipeInfoWriter != null) {
- pipeInfoWriter.close();
- pipeInfoWriter = null;
- }
- }
-
- private static class SyncLoggerHolder {
- private static final SyncLogWriter INSTANCE = new SyncLogWriter();
-
- private SyncLoggerHolder() {
- // empty constructor
- }
- }
-
- public static SyncLogWriter getInstance() {
- return SyncLogWriter.SyncLoggerHolder.INSTANCE;
- }
-}
diff --git
a/server/src/main/java/org/apache/iotdb/db/sync/sender/pipe/ExternalPipeSink.java
b/server/src/main/java/org/apache/iotdb/db/sync/sender/pipe/ExternalPipeSink.java
index b7c4a1c35b..b926ac1bca 100644
---
a/server/src/main/java/org/apache/iotdb/db/sync/sender/pipe/ExternalPipeSink.java
+++
b/server/src/main/java/org/apache/iotdb/db/sync/sender/pipe/ExternalPipeSink.java
@@ -24,10 +24,14 @@ import org.apache.iotdb.commons.sync.pipesink.PipeSink;
import org.apache.iotdb.confignode.rpc.thrift.TPipeSinkInfo;
import org.apache.iotdb.db.sync.externalpipe.ExtPipePluginRegister;
import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@@ -38,11 +42,13 @@ public class ExternalPipeSink implements PipeSink {
private final PipeSinkType pipeSinkType = PipeSinkType.ExternalPipe;
- private final String pipeSinkName;
- private final String extPipeSinkTypeName;
+ private String pipeSinkName;
+ private String extPipeSinkTypeName;
private Map<String, String> sinkParams;
+ public ExternalPipeSink() {}
+
public ExternalPipeSink(String pipeSinkName, String extPipeSinkTypeName) {
this.pipeSinkName = pipeSinkName;
this.extPipeSinkTypeName = extPipeSinkTypeName;
@@ -111,6 +117,21 @@ public class ExternalPipeSink implements PipeSink {
return new TPipeSinkInfo(this.pipeSinkName,
this.pipeSinkType.name()).setAttributes(sinkParams);
}
+ @Override
+ public void serialize(OutputStream outputStream) throws IOException {
+ ReadWriteIOUtils.write((byte) pipeSinkType.ordinal(), outputStream);
+ ReadWriteIOUtils.write(pipeSinkName, outputStream);
+ ReadWriteIOUtils.write(extPipeSinkTypeName, outputStream);
+ ReadWriteIOUtils.write(sinkParams, outputStream);
+ }
+
+ @Override
+ public void deserialize(InputStream inputStream) throws IOException {
+ pipeSinkName = ReadWriteIOUtils.readString(inputStream);
+ extPipeSinkTypeName = ReadWriteIOUtils.readString(inputStream);
+ sinkParams = ReadWriteIOUtils.readMap(inputStream);
+ }
+
public Map<String, String> getSinkParams() {
return sinkParams;
}
diff --git
a/server/src/test/java/org/apache/iotdb/db/sync/receiver/recovery/SyncLogTest.java
b/server/src/test/java/org/apache/iotdb/db/sync/receiver/recovery/SyncLogTest.java
index b907dd0752..23fc443426 100644
---
a/server/src/test/java/org/apache/iotdb/db/sync/receiver/recovery/SyncLogTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/sync/receiver/recovery/SyncLogTest.java
@@ -18,17 +18,18 @@
*/
package org.apache.iotdb.db.sync.receiver.recovery;
+import org.apache.iotdb.commons.sync.persistence.SyncLogReader;
+import org.apache.iotdb.commons.sync.persistence.SyncLogWriter;
import org.apache.iotdb.commons.sync.pipe.PipeInfo;
import org.apache.iotdb.commons.sync.pipe.PipeMessage;
import org.apache.iotdb.commons.sync.pipe.PipeStatus;
import org.apache.iotdb.commons.sync.pipe.SyncOperation;
+import org.apache.iotdb.commons.sync.pipe.TsFilePipeInfo;
+import org.apache.iotdb.commons.sync.pipesink.IoTDBPipeSink;
import org.apache.iotdb.commons.sync.pipesink.PipeSink;
+import org.apache.iotdb.commons.sync.utils.SyncPathUtil;
import org.apache.iotdb.db.exception.StorageEngineException;
-import org.apache.iotdb.db.qp.physical.sys.CreatePipePlan;
-import org.apache.iotdb.db.qp.physical.sys.CreatePipeSinkPlan;
import org.apache.iotdb.db.sync.SyncTestUtils;
-import org.apache.iotdb.db.sync.common.persistence.SyncLogReader;
-import org.apache.iotdb.db.sync.common.persistence.SyncLogWriter;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.junit.After;
@@ -36,7 +37,9 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import java.io.File;
import java.io.IOException;
+import java.util.HashMap;
import java.util.Map;
/** This test is for ReceiverLog and ReceiverLogAnalyzer */
@@ -62,19 +65,23 @@ public class SyncLogTest {
@Test
public void testServiceLog() {
try {
- SyncLogWriter log = SyncLogWriter.getInstance();
- CreatePipeSinkPlan createPipeSinkPlan = new CreatePipeSinkPlan("demo",
"iotdb");
- createPipeSinkPlan.addPipeSinkAttribute("ip", "127.0.0.1");
- createPipeSinkPlan.addPipeSinkAttribute("port", "6670");
- log.addPipeSink(createPipeSinkPlan);
- log.addPipe(new CreatePipePlan(pipe1, "demo"), createdTime1);
+ SyncLogWriter log = new SyncLogWriter(new
File(SyncPathUtil.getSysDir()));
+ PipeSink pipeSink = new IoTDBPipeSink("demo");
+ Map<String, String> attributes = new HashMap<>();
+ attributes.put("ip", "192.168.11.11");
+ attributes.put("port", "7766");
+ pipeSink.setAttribute(attributes);
+ log.addPipeSink(pipeSink);
+ PipeInfo pipeInfo1 = new TsFilePipeInfo(pipe1, "demo", createdTime1, 0,
true);
+ PipeInfo pipeInfo2 = new TsFilePipeInfo(pipe2, "demo", createdTime2, 99,
false);
+ log.addPipe(pipeInfo1);
log.operatePipe(pipe1, SyncOperation.DROP_PIPE);
- log.addPipe(new CreatePipePlan(pipe2, "demo"), createdTime2);
- log.operatePipe(pipe1, SyncOperation.STOP_PIPE);
- log.operatePipe(pipe1, SyncOperation.START_PIPE);
+ log.addPipe(pipeInfo2);
+ log.operatePipe(pipe2, SyncOperation.STOP_PIPE);
+ log.operatePipe(pipe2, SyncOperation.START_PIPE);
log.close();
- SyncLogReader syncLogReader = new SyncLogReader();
+ SyncLogReader syncLogReader = new SyncLogReader(new
File(SyncPathUtil.getSysDir()));
syncLogReader.recover();
@@ -92,25 +99,25 @@ public class SyncLogTest {
createdTime2,
PipeMessage.PipeMessageType.NORMAL);
Map<String, Map<Long, PipeInfo>> pipes = syncLogReader.getAllPipeInfos();
- PipeInfo pipeInfo1 = pipes.get(pipe1).get(createdTime1);
+ PipeInfo pipeInfoRecover1 = pipes.get(pipe1).get(createdTime1);
SyncTestUtils.checkPipeInfo(
- pipeInfo1,
+ pipeInfoRecover1,
pipe1,
"demo",
PipeStatus.DROP,
createdTime1,
PipeMessage.PipeMessageType.NORMAL);
- PipeInfo pipeInfo2 = pipes.get(pipe2).get(createdTime2);
+ PipeInfo pipeInfoRecover2 = pipes.get(pipe2).get(createdTime2);
SyncTestUtils.checkPipeInfo(
- pipeInfo2,
+ pipeInfoRecover2,
pipe2,
"demo",
PipeStatus.RUNNING,
createdTime2,
PipeMessage.PipeMessageType.NORMAL);
} catch (Exception e) {
- Assert.fail();
e.printStackTrace();
+ Assert.fail();
}
}
}
diff --git
a/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
b/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
index 0b179443c6..9298697653 100644
--- a/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
+++ b/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
@@ -50,7 +50,7 @@ import org.apache.iotdb.db.rescon.PrimitiveArrayManager;
import org.apache.iotdb.db.rescon.SystemInfo;
import org.apache.iotdb.db.rescon.TsFileResourceManager;
import org.apache.iotdb.db.service.IoTDB;
-import org.apache.iotdb.db.sync.common.persistence.SyncLogWriter;
+import org.apache.iotdb.db.sync.common.LocalSyncInfoFetcher;
import org.apache.iotdb.db.wal.WALManager;
import org.apache.iotdb.db.wal.recover.WALRecoverManager;
import org.apache.iotdb.metrics.config.MetricConfigDescriptor;
@@ -193,7 +193,7 @@ public class EnvironmentUtils {
LastQueryExecutor.clear();
// clear SyncLogger
- SyncLogWriter.getInstance().close();
+ LocalSyncInfoFetcher.getInstance().close();
// delete all directory
cleanAllDir();
diff --git
a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/ReadWriteIOUtils.java
b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/ReadWriteIOUtils.java
index 1ad3b5fbec..5959ba0d42 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/ReadWriteIOUtils.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/ReadWriteIOUtils.java
@@ -692,6 +692,22 @@ public class ReadWriteIOUtils {
return map;
}
+ public static Map<String, String> readMap(InputStream inputStream) throws
IOException {
+ int length = readInt(inputStream);
+ if (length == NO_BYTE_TO_READ) {
+ return null;
+ }
+ Map<String, String> map = new HashMap<>(length);
+ for (int i = 0; i < length; i++) {
+ // key
+ String key = readString(inputStream);
+ // value
+ String value = readString(inputStream);
+ map.put(key, value);
+ }
+ return map;
+ }
+
public static LinkedHashMap<String, String> readLinkedHashMap(ByteBuffer
buffer) {
int length = readInt(buffer);
if (length == NO_BYTE_TO_READ) {