This is an automated email from the ASF dual-hosted git repository.
Caideyipi pushed a commit to branch insertion-fix-real
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/insertion-fix-real by this push:
new 1479cfc924b Fixed multiple bugs of insertion (#17570)
1479cfc924b is described below
commit 1479cfc924b309c2d1165148b188fc0d330b13ad
Author: Caideyipi <[email protected]>
AuthorDate: Thu Apr 30 12:26:42 2026 +0800
Fixed multiple bugs of insertion (#17570)
* insert-fix
* source/sink
* source/sink-2
* sptls
* fix
* sink
* compile
---
.../pipe/sink/protocol/IoTDBConfigRegionSink.java | 9 +-
.../iotdb/db/pipe/agent/task/PipeDataNodeTask.java | 28 +--
.../task/execution/PipeSinkSubtaskExecutor.java | 2 +-
.../agent/task/stage/PipeTaskProcessorStage.java | 16 +-
.../pipe/agent/task/stage/PipeTaskSinkStage.java | 13 +-
.../pipe/agent/task/stage/PipeTaskSourceStage.java | 19 +-
.../airgap/IoTDBSchemaRegionAirGapSink.java | 7 +-
.../PipeTransferTabletBatchEventHandler.java | 10 +-
.../PipeTransferTabletInsertNodeEventHandler.java | 2 +-
.../PipeTransferTabletInsertionEventHandler.java | 7 +-
.../handler/PipeTransferTabletRawEventHandler.java | 6 +-
.../handler/PipeTransferTrackableHandler.java | 24 +-
.../async/handler/PipeTransferTsFileHandler.java | 27 +--
.../realtime/assigner/DisruptorQueue.java | 4 +-
.../listener/PipeInsertionDataNodeListener.java | 38 ++--
.../planner/plan/node/write/InsertTabletNode.java | 140 +++++++++++-
.../db/storageengine/dataregion/DataRegion.java | 248 +++++++++++++++------
.../task/stage/SubscriptionTaskSinkStage.java | 11 +-
.../storageengine/dataregion/DataRegionTest.java | 226 +++++++++++++++++++
.../iotdb/commons/client/ClientPoolFactory.java | 4 +-
.../iotdb/commons/concurrent/ThreadName.java | 12 +-
.../apache/iotdb/commons/conf/CommonConfig.java | 93 ++++----
.../iotdb/commons/pipe/config/PipeDescriptor.java | 8 +-
23 files changed, 692 insertions(+), 262 deletions(-)
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/sink/protocol/IoTDBConfigRegionSink.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/sink/protocol/IoTDBConfigRegionSink.java
index 5ee983d7945..b5f62ef4ccf 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/sink/protocol/IoTDBConfigRegionSink.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/sink/protocol/IoTDBConfigRegionSink.java
@@ -88,7 +88,7 @@ public class IoTDBConfigRegionSink extends IoTDBSslSyncSink {
protected PipeTransferFilePieceReq getTransferSingleFilePieceReq(
final String fileName, final long position, final byte[] payLoad) {
throw new UnsupportedOperationException(
- "The config region connector does not support transferring single file
piece req.");
+ "The config region sink does not support transferring single file
piece req.");
}
@Override
@@ -105,13 +105,13 @@ public class IoTDBConfigRegionSink extends
IoTDBSslSyncSink {
@Override
public void transfer(final TabletInsertionEvent tabletInsertionEvent) throws
Exception {
throw new UnsupportedOperationException(
- "IoTDBConfigRegionConnector can't transfer TabletInsertionEvent.");
+ "IoTDBConfigRegionSink can't transfer TabletInsertionEvent.");
}
@Override
public void transfer(final TsFileInsertionEvent tsFileInsertionEvent) throws
Exception {
throw new UnsupportedOperationException(
- "IoTDBConfigRegionConnector can't transfer TsFileInsertionEvent.");
+ "IoTDBConfigRegionSink can't transfer TsFileInsertionEvent.");
}
@Override
@@ -121,8 +121,7 @@ public class IoTDBConfigRegionSink extends IoTDBSslSyncSink
{
} else if (event instanceof PipeConfigRegionSnapshotEvent) {
doTransferWrapper((PipeConfigRegionSnapshotEvent) event);
} else if (!(event instanceof PipeHeartbeatEvent)) {
- LOGGER.warn(
- "IoTDBConfigRegionConnector does not support transferring generic
event: {}.", event);
+ LOGGER.warn("IoTDBConfigRegionSink does not support transferring generic
event: {}.", event);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTask.java
index d33ec44a86e..0d0b955c210 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTask.java
@@ -32,32 +32,32 @@ public class PipeDataNodeTask implements PipeTask {
private final String pipeName;
private final int regionId;
- private final PipeTaskStage extractorStage;
+ private final PipeTaskStage sourceStage;
private final PipeTaskStage processorStage;
- private final PipeTaskStage connectorStage;
+ private final PipeTaskStage sinkStage;
private volatile boolean isCompleted = false;
public PipeDataNodeTask(
final String pipeName,
final int regionId,
- final PipeTaskStage extractorStage,
+ final PipeTaskStage sourceStage,
final PipeTaskStage processorStage,
- final PipeTaskStage connectorStage) {
+ final PipeTaskStage sinkStage) {
this.pipeName = pipeName;
this.regionId = regionId;
- this.extractorStage = extractorStage;
+ this.sourceStage = sourceStage;
this.processorStage = processorStage;
- this.connectorStage = connectorStage;
+ this.sinkStage = sinkStage;
}
@Override
public void create() {
final long startTime = System.currentTimeMillis();
- extractorStage.create();
+ sourceStage.create();
processorStage.create();
- connectorStage.create();
+ sinkStage.create();
LOGGER.info(
"Create pipe DN task {} successfully within {} ms",
this,
@@ -67,9 +67,9 @@ public class PipeDataNodeTask implements PipeTask {
@Override
public void drop() {
final long startTime = System.currentTimeMillis();
- extractorStage.drop();
+ sourceStage.drop();
processorStage.drop();
- connectorStage.drop();
+ sinkStage.drop();
LOGGER.info(
"Drop pipe DN task {} successfully within {} ms",
this,
@@ -79,9 +79,9 @@ public class PipeDataNodeTask implements PipeTask {
@Override
public void start() {
final long startTime = System.currentTimeMillis();
- extractorStage.start();
+ sourceStage.start();
processorStage.start();
- connectorStage.start();
+ sinkStage.start();
LOGGER.info(
"Start pipe DN task {} successfully within {} ms",
this,
@@ -91,9 +91,9 @@ public class PipeDataNodeTask implements PipeTask {
@Override
public void stop() {
final long startTime = System.currentTimeMillis();
- extractorStage.stop();
+ sourceStage.stop();
processorStage.stop();
- connectorStage.stop();
+ sinkStage.stop();
LOGGER.info(
"Stop pipe DN task {} successfully within {} ms",
this,
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/execution/PipeSinkSubtaskExecutor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/execution/PipeSinkSubtaskExecutor.java
index 9a88ad74d7c..4abc8daed30 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/execution/PipeSinkSubtaskExecutor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/execution/PipeSinkSubtaskExecutor.java
@@ -31,7 +31,7 @@ public class PipeSinkSubtaskExecutor extends
PipeSubtaskExecutor {
public PipeSinkSubtaskExecutor() {
super(
PipeConfig.getInstance().getPipeSubtaskExecutorMaxThreadNum(),
- ThreadName.PIPE_CONNECTOR_EXECUTOR_POOL.getName() + "-" + id.get(),
+ ThreadName.PIPE_SINK_EXECUTOR_POOL.getName() + "-" + id.get(),
ThreadName.PIPE_SUBTASK_CALLBACK_EXECUTOR_POOL.getName() + "-" +
id.getAndIncrement(),
true);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskProcessorStage.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskProcessorStage.java
index ddc194716d2..2809ec9ec93 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskProcessorStage.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskProcessorStage.java
@@ -54,8 +54,8 @@ public class PipeTaskProcessorStage extends PipeTaskStage {
* @param creationTime pipe creation time
* @param pipeProcessorParameters used to create {@link PipeProcessor}
* @param regionId {@link DataRegion} id
- * @param pipeExtractorInputEventSupplier used to input {@link Event}s from
{@link PipeExtractor}
- * @param pipeConnectorOutputPendingQueue used to output {@link Event}s to
{@link PipeConnector}
+ * @param pipeSourceInputEventSupplier used to input {@link Event}s from
{@link PipeExtractor}
+ * @param pipeSinkOutputPendingQueue used to output {@link Event}s to {@link
PipeConnector}
* @throws PipeException if failed to {@link
PipeProcessor#validate(PipeParameterValidator)} or
* {@link PipeProcessor#customize(PipeParameters,
PipeProcessorRuntimeConfiguration)}}
*/
@@ -64,8 +64,8 @@ public class PipeTaskProcessorStage extends PipeTaskStage {
final long creationTime,
final PipeParameters pipeProcessorParameters,
final int regionId,
- final EventSupplier pipeExtractorInputEventSupplier,
- final UnboundedBlockingPendingQueue<Event>
pipeConnectorOutputPendingQueue,
+ final EventSupplier pipeSourceInputEventSupplier,
+ final UnboundedBlockingPendingQueue<Event> pipeSinkOutputPendingQueue,
final PipeProcessorSubtaskExecutor executor,
final PipeTaskMeta pipeTaskMeta,
final boolean forceTabletFormat,
@@ -99,9 +99,9 @@ public class PipeTaskProcessorStage extends PipeTaskStage {
// removed, the new subtask will have the same pipeName and regionId as the
// old one, so we need creationTime to make their hash code different in
the map.
final String taskId = pipeName + "_" + regionId + "_" + creationTime;
- final PipeEventCollector pipeConnectorOutputEventCollector =
+ final PipeEventCollector pipeSinkOutputEventCollector =
new PipeEventCollector(
- pipeConnectorOutputPendingQueue,
+ pipeSinkOutputPendingQueue,
creationTime,
regionId,
forceTabletFormat,
@@ -112,9 +112,9 @@ public class PipeTaskProcessorStage extends PipeTaskStage {
pipeName,
creationTime,
regionId,
- pipeExtractorInputEventSupplier,
+ pipeSourceInputEventSupplier,
pipeProcessor,
- pipeConnectorOutputEventCollector);
+ pipeSinkOutputEventCollector);
this.executor = executor;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskSinkStage.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskSinkStage.java
index a22fbb536d7..88eac560cde 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskSinkStage.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskSinkStage.java
@@ -38,7 +38,7 @@ public class PipeTaskSinkStage extends PipeTaskStage {
protected final int regionId;
protected final Supplier<? extends PipeSinkSubtaskExecutor> executor;
- protected String connectorSubtaskId;
+ protected String sinkSubtaskId;
public PipeTaskSinkStage(
String pipeName,
@@ -56,7 +56,7 @@ public class PipeTaskSinkStage extends PipeTaskStage {
}
protected void registerSubtask() {
- this.connectorSubtaskId =
+ this.sinkSubtaskId =
PipeSinkSubtaskManager.instance()
.register(
executor,
@@ -71,21 +71,20 @@ public class PipeTaskSinkStage extends PipeTaskStage {
@Override
public void startSubtask() throws PipeException {
- PipeSinkSubtaskManager.instance().start(connectorSubtaskId);
+ PipeSinkSubtaskManager.instance().start(sinkSubtaskId);
}
@Override
public void stopSubtask() throws PipeException {
- PipeSinkSubtaskManager.instance().stop(connectorSubtaskId);
+ PipeSinkSubtaskManager.instance().stop(sinkSubtaskId);
}
@Override
public void dropSubtask() throws PipeException {
- PipeSinkSubtaskManager.instance()
- .deregister(pipeName, creationTime, regionId, connectorSubtaskId);
+ PipeSinkSubtaskManager.instance().deregister(pipeName, creationTime,
regionId, sinkSubtaskId);
}
public UnboundedBlockingPendingQueue<Event> getPipeSinkPendingQueue() {
- return
PipeSinkSubtaskManager.instance().getPipeSinkPendingQueue(connectorSubtaskId);
+ return
PipeSinkSubtaskManager.instance().getPipeSinkPendingQueue(sinkSubtaskId);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskSourceStage.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskSourceStage.java
index 6acc0fc3d4a..240b5499e92 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskSourceStage.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskSourceStage.java
@@ -44,32 +44,31 @@ public class PipeTaskSourceStage extends PipeTaskStage {
public PipeTaskSourceStage(
String pipeName,
long creationTime,
- PipeParameters extractorParameters,
+ PipeParameters sourceParameters,
int regionId,
PipeTaskMeta pipeTaskMeta) {
pipeExtractor =
StorageEngine.getInstance().getAllDataRegionIds().contains(new
DataRegionId(regionId))
- ?
PipeDataNodeAgent.plugin().dataRegion().reflectSource(extractorParameters)
- :
PipeDataNodeAgent.plugin().schemaRegion().reflectSource(extractorParameters);
+ ?
PipeDataNodeAgent.plugin().dataRegion().reflectSource(sourceParameters)
+ :
PipeDataNodeAgent.plugin().schemaRegion().reflectSource(sourceParameters);
- // Validate and customize should be called before createSubtask. this
allows extractor exposing
+ // Validate and customize should be called before createSubtask. this
allows source exposing
// exceptions in advance.
try {
- // 1. Validate extractor parameters
- pipeExtractor.validate(new PipeParameterValidator(extractorParameters));
+ // 1. Validate source parameters
+ pipeExtractor.validate(new PipeParameterValidator(sourceParameters));
- // 2. Customize extractor
+ // 2. Customize source
final PipeTaskRuntimeConfiguration runtimeConfiguration =
new PipeTaskRuntimeConfiguration(
new PipeTaskSourceRuntimeEnvironment(pipeName, creationTime,
regionId, pipeTaskMeta));
- pipeExtractor.customize(extractorParameters, runtimeConfiguration);
+ pipeExtractor.customize(sourceParameters, runtimeConfiguration);
} catch (Exception e) {
try {
pipeExtractor.close();
} catch (Exception closeException) {
LOGGER.warn(
- "Failed to close extractor after failed to initialize extractor. "
- + "Ignore this exception.",
+ "Failed to close source after failed to initialize source. " +
"Ignore this exception.",
closeException);
}
throw new PipeException(e.getMessage(), e);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBSchemaRegionAirGapSink.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBSchemaRegionAirGapSink.java
index 8280709446a..6b7787ba162 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBSchemaRegionAirGapSink.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBSchemaRegionAirGapSink.java
@@ -47,13 +47,13 @@ public class IoTDBSchemaRegionAirGapSink extends
IoTDBDataNodeAirGapSink {
@Override
public void transfer(final TabletInsertionEvent tabletInsertionEvent) throws
Exception {
throw new UnsupportedOperationException(
- "IoTDBSchemaRegionAirGapConnector can't transfer
TabletInsertionEvent.");
+ "IoTDBSchemaRegionAirGapSink can't transfer TabletInsertionEvent.");
}
@Override
public void transfer(final TsFileInsertionEvent tsFileInsertionEvent) throws
Exception {
throw new UnsupportedOperationException(
- "IoTDBSchemaRegionAirGapConnector can't transfer
TsFileInsertionEvent.");
+ "IoTDBSchemaRegionAirGapSink can't transfer TsFileInsertionEvent.");
}
@Override
@@ -68,8 +68,7 @@ public class IoTDBSchemaRegionAirGapSink extends
IoTDBDataNodeAirGapSink {
doTransferWrapper(socket, (PipeSchemaRegionSnapshotEvent) event);
} else if (!(event instanceof PipeHeartbeatEvent)) {
LOGGER.warn(
- "IoTDBSchemaRegionAirGapConnector does not support transferring
generic event: {}.",
- event);
+ "IoTDBSchemaRegionAirGapSink does not support transferring generic
event: {}.", event);
}
} catch (final IOException e) {
isSocketAlive.set(socketIndex, false);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
index 110d3cb6450..8bcb9d47009 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
@@ -69,7 +69,7 @@ public class PipeTransferTabletBatchEventHandler extends
PipeTransferTrackableHa
public void transfer(final AsyncPipeDataTransferServiceClient client) throws
TException {
for (final Map.Entry<Pair<String, Long>, Long> entry :
pipeName2BytesAccumulated.entrySet()) {
- connector.rateLimitIfNeeded(
+ sink.rateLimitIfNeeded(
entry.getKey().getLeft(),
entry.getKey().getRight(),
client.getEndPoint(),
@@ -92,13 +92,11 @@ public class PipeTransferTabletBatchEventHandler extends
PipeTransferTrackableHa
// Only handle the failed statuses to avoid string format performance
overhead
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
&& status.getCode() !=
TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
- connector
- .statusHandler()
- .handle(status, response.getStatus().getMessage(),
events.toString());
+ sink.statusHandler().handle(status, response.getStatus().getMessage(),
events.toString());
}
for (final Pair<String, TEndPoint> redirectPair :
LeaderCacheUtils.parseRecommendedRedirections(status)) {
- connector.updateLeaderCache(redirectPair.getLeft(),
redirectPair.getRight());
+ sink.updateLeaderCache(redirectPair.getLeft(),
redirectPair.getRight());
}
events.forEach(
@@ -123,7 +121,7 @@ public class PipeTransferTabletBatchEventHandler extends
PipeTransferTrackableHa
events.size(),
events.stream().map(EnrichedEvent::getPipeName).collect(Collectors.toSet()));
} finally {
- connector.addFailureEventsToRetryQueue(events, exception);
+ sink.addFailureEventsToRetryQueue(events, exception);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletInsertNodeEventHandler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletInsertNodeEventHandler.java
index 70ba7f4cfc5..912a1e724f7 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletInsertNodeEventHandler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletInsertNodeEventHandler.java
@@ -46,7 +46,7 @@ public class PipeTransferTabletInsertNodeEventHandler
@Override
protected void updateLeaderCache(final TSStatus status) {
- connector.updateLeaderCache(
+ sink.updateLeaderCache(
((PipeInsertNodeTabletInsertionEvent) event).getDeviceId(),
status.getRedirectNode());
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletInsertionEventHandler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletInsertionEventHandler.java
index 66a1f4a013b..a8f1136b897 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletInsertionEventHandler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletInsertionEventHandler.java
@@ -54,7 +54,7 @@ public abstract class PipeTransferTabletInsertionEventHandler
extends PipeTransf
public void transfer(final AsyncPipeDataTransferServiceClient client) throws
TException {
if (event instanceof EnrichedEvent) {
- connector.rateLimitIfNeeded(
+ sink.rateLimitIfNeeded(
((EnrichedEvent) event).getPipeName(),
((EnrichedEvent) event).getCreationTime(),
client.getEndPoint(),
@@ -77,8 +77,7 @@ public abstract class PipeTransferTabletInsertionEventHandler
extends PipeTransf
// Only handle the failed statuses to avoid string format performance
overhead
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
&& status.getCode() !=
TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
- connector
- .statusHandler()
+ sink.statusHandler()
.handle(response.getStatus(), response.getStatus().getMessage(),
event.toString());
}
if (event instanceof EnrichedEvent) {
@@ -109,7 +108,7 @@ public abstract class
PipeTransferTabletInsertionEventHandler extends PipeTransf
event instanceof EnrichedEvent ? ((EnrichedEvent)
event).getCommitterKey() : null,
event instanceof EnrichedEvent ? ((EnrichedEvent)
event).getCommitIds() : null);
} finally {
- connector.addFailureEventToRetryQueue(event, exception);
+ sink.addFailureEventToRetryQueue(event, exception);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletRawEventHandler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletRawEventHandler.java
index ff1daa05c28..b64e446827a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletRawEventHandler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletRawEventHandler.java
@@ -32,8 +32,8 @@ public class PipeTransferTabletRawEventHandler extends
PipeTransferTabletInserti
public PipeTransferTabletRawEventHandler(
final PipeRawTabletInsertionEvent event,
final TPipeTransferReq req,
- final IoTDBDataRegionAsyncSink connector) {
- super(event, req, connector);
+ final IoTDBDataRegionAsyncSink sink) {
+ super(event, req, sink);
}
@Override
@@ -45,7 +45,7 @@ public class PipeTransferTabletRawEventHandler extends
PipeTransferTabletInserti
@Override
protected void updateLeaderCache(final TSStatus status) {
- connector.updateLeaderCache(
+ sink.updateLeaderCache(
((PipeRawTabletInsertionEvent) event).getDeviceId(),
status.getRedirectNode());
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandler.java
index 21f7c144bed..a8b4a3b7a79 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandler.java
@@ -36,18 +36,18 @@ public abstract class PipeTransferTrackableHandler
implements AsyncMethodCallback<TPipeTransferResp>, AutoCloseable {
private static final Logger LOGGER =
LoggerFactory.getLogger(PipeTransferTsFileHandler.class);
- protected final IoTDBDataRegionAsyncSink connector;
+ protected final IoTDBDataRegionAsyncSink sink;
protected volatile AsyncPipeDataTransferServiceClient client;
- public PipeTransferTrackableHandler(final IoTDBDataRegionAsyncSink
connector) {
- this.connector = connector;
+ public PipeTransferTrackableHandler(final IoTDBDataRegionAsyncSink sink) {
+ this.sink = sink;
}
@Override
public void onComplete(final TPipeTransferResp response) {
- if (connector.isClosed()) {
+ if (sink.isClosed()) {
clearEventsReferenceCount();
- connector.eliminateHandler(this, true);
+ sink.eliminateHandler(this, true);
return;
}
@@ -56,7 +56,7 @@ public abstract class PipeTransferTrackableHandler
// completed
// NOTE: We should not clear the reference count of events, as this
would cause the
//
`org.apache.iotdb.pipe.it.dual.tablemodel.manual.basic.IoTDBPipeDataSinkIT#testSinkTsFileFormat3`
test to fail.
- connector.eliminateHandler(this, false);
+ sink.eliminateHandler(this, false);
}
}
@@ -67,14 +67,14 @@ public abstract class PipeTransferTrackableHandler
client.setPrintLogWhenEncounterException(false);
}
- if (connector.isClosed()) {
+ if (sink.isClosed()) {
clearEventsReferenceCount();
- connector.eliminateHandler(this, true);
+ sink.eliminateHandler(this, true);
return;
}
onErrorInternal(exception);
- connector.eliminateHandler(this, false);
+ sink.eliminateHandler(this, false);
}
/**
@@ -93,10 +93,10 @@ public abstract class PipeTransferTrackableHandler
this.client = client;
}
// track handler before checking if connector is closed
- connector.trackHandler(this);
- if (connector.isClosed()) {
+ sink.trackHandler(this);
+ if (sink.isClosed()) {
clearEventsReferenceCount();
- connector.eliminateHandler(this, true);
+ sink.eliminateHandler(this, true);
client.setShouldReturnSelf(true);
client.returnSelf(
(e) -> {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
index a9426ed7b8b..6742199ac10 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
@@ -159,7 +159,7 @@ public class PipeTransferTsFileHandler extends
PipeTransferTrackableHandler {
if (client == null) {
LOGGER.warn(
"Client has been returned to the pool. Current handler status is {}.
Will not transfer {}.",
- connector.isClosed() ? "CLOSED" : "NOT CLOSED",
+ sink.isClosed() ? "CLOSED" : "NOT CLOSED",
tsFile);
return;
}
@@ -168,7 +168,7 @@ public class PipeTransferTsFileHandler extends
PipeTransferTrackableHandler {
client.setTimeoutDynamically(clientManager.getConnectionTimeout());
PipeResourceMetrics.getInstance().recordDiskIO(readFileBufferSize);
- if (connector.isEnableSendTsFileLimit()) {
+ if (sink.isEnableSendTsFileLimit()) {
TsFileSendRateLimiter.getInstance().acquire(readFileBufferSize);
}
final int readLength = reader.read(readBuffer);
@@ -192,11 +192,11 @@ public class PipeTransferTsFileHandler extends
PipeTransferTrackableHandler {
? PipeTransferTsFileSealWithModReq.toTPipeTransferReq(
modFile.getName(), modFile.length(), tsFile.getName(),
tsFile.length())
:
PipeTransferTsFileSealReq.toTPipeTransferReq(tsFile.getName(), tsFile.length());
- final TPipeTransferReq req =
connector.compressIfNeeded(uncompressedReq);
+ final TPipeTransferReq req = sink.compressIfNeeded(uncompressedReq);
pipeName2WeightMap.forEach(
(pipePair, weight) ->
- connector.rateLimitIfNeeded(
+ sink.rateLimitIfNeeded(
pipePair.getLeft(),
pipePair.getRight(),
client.getEndPoint(),
@@ -219,11 +219,11 @@ public class PipeTransferTsFileHandler extends
PipeTransferTrackableHandler {
currentFile.getName(), position, payload)
: PipeTransferTsFilePieceReq.toTPipeTransferReq(
currentFile.getName(), position, payload);
- final TPipeTransferReq req = connector.compressIfNeeded(uncompressedReq);
+ final TPipeTransferReq req = sink.compressIfNeeded(uncompressedReq);
pipeName2WeightMap.forEach(
(pipePair, weight) ->
- connector.rateLimitIfNeeded(
+ sink.rateLimitIfNeeded(
pipePair.getLeft(),
pipePair.getRight(),
client.getEndPoint(),
@@ -241,7 +241,7 @@ public class PipeTransferTsFileHandler extends
PipeTransferTrackableHandler {
try {
super.onComplete(response);
} finally {
- if (connector.isClosed()) {
+ if (sink.isClosed()) {
returnClientIfNecessary();
}
}
@@ -255,8 +255,7 @@ public class PipeTransferTsFileHandler extends
PipeTransferTrackableHandler {
// Only handle the failed statuses to avoid string format performance
overhead
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
&& status.getCode() !=
TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
- connector
- .statusHandler()
+ sink.statusHandler()
.handle(
status,
String.format(
@@ -330,9 +329,7 @@ public class PipeTransferTsFileHandler extends
PipeTransferTrackableHandler {
// Only handle the failed statuses to avoid string format performance
overhead
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
&& status.getCode() !=
TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
- connector
- .statusHandler()
- .handle(status, response.getStatus().getMessage(),
tsFile.getName());
+ sink.statusHandler().handle(status,
response.getStatus().getMessage(), tsFile.getName());
}
}
@@ -404,7 +401,7 @@ public class PipeTransferTsFileHandler extends
PipeTransferTrackableHandler {
returnClientIfNecessary();
} finally {
if (eventsHadBeenAddedToRetryQueue.compareAndSet(false, true)) {
- connector.addFailureEventsToRetryQueue(events, exception);
+ sink.addFailureEventsToRetryQueue(events, exception);
}
}
}
@@ -415,7 +412,7 @@ public class PipeTransferTsFileHandler extends
PipeTransferTrackableHandler {
return;
}
- if (connector.isClosed()) {
+ if (sink.isClosed()) {
closeClient();
}
@@ -439,7 +436,7 @@ public class PipeTransferTsFileHandler extends
PipeTransferTrackableHandler {
if (client == null) {
LOGGER.warn(
"Client has been returned to the pool. Current handler status is {}.
Will not transfer {}.",
- connector.isClosed() ? "CLOSED" : "NOT CLOSED",
+ sink.isClosed() ? "CLOSED" : "NOT CLOSED",
tsFile);
return;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueue.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueue.java
index 52ac137ae4e..2019eba8560 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueue.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueue.java
@@ -35,13 +35,13 @@ import org.slf4j.LoggerFactory;
import java.util.function.Consumer;
-import static
org.apache.iotdb.commons.concurrent.ThreadName.PIPE_EXTRACTOR_DISRUPTOR;
+import static
org.apache.iotdb.commons.concurrent.ThreadName.PIPE_SOURCE_DISRUPTOR;
public class DisruptorQueue {
private static final Logger LOGGER =
LoggerFactory.getLogger(DisruptorQueue.class);
private static final IoTDBDaemonThreadFactory THREAD_FACTORY =
- new IoTDBDaemonThreadFactory(PIPE_EXTRACTOR_DISRUPTOR.getName());
+ new IoTDBDaemonThreadFactory(PIPE_SOURCE_DISRUPTOR.getName());
private final PipeMemoryBlock allocatedMemoryBlock;
private final Disruptor<EventContainer> disruptor;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java
index aaa98220178..882d4aff0d8 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java
@@ -41,34 +41,34 @@ import java.util.concurrent.atomic.AtomicInteger;
*
* <p>All events extracted by this listener will be first published to
different
* PipeEventDataRegionAssigners (identified by data region id), and then
PipeEventDataRegionAssigner
- * will filter events and assign them to different
PipeRealtimeEventDataRegionExtractors.
+ * will filter events and assign them to different
PipeRealtimeEventDataRegionSources.
*/
public class PipeInsertionDataNodeListener {
private final ConcurrentMap<String, PipeDataRegionAssigner>
dataRegionId2Assigner =
new ConcurrentHashMap<>();
- private final AtomicInteger listenToTsFileExtractorCount = new
AtomicInteger(0);
- private final AtomicInteger listenToInsertNodeExtractorCount = new
AtomicInteger(0);
+ private final AtomicInteger listenToTsFileSourceCount = new AtomicInteger(0);
+ private final AtomicInteger listenToInsertNodeSourceCount = new
AtomicInteger(0);
//////////////////////////// start & stop ////////////////////////////
public synchronized void startListenAndAssign(
- String dataRegionId, PipeRealtimeDataRegionSource extractor) {
+ final String dataRegionId, final PipeRealtimeDataRegionSource source) {
dataRegionId2Assigner
.computeIfAbsent(dataRegionId, o -> new
PipeDataRegionAssigner(dataRegionId))
- .startAssignTo(extractor);
+ .startAssignTo(source);
- if (extractor.isNeedListenToTsFile()) {
- listenToTsFileExtractorCount.incrementAndGet();
+ if (source.isNeedListenToTsFile()) {
+ listenToTsFileSourceCount.incrementAndGet();
}
- if (extractor.isNeedListenToInsertNode()) {
- listenToInsertNodeExtractorCount.incrementAndGet();
+ if (source.isNeedListenToInsertNode()) {
+ listenToInsertNodeSourceCount.incrementAndGet();
}
}
public synchronized void stopListenAndAssign(
- final String dataRegionId, final PipeRealtimeDataRegionSource extractor)
{
+ final String dataRegionId, final PipeRealtimeDataRegionSource source) {
PipeDataRegionAssigner assignerToClose = null;
synchronized (this) {
@@ -77,13 +77,13 @@ public class PipeInsertionDataNodeListener {
return;
}
- assigner.stopAssignTo(extractor);
+ assigner.stopAssignTo(source);
- if (extractor.isNeedListenToTsFile()) {
- listenToTsFileExtractorCount.decrementAndGet();
+ if (source.isNeedListenToTsFile()) {
+ listenToTsFileSourceCount.decrementAndGet();
}
- if (extractor.isNeedListenToInsertNode()) {
- listenToInsertNodeExtractorCount.decrementAndGet();
+ if (source.isNeedListenToInsertNode()) {
+ listenToInsertNodeSourceCount.decrementAndGet();
}
if (assigner.notMoreSourceNeededToBeAssigned()) {
@@ -104,8 +104,8 @@ public class PipeInsertionDataNodeListener {
public void listenToTsFile(
final String dataRegionId, final TsFileResource tsFileResource, final
boolean isLoaded) {
- // We don't judge whether listenToTsFileExtractorCount.get() == 0 here on
purpose
- // because extractors may use tsfile events when some exceptions occur in
the
+ // We don't judge whether listenToTsFileSourceCount.get() == 0 here on
purpose
+ // because sources may use tsfile events when some exceptions occur in the
// insert nodes listening process.
final PipeDataRegionAssigner assigner =
dataRegionId2Assigner.get(dataRegionId);
@@ -120,8 +120,8 @@ public class PipeInsertionDataNodeListener {
}
public void listenToInsertNode(
- String dataRegionId, InsertNode insertNode, TsFileResource
tsFileResource) {
- if (listenToInsertNodeExtractorCount.get() == 0) {
+ final String dataRegionId, final InsertNode insertNode, final
TsFileResource tsFileResource) {
+ if (listenToInsertNodeSourceCount.get() == 0) {
return;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
index a366aed71b3..1deb5515eb1 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
@@ -1076,68 +1076,184 @@ public class InsertTabletNode extends InsertNode
implements WALEntryValue {
return visitor.visitInsertTablet(this, context);
}
- public TimeValuePair composeLastTimeValuePair(int measurementIndex) {
+ public TimeValuePair composeLastTimeValuePair(
+ int measurementIndex, int startOffset, int endOffset) {
if (measurementIndex >= columns.length ||
Objects.isNull(dataTypes[measurementIndex])) {
return null;
}
// get non-null value
- int lastIdx = rowCount - 1;
+ int lastIdx = Math.min(endOffset - 1, rowCount - 1);
if (bitMaps != null && bitMaps[measurementIndex] != null) {
BitMap bitMap = bitMaps[measurementIndex];
- while (lastIdx >= 0) {
+ while (lastIdx >= startOffset) {
if (!bitMap.isMarked(lastIdx)) {
break;
}
lastIdx--;
}
}
- if (lastIdx < 0) {
+ if (lastIdx < startOffset) {
+ return null;
+ }
+ return composeTimeValuePair(measurementIndex, lastIdx);
+ }
+
+ public TimeValuePair composeLastTimeValuePair(int measurementIndex) {
+ return composeLastTimeValuePair(measurementIndex, 0, rowCount);
+ }
+
+ protected TimeValuePair composeLastTimeValuePair(
+ final int measurementIndex,
+ final TSStatus[] results,
+ final int startOffset,
+ final int endOffset) {
+ if (results == null) {
+ return composeLastTimeValuePair(measurementIndex, startOffset,
endOffset);
+ }
+ if (measurementIndex >= columns.length ||
Objects.isNull(dataTypes[measurementIndex])) {
return null;
}
+ final BitMap bitMap = bitMaps == null ? null : bitMaps[measurementIndex];
+ int lastIdx = Math.min(endOffset - 1, rowCount - 1);
+ while (lastIdx >= startOffset) {
+ if (results[lastIdx] != null
+ && results[lastIdx].getCode() !=
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ lastIdx--;
+ continue;
+ }
+ if (bitMap != null && bitMap.isMarked(lastIdx)) {
+ lastIdx--;
+ continue;
+ }
+ break;
+ }
+ return lastIdx < startOffset ? null :
composeTimeValuePair(measurementIndex, lastIdx);
+ }
+
+ private TimeValuePair composeTimeValuePair(final int measurementIndex, final
int rowIndex) {
TsPrimitiveType value;
switch (dataTypes[measurementIndex]) {
case INT32:
case DATE:
int[] intValues = (int[]) columns[measurementIndex];
- value = new TsPrimitiveType.TsInt(intValues[lastIdx]);
+ value = new TsPrimitiveType.TsInt(intValues[rowIndex]);
break;
case INT64:
case TIMESTAMP:
long[] longValues = (long[]) columns[measurementIndex];
- value = new TsPrimitiveType.TsLong(longValues[lastIdx]);
+ value = new TsPrimitiveType.TsLong(longValues[rowIndex]);
break;
case FLOAT:
float[] floatValues = (float[]) columns[measurementIndex];
- value = new TsPrimitiveType.TsFloat(floatValues[lastIdx]);
+ value = new TsPrimitiveType.TsFloat(floatValues[rowIndex]);
break;
case DOUBLE:
double[] doubleValues = (double[]) columns[measurementIndex];
- value = new TsPrimitiveType.TsDouble(doubleValues[lastIdx]);
+ value = new TsPrimitiveType.TsDouble(doubleValues[rowIndex]);
break;
case BOOLEAN:
boolean[] boolValues = (boolean[]) columns[measurementIndex];
- value = new TsPrimitiveType.TsBoolean(boolValues[lastIdx]);
+ value = new TsPrimitiveType.TsBoolean(boolValues[rowIndex]);
break;
case TEXT:
case BLOB:
case STRING:
Binary[] binaryValues = (Binary[]) columns[measurementIndex];
- value = new TsPrimitiveType.TsBinary(binaryValues[lastIdx]);
+ value = new TsPrimitiveType.TsBinary(binaryValues[rowIndex]);
break;
default:
throw new UnSupportedDataTypeException(
String.format(DATATYPE_UNSUPPORTED, dataTypes[measurementIndex]));
}
- return new TimeValuePair(times[lastIdx], value);
+ return new TimeValuePair(times[rowIndex], value);
+ }
+
+ public IDeviceID getDeviceID(int rowIdx) {
+ if (deviceID != null) {
+ return deviceID;
+ }
+ deviceID = DeviceIDFactory.getInstance().getDeviceID(targetPath);
+ return deviceID;
+ }
+
+ protected static class PartitionSplitInfo {
+
+ // ranges is a flat list of boundaries: range1.start, range1.end,
range2.start, range2.end, ...
+ List<Integer> ranges = new ArrayList<>();
+ List<TTimePartitionSlot> timePartitionSlots = new ArrayList<>();
+ List<TRegionReplicaSet> replicaSets;
+ }
+
+ /**
+ * Split the tablet of the given range according to Table deviceID.
+ *
+ * @param start inclusive
+ * @param end exclusive
+ * @return pairs of a device ID and the exclusive end offset of that device
+ */
+ public List<Pair<IDeviceID, Integer>> splitByDevice(int start, int end) {
+ return Collections.singletonList(new Pair<>(getDeviceID(), end));
+ }
+
+ /**
+ * @param results insertion result of each row
+ * @param ttl the ttl
+ * @return the position of the first alive row
+ * @throws OutOfTTLException if no row is still within the TTL
+ */
+ public int checkTTL(TSStatus[] results, long ttl) throws OutOfTTLException {
+ return checkTTLInternal(results, ttl, true);
+ }
+
+ protected int checkTTLInternal(TSStatus[] results, long ttl, boolean
breakOnFirstAlive)
+ throws OutOfTTLException {
+
+ /*
+ * assume that batch has been sorted by client
+ */
+ int loc = 0;
+ int firstAliveLoc = -1;
+ while (loc < getRowCount()) {
+ long currTime = getTimes()[loc];
+ // skip points that do not satisfy TTL
+ if (!isAlive(currTime, ttl)) {
+ results[loc] =
+ RpcUtils.getStatus(
+ TSStatusCode.OUT_OF_TTL,
+ String.format(
+ "Insertion time [%s] is less than ttl time bound [%s]",
+ DateTimeUtils.convertLongToDate(currTime),
+
DateTimeUtils.convertLongToDate(CommonDateTimeUtils.currentTime() - ttl)));
+ } else {
+ if (firstAliveLoc == -1) {
+ firstAliveLoc = loc;
+ }
+ if (breakOnFirstAlive) {
+ break;
+ }
+ }
+ loc++;
+ }
+
+ if (firstAliveLoc == -1) {
+ // no alive data
+ throw new OutOfTTLException(
+ getTimes()[getTimes().length - 1],
(CommonDateTimeUtils.currentTime() - ttl));
+ }
+ return firstAliveLoc;
}
public void updateLastCache(final String databaseName) {
+ updateLastCache(databaseName, null);
+ }
+
+ public void updateLastCache(final String databaseName, final TSStatus[]
results) {
final String[] rawMeasurements = getRawMeasurements();
final TimeValuePair[] timeValuePairs = new
TimeValuePair[rawMeasurements.length];
for (int i = 0; i < rawMeasurements.length; i++) {
- timeValuePairs[i] = composeLastTimeValuePair(i);
+ timeValuePairs[i] = composeLastTimeValuePair(i, results, 0, rowCount);
}
DataNodeSchemaCache.getInstance()
.updateLastCacheIfExists(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index 3cad57962e5..791d01f9709 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -1193,7 +1193,7 @@ public class DataRegion implements IDataRegionForQuery {
if (!insertTabletNode.isGeneratedByRemoteConsensusLeader()) {
// disable updating last cache on follower
startTime = System.nanoTime();
- tryToUpdateInsertTabletLastCache(insertTabletNode);
+ tryToUpdateInsertTabletLastCache(insertTabletNode, results);
PERFORMANCE_OVERVIEW_METRICS.recordScheduleUpdateLastCacheCost(
System.nanoTime() - startTime);
}
@@ -1261,13 +1261,13 @@ public class DataRegion implements IDataRegionForQuery {
return true;
}
- TsFileProcessor tsFileProcessor =
getOrCreateTsFileProcessor(timePartitionId, sequence);
- if (tsFileProcessor == null) {
+ final TsFileProcessor tsFileProcessor;
+ try {
+ tsFileProcessor = getOrCreateTsFileProcessor(timePartitionId, sequence);
+ } catch (WriteProcessException e) {
+ final TSStatus failureStatus = RpcUtils.getStatus(e.getErrorCode(),
e.getMessage());
for (int i = start; i < end; i++) {
- results[i] =
- RpcUtils.getStatus(
- TSStatusCode.INTERNAL_SERVER_ERROR,
- "can not create TsFileProcessor, timePartitionId: " +
timePartitionId);
+ results[i] = failureStatus;
}
return false;
}
@@ -1289,8 +1289,9 @@ public class DataRegion implements IDataRegionForQuery {
return true;
}
- private void tryToUpdateInsertTabletLastCache(InsertTabletNode node) {
- node.updateLastCache(getDatabaseName());
+ private void tryToUpdateInsertTabletLastCache(
+ final InsertTabletNode node, final TSStatus[] results) {
+ node.updateLastCache(getDatabaseName(), results);
}
private TsFileProcessor insertToTsFileProcessor(
@@ -1300,9 +1301,6 @@ public class DataRegion implements IDataRegionForQuery {
return null;
}
TsFileProcessor tsFileProcessor =
getOrCreateTsFileProcessor(timePartitionId, sequence);
- if (tsFileProcessor == null) {
- return null;
- }
long[] costsForMetrics = new long[4];
tsFileProcessor.insert(insertRowNode, costsForMetrics);
PERFORMANCE_OVERVIEW_METRICS.recordCreateMemtableBlockCost(costsForMetrics[0]);
@@ -1325,9 +1323,11 @@ public class DataRegion implements IDataRegionForQuery {
if (insertRowNode.allMeasurementFailed()) {
continue;
}
- TsFileProcessor tsFileProcessor =
- getOrCreateTsFileProcessor(timePartitionIds[i], areSequence[i]);
- if (tsFileProcessor == null) {
+ final TsFileProcessor tsFileProcessor;
+ try {
+ tsFileProcessor = getOrCreateTsFileProcessor(timePartitionIds[i],
areSequence[i]);
+ } catch (WriteProcessException e) {
+ insertRowsNode.getResults().put(i,
RpcUtils.getStatus(e.getErrorCode(), e.getMessage()));
continue;
}
int finalI = i;
@@ -1335,43 +1335,28 @@ public class DataRegion implements IDataRegionForQuery {
tsFileProcessor,
(k, v) -> {
if (v == null) {
- v = new InsertRowsNode(insertRowsNode.getPlanNodeId());
- v.setSearchIndex(insertRowNode.getSearchIndex());
- v.setAligned(insertRowNode.isAligned());
- if (insertRowNode.isGeneratedByPipe()) {
- v.markAsGeneratedByPipe();
- }
- if (insertRowNode.isGeneratedByRemoteConsensusLeader()) {
- v.markAsGeneratedByRemoteConsensusLeader();
- }
+ v = createGroupedInsertRowsNode(insertRowsNode, insertRowNode);
}
- if (v.isAligned() != insertRowNode.isAligned()) {
- v.setMixingAlignment(true);
- }
- v.addOneInsertRowNode(insertRowNode, finalI);
- v.updateProgressIndex(insertRowNode.getProgressIndex());
+ appendInsertRowNode(v, insertRowNode, finalI);
return v;
});
}
List<InsertRowNode> executedInsertRowNodeList = new ArrayList<>();
for (Map.Entry<TsFileProcessor, InsertRowsNode> entry :
tsFileProcessorMap.entrySet()) {
- TsFileProcessor tsFileProcessor = entry.getKey();
InsertRowsNode subInsertRowsNode = entry.getValue();
try {
- tsFileProcessor.insert(subInsertRowsNode, costsForMetrics);
+ List<TsFileProcessor> insertedProcessors =
+ insertRowsWithTypeConsistencyCheck(entry.getKey(),
subInsertRowsNode, costsForMetrics);
+
executedInsertRowNodeList.addAll(subInsertRowsNode.getInsertRowNodeList());
+ for (TsFileProcessor tsFileProcessor : insertedProcessors) {
+ // check memtable size and may asyncTryToFlush the work memtable
+ if (tsFileProcessor.shouldFlush()) {
+ fileFlushPolicy.apply(this, tsFileProcessor,
tsFileProcessor.isSequence());
+ }
+ }
} catch (WriteProcessException e) {
- insertRowsNode
- .getResults()
- .put(
- subInsertRowsNode.getInsertRowNodeIndexList().get(0),
- RpcUtils.getStatus(e.getErrorCode(), e.getMessage()));
- }
-
executedInsertRowNodeList.addAll(subInsertRowsNode.getInsertRowNodeList());
-
- // check memtable size and may asyncTryToFlush the work memtable
- if (entry.getKey().shouldFlush()) {
- fileFlushPolicy.apply(this, tsFileProcessor,
tsFileProcessor.isSequence());
+ recordInsertRowsFailure(insertRowsNode, subInsertRowsNode, e);
}
}
@@ -1382,6 +1367,127 @@ public class DataRegion implements IDataRegionForQuery {
return executedInsertRowNodeList;
}
+ private List<TsFileProcessor> insertRowsWithTypeConsistencyCheck(
+ TsFileProcessor tsFileProcessor, InsertRowsNode subInsertRowsNode,
long[] costsForMetrics)
+ throws WriteProcessException {
+ try {
+ tsFileProcessor.insertRows(subInsertRowsNode, costsForMetrics);
+ return Collections.singletonList(tsFileProcessor);
+ } catch (DataTypeInconsistentException e) {
+ InsertRowNode firstRow = subInsertRowsNode.getInsertRowNodeList().get(0);
+ long timePartitionId =
TimePartitionUtils.getTimePartitionId(firstRow.getTime());
+ // flush both MemTables so that the new type can be inserted into a new
MemTable
+ flushWorkingProcessorsForTimePartition(timePartitionId);
+ return retryInsertRowsAfterFlush(subInsertRowsNode, timePartitionId,
costsForMetrics);
+ }
+ }
+
+ private InsertRowsNode createGroupedInsertRowsNode(
+ final InsertRowsNode sourceInsertRowsNode, final InsertRowNode
firstInsertRowNode) {
+ final InsertRowsNode groupedInsertRowsNode =
sourceInsertRowsNode.emptyClone();
+ initializeGroupedInsertRowsNode(groupedInsertRowsNode, firstInsertRowNode);
+ return groupedInsertRowsNode;
+ }
+
+ private InsertRowsNode createGroupedInsertRowsNode(
+ final InsertRowsOfOneDeviceNode sourceInsertRowsNode,
+ final InsertRowNode firstInsertRowNode) {
+ final InsertRowsNode groupedInsertRowsNode =
+ new InsertRowsNode(sourceInsertRowsNode.getPlanNodeId());
+ initializeGroupedInsertRowsNode(groupedInsertRowsNode, firstInsertRowNode);
+ return groupedInsertRowsNode;
+ }
+
+ private void initializeGroupedInsertRowsNode(
+ final InsertRowsNode groupedInsertRowsNode, final InsertRowNode
firstInsertRowNode) {
+ groupedInsertRowsNode.setSearchIndex(firstInsertRowNode.getSearchIndex());
+ groupedInsertRowsNode.setAligned(firstInsertRowNode.isAligned());
+ if (firstInsertRowNode.isGeneratedByPipe()) {
+ groupedInsertRowsNode.markAsGeneratedByPipe();
+ }
+ if (firstInsertRowNode.isGeneratedByRemoteConsensusLeader()) {
+ groupedInsertRowsNode.markAsGeneratedByRemoteConsensusLeader();
+ }
+ }
+
+ private void appendInsertRowNode(
+ final InsertRowsNode groupedInsertRowsNode,
+ final InsertRowNode insertRowNode,
+ final int insertRowNodeIndex) {
+ if (groupedInsertRowsNode.isAligned() != insertRowNode.isAligned()) {
+ groupedInsertRowsNode.setMixingAlignment(true);
+ }
+ groupedInsertRowsNode.addOneInsertRowNode(insertRowNode,
insertRowNodeIndex);
+
groupedInsertRowsNode.updateProgressIndex(insertRowNode.getProgressIndex());
+ }
+
+ private void flushWorkingProcessorsForTimePartition(final long
timePartitionId) {
+ TsFileProcessor workSequenceProcessor =
workSequenceTsFileProcessors.get(timePartitionId);
+ if (workSequenceProcessor != null) {
+ fileFlushPolicy.apply(this, workSequenceProcessor,
workSequenceProcessor.isSequence());
+ }
+ TsFileProcessor workUnsequenceProcessor =
workUnsequenceTsFileProcessors.get(timePartitionId);
+ if (workUnsequenceProcessor != null) {
+ fileFlushPolicy.apply(this, workUnsequenceProcessor,
workUnsequenceProcessor.isSequence());
+ }
+ }
+
+ private List<TsFileProcessor> retryInsertRowsAfterFlush(
+ final InsertRowsNode subInsertRowsNode,
+ final long timePartitionId,
+ final long[] costsForMetrics)
+ throws WriteProcessException {
+ final Map<TsFileProcessor, InsertRowsNode> retriedProcessorMap = new
HashMap<>();
+ for (int i = 0; i < subInsertRowsNode.getInsertRowNodeList().size(); i++) {
+ final InsertRowNode insertRowNode =
subInsertRowsNode.getInsertRowNodeList().get(i);
+ final boolean isSequence =
+ config.isEnableSeparateData()
+ && insertRowNode.getTime()
+ > lastFlushTimeMap.getFlushedTime(timePartitionId,
insertRowNode.getDeviceID());
+ final TsFileProcessor retriedProcessor =
+ getOrCreateTsFileProcessor(timePartitionId, isSequence);
+ final int insertRowNodeIndex =
subInsertRowsNode.getInsertRowNodeIndexList().get(i);
+ retriedProcessorMap.compute(
+ retriedProcessor,
+ (k, v) -> {
+ if (v == null) {
+ v = createGroupedInsertRowsNode(subInsertRowsNode,
insertRowNode);
+ }
+ appendInsertRowNode(v, insertRowNode, insertRowNodeIndex);
+ return v;
+ });
+ }
+
+ final List<TsFileProcessor> insertedProcessors = new
ArrayList<>(retriedProcessorMap.size());
+ for (Entry<TsFileProcessor, InsertRowsNode> retriedEntry :
retriedProcessorMap.entrySet()) {
+ final TsFileProcessor retriedProcessor = retriedEntry.getKey();
+ retriedProcessor.insertRows(retriedEntry.getValue(), costsForMetrics);
+ insertedProcessors.add(retriedProcessor);
+ }
+ return insertedProcessors;
+ }
+
+ private void recordInsertRowsFailure(
+ final InsertRowsNode sourceInsertRowsNode,
+ final InsertRowsNode failedInsertRowsNode,
+ final WriteProcessException exception) {
+ final TSStatus failureStatus =
+ RpcUtils.getStatus(exception.getErrorCode(), exception.getMessage());
+ for (Integer failedIndex :
failedInsertRowsNode.getInsertRowNodeIndexList()) {
+ sourceInsertRowsNode.getResults().put(failedIndex, failureStatus);
+ }
+ }
+
+ private void recordInsertRowsFailure(
+ final InsertRowsOfOneDeviceNode sourceInsertRowsNode,
+ final InsertRowsNode failedInsertRowsNode,
+ final WriteProcessException exception) {
+ final TSStatus failureStatus =
+ RpcUtils.getStatus(exception.getErrorCode(), exception.getMessage());
+ for (Integer failedIndex :
failedInsertRowsNode.getInsertRowNodeIndexList()) {
+ sourceInsertRowsNode.getResults().put(failedIndex, failureStatus);
+ }
+ }
private void tryToUpdateInsertRowsLastCache(List<InsertRowNode> nodeList) {
for (InsertRowNode node : nodeList) {
node.updateLastCache(databaseName);
@@ -1440,7 +1546,8 @@ public class DataRegion implements IDataRegionForQuery {
}
}
- private TsFileProcessor getOrCreateTsFileProcessor(long timeRangeId, boolean
sequence) {
+ protected TsFileProcessor getOrCreateTsFileProcessor(long timeRangeId,
boolean sequence)
+ throws WriteProcessException {
TsFileProcessor tsFileProcessor = null;
int retryCnt = 0;
do {
@@ -1466,7 +1573,7 @@ public class DataRegion implements IDataRegionForQuery {
"disk space is insufficient when creating TsFile processor, change
system mode to read-only",
e);
CommonDescriptor.getInstance().getConfig().setNodeStatus(NodeStatus.ReadOnly);
- break;
+ throw new WriteProcessException(e.getMessage(), e.getErrorCode(),
true);
} catch (IOException e) {
if (retryCnt < 3) {
logger.warn("meet IOException when creating TsFileProcessor, retry
it again", e);
@@ -1475,11 +1582,15 @@ public class DataRegion implements IDataRegionForQuery {
logger.error(
"meet IOException when creating TsFileProcessor, change system
mode to error", e);
CommonDescriptor.getInstance().getConfig().handleUnrecoverableError();
- break;
+ throw new WriteProcessException(
+ String.format(
+ "Failed to create TsFileProcessor for database %s,
timePartitionId %s",
+ databaseName, timeRangeId),
+ e);
}
} catch (ExceedQuotaException e) {
logger.error(e.getMessage());
- break;
+ throw new WriteProcessException(e.getMessage(), e.getErrorCode(),
true);
}
} while (tsFileProcessor == null);
return tsFileProcessor;
@@ -3663,8 +3774,13 @@ public class DataRegion implements IDataRegionForQuery {
config.isEnableSeparateData()
&& insertRowNode.getTime()
> lastFlushTimeMap.getFlushedTime(timePartitionId,
insertRowNode.getDeviceID());
- TsFileProcessor tsFileProcessor =
getOrCreateTsFileProcessor(timePartitionId, isSequence);
- if (tsFileProcessor == null) {
+ final TsFileProcessor tsFileProcessor;
+ try {
+ tsFileProcessor = getOrCreateTsFileProcessor(timePartitionId,
isSequence);
+ } catch (WriteProcessException e) {
+ insertRowsOfOneDeviceNode
+ .getResults()
+ .put(i, RpcUtils.getStatus(e.getErrorCode(), e.getMessage()));
continue;
}
int finalI = i;
@@ -3672,39 +3788,27 @@ public class DataRegion implements IDataRegionForQuery {
tsFileProcessor,
(k, v) -> {
if (v == null) {
- v = new
InsertRowsNode(insertRowsOfOneDeviceNode.getPlanNodeId());
- v.setSearchIndex(insertRowNode.getSearchIndex());
- v.setAligned(insertRowNode.isAligned());
- if (insertRowNode.isGeneratedByPipe()) {
- v.markAsGeneratedByPipe();
- }
- if (insertRowNode.isGeneratedByRemoteConsensusLeader()) {
- v.markAsGeneratedByRemoteConsensusLeader();
- }
+ v = createGroupedInsertRowsNode(insertRowsOfOneDeviceNode,
insertRowNode);
}
- v.addOneInsertRowNode(insertRowNode, finalI);
- v.updateProgressIndex(insertRowNode.getProgressIndex());
+ appendInsertRowNode(v, insertRowNode, finalI);
return v;
});
}
List<InsertRowNode> executedInsertRowNodeList = new ArrayList<>();
for (Map.Entry<TsFileProcessor, InsertRowsNode> entry :
tsFileProcessorMap.entrySet()) {
- TsFileProcessor tsFileProcessor = entry.getKey();
InsertRowsNode subInsertRowsNode = entry.getValue();
try {
- tsFileProcessor.insert(subInsertRowsNode, costsForMetrics);
+ List<TsFileProcessor> insertedProcessors =
+ insertRowsWithTypeConsistencyCheck(entry.getKey(),
subInsertRowsNode, costsForMetrics);
+
executedInsertRowNodeList.addAll(subInsertRowsNode.getInsertRowNodeList());
+ for (TsFileProcessor tsFileProcessor : insertedProcessors) {
+ // check memtable size and may asyncTryToFlush the work memtable
+ if (tsFileProcessor.shouldFlush()) {
+ fileFlushPolicy.apply(this, tsFileProcessor,
tsFileProcessor.isSequence());
+ }
+ }
} catch (WriteProcessException e) {
- insertRowsOfOneDeviceNode
- .getResults()
- .put(
- subInsertRowsNode.getInsertRowNodeIndexList().get(0),
- RpcUtils.getStatus(e.getErrorCode(), e.getMessage()));
- }
-
executedInsertRowNodeList.addAll(subInsertRowsNode.getInsertRowNodeList());
-
- // check memtable size and may asyncTryToFlush the work memtable
- if (tsFileProcessor.shouldFlush()) {
- fileFlushPolicy.apply(this, tsFileProcessor,
tsFileProcessor.isSequence());
+ recordInsertRowsFailure(insertRowsOfOneDeviceNode,
subInsertRowsNode, e);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/stage/SubscriptionTaskSinkStage.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/stage/SubscriptionTaskSinkStage.java
index 73fca57a1fd..45c6d6f86cf 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/stage/SubscriptionTaskSinkStage.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/stage/SubscriptionTaskSinkStage.java
@@ -41,7 +41,7 @@ public class SubscriptionTaskSinkStage extends
PipeTaskSinkStage {
@Override
protected void registerSubtask() {
- this.connectorSubtaskId =
+ this.sinkSubtaskId =
SubscriptionSinkSubtaskManager.instance()
.register(
executor.get(),
@@ -56,22 +56,21 @@ public class SubscriptionTaskSinkStage extends
PipeTaskSinkStage {
@Override
public void startSubtask() throws PipeException {
- SubscriptionSinkSubtaskManager.instance().start(connectorSubtaskId);
+ SubscriptionSinkSubtaskManager.instance().start(sinkSubtaskId);
}
@Override
public void stopSubtask() throws PipeException {
- SubscriptionSinkSubtaskManager.instance().stop(connectorSubtaskId);
+ SubscriptionSinkSubtaskManager.instance().stop(sinkSubtaskId);
}
@Override
public void dropSubtask() throws PipeException {
SubscriptionSinkSubtaskManager.instance()
- .deregister(pipeName, creationTime, regionId, connectorSubtaskId);
+ .deregister(pipeName, creationTime, regionId, sinkSubtaskId);
}
public UnboundedBlockingPendingQueue<Event> getPipeSinkPendingQueue() {
- return SubscriptionSinkSubtaskManager.instance()
- .getPipeConnectorPendingQueue(connectorSubtaskId);
+ return
SubscriptionSinkSubtaskManager.instance().getPipeConnectorPendingQueue(sinkSubtaskId);
}
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java
index 5082b8fc4f0..8eab3e6be49 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.storageengine.dataregion;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.conf.CommonConfig;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.consensus.DataRegionId;
@@ -27,14 +28,17 @@ import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.exception.ShutdownException;
import org.apache.iotdb.commons.path.MeasurementPath;
import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.utils.TimePartitionUtils;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.BatchProcessException;
import org.apache.iotdb.db.exception.DataRegionException;
import org.apache.iotdb.db.exception.TsFileProcessorException;
import org.apache.iotdb.db.exception.WriteProcessException;
import org.apache.iotdb.db.exception.WriteProcessRejectException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.queryengine.common.QueryId;
+import
org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeSchemaCache;
import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
@@ -62,6 +66,8 @@ import
org.apache.iotdb.db.storageengine.rescon.memory.MemTableManager;
import org.apache.iotdb.db.storageengine.rescon.memory.SystemInfo;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.db.utils.constant.TestConstant;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.file.metadata.PlainDeviceID;
@@ -78,6 +84,7 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
+import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -87,6 +94,15 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import static
org.apache.iotdb.db.queryengine.plan.statement.StatementTestUtils.genInsertRowNode;
+import static
org.apache.iotdb.db.queryengine.plan.statement.StatementTestUtils.genInsertTabletNode;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
public class DataRegionTest {
private static final IoTDBConfig config =
IoTDBDescriptor.getInstance().getConfig();
@@ -126,6 +142,7 @@ public class DataRegionTest {
dataRegion.syncDeleteDataFiles();
StorageEngine.getInstance().deleteDataRegion(new DataRegionId(0));
}
+ DataNodeSchemaCache.getInstance().cleanUp();
EnvironmentUtils.cleanDir(TestConstant.OUTPUT_DATA_DIR);
CompactionTaskManager.getInstance().stop();
EnvironmentUtils.cleanEnv();
@@ -975,6 +992,189 @@ public class DataRegionTest {
dataRegion1.syncDeleteDataFiles();
}
+ @Test
+ public void testInsertRowPropagatesTsFileProcessorCreationFailure()
+ throws IllegalPathException, DataRegionException,
TsFileProcessorException {
+ final HookedDataRegion dataRegion1 = new HookedDataRegion(systemDir,
"root.fail_row");
+ dataRegion1.setTsFileProcessorSupplier(
+ (timePartitionId, sequence) -> {
+ throw new WriteProcessRejectException("mock creation failure");
+ });
+
+ final TSRecord record = new TSRecord(1, "root.fail_row");
+ record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId,
String.valueOf(1)));
+ final InsertRowNode insertRowNode = buildInsertRowNodeByTSRecord(record);
+
+ try {
+ dataRegion1.insert(insertRowNode);
+ Assert.fail("Expected WriteProcessRejectException");
+ } catch (WriteProcessRejectException e) {
+ Assert.assertTrue(e.getMessage().contains("mock creation failure"));
+ } catch (WriteProcessException e) {
+ Assert.fail("Expected WriteProcessRejectException but got " +
e.getClass().getSimpleName());
+ } finally {
+ dataRegion1.syncDeleteDataFiles();
+ }
+ }
+
+ @Test
+ public void testInsertRowsMarkAllFailedRowsForSameProcessor() throws
Exception {
+ final HookedDataRegion dataRegion1 = new HookedDataRegion(systemDir,
"root.fail_rows");
+ final TsFileProcessor processor = Mockito.mock(TsFileProcessor.class);
+ Mockito.doThrow(new WriteProcessException("mock insert rows failure"))
+ .when(processor)
+ .insertRows(any(InsertRowsNode.class), any(long[].class));
+ Mockito.when(processor.shouldFlush()).thenReturn(false);
+ Mockito.when(processor.isSequence()).thenReturn(true);
+ dataRegion1.setTsFileProcessorSupplier((timePartitionId, sequence) ->
processor);
+
+ final List<Integer> indexList = Arrays.asList(0, 1);
+ final List<InsertRowNode> nodes = new ArrayList<>();
+ for (long time : new long[] {1, 2}) {
+ final TSRecord record = new TSRecord(time, "root.fail_rows");
+ record.addTuple(
+ DataPoint.getDataPoint(TSDataType.INT32, measurementId,
String.valueOf(time)));
+ nodes.add(buildInsertRowNodeByTSRecord(record));
+ }
+ final InsertRowsNode insertRowsNode = new InsertRowsNode(new
PlanNodeId(""), indexList, nodes);
+
+ try {
+ dataRegion1.insert(insertRowsNode);
+ Assert.fail("Expected BatchProcessException");
+ } catch (BatchProcessException e) {
+ Assert.assertEquals(2, insertRowsNode.getResults().size());
+ Assert.assertEquals(
+ TSStatusCode.WRITE_PROCESS_ERROR.getStatusCode(),
+ insertRowsNode.getResults().get(0).getCode());
+ Assert.assertEquals(
+ TSStatusCode.WRITE_PROCESS_ERROR.getStatusCode(),
+ insertRowsNode.getResults().get(1).getCode());
+ } finally {
+ dataRegion1.syncDeleteDataFiles();
+ }
+ }
+
+ @Test
+ public void testInsertRowsLastCacheSkipsFailedRows() throws Exception {
+ final boolean originalLastCacheEnable = COMMON_CONFIG.isLastCacheEnable();
+ COMMON_CONFIG.setLastCacheEnable(true);
+
+ final HookedDataRegion dataRegion1 = new HookedDataRegion(systemDir,
"root.cache_rows");
+ final TsFileProcessor successProcessor =
Mockito.mock(TsFileProcessor.class);
+ Mockito.when(successProcessor.shouldFlush()).thenReturn(false);
+ Mockito.when(successProcessor.isSequence()).thenReturn(true);
+ final long failingTime = TimePartitionUtils.getTimePartitionInterval() + 1;
+ final long failingPartitionId =
TimePartitionUtils.getTimePartitionId(failingTime);
+ dataRegion1.setTsFileProcessorSupplier(
+ (timePartitionId, sequence) -> {
+ if (timePartitionId == failingPartitionId) {
+ throw new WriteProcessException("mock row failure");
+ }
+ return successProcessor;
+ });
+
+ final MeasurementPath lastCachePath =
+ new MeasurementPath(
+ "root.cache_rows",
+ measurementId,
+ new MeasurementSchema(
+ measurementId, TSDataType.INT32, TSEncoding.PLAIN,
CompressionType.UNCOMPRESSED));
+
DataNodeSchemaCache.getInstance().declareLastCache(dataRegion1.getDatabaseName(),
lastCachePath);
+
+ final List<Integer> indexList = Arrays.asList(0, 1);
+ final List<InsertRowNode> nodes = new ArrayList<>();
+ final long[] times = new long[] {1, failingTime};
+ final int[] values = new int[] {10, 20};
+ for (int i = 0; i < times.length; i++) {
+ final long time = times[i];
+ final TSRecord record = new TSRecord(time, "root.cache_rows");
+ record.addTuple(
+ DataPoint.getDataPoint(TSDataType.INT32, measurementId,
String.valueOf(values[i])));
+ nodes.add(buildInsertRowNodeByTSRecord(record));
+ }
+ final InsertRowsNode insertRowsNode = new InsertRowsNode(new
PlanNodeId(""), indexList, nodes);
+
+ try {
+ dataRegion1.insert(insertRowsNode);
+ Assert.fail("Expected BatchProcessException");
+ } catch (BatchProcessException e) {
+ final TimeValuePair lastCache =
+ DataNodeSchemaCache.getInstance().getLastCache(lastCachePath);
+ Assert.assertNotNull(lastCache);
+ Assert.assertEquals(1, lastCache.getTimestamp());
+ Assert.assertEquals(10, lastCache.getValue().getInt());
+ } finally {
+ dataRegion1.syncDeleteDataFiles();
+ COMMON_CONFIG.setLastCacheEnable(originalLastCacheEnable);
+ }
+ }
+
+ @Test
+ public void testInsertTabletLastCacheSkipsFailedRows() throws Exception {
+ final boolean originalLastCacheEnable = COMMON_CONFIG.isLastCacheEnable();
+ COMMON_CONFIG.setLastCacheEnable(true);
+
+ final HookedDataRegion dataRegion1 = new HookedDataRegion(systemDir,
"root.cache_tablet");
+ final TsFileProcessor processor = Mockito.mock(TsFileProcessor.class);
+ Mockito.doAnswer(
+ invocation -> {
+ TSStatus[] results = invocation.getArgument(3);
+ results[0] = RpcUtils.SUCCESS_STATUS;
+ results[1] =
+ RpcUtils.getStatus(
+ TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode(),
"mock row failure");
+ throw new WriteProcessException("mock tablet failure");
+ })
+ .when(processor)
+ .insertTablet(any(InsertTabletNode.class), anyInt(), anyInt(),
any(TSStatus[].class));
+ Mockito.when(processor.shouldFlush()).thenReturn(false);
+ Mockito.when(processor.isSequence()).thenReturn(true);
+ dataRegion1.setTsFileProcessorSupplier((timePartitionId, sequence) ->
processor);
+
+ final MeasurementPath lastCachePath =
+ new MeasurementPath(
+ "root.cache_tablet",
+ measurementId,
+ new MeasurementSchema(
+ measurementId, TSDataType.INT32, TSEncoding.PLAIN,
CompressionType.UNCOMPRESSED));
+
DataNodeSchemaCache.getInstance().declareLastCache(dataRegion1.getDatabaseName(),
lastCachePath);
+
+ final String[] measurements = new String[] {measurementId};
+ final TSDataType[] dataTypes = new TSDataType[] {TSDataType.INT32};
+ final MeasurementSchema[] measurementSchemas =
+ new MeasurementSchema[] {
+ new MeasurementSchema(measurementId, TSDataType.INT32,
TSEncoding.PLAIN)
+ };
+ final long[] times = new long[] {1, 2};
+ final Object[] columns = new Object[] {new int[] {10, 20}};
+ final InsertTabletNode insertTabletNode =
+ new InsertTabletNode(
+ new QueryId("test_write").genPlanNodeId(),
+ new PartialPath("root.cache_tablet"),
+ false,
+ measurements,
+ dataTypes,
+ measurementSchemas,
+ times,
+ null,
+ columns,
+ times.length);
+
+ try {
+ dataRegion1.insertTablet(insertTabletNode);
+ Assert.fail("Expected BatchProcessException");
+ } catch (BatchProcessException e) {
+ final TimeValuePair lastCache =
+ DataNodeSchemaCache.getInstance().getLastCache(lastCachePath);
+ Assert.assertNotNull(lastCache);
+ Assert.assertEquals(1, lastCache.getTimestamp());
+ Assert.assertEquals(10, lastCache.getValue().getInt());
+ } finally {
+ dataRegion1.syncDeleteDataFiles();
+ COMMON_CONFIG.setLastCacheEnable(originalLastCacheEnable);
+ }
+ }
+
@Test
public void testSmallReportProportionInsertRow()
throws WriteProcessException,
@@ -1490,6 +1690,32 @@ public class DataRegionTest {
}
}
+ private interface TsFileProcessorSupplier {
+ TsFileProcessor get(long timePartitionId, boolean sequence) throws
WriteProcessException;
+ }
+
+ private static class HookedDataRegion extends DummyDataRegion {
+ private TsFileProcessorSupplier tsFileProcessorSupplier;
+
+ private HookedDataRegion(String systemInfoDir, String storageGroupName)
+ throws DataRegionException {
+ super(systemInfoDir, storageGroupName);
+ }
+
+ private void setTsFileProcessorSupplier(TsFileProcessorSupplier
tsFileProcessorSupplier) {
+ this.tsFileProcessorSupplier = tsFileProcessorSupplier;
+ }
+
+ @Override
+ protected TsFileProcessor getOrCreateTsFileProcessor(long timeRangeId,
boolean sequence)
+ throws WriteProcessException {
+ if (tsFileProcessorSupplier != null) {
+ return tsFileProcessorSupplier.get(timeRangeId, sequence);
+ }
+ return super.getOrCreateTsFileProcessor(timeRangeId, sequence);
+ }
+ }
+
// -- test for deleting data directly
// -- delete data and file only when:
// 1. tsfile is closed
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java
index 3ff47a2c5e1..46280ede66c 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java
@@ -284,7 +284,7 @@ public class ClientPoolFactory {
.setRpcThriftCompressionEnabled(conf.isPipeSinkRPCThriftCompressionEnabled())
.setSelectorNumOfAsyncClientManager(conf.getPipeAsyncSinkSelectorNumber())
.build(),
- ThreadName.PIPE_ASYNC_CONNECTOR_CLIENT_POOL.getName()),
+ ThreadName.PIPE_ASYNC_SINK_CLIENT_POOL.getName()),
new
ClientPoolProperty.Builder<AsyncPipeDataTransferServiceClient>()
.setMaxClientNumForEachNode(conf.getPipeAsyncSinkMaxClientNumber())
.build()
@@ -310,7 +310,7 @@ public class ClientPoolFactory {
.setSelectorNumOfAsyncClientManager(conf.getPipeAsyncSinkSelectorNumber())
.setPrintLogWhenEncounterException(conf.isPrintLogWhenEncounterException())
.build(),
- ThreadName.PIPE_ASYNC_CONNECTOR_CLIENT_POOL.getName()),
+ ThreadName.PIPE_ASYNC_SINK_CLIENT_POOL.getName()),
new
ClientPoolProperty.Builder<AsyncPipeDataTransferServiceClient>()
.setMaxClientNumForEachNode(conf.getPipeAsyncSinkMaxTsFileClientNumber())
.build()
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
index 696b6b8ce07..9a8beac5a43 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
@@ -129,10 +129,10 @@ public enum ThreadName {
GPRC_DEFAULT_WORKER_ELG("grpc-default-worker-ELG"),
GROUP_MANAGEMENT("groupManagement"),
// -------------------------- Compute --------------------------
- PIPE_EXTRACTOR_DISRUPTOR("Pipe-Extractor-Disruptor"),
+ PIPE_SOURCE_DISRUPTOR("Pipe-Source-Disruptor"),
PIPE_PROCESSOR_EXECUTOR_POOL("Pipe-Processor-Executor-Pool"),
- PIPE_CONNECTOR_EXECUTOR_POOL("Pipe-Connector-Executor-Pool"),
PIPE_CONSENSUS_EXECUTOR_POOL("Pipe-Consensus-Executor-Pool"),
+ PIPE_SINK_EXECUTOR_POOL("Pipe-Sink-Executor-Pool"),
PIPE_CONFIGNODE_EXECUTOR_POOL("Pipe-ConfigNode-Executor-Pool"),
PIPE_SUBTASK_CALLBACK_EXECUTOR_POOL("Pipe-SubTask-Callback-Executor-Pool"),
PIPE_TSFILE_ASYNC_SEND_POOL("Pipe-TsFile-Async-Send-Pool"),
@@ -142,7 +142,7 @@ public enum ThreadName {
PIPE_RUNTIME_PERIODICAL_JOB_EXECUTOR("Pipe-Runtime-Periodical-Job-Executor"),
PIPE_RUNTIME_PERIODICAL_PHANTOM_REFERENCE_CLEANER(
"Pipe-Runtime-Periodical-Phantom-Reference-Cleaner"),
- PIPE_ASYNC_CONNECTOR_CLIENT_POOL("Pipe-Async-Connector-Client-Pool"),
+ PIPE_ASYNC_SINK_CLIENT_POOL("Pipe-Async-Sink-Client-Pool"),
PIPE_RECEIVER_AIR_GAP_AGENT("Pipe-Receiver-Air-Gap-Agent"),
PIPE_AIR_GAP_RECEIVER("Pipe-Air-Gap-Receiver"),
PIPE_PARALLEL_EXECUTION_POOL("Pipe-Parallel-Execution-Pool"),
@@ -289,9 +289,9 @@ public enum ThreadName {
private static final Set<ThreadName> computeThreadNames =
new HashSet<>(
Arrays.asList(
- PIPE_EXTRACTOR_DISRUPTOR,
+ PIPE_SOURCE_DISRUPTOR,
PIPE_PROCESSOR_EXECUTOR_POOL,
- PIPE_CONNECTOR_EXECUTOR_POOL,
+ PIPE_SINK_EXECUTOR_POOL,
PIPE_CONSENSUS_EXECUTOR_POOL,
PIPE_CONFIGNODE_EXECUTOR_POOL,
PIPE_SUBTASK_CALLBACK_EXECUTOR_POOL,
@@ -300,7 +300,7 @@ public enum ThreadName {
PIPE_RUNTIME_PROCEDURE_SUBMITTER,
PIPE_RUNTIME_PERIODICAL_JOB_EXECUTOR,
PIPE_RUNTIME_PERIODICAL_PHANTOM_REFERENCE_CLEANER,
- PIPE_ASYNC_CONNECTOR_CLIENT_POOL,
+ PIPE_ASYNC_SINK_CLIENT_POOL,
PIPE_RECEIVER_AIR_GAP_AGENT,
PIPE_AIR_GAP_RECEIVER,
PIPE_PARALLEL_EXECUTION_POOL,
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index ec9ce06fd1d..3b7229927d1 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -277,11 +277,11 @@ public class CommonConfig {
private int pipeAsyncSinkForcedRetryTabletEventQueueSize = 20;
private int pipeAsyncSinkForcedRetryTotalEventQueueSize = 30;
private long pipeAsyncSinkMaxRetryExecutionTimeMsPerCall = 500;
- private int pipeAsyncConnectorSelectorNumber =
+ private int pipeAsyncSinkSelectorNumber =
Math.max(4, Runtime.getRuntime().availableProcessors() / 2);
- private int pipeAsyncConnectorMaxClientNumber =
+ private int pipeAsyncSinkMaxClientNumber =
Math.max(32, Runtime.getRuntime().availableProcessors() * 2);
- private int pipeAsyncConnectorMaxTsFileClientNumber =
+ private int pipeAsyncSinkMaxTsFileClientNumber =
Math.max(16, Runtime.getRuntime().availableProcessors());
private boolean printLogWhenEncounterException = false;
@@ -289,8 +289,7 @@ public class CommonConfig {
private double pipeAllSinksRateLimitBytesPerSecond = -1;
private int rateLimiterHotReloadCheckIntervalMs = 1000;
- private int pipeConnectorRequestSliceThresholdBytes =
- (int) (RpcUtils.THRIFT_FRAME_MAX_SIZE * 0.8);
+ private int pipeSinkRequestSliceThresholdBytes = (int)
(RpcUtils.THRIFT_FRAME_MAX_SIZE * 0.8);
private boolean isSeperatedPipeHeartbeatEnabled = true;
private int pipeHeartbeatIntervalSecondsForCollectingPipeMeta = 3;
@@ -1002,7 +1001,7 @@ public class CommonConfig {
}
public void setPipeSinkHandshakeTimeoutMs(long pipeSinkHandshakeTimeoutMs) {
- final int fPipeConnectorHandshakeTimeoutMs =
this.pipeSinkHandshakeTimeoutMs;
+ final int fPipeSinkHandshakeTimeoutMs = this.pipeSinkHandshakeTimeoutMs;
try {
this.pipeSinkHandshakeTimeoutMs =
Math.toIntExact(pipeSinkHandshakeTimeoutMs);
} catch (ArithmeticException e) {
@@ -1010,7 +1009,7 @@ public class CommonConfig {
logger.warn(
"Given pipe connector handshake timeout is too large, set to {}
ms.", Integer.MAX_VALUE);
} finally {
- if (fPipeConnectorHandshakeTimeoutMs != this.pipeSinkHandshakeTimeoutMs)
{
+ if (fPipeSinkHandshakeTimeoutMs != this.pipeSinkHandshakeTimeoutMs) {
logger.info("pipeSinkHandshakeTimeoutMs is set to {}.",
this.pipeSinkHandshakeTimeoutMs);
}
}
@@ -1041,16 +1040,16 @@ public class CommonConfig {
}
public void setPipeSinkTransferTimeoutMs(long pipeSinkTransferTimeoutMs) {
- final int fPipeConnectorTransferTimeoutMs = this.pipeSinkTransferTimeoutMs;
+ final int fPipeSinkTransferTimeoutMs = this.pipeSinkTransferTimeoutMs;
try {
this.pipeSinkTransferTimeoutMs =
Math.toIntExact(pipeSinkTransferTimeoutMs);
} catch (ArithmeticException e) {
this.pipeSinkTransferTimeoutMs = Integer.MAX_VALUE;
logger.warn(
- "Given pipe connector transfer timeout is too large, set to {} ms.",
Integer.MAX_VALUE);
+ "Given pipe sink transfer timeout is too large, set to {} ms.",
Integer.MAX_VALUE);
} finally {
- if (fPipeConnectorTransferTimeoutMs != this.pipeSinkTransferTimeoutMs) {
- logger.info("pipeConnectorTransferTimeoutMs is set to {}.",
pipeSinkTransferTimeoutMs);
+ if (fPipeSinkTransferTimeoutMs != this.pipeSinkTransferTimeoutMs) {
+ logger.info("pipeSinkTransferTimeoutMs is set to {}.",
this.pipeSinkTransferTimeoutMs);
}
}
}
@@ -1064,7 +1063,7 @@ public class CommonConfig {
return;
}
this.pipeSinkReadFileBufferSize = pipeSinkReadFileBufferSize;
- logger.info("pipeConnectorReadFileBufferSize is set to {}.",
pipeSinkReadFileBufferSize);
+ logger.info("pipeSinkReadFileBufferSize is set to {}.",
pipeSinkReadFileBufferSize);
}
public boolean isPipeSinkReadFileBufferMemoryControlEnabled() {
@@ -1164,60 +1163,58 @@ public class CommonConfig {
}
public int getPipeAsyncSinkSelectorNumber() {
- return pipeAsyncConnectorSelectorNumber;
+ return pipeAsyncSinkSelectorNumber;
}
- public void setPipeAsyncConnectorSelectorNumber(int
pipeAsyncConnectorSelectorNumber) {
- if (pipeAsyncConnectorSelectorNumber <= 0) {
+ public void setPipeAsyncSinkSelectorNumber(int pipeAsyncSinkSelectorNumber) {
+ if (pipeAsyncSinkSelectorNumber <= 0) {
logger.info(
- "pipeAsyncConnectorSelectorNumber should be greater than 0,
configuring it not to change.");
+ "pipeAsyncSinkSelectorNumber should be greater than 0, configuring
it not to change.");
return;
}
- pipeAsyncConnectorSelectorNumber = Math.max(4,
pipeAsyncConnectorSelectorNumber);
- if (this.pipeAsyncConnectorSelectorNumber ==
pipeAsyncConnectorSelectorNumber) {
+ pipeAsyncSinkSelectorNumber = Math.max(4, pipeAsyncSinkSelectorNumber);
+ if (this.pipeAsyncSinkSelectorNumber == pipeAsyncSinkSelectorNumber) {
return;
}
- this.pipeAsyncConnectorSelectorNumber = pipeAsyncConnectorSelectorNumber;
- logger.info("pipeAsyncConnectorSelectorNumber is set to {}.",
pipeAsyncConnectorSelectorNumber);
+ this.pipeAsyncSinkSelectorNumber = pipeAsyncSinkSelectorNumber;
+ logger.info("pipeAsyncSinkSelectorNumber is set to {}.",
pipeAsyncSinkSelectorNumber);
}
public int getPipeAsyncSinkMaxClientNumber() {
- return pipeAsyncConnectorMaxClientNumber;
+ return pipeAsyncSinkMaxClientNumber;
}
- public void setPipeAsyncConnectorMaxClientNumber(int
pipeAsyncConnectorMaxClientNumber) {
- if (pipeAsyncConnectorMaxClientNumber <= 0) {
+ public void setPipeAsyncSinkMaxClientNumber(int
pipeAsyncSinkMaxClientNumber) {
+ if (pipeAsyncSinkMaxClientNumber <= 0) {
logger.info(
- " pipeAsyncConnectorMaxClientNumber should be greater than 0,
configuring it not to change.");
+ "pipeAsyncSinkMaxClientNumber should be greater than 0, configuring
it not to change.");
return;
}
- pipeAsyncConnectorMaxClientNumber = Math.max(32,
pipeAsyncConnectorMaxClientNumber);
- if (this.pipeAsyncConnectorMaxClientNumber ==
pipeAsyncConnectorMaxClientNumber) {
+ pipeAsyncSinkMaxClientNumber = Math.max(32, pipeAsyncSinkMaxClientNumber);
+ if (this.pipeAsyncSinkMaxClientNumber == pipeAsyncSinkMaxClientNumber) {
return;
}
- this.pipeAsyncConnectorMaxClientNumber = pipeAsyncConnectorMaxClientNumber;
- logger.info(
- "pipeAsyncConnectorMaxClientNumber is set to {}.",
pipeAsyncConnectorMaxClientNumber);
+ this.pipeAsyncSinkMaxClientNumber = pipeAsyncSinkMaxClientNumber;
+ logger.info("pipeAsyncSinkMaxClientNumber is set to {}.",
pipeAsyncSinkMaxClientNumber);
}
public int getPipeAsyncSinkMaxTsFileClientNumber() {
- return pipeAsyncConnectorMaxTsFileClientNumber;
+ return pipeAsyncSinkMaxTsFileClientNumber;
}
- public void setPipeAsyncConnectorMaxTsFileClientNumber(
- int pipeAsyncConnectorMaxTsFileClientNumber) {
- if (pipeAsyncConnectorMaxTsFileClientNumber <= 0) {
+ public void setPipeAsyncSinkMaxTsFileClientNumber(int
pipeAsyncSinkMaxTsFileClientNumber) {
+ if (pipeAsyncSinkMaxTsFileClientNumber <= 0) {
logger.info(
- "pipeAsyncConnectorMaxTsFileClientNumber should be greater than 0,
configuring it not to change.");
+ "pipeAsyncSinkMaxTsFileClientNumber should be greater than 0,
configuring it not to change.");
return;
}
- pipeAsyncConnectorMaxTsFileClientNumber = Math.max(16,
pipeAsyncConnectorMaxTsFileClientNumber);
- if (this.pipeAsyncConnectorMaxTsFileClientNumber ==
pipeAsyncConnectorMaxTsFileClientNumber) {
+ pipeAsyncSinkMaxTsFileClientNumber = Math.max(16,
pipeAsyncSinkMaxTsFileClientNumber);
+ if (this.pipeAsyncSinkMaxTsFileClientNumber ==
pipeAsyncSinkMaxTsFileClientNumber) {
return;
}
- this.pipeAsyncConnectorMaxTsFileClientNumber =
pipeAsyncConnectorMaxTsFileClientNumber;
+ this.pipeAsyncSinkMaxTsFileClientNumber =
pipeAsyncSinkMaxTsFileClientNumber;
logger.info(
- "pipeAsyncConnectorMaxClientNumber is set to {}.",
pipeAsyncConnectorMaxTsFileClientNumber);
+ "pipeAsyncSinkMaxTsFileClientNumber is set to {}.",
pipeAsyncSinkMaxTsFileClientNumber);
}
public boolean isPrintLogWhenEncounterException() {
@@ -1321,12 +1318,12 @@ public class CommonConfig {
return pipeSinkRetryIntervalMs;
}
- public void setPipeSinkRetryIntervalMs(long pipeConnectorRetryIntervalMs) {
- if (this.pipeSinkRetryIntervalMs == pipeConnectorRetryIntervalMs) {
+ public void setPipeSinkRetryIntervalMs(long pipeSinkRetryIntervalMs) {
+ if (this.pipeSinkRetryIntervalMs == pipeSinkRetryIntervalMs) {
return;
}
- this.pipeSinkRetryIntervalMs = pipeConnectorRetryIntervalMs;
- logger.info("pipeSinkRetryIntervalMs is set to {}",
pipeConnectorRetryIntervalMs);
+ this.pipeSinkRetryIntervalMs = pipeSinkRetryIntervalMs;
+ logger.info("pipeSinkRetryIntervalMs is set to {}",
pipeSinkRetryIntervalMs);
}
public boolean isPipeSinkRetryLocallyForConnectionError() {
@@ -2127,18 +2124,16 @@ public class CommonConfig {
}
public int getPipeSinkRequestSliceThresholdBytes() {
- return pipeConnectorRequestSliceThresholdBytes;
+ return pipeSinkRequestSliceThresholdBytes;
}
- public void setPipeConnectorRequestSliceThresholdBytes(
- int pipeConnectorRequestSliceThresholdBytes) {
- if (this.pipeConnectorRequestSliceThresholdBytes ==
pipeConnectorRequestSliceThresholdBytes) {
+ public void setPipeSinkRequestSliceThresholdBytes(int
pipeSinkRequestSliceThresholdBytes) {
+ if (this.pipeSinkRequestSliceThresholdBytes ==
pipeSinkRequestSliceThresholdBytes) {
return;
}
- this.pipeConnectorRequestSliceThresholdBytes =
pipeConnectorRequestSliceThresholdBytes;
+ this.pipeSinkRequestSliceThresholdBytes =
pipeSinkRequestSliceThresholdBytes;
logger.info(
- "pipeConnectorRequestSliceThresholdBytes is set to {}",
- pipeConnectorRequestSliceThresholdBytes);
+ "pipeSinkRequestSliceThresholdBytes is set to {}",
pipeSinkRequestSliceThresholdBytes);
}
public long getTwoStageAggregateMaxCombinerLiveTimeInMs() {
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
index 02284671803..83db711e5ec 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
@@ -443,7 +443,7 @@ public class PipeDescriptor {
"rate_limiter_hot_reload_check_interval_ms",
String.valueOf(config.getRateLimiterHotReloadCheckIntervalMs()))));
- config.setPipeConnectorRequestSliceThresholdBytes(
+ config.setPipeSinkRequestSliceThresholdBytes(
Integer.parseInt(
properties.getProperty(
"pipe_connector_request_slice_threshold_bytes",
@@ -613,7 +613,7 @@ public class PipeDescriptor {
"pipe_async_connector_selector_number",
isHotModify);
if (value != null) {
- config.setPipeAsyncConnectorSelectorNumber(Integer.parseInt(value));
+ config.setPipeAsyncSinkSelectorNumber(Integer.parseInt(value));
}
value =
@@ -623,7 +623,7 @@ public class PipeDescriptor {
"pipe_async_connector_max_client_number",
isHotModify);
if (value != null) {
- config.setPipeAsyncConnectorMaxClientNumber(Integer.parseInt(value));
+ config.setPipeAsyncSinkMaxClientNumber(Integer.parseInt(value));
}
value =
@@ -633,7 +633,7 @@ public class PipeDescriptor {
"pipe_async_connector_max_tsfile_client_number",
isHotModify);
if (value != null) {
-
config.setPipeAsyncConnectorMaxTsFileClientNumber(Integer.parseInt(value));
+ config.setPipeAsyncSinkMaxTsFileClientNumber(Integer.parseInt(value));
}
value =