This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 8a4b8ea366 [IOTDB-4517] Fix cross_space and unseq_space compaction are slower at common timeseries and template timeseries (#7433)
8a4b8ea366 is described below

commit 8a4b8ea3661f87b9b0cfcb712fe64101598cddab
Author: Liu Xuxin <[email protected]>
AuthorDate: Tue Oct 4 21:24:08 2022 +0800

    [IOTDB-4517] Fix cross_space and unseq_space compaction are slower at common timeseries and template timeseries (#7433)
---
 .../impl/ReadPointCompactionPerformer.java         | 40 +++++++++++----
 .../writer/CrossSpaceCompactionWriter.java         | 58 +++++++++++++++++-----
 .../writer/InnerSpaceCompactionWriter.java         | 38 ++++++++++----
 .../iotdb/tsfile/file/metadata/ChunkMetadata.java  |  9 ++--
 .../iotdb/tsfile/write/writer/TsFileIOWriter.java  | 15 ++++++
 5 files changed, 123 insertions(+), 37 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/performer/impl/ReadPointCompactionPerformer.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/performer/impl/ReadPointCompactionPerformer.java
index eafb5609f3..c902011beb 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/performer/impl/ReadPointCompactionPerformer.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/performer/impl/ReadPointCompactionPerformer.java
@@ -59,6 +59,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
@@ -190,22 +191,31 @@ public class ReadPointCompactionPerformer
       throws IOException, InterruptedException, IllegalPathException, 
ExecutionException {
     MultiTsFileDeviceIterator.MeasurementIterator measurementIterator =
         deviceIterator.iterateNotAlignedSeries(device, false);
-    List<String> allMeasurements =
-        new 
ArrayList<>(deviceIterator.getAllSchemasOfCurrentDevice().keySet());
+    Map<String, MeasurementSchema> schemaMap = 
deviceIterator.getAllSchemasOfCurrentDevice();
+    List<String> allMeasurements = new ArrayList<>(schemaMap.keySet());
     allMeasurements.sort((String::compareTo));
     int subTaskNums = Math.min(allMeasurements.size(), subTaskNum);
-    Map<String, MeasurementSchema> schemaMap = 
deviceIterator.getAllSchemasOfCurrentDevice();
     // construct sub tasks and start compacting measurements in parallel
-    compactionWriter.startChunkGroup(device, false);
-    for (int taskCount = 0; taskCount < allMeasurements.size(); ) {
+    if (subTaskNums > 0) {
+      // assign the measurements for each subtask
+      List<String>[] measurementListArray = new List[subTaskNums];
+      for (int i = 0, size = allMeasurements.size(); i < size; ++i) {
+        int index = i % subTaskNums;
+        if (measurementListArray[index] == null) {
+          measurementListArray[index] = new LinkedList<>();
+        }
+        measurementListArray[index].add(allMeasurements.get(i));
+      }
+
+      compactionWriter.startChunkGroup(device, false);
       List<Future<Void>> futures = new ArrayList<>();
-      for (int i = 0; i < subTaskNums && taskCount < allMeasurements.size(); 
i++) {
+      for (int i = 0; i < subTaskNums; ++i) {
         futures.add(
             CompactionTaskManager.getInstance()
                 .submitSubTask(
                     new ReadPointPerformerSubTask(
                         device,
-                        
Collections.singletonList(allMeasurements.get(taskCount++)),
+                        measurementListArray[i],
                         fragmentInstanceContext,
                         queryDataSource,
                         compactionWriter,
@@ -215,11 +225,9 @@ public class ReadPointCompactionPerformer
       for (Future<Void> future : futures) {
         future.get();
       }
-      // sync all the subtask, and check the writer chunk metadata size
       compactionWriter.checkAndMayFlushChunkMetadata();
+      compactionWriter.endChunkGroup();
     }
-
-    compactionWriter.endChunkGroup();
   }
 
   private static void updateDeviceStartTimeAndEndTime(
@@ -294,10 +302,20 @@ public class ReadPointCompactionPerformer
             tsBlock.getPositionCount());
       } else {
         IPointReader pointReader = tsBlock.getTsBlockSingleColumnIterator();
+        TimeValuePair timeValuePair = null;
+        boolean updateFirstTime = false;
         while (pointReader.hasNextTimeValuePair()) {
-          TimeValuePair timeValuePair = pointReader.nextTimeValuePair();
+          timeValuePair = pointReader.nextTimeValuePair();
+          if (!updateFirstTime) {
+            // update start time
+            writer.updateStartTimeAndEndTime(device, 
timeValuePair.getTimestamp(), subTaskId);
+            updateFirstTime = true;
+          }
           writer.write(
               timeValuePair.getTimestamp(), 
timeValuePair.getValue().getValue(), subTaskId);
+        }
+        // update end time
+        if (timeValuePair != null) {
           writer.updateStartTimeAndEndTime(device, 
timeValuePair.getTimestamp(), subTaskId);
         }
       }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/CrossSpaceCompactionWriter.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/CrossSpaceCompactionWriter.java
index d192c0f6d7..07cc2019aa 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/CrossSpaceCompactionWriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/CrossSpaceCompactionWriter.java
@@ -32,6 +32,8 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 
 public class CrossSpaceCompactionWriter extends AbstractCompactionWriter {
   // target fileIOWriters
@@ -57,6 +59,12 @@ public class CrossSpaceCompactionWriter extends 
AbstractCompactionWriter {
   // current chunk group header size
   private int chunkGroupHeaderSize;
 
+  private AtomicLong[] startTimeForCurDeviceForEachFile;
+  private AtomicLong[] endTimeForCurDeviceForEachFile;
+  private AtomicBoolean[] hasCurDeviceForEachFile;
+  private AtomicLong[][] startTimeForEachDevice = new AtomicLong[subTaskNum][];
+  private AtomicLong[][] endTimeForEachDevice = new AtomicLong[subTaskNum][];
+
   public CrossSpaceCompactionWriter(
       List<TsFileResource> targetResources, List<TsFileResource> 
seqFileResources)
       throws IOException {
@@ -64,6 +72,9 @@ public class CrossSpaceCompactionWriter extends 
AbstractCompactionWriter {
     currentDeviceEndTime = new long[seqFileResources.size()];
     isEmptyFile = new boolean[seqFileResources.size()];
     isDeviceExistedInTargetFiles = new boolean[targetResources.size()];
+    startTimeForCurDeviceForEachFile = new AtomicLong[targetResources.size()];
+    endTimeForCurDeviceForEachFile = new AtomicLong[targetResources.size()];
+    hasCurDeviceForEachFile = new AtomicBoolean[targetResources.size()];
     long memorySizeForEachWriter =
         (long)
             (SystemInfo.getInstance().getMemorySizeForCompaction()
@@ -76,8 +87,19 @@ public class CrossSpaceCompactionWriter extends 
AbstractCompactionWriter {
       this.fileWriterList.add(
           new TsFileIOWriter(targetResources.get(i).getTsFile(), true, 
memorySizeForEachWriter));
       isEmptyFile[i] = true;
+      startTimeForCurDeviceForEachFile[i] = new AtomicLong(Long.MAX_VALUE);
+      endTimeForCurDeviceForEachFile[i] = new AtomicLong(Long.MIN_VALUE);
+      hasCurDeviceForEachFile[i] = new AtomicBoolean(false);
     }
     this.seqTsFileResources = seqFileResources;
+    for (int i = 0, size = targetResources.size(); i < subTaskNum; ++i) {
+      startTimeForEachDevice[i] = new AtomicLong[size];
+      endTimeForEachDevice[i] = new AtomicLong[size];
+      for (int j = 0; j < size; ++j) {
+        startTimeForEachDevice[i][j] = new AtomicLong(Long.MAX_VALUE);
+        endTimeForEachDevice[i][j] = new AtomicLong(Long.MIN_VALUE);
+      }
+    }
   }
 
   @Override
@@ -102,6 +124,16 @@ public class CrossSpaceCompactionWriter extends 
AbstractCompactionWriter {
       }
       isDeviceExistedInTargetFiles[i] = false;
     }
+    for (int i = 0, size = targetTsFileResources.size(); i < size; ++i) {
+      for (int j = 0; j < subTaskNum; ++j) {
+        targetTsFileResources
+            .get(i)
+            .updateStartTime(deviceId, 
startTimeForEachDevice[j][i].getAndSet(Long.MAX_VALUE));
+        targetTsFileResources
+            .get(i)
+            .updateEndTime(deviceId, 
endTimeForEachDevice[j][i].getAndSet(Long.MIN_VALUE));
+      }
+    }
     deviceId = null;
   }
 
@@ -115,6 +147,9 @@ public class CrossSpaceCompactionWriter extends 
AbstractCompactionWriter {
   public void write(long timestamp, Object value, int subTaskId) throws 
IOException {
     checkTimeAndMayFlushChunkToCurrentFile(timestamp, subTaskId);
     writeDataPoint(timestamp, value, subTaskId);
+    int fileIndex = seqFileIndexArray[subTaskId];
+    startTimeForEachDevice[subTaskId][fileIndex].accumulateAndGet(timestamp, 
Math::min);
+    endTimeForEachDevice[subTaskId][fileIndex].accumulateAndGet(timestamp, 
Math::max);
     if (measurementPointCountArray[subTaskId] % 10 == 0) {
       checkChunkSizeAndMayOpenANewChunk(
           fileWriterList.get(seqFileIndexArray[subTaskId]), subTaskId);
@@ -131,12 +166,10 @@ public class CrossSpaceCompactionWriter extends 
AbstractCompactionWriter {
     checkTimeAndMayFlushChunkToCurrentFile(timestamps.getStartTime(), 
subTaskId);
     AlignedChunkWriterImpl chunkWriter = (AlignedChunkWriterImpl) 
this.chunkWriters[subTaskId];
     chunkWriter.write(timestamps, columns, batchSize);
-    synchronized (this) {
-      // we need to synchronized here to avoid multi-thread competition in 
sub-task
-      TsFileResource resource = 
targetTsFileResources.get(seqFileIndexArray[subTaskId]);
-      resource.updateStartTime(device, timestamps.getStartTime());
-      resource.updateEndTime(device, timestamps.getEndTime());
-    }
+    int fileIndex = seqFileIndexArray[subTaskId];
+    startTimeForEachDevice[subTaskId][fileIndex].accumulateAndGet(
+        timestamps.getStartTime(), Math::min);
+    
endTimeForEachDevice[subTaskId][fileIndex].accumulateAndGet(timestamps.getEndTime(),
 Math::max);
     
checkChunkSizeAndMayOpenANewChunk(fileWriterList.get(seqFileIndexArray[subTaskId]),
 subTaskId);
     isDeviceExistedInTargetFiles[seqFileIndexArray[subTaskId]] = true;
     isEmptyFile[seqFileIndexArray[subTaskId]] = false;
@@ -215,12 +248,11 @@ public class CrossSpaceCompactionWriter extends 
AbstractCompactionWriter {
 
   @Override
   public void updateStartTimeAndEndTime(String device, long time, int 
subTaskId) {
-    synchronized (this) {
-      int fileIndex = seqFileIndexArray[subTaskId];
-      TsFileResource resource = targetTsFileResources.get(fileIndex);
-      // we need to synchronized here to avoid multi-thread competition in 
sub-task
-      resource.updateStartTime(device, time);
-      resource.updateEndTime(device, time);
-    }
+    int fileIndex = seqFileIndexArray[subTaskId];
+    // using synchronized will lead to significant performance loss,
+    // so we use atomic long here to accelerate
+    startTimeForCurDeviceForEachFile[fileIndex].accumulateAndGet(time, 
Math::min);
+    endTimeForCurDeviceForEachFile[fileIndex].accumulateAndGet(time, 
Math::max);
+    hasCurDeviceForEachFile[fileIndex].set(true);
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/InnerSpaceCompactionWriter.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/InnerSpaceCompactionWriter.java
index 2c3c2e58ad..978067e800 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/InnerSpaceCompactionWriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/InnerSpaceCompactionWriter.java
@@ -26,15 +26,22 @@ import 
org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
 import org.apache.iotdb.tsfile.write.chunk.AlignedChunkWriterImpl;
 import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
 
 public class InnerSpaceCompactionWriter extends AbstractCompactionWriter {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(InnerSpaceCompactionWriter.class);
   private TsFileIOWriter fileWriter;
 
   private boolean isEmptyFile;
   private TsFileResource resource;
+  private AtomicLong[] startTimeOfCurDevice;
+  private AtomicLong[] endTimeOfCurDevice;
 
   public InnerSpaceCompactionWriter(TsFileResource targetFileResource) throws 
IOException {
     long sizeForFileWriter =
@@ -47,6 +54,14 @@ public class InnerSpaceCompactionWriter extends 
AbstractCompactionWriter {
     this.fileWriter = new TsFileIOWriter(targetFileResource.getTsFile(), true, 
sizeForFileWriter);
     isEmptyFile = true;
     resource = targetFileResource;
+    int concurrentThreadNum =
+        Math.max(1, 
IoTDBDescriptor.getInstance().getConfig().getSubCompactionTaskNum());
+    startTimeOfCurDevice = new AtomicLong[concurrentThreadNum];
+    endTimeOfCurDevice = new AtomicLong[concurrentThreadNum];
+    for (int i = 0; i < concurrentThreadNum; ++i) {
+      startTimeOfCurDevice[i] = new AtomicLong(Long.MAX_VALUE);
+      endTimeOfCurDevice[i] = new AtomicLong(Long.MIN_VALUE);
+    }
   }
 
   @Override
@@ -58,6 +73,14 @@ public class InnerSpaceCompactionWriter extends 
AbstractCompactionWriter {
 
   @Override
   public void endChunkGroup() throws IOException {
+    for (int i = 0; i < startTimeOfCurDevice.length; ++i) {
+      resource.updateStartTime(
+          fileWriter.getCurrentChunkGroupDeviceId(), 
startTimeOfCurDevice[i].get());
+      resource.updateEndTime(
+          fileWriter.getCurrentChunkGroupDeviceId(), 
endTimeOfCurDevice[i].get());
+      startTimeOfCurDevice[i].set(Long.MAX_VALUE);
+      endTimeOfCurDevice[i].set(Long.MIN_VALUE);
+    }
     fileWriter.endChunkGroup();
   }
 
@@ -82,11 +105,10 @@ public class InnerSpaceCompactionWriter extends 
AbstractCompactionWriter {
     AlignedChunkWriterImpl chunkWriter = (AlignedChunkWriterImpl) 
this.chunkWriters[subTaskId];
     chunkWriter.write(timestamps, columns, batchSize);
     checkChunkSizeAndMayOpenANewChunk(fileWriter, subTaskId);
-    synchronized (this) {
-      // we need to synchronized here to avoid multi-thread competition in 
sub-task
-      resource.updateStartTime(device, timestamps.getStartTime());
-      resource.updateEndTime(device, timestamps.getEndTime());
-    }
+    startTimeOfCurDevice[subTaskId].set(
+        Math.min(startTimeOfCurDevice[subTaskId].get(), 
timestamps.getStartTime()));
+    endTimeOfCurDevice[subTaskId].set(
+        Math.max(endTimeOfCurDevice[subTaskId].get(), 
timestamps.getEndTime()));
     isEmptyFile = false;
   }
 
@@ -109,10 +131,8 @@ public class InnerSpaceCompactionWriter extends 
AbstractCompactionWriter {
   @Override
   public void updateStartTimeAndEndTime(String device, long time, int 
subTaskId) {
     // we need to synchronized here to avoid multi-thread competition in 
sub-task
-    synchronized (this) {
-      resource.updateStartTime(device, time);
-      resource.updateEndTime(device, time);
-    }
+    
startTimeOfCurDevice[subTaskId].set(Math.min(startTimeOfCurDevice[subTaskId].get(),
 time));
+    
endTimeOfCurDevice[subTaskId].set(Math.max(endTimeOfCurDevice[subTaskId].get(), 
time));
   }
 
   @Override
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkMetadata.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkMetadata.java
index e1e589a890..3a21287038 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkMetadata.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkMetadata.java
@@ -290,10 +290,11 @@ public class ChunkMetadata implements IChunkMetadata {
   }
 
   public long calculateRamSize() {
-    return CHUNK_METADATA_FIXED_RAM_SIZE
-        + RamUsageEstimator.sizeOf(tsFilePrefixPath)
-        + RamUsageEstimator.sizeOf(measurementUid)
-        + statistics.calculateRamSize();
+    long memSize = CHUNK_METADATA_FIXED_RAM_SIZE;
+    memSize += RamUsageEstimator.sizeOf(tsFilePrefixPath);
+    memSize += RamUsageEstimator.sizeOf(measurementUid);
+    memSize += statistics.calculateRamSize();
+    return memSize;
   }
 
   public static long calculateRamSize(String measurementId, TSDataType 
dataType) {
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
index 6cf00f8b84..f2e73d56ae 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
@@ -117,6 +117,7 @@ public class TsFileIOWriter implements AutoCloseable {
   protected boolean enableMemoryControl = false;
   private Path lastSerializePath = null;
   protected LinkedList<Long> endPosInCMTForDevice = new LinkedList<>();
+  private volatile int chunkMetadataCount = 0;
   public static final String CHUNK_METADATA_TEMP_FILE_SUFFIX = ".meta";
 
   /** empty construct function. */
@@ -287,6 +288,7 @@ public class TsFileIOWriter implements AutoCloseable {
     if (enableMemoryControl) {
       this.currentChunkMetadataSize += currentChunkMetadata.calculateRamSize();
     }
+    chunkMetadataCount++;
     chunkMetadataList.add(currentChunkMetadata);
     currentChunkMetadata = null;
   }
@@ -609,6 +611,13 @@ public class TsFileIOWriter implements AutoCloseable {
     // This function should be called after all data of an aligned device has 
been written
     if (enableMemoryControl && currentChunkMetadataSize > maxMetadataSize) {
       try {
+        if (logger.isDebugEnabled()) {
+          logger.debug(
+              "Flushing chunk metadata, total size is {}, count is {}, avg 
size is {}",
+              currentChunkMetadataSize,
+              chunkMetadataCount,
+              currentChunkMetadataSize / chunkMetadataCount);
+        }
         sortAndFlushChunkMetadata();
       } catch (IOException e) {
         logger.error("Meets exception when flushing metadata to temp file for 
{}", file, e);
@@ -649,6 +658,8 @@ public class TsFileIOWriter implements AutoCloseable {
     if (chunkMetadataList != null) {
       chunkMetadataList.clear();
     }
+    chunkMetadataCount = 0;
+    currentChunkMetadataSize = 0;
   }
 
   private void writeChunkMetadataToTempFile(
@@ -676,4 +687,8 @@ public class TsFileIOWriter implements AutoCloseable {
     ReadWriteIOUtils.write(totalSize, tempOutput.wrapAsStream());
     buffer.writeTo(tempOutput);
   }
+
+  public String getCurrentChunkGroupDeviceId() {
+    return currentChunkGroupDeviceId;
+  }
 }

Reply via email to