This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch IOTDB-5723
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 5d8765d939929a9c0c4ad8a4f2ffe9f86805cf20
Author: Steve Yurong Su <[email protected]>
AuthorDate: Sun May 28 22:41:52 2023 +0800

    progress index impl for simple consensus
---
 .../commons/consensus/index/ProgressIndexType.java |   6 +
 .../consensus/index/impl/SimpleProgressIndex.java  | 160 +++++++++++++++++++++
 .../db/pipe/agent/runtime/PipeRuntimeAgent.java    |  17 ++-
 .../SimpleConsensusProgressIndexAssigner.java      | 108 ++++++++++++++
 .../java/org/apache/iotdb/db/service/DataNode.java |   2 +-
 5 files changed, 291 insertions(+), 2 deletions(-)

diff --git 
a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/ProgressIndexType.java
 
b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/ProgressIndexType.java
index 37d83a52efb..02afa4045df 100644
--- 
a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/ProgressIndexType.java
+++ 
b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/ProgressIndexType.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.commons.consensus.index;
 
 import org.apache.iotdb.commons.consensus.index.impl.IoTProgressIndex;
 import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
+import org.apache.iotdb.commons.consensus.index.impl.SimpleProgressIndex;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
 import java.io.IOException;
@@ -31,6 +32,7 @@ import java.nio.ByteBuffer;
 public enum ProgressIndexType {
   MINIMUM_CONSENSUS_INDEX((short) 1),
   IOT_CONSENSUS_INDEX((short) 2),
+  SIMPLE_CONSENSUS_INDEX((short) 3),
   ;
 
   private final short type;
@@ -58,6 +60,8 @@ public enum ProgressIndexType {
         return MinimumProgressIndex.deserializeFrom(byteBuffer);
       case 2:
         return IoTProgressIndex.deserializeFrom(byteBuffer);
+      case 3:
+        return SimpleProgressIndex.deserializeFrom(byteBuffer);
       default:
         throw new UnsupportedOperationException(
             String.format("Unsupported progress index type %s.", indexType));
@@ -71,6 +75,8 @@ public enum ProgressIndexType {
         return MinimumProgressIndex.deserializeFrom(stream);
       case 2:
         return IoTProgressIndex.deserializeFrom(stream);
+      case 3:
+        return SimpleProgressIndex.deserializeFrom(stream);
       default:
         throw new UnsupportedOperationException(
             String.format("Unsupported progress index type %s.", indexType));
diff --git 
a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/SimpleProgressIndex.java
 
b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/SimpleProgressIndex.java
new file mode 100644
index 00000000000..6571d90759a
--- /dev/null
+++ 
b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/SimpleProgressIndex.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.consensus.index.impl;
+
+import org.apache.iotdb.commons.consensus.index.ProgressIndex;
+import org.apache.iotdb.commons.consensus.index.ProgressIndexType;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+public class SimpleProgressIndex implements ProgressIndex {
+
+  private final int rebootTimes;
+  private final long memtableFlushOrderId;
+
+  public SimpleProgressIndex(int rebootTimes, long memtableFlushOrderId) {
+    this.rebootTimes = rebootTimes;
+    this.memtableFlushOrderId = memtableFlushOrderId;
+  }
+
+  @Override
+  public void serialize(ByteBuffer byteBuffer) {
+    ProgressIndexType.SIMPLE_CONSENSUS_INDEX.serialize(byteBuffer);
+
+    ReadWriteIOUtils.write(rebootTimes, byteBuffer);
+    ReadWriteIOUtils.write(memtableFlushOrderId, byteBuffer);
+  }
+
+  @Override
+  public void serialize(OutputStream stream) throws IOException {
+    ProgressIndexType.SIMPLE_CONSENSUS_INDEX.serialize(stream);
+
+    ReadWriteIOUtils.write(rebootTimes, stream);
+    ReadWriteIOUtils.write(memtableFlushOrderId, stream);
+  }
+
+  @Override
+  public boolean isAfter(ProgressIndex progressIndex) {
+    if (progressIndex instanceof MinimumProgressIndex) {
+      return true;
+    }
+
+    if (!(progressIndex instanceof SimpleProgressIndex)) {
+      return false;
+    }
+
+    final SimpleProgressIndex thisSimpleProgressIndex = this;
+    final SimpleProgressIndex thatSimpleProgressIndex = (SimpleProgressIndex) 
progressIndex;
+    if (thisSimpleProgressIndex.rebootTimes > 
thatSimpleProgressIndex.rebootTimes) {
+      return true;
+    }
+    if (thisSimpleProgressIndex.rebootTimes < 
thatSimpleProgressIndex.rebootTimes) {
+      return false;
+    }
+    // thisSimpleProgressIndex.rebootTimes == 
thatSimpleProgressIndex.rebootTimes
+    return thisSimpleProgressIndex.memtableFlushOrderId
+        > thatSimpleProgressIndex.memtableFlushOrderId;
+  }
+
+  @Override
+  public boolean equals(ProgressIndex progressIndex) {
+    if (!(progressIndex instanceof SimpleProgressIndex)) {
+      return false;
+    }
+
+    final SimpleProgressIndex thisSimpleProgressIndex = this;
+    final SimpleProgressIndex thatSimpleProgressIndex = (SimpleProgressIndex) 
progressIndex;
+    return thisSimpleProgressIndex.rebootTimes == 
thatSimpleProgressIndex.rebootTimes
+        && thisSimpleProgressIndex.memtableFlushOrderId
+            == thatSimpleProgressIndex.memtableFlushOrderId;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (obj == null) {
+      return false;
+    }
+    if (this == obj) {
+      return true;
+    }
+    if (!(obj instanceof SimpleProgressIndex)) {
+      return false;
+    }
+    return this.equals((SimpleProgressIndex) obj);
+  }
+
+  @Override
+  public int hashCode() {
+    return 0;
+  }
+
+  @Override
+  public ProgressIndex updateToMinimumIsAfterProgressIndex(ProgressIndex 
progressIndex) {
+    if (!(progressIndex instanceof SimpleProgressIndex)) {
+      return this;
+    }
+
+    final SimpleProgressIndex thisSimpleProgressIndex = this;
+    final SimpleProgressIndex thatSimpleProgressIndex = (SimpleProgressIndex) 
progressIndex;
+    if (thisSimpleProgressIndex.rebootTimes > 
thatSimpleProgressIndex.rebootTimes) {
+      return this;
+    }
+    if (thisSimpleProgressIndex.rebootTimes < 
thatSimpleProgressIndex.rebootTimes) {
+      return progressIndex;
+    }
+    // thisSimpleProgressIndex.rebootTimes == 
thatSimpleProgressIndex.rebootTimes
+    if (thisSimpleProgressIndex.memtableFlushOrderId
+        > thatSimpleProgressIndex.memtableFlushOrderId) {
+      return this;
+    }
+    if (thisSimpleProgressIndex.memtableFlushOrderId
+        < thatSimpleProgressIndex.memtableFlushOrderId) {
+      return progressIndex;
+    }
+    // thisSimpleProgressIndex.memtableFlushOrderId == 
thatSimpleProgressIndex.memtableFlushOrderId
+    return this;
+  }
+
+  public static SimpleProgressIndex deserializeFrom(ByteBuffer byteBuffer) {
+    final int rebootTimes = ReadWriteIOUtils.readInt(byteBuffer);
+    final long memtableFlushOrderId = ReadWriteIOUtils.readLong(byteBuffer);
+    return new SimpleProgressIndex(rebootTimes, memtableFlushOrderId);
+  }
+
+  public static SimpleProgressIndex deserializeFrom(InputStream stream) throws 
IOException {
+    final int rebootTimes = ReadWriteIOUtils.readInt(stream);
+    final long memtableFlushOrderId = ReadWriteIOUtils.readLong(stream);
+    return new SimpleProgressIndex(rebootTimes, memtableFlushOrderId);
+  }
+
+  @Override
+  public String toString() {
+    return "SimpleProgressIndex{"
+        + "rebootTimes="
+        + rebootTimes
+        + ", memtableFlushOrderId="
+        + memtableFlushOrderId
+        + '}';
+  }
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
index 5253d5a2db2..486afe825e1 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.pipe.agent.runtime;
 
+import org.apache.iotdb.commons.consensus.index.impl.SimpleProgressIndex;
 import org.apache.iotdb.commons.exception.StartupException;
 import org.apache.iotdb.commons.service.IService;
 import org.apache.iotdb.commons.service.ServiceType;
@@ -38,9 +39,15 @@ public class PipeRuntimeAgent implements IService {
 
   private static final AtomicBoolean isShutdown = new AtomicBoolean(false);
 
-  public synchronized void launchPipePluginAgent(
+  private final SimpleConsensusProgressIndexAssigner 
simpleConsensusProgressIndexAssigner =
+      new SimpleConsensusProgressIndexAssigner();
+
+  //////////////////////////// System Service Interface 
////////////////////////////
+
+  public synchronized void preparePipeResources(
       ResourcesInformationHolder resourcesInformationHolder) throws 
StartupException {
     PipeLauncher.launchPipePluginAgent(resourcesInformationHolder);
+    simpleConsensusProgressIndexAssigner.start();
   }
 
   @Override
@@ -69,6 +76,14 @@ public class PipeRuntimeAgent implements IService {
     return ServiceType.PIPE_RUNTIME_AGENT;
   }
 
+  ////////////////////// SimpleConsensus ProgressIndex Assigner 
//////////////////////
+
+  public SimpleProgressIndex assignSimpleProgressIndex() {
+    return simpleConsensusProgressIndexAssigner.assign();
+  }
+
+  //////////////////////////// Runtime Exception Handlers 
////////////////////////////
+
   public void report(PipeSubtask subtask) {
     // TODO: terminate the task by the given taskID
     LOGGER.warn(
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/SimpleConsensusProgressIndexAssigner.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/SimpleConsensusProgressIndexAssigner.java
new file mode 100644
index 00000000000..195637a63de
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/SimpleConsensusProgressIndexAssigner.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.agent.runtime;
+
+import org.apache.iotdb.commons.consensus.index.impl.SimpleProgressIndex;
+import org.apache.iotdb.commons.exception.StartupException;
+import org.apache.iotdb.commons.file.SystemFileFactory;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+
+import org.apache.commons.io.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.iotdb.consensus.ConsensusFactory.SIMPLE_CONSENSUS;
+
+public class SimpleConsensusProgressIndexAssigner {
+
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(SimpleConsensusProgressIndexAssigner.class);
+
+  private static final IoTDBConfig IOTDB_CONFIG = 
IoTDBDescriptor.getInstance().getConfig();
+
+  private static final String PIPE_SYSTEM_DIR =
+      IoTDBDescriptor.getInstance().getConfig().getSystemDir()
+          + File.separator
+          + "pipe"
+          + File.separator;
+  private static final String REBOOT_TIMES_FILE_NAME = "reboot_times.txt";
+
+  private boolean isEnable = false;
+
+  private int rebootTimes = 0;
+  private final AtomicLong memtableFlushOrderId = new AtomicLong(0);
+
+  public void start() throws StartupException {
+    // only works for simple consensus
+    if 
(!IOTDB_CONFIG.getDataRegionConsensusProtocolClass().equals(SIMPLE_CONSENSUS)) {
+      return;
+    }
+
+    isEnable = true;
+    LOGGER.info("Start SimpleConsensusProgressIndexAssigner ...");
+
+    try {
+      makeDirIfNecessary();
+      parseRebootTimes();
+      recordRebootTimes();
+    } catch (Exception e) {
+      throw new StartupException(e);
+    }
+  }
+
+  private void makeDirIfNecessary() throws IOException {
+    File file = SystemFileFactory.INSTANCE.getFile(PIPE_SYSTEM_DIR);
+    if (file.exists() && file.isDirectory()) {
+      return;
+    }
+    FileUtils.forceMkdir(file);
+  }
+
+  private void parseRebootTimes() {
+    File file = SystemFileFactory.INSTANCE.getFile(PIPE_SYSTEM_DIR + 
REBOOT_TIMES_FILE_NAME);
+    if (!file.exists()) {
+      rebootTimes = 0;
+      return;
+    }
+    try {
+      String content = FileUtils.readFileToString(file, "UTF-8");
+      rebootTimes = Integer.parseInt(content);
+    } catch (IOException e) {
+      LOGGER.error("Cannot parse reboot times from file {}", 
file.getAbsolutePath(), e);
+      rebootTimes = 0;
+    }
+  }
+
+  private void recordRebootTimes() throws IOException {
+    File file = SystemFileFactory.INSTANCE.getFile(PIPE_SYSTEM_DIR + 
REBOOT_TIMES_FILE_NAME);
+    FileUtils.writeStringToFile(file, String.valueOf(rebootTimes + 1), 
"UTF-8");
+  }
+
+  public SimpleProgressIndex assign() {
+    return isEnable
+        ? new SimpleProgressIndex(rebootTimes, 
memtableFlushOrderId.getAndIncrement())
+        : null;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java 
b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
index db28843d7c7..e153f902fbd 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
@@ -836,7 +836,7 @@ public class DataNode implements DataNodeMBean {
   }
 
   private void preparePipeResources() throws StartupException {
-    PipeAgent.runtime().launchPipePluginAgent(resourcesInformationHolder);
+    PipeAgent.runtime().preparePipeResources(resourcesInformationHolder);
   }
 
   private void getPipeInformationList(List<ByteBuffer> allPipeInformation) {

Reply via email to