This is an automated email from the ASF dual-hosted git repository.
shuwenwei pushed a commit to branch compation-log
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/compation-log by this push:
new 48832f9afb5 modify StorageEngine
48832f9afb5 is described below
commit 48832f9afb581ecd90a8219f6db18323db1a158d
Author: shuwenwei <[email protected]>
AuthorDate: Fri Jun 20 12:02:33 2025 +0800
modify StorageEngine
---
.../iotdb/db/storageengine/StorageEngine.java | 56 ++++++++++++----------
1 file changed, 30 insertions(+), 26 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
index 18d43931ba4..f25e04045aa 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
@@ -112,7 +112,11 @@ import java.util.stream.Stream;
import static org.apache.iotdb.commons.conf.IoTDBConstant.FILE_NAME_SEPARATOR;
public class StorageEngine implements IService {
- private static final Logger LOGGER =
LoggerFactory.getLogger(StorageEngine.class);
+ private static final Logger OTHER_LOGGER =
LoggerFactory.getLogger(StorageEngine.class);
+ private static final Logger WRITE_LOGGER =
LoggerFactory.getLogger(IoTDBConstant.WRITE_LOGGER_NAME);
+ private static final Logger READ_LOGGER =
LoggerFactory.getLogger(IoTDBConstant.QUERY_LOGGER_NAME);
+ private static final Logger COMPACTION_LOGGER =
LoggerFactory.getLogger(IoTDBConstant.COMPACTION_LOGGER_NAME);
+
private static final IoTDBConfig CONFIG =
IoTDBDescriptor.getInstance().getConfig();
private static final WritingMetrics WRITING_METRICS =
WritingMetrics.getInstance();
@@ -205,7 +209,7 @@ public class StorageEngine implements IService {
try {
WALRecoverManager.getInstance().recover();
} catch (WALException e) {
- LOGGER.error("Fail to recover wal.", e);
+ OTHER_LOGGER.error("Fail to recover wal.", e);
}
}
@@ -215,7 +219,7 @@ public class StorageEngine implements IService {
() -> {
checkResults(futures, "StorageEngine failed to recover.");
isReadyForReadAndWrite.set(true);
- LOGGER.info(
+ OTHER_LOGGER.info(
"Storage Engine recover cost: {}s.",
(System.currentTimeMillis() - startRecoverTime) / 1000);
@@ -245,12 +249,12 @@ public class StorageEngine implements IService {
try {
dataRegion = buildNewDataRegion(sgName, dataRegionId);
} catch (DataRegionException e) {
- LOGGER.error(
+ OTHER_LOGGER.error(
"Failed to recover data region {}[{}]", sgName,
dataRegionId.getId(), e);
return null;
}
dataRegionMap.put(dataRegionId, dataRegion);
- LOGGER.info(
+ OTHER_LOGGER.info(
"Data regions have been recovered {}/{}",
readyDataRegionNum.incrementAndGet(),
recoverDataRegionNum);
@@ -307,7 +311,7 @@ public class StorageEngine implements IService {
try {
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) {
- LOGGER.warn("Storage engine failed to set up.", e);
+ OTHER_LOGGER.warn("Storage engine failed to set up.", e);
Thread.currentThread().interrupt();
return;
}
@@ -328,7 +332,7 @@ public class StorageEngine implements IService {
CONFIG.getSeqMemtableFlushCheckInterval(),
CONFIG.getSeqMemtableFlushCheckInterval(),
TimeUnit.MILLISECONDS);
- LOGGER.info("start sequence memtable timed flush check thread
successfully.");
+ WRITE_LOGGER.info("start sequence memtable timed flush check thread
successfully.");
}
// timed flush unsequence memtable
if (CONFIG.isEnableTimedFlushUnseqMemtable()) {
@@ -341,7 +345,7 @@ public class StorageEngine implements IService {
CONFIG.getUnseqMemtableFlushCheckInterval(),
CONFIG.getUnseqMemtableFlushCheckInterval(),
TimeUnit.MILLISECONDS);
- LOGGER.info("start unsequence memtable timed flush check thread
successfully.");
+ WRITE_LOGGER.info("start unsequence memtable timed flush check thread
successfully.");
}
}
@@ -388,7 +392,7 @@ public class StorageEngine implements IService {
checkResults(futures, "async recover tsfile resource meets
error.");
recoverRepairData();
isReadyForNonReadWriteFunctions.set(true);
- LOGGER.info(
+ OTHER_LOGGER.info(
"TsFile Resource recover cost: {}s.",
(System.currentTimeMillis() - startRecoverTime) / 1000);
},
@@ -438,7 +442,7 @@ public class StorageEngine implements IService {
try {
pool.awaitTermination(30, TimeUnit.SECONDS);
} catch (InterruptedException e) {
- LOGGER.warn("{} still doesn't exit after 30s", poolName);
+ OTHER_LOGGER.warn("{} still doesn't exit after 30s", poolName);
Thread.currentThread().interrupt();
}
}
@@ -458,7 +462,7 @@ public class StorageEngine implements IService {
public DataRegion buildNewDataRegion(String databaseName, DataRegionId
dataRegionId)
throws DataRegionException {
DataRegion dataRegion;
- LOGGER.info(
+ WRITE_LOGGER.info(
"construct a data region instance, the database is {}, Thread is {}",
databaseName,
Thread.currentThread().getId());
@@ -484,7 +488,7 @@ public class StorageEngine implements IService {
/** flush command Sync asyncCloseOneProcessor all file node processors. */
public void syncCloseAllProcessor() {
- LOGGER.info("Start closing all database processor");
+ WRITE_LOGGER.info("Start closing all database processor");
List<Future<Void>> tasks = new ArrayList<>();
for (DataRegion dataRegion : dataRegionMap.values()) {
if (dataRegion != null) {
@@ -500,7 +504,7 @@ public class StorageEngine implements IService {
}
public void forceCloseAllProcessor() throws TsFileProcessorException {
- LOGGER.info("Start force closing all database processor");
+ WRITE_LOGGER.info("Start force closing all database processor");
List<Future<Void>> tasks = new ArrayList<>();
for (DataRegion dataRegion : dataRegionMap.values()) {
if (dataRegion != null) {
@@ -597,7 +601,7 @@ public class StorageEngine implements IService {
if
(!CompactionScheduleTaskManager.getRepairTaskManagerInstance().markRepairTaskStart())
{
return false;
}
- LOGGER.info("start repair data");
+ COMPACTION_LOGGER.info("start repair data");
List<DataRegion> dataRegionList = new ArrayList<>(dataRegionMap.values());
cachedThreadPool.submit(new
UnsortedFileRepairTaskScheduler(dataRegionList, false));
return true;
@@ -614,7 +618,7 @@ public class StorageEngine implements IService {
if
(!CompactionScheduleTaskManager.getRepairTaskManagerInstance().hasRunningRepairTask())
{
return;
}
- LOGGER.info("stop repair data");
+ COMPACTION_LOGGER.info("stop repair data");
try {
repairDataTaskManager.markRepairTaskStopping();
repairDataTaskManager.abortRepairTask();
@@ -691,7 +695,7 @@ public class StorageEngine implements IService {
String msg =
"Unable to find the configuration file. Some modifications are made
only in memory.";
tsStatus = RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, msg);
- LOGGER.warn(msg);
+ OTHER_LOGGER.warn(msg);
try {
IoTDBDescriptor.getInstance().loadHotModifiedProps(newConfigProperties);
IoTDBDescriptor.getInstance().reloadMetricProperties(newConfigProperties);
@@ -801,7 +805,7 @@ public class StorageEngine implements IService {
try {
FileUtils.deleteDirectory(regionSnapshotDir);
} catch (IOException e) {
- LOGGER.error("Failed to delete snapshot dir {}",
regionSnapshotDir, e);
+ OTHER_LOGGER.error("Failed to delete snapshot dir {}",
regionSnapshotDir, e);
}
}
}
@@ -811,7 +815,7 @@ public class StorageEngine implements IService {
WRITING_METRICS.removeActiveMemtableCounterMetrics(regionId);
FileMetrics.getInstance().deleteRegion(region.getDatabaseName(),
region.getDataRegionId());
} catch (Exception e) {
- LOGGER.error(
+ OTHER_LOGGER.error(
"Error occurs when deleting data region {}-{}",
region.getDatabaseName(),
region.getDataRegionId(),
@@ -935,7 +939,7 @@ public class StorageEngine implements IService {
final DataRegion dataRegion = getDataRegion(dataRegionId);
if (dataRegion == null) {
- LOGGER.warn(
+ WRITE_LOGGER.warn(
"DataRegion {} not found on this DataNode when writing piece node"
+ "of TsFile {} (maybe due to region migration), will skip.",
dataRegionId,
@@ -946,7 +950,7 @@ public class StorageEngine implements IService {
try {
loadTsFileManager.writeToDataRegion(dataRegion, pieceNode, uuid);
} catch (IOException e) {
- LOGGER.warn(
+ WRITE_LOGGER.warn(
"IO error when writing piece node of TsFile {} to DataRegion {}.",
pieceNode.getTsFile(),
dataRegionId,
@@ -955,7 +959,7 @@ public class StorageEngine implements IService {
status.setMessage(e.getMessage());
return status;
} catch (Exception e) {
- LOGGER.warn(
+ WRITE_LOGGER.warn(
"Exception occurred when writing piece node of TsFile {} to
DataRegion {}.",
pieceNode.getTsFile(),
dataRegionId,
@@ -1004,7 +1008,7 @@ public class StorageEngine implements IService {
status.setMessage(String.format("Wrong load command %s.",
loadCommand));
}
} catch (Exception e) {
- LOGGER.error("Execute load command {} error.", loadCommand, e);
+ WRITE_LOGGER.error("Execute load command {} error.", loadCommand, e);
status.setCode(TSStatusCode.LOAD_FILE_ERROR.getStatusCode());
status.setMessage(e.getMessage());
}
@@ -1014,18 +1018,18 @@ public class StorageEngine implements IService {
/** reboot timed flush sequence/unsequence memtable thread */
public void rebootTimedService() throws ShutdownException {
- LOGGER.info("Start rebooting all timed service.");
+ OTHER_LOGGER.info("Start rebooting all timed service.");
// exclude ttl check thread
stopTimedServiceAndThrow(seqMemtableTimedFlushCheckThread,
"SeqMemtableTimedFlushCheckThread");
stopTimedServiceAndThrow(
unseqMemtableTimedFlushCheckThread,
"UnseqMemtableTimedFlushCheckThread");
- LOGGER.info("Stop all timed service successfully, and now restart them.");
+ OTHER_LOGGER.info("Stop all timed service successfully, and now restart
them.");
startTimedService();
- LOGGER.info("Reboot all timed service successfully");
+ OTHER_LOGGER.info("Reboot all timed service successfully");
}
private void stopTimedServiceAndThrow(ScheduledExecutorService pool, String
poolName)
@@ -1035,7 +1039,7 @@ public class StorageEngine implements IService {
try {
pool.awaitTermination(30, TimeUnit.SECONDS);
} catch (InterruptedException e) {
- LOGGER.warn("{} still doesn't exit after 30s", poolName);
+ OTHER_LOGGER.warn("{} still doesn't exit after 30s", poolName);
throw new ShutdownException(e);
}
}