This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 8ee9863838b Repair task may create compaction log file twice in
sequence and unsequence dir (#13413)
8ee9863838b is described below
commit 8ee9863838b7213fa516ebb13611057d7f8d99f2
Author: shuwenwei <[email protected]>
AuthorDate: Fri Sep 6 14:13:31 2024 +0800
Repair task may create compaction log file twice in sequence and unsequence
dir (#13413)
* fix bug
* fix insertion compaction rollback
* fix estimate
* modify estimate method
* fix compile
* fix bug
---
.../task/InsertionCrossSpaceCompactionTask.java | 8 +++++++-
.../task/RepairUnsortedFileCompactionTask.java | 7 -------
.../estimator/CompactionEstimateUtils.java | 23 ++++++++++++----------
3 files changed, 20 insertions(+), 18 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InsertionCrossSpaceCompactionTask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InsertionCrossSpaceCompactionTask.java
index 32c92678eef..e2f31a28ffa 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InsertionCrossSpaceCompactionTask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InsertionCrossSpaceCompactionTask.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.service.metrics.FileMetrics;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.constant.CompactionTaskType;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionRecoverException;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionValidationFailedException;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionUtils;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.CompactionLogAnalyzer;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.CompactionLogger;
@@ -48,6 +49,7 @@ import java.util.stream.Stream;
public class InsertionCrossSpaceCompactionTask extends AbstractCompactionTask {
private Phaser phaser;
+ private boolean failToPassOverlapValidation = false;
public InsertionCrossSpaceCompactionTask(
Phaser phaser,
@@ -182,6 +184,9 @@ public class InsertionCrossSpaceCompactionTask extends
AbstractCompactionTask {
targetFile,
String.format("%.2f", costTime));
} catch (Exception e) {
+ if (e instanceof CompactionValidationFailedException) {
+ failToPassOverlapValidation = true;
+ }
isSuccess = false;
handleException(LOGGER, e);
recover();
@@ -293,7 +298,8 @@ public class InsertionCrossSpaceCompactionTask extends
AbstractCompactionTask {
|| !targetFile.resourceFileExists()
|| (unseqFileToInsert != null
&& unseqFileToInsert.modFileExists()
- && !targetFile.modFileExists());
+ && !targetFile.modFileExists())
+ || failToPassOverlapValidation;
}
private void rollback() throws IOException {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/RepairUnsortedFileCompactionTask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/RepairUnsortedFileCompactionTask.java
index 622434b6225..e4a2c568682 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/RepairUnsortedFileCompactionTask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/RepairUnsortedFileCompactionTask.java
@@ -136,13 +136,6 @@ public class RepairUnsortedFileCompactionTask extends
InnerSpaceCompactionTask {
calculateSourceFilesAndTargetFiles();
isHoldingWriteLock = new boolean[this.filesView.sourceFilesInLog.size()];
Arrays.fill(isHoldingWriteLock, false);
- String dataDirectory = sourceFile.getTsFile().getParent();
- logFile =
- new File(
- dataDirectory
- + File.separator
- + filesView.targetFilesInPerformer.get(0).getTsFile().getName()
- + CompactionLogger.INNER_COMPACTION_LOG_NAME_SUFFIX);
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/CompactionEstimateUtils.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/CompactionEstimateUtils.java
index c919ed96b08..d1e242952f4 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/CompactionEstimateUtils.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/CompactionEstimateUtils.java
@@ -33,6 +33,7 @@ import org.apache.tsfile.read.TsFileSequenceReader;
import org.apache.tsfile.utils.Pair;
import java.io.IOException;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -105,30 +106,32 @@ public class CompactionEstimateUtils {
if (!CompactionEstimateUtils.addReadLock(resources)) {
return -1L;
}
- long cost = -1L;
- long modsFileSize = 0L;
+ long cost = 0L;
+ Map<IDeviceID, Long> deviceMetadataSizeMap = new HashMap<>();
try {
for (TsFileResource resource : resources) {
if (resource.modFileExists()) {
- modsFileSize += resource.getModFile().getSize();
+ cost += resource.getModFile().getSize();
}
try (CompactionTsFileReader reader =
new CompactionTsFileReader(resource.getTsFilePath(), taskType)) {
- cost = Math.max(cost,
getMaxTimeseriesMetadataOfOneDeviceSize(reader));
+ for (Map.Entry<IDeviceID, Long> entry :
getDeviceMetadataSizeMap(reader).entrySet()) {
+ deviceMetadataSizeMap.merge(entry.getKey(), entry.getValue(),
Long::sum);
+ }
}
}
- return cost + modsFileSize;
+ return cost +
deviceMetadataSizeMap.values().stream().max(Long::compareTo).orElse(0L);
} finally {
CompactionEstimateUtils.releaseReadLock(resources);
}
}
- public static long
getMaxTimeseriesMetadataOfOneDeviceSize(CompactionTsFileReader reader)
+ public static Map<IDeviceID, Long>
getDeviceMetadataSizeMap(CompactionTsFileReader reader)
throws IOException {
+ Map<IDeviceID, Long> deviceMetadataSizeMap = new HashMap<>();
TsFileDeviceIterator deviceIterator =
reader.getAllDevicesIteratorWithIsAligned();
- long maxSize = 0;
while (deviceIterator.hasNext()) {
- deviceIterator.next();
+ IDeviceID deviceID = deviceIterator.next().getLeft();
MetadataIndexNode firstMeasurementNodeOfCurrentDevice =
deviceIterator.getFirstMeasurementNodeOfCurrentDevice();
long totalTimeseriesMetadataSizeOfCurrentDevice = 0;
@@ -137,9 +140,9 @@ public class CompactionEstimateUtils {
for (Pair<Long, Long> offsetPair :
timeseriesMetadataOffsetByDevice.values()) {
totalTimeseriesMetadataSizeOfCurrentDevice += (offsetPair.right -
offsetPair.left);
}
- maxSize = Math.max(maxSize, totalTimeseriesMetadataSizeOfCurrentDevice);
+ deviceMetadataSizeMap.put(deviceID,
totalTimeseriesMetadataSizeOfCurrentDevice);
}
- return maxSize;
+ return deviceMetadataSizeMap;
}
public static boolean shouldAccurateEstimate(long roughEstimatedMemCost) {