This is an automated email from the ASF dual-hosted git repository.
hxd pushed a commit to branch refactor_bufferwrite_new
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/refactor_bufferwrite_new by
this push:
new f0fbf5e extract timestamp checking out of insert()
f0fbf5e is described below
commit f0fbf5e25897cf354f111dd36b10b89f8b4a11b4
Author: xiangdong huang <[email protected]>
AuthorDate: Wed Apr 24 15:51:15 2019 +0800
extract timestamp checking out of insert()
---
.../db/engine/tsfiledata/TsFileProcessor.java | 30 ++++++++++++++--------
1 file changed, 19 insertions(+), 11 deletions(-)
diff --git
a/iotdb/src/main/java/org/apache/iotdb/db/engine/tsfiledata/TsFileProcessor.java
b/iotdb/src/main/java/org/apache/iotdb/db/engine/tsfiledata/TsFileProcessor.java
index 194462f..5a1031c 100644
---
a/iotdb/src/main/java/org/apache/iotdb/db/engine/tsfiledata/TsFileProcessor.java
+++
b/iotdb/src/main/java/org/apache/iotdb/db/engine/tsfiledata/TsFileProcessor.java
@@ -179,8 +179,10 @@ public class TsFileProcessor extends Processor {
unclosedFile = new File(unclosedFiles[0].getParentFile(),
unclosedFileName);
}
File[] datas = dataFolder
- .listFiles(x ->
!x.getName().contains(RestorableTsFileIOWriter.RESTORE_SUFFIX) &&
x.getName().split(FileNodeConstants.BUFFERWRITE_FILE_SEPARATOR).length == 2);
- Arrays.sort(datas, Comparator.comparingLong(x ->
Long.parseLong(x.getName().split(FileNodeConstants.BUFFERWRITE_FILE_SEPARATOR)[0])));
+ .listFiles(x ->
!x.getName().contains(RestorableTsFileIOWriter.RESTORE_SUFFIX)
+ &&
x.getName().split(FileNodeConstants.BUFFERWRITE_FILE_SEPARATOR).length == 2);
+ Arrays.sort(datas, Comparator.comparingLong(x -> Long
+
.parseLong(x.getName().split(FileNodeConstants.BUFFERWRITE_FILE_SEPARATOR)[0])));
for (File tsfile : datas) {
//TODO we'd better define a file suffix for TsFile, e.g., .ts
String[] names =
tsfile.getName().split(FileNodeConstants.BUFFERWRITE_FILE_SEPARATOR);
@@ -208,7 +210,7 @@ public class TsFileProcessor extends Processor {
throw new BufferWriteProcessorException(String
.format("TsProcessor %s has more than one unclosed TsFile. please
repair it",
processorName));
- } else if (unclosedFileCount == 0){
+ } else if (unclosedFileCount == 0) {
unclosedFile = generateNewTsFilePath();
}
@@ -259,6 +261,10 @@ public class TsFileProcessor extends Processor {
}
+ protected boolean canWrite(String device, long timestamp) {
+ return !lastFlushedTimeForEachDevice.containsKey(device)
+ || timestamp <= lastFlushedTimeForEachDevice.get(device);
+ }
/**
* wrete a ts record into the memtable. If the memory usage is beyond the
memThreshold, an async
* flushing operation will be called.
@@ -271,7 +277,7 @@ public class TsFileProcessor extends Processor {
* @throws BufferWriteProcessorException if a flushing operation occurs and
failed.
*/
public int insert(InsertPlan plan) throws BufferWriteProcessorException,
IOException {
- if (lastFlushedTimeForEachDevice.containsKey(plan.getDeviceId()) &&
plan.getTime() <= lastFlushedTimeForEachDevice.get(plan.getDeviceId())) {
+ if (!canWrite(plan.getDeviceId(), plan.getTime())) {
return WRITE_REJECT_BY_TIME;
}
if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) {
@@ -319,7 +325,7 @@ public class TsFileProcessor extends Processor {
long time = plan.getTime();
TSDataType type;
String measurement;
- for (int i=0; i < plan.getMeasurements().length; i++){
+ for (int i = 0; i < plan.getMeasurements().length; i++) {
measurement = plan.getMeasurements()[i];
type = fileSchemaRef.getMeasurementDataType(measurement);
workMemTable.write(deviceId, measurement, type, time,
plan.getValues()[i]);
@@ -327,8 +333,8 @@ public class TsFileProcessor extends Processor {
if (!minWrittenTimeForEachDeviceInCurrentFile.containsKey(deviceId)) {
minWrittenTimeForEachDeviceInCurrentFile.put(deviceId, time);
}
- if (!maxWrittenTimeForEachDeviceInCurrentFile.containsKey(deviceId) ||
maxWrittenTimeForEachDeviceInCurrentFile
- .get(deviceId) < time) {
+ if (!maxWrittenTimeForEachDeviceInCurrentFile.containsKey(deviceId)
+ || maxWrittenTimeForEachDeviceInCurrentFile.get(deviceId) < time) {
maxWrittenTimeForEachDeviceInCurrentFile.put(deviceId, time);
}
valueCount++;
@@ -344,8 +350,8 @@ public class TsFileProcessor extends Processor {
*/
public void delete(String deviceId, String measurementId, long timestamp)
throws IOException {
workMemTable.delete(deviceId, measurementId, timestamp);
- if (maxWrittenTimeForEachDeviceInCurrentFile.containsKey(deviceId) &&
maxWrittenTimeForEachDeviceInCurrentFile
- .get(deviceId) < timestamp) {
+ if (maxWrittenTimeForEachDeviceInCurrentFile.containsKey(deviceId)
+ && maxWrittenTimeForEachDeviceInCurrentFile.get(deviceId) < timestamp)
{
maxWrittenTimeForEachDeviceInCurrentFile
.put(deviceId, lastFlushedTimeForEachDevice.getOrDefault(deviceId,
0L));
}
@@ -358,7 +364,8 @@ public class TsFileProcessor extends Processor {
String fullPath = deviceId +
IoTDBConstant.PATH_SEPARATOR + measurementId;
Deletion deletion = new Deletion(fullPath,
versionController.nextVersion(), timestamp);
- if (deleteFlushTable || (currentResource.containsDevice(deviceId) &&
currentResource.getStartTime(deviceId) <= timestamp)) {
+ if (deleteFlushTable || (currentResource.containsDevice(deviceId)
+ && currentResource.getStartTime(deviceId) <= timestamp)) {
currentResource.getModFile().write(deletion);
}
for (TsFileResource resource : tsFileResources) {
@@ -366,7 +373,8 @@ public class TsFileProcessor extends Processor {
resource.getModFile().write(deletion);
}
}
- if (lastFlushedTimeForEachDevice.containsKey(deviceId) &&
lastFlushedTimeForEachDevice.get(deviceId) <= timestamp) {
+ if (lastFlushedTimeForEachDevice.containsKey(deviceId)
+ && lastFlushedTimeForEachDevice.get(deviceId) <= timestamp) {
lastFlushedTimeForEachDevice.put(deviceId, 0L);
}
}