This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch fix-sonar-streaming in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 29f395ae4d5fcab37d78f682550ad3aa74538d45 Author: Steve Yurong Su <[email protected]> AuthorDate: Wed Jun 21 19:03:59 2023 +0800 Pipe: fix bugs & smells reported by SonarCloud (org.apache.iotdb.db.pipe) --- .../db/pipe/agent/plugin/PipePluginAgent.java | 9 +- .../db/pipe/agent/runtime/PipeRuntimeAgent.java | 6 +- .../SimpleConsensusProgressIndexAssigner.java | 5 +- .../iotdb/db/pipe/agent/task/PipeTaskAgent.java | 135 +++++++++++---------- .../pipe/collector/IoTDBDataRegionCollector.java | 15 ++- .../PipeHistoricalDataRegionCollector.java | 4 +- .../PipeHistoricalDataRegionTsFileCollector.java | 8 +- .../realtime/PipeRealtimeDataRegionCollector.java | 2 +- .../PipeRealtimeDataRegionFakeCollector.java | 21 +++- .../PipeRealtimeDataRegionHybridCollector.java | 27 +++-- .../PipeRealtimeDataRegionLogCollector.java | 7 +- .../PipeRealtimeDataRegionTsFileCollector.java | 7 +- .../realtime/epoch/TsFileEpochManager.java | 11 +- .../matcher/CachedSchemaPatternMatcher.java | 4 +- .../pipe/connector/legacy/IoTDBSyncConnector.java | 38 +++--- .../pipe/connector/legacy/IoTDBSyncReceiver.java | 37 ++---- .../pipe/connector/legacy/pipedata/PipeData.java | 2 +- .../connector/legacy/pipedata/TsFilePipeData.java | 3 +- .../pipe/connector/v1/IoTDBThriftConnectorV1.java | 29 ++--- .../pipe/connector/v1/IoTDBThriftReceiverV1.java | 25 ++-- .../db/pipe/connector/v1/PipeRequestType.java | 5 +- .../PipeTransferTabletInsertionEventHandler.java | 4 +- .../PipeTransferTsFileInsertionEventHandler.java | 2 +- .../apache/iotdb/db/pipe/event/EnrichedEvent.java | 2 +- .../tablet/PipeInsertNodeTabletInsertionEvent.java | 3 - .../common/tablet/PipeRawTabletInsertionEvent.java | 10 +- .../tablet/TabletInsertionDataContainer.java | 6 +- .../common/tsfile/PipeTsFileInsertionEvent.java | 4 +- .../tsfile/TsFileInsertionDataContainer.java | 7 +- .../executor/PipeSubtaskExecutorManager.java | 6 +- .../resource/file/PipeFileResourceManager.java | 22 ++-- .../db/pipe/resource/wal/PipeWALResource.java | 10 +- .../pipe/resource/wal/PipeWALResourceManager.java | 79 ++++-------- .../db/pipe/task/connection/EventSupplier.java | 1 + .../db/pipe/task/stage/PipeTaskCollectorStage.java | 3 +- .../db/pipe/task/stage/PipeTaskConnectorStage.java | 4 +- .../iotdb/db/pipe/task/stage/PipeTaskStage.java | 25 ++-- .../db/pipe/task/subtask/PipeConnectorSubtask.java | 42 +++++-- .../task/subtask/PipeConnectorSubtaskManager.java | 13 +- .../db/pipe/task/subtask/PipeProcessorSubtask.java | 1 - .../iotdb/db/pipe/task/subtask/PipeSubtask.java | 46 +++++-- 41 files changed, 360 insertions(+), 330 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java index 9266bdf9a22..8211bbb50ee 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java @@ -137,7 +137,8 @@ public class PipePluginAgent { updateAllRegisteredClasses(currentActiveClassLoader); final Class<?> pluginClass = Class.forName(className, true, currentActiveClassLoader); - // ensure that it is a PipePlugin class + + @SuppressWarnings("unused") // ensure that it is a PipePlugin class final PipePlugin ignored = (PipePlugin) pluginClass.getDeclaredConstructor().newInstance(); pipePluginMetaKeeper.addPipePluginMeta(pluginName, pipePluginMeta); @@ -165,7 +166,7 @@ public class PipePluginAgent { } } - public void deregister(String pluginName, boolean needToDeleteJar) throws Exception { + public void deregister(String pluginName, boolean needToDeleteJar) throws PipeException { acquireLock(); try { final PipePluginMeta information = pipePluginMetaKeeper.getPipePluginMeta(pluginName); @@ -225,7 +226,7 @@ public class PipePluginAgent { "Failed to reflect PipePlugin instance, because PipePlugin %s has not been registered.", pluginName.toUpperCase()); LOGGER.warn(errorMessage); - throw new RuntimeException(errorMessage); + throw new PipeException(errorMessage); } try { @@ -240,7 +241,7 @@ public class PipePluginAgent { "Failed to reflect PipePlugin %s(%s) instance, because %s", pluginName, information.getClassName(), e); LOGGER.warn(errorMessage, e); - throw new RuntimeException(errorMessage); + throw new PipeException(errorMessage); } } } diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java index 7d1a6336a16..afc899854d0 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java @@ -102,9 +102,9 @@ public class PipeRuntimeAgent implements IService { public void report(PipeTaskMeta pipeTaskMeta, PipeRuntimeException pipeRuntimeException) { LOGGER.warn( - String.format( - "PipeRuntimeException: pipe task meta %s, exception %s", - pipeTaskMeta, pipeRuntimeException), + "Report PipeRuntimeException to local PipeTaskMeta({}), exception message: {}", + pipeTaskMeta, + pipeRuntimeException.getMessage(), pipeRuntimeException); pipeTaskMeta.trackExceptionMessage(pipeRuntimeException); } diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/SimpleConsensusProgressIndexAssigner.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/SimpleConsensusProgressIndexAssigner.java index 7c81cd6cabd..b0a3eab9166 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/SimpleConsensusProgressIndexAssigner.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/SimpleConsensusProgressIndexAssigner.java @@ -32,6 +32,7 @@ import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.util.concurrent.atomic.AtomicLong; import static org.apache.iotdb.consensus.ConsensusFactory.SIMPLE_CONSENSUS; @@ -84,7 +85,7 @@ public class SimpleConsensusProgressIndexAssigner { return; } try { - String content = FileUtils.readFileToString(file, "UTF-8"); + String content = FileUtils.readFileToString(file, StandardCharsets.UTF_8); rebootTimes = Integer.parseInt(content); } catch (IOException e) { LOGGER.error("Cannot parse reboot times from file {}", file.getAbsolutePath(), e); @@ -94,7 +95,7 @@ public class SimpleConsensusProgressIndexAssigner { private void recordRebootTimes() throws IOException { File file = SystemFileFactory.INSTANCE.getFile(PIPE_SYSTEM_DIR + REBOOT_TIMES_FILE_NAME); - FileUtils.writeStringToFile(file, String.valueOf(rebootTimes + 1), "UTF-8"); + FileUtils.writeStringToFile(file, String.valueOf(rebootTimes + 1), StandardCharsets.UTF_8); } public void assignIfNeeded(TsFileResource tsFileResource) { diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java index e3fe56a52ec..bbbbd3a7d23 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java @@ -71,6 +71,9 @@ public class PipeTaskAgent { private static final Logger LOGGER = LoggerFactory.getLogger(PipeTaskAgent.class); private static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig(); + private static final String MESSAGE_UNKNOWN_PIPE_STATUS = "Unknown pipe status %s for pipe %s"; + private static final String MESSAGE_UNEXPECTED_PIPE_STATUS = "Unexpected pipe status %s: "; + private final PipeMetaKeeper pipeMetaKeeper; private final PipeTaskManager pipeTaskManager; @@ -238,8 +241,7 @@ public class PipeTaskAgent { } else { throw new IllegalStateException( String.format( - "Unknown pipe status %s for pipe %s", - statusOnDataNode, pipeStaticMeta.getPipeName())); + MESSAGE_UNKNOWN_PIPE_STATUS, statusOnDataNode, pipeStaticMeta.getPipeName())); } break; case STOPPED: @@ -248,8 +250,7 @@ public class PipeTaskAgent { } else { throw new IllegalStateException( String.format( - "Unknown pipe status %s for pipe %s", - statusOnDataNode, pipeStaticMeta.getPipeName())); + MESSAGE_UNKNOWN_PIPE_STATUS, statusOnDataNode, pipeStaticMeta.getPipeName())); } break; case DROPPED: @@ -259,8 +260,7 @@ public class PipeTaskAgent { default: throw new IllegalStateException( String.format( - "Unknown pipe status %s for pipe %s", - statusFromConfigNode, pipeStaticMeta.getPipeName())); + MESSAGE_UNKNOWN_PIPE_STATUS, statusFromConfigNode, pipeStaticMeta.getPipeName())); } } @@ -296,27 +296,33 @@ public class PipeTaskAgent { final PipeMeta existedPipeMeta = pipeMetaKeeper.getPipeMeta(pipeName); if (existedPipeMeta != null) { if (existedPipeMeta.getStaticMeta().getCreationTime() == creationTime) { - switch (existedPipeMeta.getRuntimeMeta().getStatus().get()) { + final PipeStatus status = existedPipeMeta.getRuntimeMeta().getStatus().get(); + switch (status) { case STOPPED: case RUNNING: - LOGGER.info( - "Pipe {} (creation time = {}) has already been created. Current status = {}. Skip creating.", - pipeName, - creationTime, - existedPipeMeta.getRuntimeMeta().getStatus().get().name()); + if (LOGGER.isInfoEnabled()) { + LOGGER.info( + "Pipe {} (creation time = {}) has already been created. Current status = {}. Skip creating.", + pipeName, + creationTime, + status.name()); + } return false; case DROPPED: - LOGGER.info( - "Pipe {} (creation time = {}) has already been dropped, but the pipe task meta has not been cleaned up. " - + "Current status = {}. Try dropping the pipe and recreating it.", - pipeName, - creationTime, - existedPipeMeta.getRuntimeMeta().getStatus().get().name()); + if (LOGGER.isInfoEnabled()) { + LOGGER.info( + "Pipe {} (creation time = {}) has already been dropped, but the pipe task meta has not been cleaned up. " + + "Current status = {}. Try dropping the pipe and recreating it.", + pipeName, + creationTime, + status.name()); + } // break to drop the pipe and recreate it break; default: throw new IllegalStateException( - "Unexpected status: " + existedPipeMeta.getRuntimeMeta().getStatus().get().name()); + MESSAGE_UNEXPECTED_PIPE_STATUS + + existedPipeMeta.getRuntimeMeta().getStatus().get().name()); } } @@ -440,31 +446,39 @@ public class PipeTaskAgent { return; } - switch (existedPipeMeta.getRuntimeMeta().getStatus().get()) { + final PipeStatus status = existedPipeMeta.getRuntimeMeta().getStatus().get(); + switch (status) { case STOPPED: - LOGGER.info( - "Pipe {} (creation time = {}) has been created. Current status = {}. Starting.", - pipeName, - creationTime, - existedPipeMeta.getRuntimeMeta().getStatus().get().name()); + if (LOGGER.isInfoEnabled()) { + LOGGER.info( + "Pipe {} (creation time = {}) has been created. Current status = {}. Starting.", + pipeName, + creationTime, + status.name()); + } break; case RUNNING: - LOGGER.info( - "Pipe {} (creation time = {}) has already been started. Current status = {}. Skip starting.", - pipeName, - creationTime, - existedPipeMeta.getRuntimeMeta().getStatus().get().name()); + if (LOGGER.isInfoEnabled()) { + LOGGER.info( + "Pipe {} (creation time = {}) has already been started. Current status = {}. Skip starting.", + pipeName, + creationTime, + status.name()); + } return; case DROPPED: - LOGGER.info( - "Pipe {} (creation time = {}) has already been dropped. Current status = {}. Skip starting.", - pipeName, - creationTime, - existedPipeMeta.getRuntimeMeta().getStatus().get().name()); + if (LOGGER.isInfoEnabled()) { + LOGGER.info( + "Pipe {} (creation time = {}) has already been dropped. Current status = {}. Skip starting.", + pipeName, + creationTime, + status.name()); + } return; default: throw new IllegalStateException( - "Unexpected status: " + existedPipeMeta.getRuntimeMeta().getStatus().get().name()); + MESSAGE_UNEXPECTED_PIPE_STATUS + + existedPipeMeta.getRuntimeMeta().getStatus().get().name()); } // trigger start() method for each pipe task @@ -510,31 +524,37 @@ public class PipeTaskAgent { return; } - switch (existedPipeMeta.getRuntimeMeta().getStatus().get()) { + final PipeStatus status = existedPipeMeta.getRuntimeMeta().getStatus().get(); + switch (status) { case STOPPED: - LOGGER.info( - "Pipe {} (creation time = {}) has already been stopped. Current status = {}. Skip stopping.", - pipeName, - creationTime, - existedPipeMeta.getRuntimeMeta().getStatus().get().name()); + if (LOGGER.isInfoEnabled()) { + LOGGER.info( + "Pipe {} (creation time = {}) has already been stopped. Current status = {}. Skip stopping.", + pipeName, + creationTime, + status.name()); + } return; case RUNNING: - LOGGER.info( - "Pipe {} (creation time = {}) has been started. Current status = {}. Stopping.", - pipeName, - creationTime, - existedPipeMeta.getRuntimeMeta().getStatus().get().name()); + if (LOGGER.isInfoEnabled()) { + LOGGER.info( + "Pipe {} (creation time = {}) has been started. Current status = {}. Stopping.", + pipeName, + creationTime, + status.name()); + } break; case DROPPED: - LOGGER.info( - "Pipe {} (creation time = {}) has already been dropped. Current status = {}. Skip stopping.", - pipeName, - creationTime, - existedPipeMeta.getRuntimeMeta().getStatus().get().name()); + if (LOGGER.isInfoEnabled()) { + LOGGER.info( + "Pipe {} (creation time = {}) has already been dropped. Current status = {}. Skip stopping.", + pipeName, + creationTime, + status.name()); + } return; default: - throw new IllegalStateException( - "Unexpected status: " + existedPipeMeta.getRuntimeMeta().getStatus().get().name()); + throw new IllegalStateException(MESSAGE_UNEXPECTED_PIPE_STATUS + status.name()); } // trigger stop() method for each pipe task @@ -593,13 +613,6 @@ public class PipeTaskAgent { } } - private void stopPipeTask(TConsensusGroupId dataRegionGroupId, PipeStaticMeta pipeStaticMeta) { - final PipeTask pipeTask = pipeTaskManager.getPipeTask(pipeStaticMeta, dataRegionGroupId); - if (pipeTask != null) { - pipeTask.stop(); - } - } - ///////////////////////// Heartbeat ///////////////////////// public synchronized void collectPipeMetaList(THeartbeatReq req, THeartbeatResp resp) diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/collector/IoTDBDataRegionCollector.java b/server/src/main/java/org/apache/iotdb/db/pipe/collector/IoTDBDataRegionCollector.java index d1653b30c59..cbe7fee083e 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/collector/IoTDBDataRegionCollector.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/collector/IoTDBDataRegionCollector.java @@ -90,14 +90,14 @@ public class IoTDBDataRegionCollector implements PipeCollector { COLLECTOR_REALTIME_MODE_LOG); } - constructHistoricalCollector(validator.getParameters()); + constructHistoricalCollector(); constructRealtimeCollector(validator.getParameters()); historicalCollector.validate(validator); realtimeCollector.validate(validator); } - private void constructHistoricalCollector(PipeParameters parameters) { + private void constructHistoricalCollector() { // enable historical collector by default historicalCollector = new PipeHistoricalDataRegionTsFileCollector(); } @@ -128,9 +128,8 @@ public class IoTDBDataRegionCollector implements PipeCollector { default: realtimeCollector = new PipeRealtimeDataRegionHybridCollector(); LOGGER.warn( - String.format( - "Unsupported collector realtime mode: %s, create a hybrid collector.", - parameters.getString(COLLECTOR_REALTIME_MODE))); + "Unsupported collector realtime mode: {}, create a hybrid collector.", + parameters.getString(COLLECTOR_REALTIME_MODE)); } } @@ -152,7 +151,7 @@ public class IoTDBDataRegionCollector implements PipeCollector { hasBeenStarted.set(true); final AtomicReference<Exception> exceptionHolder = new AtomicReference<>(null); - final DataRegionId dataRegionId = new DataRegionId(this.dataRegionId); + final DataRegionId dataRegionIdObject = new DataRegionId(this.dataRegionId); while (true) { // try to start collectors in the data region ... // first try to run if data region exists, then try to run if data region does not exist. @@ -160,7 +159,7 @@ public class IoTDBDataRegionCollector implements PipeCollector { // runIfPresent and runIfAbsent operations. in this case, we need to retry. if (StorageEngine.getInstance() .runIfPresent( - dataRegionId, + dataRegionIdObject, (dataRegion -> { dataRegion.writeLock( String.format( @@ -173,7 +172,7 @@ public class IoTDBDataRegionCollector implements PipeCollector { })) || StorageEngine.getInstance() .runIfAbsent( - dataRegionId, + dataRegionIdObject, () -> startHistoricalCollectorAndRealtimeCollector(exceptionHolder))) { rethrowExceptionIfAny(exceptionHolder); return; diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/collector/historical/PipeHistoricalDataRegionCollector.java b/server/src/main/java/org/apache/iotdb/db/pipe/collector/historical/PipeHistoricalDataRegionCollector.java index beb96f8f26b..08d5684851e 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/collector/historical/PipeHistoricalDataRegionCollector.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/collector/historical/PipeHistoricalDataRegionCollector.java @@ -21,7 +21,7 @@ package org.apache.iotdb.db.pipe.collector.historical; import org.apache.iotdb.pipe.api.PipeCollector; -public abstract class PipeHistoricalDataRegionCollector implements PipeCollector { +public interface PipeHistoricalDataRegionCollector extends PipeCollector { - public abstract boolean hasConsumedAll(); + boolean hasConsumedAll(); } diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/collector/historical/PipeHistoricalDataRegionTsFileCollector.java b/server/src/main/java/org/apache/iotdb/db/pipe/collector/historical/PipeHistoricalDataRegionTsFileCollector.java index 7d4bc25f2dc..6ade5ca4ad8 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/collector/historical/PipeHistoricalDataRegionTsFileCollector.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/collector/historical/PipeHistoricalDataRegionTsFileCollector.java @@ -50,7 +50,7 @@ import static org.apache.iotdb.db.pipe.config.constant.PipeCollectorConstant.COL import static org.apache.iotdb.db.pipe.config.constant.PipeCollectorConstant.COLLECTOR_PATTERN_DEFAULT_VALUE; import static org.apache.iotdb.db.pipe.config.constant.PipeCollectorConstant.COLLECTOR_PATTERN_KEY; -public class PipeHistoricalDataRegionTsFileCollector extends PipeHistoricalDataRegionCollector { +public class PipeHistoricalDataRegionTsFileCollector implements PipeHistoricalDataRegionCollector { private static final Logger LOGGER = LoggerFactory.getLogger(PipeHistoricalDataRegionTsFileCollector.class); @@ -69,10 +69,10 @@ public class PipeHistoricalDataRegionTsFileCollector extends PipeHistoricalDataR private Queue<PipeTsFileInsertionEvent> pendingQueue; - public PipeHistoricalDataRegionTsFileCollector() {} - @Override - public void validate(PipeParameterValidator validator) {} + public void validate(PipeParameterValidator validator) { + // do nothing + } @Override public void customize( diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionCollector.java b/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionCollector.java index ab9be42b062..4e6e98c168e 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionCollector.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionCollector.java @@ -35,7 +35,7 @@ public abstract class PipeRealtimeDataRegionCollector implements PipeCollector { protected String dataRegionId; protected PipeTaskMeta pipeTaskMeta; - public PipeRealtimeDataRegionCollector() {} + protected PipeRealtimeDataRegionCollector() {} @Override public void validate(PipeParameterValidator validator) throws Exception {} diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionFakeCollector.java b/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionFakeCollector.java index fbc28c4dbca..6051930e4e9 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionFakeCollector.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionFakeCollector.java @@ -26,15 +26,22 @@ import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; import org.apache.iotdb.pipe.api.event.Event; public class PipeRealtimeDataRegionFakeCollector extends PipeRealtimeDataRegionCollector { + @Override - public void validate(PipeParameterValidator validator) {} + public void validate(PipeParameterValidator validator) { + // do nothing + } @Override public void customize( - PipeParameters parameters, PipeCollectorRuntimeConfiguration configuration) {} + PipeParameters parameters, PipeCollectorRuntimeConfiguration configuration) { + // do nothing + } @Override - public void start() {} + public void start() { + // do nothing + } @Override public Event supply() { @@ -42,7 +49,9 @@ public class PipeRealtimeDataRegionFakeCollector extends PipeRealtimeDataRegionC } @Override - public void collect(PipeRealtimeCollectEvent event) {} + public void collect(PipeRealtimeCollectEvent event) { + // do nothing + } @Override public boolean isNeedListenToTsFile() { @@ -55,7 +64,9 @@ public class PipeRealtimeDataRegionFakeCollector extends PipeRealtimeDataRegionC } @Override - public void close() {} + public void close() { + // do nothing + } @Override public String toString() { diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionHybridCollector.java b/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionHybridCollector.java index 1551fc2a582..eeca9a82dad 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionHybridCollector.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionHybridCollector.java @@ -80,15 +80,15 @@ public class PipeRealtimeDataRegionHybridCollector extends PipeRealtimeDataRegio return; } - if (!event.getTsFileEpoch().getState(this).equals(TsFileEpoch.State.USING_TSFILE)) { - if (!pendingQueue.offer(event)) { - LOGGER.warn( - String.format( - "collectTabletInsertion: pending queue of PipeRealtimeDataRegionHybridCollector %s has reached capacity, discard tablet event %s, current state %s", - this, event, event.getTsFileEpoch().getState(this))); - // this would not happen, but just in case. - // UnboundedBlockingPendingQueue is unbounded, so it should never reach capacity. - } + if (!event.getTsFileEpoch().getState(this).equals(TsFileEpoch.State.USING_TSFILE) + && !pendingQueue.offer(event)) { + LOGGER.warn( + "collectTabletInsertion: pending queue of PipeRealtimeDataRegionHybridCollector {} has reached capacity, discard tablet event {}, current state {}", + this, + event, + event.getTsFileEpoch().getState(this)); + // this would not happen, but just in case. + // UnboundedBlockingPendingQueue is unbounded, so it should never reach capacity. } } @@ -102,9 +102,10 @@ public class PipeRealtimeDataRegionHybridCollector extends PipeRealtimeDataRegio if (!pendingQueue.offer(event)) { LOGGER.warn( - String.format( - "collectTsFileInsertion: pending queue of PipeRealtimeDataRegionHybridCollector %s has reached capacity, discard TsFile event %s, current state %s", - this, event, event.getTsFileEpoch().getState(this))); + "collectTsFileInsertion: pending queue of PipeRealtimeDataRegionHybridCollector {} has reached capacity, discard TsFile event {}, current state {}", + this, + event, + event.getTsFileEpoch().getState(this)); // this would not happen, but just in case. // ListenableUnblockingPendingQueue is unbounded, so it should never reach capacity. } @@ -163,7 +164,7 @@ public class PipeRealtimeDataRegionHybridCollector extends PipeRealtimeDataRegio // this event is not reliable anymore. but the data represented by this event // has been carried by the following tsfile event, so we can just discard this event. event.getTsFileEpoch().migrateState(this, state -> TsFileEpoch.State.USING_TSFILE); - LOGGER.warn(String.format("Increase reference count for event %s error.", event)); + LOGGER.warn("Increase reference count for event {} error.", event); return null; } } diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionLogCollector.java b/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionLogCollector.java index 42da6a5d6fc..2d02d4fba78 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionLogCollector.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionLogCollector.java @@ -53,9 +53,10 @@ public class PipeRealtimeDataRegionLogCollector extends PipeRealtimeDataRegionCo if (!pendingQueue.offer(event)) { LOGGER.warn( - String.format( - "collect: pending queue of PipeRealtimeDataRegionLogCollector %s has reached capacity, discard tablet event %s, current state %s", - this, event, event.getTsFileEpoch().getState(this))); + "collect: pending queue of PipeRealtimeDataRegionLogCollector {} has reached capacity, discard tablet event {}, current state {}", + this, + event, + event.getTsFileEpoch().getState(this)); // this would not happen, but just in case. // ListenableUnblockingPendingQueue is unbounded, so it should never reach capacity. } diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionTsFileCollector.java b/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionTsFileCollector.java index cd56caae708..f43582a5541 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionTsFileCollector.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionTsFileCollector.java @@ -53,9 +53,10 @@ public class PipeRealtimeDataRegionTsFileCollector extends PipeRealtimeDataRegio if (!pendingQueue.offer(event)) { LOGGER.warn( - String.format( - "collect: pending queue of PipeRealtimeDataRegionTsFileCollector %s has reached capacity, discard TsFile event %s, current state %s", - this, event, event.getTsFileEpoch().getState(this))); + "collect: pending queue of PipeRealtimeDataRegionTsFileCollector {} has reached capacity, discard TsFile event {}, current state {}", + this, + event, + event.getTsFileEpoch().getState(this)); // this would not happen, but just in case. // ListenableUnblockingPendingQueue is unbounded, so it should never reach capacity. } diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/epoch/TsFileEpochManager.java b/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/epoch/TsFileEpochManager.java index 095bac51c2b..dbecc2f2a72 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/epoch/TsFileEpochManager.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/epoch/TsFileEpochManager.java @@ -46,11 +46,12 @@ public class TsFileEpochManager { final String filePath = resource.getTsFilePath(); // this would not happen, but just in case - if (!filePath2Epoch.containsKey(filePath)) { - LOGGER.info( - String.format("Pipe: can not find TsFileEpoch for TsFile %s, creating it", filePath)); - filePath2Epoch.put(filePath, new TsFileEpoch(filePath)); - } + filePath2Epoch.computeIfAbsent( + filePath, + path -> { + LOGGER.info("TsFileEpoch not found for TsFile {}, creating a new one", path); + return new TsFileEpoch(path); + }); return new PipeRealtimeCollectEvent( event, diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/matcher/CachedSchemaPatternMatcher.java b/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/matcher/CachedSchemaPatternMatcher.java index cc3dd94a67c..14df36e9267 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/matcher/CachedSchemaPatternMatcher.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/matcher/CachedSchemaPatternMatcher.java @@ -84,8 +84,6 @@ public class CachedSchemaPatternMatcher implements PipeDataRegionMatcher { } } - // TODO: maximum the efficiency of matching when pattern is root - // TODO: memory control @Override public Set<PipeRealtimeDataRegionCollector> match(PipeRealtimeCollectEvent event) { final Set<PipeRealtimeDataRegionCollector> matchedCollectors = new HashSet<>(); @@ -105,7 +103,7 @@ public class CachedSchemaPatternMatcher implements PipeDataRegionMatcher { deviceToCollectorsCache.get(device, this::filterCollectorsByDevice); // this would not happen if (collectorsFilteredByDevice == null) { - LOGGER.warn(String.format("Match result NPE when handle device %s", device)); + LOGGER.warn("Match result NPE when handle device {}", device); continue; } diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/IoTDBSyncConnector.java b/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/IoTDBSyncConnector.java index ecb03c87a7f..48d23034dc5 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/IoTDBSyncConnector.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/IoTDBSyncConnector.java @@ -86,7 +86,7 @@ public class IoTDBSyncConnector implements PipeConnector { private IoTDBThriftConnectorClient client; - private static SessionPool sessionPool; + private SessionPool sessionPool; @Override public void validate(PipeParameterValidator validator) throws Exception { @@ -143,8 +143,10 @@ public class IoTDBSyncConnector implements PipeConnector { throw new PipeRuntimeCriticalException(errorMsg); } } catch (TException e) { - LOGGER.warn(String.format("Connect to receiver %s:%s error.", ipAddress, port), e); - throw new PipeConnectionException(e.getMessage(), e); + throw new PipeConnectionException( + String.format( + "Connect to receiver %s:%s error, because: %s", ipAddress, port, e.getMessage()), + e); } sessionPool = @@ -158,7 +160,9 @@ public class IoTDBSyncConnector implements PipeConnector { } @Override - public void heartbeat() throws Exception {} + public void heartbeat() throws Exception { + // do nothing + } @Override public void transfer(TabletInsertionEvent tabletInsertionEvent) throws Exception { @@ -172,12 +176,10 @@ public class IoTDBSyncConnector implements PipeConnector { "IoTDBSyncConnector only support PipeInsertNodeInsertionEvent and PipeTabletInsertionEvent."); } } catch (TException e) { - LOGGER.warn( - "Network error when transfer tablet insertion event: {}.", tabletInsertionEvent, e); - // the connection may be broken, try to reconnect by catching PipeConnectionException throw new PipeConnectionException( String.format( - "Network error when transfer tablet insertion event, because %s.", e.getMessage()), + "Network error when transfer tablet insertion event: %s, because %s.", + tabletInsertionEvent, e.getMessage()), e); } } @@ -193,7 +195,7 @@ public class IoTDBSyncConnector implements PipeConnector { } private void doTransfer(PipeRawTabletInsertionEvent pipeTabletInsertionEvent) - throws PipeException, TException, IoTDBConnectionException, StatementExecutionException { + throws PipeException, IoTDBConnectionException, StatementExecutionException { final Tablet tablet = pipeTabletInsertionEvent.convertToTablet(); if (pipeTabletInsertionEvent.isAligned()) { sessionPool.insertAlignedTablet(tablet); @@ -212,10 +214,10 @@ public class IoTDBSyncConnector implements PipeConnector { try { doTransfer((PipeTsFileInsertionEvent) tsFileInsertionEvent); } catch (TException e) { - LOGGER.warn( - "Network error when transfer tsFile insertion event: {}.", tsFileInsertionEvent, e); - // The connection may be broken, try to reconnect by catching PipeConnectionException - throw new PipeConnectionException("Network error when transfer tsFile insertion event.", e); + throw new PipeConnectionException( + String.format( + "Network error when transfer tsFile insertion event: %s.", tsFileInsertionEvent), + e); } } @@ -253,8 +255,7 @@ public class IoTDBSyncConnector implements PipeConnector { } else if (status.code == TSStatusCode.SYNC_FILE_REDIRECTION_ERROR.getStatusCode()) { position = Long.parseLong(status.message); randomAccessFile.seek(position); - LOGGER.info( - String.format("Redirect to position %s in transferring tsFile %s.", position, file)); + LOGGER.info("Redirect to position {} in transferring tsFile {}.", position, file); } else if (status.code == TSStatusCode.SYNC_FILE_ERROR.getStatusCode()) { String errorMsg = String.format("Network failed to receive tsFile %s, status: %s", file, status); @@ -263,8 +264,11 @@ public class IoTDBSyncConnector implements PipeConnector { } } } catch (TException e) { - LOGGER.warn(String.format("Cannot send pipe data to receiver %s:%s.", ipAddress, port), e); - throw new PipeConnectionException(e.getMessage(), e); + throw new PipeConnectionException( + String.format( + "Cannot send pipe data to receiver %s:%s, because: %s.", + ipAddress, port, e.getMessage()), + e); } } diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/IoTDBSyncReceiver.java b/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/IoTDBSyncReceiver.java index c2b651c1e62..ff6f9faad3e 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/IoTDBSyncReceiver.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/IoTDBSyncReceiver.java @@ -105,12 +105,11 @@ public class IoTDBSyncReceiver { new File(getFileDataDir(identityInfo)).mkdirs(); } createConnection(identityInfo); - if (!StringUtils.isEmpty(identityInfo.getDatabase())) { - if (!registerDatabase(identityInfo.getDatabase(), partitionFetcher, schemaFetcher)) { - return RpcUtils.getStatus( - TSStatusCode.PIPESERVER_ERROR, - String.format("Auto register database %s error.", identityInfo.getDatabase())); - } + if (!StringUtils.isEmpty(identityInfo.getDatabase()) + && !registerDatabase(identityInfo.getDatabase(), partitionFetcher, schemaFetcher)) { + return RpcUtils.getStatus( + TSStatusCode.PIPESERVER_ERROR, + String.format("Auto register database %s error.", identityInfo.getDatabase())); } return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, ""); } @@ -245,7 +244,9 @@ public class IoTDBSyncReceiver { targetFile .getName() .substring(0, targetFile.getName().length() - PATCH_SUFFIX.length())); - targetFile.renameTo(newFile); + if (!targetFile.renameTo(newFile)) { + LOGGER.error("Fail to rename file {} to {}", targetFile, newFile); + } } } tsFilePipeData.setParentDirPath(dir.getAbsolutePath()); @@ -276,14 +277,9 @@ public class IoTDBSyncReceiver { File file = new File(fileDir, fileName + PATCH_SUFFIX); // step2. check startIndex - try { - IndexCheckResult result = checkStartIndexValid(new File(fileDir, fileName), startIndex); - if (!result.isResult()) { - return RpcUtils.getStatus(TSStatusCode.SYNC_FILE_REDIRECTION_ERROR, result.getIndex()); - } - } catch (IOException e) { - LOGGER.error(e.getMessage()); - return RpcUtils.getStatus(TSStatusCode.SYNC_FILE_ERROR, e.getMessage()); + IndexCheckResult result = checkStartIndexValid(new File(fileDir, fileName), startIndex); + if (!result.isResult()) { + return RpcUtils.getStatus(TSStatusCode.SYNC_FILE_REDIRECTION_ERROR, result.getIndex()); } // step3. append file @@ -294,14 +290,7 @@ public class IoTDBSyncReceiver { buff.get(byteArray); randomAccessFile.write(byteArray); recordStartIndex(new File(fileDir, fileName), startIndex + length); - LOGGER.debug( - "Sync " - + fileName - + " start at " - + startIndex - + " to " - + (startIndex + length) - + " is done."); + LOGGER.debug("Sync {} start at {} to {} is done.", fileName, startIndex, startIndex + length); } catch (IOException e) { LOGGER.error(e.getMessage()); return RpcUtils.getStatus(TSStatusCode.SYNC_FILE_ERROR, e.getMessage()); @@ -310,7 +299,7 @@ public class IoTDBSyncReceiver { return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, ""); } - private IndexCheckResult checkStartIndexValid(File file, long startIndex) throws IOException { + private IndexCheckResult checkStartIndexValid(File file, long startIndex) { // get local index from memory map long localIndex = getCurrentFileStartIndex(file.getAbsolutePath()); // get local index from file diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/pipedata/PipeData.java b/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/pipedata/PipeData.java index 659c69876e9..52d880eb717 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/pipedata/PipeData.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/pipedata/PipeData.java @@ -64,7 +64,7 @@ public abstract class PipeData { return byteStream.toByteArray(); } - public void deserialize(DataInputStream stream) throws IOException, IllegalPathException { + public void deserialize(DataInputStream stream) throws IOException { serialNumber = stream.readLong(); } diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/pipedata/TsFilePipeData.java b/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/pipedata/TsFilePipeData.java index 803a5c67427..54f05eef81d 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/pipedata/TsFilePipeData.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/pipedata/TsFilePipeData.java @@ -19,7 +19,6 @@ */ package org.apache.iotdb.db.pipe.connector.legacy.pipedata; -import org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.db.pipe.connector.legacy.loader.ILoader; import org.apache.iotdb.db.pipe.connector.legacy.loader.TsFileLoader; import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; @@ -99,7 +98,7 @@ public class TsFilePipeData extends PipeData { } @Override - public void deserialize(DataInputStream stream) throws IOException, IllegalPathException { + public void deserialize(DataInputStream stream) throws IOException { super.deserialize(stream); parentDirPath = ReadWriteIOUtils.readString(stream); if (parentDirPath == null) { diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/IoTDBThriftConnectorV1.java b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/IoTDBThriftConnectorV1.java index bb18acda62d..4160eaeb074 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/IoTDBThriftConnectorV1.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/IoTDBThriftConnectorV1.java @@ -23,8 +23,6 @@ import org.apache.iotdb.commons.client.property.ThriftClientProperty; import org.apache.iotdb.commons.conf.CommonConfig; import org.apache.iotdb.commons.conf.CommonDescriptor; import org.apache.iotdb.commons.pipe.config.PipeConfig; -import org.apache.iotdb.db.conf.IoTDBConfig; -import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.pipe.connector.v1.reponse.PipeTransferFilePieceResp; import org.apache.iotdb.db.pipe.connector.v1.request.PipeTransferFilePieceReq; import org.apache.iotdb.db.pipe.connector.v1.request.PipeTransferFileSealReq; @@ -65,15 +63,12 @@ public class IoTDBThriftConnectorV1 implements PipeConnector { private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBThriftConnectorV1.class); private static final CommonConfig COMMON_CONFIG = CommonDescriptor.getInstance().getConfig(); - private static final IoTDBConfig IOTDB_CONFIG = IoTDBDescriptor.getInstance().getConfig(); private String ipAddress; private int port; private IoTDBThriftConnectorClient client; - public IoTDBThriftConnectorV1() {} - @Override public void validate(PipeParameterValidator validator) throws Exception { validator @@ -112,13 +107,17 @@ public class IoTDBThriftConnectorV1 implements PipeConnector { throw new PipeException(String.format("Handshake error, result status %s.", resp.status)); } } catch (TException e) { - LOGGER.warn(String.format("Connect to receiver %s:%s error.", ipAddress, port), e); - throw new PipeConnectionException(e.getMessage(), e); + throw new PipeConnectionException( + String.format( + "Connect to receiver %s:%s error, because: %s", ipAddress, port, e.getMessage()), + e); } } @Override - public void heartbeat() throws Exception {} + public void heartbeat() throws Exception { + // do nothing + } @Override public void transfer(TabletInsertionEvent tabletInsertionEvent) throws Exception { @@ -133,12 +132,10 @@ public class IoTDBThriftConnectorV1 implements PipeConnector { "IoTDBThriftConnectorV1 only support PipeInsertNodeTabletInsertionEvent and PipeRawTabletInsertionEvent."); } } catch (TException e) { - LOGGER.warn( - "Network error when transfer tablet insertion event: {}.", tabletInsertionEvent, e); - // the connection may be broken, try to reconnect by catching PipeConnectionException throw new PipeConnectionException( String.format( - "Network error when transfer tablet insertion event, because %s.", e.getMessage()), + "Network error when transfer tablet insertion event %s, because %s.", + tabletInsertionEvent, e.getMessage()), e); } } @@ -185,12 +182,10 @@ public class IoTDBThriftConnectorV1 implements PipeConnector { try { doTransfer((PipeTsFileInsertionEvent) tsFileInsertionEvent); } catch (TException e) { - LOGGER.warn( - "Network error when transfer tsfile insertion event: {}.", tsFileInsertionEvent, e); - // the connection may be broken, try to reconnect by catching PipeConnectionException throw new PipeConnectionException( String.format( - "Network error when transfer tsfile insertion event, because %s.", e.getMessage()), + "Network error when transfer tsfile insertion event %s, because %s.", + tsFileInsertionEvent, e.getMessage()), e); } } @@ -229,7 +224,7 @@ public class IoTDBThriftConnectorV1 implements PipeConnector { == TSStatusCode.PIPE_TRANSFER_FILE_OFFSET_RESET.getStatusCode()) { position = resp.getEndWritingOffset(); reader.seek(position); - LOGGER.info(String.format("Redirect file position to %s.", position)); + LOGGER.info("Redirect file position to {}.", position); continue; } diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/IoTDBThriftReceiverV1.java b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/IoTDBThriftReceiverV1.java index cc803c33b95..de1df42b73a 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/IoTDBThriftReceiverV1.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/IoTDBThriftReceiverV1.java @@ -170,18 +170,25 @@ public class IoTDBThriftReceiverV1 implements IoTDBThriftReceiver { writingFileWriter = null; } if (writingFile != null && writingFile.exists()) { - final boolean ignored = writingFile.delete(); - LOGGER.info(String.format("original file %s was deleted.", writingFile.getPath())); + if (writingFile.delete()) { + LOGGER.info("original file {} was deleted.", writingFile.getPath()); + } else { + LOGGER.warn("failed to delete original file {}.", writingFile.getPath()); + } writingFile = null; } final File receiveDir = new File(RECEIVER_FILE_DIR); if (!receiveDir.exists()) { - boolean ignored = receiveDir.mkdirs(); + if (receiveDir.mkdirs()) { + LOGGER.info("receiver file dir {} was created.", receiveDir.getPath()); + } else { + LOGGER.warn("failed to create receiver file dir {}.", receiveDir.getPath()); + } } writingFile = new File(RECEIVER_FILE_DIR, fileName); writingFileWriter = new RandomAccessFile(writingFile, "rw"); - LOGGER.info(String.format("start to write transferring file %s.", writingFile.getPath())); + LOGGER.info("start to write transferring file {}.", writingFile.getPath()); } private boolean isFileExistedAndNameCorrect(String fileName) { @@ -288,13 +295,9 @@ public class IoTDBThriftReceiverV1 implements IoTDBThriftReceiver { if (writingFileWriter != null) { writingFileWriter.close(); } - if (writingFile != null) { - if (!writingFile.delete()) { - LOGGER.warn( - String.format( - "IoTDBThriftReceiverV1#handleExit: delete file %s error.", - writingFile.getPath())); - } + if (writingFile != null && !writingFile.delete()) { + LOGGER.warn( + "IoTDBThriftReceiverV1#handleExit: delete file {} error.", writingFile.getPath()); } } catch (IOException e) { LOGGER.warn("IoTDBThriftReceiverV1#handleExit: meeting errors on handleExit().", e); diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/PipeRequestType.java b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/PipeRequestType.java index cdbb5a59eef..b24ef0e9066 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/PipeRequestType.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/PipeRequestType.java @@ -45,7 +45,10 @@ public enum PipeRequestType { private static final Map<Short, PipeRequestType> TYPE_MAP = Arrays.stream(PipeRequestType.values()) - .collect(HashMap::new, (map, type) -> map.put(type.getType(), type), HashMap::putAll); + .collect( + HashMap::new, + (typeMap, pipeRequestType) -> typeMap.put(pipeRequestType.getType(), pipeRequestType), + HashMap::putAll); public static boolean isValidatedRequestType(short type) { return TYPE_MAP.containsKey(type); diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/connector/v2/handler/PipeTransferTabletInsertionEventHandler.java b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v2/handler/PipeTransferTabletInsertionEventHandler.java index 6b150c53357..daf95b44add 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/connector/v2/handler/PipeTransferTabletInsertionEventHandler.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v2/handler/PipeTransferTabletInsertionEventHandler.java @@ -42,7 +42,7 @@ public abstract class PipeTransferTabletInsertionEventHandler<E extends TPipeTra implements AsyncMethodCallback<E> { private static final Logger LOGGER = - LoggerFactory.getLogger(PipeTransferInsertNodeTabletInsertionEventHandler.class); + LoggerFactory.getLogger(PipeTransferTabletInsertionEventHandler.class); private final long requestCommitId; private final EnrichedEvent event; @@ -54,7 +54,7 @@ public abstract class PipeTransferTabletInsertionEventHandler<E extends TPipeTra (long) (PipeConfig.getInstance().getPipeConnectorRetryIntervalMs() * Math.pow(2, 5)); private int retryCount = 0; - public PipeTransferTabletInsertionEventHandler( + protected PipeTransferTabletInsertionEventHandler( long requestCommitId, @Nullable EnrichedEvent event, TPipeTransferReq req, diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/connector/v2/handler/PipeTransferTsFileInsertionEventHandler.java b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v2/handler/PipeTransferTsFileInsertionEventHandler.java index 21c244f0537..692fa0f7ada 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/connector/v2/handler/PipeTransferTsFileInsertionEventHandler.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v2/handler/PipeTransferTsFileInsertionEventHandler.java @@ -151,7 +151,7 @@ public class PipeTransferTsFileInsertionEventHandler if (code == TSStatusCode.PIPE_TRANSFER_FILE_OFFSET_RESET.getStatusCode()) { position = resp.getEndWritingOffset(); reader.seek(position); - LOGGER.info(String.format("Redirect file position to %s.", position)); + LOGGER.info("Redirect file position to {}.", position); } else if (code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { throw new PipeException( String.format("Transfer file %s error, result status %s.", tsFile, resp.getStatus())); diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java b/server/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java index f2df19bf2b8..462ba9164f6 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java @@ -40,7 +40,7 @@ public abstract class EnrichedEvent implements Event { private final String pattern; - public EnrichedEvent(PipeTaskMeta pipeTaskMeta, String pattern) { + protected EnrichedEvent(PipeTaskMeta pipeTaskMeta, String pattern) { referenceCount = new AtomicInteger(0); this.pipeTaskMeta = pipeTaskMeta; this.pattern = pattern; diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java b/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java index 0933e1a3c71..80d6eaf7879 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java @@ -124,7 +124,6 @@ public class PipeInsertNodeTabletInsertionEvent extends EnrichedEvent } return dataContainer.processRowByRow(consumer); } catch (Exception e) { - LOGGER.error("Process row by row error.", e); throw new PipeException("Process row by row error.", e); } } @@ -137,7 +136,6 @@ public class PipeInsertNodeTabletInsertionEvent extends EnrichedEvent } return dataContainer.processTablet(consumer); } catch (Exception e) { - LOGGER.error("Process tablet error.", e); throw new PipeException("Process tablet error.", e); } } @@ -155,7 +153,6 @@ public class PipeInsertNodeTabletInsertionEvent extends EnrichedEvent } return dataContainer.convertToTablet(); } catch (Exception e) { - LOGGER.error("Convert to tablet error.", e); throw new PipeException("Convert to tablet error.", e); } } diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java b/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java index 594c0a38291..168400443c8 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java @@ -75,16 +75,16 @@ public class PipeRawTabletInsertionEvent implements TabletInsertionEvent { } public Tablet convertToTablet() { - final String pattern = getPattern(); + final String notNullPattern = getPattern(); - // if pattern is "root", we don't need to convert, just return the original tablet - if (pattern.equals(PipeCollectorConstant.COLLECTOR_PATTERN_DEFAULT_VALUE)) { + // if notNullPattern is "root", we don't need to convert, just return the original tablet + if (notNullPattern.equals(PipeCollectorConstant.COLLECTOR_PATTERN_DEFAULT_VALUE)) { return tablet; } - // if pattern is not "root", we need to convert the tablet + // if notNullPattern is not "root", we need to convert the tablet if (dataContainer == null) { - dataContainer = new TabletInsertionDataContainer(tablet, isAligned, pattern); + dataContainer = new TabletInsertionDataContainer(tablet, isAligned, notNullPattern); } return dataContainer.convertToTablet(); } diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/TabletInsertionDataContainer.java b/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/TabletInsertionDataContainer.java index 86fbbd28f6e..427bd1ec9a4 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/TabletInsertionDataContainer.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/TabletInsertionDataContainer.java @@ -83,7 +83,7 @@ public class TabletInsertionDataContainer { //////////////////////////// parse //////////////////////////// - private void parse(InsertRowNode insertRowNode, String pattern) throws IllegalPathException { + private void parse(InsertRowNode insertRowNode, String pattern) { final int originColumnSize = insertRowNode.getMeasurements().length; final Integer[] originColumnIndex2FilteredColumnIndexMapperList = new Integer[originColumnSize]; @@ -149,8 +149,7 @@ public class TabletInsertionDataContainer { rowCount = 1; } - private void parse(InsertTabletNode insertTabletNode, String pattern) - throws IllegalPathException { + private void parse(InsertTabletNode insertTabletNode, String pattern) { final int originColumnSize = insertTabletNode.getMeasurements().length; final Integer[] originColumnIndex2FilteredColumnIndexMapperList = new Integer[originColumnSize]; @@ -269,7 +268,6 @@ public class TabletInsertionDataContainer { rowCount = tablet.rowSize; } - // TODO: cache the result keyed by deviceId to improve performance private void generateColumnIndexMapper( String[] originMeasurementList, String pattern, diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java b/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java index 40aaf53c1ec..7e06fb0e01f 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java @@ -170,12 +170,12 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent implements TsFileIns final String errorMsg = String.format( "Interrupted when waiting for closing TsFile %s.", resource.getTsFilePath()); - LOGGER.warn(errorMsg); + LOGGER.warn(errorMsg, e); Thread.currentThread().interrupt(); throw new PipeException(errorMsg); } catch (IOException e) { final String errorMsg = String.format("Read TsFile %s error.", resource.getTsFilePath()); - LOGGER.warn(errorMsg); + LOGGER.warn(errorMsg, e); throw new PipeException(errorMsg); } } diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java b/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java index a9a5f657d83..0d72fd06e53 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java @@ -79,7 +79,6 @@ public class TsFileInsertionDataContainer implements AutoCloseable { deviceIsAlignedMap = readDeviceIsAlignedMap(); measurementDataTypeMap = tsFileSequenceReader.getFullPathDataTypeMap(); } catch (Exception e) { - LOGGER.error("failed to create TsFileInsertionDataContainer", e); close(); throw e; } @@ -125,14 +124,14 @@ public class TsFileInsertionDataContainer implements AutoCloseable { } private Map<String, Boolean> readDeviceIsAlignedMap() throws IOException { - final Map<String, Boolean> deviceIsAlignedMap = new HashMap<>(); + final Map<String, Boolean> deviceIsAlignedResultMap = new HashMap<>(); final TsFileDeviceIterator deviceIsAlignedIterator = tsFileSequenceReader.getAllDevicesIteratorWithIsAligned(); while (deviceIsAlignedIterator.hasNext()) { final Pair<String, Boolean> deviceIsAlignedPair = deviceIsAlignedIterator.next(); - deviceIsAlignedMap.put(deviceIsAlignedPair.getLeft(), deviceIsAlignedPair.getRight()); + deviceIsAlignedResultMap.put(deviceIsAlignedPair.getLeft(), deviceIsAlignedPair.getRight()); } - return deviceIsAlignedMap; + return deviceIsAlignedResultMap; } /** @return TabletInsertionEvent in a streaming way */ diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeSubtaskExecutorManager.java b/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeSubtaskExecutorManager.java index 22905643a02..ff42491bd35 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeSubtaskExecutorManager.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeSubtaskExecutorManager.java @@ -19,17 +19,13 @@ package org.apache.iotdb.db.pipe.execution.executor; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - /** * PipeTaskExecutor is responsible for executing the pipe tasks, and it is scheduled by the * PipeTaskScheduler. It is a singleton class. */ +@SuppressWarnings("unused") // assignerSubtaskExecutor is for future use public class PipeSubtaskExecutorManager { - private static final Logger LOGGER = LoggerFactory.getLogger(PipeSubtaskExecutorManager.class); - private final PipeAssignerSubtaskExecutor assignerSubtaskExecutor; private final PipeProcessorSubtaskExecutor processorSubtaskExecutor; private final PipeConnectorSubtaskExecutor connectorSubtaskExecutor; diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/resource/file/PipeFileResourceManager.java b/server/src/main/java/org/apache/iotdb/db/pipe/resource/file/PipeFileResourceManager.java index ca81f1c7643..8d12a802978 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/resource/file/PipeFileResourceManager.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/resource/file/PipeFileResourceManager.java @@ -79,12 +79,8 @@ public class PipeFileResourceManager { } private boolean increaseReferenceIfExists(String path) { - if (hardlinkOrCopiedFileToReferenceMap.containsKey(path)) { - hardlinkOrCopiedFileToReferenceMap.put( - path, hardlinkOrCopiedFileToReferenceMap.get(path) + 1); - return true; - } - return false; + hardlinkOrCopiedFileToReferenceMap.computeIfPresent(path, (key, value) -> value + 1); + return hardlinkOrCopiedFileToReferenceMap.containsKey(path); } private static File getHardlinkOrCopiedFileInPipeDir(File file) throws IOException { @@ -125,8 +121,11 @@ public class PipeFileResourceManager { } private static File createHardLink(File sourceFile, File hardlink) throws IOException { - if (!hardlink.getParentFile().exists()) { - boolean ignored = hardlink.getParentFile().mkdirs(); + if (!hardlink.getParentFile().exists() && !hardlink.getParentFile().mkdirs()) { + throw new IOException( + String.format( + "failed to create hardlink %s for file %s: failed to create parent dir %s", + hardlink.getPath(), sourceFile.getPath(), hardlink.getParentFile().getPath())); } final Path sourcePath = FileSystems.getDefault().getPath(sourceFile.getAbsolutePath()); @@ -136,8 +135,11 @@ public class PipeFileResourceManager { } private static File copyFile(File sourceFile, File targetFile) throws IOException { - if (!targetFile.getParentFile().exists()) { - boolean ignored = targetFile.getParentFile().mkdirs(); + if (!targetFile.getParentFile().exists() && !targetFile.getParentFile().mkdirs()) { + throw new IOException( + String.format( + "failed to copy file %s to %s: failed to create parent dir %s", + sourceFile.getPath(), targetFile.getPath(), targetFile.getParentFile().getPath())); } Files.copy(sourceFile.toPath(), targetFile.toPath()); diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResource.java b/server/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResource.java index b4a01a7828b..b882a96962d 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResource.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResource.java @@ -27,11 +27,12 @@ import org.apache.iotdb.db.wal.utils.WALEntryHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.Closeable; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; -public class PipeWALResource implements AutoCloseable { +public class PipeWALResource implements Closeable { private static final Logger LOGGER = LoggerFactory.getLogger(PipeWALResource.class); @@ -39,8 +40,7 @@ public class PipeWALResource implements AutoCloseable { private final AtomicInteger referenceCount; - // TODO: make this configurable - public static final long MIN_TIME_TO_LIVE_IN_MS = 1000 * 60; + public static final long MIN_TIME_TO_LIVE_IN_MS = 1000L * 60; private final AtomicLong lastLogicalPinTime; private final AtomicBoolean isPhysicallyPinned; @@ -155,8 +155,4 @@ public class PipeWALResource implements AutoCloseable { referenceCount.set(0); } - - public int getReferenceCount() { - return referenceCount.get(); - } } diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java b/server/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java index de67bb8543f..6fdf3a84ed0 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java @@ -9,11 +9,10 @@ import java.util.Iterator; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantLock; -public class PipeWALResourceManager implements AutoCloseable { +public class PipeWALResourceManager { private final Map<Long, PipeWALResource> memtableIdToPipeWALResourceMap; @@ -23,7 +22,6 @@ public class PipeWALResourceManager implements AutoCloseable { private static final ScheduledExecutorService PIPE_WAL_RESOURCE_TTL_CHECKER = IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor( ThreadName.PIPE_WAL_RESOURCE_TTL_CHECKER.getName()); - private final ScheduledFuture<?> ttlCheckerFuture; public PipeWALResourceManager() { // memtableIdToPipeWALResourceMap can be concurrently accessed by multiple threads @@ -34,30 +32,29 @@ public class PipeWALResourceManager implements AutoCloseable { memtableIdSegmentLocks[i] = new ReentrantLock(); } - ttlCheckerFuture = - ScheduledExecutorUtil.safelyScheduleWithFixedDelay( - PIPE_WAL_RESOURCE_TTL_CHECKER, - () -> { - Iterator<Map.Entry<Long, PipeWALResource>> iterator = - memtableIdToPipeWALResourceMap.entrySet().iterator(); - while (iterator.hasNext()) { - final Map.Entry<Long, PipeWALResource> entry = iterator.next(); - final ReentrantLock lock = - memtableIdSegmentLocks[(int) (entry.getKey() % SEGMENT_LOCK_COUNT)]; - - lock.lock(); - try { - if (entry.getValue().invalidateIfPossible()) { - iterator.remove(); - } - } finally { - lock.unlock(); - } + ScheduledExecutorUtil.safelyScheduleWithFixedDelay( + PIPE_WAL_RESOURCE_TTL_CHECKER, + () -> { + Iterator<Map.Entry<Long, PipeWALResource>> iterator = + memtableIdToPipeWALResourceMap.entrySet().iterator(); + while (iterator.hasNext()) { + final Map.Entry<Long, PipeWALResource> entry = iterator.next(); + final ReentrantLock lock = + memtableIdSegmentLocks[(int) (entry.getKey() % SEGMENT_LOCK_COUNT)]; + + lock.lock(); + try { + if (entry.getValue().invalidateIfPossible()) { + iterator.remove(); } - }, - PipeWALResource.MIN_TIME_TO_LIVE_IN_MS, - PipeWALResource.MIN_TIME_TO_LIVE_IN_MS, - TimeUnit.MILLISECONDS); + } finally { + lock.unlock(); + } + } + }, + PipeWALResource.MIN_TIME_TO_LIVE_IN_MS, + PipeWALResource.MIN_TIME_TO_LIVE_IN_MS, + TimeUnit.MILLISECONDS); } public void pin(long memtableId, WALEntryHandler walEntryHandler) { @@ -83,34 +80,4 @@ public class PipeWALResourceManager implements AutoCloseable { lock.unlock(); } } - - @Override - public void close() throws Exception { - if (ttlCheckerFuture != null) { - ttlCheckerFuture.cancel(true); - } - - for (final long memtableId : memtableIdToPipeWALResourceMap.keySet()) { - final ReentrantLock lock = memtableIdSegmentLocks[(int) (memtableId % SEGMENT_LOCK_COUNT)]; - - lock.lock(); - try { - memtableIdToPipeWALResourceMap.get(memtableId).close(); - memtableIdToPipeWALResourceMap.remove(memtableId); - } finally { - lock.unlock(); - } - } - } - - public int getReferenceCount(long memtableId) { - final ReentrantLock lock = memtableIdSegmentLocks[(int) (memtableId % SEGMENT_LOCK_COUNT)]; - - lock.lock(); - try { - return memtableIdToPipeWALResourceMap.get(memtableId).getReferenceCount(); - } finally { - lock.unlock(); - } - } } diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/connection/EventSupplier.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/connection/EventSupplier.java index 0efb7a2ae67..c00d294be00 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/task/connection/EventSupplier.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/connection/EventSupplier.java @@ -29,5 +29,6 @@ public interface EventSupplier { * the moment, but the collector is still running for more events. * @throws Exception if the supplier fails to supply the event. */ + @SuppressWarnings("squid:S00112") // Exception is thrown by the interface Event supply() throws Exception; } diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java index 6c6f2498b97..0d676ac4992 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java @@ -43,8 +43,7 @@ public class PipeTaskCollectorStage extends PipeTaskStage { PipeParameters collectorParameters, TConsensusGroupId dataRegionId, PipeTaskMeta pipeTaskMeta) { - // TODO: avoid if-else, use reflection to create collector all the time - this.pipeCollector = + pipeCollector = collectorParameters .getStringOrDefault( PipeCollectorConstant.COLLECTOR_KEY, diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskConnectorStage.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskConnectorStage.java index ff4e954e614..ba8058524ae 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskConnectorStage.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskConnectorStage.java @@ -46,7 +46,9 @@ public class PipeTaskConnectorStage extends PipeTaskStage { } @Override - public void createSubtask() throws PipeException {} + public void createSubtask() throws PipeException { + // do nothing + } @Override public void startSubtask() throws PipeException { diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskStage.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskStage.java index e3a793c0d8a..c2b4426435d 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskStage.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskStage.java @@ -24,6 +24,15 @@ import org.apache.iotdb.pipe.api.exception.PipeException; public abstract class PipeTaskStage { + private static final String MESSAGE_PIPE_TASK_STAGE_HAS_BEEN_STARTED = + "The PipeTaskStage has been started"; + private static final String MESSAGE_PIPE_TASK_STAGE_HAS_BEEN_DROPPED = + "The PipeTaskStage has been dropped"; + private static final String MESSAGE_PIPE_TASK_STAGE_HAS_BEEN_STOPPED = + "The PipeTaskStage has been externally stopped"; + private static final String MESSAGE_PIPE_TASK_STAGE_HAS_NOT_BEEN_CREATED = + "The PipeTaskStage has not been created"; + protected PipeStatus status = null; protected boolean hasBeenExternallyStopped = false; @@ -35,14 +44,14 @@ public abstract class PipeTaskStage { public synchronized void create() { if (status != null) { if (status == PipeStatus.RUNNING) { - throw new PipeException("The PipeTaskStage has been started"); + throw new PipeException(MESSAGE_PIPE_TASK_STAGE_HAS_BEEN_STARTED); } if (status == PipeStatus.DROPPED) { - throw new PipeException("The PipeTaskStage has been dropped"); + throw new PipeException(MESSAGE_PIPE_TASK_STAGE_HAS_BEEN_DROPPED); } // status == PipeStatus.STOPPED if (hasBeenExternallyStopped) { - throw new PipeException("The PipeTaskStage has been externally stopped"); + throw new PipeException(MESSAGE_PIPE_TASK_STAGE_HAS_BEEN_STOPPED); } // otherwise, do nothing to allow retry strategy return; @@ -63,14 +72,14 @@ public abstract class PipeTaskStage { */ public synchronized void start() { if (status == null) { - throw new PipeException("The PipeTaskStage has not been created"); + throw new PipeException(MESSAGE_PIPE_TASK_STAGE_HAS_NOT_BEEN_CREATED); } if (status == PipeStatus.RUNNING) { // do nothing to allow retry strategy return; } if (status == PipeStatus.DROPPED) { - throw new PipeException("The PipeTaskStage has been dropped"); + throw new PipeException(MESSAGE_PIPE_TASK_STAGE_HAS_BEEN_DROPPED); } // status == PipeStatus.STOPPED, start the subtask @@ -88,14 +97,14 @@ public abstract class PipeTaskStage { */ public synchronized void stop() { if (status == null) { - throw new PipeException("The PipeTaskStage has not been created"); + throw new PipeException(MESSAGE_PIPE_TASK_STAGE_HAS_NOT_BEEN_CREATED); } if (status == PipeStatus.STOPPED) { // do nothing to allow retry strategy return; } if (status == PipeStatus.DROPPED) { - throw new PipeException("The PipeTaskStage has been dropped"); + throw new PipeException(MESSAGE_PIPE_TASK_STAGE_HAS_BEEN_DROPPED); } // status == PipeStatus.RUNNING, stop the connector @@ -114,7 +123,7 @@ public abstract class PipeTaskStage { */ public synchronized void drop() { if (status == null) { - throw new PipeException("The PipeTaskStage has not been created"); + throw new PipeException(MESSAGE_PIPE_TASK_STAGE_HAS_NOT_BEEN_CREATED); } if (status == PipeStatus.DROPPED) { // do nothing to allow retry strategy diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java index ce2b6403a2f..47dd1ba7e03 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java @@ -86,7 +86,6 @@ public class PipeConnectorSubtask extends PipeSubtask { } catch (PipeConnectionException e) { throw e; } catch (Exception e) { - LOGGER.warn("Execute Connector subtask once error.", e); throw new PipeException( "Error occurred during executing PipeConnector#transfer, perhaps need to check whether the implementation of PipeConnector is correct according to the pipe-api description.", e); @@ -99,19 +98,29 @@ public class PipeConnectorSubtask extends PipeSubtask { public void onFailure(@NotNull Throwable throwable) { // retry to connect to the target system if the connection is broken if (throwable instanceof PipeConnectionException) { + LOGGER.warn( + "PipeConnectionException occurred, retrying to connect to the target system...", + throwable); + int retry = 0; while (retry < MAX_RETRY_TIMES) { try { outputPipeConnector.handshake(); + LOGGER.info("Successfully reconnected to the target system."); break; } catch (Exception e) { retry++; - LOGGER.error("Failed to reconnect to the target system, retrying... ({} time(s))", retry); + LOGGER.warn( + "Failed to reconnect to the target system, retrying ... after [{}/{}] time(s) retries.", + retry, + MAX_RETRY_TIMES, + e); try { Thread.sleep(retry * PipeConfig.getInstance().getPipeConnectorRetryIntervalMs()); } catch (InterruptedException interruptedException) { LOGGER.info( - "Interrupted while sleeping, perhaps need to check whether the thread is interrupted."); + "Interrupted while sleeping, perhaps need to check whether the thread is interrupted.", + interruptedException); Thread.currentThread().interrupt(); } } @@ -120,22 +129,35 @@ public class PipeConnectorSubtask extends PipeSubtask { // stop current pipe task if failed to reconnect to the target system after MAX_RETRY_TIMES // times if (retry == MAX_RETRY_TIMES) { - final String errorMessage = - String.format( - "Failed to reconnect to the target system after %d times, stopping current pipe task %s...", - MAX_RETRY_TIMES, taskID); - LOGGER.warn(errorMessage, throwable); - lastFailedCause = throwable; - if (lastEvent instanceof EnrichedEvent) { + LOGGER.warn( + "Failed to reconnect to the target system after {} times, stopping current pipe task {}... " + + "Status shown when query the pipe will be 'STOPPED'. " + + "Please restart the task by executing 'START PIPE' manually if needed.", + MAX_RETRY_TIMES, + taskID, + throwable); + ((EnrichedEvent) lastEvent) .reportException(new PipeRuntimeConnectorCriticalException(throwable.getMessage())); + } else { + LOGGER.error( + "Failed to reconnect to the target system after {} times, stopping current pipe task {} locally... " + + "Status shown when query the pipe will be 'RUNNING' instead of 'STOPPED', but the task is actually stopped. " + + "Please restart the task by executing 'START PIPE' manually if needed.", + MAX_RETRY_TIMES, + taskID, + throwable); + + // FIXME: non-EnrichedEvent should be reported to the ConfigNode instead of being logged } // although the pipe task will be stopped, we still don't release the last event here // because we need to keep it for the next retry. if user wants to restart the task, // the last event will be processed again. the last event will be released when the task // is dropped or the process is running normally. + + // stop current pipe task if failed to reconnect to the target system after MAX_RETRY_TIMES return; } } diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtaskManager.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtaskManager.java index 071ab4da15e..9c56b4e04db 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtaskManager.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtaskManager.java @@ -42,6 +42,9 @@ import java.util.TreeMap; public class PipeConnectorSubtaskManager { + private static final String FAILED_TO_DEREGISTER_EXCEPTION_MESSAGE = + "Failed to deregister PipeConnectorSubtask. No such subtask: "; + private final Map<String, PipeConnectorSubtaskLifeCycle> attributeSortedString2SubtaskLifeCycleMap = new HashMap<>(); @@ -53,7 +56,6 @@ public class PipeConnectorSubtaskManager { new TreeMap<>(pipeConnectorParameters.getAttribute()).toString(); if (!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString)) { - // TODO: construct all PipeConnector with the same reflection method, avoid using if-else // 1. construct, validate and customize PipeConnector, and then handshake (create connection) // with the target final String connectorKey = @@ -103,8 +105,7 @@ public class PipeConnectorSubtaskManager { public synchronized void deregister(String attributeSortedString) { if (!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString)) { - throw new PipeException( - "Failed to deregister PipeConnectorSubtask. No such subtask: " + attributeSortedString); + throw new PipeException(FAILED_TO_DEREGISTER_EXCEPTION_MESSAGE + attributeSortedString); } if (attributeSortedString2SubtaskLifeCycleMap.get(attributeSortedString).deregister()) { @@ -114,8 +115,7 @@ public class PipeConnectorSubtaskManager { public synchronized void start(String attributeSortedString) { if (!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString)) { - throw new PipeException( - "Failed to deregister PipeConnectorSubtask. No such subtask: " + attributeSortedString); + throw new PipeException(FAILED_TO_DEREGISTER_EXCEPTION_MESSAGE + attributeSortedString); } attributeSortedString2SubtaskLifeCycleMap.get(attributeSortedString).start(); @@ -123,8 +123,7 @@ public class PipeConnectorSubtaskManager { public synchronized void stop(String attributeSortedString) { if (!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString)) { - throw new PipeException( - "Failed to deregister PipeConnectorSubtask. No such subtask: " + attributeSortedString); + throw new PipeException(FAILED_TO_DEREGISTER_EXCEPTION_MESSAGE + attributeSortedString); } attributeSortedString2SubtaskLifeCycleMap.get(attributeSortedString).stop(); diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeProcessorSubtask.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeProcessorSubtask.java index 2472a29eefe..cd7bc206454 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeProcessorSubtask.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeProcessorSubtask.java @@ -69,7 +69,6 @@ public class PipeProcessorSubtask extends PipeSubtask { releaseLastEvent(); } catch (Exception e) { - e.printStackTrace(); throw new PipeException( "Error occurred during executing PipeProcessor#process, perhaps need to check whether the implementation of PipeProcessor is correct according to the pipe-api description.", e); diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeSubtask.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeSubtask.java index 9f5c6e4a37a..4672f8cc532 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeSubtask.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeSubtask.java @@ -53,7 +53,6 @@ public abstract class PipeSubtask implements FutureCallback<Void>, Callable<Void protected static final int MAX_RETRY_TIMES = 5; private final AtomicInteger retryCount = new AtomicInteger(0); - protected Throwable lastFailedCause; protected Event lastEvent; @@ -97,6 +96,7 @@ public abstract class PipeSubtask implements FutureCallback<Void>, Callable<Void * @return true if the event is consumed successfully, false if no more event can be consumed * @throws Exception if any error occurs when consuming the event */ + @SuppressWarnings("squid:S112") // allow to throw Exception protected abstract boolean executeOnce() throws Exception; @Override @@ -107,20 +107,31 @@ public abstract class PipeSubtask implements FutureCallback<Void>, Callable<Void @Override public void onFailure(@NotNull Throwable throwable) { + if (retryCount.get() == 0) { + LOGGER.warn( + "Failed to execute subtask {}({}), because of {}. Will retry for {} times.", + taskID, + this.getClass().getSimpleName(), + throwable.getMessage(), + MAX_RETRY_TIMES, + throwable); + } + if (retryCount.get() < MAX_RETRY_TIMES) { retryCount.incrementAndGet(); LOGGER.warn( - String.format( - "Retry subtask %s, retry count [%s/%s]", - this.getClass().getSimpleName(), retryCount.get(), MAX_RETRY_TIMES)); + "Retry executing subtask {}({}), retry count [{}/{}]", + taskID, + this.getClass().getSimpleName(), + retryCount.get(), + MAX_RETRY_TIMES); submitSelf(); } else { final String errorMessage = String.format( - "Subtask %s failed, has been retried for %d times, last failed because of %s", - taskID, retryCount.get(), throwable); + "Failed to execute subtask %s(%s), retry count exceeds the max retry times %d, last exception: %s", + taskID, this.getClass().getSimpleName(), retryCount.get(), throwable.getMessage()); LOGGER.warn(errorMessage, throwable); - lastFailedCause = throwable; if (lastEvent instanceof EnrichedEvent) { ((EnrichedEvent) lastEvent) @@ -128,6 +139,23 @@ public abstract class PipeSubtask implements FutureCallback<Void>, Callable<Void throwable instanceof PipeRuntimeException ? (PipeRuntimeException) throwable : new PipeRuntimeCriticalException(errorMessage)); + LOGGER.warn( + "The last event is an instance of EnrichedEvent, so the exception is reported. " + + "Stopping current pipe task {}({}) locally... " + + "Status shown when query the pipe will be 'STOPPED'. " + + "Please restart the task by executing 'START PIPE' manually if needed.", + taskID, + this.getClass().getSimpleName(), + throwable); + } else { + LOGGER.error( + "The last event is not an instance of EnrichedEvent, so the exception cannot be reported. " + + "Stopping current pipe task {}({}) locally... " + + "Status shown when query the pipe will be 'RUNNING' instead of 'STOPPED', but the task is actually stopped. " + + "Please restart the task by executing 'START PIPE' manually if needed.", + taskID, + this.getClass().getSimpleName(), + throwable); } // although the pipe task will be stopped, we still don't release the last event here @@ -185,8 +213,4 @@ public abstract class PipeSubtask implements FutureCallback<Void>, Callable<Void public String getTaskID() { return taskID; } - - public Throwable getLastFailedCause() { - return lastFailedCause; - } }
