This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch rc/1.3.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rc/1.3.3 by this push:
     new 5c2c080bc23 Active Load: Add scan filtering judgment & Fix failed directory without hot reload & Log downgrade & Remove rounds for scheduled tasks (#13310) (#13317)
5c2c080bc23 is described below

commit 5c2c080bc23e9db02d37e734d3cfe10b7e9cacaf
Author: YC27 <[email protected]>
AuthorDate: Tue Aug 27 19:49:30 2024 +0800

    Active Load: Add scan filtering judgment & Fix failed directory without hot reload & Log downgrade & Remove rounds for scheduled tasks (#13310) (#13317)
---
 .../load/active/ActiveLoadDirScanner.java          |  3 +-
 .../load/active/ActiveLoadPendingQueue.java        |  4 +++
 .../active/ActiveLoadScheduledExecutorService.java |  9 +-----
 .../load/active/ActiveLoadTsFileLoader.java        | 33 +++++++++++-----------
 4 files changed, 24 insertions(+), 25 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadDirScanner.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadDirScanner.java
index 8c2a2787529..22375261e13 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadDirScanner.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadDirScanner.java
@@ -85,6 +85,7 @@ public class ActiveLoadDirScanner extends 
ActiveLoadScheduledExecutorService {
                   (file.getName().endsWith(RESOURCE) || 
file.getName().endsWith(MODS))
                       ? getTsFilePath(file.getAbsolutePath())
                       : file.getAbsolutePath())
+          .filter(file -> !activeLoadTsFileLoader.isFilePendingOrLoading(file))
           .filter(this::isTsFileCompleted)
           .limit(currentAllowedPendingSize)
           .forEach(file -> activeLoadTsFileLoader.tryTriggerTsFileLoad(file, 
isGeneratedByPipe));
@@ -161,7 +162,7 @@ public class ActiveLoadDirScanner extends 
ActiveLoadScheduledExecutorService {
       }
       
ActiveLoadingFilesMetricsSet.getInstance().recordPendingFileCounter(fileCount[0]);
     } catch (final IOException e) {
-      LOGGER.warn("Failed to count active listening dirs file number.", e);
+      LOGGER.debug("Failed to count active listening dirs file number.", e);
     }
     return fileCount[0];
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadPendingQueue.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadPendingQueue.java
index 667266feb5a..f04d846c6ca 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadPendingQueue.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadPendingQueue.java
@@ -63,6 +63,10 @@ public class ActiveLoadPendingQueue {
     ActiveLoadingFilesMetricsSet.getInstance().recordLoadingFileCounter(-1);
   }
 
+  public synchronized boolean isFilePendingOrLoading(final String file) {
+    return loadingFileSet.contains(file) || pendingFileSet.contains(file);
+  }
+
   public int size() {
     return pendingFileQueue.size() + loadingFileSet.size();
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadScheduledExecutorService.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadScheduledExecutorService.java
index c7b62b9bab7..610918e5a13 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadScheduledExecutorService.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadScheduledExecutorService.java
@@ -47,7 +47,6 @@ public abstract class ActiveLoadScheduledExecutorService {
       IOTDB_CONFIG.getLoadActiveListeningCheckIntervalSeconds();
   private final ScheduledExecutorService scheduledExecutorService;
   private Future<?> future;
-  private long rounds;
 
   private final List<Pair<WrappedRunnable, Long>> jobs = new 
CopyOnWriteArrayList<>();
 
@@ -74,8 +73,6 @@ public abstract class ActiveLoadScheduledExecutorService {
 
   public synchronized void start() {
     if (future == null) {
-      rounds = 0;
-
       future =
           ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
               scheduledExecutorService,
@@ -88,12 +85,8 @@ public abstract class ActiveLoadScheduledExecutorService {
   }
 
   private void execute() {
-    ++rounds;
-
     for (final Pair<WrappedRunnable, Long> periodicalJob : jobs) {
-      if (rounds % periodicalJob.right == 0) {
-        periodicalJob.left.run();
-      }
+      periodicalJob.left.run();
     }
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java
index b130e58e2f1..e237dc2077f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.concurrent.IoTThreadFactory;
 import org.apache.iotdb.commons.concurrent.ThreadName;
 import 
org.apache.iotdb.commons.concurrent.threadpool.WrappedThreadPoolExecutor;
 import org.apache.iotdb.db.auth.AuthorityChecker;
+import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.protocol.session.SessionManager;
 import org.apache.iotdb.db.queryengine.common.SessionInfo;
@@ -61,6 +62,8 @@ public class ActiveLoadTsFileLoader {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(ActiveLoadTsFileLoader.class);
 
+  private static final IoTDBConfig IOTDB_CONFIG = 
IoTDBDescriptor.getInstance().getConfig();
+
   private static final int MAX_PENDING_SIZE = 1000;
   private final ActiveLoadPendingQueue pendingQueue = new 
ActiveLoadPendingQueue();
 
@@ -80,20 +83,19 @@ public class ActiveLoadTsFileLoader {
   }
 
   private void initFailDirIfNecessary() {
-    if (failDir.get() == null) {
+    if (!Objects.equals(failDir.get(), 
IOTDB_CONFIG.getLoadActiveListeningFailDir())) {
       synchronized (failDir) {
-        if (failDir.get() == null) {
-          final File failDirFile =
-              new 
File(IoTDBDescriptor.getInstance().getConfig().getLoadActiveListeningFailDir());
+        if (!Objects.equals(failDir.get(), 
IOTDB_CONFIG.getLoadActiveListeningFailDir())) {
+          final File failDirFile = new 
File(IOTDB_CONFIG.getLoadActiveListeningFailDir());
           try {
             FileUtils.forceMkdir(failDirFile);
           } catch (final IOException e) {
             LOGGER.warn(
                 "Error occurred during creating fail directory {} for active 
load.",
-                failDirFile,
+                failDirFile.getAbsoluteFile(),
                 e);
           }
-          
failDir.set(IoTDBDescriptor.getInstance().getConfig().getLoadActiveListeningFailDir());
+          failDir.set(IOTDB_CONFIG.getLoadActiveListeningFailDir());
         }
       }
     }
@@ -105,8 +107,8 @@ public class ActiveLoadTsFileLoader {
         if (activeLoadExecutor.get() == null) {
           activeLoadExecutor.set(
               new WrappedThreadPoolExecutor(
-                  
IoTDBDescriptor.getInstance().getConfig().getLoadActiveListeningMaxThreadNum(),
-                  
IoTDBDescriptor.getInstance().getConfig().getLoadActiveListeningMaxThreadNum(),
+                  IOTDB_CONFIG.getLoadActiveListeningMaxThreadNum(),
+                  IOTDB_CONFIG.getLoadActiveListeningMaxThreadNum(),
                   0L,
                   TimeUnit.SECONDS,
                   new LinkedBlockingQueue<>(),
@@ -117,9 +119,7 @@ public class ActiveLoadTsFileLoader {
     }
 
     final int targetCorePoolSize =
-        Math.min(
-            pendingQueue.size(),
-            
IoTDBDescriptor.getInstance().getConfig().getLoadActiveListeningMaxThreadNum());
+        Math.min(pendingQueue.size(), 
IOTDB_CONFIG.getLoadActiveListeningMaxThreadNum());
 
     if (activeLoadExecutor.get().getCorePoolSize() != targetCorePoolSize) {
       activeLoadExecutor.get().setCorePoolSize(targetCorePoolSize);
@@ -163,10 +163,7 @@ public class ActiveLoadTsFileLoader {
 
   private Optional<Pair<String, Boolean>> tryGetNextPendingFile() {
     final long maxRetryTimes =
-        Math.max(
-            1,
-            
IoTDBDescriptor.getInstance().getConfig().getLoadActiveListeningCheckIntervalSeconds()
-                << 1);
+        Math.max(1, IOTDB_CONFIG.getLoadActiveListeningCheckIntervalSeconds() 
<< 1);
     long currentRetryTimes = 0;
 
     while (true) {
@@ -200,7 +197,7 @@ public class ActiveLoadTsFileLoader {
             "",
             ClusterPartitionFetcher.getInstance(),
             ClusterSchemaFetcher.getInstance(),
-            
IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold())
+            IOTDB_CONFIG.getQueryTimeoutThreshold())
         .status;
   }
 
@@ -258,6 +255,10 @@ public class ActiveLoadTsFileLoader {
     }
   }
 
+  public boolean isFilePendingOrLoading(final String filePath) {
+    return pendingQueue.isFilePendingOrLoading(filePath);
+  }
+
   // Metrics
   public long countAndReportFailedFileNumber() {
     final long[] fileCount = {0};

Reply via email to