This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch IOTDB-5788 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 270802123d38f6bab29027964526522b46dafe14 Author: Steve Yurong Su <[email protected]> AuthorDate: Sun Apr 23 20:44:55 2023 +0800 1. support built-in pipe plugins. 2. allow to create / drop the same function multiple times without errors on PipePluginAgent / PipePluginInfo to enable retry policy. 3. allow to drop a function that is not registered on leader config node so that user can have a chance to change the inconsistant registration state over different config/data nodes. --- .../persistence/pipe/PipePluginInfo.java | 37 +++++++------ .../impl/pipe/plugin/DropPipePluginProcedure.java | 6 +-- .../thrift/ConfigNodeRPCServiceProcessor.java | 1 - .../pipe/plugin/builtin/BuiltinPipePlugin.java | 36 ++++++++++++- .../builtin/connector/DoNothingConnector.java | 53 +++++++++++++++++- .../builtin/processor/DoNothingProcessor.java | 49 ++++++++++++++++- .../meta/ConfigNodePipePluginMetaKeeper.java | 25 +++------ .../plugin/meta/DataNodePipePluginMetaKeeper.java | 28 +++++++--- .../commons/pipe/plugin/meta/PipePluginMeta.java | 57 +++++++++++++------- .../pipe/plugin/meta/PipePluginMetaKeeper.java | 62 +++++++++++++++++----- .../config/executor/ClusterConfigTaskExecutor.java | 3 +- .../org/apache/iotdb/db/pipe/agent/PipeAgent.java | 5 +- .../db/pipe/agent/plugin/PipePluginAgent.java | 41 ++++++++++---- 13 files changed, 311 insertions(+), 92 deletions(-) diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java index 0b47dccb6c..3818070dac 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java @@ -84,6 +84,7 @@ public class PipePluginInfo implements SnapshotProcessor { /////////////////////////////// Validator /////////////////////////////// public void validateBeforeCreatingPipePlugin(String pluginName, String jarName, String jarMD5) { + // both build-in and user defined pipe plugin should be unique if (pipePluginMetaKeeper.containsPipePlugin(pluginName)) { throw new PipeManagementException( String.format( @@ -100,12 +101,13 @@ public class PipePluginInfo implements SnapshotProcessor { } public void validateBeforeDroppingPipePlugin(String pluginName) { - if (pipePluginMetaKeeper.containsPipePlugin(pluginName)) { - return; + if (pipePluginMetaKeeper.containsPipePlugin(pluginName) + && pipePluginMetaKeeper.getPipePluginMeta(pluginName).isBuiltin()) { + throw new PipeManagementException( + String.format( + "Failed to drop PipePlugin [%s], the PipePlugin is a built-in PipePlugin", + pluginName)); } - - throw new PipeManagementException( - String.format("Failed to drop PipePlugin [%s], the PipePlugin does not exist", pluginName)); } public boolean isJarNeededToBeSavedWhenCreatingPipePlugin(String jarName) { @@ -119,32 +121,37 @@ public class PipePluginInfo implements SnapshotProcessor { /////////////////////////////// Pipe Plugin Management /////////////////////////////// - public TSStatus createPipePlugin(CreatePipePluginPlan physicalPlan) { + public TSStatus createPipePlugin(CreatePipePluginPlan createPipePluginPlan) { try { - final PipePluginMeta pipePluginMeta = physicalPlan.getPipePluginMeta(); + final PipePluginMeta pipePluginMeta = createPipePluginPlan.getPipePluginMeta(); + + // try to drop the old pipe plugin if exists to reduce the effect of the inconsistency + dropPipePlugin(new DropPipePluginPlan(pipePluginMeta.getPluginName())); + pipePluginMetaKeeper.addPipePluginMeta(pipePluginMeta.getPluginName(), pipePluginMeta); pipePluginMetaKeeper.addJarNameAndMd5( pipePluginMeta.getJarName(), pipePluginMeta.getJarMD5()); - if (physicalPlan.getJarFile() != null) { + if (createPipePluginPlan.getJarFile() != null) { pipePluginExecutableManager.saveToInstallDir( - ByteBuffer.wrap(physicalPlan.getJarFile().getValues()), pipePluginMeta.getJarName()); + ByteBuffer.wrap(createPipePluginPlan.getJarFile().getValues()), + pipePluginMeta.getJarName()); } return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); } catch (Exception e) { final String errorMessage = String.format( - "Failed to add PipePlugin [%s] in PipePlugin_Table on Config Nodes, because of %s", - physicalPlan.getPipePluginMeta().getPluginName(), e); + "Failed to execute createPipePlugin(%s) on config nodes, because of %s", + createPipePluginPlan.getPipePluginMeta().getPluginName(), e); LOGGER.warn(errorMessage, e); return new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()) .setMessage(errorMessage); } } - public TSStatus dropPipePlugin(DropPipePluginPlan physicalPlan) { - final String pluginName = physicalPlan.getPluginName(); + public TSStatus dropPipePlugin(DropPipePluginPlan dropPipePluginPlan) { + final String pluginName = dropPipePluginPlan.getPluginName(); if (pipePluginMetaKeeper.containsPipePlugin(pluginName)) { pipePluginMetaKeeper.removeJarNameAndMd5IfPossible( @@ -161,10 +168,10 @@ public class PipePluginInfo implements SnapshotProcessor { Arrays.asList(pipePluginMetaKeeper.getAllPipePluginMeta())); } - public JarResp getPipePluginJar(GetPipePluginJarPlan physicalPlan) { + public JarResp getPipePluginJar(GetPipePluginJarPlan getPipePluginJarPlan) { try { List<ByteBuffer> jarList = new ArrayList<>(); - for (String jarName : physicalPlan.getJarNames()) { + for (String jarName : getPipePluginJarPlan.getJarNames()) { jarList.add( ExecutableManager.transferToBytebuffer( PipePluginExecutableManager.getInstance() diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java index 6aa4d6109d..7c8268ecd0 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java @@ -110,7 +110,7 @@ public class DropPipePluginProcedure extends AbstractNodeProcedure<DropPipePlugi try { pipePluginCoordinator.getPipePluginInfo().validateBeforeDroppingPipePlugin(pluginName); } catch (PipeManagementException e) { - // if the pipe plugin is not exist, we should end the procedure + // if the pipe plugin is a built-in plugin, we should not drop it LOGGER.warn(e.getMessage()); setFailure(new ProcedureException(e.getMessage())); pipePluginCoordinator.unlock(); @@ -180,14 +180,14 @@ public class DropPipePluginProcedure extends AbstractNodeProcedure<DropPipePlugi LOGGER.info("DropPipePluginProcedure: rollbackFromDropOnDataNodes({})", pluginName); // do nothing but wait for rolling back to the previous state: LOCK - // TODO: we should drop the pipe plugin on data nodes + // TODO: we should drop the pipe plugin on data nodes properly with RuntimeAgent's help } private void rollbackFromDropOnConfigNodes(ConfigNodeProcedureEnv env) { LOGGER.info("DropPipePluginProcedure: rollbackFromDropOnConfigNodes({})", pluginName); // do nothing but wait for rolling back to the previous state: DROP_ON_DATA_NODES - // TODO: we should drop the pipe plugin on config nodes + // TODO: we should drop the pipe plugin on config nodes properly with RuntimeCoordinator's help } @Override diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java index 2e80a9b0d1..efc3541a8c 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java @@ -897,7 +897,6 @@ public class ConfigNodeRPCServiceProcessor implements IConfigNodeRPCService.Ifac } @Override - @Deprecated public TGetAllPipeInfoResp getAllPipeInfo() { return configManager.getAllPipeInfo(); } diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java index 2e21cbf893..d6972646d4 100644 --- a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java +++ b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java @@ -17,5 +17,39 @@ * under the License. */ -package org.apache.iotdb.commons.pipe.plugin.builtin;public enum BuiltinPipePlugin { +package org.apache.iotdb.commons.pipe.plugin.builtin; + +import org.apache.iotdb.commons.pipe.plugin.builtin.connector.DoNothingConnector; +import org.apache.iotdb.commons.pipe.plugin.builtin.processor.DoNothingProcessor; + +public enum BuiltinPipePlugin { + + // processors + DO_NOTHING_PROCESSOR("do_nothing_processor", DoNothingProcessor.class), + + // connectors + DO_NOTHING_CONNECTOR("do_nothing_connector", DoNothingConnector.class), + ; + + private final String pipePluginName; + private final Class<?> pipePluginClass; + private final String className; + + BuiltinPipePlugin(String functionName, Class<?> pipePluginClass) { + this.pipePluginName = functionName; + this.pipePluginClass = pipePluginClass; + this.className = pipePluginClass.getName(); + } + + public String getPipePluginName() { + return pipePluginName; + } + + public Class<?> getPipePluginClass() { + return pipePluginClass; + } + + public String getClassName() { + return className; + } } diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/DoNothingConnector.java b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/DoNothingConnector.java index e50d3c3c1e..503005fefa 100644 --- a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/DoNothingConnector.java +++ b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/DoNothingConnector.java @@ -17,5 +17,56 @@ * under the License. */ -package org.apache.iotdb.commons.pipe.plugin.builtin.connector;public class DoNothingConnector { +package org.apache.iotdb.commons.pipe.plugin.builtin.connector; + +import org.apache.iotdb.pipe.api.PipeConnector; +import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator; +import org.apache.iotdb.pipe.api.customizer.PipeParameters; +import org.apache.iotdb.pipe.api.customizer.connector.PipeConnectorRuntimeConfiguration; +import org.apache.iotdb.pipe.api.event.deletion.DeletionEvent; +import org.apache.iotdb.pipe.api.event.insertion.TabletInsertionEvent; +import org.apache.iotdb.pipe.api.event.insertion.TsFileInsertionEvent; + +public class DoNothingConnector implements PipeConnector { + + @Override + public void validate(PipeParameterValidator validator) { + // do nothing + } + + @Override + public void customize( + PipeParameters parameters, PipeConnectorRuntimeConfiguration configuration) { + // do nothing + } + + @Override + public void handshake() { + // do nothing + } + + @Override + public void heartbeat() { + // do nothing + } + + @Override + public void transfer(TabletInsertionEvent tabletInsertionEvent) { + // do nothing + } + + @Override + public void transfer(TsFileInsertionEvent tsFileInsertionEvent) { + // do nothing + } + + @Override + public void transfer(DeletionEvent deletionEvent) { + // do nothing + } + + @Override + public void close() { + // do nothing + } } diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/processor/DoNothingProcessor.java b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/processor/DoNothingProcessor.java index 572a9d365a..45634119db 100644 --- a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/processor/DoNothingProcessor.java +++ b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/processor/DoNothingProcessor.java @@ -17,5 +17,52 @@ * under the License. */ -package org.apache.iotdb.commons.pipe.plugin.builtin.processor;public class DoNothingProcessor { +package org.apache.iotdb.commons.pipe.plugin.builtin.processor; + +import org.apache.iotdb.pipe.api.PipeProcessor; +import org.apache.iotdb.pipe.api.collector.EventCollector; +import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator; +import org.apache.iotdb.pipe.api.customizer.PipeParameters; +import org.apache.iotdb.pipe.api.customizer.processor.PipeProcessorRuntimeConfiguration; +import org.apache.iotdb.pipe.api.event.deletion.DeletionEvent; +import org.apache.iotdb.pipe.api.event.insertion.TabletInsertionEvent; +import org.apache.iotdb.pipe.api.event.insertion.TsFileInsertionEvent; + +import java.io.IOException; + +public class DoNothingProcessor implements PipeProcessor { + + @Override + public void validate(PipeParameterValidator validator) { + // do nothing + } + + @Override + public void customize( + PipeParameters parameters, PipeProcessorRuntimeConfiguration configuration) { + // do nothing + } + + @Override + public void process(TabletInsertionEvent tabletInsertionEvent, EventCollector eventCollector) + throws IOException { + eventCollector.collectTabletInsertionEvent(tabletInsertionEvent); + } + + @Override + public void process(TsFileInsertionEvent tsFileInsertionEvent, EventCollector eventCollector) + throws IOException { + eventCollector.collectTsFileInsertionEvent(tsFileInsertionEvent); + } + + @Override + public void process(DeletionEvent deletionEvent, EventCollector eventCollector) + throws IOException { + eventCollector.collectDeletionEvent(deletionEvent); + } + + @Override + public void close() { + // do nothing + } } diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/meta/ConfigNodePipePluginMetaKeeper.java b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/meta/ConfigNodePipePluginMetaKeeper.java index c02fdeadb6..7c3632e4b4 100644 --- a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/meta/ConfigNodePipePluginMetaKeeper.java +++ b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/meta/ConfigNodePipePluginMetaKeeper.java @@ -33,7 +33,8 @@ public class ConfigNodePipePluginMetaKeeper extends PipePluginMetaKeeper { protected final Map<String, Integer> jarNameToReferenceCountMap; public ConfigNodePipePluginMetaKeeper() { - super(); + loadBuiltInPlugins(); + jarNameToMd5Map = new HashMap<>(); jarNameToReferenceCountMap = new HashMap<>(); } @@ -67,6 +68,7 @@ public class ConfigNodePipePluginMetaKeeper extends PipePluginMetaKeeper { } } + @Override public void processTakeSnapshot(OutputStream outputStream) throws IOException { ReadWriteIOUtils.write(jarNameToMd5Map.size(), outputStream); for (Map.Entry<String, String> entry : jarNameToMd5Map.entrySet()) { @@ -75,14 +77,13 @@ public class ConfigNodePipePluginMetaKeeper extends PipePluginMetaKeeper { ReadWriteIOUtils.write(jarNameToReferenceCountMap.get(entry.getKey()), outputStream); } - ReadWriteIOUtils.write(pipeNameToPipeMetaMap.size(), outputStream); - for (PipePluginMeta pipePluginMeta : pipeNameToPipeMetaMap.values()) { - ReadWriteIOUtils.write(pipePluginMeta.serialize(), outputStream); - } + super.processTakeSnapshot(outputStream); } + @Override public void processLoadSnapshot(InputStream inputStream) throws IOException { - clear(); + jarNameToMd5Map.clear(); + jarNameToReferenceCountMap.clear(); final int jarSize = ReadWriteIOUtils.readInt(inputStream); for (int i = 0; i < jarSize; i++) { @@ -93,16 +94,6 @@ public class ConfigNodePipePluginMetaKeeper extends PipePluginMetaKeeper { jarNameToReferenceCountMap.put(jarName, count); } - final int pipePluginMetaSize = ReadWriteIOUtils.readInt(inputStream); - for (int i = 0; i < pipePluginMetaSize; i++) { - final PipePluginMeta pipePluginMeta = PipePluginMeta.deserialize(inputStream); - addPipePluginMeta(pipePluginMeta.getPluginName().toUpperCase(), pipePluginMeta); - } - } - - public void clear() { - pipeNameToPipeMetaMap.clear(); - jarNameToMd5Map.clear(); - jarNameToReferenceCountMap.clear(); + super.processLoadSnapshot(inputStream); } } diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/meta/DataNodePipePluginMetaKeeper.java b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/meta/DataNodePipePluginMetaKeeper.java index 64d7c8ef94..42656ad870 100644 --- a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/meta/DataNodePipePluginMetaKeeper.java +++ b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/meta/DataNodePipePluginMetaKeeper.java @@ -19,6 +19,7 @@ package org.apache.iotdb.commons.pipe.plugin.meta; +import org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin; import org.apache.iotdb.commons.pipe.plugin.service.PipePluginClassLoader; import java.util.Map; @@ -26,28 +27,39 @@ import java.util.concurrent.ConcurrentHashMap; public class DataNodePipePluginMetaKeeper extends PipePluginMetaKeeper { - private final Map<String, Class<?>> pipeNameToPipeClassMap; + private final Map<String, Class<?>> pipePluginNameToClassMap; public DataNodePipePluginMetaKeeper() { - super(); - pipeNameToPipeClassMap = new ConcurrentHashMap<>(); + pipePluginNameToClassMap = new ConcurrentHashMap<>(); + + loadBuiltInPlugins(); + } + + @Override + protected void loadBuiltInPlugins() { + super.loadBuiltInPlugins(); + + for (final BuiltinPipePlugin builtinPipePlugin : BuiltinPipePlugin.values()) { + addPluginAndClass( + builtinPipePlugin.getPipePluginName(), builtinPipePlugin.getPipePluginClass()); + } } public void addPluginAndClass(String pluginName, Class<?> clazz) { - pipeNameToPipeClassMap.put(pluginName.toUpperCase(), clazz); + pipePluginNameToClassMap.put(pluginName.toUpperCase(), clazz); } public Class<?> getPluginClass(String pluginName) { - return pipeNameToPipeClassMap.get(pluginName.toUpperCase()); + return pipePluginNameToClassMap.get(pluginName.toUpperCase()); } public void removePluginClass(String pluginName) { - pipeNameToPipeClassMap.remove(pluginName.toUpperCase()); + pipePluginNameToClassMap.remove(pluginName.toUpperCase()); } public void updatePluginClass(PipePluginMeta pipePluginMeta, PipePluginClassLoader classLoader) throws ClassNotFoundException { - Class<?> functionClass = Class.forName(pipePluginMeta.getClassName(), true, classLoader); - pipeNameToPipeClassMap.put(pipePluginMeta.getPluginName().toUpperCase(), functionClass); + final Class<?> functionClass = Class.forName(pipePluginMeta.getClassName(), true, classLoader); + pipePluginNameToClassMap.put(pipePluginMeta.getPluginName().toUpperCase(), functionClass); } } diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/meta/PipePluginMeta.java b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/meta/PipePluginMeta.java index 08bcc614f0..5b11cd4d44 100644 --- a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/meta/PipePluginMeta.java +++ b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/meta/PipePluginMeta.java @@ -26,24 +26,39 @@ import java.io.DataOutputStream; import java.io.IOException; import java.io.InputStream; import java.nio.ByteBuffer; +import java.util.Objects; public class PipePluginMeta { - private String pluginName; + private final String pluginName; + private final String className; - private String className; + // jarName and jarMD5 are used to identify the jar file. + // they could be null if the plugin is built-in. they should be both null or both not null. + private final boolean isBuiltin; + private final String jarName; + private final String jarMD5; - private String jarName; + public PipePluginMeta(String pluginName, String className, String jarName, String jarMD5) { + this.pluginName = Objects.requireNonNull(pluginName).toUpperCase(); + this.className = Objects.requireNonNull(className); - private String jarMD5; + isBuiltin = false; + this.jarName = Objects.requireNonNull(jarName); + this.jarMD5 = Objects.requireNonNull(jarMD5); + } - private PipePluginMeta() {} + public PipePluginMeta(String pluginName, String className) { + this.pluginName = Objects.requireNonNull(pluginName).toUpperCase(); + this.className = Objects.requireNonNull(className); - public PipePluginMeta(String pluginName, String className, String jarName, String jarMD5) { - this.pluginName = pluginName.toUpperCase(); - this.className = className; - this.jarName = jarName; - this.jarMD5 = jarMD5; + this.isBuiltin = true; + this.jarName = null; + this.jarMD5 = null; + } + + public boolean isBuiltin() { + return isBuiltin; } public String getPluginName() { @@ -69,6 +84,10 @@ public class PipePluginMeta { return ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size()); } + /** + * All built-in plugins' information is kept in Java class {@code BuiltinPipePlugin }. So we never + * serialize the built-in plugins, then we don't need to serialize the isBuiltin field. + */ public void serialize(DataOutputStream outputStream) throws IOException { ReadWriteIOUtils.write(pluginName, outputStream); ReadWriteIOUtils.write(className, outputStream); @@ -77,12 +96,11 @@ public class PipePluginMeta { } public static PipePluginMeta deserialize(ByteBuffer byteBuffer) { - PipePluginMeta pipePluginMeta = new PipePluginMeta(); - pipePluginMeta.pluginName = ReadWriteIOUtils.readString(byteBuffer); - pipePluginMeta.className = ReadWriteIOUtils.readString(byteBuffer); - pipePluginMeta.jarName = ReadWriteIOUtils.readString(byteBuffer); - pipePluginMeta.jarMD5 = ReadWriteIOUtils.readString(byteBuffer); - return pipePluginMeta; + final String pluginName = ReadWriteIOUtils.readString(byteBuffer); + final String className = ReadWriteIOUtils.readString(byteBuffer); + final String jarName = ReadWriteIOUtils.readString(byteBuffer); + final String jarMD5 = ReadWriteIOUtils.readString(byteBuffer); + return new PipePluginMeta(pluginName, className, jarName, jarMD5); } public static PipePluginMeta deserialize(InputStream inputStream) throws IOException { @@ -101,8 +119,9 @@ public class PipePluginMeta { PipePluginMeta that = (PipePluginMeta) obj; return pluginName.equals(that.pluginName) && className.equals(that.className) - && jarName.equals(that.jarName) - && jarMD5.equals(that.jarMD5); + && isBuiltin == that.isBuiltin + && Objects.equals(jarName, that.jarName) + && Objects.equals(jarMD5, that.jarMD5); } @Override @@ -119,6 +138,8 @@ public class PipePluginMeta { + ", className='" + className + '\'' + + ", isBuiltin=" + + isBuiltin + ", jarName='" + jarName + '\'' diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/meta/PipePluginMetaKeeper.java b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/meta/PipePluginMetaKeeper.java index 4417141106..9a47fcfb89 100644 --- a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/meta/PipePluginMetaKeeper.java +++ b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/meta/PipePluginMetaKeeper.java @@ -19,40 +19,78 @@ package org.apache.iotdb.commons.pipe.plugin.meta; +import org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin; +import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; import java.util.Map; import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; public abstract class PipePluginMetaKeeper { - protected final Map<String, PipePluginMeta> pipeNameToPipeMetaMap; + protected final Map<String, PipePluginMeta> pipePluginNameToMetaMap = new ConcurrentHashMap<>(); - public PipePluginMetaKeeper() { - pipeNameToPipeMetaMap = new ConcurrentHashMap<>(); + protected void loadBuiltInPlugins() { + for (final BuiltinPipePlugin builtinPipePlugin : BuiltinPipePlugin.values()) { + addPipePluginMeta( + builtinPipePlugin.getPipePluginName(), + new PipePluginMeta( + builtinPipePlugin.getPipePluginName(), builtinPipePlugin.getClassName())); + } } public void addPipePluginMeta(String pluginName, PipePluginMeta pipePluginMeta) { - pipeNameToPipeMetaMap.put(pluginName.toUpperCase(), pipePluginMeta); + pipePluginNameToMetaMap.put(pluginName.toUpperCase(), pipePluginMeta); } public void removePipePluginMeta(String pluginName) { - pipeNameToPipeMetaMap.remove(pluginName.toUpperCase()); + pipePluginNameToMetaMap.remove(pluginName.toUpperCase()); } public PipePluginMeta getPipePluginMeta(String pluginName) { - return pipeNameToPipeMetaMap.get(pluginName.toUpperCase()); + return pipePluginNameToMetaMap.get(pluginName.toUpperCase()); } public PipePluginMeta[] getAllPipePluginMeta() { - return pipeNameToPipeMetaMap.values().toArray(new PipePluginMeta[0]); + return pipePluginNameToMetaMap.values().toArray(new PipePluginMeta[0]); } public boolean containsPipePlugin(String pluginName) { - return pipeNameToPipeMetaMap.containsKey(pluginName.toUpperCase()); + return pipePluginNameToMetaMap.containsKey(pluginName.toUpperCase()); + } + + protected void processTakeSnapshot(OutputStream outputStream) throws IOException { + ReadWriteIOUtils.write( + (int) + pipePluginNameToMetaMap.values().stream() + .filter(pipePluginMeta -> !pipePluginMeta.isBuiltin()) + .count(), + outputStream); + + for (PipePluginMeta pipePluginMeta : pipePluginNameToMetaMap.values()) { + if (pipePluginMeta.isBuiltin()) { + continue; + } + ReadWriteIOUtils.write(pipePluginMeta.serialize(), outputStream); + } } - public void clear() { - pipeNameToPipeMetaMap.clear(); + protected void processLoadSnapshot(InputStream inputStream) throws IOException { + pipePluginNameToMetaMap.forEach( + (pluginName, pluginMeta) -> { + if (!pluginMeta.isBuiltin()) { + pipePluginNameToMetaMap.remove(pluginName); + } + }); + + final int pipePluginMetaSize = ReadWriteIOUtils.readInt(inputStream); + for (int i = 0; i < pipePluginMetaSize; i++) { + final PipePluginMeta pipePluginMeta = PipePluginMeta.deserialize(inputStream); + addPipePluginMeta(pipePluginMeta.getPluginName().toUpperCase(), pipePluginMeta); + } } @Override @@ -64,11 +102,11 @@ public abstract class PipePluginMetaKeeper { return false; } PipePluginMetaKeeper that = (PipePluginMetaKeeper) o; - return pipeNameToPipeMetaMap.equals(that.pipeNameToPipeMetaMap); + return pipePluginNameToMetaMap.equals(that.pipePluginNameToMetaMap); } @Override public int hashCode() { - return Objects.hash(pipeNameToPipeMetaMap); + return Objects.hash(pipePluginNameToMetaMap); } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java index 2dfbdef2c8..9f98d4979e 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java @@ -172,6 +172,7 @@ import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeSinkStatement; import org.apache.iotdb.db.mpp.plan.statement.sys.sync.DropPipeSinkStatement; import org.apache.iotdb.db.mpp.plan.statement.sys.sync.ShowPipeSinkStatement; import org.apache.iotdb.db.trigger.service.TriggerClassLoader; +import org.apache.iotdb.pipe.api.PipePlugin; import org.apache.iotdb.rpc.RpcUtils; import org.apache.iotdb.rpc.StatementExecutionException; import org.apache.iotdb.rpc.TSStatusCode; @@ -759,7 +760,7 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor { try (PipePluginClassLoader classLoader = new PipePluginClassLoader(libRoot)) { // ensure that jar file contains the class and the class is a pipe plugin Class<?> clazz = Class.forName(createPipePluginStatement.getClassName(), true, classLoader); - clazz.getDeclaredConstructor().newInstance(); + PipePlugin ignored = (PipePlugin) clazz.getDeclaredConstructor().newInstance(); } catch (ClassNotFoundException | NoSuchMethodException | InstantiationException diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeAgent.java index 1c50fe5b40..a38d86f974 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeAgent.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeAgent.java @@ -19,7 +19,6 @@ package org.apache.iotdb.db.pipe.agent; -import org.apache.iotdb.commons.pipe.plugin.meta.DataNodePipePluginMetaKeeper; import org.apache.iotdb.db.pipe.agent.plugin.PipePluginAgent; import org.apache.iotdb.db.pipe.agent.runtime.PipeRuntimeAgent; import org.apache.iotdb.db.pipe.agent.task.PipeTaskAgent; @@ -33,9 +32,7 @@ public class PipeAgent { /** Private constructor to prevent users from creating a new instance. */ private PipeAgent() { - final DataNodePipePluginMetaKeeper pipePluginMetaKeeper = new DataNodePipePluginMetaKeeper(); - - pipePluginAgent = PipePluginAgent.setupAndGetInstance(pipePluginMetaKeeper); + pipePluginAgent = PipePluginAgent.setupAndGetInstance(); pipeTaskAgent = PipeTaskAgent.setupAndGetInstance(); pipeRuntimeAgent = PipeRuntimeAgent.setupAndGetInstance(); } diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java index d4c8359bcc..1f5f70bd3d 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java @@ -58,6 +58,10 @@ public class PipePluginAgent { public void register(PipePluginMeta pipePluginMeta, ByteBuffer jarFile) throws Exception { acquireLock(); try { + // try to deregister first to avoid inconsistent state + deregister(pipePluginMeta.getPluginName(), false); + + // register process from here checkIfRegistered(pipePluginMeta); saveJarFileIfNeeded(pipePluginMeta.getJarName(), jarFile); doRegister(pipePluginMeta); @@ -73,6 +77,15 @@ public class PipePluginAgent { return; } + if (information.isBuiltin()) { + String errorMessage = + String.format( + "Failed to register PipePlugin %s, because the given PipePlugin name is the same as a built-in PipePlugin name.", + pluginName); + LOGGER.warn(errorMessage); + throw new PipeManagementException(errorMessage); + } + if (PipePluginExecutableManager.getInstance() .hasFileUnderInstallDir(pipePluginMeta.getJarName()) && !PipePluginExecutableManager.getInstance().isLocalJarMatched(pipePluginMeta)) { @@ -84,6 +97,9 @@ public class PipePluginAgent { LOGGER.warn(errMsg); throw new PipeManagementException(errMsg); } + + // if the pipe plugin is already registered and the jar file is the same, do nothing + // we allow users to register the same pipe plugin multiple times without any error } private void saveJarFileIfNeeded(String jarName, ByteBuffer byteBuffer) throws IOException { @@ -111,7 +127,7 @@ public class PipePluginAgent { final Class<?> pluginClass = Class.forName(className, true, currentActiveClassLoader); // ensure that it is a PipePlugin class - PipePlugin ignored = (PipePlugin) pluginClass.getDeclaredConstructor().newInstance(); + final PipePlugin ignored = (PipePlugin) pluginClass.getDeclaredConstructor().newInstance(); pipePluginMetaKeeper.addPipePluginMeta(pluginName, pipePluginMeta); pipePluginMetaKeeper.addPluginAndClass(pluginName, pluginClass); @@ -141,15 +157,21 @@ public class PipePluginAgent { public void deregister(String pluginName, boolean needToDeleteJar) throws Exception { acquireLock(); try { - PipePluginMeta information = pipePluginMetaKeeper.getPipePluginMeta(pluginName); - if (information == null) { - return; + final PipePluginMeta information = pipePluginMetaKeeper.getPipePluginMeta(pluginName); + + if (information != null && information.isBuiltin()) { + String errorMessage = + String.format("Failed to deregister builtin PipePlugin %s.", pluginName); + LOGGER.warn(errorMessage); + throw new PipeManagementException(errorMessage); } + // remove anyway pipePluginMetaKeeper.removePipePluginMeta(pluginName); pipePluginMetaKeeper.removePluginClass(pluginName); - if (needToDeleteJar) { + // if it is needed to delete jar file of the pipe plugin, delete both jar file and md5 + if (information != null && needToDeleteJar) { PipePluginExecutableManager.getInstance().removeFileUnderLibRoot(information.getJarName()); PipePluginExecutableManager.getInstance() .removeFileUnderTemporaryRoot(pluginName.toUpperCase() + ".txt"); @@ -188,18 +210,17 @@ public class PipePluginAgent { ///////////////////////// Singleton Instance Holder ///////////////////////// - private PipePluginAgent(DataNodePipePluginMetaKeeper pipePluginMetaKeeper) { - this.pipePluginMetaKeeper = pipePluginMetaKeeper; + private PipePluginAgent() { + this.pipePluginMetaKeeper = new DataNodePipePluginMetaKeeper(); } private static class PipePluginAgentServiceHolder { private static PipePluginAgent instance = null; } - public static PipePluginAgent setupAndGetInstance( - DataNodePipePluginMetaKeeper pipePluginMetaKeeper) { + public static PipePluginAgent setupAndGetInstance() { if (PipePluginAgentServiceHolder.instance == null) { - PipePluginAgentServiceHolder.instance = new PipePluginAgent(pipePluginMetaKeeper); + PipePluginAgentServiceHolder.instance = new PipePluginAgent(); } return PipePluginAgentServiceHolder.instance; }
