This is an automated email from the ASF dual-hosted git repository.

hxd pushed a commit to branch refactor_bufferwrite_new
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/refactor_bufferwrite_new by 
this push:
     new f0fbf5e  extract timestamp checking out of insert()
f0fbf5e is described below

commit f0fbf5e25897cf354f111dd36b10b89f8b4a11b4
Author: xiangdong huang <[email protected]>
AuthorDate: Wed Apr 24 15:51:15 2019 +0800

    extract timestamp checking out of insert()
---
 .../db/engine/tsfiledata/TsFileProcessor.java      | 30 ++++++++++++++--------
 1 file changed, 19 insertions(+), 11 deletions(-)

diff --git 
a/iotdb/src/main/java/org/apache/iotdb/db/engine/tsfiledata/TsFileProcessor.java
 
b/iotdb/src/main/java/org/apache/iotdb/db/engine/tsfiledata/TsFileProcessor.java
index 194462f..5a1031c 100644
--- 
a/iotdb/src/main/java/org/apache/iotdb/db/engine/tsfiledata/TsFileProcessor.java
+++ 
b/iotdb/src/main/java/org/apache/iotdb/db/engine/tsfiledata/TsFileProcessor.java
@@ -179,8 +179,10 @@ public class TsFileProcessor extends Processor {
           unclosedFile = new File(unclosedFiles[0].getParentFile(), 
unclosedFileName);
         }
         File[] datas = dataFolder
-            .listFiles(x -> 
!x.getName().contains(RestorableTsFileIOWriter.RESTORE_SUFFIX) && 
x.getName().split(FileNodeConstants.BUFFERWRITE_FILE_SEPARATOR).length == 2);
-        Arrays.sort(datas, Comparator.comparingLong(x -> 
Long.parseLong(x.getName().split(FileNodeConstants.BUFFERWRITE_FILE_SEPARATOR)[0])));
+            .listFiles(x -> 
!x.getName().contains(RestorableTsFileIOWriter.RESTORE_SUFFIX)
+                && 
x.getName().split(FileNodeConstants.BUFFERWRITE_FILE_SEPARATOR).length == 2);
+        Arrays.sort(datas, Comparator.comparingLong(x -> Long
+            
.parseLong(x.getName().split(FileNodeConstants.BUFFERWRITE_FILE_SEPARATOR)[0])));
         for (File tsfile : datas) {
           //TODO we'd better define a file suffix for TsFile, e.g., .ts
           String[] names = 
tsfile.getName().split(FileNodeConstants.BUFFERWRITE_FILE_SEPARATOR);
@@ -208,7 +210,7 @@ public class TsFileProcessor extends Processor {
       throw new BufferWriteProcessorException(String
           .format("TsProcessor %s has more than one unclosed TsFile. please 
repair it",
               processorName));
-    } else if (unclosedFileCount == 0){
+    } else if (unclosedFileCount == 0) {
       unclosedFile = generateNewTsFilePath();
     }
 
@@ -259,6 +261,10 @@ public class TsFileProcessor extends Processor {
 
   }
 
+  protected boolean canWrite(String device, long timestamp) {
+    return !lastFlushedTimeForEachDevice.containsKey(device)
+        || timestamp <= lastFlushedTimeForEachDevice.get(device);
+  }
   /**
    * wrete a ts record into the memtable. If the memory usage is beyond the 
memThreshold, an async
    * flushing operation will be called.
@@ -271,7 +277,7 @@ public class TsFileProcessor extends Processor {
    * @throws BufferWriteProcessorException if a flushing operation occurs and 
failed.
    */
   public int insert(InsertPlan plan) throws BufferWriteProcessorException, 
IOException {
-    if (lastFlushedTimeForEachDevice.containsKey(plan.getDeviceId()) && 
plan.getTime() <= lastFlushedTimeForEachDevice.get(plan.getDeviceId())) {
+    if (!canWrite(plan.getDeviceId(), plan.getTime())) {
       return WRITE_REJECT_BY_TIME;
     }
     if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) {
@@ -319,7 +325,7 @@ public class TsFileProcessor extends Processor {
     long time = plan.getTime();
     TSDataType type;
     String measurement;
-    for (int i=0; i < plan.getMeasurements().length; i++){
+    for (int i = 0; i < plan.getMeasurements().length; i++) {
       measurement = plan.getMeasurements()[i];
       type = fileSchemaRef.getMeasurementDataType(measurement);
       workMemTable.write(deviceId, measurement, type, time, 
plan.getValues()[i]);
@@ -327,8 +333,8 @@ public class TsFileProcessor extends Processor {
     if (!minWrittenTimeForEachDeviceInCurrentFile.containsKey(deviceId)) {
       minWrittenTimeForEachDeviceInCurrentFile.put(deviceId, time);
     }
-    if (!maxWrittenTimeForEachDeviceInCurrentFile.containsKey(deviceId) || 
maxWrittenTimeForEachDeviceInCurrentFile
-        .get(deviceId) < time) {
+    if (!maxWrittenTimeForEachDeviceInCurrentFile.containsKey(deviceId)
+        || maxWrittenTimeForEachDeviceInCurrentFile.get(deviceId) < time) {
       maxWrittenTimeForEachDeviceInCurrentFile.put(deviceId, time);
     }
     valueCount++;
@@ -344,8 +350,8 @@ public class TsFileProcessor extends Processor {
    */
   public void delete(String deviceId, String measurementId, long timestamp) 
throws IOException {
     workMemTable.delete(deviceId, measurementId, timestamp);
-    if (maxWrittenTimeForEachDeviceInCurrentFile.containsKey(deviceId) && 
maxWrittenTimeForEachDeviceInCurrentFile
-        .get(deviceId) < timestamp) {
+    if (maxWrittenTimeForEachDeviceInCurrentFile.containsKey(deviceId)
+        && maxWrittenTimeForEachDeviceInCurrentFile.get(deviceId) < timestamp) 
{
       maxWrittenTimeForEachDeviceInCurrentFile
           .put(deviceId, lastFlushedTimeForEachDevice.getOrDefault(deviceId, 
0L));
     }
@@ -358,7 +364,8 @@ public class TsFileProcessor extends Processor {
     String fullPath = deviceId +
         IoTDBConstant.PATH_SEPARATOR + measurementId;
     Deletion deletion = new Deletion(fullPath, 
versionController.nextVersion(), timestamp);
-    if (deleteFlushTable || (currentResource.containsDevice(deviceId) && 
currentResource.getStartTime(deviceId) <= timestamp)) {
+    if (deleteFlushTable || (currentResource.containsDevice(deviceId)
+        && currentResource.getStartTime(deviceId) <= timestamp)) {
       currentResource.getModFile().write(deletion);
     }
     for (TsFileResource resource : tsFileResources) {
@@ -366,7 +373,8 @@ public class TsFileProcessor extends Processor {
         resource.getModFile().write(deletion);
       }
     }
-    if (lastFlushedTimeForEachDevice.containsKey(deviceId) && 
lastFlushedTimeForEachDevice.get(deviceId) <= timestamp) {
+    if (lastFlushedTimeForEachDevice.containsKey(deviceId)
+        && lastFlushedTimeForEachDevice.get(deviceId) <= timestamp) {
       lastFlushedTimeForEachDevice.put(deviceId, 0L);
     }
   }

Reply via email to