This is an automated email from the ASF dual-hosted git repository.
hxd pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 7c88b7d [IOTDB-58] Replace list by array and refactor TsFileResource
(#163)
7c88b7d is described below
commit 7c88b7d78ed66bb46613e48969ac0cccfc795d1d
Author: Xiangdong Huang <[email protected]>
AuthorDate: Sat Apr 27 15:41:38 2019 +0800
[IOTDB-58] Replace list by array and refactor TsFileResource (#163)
* BufferWriteProcessor: remove fileName (because insertFilePath is enough)
* TsFileResource: remove relativePath; using autoRead for automatically
initialize startTimeMap and endTimeMap
* IMemTable, AbstractMemTable: add returned value of delete()
* TsRecord: change dataPoints from List to Array. And in many other
classes, if Array is enough, then replace List<> by Array.
* For insertion, add a new method write(long time, Object value) to replace
write(long time, String value), because the latter has to call a switch-case
and something like Integer.valueOf(value).
* move recovery function from TsFileSequenceReader to its subclass
* add append option in TsFile writer and add a subclass of
TsFileSequenceReader for supporting auto repair
* fix stream unclosed when exception occurs when creating TsFileReader
* reject writes when the memory is dangerous
* faster memtable.getSortedTimeValuePairList
* reduce memory cost when sorting data in memory
* fix equals() method in InsertPlan
* make TsFileWriter as AutoClosable; make TsFileReadWriteTest more concise
* make stop tsfileProcess more safe
---
iotdb/iotdb/conf/logback.xml | 2 -
.../java/org/apache/iotdb/db/engine/Processor.java | 2 +-
.../engine/bufferwrite/BufferWriteProcessor.java | 36 ++--
.../bufferwrite/RestorableTsFileIOWriter.java | 19 +-
.../iotdb/db/engine/filenode/FileNodeManager.java | 24 ++-
.../db/engine/filenode/FileNodeProcessor.java | 87 ++++-----
.../iotdb/db/engine/filenode/TsFileResource.java | 216 ++++++++++++---------
.../iotdb/db/engine/memtable/AbstractMemTable.java | 14 +-
.../apache/iotdb/db/engine/memtable/IMemTable.java | 7 +-
.../db/engine/memtable/IWritableMemChunk.java | 2 +
.../db/engine/memtable/MemTableFlushUtil.java | 10 +-
.../iotdb/db/engine/memtable/WritableMemChunk.java | 27 ++-
.../io/OverflowedTsFileIOWriter.java} | 29 +--
.../iotdb/db/qp/executor/OverflowQPExecutor.java | 18 +-
.../iotdb/db/qp/executor/QueryProcessExecutor.java | 4 +-
.../iotdb/db/qp/logical/crud/InsertOperator.java | 12 +-
.../iotdb/db/qp/physical/crud/InsertPlan.java | 25 +--
.../iotdb/db/qp/strategy/LogicalGenerator.java | 8 +-
.../db/query/control/QueryResourceManager.java | 6 +-
.../EngineExecutorWithoutTimeGenerator.java | 1 -
.../iotdb/db/sync/receiver/SyncServiceImpl.java | 91 ++++-----
.../java/org/apache/iotdb/db/utils/MemUtils.java | 24 +++
.../writelog/manager/MultiFileLogNodeManager.java | 1 +
.../db/writelog/manager/WriteLogNodeManager.java | 6 +-
.../db/writelog/replay/ConcreteLogReplayer.java | 12 +-
.../iotdb/db/writelog/transfer/CodecInstances.java | 18 +-
.../bufferwrite/BufferWriteProcessorNewTest.java | 1 -
.../db/engine/filenode/TsFileResourceTest.java | 12 +-
.../engine/overflow/io/OverflowResourceTest.java | 24 +--
.../apache/iotdb/db/qp/utils/MemIntQpExecutor.java | 4 +-
.../org/apache/iotdb/db/tools/WalCheckerTest.java | 8 +-
.../apache/iotdb/db/writelog/PerformanceTest.java | 14 +-
.../org/apache/iotdb/db/writelog/RecoverTest.java | 12 +-
.../iotdb/db/writelog/WriteLogNodeManagerTest.java | 8 +-
.../apache/iotdb/db/writelog/WriteLogNodeTest.java | 25 +--
.../iotdb/db/writelog/io/LogWriterReaderTest.java | 8 +-
.../transfer/PhysicalPlanLogTransferTest.java | 2 +-
.../apache/iotdb/tsfile/TsFileSequenceRead.java | 4 +-
.../apache/iotdb/tsfile/read/ReadOnlyTsFile.java | 2 +-
.../iotdb/tsfile/read/TsFileSequenceReader.java | 13 +-
.../iotdb/tsfile/write/TsFileReadWriteTest.java | 4 +-
41 files changed, 454 insertions(+), 388 deletions(-)
diff --git a/iotdb/iotdb/conf/logback.xml b/iotdb/iotdb/conf/logback.xml
index 1f8a915..2629723 100644
--- a/iotdb/iotdb/conf/logback.xml
+++ b/iotdb/iotdb/conf/logback.xml
@@ -111,7 +111,6 @@
</appender>
<logger level="info" name="org.apache.iotdb.db.service"/>
<logger level="info" name="org.apache.iotdb.db.conf"/>
-
<!-- a log appender that collect all log records whose level is greather
than debug-->
<appender class="ch.qos.logback.core.rolling.RollingFileAppender"
name="FILEALL">
<file>${IOTDB_HOME}/logs/log_all.log</file>
@@ -130,7 +129,6 @@
<level>INFO</level>
</filter>
</appender>
-
<root level="info">
<appender-ref ref="FILEDEBUG"/>
<appender-ref ref="FILEWARN"/>
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/Processor.java
b/iotdb/src/main/java/org/apache/iotdb/db/engine/Processor.java
index cba7463..5c515ff 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/Processor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/Processor.java
@@ -35,7 +35,7 @@ import org.apache.iotdb.db.exception.ProcessorException;
public abstract class Processor {
private final ReadWriteLock lock;
- private String processorName;
+ protected String processorName;
/**
* Construct processor using name space seriesPath
diff --git
a/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java
b/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java
index e770e85..f8ed671 100644
---
a/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java
+++
b/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.engine.bufferwrite;
import java.io.File;
import java.io.IOException;
+import java.nio.file.Paths;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -77,7 +78,6 @@ public class BufferWriteProcessor extends Processor {
private long valueCount = 0;
private String baseDir;
- private String fileName;
private String insertFilePath;
private String bufferWriteRelativePath;
@@ -108,7 +108,6 @@ public class BufferWriteProcessor extends Processor {
bufferwriteCloseAction =
parameters.get(FileNodeConstants.BUFFERWRITE_CLOSE_ACTION);
filenodeFlushAction =
parameters.get(FileNodeConstants.FILENODE_PROCESSOR_FLUSH_ACTION);
-
reopen(fileName);
if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) {
try {
@@ -128,22 +127,11 @@ public class BufferWriteProcessor extends Processor {
if (!isClosed) {
return;
}
- this.fileName = fileName;
- String bDir = baseDir;
- if (bDir.length() > 0 && bDir.charAt(bDir.length() - 1) !=
File.separatorChar) {
- bDir = bDir + File.separatorChar;
- }
- String dataDirPath = bDir + getProcessorName();
- File dataDir = new File(dataDirPath);
- if (!dataDir.exists()) {
- dataDir.mkdirs();
- LOGGER.debug("The bufferwrite processor data dir doesn't exists, create
new directory {}.",
- dataDirPath);
- }
- this.insertFilePath = new File(dataDir, fileName).getPath();
- bufferWriteRelativePath = getProcessorName() + File.separatorChar +
fileName;
+ new File(baseDir, processorName).mkdirs();
+ this.insertFilePath = Paths.get(baseDir, processorName,
fileName).toString();
+ bufferWriteRelativePath = processorName + File.separatorChar + fileName;
try {
- writer = new RestorableTsFileIOWriter(getProcessorName(),
insertFilePath);
+ writer = new RestorableTsFileIOWriter(processorName, insertFilePath);
} catch (IOException e) {
throw new BufferWriteProcessorException(e);
}
@@ -162,6 +150,7 @@ public class BufferWriteProcessor extends Processor {
}
}
+
/**
* write one data point to the buffer write.
*
@@ -433,7 +422,7 @@ public class BufferWriteProcessor extends Processor {
LOGGER.info(
"Close bufferwrite processor {}, the file name is {}, start time
is {}, end time is {}, "
+ "time consumption is {}ms",
- getProcessorName(), fileName,
+ getProcessorName(), insertFilePath,
DatetimeUtils.convertMillsecondToZonedDateTime(closeStartTime),
DatetimeUtils.convertMillsecondToZonedDateTime(closeEndTime),
closeEndTime - closeStartTime);
@@ -489,9 +478,6 @@ public class BufferWriteProcessor extends Processor {
return baseDir;
}
- public String getFileName() {
- return fileName;
- }
public String getFileRelativePath() {
return bufferWriteRelativePath;
@@ -561,12 +547,16 @@ public class BufferWriteProcessor extends Processor {
}
BufferWriteProcessor that = (BufferWriteProcessor) o;
return Objects.equals(baseDir, that.baseDir) &&
- Objects.equals(fileName, that.fileName);
+ Objects.equals(insertFilePath, that.insertFilePath);
}
@Override
public int hashCode() {
- return Objects.hash(super.hashCode(), baseDir, fileName);
+ return Objects.hash(super.hashCode(), baseDir, insertFilePath);
+ }
+
+ public String getInsertFilePath() {
+ return insertFilePath;
}
@Override
diff --git
a/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/RestorableTsFileIOWriter.java
b/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/RestorableTsFileIOWriter.java
index 7898246..a7542b5 100644
---
a/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/RestorableTsFileIOWriter.java
+++
b/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/RestorableTsFileIOWriter.java
@@ -58,7 +58,7 @@ public class RestorableTsFileIOWriter extends TsFileIOWriter {
private static final int TS_METADATA_BYTE_SIZE = 4;
private static final int TS_POSITION_BYTE_SIZE = 8;
- private static final String RESTORE_SUFFIX = ".restore";
+ public static final String RESTORE_SUFFIX = ".restore";
private static final String DEFAULT_MODE = "rw";
private int lastFlushedChunkGroupIndex = 0;
@@ -83,7 +83,7 @@ public class RestorableTsFileIOWriter extends TsFileIOWriter {
private boolean isNewResource = false;
- RestorableTsFileIOWriter(String processorName, String insertFilePath) throws
IOException {
+ public RestorableTsFileIOWriter(String processorName, String insertFilePath)
throws IOException {
super();
this.insertFilePath = insertFilePath;
this.restoreFilePath = insertFilePath + RESTORE_SUFFIX;
@@ -121,7 +121,6 @@ public class RestorableTsFileIOWriter extends
TsFileIOWriter {
} catch (IOException e) {
LOGGER.info("remove unsealed tsfile restore file failed: ", e);
}
-
this.out = new DefaultTsFileOutput(new FileOutputStream(insertFile));
this.chunkGroupMetaDataList = new ArrayList<>();
lastFlushedChunkGroupIndex = chunkGroupMetaDataList.size();
@@ -157,8 +156,8 @@ public class RestorableTsFileIOWriter extends
TsFileIOWriter {
TsDeviceMetadata tsDeviceMetadata = new TsDeviceMetadata();
this.getAppendedRowGroupMetadata();
tsDeviceMetadata.setChunkGroupMetadataList(this.append);
- RandomAccessFile out = new RandomAccessFile(restoreFilePath, DEFAULT_MODE);
- try {
+
+ try (RandomAccessFile out = new RandomAccessFile(restoreFilePath,
DEFAULT_MODE)) {
if (out.length() > 0) {
out.seek(out.length() - TS_POSITION_BYTE_SIZE);
}
@@ -172,8 +171,6 @@ public class RestorableTsFileIOWriter extends
TsFileIOWriter {
// write tsfile position using byte[8] which is a long
byte[] lastPositionBytes = BytesUtils.longToBytes(lastPosition);
out.write(lastPositionBytes);
- } finally {
- out.close();
}
}
@@ -222,7 +219,7 @@ public class RestorableTsFileIOWriter extends
TsFileIOWriter {
* @param dataType the value type
* @return chunks' metadata
*/
- List<ChunkMetaData> getMetadatas(String deviceId, String measurementId,
TSDataType dataType) {
+ public List<ChunkMetaData> getMetadatas(String deviceId, String
measurementId, TSDataType dataType) {
List<ChunkMetaData> chunkMetaDatas = new ArrayList<>();
if (metadatas.containsKey(deviceId) &&
metadatas.get(deviceId).containsKey(measurementId)) {
for (ChunkMetaData chunkMetaData :
metadatas.get(deviceId).get(measurementId)) {
@@ -241,7 +238,7 @@ public class RestorableTsFileIOWriter extends
TsFileIOWriter {
return insertFilePath;
}
- String getRestoreFilePath() {
+ public String getRestoreFilePath() {
return restoreFilePath;
}
@@ -261,7 +258,7 @@ public class RestorableTsFileIOWriter extends
TsFileIOWriter {
* add all appendChunkGroupMetadatas into memory. After calling this method,
other classes can
* read these metadata.
*/
- void appendMetadata() {
+ public void appendMetadata() {
if (!append.isEmpty()) {
for (ChunkGroupMetaData rowGroupMetaData : append) {
for (ChunkMetaData chunkMetaData :
rowGroupMetaData.getChunkMetaDataList()) {
@@ -290,7 +287,7 @@ public class RestorableTsFileIOWriter extends
TsFileIOWriter {
try {
Files.delete(Paths.get(restoreFilePath));
} catch (IOException e) {
- LOGGER.info("delete restore file {} failed, because ", restoreFilePath,
e);
+ LOGGER.info("delete restore file {} failed, because {}",
restoreFilePath, e.getMessage());
}
}
diff --git
a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java
b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java
index baf9fa6..22778bb 100644
---
a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java
+++
b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java
@@ -325,11 +325,13 @@ public class FileNodeManager implements IStatistic,
IService {
throws FileNodeManagerException {
try {
if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) {
- List<String> measurementList = new ArrayList<>();
- List<String> insertValues = new ArrayList<>();
+ String[] measurementList = new String[tsRecord.dataPointList.size()];
+ String[] insertValues = new String[tsRecord.dataPointList.size()];
+ int i=0;
for (DataPoint dp : tsRecord.dataPointList) {
- measurementList.add(dp.getMeasurementId());
- insertValues.add(dp.getValue().toString());
+ measurementList[i] = dp.getMeasurementId();
+ insertValues[i] = dp.getValue().toString();
+ i++;
}
logNode.write(new InsertPlan(2, tsRecord.deviceId, tsRecord.time,
measurementList,
insertValues));
@@ -410,7 +412,7 @@ public class FileNodeManager implements IStatistic,
IService {
String bufferwriteBaseDir = bufferWriteProcessor.getBaseDir();
String bufferwriteRelativePath =
bufferWriteProcessor.getFileRelativePath();
try {
- fileNodeProcessor.addIntervalFileNode(bufferwriteBaseDir,
bufferwriteRelativePath);
+ fileNodeProcessor.addIntervalFileNode(new File(new
File(bufferwriteBaseDir), bufferwriteRelativePath));
} catch (Exception e) {
if (!isMonitor) {
updateStatHashMapWhenFail(tsRecord);
@@ -447,7 +449,7 @@ public class FileNodeManager implements IStatistic,
IService {
"The filenode processor {} will close the bufferwrite processor, "
+ "because the size[{}] of tsfile {} reaches the threshold {}",
filenodeName,
MemUtils.bytesCntToStr(bufferWriteProcessor.getFileSize()),
- bufferWriteProcessor.getFileName(), MemUtils.bytesCntToStr(
+ bufferWriteProcessor.getInsertFilePath(), MemUtils.bytesCntToStr(
IoTDBDescriptor.getInstance().getConfig().getBufferwriteFileSizeThreshold()));
}
@@ -568,14 +570,16 @@ public class FileNodeManager implements IStatistic,
IService {
Iterator<Map.Entry<String, FileNodeProcessor>> processorIterator)
throws FileNodeManagerException {
if (!processorMap.containsKey(processorName)) {
+ //TODO do we need to call processorIterator.remove() ?
LOGGER.warn("The processorMap doesn't contain the filenode processor
{}.", processorName);
return;
}
LOGGER.info("Try to delete the filenode processor {}.", processorName);
FileNodeProcessor processor = processorMap.get(processorName);
if (!processor.tryWriteLock()) {
- LOGGER.warn("Can't get the write lock of the filenode processor {}.",
processorName);
- return;
+ throw new FileNodeManagerException(String
+ .format("Can't delete the filenode processor %s because Can't get
the write lock.",
+ processorName));
}
try {
@@ -605,7 +609,7 @@ public class FileNodeManager implements IStatistic,
IService {
FileNodeProcessor fileNodeProcessor = getProcessor(deviceId, true);
try {
fileNodeProcessor.deleteBufferWrite(deviceId, measurementId, timestamp);
- } catch (IOException e) {
+ } catch (BufferWriteProcessorException | IOException e) {
throw new FileNodeManagerException(e);
} finally {
fileNodeProcessor.writeUnlock();
@@ -722,7 +726,7 @@ public class FileNodeManager implements IStatistic,
IService {
// append file to storage group.
fileNodeProcessor.appendFile(appendFile, appendFilePath);
} catch (FileNodeProcessorException e) {
- LOGGER.error("Cannot append the file {} to {}",
appendFile.getFilePath(), fileNodeName, e);
+ LOGGER.error("Cannot append the file {} to {}",
appendFile.getFile().getAbsolutePath(), fileNodeName, e);
throw new FileNodeManagerException(e);
} finally {
fileNodeProcessor.writeUnlock();
diff --git
a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
index d97594c..6428fc6 100644
---
a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
+++
b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
@@ -256,8 +256,7 @@ public class FileNodeProcessor extends Processor implements
IStatistic {
} catch (FileNodeProcessorException e) {
LOGGER.error(
"The fileNode processor {} encountered an error when recoverying
restore " +
- "information.",
- processorName, e);
+ "information.", processorName);
throw new FileNodeProcessorException(e);
}
// TODO deep clone the lastupdate time
@@ -346,10 +345,9 @@ public class FileNodeProcessor extends Processor
implements IStatistic {
/**
* add interval FileNode.
*/
- void addIntervalFileNode(String baseDir, String fileName) throws
ActionException {
+ void addIntervalFileNode(File file) throws ActionException, IOException {
- TsFileResource tsFileResource = new
TsFileResource(OverflowChangeType.NO_CHANGE, baseDir,
- fileName);
+ TsFileResource tsFileResource = new TsFileResource(file, false);
this.currentTsFileResource = tsFileResource;
newFileNodes.add(tsFileResource);
fileNodeProcessorStore.setNewFileNodes(newFileNodes);
@@ -384,22 +382,6 @@ public class FileNodeProcessor extends Processor
implements IStatistic {
return currentTsFileResource.getStartTime(deviceId);
}
- /**
- * clear filenode.
- */
- public void clearFileNode() {
- isOverflowed = false;
- emptyTsFileResource = new TsFileResource(OverflowChangeType.NO_CHANGE,
null);
- newFileNodes = new ArrayList<>();
- isMerging = FileNodeProcessorStatus.NONE;
- numOfMergeFile = 0;
- fileNodeProcessorStore.setLastUpdateTimeMap(lastUpdateTimeMap);
- fileNodeProcessorStore.setFileNodeProcessorStatus(isMerging);
- fileNodeProcessorStore.setNewFileNodes(newFileNodes);
- fileNodeProcessorStore.setNumOfMergeFile(numOfMergeFile);
- fileNodeProcessorStore.setEmptyTsFileResource(emptyTsFileResource);
- }
-
private void addAllFileIntoIndex(List<TsFileResource> fileList) {
// clear map
invertedIndexOfFiles.clear();
@@ -451,7 +433,7 @@ public class FileNodeProcessor extends Processor implements
IStatistic {
currentTsFileResource = newFileNodes.get(newFileNodes.size() - 1);
// this bufferwrite file is not close by normal operation
- String damagedFilePath = newFileNodes.get(newFileNodes.size() -
1).getFilePath();
+ String damagedFilePath = newFileNodes.get(newFileNodes.size() -
1).getFile().getAbsolutePath();
String[] fileNames = damagedFilePath.split("\\" + File.separator);
// all information to recovery the damaged file.
// contains file seriesPath, action parameters and processorName
@@ -572,7 +554,7 @@ public class FileNodeProcessor extends Processor implements
IStatistic {
*/
public OverflowProcessor getOverflowProcessor() {
if (overflowProcessor == null || overflowProcessor.isClosed()) {
- LOGGER.error("The overflow processor is null or closed when getting the
overflowProcessor");
+ LOGGER.error("The overflow processor is null when getting the
overflowProcessor");
}
return overflowProcessor;
}
@@ -581,6 +563,15 @@ public class FileNodeProcessor extends Processor
implements IStatistic {
return overflowProcessor != null && !overflowProcessor.isClosed();
}
+ public void setBufferwriteProcessroToClosed() {
+
+ bufferWriteProcessor = null;
+ }
+
+ public boolean hasBufferwriteProcessor() {
+
+ return bufferWriteProcessor != null;
+ }
/**
* set last update time.
@@ -785,8 +776,13 @@ public class FileNodeProcessor extends Processor
implements IStatistic {
if (!newFileNodes.isEmpty() && !newFileNodes.get(newFileNodes.size() -
1).isClosed()
&& !newFileNodes.get(newFileNodes.size() -
1).getStartTimeMap().isEmpty()) {
unsealedTsFile = new UnsealedTsFile();
- unsealedTsFile.setFilePath(newFileNodes.get(newFileNodes.size() -
1).getFilePath());
-
+ unsealedTsFile.setFilePath(newFileNodes.get(newFileNodes.size() -
1).getFile().getAbsolutePath());
+ if (bufferWriteProcessor == null) {
+ throw new FileNodeProcessorException(String.format(
+ "The last of tsfile %s in filenode processor %s is not closed, "
+ + "but the bufferwrite processor is null.",
+ newFileNodes.get(newFileNodes.size() -
1).getFile().getAbsolutePath(), getProcessorName()));
+ }
try {
bufferwritedata = bufferWriteProcessor
.queryBufferWriteData(deviceId, measurementId, dataType,
mSchema.getProps());
@@ -824,12 +820,12 @@ public class FileNodeProcessor extends Processor
implements IStatistic {
public void appendFile(TsFileResource appendFile, String appendFilePath)
throws FileNodeProcessorException {
try {
- if (!new File(appendFile.getFilePath()).getParentFile().exists()) {
- new File(appendFile.getFilePath()).getParentFile().mkdirs();
+ if (!appendFile.getFile().getParentFile().exists()) {
+ appendFile.getFile().getParentFile().mkdirs();
}
// move file
File originFile = new File(appendFilePath);
- File targetFile = new File(appendFile.getFilePath());
+ File targetFile = appendFile.getFile();
if (!originFile.exists()) {
throw new FileNodeProcessorException(
String.format("The appended file %s does not exist.",
appendFilePath));
@@ -837,7 +833,7 @@ public class FileNodeProcessor extends Processor implements
IStatistic {
if (targetFile.exists()) {
throw new FileNodeProcessorException(
String.format("The appended target file %s already exists.",
- appendFile.getFilePath()));
+ appendFile.getFile().getAbsolutePath()));
}
if (!originFile.renameTo(targetFile)) {
LOGGER.warn("File renaming failed when appending new file. Origin: {},
Target: {}",
@@ -899,7 +895,7 @@ public class FileNodeProcessor extends Processor implements
IStatistic {
}
java.nio.file.Path link =
FileSystems.getDefault().getPath(newFile.getPath());
java.nio.file.Path target = FileSystems.getDefault()
- .getPath(tsFileResource.getFilePath());
+ .getPath(tsFileResource.getFile().getAbsolutePath());
Files.createLink(link, target);
overlapFiles.add(newFile.getPath());
break;
@@ -1213,8 +1209,7 @@ public class FileNodeProcessor extends Processor
implements IStatistic {
}
}
TsFileResource node = new TsFileResource(startTimeMap, endTimeMap,
- tsFileResource.getOverflowChangeType(),
tsFileResource.getBaseDirIndex(),
- tsFileResource.getRelativePath());
+ tsFileResource.getOverflowChangeType(), tsFileResource.getFile());
result.add(node);
}
}
@@ -1345,7 +1340,7 @@ public class FileNodeProcessor extends Processor
implements IStatistic {
// add the restore file, if the last file is not closed
if (!newFileNodes.isEmpty() && !newFileNodes.get(newFileNodes.size() -
1).isClosed()) {
String bufferFileRestorePath =
- newFileNodes.get(newFileNodes.size() - 1).getFilePath() +
RESTORE_FILE_SUFFIX;
+ newFileNodes.get(newFileNodes.size() -
1).getFile().getAbsolutePath() + RESTORE_FILE_SUFFIX;
bufferFiles.add(bufferFileRestorePath);
}
@@ -1404,7 +1399,7 @@ public class FileNodeProcessor extends Processor
implements IStatistic {
private void collectBufferWriteFiles(Set<String> bufferFiles) {
for (TsFileResource bufferFileNode : newFileNodes) {
- String bufferFilePath = bufferFileNode.getFilePath();
+ String bufferFilePath = bufferFileNode.getFile().getAbsolutePath();
if (bufferFilePath != null) {
bufferFiles.add(bufferFilePath);
}
@@ -1450,7 +1445,6 @@ public class FileNodeProcessor extends Processor
implements IStatistic {
// losing some modification.
mergeDeleteLock.lock();
QueryContext context = new QueryContext();
-
try {
FileReaderManager.getInstance().increaseFileReaderReference(backupIntervalFile.getFilePath(),
true);
@@ -1477,7 +1471,6 @@ public class FileNodeProcessor extends Processor
implements IStatistic {
// query one measurement in the special deviceId
String measurementId = path.getMeasurement();
TSDataType dataType = mManager.getSeriesType(path.getFullPath());
-
OverflowSeriesDataSource overflowSeriesDataSource =
overflowProcessor.queryMerge(deviceId,
measurementId, dataType, true, context);
Filter timeFilter = FilterFactory
@@ -1515,8 +1508,7 @@ public class FileNodeProcessor extends Processor
implements IStatistic {
if (mergeFileWriter != null) {
mergeFileWriter.endFile(fileSchema);
}
-
backupIntervalFile.setBaseDirIndex(directories.getTsFileFolderIndex(mergeBaseDir));
- backupIntervalFile.setRelativePath(mergeFileName);
+ backupIntervalFile.setFile(new File(mergeBaseDir + File.separator +
mergeFileName));
backupIntervalFile.setOverflowChangeType(OverflowChangeType.NO_CHANGE);
backupIntervalFile.setStartTimeMap(startTimeMap);
backupIntervalFile.setEndTimeMap(endTimeMap);
@@ -1673,8 +1665,9 @@ public class FileNodeProcessor extends Processor
implements IStatistic {
return false;
}
if (!newMultiPassLock.writeLock().tryLock()) {
- LOGGER.info("The filenode {} can't be closed, because it can't get
newMultiPassLock {}",
- getProcessorName(), newMultiPassLock);
+ LOGGER.warn(
+ "The filenode {} can't be closed, because it can't get
newMultiPassLock {}. The newMultiPassTokenSet is {}",
+ getProcessorName(), newMultiPassLock, newMultiPassTokenSet);
return false;
}
@@ -1724,6 +1717,7 @@ public class FileNodeProcessor extends Processor
implements IStatistic {
waitForBufferWriteClose();
}
bufferWriteProcessor.close();
+ bufferWriteProcessor = null;
} catch (BufferWriteProcessorException e) {
throw new FileNodeProcessorException(e);
}
@@ -1834,12 +1828,15 @@ public class FileNodeProcessor extends Processor
implements IStatistic {
private FileNodeProcessorStore readStoreFromDisk() throws
FileNodeProcessorException {
synchronized (fileNodeRestoreLock) {
-
File restoreFile = new File(fileNodeRestoreFilePath);
if (!restoreFile.exists() || restoreFile.length() == 0) {
- return new FileNodeProcessorStore(false, new HashMap<>(),
- new TsFileResource(OverflowChangeType.NO_CHANGE, null),
- new ArrayList<>(), FileNodeProcessorStatus.NONE, 0);
+ try {
+ return new FileNodeProcessorStore(false, new HashMap<>(),
+ new TsFileResource(null, false),
+ new ArrayList<>(), FileNodeProcessorStatus.NONE, 0);
+ } catch (IOException e) {
+ throw new FileNodeProcessorException(e);
+ }
}
try (FileInputStream inputStream = new
FileInputStream(fileNodeRestoreFilePath)) {
return FileNodeProcessorStore.deSerialize(inputStream);
@@ -1916,7 +1913,7 @@ public class FileNodeProcessor extends Processor
implements IStatistic {
* Similar to delete(), but only deletes data in BufferWrite. Only used by
WAL recovery.
*/
public void deleteBufferWrite(String deviceId, String measurementId, long
timestamp)
- throws IOException {
+ throws IOException, BufferWriteProcessorException {
String fullPath = deviceId +
IoTDBConstant.PATH_SEPARATOR + measurementId;
long version = versionController.nextVersion();
diff --git
a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/TsFileResource.java
b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/TsFileResource.java
index c349c62..3d6d74a 100644
---
a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/TsFileResource.java
+++
b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/TsFileResource.java
@@ -20,16 +20,25 @@ package org.apache.iotdb.db.engine.filenode;
import java.io.File;
import java.io.IOException;
+import java.util.ArrayList;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
+import java.util.concurrent.locks.ReadWriteLock;
import org.apache.iotdb.db.conf.directories.Directories;
+import org.apache.iotdb.db.engine.bufferwrite.RestorableTsFileIOWriter;
import org.apache.iotdb.db.engine.modification.ModificationFile;
+import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
+import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetaData;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
+import org.apache.iotdb.tsfile.file.metadata.TsDeviceMetadataIndex;
+import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
/**
@@ -38,71 +47,90 @@ import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
public class TsFileResource {
private OverflowChangeType overflowChangeType;
+
+ //the file index of `settled` folder in the Directories.
private int baseDirIndex;
- private String relativePath;
+ private File file;
private Map<String, Long> startTimeMap;
private Map<String, Long> endTimeMap;
private Set<String> mergeChanged = new HashSet<>();
private transient ModificationFile modFile;
- public TsFileResource(Map<String, Long> startTimeMap, Map<String, Long>
endTimeMap,
- OverflowChangeType type, int baseDirIndex, String relativePath) {
-
- this.overflowChangeType = type;
- this.baseDirIndex = baseDirIndex;
- this.relativePath = relativePath;
-
- this.startTimeMap = startTimeMap;
- this.endTimeMap = endTimeMap;
- this.modFile = new ModificationFile(
- Directories.getInstance().getTsFileFolder(baseDirIndex) +
File.separator
- + relativePath + ModificationFile.FILE_SUFFIX);
+ /**
+ * @param autoRead whether read the file to initialize startTimeMap and
endTimeMap
+ */
+ public TsFileResource(File file, boolean autoRead) throws IOException {
+ this(new HashMap<>(), new HashMap<>(), OverflowChangeType.NO_CHANGE, file);
+ if (autoRead) {
+ //init startTime and endTime
+ try (TsFileSequenceReader reader = new
TsFileSequenceReader(file.getAbsolutePath())) {
+ if (reader.readTailMagic().equals(TSFileConfig.MAGIC_STRING)) {
+ //this is a complete tsfile, and we can read the metadata directly.
+ for (Map.Entry<String, TsDeviceMetadataIndex> deviceEntry :
reader.readFileMetadata()
+ .getDeviceMap().entrySet()) {
+ startTimeMap.put(deviceEntry.getKey(),
deviceEntry.getValue().getStartTime());
+ endTimeMap.put(deviceEntry.getKey(),
deviceEntry.getValue().getEndTime());
+ }
+ } else {
+ //sadly, this is not a complete tsfile. we have to repair it bytes
by bytes
+ //TODO will implement it
+ List<ChunkGroupMetaData> metaDataList = new ArrayList<>();
+ reader.selfCheck(null, metaDataList, false);
+ initTimeMapFromChunGroupMetaDatas(metaDataList);
+ }
+ }
+ }
}
/**
- * This is just used to construct a new bufferwritefile.
- *
- * @param type whether this file is affected by overflow and how it is
affected.
- * @param relativePath the path of the file relative to the FileNode.
+ * @param writer an unclosed TsFile Writer
*/
- public TsFileResource(OverflowChangeType type, int baseDirIndex, String
relativePath) {
-
- this.overflowChangeType = type;
- this.baseDirIndex = baseDirIndex;
- this.relativePath = relativePath;
-
- startTimeMap = new HashMap<>();
- endTimeMap = new HashMap<>();
- this.modFile = new ModificationFile(
- Directories.getInstance().getTsFileFolder(baseDirIndex) +
File.separator
- + relativePath + ModificationFile.FILE_SUFFIX);
+ public TsFileResource(File file, RestorableTsFileIOWriter writer) {
+ this(new HashMap<>(), new HashMap<>(), OverflowChangeType.NO_CHANGE, file);
+ initTimeMapFromChunGroupMetaDatas(writer.getChunkGroupMetaDatas());
+ }
+
+ private void initTimeMapFromChunGroupMetaDatas(List<ChunkGroupMetaData>
metaDataList) {
+ for (ChunkGroupMetaData metaData : metaDataList) {
+ long startTime = startTimeMap.getOrDefault(metaData.getDeviceID(),
Long.MAX_VALUE);
+ long endTime = endTimeMap.getOrDefault(metaData.getDeviceID(),
Long.MIN_VALUE);
+ for (ChunkMetaData chunk : metaData.getChunkMetaDataList()) {
+ if (chunk.getStartTime() < startTime) {
+ startTime = chunk.getStartTime();
+ }
+ if (chunk.getEndTime() > endTime) {
+ endTime = chunk.getEndTime();
+ }
+ }
+ startTimeMap.put(metaData.getDeviceID(), startTime);
+ endTimeMap.put(metaData.getDeviceID(), endTime);
+ }
}
- public TsFileResource(OverflowChangeType type, String baseDir, String
relativePath) {
- this.overflowChangeType = type;
- this.baseDirIndex =
Directories.getInstance().getTsFileFolderIndex(baseDir);
- this.relativePath = relativePath;
+ public TsFileResource(Map<String, Long> startTimeMap, Map<String, Long>
endTimeMap,
+ OverflowChangeType type, File file) {
- startTimeMap = new HashMap<>();
- endTimeMap = new HashMap<>();
- this.modFile = new ModificationFile(
- Directories.getInstance().getTsFileFolder(baseDirIndex) +
File.separator
- + relativePath + ModificationFile.FILE_SUFFIX);
- }
+ this.overflowChangeType = type;
+ if (file != null) {
+ this.baseDirIndex = Directories.getInstance()
+ .getTsFileFolderIndex(file.getParentFile().getParent());
+ this.modFile = new ModificationFile(file.getAbsolutePath() +
ModificationFile.FILE_SUFFIX);
+ }
+ this.file = file;
- public TsFileResource(OverflowChangeType type, String relativePath) {
+ this.startTimeMap = startTimeMap;
+ this.endTimeMap = endTimeMap;
- this(type, 0, relativePath);
}
public void serialize(OutputStream outputStream) throws IOException {
ReadWriteIOUtils.write(this.overflowChangeType.serialize(), outputStream);
ReadWriteIOUtils.write(this.baseDirIndex, outputStream);
- ReadWriteIOUtils.writeIsNull(this.relativePath, outputStream);
- if (this.relativePath != null) {
- ReadWriteIOUtils.write(this.relativePath, outputStream);
+ ReadWriteIOUtils.writeIsNull(this.file, outputStream);
+ if (this.file != null) {
+ ReadWriteIOUtils.write(getRelativePath(), outputStream);
}
ReadWriteIOUtils.write(this.startTimeMap.size(), outputStream);
for (Entry<String, Long> entry : this.startTimeMap.entrySet()) {
@@ -125,9 +153,11 @@ public class TsFileResource {
.deserialize(ReadWriteIOUtils.readShort(inputStream));
int baseDirIndex = ReadWriteIOUtils.readInt(inputStream);
boolean hasRelativePath = ReadWriteIOUtils.readIsNull(inputStream);
- String relativePath = null;
+
+ File file = null;
if (hasRelativePath) {
- relativePath = ReadWriteIOUtils.readString(inputStream);
+ String relativePath = ReadWriteIOUtils.readString(inputStream);
+ file = new File(Directories.getInstance().getTsFileFolder(baseDirIndex),
relativePath);
}
int size = ReadWriteIOUtils.readInt(inputStream);
Map<String, Long> startTimes = new HashMap<>();
@@ -149,12 +179,12 @@ public class TsFileResource {
String path = ReadWriteIOUtils.readString(inputStream);
mergeChanaged.add(path);
}
- TsFileResource tsFileResource = new TsFileResource(startTimes, endTimes,
overflowChangeType,
- baseDirIndex, relativePath);
+ TsFileResource tsFileResource = new TsFileResource(startTimes, endTimes,
overflowChangeType, file);
tsFileResource.mergeChanged = mergeChanaged;
return tsFileResource;
}
+
public void setStartTime(String deviceId, long startTime) {
startTimeMap.put(deviceId, startTime);
@@ -212,45 +242,26 @@ public class TsFileResource {
endTimeMap.remove(deviceId);
}
- public String getFilePath() {
- if (relativePath == null) {
- return relativePath;
- }
- return new File(Directories.getInstance().getTsFileFolder(baseDirIndex),
- relativePath).getPath();
+ public File getFile() {
+ return file;
}
+
public int getBaseDirIndex() {
return baseDirIndex;
}
- public void setBaseDirIndex(int baseDirIndex) {
- this.baseDirIndex = baseDirIndex;
- }
-
- public String getRelativePath() {
-
- return relativePath;
- }
-
- public void setRelativePath(String relativePath) {
-
- this.relativePath = relativePath;
- }
-
public boolean checkEmpty() {
return startTimeMap.isEmpty() && endTimeMap.isEmpty();
}
public void clear() {
-
startTimeMap.clear();
endTimeMap.clear();
mergeChanged.clear();
overflowChangeType = OverflowChangeType.NO_CHANGE;
- relativePath = null;
}
public void changeTypeToChanged(FileNodeProcessorStatus
fileNodeProcessorState) {
@@ -287,8 +298,12 @@ public class TsFileResource {
Map<String, Long> startTimeMapCopy = new HashMap<>(this.startTimeMap);
Map<String, Long> endTimeMapCopy = new HashMap<>(this.endTimeMap);
- return new TsFileResource(startTimeMapCopy, endTimeMapCopy,
overflowChangeType,
- baseDirIndex, relativePath);
+ return new TsFileResource(startTimeMapCopy,
+ endTimeMapCopy, overflowChangeType, file);
+ }
+
+ public Set<String> getDevices() {
+ return this.startTimeMap.keySet();
}
@Override
@@ -297,37 +312,31 @@ public class TsFileResource {
final int prime = 31;
int result = 1;
result = prime * result + ((endTimeMap == null) ? 0 :
endTimeMap.hashCode());
- result = prime * result + ((relativePath == null) ? 0 :
relativePath.hashCode());
+ result = prime * result + ((file == null) ? 0 : file.hashCode());
result = prime * result + ((overflowChangeType == null) ? 0 :
overflowChangeType.hashCode());
result = prime * result + ((startTimeMap == null) ? 0 :
startTimeMap.hashCode());
return result;
}
+
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
- if (o == null || getClass() != o.getClass()) {
+ if (!(o instanceof TsFileResource)) {
return false;
}
- TsFileResource fileNode = (TsFileResource) o;
- return baseDirIndex == fileNode.baseDirIndex &&
- overflowChangeType == fileNode.overflowChangeType &&
- Objects.equals(relativePath, fileNode.relativePath) &&
- Objects.equals(startTimeMap, fileNode.startTimeMap) &&
- Objects.equals(endTimeMap, fileNode.endTimeMap) &&
- Objects.equals(mergeChanged, fileNode.mergeChanged);
+ TsFileResource that = (TsFileResource) o;
+ return baseDirIndex == that.baseDirIndex &&
+ overflowChangeType == that.overflowChangeType &&
+ Objects.equals(file, that.file) &&
+ Objects.equals(startTimeMap, that.startTimeMap) &&
+ Objects.equals(endTimeMap, that.endTimeMap) &&
+ Objects.equals(mergeChanged, that.mergeChanged) &&
+ Objects.equals(modFile, that.modFile);
}
- @Override
- public String toString() {
-
- return String.format(
- "TsFileResource [relativePath=%s,overflowChangeType=%s,
startTimeMap=%s,"
- + " endTimeMap=%s, mergeChanged=%s]",
- relativePath, overflowChangeType, startTimeMap, endTimeMap,
mergeChanged);
- }
public OverflowChangeType getOverflowChangeType() {
return overflowChangeType;
@@ -339,9 +348,7 @@ public class TsFileResource {
public synchronized ModificationFile getModFile() {
if (modFile == null) {
- modFile = new ModificationFile(
- Directories.getInstance().getTsFileFolder(baseDirIndex) +
File.separator
- + relativePath + ModificationFile.FILE_SUFFIX);
+ modFile = new ModificationFile(file.getAbsolutePath() +
ModificationFile.FILE_SUFFIX);
}
return modFile;
}
@@ -353,4 +360,31 @@ public class TsFileResource {
public void setModFile(ModificationFile modFile) {
this.modFile = modFile;
}
+
+ public void close() throws IOException {
+ modFile.close();
+ }
+
+ public String getRelativePath() {
+ if (file == null) {
+ return null;
+ }
+ return this.getFile().getParentFile().getName() + File.separator +
this.getFile().getName();
+ }
+
+ public void setFile(File file) throws IOException {
+ this.file = file;
+ if (file != null) {
+ this.baseDirIndex = Directories.getInstance()
+ .getTsFileFolderIndex(file.getParentFile().getParent());
+ if (this.modFile != null) {
+ this.modFile.close();
+ }
+ this.modFile = new ModificationFile(file.getAbsolutePath() +
ModificationFile.FILE_SUFFIX);
+ }
+ }
+
+ public String getFilePath() {
+ return this.getFile().getAbsolutePath();
+ }
}
diff --git
a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
index 104aea8..097f7f7 100644
---
a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
+++
b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
@@ -24,6 +24,7 @@ import java.util.Map;
import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
import org.apache.iotdb.db.utils.TimeValuePair;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.utils.Binary;
public abstract class AbstractMemTable implements IMemTable {
@@ -73,6 +74,13 @@ public abstract class AbstractMemTable implements IMemTable {
}
@Override
+ public void write(String deviceId, String measurement, TSDataType dataType,
long insertTime,
+ Object value) {
+ IWritableMemChunk memSeries = createIfNotExistAndGet(deviceId,
measurement, dataType);
+ memSeries.write(insertTime, value);
+ }
+
+ @Override
public int size() {
int sum = 0;
for (Map<String, IWritableMemChunk> seriesMap : memTableMap.values()) {
@@ -107,15 +115,19 @@ public abstract class AbstractMemTable implements
IMemTable {
}
@Override
- public void delete(String deviceId, String measurementId, long timestamp) {
+ public boolean delete(String deviceId, String measurementId, long timestamp)
{
Map<String, IWritableMemChunk> deviceMap = memTableMap.get(deviceId);
if (deviceMap != null) {
IWritableMemChunk chunk = deviceMap.get(measurementId);
+ //TODO: if the memtable is thread safe, then we do not need to copy data
again,
+ // otherwise current implementation is error.
IWritableMemChunk newChunk = filterChunk(chunk, timestamp);
if (newChunk != null) {
deviceMap.put(measurementId, newChunk);
+ return newChunk.count() != chunk.count();
}
}
+ return false;
}
/**
diff --git
a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java
b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java
index 25547d0..53ea2bc 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.engine.memtable;
import java.util.Map;
import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.utils.Binary;
/**
* IMemTable is designed to store data points which are not flushed into
TsFile yet. An instance of
@@ -36,6 +37,9 @@ public interface IMemTable {
void write(String deviceId, String measurement, TSDataType dataType,
long insertTime, String insertValue);
+ void write(String deviceId, String measurement, TSDataType dataType,
+ long insertTime, Object value);
+
int size();
ReadOnlyMemChunk query(String deviceId, String measurement, TSDataType
dataType,
@@ -55,8 +59,9 @@ public interface IMemTable {
* @param deviceId the deviceId of the timeseries to be deleted.
* @param measurementId the measurementId of the timeseries to be deleted.
* @param timestamp the upper-bound of deletion time.
+ * @return true if there is data that been deleted. otherwise false.
*/
- void delete(String deviceId, String measurementId, long timestamp);
+ boolean delete(String deviceId, String measurementId, long timestamp);
/**
* Make a copy of this MemTable.
diff --git
a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java
b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java
index 05b0522..fbb1fa5 100644
---
a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java
+++
b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java
@@ -37,6 +37,8 @@ public interface IWritableMemChunk extends
TimeValuePairSorter {
void write(long insertTime, String insertValue);
+ void write(long insertTime, Object insertValue);
+
void reset();
int count();
diff --git
a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushUtil.java
b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushUtil.java
index 7692602..692687b 100644
---
a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushUtil.java
+++
b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushUtil.java
@@ -19,12 +19,15 @@
package org.apache.iotdb.db.engine.memtable;
import java.io.IOException;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import org.apache.iotdb.db.utils.TimeValuePair;
import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
import org.apache.iotdb.tsfile.file.footer.ChunkGroupFooter;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.write.chunk.ChunkBuffer;
import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl;
import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
@@ -84,8 +87,7 @@ public class MemTableFlushUtil {
* the function for flushing memtable.
*/
public static void flushMemTable(FileSchema fileSchema, TsFileIOWriter
tsFileIoWriter,
- IMemTable imemTable, long version)
- throws IOException {
+ IMemTable imemTable, long version) throws IOException {
for (String deviceId : imemTable.getMemTableMap().keySet()) {
long startPos = tsFileIoWriter.getPos();
tsFileIoWriter.startFlushChunkGroup(deviceId);
@@ -96,8 +98,8 @@ public class MemTableFlushUtil {
MeasurementSchema desc =
fileSchema.getMeasurementSchema(measurementId);
ChunkBuffer chunkBuffer = new ChunkBuffer(desc);
IChunkWriter seriesWriter = new ChunkWriterImpl(desc, chunkBuffer,
PAGE_SIZE_THRESHOLD);
- writeOneSeries(series.getSortedTimeValuePairList(), seriesWriter,
- desc.getType());
+ List<TimeValuePair> sortedTimeValuePairs =
series.getSortedTimeValuePairList();
+ writeOneSeries(sortedTimeValuePairs, seriesWriter, desc.getType());
seriesWriter.writeToFileWriter(tsFileIoWriter);
}
long memSize = tsFileIoWriter.getPos() - startPos;
diff --git
a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java
b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java
index c351163..608c48d 100644
---
a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java
+++
b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java
@@ -67,6 +67,32 @@ public class WritableMemChunk implements IWritableMemChunk {
}
}
+ public void write(long insertTime, Object value) {
+ switch (dataType) {
+ case BOOLEAN:
+ putBoolean(insertTime, (Boolean)value);
+ break;
+ case INT32:
+ putInt(insertTime, (Integer)value);
+ break;
+ case INT64:
+ putLong(insertTime, (Long)value);
+ break;
+ case FLOAT:
+ putFloat(insertTime, (Float)value);
+ break;
+ case DOUBLE:
+ putDouble(insertTime, (Double)value);
+ break;
+ case TEXT:
+ putBinary(insertTime, (Binary)value);
+ break;
+ default:
+ throw new UnSupportedDataTypeException("Unsupported data type:" +
dataType);
+ }
+ }
+
+
@Override
public void putLong(long t, long v) {
list.putTimestamp(t, v);
@@ -101,7 +127,6 @@ public class WritableMemChunk implements IWritableMemChunk {
// TODO: Consider using arrays to sort and remove duplicates
public List<TimeValuePair> getSortedTimeValuePairList() {
int length = list.size();
-
Map<Long, TsPrimitiveType> map = new HashMap<>(length, 1.0f);
for (int i = 0; i < length; i++) {
map.put(list.getTimestamp(i), TsPrimitiveType.getByType(dataType,
list.getValue(i)));
diff --git
a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java
b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowedTsFileIOWriter.java
similarity index 60%
copy from
iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java
copy to
iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowedTsFileIOWriter.java
index 05b0522..db86a78 100644
---
a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java
+++
b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowedTsFileIOWriter.java
@@ -16,30 +16,21 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.engine.memtable;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.utils.Binary;
+package org.apache.iotdb.db.engine.overflow.io;
-public interface IWritableMemChunk extends TimeValuePairSorter {
+import java.io.File;
+import java.io.FileNotFoundException;
+import org.apache.iotdb.tsfile.write.writer.DefaultTsFileOutput;
+import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
- void putLong(long t, long v);
+public class OverflowedTsFileIOWriter extends TsFileIOWriter {
- void putInt(long t, int v);
+ public OverflowedTsFileIOWriter(File file) throws FileNotFoundException {
+ super();
+ this.out = new DefaultTsFileOutput(file, true);
- void putFloat(long t, float v);
+ }
- void putDouble(long t, double v);
- void putBinary(long t, Binary v);
-
- void putBoolean(long t, boolean v);
-
- void write(long insertTime, String insertValue);
-
- void reset();
-
- int count();
-
- TSDataType getType();
}
diff --git
a/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/OverflowQPExecutor.java
b/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/OverflowQPExecutor.java
index a3bb2a4..b80c393 100644
---
a/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/OverflowQPExecutor.java
+++
b/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/OverflowQPExecutor.java
@@ -264,31 +264,31 @@ public class OverflowQPExecutor extends
QueryProcessExecutor {
}
@Override
- public int multiInsert(String deviceId, long insertTime, List<String>
measurementList,
- List<String> insertValues)
+ public int multiInsert(String deviceId, long insertTime, String[]
measurementList,
+ String[] insertValues)
throws ProcessorException {
try {
TSRecord tsRecord = new TSRecord(insertTime, deviceId);
MNode node = mManager.getNodeByDeviceIdFromCache(deviceId);
- for (int i = 0; i < measurementList.size(); i++) {
- if (!node.hasChild(measurementList.get(i))) {
+ for (int i = 0; i < measurementList.length; i++) {
+ if (!node.hasChild(measurementList[i])) {
throw new ProcessorException(
String.format("Current deviceId[%s] does not contains
measurement:%s",
- deviceId, measurementList.get(i)));
+ deviceId, measurementList[i]));
}
- MNode measurementNode = node.getChild(measurementList.get(i));
+ MNode measurementNode = node.getChild(measurementList[i]);
if (!measurementNode.isLeaf()) {
throw new ProcessorException(
String.format("Current Path is not leaf node. %s.%s", deviceId,
- measurementList.get(i)));
+ measurementList[i]));
}
TSDataType dataType = measurementNode.getSchema().getType();
- String value = insertValues.get(i);
+ String value = insertValues[i];
value = checkValue(dataType, value);
- DataPoint dataPoint = DataPoint.getDataPoint(dataType,
measurementList.get(i), value);
+ DataPoint dataPoint = DataPoint.getDataPoint(dataType,
measurementList[i], value);
tsRecord.addTuple(dataPoint);
}
return fileNodeManager.insert(tsRecord, false);
diff --git
a/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/QueryProcessExecutor.java
b/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/QueryProcessExecutor.java
index 60f97f7..376997e 100644
---
a/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/QueryProcessExecutor.java
+++
b/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/QueryProcessExecutor.java
@@ -190,8 +190,8 @@ public abstract class QueryProcessExecutor {
* @param insertValues values to be inserted
* @return - Operate Type.
*/
- public abstract int multiInsert(String deviceId, long insertTime,
List<String> measurementList,
- List<String> insertValues) throws ProcessorException;
+ public abstract int multiInsert(String deviceId, long insertTime, String[]
measurementList,
+ String[] insertValues) throws ProcessorException;
public abstract List<String> getAllPaths(String originPath) throws
PathErrorException;
diff --git
a/iotdb/src/main/java/org/apache/iotdb/db/qp/logical/crud/InsertOperator.java
b/iotdb/src/main/java/org/apache/iotdb/db/qp/logical/crud/InsertOperator.java
index e0c6097..aa2ea87 100644
---
a/iotdb/src/main/java/org/apache/iotdb/db/qp/logical/crud/InsertOperator.java
+++
b/iotdb/src/main/java/org/apache/iotdb/db/qp/logical/crud/InsertOperator.java
@@ -26,27 +26,27 @@ import java.util.List;
public class InsertOperator extends SFWOperator {
private long time;
- private List<String> measurementList;
- private List<String> valueList;
+ private String[] measurementList;
+ private String[] valueList;
public InsertOperator(int tokenIntType) {
super(tokenIntType);
operatorType = OperatorType.INSERT;
}
- public List<String> getMeasurementList() {
+ public String[] getMeasurementList() {
return measurementList;
}
- public void setMeasurementList(List<String> measurementList) {
+ public void setMeasurementList(String[] measurementList) {
this.measurementList = measurementList;
}
- public List<String> getValueList() {
+ public String[] getValueList() {
return valueList;
}
- public void setValueList(List<String> insertValue) {
+ public void setValueList(String[] insertValue) {
this.valueList = insertValue;
}
diff --git
a/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java
b/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java
index 4991811..3e0c13a 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.qp.physical.crud;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import org.apache.iotdb.db.qp.logical.Operator;
@@ -28,16 +29,16 @@ import org.apache.iotdb.tsfile.read.common.Path;
public class InsertPlan extends PhysicalPlan {
private String deviceId;
- private List<String> measurements;
- private List<String> values;
+ private String[] measurements;
+ private String[] values;
private long time;
// insertType
// 1 : BufferWrite Insert 2 : Overflow Insert
private int insertType;
- public InsertPlan(String deviceId, long insertTime, List<String>
measurementList,
- List<String> insertValues) {
+ public InsertPlan(String deviceId, long insertTime, String[] measurementList,
+ String[] insertValues) {
super(false, Operator.OperatorType.INSERT);
this.time = insertTime;
this.deviceId = deviceId;
@@ -45,8 +46,8 @@ public class InsertPlan extends PhysicalPlan {
this.values = insertValues;
}
- public InsertPlan(int insertType, String deviceId, long insertTime,
List<String> measurementList,
- List<String> insertValues) {
+ public InsertPlan(int insertType, String deviceId, long insertTime, String[]
measurementList,
+ String[] insertValues) {
super(false, Operator.OperatorType.INSERT);
this.insertType = insertType;
this.time = insertTime;
@@ -89,19 +90,19 @@ public class InsertPlan extends PhysicalPlan {
this.deviceId = deviceId;
}
- public List<String> getMeasurements() {
+ public String[] getMeasurements() {
return this.measurements;
}
- public void setMeasurements(List<String> measurements) {
+ public void setMeasurements(String[] measurements) {
this.measurements = measurements;
}
- public List<String> getValues() {
+ public String[] getValues() {
return this.values;
}
- public void setValues(List<String> values) {
+ public void setValues(String[] values) {
this.values = values;
}
@@ -115,8 +116,8 @@ public class InsertPlan extends PhysicalPlan {
}
InsertPlan that = (InsertPlan) o;
return time == that.time && Objects.equals(deviceId, that.deviceId)
- && Objects.equals(measurements, that.measurements)
- && Objects.equals(values, that.values);
+ && Arrays.equals(measurements, that.measurements)
+ && Arrays.equals(values, that.values);
}
}
diff --git
a/iotdb/src/main/java/org/apache/iotdb/db/qp/strategy/LogicalGenerator.java
b/iotdb/src/main/java/org/apache/iotdb/db/qp/strategy/LogicalGenerator.java
index c80ff1f..1efb56c 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/qp/strategy/LogicalGenerator.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/strategy/LogicalGenerator.java
@@ -445,15 +445,15 @@ public class LogicalGenerator {
"number of measurement is NOT EQUAL TO the number of values");
}
insertOp.setTime(timestamp);
- List<String> measurementList = new ArrayList<>();
+ String[] measurementList = new String[astNode.getChild(1).getChildCount()
- 1];
for (int i = 1; i < astNode.getChild(1).getChildCount(); i++) {
- measurementList.add(astNode.getChild(1).getChild(i).getText());
+ measurementList[i - 1] = astNode.getChild(1).getChild(i).getText();
}
insertOp.setMeasurementList(measurementList);
- List<String> valueList = new ArrayList<>();
+ String[] valueList = new String[astNode.getChild(2).getChildCount() - 1];
for (int i = 1; i < astNode.getChild(2).getChildCount(); i++) {
- valueList.add(astNode.getChild(2).getChild(i).getText());
+ valueList[i - 1] = astNode.getChild(2).getChild(i).getText();
}
insertOp.setValueList(valueList);
}
diff --git
a/iotdb/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java
b/iotdb/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java
index a535d87..e4ef67e 100644
---
a/iotdb/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java
+++
b/iotdb/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java
@@ -18,7 +18,9 @@
*/
package org.apache.iotdb.db.query.control;
+import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -26,6 +28,8 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.iotdb.db.engine.filenode.FileNodeManager;
+import org.apache.iotdb.db.engine.querycontext.GlobalSortedSeriesDataSource;
+import org.apache.iotdb.db.engine.querycontext.OverflowSeriesDataSource;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.exception.FileNodeManagerException;
import org.apache.iotdb.db.query.context.QueryContext;
@@ -88,8 +92,6 @@ public class QueryResourceManager {
private ConcurrentHashMap<Long, ConcurrentHashMap<String, List<Integer>>>
queryTokensMap;
private JobFileManager filePathsManager;
private AtomicLong maxJobId;
-
-
private QueryResourceManager() {
queryTokensMap = new ConcurrentHashMap<>();
filePathsManager = new JobFileManager();
diff --git
a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineExecutorWithoutTimeGenerator.java
b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineExecutorWithoutTimeGenerator.java
index 2d07796..07073cd 100644
---
a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineExecutorWithoutTimeGenerator.java
+++
b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineExecutorWithoutTimeGenerator.java
@@ -161,5 +161,4 @@ public class EngineExecutorWithoutTimeGenerator {
throw new FileNodeManagerException(e);
}
}
-
}
diff --git
a/iotdb/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServiceImpl.java
b/iotdb/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServiceImpl.java
index eaa32c4..400065e 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServiceImpl.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServiceImpl.java
@@ -470,8 +470,9 @@ public class SyncServiceImpl implements SyncService.Iface {
String header = syncDataPath;
String relativePath = path.substring(header.length());
TsFileResource fileNode = new TsFileResource(startTimeMap, endTimeMap,
- OverflowChangeType.NO_CHANGE,
- Directories.getInstance().getNextFolderIndexForTsFile(),
relativePath);
+ OverflowChangeType.NO_CHANGE, new File(
+ Directories.getInstance().getNextFolderIndexForTsFile() +
File.separator + relativePath)
+ );
// call interface of load external file
try {
if (!fileNodeManager.appendFileToFileNode(storageGroup, fileNode,
path)) {
@@ -556,7 +557,8 @@ public class SyncServiceImpl implements SyncService.Iface {
}
}
if (insertExecutor
- .multiInsert(deviceId, record.getTimestamp(), measurementList,
insertValues) <= 0) {
+ .multiInsert(deviceId, record.getTimestamp(),
measurementList.toArray(new String[]{}),
+ insertValues.toArray(new String[]{})) <= 0) {
throw new IOException("Inserting series data to IoTDB engine has
failed.");
}
}
@@ -612,77 +614,37 @@ public class SyncServiceImpl implements SyncService.Iface
{
/** secondly, use tsFile Reader to form SQL **/
ReadOnlyTsFile readOnlyTsFile = tsfilesReaders.get(filePath);
- ArrayList<Path> paths = new ArrayList<>();
+ List<Path> paths = new ArrayList<>();
/** compare data with one timeseries in a round to get valid data **/
for (String timeseries : timeseriesList) {
paths.clear();
paths.add(new Path(timeseries));
- Map<InsertPlan, String> originDataPoint = new HashMap<>();
- Map<InsertPlan, String> newDataPoint = new HashMap<>();
+ Set<InsertPlan> originDataPoints = new HashSet<>();
QueryExpression queryExpression = QueryExpression.create(paths,
null);
QueryDataSet queryDataSet = readOnlyTsFile.query(queryExpression);
- while (queryDataSet.hasNext()) {
- RowRecord record = queryDataSet.next();
- List<Field> fields = record.getFields();
- /** get all data with the timeseries in the sync file **/
- for (int i = 0; i < fields.size(); i++) {
- Field field = fields.get(i);
- List<String> measurementList = new ArrayList<>();
- if (!field.isNull()) {
- measurementList.add(paths.get(i).getMeasurement());
- InsertPlan insertPlan = new InsertPlan(deviceID,
record.getTimestamp(),
- measurementList, new ArrayList<>());
- newDataPoint.put(insertPlan,
- field.getDataType() == TSDataType.TEXT ?
String.format("'%s'", field.toString())
- : field.toString());
- }
- }
- }
+ Set<InsertPlan> newDataPoints = convertToInserPlans(queryDataSet,
paths, deviceID);
+
/** get all data with the timeseries in all overlap files. **/
for (String overlapFile : overlapFiles) {
ReadOnlyTsFile readTsFileOverlap = tsfilesReaders.get(overlapFile);
QueryDataSet queryDataSetOverlap =
readTsFileOverlap.query(queryExpression);
- while (queryDataSetOverlap.hasNext()) {
- RowRecord recordOverlap = queryDataSetOverlap.next();
- List<Field> fields = recordOverlap.getFields();
- for (int i = 0; i < fields.size(); i++) {
- Field field = fields.get(i);
- List<String> measurementList = new ArrayList<>();
- if (!field.isNull()) {
- measurementList.add(paths.get(i).getMeasurement());
- InsertPlan insertPlan = new InsertPlan(deviceID,
recordOverlap.getTimestamp(),
- measurementList, new ArrayList<>());
- originDataPoint.put(insertPlan,
- field.getDataType() == TSDataType.TEXT ? String
- .format("'%s'", field.toString())
- : field.toString());
- }
- }
- }
+ originDataPoints.addAll(convertToInserPlans(queryDataSetOverlap,
paths, deviceID));
}
/** If there has no overlap data with the timeseries, inserting all
data in the sync file **/
- if (originDataPoint.isEmpty()) {
- for (Map.Entry<InsertPlan, String> entry :
newDataPoint.entrySet()) {
- InsertPlan insertPlan = entry.getKey();
- List<String> insertValues = new ArrayList<>();
- insertValues.add(entry.getValue());
+ if (originDataPoints.isEmpty()) {
+ for (InsertPlan insertPlan : newDataPoints) {
if (insertExecutor.multiInsert(insertPlan.getDeviceId(),
insertPlan.getTime(),
- insertPlan.getMeasurements(), insertValues) <= 0) {
+ insertPlan.getMeasurements(), insertPlan.getValues()) <= 0) {
throw new IOException("Inserting series data to IoTDB engine
has failed.");
}
}
} else {
/** Compare every data to get valid data **/
- for (Map.Entry<InsertPlan, String> entry :
newDataPoint.entrySet()) {
- if (!originDataPoint.containsKey(entry.getKey())
- || (originDataPoint.containsKey(entry.getKey())
- &&
!originDataPoint.get(entry.getKey()).equals(entry.getValue()))) {
- InsertPlan insertPlan = entry.getKey();
- List<String> insertValues = new ArrayList<>();
- insertValues.add(entry.getValue());
+ for (InsertPlan insertPlan : newDataPoints) {
+ if (!originDataPoints.contains(insertPlan)) {
if (insertExecutor.multiInsert(insertPlan.getDeviceId(),
insertPlan.getTime(),
- insertPlan.getMeasurements(), insertValues) <= 0) {
+ insertPlan.getMeasurements(), insertPlan.getValues()) <=
0) {
throw new IOException("Inserting series data to IoTDB engine
has failed.");
}
}
@@ -705,6 +667,27 @@ public class SyncServiceImpl implements SyncService.Iface {
}
}
+ private Set<InsertPlan> convertToInserPlans(QueryDataSet queryDataSet,
List<Path> paths, String deviceID) throws IOException {
+ Set<InsertPlan> plans = new HashSet<>();
+ while (queryDataSet.hasNext()) {
+ RowRecord record = queryDataSet.next();
+ List<Field> fields = record.getFields();
+ /** get all data with the timeseries in the sync file **/
+ for (int i = 0; i < fields.size(); i++) {
+ Field field = fields.get(i);
+ String[] measurementList = new String[1];
+ if (!field.isNull()) {
+ measurementList[0] = paths.get(i).getMeasurement();
+ InsertPlan insertPlan = new InsertPlan(deviceID,
record.getTimestamp(),
+ measurementList, new String[]{field.getDataType() ==
TSDataType.TEXT ? String.format("'%s'", field.toString())
+ : field.toString()});
+ plans.add(insertPlan);
+ }
+ }
+ }
+ return plans;
+ }
+
/**
* Open all tsfile reader and cache
*/
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/utils/MemUtils.java
b/iotdb/src/main/java/org/apache/iotdb/db/utils/MemUtils.java
index 34fdbaa..65f3188 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/utils/MemUtils.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/utils/MemUtils.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.utils;
import org.apache.iotdb.db.conf.IoTDBConstant;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.write.record.TSRecord;
import org.apache.iotdb.tsfile.write.record.datapoint.BooleanDataPoint;
@@ -71,6 +72,29 @@ public class MemUtils {
}
/**
+ * @param value can be null if the type is not TEXT
+ */
+ public static long getPointSize(TSDataType type, String value) {
+ switch (type) {
+ case INT32:
+ return 8L + 4L;
+ case INT64:
+ return 8L + 8L;
+ case FLOAT:
+ return 8L + 4L;
+ case DOUBLE:
+ return 8L + 8L;
+ case BOOLEAN:
+ return 8L + 1L;
+ case TEXT:
+ return 8L + value.length() * 2;
+ default:
+ return 8L + 8L;
+ }
+ }
+
+
+ /**
* Calculate how much memory will be used if the given record is written to
Bufferwrite.
*/
public static long getTsRecordMemBufferwrite(TSRecord record) {
diff --git
a/iotdb/src/main/java/org/apache/iotdb/db/writelog/manager/MultiFileLogNodeManager.java
b/iotdb/src/main/java/org/apache/iotdb/db/writelog/manager/MultiFileLogNodeManager.java
index 7fe68de..8bf9fb9 100644
---
a/iotdb/src/main/java/org/apache/iotdb/db/writelog/manager/MultiFileLogNodeManager.java
+++
b/iotdb/src/main/java/org/apache/iotdb/db/writelog/manager/MultiFileLogNodeManager.java
@@ -104,6 +104,7 @@ public class MultiFileLogNodeManager implements
WriteLogNodeManager, IService {
return InstanceHolder.instance;
}
+
@Override
public WriteLogNode getNode(String identifier, String restoreFilePath,
String processorStoreFilePath)
diff --git
a/iotdb/src/main/java/org/apache/iotdb/db/writelog/manager/WriteLogNodeManager.java
b/iotdb/src/main/java/org/apache/iotdb/db/writelog/manager/WriteLogNodeManager.java
index cf0339b..57b3e44 100644
---
a/iotdb/src/main/java/org/apache/iotdb/db/writelog/manager/WriteLogNodeManager.java
+++
b/iotdb/src/main/java/org/apache/iotdb/db/writelog/manager/WriteLogNodeManager.java
@@ -33,9 +33,9 @@ public interface WriteLogNodeManager {
* processorStoreFilePath are provided, if either restoreFilePath or
processorStoreFilePath is not
* provided and the LogNode does not exist, null is returned.
*
- * @param identifier -identifier
- * @param processorStoreFilePath -processor store file path
- * @param restoreFilePath -restore file path
+ * @param identifier -identifier, the format:
"{storageGroupName}-bufferwrite/overflow"
+ * @param restoreFilePath -restore file path of the data file. e.g,
data/settled/{storageGroupName}/{tsfileName}.restore
+ * @param processorStoreFilePath -processor store file path. e.g.,
data/system/info/{storageGroupName}/{storageGroupName}.restore
*/
WriteLogNode getNode(String identifier, String restoreFilePath, String
processorStoreFilePath)
throws IOException;
diff --git
a/iotdb/src/main/java/org/apache/iotdb/db/writelog/replay/ConcreteLogReplayer.java
b/iotdb/src/main/java/org/apache/iotdb/db/writelog/replay/ConcreteLogReplayer.java
index f64c862..6573884 100644
---
a/iotdb/src/main/java/org/apache/iotdb/db/writelog/replay/ConcreteLogReplayer.java
+++
b/iotdb/src/main/java/org/apache/iotdb/db/writelog/replay/ConcreteLogReplayer.java
@@ -65,15 +65,15 @@ public class ConcreteLogReplayer implements LogReplayer {
throws PathErrorException, FileNodeManagerException {
String deviceId = insertPlan.getDeviceId();
long insertTime = insertPlan.getTime();
- List<String> measurementList = insertPlan.getMeasurements();
- List<String> insertValues = insertPlan.getValues();
+ String[] measurementList = insertPlan.getMeasurements();
+ String[] insertValues = insertPlan.getValues();
TSRecord tsRecord = new TSRecord(insertTime, deviceId);
- for (int i = 0; i < measurementList.size(); i++) {
- String pathKey = deviceId + "." + measurementList.get(i);
+ for (int i = 0; i < measurementList.length; i++) {
+ String pathKey = deviceId + "." + measurementList[i];
TSDataType dataType = MManager.getInstance().getSeriesType(pathKey);
- String value = insertValues.get(i);
- DataPoint dataPoint = DataPoint.getDataPoint(dataType,
measurementList.get(i), value);
+ String value = insertValues[i];
+ DataPoint dataPoint = DataPoint.getDataPoint(dataType,
measurementList[i], value);
tsRecord.addTuple(dataPoint);
}
FileNodeManager.getInstance().insert(tsRecord, true);
diff --git
a/iotdb/src/main/java/org/apache/iotdb/db/writelog/transfer/CodecInstances.java
b/iotdb/src/main/java/org/apache/iotdb/db/writelog/transfer/CodecInstances.java
index f70a41a..31b8527 100644
---
a/iotdb/src/main/java/org/apache/iotdb/db/writelog/transfer/CodecInstances.java
+++
b/iotdb/src/main/java/org/apache/iotdb/db/writelog/transfer/CodecInstances.java
@@ -183,14 +183,14 @@ public class CodecInstances {
putString(buffer, plan.getDeviceId());
- List<String> measurementList = plan.getMeasurements();
- buffer.putInt(measurementList.size());
+ String[] measurementList = plan.getMeasurements();
+ buffer.putInt(measurementList.length);
for (String m : measurementList) {
putString(buffer, m);
}
- List<String> valueList = plan.getValues();
- buffer.putInt(valueList.size());
+ String[] valueList = plan.getValues();
+ buffer.putInt(valueList.length);
for (String m : valueList) {
putString(buffer, m);
}
@@ -209,18 +209,18 @@ public class CodecInstances {
String device = readString(buffer);
int mmListLength = buffer.getInt();
- List<String> measurementsList = new ArrayList<>(mmListLength);
+ String[] measurements = new String[mmListLength];
for (int i = 0; i < mmListLength; i++) {
- measurementsList.add(readString(buffer));
+ measurements[i] = readString(buffer);
}
int valueListLength = buffer.getInt();
- List<String> valuesList = new ArrayList<>(valueListLength);
+ String[] values = new String[valueListLength];
for (int i = 0; i < valueListLength; i++) {
- valuesList.add(readString(buffer));
+ values[i] = readString(buffer);
}
- InsertPlan ans = new InsertPlan(device, time, measurementsList,
valuesList);
+ InsertPlan ans = new InsertPlan(device, time, measurements, values);
ans.setInsertType(insertType);
return ans;
}
diff --git
a/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessorNewTest.java
b/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessorNewTest.java
index ce01fb6..8cbf640 100644
---
a/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessorNewTest.java
+++
b/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessorNewTest.java
@@ -105,7 +105,6 @@ public class BufferWriteProcessorNewTest {
processorName, filename,
parameters, SysTimeVersionController.INSTANCE,
FileSchemaUtils.constructFileSchema(processorName));
- assertEquals(filename, bufferwrite.getFileName());
assertEquals(processorName + File.separator + filename,
bufferwrite.getFileRelativePath());
assertTrue(bufferwrite.isNewProcessor());
bufferwrite.setNewProcessor(false);
diff --git
a/iotdb/src/test/java/org/apache/iotdb/db/engine/filenode/TsFileResourceTest.java
b/iotdb/src/test/java/org/apache/iotdb/db/engine/filenode/TsFileResourceTest.java
index b66deea..a1c9d24 100644
---
a/iotdb/src/test/java/org/apache/iotdb/db/engine/filenode/TsFileResourceTest.java
+++
b/iotdb/src/test/java/org/apache/iotdb/db/engine/filenode/TsFileResourceTest.java
@@ -22,7 +22,9 @@ import static org.junit.Assert.assertEquals;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
+import java.io.File;
import java.io.IOException;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.junit.After;
@@ -36,12 +38,12 @@ public class TsFileResourceTest {
public static TsFileResource constructTsfileResource() {
TsFileResource tsFileResource;
- String relativePath = "relativePath";
+ String relativePath = "data/data/settled/b/relativePath";
Map<String, Long> startTimes = new HashMap<>();
Map<String, Long> endTimes = new HashMap<>();
- tsFileResource = new TsFileResource(OverflowChangeType.MERGING_CHANGE,
- relativePath);
+ tsFileResource = new TsFileResource(Collections.emptyMap(),
Collections.emptyMap(),
+ OverflowChangeType.MERGING_CHANGE, new File(relativePath));
for (int i = 0; i < 10; i++) {
startTimes.put("d" + i, (long) i);
}
@@ -77,7 +79,7 @@ public class TsFileResourceTest {
@Test
public void testSerdeializeCornerCase() throws IOException {
ByteArrayOutputStream outputStream = new ByteArrayOutputStream(0);
- tsFileResource.setRelativePath(null);
+ tsFileResource.setFile(null);
tsFileResource.serialize(outputStream);
ByteArrayInputStream inputStream = new
ByteArrayInputStream(outputStream.toByteArray());
TsFileResource deTsfileResource = TsFileResource.deSerialize(inputStream);
@@ -87,7 +89,7 @@ public class TsFileResourceTest {
public static void assertTsfileRecource(TsFileResource tsFileResource,
TsFileResource deTsfileResource) {
assertEquals(tsFileResource.getBaseDirIndex(),
deTsfileResource.getBaseDirIndex());
- assertEquals(tsFileResource.getRelativePath(),
deTsfileResource.getRelativePath());
+ assertEquals(tsFileResource.getFile(), deTsfileResource.getFile());
assertEquals(tsFileResource.getOverflowChangeType(),
deTsfileResource.getOverflowChangeType());
assertEquals(tsFileResource.getStartTimeMap(),
deTsfileResource.getStartTimeMap());
assertEquals(tsFileResource.getEndTimeMap(),
deTsfileResource.getEndTimeMap());
diff --git
a/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/io/OverflowResourceTest.java
b/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/io/OverflowResourceTest.java
index 13eee7b..c7662bf 100644
---
a/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/io/OverflowResourceTest.java
+++
b/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/io/OverflowResourceTest.java
@@ -37,35 +37,29 @@ public class OverflowResourceTest {
private OverflowResource work;
private File insertFile;
- private File updateFile;
- private File positionFile;
private String insertFileName = "unseqTsFile";
- private String updateDeleteFileName = "overflowFile";
- private String positionFileName = "positionFile";
- private String filePath = "overflow";
+ private String folderPath = "overflow";
private String dataPath = "1";
- private OverflowMemtable support = new OverflowMemtable();
+ private OverflowMemtable memtable = new OverflowMemtable();
@Before
public void setUp() throws Exception {
- work = new OverflowResource(filePath, dataPath,
SysTimeVersionController.INSTANCE);
- insertFile = new File(new File(filePath, dataPath), insertFileName);
- updateFile = new File(new File(filePath, dataPath), updateDeleteFileName);
- positionFile = new File(new File(filePath, dataPath), positionFileName);
+ work = new OverflowResource(folderPath, dataPath,
SysTimeVersionController.INSTANCE);
+ insertFile = new File(new File(folderPath, dataPath), insertFileName);
}
@After
public void tearDown() throws Exception {
work.close();
- support.clear();
- EnvironmentUtils.cleanDir(filePath);
+ memtable.clear();
+ EnvironmentUtils.cleanDir(folderPath);
}
@Test
public void testOverflowInsert() throws IOException {
- OverflowTestUtils.produceInsertData(support);
+ OverflowTestUtils.produceInsertData(memtable);
QueryContext context = new QueryContext();
- work.flush(OverflowTestUtils.getFileSchema(), support.getMemTabale(),
"processorName");
+ work.flush(OverflowTestUtils.getFileSchema(), memtable.getMemTabale(),
"processorName");
List<ChunkMetaData> chunkMetaDatas =
work.getInsertMetadatas(OverflowTestUtils.deviceId1,
OverflowTestUtils.measurementId1, OverflowTestUtils.dataType2,
context);
assertEquals(0, chunkMetaDatas.size());
@@ -85,7 +79,7 @@ public class OverflowResourceTest {
fileOutputStream.write(new byte[20]);
fileOutputStream.close();
assertEquals(originlength + 20, insertFile.length());
- work = new OverflowResource(filePath, dataPath,
SysTimeVersionController.INSTANCE);
+ work = new OverflowResource(folderPath, dataPath,
SysTimeVersionController.INSTANCE);
chunkMetaDatas = work
.getInsertMetadatas(OverflowTestUtils.deviceId1,
OverflowTestUtils.measurementId1,
OverflowTestUtils.dataType1, context);
diff --git
a/iotdb/src/test/java/org/apache/iotdb/db/qp/utils/MemIntQpExecutor.java
b/iotdb/src/test/java/org/apache/iotdb/db/qp/utils/MemIntQpExecutor.java
index 1446cb1..de521c0 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/qp/utils/MemIntQpExecutor.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/qp/utils/MemIntQpExecutor.java
@@ -200,8 +200,8 @@ public class MemIntQpExecutor extends QueryProcessExecutor {
}
@Override
- public int multiInsert(String deviceId, long insertTime, List<String>
measurementList,
- List<String> insertValues) {
+ public int multiInsert(String deviceId, long insertTime, String[]
measurementList,
+ String[] insertValues) {
return 0;
}
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/tools/WalCheckerTest.java
b/iotdb/src/test/java/org/apache/iotdb/db/tools/WalCheckerTest.java
index 8619eba..485a33c 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/tools/WalCheckerTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/tools/WalCheckerTest.java
@@ -78,8 +78,8 @@ public class WalCheckerTest {
List<byte[]> binaryPlans = new ArrayList<>();
String deviceId = "device1";
- List<String> measurements = Arrays.asList("s1", "s2", "s3");
- List<String> values = Arrays.asList("5", "6", "7");
+ String[] measurements = new String[]{"s1", "s2", "s3"};
+ String[] values = new String[]{"5", "6", "7"};
for (int j = 0; j < 10; j++) {
binaryPlans.add(PhysicalPlanLogTransfer
.operatorToLog(new InsertPlan(deviceId, j, measurements,
values)));
@@ -111,8 +111,8 @@ public class WalCheckerTest {
List<byte[]> binaryPlans = new ArrayList<>();
String deviceId = "device1";
- List<String> measurements = Arrays.asList("s1", "s2", "s3");
- List<String> values = Arrays.asList("5", "6", "7");
+ String[] measurements = new String[]{"s1", "s2", "s3"};
+ String[] values = new String[]{"5", "6", "7"};
for (int j = 0; j < 10; j++) {
binaryPlans.add(PhysicalPlanLogTransfer
.operatorToLog(new InsertPlan(deviceId, j, measurements,
values)));
diff --git
a/iotdb/src/test/java/org/apache/iotdb/db/writelog/PerformanceTest.java
b/iotdb/src/test/java/org/apache/iotdb/db/writelog/PerformanceTest.java
index cc101e9..eb0a117 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/writelog/PerformanceTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/writelog/PerformanceTest.java
@@ -94,8 +94,8 @@ public class PerformanceTest {
long time = System.currentTimeMillis();
for (int i = 0; i < 1000000; i++) {
InsertPlan bwInsertPlan = new InsertPlan(1, "logTestDevice", 100,
- Arrays.asList("s1", "s2", "s3", "s4"),
- Arrays.asList("1.0", "15", "str", "false"));
+ new String[]{"s1", "s2", "s3", "s4"},
+ new String[]{"1.0", "15", "str", "false"});
UpdatePlan updatePlan = new UpdatePlan(0, 100, "2.0",
new Path("root.logTestDevice.s1"));
DeletePlan deletePlan = new DeletePlan(50,
@@ -156,7 +156,7 @@ public class PerformanceTest {
for (int i = 0; i < 1000000; i++) {
InsertPlan bwInsertPlan = new InsertPlan(1, "root.logTestDevice", 100,
- Arrays.asList("s1", "s2", "s3", "s4"), Arrays.asList("1.0", "15",
"str", "false"));
+ new String[]{"s1", "s2", "s3", "s4"}, new String[]{"1.0", "15", "str",
"false"});
UpdatePlan updatePlan = new UpdatePlan(0, 100, "2.0",
new Path("root.logTestDevice.s1"));
DeletePlan deletePlan = new DeletePlan(50, new
Path("root.logTestDevice.s1"));
@@ -190,8 +190,8 @@ public class PerformanceTest {
byte[] bytes1 = null;
InsertPlan bwInsertPlan = new InsertPlan(1, "root.logTestDevice", 100,
- Arrays.asList("s1", "s2", "s3", "s4"),
- Arrays.asList("1.0", "15", "str", "false"));
+ new String[]{"s1", "s2", "s3", "s4"},
+ new String[]{"1.0", "15", "str", "false"});
UpdatePlan updatePlan = new UpdatePlan(0, 100, "2.0",
new Path("root.logTestDevice.s1"));
for (int i = 0; i < 20; i++) {
@@ -220,8 +220,8 @@ public class PerformanceTest {
String sql = "INSERT INTO root.logTestDevice(time,s1,s2,s3,s4) "
+ "VALUES (100,1.0,15,\"str\",false)";
InsertPlan bwInsertPlan = new InsertPlan(1, "root.logTestDevice", 100,
- Arrays.asList("s1", "s2", "s3", "s4"),
- Arrays.asList("1.0", "15", "str", "false"));
+ new String[]{"s1", "s2", "s3", "s4"},
+ new String[]{"1.0", "15", "str", "false"});
long time = System.currentTimeMillis();
for (int i = 0; i < 1000000; i++) {
byte[] bytes = PhysicalPlanLogTransfer.operatorToLog(bwInsertPlan);
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/writelog/RecoverTest.java
b/iotdb/src/test/java/org/apache/iotdb/db/writelog/RecoverTest.java
index cb9e6c4..9a9617c 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/writelog/RecoverTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/writelog/RecoverTest.java
@@ -84,8 +84,8 @@ public class RecoverTest {
try {
InsertPlan bwInsertPlan = new InsertPlan(1, "logTestDevice", 100,
- Arrays.asList("s1", "s2", "s3", "s4"),
- Arrays.asList("1.0", "15", "str", "false"));
+ new String[]{"s1", "s2", "s3", "s4"},
+ new String[]{"1.0", "15", "str", "false"});
UpdatePlan updatePlan = new UpdatePlan(0, 100, "2.0", new
Path("root.logTestDevice.s1"));
DeletePlan deletePlan = new DeletePlan(50, new
Path("root.logTestDevice.s1"));
@@ -161,8 +161,8 @@ public class RecoverTest {
flagFile.createNewFile();
InsertPlan bwInsertPlan = new InsertPlan(1, "logTestDevice", 100,
- Arrays.asList("s1", "s2", "s3", "s4"),
- Arrays.asList("1.0", "15", "str", "false"));
+ new String[]{"s1", "s2", "s3", "s4"},
+ new String[]{"1.0", "15", "str", "false"});
UpdatePlan updatePlan = new UpdatePlan(0, 100, "2.0", new
Path("root.logTestDevice.s1"));
DeletePlan deletePlan = new DeletePlan(50, new
Path("root.logTestDevice.s1"));
@@ -239,8 +239,8 @@ public class RecoverTest {
flagFile.createNewFile();
InsertPlan bwInsertPlan = new InsertPlan(1, "logTestDevice", 100,
- Arrays.asList("s1", "s2", "s3", "s4"),
- Arrays.asList("1.0", "15", "str", "false"));
+ new String[]{"s1", "s2", "s3", "s4"},
+ new String[]{"1.0", "15", "str", "false"});
UpdatePlan updatePlan = new UpdatePlan(0, 100, "2.0", new
Path("root.logTestDevice.s1"));
DeletePlan deletePlan = new DeletePlan(50, new
Path("root.logTestDevice.s1"));
diff --git
a/iotdb/src/test/java/org/apache/iotdb/db/writelog/WriteLogNodeManagerTest.java
b/iotdb/src/test/java/org/apache/iotdb/db/writelog/WriteLogNodeManagerTest.java
index 3a031ee..e03c418 100644
---
a/iotdb/src/test/java/org/apache/iotdb/db/writelog/WriteLogNodeManagerTest.java
+++
b/iotdb/src/test/java/org/apache/iotdb/db/writelog/WriteLogNodeManagerTest.java
@@ -75,8 +75,8 @@ public class WriteLogNodeManagerTest {
.getNode("root.managerTest", tempRestore.getPath(),
tempProcessorStore.getPath());
InsertPlan bwInsertPlan = new InsertPlan(1, "logTestDevice", 100,
- Arrays.asList("s1", "s2", "s3", "s4"),
- Arrays.asList("1.0", "15", "str", "false"));
+ new String[]{"s1", "s2", "s3", "s4"},
+ new String[]{"1.0", "15", "str", "false"});
UpdatePlan updatePlan = new UpdatePlan(0, 100, "2.0", new
Path("root.logTestDevice.s1"));
DeletePlan deletePlan = new DeletePlan(50, new
Path("root.logTestDevice.s1"));
@@ -122,8 +122,8 @@ public class WriteLogNodeManagerTest {
.getNode(deviceName, tempRestore.getPath(),
tempProcessorStore.getPath());
InsertPlan bwInsertPlan = new InsertPlan(1, deviceName, 100,
- Arrays.asList("s1", "s2", "s3", "s4"),
- Arrays.asList("1.0", "15", "str", "false"));
+ new String[]{"s1", "s2", "s3", "s4"},
+ new String[]{"1.0", "15", "str", "false"});
UpdatePlan updatePlan = new UpdatePlan(0, 100, "2.0", new
Path(deviceName + ".s1"));
DeletePlan deletePlan = new DeletePlan(50, new Path(deviceName + ".s1"));
diff --git
a/iotdb/src/test/java/org/apache/iotdb/db/writelog/WriteLogNodeTest.java
b/iotdb/src/test/java/org/apache/iotdb/db/writelog/WriteLogNodeTest.java
index 6efb056..d125ebc 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/writelog/WriteLogNodeTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/writelog/WriteLogNodeTest.java
@@ -37,6 +37,7 @@ import org.apache.iotdb.db.writelog.node.WriteLogNode;
import org.apache.iotdb.db.writelog.transfer.PhysicalPlanLogTransfer;
import org.apache.iotdb.tsfile.read.common.Path;
import org.junit.After;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -74,8 +75,8 @@ public class WriteLogNodeTest {
tempProcessorStore.getPath());
InsertPlan bwInsertPlan = new InsertPlan(1, "root.logTestDevice", 100,
- Arrays.asList("s1", "s2", "s3", "s4"),
- Arrays.asList("1.0", "15", "str", "false"));
+ new String[]{"s1", "s2", "s3", "s4"},
+ new String[]{"1.0", "15", "str", "false"});
UpdatePlan updatePlan = new UpdatePlan(0, 100, "2.0", new
Path("root.logTestDevice.s1"));
DeletePlan deletePlan = new DeletePlan(50, new
Path("root.logTestDevice.s1"));
@@ -99,9 +100,9 @@ public class WriteLogNodeTest {
crc32.update(buffer, 0, logSize);
assertEquals(checksum, crc32.getValue());
InsertPlan bwInsertPlan2 = (InsertPlan)
PhysicalPlanLogTransfer.logToOperator(buffer);
- assertEquals(bwInsertPlan.getMeasurements(),
bwInsertPlan2.getMeasurements());
+ Assert.assertArrayEquals(bwInsertPlan.getMeasurements(),
bwInsertPlan2.getMeasurements());
assertEquals(bwInsertPlan.getTime(), bwInsertPlan2.getTime());
- assertEquals(bwInsertPlan.getValues(), bwInsertPlan2.getValues());
+ Assert.assertArrayEquals(bwInsertPlan.getValues(),
bwInsertPlan2.getValues());
assertEquals(bwInsertPlan.getPaths(), bwInsertPlan2.getPaths());
assertEquals(bwInsertPlan.getDeviceId(), bwInsertPlan2.getDeviceId());
@@ -148,8 +149,8 @@ public class WriteLogNodeTest {
tempProcessorStore.getPath());
InsertPlan bwInsertPlan = new InsertPlan(1, "root.logTestDevice", 100,
- Arrays.asList("s1", "s2", "s3", "s4"),
- Arrays.asList("1.0", "15", "str", "false"));
+ new String[]{"s1", "s2", "s3", "s4"},
+ new String[]{"1.0", "15", "str", "false"});
UpdatePlan updatePlan = new UpdatePlan(0, 100, "2.0", new
Path("root.logTestDevice.s1"));
DeletePlan deletePlan = new DeletePlan(50, new
Path("root.logTestDevice.s1"));
@@ -194,8 +195,8 @@ public class WriteLogNodeTest {
tempProcessorStore.getPath());
InsertPlan bwInsertPlan = new InsertPlan(1, "root.logTestDevice", 100,
- Arrays.asList("s1", "s2", "s3", "s4"),
- Arrays.asList("1.0", "15", "str", "false"));
+ new String[]{"s1", "s2", "s3", "s4"},
+ new String[]{"1.0", "15", "str", "false"});
UpdatePlan updatePlan = new UpdatePlan(0, 100, "2.0", new
Path("root.logTestDevice.s1"));
DeletePlan deletePlan = new DeletePlan(50, new
Path("root.logTestDevice.s1"));
@@ -230,8 +231,8 @@ public class WriteLogNodeTest {
tempProcessorStore.getPath());
InsertPlan bwInsertPlan = new InsertPlan(1, "logTestDevice", 100,
- Arrays.asList("s1", "s2", "s3", "s4"),
- Arrays.asList("1.0", "15", "str", "false"));
+ new String[]{"s1", "s2", "s3", "s4"},
+ new String[]{"1.0", "15", "str", "false"});
UpdatePlan updatePlan = new UpdatePlan(0, 100, "2.0", new
Path("root.logTestDevice.s1"));
DeletePlan deletePlan = new DeletePlan(50, new
Path("root.logTestDevice.s1"));
@@ -268,8 +269,8 @@ public class WriteLogNodeTest {
tempProcessorStore.getPath());
InsertPlan bwInsertPlan = new InsertPlan(1, "root.logTestDevice.oversize",
100,
- Arrays.asList("s1", "s2", "s3", "s4"),
- Arrays.asList("1.0", "15", new String(new char[4 * 1024 * 1024]),
"false"));
+ new String[]{"s1", "s2", "s3", "s4"},
+ new String[]{"1.0", "15", new String(new char[4 * 1024 * 1024]),
"false"});
boolean caught = false;
try {
diff --git
a/iotdb/src/test/java/org/apache/iotdb/db/writelog/io/LogWriterReaderTest.java
b/iotdb/src/test/java/org/apache/iotdb/db/writelog/io/LogWriterReaderTest.java
index d68478e..20d15fd 100644
---
a/iotdb/src/test/java/org/apache/iotdb/db/writelog/io/LogWriterReaderTest.java
+++
b/iotdb/src/test/java/org/apache/iotdb/db/writelog/io/LogWriterReaderTest.java
@@ -46,10 +46,10 @@ public class LogWriterReaderTest {
if (new File(filePath).exists()) {
new File(filePath).delete();
}
- InsertPlan insertPlan1 = new InsertPlan(1, "d1", 10L, Arrays.asList("s1",
"s2"),
- Arrays.asList("1", "2"));
- InsertPlan insertPlan2 = new InsertPlan(2, "d1", 10L, Arrays.asList("s1",
"s2"),
- Arrays.asList("1", "2"));
+ InsertPlan insertPlan1 = new InsertPlan(1, "d1", 10L, new String[]{"s1",
"s2"},
+ new String[]{"1", "2"});
+ InsertPlan insertPlan2 = new InsertPlan(2, "d1", 10L, new String[]{"s1",
"s2"},
+ new String[]{"1", "2"});
UpdatePlan updatePlan = new UpdatePlan(8L, 11L, "3", new
Path("root.d1.s1"));
DeletePlan deletePlan = new DeletePlan(10L, new Path("root.d1.s1"));
plans.add(insertPlan1);
diff --git
a/iotdb/src/test/java/org/apache/iotdb/db/writelog/transfer/PhysicalPlanLogTransferTest.java
b/iotdb/src/test/java/org/apache/iotdb/db/writelog/transfer/PhysicalPlanLogTransferTest.java
index 58e8b1f..6a23f0d 100644
---
a/iotdb/src/test/java/org/apache/iotdb/db/writelog/transfer/PhysicalPlanLogTransferTest.java
+++
b/iotdb/src/test/java/org/apache/iotdb/db/writelog/transfer/PhysicalPlanLogTransferTest.java
@@ -43,7 +43,7 @@ public class PhysicalPlanLogTransferTest {
private QueryProcessor processor = new QueryProcessor(new
MemIntQpExecutor());
private InsertPlan insertPlan = new InsertPlan(1, "device", 100,
- Arrays.asList("s1", "s2", "s3", "s4"), Arrays.asList("0.1", "100",
"test", "false"));
+ new String[]{"s1", "s2", "s3", "s4"}, new String[]{"0.1", "100", "test",
"false"});
private DeletePlan deletePlan = new DeletePlan(50, new
Path("root.vehicle.device"));
private UpdatePlan updatePlan = new UpdatePlan(0, 100, "2.0",
new Path("root.vehicle.device.sensor"));
diff --git
a/tsfile/example/src/main/java/org/apache/iotdb/tsfile/TsFileSequenceRead.java
b/tsfile/example/src/main/java/org/apache/iotdb/tsfile/TsFileSequenceRead.java
index da4e784..fb82e91 100644
---
a/tsfile/example/src/main/java/org/apache/iotdb/tsfile/TsFileSequenceRead.java
+++
b/tsfile/example/src/main/java/org/apache/iotdb/tsfile/TsFileSequenceRead.java
@@ -43,8 +43,8 @@ import org.apache.iotdb.tsfile.read.reader.page.PageReader;
public class TsFileSequenceRead {
public static void main(String[] args) throws IOException {
- TsFileSequenceReader reader = new TsFileSequenceReader("test.tsfile");
- System.out.println("file length: " + new File("test.tsfile").length());
+ TsFileSequenceReader reader = new TsFileSequenceReader(args[0]);
+ System.out.println("file length: " + new File(args[0]).length());
System.out.println("file magic head: " + reader.readHeadMagic());
System.out.println("file magic tail: " + reader.readTailMagic());
System.out.println("Level 1 metadata position: " +
reader.getFileMetadataPos());
diff --git
a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/ReadOnlyTsFile.java
b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/ReadOnlyTsFile.java
index a3a4236..89aba48 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/ReadOnlyTsFile.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/ReadOnlyTsFile.java
@@ -28,7 +28,7 @@ import
org.apache.iotdb.tsfile.read.expression.QueryExpression;
import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
import org.apache.iotdb.tsfile.read.query.executor.TsFileExecutor;
-public class ReadOnlyTsFile {
+public class ReadOnlyTsFile implements AutoCloseable {
private TsFileSequenceReader fileReader;
private MetadataQuerier metadataQuerier;
diff --git
a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
index deb2e4f..0f6c2ed 100644
---
a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
+++
b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
@@ -113,8 +113,13 @@ public class TsFileSequenceReader implements AutoCloseable{
public TsFileSequenceReader(TsFileInput input, boolean loadMetadataSize)
throws IOException {
this.tsFileInput = input;
- if (loadMetadataSize) { // NOTE no autoRepair here
- loadMetadataSize();
+ try {
+ if (loadMetadataSize) { // NOTE no autoRepair here
+ loadMetadataSize();
+ }
+ } catch (Throwable e) {
+ tsFileInput.close();
+ throw e;
}
}
@@ -458,7 +463,7 @@ public class TsFileSequenceReader implements AutoCloseable{
* Self Check the file and return the position before where the data is safe.
*
* @param newSchema @OUT. the measurement schema in the file will be added
into
- * this parameter.
+ * this parameter. (can be null)
* @param newMetaData @OUT can not be null, the chunk group metadta in the
file will be added into
* this parameter.
* @param fastFinish if true and the file is complete, then newSchema and
newMetaData parameter
@@ -600,6 +605,4 @@ public class TsFileSequenceReader implements AutoCloseable{
return truncatedPosition;
}
}
-
-
}
diff --git
a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileReadWriteTest.java
b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileReadWriteTest.java
index 8220c2a..416f2bc 100644
---
a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileReadWriteTest.java
+++
b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileReadWriteTest.java
@@ -49,8 +49,8 @@ import org.junit.Test;
public class TsFileReadWriteTest {
private final double delta = 0.0000001;
- private String path = "read_write_rle.tsfile";
- private File f;
+ String path = "read_write_rle.tsfile";
+ File f;
@Before
public void setUp() throws Exception {