This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 50d1c050563 Pipe: Refactor some features for user access (#12686)
50d1c050563 is described below
commit 50d1c0505630bb8345690a94015a3ebdaaf08ccd
Author: Caideyipi <[email protected]>
AuthorDate: Fri Jun 7 18:22:36 2024 +0800
Pipe: Refactor some features for user access (#12686)
---
.../persistence/pipe/PipePluginInfo.java | 39 +++++++++++--------
.../config/executor/ClusterConfigTaskExecutor.java | 45 +++++++++++-----------
.../resources/conf/iotdb-system.properties | 2 +-
.../iotdb/commons/conf/CommonDescriptor.java | 5 +--
.../pipe/connector/protocol/IoTDBConnector.java | 28 +++++++++-----
5 files changed, 66 insertions(+), 53 deletions(-)
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
index 382ebd51620..d3c46c2957d 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
@@ -91,7 +91,8 @@ public class PipePluginInfo implements SnapshotProcessor {
/////////////////////////////// Validator ///////////////////////////////
- public void validateBeforeCreatingPipePlugin(String pluginName, String
jarName, String jarMD5) {
+ public void validateBeforeCreatingPipePlugin(
+ final String pluginName, final String jarName, final String jarMD5) {
// both built-in and user defined pipe plugin should be unique
if (pipePluginMetaKeeper.containsPipePlugin(pluginName)) {
throw new PipeException(
@@ -108,9 +109,13 @@ public class PipePluginInfo implements SnapshotProcessor {
}
}
- public void validateBeforeDroppingPipePlugin(String pluginName) {
- if (pipePluginMetaKeeper.containsPipePlugin(pluginName)
- && pipePluginMetaKeeper.getPipePluginMeta(pluginName).isBuiltin()) {
+ public void validateBeforeDroppingPipePlugin(final String pluginName) {
+ if (!pipePluginMetaKeeper.containsPipePlugin(pluginName)) {
+ throw new PipeException(
+ String.format(
+ "Failed to drop PipePlugin [%s], this PipePlugin has not been
created", pluginName));
+ }
+ if (pipePluginMetaKeeper.getPipePluginMeta(pluginName).isBuiltin()) {
throw new PipeException(
String.format(
"Failed to drop PipePlugin [%s], the PipePlugin is a built-in
PipePlugin",
@@ -118,14 +123,14 @@ public class PipePluginInfo implements SnapshotProcessor {
}
}
- public boolean isJarNeededToBeSavedWhenCreatingPipePlugin(String jarName) {
+ public boolean isJarNeededToBeSavedWhenCreatingPipePlugin(final String
jarName) {
return !pipePluginMetaKeeper.containsJar(jarName);
}
public void checkPipePluginExistence(
- Map<String, String> extractorAttributes,
- Map<String, String> processorAttributes,
- Map<String, String> connectorAttributes) {
+ final Map<String, String> extractorAttributes,
+ final Map<String, String> processorAttributes,
+ final Map<String, String> connectorAttributes) {
final PipeParameters extractorParameters = new
PipeParameters(extractorAttributes);
final String extractorPluginName =
extractorParameters.getStringOrDefault(
@@ -170,7 +175,7 @@ public class PipePluginInfo implements SnapshotProcessor {
/////////////////////////////// Pipe Plugin Management
///////////////////////////////
- public TSStatus createPipePlugin(CreatePipePluginPlan createPipePluginPlan) {
+ public TSStatus createPipePlugin(final CreatePipePluginPlan
createPipePluginPlan) {
try {
final PipePluginMeta pipePluginMeta =
createPipePluginPlan.getPipePluginMeta();
@@ -188,7 +193,7 @@ public class PipePluginInfo implements SnapshotProcessor {
}
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
- } catch (Exception e) {
+ } catch (final Exception e) {
final String errorMessage =
String.format(
"Failed to execute createPipePlugin(%s) on config nodes, because
of %s",
@@ -199,7 +204,7 @@ public class PipePluginInfo implements SnapshotProcessor {
}
}
- public TSStatus dropPipePlugin(DropPipePluginPlan dropPipePluginPlan) {
+ public TSStatus dropPipePlugin(final DropPipePluginPlan dropPipePluginPlan) {
final String pluginName = dropPipePluginPlan.getPluginName();
if (pipePluginMetaKeeper.containsPipePlugin(pluginName)) {
@@ -217,17 +222,17 @@ public class PipePluginInfo implements SnapshotProcessor {
Arrays.asList(pipePluginMetaKeeper.getAllPipePluginMeta()));
}
- public JarResp getPipePluginJar(GetPipePluginJarPlan getPipePluginJarPlan) {
+ public JarResp getPipePluginJar(final GetPipePluginJarPlan
getPipePluginJarPlan) {
try {
final List<ByteBuffer> jarList = new ArrayList<>();
- for (String jarName : getPipePluginJarPlan.getJarNames()) {
+ for (final String jarName : getPipePluginJarPlan.getJarNames()) {
jarList.add(
ExecutableManager.transferToBytebuffer(
PipePluginExecutableManager.getInstance()
.getFileStringUnderInstallByName(jarName)));
}
return new JarResp(new
TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()), jarList);
- } catch (Exception e) {
+ } catch (final Exception e) {
LOGGER.error("Get PipePlugin_Jar failed", e);
return new JarResp(
new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())
@@ -239,7 +244,7 @@ public class PipePluginInfo implements SnapshotProcessor {
/////////////////////////////// Snapshot Processor
///////////////////////////////
@Override
- public boolean processTakeSnapshot(File snapshotDir) throws IOException {
+ public boolean processTakeSnapshot(final File snapshotDir) throws
IOException {
acquirePipePluginInfoLock();
try {
final File snapshotFile = new File(snapshotDir, SNAPSHOT_FILE_NAME);
@@ -261,7 +266,7 @@ public class PipePluginInfo implements SnapshotProcessor {
}
@Override
- public void processLoadSnapshot(File snapshotDir) throws IOException {
+ public void processLoadSnapshot(final File snapshotDir) throws IOException {
acquirePipePluginInfoLock();
try {
final File snapshotFile = new File(snapshotDir, SNAPSHOT_FILE_NAME);
@@ -288,7 +293,7 @@ public class PipePluginInfo implements SnapshotProcessor {
}
@Override
- public boolean equals(Object obj) {
+ public boolean equals(final Object obj) {
if (this == obj) {
return true;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index cb4d6f57fb6..1fedccdf8e5 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -469,7 +469,7 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
}
} catch (IOException | URISyntaxException e) {
LOGGER.warn(
- "Failed to get executable for UDF({}) using URI: {}, the cause
is: {}",
+ "Failed to get executable for UDF({}) using URI: {}.",
createFunctionStatement.getUdfName(),
createFunctionStatement.getUriString(),
e);
@@ -505,14 +505,14 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
| InvocationTargetException
| ClassCastException e) {
LOGGER.warn(
- "Failed to create function when try to create UDF({}) instance
first, the cause is: {}",
+ "Failed to create function when try to create UDF({}) instance
first.",
createFunctionStatement.getUdfName(),
e);
future.setException(
new IoTDBException(
"Failed to load class '"
+ createFunctionStatement.getClassName()
- + "', because it's not found in jar file: "
+ + "', because it's not found in jar file or is invalid: "
+ createFunctionStatement.getUriString(),
TSStatusCode.UDF_LOAD_CLASS_ERROR.getStatusCode()));
return future;
@@ -642,7 +642,7 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
}
} catch (IOException | URISyntaxException e) {
LOGGER.warn(
- "Failed to get executable for Trigger({}) using URI: {}, the
cause is: {}",
+ "Failed to get executable for Trigger({}) using URI: {}.",
createTriggerStatement.getTriggerName(),
createTriggerStatement.getUriString(),
e);
@@ -679,14 +679,14 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
| InvocationTargetException
| ClassCastException e) {
LOGGER.warn(
- "Failed to create trigger when try to create trigger({}) instance
first, the cause is: {}",
+ "Failed to create trigger when try to create trigger({}) instance
first.",
createTriggerStatement.getTriggerName(),
e);
future.setException(
new IoTDBException(
"Failed to load class '"
+ createTriggerStatement.getClassName()
- + "', because it's not found in jar file: "
+ + "', because it's not found in jar file or is invalid: "
+ createTriggerStatement.getUriString(),
TSStatusCode.TRIGGER_LOAD_CLASS_ERROR.getStatusCode()));
return future;
@@ -752,7 +752,7 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
@Override
public SettableFuture<ConfigTaskResult> createPipePlugin(
- CreatePipePluginStatement createPipePluginStatement) {
+ final CreatePipePluginStatement createPipePluginStatement) {
final SettableFuture<ConfigTaskResult> future = SettableFuture.create();
final String pluginName = createPipePluginStatement.getPluginName();
final String className = createPipePluginStatement.getClassName();
@@ -768,13 +768,13 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
try (final ConfigNodeClient client =
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
- String libRoot;
- ByteBuffer jarFile;
- String jarMd5;
+ final String libRoot;
+ final ByteBuffer jarFile;
+ final String jarMd5;
final String jarFileName = new File(uriString).getName();
try {
- URI uri = new URI(uriString);
+ final URI uri = new URI(uriString);
if (uri.getScheme() == null) {
future.setException(
new IoTDBException(
@@ -784,10 +784,10 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
}
if (!uri.getScheme().equals("file")) {
// Download executable
- ExecutableResource resource =
+ final ExecutableResource resource =
PipePluginExecutableManager.getInstance()
.request(Collections.singletonList(uriString));
- String jarFilePathUnderTempDir =
+ final String jarFilePathUnderTempDir =
PipePluginExecutableManager.getInstance()
.getDirStringUnderTempRootByRequestId(resource.getRequestId())
+ File.separator
@@ -805,9 +805,9 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
// Set md5 of the jar file
jarMd5 =
DigestUtils.md5Hex(Files.newInputStream(Paths.get(libRoot)));
}
- } catch (IOException | URISyntaxException e) {
+ } catch (final IOException | URISyntaxException e) {
LOGGER.warn(
- "Failed to get executable for PipePlugin({}) using URI: {}, the
cause is: {}",
+ "Failed to get executable for PipePlugin({}) using URI: {}.",
createPipePluginStatement.getPluginName(),
createPipePluginStatement.getUriString(),
e);
@@ -821,25 +821,26 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
}
// try to create instance, this request will fail if creation is not
successful
- try (PipePluginClassLoader classLoader = new
PipePluginClassLoader(libRoot)) {
+ try (final PipePluginClassLoader classLoader = new
PipePluginClassLoader(libRoot)) {
// ensure that jar file contains the class and the class is a pipe
plugin
- Class<?> clazz =
Class.forName(createPipePluginStatement.getClassName(), true, classLoader);
- PipePlugin ignored = (PipePlugin)
clazz.getDeclaredConstructor().newInstance();
- } catch (ClassNotFoundException
+ final Class<?> clazz =
+ Class.forName(createPipePluginStatement.getClassName(), true,
classLoader);
+ final PipePlugin ignored = (PipePlugin)
clazz.getDeclaredConstructor().newInstance();
+ } catch (final ClassNotFoundException
| NoSuchMethodException
| InstantiationException
| IllegalAccessException
| InvocationTargetException
| ClassCastException e) {
LOGGER.warn(
- "Failed to create function when try to create PipePlugin({})
instance first, the cause is: {}",
+ "Failed to create function when try to create PipePlugin({})
instance first.",
createPipePluginStatement.getPluginName(),
e);
future.setException(
new IoTDBException(
"Failed to load class '"
+ createPipePluginStatement.getClassName()
- + "', because it's not found in jar file: "
+ + "', because it's not found in jar file or is invalid: "
+ createPipePluginStatement.getUriString(),
TSStatusCode.PIPE_PLUGIN_LOAD_CLASS_ERROR.getStatusCode()));
return future;
@@ -868,7 +869,7 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
} else {
future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
}
- } catch (ClientManagerException | TException | IOException e) {
+ } catch (final ClientManagerException | TException | IOException e) {
future.setException(e);
}
return future;
diff --git
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties
index 958765e523f..9e32106da52 100644
---
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties
+++
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties
@@ -1658,7 +1658,7 @@ data_replication_factor=1
# The total bytes that all pipe sinks can transfer per second.
# When given a value less than or equal to 0, it means no limit.
# default value is -1, which means no limit.
-# Datatype: int
+# Datatype: double
# pipe_all_sinks_rate_limit_bytes_per_second=-1
####################
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index 3d42596e1fe..f72ea5e528d 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -562,15 +562,14 @@ public class CommonDescriptor {
properties.getProperty(
"two_stage_aggregate_sender_end_points_cache_in_ms",
String.valueOf(config.getTwoStageAggregateSenderEndPointsCacheInMs()))));
+ }
+ private void loadSubscriptionProps(Properties properties) {
config.setSubscriptionCacheMemoryUsagePercentage(
Float.parseFloat(
properties.getProperty(
"subscription_cache_memory_usage_percentage",
String.valueOf(config.getSubscriptionCacheMemoryUsagePercentage()))));
- }
-
- private void loadSubscriptionProps(Properties properties) {
config.setSubscriptionSubtaskExecutorMaxThreadNum(
Integer.parseInt(
properties.getProperty(
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
index ad8e328e45e..d81292e34d5 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
@@ -117,7 +117,7 @@ public abstract class IoTDBConnector implements
PipeConnector {
protected PipeReceiverStatusHandler receiverStatusHandler;
@Override
- public void validate(PipeParameterValidator validator) throws Exception {
+ public void validate(final PipeParameterValidator validator) throws
Exception {
final PipeParameters parameters = validator.getParameters();
validator.validate(
@@ -232,7 +232,8 @@ public abstract class IoTDBConnector implements
PipeConnector {
}
@Override
- public void customize(PipeParameters parameters,
PipeConnectorRuntimeConfiguration configuration)
+ public void customize(
+ final PipeParameters parameters, final PipeConnectorRuntimeConfiguration
configuration)
throws Exception {
nodeUrls.clear();
nodeUrls.addAll(parseNodeUrls(parameters));
@@ -276,7 +277,7 @@ public abstract class IoTDBConnector implements
PipeConnector {
CONNECTOR_EXCEPTION_OTHERS_RECORD_IGNORED_DATA_DEFAULT_VALUE));
}
- protected LinkedHashSet<TEndPoint> parseNodeUrls(PipeParameters parameters)
+ protected LinkedHashSet<TEndPoint> parseNodeUrls(final PipeParameters
parameters)
throws PipeParameterNotValidException {
final LinkedHashSet<TEndPoint> givenNodeUrls = new
LinkedHashSet<>(nodeUrls);
@@ -317,15 +318,22 @@ public abstract class IoTDBConnector implements
PipeConnector {
givenNodeUrls.addAll(
NodeUrlUtils.parseTEndPointUrls(
Arrays.asList(
-
parameters.getStringByKeys(CONNECTOR_IOTDB_NODE_URLS_KEY).split(","))));
+ parameters
+ .getStringByKeys(CONNECTOR_IOTDB_NODE_URLS_KEY)
+ .replace(" ", "")
+ .split(","))));
}
if (parameters.hasAttribute(SINK_IOTDB_NODE_URLS_KEY)) {
givenNodeUrls.addAll(
NodeUrlUtils.parseTEndPointUrls(
-
Arrays.asList(parameters.getStringByKeys(SINK_IOTDB_NODE_URLS_KEY).split(","))));
+ Arrays.asList(
+ parameters
+ .getStringByKeys(SINK_IOTDB_NODE_URLS_KEY)
+ .replace(" ", "")
+ .split(","))));
}
- } catch (Exception e) {
+ } catch (final Exception e) {
LOGGER.warn(PARSE_URL_ERROR_FORMATTER, e.toString());
throw new PipeParameterNotValidException(PARSE_URL_ERROR_MESSAGE);
}
@@ -335,8 +343,8 @@ public abstract class IoTDBConnector implements
PipeConnector {
return givenNodeUrls;
}
- private void checkNodeUrls(Set<TEndPoint> nodeUrls) throws
PipeParameterNotValidException {
- for (TEndPoint nodeUrl : nodeUrls) {
+ private void checkNodeUrls(final Set<TEndPoint> nodeUrls) throws
PipeParameterNotValidException {
+ for (final TEndPoint nodeUrl : nodeUrls) {
if (Objects.isNull(nodeUrl.ip) || nodeUrl.ip.isEmpty()) {
LOGGER.warn(PARSE_URL_ERROR_FORMATTER, "host cannot be empty");
throw new PipeParameterNotValidException(PARSE_URL_ERROR_MESSAGE);
@@ -354,13 +362,13 @@ public abstract class IoTDBConnector implements
PipeConnector {
PIPE_END_POINT_RATE_LIMITER_MAP.clear();
}
- protected TPipeTransferReq compressIfNeeded(TPipeTransferReq req) throws
IOException {
+ protected TPipeTransferReq compressIfNeeded(final TPipeTransferReq req)
throws IOException {
return isRpcCompressionEnabled
? PipeTransferCompressedReq.toTPipeTransferReq(req, compressors)
: req;
}
- protected byte[] compressIfNeeded(byte[] reqInBytes) throws IOException {
+ protected byte[] compressIfNeeded(final byte[] reqInBytes) throws
IOException {
return isRpcCompressionEnabled
? PipeTransferCompressedReq.toTPipeTransferReqBytes(reqInBytes,
compressors)
: reqInBytes;