This is an automated email from the ASF dual-hosted git repository. qiaojialin pushed a commit to branch fix_flush_empty_memtable in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 9498f129f31b803c8d742491c64de194eb368d29 Author: qiaojialin <[email protected]> AuthorDate: Fri Mar 20 12:21:13 2020 +0800 avoid flushing empty memtable --- .../org/apache/iotdb/db/engine/StorageEngine.java | 9 ++--- .../iotdb/db/engine/memtable/AbstractMemTable.java | 13 +++--- .../apache/iotdb/db/engine/memtable/IMemTable.java | 6 ++- .../engine/storagegroup/StorageGroupProcessor.java | 38 ++++++++++-------- .../db/engine/storagegroup/TsFileProcessor.java | 46 ++++++++-------------- .../iotdb/db/exception/WriteProcessException.java | 4 ++ .../db/exception/query/OutOfTTLException.java | 3 +- .../org/apache/iotdb/db/monitor/StatMonitor.java | 2 +- .../iotdb/db/writelog/recover/LogReplayer.java | 8 ++-- .../db/engine/cache/DeviceMetaDataCacheTest.java | 6 +-- .../storagegroup/FileNodeManagerBenchmark.java | 3 +- .../storagegroup/StorageGroupProcessorTest.java | 9 ++--- .../iotdb/db/engine/storagegroup/TTLTest.java | 11 +++--- .../engine/storagegroup/TsFileProcessorTest.java | 10 ++--- .../apache/iotdb/db/integration/IoTDBTtlIT.java | 2 +- .../iotdb/db/query/reader/ReaderTestHelper.java | 7 +--- 16 files changed, 83 insertions(+), 94 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java index 6bd9b14..1c0a4df 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java @@ -253,13 +253,12 @@ public class StorageEngine implements IService { * * @param insertPlan physical plan of insertion */ - public void insert(InsertPlan insertPlan) - throws StorageEngineException, QueryProcessException { + public void insert(InsertPlan insertPlan) throws StorageEngineException { StorageGroupProcessor storageGroupProcessor; try { storageGroupProcessor = getProcessor(insertPlan.getDeviceId()); - } catch (StorageEngineException e) { + } catch (Exception e) { logger.warn("get StorageGroupProcessor of device {} failed, because {}", insertPlan.getDeviceId(), e.getMessage(), e); throw new StorageEngineException(e); @@ -268,8 +267,8 @@ public class StorageEngine implements IService { // TODO monitor: update statistics try { storageGroupProcessor.insert(insertPlan); - } catch (QueryProcessException e) { - throw new QueryProcessException(e); + } catch (WriteProcessException e) { + throw new StorageEngineException(e); } } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java index 3bd46dd..06366c5 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java @@ -27,8 +27,8 @@ import java.util.Map.Entry; import org.apache.iotdb.db.engine.modification.Deletion; import org.apache.iotdb.db.engine.modification.Modification; import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk; +import org.apache.iotdb.db.exception.WriteProcessException; import org.apache.iotdb.db.exception.query.QueryProcessException; -import org.apache.iotdb.db.qp.constant.SQLConstant; import org.apache.iotdb.db.qp.physical.crud.BatchInsertPlan; import org.apache.iotdb.db.qp.physical.crud.InsertPlan; import org.apache.iotdb.db.rescon.TVListAllocator; @@ -37,7 +37,6 @@ import org.apache.iotdb.db.utils.MemUtils; import org.apache.iotdb.db.utils.datastructure.TVList; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; -import org.apache.iotdb.tsfile.utils.Binary; public abstract class AbstractMemTable implements IMemTable { @@ -86,7 +85,7 @@ public abstract class AbstractMemTable implements IMemTable { protected abstract IWritableMemChunk genMemSeries(TSDataType dataType); @Override - public void insert(InsertPlan insertPlan) throws QueryProcessException { + public void insert(InsertPlan insertPlan) throws WriteProcessException { try { for (int i = 0; i < insertPlan.getValues().length; i++) { @@ -96,20 +95,20 @@ public abstract class AbstractMemTable implements IMemTable { } long recordSizeInByte = MemUtils.getRecordSize(insertPlan); memSize += recordSizeInByte; - } catch (RuntimeException e) { - throw new QueryProcessException(e.getMessage()); + } catch (Exception e) { + throw new WriteProcessException(e.getMessage()); } } @Override public void insertBatch(BatchInsertPlan batchInsertPlan, int start, int end) - throws QueryProcessException { + throws WriteProcessException { try { write(batchInsertPlan, start, end); long recordSizeInByte = MemUtils.getRecordSize(batchInsertPlan, start, end); memSize += recordSizeInByte; } catch (RuntimeException e) { - throw new QueryProcessException(e.getMessage()); + throw new WriteProcessException(e.getMessage()); } } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java index 1ffd5f0..ba213f0 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java @@ -22,6 +22,8 @@ import java.io.IOException; import java.util.Map; import org.apache.iotdb.db.engine.modification.Deletion; import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk; +import org.apache.iotdb.db.exception.StorageEngineException; +import org.apache.iotdb.db.exception.WriteProcessException; import org.apache.iotdb.db.exception.query.QueryProcessException; import org.apache.iotdb.db.qp.physical.crud.BatchInsertPlan; import org.apache.iotdb.db.qp.physical.crud.InsertPlan; @@ -54,13 +56,13 @@ public interface IMemTable { */ long memSize(); - void insert(InsertPlan insertPlan) throws QueryProcessException; + void insert(InsertPlan insertPlan) throws WriteProcessException; /** * [start, end) */ void insertBatch(BatchInsertPlan batchInsertPlan, int start, int end) - throws QueryProcessException; + throws WriteProcessException; ReadOnlyMemChunk query(String deviceId, String measurement, TSDataType dataType, TSEncoding encoding, Map<String, String> props, long timeLowerBound) diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java index 6642ae1..b1dd172 100755 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java @@ -466,7 +466,7 @@ public class StorageGroupProcessor { } } - public void insert(InsertPlan insertPlan) throws QueryProcessException { + public void insert(InsertPlan insertPlan) throws WriteProcessException { // reject insertions that are out of ttl if (!checkTTL(insertPlan.getTime())) { throw new OutOfTTLException(insertPlan.getTime(), (System.currentTimeMillis() - dataTTL)); @@ -475,6 +475,7 @@ public class StorageGroupProcessor { try { // init map long timePartitionId = StorageEngine.fromTimeToTimePartition(insertPlan.getTime()); + latestTimeForEachDevice.computeIfAbsent(timePartitionId, l -> new HashMap<>()) .putIfAbsent(insertPlan.getDeviceId(), Long.MIN_VALUE); partitionLatestFlushedTimeForEachDevice.computeIfAbsent(timePartitionId, id -> new HashMap<>()) @@ -484,6 +485,7 @@ public class StorageGroupProcessor { insertToTsFileProcessor(insertPlan, insertPlan.getTime() > partitionLatestFlushedTimeForEachDevice.get(timePartitionId) .get(insertPlan.getDeviceId())); + } finally { writeUnlock(); } @@ -584,7 +586,7 @@ public class StorageGroupProcessor { private void insertBatchToTsFileProcessor(BatchInsertPlan batchInsertPlan, int start, int end, boolean sequence, TSStatus[] results, long timePartitionId) throws WriteProcessException { - // return when start <= end + // return when start >= end if (start >= end) { return; } @@ -598,7 +600,12 @@ public class StorageGroupProcessor { return; } - tsFileProcessor.insertBatch(batchInsertPlan, start, end, results); + try { + tsFileProcessor.insertBatch(batchInsertPlan, start, end, results); + } catch (WriteProcessException e) { + logger.error("insert to TsFileProcessor error ", e); + return; + } latestTimeForEachDevice.computeIfAbsent(timePartitionId, t -> new HashMap<>()) .putIfAbsent(batchInsertPlan.getDeviceId(), Long.MIN_VALUE); @@ -640,9 +647,8 @@ public class StorageGroupProcessor { } private void insertToTsFileProcessor(InsertPlan insertPlan, boolean sequence) - throws QueryProcessException { + throws WriteProcessException { TsFileProcessor tsFileProcessor; - boolean result; long timePartitionId = StorageEngine.fromTimeToTimePartition(insertPlan.getTime()); tsFileProcessor = getOrCreateTsFileProcessor(timePartitionId, sequence); @@ -652,20 +658,18 @@ public class StorageGroupProcessor { } // insert TsFileProcessor - result = tsFileProcessor.insert(insertPlan); + tsFileProcessor.insert(insertPlan); // try to update the latest time of the device of this tsRecord - if (result - && latestTimeForEachDevice.get(timePartitionId).get(insertPlan.getDeviceId()) < insertPlan + if (latestTimeForEachDevice.get(timePartitionId).get(insertPlan.getDeviceId()) < insertPlan .getTime()) { latestTimeForEachDevice.get(timePartitionId) .put(insertPlan.getDeviceId(), insertPlan.getTime()); } - long globalLatestFlushTime = - globalLatestFlushedTimeForEachDevice.computeIfAbsent( + long globalLatestFlushTime = globalLatestFlushedTimeForEachDevice.computeIfAbsent( insertPlan.getDeviceId(), k -> Long.MIN_VALUE); tryToUpdateInsertLastCache(insertPlan, globalLatestFlushTime); - if (result && globalLatestFlushTime < insertPlan.getTime()) { + if (globalLatestFlushTime < insertPlan.getTime()) { globalLatestFlushedTimeForEachDevice.put(insertPlan.getDeviceId(), insertPlan.getTime()); } @@ -676,7 +680,7 @@ public class StorageGroupProcessor { } public void tryToUpdateInsertLastCache(InsertPlan plan, Long latestFlushedTime) - throws QueryProcessException { + throws WriteProcessException { try { MNode node = MManager.getInstance().getDeviceNodeWithAutoCreateStorageGroup(plan.getDeviceId()); @@ -687,8 +691,8 @@ public class StorageGroupProcessor { ((LeafMNode) measurementNode) .updateCachedLast(plan.composeTimeValuePair(i), true, latestFlushedTime); } - } catch (MetadataException e) { - throw new QueryProcessException(e); + } catch (MetadataException | QueryProcessException e) { + throw new WriteProcessException(e); } } @@ -1286,9 +1290,9 @@ public class StorageGroupProcessor { .get(processor.getTimeRangeId()); if (curPartitionDeviceLatestTime == null) { - logger.error("Partition: " + processor.getTimeRangeId() + - " does't have latest time for each device record. Flushing tsfile is: " - + processor.getTsFileResource().getFile()); + logger.warn("Partition: " + processor.getTimeRangeId() + + " does't have latest time for each device. No valid record is written into memtable." + + "Flushing tsfile is: " + processor.getTsFileResource().getFile()); return false; } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java index a770882..8bb369b 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java @@ -153,9 +153,8 @@ public class TsFileProcessor { * insert data in an InsertPlan into the workingMemtable. * * @param insertPlan physical plan of insertion - * @return succeed or fail */ - public boolean insert(InsertPlan insertPlan) throws QueryProcessException { + public void insert(InsertPlan insertPlan) throws WriteProcessException { if (workMemTable == null) { workMemTable = MemTablePool.getInstance().getAvailableMemTable(this); @@ -167,10 +166,10 @@ public class TsFileProcessor { if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) { try { getLogNode().write(insertPlan); - } catch (IOException e) { + } catch (Exception e) { logger.error("{}: {} write WAL failed", storageGroupName, tsFileResource.getFile().getName(), e); - return false; + throw new WriteProcessException(e); } } @@ -181,8 +180,6 @@ public class TsFileProcessor { if (!sequence) { tsFileResource.updateEndTime(insertPlan.getDeviceId(), insertPlan.getTime()); } - - return true; } public void insertBatch(BatchInsertPlan batchInsertPlan, int start, int end, @@ -195,24 +192,20 @@ public class TsFileProcessor { // insert insertPlan to the work memtable try { workMemTable.insertBatch(batchInsertPlan, start, end); - for (int i = start; i < end; i++) { - results[i] = RpcUtils.SUCCESS_STATUS; - } - } catch (Exception e) { - setErrorResult(start, end, results, e); - return; - } - - if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) { - try { + if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) { batchInsertPlan.setStart(start); batchInsertPlan.setEnd(end); getLogNode().write(batchInsertPlan); - } catch (IOException e) { - logger.error("{}: {} write WAL failed", storageGroupName, - tsFileResource.getFile().getName(), e); - setErrorResult(start, end, results, e); } + } catch (Exception e) { + for (int i = start; i < end; i++) { + results[i] = RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR, e.getMessage()); + } + throw new WriteProcessException(e); + } + + for (int i = start; i < end; i++) { + results[i] = RpcUtils.SUCCESS_STATUS; } tsFileResource @@ -226,14 +219,6 @@ public class TsFileProcessor { } } - private void setErrorResult(int start, int end, TSStatus[] results, Exception e) - throws WriteProcessException { - for (int i = start; i < end; i++) { - results[i] = RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR, e.getMessage()); - } - throw new WriteProcessException(e); - } - /** * Delete data which belongs to the timeseries `deviceId.measurementId` and the timestamp of which * <= 'timestamp' in the deletion. <br/> @@ -449,9 +434,10 @@ public class TsFileProcessor { * flushManager again. */ private void addAMemtableIntoFlushingList(IMemTable tobeFlushed) throws IOException { - if(!updateLatestFlushTimeCallback.call(this)){ - logger.error("{}: {} Memetable info: {}", storageGroupName, + if(!updateLatestFlushTimeCallback.call(this) || tobeFlushed.memSize() == 0){ + logger.warn("This memtable is empty, skip it in flush. {}: {} Memetable info: {}", storageGroupName, tsFileResource.getFile().getName(), tobeFlushed.getMemTableMap()); + return; } flushingMemTables.addLast(tobeFlushed); if (logger.isDebugEnabled()) { diff --git a/server/src/main/java/org/apache/iotdb/db/exception/WriteProcessException.java b/server/src/main/java/org/apache/iotdb/db/exception/WriteProcessException.java index 9ae88ef..dde7735 100644 --- a/server/src/main/java/org/apache/iotdb/db/exception/WriteProcessException.java +++ b/server/src/main/java/org/apache/iotdb/db/exception/WriteProcessException.java @@ -29,6 +29,10 @@ public class WriteProcessException extends IoTDBException { super(message, TSStatusCode.STORAGE_GROUP_ERROR.getStatusCode()); } + public WriteProcessException(String message, int errorCode) { + super(message, errorCode); + } + public WriteProcessException(Exception exception) { super(exception, TSStatusCode.STORAGE_GROUP_PROCESSOR_ERROR.getStatusCode()); } diff --git a/server/src/main/java/org/apache/iotdb/db/exception/query/OutOfTTLException.java b/server/src/main/java/org/apache/iotdb/db/exception/query/OutOfTTLException.java index 6365b21..746fc69 100644 --- a/server/src/main/java/org/apache/iotdb/db/exception/query/OutOfTTLException.java +++ b/server/src/main/java/org/apache/iotdb/db/exception/query/OutOfTTLException.java @@ -21,9 +21,10 @@ package org.apache.iotdb.db.exception.query; import java.util.Date; +import org.apache.iotdb.db.exception.WriteProcessException; import org.apache.iotdb.rpc.TSStatusCode; -public class OutOfTTLException extends QueryProcessException { +public class OutOfTTLException extends WriteProcessException { private static final long serialVersionUID = -1197147887094603300L; diff --git a/server/src/main/java/org/apache/iotdb/db/monitor/StatMonitor.java b/server/src/main/java/org/apache/iotdb/db/monitor/StatMonitor.java index 70cd44d..82bc7b0 100644 --- a/server/src/main/java/org/apache/iotdb/db/monitor/StatMonitor.java +++ b/server/src/main/java/org/apache/iotdb/db/monitor/StatMonitor.java @@ -384,7 +384,7 @@ public class StatMonitor implements IService { numInsert.incrementAndGet(); pointNum = entry.getValue().dataPointList.size(); numPointsInsert.addAndGet(pointNum); - } catch (StorageEngineException | QueryProcessException e) { + } catch (StorageEngineException e) { numInsertError.incrementAndGet(); logger.error("Inserting stat points error.", e); } diff --git a/server/src/main/java/org/apache/iotdb/db/writelog/recover/LogReplayer.java b/server/src/main/java/org/apache/iotdb/db/writelog/recover/LogReplayer.java index 3d3b977..dd4362d 100644 --- a/server/src/main/java/org/apache/iotdb/db/writelog/recover/LogReplayer.java +++ b/server/src/main/java/org/apache/iotdb/db/writelog/recover/LogReplayer.java @@ -28,7 +28,7 @@ import org.apache.iotdb.db.engine.modification.Deletion; import org.apache.iotdb.db.engine.modification.ModificationFile; import org.apache.iotdb.db.engine.storagegroup.TsFileResource; import org.apache.iotdb.db.engine.version.VersionController; -import org.apache.iotdb.db.exception.query.QueryProcessException; +import org.apache.iotdb.db.exception.WriteProcessException; import org.apache.iotdb.db.exception.StorageGroupProcessorException; import org.apache.iotdb.db.qp.physical.PhysicalPlan; import org.apache.iotdb.db.qp.physical.crud.BatchInsertPlan; @@ -105,7 +105,7 @@ public class LogReplayer { } } catch (IOException e) { throw new StorageGroupProcessorException("Cannot replay logs" + e.getMessage()); - } catch (QueryProcessException e) { + } catch (WriteProcessException e) { throw new StorageGroupProcessorException( "Cannot replay logs for query processor exception" + e.getMessage()); } finally { @@ -129,7 +129,7 @@ public class LogReplayer { } } - private void replayBatchInsert(BatchInsertPlan batchInsertPlan) throws QueryProcessException { + private void replayBatchInsert(BatchInsertPlan batchInsertPlan) throws WriteProcessException { if (currentTsFileResource != null) { // the last chunk group may contain the same data with the logs, ignore such logs in seq file Long lastEndTime = currentTsFileResource.getEndTimeMap().get(batchInsertPlan.getDeviceId()); @@ -150,7 +150,7 @@ public class LogReplayer { recoverMemTable.insertBatch(batchInsertPlan, 0, batchInsertPlan.getRowCount()); } - private void replayInsert(InsertPlan insertPlan) throws QueryProcessException { + private void replayInsert(InsertPlan insertPlan) { if (currentTsFileResource != null) { // the last chunk group may contain the same data with the logs, ignore such logs in seq file Long lastEndTime = currentTsFileResource.getEndTimeMap().get(insertPlan.getDeviceId()); diff --git a/server/src/test/java/org/apache/iotdb/db/engine/cache/DeviceMetaDataCacheTest.java b/server/src/test/java/org/apache/iotdb/db/engine/cache/DeviceMetaDataCacheTest.java index a46be2a..1c629d5 100644 --- a/server/src/test/java/org/apache/iotdb/db/engine/cache/DeviceMetaDataCacheTest.java +++ b/server/src/test/java/org/apache/iotdb/db/engine/cache/DeviceMetaDataCacheTest.java @@ -30,7 +30,7 @@ import org.apache.iotdb.db.engine.querycontext.QueryDataSource; import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor; import org.apache.iotdb.db.engine.storagegroup.TsFileProcessor; import org.apache.iotdb.db.engine.storagegroup.TsFileResource; -import org.apache.iotdb.db.exception.query.QueryProcessException; +import org.apache.iotdb.db.exception.WriteProcessException; import org.apache.iotdb.db.metadata.MManager; import org.apache.iotdb.db.qp.physical.crud.InsertPlan; import org.apache.iotdb.db.query.context.QueryContext; @@ -82,7 +82,7 @@ public class DeviceMetaDataCacheTest { EnvironmentUtils.cleanDir(systemDir); } - private void insertOneRecord(long time, int num) throws QueryProcessException { + private void insertOneRecord(long time, int num) throws WriteProcessException { TSRecord record = new TSRecord(time, storageGroup); record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId0, String.valueOf(num))); record.addTuple(DataPoint.getDataPoint(TSDataType.INT64, measurementId1, String.valueOf(num))); @@ -92,7 +92,7 @@ public class DeviceMetaDataCacheTest { storageGroupProcessor.insert(new InsertPlan(record)); } - protected void insertData() throws IOException, QueryProcessException { + protected void insertData() throws IOException, WriteProcessException { for (int j = 1; j <= 100; j++) { insertOneRecord(j, j); } diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/FileNodeManagerBenchmark.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/FileNodeManagerBenchmark.java index 7a773a7..99199ac 100644 --- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/FileNodeManagerBenchmark.java +++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/FileNodeManagerBenchmark.java @@ -24,7 +24,6 @@ import java.util.concurrent.atomic.AtomicLong; import org.apache.iotdb.db.engine.StorageEngine; import org.apache.iotdb.db.exception.metadata.MetadataException; import org.apache.iotdb.db.exception.StorageEngineException; -import org.apache.iotdb.db.exception.query.QueryProcessException; import org.apache.iotdb.db.metadata.MManager; import org.apache.iotdb.db.qp.physical.crud.InsertPlan; import org.apache.iotdb.db.utils.EnvironmentUtils; @@ -117,7 +116,7 @@ public class FileNodeManagerBenchmark { TSRecord tsRecord = getRecord(deltaObject, time); StorageEngine.getInstance().insert(new InsertPlan(tsRecord)); } - } catch (QueryProcessException | StorageEngineException e) { + } catch (StorageEngineException e) { e.printStackTrace(); } finally { latch.countDown(); diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java index 0abeb56..cfb777c 100644 --- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java +++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java @@ -26,7 +26,6 @@ import org.apache.iotdb.db.engine.merge.manage.MergeManager; import org.apache.iotdb.db.engine.querycontext.QueryDataSource; import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk; import org.apache.iotdb.db.exception.metadata.MetadataException; -import org.apache.iotdb.db.exception.query.QueryProcessException; import org.apache.iotdb.db.exception.StorageGroupProcessorException; import org.apache.iotdb.db.exception.WriteProcessException; import org.apache.iotdb.db.qp.physical.crud.BatchInsertPlan; @@ -85,7 +84,7 @@ public class StorageGroupProcessorTest { @Test - public void testUnseqUnsealedDelete() throws QueryProcessException, IOException { + public void testUnseqUnsealedDelete() throws WriteProcessException, IOException { TSRecord record = new TSRecord(10000, deviceId); record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(1000))); processor.insert(new InsertPlan(record)); @@ -132,7 +131,7 @@ public class StorageGroupProcessorTest { } @Test - public void testSequenceSyncClose() throws QueryProcessException { + public void testSequenceSyncClose() throws WriteProcessException { for (int j = 1; j <= 10; j++) { TSRecord record = new TSRecord(j, deviceId); record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j))); @@ -208,7 +207,7 @@ public class StorageGroupProcessorTest { @Test - public void testSeqAndUnSeqSyncClose() throws QueryProcessException { + public void testSeqAndUnSeqSyncClose() throws WriteProcessException { for (int j = 21; j <= 30; j++) { TSRecord record = new TSRecord(j, deviceId); @@ -240,7 +239,7 @@ public class StorageGroupProcessorTest { } @Test - public void testMerge() throws QueryProcessException { + public void testMerge() throws WriteProcessException { mergeLock = new AtomicLong(0); for (int j = 21; j <= 30; j++) { diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java index 24ad39f..0675ab4 100644 --- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java +++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java @@ -36,6 +36,7 @@ import org.apache.iotdb.db.engine.flush.TsFileFlushPolicy.DirectFlushPolicy; import org.apache.iotdb.db.engine.querycontext.QueryDataSource; import org.apache.iotdb.db.exception.StartupException; import org.apache.iotdb.db.exception.StorageEngineException; +import org.apache.iotdb.db.exception.WriteProcessException; import org.apache.iotdb.db.exception.metadata.MetadataException; import org.apache.iotdb.db.exception.query.OutOfTTLException; import org.apache.iotdb.db.exception.query.QueryProcessException; @@ -121,7 +122,7 @@ public class TTLTest { } @Test - public void testTTLWrite() throws QueryProcessException { + public void testTTLWrite() throws WriteProcessException { InsertPlan insertPlan = new InsertPlan(); insertPlan.setDeviceId(sg1); insertPlan.setTime(System.currentTimeMillis()); @@ -146,7 +147,7 @@ public class TTLTest { storageGroupProcessor.insert(insertPlan); } - private void prepareData() throws QueryProcessException { + private void prepareData() throws WriteProcessException { InsertPlan insertPlan = new InsertPlan(); insertPlan.setDeviceId(sg1); insertPlan.setTime(System.currentTimeMillis()); @@ -174,7 +175,7 @@ public class TTLTest { } @Test - public void testTTLRead() throws IOException, QueryProcessException, StorageEngineException { + public void testTTLRead() throws IOException, WriteProcessException, StorageEngineException { prepareData(); // files before ttl @@ -222,7 +223,7 @@ public class TTLTest { } @Test - public void testTTLRemoval() throws StorageEngineException, QueryProcessException { + public void testTTLRemoval() throws StorageEngineException, WriteProcessException { prepareData(); storageGroupProcessor.syncCloseAllWorkingTsFileProcessors(); @@ -333,7 +334,7 @@ public class TTLTest { } @Test - public void testTTLCleanFile() throws QueryProcessException { + public void testTTLCleanFile() throws WriteProcessException { prepareData(); storageGroupProcessor.syncCloseAllWorkingTsFileProcessors(); diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorTest.java index 1c926d9..34559eb 100644 --- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorTest.java +++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorTest.java @@ -34,8 +34,8 @@ import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory; import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk; import org.apache.iotdb.db.engine.version.SysTimeVersionController; import org.apache.iotdb.db.exception.TsFileProcessorException; +import org.apache.iotdb.db.exception.WriteProcessException; import org.apache.iotdb.db.exception.metadata.MetadataException; -import org.apache.iotdb.db.exception.query.QueryProcessException; import org.apache.iotdb.db.qp.physical.crud.InsertPlan; import org.apache.iotdb.db.query.context.QueryContext; import org.apache.iotdb.db.utils.EnvironmentUtils; @@ -84,7 +84,7 @@ public class TsFileProcessorTest { } @Test - public void testWriteAndFlush() throws IOException, QueryProcessException, MetadataException { + public void testWriteAndFlush() throws IOException, WriteProcessException, MetadataException { logger.info("testWriteAndFlush begin.."); processor = new TsFileProcessor(storageGroup, SystemFileFactory.INSTANCE.getFile(filePath), SchemaUtils.constructSchema(deviceId), SysTimeVersionController.INSTANCE, this::closeTsFileProcessor, @@ -133,7 +133,7 @@ public class TsFileProcessorTest { @Test public void testWriteAndRestoreMetadata() - throws IOException, QueryProcessException, MetadataException { + throws IOException, WriteProcessException, MetadataException { logger.info("testWriteAndRestoreMetadata begin.."); processor = new TsFileProcessor(storageGroup, SystemFileFactory.INSTANCE.getFile(filePath), SchemaUtils.constructSchema(deviceId), SysTimeVersionController.INSTANCE, this::closeTsFileProcessor, @@ -204,7 +204,7 @@ public class TsFileProcessorTest { @Test - public void testMultiFlush() throws IOException, QueryProcessException, MetadataException { + public void testMultiFlush() throws IOException, WriteProcessException, MetadataException { logger.info("testWriteAndRestoreMetadata begin.."); processor = new TsFileProcessor(storageGroup, SystemFileFactory.INSTANCE.getFile(filePath), SchemaUtils.constructSchema(deviceId), SysTimeVersionController.INSTANCE, this::closeTsFileProcessor, @@ -239,7 +239,7 @@ public class TsFileProcessorTest { @Test - public void testWriteAndClose() throws IOException, QueryProcessException, MetadataException { + public void testWriteAndClose() throws IOException, WriteProcessException, MetadataException { logger.info("testWriteAndRestoreMetadata begin.."); processor = new TsFileProcessor(storageGroup, SystemFileFactory.INSTANCE.getFile(filePath), SchemaUtils.constructSchema(deviceId), SysTimeVersionController.INSTANCE, diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBTtlIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBTtlIT.java index 4fd5f13..5ac6598 100644 --- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBTtlIT.java +++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBTtlIT.java @@ -104,7 +104,7 @@ public class IoTDBTtlIT { boolean caught = false; try { statement.execute(String.format("INSERT INTO root.TTL_SG1(timestamp, s1) VALUES (%d, %d)", - now - 50000 + i, i)); + now - 500000 + i, i)); } catch (SQLException e) { if (TSStatusCode.OUT_OF_TTL_ERROR.getStatusCode() == e.getErrorCode()) { caught = true; diff --git a/server/src/test/java/org/apache/iotdb/db/query/reader/ReaderTestHelper.java b/server/src/test/java/org/apache/iotdb/db/query/reader/ReaderTestHelper.java index 44245d7..dbc7ac3 100644 --- a/server/src/test/java/org/apache/iotdb/db/query/reader/ReaderTestHelper.java +++ b/server/src/test/java/org/apache/iotdb/db/query/reader/ReaderTestHelper.java @@ -25,6 +25,7 @@ import org.apache.iotdb.db.constant.TestConstant; import org.apache.iotdb.db.engine.MetadataManagerHelper; import org.apache.iotdb.db.engine.flush.TsFileFlushPolicy.DirectFlushPolicy; import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor; +import org.apache.iotdb.db.exception.WriteProcessException; import org.apache.iotdb.db.exception.query.QueryProcessException; import org.apache.iotdb.db.metadata.MManager; import org.apache.iotdb.db.qp.physical.crud.InsertPlan; @@ -68,10 +69,4 @@ public abstract class ReaderTestHelper { abstract protected void insertData() throws IOException, QueryProcessException; - protected void insertOneRecord(long time, int num) throws QueryProcessException { - TSRecord record = new TSRecord(time, deviceId); - record.addTuple(DataPoint.getDataPoint(dataType, measurementId, String.valueOf(num))); - storageGroupProcessor.insert(new InsertPlan(record)); - } - } \ No newline at end of file
