This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 6ef35e4537e Cleaned multiple potential problems in pipe module (#17396)
6ef35e4537e is described below
commit 6ef35e4537e623e17da2d486082edd5f1d44095d
Author: Caideyipi <[email protected]>
AuthorDate: Wed Apr 1 09:34:25 2026 +0800
Cleaned multiple potential problems in pipe module (#17396)
* fix
* fix
* fix
* fix
* gras-shop
* fix
* spls
* fix
* pipe-dn
* logger-bug
* fix
---
.../runtime/heartbeat/PipeHeartbeat.java | 2 +-
.../pipe/source/IoTDBConfigRegionSource.java | 6 ++--
.../db/pipe/agent/runtime/PipeAgentLauncher.java | 2 +-
.../db/pipe/agent/task/PipeDataNodeTaskAgent.java | 2 +-
.../agent/task/connection/PipeEventCollector.java | 2 +-
.../sink/PipeRealtimePriorityBlockingQueue.java | 11 ++----
.../common/deletion/PipeDeleteDataNodeEvent.java | 3 +-
.../pipe/metric/overview/PipeResourceMetrics.java | 2 +-
.../receiver/PipeDataNodeReceiverMetrics.java | 4 +--
.../processor/aggregate/AggregateProcessor.java | 4 +--
.../iotconsensusv2/IoTConsensusV2Receiver.java | 25 +++++++++----
.../IoTConsensusV2ReceiverAgent.java | 2 +-
.../resource/memory/PipeDynamicMemoryBlock.java | 4 +--
.../pipe/resource/memory/PipeMemoryWeightUtil.java | 4 ++-
.../client/IoTDBDataNodeAsyncClientManager.java | 4 +--
.../protocol/airgap/IoTDBDataRegionAirGapSink.java | 2 +-
.../websocket/WebSocketConnectorServer.java | 23 ++++++++----
.../util/builder/PipeTableModelTsFileBuilder.java | 9 ++---
.../util/builder/PipeTreeModelTsFileBuilder.java | 9 ++---
.../pipe/sink/util/builder/PipeTsFileBuilder.java | 4 +--
.../source/dataregion/IoTDBDataRegionSource.java | 2 +-
.../PipeRealtimeDataRegionHybridSource.java | 34 ++----------------
.../realtime/PipeRealtimeDataRegionLogSource.java | 42 ++++------------------
.../realtime/PipeRealtimeDataRegionSource.java | 35 ++----------------
.../PipeRealtimeDataRegionTsFileSource.java | 18 ++--------
.../TsFileDeduplicationBlockingPendingQueue.java | 2 +-
.../task/connection/BlockingPendingQueue.java | 21 +----------
.../pipe/datastructure/pattern/TreePattern.java | 11 ++++--
.../pipe/sink/client/IoTDBSyncClientManager.java | 2 +-
29 files changed, 99 insertions(+), 192 deletions(-)
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeat.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeat.java
index 13a3c4b83d6..e7e9d2cd97d 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeat.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeat.java
@@ -55,7 +55,7 @@ public class PipeHeartbeat {
// the final results and namely these dataNodes are omitted in
calculation.
remainingEventCountMap.put(
pipeMeta.getStaticMeta(),
- Objects.nonNull(pipeCompletedListFromAgent)
+ Objects.nonNull(pipeRemainingEventCountListFromAgent)
? pipeRemainingEventCountListFromAgent.get(i)
: 0L);
remainingTimeMap.put(
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/source/IoTDBConfigRegionSource.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/source/IoTDBConfigRegionSource.java
index 07ebb638313..762e8c154e5 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/source/IoTDBConfigRegionSource.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/source/IoTDBConfigRegionSource.java
@@ -153,8 +153,10 @@ public class IoTDBConfigRegionSource extends
IoTDBNonDataRegionSource {
@Override
public synchronized EnrichedEvent supply() throws Exception {
final EnrichedEvent event = super.supply();
- PipeEventCommitManager.getInstance()
- .enrichWithCommitterKeyAndCommitId(event, creationTime, regionId);
+ if (Objects.nonNull(event)) {
+ PipeEventCommitManager.getInstance()
+ .enrichWithCommitterKeyAndCommitId(event, creationTime, regionId);
+ }
return event;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeAgentLauncher.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeAgentLauncher.java
index 1e1d3388f75..01289109371 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeAgentLauncher.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeAgentLauncher.java
@@ -78,7 +78,7 @@ class PipeAgentLauncher {
curList.add(uninstalledOrConflictedPipePluginMetaList.get(index +
offset));
offset++;
}
- index += (offset + 1);
+ index += offset;
fetchAndSavePipePluginJars(curList);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
index ea12513d647..8e6818dff32 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
@@ -706,7 +706,7 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
MESSAGE_PIPE_NOT_ENOUGH_MEMORY,
needMemory,
freeMemorySizeInBytes,
- freeMemorySizeInBytes,
+ reservedMemorySizeInBytes,
PipeDataNodeResourceManager.memory().getTotalMemorySizeInBytes());
LOGGER.warn(message);
throw new PipeException(message);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
index ec2a4850693..a22848ee3ba 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
@@ -238,7 +238,7 @@ public class PipeEventCollector implements EventCollector {
((PipeHeartbeatEvent) event).recordConnectorQueueSize(pendingQueue);
}
- pendingQueue.directOffer(event);
+ pendingQueue.offer(event);
collectInvocationCount.incrementAndGet();
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeRealtimePriorityBlockingQueue.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeRealtimePriorityBlockingQueue.java
index 2baebeedc18..6d227ac31fd 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeRealtimePriorityBlockingQueue.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeRealtimePriorityBlockingQueue.java
@@ -72,7 +72,7 @@ public class PipeRealtimePriorityBlockingQueue extends
UnboundedBlockingPendingQ
}
@Override
- public boolean directOffer(final Event event) {
+ public boolean offer(final Event event) {
checkBeforeOffer(event);
if (event instanceof TsFileInsertionEvent) {
@@ -85,18 +85,13 @@ public class PipeRealtimePriorityBlockingQueue extends
UnboundedBlockingPendingQ
((EnrichedEvent)
event).decreaseReferenceCount(PipeEventCollector.class.getName(), false);
return false;
} else {
- return super.directOffer(event);
+ return super.offer(event);
}
}
- @Override
- public boolean waitedOffer(final Event event) {
- return directOffer(event);
- }
-
@Override
public boolean put(final Event event) {
- directOffer(event);
+ offer(event);
return true;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/deletion/PipeDeleteDataNodeEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/deletion/PipeDeleteDataNodeEvent.java
index d0f720c2743..f49f2b8bc83 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/deletion/PipeDeleteDataNodeEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/deletion/PipeDeleteDataNodeEvent.java
@@ -31,7 +31,6 @@ import org.apache.iotdb.db.auth.AuthorityChecker;
import org.apache.iotdb.db.pipe.consensus.deletion.DeletionResource;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.AbstractDeleteDataNode;
-import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalDeleteDataNode;
import
org.apache.iotdb.db.queryengine.plan.relational.metadata.QualifiedObjectName;
import
org.apache.iotdb.db.storageengine.dataregion.modification.TableDeletionEntry;
@@ -193,7 +192,7 @@ public class PipeDeleteDataNodeEvent extends EnrichedEvent
implements Serializab
@Override
public void deserializeFromByteBuffer(final ByteBuffer buffer) {
isGeneratedByPipe = ReadWriteIOUtils.readBool(buffer);
- deleteDataNode = (DeleteDataNode) PlanNodeType.deserialize(buffer);
+ deleteDataNode = (AbstractDeleteDataNode) PlanNodeType.deserialize(buffer);
progressIndex = deleteDataNode.getProgressIndex();
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeResourceMetrics.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeResourceMetrics.java
index 4156aea4bc5..eeb840ce0c5 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeResourceMetrics.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeResourceMetrics.java
@@ -138,7 +138,7 @@ public class PipeResourceMetrics implements IMetricSet {
// phantom reference count
metricService.remove(MetricType.AUTO_GAUGE,
Metric.PIPE_PHANTOM_REFERENCE_COUNT.toString());
- metricService.remove(MetricType.RATE,
Metric.PIPE_TSFILE_SEND_DISK_IO.toString());
+ metricService.remove(MetricType.COUNTER,
Metric.PIPE_TSFILE_SEND_DISK_IO.toString());
}
public void recordDiskIO(final long bytes) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/receiver/PipeDataNodeReceiverMetrics.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/receiver/PipeDataNodeReceiverMetrics.java
index 1e877a7b2a0..82defa546bb 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/receiver/PipeDataNodeReceiverMetrics.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/receiver/PipeDataNodeReceiverMetrics.java
@@ -338,14 +338,14 @@ public class PipeDataNodeReceiverMetrics implements
IMetricSet {
Tag.NAME.toString(),
RECEIVER,
Tag.TYPE.toString(),
- "handshakeDatanodeV1");
+ "handshakeDataNodeV1");
metricService.remove(
MetricType.TIMER,
Metric.PIPE_DATANODE_RECEIVER.toString(),
Tag.NAME.toString(),
RECEIVER,
Tag.TYPE.toString(),
- "handshakeDatanodeV2");
+ "handshakeDataNodeV2");
metricService.remove(
MetricType.TIMER,
Metric.PIPE_DATANODE_RECEIVER.toString(),
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/AggregateProcessor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/AggregateProcessor.java
index 10a72a0d9c5..43b09940780 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/AggregateProcessor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/AggregateProcessor.java
@@ -744,7 +744,7 @@ public class AggregateProcessor implements PipeProcessor {
throw new UnsupportedOperationException(
String.format(
"The output tablet does not support column type %s",
- valueColumnTypes[rowIndex]));
+ valueColumnTypes[columnIndex]));
}
} else {
bitMaps[columnIndex].mark(rowIndex);
@@ -758,7 +758,7 @@ public class AggregateProcessor implements PipeProcessor {
int filteredCount = 0;
for (int i = 0; i < columnNameStringList.length; ++i) {
if (!bitMaps[i].isAllMarked()) {
- originColumnIndex2FilteredColumnIndexMapperList[i] = ++filteredCount;
+ originColumnIndex2FilteredColumnIndexMapperList[i] = filteredCount++;
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/iotconsensusv2/IoTConsensusV2Receiver.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/iotconsensusv2/IoTConsensusV2Receiver.java
index 77eb1d0a6fc..adf80654445 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/iotconsensusv2/IoTConsensusV2Receiver.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/iotconsensusv2/IoTConsensusV2Receiver.java
@@ -555,6 +555,15 @@ public class IoTConsensusV2Receiver {
}
}
+ if (req.getFileNames().size() < 2) {
+ return new TIoTConsensusV2TransferResp(
+ RpcUtils.getStatus(
+ TSStatusCode.IOT_CONSENSUS_V2_TRANSFER_FILE_ERROR,
+ String.format(
+ "Failed to seal file %s, because the number of files is
less than 2.",
+ req.getFileNames())));
+ }
+
// Sync here is necessary to ensure that the data is written to the
disk. Or data region may
// load the file before the data is written to the disk and cause
unexpected behavior after
// system restart. (e.g., empty file in data region's data directory)
@@ -952,6 +961,7 @@ public class IoTConsensusV2Receiver {
private class IoTConsensusV2TsFileWriterPool {
private final Lock lock = new ReentrantLock();
+ private final Condition condition = lock.newCondition();
private final List<IoTConsensusV2TsFileWriter>
iotConsensusV2TsFileWriterPool =
new ArrayList<>();
private final ConsensusPipeName consensusPipeName;
@@ -1014,15 +1024,18 @@ public class IoTConsensusV2Receiver {
while (!tsFileWriter.isPresent()) {
tsFileWriter =
iotConsensusV2TsFileWriterPool.stream().filter(item ->
!item.isUsed()).findFirst();
- Thread.sleep(RETRY_WAIT_TIME);
+ condition.await(RETRY_WAIT_TIME, TimeUnit.MILLISECONDS);
}
tsFileWriter.get().setUsed(true);
tsFileWriter.get().setCommitIdOfCorrespondingHolderEvent(commitId);
- } catch (InterruptedException e) {
+ } catch (final InterruptedException e) {
Thread.currentThread().interrupt();
- LOGGER.warn(
- "IoTConsensusV2{}: receiver thread get interrupted when waiting
for borrowing tsFileWriter.",
- consensusPipeName);
+ final String errorStr =
+ String.format(
+ "IoTConsensusV2%s: receiver thread get interrupted when
waiting for borrowing tsFileWriter.",
+ consensusPipeName);
+ LOGGER.warn(errorStr);
+ throw new RuntimeException(errorStr);
} finally {
lock.unlock();
}
@@ -1057,7 +1070,7 @@ public class IoTConsensusV2Receiver {
&& tsFileWriter.isUsed()) {
try {
Thread.sleep(RETRY_WAIT_TIME);
- } catch (InterruptedException e) {
+ } catch (final InterruptedException e) {
Thread.currentThread().interrupt();
LOGGER.warn(
"IoTConsensusV2-PipeName-{}: receiver thread get
interrupted when exiting.",
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/iotconsensusv2/IoTConsensusV2ReceiverAgent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/iotconsensusv2/IoTConsensusV2ReceiverAgent.java
index 481d8dd7370..be2659c1cf4 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/iotconsensusv2/IoTConsensusV2ReceiverAgent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/iotconsensusv2/IoTConsensusV2ReceiverAgent.java
@@ -243,7 +243,7 @@ public class IoTConsensusV2ReceiverAgent implements
ConsensusPipeReceiver {
ConsensusPipeName consensusPipeName = receiverEntry.getKey();
AtomicReference<IoTConsensusV2Receiver> receiverReference =
receiverEntry.getValue();
- if (receiverReference != null) {
+ if (receiverReference != null && receiverReference.get() !=
null) {
receiverReference.get().handleExit();
receiverReference.set(null);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeDynamicMemoryBlock.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeDynamicMemoryBlock.java
index 4e33b871828..3dbac912db3 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeDynamicMemoryBlock.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeDynamicMemoryBlock.java
@@ -44,7 +44,7 @@ public class PipeDynamicMemoryBlock {
PipeDynamicMemoryBlock(
final @NotNull PipeModelFixedMemoryBlock fixedMemoryBlock, final long
memoryUsageInBytes) {
- this.memoryUsageInBytes = Math.min(memoryUsageInBytes, 0);
+ this.memoryUsageInBytes = Math.max(memoryUsageInBytes, 0);
this.fixedMemoryBlock = fixedMemoryBlock;
}
@@ -116,7 +116,7 @@ public class PipeDynamicMemoryBlock {
if (Double.isNaN(historyMemoryEfficiency)
|| Double.isInfinite(historyMemoryEfficiency)
|| historyMemoryEfficiency < 0.0) {
- currentMemoryEfficiency = 0.0;
+ historyMemoryEfficiency = 0.0;
}
this.historyMemoryEfficiency = Math.min(historyMemoryEfficiency, 1.0);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeightUtil.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeightUtil.java
index 0ba204fcbd2..458434d8494 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeightUtil.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeightUtil.java
@@ -220,7 +220,9 @@ public class PipeMemoryWeightUtil {
totalSizeInBytes +=
NUM_BYTES_ARRAY_HEADER + (long) NUM_BYTES_OBJECT_REF *
measurementSchemas.size();
for (IMeasurementSchema measurementSchema : measurementSchemas) {
- InsertNodeMemoryEstimator.sizeOfMeasurementSchema((MeasurementSchema)
measurementSchema);
+ totalSizeInBytes +=
+ InsertNodeMemoryEstimator.sizeOfMeasurementSchema(
+ (MeasurementSchema) measurementSchema);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeAsyncClientManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeAsyncClientManager.java
index 73e2213eea6..88a79146295 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeAsyncClientManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeAsyncClientManager.java
@@ -477,7 +477,7 @@ public class IoTDBDataNodeAsyncClientManager extends
IoTDBClientManager
while (true) {
final TEndPoint targetNodeUrl = endPointList.get((int) (Math.random()
* clientSize));
- if (isUnhealthy(targetNodeUrl) && n <= clientSize) {
+ if (isUnhealthy(targetNodeUrl) && n < clientSize) {
n++;
continue;
}
@@ -498,7 +498,7 @@ public class IoTDBDataNodeAsyncClientManager extends
IoTDBClientManager
long n = 0;
while (true) {
for (final TEndPoint targetNodeUrl : endPointList) {
- if (isUnhealthy(targetNodeUrl) && n <= clientSize) {
+ if (isUnhealthy(targetNodeUrl) && n < clientSize) {
n++;
continue;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java
index 622f4e4f0cd..7f904324bbb 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java
@@ -172,7 +172,7 @@ public class IoTDBDataRegionAirGapSink extends
IoTDBDataNodeAirGapSink {
throw new PipeConnectionException(
String.format(
"Network error when transfer tsfile event %s, because %s.",
- ((PipeDeleteDataNodeEvent) event).coreReportMessage(),
e.getMessage()),
+ ((EnrichedEvent) event).coreReportMessage(), e.getMessage()),
e);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketConnectorServer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketConnectorServer.java
index 240de912f6f..491d40a1e45 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketConnectorServer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketConnectorServer.java
@@ -35,8 +35,10 @@ import org.slf4j.LoggerFactory;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
+import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -90,14 +92,18 @@ public class WebSocketConnectorServer extends
WebSocketServer {
final PriorityBlockingQueue<EventWaitingForTransfer> eventTransferQueue =
eventsWaitingForTransfer.remove(pipeName);
while (!eventTransferQueue.isEmpty()) {
- eventTransferQueue.forEach(
+ final List<EventWaitingForTransfer> eventWrappers;
+ synchronized (eventTransferQueue) {
+ eventWrappers = new ArrayList<>(eventTransferQueue);
+ eventTransferQueue.clear();
+ }
+ eventWrappers.forEach(
(eventWrapper) -> {
if (eventWrapper.event instanceof EnrichedEvent) {
((EnrichedEvent) eventWrapper.event)
.decreaseReferenceCount(WebSocketConnectorServer.class.getName(), false);
}
});
- eventTransferQueue.clear();
synchronized (eventTransferQueue) {
eventTransferQueue.notifyAll();
}
@@ -270,8 +276,10 @@ public class WebSocketConnectorServer extends
WebSocketServer {
LOGGER.warn(
"The tablet of commitId: {} can't be parsed by client, it will be
retried later.", eventId);
- eventTransferQueue.put(
- new EventWaitingForTransfer(eventId, eventWrapper.connector,
eventWrapper.event));
+ synchronized (eventTransferQueue) {
+ eventTransferQueue.put(
+ new EventWaitingForTransfer(eventId, eventWrapper.connector,
eventWrapper.event));
+ }
}
@Override
@@ -321,7 +329,9 @@ public class WebSocketConnectorServer extends
WebSocketServer {
}
}
- queue.put(new EventWaitingForTransfer(eventIdGenerator.incrementAndGet(),
connector, event));
+ synchronized (queue) {
+ queue.put(new
EventWaitingForTransfer(eventIdGenerator.incrementAndGet(), connector, event));
+ }
}
private class TransferThread extends Thread {
@@ -347,7 +357,8 @@ public class WebSocketConnectorServer extends
WebSocketServer {
}
try {
- final EventWaitingForTransfer queueElement = queue.take();
+ EventWaitingForTransfer queueElement;
+ queueElement = queue.take();
synchronized (queue) {
queue.notifyAll();
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/builder/PipeTableModelTsFileBuilder.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/builder/PipeTableModelTsFileBuilder.java
index 52c18a10da9..c105a86ff77 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/builder/PipeTableModelTsFileBuilder.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/builder/PipeTableModelTsFileBuilder.java
@@ -160,18 +160,19 @@ public class PipeTableModelTsFileBuilder extends
PipeTsFileBuilder {
e.getMessage(),
e);
+ final File file = fileWriter.getIOWriter().getFile();
try {
fileWriter.close();
} catch (final Exception closeException) {
LOGGER.warn(
"Batch id = {}: Failed to close the tsfile {} after failed to
write tablets into, because {}",
currentBatchId.get(),
- fileWriter.getIOWriter().getFile().getPath(),
+ file.getPath(),
closeException.getMessage(),
closeException);
} finally {
// Add current writing file to the list and delete the file
- sealedFiles.add(new Pair<>(dataBase,
fileWriter.getIOWriter().getFile()));
+ sealedFiles.add(new Pair<>(dataBase, file));
}
for (final Pair<String, File> sealedFile : sealedFiles) {
@@ -181,7 +182,7 @@ public class PipeTableModelTsFileBuilder extends
PipeTsFileBuilder {
currentBatchId.get(),
deleteSuccess ? "Successfully" : "Failed to",
sealedFile.right.getPath(),
- fileWriter.getIOWriter().getFile().getPath(),
+ file.getPath(),
deleteSuccess ? "" : "Maybe the tsfile needs to be deleted
manually.");
}
sealedFiles.clear();
@@ -191,8 +192,8 @@ public class PipeTableModelTsFileBuilder extends
PipeTsFileBuilder {
throw e;
}
- fileWriter.close();
final File sealedFile = fileWriter.getIOWriter().getFile();
+ fileWriter.close();
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(
"Batch id = {}: Seal tsfile {} successfully.",
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/builder/PipeTreeModelTsFileBuilder.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/builder/PipeTreeModelTsFileBuilder.java
index d46180bd793..89827cd8421 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/builder/PipeTreeModelTsFileBuilder.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/builder/PipeTreeModelTsFileBuilder.java
@@ -155,18 +155,19 @@ public class PipeTreeModelTsFileBuilder extends
PipeTsFileBuilder {
e.getMessage(),
e);
+ final File file = fileWriter.getIOWriter().getFile();
try {
fileWriter.close();
} catch (final Exception closeException) {
LOGGER.warn(
"Batch id = {}: Failed to close the tsfile {} after failed to
write tablets into, because {}",
currentBatchId.get(),
- fileWriter.getIOWriter().getFile().getPath(),
+ file.getPath(),
closeException.getMessage(),
closeException);
} finally {
// Add current writing file to the list and delete the file
- sealedFiles.add(new Pair<>(null,
fileWriter.getIOWriter().getFile()));
+ sealedFiles.add(new Pair<>(null, file));
}
for (final Pair<String, File> sealedFile : sealedFiles) {
@@ -176,7 +177,7 @@ public class PipeTreeModelTsFileBuilder extends
PipeTsFileBuilder {
currentBatchId.get(),
deleteSuccess ? "Successfully" : "Failed to",
sealedFile.right.getPath(),
- fileWriter.getIOWriter().getFile().getPath(),
+ file.getPath(),
deleteSuccess ? "" : "Maybe the tsfile needs to be deleted
manually.");
}
sealedFiles.clear();
@@ -186,8 +187,8 @@ public class PipeTreeModelTsFileBuilder extends
PipeTsFileBuilder {
throw e;
}
- fileWriter.close();
final File sealedFile = fileWriter.getIOWriter().getFile();
+ fileWriter.close();
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(
"Batch id = {}: Seal tsfile {} successfully.",
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/builder/PipeTsFileBuilder.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/builder/PipeTsFileBuilder.java
index 81557963822..217adeed43e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/builder/PipeTsFileBuilder.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/builder/PipeTsFileBuilder.java
@@ -106,9 +106,7 @@ public abstract class PipeTsFileBuilder {
return baseDir;
}
throw new PipeException(
- String.format(
- "Failed to create batch file dir %s. (Batch id = %s)",
- baseDir.getPath(), currentBatchId.get()));
+ String.format("Failed to create batch file dir. (Batch id = %s)",
currentBatchId.get()));
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/IoTDBDataRegionSource.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/IoTDBDataRegionSource.java
index 32e5c300f7d..1b06aab8a07 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/IoTDBDataRegionSource.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/IoTDBDataRegionSource.java
@@ -532,7 +532,7 @@ public class IoTDBDataRegionSource extends IoTDBSource {
EXTRACTOR_WATERMARK_INTERVAL_KEY, SOURCE_WATERMARK_INTERVAL_KEY)) {
watermarkIntervalInMs =
parameters.getLongOrDefault(
- Arrays.asList(_EXTRACTOR_WATERMARK_INTERVAL_KEY,
_SOURCE_WATERMARK_INTERVAL_KEY),
+ Arrays.asList(EXTRACTOR_WATERMARK_INTERVAL_KEY,
SOURCE_WATERMARK_INTERVAL_KEY),
EXTRACTOR_WATERMARK_INTERVAL_DEFAULT_VALUE);
} else if (parameters.hasAnyAttributes(
_EXTRACTOR_WATERMARK_INTERVAL_KEY, _SOURCE_WATERMARK_INTERVAL_KEY)) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionHybridSource.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionHybridSource.java
index 0721683f4d2..c9e3f35288a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionHybridSource.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionHybridSource.java
@@ -58,7 +58,7 @@ public class PipeRealtimeDataRegionHybridSource extends
PipeRealtimeDataRegionSo
} else if (eventToExtract instanceof PipeHeartbeatEvent) {
extractHeartbeat(event);
} else if (eventToExtract instanceof PipeDeleteDataNodeEvent) {
- extractDirectly(event);
+ pendingQueue.offer(event);
} else {
throw new UnsupportedOperationException(
String.format(
@@ -116,21 +116,7 @@ public class PipeRealtimeDataRegionHybridSource extends
PipeRealtimeDataRegionSo
if (state == TsFileEpoch.State.USING_BOTH) {
event.skipReportOnCommit();
}
- if (!pendingQueue.waitedOffer(event)) {
- // This would not happen, but just in case.
- // pendingQueue is unbounded, so it should never reach capacity.
- final String errorMessage =
- String.format(
- "extractTabletInsertion: pending queue of
PipeRealtimeDataRegionHybridExtractor %s "
- + "has reached capacity, discard tablet event %s,
current state %s",
- this, event, event.getTsFileEpoch().getState(this));
- LOGGER.error(errorMessage);
- PipeDataNodeAgent.runtime()
- .report(pipeTaskMeta, new
PipeRuntimeNonCriticalException(errorMessage));
-
- // Ignore the tablet event.
-
event.decreaseReferenceCount(PipeRealtimeDataRegionHybridSource.class.getName(),
false);
- }
+ pendingQueue.offer(event);
break;
default:
throw new UnsupportedOperationException(
@@ -176,21 +162,7 @@ public class PipeRealtimeDataRegionHybridSource extends
PipeRealtimeDataRegionSo
case EMPTY:
case USING_TSFILE:
case USING_BOTH:
- if (!pendingQueue.waitedOffer(event)) {
- // This would not happen, but just in case.
- // pendingQueue is unbounded, so it should never reach capacity.
- final String errorMessage =
- String.format(
- "extractTsFileInsertion: pending queue of
PipeRealtimeDataRegionHybridExtractor %s "
- + "has reached capacity, discard TsFile event %s,
current state %s",
- this, event, event.getTsFileEpoch().getState(this));
- LOGGER.error(errorMessage);
- PipeDataNodeAgent.runtime()
- .report(pipeTaskMeta, new
PipeRuntimeNonCriticalException(errorMessage));
-
- // Ignore the tsfile event.
-
event.decreaseReferenceCount(PipeRealtimeDataRegionHybridSource.class.getName(),
false);
- }
+ pendingQueue.offer(event);
break;
default:
throw new UnsupportedOperationException(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionLogSource.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionLogSource.java
index 35a2c7190c8..579310b3f15 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionLogSource.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionLogSource.java
@@ -40,7 +40,7 @@ public class PipeRealtimeDataRegionLogSource extends
PipeRealtimeDataRegionSourc
LoggerFactory.getLogger(PipeRealtimeDataRegionLogSource.class);
@Override
- protected void doExtract(PipeRealtimeEvent event) {
+ protected void doExtract(final PipeRealtimeEvent event) {
final Event eventToExtract = event.getEvent();
if (eventToExtract instanceof TabletInsertionEvent) {
@@ -51,7 +51,7 @@ public class PipeRealtimeDataRegionLogSource extends
PipeRealtimeDataRegionSourc
} else if (eventToExtract instanceof PipeHeartbeatEvent) {
extractHeartbeat(event);
} else if (eventToExtract instanceof PipeDeleteDataNodeEvent) {
- extractDirectly(event);
+ pendingQueue.offer(event);
} else {
throw new UnsupportedOperationException(
String.format(
@@ -60,27 +60,12 @@ public class PipeRealtimeDataRegionLogSource extends
PipeRealtimeDataRegionSourc
}
}
- private void extractTabletInsertion(PipeRealtimeEvent event) {
+ private void extractTabletInsertion(final PipeRealtimeEvent event) {
event.getTsFileEpoch().migrateState(this, state ->
TsFileEpoch.State.USING_TABLET);
-
- if (!pendingQueue.waitedOffer(event)) {
- // this would not happen, but just in case.
- // pendingQueue is unbounded, so it should never reach capacity.
- final String errorMessage =
- String.format(
- "extract: pending queue of PipeRealtimeDataRegionLogExtractor %s
"
- + "has reached capacity, discard tablet event %s, current
state %s",
- this, event, event.getTsFileEpoch().getState(this));
- LOGGER.error(errorMessage);
- PipeDataNodeAgent.runtime()
- .report(pipeTaskMeta, new
PipeRuntimeNonCriticalException(errorMessage));
-
- // ignore this event.
-
event.decreaseReferenceCount(PipeRealtimeDataRegionLogSource.class.getName(),
false);
- }
+ pendingQueue.offer(event);
}
- private void extractTsFileInsertion(PipeRealtimeEvent event) {
+ private void extractTsFileInsertion(final PipeRealtimeEvent event) {
final PipeTsFileInsertionEvent tsFileInsertionEvent =
(PipeTsFileInsertionEvent) event.getEvent();
if (!(tsFileInsertionEvent.isLoaded())) {
@@ -91,22 +76,7 @@ public class PipeRealtimeDataRegionLogSource extends
PipeRealtimeDataRegionSourc
}
event.getTsFileEpoch().migrateState(this, state ->
TsFileEpoch.State.USING_TSFILE);
-
- if (!pendingQueue.waitedOffer(event)) {
- // this would not happen, but just in case.
- // pendingQueue is unbounded, so it should never reach capacity.
- final String errorMessage =
- String.format(
- "extract: pending queue of PipeRealtimeDataRegionLogExtractor %s
"
- + "has reached capacity, discard loaded tsFile event %s,
current state %s",
- this, event, event.getTsFileEpoch().getState(this));
- LOGGER.error(errorMessage);
- PipeDataNodeAgent.runtime()
- .report(pipeTaskMeta, new
PipeRuntimeNonCriticalException(errorMessage));
-
- // ignore this event.
-
event.decreaseReferenceCount(PipeRealtimeDataRegionLogSource.class.getName(),
false);
- }
+ pendingQueue.offer(event);
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java
index 204c9c87d14..1e47a48e500 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java
@@ -423,21 +423,7 @@ public abstract class PipeRealtimeDataRegionSource
implements PipeExtractor {
return;
}
- if (!pendingQueue.waitedOffer(event)) {
- // This would not happen, but just in case.
- // pendingQueue is unbounded, so it should never reach capacity.
- LOGGER.error(
- "extract: pending queue of PipeRealtimeDataRegionHybridExtractor {} "
- + "has reached capacity, discard heartbeat event {}",
- this,
- event);
-
- // Do not report exception since the PipeHeartbeatEvent doesn't affect
- // the correction of pipe progress.
-
- // Ignore this event.
-
event.decreaseReferenceCount(PipeRealtimeDataRegionSource.class.getName(),
false);
- }
+ pendingQueue.offer(event);
}
protected void extractProgressReportEvent(final PipeRealtimeEvent event) {
@@ -449,24 +435,7 @@ public abstract class PipeRealtimeDataRegionSource
implements PipeExtractor {
.updateToMinimumEqualOrIsAfterProgressIndex(event.getProgressIndex()));
return;
}
- extractDirectly(event);
- }
-
- protected void extractDirectly(final PipeRealtimeEvent event) {
- if (!pendingQueue.waitedOffer(event)) {
- // This would not happen, but just in case.
- // Pending is unbounded, so it should never reach capacity.
- final String errorMessage =
- String.format(
- "extract: pending queue of %s %s " + "has reached capacity,
discard event %s",
- this.getClass().getSimpleName(), this, event);
- LOGGER.error(errorMessage);
- PipeDataNodeAgent.runtime()
- .report(pipeTaskMeta, new
PipeRuntimeNonCriticalException(errorMessage));
-
- // Ignore the event.
-
event.decreaseReferenceCount(PipeRealtimeDataRegionSource.class.getName(),
false);
- }
+ pendingQueue.offer(event);
}
protected void maySkipIndex4Event(final PipeRealtimeEvent event) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionTsFileSource.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionTsFileSource.java
index 8f74bc63fb3..98bfb30391a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionTsFileSource.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionTsFileSource.java
@@ -46,7 +46,7 @@ public class PipeRealtimeDataRegionTsFileSource extends
PipeRealtimeDataRegionSo
}
if (event.getEvent() instanceof PipeDeleteDataNodeEvent) {
- extractDirectly(event);
+ pendingQueue.offer(event);
return;
}
@@ -59,21 +59,7 @@ public class PipeRealtimeDataRegionTsFileSource extends
PipeRealtimeDataRegionSo
return;
}
- if (!pendingQueue.waitedOffer(event)) {
- // This would not happen, but just in case.
- // Pending is unbounded, so it should never reach capacity.
- final String errorMessage =
- String.format(
- "extract: pending queue of PipeRealtimeDataRegionTsFileExtractor
%s "
- + "has reached capacity, discard TsFile event %s, current
state %s",
- this, event, event.getTsFileEpoch().getState(this));
- LOGGER.error(errorMessage);
- PipeDataNodeAgent.runtime()
- .report(pipeTaskMeta, new
PipeRuntimeNonCriticalException(errorMessage));
-
- // Ignore the event.
-
event.decreaseReferenceCount(PipeRealtimeDataRegionTsFileSource.class.getName(),
false);
- }
+ pendingQueue.offer(event);
event.getTsFileEpoch().clearState(this);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/TsFileDeduplicationBlockingPendingQueue.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/TsFileDeduplicationBlockingPendingQueue.java
index e4a3a545a58..b1677c9ffe6 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/TsFileDeduplicationBlockingPendingQueue.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/TsFileDeduplicationBlockingPendingQueue.java
@@ -65,7 +65,7 @@ public class TsFileDeduplicationBlockingPendingQueue extends
SubscriptionBlockin
@Override
public void directOffer(final Event event) {
- inputPendingQueue.directOffer(event);
+ inputPendingQueue.offer(event);
}
private synchronized Event filter(final Event event) { // make it
synchronized
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/BlockingPendingQueue.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/BlockingPendingQueue.java
index aa93b093254..8773b03f9f3 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/BlockingPendingQueue.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/BlockingPendingQueue.java
@@ -50,26 +50,7 @@ public abstract class BlockingPendingQueue<E extends Event> {
this.eventCounter = eventCounter;
}
- public boolean waitedOffer(final E event) {
- checkBeforeOffer(event);
- try {
- final boolean offered =
- pendingQueue.offer(
- event,
-
PIPE_CONFIG.getPipeSubtaskExecutorPendingQueueMaxBlockingTimeMs(),
- TimeUnit.MILLISECONDS);
- if (offered) {
- eventCounter.increaseEventCount(event);
- }
- return offered;
- } catch (final InterruptedException e) {
- LOGGER.info("pending queue offer is interrupted.", e);
- Thread.currentThread().interrupt();
- return false;
- }
- }
-
- public boolean directOffer(final E event) {
+ public boolean offer(final E event) {
checkBeforeOffer(event);
final boolean offered = pendingQueue.offer(event);
if (offered) {
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/TreePattern.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/TreePattern.java
index 0d6e7fcdad5..6631f60a65c 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/TreePattern.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/TreePattern.java
@@ -477,10 +477,17 @@ public abstract class TreePattern {
final List<TreePattern> sortedPatterns = new ArrayList<>(patterns);
sortedPatterns.sort(
(o1, o2) -> {
+ final List<PartialPath> p1List = o1.getBaseInclusionPaths();
+ final List<PartialPath> p2List = o2.getBaseInclusionPaths();
+
+ if (p1List.isEmpty()) {
+ return p2List.isEmpty() ? 1 : -1;
+ }
+
// We can only approximate comparison here since TreePattern
represents multiple paths.
// We use the first inclusion path as a representative.
- final PartialPath p1 = o1.getBaseInclusionPaths().get(0);
- final PartialPath p2 = o2.getBaseInclusionPaths().get(0);
+ final PartialPath p1 = p1List.get(0);
+ final PartialPath p2 = p2List.get(0);
// 1. Length: Shorter is generally broader (e.g., root.** vs
root.sg.d1)
final int lenCompare = Integer.compare(p1.getNodeLength(),
p2.getNodeLength());
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClientManager.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClientManager.java
index 76c145d0dab..61250eaa038 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClientManager.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClientManager.java
@@ -374,7 +374,7 @@ public abstract class IoTDBSyncClientManager extends
IoTDBClientManager implemen
final Pair<IoTDBSyncClient, Boolean> nextClientAndStatus =
endPoint2ClientAndStatus.get(endPointList.get(nextClientIndex));
if (Boolean.TRUE.equals(nextClientAndStatus.getRight())
- && clientAndStatus.getLeft() != null) {
+ && nextClientAndStatus.getLeft() != null) {
return nextClientAndStatus;
}
}