This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new eb96aa72b9 [IOTDB-4582]Implement snapshot of TriggerInfo (#7542)
eb96aa72b9 is described below
commit eb96aa72b9ec299ec31eef387e1c5187884f603c
Author: Weihao Li <[email protected]>
AuthorDate: Sat Oct 8 21:32:08 2022 +0800
[IOTDB-4582]Implement snapshot of TriggerInfo (#7542)
---
.gitignore | 2 +
.../iotdb/confignode/persistence/TriggerInfo.java | 108 +++++++++++++++++--
.../confignode/persistence/TriggerInfoTest.java | 120 +++++++++++++++++++++
.../confignode/IoTDBConfigNodeSnapshotIT.java | 93 ++++++++++++++++
.../src/test/resources/trigger-example.jar | Bin 0 -> 9221 bytes
.../iotdb/commons/trigger/TriggerInformation.java | 8 +-
.../apache/iotdb/commons/trigger/TriggerTable.java | 31 ++++--
7 files changed, 343 insertions(+), 19 deletions(-)
diff --git a/.gitignore b/.gitignore
index 6b90b2f1f5..878cc06c88 100644
--- a/.gitignore
+++ b/.gitignore
@@ -37,6 +37,8 @@ tsfile-jdbc/src/main/resources/output/queryRes.csv
*.txt
*.jar
+!trigger-example.jar
+
*.gz
*.tar.gz
*.tar
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/TriggerInfo.java
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/TriggerInfo.java
index 77c7464be9..2f84afec3d 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/TriggerInfo.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/TriggerInfo.java
@@ -32,16 +32,22 @@ import
org.apache.iotdb.confignode.consensus.request.write.trigger.DeleteTrigger
import
org.apache.iotdb.confignode.consensus.request.write.trigger.UpdateTriggerStateInTablePlan;
import org.apache.iotdb.confignode.consensus.response.TriggerTableResp;
import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
+import java.util.UUID;
import java.util.concurrent.locks.ReentrantLock;
public class TriggerInfo implements SnapshotProcessor {
@@ -79,17 +85,6 @@ public class TriggerInfo implements SnapshotProcessor {
triggerTableLock.unlock();
}
- @Override
- public boolean processTakeSnapshot(File snapshotDir) throws TException,
IOException {
- // TODO implement when 'Drop Trigger' done
- return true;
- }
-
- @Override
- public void processLoadSnapshot(File snapshotDir) throws TException,
IOException {
- // TODO implement when 'Drop Trigger' done
- }
-
/**
* Validate whether the trigger can be created
*
@@ -172,4 +167,95 @@ public class TriggerInfo implements SnapshotProcessor {
new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()),
triggerTable.getAllTriggerInformation());
}
+
+ /** only used in Test */
+ public Map<String, TriggerInformation> getRawTriggerTable() {
+ return triggerTable.getTable();
+ }
+
+ /** only used in Test */
+ public Map<String, String> getRawExistedJarToMD5() {
+ return existedJarToMD5;
+ }
+
+ @Override
+ public boolean processTakeSnapshot(File snapshotDir) throws TException,
IOException {
+ File snapshotFile = new File(snapshotDir, snapshotFileName);
+ if (snapshotFile.exists() && snapshotFile.isFile()) {
+ LOGGER.error(
+ "Failed to take snapshot, because snapshot file [{}] is already
exist.",
+ snapshotFile.getAbsolutePath());
+ return false;
+ }
+ File tmpFile = new File(snapshotFile.getAbsolutePath() + "-" +
UUID.randomUUID());
+
+ acquireTriggerTableLock();
+ try (FileOutputStream fileOutputStream = new FileOutputStream(tmpFile)) {
+
+ serializeExistedJarToMD5(fileOutputStream);
+
+ triggerTable.serializeTriggerTable(fileOutputStream);
+
+ fileOutputStream.flush();
+
+ fileOutputStream.close();
+
+ return tmpFile.renameTo(snapshotFile);
+ } finally {
+ releaseTriggerTableLock();
+ for (int retry = 0; retry < 5; retry++) {
+ if (!tmpFile.exists() || tmpFile.delete()) {
+ break;
+ } else {
+ LOGGER.warn(
+ "Can't delete temporary snapshot file: {}, retrying...",
tmpFile.getAbsolutePath());
+ }
+ }
+ }
+ }
+
+ @Override
+ public void processLoadSnapshot(File snapshotDir) throws TException,
IOException {
+ File snapshotFile = new File(snapshotDir, snapshotFileName);
+ if (!snapshotFile.exists() || !snapshotFile.isFile()) {
+ LOGGER.error(
+ "Failed to load snapshot,snapshot file [{}] is not exist.",
+ snapshotFile.getAbsolutePath());
+ return;
+ }
+
+ acquireTriggerTableLock();
+ try (FileInputStream fileInputStream = new FileInputStream(snapshotFile)) {
+
+ clear();
+
+ deserializeExistedJarToMD5(fileInputStream);
+
+ triggerTable.deserializeTriggerTable(fileInputStream);
+ } finally {
+ releaseTriggerTableLock();
+ }
+ }
+
+ public void serializeExistedJarToMD5(OutputStream outputStream) throws
IOException {
+ ReadWriteIOUtils.write(existedJarToMD5.size(), outputStream);
+ for (Map.Entry<String, String> entry : existedJarToMD5.entrySet()) {
+ ReadWriteIOUtils.write(entry.getKey(), outputStream);
+ ReadWriteIOUtils.write(entry.getValue(), outputStream);
+ }
+ }
+
+ public void deserializeExistedJarToMD5(InputStream inputStream) throws
IOException {
+ int size = ReadWriteIOUtils.readInt(inputStream);
+ while (size > 0) {
+ existedJarToMD5.put(
+ ReadWriteIOUtils.readString(inputStream),
ReadWriteIOUtils.readString(inputStream));
+ size--;
+ }
+ }
+
+ public void clear() {
+ existedJarToMD5.clear();
+ triggerTable.clear();
+ }
}
diff --git
a/confignode/src/test/java/org/apache/iotdb/confignode/persistence/TriggerInfoTest.java
b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/TriggerInfoTest.java
new file mode 100644
index 0000000000..c59ef76ca7
--- /dev/null
+++
b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/TriggerInfoTest.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.persistence;
+
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.trigger.TriggerInformation;
+import
org.apache.iotdb.confignode.consensus.request.write.trigger.AddTriggerInTablePlan;
+import org.apache.iotdb.confignode.rpc.thrift.TTriggerState;
+import org.apache.iotdb.trigger.api.enums.TriggerEvent;
+import org.apache.iotdb.tsfile.utils.Binary;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.thrift.TException;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.iotdb.db.constant.TestConstant.BASE_OUTPUT_PATH;
+
+public class TriggerInfoTest {
+
+ private static TriggerInfo triggerInfo;
+ private static TriggerInfo triggerInfoSaveBefore;
+ private static final File snapshotDir = new File(BASE_OUTPUT_PATH,
"snapshot");
+
+ @BeforeClass
+ public static void setup() throws IOException {
+ triggerInfo = new TriggerInfo();
+ triggerInfoSaveBefore = new TriggerInfo();
+ if (!snapshotDir.exists()) {
+ snapshotDir.mkdirs();
+ }
+ }
+
+ @AfterClass
+ public static void cleanup() throws IOException {
+ triggerInfo.clear();
+ if (snapshotDir.exists()) {
+ FileUtils.deleteDirectory(snapshotDir);
+ }
+ }
+
+ @Test
+ public void testSnapshot() throws TException, IOException,
IllegalPathException {
+ TriggerInformation triggerInformation =
+ new TriggerInformation(
+ new PartialPath("root.test.**"),
+ "test1",
+ "test1.class",
+ "test1.jar",
+ null,
+ TriggerEvent.AFTER_INSERT,
+ TTriggerState.INACTIVE,
+ false,
+ null,
+ "testMD5test");
+ AddTriggerInTablePlan addTriggerInTablePlan =
+ new AddTriggerInTablePlan(triggerInformation, new Binary(new byte[]
{1, 2, 3}));
+ triggerInfo.addTriggerInTable(addTriggerInTablePlan);
+ triggerInfoSaveBefore.addTriggerInTable(addTriggerInTablePlan);
+
+ Map<String, String> attributes = new HashMap<>();
+ attributes.put("test-key", "test-value");
+ triggerInformation =
+ new TriggerInformation(
+ new PartialPath("root.test.**"),
+ "test2",
+ "test2.class",
+ "test2.jar",
+ attributes,
+ TriggerEvent.BEFORE_INSERT,
+ TTriggerState.INACTIVE,
+ false,
+ new TDataNodeLocation(
+ 10000,
+ new TEndPoint("127.0.0.1", 6600),
+ new TEndPoint("127.0.0.1", 7700),
+ new TEndPoint("127.0.0.1", 8800),
+ new TEndPoint("127.0.0.1", 9900),
+ new TEndPoint("127.0.0.1", 11000)),
+ "testMD5test");
+ addTriggerInTablePlan = new AddTriggerInTablePlan(triggerInformation,
null);
+ triggerInfo.addTriggerInTable(addTriggerInTablePlan);
+ triggerInfoSaveBefore.addTriggerInTable(addTriggerInTablePlan);
+
+ triggerInfo.processTakeSnapshot(snapshotDir);
+ triggerInfo.clear();
+ triggerInfo.processLoadSnapshot(snapshotDir);
+
+ Assert.assertEquals(
+ triggerInfoSaveBefore.getRawTriggerTable(),
triggerInfo.getRawTriggerTable());
+ Assert.assertEquals(
+ triggerInfoSaveBefore.getRawExistedJarToMD5(),
triggerInfo.getRawExistedJarToMD5());
+ }
+}
diff --git
a/integration-test/src/test/java/org/apache/iotdb/confignode/IoTDBConfigNodeSnapshotIT.java
b/integration-test/src/test/java/org/apache/iotdb/confignode/IoTDBConfigNodeSnapshotIT.java
index e9fba69f87..07e999db7d 100644
---
a/integration-test/src/test/java/org/apache/iotdb/confignode/IoTDBConfigNodeSnapshotIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/confignode/IoTDBConfigNodeSnapshotIT.java
@@ -24,9 +24,14 @@ import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.path.PathDeserializeUtil;
import org.apache.iotdb.commons.path.PathPatternTree;
+import org.apache.iotdb.commons.trigger.TriggerInformation;
+import org.apache.iotdb.commons.trigger.service.TriggerExecutableManager;
+import org.apache.iotdb.confignode.rpc.thrift.TCreateTriggerReq;
import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionReq;
import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionTableResp;
+import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerTableResp;
import org.apache.iotdb.confignode.rpc.thrift.TSchemaPartitionReq;
import org.apache.iotdb.confignode.rpc.thrift.TSchemaPartitionTableResp;
import org.apache.iotdb.confignode.rpc.thrift.TSetStorageGroupReq;
@@ -36,8 +41,12 @@ import org.apache.iotdb.it.env.EnvFactory;
import org.apache.iotdb.it.framework.IoTDBTestRunner;
import org.apache.iotdb.itbase.category.ClusterIT;
import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.trigger.api.enums.FailureStrategy;
+import org.apache.iotdb.trigger.api.enums.TriggerEvent;
+import org.apache.iotdb.trigger.api.enums.TriggerType;
import org.apache.iotdb.tsfile.utils.PublicBAOS;
+import org.apache.commons.codec.digest.DigestUtils;
import org.apache.thrift.TException;
import org.junit.After;
import org.junit.Assert;
@@ -46,8 +55,10 @@ import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
+import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -117,6 +128,8 @@ public class IoTDBConfigNodeSnapshotIT {
try (SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient)
EnvFactory.getEnv().getConfigNodeConnection()) {
+ List<TCreateTriggerReq> createTriggerReqs = createTrigger(client);
+
for (int i = 0; i < storageGroupNum; i++) {
String storageGroup = sg + i;
TSetStorageGroupReq setStorageGroupReq =
@@ -181,6 +194,86 @@ public class IoTDBConfigNodeSnapshotIT {
}
}
}
+
+ assertTriggerInformation(createTriggerReqs, client.getTriggerTable());
+ }
+ }
+
+ private List<TCreateTriggerReq> createTrigger(SyncConfigNodeIServiceClient
client)
+ throws IllegalPathException, TException, IOException {
+ final String triggerPath =
+ System.getProperty("user.dir")
+ + File.separator
+ + "target"
+ + File.separator
+ + "test-classes"
+ + File.separator
+ + "trigger-example.jar";
+ ByteBuffer jarFile =
TriggerExecutableManager.transferToBytebuffer(triggerPath);
+ String jarMD5 = DigestUtils.md5Hex(jarFile.array());
+
+ TCreateTriggerReq createTriggerReq1 =
+ new TCreateTriggerReq(
+ "test1",
+ "org.apache.iotdb.trigger.SimpleTrigger",
+ "trigger-example.jar",
+ false,
+ TriggerEvent.AFTER_INSERT.getId(),
+ TriggerType.STATELESS.getId(),
+ new PartialPath("root.test1.**").serialize(),
+ Collections.emptyMap(),
+ FailureStrategy.OPTIMISTIC.getId())
+ .setJarMD5(jarMD5)
+ .setJarFile(jarFile);
+
+ Map<String, String> attributes = new HashMap<>();
+ attributes.put("test-key", "test-value");
+ TCreateTriggerReq createTriggerReq2 =
+ new TCreateTriggerReq(
+ "test2",
+ "org.apache.iotdb.trigger.SimpleTrigger",
+ "trigger-example.jar",
+ false,
+ TriggerEvent.BEFORE_INSERT.getId(),
+ TriggerType.STATEFUL.getId(),
+ new PartialPath("root.test2.**").serialize(),
+ attributes,
+ FailureStrategy.OPTIMISTIC.getId())
+ .setJarMD5(jarMD5)
+ .setJarFile(jarFile);
+
+ Assert.assertEquals(
+ client.createTrigger(createTriggerReq1).getCode(),
+ TSStatusCode.SUCCESS_STATUS.getStatusCode());
+ Assert.assertEquals(
+ client.createTrigger(createTriggerReq2).getCode(),
+ TSStatusCode.SUCCESS_STATUS.getStatusCode());
+
+ List<TCreateTriggerReq> result = new ArrayList<>();
+ result.add(createTriggerReq2);
+ result.add(createTriggerReq1);
+ return result;
+ }
+
+ private void assertTriggerInformation(List<TCreateTriggerReq> req,
TGetTriggerTableResp resp) {
+ for (int i = 0; i < req.size(); i++) {
+ TCreateTriggerReq createTriggerReq = req.get(i);
+ TriggerInformation triggerInformation =
+
TriggerInformation.deserialize(resp.getAllTriggerInformation().get(i));
+
+ Assert.assertEquals(createTriggerReq.getTriggerName(),
triggerInformation.getTriggerName());
+ Assert.assertEquals(createTriggerReq.getClassName(),
triggerInformation.getClassName());
+ Assert.assertEquals(createTriggerReq.getJarPath(),
triggerInformation.getJarName());
+ Assert.assertEquals(
+ createTriggerReq.getTriggerEvent(),
triggerInformation.getEvent().getId());
+ Assert.assertEquals(
+ createTriggerReq.getTriggerType(),
+ triggerInformation.isStateful()
+ ? TriggerType.STATEFUL.getId()
+ : TriggerType.STATELESS.getId());
+ Assert.assertEquals(
+
PathDeserializeUtil.deserialize(ByteBuffer.wrap(createTriggerReq.getPathPattern())),
+ triggerInformation.getPathPattern());
}
}
}
diff --git a/integration-test/src/test/resources/trigger-example.jar
b/integration-test/src/test/resources/trigger-example.jar
new file mode 100644
index 0000000000..619df8f54f
Binary files /dev/null and
b/integration-test/src/test/resources/trigger-example.jar differ
diff --git
a/node-commons/src/main/java/org/apache/iotdb/commons/trigger/TriggerInformation.java
b/node-commons/src/main/java/org/apache/iotdb/commons/trigger/TriggerInformation.java
index 119321d143..4fc49801b3 100644
---
a/node-commons/src/main/java/org/apache/iotdb/commons/trigger/TriggerInformation.java
+++
b/node-commons/src/main/java/org/apache/iotdb/commons/trigger/TriggerInformation.java
@@ -29,6 +29,7 @@ import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import java.io.DataOutputStream;
import java.io.IOException;
+import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.Objects;
@@ -121,6 +122,11 @@ public class TriggerInformation {
return triggerInformation;
}
+ public static TriggerInformation deserialize(InputStream inputStream) throws
IOException {
+ return deserialize(
+
ByteBuffer.wrap(ReadWriteIOUtils.readBytesWithSelfDescriptionLength(inputStream)));
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) return true;
@@ -134,7 +140,7 @@ public class TriggerInformation {
&& Objects.equals(attributes, that.attributes)
&& event == that.event
&& triggerState == that.triggerState
- && (isStateful() ? Objects.equals(dataNodeLocation,
that.dataNodeLocation) : true)
+ && (!isStateful() || Objects.equals(dataNodeLocation,
that.dataNodeLocation))
&& Objects.equals(jarFileMD5, that.jarFileMD5);
}
diff --git
a/node-commons/src/main/java/org/apache/iotdb/commons/trigger/TriggerTable.java
b/node-commons/src/main/java/org/apache/iotdb/commons/trigger/TriggerTable.java
index 75f97417bb..eed43d15fd 100644
---
a/node-commons/src/main/java/org/apache/iotdb/commons/trigger/TriggerTable.java
+++
b/node-commons/src/main/java/org/apache/iotdb/commons/trigger/TriggerTable.java
@@ -19,11 +19,14 @@
package org.apache.iotdb.commons.trigger;
import org.apache.iotdb.confignode.rpc.thrift.TTriggerState;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import javax.annotation.concurrent.NotThreadSafe;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -70,12 +73,6 @@ public class TriggerTable {
triggerTable.get(triggerName).setTriggerState(triggerState);
}
- // for showTrigger
- public Map<String, TTriggerState> getAllTriggerStates() {
- Map<String, TTriggerState> allTriggerStates = new
HashMap<>(triggerTable.size());
- triggerTable.forEach((k, v) -> allTriggerStates.put(k,
v.getTriggerState()));
- return allTriggerStates;
- }
// for getTriggerTable
public List<TriggerInformation> getAllTriggerInformation() {
return new ArrayList<>(triggerTable.values());
@@ -88,4 +85,24 @@ public class TriggerTable {
public Map<String, TriggerInformation> getTable() {
return triggerTable;
}
+
+ public void serializeTriggerTable(OutputStream outputStream) throws
IOException {
+ ReadWriteIOUtils.write(triggerTable.size(), outputStream);
+ for (TriggerInformation triggerInformation : triggerTable.values()) {
+ ReadWriteIOUtils.write(triggerInformation.serialize(), outputStream);
+ }
+ }
+
+ public void deserializeTriggerTable(InputStream inputStream) throws
IOException {
+ int size = ReadWriteIOUtils.readInt(inputStream);
+ while (size > 0) {
+ TriggerInformation triggerInformation =
TriggerInformation.deserialize(inputStream);
+ triggerTable.put(triggerInformation.getTriggerName(),
triggerInformation);
+ size--;
+ }
+ }
+
+ public void clear() {
+ triggerTable.clear();
+ }
}