This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch rel/0.12
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/0.12 by this push:
new cdcd0cc improve TagLogFile force flush (#4019)
cdcd0cc is described below
commit cdcd0cccabb4ac36dd0e17fc35aa4b5c0957cb14
Author: Marcos_Zyk <[email protected]>
AuthorDate: Thu Sep 23 15:33:42 2021 +0800
improve TagLogFile force flush (#4019)
---
.../resources/conf/iotdb-engine.properties | 5 ++
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 11 +++
.../org/apache/iotdb/db/conf/IoTDBConfigCheck.java | 10 +++
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 7 ++
.../iotdb/db/metadata/logfile/TagLogFile.java | 86 +++++++++-------------
5 files changed, 69 insertions(+), 50 deletions(-)
diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties
b/server/src/assembly/resources/conf/iotdb-engine.properties
index 259996e..0fe904b 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -263,6 +263,11 @@ timestamp_precision=ms
# the unit is byte
# tag_attribute_total_size=700
+# Number of tag and attribute records written between forced flushes to disk.
+# Once this many records have been written since the last flush, they are force flushed to disk.
+# At most tag_attribute_flush_interval records can be lost if they have not been flushed yet.
+# tag_attribute_flush_interval=1000
+
# In one insert (one device, one timestamp, multiple measurements),
# if enable partial insert, one measurement failure will not impact other
measurements
# enable_partial_insert=true
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index f9b8b43..3688151 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -638,6 +638,9 @@ public class IoTDBConfig {
// max size for tag and attribute of one time series
private int tagAttributeTotalSize = 700;
+ // Number of tag and attribute records written between forced flushes to disk
+ private int tagAttributeFlushInterval = 1000;
+
// In one insert (one device, one timestamp, multiple measurements),
// if enable partial insert, one measurement failure will not impact other
measurements
private boolean enablePartialInsert = true;
@@ -2062,6 +2065,14 @@ public class IoTDBConfig {
this.tagAttributeTotalSize = tagAttributeTotalSize;
}
+ public int getTagAttributeFlushInterval() {
+ return tagAttributeFlushInterval;
+ }
+
+ public void setTagAttributeFlushInterval(int tagAttributeFlushInterval) {
+ this.tagAttributeFlushInterval = tagAttributeFlushInterval;
+ }
+
public int getPrimitiveArraySize() {
return primitiveArraySize;
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfigCheck.java
b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfigCheck.java
index 53b418c..f761794 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfigCheck.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfigCheck.java
@@ -88,6 +88,10 @@ public class IoTDBConfigCheck {
private static final String TAG_ATTRIBUTE_SIZE_STRING =
"tag_attribute_total_size";
private static String tagAttributeTotalSize =
String.valueOf(config.getTagAttributeTotalSize());
+ private static final String TAG_ATTRIBUTE_FLUSH_INTERVAL =
"tag_attribute_flush_interval";
+ private static String tagAttributeFlushInterval =
+ String.valueOf(config.getTagAttributeFlushInterval());
+
private static final String MAX_DEGREE_OF_INDEX_STRING =
"max_degree_of_index_node";
private static String maxDegreeOfIndexNode =
String.valueOf(TSFileDescriptor.getInstance().getConfig().getMaxDegreeOfIndexNode());
@@ -151,6 +155,7 @@ public class IoTDBConfigCheck {
systemProperties.put(TSFILE_FILE_SYSTEM_STRING, tsfileFileSystem);
systemProperties.put(ENABLE_PARTITION_STRING,
String.valueOf(enablePartition));
systemProperties.put(TAG_ATTRIBUTE_SIZE_STRING, tagAttributeTotalSize);
+ systemProperties.put(TAG_ATTRIBUTE_FLUSH_INTERVAL,
tagAttributeFlushInterval);
systemProperties.put(MAX_DEGREE_OF_INDEX_STRING, maxDegreeOfIndexNode);
systemProperties.put(VIRTUAL_STORAGE_GROUP_NUM, virtualStorageGroupNum);
systemProperties.put(TIME_ENCODER_KEY, timeEncoderValue);
@@ -255,6 +260,7 @@ public class IoTDBConfigCheck {
properties.setProperty(IOTDB_VERSION_STRING, IoTDBConstant.VERSION);
properties.setProperty(ENABLE_PARTITION_STRING,
String.valueOf(enablePartition));
properties.setProperty(TAG_ATTRIBUTE_SIZE_STRING, tagAttributeTotalSize);
+ properties.setProperty(TAG_ATTRIBUTE_FLUSH_INTERVAL,
tagAttributeFlushInterval);
properties.setProperty(MAX_DEGREE_OF_INDEX_STRING, maxDegreeOfIndexNode);
properties.store(tmpFOS, SYSTEM_PROPERTIES_STRING);
@@ -320,6 +326,10 @@ public class IoTDBConfigCheck {
printErrorLogAndExit(TAG_ATTRIBUTE_SIZE_STRING);
}
+ if
(!(properties.getProperty(TAG_ATTRIBUTE_FLUSH_INTERVAL).equals(tagAttributeFlushInterval)))
{
+ printErrorLogAndExit(TAG_ATTRIBUTE_FLUSH_INTERVAL);
+ }
+
if
(!(properties.getProperty(MAX_DEGREE_OF_INDEX_STRING).equals(maxDegreeOfIndexNode)))
{
printErrorLogAndExit(MAX_DEGREE_OF_INDEX_STRING);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 7696908..6844bb8 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -624,6 +624,13 @@ public class IoTDBDescriptor {
Integer.parseInt(
properties.getProperty(
"tag_attribute_total_size",
String.valueOf(conf.getTagAttributeTotalSize()))));
+
+ conf.setTagAttributeFlushInterval(
+ Integer.parseInt(
+ properties.getProperty(
+ "tag_attribute_flush_interval",
+ String.valueOf(conf.getTagAttributeFlushInterval()))));
+
conf.setPrimitiveArraySize(
(Integer.parseInt(
properties.getProperty(
diff --git
a/server/src/main/java/org/apache/iotdb/db/metadata/logfile/TagLogFile.java
b/server/src/main/java/org/apache/iotdb/db/metadata/logfile/TagLogFile.java
index f82d9b7..701f5d4 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/logfile/TagLogFile.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/logfile/TagLogFile.java
@@ -29,6 +29,7 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
+import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.StandardOpenOption;
@@ -46,7 +47,9 @@ public class TagLogFile implements AutoCloseable {
private static final int MAX_LENGTH =
IoTDBDescriptor.getInstance().getConfig().getTagAttributeTotalSize();
- private static final byte FILL_BYTE = 0;
+ private static final int RECORD_FLUSH_INTERVAL =
+ IoTDBDescriptor.getInstance().getConfig().getTagAttributeFlushInterval();
+ private int unFlushedRecordNum = 0;
public TagLogFile(String schemaDir, String logFileName) throws IOException {
@@ -66,8 +69,7 @@ public class TagLogFile implements AutoCloseable {
logFile.toPath(),
StandardOpenOption.READ,
StandardOpenOption.WRITE,
- StandardOpenOption.CREATE,
- StandardOpenOption.DSYNC);
+ StandardOpenOption.CREATE);
// move the current position to the tail of the file
this.fileChannel.position(fileChannel.size());
}
@@ -94,73 +96,57 @@ public class TagLogFile implements AutoCloseable {
public long write(Map<String, String> tagMap, Map<String, String>
attributeMap)
throws IOException, MetadataException {
ByteBuffer byteBuffer = convertMapToByteBuffer(tagMap, attributeMap);
- synchronized (this) {
- // get offset and write data should be atomic operation
- long offset = fileChannel.position();
- fileChannel.write(byteBuffer);
- return offset;
- }
+ return write(byteBuffer, -1);
}
/** This method does not modify this file's current position. */
public void write(Map<String, String> tagMap, Map<String, String>
attributeMap, long position)
throws IOException, MetadataException {
ByteBuffer byteBuffer = convertMapToByteBuffer(tagMap, attributeMap);
+ write(byteBuffer, position);
+ }
+
+ /**
+ * @param byteBuffer the data of record to be persisted
+ * @param position the target position to store the record in tagFile; a negative value appends the record at the file tail
+ * @return beginning position of the record in tagFile
+ */
+ private synchronized long write(ByteBuffer byteBuffer, long position) throws
IOException {
+ if (position < 0) {
+ // append the record to file tail
+ position = fileChannel.size();
+ }
fileChannel.write(byteBuffer, position);
+ unFlushedRecordNum++;
+ if (unFlushedRecordNum >= RECORD_FLUSH_INTERVAL) {
+ fileChannel.force(true);
+ unFlushedRecordNum = 0;
+ }
+ return position;
}
private ByteBuffer convertMapToByteBuffer(
Map<String, String> tagMap, Map<String, String> attributeMap) throws
MetadataException {
ByteBuffer byteBuffer = ByteBuffer.allocate(MAX_LENGTH);
- int length = serializeMap(tagMap, byteBuffer, 0);
- length = serializeMap(attributeMap, byteBuffer, length);
-
- // fill the remaining space
- for (int i = length + 1; i <= MAX_LENGTH; i++) {
- byteBuffer.put(FILL_BYTE);
- }
+ serializeMap(tagMap, byteBuffer);
+ serializeMap(attributeMap, byteBuffer);
- // persist to the disk
- byteBuffer.flip();
+ // reset position to 0 (limit stays at capacity) so the whole fixed-size buffer is written out
+ byteBuffer.position(0);
return byteBuffer;
}
- private int serializeMap(Map<String, String> map, ByteBuffer byteBuffer, int
length)
+ private void serializeMap(Map<String, String> map, ByteBuffer byteBuffer)
throws MetadataException {
- if (map == null) {
- length += Integer.BYTES;
- if (length > MAX_LENGTH) {
- throw new MetadataException(LENGTH_EXCEED_MSG);
+ try {
+ if (map == null) {
+ ReadWriteIOUtils.write(0, byteBuffer);
+ } else {
+ ReadWriteIOUtils.write(map, byteBuffer);
}
- ReadWriteIOUtils.write(0, byteBuffer);
- return length;
- }
- length += Integer.BYTES;
- if (length > MAX_LENGTH) {
+ } catch (BufferOverflowException e) {
throw new MetadataException(LENGTH_EXCEED_MSG);
}
- ReadWriteIOUtils.write(map.size(), byteBuffer);
- byte[] bytes;
- for (Map.Entry<String, String> entry : map.entrySet()) {
- // serialize key
- bytes = entry.getKey().getBytes();
- length += (4 + bytes.length);
- if (length > MAX_LENGTH) {
- throw new MetadataException(LENGTH_EXCEED_MSG);
- }
- ReadWriteIOUtils.write(bytes.length, byteBuffer);
- byteBuffer.put(bytes);
-
- // serialize value
- bytes = entry.getValue().getBytes();
- length += (4 + bytes.length);
- if (length > MAX_LENGTH) {
- throw new MetadataException(LENGTH_EXCEED_MSG);
- }
- ReadWriteIOUtils.write(bytes.length, byteBuffer);
- byteBuffer.put(bytes);
- }
- return length;
}
@Override