This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch IOTDB-5723 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 5d8765d939929a9c0c4ad8a4f2ffe9f86805cf20 Author: Steve Yurong Su <[email protected]> AuthorDate: Sun May 28 22:41:52 2023 +0800 progress index impl for simple consensus --- .../commons/consensus/index/ProgressIndexType.java | 6 + .../consensus/index/impl/SimpleProgressIndex.java | 160 +++++++++++++++++++++ .../db/pipe/agent/runtime/PipeRuntimeAgent.java | 17 ++- .../SimpleConsensusProgressIndexAssigner.java | 108 ++++++++++++++ .../java/org/apache/iotdb/db/service/DataNode.java | 2 +- 5 files changed, 291 insertions(+), 2 deletions(-) diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/ProgressIndexType.java b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/ProgressIndexType.java index 37d83a52efb..02afa4045df 100644 --- a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/ProgressIndexType.java +++ b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/ProgressIndexType.java @@ -21,6 +21,7 @@ package org.apache.iotdb.commons.consensus.index; import org.apache.iotdb.commons.consensus.index.impl.IoTProgressIndex; import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex; +import org.apache.iotdb.commons.consensus.index.impl.SimpleProgressIndex; import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; import java.io.IOException; @@ -31,6 +32,7 @@ import java.nio.ByteBuffer; public enum ProgressIndexType { MINIMUM_CONSENSUS_INDEX((short) 1), IOT_CONSENSUS_INDEX((short) 2), + SIMPLE_CONSENSUS_INDEX((short) 3), ; private final short type; @@ -58,6 +60,8 @@ public enum ProgressIndexType { return MinimumProgressIndex.deserializeFrom(byteBuffer); case 2: return IoTProgressIndex.deserializeFrom(byteBuffer); + case 3: + return SimpleProgressIndex.deserializeFrom(byteBuffer); default: throw new UnsupportedOperationException( String.format("Unsupported progress index type %s.", indexType)); @@ -71,6 +75,8 @@ public enum ProgressIndexType { return MinimumProgressIndex.deserializeFrom(stream); case 2: return IoTProgressIndex.deserializeFrom(stream); + case 3: + return SimpleProgressIndex.deserializeFrom(stream); default: throw new UnsupportedOperationException( String.format("Unsupported progress index type %s.", indexType)); diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/SimpleProgressIndex.java b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/SimpleProgressIndex.java new file mode 100644 index 00000000000..6571d90759a --- /dev/null +++ b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/SimpleProgressIndex.java @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.commons.consensus.index.impl; + +import org.apache.iotdb.commons.consensus.index.ProgressIndex; +import org.apache.iotdb.commons.consensus.index.ProgressIndexType; +import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.ByteBuffer; + +public class SimpleProgressIndex implements ProgressIndex { + + private final int rebootTimes; + private final long memtableFlushOrderId; + + public SimpleProgressIndex(int rebootTimes, long memtableFlushOrderId) { + this.rebootTimes = rebootTimes; + this.memtableFlushOrderId = memtableFlushOrderId; + } + + @Override + public void serialize(ByteBuffer byteBuffer) { + ProgressIndexType.SIMPLE_CONSENSUS_INDEX.serialize(byteBuffer); + + ReadWriteIOUtils.write(rebootTimes, byteBuffer); + ReadWriteIOUtils.write(memtableFlushOrderId, byteBuffer); + } + + @Override + public void serialize(OutputStream stream) throws IOException { + ProgressIndexType.SIMPLE_CONSENSUS_INDEX.serialize(stream); + + ReadWriteIOUtils.write(rebootTimes, stream); + ReadWriteIOUtils.write(memtableFlushOrderId, stream); + } + + @Override + public boolean isAfter(ProgressIndex progressIndex) { + if (progressIndex instanceof MinimumProgressIndex) { + return true; + } + + if (!(progressIndex instanceof SimpleProgressIndex)) { + return false; + } + + final SimpleProgressIndex thisSimpleProgressIndex = this; + final SimpleProgressIndex thatSimpleProgressIndex = (SimpleProgressIndex) progressIndex; + if (thisSimpleProgressIndex.rebootTimes > thatSimpleProgressIndex.rebootTimes) { + return true; + } + if (thisSimpleProgressIndex.rebootTimes < thatSimpleProgressIndex.rebootTimes) { + return false; + } + // thisSimpleProgressIndex.rebootTimes == thatSimpleProgressIndex.rebootTimes + return thisSimpleProgressIndex.memtableFlushOrderId + > thatSimpleProgressIndex.memtableFlushOrderId; + } + + @Override + public boolean equals(ProgressIndex progressIndex) { + if (!(progressIndex instanceof SimpleProgressIndex)) { + return false; + } + + final SimpleProgressIndex thisSimpleProgressIndex = this; + final SimpleProgressIndex thatSimpleProgressIndex = (SimpleProgressIndex) progressIndex; + return thisSimpleProgressIndex.rebootTimes == thatSimpleProgressIndex.rebootTimes + && thisSimpleProgressIndex.memtableFlushOrderId + == thatSimpleProgressIndex.memtableFlushOrderId; + } + + @Override + public boolean equals(Object obj) { + if (obj == null) { + return false; + } + if (this == obj) { + return true; + } + if (!(obj instanceof SimpleProgressIndex)) { + return false; + } + return this.equals((SimpleProgressIndex) obj); + } + + @Override + public int hashCode() { + return 0; + } + + @Override + public ProgressIndex updateToMinimumIsAfterProgressIndex(ProgressIndex progressIndex) { + if (!(progressIndex instanceof SimpleProgressIndex)) { + return this; + } + + final SimpleProgressIndex thisSimpleProgressIndex = this; + final SimpleProgressIndex thatSimpleProgressIndex = (SimpleProgressIndex) progressIndex; + if (thisSimpleProgressIndex.rebootTimes > thatSimpleProgressIndex.rebootTimes) { + return this; + } + if (thisSimpleProgressIndex.rebootTimes < thatSimpleProgressIndex.rebootTimes) { + return progressIndex; + } + // thisSimpleProgressIndex.rebootTimes == thatSimpleProgressIndex.rebootTimes + if (thisSimpleProgressIndex.memtableFlushOrderId + > thatSimpleProgressIndex.memtableFlushOrderId) { + return this; + } + if (thisSimpleProgressIndex.memtableFlushOrderId + < thatSimpleProgressIndex.memtableFlushOrderId) { + return progressIndex; + } + // thisSimpleProgressIndex.memtableFlushOrderId == thatSimpleProgressIndex.memtableFlushOrderId + return this; + } + + public static SimpleProgressIndex deserializeFrom(ByteBuffer byteBuffer) { + final int rebootTimes = ReadWriteIOUtils.readInt(byteBuffer); + final long memtableFlushOrderId = ReadWriteIOUtils.readLong(byteBuffer); + return new SimpleProgressIndex(rebootTimes, memtableFlushOrderId); + } + + public static SimpleProgressIndex deserializeFrom(InputStream stream) throws IOException { + final int rebootTimes = ReadWriteIOUtils.readInt(stream); + final long memtableFlushOrderId = ReadWriteIOUtils.readLong(stream); + return new SimpleProgressIndex(rebootTimes, memtableFlushOrderId); + } + + @Override + public String toString() { + return "SimpleProgressIndex{" + + "rebootTimes=" + + rebootTimes + + ", memtableFlushOrderId=" + + memtableFlushOrderId + + '}'; + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java index 5253d5a2db2..486afe825e1 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.pipe.agent.runtime; +import org.apache.iotdb.commons.consensus.index.impl.SimpleProgressIndex; import org.apache.iotdb.commons.exception.StartupException; import org.apache.iotdb.commons.service.IService; import org.apache.iotdb.commons.service.ServiceType; @@ -38,9 +39,15 @@ public class PipeRuntimeAgent implements IService { private static final AtomicBoolean isShutdown = new AtomicBoolean(false); - public synchronized void launchPipePluginAgent( + private final SimpleConsensusProgressIndexAssigner simpleConsensusProgressIndexAssigner = + new SimpleConsensusProgressIndexAssigner(); + + //////////////////////////// System Service Interface //////////////////////////// + + public synchronized void preparePipeResources( ResourcesInformationHolder resourcesInformationHolder) throws StartupException { PipeLauncher.launchPipePluginAgent(resourcesInformationHolder); + simpleConsensusProgressIndexAssigner.start(); } @Override @@ -69,6 +76,14 @@ public class PipeRuntimeAgent implements IService { return ServiceType.PIPE_RUNTIME_AGENT; } + ////////////////////// SimpleConsensus ProgressIndex Assigner ////////////////////// + + public SimpleProgressIndex assignSimpleProgressIndex() { + return simpleConsensusProgressIndexAssigner.assign(); + } + + //////////////////////////// Runtime Exception Handlers //////////////////////////// + public void report(PipeSubtask subtask) { // TODO: terminate the task by the given taskID LOGGER.warn( diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/SimpleConsensusProgressIndexAssigner.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/SimpleConsensusProgressIndexAssigner.java new file mode 100644 index 00000000000..195637a63de --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/SimpleConsensusProgressIndexAssigner.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.agent.runtime; + +import org.apache.iotdb.commons.consensus.index.impl.SimpleProgressIndex; +import org.apache.iotdb.commons.exception.StartupException; +import org.apache.iotdb.commons.file.SystemFileFactory; +import org.apache.iotdb.db.conf.IoTDBConfig; +import org.apache.iotdb.db.conf.IoTDBDescriptor; + +import org.apache.commons.io.FileUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.util.concurrent.atomic.AtomicLong; + +import static org.apache.iotdb.consensus.ConsensusFactory.SIMPLE_CONSENSUS; + +public class SimpleConsensusProgressIndexAssigner { + + private static final Logger LOGGER = + LoggerFactory.getLogger(SimpleConsensusProgressIndexAssigner.class); + + private static final IoTDBConfig IOTDB_CONFIG = IoTDBDescriptor.getInstance().getConfig(); + + private static final String PIPE_SYSTEM_DIR = + IoTDBDescriptor.getInstance().getConfig().getSystemDir() + + File.separator + + "pipe" + + File.separator; + private static final String REBOOT_TIMES_FILE_NAME = "reboot_times.txt"; + + private boolean isEnable = false; + + private int rebootTimes = 0; + private final AtomicLong memtableFlushOrderId = new AtomicLong(0); + + public void start() throws StartupException { + // only works for simple consensus + if (!IOTDB_CONFIG.getDataRegionConsensusProtocolClass().equals(SIMPLE_CONSENSUS)) { + return; + } + + isEnable = true; + LOGGER.info("Start SimpleConsensusProgressIndexAssigner ..."); + + try { + makeDirIfNecessary(); + parseRebootTimes(); + recordRebootTimes(); + } catch (Exception e) { + throw new StartupException(e); + } + } + + private void makeDirIfNecessary() throws IOException { + File file = SystemFileFactory.INSTANCE.getFile(PIPE_SYSTEM_DIR); + if (file.exists() && file.isDirectory()) { + return; + } + FileUtils.forceMkdir(file); + } + + private void parseRebootTimes() { + File file = SystemFileFactory.INSTANCE.getFile(PIPE_SYSTEM_DIR + REBOOT_TIMES_FILE_NAME); + if (!file.exists()) { + rebootTimes = 0; + return; + } + try { + String content = FileUtils.readFileToString(file, "UTF-8"); + rebootTimes = Integer.parseInt(content); + } catch (IOException e) { + LOGGER.error("Cannot parse reboot times from file {}", file.getAbsolutePath(), e); + rebootTimes = 0; + } + } + + private void recordRebootTimes() throws IOException { + File file = SystemFileFactory.INSTANCE.getFile(PIPE_SYSTEM_DIR + REBOOT_TIMES_FILE_NAME); + FileUtils.writeStringToFile(file, String.valueOf(rebootTimes + 1), "UTF-8"); + } + + public SimpleProgressIndex assign() { + return isEnable + ? new SimpleProgressIndex(rebootTimes, memtableFlushOrderId.getAndIncrement()) + : null; + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java index db28843d7c7..e153f902fbd 100644 --- a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java +++ b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java @@ -836,7 +836,7 @@ public class DataNode implements DataNodeMBean { } private void preparePipeResources() throws StartupException { - PipeAgent.runtime().launchPipePluginAgent(resourcesInformationHolder); + PipeAgent.runtime().preparePipeResources(resourcesInformationHolder); } private void getPipeInformationList(List<ByteBuffer> allPipeInformation) {
