This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new eb96aa72b9 [IOTDB-4582]Implement snapshot of TriggerInfo (#7542)
eb96aa72b9 is described below

commit eb96aa72b9ec299ec31eef387e1c5187884f603c
Author: Weihao Li <[email protected]>
AuthorDate: Sat Oct 8 21:32:08 2022 +0800

    [IOTDB-4582]Implement snapshot of TriggerInfo (#7542)
---
 .gitignore                                         |   2 +
 .../iotdb/confignode/persistence/TriggerInfo.java  | 108 +++++++++++++++++--
 .../confignode/persistence/TriggerInfoTest.java    | 120 +++++++++++++++++++++
 .../confignode/IoTDBConfigNodeSnapshotIT.java      |  93 ++++++++++++++++
 .../src/test/resources/trigger-example.jar         | Bin 0 -> 9221 bytes
 .../iotdb/commons/trigger/TriggerInformation.java  |   8 +-
 .../apache/iotdb/commons/trigger/TriggerTable.java |  31 ++++--
 7 files changed, 343 insertions(+), 19 deletions(-)

diff --git a/.gitignore b/.gitignore
index 6b90b2f1f5..878cc06c88 100644
--- a/.gitignore
+++ b/.gitignore
@@ -37,6 +37,8 @@ tsfile-jdbc/src/main/resources/output/queryRes.csv
 *.txt
 
 *.jar
+!trigger-example.jar
+
 *.gz
 *.tar.gz
 *.tar
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/TriggerInfo.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/TriggerInfo.java
index 77c7464be9..2f84afec3d 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/TriggerInfo.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/TriggerInfo.java
@@ -32,16 +32,22 @@ import 
org.apache.iotdb.confignode.consensus.request.write.trigger.DeleteTrigger
 import 
org.apache.iotdb.confignode.consensus.request.write.trigger.UpdateTriggerStateInTablePlan;
 import org.apache.iotdb.confignode.consensus.response.TriggerTableResp;
 import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.UUID;
 import java.util.concurrent.locks.ReentrantLock;
 
 public class TriggerInfo implements SnapshotProcessor {
@@ -79,17 +85,6 @@ public class TriggerInfo implements SnapshotProcessor {
     triggerTableLock.unlock();
   }
 
-  @Override
-  public boolean processTakeSnapshot(File snapshotDir) throws TException, 
IOException {
-    // TODO implement when 'Drop Trigger' done
-    return true;
-  }
-
-  @Override
-  public void processLoadSnapshot(File snapshotDir) throws TException, 
IOException {
-    // TODO implement when 'Drop Trigger' done
-  }
-
   /**
    * Validate whether the trigger can be created
    *
@@ -172,4 +167,95 @@ public class TriggerInfo implements SnapshotProcessor {
         new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()),
         triggerTable.getAllTriggerInformation());
   }
+
+  /**
+   * Exposes the map backing the trigger table (the map itself, not a copy). Only used in tests.
+   *
+   * @return the map obtained from {@code triggerTable.getTable()}
+   */
+  public Map<String, TriggerInformation> getRawTriggerTable() {
+    final Map<String, TriggerInformation> rawTable = triggerTable.getTable();
+    return rawTable;
+  }
+
+  /**
+   * Exposes the {@code existedJarToMD5} map itself rather than a copy. Only used in tests.
+   *
+   * @return the internal map (presumably jar name to MD5 digest, judging by its name)
+   */
+  public Map<String, String> getRawExistedJarToMD5() {
+    final Map<String, String> rawJarToMD5 = existedJarToMD5;
+    return rawJarToMD5;
+  }
+
+  /**
+   * Persists {@code existedJarToMD5} and the trigger table into the file named {@code
+   * snapshotFileName} under the given directory.
+   *
+   * <p>The data is first written to a uniquely named temporary file, forced to the storage device,
+   * and only then renamed to the final snapshot name, so an incompletely written snapshot never
+   * appears under the final name. The trigger table lock is held while the state is serialized and
+   * renamed.
+   *
+   * @param snapshotDir directory in which the snapshot file is created
+   * @return {@code true} if the temporary file was renamed to the snapshot file; {@code false} if
+   *     the snapshot file already existed or the rename failed
+   * @throws IOException if writing or syncing the temporary file fails
+   */
+  @Override
+  public boolean processTakeSnapshot(File snapshotDir) throws TException, IOException {
+    File snapshotFile = new File(snapshotDir, snapshotFileName);
+    if (snapshotFile.exists() && snapshotFile.isFile()) {
+      LOGGER.error(
+          "Failed to take snapshot, because snapshot file [{}] is already exist.",
+          snapshotFile.getAbsolutePath());
+      return false;
+    }
+    File tmpFile = new File(snapshotFile.getAbsolutePath() + "-" + UUID.randomUUID());
+
+    acquireTriggerTableLock();
+    try {
+      try (FileOutputStream fileOutputStream = new FileOutputStream(tmpFile)) {
+        serializeExistedJarToMD5(fileOutputStream);
+        triggerTable.serializeTriggerTable(fileOutputStream);
+        fileOutputStream.flush();
+        // flush() does not force the bytes to the storage device; sync the descriptor so the
+        // content is durable before the file becomes visible under its final name.
+        fileOutputStream.getFD().sync();
+      }
+      // The stream has been closed by try-with-resources at this point, so the rename does not
+      // operate on a file that is still open.
+      return tmpFile.renameTo(snapshotFile);
+    } finally {
+      releaseTriggerTableLock();
+      // Best-effort cleanup: after a successful rename the temporary file no longer exists.
+      for (int retry = 0; retry < 5; retry++) {
+        if (!tmpFile.exists() || tmpFile.delete()) {
+          break;
+        } else {
+          LOGGER.warn(
+              "Can't delete temporary snapshot file: {}, retrying...", tmpFile.getAbsolutePath());
+        }
+      }
+    }
+  }
+
+  /**
+   * Replaces the in-memory state with the content of the file named {@code snapshotFileName} in
+   * the given directory. If that file is missing or is not a regular file, an error is logged and
+   * the current state is left untouched.
+   *
+   * @param snapshotDir directory expected to contain the snapshot file
+   * @throws IOException if the snapshot file cannot be opened or read
+   */
+  @Override
+  public void processLoadSnapshot(File snapshotDir) throws TException, IOException {
+    final File source = new File(snapshotDir, snapshotFileName);
+    final boolean isRegularSnapshot = source.exists() && source.isFile();
+    if (!isRegularSnapshot) {
+      LOGGER.error(
+          "Failed to load snapshot,snapshot file [{}] is not exist.", source.getAbsolutePath());
+      return;
+    }
+
+    acquireTriggerTableLock();
+    try (InputStream in = new FileInputStream(source)) {
+      // Drop whatever is currently held, then read back in the same order the snapshot was
+      // written: the jar/MD5 map first, the trigger table second.
+      clear();
+      deserializeExistedJarToMD5(in);
+      triggerTable.deserializeTriggerTable(in);
+    } finally {
+      releaseTriggerTableLock();
+    }
+  }
+
+  /**
+   * Writes {@code existedJarToMD5} to the stream: the entry count first, followed by each key and
+   * its value.
+   *
+   * @param outputStream destination of the serialized map
+   * @throws IOException if writing to the stream fails
+   */
+  public void serializeExistedJarToMD5(OutputStream outputStream) throws IOException {
+    final int entryCount = existedJarToMD5.size();
+    ReadWriteIOUtils.write(entryCount, outputStream);
+    for (Map.Entry<String, String> jarAndMD5 : existedJarToMD5.entrySet()) {
+      final String key = jarAndMD5.getKey();
+      final String value = jarAndMD5.getValue();
+      ReadWriteIOUtils.write(key, outputStream);
+      ReadWriteIOUtils.write(value, outputStream);
+    }
+  }
+
+  /**
+   * Reads an entry count followed by that many key/value string pairs from the stream and puts
+   * them into {@code existedJarToMD5}. Existing entries are not removed by this method.
+   *
+   * @param inputStream source positioned at the serialized map
+   * @throws IOException if reading from the stream fails
+   */
+  public void deserializeExistedJarToMD5(InputStream inputStream) throws IOException {
+    final int entryCount = ReadWriteIOUtils.readInt(inputStream);
+    for (int i = 0; i < entryCount; i++) {
+      // The key is read before the value, mirroring the order in which they were written.
+      final String key = ReadWriteIOUtils.readString(inputStream);
+      final String value = ReadWriteIOUtils.readString(inputStream);
+      existedJarToMD5.put(key, value);
+    }
+  }
+
+  /** Empties both the trigger table and the {@code existedJarToMD5} map. */
+  public void clear() {
+    triggerTable.clear();
+    existedJarToMD5.clear();
+  }
 }
diff --git 
a/confignode/src/test/java/org/apache/iotdb/confignode/persistence/TriggerInfoTest.java
 
b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/TriggerInfoTest.java
new file mode 100644
index 0000000000..c59ef76ca7
--- /dev/null
+++ 
b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/TriggerInfoTest.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.persistence;
+
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.trigger.TriggerInformation;
+import 
org.apache.iotdb.confignode.consensus.request.write.trigger.AddTriggerInTablePlan;
+import org.apache.iotdb.confignode.rpc.thrift.TTriggerState;
+import org.apache.iotdb.trigger.api.enums.TriggerEvent;
+import org.apache.iotdb.tsfile.utils.Binary;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.thrift.TException;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.iotdb.db.constant.TestConstant.BASE_OUTPUT_PATH;
+
+public class TriggerInfoTest {
+
+  private static TriggerInfo triggerInfo;
+  private static TriggerInfo triggerInfoSaveBefore;
+  private static final File snapshotDir = new File(BASE_OUTPUT_PATH, 
"snapshot");
+
+  @BeforeClass
+  public static void setup() throws IOException {
+    triggerInfo = new TriggerInfo();
+    triggerInfoSaveBefore = new TriggerInfo();
+    if (!snapshotDir.exists()) {
+      snapshotDir.mkdirs();
+    }
+  }
+
+  @AfterClass
+  public static void cleanup() throws IOException {
+    triggerInfo.clear();
+    if (snapshotDir.exists()) {
+      FileUtils.deleteDirectory(snapshotDir);
+    }
+  }
+
+  @Test
+  public void testSnapshot() throws TException, IOException, 
IllegalPathException {
+    TriggerInformation triggerInformation =
+        new TriggerInformation(
+            new PartialPath("root.test.**"),
+            "test1",
+            "test1.class",
+            "test1.jar",
+            null,
+            TriggerEvent.AFTER_INSERT,
+            TTriggerState.INACTIVE,
+            false,
+            null,
+            "testMD5test");
+    AddTriggerInTablePlan addTriggerInTablePlan =
+        new AddTriggerInTablePlan(triggerInformation, new Binary(new byte[] 
{1, 2, 3}));
+    triggerInfo.addTriggerInTable(addTriggerInTablePlan);
+    triggerInfoSaveBefore.addTriggerInTable(addTriggerInTablePlan);
+
+    Map<String, String> attributes = new HashMap<>();
+    attributes.put("test-key", "test-value");
+    triggerInformation =
+        new TriggerInformation(
+            new PartialPath("root.test.**"),
+            "test2",
+            "test2.class",
+            "test2.jar",
+            attributes,
+            TriggerEvent.BEFORE_INSERT,
+            TTriggerState.INACTIVE,
+            false,
+            new TDataNodeLocation(
+                10000,
+                new TEndPoint("127.0.0.1", 6600),
+                new TEndPoint("127.0.0.1", 7700),
+                new TEndPoint("127.0.0.1", 8800),
+                new TEndPoint("127.0.0.1", 9900),
+                new TEndPoint("127.0.0.1", 11000)),
+            "testMD5test");
+    addTriggerInTablePlan = new AddTriggerInTablePlan(triggerInformation, 
null);
+    triggerInfo.addTriggerInTable(addTriggerInTablePlan);
+    triggerInfoSaveBefore.addTriggerInTable(addTriggerInTablePlan);
+
+    triggerInfo.processTakeSnapshot(snapshotDir);
+    triggerInfo.clear();
+    triggerInfo.processLoadSnapshot(snapshotDir);
+
+    Assert.assertEquals(
+        triggerInfoSaveBefore.getRawTriggerTable(), 
triggerInfo.getRawTriggerTable());
+    Assert.assertEquals(
+        triggerInfoSaveBefore.getRawExistedJarToMD5(), 
triggerInfo.getRawExistedJarToMD5());
+  }
+}
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/confignode/IoTDBConfigNodeSnapshotIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/confignode/IoTDBConfigNodeSnapshotIT.java
index e9fba69f87..07e999db7d 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/confignode/IoTDBConfigNodeSnapshotIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/confignode/IoTDBConfigNodeSnapshotIT.java
@@ -24,9 +24,14 @@ import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
 import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.path.PathDeserializeUtil;
 import org.apache.iotdb.commons.path.PathPatternTree;
+import org.apache.iotdb.commons.trigger.TriggerInformation;
+import org.apache.iotdb.commons.trigger.service.TriggerExecutableManager;
+import org.apache.iotdb.confignode.rpc.thrift.TCreateTriggerReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionTableResp;
+import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerTableResp;
 import org.apache.iotdb.confignode.rpc.thrift.TSchemaPartitionReq;
 import org.apache.iotdb.confignode.rpc.thrift.TSchemaPartitionTableResp;
 import org.apache.iotdb.confignode.rpc.thrift.TSetStorageGroupReq;
@@ -36,8 +41,12 @@ import org.apache.iotdb.it.env.EnvFactory;
 import org.apache.iotdb.it.framework.IoTDBTestRunner;
 import org.apache.iotdb.itbase.category.ClusterIT;
 import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.trigger.api.enums.FailureStrategy;
+import org.apache.iotdb.trigger.api.enums.TriggerEvent;
+import org.apache.iotdb.trigger.api.enums.TriggerType;
 import org.apache.iotdb.tsfile.utils.PublicBAOS;
 
+import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.thrift.TException;
 import org.junit.After;
 import org.junit.Assert;
@@ -46,8 +55,10 @@ import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
 
+import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -117,6 +128,8 @@ public class IoTDBConfigNodeSnapshotIT {
     try (SyncConfigNodeIServiceClient client =
         (SyncConfigNodeIServiceClient) 
EnvFactory.getEnv().getConfigNodeConnection()) {
 
+      List<TCreateTriggerReq> createTriggerReqs = createTrigger(client);
+
       for (int i = 0; i < storageGroupNum; i++) {
         String storageGroup = sg + i;
         TSetStorageGroupReq setStorageGroupReq =
@@ -181,6 +194,86 @@ public class IoTDBConfigNodeSnapshotIT {
           }
         }
       }
+
+      assertTriggerInformation(createTriggerReqs, client.getTriggerTable());
+    }
+  }
+
+  /**
+   * Creates two triggers ("test1" and "test2") through the given client, both backed by {@code
+   * trigger-example.jar} from {@code target/test-classes} under the working directory, and asserts
+   * that each creation succeeds.
+   *
+   * @param client ConfigNode client used to send the create-trigger requests
+   * @return the two requests that were sent, "test2" first and "test1" second
+   */
+  private List<TCreateTriggerReq> createTrigger(SyncConfigNodeIServiceClient client)
+      throws IllegalPathException, TException, IOException {
+    final String triggerPath =
+        System.getProperty("user.dir")
+            + File.separator
+            + "target"
+            + File.separator
+            + "test-classes"
+            + File.separator
+            + "trigger-example.jar";
+    ByteBuffer jarFile = TriggerExecutableManager.transferToBytebuffer(triggerPath);
+    String jarMD5 = DigestUtils.md5Hex(jarFile.array());
+
+    TCreateTriggerReq createTriggerReq1 =
+        new TCreateTriggerReq(
+                "test1",
+                "org.apache.iotdb.trigger.SimpleTrigger",
+                "trigger-example.jar",
+                false,
+                TriggerEvent.AFTER_INSERT.getId(),
+                TriggerType.STATELESS.getId(),
+                new PartialPath("root.test1.**").serialize(),
+                Collections.emptyMap(),
+                FailureStrategy.OPTIMISTIC.getId())
+            .setJarMD5(jarMD5)
+            .setJarFile(jarFile);
+
+    Map<String, String> attributes = new HashMap<>();
+    attributes.put("test-key", "test-value");
+    TCreateTriggerReq createTriggerReq2 =
+        new TCreateTriggerReq(
+                "test2",
+                "org.apache.iotdb.trigger.SimpleTrigger",
+                "trigger-example.jar",
+                false,
+                TriggerEvent.BEFORE_INSERT.getId(),
+                TriggerType.STATEFUL.getId(),
+                new PartialPath("root.test2.**").serialize(),
+                attributes,
+                FailureStrategy.OPTIMISTIC.getId())
+            .setJarMD5(jarMD5)
+            .setJarFile(jarFile);
+
+    // JUnit's assertEquals takes (expected, actual); keeping that order makes a failure message
+    // report the success code as the expectation and the server's code as the actual value.
+    Assert.assertEquals(
+        TSStatusCode.SUCCESS_STATUS.getStatusCode(),
+        client.createTrigger(createTriggerReq1).getCode());
+    Assert.assertEquals(
+        TSStatusCode.SUCCESS_STATUS.getStatusCode(),
+        client.createTrigger(createTriggerReq2).getCode());
+
+    List<TCreateTriggerReq> result = new ArrayList<>();
+    result.add(createTriggerReq2);
+    result.add(createTriggerReq1);
+    return result;
+  }
+
+  /**
+   * Verifies that every created trigger is present in the trigger table response with matching
+   * name, class, jar, event, type and path pattern.
+   *
+   * <p>Response entries are matched to requests by trigger name, so the check does not depend on
+   * the order in which the trigger table happens to be returned.
+   *
+   * @param req the create-trigger requests that were sent
+   * @param resp the trigger table response to verify
+   */
+  private void assertTriggerInformation(List<TCreateTriggerReq> req, TGetTriggerTableResp resp) {
+    Map<String, TriggerInformation> actualByName = new HashMap<>();
+    for (int i = 0; i < resp.getAllTriggerInformation().size(); i++) {
+      TriggerInformation information =
+          TriggerInformation.deserialize(resp.getAllTriggerInformation().get(i));
+      actualByName.put(information.getTriggerName(), information);
+    }
+
+    for (TCreateTriggerReq createTriggerReq : req) {
+      TriggerInformation triggerInformation = actualByName.get(createTriggerReq.getTriggerName());
+      // A missing entry means the trigger did not make it into the returned table.
+      Assert.assertNotNull(triggerInformation);
+
+      Assert.assertEquals(createTriggerReq.getTriggerName(), triggerInformation.getTriggerName());
+      Assert.assertEquals(createTriggerReq.getClassName(), triggerInformation.getClassName());
+      Assert.assertEquals(createTriggerReq.getJarPath(), triggerInformation.getJarName());
+      Assert.assertEquals(
+          createTriggerReq.getTriggerEvent(), triggerInformation.getEvent().getId());
+      Assert.assertEquals(
+          createTriggerReq.getTriggerType(),
+          triggerInformation.isStateful()
+              ? TriggerType.STATEFUL.getId()
+              : TriggerType.STATELESS.getId());
+      Assert.assertEquals(
+          PathDeserializeUtil.deserialize(ByteBuffer.wrap(createTriggerReq.getPathPattern())),
+          triggerInformation.getPathPattern());
     }
   }
 }
diff --git a/integration-test/src/test/resources/trigger-example.jar 
b/integration-test/src/test/resources/trigger-example.jar
new file mode 100644
index 0000000000..619df8f54f
Binary files /dev/null and 
b/integration-test/src/test/resources/trigger-example.jar differ
diff --git 
a/node-commons/src/main/java/org/apache/iotdb/commons/trigger/TriggerInformation.java
 
b/node-commons/src/main/java/org/apache/iotdb/commons/trigger/TriggerInformation.java
index 119321d143..4fc49801b3 100644
--- 
a/node-commons/src/main/java/org/apache/iotdb/commons/trigger/TriggerInformation.java
+++ 
b/node-commons/src/main/java/org/apache/iotdb/commons/trigger/TriggerInformation.java
@@ -29,6 +29,7 @@ import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.nio.ByteBuffer;
 import java.util.Map;
 import java.util.Objects;
@@ -121,6 +122,11 @@ public class TriggerInformation {
     return triggerInformation;
   }
 
+  /**
+   * Reads one serialized {@code TriggerInformation} from the stream by fetching its bytes via
+   * {@code ReadWriteIOUtils.readBytesWithSelfDescriptionLength} and delegating to the {@code
+   * ByteBuffer} overload of {@code deserialize}.
+   *
+   * @param inputStream source positioned at a serialized entry
+   * @return the deserialized trigger information
+   * @throws IOException if reading from the stream fails
+   */
+  public static TriggerInformation deserialize(InputStream inputStream) throws IOException {
+    final byte[] payload = ReadWriteIOUtils.readBytesWithSelfDescriptionLength(inputStream);
+    final ByteBuffer wrapped = ByteBuffer.wrap(payload);
+    return deserialize(wrapped);
+  }
+
   @Override
   public boolean equals(Object o) {
     if (this == o) return true;
@@ -134,7 +140,7 @@ public class TriggerInformation {
         && Objects.equals(attributes, that.attributes)
         && event == that.event
         && triggerState == that.triggerState
-        && (isStateful() ? Objects.equals(dataNodeLocation, 
that.dataNodeLocation) : true)
+        && (!isStateful() || Objects.equals(dataNodeLocation, 
that.dataNodeLocation))
         && Objects.equals(jarFileMD5, that.jarFileMD5);
   }
 
diff --git 
a/node-commons/src/main/java/org/apache/iotdb/commons/trigger/TriggerTable.java 
b/node-commons/src/main/java/org/apache/iotdb/commons/trigger/TriggerTable.java
index 75f97417bb..eed43d15fd 100644
--- 
a/node-commons/src/main/java/org/apache/iotdb/commons/trigger/TriggerTable.java
+++ 
b/node-commons/src/main/java/org/apache/iotdb/commons/trigger/TriggerTable.java
@@ -19,11 +19,14 @@
 package org.apache.iotdb.commons.trigger;
 
 import org.apache.iotdb.confignode.rpc.thrift.TTriggerState;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
 import javax.annotation.concurrent.NotThreadSafe;
 
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -70,12 +73,6 @@ public class TriggerTable {
     triggerTable.get(triggerName).setTriggerState(triggerState);
   }
 
-  // for showTrigger
-  public Map<String, TTriggerState> getAllTriggerStates() {
-    Map<String, TTriggerState> allTriggerStates = new 
HashMap<>(triggerTable.size());
-    triggerTable.forEach((k, v) -> allTriggerStates.put(k, 
v.getTriggerState()));
-    return allTriggerStates;
-  }
   // for getTriggerTable
   public List<TriggerInformation> getAllTriggerInformation() {
     return new ArrayList<>(triggerTable.values());
@@ -88,4 +85,24 @@ public class TriggerTable {
   public Map<String, TriggerInformation> getTable() {
     return triggerTable;
   }
+
+  /**
+   * Writes the trigger table to the stream: the number of entries first, then the serialized form
+   * of every {@code TriggerInformation} value.
+   *
+   * @param outputStream destination of the serialized table
+   * @throws IOException if writing to the stream fails
+   */
+  public void serializeTriggerTable(OutputStream outputStream) throws IOException {
+    final int triggerCount = triggerTable.size();
+    ReadWriteIOUtils.write(triggerCount, outputStream);
+    for (Map.Entry<String, TriggerInformation> entry : triggerTable.entrySet()) {
+      // Only the value is written; the key is the trigger name and is restored from the value
+      // when the table is read back.
+      ReadWriteIOUtils.write(entry.getValue().serialize(), outputStream);
+    }
+  }
+
+  /**
+   * Reads an entry count followed by that many serialized {@code TriggerInformation} objects and
+   * stores each one in the table under its trigger name. Existing entries are not removed by this
+   * method.
+   *
+   * @param inputStream source positioned at the serialized table
+   * @throws IOException if reading from the stream fails
+   */
+  public void deserializeTriggerTable(InputStream inputStream) throws IOException {
+    final int triggerCount = ReadWriteIOUtils.readInt(inputStream);
+    for (int i = 0; i < triggerCount; i++) {
+      final TriggerInformation restored = TriggerInformation.deserialize(inputStream);
+      final String name = restored.getTriggerName();
+      triggerTable.put(name, restored);
+    }
+  }
+
+  /** Removes every entry from the trigger table. */
+  public void clear() {
+    triggerTable.clear();
+  }
 }

Reply via email to