This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch rel/1.2
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/1.2 by this push:
new a37a18fe885 Pipe: fix bugs & smells reported by SonarCloud
(org.apache.iotdb.db.pipe) (#10268) (#10271)
a37a18fe885 is described below
commit a37a18fe8853ee351e0b4a3e6ea53be5540e3538
Author: Steve Yurong Su <[email protected]>
AuthorDate: Thu Jun 22 13:44:22 2023 +0800
Pipe: fix bugs & smells reported by SonarCloud (org.apache.iotdb.db.pipe)
(#10268) (#10271)
(cherry picked from commit 4e8cb333130ed4ebd8c1e1a0fdf7eddda18539a1)
---
.../db/pipe/agent/plugin/PipePluginAgent.java | 11 +-
.../db/pipe/agent/runtime/PipeRuntimeAgent.java | 6 +-
.../SimpleConsensusProgressIndexAssigner.java | 5 +-
.../iotdb/db/pipe/agent/task/PipeTaskAgent.java | 135 +++++++++++----------
.../pipe/collector/IoTDBDataRegionCollector.java | 15 ++-
.../PipeHistoricalDataRegionCollector.java | 4 +-
.../PipeHistoricalDataRegionTsFileCollector.java | 8 +-
.../realtime/PipeRealtimeDataRegionCollector.java | 2 +-
.../PipeRealtimeDataRegionFakeCollector.java | 21 +++-
.../PipeRealtimeDataRegionHybridCollector.java | 27 +++--
.../PipeRealtimeDataRegionLogCollector.java | 7 +-
.../PipeRealtimeDataRegionTsFileCollector.java | 7 +-
.../realtime/epoch/TsFileEpochManager.java | 11 +-
.../matcher/CachedSchemaPatternMatcher.java | 4 +-
.../pipe/connector/legacy/IoTDBSyncConnector.java | 56 ++++-----
.../pipe/connector/legacy/IoTDBSyncReceiver.java | 37 ++----
.../legacy/pipedata/DeletionPipeData.java | 8 +-
.../pipe/connector/legacy/pipedata/PipeData.java | 5 +-
.../connector/legacy/pipedata/TsFilePipeData.java | 3 +-
.../pipe/connector/v1/IoTDBThriftConnectorV1.java | 29 ++---
.../pipe/connector/v1/IoTDBThriftReceiverV1.java | 25 ++--
.../db/pipe/connector/v1/PipeRequestType.java | 5 +-
.../PipeTransferTabletInsertionEventHandler.java | 4 +-
.../PipeTransferTsFileInsertionEventHandler.java | 2 +-
.../apache/iotdb/db/pipe/event/EnrichedEvent.java | 2 +-
.../tablet/PipeInsertNodeTabletInsertionEvent.java | 3 -
.../common/tablet/PipeRawTabletInsertionEvent.java | 10 +-
.../tablet/TabletInsertionDataContainer.java | 27 ++---
.../common/tsfile/PipeTsFileInsertionEvent.java | 4 +-
.../tsfile/TsFileInsertionDataContainer.java | 7 +-
.../executor/PipeSubtaskExecutorManager.java | 6 +-
.../resource/file/PipeFileResourceManager.java | 22 ++--
.../db/pipe/resource/wal/PipeWALResource.java | 10 +-
.../pipe/resource/wal/PipeWALResourceManager.java | 79 ++++--------
.../db/pipe/task/connection/EventSupplier.java | 1 +
.../db/pipe/task/stage/PipeTaskCollectorStage.java | 3 +-
.../db/pipe/task/stage/PipeTaskConnectorStage.java | 4 +-
.../iotdb/db/pipe/task/stage/PipeTaskStage.java | 25 ++--
.../db/pipe/task/subtask/PipeConnectorSubtask.java | 42 +++++--
.../task/subtask/PipeConnectorSubtaskManager.java | 13 +-
.../db/pipe/task/subtask/PipeProcessorSubtask.java | 1 -
.../iotdb/db/pipe/task/subtask/PipeSubtask.java | 46 +++++--
42 files changed, 381 insertions(+), 361 deletions(-)
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java
b/server/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java
index 9266bdf9a22..5a05cb43b53 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java
@@ -137,7 +137,8 @@ public class PipePluginAgent {
updateAllRegisteredClasses(currentActiveClassLoader);
final Class<?> pluginClass = Class.forName(className, true,
currentActiveClassLoader);
- // ensure that it is a PipePlugin class
+
+ @SuppressWarnings("unused") // ensure that it is a PipePlugin class
final PipePlugin ignored = (PipePlugin)
pluginClass.getDeclaredConstructor().newInstance();
pipePluginMetaKeeper.addPipePluginMeta(pluginName, pipePluginMeta);
@@ -165,7 +166,7 @@ public class PipePluginAgent {
}
}
- public void deregister(String pluginName, boolean needToDeleteJar) throws
Exception {
+ public void deregister(String pluginName, boolean needToDeleteJar) throws
PipeException {
acquireLock();
try {
final PipePluginMeta information =
pipePluginMetaKeeper.getPipePluginMeta(pluginName);
@@ -187,6 +188,8 @@ public class PipePluginAgent {
PipePluginExecutableManager.getInstance()
.removeFileUnderTemporaryRoot(pluginName.toUpperCase() + ".txt");
}
+ } catch (IOException e) {
+ throw new PipeException(e.getMessage(), e);
} finally {
releaseLock();
}
@@ -225,7 +228,7 @@ public class PipePluginAgent {
"Failed to reflect PipePlugin instance, because PipePlugin %s
has not been registered.",
pluginName.toUpperCase());
LOGGER.warn(errorMessage);
- throw new RuntimeException(errorMessage);
+ throw new PipeException(errorMessage);
}
try {
@@ -240,7 +243,7 @@ public class PipePluginAgent {
"Failed to reflect PipePlugin %s(%s) instance, because %s",
pluginName, information.getClassName(), e);
LOGGER.warn(errorMessage, e);
- throw new RuntimeException(errorMessage);
+ throw new PipeException(errorMessage);
}
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
index 7d1a6336a16..afc899854d0 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
@@ -102,9 +102,9 @@ public class PipeRuntimeAgent implements IService {
public void report(PipeTaskMeta pipeTaskMeta, PipeRuntimeException
pipeRuntimeException) {
LOGGER.warn(
- String.format(
- "PipeRuntimeException: pipe task meta %s, exception %s",
- pipeTaskMeta, pipeRuntimeException),
+ "Report PipeRuntimeException to local PipeTaskMeta({}), exception
message: {}",
+ pipeTaskMeta,
+ pipeRuntimeException.getMessage(),
pipeRuntimeException);
pipeTaskMeta.trackExceptionMessage(pipeRuntimeException);
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/SimpleConsensusProgressIndexAssigner.java
b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/SimpleConsensusProgressIndexAssigner.java
index 7c81cd6cabd..b0a3eab9166 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/SimpleConsensusProgressIndexAssigner.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/SimpleConsensusProgressIndexAssigner.java
@@ -32,6 +32,7 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
+import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicLong;
import static org.apache.iotdb.consensus.ConsensusFactory.SIMPLE_CONSENSUS;
@@ -84,7 +85,7 @@ public class SimpleConsensusProgressIndexAssigner {
return;
}
try {
- String content = FileUtils.readFileToString(file, "UTF-8");
+ String content = FileUtils.readFileToString(file,
StandardCharsets.UTF_8);
rebootTimes = Integer.parseInt(content);
} catch (IOException e) {
LOGGER.error("Cannot parse reboot times from file {}",
file.getAbsolutePath(), e);
@@ -94,7 +95,7 @@ public class SimpleConsensusProgressIndexAssigner {
private void recordRebootTimes() throws IOException {
File file = SystemFileFactory.INSTANCE.getFile(PIPE_SYSTEM_DIR +
REBOOT_TIMES_FILE_NAME);
- FileUtils.writeStringToFile(file, String.valueOf(rebootTimes + 1),
"UTF-8");
+ FileUtils.writeStringToFile(file, String.valueOf(rebootTimes + 1),
StandardCharsets.UTF_8);
}
public void assignIfNeeded(TsFileResource tsFileResource) {
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
b/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
index e3fe56a52ec..bbbbd3a7d23 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
@@ -71,6 +71,9 @@ public class PipeTaskAgent {
private static final Logger LOGGER =
LoggerFactory.getLogger(PipeTaskAgent.class);
private static final IoTDBConfig CONFIG =
IoTDBDescriptor.getInstance().getConfig();
+ private static final String MESSAGE_UNKNOWN_PIPE_STATUS = "Unknown pipe
status %s for pipe %s";
+ private static final String MESSAGE_UNEXPECTED_PIPE_STATUS = "Unexpected
pipe status %s: ";
+
private final PipeMetaKeeper pipeMetaKeeper;
private final PipeTaskManager pipeTaskManager;
@@ -238,8 +241,7 @@ public class PipeTaskAgent {
} else {
throw new IllegalStateException(
String.format(
- "Unknown pipe status %s for pipe %s",
- statusOnDataNode, pipeStaticMeta.getPipeName()));
+ MESSAGE_UNKNOWN_PIPE_STATUS, statusOnDataNode,
pipeStaticMeta.getPipeName()));
}
break;
case STOPPED:
@@ -248,8 +250,7 @@ public class PipeTaskAgent {
} else {
throw new IllegalStateException(
String.format(
- "Unknown pipe status %s for pipe %s",
- statusOnDataNode, pipeStaticMeta.getPipeName()));
+ MESSAGE_UNKNOWN_PIPE_STATUS, statusOnDataNode,
pipeStaticMeta.getPipeName()));
}
break;
case DROPPED:
@@ -259,8 +260,7 @@ public class PipeTaskAgent {
default:
throw new IllegalStateException(
String.format(
- "Unknown pipe status %s for pipe %s",
- statusFromConfigNode, pipeStaticMeta.getPipeName()));
+ MESSAGE_UNKNOWN_PIPE_STATUS, statusFromConfigNode,
pipeStaticMeta.getPipeName()));
}
}
@@ -296,27 +296,33 @@ public class PipeTaskAgent {
final PipeMeta existedPipeMeta = pipeMetaKeeper.getPipeMeta(pipeName);
if (existedPipeMeta != null) {
if (existedPipeMeta.getStaticMeta().getCreationTime() == creationTime) {
- switch (existedPipeMeta.getRuntimeMeta().getStatus().get()) {
+ final PipeStatus status =
existedPipeMeta.getRuntimeMeta().getStatus().get();
+ switch (status) {
case STOPPED:
case RUNNING:
- LOGGER.info(
- "Pipe {} (creation time = {}) has already been created.
Current status = {}. Skip creating.",
- pipeName,
- creationTime,
- existedPipeMeta.getRuntimeMeta().getStatus().get().name());
+ if (LOGGER.isInfoEnabled()) {
+ LOGGER.info(
+ "Pipe {} (creation time = {}) has already been created.
Current status = {}. Skip creating.",
+ pipeName,
+ creationTime,
+ status.name());
+ }
return false;
case DROPPED:
- LOGGER.info(
- "Pipe {} (creation time = {}) has already been dropped, but
the pipe task meta has not been cleaned up. "
- + "Current status = {}. Try dropping the pipe and
recreating it.",
- pipeName,
- creationTime,
- existedPipeMeta.getRuntimeMeta().getStatus().get().name());
+ if (LOGGER.isInfoEnabled()) {
+ LOGGER.info(
+ "Pipe {} (creation time = {}) has already been dropped, but
the pipe task meta has not been cleaned up. "
+ + "Current status = {}. Try dropping the pipe and
recreating it.",
+ pipeName,
+ creationTime,
+ status.name());
+ }
// break to drop the pipe and recreate it
break;
default:
throw new IllegalStateException(
- "Unexpected status: " +
existedPipeMeta.getRuntimeMeta().getStatus().get().name());
+ MESSAGE_UNEXPECTED_PIPE_STATUS
+ +
existedPipeMeta.getRuntimeMeta().getStatus().get().name());
}
}
@@ -440,31 +446,39 @@ public class PipeTaskAgent {
return;
}
- switch (existedPipeMeta.getRuntimeMeta().getStatus().get()) {
+ final PipeStatus status =
existedPipeMeta.getRuntimeMeta().getStatus().get();
+ switch (status) {
case STOPPED:
- LOGGER.info(
- "Pipe {} (creation time = {}) has been created. Current status =
{}. Starting.",
- pipeName,
- creationTime,
- existedPipeMeta.getRuntimeMeta().getStatus().get().name());
+ if (LOGGER.isInfoEnabled()) {
+ LOGGER.info(
+ "Pipe {} (creation time = {}) has been created. Current status =
{}. Starting.",
+ pipeName,
+ creationTime,
+ status.name());
+ }
break;
case RUNNING:
- LOGGER.info(
- "Pipe {} (creation time = {}) has already been started. Current
status = {}. Skip starting.",
- pipeName,
- creationTime,
- existedPipeMeta.getRuntimeMeta().getStatus().get().name());
+ if (LOGGER.isInfoEnabled()) {
+ LOGGER.info(
+ "Pipe {} (creation time = {}) has already been started. Current
status = {}. Skip starting.",
+ pipeName,
+ creationTime,
+ status.name());
+ }
return;
case DROPPED:
- LOGGER.info(
- "Pipe {} (creation time = {}) has already been dropped. Current
status = {}. Skip starting.",
- pipeName,
- creationTime,
- existedPipeMeta.getRuntimeMeta().getStatus().get().name());
+ if (LOGGER.isInfoEnabled()) {
+ LOGGER.info(
+ "Pipe {} (creation time = {}) has already been dropped. Current
status = {}. Skip starting.",
+ pipeName,
+ creationTime,
+ status.name());
+ }
return;
default:
throw new IllegalStateException(
- "Unexpected status: " +
existedPipeMeta.getRuntimeMeta().getStatus().get().name());
+ MESSAGE_UNEXPECTED_PIPE_STATUS
+ + existedPipeMeta.getRuntimeMeta().getStatus().get().name());
}
// trigger start() method for each pipe task
@@ -510,31 +524,37 @@ public class PipeTaskAgent {
return;
}
- switch (existedPipeMeta.getRuntimeMeta().getStatus().get()) {
+ final PipeStatus status =
existedPipeMeta.getRuntimeMeta().getStatus().get();
+ switch (status) {
case STOPPED:
- LOGGER.info(
- "Pipe {} (creation time = {}) has already been stopped. Current
status = {}. Skip stopping.",
- pipeName,
- creationTime,
- existedPipeMeta.getRuntimeMeta().getStatus().get().name());
+ if (LOGGER.isInfoEnabled()) {
+ LOGGER.info(
+ "Pipe {} (creation time = {}) has already been stopped. Current
status = {}. Skip stopping.",
+ pipeName,
+ creationTime,
+ status.name());
+ }
return;
case RUNNING:
- LOGGER.info(
- "Pipe {} (creation time = {}) has been started. Current status =
{}. Stopping.",
- pipeName,
- creationTime,
- existedPipeMeta.getRuntimeMeta().getStatus().get().name());
+ if (LOGGER.isInfoEnabled()) {
+ LOGGER.info(
+ "Pipe {} (creation time = {}) has been started. Current status =
{}. Stopping.",
+ pipeName,
+ creationTime,
+ status.name());
+ }
break;
case DROPPED:
- LOGGER.info(
- "Pipe {} (creation time = {}) has already been dropped. Current
status = {}. Skip stopping.",
- pipeName,
- creationTime,
- existedPipeMeta.getRuntimeMeta().getStatus().get().name());
+ if (LOGGER.isInfoEnabled()) {
+ LOGGER.info(
+ "Pipe {} (creation time = {}) has already been dropped. Current
status = {}. Skip stopping.",
+ pipeName,
+ creationTime,
+ status.name());
+ }
return;
default:
- throw new IllegalStateException(
- "Unexpected status: " +
existedPipeMeta.getRuntimeMeta().getStatus().get().name());
+ throw new IllegalStateException(MESSAGE_UNEXPECTED_PIPE_STATUS +
status.name());
}
// trigger stop() method for each pipe task
@@ -593,13 +613,6 @@ public class PipeTaskAgent {
}
}
- private void stopPipeTask(TConsensusGroupId dataRegionGroupId,
PipeStaticMeta pipeStaticMeta) {
- final PipeTask pipeTask = pipeTaskManager.getPipeTask(pipeStaticMeta,
dataRegionGroupId);
- if (pipeTask != null) {
- pipeTask.stop();
- }
- }
-
///////////////////////// Heartbeat /////////////////////////
public synchronized void collectPipeMetaList(THeartbeatReq req,
THeartbeatResp resp)
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/collector/IoTDBDataRegionCollector.java
b/server/src/main/java/org/apache/iotdb/db/pipe/collector/IoTDBDataRegionCollector.java
index d1653b30c59..cbe7fee083e 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/collector/IoTDBDataRegionCollector.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/collector/IoTDBDataRegionCollector.java
@@ -90,14 +90,14 @@ public class IoTDBDataRegionCollector implements
PipeCollector {
COLLECTOR_REALTIME_MODE_LOG);
}
- constructHistoricalCollector(validator.getParameters());
+ constructHistoricalCollector();
constructRealtimeCollector(validator.getParameters());
historicalCollector.validate(validator);
realtimeCollector.validate(validator);
}
- private void constructHistoricalCollector(PipeParameters parameters) {
+ private void constructHistoricalCollector() {
// enable historical collector by default
historicalCollector = new PipeHistoricalDataRegionTsFileCollector();
}
@@ -128,9 +128,8 @@ public class IoTDBDataRegionCollector implements
PipeCollector {
default:
realtimeCollector = new PipeRealtimeDataRegionHybridCollector();
LOGGER.warn(
- String.format(
- "Unsupported collector realtime mode: %s, create a hybrid
collector.",
- parameters.getString(COLLECTOR_REALTIME_MODE)));
+ "Unsupported collector realtime mode: {}, create a hybrid
collector.",
+ parameters.getString(COLLECTOR_REALTIME_MODE));
}
}
@@ -152,7 +151,7 @@ public class IoTDBDataRegionCollector implements
PipeCollector {
hasBeenStarted.set(true);
final AtomicReference<Exception> exceptionHolder = new
AtomicReference<>(null);
- final DataRegionId dataRegionId = new DataRegionId(this.dataRegionId);
+ final DataRegionId dataRegionIdObject = new
DataRegionId(this.dataRegionId);
while (true) {
// try to start collectors in the data region ...
// first try to run if data region exists, then try to run if data
region does not exist.
@@ -160,7 +159,7 @@ public class IoTDBDataRegionCollector implements
PipeCollector {
// runIfPresent and runIfAbsent operations. in this case, we need to
retry.
if (StorageEngine.getInstance()
.runIfPresent(
- dataRegionId,
+ dataRegionIdObject,
(dataRegion -> {
dataRegion.writeLock(
String.format(
@@ -173,7 +172,7 @@ public class IoTDBDataRegionCollector implements
PipeCollector {
}))
|| StorageEngine.getInstance()
.runIfAbsent(
- dataRegionId,
+ dataRegionIdObject,
() ->
startHistoricalCollectorAndRealtimeCollector(exceptionHolder))) {
rethrowExceptionIfAny(exceptionHolder);
return;
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/collector/historical/PipeHistoricalDataRegionCollector.java
b/server/src/main/java/org/apache/iotdb/db/pipe/collector/historical/PipeHistoricalDataRegionCollector.java
index beb96f8f26b..08d5684851e 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/collector/historical/PipeHistoricalDataRegionCollector.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/collector/historical/PipeHistoricalDataRegionCollector.java
@@ -21,7 +21,7 @@ package org.apache.iotdb.db.pipe.collector.historical;
import org.apache.iotdb.pipe.api.PipeCollector;
-public abstract class PipeHistoricalDataRegionCollector implements
PipeCollector {
+public interface PipeHistoricalDataRegionCollector extends PipeCollector {
- public abstract boolean hasConsumedAll();
+ boolean hasConsumedAll();
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/collector/historical/PipeHistoricalDataRegionTsFileCollector.java
b/server/src/main/java/org/apache/iotdb/db/pipe/collector/historical/PipeHistoricalDataRegionTsFileCollector.java
index 7d4bc25f2dc..6ade5ca4ad8 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/collector/historical/PipeHistoricalDataRegionTsFileCollector.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/collector/historical/PipeHistoricalDataRegionTsFileCollector.java
@@ -50,7 +50,7 @@ import static
org.apache.iotdb.db.pipe.config.constant.PipeCollectorConstant.COL
import static
org.apache.iotdb.db.pipe.config.constant.PipeCollectorConstant.COLLECTOR_PATTERN_DEFAULT_VALUE;
import static
org.apache.iotdb.db.pipe.config.constant.PipeCollectorConstant.COLLECTOR_PATTERN_KEY;
-public class PipeHistoricalDataRegionTsFileCollector extends
PipeHistoricalDataRegionCollector {
+public class PipeHistoricalDataRegionTsFileCollector implements
PipeHistoricalDataRegionCollector {
private static final Logger LOGGER =
LoggerFactory.getLogger(PipeHistoricalDataRegionTsFileCollector.class);
@@ -69,10 +69,10 @@ public class PipeHistoricalDataRegionTsFileCollector
extends PipeHistoricalDataR
private Queue<PipeTsFileInsertionEvent> pendingQueue;
- public PipeHistoricalDataRegionTsFileCollector() {}
-
@Override
- public void validate(PipeParameterValidator validator) {}
+ public void validate(PipeParameterValidator validator) {
+ // do nothing
+ }
@Override
public void customize(
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionCollector.java
b/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionCollector.java
index ab9be42b062..4e6e98c168e 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionCollector.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionCollector.java
@@ -35,7 +35,7 @@ public abstract class PipeRealtimeDataRegionCollector
implements PipeCollector {
protected String dataRegionId;
protected PipeTaskMeta pipeTaskMeta;
- public PipeRealtimeDataRegionCollector() {}
+ protected PipeRealtimeDataRegionCollector() {}
@Override
public void validate(PipeParameterValidator validator) throws Exception {}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionFakeCollector.java
b/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionFakeCollector.java
index fbc28c4dbca..6051930e4e9 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionFakeCollector.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionFakeCollector.java
@@ -26,15 +26,22 @@ import
org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.event.Event;
public class PipeRealtimeDataRegionFakeCollector extends
PipeRealtimeDataRegionCollector {
+
@Override
- public void validate(PipeParameterValidator validator) {}
+ public void validate(PipeParameterValidator validator) {
+ // do nothing
+ }
@Override
public void customize(
- PipeParameters parameters, PipeCollectorRuntimeConfiguration
configuration) {}
+ PipeParameters parameters, PipeCollectorRuntimeConfiguration
configuration) {
+ // do nothing
+ }
@Override
- public void start() {}
+ public void start() {
+ // do nothing
+ }
@Override
public Event supply() {
@@ -42,7 +49,9 @@ public class PipeRealtimeDataRegionFakeCollector extends
PipeRealtimeDataRegionC
}
@Override
- public void collect(PipeRealtimeCollectEvent event) {}
+ public void collect(PipeRealtimeCollectEvent event) {
+ // do nothing
+ }
@Override
public boolean isNeedListenToTsFile() {
@@ -55,7 +64,9 @@ public class PipeRealtimeDataRegionFakeCollector extends
PipeRealtimeDataRegionC
}
@Override
- public void close() {}
+ public void close() {
+ // do nothing
+ }
@Override
public String toString() {
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionHybridCollector.java
b/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionHybridCollector.java
index 1551fc2a582..eeca9a82dad 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionHybridCollector.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionHybridCollector.java
@@ -80,15 +80,15 @@ public class PipeRealtimeDataRegionHybridCollector extends
PipeRealtimeDataRegio
return;
}
- if
(!event.getTsFileEpoch().getState(this).equals(TsFileEpoch.State.USING_TSFILE))
{
- if (!pendingQueue.offer(event)) {
- LOGGER.warn(
- String.format(
- "collectTabletInsertion: pending queue of
PipeRealtimeDataRegionHybridCollector %s has reached capacity, discard tablet
event %s, current state %s",
- this, event, event.getTsFileEpoch().getState(this)));
- // this would not happen, but just in case.
- // UnboundedBlockingPendingQueue is unbounded, so it should never
reach capacity.
- }
+ if
(!event.getTsFileEpoch().getState(this).equals(TsFileEpoch.State.USING_TSFILE)
+ && !pendingQueue.offer(event)) {
+ LOGGER.warn(
+ "collectTabletInsertion: pending queue of
PipeRealtimeDataRegionHybridCollector {} has reached capacity, discard tablet
event {}, current state {}",
+ this,
+ event,
+ event.getTsFileEpoch().getState(this));
+ // this would not happen, but just in case.
+ // UnboundedBlockingPendingQueue is unbounded, so it should never reach
capacity.
}
}
@@ -102,9 +102,10 @@ public class PipeRealtimeDataRegionHybridCollector extends
PipeRealtimeDataRegio
if (!pendingQueue.offer(event)) {
LOGGER.warn(
- String.format(
- "collectTsFileInsertion: pending queue of
PipeRealtimeDataRegionHybridCollector %s has reached capacity, discard TsFile
event %s, current state %s",
- this, event, event.getTsFileEpoch().getState(this)));
+ "collectTsFileInsertion: pending queue of
PipeRealtimeDataRegionHybridCollector {} has reached capacity, discard TsFile
event {}, current state {}",
+ this,
+ event,
+ event.getTsFileEpoch().getState(this));
// this would not happen, but just in case.
// ListenableUnblockingPendingQueue is unbounded, so it should never
reach capacity.
}
@@ -163,7 +164,7 @@ public class PipeRealtimeDataRegionHybridCollector extends
PipeRealtimeDataRegio
// this event is not reliable anymore. but the data represented by
this event
// has been carried by the following tsfile event, so we can just
discard this event.
event.getTsFileEpoch().migrateState(this, state ->
TsFileEpoch.State.USING_TSFILE);
- LOGGER.warn(String.format("Increase reference count for event %s
error.", event));
+ LOGGER.warn("Increase reference count for event {} error.", event);
return null;
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionLogCollector.java
b/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionLogCollector.java
index 42da6a5d6fc..2d02d4fba78 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionLogCollector.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionLogCollector.java
@@ -53,9 +53,10 @@ public class PipeRealtimeDataRegionLogCollector extends
PipeRealtimeDataRegionCo
if (!pendingQueue.offer(event)) {
LOGGER.warn(
- String.format(
- "collect: pending queue of PipeRealtimeDataRegionLogCollector %s
has reached capacity, discard tablet event %s, current state %s",
- this, event, event.getTsFileEpoch().getState(this)));
+ "collect: pending queue of PipeRealtimeDataRegionLogCollector {} has
reached capacity, discard tablet event {}, current state {}",
+ this,
+ event,
+ event.getTsFileEpoch().getState(this));
// this would not happen, but just in case.
// ListenableUnblockingPendingQueue is unbounded, so it should never
reach capacity.
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionTsFileCollector.java
b/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionTsFileCollector.java
index cd56caae708..f43582a5541 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionTsFileCollector.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionTsFileCollector.java
@@ -53,9 +53,10 @@ public class PipeRealtimeDataRegionTsFileCollector extends
PipeRealtimeDataRegio
if (!pendingQueue.offer(event)) {
LOGGER.warn(
- String.format(
- "collect: pending queue of PipeRealtimeDataRegionTsFileCollector
%s has reached capacity, discard TsFile event %s, current state %s",
- this, event, event.getTsFileEpoch().getState(this)));
+ "collect: pending queue of PipeRealtimeDataRegionTsFileCollector {}
has reached capacity, discard TsFile event {}, current state {}",
+ this,
+ event,
+ event.getTsFileEpoch().getState(this));
// this would not happen, but just in case.
// ListenableUnblockingPendingQueue is unbounded, so it should never
reach capacity.
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/epoch/TsFileEpochManager.java
b/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/epoch/TsFileEpochManager.java
index 095bac51c2b..dbecc2f2a72 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/epoch/TsFileEpochManager.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/epoch/TsFileEpochManager.java
@@ -46,11 +46,12 @@ public class TsFileEpochManager {
final String filePath = resource.getTsFilePath();
// this would not happen, but just in case
- if (!filePath2Epoch.containsKey(filePath)) {
- LOGGER.info(
- String.format("Pipe: can not find TsFileEpoch for TsFile %s,
creating it", filePath));
- filePath2Epoch.put(filePath, new TsFileEpoch(filePath));
- }
+ filePath2Epoch.computeIfAbsent(
+ filePath,
+ path -> {
+ LOGGER.info("TsFileEpoch not found for TsFile {}, creating a new
one", path);
+ return new TsFileEpoch(path);
+ });
return new PipeRealtimeCollectEvent(
event,
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/matcher/CachedSchemaPatternMatcher.java
b/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/matcher/CachedSchemaPatternMatcher.java
index cc3dd94a67c..14df36e9267 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/matcher/CachedSchemaPatternMatcher.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/matcher/CachedSchemaPatternMatcher.java
@@ -84,8 +84,6 @@ public class CachedSchemaPatternMatcher implements
PipeDataRegionMatcher {
}
}
- // TODO: maximum the efficiency of matching when pattern is root
- // TODO: memory control
@Override
public Set<PipeRealtimeDataRegionCollector> match(PipeRealtimeCollectEvent
event) {
final Set<PipeRealtimeDataRegionCollector> matchedCollectors = new
HashSet<>();
@@ -105,7 +103,7 @@ public class CachedSchemaPatternMatcher implements
PipeDataRegionMatcher {
deviceToCollectorsCache.get(device,
this::filterCollectorsByDevice);
// this would not happen
if (collectorsFilteredByDevice == null) {
- LOGGER.warn(String.format("Match result NPE when handle device %s",
device));
+ LOGGER.warn("Match result NPE when handle device {}", device);
continue;
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/IoTDBSyncConnector.java
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/IoTDBSyncConnector.java
index ecb03c87a7f..eda9002a226 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/IoTDBSyncConnector.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/IoTDBSyncConnector.java
@@ -86,7 +86,7 @@ public class IoTDBSyncConnector implements PipeConnector {
private IoTDBThriftConnectorClient client;
- private static SessionPool sessionPool;
+ private SessionPool sessionPool;
@Override
public void validate(PipeParameterValidator validator) throws Exception {
@@ -143,8 +143,10 @@ public class IoTDBSyncConnector implements PipeConnector {
throw new PipeRuntimeCriticalException(errorMsg);
}
} catch (TException e) {
- LOGGER.warn(String.format("Connect to receiver %s:%s error.", ipAddress,
port), e);
- throw new PipeConnectionException(e.getMessage(), e);
+ throw new PipeConnectionException(
+ String.format(
+ "Connect to receiver %s:%s error, because: %s", ipAddress, port,
e.getMessage()),
+ e);
}
sessionPool =
@@ -158,27 +160,19 @@ public class IoTDBSyncConnector implements PipeConnector {
}
@Override
- public void heartbeat() throws Exception {}
+ public void heartbeat() throws Exception {
+ // do nothing
+ }
@Override
public void transfer(TabletInsertionEvent tabletInsertionEvent) throws
Exception {
- try {
- if (tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent) {
- doTransfer((PipeInsertNodeTabletInsertionEvent) tabletInsertionEvent);
- } else if (tabletInsertionEvent instanceof PipeRawTabletInsertionEvent) {
- doTransfer((PipeRawTabletInsertionEvent) tabletInsertionEvent);
- } else {
- throw new NotImplementedException(
- "IoTDBSyncConnector only support PipeInsertNodeInsertionEvent and
PipeTabletInsertionEvent.");
- }
- } catch (TException e) {
- LOGGER.warn(
- "Network error when transfer tablet insertion event: {}.",
tabletInsertionEvent, e);
- // the connection may be broken, try to reconnect by catching
PipeConnectionException
- throw new PipeConnectionException(
- String.format(
- "Network error when transfer tablet insertion event, because
%s.", e.getMessage()),
- e);
+ if (tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent) {
+ doTransfer((PipeInsertNodeTabletInsertionEvent) tabletInsertionEvent);
+ } else if (tabletInsertionEvent instanceof PipeRawTabletInsertionEvent) {
+ doTransfer((PipeRawTabletInsertionEvent) tabletInsertionEvent);
+ } else {
+ throw new NotImplementedException(
+ "IoTDBSyncConnector only support PipeInsertNodeInsertionEvent and
PipeTabletInsertionEvent.");
}
}
@@ -193,7 +187,7 @@ public class IoTDBSyncConnector implements PipeConnector {
}
private void doTransfer(PipeRawTabletInsertionEvent pipeTabletInsertionEvent)
- throws PipeException, TException, IoTDBConnectionException,
StatementExecutionException {
+ throws PipeException, IoTDBConnectionException,
StatementExecutionException {
final Tablet tablet = pipeTabletInsertionEvent.convertToTablet();
if (pipeTabletInsertionEvent.isAligned()) {
sessionPool.insertAlignedTablet(tablet);
@@ -212,10 +206,10 @@ public class IoTDBSyncConnector implements PipeConnector {
try {
doTransfer((PipeTsFileInsertionEvent) tsFileInsertionEvent);
} catch (TException e) {
- LOGGER.warn(
- "Network error when transfer tsFile insertion event: {}.",
tsFileInsertionEvent, e);
- // The connection may be broken, try to reconnect by catching
PipeConnectionException
- throw new PipeConnectionException("Network error when transfer tsFile
insertion event.", e);
+ throw new PipeConnectionException(
+ String.format(
+ "Network error when transfer tsFile insertion event: %s.",
tsFileInsertionEvent),
+ e);
}
}
@@ -253,8 +247,7 @@ public class IoTDBSyncConnector implements PipeConnector {
} else if (status.code ==
TSStatusCode.SYNC_FILE_REDIRECTION_ERROR.getStatusCode()) {
position = Long.parseLong(status.message);
randomAccessFile.seek(position);
- LOGGER.info(
- String.format("Redirect to position %s in transferring tsFile
%s.", position, file));
+ LOGGER.info("Redirect to position {} in transferring tsFile {}.",
position, file);
} else if (status.code ==
TSStatusCode.SYNC_FILE_ERROR.getStatusCode()) {
String errorMsg =
String.format("Network failed to receive tsFile %s, status: %s",
file, status);
@@ -263,8 +256,11 @@ public class IoTDBSyncConnector implements PipeConnector {
}
}
} catch (TException e) {
- LOGGER.warn(String.format("Cannot send pipe data to receiver %s:%s.",
ipAddress, port), e);
- throw new PipeConnectionException(e.getMessage(), e);
+ throw new PipeConnectionException(
+ String.format(
+ "Cannot send pipe data to receiver %s:%s, because: %s.",
+ ipAddress, port, e.getMessage()),
+ e);
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/IoTDBSyncReceiver.java
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/IoTDBSyncReceiver.java
index c2b651c1e62..ff6f9faad3e 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/IoTDBSyncReceiver.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/IoTDBSyncReceiver.java
@@ -105,12 +105,11 @@ public class IoTDBSyncReceiver {
new File(getFileDataDir(identityInfo)).mkdirs();
}
createConnection(identityInfo);
- if (!StringUtils.isEmpty(identityInfo.getDatabase())) {
- if (!registerDatabase(identityInfo.getDatabase(), partitionFetcher,
schemaFetcher)) {
- return RpcUtils.getStatus(
- TSStatusCode.PIPESERVER_ERROR,
- String.format("Auto register database %s error.",
identityInfo.getDatabase()));
- }
+ if (!StringUtils.isEmpty(identityInfo.getDatabase())
+ && !registerDatabase(identityInfo.getDatabase(), partitionFetcher,
schemaFetcher)) {
+ return RpcUtils.getStatus(
+ TSStatusCode.PIPESERVER_ERROR,
+ String.format("Auto register database %s error.",
identityInfo.getDatabase()));
}
return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "");
}
@@ -245,7 +244,9 @@ public class IoTDBSyncReceiver {
targetFile
.getName()
.substring(0, targetFile.getName().length() -
PATCH_SUFFIX.length()));
- targetFile.renameTo(newFile);
+ if (!targetFile.renameTo(newFile)) {
+ LOGGER.error("Fail to rename file {} to {}", targetFile, newFile);
+ }
}
}
tsFilePipeData.setParentDirPath(dir.getAbsolutePath());
@@ -276,14 +277,9 @@ public class IoTDBSyncReceiver {
File file = new File(fileDir, fileName + PATCH_SUFFIX);
// step2. check startIndex
- try {
- IndexCheckResult result = checkStartIndexValid(new File(fileDir,
fileName), startIndex);
- if (!result.isResult()) {
- return RpcUtils.getStatus(TSStatusCode.SYNC_FILE_REDIRECTION_ERROR,
result.getIndex());
- }
- } catch (IOException e) {
- LOGGER.error(e.getMessage());
- return RpcUtils.getStatus(TSStatusCode.SYNC_FILE_ERROR, e.getMessage());
+ IndexCheckResult result = checkStartIndexValid(new File(fileDir,
fileName), startIndex);
+ if (!result.isResult()) {
+ return RpcUtils.getStatus(TSStatusCode.SYNC_FILE_REDIRECTION_ERROR,
result.getIndex());
}
// step3. append file
@@ -294,14 +290,7 @@ public class IoTDBSyncReceiver {
buff.get(byteArray);
randomAccessFile.write(byteArray);
recordStartIndex(new File(fileDir, fileName), startIndex + length);
- LOGGER.debug(
- "Sync "
- + fileName
- + " start at "
- + startIndex
- + " to "
- + (startIndex + length)
- + " is done.");
+ LOGGER.debug("Sync {} start at {} to {} is done.", fileName, startIndex,
startIndex + length);
} catch (IOException e) {
LOGGER.error(e.getMessage());
return RpcUtils.getStatus(TSStatusCode.SYNC_FILE_ERROR, e.getMessage());
@@ -310,7 +299,7 @@ public class IoTDBSyncReceiver {
return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "");
}
- private IndexCheckResult checkStartIndexValid(File file, long startIndex)
throws IOException {
+ private IndexCheckResult checkStartIndexValid(File file, long startIndex) {
// get local index from memory map
long localIndex = getCurrentFileStartIndex(file.getAbsolutePath());
// get local index from file
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/pipedata/DeletionPipeData.java
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/pipedata/DeletionPipeData.java
index a98d19b5356..0e72acd6f67 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/pipedata/DeletionPipeData.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/pipedata/DeletionPipeData.java
@@ -52,10 +52,14 @@ public class DeletionPipeData extends PipeData {
}
@Override
- public void deserialize(DataInputStream stream) throws IOException,
IllegalPathException {
+ public void deserialize(DataInputStream stream) throws IOException {
super.deserialize(stream);
database = ReadWriteIOUtils.readString(stream);
- deletion = Deletion.deserializeWithoutFileOffset(stream);
+ try {
+ deletion = Deletion.deserializeWithoutFileOffset(stream);
+ } catch (IllegalPathException e) {
+ throw new IOException(e);
+ }
}
@Override
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/pipedata/PipeData.java
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/pipedata/PipeData.java
index 659c69876e9..86eca1c3585 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/pipedata/PipeData.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/pipedata/PipeData.java
@@ -64,12 +64,11 @@ public abstract class PipeData {
return byteStream.toByteArray();
}
- public void deserialize(DataInputStream stream) throws IOException,
IllegalPathException {
+ public void deserialize(DataInputStream stream) throws IOException {
serialNumber = stream.readLong();
}
- public static PipeData createPipeData(DataInputStream stream)
- throws IOException, IllegalPathException {
+ public static PipeData createPipeData(DataInputStream stream) throws
IOException {
PipeData pipeData;
PipeDataType type = PipeDataType.getPipeDataType(stream.readByte());
switch (type) {
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/pipedata/TsFilePipeData.java
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/pipedata/TsFilePipeData.java
index 803a5c67427..54f05eef81d 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/pipedata/TsFilePipeData.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/pipedata/TsFilePipeData.java
@@ -19,7 +19,6 @@
*/
package org.apache.iotdb.db.pipe.connector.legacy.pipedata;
-import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.db.pipe.connector.legacy.loader.ILoader;
import org.apache.iotdb.db.pipe.connector.legacy.loader.TsFileLoader;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
@@ -99,7 +98,7 @@ public class TsFilePipeData extends PipeData {
}
@Override
- public void deserialize(DataInputStream stream) throws IOException,
IllegalPathException {
+ public void deserialize(DataInputStream stream) throws IOException {
super.deserialize(stream);
parentDirPath = ReadWriteIOUtils.readString(stream);
if (parentDirPath == null) {
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/IoTDBThriftConnectorV1.java
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/IoTDBThriftConnectorV1.java
index bb18acda62d..4160eaeb074 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/IoTDBThriftConnectorV1.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/IoTDBThriftConnectorV1.java
@@ -23,8 +23,6 @@ import
org.apache.iotdb.commons.client.property.ThriftClientProperty;
import org.apache.iotdb.commons.conf.CommonConfig;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
-import org.apache.iotdb.db.conf.IoTDBConfig;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.pipe.connector.v1.reponse.PipeTransferFilePieceResp;
import org.apache.iotdb.db.pipe.connector.v1.request.PipeTransferFilePieceReq;
import org.apache.iotdb.db.pipe.connector.v1.request.PipeTransferFileSealReq;
@@ -65,15 +63,12 @@ public class IoTDBThriftConnectorV1 implements
PipeConnector {
private static final Logger LOGGER =
LoggerFactory.getLogger(IoTDBThriftConnectorV1.class);
private static final CommonConfig COMMON_CONFIG =
CommonDescriptor.getInstance().getConfig();
- private static final IoTDBConfig IOTDB_CONFIG =
IoTDBDescriptor.getInstance().getConfig();
private String ipAddress;
private int port;
private IoTDBThriftConnectorClient client;
- public IoTDBThriftConnectorV1() {}
-
@Override
public void validate(PipeParameterValidator validator) throws Exception {
validator
@@ -112,13 +107,17 @@ public class IoTDBThriftConnectorV1 implements
PipeConnector {
throw new PipeException(String.format("Handshake error, result status
%s.", resp.status));
}
} catch (TException e) {
- LOGGER.warn(String.format("Connect to receiver %s:%s error.", ipAddress,
port), e);
- throw new PipeConnectionException(e.getMessage(), e);
+ throw new PipeConnectionException(
+ String.format(
+ "Connect to receiver %s:%s error, because: %s", ipAddress, port,
e.getMessage()),
+ e);
}
}
@Override
- public void heartbeat() throws Exception {}
+ public void heartbeat() throws Exception {
+ // do nothing
+ }
@Override
public void transfer(TabletInsertionEvent tabletInsertionEvent) throws
Exception {
@@ -133,12 +132,10 @@ public class IoTDBThriftConnectorV1 implements
PipeConnector {
"IoTDBThriftConnectorV1 only support
PipeInsertNodeTabletInsertionEvent and PipeRawTabletInsertionEvent.");
}
} catch (TException e) {
- LOGGER.warn(
- "Network error when transfer tablet insertion event: {}.",
tabletInsertionEvent, e);
- // the connection may be broken, try to reconnect by catching
PipeConnectionException
throw new PipeConnectionException(
String.format(
- "Network error when transfer tablet insertion event, because
%s.", e.getMessage()),
+ "Network error when transfer tablet insertion event %s, because
%s.",
+ tabletInsertionEvent, e.getMessage()),
e);
}
}
@@ -185,12 +182,10 @@ public class IoTDBThriftConnectorV1 implements
PipeConnector {
try {
doTransfer((PipeTsFileInsertionEvent) tsFileInsertionEvent);
} catch (TException e) {
- LOGGER.warn(
- "Network error when transfer tsfile insertion event: {}.",
tsFileInsertionEvent, e);
- // the connection may be broken, try to reconnect by catching
PipeConnectionException
throw new PipeConnectionException(
String.format(
- "Network error when transfer tsfile insertion event, because
%s.", e.getMessage()),
+ "Network error when transfer tsfile insertion event %s, because
%s.",
+ tsFileInsertionEvent, e.getMessage()),
e);
}
}
@@ -229,7 +224,7 @@ public class IoTDBThriftConnectorV1 implements
PipeConnector {
== TSStatusCode.PIPE_TRANSFER_FILE_OFFSET_RESET.getStatusCode()) {
position = resp.getEndWritingOffset();
reader.seek(position);
- LOGGER.info(String.format("Redirect file position to %s.",
position));
+ LOGGER.info("Redirect file position to {}.", position);
continue;
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/IoTDBThriftReceiverV1.java
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/IoTDBThriftReceiverV1.java
index cc803c33b95..de1df42b73a 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/IoTDBThriftReceiverV1.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/IoTDBThriftReceiverV1.java
@@ -170,18 +170,25 @@ public class IoTDBThriftReceiverV1 implements
IoTDBThriftReceiver {
writingFileWriter = null;
}
if (writingFile != null && writingFile.exists()) {
- final boolean ignored = writingFile.delete();
- LOGGER.info(String.format("original file %s was deleted.",
writingFile.getPath()));
+ if (writingFile.delete()) {
+ LOGGER.info("original file {} was deleted.", writingFile.getPath());
+ } else {
+ LOGGER.warn("failed to delete original file {}.",
writingFile.getPath());
+ }
writingFile = null;
}
final File receiveDir = new File(RECEIVER_FILE_DIR);
if (!receiveDir.exists()) {
- boolean ignored = receiveDir.mkdirs();
+ if (receiveDir.mkdirs()) {
+ LOGGER.info("receiver file dir {} was created.", receiveDir.getPath());
+ } else {
+ LOGGER.warn("failed to create receiver file dir {}.",
receiveDir.getPath());
+ }
}
writingFile = new File(RECEIVER_FILE_DIR, fileName);
writingFileWriter = new RandomAccessFile(writingFile, "rw");
- LOGGER.info(String.format("start to write transferring file %s.",
writingFile.getPath()));
+ LOGGER.info("start to write transferring file {}.", writingFile.getPath());
}
private boolean isFileExistedAndNameCorrect(String fileName) {
@@ -288,13 +295,9 @@ public class IoTDBThriftReceiverV1 implements
IoTDBThriftReceiver {
if (writingFileWriter != null) {
writingFileWriter.close();
}
- if (writingFile != null) {
- if (!writingFile.delete()) {
- LOGGER.warn(
- String.format(
- "IoTDBThriftReceiverV1#handleExit: delete file %s error.",
- writingFile.getPath()));
- }
+ if (writingFile != null && !writingFile.delete()) {
+ LOGGER.warn(
+ "IoTDBThriftReceiverV1#handleExit: delete file {} error.",
writingFile.getPath());
}
} catch (IOException e) {
LOGGER.warn("IoTDBThriftReceiverV1#handleExit: meeting errors on
handleExit().", e);
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/PipeRequestType.java
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/PipeRequestType.java
index cdbb5a59eef..b24ef0e9066 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/PipeRequestType.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/PipeRequestType.java
@@ -45,7 +45,10 @@ public enum PipeRequestType {
private static final Map<Short, PipeRequestType> TYPE_MAP =
Arrays.stream(PipeRequestType.values())
- .collect(HashMap::new, (map, type) -> map.put(type.getType(), type),
HashMap::putAll);
+ .collect(
+ HashMap::new,
+ (typeMap, pipeRequestType) ->
typeMap.put(pipeRequestType.getType(), pipeRequestType),
+ HashMap::putAll);
public static boolean isValidatedRequestType(short type) {
return TYPE_MAP.containsKey(type);
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/v2/handler/PipeTransferTabletInsertionEventHandler.java
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v2/handler/PipeTransferTabletInsertionEventHandler.java
index 6b150c53357..daf95b44add 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/v2/handler/PipeTransferTabletInsertionEventHandler.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v2/handler/PipeTransferTabletInsertionEventHandler.java
@@ -42,7 +42,7 @@ public abstract class
PipeTransferTabletInsertionEventHandler<E extends TPipeTra
implements AsyncMethodCallback<E> {
private static final Logger LOGGER =
-
LoggerFactory.getLogger(PipeTransferInsertNodeTabletInsertionEventHandler.class);
+ LoggerFactory.getLogger(PipeTransferTabletInsertionEventHandler.class);
private final long requestCommitId;
private final EnrichedEvent event;
@@ -54,7 +54,7 @@ public abstract class
PipeTransferTabletInsertionEventHandler<E extends TPipeTra
(long) (PipeConfig.getInstance().getPipeConnectorRetryIntervalMs() *
Math.pow(2, 5));
private int retryCount = 0;
- public PipeTransferTabletInsertionEventHandler(
+ protected PipeTransferTabletInsertionEventHandler(
long requestCommitId,
@Nullable EnrichedEvent event,
TPipeTransferReq req,
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/v2/handler/PipeTransferTsFileInsertionEventHandler.java
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v2/handler/PipeTransferTsFileInsertionEventHandler.java
index 21c244f0537..692fa0f7ada 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/v2/handler/PipeTransferTsFileInsertionEventHandler.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v2/handler/PipeTransferTsFileInsertionEventHandler.java
@@ -151,7 +151,7 @@ public class PipeTransferTsFileInsertionEventHandler
if (code ==
TSStatusCode.PIPE_TRANSFER_FILE_OFFSET_RESET.getStatusCode()) {
position = resp.getEndWritingOffset();
reader.seek(position);
- LOGGER.info(String.format("Redirect file position to %s.", position));
+ LOGGER.info("Redirect file position to {}.", position);
} else if (code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
throw new PipeException(
String.format("Transfer file %s error, result status %s.", tsFile,
resp.getStatus()));
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java
b/server/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java
index f2df19bf2b8..462ba9164f6 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java
@@ -40,7 +40,7 @@ public abstract class EnrichedEvent implements Event {
private final String pattern;
- public EnrichedEvent(PipeTaskMeta pipeTaskMeta, String pattern) {
+ protected EnrichedEvent(PipeTaskMeta pipeTaskMeta, String pattern) {
referenceCount = new AtomicInteger(0);
this.pipeTaskMeta = pipeTaskMeta;
this.pattern = pattern;
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
b/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
index 0933e1a3c71..80d6eaf7879 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
@@ -124,7 +124,6 @@ public class PipeInsertNodeTabletInsertionEvent extends
EnrichedEvent
}
return dataContainer.processRowByRow(consumer);
} catch (Exception e) {
- LOGGER.error("Process row by row error.", e);
throw new PipeException("Process row by row error.", e);
}
}
@@ -137,7 +136,6 @@ public class PipeInsertNodeTabletInsertionEvent extends
EnrichedEvent
}
return dataContainer.processTablet(consumer);
} catch (Exception e) {
- LOGGER.error("Process tablet error.", e);
throw new PipeException("Process tablet error.", e);
}
}
@@ -155,7 +153,6 @@ public class PipeInsertNodeTabletInsertionEvent extends
EnrichedEvent
}
return dataContainer.convertToTablet();
} catch (Exception e) {
- LOGGER.error("Convert to tablet error.", e);
throw new PipeException("Convert to tablet error.", e);
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
b/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
index 594c0a38291..168400443c8 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
@@ -75,16 +75,16 @@ public class PipeRawTabletInsertionEvent implements
TabletInsertionEvent {
}
public Tablet convertToTablet() {
- final String pattern = getPattern();
+ final String notNullPattern = getPattern();
- // if pattern is "root", we don't need to convert, just return the
original tablet
- if (pattern.equals(PipeCollectorConstant.COLLECTOR_PATTERN_DEFAULT_VALUE))
{
+ // if notNullPattern is "root", we don't need to convert, just return the
original tablet
+ if
(notNullPattern.equals(PipeCollectorConstant.COLLECTOR_PATTERN_DEFAULT_VALUE)) {
return tablet;
}
- // if pattern is not "root", we need to convert the tablet
+ // if notNullPattern is not "root", we need to convert the tablet
if (dataContainer == null) {
- dataContainer = new TabletInsertionDataContainer(tablet, isAligned,
pattern);
+ dataContainer = new TabletInsertionDataContainer(tablet, isAligned,
notNullPattern);
}
return dataContainer.convertToTablet();
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/TabletInsertionDataContainer.java
b/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/TabletInsertionDataContainer.java
index 86fbbd28f6e..ede8ce91303 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/TabletInsertionDataContainer.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/TabletInsertionDataContainer.java
@@ -19,7 +19,6 @@
package org.apache.iotdb.db.pipe.event.common.tablet;
-import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertTabletNode;
@@ -28,7 +27,6 @@ import
org.apache.iotdb.db.pipe.event.common.row.PipeRowCollector;
import org.apache.iotdb.pipe.api.access.Row;
import org.apache.iotdb.pipe.api.collector.RowCollector;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
-import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -62,18 +60,13 @@ public class TabletInsertionDataContainer {
private Tablet tablet;
public TabletInsertionDataContainer(InsertNode insertNode, String pattern) {
- try {
- if (insertNode instanceof InsertRowNode) {
- parse((InsertRowNode) insertNode, pattern);
- } else if (insertNode instanceof InsertTabletNode) {
- parse((InsertTabletNode) insertNode, pattern);
- } else {
- throw new UnSupportedDataTypeException(
- String.format("InsertNode type %s is not supported.",
insertNode.getClass().getName()));
- }
- } catch (IllegalPathException e) {
- throw new PipeException(
- String.format("Failed to parse insertNode with pattern %s.",
pattern), e);
+ if (insertNode instanceof InsertRowNode) {
+ parse((InsertRowNode) insertNode, pattern);
+ } else if (insertNode instanceof InsertTabletNode) {
+ parse((InsertTabletNode) insertNode, pattern);
+ } else {
+ throw new UnSupportedDataTypeException(
+ String.format("InsertNode type %s is not supported.",
insertNode.getClass().getName()));
}
}
@@ -83,7 +76,7 @@ public class TabletInsertionDataContainer {
//////////////////////////// parse ////////////////////////////
- private void parse(InsertRowNode insertRowNode, String pattern) throws
IllegalPathException {
+ private void parse(InsertRowNode insertRowNode, String pattern) {
final int originColumnSize = insertRowNode.getMeasurements().length;
final Integer[] originColumnIndex2FilteredColumnIndexMapperList = new
Integer[originColumnSize];
@@ -149,8 +142,7 @@ public class TabletInsertionDataContainer {
rowCount = 1;
}
- private void parse(InsertTabletNode insertTabletNode, String pattern)
- throws IllegalPathException {
+ private void parse(InsertTabletNode insertTabletNode, String pattern) {
final int originColumnSize = insertTabletNode.getMeasurements().length;
final Integer[] originColumnIndex2FilteredColumnIndexMapperList = new
Integer[originColumnSize];
@@ -269,7 +261,6 @@ public class TabletInsertionDataContainer {
rowCount = tablet.rowSize;
}
- // TODO: cache the result keyed by deviceId to improve performance
private void generateColumnIndexMapper(
String[] originMeasurementList,
String pattern,
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
b/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
index 40aaf53c1ec..7e06fb0e01f 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
@@ -170,12 +170,12 @@ public class PipeTsFileInsertionEvent extends
EnrichedEvent implements TsFileIns
final String errorMsg =
String.format(
"Interrupted when waiting for closing TsFile %s.",
resource.getTsFilePath());
- LOGGER.warn(errorMsg);
+ LOGGER.warn(errorMsg, e);
Thread.currentThread().interrupt();
throw new PipeException(errorMsg);
} catch (IOException e) {
final String errorMsg = String.format("Read TsFile %s error.",
resource.getTsFilePath());
- LOGGER.warn(errorMsg);
+ LOGGER.warn(errorMsg, e);
throw new PipeException(errorMsg);
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java
b/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java
index a9a5f657d83..0d72fd06e53 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java
@@ -79,7 +79,6 @@ public class TsFileInsertionDataContainer implements
AutoCloseable {
deviceIsAlignedMap = readDeviceIsAlignedMap();
measurementDataTypeMap = tsFileSequenceReader.getFullPathDataTypeMap();
} catch (Exception e) {
- LOGGER.error("failed to create TsFileInsertionDataContainer", e);
close();
throw e;
}
@@ -125,14 +124,14 @@ public class TsFileInsertionDataContainer implements
AutoCloseable {
}
private Map<String, Boolean> readDeviceIsAlignedMap() throws IOException {
- final Map<String, Boolean> deviceIsAlignedMap = new HashMap<>();
+ final Map<String, Boolean> deviceIsAlignedResultMap = new HashMap<>();
final TsFileDeviceIterator deviceIsAlignedIterator =
tsFileSequenceReader.getAllDevicesIteratorWithIsAligned();
while (deviceIsAlignedIterator.hasNext()) {
final Pair<String, Boolean> deviceIsAlignedPair =
deviceIsAlignedIterator.next();
- deviceIsAlignedMap.put(deviceIsAlignedPair.getLeft(),
deviceIsAlignedPair.getRight());
+ deviceIsAlignedResultMap.put(deviceIsAlignedPair.getLeft(),
deviceIsAlignedPair.getRight());
}
- return deviceIsAlignedMap;
+ return deviceIsAlignedResultMap;
}
/** @return TabletInsertionEvent in a streaming way */
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeSubtaskExecutorManager.java
b/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeSubtaskExecutorManager.java
index 22905643a02..ff42491bd35 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeSubtaskExecutorManager.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeSubtaskExecutorManager.java
@@ -19,17 +19,13 @@
package org.apache.iotdb.db.pipe.execution.executor;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
/**
* PipeTaskExecutor is responsible for executing the pipe tasks, and it is
scheduled by the
* PipeTaskScheduler. It is a singleton class.
*/
+@SuppressWarnings("unused") // assignerSubtaskExecutor is for future use
public class PipeSubtaskExecutorManager {
- private static final Logger LOGGER =
LoggerFactory.getLogger(PipeSubtaskExecutorManager.class);
-
private final PipeAssignerSubtaskExecutor assignerSubtaskExecutor;
private final PipeProcessorSubtaskExecutor processorSubtaskExecutor;
private final PipeConnectorSubtaskExecutor connectorSubtaskExecutor;
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/resource/file/PipeFileResourceManager.java
b/server/src/main/java/org/apache/iotdb/db/pipe/resource/file/PipeFileResourceManager.java
index ca81f1c7643..8d12a802978 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/resource/file/PipeFileResourceManager.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/resource/file/PipeFileResourceManager.java
@@ -79,12 +79,8 @@ public class PipeFileResourceManager {
}
private boolean increaseReferenceIfExists(String path) {
- if (hardlinkOrCopiedFileToReferenceMap.containsKey(path)) {
- hardlinkOrCopiedFileToReferenceMap.put(
- path, hardlinkOrCopiedFileToReferenceMap.get(path) + 1);
- return true;
- }
- return false;
+ hardlinkOrCopiedFileToReferenceMap.computeIfPresent(path, (key, value) ->
value + 1);
+ return hardlinkOrCopiedFileToReferenceMap.containsKey(path);
}
private static File getHardlinkOrCopiedFileInPipeDir(File file) throws
IOException {
@@ -125,8 +121,11 @@ public class PipeFileResourceManager {
}
private static File createHardLink(File sourceFile, File hardlink) throws
IOException {
- if (!hardlink.getParentFile().exists()) {
- boolean ignored = hardlink.getParentFile().mkdirs();
+ if (!hardlink.getParentFile().exists() &&
!hardlink.getParentFile().mkdirs()) {
+ throw new IOException(
+ String.format(
+ "failed to create hardlink %s for file %s: failed to create
parent dir %s",
+ hardlink.getPath(), sourceFile.getPath(),
hardlink.getParentFile().getPath()));
}
final Path sourcePath =
FileSystems.getDefault().getPath(sourceFile.getAbsolutePath());
@@ -136,8 +135,11 @@ public class PipeFileResourceManager {
}
private static File copyFile(File sourceFile, File targetFile) throws
IOException {
- if (!targetFile.getParentFile().exists()) {
- boolean ignored = targetFile.getParentFile().mkdirs();
+ if (!targetFile.getParentFile().exists() &&
!targetFile.getParentFile().mkdirs()) {
+ throw new IOException(
+ String.format(
+ "failed to copy file %s to %s: failed to create parent dir %s",
+ sourceFile.getPath(), targetFile.getPath(),
targetFile.getParentFile().getPath()));
}
Files.copy(sourceFile.toPath(), targetFile.toPath());
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResource.java
b/server/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResource.java
index b4a01a7828b..b882a96962d 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResource.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResource.java
@@ -27,11 +27,12 @@ import org.apache.iotdb.db.wal.utils.WALEntryHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.Closeable;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
-public class PipeWALResource implements AutoCloseable {
+public class PipeWALResource implements Closeable {
private static final Logger LOGGER =
LoggerFactory.getLogger(PipeWALResource.class);
@@ -39,8 +40,7 @@ public class PipeWALResource implements AutoCloseable {
private final AtomicInteger referenceCount;
- // TODO: make this configurable
- public static final long MIN_TIME_TO_LIVE_IN_MS = 1000 * 60;
+ public static final long MIN_TIME_TO_LIVE_IN_MS = 1000L * 60;
private final AtomicLong lastLogicalPinTime;
private final AtomicBoolean isPhysicallyPinned;
@@ -155,8 +155,4 @@ public class PipeWALResource implements AutoCloseable {
referenceCount.set(0);
}
-
- public int getReferenceCount() {
- return referenceCount.get();
- }
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java
b/server/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java
index de67bb8543f..6fdf3a84ed0 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java
@@ -9,11 +9,10 @@ import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
-public class PipeWALResourceManager implements AutoCloseable {
+public class PipeWALResourceManager {
private final Map<Long, PipeWALResource> memtableIdToPipeWALResourceMap;
@@ -23,7 +22,6 @@ public class PipeWALResourceManager implements AutoCloseable {
private static final ScheduledExecutorService PIPE_WAL_RESOURCE_TTL_CHECKER =
IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
ThreadName.PIPE_WAL_RESOURCE_TTL_CHECKER.getName());
- private final ScheduledFuture<?> ttlCheckerFuture;
public PipeWALResourceManager() {
// memtableIdToPipeWALResourceMap can be concurrently accessed by multiple threads
@@ -34,30 +32,29 @@ public class PipeWALResourceManager implements AutoCloseable {
memtableIdSegmentLocks[i] = new ReentrantLock();
}
- ttlCheckerFuture =
- ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
- PIPE_WAL_RESOURCE_TTL_CHECKER,
- () -> {
- Iterator<Map.Entry<Long, PipeWALResource>> iterator =
- memtableIdToPipeWALResourceMap.entrySet().iterator();
- while (iterator.hasNext()) {
- final Map.Entry<Long, PipeWALResource> entry = iterator.next();
- final ReentrantLock lock =
- memtableIdSegmentLocks[(int) (entry.getKey() %
SEGMENT_LOCK_COUNT)];
-
- lock.lock();
- try {
- if (entry.getValue().invalidateIfPossible()) {
- iterator.remove();
- }
- } finally {
- lock.unlock();
- }
+ ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
+ PIPE_WAL_RESOURCE_TTL_CHECKER,
+ () -> {
+ Iterator<Map.Entry<Long, PipeWALResource>> iterator =
+ memtableIdToPipeWALResourceMap.entrySet().iterator();
+ while (iterator.hasNext()) {
+ final Map.Entry<Long, PipeWALResource> entry = iterator.next();
+ final ReentrantLock lock =
+ memtableIdSegmentLocks[(int) (entry.getKey() %
SEGMENT_LOCK_COUNT)];
+
+ lock.lock();
+ try {
+ if (entry.getValue().invalidateIfPossible()) {
+ iterator.remove();
}
- },
- PipeWALResource.MIN_TIME_TO_LIVE_IN_MS,
- PipeWALResource.MIN_TIME_TO_LIVE_IN_MS,
- TimeUnit.MILLISECONDS);
+ } finally {
+ lock.unlock();
+ }
+ }
+ },
+ PipeWALResource.MIN_TIME_TO_LIVE_IN_MS,
+ PipeWALResource.MIN_TIME_TO_LIVE_IN_MS,
+ TimeUnit.MILLISECONDS);
}
public void pin(long memtableId, WALEntryHandler walEntryHandler) {
@@ -83,34 +80,4 @@ public class PipeWALResourceManager implements AutoCloseable {
lock.unlock();
}
}
-
- @Override
- public void close() throws Exception {
- if (ttlCheckerFuture != null) {
- ttlCheckerFuture.cancel(true);
- }
-
- for (final long memtableId : memtableIdToPipeWALResourceMap.keySet()) {
- final ReentrantLock lock = memtableIdSegmentLocks[(int) (memtableId %
SEGMENT_LOCK_COUNT)];
-
- lock.lock();
- try {
- memtableIdToPipeWALResourceMap.get(memtableId).close();
- memtableIdToPipeWALResourceMap.remove(memtableId);
- } finally {
- lock.unlock();
- }
- }
- }
-
- public int getReferenceCount(long memtableId) {
- final ReentrantLock lock = memtableIdSegmentLocks[(int) (memtableId %
SEGMENT_LOCK_COUNT)];
-
- lock.lock();
- try {
- return
memtableIdToPipeWALResourceMap.get(memtableId).getReferenceCount();
- } finally {
- lock.unlock();
- }
- }
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/task/connection/EventSupplier.java
b/server/src/main/java/org/apache/iotdb/db/pipe/task/connection/EventSupplier.java
index 0efb7a2ae67..c00d294be00 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/task/connection/EventSupplier.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/task/connection/EventSupplier.java
@@ -29,5 +29,6 @@ public interface EventSupplier {
* the moment, but the collector is still running for more events.
* @throws Exception if the supplier fails to supply the event.
*/
+ @SuppressWarnings("squid:S00112") // Exception is thrown by the interface
Event supply() throws Exception;
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java
b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java
index 6c6f2498b97..0d676ac4992 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java
@@ -43,8 +43,7 @@ public class PipeTaskCollectorStage extends PipeTaskStage {
PipeParameters collectorParameters,
TConsensusGroupId dataRegionId,
PipeTaskMeta pipeTaskMeta) {
- // TODO: avoid if-else, use reflection to create collector all the time
- this.pipeCollector =
+ pipeCollector =
collectorParameters
.getStringOrDefault(
PipeCollectorConstant.COLLECTOR_KEY,
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskConnectorStage.java
b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskConnectorStage.java
index ff4e954e614..ba8058524ae 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskConnectorStage.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskConnectorStage.java
@@ -46,7 +46,9 @@ public class PipeTaskConnectorStage extends PipeTaskStage {
}
@Override
- public void createSubtask() throws PipeException {}
+ public void createSubtask() throws PipeException {
+ // do nothing
+ }
@Override
public void startSubtask() throws PipeException {
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskStage.java
b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskStage.java
index e3a793c0d8a..c2b4426435d 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskStage.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskStage.java
@@ -24,6 +24,15 @@ import org.apache.iotdb.pipe.api.exception.PipeException;
public abstract class PipeTaskStage {
+ private static final String MESSAGE_PIPE_TASK_STAGE_HAS_BEEN_STARTED =
+ "The PipeTaskStage has been started";
+ private static final String MESSAGE_PIPE_TASK_STAGE_HAS_BEEN_DROPPED =
+ "The PipeTaskStage has been dropped";
+ private static final String MESSAGE_PIPE_TASK_STAGE_HAS_BEEN_STOPPED =
+ "The PipeTaskStage has been externally stopped";
+ private static final String MESSAGE_PIPE_TASK_STAGE_HAS_NOT_BEEN_CREATED =
+ "The PipeTaskStage has not been created";
+
protected PipeStatus status = null;
protected boolean hasBeenExternallyStopped = false;
@@ -35,14 +44,14 @@ public abstract class PipeTaskStage {
public synchronized void create() {
if (status != null) {
if (status == PipeStatus.RUNNING) {
- throw new PipeException("The PipeTaskStage has been started");
+ throw new PipeException(MESSAGE_PIPE_TASK_STAGE_HAS_BEEN_STARTED);
}
if (status == PipeStatus.DROPPED) {
- throw new PipeException("The PipeTaskStage has been dropped");
+ throw new PipeException(MESSAGE_PIPE_TASK_STAGE_HAS_BEEN_DROPPED);
}
// status == PipeStatus.STOPPED
if (hasBeenExternallyStopped) {
- throw new PipeException("The PipeTaskStage has been externally stopped");
+ throw new PipeException(MESSAGE_PIPE_TASK_STAGE_HAS_BEEN_STOPPED);
}
// otherwise, do nothing to allow retry strategy
return;
@@ -63,14 +72,14 @@ public abstract class PipeTaskStage {
*/
public synchronized void start() {
if (status == null) {
- throw new PipeException("The PipeTaskStage has not been created");
+ throw new PipeException(MESSAGE_PIPE_TASK_STAGE_HAS_NOT_BEEN_CREATED);
}
if (status == PipeStatus.RUNNING) {
// do nothing to allow retry strategy
return;
}
if (status == PipeStatus.DROPPED) {
- throw new PipeException("The PipeTaskStage has been dropped");
+ throw new PipeException(MESSAGE_PIPE_TASK_STAGE_HAS_BEEN_DROPPED);
}
// status == PipeStatus.STOPPED, start the subtask
@@ -88,14 +97,14 @@ public abstract class PipeTaskStage {
*/
public synchronized void stop() {
if (status == null) {
- throw new PipeException("The PipeTaskStage has not been created");
+ throw new PipeException(MESSAGE_PIPE_TASK_STAGE_HAS_NOT_BEEN_CREATED);
}
if (status == PipeStatus.STOPPED) {
// do nothing to allow retry strategy
return;
}
if (status == PipeStatus.DROPPED) {
- throw new PipeException("The PipeTaskStage has been dropped");
+ throw new PipeException(MESSAGE_PIPE_TASK_STAGE_HAS_BEEN_DROPPED);
}
// status == PipeStatus.RUNNING, stop the connector
@@ -114,7 +123,7 @@ public abstract class PipeTaskStage {
*/
public synchronized void drop() {
if (status == null) {
- throw new PipeException("The PipeTaskStage has not been created");
+ throw new PipeException(MESSAGE_PIPE_TASK_STAGE_HAS_NOT_BEEN_CREATED);
}
if (status == PipeStatus.DROPPED) {
// do nothing to allow retry strategy
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java
b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java
index ce2b6403a2f..47dd1ba7e03 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java
@@ -86,7 +86,6 @@ public class PipeConnectorSubtask extends PipeSubtask {
} catch (PipeConnectionException e) {
throw e;
} catch (Exception e) {
- LOGGER.warn("Execute Connector subtask once error.", e);
throw new PipeException(
"Error occurred during executing PipeConnector#transfer, perhaps need to check whether the implementation of PipeConnector is correct according to the pipe-api description.",
e);
@@ -99,19 +98,29 @@ public class PipeConnectorSubtask extends PipeSubtask {
public void onFailure(@NotNull Throwable throwable) {
// retry to connect to the target system if the connection is broken
if (throwable instanceof PipeConnectionException) {
+ LOGGER.warn(
+ "PipeConnectionException occurred, retrying to connect to the target
system...",
+ throwable);
+
int retry = 0;
while (retry < MAX_RETRY_TIMES) {
try {
outputPipeConnector.handshake();
+ LOGGER.info("Successfully reconnected to the target system.");
break;
} catch (Exception e) {
retry++;
- LOGGER.error("Failed to reconnect to the target system, retrying... ({} time(s))", retry);
+ LOGGER.warn(
+ "Failed to reconnect to the target system, retrying ... after [{}/{}] time(s) retries.",
+ retry,
+ MAX_RETRY_TIMES,
+ e);
try {
Thread.sleep(retry *
PipeConfig.getInstance().getPipeConnectorRetryIntervalMs());
} catch (InterruptedException interruptedException) {
LOGGER.info(
- "Interrupted while sleeping, perhaps need to check whether the thread is interrupted.");
+ "Interrupted while sleeping, perhaps need to check whether the thread is interrupted.",
+ interruptedException);
Thread.currentThread().interrupt();
}
}
@@ -120,22 +129,35 @@ public class PipeConnectorSubtask extends PipeSubtask {
// stop current pipe task if failed to reconnect to the target system
after MAX_RETRY_TIMES
// times
if (retry == MAX_RETRY_TIMES) {
- final String errorMessage =
- String.format(
- "Failed to reconnect to the target system after %d times,
stopping current pipe task %s...",
- MAX_RETRY_TIMES, taskID);
- LOGGER.warn(errorMessage, throwable);
- lastFailedCause = throwable;
-
if (lastEvent instanceof EnrichedEvent) {
+ LOGGER.warn(
+ "Failed to reconnect to the target system after {} times, stopping current pipe task {}... "
+ + "Status shown when query the pipe will be 'STOPPED'. "
+ + "Please restart the task by executing 'START PIPE' manually if needed.",
+ MAX_RETRY_TIMES,
+ taskID,
+ throwable);
+
((EnrichedEvent) lastEvent)
.reportException(new
PipeRuntimeConnectorCriticalException(throwable.getMessage()));
+ } else {
+ LOGGER.error(
+ "Failed to reconnect to the target system after {} times, stopping current pipe task {} locally... "
+ + "Status shown when query the pipe will be 'RUNNING' instead of 'STOPPED', but the task is actually stopped. "
+ + "Please restart the task by executing 'START PIPE' manually if needed.",
+ MAX_RETRY_TIMES,
+ taskID,
+ throwable);
+
+ // FIXME: non-EnrichedEvent should be reported to the ConfigNode instead of being logged
}
// although the pipe task will be stopped, we still don't release the last event here
// because we need to keep it for the next retry. if user wants to restart the task,
// the last event will be processed again. the last event will be released when the task
// is dropped or the process is running normally.
+
+ // stop current pipe task if failed to reconnect to the target system after MAX_RETRY_TIMES
return;
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtaskManager.java
b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtaskManager.java
index 071ab4da15e..9c56b4e04db 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtaskManager.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtaskManager.java
@@ -42,6 +42,9 @@ import java.util.TreeMap;
public class PipeConnectorSubtaskManager {
+ private static final String FAILED_TO_DEREGISTER_EXCEPTION_MESSAGE =
+ "Failed to deregister PipeConnectorSubtask. No such subtask: ";
+
private final Map<String, PipeConnectorSubtaskLifeCycle>
attributeSortedString2SubtaskLifeCycleMap = new HashMap<>();
@@ -53,7 +56,6 @@ public class PipeConnectorSubtaskManager {
new TreeMap<>(pipeConnectorParameters.getAttribute()).toString();
if
(!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString))
{
- // TODO: construct all PipeConnector with the same reflection method,
avoid using if-else
// 1. construct, validate and customize PipeConnector, and then
handshake (create connection)
// with the target
final String connectorKey =
@@ -103,8 +105,7 @@ public class PipeConnectorSubtaskManager {
public synchronized void deregister(String attributeSortedString) {
if
(!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString))
{
- throw new PipeException(
- "Failed to deregister PipeConnectorSubtask. No such subtask: " +
attributeSortedString);
+ throw new PipeException(FAILED_TO_DEREGISTER_EXCEPTION_MESSAGE +
attributeSortedString);
}
if
(attributeSortedString2SubtaskLifeCycleMap.get(attributeSortedString).deregister())
{
@@ -114,8 +115,7 @@ public class PipeConnectorSubtaskManager {
public synchronized void start(String attributeSortedString) {
if
(!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString))
{
- throw new PipeException(
- "Failed to deregister PipeConnectorSubtask. No such subtask: " +
attributeSortedString);
+ throw new PipeException(FAILED_TO_DEREGISTER_EXCEPTION_MESSAGE +
attributeSortedString);
}
attributeSortedString2SubtaskLifeCycleMap.get(attributeSortedString).start();
@@ -123,8 +123,7 @@ public class PipeConnectorSubtaskManager {
public synchronized void stop(String attributeSortedString) {
if
(!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString))
{
- throw new PipeException(
- "Failed to deregister PipeConnectorSubtask. No such subtask: " +
attributeSortedString);
+ throw new PipeException(FAILED_TO_DEREGISTER_EXCEPTION_MESSAGE +
attributeSortedString);
}
attributeSortedString2SubtaskLifeCycleMap.get(attributeSortedString).stop();
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeProcessorSubtask.java
b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeProcessorSubtask.java
index 2472a29eefe..cd7bc206454 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeProcessorSubtask.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeProcessorSubtask.java
@@ -69,7 +69,6 @@ public class PipeProcessorSubtask extends PipeSubtask {
releaseLastEvent();
} catch (Exception e) {
- e.printStackTrace();
throw new PipeException(
"Error occurred during executing PipeProcessor#process, perhaps need to check whether the implementation of PipeProcessor is correct according to the pipe-api description.",
e);
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeSubtask.java
b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeSubtask.java
index 9f5c6e4a37a..4672f8cc532 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeSubtask.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeSubtask.java
@@ -53,7 +53,6 @@ public abstract class PipeSubtask implements
FutureCallback<Void>, Callable<Void
protected static final int MAX_RETRY_TIMES = 5;
private final AtomicInteger retryCount = new AtomicInteger(0);
- protected Throwable lastFailedCause;
protected Event lastEvent;
@@ -97,6 +96,7 @@ public abstract class PipeSubtask implements
FutureCallback<Void>, Callable<Void
* @return true if the event is consumed successfully, false if no more
event can be consumed
* @throws Exception if any error occurs when consuming the event
*/
+ @SuppressWarnings("squid:S112") // allow to throw Exception
protected abstract boolean executeOnce() throws Exception;
@Override
@@ -107,20 +107,31 @@ public abstract class PipeSubtask implements
FutureCallback<Void>, Callable<Void
@Override
public void onFailure(@NotNull Throwable throwable) {
+ if (retryCount.get() == 0) {
+ LOGGER.warn(
+ "Failed to execute subtask {}({}), because of {}. Will retry for {} times.",
+ taskID,
+ this.getClass().getSimpleName(),
+ throwable.getMessage(),
+ MAX_RETRY_TIMES,
+ throwable);
+ }
+
if (retryCount.get() < MAX_RETRY_TIMES) {
retryCount.incrementAndGet();
LOGGER.warn(
- String.format(
- "Retry subtask %s, retry count [%s/%s]",
- this.getClass().getSimpleName(), retryCount.get(),
MAX_RETRY_TIMES));
+ "Retry executing subtask {}({}), retry count [{}/{}]",
+ taskID,
+ this.getClass().getSimpleName(),
+ retryCount.get(),
+ MAX_RETRY_TIMES);
submitSelf();
} else {
final String errorMessage =
String.format(
- "Subtask %s failed, has been retried for %d times, last failed
because of %s",
- taskID, retryCount.get(), throwable);
+ "Failed to execute subtask %s(%s), retry count exceeds the max
retry times %d, last exception: %s",
+ taskID, this.getClass().getSimpleName(), retryCount.get(),
throwable.getMessage());
LOGGER.warn(errorMessage, throwable);
- lastFailedCause = throwable;
if (lastEvent instanceof EnrichedEvent) {
((EnrichedEvent) lastEvent)
@@ -128,6 +139,23 @@ public abstract class PipeSubtask implements
FutureCallback<Void>, Callable<Void
throwable instanceof PipeRuntimeException
? (PipeRuntimeException) throwable
: new PipeRuntimeCriticalException(errorMessage));
+ LOGGER.warn(
+ "The last event is an instance of EnrichedEvent, so the exception
is reported. "
+ + "Stopping current pipe task {}({}) locally... "
+ + "Status shown when query the pipe will be 'STOPPED'. "
+ + "Please restart the task by executing 'START PIPE' manually
if needed.",
+ taskID,
+ this.getClass().getSimpleName(),
+ throwable);
+ } else {
+ LOGGER.error(
+ "The last event is not an instance of EnrichedEvent, so the
exception cannot be reported. "
+ + "Stopping current pipe task {}({}) locally... "
+ + "Status shown when query the pipe will be 'RUNNING' instead
of 'STOPPED', but the task is actually stopped. "
+ + "Please restart the task by executing 'START PIPE' manually
if needed.",
+ taskID,
+ this.getClass().getSimpleName(),
+ throwable);
}
// although the pipe task will be stopped, we still don't release the
last event here
@@ -185,8 +213,4 @@ public abstract class PipeSubtask implements
FutureCallback<Void>, Callable<Void
public String getTaskID() {
return taskID;
}
-
- public Throwable getLastFailedCause() {
- return lastFailedCause;
- }
}