This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 2cade679777 Active Load: Add scan filtering judgment & Fix failed directory without hot reload & Log downgrade & Remove rounds for scheduled tasks
2cade679777 is described below
commit 2cade679777d44845ea04841635ee479bb3f1afc
Author: YC27 <[email protected]>
AuthorDate: Tue Aug 27 17:42:47 2024 +0800
Active Load: Add scan filtering judgment & Fix failed directory without hot reload & Log downgrade & Remove rounds for scheduled tasks
---
.../load/active/ActiveLoadDirScanner.java | 3 +-
.../load/active/ActiveLoadPendingQueue.java | 4 +++
.../active/ActiveLoadScheduledExecutorService.java | 9 +-----
.../load/active/ActiveLoadTsFileLoader.java | 33 +++++++++++-----------
4 files changed, 24 insertions(+), 25 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadDirScanner.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadDirScanner.java
index 8c2a2787529..22375261e13 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadDirScanner.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadDirScanner.java
@@ -85,6 +85,7 @@ public class ActiveLoadDirScanner extends
ActiveLoadScheduledExecutorService {
(file.getName().endsWith(RESOURCE) ||
file.getName().endsWith(MODS))
? getTsFilePath(file.getAbsolutePath())
: file.getAbsolutePath())
+ .filter(file -> !activeLoadTsFileLoader.isFilePendingOrLoading(file))
.filter(this::isTsFileCompleted)
.limit(currentAllowedPendingSize)
.forEach(file -> activeLoadTsFileLoader.tryTriggerTsFileLoad(file,
isGeneratedByPipe));
@@ -161,7 +162,7 @@ public class ActiveLoadDirScanner extends
ActiveLoadScheduledExecutorService {
}
ActiveLoadingFilesMetricsSet.getInstance().recordPendingFileCounter(fileCount[0]);
} catch (final IOException e) {
- LOGGER.warn("Failed to count active listening dirs file number.", e);
+ LOGGER.debug("Failed to count active listening dirs file number.", e);
}
return fileCount[0];
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadPendingQueue.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadPendingQueue.java
index 667266feb5a..f04d846c6ca 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadPendingQueue.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadPendingQueue.java
@@ -63,6 +63,10 @@ public class ActiveLoadPendingQueue {
ActiveLoadingFilesMetricsSet.getInstance().recordLoadingFileCounter(-1);
}
+ public synchronized boolean isFilePendingOrLoading(final String file) {
+ return loadingFileSet.contains(file) || pendingFileSet.contains(file);
+ }
+
public int size() {
return pendingFileQueue.size() + loadingFileSet.size();
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadScheduledExecutorService.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadScheduledExecutorService.java
index c7b62b9bab7..610918e5a13 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadScheduledExecutorService.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadScheduledExecutorService.java
@@ -47,7 +47,6 @@ public abstract class ActiveLoadScheduledExecutorService {
IOTDB_CONFIG.getLoadActiveListeningCheckIntervalSeconds();
private final ScheduledExecutorService scheduledExecutorService;
private Future<?> future;
- private long rounds;
private final List<Pair<WrappedRunnable, Long>> jobs = new
CopyOnWriteArrayList<>();
@@ -74,8 +73,6 @@ public abstract class ActiveLoadScheduledExecutorService {
public synchronized void start() {
if (future == null) {
- rounds = 0;
-
future =
ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
scheduledExecutorService,
@@ -88,12 +85,8 @@ public abstract class ActiveLoadScheduledExecutorService {
}
private void execute() {
- ++rounds;
-
for (final Pair<WrappedRunnable, Long> periodicalJob : jobs) {
- if (rounds % periodicalJob.right == 0) {
- periodicalJob.left.run();
- }
+ periodicalJob.left.run();
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java
index b130e58e2f1..e237dc2077f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.concurrent.IoTThreadFactory;
import org.apache.iotdb.commons.concurrent.ThreadName;
import
org.apache.iotdb.commons.concurrent.threadpool.WrappedThreadPoolExecutor;
import org.apache.iotdb.db.auth.AuthorityChecker;
+import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.protocol.session.SessionManager;
import org.apache.iotdb.db.queryengine.common.SessionInfo;
@@ -61,6 +62,8 @@ public class ActiveLoadTsFileLoader {
private static final Logger LOGGER =
LoggerFactory.getLogger(ActiveLoadTsFileLoader.class);
+ private static final IoTDBConfig IOTDB_CONFIG =
IoTDBDescriptor.getInstance().getConfig();
+
private static final int MAX_PENDING_SIZE = 1000;
private final ActiveLoadPendingQueue pendingQueue = new
ActiveLoadPendingQueue();
@@ -80,20 +83,19 @@ public class ActiveLoadTsFileLoader {
}
private void initFailDirIfNecessary() {
- if (failDir.get() == null) {
+ if (!Objects.equals(failDir.get(),
IOTDB_CONFIG.getLoadActiveListeningFailDir())) {
synchronized (failDir) {
- if (failDir.get() == null) {
- final File failDirFile =
- new
File(IoTDBDescriptor.getInstance().getConfig().getLoadActiveListeningFailDir());
+ if (!Objects.equals(failDir.get(),
IOTDB_CONFIG.getLoadActiveListeningFailDir())) {
+ final File failDirFile = new
File(IOTDB_CONFIG.getLoadActiveListeningFailDir());
try {
FileUtils.forceMkdir(failDirFile);
} catch (final IOException e) {
LOGGER.warn(
"Error occurred during creating fail directory {} for active
load.",
- failDirFile,
+ failDirFile.getAbsoluteFile(),
e);
}
-
failDir.set(IoTDBDescriptor.getInstance().getConfig().getLoadActiveListeningFailDir());
+ failDir.set(IOTDB_CONFIG.getLoadActiveListeningFailDir());
}
}
}
@@ -105,8 +107,8 @@ public class ActiveLoadTsFileLoader {
if (activeLoadExecutor.get() == null) {
activeLoadExecutor.set(
new WrappedThreadPoolExecutor(
-
IoTDBDescriptor.getInstance().getConfig().getLoadActiveListeningMaxThreadNum(),
-
IoTDBDescriptor.getInstance().getConfig().getLoadActiveListeningMaxThreadNum(),
+ IOTDB_CONFIG.getLoadActiveListeningMaxThreadNum(),
+ IOTDB_CONFIG.getLoadActiveListeningMaxThreadNum(),
0L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(),
@@ -117,9 +119,7 @@ public class ActiveLoadTsFileLoader {
}
final int targetCorePoolSize =
- Math.min(
- pendingQueue.size(),
-
IoTDBDescriptor.getInstance().getConfig().getLoadActiveListeningMaxThreadNum());
+ Math.min(pendingQueue.size(),
IOTDB_CONFIG.getLoadActiveListeningMaxThreadNum());
if (activeLoadExecutor.get().getCorePoolSize() != targetCorePoolSize) {
activeLoadExecutor.get().setCorePoolSize(targetCorePoolSize);
@@ -163,10 +163,7 @@ public class ActiveLoadTsFileLoader {
private Optional<Pair<String, Boolean>> tryGetNextPendingFile() {
final long maxRetryTimes =
- Math.max(
- 1,
-
IoTDBDescriptor.getInstance().getConfig().getLoadActiveListeningCheckIntervalSeconds()
- << 1);
+ Math.max(1, IOTDB_CONFIG.getLoadActiveListeningCheckIntervalSeconds()
<< 1);
long currentRetryTimes = 0;
while (true) {
@@ -200,7 +197,7 @@ public class ActiveLoadTsFileLoader {
"",
ClusterPartitionFetcher.getInstance(),
ClusterSchemaFetcher.getInstance(),
-
IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold())
+ IOTDB_CONFIG.getQueryTimeoutThreshold())
.status;
}
@@ -258,6 +255,10 @@ public class ActiveLoadTsFileLoader {
}
}
+ public boolean isFilePendingOrLoading(final String filePath) {
+ return pendingQueue.isFilePendingOrLoading(filePath);
+ }
+
// Metrics
public long countAndReportFailedFileNumber() {
final long[] fileCount = {0};