This is an automated email from the ASF dual-hosted git repository. Caideyipi pushed a commit to branch pipe-log in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 2e05b7d4cbf2703d0470a7696426537225d95dce Author: Caideyipi <[email protected]> AuthorDate: Mon May 18 11:19:11 2026 +0800 zh --- .../iotdb/confignode/i18n/ManagerMessages.java | 14 ++--- .../apache/iotdb/db/i18n/DataNodePipeMessages.java | 19 ++++++ .../apache/iotdb/db/i18n/DataNodePipeMessages.java | 73 ++++++++++++++-------- .../pipe/agent/plugin/PipeDataNodePluginAgent.java | 22 ++++--- .../common/tsfile/PipeTsFileInsertionEvent.java | 10 +-- .../sink/protocol/legacy/IoTDBLegacyPipeSink.java | 3 +- .../PipeRealtimeDataRegionHybridSource.java | 4 +- .../realtime/PipeRealtimeDataRegionLogSource.java | 5 +- .../realtime/PipeRealtimeDataRegionSource.java | 8 +-- .../PipeRealtimeDataRegionTsFileSource.java | 5 +- .../apache/iotdb/commons/i18n/PipeMessages.java | 14 +++++ .../apache/iotdb/commons/i18n/PipeMessages.java | 14 +++++ .../pipe/datastructure/pattern/TreePattern.java | 36 +++++------ 13 files changed, 145 insertions(+), 82 deletions(-) diff --git a/iotdb-core/confignode/src/main/i18n/zh/org/apache/iotdb/confignode/i18n/ManagerMessages.java b/iotdb-core/confignode/src/main/i18n/zh/org/apache/iotdb/confignode/i18n/ManagerMessages.java index 836fe7dd60b..e7321e5376c 100644 --- a/iotdb-core/confignode/src/main/i18n/zh/org/apache/iotdb/confignode/i18n/ManagerMessages.java +++ b/iotdb-core/confignode/src/main/i18n/zh/org/apache/iotdb/confignode/i18n/ManagerMessages.java @@ -28,11 +28,11 @@ public final class ManagerMessages { public static final String AFTER_THIS_SUCCESSFUL_SYNC_IF_SUBSCRIPTIONINFO_IS_EMPTY_DURING_THIS = "After this successful sync, if SubscriptionInfo is empty during this sync and has not been modified afterwards, all subsequent syncs will be skipped"; public static final String ATTEMPT_TO_REPORT_PIPE_EXCEPTION_TO_A_NULL_PIPETASKMETA = - "Attempt to report pipe exception to a null PipeTaskMeta."; + "尝试向空的 PipeTaskMeta 上报 pipe 异常。"; public static final String AUTH_RUN_AUTH_PLAN = "Auth: run auth plan: {}"; public static final String CLUSTERID = "clusterID: {}"; public static final String COLLECTING_PIPE_HEARTBEAT_FROM_DATA_NODES = - "Collecting pipe heartbeat {} from data nodes"; + "正在从 data nodes 收集 pipe 心跳 {}"; public static final String CONNECTION_FROM_DATANODE_TO_DATANODE_IS_BROKEN = "Connection from DataNode {} to DataNode {} is broken"; public static final String CONSENSUSGROUPSTATISTICS = "[ConsensusGroupStatistics]\t {}: {} -> {}"; @@ -128,7 +128,7 @@ public final class ManagerMessages { public static final String FAILED_TO_CREATE_PEER_FOR_CONSENSUS_GROUP = "Failed to create peer for consensus group"; public static final String FAILED_TO_CREATE_PIPE_RESULT_STATUS = - "Failed to create pipe {}. Result status: {}."; + "创建 pipe {} 失败。结果状态:{}。"; public static final String FAILED_TO_CREATE_SUBTASK_FOR_PIPE_CREATION_TIME = "Failed to create subtask for pipe %s, creation time %d"; public static final String FAILED_TO_CREATE_TOPIC_WITH_ATTRIBUTES_RESULT_STATUS = @@ -143,7 +143,7 @@ public final class ManagerMessages { public static final String FAILED_TO_DEREGISTER_PIPE_TEMPORARY_META_METRICS_PIPETEMPORARYMETA_DOES_NOT = "Failed to deregister pipe temporary meta metrics, PipeTemporaryMeta({}) does not exist"; public static final String FAILED_TO_DROP_PIPE_RESULT_STATUS = - "Failed to drop pipe {}. Result status: {}."; + "删除 pipe {} 失败。结果状态:{}。"; public static final String FAILED_TO_GET_ALL_PIPE_INFO = "Failed to get all pipe info."; public static final String FAILED_TO_GET_ALL_SUBSCRIPTION_INFO = "Failed to get all subscription info."; @@ -162,9 +162,9 @@ public final class ManagerMessages { public static final String FAILED_TO_SHOW_SUBSCRIPTION_INFO = "Failed to show subscription info."; public static final String FAILED_TO_SHOW_TOPIC_INFO = "Failed to show topic info."; public static final String FAILED_TO_START_PIPE_RESULT_STATUS = - "Failed to start pipe {}. Result status: {}."; + "启动 pipe {} 失败。结果状态:{}。"; public static final String FAILED_TO_STOP_PIPE_RESULT_STATUS = - "Failed to stop pipe {}. Result status: {}."; + "停止 pipe {} 失败。结果状态:{}。"; public static final String FAILED_TO_SUBMIT_ASYNC_CONSENSUS_PIPE_CREATION_FOR = "Failed to submit async consensus pipe creation for {}: {}"; public static final String FAILED_TO_SUBMIT_ASYNC_CONSENSUS_PIPE_DROP_FOR = @@ -172,7 +172,7 @@ public final class ManagerMessages { public static final String FAILED_TO_SYNC_CONSUMER_GROUP_META_RESULT_STATUS = "Failed to sync consumer group meta. Result status: {}."; public static final String FAILED_TO_SYNC_PIPE_META_RESULT_STATUS = - "Failed to sync pipe meta. Result status: {}."; + "同步 pipe 元数据失败。结果状态:{}。"; public static final String FAILED_TO_SYNC_TEMPLATE_EXTENSION_INFO_TO_DATANODE = "Failed to sync template {} extension info to DataNode {}"; public static final String FAILED_TO_SYNC_TOPIC_META_RESULT_STATUS = diff --git a/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/DataNodePipeMessages.java b/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/DataNodePipeMessages.java index 2edd14936a9..73f923ea4ef 100644 --- a/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/DataNodePipeMessages.java +++ b/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/DataNodePipeMessages.java @@ -142,6 +142,16 @@ public final class DataNodePipeMessages { "Failed to persist progress index to configNode, status: {}"; public static final String FAILURE_WHEN_REGISTER_PIPE_PLUGIN_SKIP_THIS = "Failure when register pipe plugin {}. Skip this plugin and continue startup."; + public static final String + FAILED_TO_REGISTER_PIPE_PLUGIN_BECAUSE_NAME_CONFLICTS_WITH_BUILTIN = + "Failed to register PipePlugin %s, because the given PipePlugin name is the same as a built-in PipePlugin name."; + public static final String + FAILED_TO_REGISTER_PIPE_PLUGIN_BECAUSE_INSTANCE_CONSTRUCTION_FAILED = + "Failed to register PipePlugin %s(%s), because its instance can not be constructed successfully. Exception: %s"; + public static final String FAILED_TO_REGISTER_PIPE_PLUGIN_BECAUSE_JAR_MD5_MISMATCH = + "Failed to register PipePlugin %s, because existed md5 of jar file for pipe plugin %s is different from the new jar file."; + public static final String FAILED_TO_DEREGISTER_BUILTIN_PIPE_PLUGIN = + "Failed to deregister builtin PipePlugin %s."; public static final String PIPECONNECTOR = "PipeConnector: "; public static final String PIPEDATANODETASKBUILDER_FAILED_TO_PARSE_INCLUSION_AND_EXCLUSION = "PipeDataNodeTaskBuilder failed to parse 'inclusion' and 'exclusion' parameters: {}"; @@ -439,8 +449,15 @@ public final class DataNodePipeMessages { public static final String FAILED_TO_START_SOURCES = "failed to start sources."; public static final String HEARTBEAT_EVENT_CAN_NOT_BE_SUPPLIED_BECAUSE = "Heartbeat Event {} can not be supplied because the reference count can not be increased"; + public static final String EVENT_CAN_NOT_BE_SUPPLIED_BECAUSE_DATA_IS_LOST = + "Event %s can not be supplied because the reference count can not be increased, the data represented by this event is lost"; public static final String INTERRUPTED_WAITING_FOR_PROCESSOR_TO_STOP = "Interrupted waiting for processor to stop"; + public static final String INTERRUPTED_WHEN_WAITING_FOR_PARSING_PRIVILEGE_FOR_TSFILE = + "Interrupted when waiting for parsing privilege for TsFile %s."; + public static final String PARSE_TSFILE_WHEN_CHECKING_PRIVILEGE_ERROR = + "Parse TsFile %s when checking privilege error. Because: %s"; + public static final String READ_TSFILE_ERROR = "Read TsFile %s error."; public static final String IOTDBSCHEMAREGIONSOURCE_DOES_NOT_SUPPORT_TRANSFERRING_EVENTS_UNDER = "IoTDBSchemaRegionSource does not support transferring events under simple consensus"; public static final String NOT_HAS_PRIVILEGE_TO_TRANSFER_EVENT = @@ -826,6 +843,8 @@ public final class DataNodePipeMessages { public static final String REDIRECT_FILE_POSITION_TO = "Redirect file position to {}."; public static final String REDIRECT_TO_POSITION_IN_TRANSFERRING_TSFILE = "Redirect to position {} in transferring tsFile {}."; + public static final String NETWORK_FAILED_TO_RECEIVE_TSFILE_STATUS = + "Network failed to receive tsFile %s, status: %s"; public static final String SECURITY_DIR = "security dir: {}"; public static final String SECURITY_PKI_DIR = "security pki dir: {}"; public static final String SUCCESSFULLY_ADDED_ITEM = "Successfully added item {}."; diff --git a/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/DataNodePipeMessages.java b/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/DataNodePipeMessages.java index ebd58bc0c62..4a0ba344d73 100644 --- a/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/DataNodePipeMessages.java +++ b/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/DataNodePipeMessages.java @@ -84,7 +84,7 @@ public final class DataNodePipeMessages { // ===================== AGENT ===================== public static final String ATTEMPT_TO_REPORT_PIPE_EXCEPTION_TO_A = - "Attempt to report pipe exception to a null PipeTaskMeta."; + "尝试向空的 PipeTaskMeta 上报 pipe 异常。"; public static final String CANNOT_PARSE_REBOOT_TIMES_FROM_FILE_SET = "无法解析 reboot times from file {}, set the current time in seconds ({}) as the reboot times"; public static final String CANNOT_RECORD_REBOOT_TIMES_TO_FILE_THE = @@ -92,11 +92,11 @@ public final class DataNodePipeMessages { public static final String CANNOT_START_SIMPLEPROGRESSINDEXASSIGNER_BECAUSE_OF = "无法启动 SimpleProgressIndexAssigner because of {}"; public static final String CREATE_PIPE_DN_TASK_SUCCESSFULLY_WITHIN_MS = - "创建 pipe DN task {} successfully within {} ms"; + "创建 pipe DN task {} 成功,耗时 {} ms"; public static final String DEREGISTER_SUBTASK_RUNNINGTASKCOUNT_REGISTEREDTASKCOUNT = - "Deregister subtask {}. runningTaskCount: {}, registeredTaskCount: {}"; + "注销子任务 {}。runningTaskCount: {}, registeredTaskCount: {}"; public static final String DROP_PIPE_DN_TASK_SUCCESSFULLY_WITHIN_MS = - "Drop pipe DN task {} successfully within {} ms"; + "删除 pipe DN task {} 成功,耗时 {} ms"; public static final String ERROR_OCCURRED_WHEN_COLLECTING_EVENTS_FROM_PROCESSOR = "collecting events from processor 时发生错误"; public static final String EXCEPTION_IN_PIPE_EVENT_PROCESSING_IGNORED_BECAUSE = @@ -132,9 +132,19 @@ public final class DataNodePipeMessages { "获取 pipe task meta from config node. Ignore the exception 失败,原因:config node may not be " + "ready yet, and meta will be pushed by config node later."; public static final String FAILED_TO_PERSIST_PROGRESS_INDEX_TO_CONFIGNODE = - "persist progress index to configNode, status: {} 失败"; + "持久化 progress index 到 configNode 失败,状态:{}"; public static final String FAILURE_WHEN_REGISTER_PIPE_PLUGIN_SKIP_THIS = - "Failure when register pipe plugin {}. Skip this plugin and continue startup."; + "注册 pipe plugin {} 失败。将跳过该插件并继续启动。"; + public static final String + FAILED_TO_REGISTER_PIPE_PLUGIN_BECAUSE_NAME_CONFLICTS_WITH_BUILTIN = + "注册 PipePlugin %s 失败,因为给定的 PipePlugin 名称与内置 PipePlugin 名称重复。"; + public static final String + FAILED_TO_REGISTER_PIPE_PLUGIN_BECAUSE_INSTANCE_CONSTRUCTION_FAILED = + "注册 PipePlugin %s(%s) 失败,因为其实例无法成功构造。异常:%s"; + public static final String FAILED_TO_REGISTER_PIPE_PLUGIN_BECAUSE_JAR_MD5_MISMATCH = + "注册 PipePlugin %s 失败,因为 pipe plugin %s 已存在的 jar 文件 MD5 与新的 jar 文件不同。"; + public static final String FAILED_TO_DEREGISTER_BUILTIN_PIPE_PLUGIN = + "注销内置 PipePlugin %s 失败。"; public static final String PIPECONNECTOR = "PipeConnector: "; public static final String PIPEDATANODETASKBUILDER_FAILED_TO_PARSE_INCLUSION_AND_EXCLUSION = "PipeDataNodeTaskBuilder failed to parse 'inclusion' and 'exclusion' parameters: {}"; @@ -157,51 +167,51 @@ public final class DataNodePipeMessages { "PipeDataNodeTaskBuilder: When the realtime sync is enabled, we enable rate limiter in " + "sending tsfile by default to reserve disk and network IO for realtime sending."; public static final String PIPEEVENTCOLLECTOR_THE_EVENT_IS_ALREADY_RELEASED_SKIPPING = - "PipeEventCollector: The event {} is already released, skipping it."; + "PipeEventCollector:事件 {} 已被释放,跳过处理。"; public static final String PIPE_CONNECTOR_SUBTASK_WAS_CLOSED_WITHIN_MS = "Pipe:connector subtask {} ({}) 已关闭 within {} ms"; - public static final String PIPE_META_NOT_FOUND = "Pipe meta not found: "; + public static final String PIPE_META_NOT_FOUND = "未找到 pipe 元数据:"; public static final String PIPE_SINK_SUBTASKS_WITH_ATTRIBUTES_IS_BOUNDED = "Pipe sink subtasks with attributes {} is bounded with sinkExecutor {} and " + "callbackExecutor {}."; public static final String PIPE_SKIPPING_TEMPORARY_TSFILE_WHICH_SHOULDN_T = - "Pipe skipping temporary TsFile which shouldn't be transferred: {}"; + "Pipe 跳过不应传输的临时 TsFile:{}"; public static final String PULLED_PIPE_META_FROM_CONFIG_NODE_RECOVERING = - "Pulled pipe meta from config node: {}, recovering ..."; + "已从 config node 拉取 pipe 元数据:{},正在恢复 ..."; public static final String RECEIVED_PIPE_HEARTBEAT_REQUEST_FROM_CONFIG_NODE = - "Received pipe heartbeat request {} from config node."; + "收到来自 config node 的 pipe 心跳请求 {}。"; public static final String REGION_NO_TSFILEINSERTIONEVENTS_TO_REPLACE_FOR_SOURCE = "Region {}: No TsFileInsertionEvents to replace for source files {}"; public static final String REGION_REPLACED_TSFILEINSERTIONEVENTS_WITH = "Region {}: Replaced TsFileInsertionEvents {} with {}"; - public static final String REGISTEREDTASKCOUNT_0 = "registeredTaskCount < 0"; - public static final String REGISTEREDTASKCOUNT_0_1 = "registeredTaskCount <= 0"; + public static final String REGISTEREDTASKCOUNT_0 = "registeredTaskCount 小于 0"; + public static final String REGISTEREDTASKCOUNT_0_1 = "registeredTaskCount 小于等于 0"; public static final String REGISTER_SUBTASK_RUNNINGTASKCOUNT_REGISTEREDTASKCOUNT = - "Register subtask {}. runningTaskCount: {}, registeredTaskCount: {}"; + "注册子任务 {}。runningTaskCount: {}, registeredTaskCount: {}"; public static final String REPORT_PIPERUNTIMEEXCEPTION_TO_LOCAL_PIPETASKMETA_EXCEPTION_MESSAGE = - "Report PipeRuntimeException to local PipeTaskMeta({}), exception message: {}"; - public static final String RUNNINGTASKCOUNT_0 = "runningTaskCount < 0"; - public static final String RUNNINGTASKCOUNT_0_1 = "runningTaskCount <= 0"; + "向本地 PipeTaskMeta({}) 上报 PipeRuntimeException,异常信息:{}"; + public static final String RUNNINGTASKCOUNT_0 = "runningTaskCount 小于 0"; + public static final String RUNNINGTASKCOUNT_0_1 = "runningTaskCount 小于等于 0"; public static final String SIMPLEPROGRESSINDEXASSIGNER_STARTED_SUCCESSFULLY_ISSIMPLECONSENSUSENABLE_R = - "SimpleProgressIndexAssigner started successfully. isSimpleConsensusEnable: {}, " + "SimpleProgressIndexAssigner 启动成功。isSimpleConsensusEnable: {}, " + "rebootTimes: {}"; public static final String STARTING_SIMPLEPROGRESSINDEXASSIGNER = - "Starting SimpleProgressIndexAssigner ..."; + "正在启动 SimpleProgressIndexAssigner ..."; public static final String START_PIPE_DN_TASK_SUCCESSFULLY_WITHIN_MS = - "Start pipe DN task {} successfully within {} ms"; + "启动 pipe DN task {} 成功,耗时 {} ms"; public static final String START_SUBTASK_RUNNINGTASKCOUNT_REGISTEREDTASKCOUNT = - "Start subtask {}. runningTaskCount: {}, registeredTaskCount: {}"; + "启动子任务 {}。runningTaskCount: {}, registeredTaskCount: {}"; public static final String STOP_PIPE_DN_TASK_SUCCESSFULLY_WITHIN_MS = - "Stop pipe DN task {} successfully within {} ms"; + "停止 pipe DN task {} 成功,耗时 {} ms"; public static final String STOP_SUBTASK_RUNNINGTASKCOUNT_REGISTEREDTASKCOUNT = - "Stop subtask {}. runningTaskCount: {}, registeredTaskCount: {}"; + "停止子任务 {}。runningTaskCount: {}, registeredTaskCount: {}"; public static final String SUBTASK_IS_CLOSED_IGNORE_EXCEPTION = "subtask {} 已关闭, ignore exception"; - public static final String SUBTASK_WORKER_IS_INTERRUPTED = "subtask worker is interrupted"; + public static final String SUBTASK_WORKER_IS_INTERRUPTED = "子任务工作线程被中断"; public static final String SUCCESSFULLY_PERSISTED_ALL_PIPE_S_INFO_TO = "成功 persisted all pipe's info to configNode。"; public static final String THE_EXECUTOR_AND_HAS_BEEN_SUCCESSFULLY_SHUTDOWN = - "The executor {} and {} has been successfully shutdown."; + "执行器 {} 和 {} 已成功关闭。"; // ===================== EVENT ===================== @@ -422,9 +432,16 @@ public final class DataNodePipeMessages { "加载 snapshot from byteBuffer {} 失败。"; public static final String FAILED_TO_START_SOURCES = "启动 sources 失败。"; public static final String HEARTBEAT_EVENT_CAN_NOT_BE_SUPPLIED_BECAUSE = - "Heartbeat Event {} can not be supplied because the reference count can not be increased"; + "Heartbeat Event {} 无法被提供,因为其引用计数无法增加"; + public static final String EVENT_CAN_NOT_BE_SUPPLIED_BECAUSE_DATA_IS_LOST = + "Event %s 无法被提供,因为其引用计数无法增加,事件代表的数据已经丢失"; public static final String INTERRUPTED_WAITING_FOR_PROCESSOR_TO_STOP = - "Interrupted waiting for processor to stop"; + "等待 processor 停止时被中断"; + public static final String INTERRUPTED_WHEN_WAITING_FOR_PARSING_PRIVILEGE_FOR_TSFILE = + "等待解析 TsFile %s 的权限信息时被中断。"; + public static final String PARSE_TSFILE_WHEN_CHECKING_PRIVILEGE_ERROR = + "检查权限时解析 TsFile %s 失败。原因:%s"; + public static final String READ_TSFILE_ERROR = "读取 TsFile %s 失败。"; public static final String IOTDBSCHEMAREGIONSOURCE_DOES_NOT_SUPPORT_TRANSFERRING_EVENTS_UNDER = "IoTDBSchemaRegionSource 不支持 transferring events under simple consensus"; public static final String NOT_HAS_PRIVILEGE_TO_TRANSFER_EVENT = "没有权限 transfer event: "; @@ -829,6 +846,8 @@ public final class DataNodePipeMessages { + "Peeked event: {}, polled event: {}."; public static final String THE_FILE_IS_NOT_FOUND_MAY_ALREADY = "The file {} is not found, may already be deleted."; + public static final String NETWORK_FAILED_TO_RECEIVE_TSFILE_STATUS = + "网络接收 TsFile %s 失败,状态:%s"; public static final String THE_PIPE_WAS_DROPPED_SO_THE_EVENT = "The pipe {} was dropped so the event ack {} will be ignored."; public static final String THE_PIPE_WAS_DROPPED_SO_THE_EVENT_1 = diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipeDataNodePluginAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipeDataNodePluginAgent.java index 70226b35dc7..3eb48b2c04b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipeDataNodePluginAgent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipeDataNodePluginAgent.java @@ -100,8 +100,8 @@ public class PipeDataNodePluginAgent { if (information.isBuiltin()) { String errorMessage = String.format( - "Failed to register PipePlugin %s, because " - + "the given PipePlugin name is the same as a built-in PipePlugin name.", + DataNodePipeMessages + .FAILED_TO_REGISTER_PIPE_PLUGIN_BECAUSE_NAME_CONFLICTS_WITH_BUILTIN, pluginName); LOGGER.warn(errorMessage); throw new PipeException(errorMessage); @@ -113,10 +113,9 @@ public class PipeDataNodePluginAgent { && !PipePluginExecutableManager.getInstance().isLocalJarMatched(pipePluginMeta)) { String errMsg = String.format( - "Failed to register PipePlugin %s, because " - + "existed md5 of jar file for pipe plugin %s " - + "is different from the new jar file.", - pluginName, pluginName); + DataNodePipeMessages.FAILED_TO_REGISTER_PIPE_PLUGIN_BECAUSE_JAR_MD5_MISMATCH, + pluginName, + pluginName); LOGGER.warn(errMsg); throw new PipeException(errMsg); } @@ -170,9 +169,11 @@ public class PipeDataNodePluginAgent { | ClassCastException e) { String errorMessage = String.format( - "Failed to register PipePlugin %s(%s), because " - + "its instance can not be constructed successfully. Exception: %s", - pluginName.toUpperCase(), className, e); + DataNodePipeMessages + .FAILED_TO_REGISTER_PIPE_PLUGIN_BECAUSE_INSTANCE_CONSTRUCTION_FAILED, + pluginName.toUpperCase(), + className, + e); LOGGER.warn(errorMessage, e); throw new PipeException(errorMessage); } @@ -210,7 +211,8 @@ public class PipeDataNodePluginAgent { if (information != null && information.isBuiltin()) { String errorMessage = - String.format("Failed to deregister builtin PipePlugin %s.", pluginName); + String.format( + DataNodePipeMessages.FAILED_TO_DEREGISTER_BUILTIN_PIPE_PLUGIN, pluginName); LOGGER.warn(errorMessage); throw new PipeException(errorMessage); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java index cce3ac29b37..6d4c3580bd2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java @@ -610,11 +610,12 @@ public class PipeTsFileInsertionEvent extends PipeInsertionEvent final String errorMsg = e instanceof InterruptedException ? String.format( - "Interrupted when waiting for parsing privilege for TsFile %s.", + DataNodePipeMessages.INTERRUPTED_WHEN_WAITING_FOR_PARSING_PRIVILEGE_FOR_TSFILE, resource.getTsFilePath()) : String.format( - "Parse TsFile %s when checking privilege error. Because: %s", - resource.getTsFilePath(), e.getMessage()); + DataNodePipeMessages.PARSE_TSFILE_WHEN_CHECKING_PRIVILEGE_ERROR, + resource.getTsFilePath(), + e.getMessage()); LOGGER.warn(errorMsg, e); throw new PipeException(errorMsg, e); } finally { @@ -861,7 +862,8 @@ public class PipeTsFileInsertionEvent extends PipeInsertionEvent } catch (final Exception e) { close(); - final String errorMsg = String.format("Read TsFile %s error.", tsFile.getPath()); + final String errorMsg = + String.format(DataNodePipeMessages.READ_TSFILE_ERROR, tsFile.getPath()); LOGGER.warn(errorMsg, e); throw new PipeException(errorMsg, e); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/legacy/IoTDBLegacyPipeSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/legacy/IoTDBLegacyPipeSink.java index 829c9aed6b9..c0a973337c8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/legacy/IoTDBLegacyPipeSink.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/legacy/IoTDBLegacyPipeSink.java @@ -415,7 +415,8 @@ public class IoTDBLegacyPipeSink implements PipeConnector { DataNodePipeMessages.REDIRECT_TO_POSITION_IN_TRANSFERRING_TSFILE, position, file); } else if (status.code == TSStatusCode.SYNC_FILE_ERROR.getStatusCode()) { final String errorMsg = - String.format("Network failed to receive tsFile %s, status: %s", file, status); + String.format( + DataNodePipeMessages.NETWORK_FAILED_TO_RECEIVE_TSFILE_STATUS, file, status); LOGGER.warn(errorMsg); throw new PipeConnectionException(errorMsg); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionHybridSource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionHybridSource.java index 9af44d54578..c219acbc697 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionHybridSource.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionHybridSource.java @@ -279,9 +279,7 @@ public class PipeRealtimeDataRegionHybridSource extends PipeRealtimeDataRegionSo // event and report the exception to PipeRuntimeAgent. final String errorMessage = String.format( - "TsFile Event %s can not be supplied because " - + "the reference count can not be increased, " - + "the data represented by this event is lost", + DataNodePipeMessages.EVENT_CAN_NOT_BE_SUPPLIED_BECAUSE_DATA_IS_LOST, event.getEvent()); LOGGER.error(errorMessage); PipeDataNodeAgent.runtime() diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionLogSource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionLogSource.java index 3d9c81bcb0c..2835da02d83 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionLogSource.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionLogSource.java @@ -21,6 +21,7 @@ package org.apache.iotdb.db.pipe.source.dataregion.realtime; import org.apache.iotdb.commons.exception.pipe.PipeRuntimeNonCriticalException; import org.apache.iotdb.commons.pipe.event.ProgressReportEvent; +import org.apache.iotdb.db.i18n.DataNodePipeMessages; import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent; import org.apache.iotdb.db.pipe.event.common.deletion.PipeDeleteDataNodeEvent; import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent; @@ -111,9 +112,7 @@ public class PipeRealtimeDataRegionLogSource extends PipeRealtimeDataRegionSourc // and report the exception to PipeRuntimeAgent. final String errorMessage = String.format( - "Event %s can not be supplied because " - + "the reference count can not be increased, " - + "the data represented by this event is lost", + DataNodePipeMessages.EVENT_CAN_NOT_BE_SUPPLIED_BECAUSE_DATA_IS_LOST, realtimeEvent.getEvent()); LOGGER.error(errorMessage); PipeDataNodeAgent.runtime() diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java index 772154ba07f..b13b2040016 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java @@ -504,9 +504,7 @@ public abstract class PipeRealtimeDataRegionSource implements PipeExtractor { } else { // This would not happen, but just in case. LOGGER.error( - "Heartbeat Event {} can not be supplied because " - + "the reference count can not be increased", - event.getEvent()); + DataNodePipeMessages.HEARTBEAT_EVENT_CAN_NOT_BE_SUPPLIED_BECAUSE, event.getEvent()); // Do not report exception since the PipeHeartbeatEvent doesn't affect // the correction of pipe progress. @@ -524,9 +522,7 @@ public abstract class PipeRealtimeDataRegionSource implements PipeExtractor { // event and report the exception to PipeRuntimeAgent. final String errorMessage = String.format( - "Event %s can not be supplied because " - + "the reference count can not be increased, " - + "the data represented by this event is lost", + DataNodePipeMessages.EVENT_CAN_NOT_BE_SUPPLIED_BECAUSE_DATA_IS_LOST, event.getEvent()); LOGGER.error(errorMessage); PipeDataNodeAgent.runtime() diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionTsFileSource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionTsFileSource.java index d70d93db548..73bef31d85b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionTsFileSource.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionTsFileSource.java @@ -21,6 +21,7 @@ package org.apache.iotdb.db.pipe.source.dataregion.realtime; import org.apache.iotdb.commons.exception.pipe.PipeRuntimeNonCriticalException; import org.apache.iotdb.commons.pipe.event.ProgressReportEvent; +import org.apache.iotdb.db.i18n.DataNodePipeMessages; import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent; import org.apache.iotdb.db.pipe.event.common.deletion.PipeDeleteDataNodeEvent; import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent; @@ -96,9 +97,7 @@ public class PipeRealtimeDataRegionTsFileSource extends PipeRealtimeDataRegionSo // and report the exception to PipeRuntimeAgent. final String errorMessage = String.format( - "Event %s can not be supplied because " - + "the reference count can not be increased, " - + "the data represented by this event is lost", + DataNodePipeMessages.EVENT_CAN_NOT_BE_SUPPLIED_BECAUSE_DATA_IS_LOST, realtimeEvent.getEvent()); LOGGER.error(errorMessage); PipeDataNodeAgent.runtime() diff --git a/iotdb-core/node-commons/src/main/i18n/en/org/apache/iotdb/commons/i18n/PipeMessages.java b/iotdb-core/node-commons/src/main/i18n/en/org/apache/iotdb/commons/i18n/PipeMessages.java index 482717e8b04..f0086f26898 100644 --- a/iotdb-core/node-commons/src/main/i18n/en/org/apache/iotdb/commons/i18n/PipeMessages.java +++ b/iotdb-core/node-commons/src/main/i18n/en/org/apache/iotdb/commons/i18n/PipeMessages.java @@ -395,6 +395,20 @@ public final class PipeMessages { + "because of {}. Will retry forever."; public static final String RETRY_EXECUTING_SUBTASK_FOREVER = "Retry executing subtask {} (creation time: {}, simple class: {}), retry count {}, last exception: {}"; + public static final String PATTERN_INCLUSION_CANNOT_BE_USED_WITH_PATTERN_OR_PATH = + "Pipe: %s cannot be used together with %s or %s."; + public static final String PATTERN_INCLUSION_CANNOT_BE_USED_WITH_PATH_EXCLUSION = + "Pipe: %s cannot be used together with %s."; + public static final String PATH_AND_PATTERN_CANNOT_BE_USED_TOGETHER = + "Pipe: %s and %s cannot be used together."; + public static final String PARAMETER_ONLY_SUPPORTS_SINGLE_PATTERN = + "Pipe: The parameter %s only supports a single pattern now."; + public static final String FAILED_TO_PERFORM_PATTERN_COVERAGE_CHECK = + "Pipe: Failed to perform pattern coverage check for inclusion [{}] and exclusion [{}]."; + public static final String EXCLUSION_PATTERN_FULLY_COVERS_INCLUSION_PATTERN = + "Pipe: The provided exclusion pattern fully covers the inclusion pattern. This pipe pattern will match nothing. Inclusion: [%s], Exclusion: [%s]"; + public static final String EXCLUSION_PATTERN_COVERS_PART_OF_INCLUSION_PATHS = + "Pipe: The provided exclusion pattern covers {} out of {} inclusion paths. These paths will be excluded. Inclusion: [{}], Exclusion: [{}]"; // ===================== PipeAbstractSinkSubtask ===================== diff --git a/iotdb-core/node-commons/src/main/i18n/zh/org/apache/iotdb/commons/i18n/PipeMessages.java b/iotdb-core/node-commons/src/main/i18n/zh/org/apache/iotdb/commons/i18n/PipeMessages.java index eec9a123170..d081a8fd4cf 100644 --- a/iotdb-core/node-commons/src/main/i18n/zh/org/apache/iotdb/commons/i18n/PipeMessages.java +++ b/iotdb-core/node-commons/src/main/i18n/zh/org/apache/iotdb/commons/i18n/PipeMessages.java @@ -379,6 +379,20 @@ public final class PipeMessages { "执行子任务 {}(创建时间:{},类名:{})失败,原因:{}。将无限重试。"; public static final String RETRY_EXECUTING_SUBTASK_FOREVER = "重试执行子任务 {}(创建时间:{},类名:{}),重试次数 {},上次异常:{}"; + public static final String PATTERN_INCLUSION_CANNOT_BE_USED_WITH_PATTERN_OR_PATH = + "Pipe:%s 不能与 %s 或 %s 同时使用。"; + public static final String PATTERN_INCLUSION_CANNOT_BE_USED_WITH_PATH_EXCLUSION = + "Pipe:%s 不能与 %s 同时使用。"; + public static final String PATH_AND_PATTERN_CANNOT_BE_USED_TOGETHER = + "Pipe:%s 和 %s 不能同时使用。"; + public static final String PARAMETER_ONLY_SUPPORTS_SINGLE_PATTERN = + "Pipe:参数 %s 当前只支持单个 pattern。"; + public static final String FAILED_TO_PERFORM_PATTERN_COVERAGE_CHECK = + "Pipe:对 inclusion [{}] 和 exclusion [{}] 执行 pattern 覆盖检查失败。"; + public static final String EXCLUSION_PATTERN_FULLY_COVERS_INCLUSION_PATTERN = + "Pipe:给定 exclusion pattern 完全覆盖了 inclusion pattern。该 pipe pattern 不会匹配任何内容。Inclusion: [%s], Exclusion: [%s]"; + public static final String EXCLUSION_PATTERN_COVERS_PART_OF_INCLUSION_PATHS = + "Pipe:给定 exclusion pattern 覆盖了 {} / {} 条 inclusion 路径。这些路径将被排除。Inclusion: [{}], Exclusion: [{}]"; // ===================== PipeAbstractSinkSubtask ===================== diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/TreePattern.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/TreePattern.java index 6631f60a65c..48c4c64abaa 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/TreePattern.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/TreePattern.java @@ -20,6 +20,7 @@ package org.apache.iotdb.commons.pipe.datastructure.pattern; import org.apache.iotdb.commons.conf.IoTDBConstant; +import org.apache.iotdb.commons.i18n.PipeMessages; import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.commons.path.PathPatternUtil; import org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant; @@ -180,8 +181,10 @@ public abstract class TreePattern { if (hasPatternInclusionKey && (hasLegacyPathKey || hasLegacyPatternKey)) { final String msg = String.format( - "Pipe: %s cannot be used together with %s or %s.", - SOURCE_PATTERN_INCLUSION_KEY, SOURCE_PATTERN_KEY, SOURCE_PATH_KEY); + PipeMessages.PATTERN_INCLUSION_CANNOT_BE_USED_WITH_PATTERN_OR_PATH, + SOURCE_PATTERN_INCLUSION_KEY, + SOURCE_PATTERN_KEY, + SOURCE_PATH_KEY); LOGGER.warn(msg); throw new PipeException(msg); } @@ -219,8 +222,9 @@ public abstract class TreePattern { EXTRACTOR_PATH_EXCLUSION_KEY, SOURCE_PATH_EXCLUSION_KEY)) { final String msg = String.format( - "Pipe: %s cannot be used together with %s.", - SOURCE_PATTERN_INCLUSION_KEY, SOURCE_PATH_EXCLUSION_KEY); + PipeMessages.PATTERN_INCLUSION_CANNOT_BE_USED_WITH_PATH_EXCLUSION, + SOURCE_PATTERN_INCLUSION_KEY, + SOURCE_PATH_EXCLUSION_KEY); LOGGER.warn(msg); throw new PipeException(msg); } @@ -256,9 +260,7 @@ public abstract class TreePattern { if (inclusionPatterns.isEmpty()) { final String msg = String.format( - "Pipe: The provided exclusion pattern fully covers the inclusion pattern. " - + "This pipe pattern will match nothing. " - + "Inclusion: %s, Exclusion: %s", + PipeMessages.EXCLUSION_PATTERN_FULLY_COVERS_INCLUSION_PATTERN, sourceParameters.getStringByKeys( EXTRACTOR_PATTERN_INCLUSION_KEY, SOURCE_PATTERN_INCLUSION_KEY, @@ -399,7 +401,8 @@ public abstract class TreePattern { if (path != null && pattern != null) { final String msg = - String.format("Pipe: %s and %s cannot be used together.", pathKeyName, patternKeyName); + String.format( + PipeMessages.PATH_AND_PATTERN_CANNOT_BE_USED_TOGETHER, pathKeyName, patternKeyName); LOGGER.warn(msg); throw new PipeException(msg); } @@ -439,7 +442,7 @@ public abstract class TreePattern { if (!allowMultiple && patterns.size() > 1) { final String msg = - String.format("Pipe: The parameter %s only supports a single pattern now.", parameterKey); + String.format(PipeMessages.PARAMETER_ONLY_SUPPORTS_SINGLE_PATTERN, parameterKey); LOGGER.warn(msg); throw new PipeException(msg); } @@ -692,7 +695,7 @@ public abstract class TreePattern { if (!allowMultiple && patterns.size() > 1) { final String msg = - String.format("Pipe: The parameter %s only supports a single pattern now.", parameterKey); + String.format(PipeMessages.PARAMETER_ONLY_SUPPORTS_SINGLE_PATTERN, parameterKey); LOGGER.warn(msg); throw new PipeException(msg); } @@ -854,7 +857,7 @@ public abstract class TreePattern { } catch (final Exception e) { // This check is best-effort. Do not fail construction. LOGGER.warn( - "Pipe: Failed to perform pattern coverage check for inclusion [{}] and exclusion [{}].", + PipeMessages.FAILED_TO_PERFORM_PATTERN_COVERAGE_CHECK, inclusion.getPattern(), exclusion.getPattern(), e); @@ -866,18 +869,15 @@ public abstract class TreePattern { // All inclusion paths are covered by the exclusion final String msg = String.format( - "Pipe: The provided exclusion pattern fully covers the inclusion pattern. " - + "This pipe pattern will match nothing. " - + "Inclusion: [%s], Exclusion: [%s]", - inclusion.getPattern(), exclusion.getPattern()); + PipeMessages.EXCLUSION_PATTERN_FULLY_COVERS_INCLUSION_PATTERN, + inclusion.getPattern(), + exclusion.getPattern()); LOGGER.warn(msg); throw new PipeException(msg); } else if (coveredCount > 0) { // Some inclusion paths are covered LOGGER.warn( - "Pipe: The provided exclusion pattern covers {} out of {} inclusion paths. " - + "These paths will be excluded. " - + "Inclusion: [{}], Exclusion: [{}]", + PipeMessages.EXCLUSION_PATTERN_COVERS_PART_OF_INCLUSION_PATHS, coveredCount, inclusionPaths.size(), inclusion.getPattern(),
