This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 8a4b8ea366 [IOTDB-4517] Fix slow cross_space and unseq_space compaction
for common timeseries and template timeseries (#7433)
8a4b8ea366 is described below
commit 8a4b8ea3661f87b9b0cfcb712fe64101598cddab
Author: Liu Xuxin <[email protected]>
AuthorDate: Tue Oct 4 21:24:08 2022 +0800
[IOTDB-4517] Fix slow cross_space and unseq_space compaction for
common timeseries and template timeseries (#7433)
---
.../impl/ReadPointCompactionPerformer.java | 40 +++++++++++----
.../writer/CrossSpaceCompactionWriter.java | 58 +++++++++++++++++-----
.../writer/InnerSpaceCompactionWriter.java | 38 ++++++++++----
.../iotdb/tsfile/file/metadata/ChunkMetadata.java | 9 ++--
.../iotdb/tsfile/write/writer/TsFileIOWriter.java | 15 ++++++
5 files changed, 123 insertions(+), 37 deletions(-)
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/performer/impl/ReadPointCompactionPerformer.java
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/performer/impl/ReadPointCompactionPerformer.java
index eafb5609f3..c902011beb 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/performer/impl/ReadPointCompactionPerformer.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/performer/impl/ReadPointCompactionPerformer.java
@@ -59,6 +59,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
@@ -190,22 +191,31 @@ public class ReadPointCompactionPerformer
throws IOException, InterruptedException, IllegalPathException,
ExecutionException {
MultiTsFileDeviceIterator.MeasurementIterator measurementIterator =
deviceIterator.iterateNotAlignedSeries(device, false);
- List<String> allMeasurements =
- new
ArrayList<>(deviceIterator.getAllSchemasOfCurrentDevice().keySet());
+ Map<String, MeasurementSchema> schemaMap =
deviceIterator.getAllSchemasOfCurrentDevice();
+ List<String> allMeasurements = new ArrayList<>(schemaMap.keySet());
allMeasurements.sort((String::compareTo));
int subTaskNums = Math.min(allMeasurements.size(), subTaskNum);
- Map<String, MeasurementSchema> schemaMap =
deviceIterator.getAllSchemasOfCurrentDevice();
// construct sub tasks and start compacting measurements in parallel
- compactionWriter.startChunkGroup(device, false);
- for (int taskCount = 0; taskCount < allMeasurements.size(); ) {
+ if (subTaskNums > 0) {
+ // assign the measurements for each subtask
+ List<String>[] measurementListArray = new List[subTaskNums];
+ for (int i = 0, size = allMeasurements.size(); i < size; ++i) {
+ int index = i % subTaskNums;
+ if (measurementListArray[index] == null) {
+ measurementListArray[index] = new LinkedList<>();
+ }
+ measurementListArray[index].add(allMeasurements.get(i));
+ }
+
+ compactionWriter.startChunkGroup(device, false);
List<Future<Void>> futures = new ArrayList<>();
- for (int i = 0; i < subTaskNums && taskCount < allMeasurements.size();
i++) {
+ for (int i = 0; i < subTaskNums; ++i) {
futures.add(
CompactionTaskManager.getInstance()
.submitSubTask(
new ReadPointPerformerSubTask(
device,
-
Collections.singletonList(allMeasurements.get(taskCount++)),
+ measurementListArray[i],
fragmentInstanceContext,
queryDataSource,
compactionWriter,
@@ -215,11 +225,9 @@ public class ReadPointCompactionPerformer
for (Future<Void> future : futures) {
future.get();
}
- // sync all the subtask, and check the writer chunk metadata size
compactionWriter.checkAndMayFlushChunkMetadata();
+ compactionWriter.endChunkGroup();
}
-
- compactionWriter.endChunkGroup();
}
private static void updateDeviceStartTimeAndEndTime(
@@ -294,10 +302,20 @@ public class ReadPointCompactionPerformer
tsBlock.getPositionCount());
} else {
IPointReader pointReader = tsBlock.getTsBlockSingleColumnIterator();
+ TimeValuePair timeValuePair = null;
+ boolean updateFirstTime = false;
while (pointReader.hasNextTimeValuePair()) {
- TimeValuePair timeValuePair = pointReader.nextTimeValuePair();
+ timeValuePair = pointReader.nextTimeValuePair();
+ if (!updateFirstTime) {
+ // update start time
+ writer.updateStartTimeAndEndTime(device,
timeValuePair.getTimestamp(), subTaskId);
+ updateFirstTime = true;
+ }
writer.write(
timeValuePair.getTimestamp(),
timeValuePair.getValue().getValue(), subTaskId);
+ }
+ // update end time
+ if (timeValuePair != null) {
writer.updateStartTimeAndEndTime(device,
timeValuePair.getTimestamp(), subTaskId);
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/CrossSpaceCompactionWriter.java
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/CrossSpaceCompactionWriter.java
index d192c0f6d7..07cc2019aa 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/CrossSpaceCompactionWriter.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/CrossSpaceCompactionWriter.java
@@ -32,6 +32,8 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
public class CrossSpaceCompactionWriter extends AbstractCompactionWriter {
// target fileIOWriters
@@ -57,6 +59,12 @@ public class CrossSpaceCompactionWriter extends
AbstractCompactionWriter {
// current chunk group header size
private int chunkGroupHeaderSize;
+ private AtomicLong[] startTimeForCurDeviceForEachFile;
+ private AtomicLong[] endTimeForCurDeviceForEachFile;
+ private AtomicBoolean[] hasCurDeviceForEachFile;
+ private AtomicLong[][] startTimeForEachDevice = new AtomicLong[subTaskNum][];
+ private AtomicLong[][] endTimeForEachDevice = new AtomicLong[subTaskNum][];
+
public CrossSpaceCompactionWriter(
List<TsFileResource> targetResources, List<TsFileResource>
seqFileResources)
throws IOException {
@@ -64,6 +72,9 @@ public class CrossSpaceCompactionWriter extends
AbstractCompactionWriter {
currentDeviceEndTime = new long[seqFileResources.size()];
isEmptyFile = new boolean[seqFileResources.size()];
isDeviceExistedInTargetFiles = new boolean[targetResources.size()];
+ startTimeForCurDeviceForEachFile = new AtomicLong[targetResources.size()];
+ endTimeForCurDeviceForEachFile = new AtomicLong[targetResources.size()];
+ hasCurDeviceForEachFile = new AtomicBoolean[targetResources.size()];
long memorySizeForEachWriter =
(long)
(SystemInfo.getInstance().getMemorySizeForCompaction()
@@ -76,8 +87,19 @@ public class CrossSpaceCompactionWriter extends
AbstractCompactionWriter {
this.fileWriterList.add(
new TsFileIOWriter(targetResources.get(i).getTsFile(), true,
memorySizeForEachWriter));
isEmptyFile[i] = true;
+ startTimeForCurDeviceForEachFile[i] = new AtomicLong(Long.MAX_VALUE);
+ endTimeForCurDeviceForEachFile[i] = new AtomicLong(Long.MIN_VALUE);
+ hasCurDeviceForEachFile[i] = new AtomicBoolean(false);
}
this.seqTsFileResources = seqFileResources;
+ for (int i = 0, size = targetResources.size(); i < subTaskNum; ++i) {
+ startTimeForEachDevice[i] = new AtomicLong[size];
+ endTimeForEachDevice[i] = new AtomicLong[size];
+ for (int j = 0; j < size; ++j) {
+ startTimeForEachDevice[i][j] = new AtomicLong(Long.MAX_VALUE);
+ endTimeForEachDevice[i][j] = new AtomicLong(Long.MIN_VALUE);
+ }
+ }
}
@Override
@@ -102,6 +124,16 @@ public class CrossSpaceCompactionWriter extends
AbstractCompactionWriter {
}
isDeviceExistedInTargetFiles[i] = false;
}
+ for (int i = 0, size = targetTsFileResources.size(); i < size; ++i) {
+ for (int j = 0; j < subTaskNum; ++j) {
+ targetTsFileResources
+ .get(i)
+ .updateStartTime(deviceId,
startTimeForEachDevice[j][i].getAndSet(Long.MAX_VALUE));
+ targetTsFileResources
+ .get(i)
+ .updateEndTime(deviceId,
endTimeForEachDevice[j][i].getAndSet(Long.MIN_VALUE));
+ }
+ }
deviceId = null;
}
@@ -115,6 +147,9 @@ public class CrossSpaceCompactionWriter extends
AbstractCompactionWriter {
public void write(long timestamp, Object value, int subTaskId) throws
IOException {
checkTimeAndMayFlushChunkToCurrentFile(timestamp, subTaskId);
writeDataPoint(timestamp, value, subTaskId);
+ int fileIndex = seqFileIndexArray[subTaskId];
+ startTimeForEachDevice[subTaskId][fileIndex].accumulateAndGet(timestamp,
Math::min);
+ endTimeForEachDevice[subTaskId][fileIndex].accumulateAndGet(timestamp,
Math::max);
if (measurementPointCountArray[subTaskId] % 10 == 0) {
checkChunkSizeAndMayOpenANewChunk(
fileWriterList.get(seqFileIndexArray[subTaskId]), subTaskId);
@@ -131,12 +166,10 @@ public class CrossSpaceCompactionWriter extends
AbstractCompactionWriter {
checkTimeAndMayFlushChunkToCurrentFile(timestamps.getStartTime(),
subTaskId);
AlignedChunkWriterImpl chunkWriter = (AlignedChunkWriterImpl)
this.chunkWriters[subTaskId];
chunkWriter.write(timestamps, columns, batchSize);
- synchronized (this) {
- // we need to synchronized here to avoid multi-thread competition in
sub-task
- TsFileResource resource =
targetTsFileResources.get(seqFileIndexArray[subTaskId]);
- resource.updateStartTime(device, timestamps.getStartTime());
- resource.updateEndTime(device, timestamps.getEndTime());
- }
+ int fileIndex = seqFileIndexArray[subTaskId];
+ startTimeForEachDevice[subTaskId][fileIndex].accumulateAndGet(
+ timestamps.getStartTime(), Math::min);
+
endTimeForEachDevice[subTaskId][fileIndex].accumulateAndGet(timestamps.getEndTime(),
Math::max);
checkChunkSizeAndMayOpenANewChunk(fileWriterList.get(seqFileIndexArray[subTaskId]),
subTaskId);
isDeviceExistedInTargetFiles[seqFileIndexArray[subTaskId]] = true;
isEmptyFile[seqFileIndexArray[subTaskId]] = false;
@@ -215,12 +248,11 @@ public class CrossSpaceCompactionWriter extends
AbstractCompactionWriter {
@Override
public void updateStartTimeAndEndTime(String device, long time, int
subTaskId) {
- synchronized (this) {
- int fileIndex = seqFileIndexArray[subTaskId];
- TsFileResource resource = targetTsFileResources.get(fileIndex);
- // we need to synchronized here to avoid multi-thread competition in
sub-task
- resource.updateStartTime(device, time);
- resource.updateEndTime(device, time);
- }
+ int fileIndex = seqFileIndexArray[subTaskId];
+ // using synchronized will lead to significant performance loss,
+ // so we use atomic long here to accelerate
+ startTimeForCurDeviceForEachFile[fileIndex].accumulateAndGet(time,
Math::min);
+ endTimeForCurDeviceForEachFile[fileIndex].accumulateAndGet(time,
Math::max);
+ hasCurDeviceForEachFile[fileIndex].set(true);
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/InnerSpaceCompactionWriter.java
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/InnerSpaceCompactionWriter.java
index 2c3c2e58ad..978067e800 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/InnerSpaceCompactionWriter.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/InnerSpaceCompactionWriter.java
@@ -26,15 +26,22 @@ import
org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
import org.apache.iotdb.tsfile.write.chunk.AlignedChunkWriterImpl;
import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.io.IOException;
import java.util.Collections;
import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
public class InnerSpaceCompactionWriter extends AbstractCompactionWriter {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(InnerSpaceCompactionWriter.class);
private TsFileIOWriter fileWriter;
private boolean isEmptyFile;
private TsFileResource resource;
+ private AtomicLong[] startTimeOfCurDevice;
+ private AtomicLong[] endTimeOfCurDevice;
public InnerSpaceCompactionWriter(TsFileResource targetFileResource) throws
IOException {
long sizeForFileWriter =
@@ -47,6 +54,14 @@ public class InnerSpaceCompactionWriter extends
AbstractCompactionWriter {
this.fileWriter = new TsFileIOWriter(targetFileResource.getTsFile(), true,
sizeForFileWriter);
isEmptyFile = true;
resource = targetFileResource;
+ int concurrentThreadNum =
+ Math.max(1,
IoTDBDescriptor.getInstance().getConfig().getSubCompactionTaskNum());
+ startTimeOfCurDevice = new AtomicLong[concurrentThreadNum];
+ endTimeOfCurDevice = new AtomicLong[concurrentThreadNum];
+ for (int i = 0; i < concurrentThreadNum; ++i) {
+ startTimeOfCurDevice[i] = new AtomicLong(Long.MAX_VALUE);
+ endTimeOfCurDevice[i] = new AtomicLong(Long.MIN_VALUE);
+ }
}
@Override
@@ -58,6 +73,14 @@ public class InnerSpaceCompactionWriter extends
AbstractCompactionWriter {
@Override
public void endChunkGroup() throws IOException {
+ for (int i = 0; i < startTimeOfCurDevice.length; ++i) {
+ resource.updateStartTime(
+ fileWriter.getCurrentChunkGroupDeviceId(),
startTimeOfCurDevice[i].get());
+ resource.updateEndTime(
+ fileWriter.getCurrentChunkGroupDeviceId(),
endTimeOfCurDevice[i].get());
+ startTimeOfCurDevice[i].set(Long.MAX_VALUE);
+ endTimeOfCurDevice[i].set(Long.MIN_VALUE);
+ }
fileWriter.endChunkGroup();
}
@@ -82,11 +105,10 @@ public class InnerSpaceCompactionWriter extends
AbstractCompactionWriter {
AlignedChunkWriterImpl chunkWriter = (AlignedChunkWriterImpl)
this.chunkWriters[subTaskId];
chunkWriter.write(timestamps, columns, batchSize);
checkChunkSizeAndMayOpenANewChunk(fileWriter, subTaskId);
- synchronized (this) {
- // we need to synchronized here to avoid multi-thread competition in
sub-task
- resource.updateStartTime(device, timestamps.getStartTime());
- resource.updateEndTime(device, timestamps.getEndTime());
- }
+ startTimeOfCurDevice[subTaskId].set(
+ Math.min(startTimeOfCurDevice[subTaskId].get(),
timestamps.getStartTime()));
+ endTimeOfCurDevice[subTaskId].set(
+ Math.max(endTimeOfCurDevice[subTaskId].get(),
timestamps.getEndTime()));
isEmptyFile = false;
}
@@ -109,10 +131,8 @@ public class InnerSpaceCompactionWriter extends
AbstractCompactionWriter {
@Override
public void updateStartTimeAndEndTime(String device, long time, int
subTaskId) {
// we need to synchronized here to avoid multi-thread competition in
sub-task
- synchronized (this) {
- resource.updateStartTime(device, time);
- resource.updateEndTime(device, time);
- }
+
startTimeOfCurDevice[subTaskId].set(Math.min(startTimeOfCurDevice[subTaskId].get(),
time));
+
endTimeOfCurDevice[subTaskId].set(Math.max(endTimeOfCurDevice[subTaskId].get(),
time));
}
@Override
diff --git
a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkMetadata.java
b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkMetadata.java
index e1e589a890..3a21287038 100644
---
a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkMetadata.java
+++
b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkMetadata.java
@@ -290,10 +290,11 @@ public class ChunkMetadata implements IChunkMetadata {
}
public long calculateRamSize() {
- return CHUNK_METADATA_FIXED_RAM_SIZE
- + RamUsageEstimator.sizeOf(tsFilePrefixPath)
- + RamUsageEstimator.sizeOf(measurementUid)
- + statistics.calculateRamSize();
+ long memSize = CHUNK_METADATA_FIXED_RAM_SIZE;
+ memSize += RamUsageEstimator.sizeOf(tsFilePrefixPath);
+ memSize += RamUsageEstimator.sizeOf(measurementUid);
+ memSize += statistics.calculateRamSize();
+ return memSize;
}
public static long calculateRamSize(String measurementId, TSDataType
dataType) {
diff --git
a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
index 6cf00f8b84..f2e73d56ae 100644
---
a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
+++
b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
@@ -117,6 +117,7 @@ public class TsFileIOWriter implements AutoCloseable {
protected boolean enableMemoryControl = false;
private Path lastSerializePath = null;
protected LinkedList<Long> endPosInCMTForDevice = new LinkedList<>();
+ private volatile int chunkMetadataCount = 0;
public static final String CHUNK_METADATA_TEMP_FILE_SUFFIX = ".meta";
/** empty construct function. */
@@ -287,6 +288,7 @@ public class TsFileIOWriter implements AutoCloseable {
if (enableMemoryControl) {
this.currentChunkMetadataSize += currentChunkMetadata.calculateRamSize();
}
+ chunkMetadataCount++;
chunkMetadataList.add(currentChunkMetadata);
currentChunkMetadata = null;
}
@@ -609,6 +611,13 @@ public class TsFileIOWriter implements AutoCloseable {
// This function should be called after all data of an aligned device has
been written
if (enableMemoryControl && currentChunkMetadataSize > maxMetadataSize) {
try {
+ if (logger.isDebugEnabled()) {
+ logger.debug(
+ "Flushing chunk metadata, total size is {}, count is {}, avg
size is {}",
+ currentChunkMetadataSize,
+ chunkMetadataCount,
+ currentChunkMetadataSize / chunkMetadataCount);
+ }
sortAndFlushChunkMetadata();
} catch (IOException e) {
logger.error("Meets exception when flushing metadata to temp file for
{}", file, e);
@@ -649,6 +658,8 @@ public class TsFileIOWriter implements AutoCloseable {
if (chunkMetadataList != null) {
chunkMetadataList.clear();
}
+ chunkMetadataCount = 0;
+ currentChunkMetadataSize = 0;
}
private void writeChunkMetadataToTempFile(
@@ -676,4 +687,8 @@ public class TsFileIOWriter implements AutoCloseable {
ReadWriteIOUtils.write(totalSize, tempOutput.wrapAsStream());
buffer.writeTo(tempOutput);
}
+
+ public String getCurrentChunkGroupDeviceId() {
+ return currentChunkGroupDeviceId;
+ }
}