This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch hybrid-connot-unpin in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 6dd47bdd465144acdb4451ef3c7b4853f7ae49b9 Author: Steve Yurong Su <[email protected]> AuthorDate: Fri Oct 13 13:26:15 2023 +0800 refactor --- ...oTDBThriftAsyncPipeTransferBatchReqBuilder.java | 7 +- .../thrift/async/IoTDBThriftAsyncConnector.java | 5 +- .../PipeRealtimeDataRegionHybridExtractor.java | 112 ++++++++++++++------- .../pipe/extractor/realtime/epoch/TsFileEpoch.java | 3 +- .../realtime/epoch/TsFileEpochManager.java | 4 +- .../db/pipe/resource/wal/PipeWALResource.java | 2 +- .../apache/iotdb/commons/conf/CommonConfig.java | 4 +- 7 files changed, 95 insertions(+), 42 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/IoTDBThriftAsyncPipeTransferBatchReqBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/IoTDBThriftAsyncPipeTransferBatchReqBuilder.java index 2993b88da14..9de0dc84678 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/IoTDBThriftAsyncPipeTransferBatchReqBuilder.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/IoTDBThriftAsyncPipeTransferBatchReqBuilder.java @@ -48,7 +48,8 @@ public class IoTDBThriftAsyncPipeTransferBatchReqBuilder extends PipeTransferBat throws IOException, WALPipeException { final TPipeTransferReq req = buildTabletInsertionReq(event); - if (events.isEmpty() || !events.get(events.size() - 1).equals(event)) { + if (requestCommitIds.isEmpty() + || !requestCommitIds.get(requestCommitIds.size() - 1).equals(requestCommitId)) { reqs.add(req); if (event instanceof EnrichedEvent) { @@ -86,4 +87,8 @@ public class IoTDBThriftAsyncPipeTransferBatchReqBuilder extends PipeTransferBat public List<Long> deepcopyRequestCommitIds() { return new ArrayList<>(requestCommitIds); } + + public long getLastCommitId() { + return requestCommitIds.isEmpty() ? 
-1 : requestCommitIds.get(requestCommitIds.size() - 1); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java index 42cdd8e7818..84b8865604c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java @@ -487,7 +487,10 @@ public class IoTDBThriftAsyncConnector extends IoTDBConnector { return; } - final long requestCommitId = commitIdGenerator.incrementAndGet(); + // requestCommitId can not be generated by commitIdGenerator because the commit id must + // be bound to a specific InsertTabletEvent or TsFileInsertionEvent, otherwise the commit + // process will be stuck. + final long requestCommitId = tabletBatchBuilder.getLastCommitId(); final PipeTransferTabletBatchEventHandler pipeTransferTabletBatchEventHandler = new PipeTransferTabletBatchEventHandler(tabletBatchBuilder, this); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java index a452577858f..257479a1d62 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java @@ -86,7 +86,21 @@ public class PipeRealtimeDataRegionHybridExtractor extends PipeRealtimeDataRegio // size of wal buffer), the write operation will be throttled, so we should not extract any // more tablet events. // 3. 
The number of tsfile events in the pending queue has exceeded the limit. - event.getTsFileEpoch().migrateState(this, state -> TsFileEpoch.State.USING_TSFILE); + event + .getTsFileEpoch() + .migrateState( + this, + state -> { + switch (state) { + case EMPTY: + case USING_TSFILE: + return TsFileEpoch.State.USING_TSFILE; + case USING_TABLET: + case USING_BOTH: + default: + return TsFileEpoch.State.USING_BOTH; + } + }); } final TsFileEpoch.State state = event.getTsFileEpoch().getState(this); @@ -97,6 +111,7 @@ public class PipeRealtimeDataRegionHybridExtractor extends PipeRealtimeDataRegio break; case EMPTY: case USING_TABLET: + case USING_BOTH: if (!pendingQueue.waitedOffer(event)) { // this would not happen, but just in case. // pendingQueue is unbounded, so it should never reach capacity. @@ -127,13 +142,24 @@ public class PipeRealtimeDataRegionHybridExtractor extends PipeRealtimeDataRegio .getTsFileEpoch() .migrateState( this, - state -> - state.equals(TsFileEpoch.State.EMPTY) ? TsFileEpoch.State.USING_TSFILE : state); + state -> { + switch (state) { + case EMPTY: + case USING_TSFILE: + return TsFileEpoch.State.USING_TSFILE; + case USING_TABLET: + return TsFileEpoch.State.USING_TABLET; + case USING_BOTH: + default: + return TsFileEpoch.State.USING_BOTH; + } + }); final TsFileEpoch.State state = event.getTsFileEpoch().getState(this); switch (state) { case EMPTY: case USING_TSFILE: + case USING_BOTH: if (!pendingQueue.waitedOffer(event)) { // this would not happen, but just in case. // pendingQueue is unbounded, so it should never reach capacity. @@ -265,23 +291,29 @@ public class PipeRealtimeDataRegionHybridExtractor extends PipeRealtimeDataRegio state -> (state.equals(TsFileEpoch.State.EMPTY)) ? 
TsFileEpoch.State.USING_TABLET : state); - if (event.getTsFileEpoch().getState(this).equals(TsFileEpoch.State.USING_TABLET)) { - if (event.increaseReferenceCount(PipeRealtimeDataRegionHybridExtractor.class.getName())) { - return event.getEvent(); - } else { - // if the event's reference count can not be increased, it means the data represented by - // this event is not reliable anymore. but the data represented by this event - // has been carried by the following tsfile event, so we can just discard this event. - event.getTsFileEpoch().migrateState(this, state -> TsFileEpoch.State.USING_TSFILE); - LOGGER.warn( - "Discard tablet event {} because it is not reliable anymore. " - + "Change the state of TsFileEpoch to USING_TSFILE.", - event); + final TsFileEpoch.State state = event.getTsFileEpoch().getState(this); + switch (state) { + case USING_TSFILE: + // if the state is USING_TSFILE, discard the event and poll the next one. return null; - } + case EMPTY: + case USING_TABLET: + case USING_BOTH: + default: + if (event.increaseReferenceCount(PipeRealtimeDataRegionHybridExtractor.class.getName())) { + return event.getEvent(); + } else { + // if the event's reference count can not be increased, it means the data represented by + // this event is not reliable anymore. but the data represented by this event + // has been carried by the following tsfile event, so we can just discard this event. + event.getTsFileEpoch().migrateState(this, s -> TsFileEpoch.State.USING_BOTH); + LOGGER.warn( + "Discard tablet event {} because it is not reliable anymore. " + + "Change the state of TsFileEpoch to USING_TSFILE.", + event); + return null; + } } - // if the state is USING_TSFILE, discard the event and poll the next one. 
- return null; } private Event supplyTsFileInsertion(PipeRealtimeEvent event) { @@ -299,26 +331,34 @@ public class PipeRealtimeDataRegionHybridExtractor extends PipeRealtimeDataRegio return state; }); - if (event.getTsFileEpoch().getState(this).equals(TsFileEpoch.State.USING_TSFILE)) { - if (event.increaseReferenceCount(PipeRealtimeDataRegionHybridExtractor.class.getName())) { - return event.getEvent(); - } else { - // if the event's reference count can not be increased, it means the data represented by - // this event is not reliable anymore. the data has been lost. we simply discard this event - // and report the exception to PipeRuntimeAgent. - final String errorMessage = - String.format( - "TsFile Event %s can not be supplied because " - + "the reference count can not be increased, " - + "the data represented by this event is lost", - event.getEvent()); - LOGGER.error(errorMessage); - PipeAgent.runtime().report(pipeTaskMeta, new PipeRuntimeNonCriticalException(errorMessage)); + final TsFileEpoch.State state = event.getTsFileEpoch().getState(this); + switch (state) { + case USING_TABLET: + // if the state is USING_TABLET, discard the event and poll the next one. return null; - } + case EMPTY: + case USING_TSFILE: + case USING_BOTH: + default: + if (event.increaseReferenceCount(PipeRealtimeDataRegionHybridExtractor.class.getName())) { + return event.getEvent(); + } else { + // if the event's reference count can not be increased, it means the data represented by + // this event is not reliable anymore. the data has been lost. we simply discard this + // event + // and report the exception to PipeRuntimeAgent. 
+ final String errorMessage = + String.format( + "TsFile Event %s can not be supplied because " + + "the reference count can not be increased, " + + "the data represented by this event is lost", + event.getEvent()); + LOGGER.error(errorMessage); + PipeAgent.runtime() + .report(pipeTaskMeta, new PipeRuntimeNonCriticalException(errorMessage)); + return null; + } } - // if the state is USING_TABLET, discard the event and poll the next one. - return null; } private Event supplyHeartbeat(PipeRealtimeEvent event) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/epoch/TsFileEpoch.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/epoch/TsFileEpoch.java index 55a500bb1ed..f57408a9058 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/epoch/TsFileEpoch.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/epoch/TsFileEpoch.java @@ -63,6 +63,7 @@ public class TsFileEpoch { public enum State { EMPTY, USING_TABLET, - USING_TSFILE + USING_TSFILE, + USING_BOTH } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/epoch/TsFileEpochManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/epoch/TsFileEpochManager.java index cfa57760985..5b92828c00c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/epoch/TsFileEpochManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/epoch/TsFileEpochManager.java @@ -53,9 +53,11 @@ public class TsFileEpochManager { return new TsFileEpoch(path); }); + final TsFileEpoch epoch = filePath2Epoch.remove(filePath); + LOGGER.info("All data in TsFileEpoch {} was extracted", epoch); return new PipeRealtimeEvent( event, - filePath2Epoch.remove(filePath), + epoch, resource.getDevices().stream() .collect(Collectors.toMap(device -> device, device -> 
EMPTY_MEASUREMENT_ARRAY)), event.getPattern()); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResource.java index 358e319dafa..951ad738460 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResource.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResource.java @@ -40,7 +40,7 @@ public abstract class PipeWALResource implements Closeable { private final AtomicInteger referenceCount; - public static final long MIN_TIME_TO_LIVE_IN_MS = 1000L * 60; + public static final long MIN_TIME_TO_LIVE_IN_MS = 1000L * 20; private final AtomicLong lastLogicalPinTime; private final AtomicBoolean isPhysicallyPinned; diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java index ccd3e8e8c90..e6bf01dff7f 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java @@ -170,7 +170,9 @@ public class CommonConfig { private long pipeConnectorTimeoutMs = 15 * 60 * 1000L; // 15 minutes private int pipeConnectorReadFileBufferSize = 8388608; private long pipeConnectorRetryIntervalMs = 1000L; - private int pipeConnectorPendingQueueSize = 16; + // it is recommended to set this value to 3 * pipeSubtaskExecutorMaxThreadNum * + // pipeAsyncConnectorCoreClientNumber + private int pipeConnectorPendingQueueSize = 256; private boolean pipeConnectorRPCThriftCompressionEnabled = false; private int pipeAsyncConnectorSelectorNumber = 1;
