This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 50d1c050563 Pipe: Refactor some features for user access (#12686)
50d1c050563 is described below

commit 50d1c0505630bb8345690a94015a3ebdaaf08ccd
Author: Caideyipi <[email protected]>
AuthorDate: Fri Jun 7 18:22:36 2024 +0800

    Pipe: Refactor some features for user access (#12686)
---
 .../persistence/pipe/PipePluginInfo.java           | 39 +++++++++++--------
 .../config/executor/ClusterConfigTaskExecutor.java | 45 +++++++++++-----------
 .../resources/conf/iotdb-system.properties         |  2 +-
 .../iotdb/commons/conf/CommonDescriptor.java       |  5 +--
 .../pipe/connector/protocol/IoTDBConnector.java    | 28 +++++++++-----
 5 files changed, 66 insertions(+), 53 deletions(-)

diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
index 382ebd51620..d3c46c2957d 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
@@ -91,7 +91,8 @@ public class PipePluginInfo implements SnapshotProcessor {
 
   /////////////////////////////// Validator ///////////////////////////////
 
-  public void validateBeforeCreatingPipePlugin(String pluginName, String jarName, String jarMD5) {
+  public void validateBeforeCreatingPipePlugin(
+      final String pluginName, final String jarName, final String jarMD5) {
     // both build-in and user defined pipe plugin should be unique
     if (pipePluginMetaKeeper.containsPipePlugin(pluginName)) {
       throw new PipeException(
@@ -108,9 +109,13 @@ public class PipePluginInfo implements SnapshotProcessor {
     }
   }
 
-  public void validateBeforeDroppingPipePlugin(String pluginName) {
-    if (pipePluginMetaKeeper.containsPipePlugin(pluginName)
-        && pipePluginMetaKeeper.getPipePluginMeta(pluginName).isBuiltin()) {
+  public void validateBeforeDroppingPipePlugin(final String pluginName) {
+    if (!pipePluginMetaKeeper.containsPipePlugin(pluginName)) {
+      throw new PipeException(
+          String.format(
+              "Failed to drop PipePlugin [%s], this PipePlugin has not been created", pluginName));
+    }
+    if (pipePluginMetaKeeper.getPipePluginMeta(pluginName).isBuiltin()) {
       throw new PipeException(
           String.format(
               "Failed to drop PipePlugin [%s], the PipePlugin is a built-in PipePlugin",
@@ -118,14 +123,14 @@ public class PipePluginInfo implements SnapshotProcessor {
     }
   }
 
-  public boolean isJarNeededToBeSavedWhenCreatingPipePlugin(String jarName) {
+  public boolean isJarNeededToBeSavedWhenCreatingPipePlugin(final String jarName) {
     return !pipePluginMetaKeeper.containsJar(jarName);
   }
 
   public void checkPipePluginExistence(
-      Map<String, String> extractorAttributes,
-      Map<String, String> processorAttributes,
-      Map<String, String> connectorAttributes) {
+      final Map<String, String> extractorAttributes,
+      final Map<String, String> processorAttributes,
+      final Map<String, String> connectorAttributes) {
     final PipeParameters extractorParameters = new PipeParameters(extractorAttributes);
     final String extractorPluginName =
         extractorParameters.getStringOrDefault(
@@ -170,7 +175,7 @@ public class PipePluginInfo implements SnapshotProcessor {
 
   /////////////////////////////// Pipe Plugin Management ///////////////////////////////
 
-  public TSStatus createPipePlugin(CreatePipePluginPlan createPipePluginPlan) {
+  public TSStatus createPipePlugin(final CreatePipePluginPlan createPipePluginPlan) {
     try {
       final PipePluginMeta pipePluginMeta = createPipePluginPlan.getPipePluginMeta();
 
@@ -188,7 +193,7 @@ public class PipePluginInfo implements SnapshotProcessor {
       }
 
       return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
-    } catch (Exception e) {
+    } catch (final Exception e) {
       final String errorMessage =
           String.format(
               "Failed to execute createPipePlugin(%s) on config nodes, because of %s",
@@ -199,7 +204,7 @@ public class PipePluginInfo implements SnapshotProcessor {
     }
   }
 
-  public TSStatus dropPipePlugin(DropPipePluginPlan dropPipePluginPlan) {
+  public TSStatus dropPipePlugin(final DropPipePluginPlan dropPipePluginPlan) {
     final String pluginName = dropPipePluginPlan.getPluginName();
 
     if (pipePluginMetaKeeper.containsPipePlugin(pluginName)) {
@@ -217,17 +222,17 @@ public class PipePluginInfo implements SnapshotProcessor {
         Arrays.asList(pipePluginMetaKeeper.getAllPipePluginMeta()));
   }
 
-  public JarResp getPipePluginJar(GetPipePluginJarPlan getPipePluginJarPlan) {
+  public JarResp getPipePluginJar(final GetPipePluginJarPlan getPipePluginJarPlan) {
     try {
       final List<ByteBuffer> jarList = new ArrayList<>();
-      for (String jarName : getPipePluginJarPlan.getJarNames()) {
+      for (final String jarName : getPipePluginJarPlan.getJarNames()) {
         jarList.add(
             ExecutableManager.transferToBytebuffer(
                 PipePluginExecutableManager.getInstance()
                     .getFileStringUnderInstallByName(jarName)));
       }
       return new JarResp(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()), jarList);
-    } catch (Exception e) {
+    } catch (final Exception e) {
       LOGGER.error("Get PipePlugin_Jar failed", e);
       return new JarResp(
           new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())
@@ -239,7 +244,7 @@ public class PipePluginInfo implements SnapshotProcessor {
   /////////////////////////////// Snapshot Processor ///////////////////////////////
 
   @Override
-  public boolean processTakeSnapshot(File snapshotDir) throws IOException {
+  public boolean processTakeSnapshot(final File snapshotDir) throws IOException {
     acquirePipePluginInfoLock();
     try {
       final File snapshotFile = new File(snapshotDir, SNAPSHOT_FILE_NAME);
@@ -261,7 +266,7 @@ public class PipePluginInfo implements SnapshotProcessor {
   }
 
   @Override
-  public void processLoadSnapshot(File snapshotDir) throws IOException {
+  public void processLoadSnapshot(final File snapshotDir) throws IOException {
     acquirePipePluginInfoLock();
     try {
       final File snapshotFile = new File(snapshotDir, SNAPSHOT_FILE_NAME);
@@ -288,7 +293,7 @@ public class PipePluginInfo implements SnapshotProcessor {
   }
 
   @Override
-  public boolean equals(Object obj) {
+  public boolean equals(final Object obj) {
     if (this == obj) {
       return true;
     }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index cb4d6f57fb6..1fedccdf8e5 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -469,7 +469,7 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor {
           }
         } catch (IOException | URISyntaxException e) {
           LOGGER.warn(
-              "Failed to get executable for UDF({}) using URI: {}, the cause is: {}",
+              "Failed to get executable for UDF({}) using URI: {}.",
               createFunctionStatement.getUdfName(),
               createFunctionStatement.getUriString(),
               e);
@@ -505,14 +505,14 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor {
           | InvocationTargetException
           | ClassCastException e) {
         LOGGER.warn(
-            "Failed to create function when try to create UDF({}) instance first, the cause is: {}",
+            "Failed to create function when try to create UDF({}) instance first.",
             createFunctionStatement.getUdfName(),
             e);
         future.setException(
             new IoTDBException(
                 "Failed to load class '"
                     + createFunctionStatement.getClassName()
-                    + "', because it's not found in jar file: "
+                    + "', because it's not found in jar file or is invalid: "
                     + createFunctionStatement.getUriString(),
                 TSStatusCode.UDF_LOAD_CLASS_ERROR.getStatusCode()));
         return future;
@@ -642,7 +642,7 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor {
           }
         } catch (IOException | URISyntaxException e) {
           LOGGER.warn(
-              "Failed to get executable for Trigger({}) using URI: {}, the cause is: {}",
+              "Failed to get executable for Trigger({}) using URI: {}.",
               createTriggerStatement.getTriggerName(),
               createTriggerStatement.getUriString(),
               e);
@@ -679,14 +679,14 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor {
           | InvocationTargetException
           | ClassCastException e) {
         LOGGER.warn(
-            "Failed to create trigger when try to create trigger({}) instance first, the cause is: {}",
+            "Failed to create trigger when try to create trigger({}) instance first.",
             createTriggerStatement.getTriggerName(),
             e);
         future.setException(
             new IoTDBException(
                 "Failed to load class '"
                     + createTriggerStatement.getClassName()
-                    + "', because it's not found in jar file: "
+                    + "', because it's not found in jar file or is invalid: "
                     + createTriggerStatement.getUriString(),
                 TSStatusCode.TRIGGER_LOAD_CLASS_ERROR.getStatusCode()));
         return future;
@@ -752,7 +752,7 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor {
 
   @Override
   public SettableFuture<ConfigTaskResult> createPipePlugin(
-      CreatePipePluginStatement createPipePluginStatement) {
+      final CreatePipePluginStatement createPipePluginStatement) {
     final SettableFuture<ConfigTaskResult> future = SettableFuture.create();
     final String pluginName = createPipePluginStatement.getPluginName();
     final String className = createPipePluginStatement.getClassName();
@@ -768,13 +768,13 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor {
 
     try (final ConfigNodeClient client =
         CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
-      String libRoot;
-      ByteBuffer jarFile;
-      String jarMd5;
+      final String libRoot;
+      final ByteBuffer jarFile;
+      final String jarMd5;
 
       final String jarFileName = new File(uriString).getName();
       try {
-        URI uri = new URI(uriString);
+        final URI uri = new URI(uriString);
         if (uri.getScheme() == null) {
           future.setException(
               new IoTDBException(
@@ -784,10 +784,10 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor {
         }
         if (!uri.getScheme().equals("file")) {
           // Download executable
-          ExecutableResource resource =
+          final ExecutableResource resource =
               PipePluginExecutableManager.getInstance()
                   .request(Collections.singletonList(uriString));
-          String jarFilePathUnderTempDir =
+          final String jarFilePathUnderTempDir =
               PipePluginExecutableManager.getInstance()
                       .getDirStringUnderTempRootByRequestId(resource.getRequestId())
                   + File.separator
@@ -805,9 +805,9 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor {
           // Set md5 of the jar file
           jarMd5 = DigestUtils.md5Hex(Files.newInputStream(Paths.get(libRoot)));
         }
-      } catch (IOException | URISyntaxException e) {
+      } catch (final IOException | URISyntaxException e) {
         LOGGER.warn(
-            "Failed to get executable for PipePlugin({}) using URI: {}, the cause is: {}",
+            "Failed to get executable for PipePlugin({}) using URI: {}.",
             createPipePluginStatement.getPluginName(),
             createPipePluginStatement.getUriString(),
             e);
@@ -821,25 +821,26 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor {
       }
 
       // try to create instance, this request will fail if creation is not successful
-      try (PipePluginClassLoader classLoader = new PipePluginClassLoader(libRoot)) {
+      try (final PipePluginClassLoader classLoader = new PipePluginClassLoader(libRoot)) {
         // ensure that jar file contains the class and the class is a pipe plugin
-        Class<?> clazz = Class.forName(createPipePluginStatement.getClassName(), true, classLoader);
-        PipePlugin ignored = (PipePlugin) clazz.getDeclaredConstructor().newInstance();
-      } catch (ClassNotFoundException
+        final Class<?> clazz =
+            Class.forName(createPipePluginStatement.getClassName(), true, classLoader);
+        final PipePlugin ignored = (PipePlugin) clazz.getDeclaredConstructor().newInstance();
+      } catch (final ClassNotFoundException
           | NoSuchMethodException
           | InstantiationException
           | IllegalAccessException
           | InvocationTargetException
           | ClassCastException e) {
         LOGGER.warn(
-            "Failed to create function when try to create PipePlugin({}) instance first, the cause is: {}",
+            "Failed to create function when try to create PipePlugin({}) instance first.",
             createPipePluginStatement.getPluginName(),
             e);
         future.setException(
             new IoTDBException(
                 "Failed to load class '"
                     + createPipePluginStatement.getClassName()
-                    + "', because it's not found in jar file: "
+                    + "', because it's not found in jar file or is invalid: "
                     + createPipePluginStatement.getUriString(),
                 TSStatusCode.PIPE_PLUGIN_LOAD_CLASS_ERROR.getStatusCode()));
         return future;
@@ -868,7 +869,7 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor {
       } else {
         future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
       }
-    } catch (ClientManagerException | TException | IOException e) {
+    } catch (final ClientManagerException | TException | IOException e) {
       future.setException(e);
     }
     return future;
diff --git a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties
index 958765e523f..9e32106da52 100644
--- a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties
+++ b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties
@@ -1658,7 +1658,7 @@ data_replication_factor=1
 # The total bytes that all pipe sinks can transfer per second.
 # When given a value less than or equal to 0, it means no limit.
 # default value is -1, which means no limit.
-# Datatype: int
+# Datatype: double
 # pipe_all_sinks_rate_limit_bytes_per_second=-1
 
 ####################
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index 3d42596e1fe..f72ea5e528d 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -562,15 +562,14 @@ public class CommonDescriptor {
             properties.getProperty(
                 "two_stage_aggregate_sender_end_points_cache_in_ms",
                 String.valueOf(config.getTwoStageAggregateSenderEndPointsCacheInMs()))));
+  }
 
+  private void loadSubscriptionProps(Properties properties) {
     config.setSubscriptionCacheMemoryUsagePercentage(
         Float.parseFloat(
             properties.getProperty(
                 "subscription_cache_memory_usage_percentage",
                 String.valueOf(config.getSubscriptionCacheMemoryUsagePercentage()))));
-  }
-
-  private void loadSubscriptionProps(Properties properties) {
     config.setSubscriptionSubtaskExecutorMaxThreadNum(
         Integer.parseInt(
             properties.getProperty(
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
index ad8e328e45e..d81292e34d5 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
@@ -117,7 +117,7 @@ public abstract class IoTDBConnector implements PipeConnector {
   protected PipeReceiverStatusHandler receiverStatusHandler;
 
   @Override
-  public void validate(PipeParameterValidator validator) throws Exception {
+  public void validate(final PipeParameterValidator validator) throws Exception {
     final PipeParameters parameters = validator.getParameters();
 
     validator.validate(
@@ -232,7 +232,8 @@ public abstract class IoTDBConnector implements PipeConnector {
   }
 
   @Override
-  public void customize(PipeParameters parameters, PipeConnectorRuntimeConfiguration configuration)
+  public void customize(
+      final PipeParameters parameters, final PipeConnectorRuntimeConfiguration configuration)
       throws Exception {
     nodeUrls.clear();
     nodeUrls.addAll(parseNodeUrls(parameters));
@@ -276,7 +277,7 @@ public abstract class IoTDBConnector implements PipeConnector {
                 CONNECTOR_EXCEPTION_OTHERS_RECORD_IGNORED_DATA_DEFAULT_VALUE));
   }
 
-  protected LinkedHashSet<TEndPoint> parseNodeUrls(PipeParameters parameters)
+  protected LinkedHashSet<TEndPoint> parseNodeUrls(final PipeParameters parameters)
       throws PipeParameterNotValidException {
     final LinkedHashSet<TEndPoint> givenNodeUrls = new LinkedHashSet<>(nodeUrls);
 
@@ -317,15 +318,22 @@ public abstract class IoTDBConnector implements PipeConnector {
         givenNodeUrls.addAll(
             NodeUrlUtils.parseTEndPointUrls(
                 Arrays.asList(
-                    parameters.getStringByKeys(CONNECTOR_IOTDB_NODE_URLS_KEY).split(","))));
+                    parameters
+                        .getStringByKeys(CONNECTOR_IOTDB_NODE_URLS_KEY)
+                        .replace(" ", "")
+                        .split(","))));
       }
 
       if (parameters.hasAttribute(SINK_IOTDB_NODE_URLS_KEY)) {
         givenNodeUrls.addAll(
             NodeUrlUtils.parseTEndPointUrls(
-                Arrays.asList(parameters.getStringByKeys(SINK_IOTDB_NODE_URLS_KEY).split(","))));
+                Arrays.asList(
+                    parameters
+                        .getStringByKeys(SINK_IOTDB_NODE_URLS_KEY)
+                        .replace(" ", "")
+                        .split(","))));
       }
-    } catch (Exception e) {
+    } catch (final Exception e) {
       LOGGER.warn(PARSE_URL_ERROR_FORMATTER, e.toString());
       throw new PipeParameterNotValidException(PARSE_URL_ERROR_MESSAGE);
     }
@@ -335,8 +343,8 @@ public abstract class IoTDBConnector implements PipeConnector {
     return givenNodeUrls;
   }
 
-  private void checkNodeUrls(Set<TEndPoint> nodeUrls) throws PipeParameterNotValidException {
-    for (TEndPoint nodeUrl : nodeUrls) {
+  private void checkNodeUrls(final Set<TEndPoint> nodeUrls) throws PipeParameterNotValidException {
+    for (final TEndPoint nodeUrl : nodeUrls) {
       if (Objects.isNull(nodeUrl.ip) || nodeUrl.ip.isEmpty()) {
         LOGGER.warn(PARSE_URL_ERROR_FORMATTER, "host cannot be empty");
         throw new PipeParameterNotValidException(PARSE_URL_ERROR_MESSAGE);
@@ -354,13 +362,13 @@ public abstract class IoTDBConnector implements PipeConnector {
     PIPE_END_POINT_RATE_LIMITER_MAP.clear();
   }
 
-  protected TPipeTransferReq compressIfNeeded(TPipeTransferReq req) throws IOException {
+  protected TPipeTransferReq compressIfNeeded(final TPipeTransferReq req) throws IOException {
     return isRpcCompressionEnabled
         ? PipeTransferCompressedReq.toTPipeTransferReq(req, compressors)
         : req;
   }
 
-  protected byte[] compressIfNeeded(byte[] reqInBytes) throws IOException {
+  protected byte[] compressIfNeeded(final byte[] reqInBytes) throws IOException {
     return isRpcCompressionEnabled
         ? PipeTransferCompressedReq.toTPipeTransferReqBytes(reqInBytes, compressors)
         : reqInBytes;

Reply via email to