This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch rel/0.12
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/0.12 by this push:
     new cdcd0cc  improve TagLogFile force flush (#4019)
cdcd0cc is described below

commit cdcd0cccabb4ac36dd0e17fc35aa4b5c0957cb14
Author: Marcos_Zyk <[email protected]>
AuthorDate: Thu Sep 23 15:33:42 2021 +0800

    improve TagLogFile force flush (#4019)
---
 .../resources/conf/iotdb-engine.properties         |  5 ++
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 11 +++
 .../org/apache/iotdb/db/conf/IoTDBConfigCheck.java | 10 +++
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |  7 ++
 .../iotdb/db/metadata/logfile/TagLogFile.java      | 86 +++++++++-------------
 5 files changed, 69 insertions(+), 50 deletions(-)

diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties 
b/server/src/assembly/resources/conf/iotdb-engine.properties
index 259996e..0fe904b 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -263,6 +263,11 @@ timestamp_precision=ms
 # the unit is byte
 # tag_attribute_total_size=700
 
+# interval num for tag and attribute records when force flushing to disk
+# When a certain amount of tag and attribute records is reached, they will be force flushed to disk
+# It is possible to lose at most tag_attribute_flush_interval records
+# tag_attribute_flush_interval=1000
+
 # In one insert (one device, one timestamp, multiple measurements),
 # if enable partial insert, one measurement failure will not impact other measurements
 # enable_partial_insert=true
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java 
b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index f9b8b43..3688151 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -638,6 +638,9 @@ public class IoTDBConfig {
   // max size for tag and attribute of one time series
   private int tagAttributeTotalSize = 700;
 
+  // Interval num of tag and attribute records when force flushing to disk
+  private int tagAttributeFlushInterval = 1000;
+
   // In one insert (one device, one timestamp, multiple measurements),
   // if enable partial insert, one measurement failure will not impact other measurements
   private boolean enablePartialInsert = true;
@@ -2062,6 +2065,14 @@ public class IoTDBConfig {
     this.tagAttributeTotalSize = tagAttributeTotalSize;
   }
 
+  public int getTagAttributeFlushInterval() {
+    return tagAttributeFlushInterval;
+  }
+
+  public void setTagAttributeFlushInterval(int tagAttributeFlushInterval) {
+    this.tagAttributeFlushInterval = tagAttributeFlushInterval;
+  }
+
   public int getPrimitiveArraySize() {
     return primitiveArraySize;
   }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfigCheck.java 
b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfigCheck.java
index 53b418c..f761794 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfigCheck.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfigCheck.java
@@ -88,6 +88,10 @@ public class IoTDBConfigCheck {
   private static final String TAG_ATTRIBUTE_SIZE_STRING = 
"tag_attribute_total_size";
   private static String tagAttributeTotalSize = 
String.valueOf(config.getTagAttributeTotalSize());
 
+  private static final String TAG_ATTRIBUTE_FLUSH_INTERVAL = 
"tag_attribute_flush_interval";
+  private static String tagAttributeFlushInterval =
+      String.valueOf(config.getTagAttributeFlushInterval());
+
   private static final String MAX_DEGREE_OF_INDEX_STRING = 
"max_degree_of_index_node";
   private static String maxDegreeOfIndexNode =
       
String.valueOf(TSFileDescriptor.getInstance().getConfig().getMaxDegreeOfIndexNode());
@@ -151,6 +155,7 @@ public class IoTDBConfigCheck {
     systemProperties.put(TSFILE_FILE_SYSTEM_STRING, tsfileFileSystem);
     systemProperties.put(ENABLE_PARTITION_STRING, 
String.valueOf(enablePartition));
     systemProperties.put(TAG_ATTRIBUTE_SIZE_STRING, tagAttributeTotalSize);
+    systemProperties.put(TAG_ATTRIBUTE_FLUSH_INTERVAL, 
tagAttributeFlushInterval);
     systemProperties.put(MAX_DEGREE_OF_INDEX_STRING, maxDegreeOfIndexNode);
     systemProperties.put(VIRTUAL_STORAGE_GROUP_NUM, virtualStorageGroupNum);
     systemProperties.put(TIME_ENCODER_KEY, timeEncoderValue);
@@ -255,6 +260,7 @@ public class IoTDBConfigCheck {
       properties.setProperty(IOTDB_VERSION_STRING, IoTDBConstant.VERSION);
       properties.setProperty(ENABLE_PARTITION_STRING, 
String.valueOf(enablePartition));
       properties.setProperty(TAG_ATTRIBUTE_SIZE_STRING, tagAttributeTotalSize);
+      properties.setProperty(TAG_ATTRIBUTE_FLUSH_INTERVAL, 
tagAttributeFlushInterval);
       properties.setProperty(MAX_DEGREE_OF_INDEX_STRING, maxDegreeOfIndexNode);
       properties.store(tmpFOS, SYSTEM_PROPERTIES_STRING);
 
@@ -320,6 +326,10 @@ public class IoTDBConfigCheck {
       printErrorLogAndExit(TAG_ATTRIBUTE_SIZE_STRING);
     }
 
+    if 
(!(properties.getProperty(TAG_ATTRIBUTE_FLUSH_INTERVAL).equals(tagAttributeFlushInterval)))
 {
+      printErrorLogAndExit(TAG_ATTRIBUTE_FLUSH_INTERVAL);
+    }
+
     if 
(!(properties.getProperty(MAX_DEGREE_OF_INDEX_STRING).equals(maxDegreeOfIndexNode)))
 {
       printErrorLogAndExit(MAX_DEGREE_OF_INDEX_STRING);
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java 
b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 7696908..6844bb8 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -624,6 +624,13 @@ public class IoTDBDescriptor {
           Integer.parseInt(
               properties.getProperty(
                   "tag_attribute_total_size", 
String.valueOf(conf.getTagAttributeTotalSize()))));
+
+      conf.setTagAttributeFlushInterval(
+          Integer.parseInt(
+              properties.getProperty(
+                  "tag_attribute_flush_interval",
+                  String.valueOf(conf.getTagAttributeFlushInterval()))));
+
       conf.setPrimitiveArraySize(
           (Integer.parseInt(
               properties.getProperty(
diff --git 
a/server/src/main/java/org/apache/iotdb/db/metadata/logfile/TagLogFile.java 
b/server/src/main/java/org/apache/iotdb/db/metadata/logfile/TagLogFile.java
index f82d9b7..701f5d4 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/logfile/TagLogFile.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/logfile/TagLogFile.java
@@ -29,6 +29,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.BufferOverflowException;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 import java.nio.file.StandardOpenOption;
@@ -46,7 +47,9 @@ public class TagLogFile implements AutoCloseable {
   private static final int MAX_LENGTH =
       IoTDBDescriptor.getInstance().getConfig().getTagAttributeTotalSize();
 
-  private static final byte FILL_BYTE = 0;
+  private static final int RECORD_FLUSH_INTERVAL =
+      IoTDBDescriptor.getInstance().getConfig().getTagAttributeFlushInterval();
+  private int unFlushedRecordNum = 0;
 
   public TagLogFile(String schemaDir, String logFileName) throws IOException {
 
@@ -66,8 +69,7 @@ public class TagLogFile implements AutoCloseable {
             logFile.toPath(),
             StandardOpenOption.READ,
             StandardOpenOption.WRITE,
-            StandardOpenOption.CREATE,
-            StandardOpenOption.DSYNC);
+            StandardOpenOption.CREATE);
     // move the current position to the tail of the file
     this.fileChannel.position(fileChannel.size());
   }
@@ -94,73 +96,57 @@ public class TagLogFile implements AutoCloseable {
   public long write(Map<String, String> tagMap, Map<String, String> 
attributeMap)
       throws IOException, MetadataException {
     ByteBuffer byteBuffer = convertMapToByteBuffer(tagMap, attributeMap);
-    synchronized (this) {
-      // get offset and write data should be atomic operation
-      long offset = fileChannel.position();
-      fileChannel.write(byteBuffer);
-      return offset;
-    }
+    return write(byteBuffer, -1);
   }
 
   /** This method does not modify this file's current position. */
   public void write(Map<String, String> tagMap, Map<String, String> 
attributeMap, long position)
       throws IOException, MetadataException {
     ByteBuffer byteBuffer = convertMapToByteBuffer(tagMap, attributeMap);
+    write(byteBuffer, position);
+  }
+
+  /**
+   * @param byteBuffer the data of record to be persisted
+   * @param position the target position to store the record in tagFile
+   * @return beginning position of the record in tagFile
+   */
+  private synchronized long write(ByteBuffer byteBuffer, long position) throws 
IOException {
+    if (position < 0) {
+      // append the record to file tail
+      position = fileChannel.size();
+    }
     fileChannel.write(byteBuffer, position);
+    unFlushedRecordNum++;
+    if (unFlushedRecordNum >= RECORD_FLUSH_INTERVAL) {
+      fileChannel.force(true);
+      unFlushedRecordNum = 0;
+    }
+    return position;
   }
 
   private ByteBuffer convertMapToByteBuffer(
       Map<String, String> tagMap, Map<String, String> attributeMap) throws 
MetadataException {
     ByteBuffer byteBuffer = ByteBuffer.allocate(MAX_LENGTH);
-    int length = serializeMap(tagMap, byteBuffer, 0);
-    length = serializeMap(attributeMap, byteBuffer, length);
-
-    // fill the remaining space
-    for (int i = length + 1; i <= MAX_LENGTH; i++) {
-      byteBuffer.put(FILL_BYTE);
-    }
+    serializeMap(tagMap, byteBuffer);
+    serializeMap(attributeMap, byteBuffer);
 
-    // persist to the disk
-    byteBuffer.flip();
+    // set position to 0 and the content in this buffer could be read
+    byteBuffer.position(0);
     return byteBuffer;
   }
 
-  private int serializeMap(Map<String, String> map, ByteBuffer byteBuffer, int 
length)
+  private void serializeMap(Map<String, String> map, ByteBuffer byteBuffer)
       throws MetadataException {
-    if (map == null) {
-      length += Integer.BYTES;
-      if (length > MAX_LENGTH) {
-        throw new MetadataException(LENGTH_EXCEED_MSG);
+    try {
+      if (map == null) {
+        ReadWriteIOUtils.write(0, byteBuffer);
+      } else {
+        ReadWriteIOUtils.write(map, byteBuffer);
       }
-      ReadWriteIOUtils.write(0, byteBuffer);
-      return length;
-    }
-    length += Integer.BYTES;
-    if (length > MAX_LENGTH) {
+    } catch (BufferOverflowException e) {
       throw new MetadataException(LENGTH_EXCEED_MSG);
     }
-    ReadWriteIOUtils.write(map.size(), byteBuffer);
-    byte[] bytes;
-    for (Map.Entry<String, String> entry : map.entrySet()) {
-      // serialize key
-      bytes = entry.getKey().getBytes();
-      length += (4 + bytes.length);
-      if (length > MAX_LENGTH) {
-        throw new MetadataException(LENGTH_EXCEED_MSG);
-      }
-      ReadWriteIOUtils.write(bytes.length, byteBuffer);
-      byteBuffer.put(bytes);
-
-      // serialize value
-      bytes = entry.getValue().getBytes();
-      length += (4 + bytes.length);
-      if (length > MAX_LENGTH) {
-        throw new MetadataException(LENGTH_EXCEED_MSG);
-      }
-      ReadWriteIOUtils.write(bytes.length, byteBuffer);
-      byteBuffer.put(bytes);
-    }
-    return length;
   }
 
   @Override

Reply via email to