This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch rc/2.0.4 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 2937a005a0b288914f5e078c99841e0c77979bba
Author: VGalaxies <[email protected]>
AuthorDate: Wed May 7 11:26:22 2025 +0800

    Subscription: detect outdated subscription event for tsfile message (#15430)

    (cherry picked from commit 9997004babb4bba11ef0e6b0ddd38c287a3800d9)
---
 .../base/AbstractSubscriptionConsumer.java | 22 ++++++++++------------
 1 file changed, 10 insertions(+), 12 deletions(-)

diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumer.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumer.java
index 83ff755ddfb..88fab21ef4f 100644
--- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumer.java
+++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumer.java
@@ -723,7 +723,7 @@ abstract class AbstractSubscriptionConsumer implements AutoCloseable {
     final Path filePath = getFilePath(commitContext, topicName, fileName, true, true);
     final File file = filePath.toFile();
     try (final RandomAccessFile fileWriter = new RandomAccessFile(file, "rw")) {
-      return Optional.of(pollFileInternal(commitContext, fileName, file, fileWriter, timer));
+      return pollFileInternal(commitContext, fileName, file, fileWriter, timer);
     } catch (final Exception e) {
       if (!(e instanceof SubscriptionPollTimeoutException)) {
         inFlightFilesCommitContextSet.remove(commitContext);
@@ -736,7 +736,7 @@ abstract class AbstractSubscriptionConsumer implements AutoCloseable {
     }
   }
 
-  private SubscriptionMessage pollFileInternal(
+  private Optional<SubscriptionMessage> pollFileInternal(
       final SubscriptionCommitContext commitContext,
       final String rawFileName,
       final File file,
@@ -769,13 +769,10 @@ abstract class AbstractSubscriptionConsumer implements AutoCloseable {
       final List<SubscriptionPollResponse> responses =
           pollFileInternal(commitContext, writingOffset, timer.remainingMs());
 
-      // It's agreed that the server will always return at least one response, even in case of
-      // failure.
+      // If responses is empty, it means that some outdated subscription events may be being polled,
+      // so just return.
       if (responses.isEmpty()) {
-        final String errorMessage =
-            String.format("SubscriptionConsumer %s poll empty response", this);
-        LOGGER.warn(errorMessage);
-        throw new SubscriptionRuntimeNonCriticalException(errorMessage);
+        return Optional.empty();
       }
 
       // only one SubscriptionEvent polled currently
@@ -884,10 +881,11 @@ abstract class AbstractSubscriptionConsumer implements AutoCloseable {
 
             // generate subscription message
             inFlightFilesCommitContextSet.remove(commitContext);
-            return new SubscriptionMessage(
-                commitContext,
-                file.getAbsolutePath(),
-                ((FileSealPayload) payload).getDatabaseName());
+            return Optional.of(
+                new SubscriptionMessage(
+                    commitContext,
+                    file.getAbsolutePath(),
+                    ((FileSealPayload) payload).getDatabaseName()));
           }
         case ERROR:
           {
