This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 809b3aaff8e Pipe: Fixed the bug that a failed checksum may
permanently disable an air gap receiver & Optimized the air gap file offset
logic & Fixed the bug that an unconnected socket may die forever in "send"
(#14384)
809b3aaff8e is described below
commit 809b3aaff8ee5b1296ed516c438a3c9ca18bac5d
Author: Caideyipi <[email protected]>
AuthorDate: Fri Dec 13 12:55:54 2024 +0800
Pipe: Fixed the bug that a failed checksum may permanently disable an air
gap receiver & Optimized the air gap file offset logic & Fixed the bug that an
unconnected socket may die forever in "send" (#14384)
Co-authored-by: Steve Yurong Su <[email protected]>
---
.../api/exception/PipeConnectionException.java | 4 +-
.../iotdb/pipe/api/exception/PipeException.java | 6 +--
.../subtask/connector/PipeConnectorSubtask.java | 34 -------------
.../pipeconsensus/PipeConsensusAsyncConnector.java | 4 +-
.../pipeconsensus/PipeConsensusSyncConnector.java | 56 +++++++++++-----------
.../protocol/airgap/IoTDBAirGapReceiver.java | 45 ++++++++++++++---
.../protocol/airgap/IoTDBAirGapReceiverAgent.java | 7 ++-
.../task/subtask/SubscriptionConnectorSubtask.java | 9 ----
.../iotdb/commons/concurrent/ThreadName.java | 4 ++
.../PipeRuntimeConnectorCriticalException.java | 14 +++---
...meConnectorRetryTimesConfigurableException.java | 3 +-
.../pipe/PipeRuntimeCriticalException.java | 14 +++---
.../exception/pipe/PipeRuntimeException.java | 10 ++--
.../connector/protocol/IoTDBAirGapConnector.java | 6 ++-
.../commons/pipe/receiver/IoTDBFileReceiver.java | 37 +++++++-------
.../commons/pipe/receiver/IoTDBReceiverAgent.java | 6 +--
16 files changed, 132 insertions(+), 127 deletions(-)
diff --git
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/exception/PipeConnectionException.java
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/exception/PipeConnectionException.java
index dc9d5e32968..ded27cc433a 100644
---
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/exception/PipeConnectionException.java
+++
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/exception/PipeConnectionException.java
@@ -24,11 +24,11 @@ public class PipeConnectionException extends PipeException {
public static final String CONNECTION_ERROR_FORMATTER =
"Error occurred while connecting to receiver %s:%s, please check network
connectivity or SSL configurations when enable SSL transmission";
- public PipeConnectionException(String message) {
+ public PipeConnectionException(final String message) {
super(message);
}
- public PipeConnectionException(String message, Throwable cause) {
+ public PipeConnectionException(final String message, final Throwable cause) {
super(message, cause);
}
}
diff --git
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/exception/PipeException.java
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/exception/PipeException.java
index 3291f87be0d..7be6d6a5560 100644
---
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/exception/PipeException.java
+++
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/exception/PipeException.java
@@ -23,17 +23,17 @@ public class PipeException extends RuntimeException {
private final long timeStamp;
- public PipeException(String message) {
+ public PipeException(final String message) {
super(message);
this.timeStamp = System.currentTimeMillis();
}
- public PipeException(String message, long timeStamp) {
+ public PipeException(final String message, final long timeStamp) {
super(message);
this.timeStamp = timeStamp;
}
- public PipeException(String message, Throwable cause) {
+ public PipeException(final String message, final Throwable cause) {
super(message, cause);
this.timeStamp = System.currentTimeMillis();
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtask.java
index 6171e50b95c..cc2d5b5b346 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtask.java
@@ -44,9 +44,6 @@ import org.apache.iotdb.pipe.api.exception.PipeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Objects;
-import java.util.concurrent.atomic.AtomicInteger;
-
public class PipeConnectorSubtask extends PipeAbstractConnectorSubtask {
private static final Logger LOGGER =
LoggerFactory.getLogger(PipeConnectorSubtask.class);
@@ -295,37 +292,6 @@ public class PipeConnectorSubtask extends
PipeAbstractConnectorSubtask {
: 0;
}
- // For performance, this will not acquire lock and does not guarantee the
correct
- // result. However, this shall not cause any exceptions when concurrently
read & written.
- public int getEventCount(final String pipeName) {
- final AtomicInteger count = new AtomicInteger(0);
- try {
- inputPendingQueue.forEach(
- event -> {
- if (event instanceof EnrichedEvent
- && pipeName.equals(((EnrichedEvent) event).getPipeName())) {
- count.incrementAndGet();
- }
- });
- } catch (final Exception e) {
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug(
- "Exception occurred when counting event of pipe {}, root cause:
{}",
- pipeName,
- ErrorHandlingUtils.getRootCause(e).getMessage(),
- e);
- }
- }
- // Avoid potential NPE in "getPipeName"
- final EnrichedEvent event =
- lastEvent instanceof EnrichedEvent ? (EnrichedEvent) lastEvent : null;
- return count.get()
- + (outputPipeConnector instanceof IoTDBDataRegionAsyncConnector
- ? ((IoTDBDataRegionAsyncConnector)
outputPipeConnector).getRetryEventCount(pipeName)
- : 0)
- + (Objects.nonNull(event) && pipeName.equals(event.getPipeName()) ? 1
: 0);
- }
-
//////////////////////////// Error report ////////////////////////////
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java
index 4f56d1a847b..c0fa792da79 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java
@@ -406,8 +406,8 @@ public class PipeConsensusAsyncConnector extends
IoTDBConnector implements Conse
// Transfer deletion
if (event instanceof PipeDeleteDataNodeEvent) {
- PipeDeleteDataNodeEvent deleteDataNodeEvent = (PipeDeleteDataNodeEvent)
event;
- boolean enqueueResult = addEvent2Buffer(deleteDataNodeEvent);
+ final PipeDeleteDataNodeEvent deleteDataNodeEvent =
(PipeDeleteDataNodeEvent) event;
+ final boolean enqueueResult = addEvent2Buffer(deleteDataNodeEvent);
if (!enqueueResult) {
throw new PipeRuntimeConnectorRetryTimesConfigurableException(
ENQUEUE_EXCEPTION_MSG, Integer.MAX_VALUE);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusSyncConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusSyncConnector.java
index 01b406674c9..c00527ec38d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusSyncConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusSyncConnector.java
@@ -82,10 +82,10 @@ public class PipeConsensusSyncConnector extends
IoTDBConnector {
private PipeConsensusSyncBatchReqBuilder tabletBatchBuilder;
public PipeConsensusSyncConnector(
- List<TEndPoint> peers,
- int consensusGroupId,
- int thisDataNodeId,
- PipeConsensusConnectorMetrics pipeConsensusConnectorMetrics) {
+ final List<TEndPoint> peers,
+ final int consensusGroupId,
+ final int thisDataNodeId,
+ final PipeConsensusConnectorMetrics pipeConsensusConnectorMetrics) {
// In PipeConsensus, one pipeConsensusTask corresponds to a
pipeConsensusConnector. Thus,
// `peers` here actually is a singletonList that contains one peer's
TEndPoint. But here we
// retain the implementation of list to cope with possible future expansion
@@ -98,7 +98,8 @@ public class PipeConsensusSyncConnector extends
IoTDBConnector {
}
@Override
- public void customize(PipeParameters parameters,
PipeConnectorRuntimeConfiguration configuration)
+ public void customize(
+ final PipeParameters parameters, final PipeConnectorRuntimeConfiguration
configuration)
throws Exception {
super.customize(parameters, configuration);
if (isTabletBatchModeEnabled) {
@@ -125,7 +126,7 @@ public class PipeConsensusSyncConnector extends
IoTDBConnector {
}
@Override
- public void transfer(TabletInsertionEvent tabletInsertionEvent) throws
Exception {
+ public void transfer(final TabletInsertionEvent tabletInsertionEvent) throws
Exception {
// Note: here we don't need to do type judgment here, because
PipeConsensus uses
// PIPE_CONSENSUS_PROCESSOR and will not change the event type like
//
org.apache.iotdb.db.pipe.connector.protocol.thrift.sync.IoTDBDataRegionSyncConnector
@@ -135,12 +136,12 @@ public class PipeConsensusSyncConnector extends
IoTDBConnector {
doTransfer();
}
} else {
- long startTime = System.nanoTime();
+ final long startTime = System.nanoTime();
doTransferWrapper((PipeInsertNodeTabletInsertionEvent)
tabletInsertionEvent);
- long duration = System.nanoTime() - startTime;
+ final long duration = System.nanoTime() - startTime;
pipeConsensusConnectorMetrics.recordRetryWALTransferTimer(duration);
}
- } catch (Exception e) {
+ } catch (final Exception e) {
throw new PipeConnectionException(
String.format(
"Failed to transfer tablet insertion event %s, because %s.",
@@ -150,18 +151,18 @@ public class PipeConsensusSyncConnector extends
IoTDBConnector {
}
@Override
- public void transfer(TsFileInsertionEvent tsFileInsertionEvent) throws
Exception {
+ public void transfer(final TsFileInsertionEvent tsFileInsertionEvent) throws
Exception {
// Note: here we don't need to do type judgment here, because
PipeConsensus uses DO_NOTHING
// processor and will not change the event type like
//
org.apache.iotdb.db.pipe.connector.protocol.thrift.sync.IoTDBDataRegionSyncConnector
try {
- long startTime = System.nanoTime();
+ final long startTime = System.nanoTime();
// In order to commit in order
if (isTabletBatchModeEnabled && !tabletBatchBuilder.isEmpty()) {
doTransfer();
}
doTransfer((PipeTsFileInsertionEvent) tsFileInsertionEvent);
- long duration = System.nanoTime() - startTime;
+ final long duration = System.nanoTime() - startTime;
pipeConsensusConnectorMetrics.recordRetryTsFileTransferTimer(duration);
} catch (Exception e) {
throw new PipeConnectionException(
@@ -173,7 +174,7 @@ public class PipeConsensusSyncConnector extends
IoTDBConnector {
}
@Override
- public void transfer(Event event) throws Exception {
+ public void transfer(final Event event) throws Exception {
// in order to commit in order
if (isTabletBatchModeEnabled && !tabletBatchBuilder.isEmpty()) {
doTransfer();
@@ -209,7 +210,7 @@ public class PipeConsensusSyncConnector extends
IoTDBConnector {
// }
tabletBatchBuilder.onSuccess();
- } catch (Exception e) {
+ } catch (final Exception e) {
throw new PipeConnectionException(
String.format(
PIPE_CONSENSUS_SYNC_CONNECTION_FAILED_FORMAT,
@@ -240,12 +241,12 @@ public class PipeConsensusSyncConnector extends
IoTDBConnector {
throws PipeException {
final ProgressIndex progressIndex;
final TPipeConsensusTransferResp resp;
- TCommitId tCommitId =
+ final TCommitId tCommitId =
new TCommitId(
pipeDeleteDataNodeEvent.getCommitId(),
pipeDeleteDataNodeEvent.getCommitterKey().getRestartTimes(),
pipeDeleteDataNodeEvent.getRebootTimes());
- TConsensusGroupId tConsensusGroupId =
+ final TConsensusGroupId tConsensusGroupId =
new TConsensusGroupId(TConsensusGroupType.DataRegion,
consensusGroupId);
try (final SyncPipeConsensusServiceClient syncPipeConsensusServiceClient =
@@ -259,7 +260,7 @@ public class PipeConsensusSyncConnector extends
IoTDBConnector {
tConsensusGroupId,
progressIndex,
thisDataNodeId));
- } catch (Exception e) {
+ } catch (final Exception e) {
throw new PipeConnectionException(
String.format(
PIPE_CONSENSUS_SYNC_CONNECTION_FAILED_FORMAT,
@@ -310,12 +311,12 @@ public class PipeConsensusSyncConnector extends
IoTDBConnector {
final InsertNode insertNode;
final ProgressIndex progressIndex;
final TPipeConsensusTransferResp resp;
- TCommitId tCommitId =
+ final TCommitId tCommitId =
new TCommitId(
pipeInsertNodeTabletInsertionEvent.getCommitId(),
pipeInsertNodeTabletInsertionEvent.getCommitterKey().getRestartTimes(),
pipeInsertNodeTabletInsertionEvent.getRebootTimes());
- TConsensusGroupId tConsensusGroupId =
+ final TConsensusGroupId tConsensusGroupId =
new TConsensusGroupId(TConsensusGroupType.DataRegion,
consensusGroupId);
try (final SyncPipeConsensusServiceClient syncPipeConsensusServiceClient =
@@ -338,7 +339,7 @@ public class PipeConsensusSyncConnector extends
IoTDBConnector {
progressIndex,
thisDataNodeId));
}
- } catch (Exception e) {
+ } catch (final Exception e) {
throw new PipeConnectionException(
String.format(
PIPE_CONSENSUS_SYNC_CONNECTION_FAILED_FORMAT,
@@ -362,7 +363,8 @@ public class PipeConsensusSyncConnector extends
IoTDBConnector {
}
}
- private void doTransfer(PipeTsFileInsertionEvent pipeTsFileInsertionEvent)
throws PipeException {
+ private void doTransfer(final PipeTsFileInsertionEvent
pipeTsFileInsertionEvent)
+ throws PipeException {
final File tsFile = pipeTsFileInsertionEvent.getTsFile();
final File modFile = pipeTsFileInsertionEvent.getModFile();
final TPipeConsensusTransferResp resp;
@@ -411,7 +413,7 @@ public class PipeConsensusSyncConnector extends
IoTDBConnector {
pipeTsFileInsertionEvent.getProgressIndex(),
thisDataNodeId));
}
- } catch (Exception e) {
+ } catch (final Exception e) {
throw new PipeConnectionException(
String.format(
PIPE_CONSENSUS_SYNC_CONNECTION_FAILED_FORMAT,
@@ -436,11 +438,11 @@ public class PipeConsensusSyncConnector extends
IoTDBConnector {
}
protected void transferFilePieces(
- File file,
- SyncPipeConsensusServiceClient syncPipeConsensusServiceClient,
- boolean isMultiFile,
- TCommitId tCommitId,
- TConsensusGroupId tConsensusGroupId)
+ final File file,
+ final SyncPipeConsensusServiceClient syncPipeConsensusServiceClient,
+ final boolean isMultiFile,
+ final TCommitId tCommitId,
+ final TConsensusGroupId tConsensusGroupId)
throws PipeException, IOException {
final int readFileBufferSize =
PipeConfig.getInstance().getPipeConnectorReadFileBufferSize();
final byte[] readBuffer = new byte[readFileBufferSize];
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java
index d24e097badf..96b07b4a678 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java
@@ -109,9 +109,20 @@ public class IoTDBAirGapReceiver extends WrappedRunnable {
try {
final byte[] data = readData(inputStream);
+ // If check sum failed, it indicates that the length we read may not be
correct.
+ // Namely, there may be remaining bytes in the socket stream, which will
fail any subsequent
+ // attempts to read from that.
+ // We directly close the socket here.
if (!checkSum(data)) {
- LOGGER.warn("Checksum failed, receiverId: {}", receiverId);
- fail();
+ LOGGER.warn(
+ "Pipe air gap receiver {} closed because of checksum failed.
Socket: {}",
+ receiverId,
+ socket);
+ try {
+ fail();
+ } finally {
+ socket.close();
+ }
return;
}
@@ -134,21 +145,31 @@ public class IoTDBAirGapReceiver extends WrappedRunnable {
|| status.getCode()
==
TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode()) {
LOGGER.info(
- "TSStatus:{} is encountered at the air gap receiver, will
ignore.", resp.getStatus());
+ "Pipe air gap receiver {}: TSStatus {} is encountered at the air
gap receiver, will ignore.",
+ receiverId,
+ resp.getStatus());
ok();
} else {
LOGGER.warn(
- "Handle data failed, receiverId: {}, status: {}, req: {}",
+ "Pipe air gap receiver {}: Handle data failed, status: {}, req:
{}",
receiverId,
resp.getStatus(),
req);
fail();
}
} catch (final PipeConnectionException e) {
- LOGGER.info("Socket closed when listening to data. Because: {}",
e.getMessage());
+ LOGGER.info(
+ "Pipe air gap receiver {}: Socket {} closed when listening to data.
Because: {}",
+ receiverId,
+ socket,
+ e.getMessage());
socket.close();
} catch (final Exception e) {
- LOGGER.warn("Exception during handling receiving, receiverId: {}",
receiverId, e);
+ LOGGER.warn(
+ "Pipe air gap receiver {}: Exception during handling receiving.
Socket: {}",
+ receiverId,
+ socket,
+ e);
fail();
}
}
@@ -169,7 +190,17 @@ public class IoTDBAirGapReceiver extends WrappedRunnable {
try {
final CRC32 crc32 = new CRC32();
crc32.update(bytes, LONG_LEN, bytes.length - LONG_LEN);
- return BytesUtils.bytesToLong(BytesUtils.subBytes(bytes, 0, LONG_LEN))
== crc32.getValue();
+
+ final long expectedChecksum =
BytesUtils.bytesToLong(BytesUtils.subBytes(bytes, 0, LONG_LEN));
+ final long actualChecksum = crc32.getValue();
+ if (expectedChecksum != actualChecksum) {
+ LOGGER.warn(
+ "Pipe air gap receiver {}: checksum failed, expected: {}, actual:
{}",
+ receiverId,
+ expectedChecksum,
+ actualChecksum);
+ }
+ return expectedChecksum == actualChecksum;
} catch (final Exception e) {
// ArrayIndexOutOfBoundsException when bytes.length < LONG_LEN
return false;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiverAgent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiverAgent.java
index 3097b0fcac8..644bda1835c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiverAgent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiverAgent.java
@@ -52,7 +52,12 @@ public class IoTDBAirGapReceiverAgent implements IService {
public void listen() {
try {
final Socket socket = serverSocket.accept();
- new Thread(new IoTDBAirGapReceiver(socket,
receiverId.incrementAndGet())).start();
+ final long airGapReceiverId = receiverId.incrementAndGet();
+ final Thread airGapReceiverThread =
+ new Thread(new IoTDBAirGapReceiver(socket, airGapReceiverId));
+ airGapReceiverThread.setName(
+ ThreadName.PIPE_AIR_GAP_RECEIVER.getName() + "-" + airGapReceiverId);
+ airGapReceiverThread.start();
} catch (final IOException e) {
LOGGER.warn("Unhandled exception during pipe air gap receiver
listening", e);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtask.java
index 9b7587381d9..bf1155b4b58 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtask.java
@@ -93,13 +93,4 @@ public class SubscriptionConnectorSubtask extends
PipeConnectorSubtask {
return SubscriptionAgent.broker().executePrefetch(consumerGroupId,
topicName);
}
-
- //////////////////////////// APIs provided for metric framework
////////////////////////////
-
- @Override
- public int getEventCount(final String pipeName) {
- // count the number of pipe events in sink queue and prefetching queue,
note that can safely
- // ignore lastEvent
- return SubscriptionAgent.broker().getPipeEventCount(consumerGroupId,
topicName);
- }
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
index fee09496730..ec4a1a7c2cd 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
@@ -144,6 +144,7 @@ public enum ThreadName {
"Pipe-Runtime-Periodical-Phantom-Reference-Cleaner"),
PIPE_ASYNC_CONNECTOR_CLIENT_POOL("Pipe-Async-Connector-Client-Pool"),
PIPE_RECEIVER_AIR_GAP_AGENT("Pipe-Receiver-Air-Gap-Agent"),
+ PIPE_AIR_GAP_RECEIVER("Pipe-Air-Gap-Receiver"),
SUBSCRIPTION_EXECUTOR_POOL("Subscription-Executor-Pool"),
SUBSCRIPTION_RUNTIME_META_SYNCER("Subscription-Runtime-Meta-Syncer"),
WINDOW_EVALUATION_SERVICE("WindowEvaluationTaskPoolManager"),
@@ -295,9 +296,12 @@ public enum ThreadName {
PIPE_RUNTIME_HEARTBEAT,
PIPE_RUNTIME_PROCEDURE_SUBMITTER,
PIPE_RUNTIME_PERIODICAL_JOB_EXECUTOR,
+ PIPE_RUNTIME_PERIODICAL_PHANTOM_REFERENCE_CLEANER,
PIPE_ASYNC_CONNECTOR_CLIENT_POOL,
PIPE_RECEIVER_AIR_GAP_AGENT,
+ PIPE_AIR_GAP_RECEIVER,
SUBSCRIPTION_EXECUTOR_POOL,
+ SUBSCRIPTION_RUNTIME_META_SYNCER,
WINDOW_EVALUATION_SERVICE,
STATEFUL_TRIGGER_INFORMATION_UPDATER));
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeConnectorCriticalException.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeConnectorCriticalException.java
index d33cfc16514..f5e2d22f158 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeConnectorCriticalException.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeConnectorCriticalException.java
@@ -31,16 +31,16 @@ import java.util.Objects;
public class PipeRuntimeConnectorCriticalException extends
PipeRuntimeCriticalException {
- public PipeRuntimeConnectorCriticalException(String message) {
+ public PipeRuntimeConnectorCriticalException(final String message) {
super(message);
}
- public PipeRuntimeConnectorCriticalException(String message, long timeStamp)
{
+ public PipeRuntimeConnectorCriticalException(final String message, final
long timeStamp) {
super(message, timeStamp);
}
@Override
- public boolean equals(Object obj) {
+ public boolean equals(final Object obj) {
return obj instanceof PipeRuntimeConnectorCriticalException
&& Objects.equals(getMessage(),
((PipeRuntimeConnectorCriticalException) obj).getMessage())
&& Objects.equals(getTimeStamp(), ((PipeRuntimeException)
obj).getTimeStamp());
@@ -52,21 +52,21 @@ public class PipeRuntimeConnectorCriticalException extends
PipeRuntimeCriticalEx
}
@Override
- public void serialize(ByteBuffer byteBuffer) {
+ public void serialize(final ByteBuffer byteBuffer) {
PipeRuntimeExceptionType.CONNECTOR_CRITICAL_EXCEPTION.serialize(byteBuffer);
ReadWriteIOUtils.write(getMessage(), byteBuffer);
ReadWriteIOUtils.write(getTimeStamp(), byteBuffer);
}
@Override
- public void serialize(OutputStream stream) throws IOException {
+ public void serialize(final OutputStream stream) throws IOException {
PipeRuntimeExceptionType.CONNECTOR_CRITICAL_EXCEPTION.serialize(stream);
ReadWriteIOUtils.write(getMessage(), stream);
ReadWriteIOUtils.write(getTimeStamp(), stream);
}
public static PipeRuntimeConnectorCriticalException deserializeFrom(
- PipeRuntimeMetaVersion version, ByteBuffer byteBuffer) {
+ final PipeRuntimeMetaVersion version, final ByteBuffer byteBuffer) {
final String message = ReadWriteIOUtils.readString(byteBuffer);
switch (version) {
case VERSION_1:
@@ -80,7 +80,7 @@ public class PipeRuntimeConnectorCriticalException extends
PipeRuntimeCriticalEx
}
public static PipeRuntimeConnectorCriticalException deserializeFrom(
- PipeRuntimeMetaVersion version, InputStream stream) throws IOException {
+ final PipeRuntimeMetaVersion version, final InputStream stream) throws
IOException {
final String message = ReadWriteIOUtils.readString(stream);
switch (version) {
case VERSION_1:
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeConnectorRetryTimesConfigurableException.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeConnectorRetryTimesConfigurableException.java
index d5c70b9eb90..42244ea7d4b 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeConnectorRetryTimesConfigurableException.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeConnectorRetryTimesConfigurableException.java
@@ -24,7 +24,8 @@ public class
PipeRuntimeConnectorRetryTimesConfigurableException
private final int retryTimes;
- public PipeRuntimeConnectorRetryTimesConfigurableException(String message,
int retryTimes) {
+ public PipeRuntimeConnectorRetryTimesConfigurableException(
+ final String message, final int retryTimes) {
super(message);
this.retryTimes = retryTimes;
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeCriticalException.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeCriticalException.java
index aa725d94f74..c6db38e23fa 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeCriticalException.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeCriticalException.java
@@ -31,37 +31,37 @@ import java.util.Objects;
public class PipeRuntimeCriticalException extends PipeRuntimeException {
- public PipeRuntimeCriticalException(String message) {
+ public PipeRuntimeCriticalException(final String message) {
super(message);
}
- public PipeRuntimeCriticalException(String message, long timeStamp) {
+ public PipeRuntimeCriticalException(final String message, final long
timeStamp) {
super(message, timeStamp);
}
@Override
- public boolean equals(Object obj) {
+ public boolean equals(final Object obj) {
return obj instanceof PipeRuntimeCriticalException
&& Objects.equals(getMessage(), ((PipeRuntimeCriticalException)
obj).getMessage())
&& Objects.equals(getTimeStamp(), ((PipeRuntimeException)
obj).getTimeStamp());
}
@Override
- public void serialize(ByteBuffer byteBuffer) {
+ public void serialize(final ByteBuffer byteBuffer) {
PipeRuntimeExceptionType.CRITICAL_EXCEPTION.serialize(byteBuffer);
ReadWriteIOUtils.write(getMessage(), byteBuffer);
ReadWriteIOUtils.write(getTimeStamp(), byteBuffer);
}
@Override
- public void serialize(OutputStream stream) throws IOException {
+ public void serialize(final OutputStream stream) throws IOException {
PipeRuntimeExceptionType.CRITICAL_EXCEPTION.serialize(stream);
ReadWriteIOUtils.write(getMessage(), stream);
ReadWriteIOUtils.write(getTimeStamp(), stream);
}
public static PipeRuntimeCriticalException deserializeFrom(
- PipeRuntimeMetaVersion version, ByteBuffer byteBuffer) {
+ final PipeRuntimeMetaVersion version, final ByteBuffer byteBuffer) {
final String message = ReadWriteIOUtils.readString(byteBuffer);
switch (version) {
case VERSION_1:
@@ -74,7 +74,7 @@ public class PipeRuntimeCriticalException extends
PipeRuntimeException {
}
public static PipeRuntimeCriticalException deserializeFrom(
- PipeRuntimeMetaVersion version, InputStream stream) throws IOException {
+ final PipeRuntimeMetaVersion version, final InputStream stream) throws
IOException {
final String message = ReadWriteIOUtils.readString(stream);
switch (version) {
case VERSION_1:
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeException.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeException.java
index 3bcc96b8e85..24fa4a1edda 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeException.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeException.java
@@ -28,16 +28,16 @@ import java.util.Objects;
public abstract class PipeRuntimeException extends PipeException {
- protected PipeRuntimeException(String message) {
+ protected PipeRuntimeException(final String message) {
super(message);
}
- protected PipeRuntimeException(String message, long timeStamp) {
+ protected PipeRuntimeException(final String message, final long timeStamp) {
super(message, timeStamp);
}
@Override
- public boolean equals(Object obj) {
+ public boolean equals(final Object obj) {
return obj instanceof PipeRuntimeException
&& Objects.equals(getMessage(), ((PipeRuntimeException)
obj).getMessage())
&& Objects.equals(getTimeStamp(), ((PipeRuntimeException)
obj).getTimeStamp());
@@ -48,7 +48,7 @@ public abstract class PipeRuntimeException extends
PipeException {
return Objects.hash(getMessage(), getTimeStamp());
}
- public abstract void serialize(ByteBuffer byteBuffer);
+ public abstract void serialize(final ByteBuffer byteBuffer);
- public abstract void serialize(OutputStream stream) throws IOException;
+ public abstract void serialize(final OutputStream stream) throws IOException;
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBAirGapConnector.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBAirGapConnector.java
index 8972fb424b3..a404b202cfd 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBAirGapConnector.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBAirGapConnector.java
@@ -40,6 +40,7 @@ import java.io.IOException;
import java.io.RandomAccessFile;
import java.net.InetSocketAddress;
import java.net.Socket;
+import java.net.SocketException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@@ -62,7 +63,7 @@ public abstract class IoTDBAirGapConnector extends
IoTDBConnector {
private final TEndPoint endPoint;
- public AirGapSocket(String ip, int port) {
+ public AirGapSocket(final String ip, final int port) {
this.endPoint = new TEndPoint(ip, port);
}
@@ -292,7 +293,8 @@ public abstract class IoTDBAirGapConnector extends
IoTDBConnector {
final String pipeName, final long creationTime, final AirGapSocket
socket, byte[] bytes)
throws IOException {
if (!socket.isConnected()) {
- return false;
+ throw new SocketException(
+ String.format("Socket %s is closed, will try to handshake", socket));
}
bytes = compressIfNeeded(bytes);
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
index 1ab1be21454..872c1bce436 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
@@ -288,16 +288,18 @@ public abstract class IoTDBFileReceiver implements IoTDBReceiver {
try {
updateWritingFileIfNeeded(req.getFileName(), isSingleFile);
+ // If the request is through air gap, the sender will resend the file piece from the beginning
+ // of the file. So the receiver should reset the offset of the writing file to the beginning
+ // of the file.
+ if (isRequestThroughAirGap && req.getStartWritingOffset() < writingFileWriter.length()) {
+ writingFileWriter.setLength(req.getStartWritingOffset());
+ }
+
if (!isWritingFileOffsetCorrect(req.getStartWritingOffset())) {
- if (isRequestThroughAirGap
- || !writingFile.getName().endsWith(TsFileConstant.TSFILE_SUFFIX)) {
- // 1. If the request is through air gap, the sender will resend the file piece from the
- // beginning of the file. So the receiver should reset the offset of the writing file to
- // the beginning of the file.
- // 2. If the file is a tsFile, then the content will not be changed for a specific
- // filename. However, for other files (mod, snapshot, etc.) the content varies for the
- // same name in different times, then we must rewrite the file to apply the newest
- // version.
+ if (!writingFile.getName().endsWith(TsFileConstant.TSFILE_SUFFIX)) {
+ // If the file is a tsFile, then the content will not be changed for a specific filename.
+ // However, for other files (mod, snapshot, etc.) the content varies for the same name in
+ // different times, then we must rewrite the file to apply the newest version.
writingFileWriter.setLength(0);
}
@@ -317,7 +319,7 @@ public abstract class IoTDBFileReceiver implements IoTDBReceiver {
writingFileWriter.write(req.getFilePiece());
return PipeTransferFilePieceResp.toTPipeTransferResp(
RpcUtils.SUCCESS_STATUS, writingFileWriter.length());
- } catch (Exception e) {
+ } catch (final Exception e) {
LOGGER.warn(
"Receiver id = {}: Failed to write file piece from req {}.",
receiverId.get(), req, e);
final TSStatus status =
@@ -377,8 +379,8 @@ public abstract class IoTDBFileReceiver implements IoTDBReceiver {
writingFile.getPath());
}
- private boolean isFileExistedAndNameCorrect(String fileName) {
- return writingFile != null && writingFile.getName().equals(fileName);
+ private boolean isFileExistedAndNameCorrect(final String fileName) {
+ return writingFile != null && writingFile.exists() && writingFile.getName().equals(fileName);
}
private void closeCurrentWritingFileWriter(final boolean fsyncAfterClose) {
@@ -393,7 +395,7 @@ public abstract class IoTDBFileReceiver implements IoTDBReceiver {
receiverId.get(),
writingFile == null ? "null" : writingFile.getPath(),
writingFile == null ? 0 : writingFile.length());
- } catch (Exception e) {
+ } catch (final Exception e) {
LOGGER.warn(
"Receiver id = {}: Failed to close current writing file writer {}, because {}.",
receiverId.get(),
@@ -414,6 +416,7 @@ public abstract class IoTDBFileReceiver implements IoTDBReceiver {
private void deleteCurrentWritingFile() {
if (writingFile != null) {
deleteFile(writingFile);
+ writingFile = null;
} else {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(
@@ -422,7 +425,7 @@ public abstract class IoTDBFileReceiver implements IoTDBReceiver {
}
}
- private void deleteFile(File file) {
+ private void deleteFile(final File file) {
if (file.exists()) {
try {
FileUtils.delete(file);
@@ -430,7 +433,7 @@ public abstract class IoTDBFileReceiver implements IoTDBReceiver {
"Receiver id = {}: Original writing file {} was deleted.",
receiverId.get(),
file.getPath());
- } catch (Exception e) {
+ } catch (final Exception e) {
LOGGER.warn(
"Receiver id = {}: Failed to delete original writing file {}, because {}.",
receiverId.get(),
@@ -512,7 +515,7 @@ public abstract class IoTDBFileReceiver implements IoTDBReceiver {
status.getMessage());
}
return new TPipeTransferResp(status);
- } catch (Exception e) {
+ } catch (final Exception e) {
LOGGER.warn(
"Receiver id = {}: Failed to seal file {} from req {}.",
receiverId.get(),
@@ -597,7 +600,7 @@ public abstract class IoTDBFileReceiver implements IoTDBReceiver {
status);
}
return new TPipeTransferResp(status);
- } catch (Exception e) {
+ } catch (final Exception e) {
LOGGER.warn(
"Receiver id = {}: Failed to seal file {} from req {}.",
receiverId.get(), files, req, e);
return new TPipeTransferResp(
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBReceiverAgent.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBReceiverAgent.java
index 97e582f2407..c09812f2003 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBReceiverAgent.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBReceiverAgent.java
@@ -104,7 +104,7 @@ public abstract class IoTDBReceiverAgent {
handleClientExit(null);
}
- public final void handleClientExit(String key) {
+ public final void handleClientExit(final String key) {
final IoTDBReceiver receiver = getReceiverWithSpecifiedClient(key);
if (receiver != null) {
receiver.handleExit();
@@ -116,14 +116,14 @@ public abstract class IoTDBReceiverAgent {
try {
FileUtils.deleteDirectory(receiverFileDir);
LOGGER.info("Clean pipe receiver dir {} successfully.", receiverFileDir);
- } catch (Exception e) {
+ } catch (final Exception e) {
LOGGER.warn("Clean pipe receiver dir {} failed.", receiverFileDir, e);
}
try {
FileUtils.forceMkdir(receiverFileDir);
LOGGER.info("Create pipe receiver dir {} successfully.", receiverFileDir);
- } catch (IOException e) {
+ } catch (final IOException e) {
LOGGER.warn("Create pipe receiver dir {} failed.", receiverFileDir, e);
}
}