This is an automated email from the ASF dual-hosted git repository. jiangtian pushed a commit to branch refactor_overflow in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 57cb4a8f684bdfb6e469b31b262cdbe28bad44e0 Author: 江天 <[email protected]> AuthorDate: Wed May 29 08:56:25 2019 +0800 rename querycontext to datasource remove unused classes --- .../org/apache/iotdb/db/engine/DatabaseEngine.java | 3 +- .../engine/cache/RowGroupBlockMetaDataCache.java | 167 ----- .../iotdb/db/engine/cache/TsFileMetaDataCache.java | 98 --- .../iotdb/db/engine/cache/TsFileMetadataUtils.java | 84 --- .../QueryDataSource.java | 2 +- .../ReadOnlyMemChunk.java | 8 +- .../SeriesDataSource.java | 14 +- .../UnsealedTsFile.java | 2 +- .../iotdb/db/engine/memtable/AbstractMemTable.java | 3 +- .../apache/iotdb/db/engine/memtable/IMemTable.java | 3 +- .../db/engine/memtable/MemSeriesLazyMerger.java | 2 +- .../iotdb/db/engine/overflow/io/OverflowIO.java | 173 ----- .../db/engine/overflow/io/OverflowMemtable.java | 116 ---- .../db/engine/overflow/io/OverflowProcessor.java | 735 --------------------- .../db/engine/overflow/io/OverflowResource.java | 329 --------- .../overflow/io/OverflowedTsFileIOWriter.java | 36 - .../engine/overflow/metadata/OFFileMetadata.java | 107 --- .../overflow/metadata/OFRowGroupListMetadata.java | 109 --- .../overflow/metadata/OFSeriesListMetadata.java | 107 --- .../db/engine/overflow/utils/MergeStatus.java | 29 - .../db/engine/overflow/utils/OverflowOpType.java | 29 - .../apache/iotdb/db/engine/pool/FlushManager.java | 56 +- .../apache/iotdb/db/engine/pool/MergeManager.java | 52 +- .../engine/querycontext/MergeSeriesDataSource.java | 33 - .../db/engine/querycontext/OverflowInsertFile.java | 51 -- .../querycontext/OverflowSeriesDataSource.java | 78 --- .../querycontext/OverflowUpdateDeleteFile.java | 42 -- .../db/engine/sgmanager/StorageGroupManager.java | 2 +- .../db/engine/sgmanager/StorageGroupProcessor.java | 4 +- .../db/engine/tsfiledata/TsFileProcessor.java | 6 +- .../iotdb/db/query/control/JobFileManager.java | 4 +- .../db/query/control/QueryResourceManager.java | 8 +- .../db/query/executor/AggregateEngineExecutor.java | 2 +- .../EngineExecutorWithoutTimeGenerator.java | 3 +- .../db/query/executor/FillEngineExecutor.java | 2 +- .../GroupByWithOnlyTimeFilterDataSetDataSet.java | 2 +- .../db/query/factory/SeriesReaderFactory.java | 7 +- .../java/org/apache/iotdb/db/query/fill/IFill.java | 2 +- .../org/apache/iotdb/db/query/fill/LinearFill.java | 2 +- .../apache/iotdb/db/query/fill/PreviousFill.java | 2 +- .../iotdb/db/query/reader/mem/MemChunkReader.java | 2 +- .../query/reader/sequence/SequenceDataReader.java | 2 +- .../sequence/SequenceDataReaderByTimestamp.java | 2 +- .../reader/sequence/UnSealedTsFileReader.java | 2 +- .../sequence/UnSealedTsFilesReaderByTimestamp.java | 2 +- .../query/timegenerator/EngineNodeConstructor.java | 2 +- .../bufferwrite/BufferWriteProcessorNewTest.java | 2 +- .../bufferwrite/BufferWriteProcessorTest.java | 2 +- .../engine/modification/DeletionFileNodeTest.java | 2 +- .../engine/overflow/io/OverflowProcessorTest.java | 4 +- 50 files changed, 46 insertions(+), 2490 deletions(-) diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/DatabaseEngine.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/DatabaseEngine.java index 3e4a979..abe90ec 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/engine/DatabaseEngine.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/DatabaseEngine.java @@ -23,7 +23,7 @@ import java.util.List; import java.util.Map; import org.apache.iotdb.db.engine.sgmanager.TsFileResource; import org.apache.iotdb.db.engine.memcontrol.BasicMemController; -import org.apache.iotdb.db.engine.querycontext.QueryDataSource; +import org.apache.iotdb.db.engine.datasource.QueryDataSource; import org.apache.iotdb.db.exception.StorageGroupManagerException; import org.apache.iotdb.db.qp.physical.crud.InsertPlan; import org.apache.iotdb.db.query.context.QueryContext; @@ -32,7 +32,6 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; import org.apache.iotdb.tsfile.read.common.Path; import org.apache.iotdb.tsfile.read.expression.impl.SingleSeriesExpression; -import org.apache.iotdb.tsfile.write.record.TSRecord; /** * DatabaseEngine is an abstraction of IoTDB storage-level interfaces. diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/cache/RowGroupBlockMetaDataCache.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/cache/RowGroupBlockMetaDataCache.java deleted file mode 100644 index 044fb5d..0000000 --- a/iotdb/src/main/java/org/apache/iotdb/db/engine/cache/RowGroupBlockMetaDataCache.java +++ /dev/null @@ -1,167 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.iotdb.db.engine.cache; - -import java.io.IOException; -import java.util.LinkedHashMap; -import java.util.Map; -import java.util.Objects; -import java.util.concurrent.atomic.AtomicLong; -import org.apache.iotdb.tsfile.file.metadata.TsDeviceMetadata; -import org.apache.iotdb.tsfile.file.metadata.TsFileMetaData; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * This class is used to cache <code>RowGroupBlockMetaDataCache</code> of tsfile in IoTDB. - */ -public class RowGroupBlockMetaDataCache { - - private static final Logger LOGGER = LoggerFactory.getLogger(RowGroupBlockMetaDataCache.class); - - private static final int CACHE_SIZE = 100; - /** - * key: the file path + deviceId. - */ - private LinkedHashMap<String, TsDeviceMetadata> lruCache; - - private AtomicLong cacheHintNum = new AtomicLong(); - private AtomicLong cacheRequestNum = new AtomicLong(); - - private RowGroupBlockMetaDataCache(int cacheSize) { - lruCache = new LruLinkedHashMap(cacheSize, true); - } - - public static RowGroupBlockMetaDataCache getInstance() { - return RowGroupBlockMetaDataCacheSingleton.INSTANCE; - } - - /** - * get {@link TsDeviceMetadata}. THREAD SAFE. - */ - public TsDeviceMetadata get(String filePath, String deviceId, TsFileMetaData fileMetaData) - throws IOException { - // The key(the tsfile path and deviceId) for the lruCache - - String jointPath = filePath + deviceId; - Object jointPathObject = jointPath.intern(); - synchronized (lruCache) { - cacheRequestNum.incrementAndGet(); - if (lruCache.containsKey(jointPath)) { - cacheHintNum.incrementAndGet(); - if (LOGGER.isDebugEnabled()) { - LOGGER.debug( - "Cache hint: the number of requests for cache is {}, " - + "the number of hints for cache is {}", - cacheRequestNum.get(), cacheHintNum.get()); - } - return lruCache.get(jointPath); - } - } - synchronized (jointPathObject) { - synchronized (lruCache) { - if (lruCache.containsKey(jointPath)) { - return lruCache.get(jointPath); - } - } - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Cache didn't hint: the number of requests for cache is {}", - cacheRequestNum.get()); - } - TsDeviceMetadata blockMetaData = TsFileMetadataUtils - .getTsRowGroupBlockMetaData(filePath, deviceId, - fileMetaData); - synchronized (lruCache) { - lruCache.put(jointPath, blockMetaData); - return lruCache.get(jointPath); - } - } - } - - /** - * clear LRUCache. - */ - public void clear() { - synchronized (lruCache) { - lruCache.clear(); - } - } - - /** - * the default LRU cache size is 100. The singleton pattern. - */ - private static class RowGroupBlockMetaDataCacheSingleton { - - private static final RowGroupBlockMetaDataCache INSTANCE = new - RowGroupBlockMetaDataCache(CACHE_SIZE); - } - - /** - * This class is a map used to cache the <code>RowGroupBlockMetaData</code>. The caching strategy - * is LRU. - * - */ - private class LruLinkedHashMap extends LinkedHashMap<String, TsDeviceMetadata> { - - private static final long serialVersionUID = 1290160928914532649L; - private static final float LOAD_FACTOR_MAP = 0.75f; - private int maxCapacity; - - public LruLinkedHashMap(int maxCapacity, boolean isLru) { - super(maxCapacity, LOAD_FACTOR_MAP, isLru); - this.maxCapacity = maxCapacity; - } - - @Override - protected boolean removeEldestEntry(Map.Entry<String, TsDeviceMetadata> eldest) { - return size() > maxCapacity; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - return super.equals(o); - } - - @Override - public int hashCode() { - return super.hashCode(); - } - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - RowGroupBlockMetaDataCache that = (RowGroupBlockMetaDataCache) o; - return Objects.equals(lruCache, that.lruCache) && - Objects.equals(cacheHintNum, that.cacheHintNum) && - Objects.equals(cacheRequestNum, that.cacheRequestNum); - } - - @Override - public int hashCode() { - return Objects.hash(lruCache, cacheHintNum, cacheRequestNum); - } -} diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/cache/TsFileMetaDataCache.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/cache/TsFileMetaDataCache.java deleted file mode 100644 index 3cfe180..0000000 --- a/iotdb/src/main/java/org/apache/iotdb/db/engine/cache/TsFileMetaDataCache.java +++ /dev/null @@ -1,98 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.iotdb.db.engine.cache; - -import java.io.IOException; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicLong; -import org.apache.iotdb.tsfile.file.metadata.TsFileMetaData; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * This class is used to cache <code>TsFileMetaData</code> of tsfile in IoTDB. - */ -public class TsFileMetaDataCache { - - private static final Logger LOGGER = LoggerFactory.getLogger(TsFileMetaDataCache.class); - /** - * key: The file seriesPath of tsfile. - */ - private ConcurrentHashMap<String, TsFileMetaData> cache; - private AtomicLong cacheHintNum = new AtomicLong(); - private AtomicLong cacheRequestNum = new AtomicLong(); - - private TsFileMetaDataCache() { - cache = new ConcurrentHashMap<>(); - } - - public static TsFileMetaDataCache getInstance() { - return TsFileMetaDataCacheHolder.INSTANCE; - } - - /** - * get the TsFileMetaData for the given path. - * - * @param path -given path - */ - public TsFileMetaData get(String path) throws IOException { - - Object internPath = path.intern(); - synchronized (internPath) { - cacheRequestNum.incrementAndGet(); - if (!cache.containsKey(path)) { - // read value from tsfile - TsFileMetaData fileMetaData = TsFileMetadataUtils.getTsFileMetaData(path); - cache.put(path, fileMetaData); - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Cache didn't hint: the number of requests for cache is {}", - cacheRequestNum.get()); - } - return cache.get(path); - } else { - cacheHintNum.incrementAndGet(); - if (LOGGER.isDebugEnabled()) { - LOGGER.debug( - "Cache hint: the number of requests for cache is {}, the number of hints for cache " - + "is {}", - cacheRequestNum.get(), cacheHintNum.get()); - } - return cache.get(path); - } - } - } - - public void remove(String path) { - cache.remove(path); - } - - public void clear() { - cache.clear(); - } - - /* - * Singleton pattern - */ - private static class TsFileMetaDataCacheHolder { - - private TsFileMetaDataCacheHolder() {} - - private static final TsFileMetaDataCache INSTANCE = new TsFileMetaDataCache(); - } -} diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/cache/TsFileMetadataUtils.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/cache/TsFileMetadataUtils.java deleted file mode 100644 index 4cd1602..0000000 --- a/iotdb/src/main/java/org/apache/iotdb/db/engine/cache/TsFileMetadataUtils.java +++ /dev/null @@ -1,84 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.iotdb.db.engine.cache; - -import java.io.IOException; -import java.nio.ByteBuffer; -import org.apache.iotdb.tsfile.file.metadata.TsDeviceMetadata; -import org.apache.iotdb.tsfile.file.metadata.TsFileMetaData; -import org.apache.iotdb.tsfile.read.TsFileSequenceReader; - -/** - * This class is used to read metadata(<code>TsFileMetaData</code> and - * <code>TsRowGroupBlockMetaData</code>). - */ -public class TsFileMetadataUtils { - - private TsFileMetadataUtils(){ - - } - - /** - * get tsfile meta data. - * - * @param filePath -given path - * @return -meta data - */ - public static TsFileMetaData getTsFileMetaData(String filePath) throws IOException { - TsFileSequenceReader reader = null; - try { - reader = new TsFileSequenceReader(filePath); - return reader.readFileMetadata(); - } finally { - if (reader != null) { - reader.close(); - } - } - } - - /** - * get row group block meta data. - * - * @param filePath -file path - * @param deviceId -device id - * @param fileMetaData -ts file meta data - * @return -device meta data - */ - public static TsDeviceMetadata getTsRowGroupBlockMetaData(String filePath, String deviceId, - TsFileMetaData fileMetaData) throws IOException { - if (!fileMetaData.getDeviceMap().containsKey(deviceId)) { - return null; - } else { - TsFileSequenceReader reader = null; - try { - reader = new TsFileSequenceReader(filePath); - long offset = fileMetaData.getDeviceMap().get(deviceId).getOffset(); - int size = fileMetaData.getDeviceMap().get(deviceId).getLen(); - ByteBuffer data = ByteBuffer.allocate(size); - reader.readRaw(offset, size, data); - data.flip(); - return TsDeviceMetadata.deserializeFrom(data); - } finally { - if (reader != null) { - reader.close(); - } - } - } - } -} diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/querycontext/QueryDataSource.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/datasource/QueryDataSource.java similarity index 96% rename from iotdb/src/main/java/org/apache/iotdb/db/engine/querycontext/QueryDataSource.java rename to iotdb/src/main/java/org/apache/iotdb/db/engine/datasource/QueryDataSource.java index 97f0d1c..3e80d67 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/engine/querycontext/QueryDataSource.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/datasource/QueryDataSource.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.iotdb.db.engine.querycontext; +package org.apache.iotdb.db.engine.datasource; public class QueryDataSource { diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/querycontext/ReadOnlyMemChunk.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/datasource/ReadOnlyMemChunk.java similarity index 96% rename from iotdb/src/main/java/org/apache/iotdb/db/engine/querycontext/ReadOnlyMemChunk.java rename to iotdb/src/main/java/org/apache/iotdb/db/engine/datasource/ReadOnlyMemChunk.java index 5f7aac8..0af6cec 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/engine/querycontext/ReadOnlyMemChunk.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/datasource/ReadOnlyMemChunk.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.iotdb.db.engine.querycontext; +package org.apache.iotdb.db.engine.datasource; import java.util.Collections; import java.util.Iterator; @@ -42,7 +42,7 @@ public class ReadOnlyMemChunk implements TimeValuePairSorter { private TimeValuePairSorter memSeries; private List<TimeValuePair> sortedTimeValuePairList; - Map<String, String> props; + private Map<String, String> props; private int floatPrecision = TSFileConfig.floatPrecision; /** @@ -159,4 +159,8 @@ public class ReadOnlyMemChunk implements TimeValuePairSorter { checkInitialized(); return sortedTimeValuePairList.isEmpty(); } + + public Map<String, String> getProps() { + return props; + } } diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/querycontext/SeriesDataSource.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/datasource/SeriesDataSource.java similarity index 85% rename from iotdb/src/main/java/org/apache/iotdb/db/engine/querycontext/SeriesDataSource.java rename to iotdb/src/main/java/org/apache/iotdb/db/engine/datasource/SeriesDataSource.java index 9c797ef..43f6972 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/engine/querycontext/SeriesDataSource.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/datasource/SeriesDataSource.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.iotdb.db.engine.querycontext; +package org.apache.iotdb.db.engine.datasource; import java.util.List; import org.apache.iotdb.db.engine.sgmanager.TsFileResource; @@ -53,10 +53,6 @@ public class SeriesDataSource { return sealedFiles; } - public void setSealedFiles(List<TsFileResource> sealedFiles) { - this.sealedFiles = sealedFiles; - } - public boolean hasUnsealedFile() { return unsealedFile != null; } @@ -65,10 +61,6 @@ public class SeriesDataSource { return unsealedFile; } - public void setUnsealedFile(UnsealedTsFile unsealedFile) { - this.unsealedFile = unsealedFile; - } - public boolean hasRawSeriesChunk() { return readableChunk != null; } @@ -77,10 +69,6 @@ public class SeriesDataSource { return readableChunk; } - public void setReadableChunk(ReadOnlyMemChunk readableChunk) { - this.readableChunk = readableChunk; - } - public Path getSeriesPath() { return seriesPath; } diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/querycontext/UnsealedTsFile.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/datasource/UnsealedTsFile.java similarity index 96% rename from iotdb/src/main/java/org/apache/iotdb/db/engine/querycontext/UnsealedTsFile.java rename to iotdb/src/main/java/org/apache/iotdb/db/engine/datasource/UnsealedTsFile.java index 6d140aa..5b6a009 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/engine/querycontext/UnsealedTsFile.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/datasource/UnsealedTsFile.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.iotdb.db.engine.querycontext; +package org.apache.iotdb.db.engine.datasource; import java.util.List; import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData; diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java index 097f7f7..8b7d663 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java @@ -21,10 +21,9 @@ package org.apache.iotdb.db.engine.memtable; import java.util.HashMap; import java.util.List; import java.util.Map; -import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk; +import org.apache.iotdb.db.engine.datasource.ReadOnlyMemChunk; import org.apache.iotdb.db.utils.TimeValuePair; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; -import org.apache.iotdb.tsfile.utils.Binary; public abstract class AbstractMemTable implements IMemTable { diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java index 53ea2bc..39a4a9c 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java @@ -19,9 +19,8 @@ package org.apache.iotdb.db.engine.memtable; import java.util.Map; -import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk; +import org.apache.iotdb.db.engine.datasource.ReadOnlyMemChunk; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; -import org.apache.iotdb.tsfile.utils.Binary; /** * IMemTable is designed to store data points which are not flushed into TsFile yet. An instance of diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemSeriesLazyMerger.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemSeriesLazyMerger.java index d360957..191b94e 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemSeriesLazyMerger.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemSeriesLazyMerger.java @@ -21,7 +21,7 @@ package org.apache.iotdb.db.engine.memtable; import java.util.ArrayList; import java.util.Collections; import java.util.List; -import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk; +import org.apache.iotdb.db.engine.datasource.ReadOnlyMemChunk; import org.apache.iotdb.db.utils.TimeValuePair; public class MemSeriesLazyMerger implements TimeValuePairSorter { diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowIO.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowIO.java deleted file mode 100644 index 90f0423..0000000 --- a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowIO.java +++ /dev/null @@ -1,173 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.iotdb.db.engine.overflow.io; - -import java.io.FileNotFoundException; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.io.RandomAccessFile; -import java.nio.ByteBuffer; -import java.nio.channels.Channels; -import java.nio.channels.FileChannel; -import java.util.ArrayList; - -import org.apache.iotdb.tsfile.read.reader.TsFileInput; -import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter; -import org.apache.iotdb.tsfile.write.writer.TsFileOutput; - -public class OverflowIO extends TsFileIOWriter { - - private OverflowReadWriter overflowReadWriter; - - public OverflowIO(OverflowReadWriter overflowReadWriter) throws IOException { - super(overflowReadWriter, new ArrayList<>()); - this.overflowReadWriter = overflowReadWriter; - toTail(); - } - - public void clearRowGroupMetadatas() { - super.chunkGroupMetaDataList.clear(); - } - - @Override - public long getPos() throws IOException { - return overflowReadWriter.getPosition(); - } - - public void toTail() throws IOException { - overflowReadWriter.toTail(); - } - - public void close() throws IOException { - overflowReadWriter.close(); - } - - public void flush() throws IOException { - overflowReadWriter.flush(); - } - - public TsFileInput getReader() { - return overflowReadWriter; - } - - public TsFileOutput getWriter() { - return overflowReadWriter; - } - - public OutputStream getOutputStream() { - return overflowReadWriter; - } - - public static class OverflowReadWriter extends OutputStream implements TsFileOutput, TsFileInput { - - private static final String RW_MODE = "rw"; - private RandomAccessFile raf; - - public OverflowReadWriter(String filepath) throws FileNotFoundException { - this.raf = new RandomAccessFile(filepath, RW_MODE); - } - - public void toTail() throws IOException { - raf.seek(raf.length()); - } - - @Override - public long size() throws IOException { - return raf.length(); - } - - @Override - public long position() throws IOException { - return raf.getFilePointer(); - } - - @Override - public TsFileInput position(long newPosition) throws IOException { - raf.seek(newPosition); - return this; - } - - @Override - public int read(ByteBuffer dst) throws IOException { - return raf.getChannel().read(dst); - } - - @Override - public int read(ByteBuffer dst, long position) throws IOException { - return raf.getChannel().read(dst, position); - } - - @Override - public FileChannel wrapAsFileChannel() throws IOException { - return raf.getChannel(); - } - - @Override - public InputStream wrapAsInputStream() throws IOException { - return Channels.newInputStream(raf.getChannel()); - } - - @Override - public int read() throws IOException { - return raf.read(); - } - - @Override - public int read(byte[] b, int off, int len) throws IOException { - raf.readFully(b, off, len); - return len; - } - - @Override - public int readInt() throws IOException { - return raf.readInt(); - } - - @Override - public void write(ByteBuffer b) throws IOException { - raf.getChannel().write(b); - } - - @Override - public long getPosition() throws IOException { - return raf.getFilePointer(); - } - - @Override - public OutputStream wrapAsStream() throws IOException { - return Channels.newOutputStream(raf.getChannel()); - } - - @Override - public void truncate(long position) throws IOException { - raf.getChannel().truncate(position); - } - - @Override - public void write(int b) throws IOException { - raf.write(b); - } - - @Override - public void close() throws IOException { - raf.close(); - } - } -} diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowMemtable.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowMemtable.java deleted file mode 100644 index 5fadb54..0000000 --- a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowMemtable.java +++ /dev/null @@ -1,116 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.iotdb.db.engine.overflow.io; - -import java.util.HashMap; -import java.util.Map; -import org.apache.iotdb.db.engine.memtable.IMemTable; -import org.apache.iotdb.db.engine.memtable.PrimitiveMemTable; -import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk; -import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; -import org.apache.iotdb.tsfile.file.metadata.statistics.LongStatistics; -import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics; -import org.apache.iotdb.tsfile.read.common.BatchData; -import org.apache.iotdb.tsfile.write.record.TSRecord; -import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint; - -/** - * This class is used to store and query all overflow data in memory.<br> - */ -public class OverflowMemtable { - - /** - * store update and delete data - */ - private Map<String, Map<String, LongStatistics>> indexTrees; - - /** - * store insert data - */ - private IMemTable memTable; - - public OverflowMemtable() { - indexTrees = new HashMap<>(); - memTable = new PrimitiveMemTable(); - } - - public void insert(TSRecord tsRecord) { - for (DataPoint dataPoint : tsRecord.dataPointList) { - memTable.write(tsRecord.deviceId, dataPoint.getMeasurementId(), dataPoint.getType(), - tsRecord.time, - dataPoint.getValue().toString()); - } - } - - /** - * @deprecated update time series data - */ - @Deprecated - public void update(String deviceId, String measurementId, long startTime, long endTime, - TSDataType dataType, - byte[] value) { - if (!indexTrees.containsKey(deviceId)) { - indexTrees.put(deviceId, new HashMap<>()); - } - if (!indexTrees.get(deviceId).containsKey(measurementId)) { - indexTrees.get(deviceId).put(measurementId, new LongStatistics()); - } - indexTrees.get(deviceId).get(measurementId).updateStats(startTime, endTime); - } - - public void delete(String deviceId, String measurementId, long timestamp, boolean isFlushing) { - if (isFlushing) { - memTable = memTable.copy(); - memTable.delete(deviceId, measurementId, timestamp); - } else { - memTable.delete(deviceId, measurementId, timestamp); - } - } - - public ReadOnlyMemChunk queryOverflowInsertInMemory(String deviceId, String measurementId, - TSDataType dataType, Map<String, String> props) { - return memTable.query(deviceId, measurementId, dataType, props); - } - - public boolean isEmptyOfOverflowSeriesMap() { - return indexTrees.isEmpty(); - } - - public Map<String, Map<String, LongStatistics>> getOverflowSeriesMap() { - return indexTrees; - } - - public boolean isEmptyOfMemTable() { - return memTable.isEmpty(); - } - - public IMemTable getMemTabale() { - return memTable; - } - - public long getSize() { - // TODO: calculate the size of this overflow support - return 0; - } - - public void clear() { - indexTrees.clear(); - memTable.clear(); - } -} diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java deleted file mode 100644 index 288382f..0000000 --- a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java +++ /dev/null @@ -1,735 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.iotdb.db.engine.overflow.io; - -import java.io.File; -import java.io.IOException; -import java.time.Instant; -import java.time.ZonedDateTime; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.ReentrantLock; -import org.apache.iotdb.db.conf.IoTDBConfig; -import org.apache.iotdb.db.conf.IoTDBConstant; -import org.apache.iotdb.db.conf.IoTDBDescriptor; -import org.apache.iotdb.db.engine.Processor; -import org.apache.iotdb.db.engine.bufferwrite.Action; -import org.apache.iotdb.db.engine.EngingeConstants; -import org.apache.iotdb.db.engine.memcontrol.BasicMemController; -import org.apache.iotdb.db.engine.memcontrol.BasicMemController.UsageLevel; -import org.apache.iotdb.db.engine.memtable.MemSeriesLazyMerger; -import org.apache.iotdb.db.engine.modification.ModificationFile; -import org.apache.iotdb.db.engine.pool.FlushManager; -import org.apache.iotdb.db.engine.querycontext.MergeSeriesDataSource; -import org.apache.iotdb.db.engine.querycontext.OverflowInsertFile; -import org.apache.iotdb.db.engine.querycontext.OverflowSeriesDataSource; -import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk; -import org.apache.iotdb.db.engine.version.VersionController; -import org.apache.iotdb.db.exception.OverflowProcessorException; -import org.apache.iotdb.db.exception.TsFileProcessorException; -import org.apache.iotdb.db.qp.constant.DatetimeUtils; -import org.apache.iotdb.db.query.context.QueryContext; -import org.apache.iotdb.db.query.control.FileReaderManager; -import org.apache.iotdb.db.utils.ImmediateFuture; -import org.apache.iotdb.db.utils.MemUtils; -import org.apache.iotdb.db.writelog.manager.MultiFileLogNodeManager; -import org.apache.iotdb.db.writelog.node.WriteLogNode; -import org.apache.iotdb.tsfile.common.conf.TSFileConfig; -import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData; -import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; -import org.apache.iotdb.tsfile.read.common.Path; -import org.apache.iotdb.tsfile.utils.BytesUtils; -import org.apache.iotdb.tsfile.utils.Pair; -import org.apache.iotdb.tsfile.write.record.TSRecord; -import org.apache.iotdb.tsfile.write.schema.FileSchema; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class OverflowProcessor extends Processor { - - private static final Logger LOGGER = LoggerFactory.getLogger(OverflowProcessor.class); - private static final IoTDBConfig TsFileDBConf = IoTDBDescriptor.getInstance().getConfig(); - private OverflowResource workResource; - private OverflowResource mergeResource; - - private OverflowMemtable workSupport; - private OverflowMemtable flushSupport; - - private volatile Future<Boolean> flushFuture = new ImmediateFuture<>(true); - private volatile boolean isMerge; - private int valueCount; - private String parentPath; - private long lastFlushTime = -1; - private AtomicLong dataPathCount = new AtomicLong(); - private ReentrantLock queryFlushLock = new ReentrantLock(); - - private Action overflowFlushAction; - private Action filenodeFlushAction; - private FileSchema fileSchema; - - private long memThreshold = TSFileConfig.groupSizeInByte; - private AtomicLong memSize = new AtomicLong(); - - private WriteLogNode logNode; - private VersionController versionController; - - private boolean isClosed = true; - private boolean isFlush = false; - - public OverflowProcessor(String processorName, Map<String, Action> parameters, - FileSchema fileSchema, VersionController versionController) - throws IOException { - super(processorName); - this.fileSchema = fileSchema; - this.versionController = versionController; - //for this old version, we only support one Overflow file - String overflowDirPath = TsFileDBConf.getOverflowDataDirs()[0]; - if (overflowDirPath.length() > 0 - && overflowDirPath.charAt(overflowDirPath.length() - 1) != File.separatorChar) { - overflowDirPath = overflowDirPath + File.separatorChar; - } - this.parentPath = overflowDirPath + processorName; - - overflowFlushAction = parameters.get(EngingeConstants.OVERFLOW_FLUSH_ACTION); - filenodeFlushAction = parameters - .get(EngingeConstants.FILENODE_PROCESSOR_FLUSH_ACTION); - - reopen(); - - if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) { - logNode = MultiFileLogNodeManager.getInstance().getNode( - processorName + IoTDBConstant.OVERFLOW_LOG_NODE_SUFFIX, - getOverflowRestoreFile()); - } - } - - public void reopen() throws IOException { - if (!isClosed) { - return; - } - // recover file - File processorDataDir = new File(parentPath); - if (!processorDataDir.exists()) { - processorDataDir.mkdirs(); - } - recovery(processorDataDir); - - // memory - if (workSupport == null) { - workSupport = new OverflowMemtable(); - } else { - workSupport.clear(); - } - isClosed = false; - isFlush = false; - } - public void checkOpen() throws OverflowProcessorException { - if (isClosed) { - throw new OverflowProcessorException("OverflowProcessor already closed"); - } - } - - - private void recovery(File parentFile) throws IOException { - String[] subFilePaths = clearFile(parentFile.list()); - if (subFilePaths.length == 0) { - workResource = new OverflowResource(parentPath, - String.valueOf(dataPathCount.getAndIncrement()), versionController); - } else if (subFilePaths.length == 1) { - long count = Long.parseLong(subFilePaths[0]); - dataPathCount.addAndGet(count + 1); - workResource = new OverflowResource(parentPath, String.valueOf(count), versionController); - LOGGER.info("The overflow processor {} recover from work status.", getProcessorName()); - } else { - long count1 = Long.parseLong(subFilePaths[0]); - long count2 = Long.parseLong(subFilePaths[1]); - if (count1 > count2) { - long temp = count1; - count1 = count2; - count2 = temp; - } - dataPathCount.addAndGet(count2 + 1); - // work dir > merge dir - workResource = new OverflowResource(parentPath, String.valueOf(count2), versionController); - mergeResource = new OverflowResource(parentPath, String.valueOf(count1), versionController); - LOGGER.info("The overflow processor {} recover from merge status.", getProcessorName()); - } - } - - private String[] clearFile(String[] subFilePaths) { - // just clear the files whose name are number. - List<String> files = new ArrayList<>(); - for (String file : subFilePaths) { - try { - Long.valueOf(file); - files.add(file); - } catch (NumberFormatException e) { - // ignore the exception, if the name of file is not a number. - - } - } - return files.toArray(new String[files.size()]); - } - - /** - * insert one time-series record - */ - public void insert(TSRecord tsRecord) throws IOException { - try { - checkOpen(); - } catch (OverflowProcessorException e) { - throw new IOException(e); - } - // memory control - long memUage = MemUtils.getRecordSize(tsRecord); - UsageLevel usageLevel = BasicMemController.getInstance().acquireUsage(this, memUage); - switch (usageLevel) { - case SAFE: - // write data - workSupport.insert(tsRecord); - valueCount++; - // check flush - memUage = memSize.addAndGet(memUage); - if (memUage > memThreshold) { - if (LOGGER.isWarnEnabled()) { - LOGGER.warn("The usage of memory {} in overflow processor {} reaches the threshold {}", - MemUtils.bytesCntToStr(memUage), getProcessorName(), - MemUtils.bytesCntToStr(memThreshold)); - } - flush(); - } - break; - case WARNING: - // write data - workSupport.insert(tsRecord); - valueCount++; - // flush - memSize.addAndGet(memUage); - flush(); - break; - case DANGEROUS: - throw new IOException("The insertion is rejected because dangerous memory level hit"); - } - - - - } - - /** - * @deprecated update one time-series data which time range is from startTime from endTime. - */ - @Deprecated - public void update(String deviceId, String measurementId, long startTime, long endTime, - TSDataType type, byte[] value) { - workSupport.update(deviceId, measurementId, startTime, endTime, type, value); - valueCount++; - } - - /** - * @deprecated this function need to be re-implemented. - */ - @Deprecated - public void update(String deviceId, String measurementId, long startTime, long endTime, - TSDataType type, String value) { - workSupport.update(deviceId, measurementId, startTime, endTime, type, - convertStringToBytes(type, value)); - valueCount++; - } - - private byte[] convertStringToBytes(TSDataType type, String o) { - switch (type) { - case INT32: - return BytesUtils.intToBytes(Integer.valueOf(o)); - case INT64: - return BytesUtils.longToBytes(Long.valueOf(o)); - case BOOLEAN: - return BytesUtils.boolToBytes(Boolean.valueOf(o)); - case FLOAT: - return BytesUtils.floatToBytes(Float.valueOf(o)); - case DOUBLE: - return BytesUtils.doubleToBytes(Double.valueOf(o)); - case TEXT: - return BytesUtils.stringToBytes(o); - default: - LOGGER.error("Unsupport data type: {}", type); - throw new UnsupportedOperationException("Unsupport data type:" + type); - } - } - - /** - * Delete data of a timeseries whose time ranges from 0 to timestamp. - * - * @param deviceId the deviceId of the timeseries. - * @param measurementId the measurementId of the timeseries. - * @param timestamp the upper-bound of deletion time. - * @param version the version number of this deletion. - * @param updatedModFiles add successfully updated Modification files to the list, and abort them - * when exception is raised - */ - public void delete(String deviceId, String measurementId, long timestamp, long version, - List<ModificationFile> updatedModFiles) throws IOException { - try { - checkOpen(); - } catch (OverflowProcessorException e) { - throw new IOException(e); - } - workResource.delete(deviceId, measurementId, timestamp, version, updatedModFiles); - workSupport.delete(deviceId, measurementId, timestamp, false); - if (isFlush()) { - mergeResource.delete(deviceId, measurementId, timestamp, version, updatedModFiles); - flushSupport.delete(deviceId, measurementId, timestamp, true); - } - } - - /** - * query all overflow data which contain insert data in memory, insert data in file, update/delete - * data in memory, update/delete data in file. - * - * @return OverflowSeriesDataSource - */ - public OverflowSeriesDataSource query(String deviceId, String measurementId, - TSDataType dataType, Map<String, String> props, QueryContext context) - throws IOException { - try { - checkOpen(); - } catch (OverflowProcessorException e) { - throw new IOException(e); - } - queryFlushLock.lock(); - try { - // query insert data in memory and unseqTsFiles - // memory - ReadOnlyMemChunk insertInMem = queryOverflowInsertInMemory(deviceId, measurementId, - dataType, props); - List<OverflowInsertFile> overflowInsertFileList = new ArrayList<>(); - // work file - Pair<String, List<ChunkMetaData>> insertInDiskWork = queryWorkDataInOverflowInsert(deviceId, - measurementId, - dataType, context); - if (insertInDiskWork.left != null) { - overflowInsertFileList - .add(0, new OverflowInsertFile(insertInDiskWork.left, - insertInDiskWork.right)); - } - // merge file - Pair<String, List<ChunkMetaData>> insertInDiskMerge = queryMergeDataInOverflowInsert(deviceId, - measurementId, dataType, context); - if (insertInDiskMerge.left != null) { - overflowInsertFileList - .add(0, new OverflowInsertFile(insertInDiskMerge.left - , insertInDiskMerge.right)); - } - // work file - return new OverflowSeriesDataSource(new Path(deviceId + "." + measurementId), dataType, - overflowInsertFileList, insertInMem); - } finally { - queryFlushLock.unlock(); - } - } - - /** - * query insert data in memory table. while flushing, merge the work memory table with flush - * memory table. - * - * @return insert data in SeriesChunkInMemTable - */ - private ReadOnlyMemChunk queryOverflowInsertInMemory(String deviceId, String measurementId, - TSDataType dataType, Map<String, String> props) { - - MemSeriesLazyMerger memSeriesLazyMerger = new MemSeriesLazyMerger(); - queryFlushLock.lock(); - try { - if (flushSupport != null && isFlush()) { - memSeriesLazyMerger - .addMemSeries( - flushSupport.queryOverflowInsertInMemory(deviceId, measurementId, dataType, props)); - } - memSeriesLazyMerger - .addMemSeries(workSupport.queryOverflowInsertInMemory(deviceId, measurementId, - dataType, props)); - // memSeriesLazyMerger has handled the props, - // so we do not need to handle it again in the following readOnlyMemChunk - return new ReadOnlyMemChunk(dataType, memSeriesLazyMerger, Collections.emptyMap()); - } finally { - queryFlushLock.unlock(); - } - } - - /** - * Get the insert data which is WORK in unseqTsFile. - * - * @param deviceId deviceId of the target time-series - * @param measurementId measurementId of the target time-series - * @param dataType data type of the target time-series - * @return the seriesPath of unseqTsFile, List of TimeSeriesChunkMetaData for the special - * time-series. - */ - private Pair<String, List<ChunkMetaData>> queryWorkDataInOverflowInsert(String deviceId, - String measurementId, TSDataType dataType, QueryContext context) { - return new Pair<>( - workResource.getInsertFilePath(), - workResource.getInsertMetadatas(deviceId, measurementId, dataType, context)); - } - - /** - * Get the all merge data in unseqTsFile and overflowFile. - * - * @return MergeSeriesDataSource - */ - public MergeSeriesDataSource queryMerge(String deviceId, String measurementId, - TSDataType dataType, QueryContext context) { - Pair<String, List<ChunkMetaData>> mergeInsert = queryMergeDataInOverflowInsert(deviceId, - measurementId, - dataType, context); - return new MergeSeriesDataSource(new OverflowInsertFile(mergeInsert.left, mergeInsert.right)); - } - - public OverflowSeriesDataSource queryMerge(String deviceId, String measurementId, - TSDataType dataType, boolean isMerge, QueryContext context) { - Pair<String, List<ChunkMetaData>> mergeInsert = queryMergeDataInOverflowInsert(deviceId, - measurementId, - dataType, context); - OverflowSeriesDataSource overflowSeriesDataSource = new OverflowSeriesDataSource( - new Path(deviceId + "." + measurementId)); - overflowSeriesDataSource.setReadableMemChunk(null); - overflowSeriesDataSource - .setOverflowInsertFileList( - Arrays.asList(new OverflowInsertFile(mergeInsert.left, mergeInsert.right))); - return overflowSeriesDataSource; - } - - /** - * Get the insert data which is MERGE in unseqTsFile - * - * @return the seriesPath of unseqTsFile, List of TimeSeriesChunkMetaData for the special - * time-series. - **/ - private Pair<String, List<ChunkMetaData>> queryMergeDataInOverflowInsert(String deviceId, - String measurementId, TSDataType dataType, QueryContext context) { - if (!isMerge) { - return new Pair<>(null, null); - } - return new Pair<>( - mergeResource.getInsertFilePath(), - mergeResource.getInsertMetadatas(deviceId, measurementId, dataType, context)); - } - - private void switchWorkToFlush() { - queryFlushLock.lock(); - try { - OverflowMemtable temp = flushSupport == null ? new OverflowMemtable() : flushSupport; - flushSupport = workSupport; - workSupport = temp; - isFlush = true; - } finally { - queryFlushLock.unlock(); - } - } - - private void switchFlushToWork() { - queryFlushLock.lock(); - try { - flushSupport.clear(); - workResource.appendMetadatas(); - isFlush = false; - } finally { - queryFlushLock.unlock(); - } - } - - public void switchWorkToMerge() throws IOException { - if (mergeResource == null) { - mergeResource = workResource; - workResource = new OverflowResource(parentPath, - String.valueOf(dataPathCount.getAndIncrement()), versionController); - } - isMerge = true; - LOGGER.info("The overflow processor {} switch from WORK to MERGE", getProcessorName()); - } - - public void switchMergeToWork() throws IOException { - if (mergeResource != null) { - FileReaderManager.getInstance().closeFileAndRemoveReader(mergeResource.getInsertFilePath()); - mergeResource.close(); - mergeResource.deleteResource(); - mergeResource = null; - } - isMerge = false; - LOGGER.info("The overflow processor {} switch from MERGE to WORK", getProcessorName()); - } - - public boolean isMerge() { - return isMerge; - } - - public boolean isFlush() { - return isFlush; - } - - private boolean flushTask(String displayMessage) { - boolean result; - long flushStartTime = System.currentTimeMillis(); - try { - LOGGER.info("The overflow processor {} starts flushing {}.", getProcessorName(), - displayMessage); - // flush data - workResource - .flush(fileSchema, flushSupport.getMemTabale(), - getProcessorName()); - filenodeFlushAction.act(); - // write-ahead log - if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) { - logNode.notifyEndFlush(null); - } - result = true; - } catch (IOException e) { - LOGGER.error("Flush overflow processor {} rowgroup to file error in {}. Thread {} exits.", - getProcessorName(), displayMessage, Thread.currentThread().getName(), e); - result = false; - } catch (Exception e) { - LOGGER.error("FilenodeFlushAction action failed. Thread {} exits.", - Thread.currentThread().getName(), e); - result = false; - } finally { - // switch from flush to work. - switchFlushToWork(); - } - // log flush time - if (LOGGER.isInfoEnabled()) { - LOGGER - .info("The overflow processor {} ends flushing {}.", getProcessorName(), displayMessage); - long flushEndTime = System.currentTimeMillis(); - LOGGER.info( - "The overflow processor {} flush {}, start time is {}, flush end time is {}," + - " time consumption is {}ms", - getProcessorName(), displayMessage, - DatetimeUtils.convertMillsecondToZonedDateTime(flushStartTime), - DatetimeUtils.convertMillsecondToZonedDateTime(flushEndTime), - flushEndTime - flushStartTime); - } - return result; - } - - @Override - public synchronized Future<Boolean> flush() throws IOException { - // statistic information for flush - if (lastFlushTime > 0 && LOGGER.isInfoEnabled()) { - long thisFLushTime = System.currentTimeMillis(); - ZonedDateTime lastDateTime = ZonedDateTime.ofInstant(Instant.ofEpochMilli(lastFlushTime), - IoTDBDescriptor.getInstance().getConfig().getZoneID()); - ZonedDateTime thisDateTime = ZonedDateTime.ofInstant(Instant.ofEpochMilli(thisFLushTime), - IoTDBDescriptor.getInstance().getConfig().getZoneID()); - LOGGER.info( - "The overflow processor {} last flush time is {}, this flush time is {}," - + " flush time interval is {}s", - getProcessorName(), lastDateTime, thisDateTime, - (thisFLushTime - lastFlushTime) / 1000); - } - lastFlushTime = System.currentTimeMillis(); - try { - flushFuture.get(); - } catch (InterruptedException | ExecutionException e) { - throw new IOException(e); - } - if (valueCount > 0) { - try { - // backup newIntervalFile list and emptyIntervalFileNode - overflowFlushAction.act(); - } catch (Exception e) { - LOGGER.error("Flush the overflow rowGroup to file faied, when overflowFlushAction act"); - throw new IOException(e); - } - - if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) { - try { - logNode.notifyStartFlush(); - } catch (IOException e) { - LOGGER.error("Overflow processor {} encountered an error when notifying log node, {}", - getProcessorName(), e); - } - } - BasicMemController.getInstance().releaseUsage(this, memSize.get()); - memSize.set(0); - valueCount = 0; - // switch from work to flush - switchWorkToFlush(); - flushFuture = FlushManager.getInstance().submit( () -> - flushTask("asynchronously")); - } else { - flushFuture = new ImmediateFuture(true); - } - return flushFuture; - - } - - @Override - public void close() throws TsFileProcessorException { - if (isClosed) { - return; - } - LOGGER.info("The overflow processor {} starts close operation.", getProcessorName()); - long closeStartTime = System.currentTimeMillis(); - // flush data - try { - flush().get(); - } catch (InterruptedException | ExecutionException e) { - LOGGER.error("Encounter an interrupt error when waitting for the flushing, " - + "the bufferwrite processor is {}.", - getProcessorName(), e); - Thread.currentThread().interrupt(); - } catch (IOException e) { - throw new TsFileProcessorException(e); - } - if (LOGGER.isInfoEnabled()) { - LOGGER.info("The overflow processor {} ends close operation.", getProcessorName()); - // log close time - long closeEndTime = System.currentTimeMillis(); - LOGGER.info( - "The close operation of overflow processor {} starts at {} and ends at {}." - + " It comsumes {}ms.", - getProcessorName(), DatetimeUtils.convertMillsecondToZonedDateTime(closeStartTime), - DatetimeUtils.convertMillsecondToZonedDateTime(closeEndTime), - closeEndTime - closeStartTime); - } - try { - clear(); - } catch (IOException e) { - throw new TsFileProcessorException(e); - } - isClosed = true; - } - - public void clear() throws IOException { - if (workResource != null) { - workResource.close(); - workResource = null; - } - if (mergeResource != null) { - mergeResource.close(); - mergeResource = null; - } - } - - @Override - public boolean canBeClosed() { - // TODO: consider merge - return !isMerge; - } - - @Override - public long memoryUsage() { - return memSize.get(); - } - - public String getOverflowRestoreFile() { - return workResource.getPositionFilePath(); - } - - /** - * @return The sum of all timeseries's metadata size within this file. - */ - public long getMetaSize() { - // TODO : [MemControl] implement this - return 0; - } - - /** - * @return The size of overflow file corresponding to this processor. - */ - public long getFileSize() { - return workResource.getInsertFile().length() + memoryUsage(); - } - - /** - * Check whether current overflow file contains too many metadata or size of current overflow file - * is too large If true, close current file and open a new one. - */ - private boolean checkSize() { - IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig(); - long metaSize = getMetaSize(); - long fileSize = getFileSize(); - LOGGER.info( - "The overflow processor {}, the size of metadata reaches {}," - + " the size of file reaches {}.", - getProcessorName(), MemUtils.bytesCntToStr(metaSize), MemUtils.bytesCntToStr(fileSize)); - if (metaSize >= config.getOverflowMetaSizeThreshold() - || fileSize >= config.getOverflowFileSizeThreshold()) { - LOGGER.info( - "The overflow processor {}, size({}) of the file {} reaches threshold {}," - + " size({}) of metadata reaches threshold {}.", - getProcessorName(), MemUtils.bytesCntToStr(fileSize), workResource.getInsertFilePath(), - MemUtils.bytesCntToStr(config.getOverflowMetaSizeThreshold()), - MemUtils.bytesCntToStr(metaSize), - MemUtils.bytesCntToStr(config.getOverflowMetaSizeThreshold())); - return true; - } else { - return false; - } - } - - public WriteLogNode getLogNode() { - return logNode; - } - - public OverflowResource getWorkResource() { - return workResource; - } - - @Override - public boolean equals(Object o) { - return this == o; - } - - @Override - public int hashCode() { - return Objects.hash(super.hashCode()); - } - - /** - * used for test. We can block to wait for finishing flushing. - * @return the future of the flush() task. - */ - public Future<Boolean> getFlushFuture() { - return flushFuture; - } - - /** - * used for test. We can know when the flush() is called. - * @return the last flush() time. - */ - public long getLastFlushTime() { - return lastFlushTime; - } - - @Override - public String toString() { - return "OverflowProcessor in " + parentPath; - } - - public boolean isClosed() { - return isClosed; - } -} \ No newline at end of file diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowResource.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowResource.java deleted file mode 100644 index 17078f8..0000000 --- a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowResource.java +++ /dev/null @@ -1,329 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.iotdb.db.engine.overflow.io; - -import java.io.ByteArrayInputStream; -import java.io.File; -import java.io.FileInputStream; -import java.io.FileNotFoundException; -import java.io.FileOutputStream; -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import org.apache.commons.io.FileUtils; -import org.apache.iotdb.db.conf.IoTDBConstant; -import org.apache.iotdb.db.engine.memtable.IMemTable; -import org.apache.iotdb.db.engine.memtable.MemTableFlushUtil; -import org.apache.iotdb.db.engine.modification.Deletion; -import org.apache.iotdb.db.engine.modification.Modification; -import org.apache.iotdb.db.engine.modification.ModificationFile; -import org.apache.iotdb.db.engine.version.VersionController; -import org.apache.iotdb.db.query.context.QueryContext; -import org.apache.iotdb.db.query.control.FileReaderManager; -import org.apache.iotdb.db.utils.MemUtils; -import org.apache.iotdb.db.utils.QueryUtils; -import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetaData; -import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData; -import org.apache.iotdb.tsfile.file.metadata.TsDeviceMetadata; -import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; -import org.apache.iotdb.tsfile.utils.BytesUtils; -import org.apache.iotdb.tsfile.utils.Pair; -import org.apache.iotdb.tsfile.write.schema.FileSchema; -import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class OverflowResource { - - private static final Logger LOGGER = LoggerFactory.getLogger(OverflowResource.class); - - private static final String INSERT_FILE_NAME = "unseqTsFile"; - private static final String POSITION_FILE_NAME = "positionFile"; - - private static final int FOOTER_LENGTH = 4; - private static final int POS_LENGTH = 8; - private String parentPath; - private String dataPath; - private String insertFilePath; - private String positionFilePath; - private File insertFile; - private OverflowIO insertIO; - private Map<String, Map<String, List<ChunkMetaData>>> insertMetadatas; - private List<ChunkGroupMetaData> appendInsertMetadatas; - private VersionController versionController; - private ModificationFile modificationFile; - - public OverflowResource(String parentPath, String dataPath, VersionController versionController) - throws IOException { - this.insertMetadatas = new HashMap<>(); - this.appendInsertMetadatas = new ArrayList<>(); - this.parentPath = parentPath; - this.dataPath = dataPath; - File dataFile = new File(parentPath, dataPath); - if (!dataFile.exists()) { - dataFile.mkdirs(); - } - insertFile = new File(dataFile, INSERT_FILE_NAME); - insertFilePath = insertFile.getPath(); - positionFilePath = new File(dataFile, POSITION_FILE_NAME).getPath(); - - Pair<Long, Long> position = readPositionInfo(); - try { - // insert stream - OverflowIO.OverflowReadWriter readWriter = new OverflowIO.OverflowReadWriter(insertFilePath); - // truncate - readWriter.wrapAsFileChannel().truncate(position.left); - // reposition - // seek to zero - readWriter.wrapAsFileChannel().position(0); - // seek to tail - // the tail is at least the len of magic string - insertIO = new OverflowIO(readWriter); - readMetadata(); - } catch (FileNotFoundException e){ - LOGGER.debug("Failed to construct the OverflowIO.", e); - } catch (IOException e) { - throw e; - } - this.versionController = versionController; - modificationFile = new ModificationFile(insertFilePath + ModificationFile.FILE_SUFFIX); - } - - private Pair<Long, Long> readPositionInfo() throws IOException { - File positionFile = new File(positionFilePath); - if (positionFile.exists()) { - try(FileInputStream inputStream = new FileInputStream(positionFile)) { - byte[] insertPositionData = new byte[8]; - byte[] updatePositionData = new byte[8]; - int byteRead = inputStream.read(insertPositionData); - if (byteRead != 8) { - throw new IOException("Not enough bytes for insertPositionData"); - } - byteRead = inputStream.read(updatePositionData); - if (byteRead != 8) { - throw new IOException("Not enough bytes for updatePositionData"); - } - long lastInsertPosition = BytesUtils.bytesToLong(insertPositionData); - long lastUpdatePosition = BytesUtils.bytesToLong(updatePositionData); - return new Pair<>(lastInsertPosition, lastUpdatePosition); - } - } else { - LOGGER.debug("No position info, returning a default value"); - long left = 0; - long right = 0; - File insertTempFile = new File(insertFilePath); - if (insertTempFile.exists()) { - left = insertTempFile.length(); - } - return new Pair<>(left, right); - } - - } - - private void writePositionInfo(long lastInsertPosition, long lastUpdatePosition) - throws IOException { - try(FileOutputStream outputStream = new FileOutputStream(positionFilePath)) { - byte[] data = new byte[16]; - BytesUtils.longToBytes(lastInsertPosition, data, 0); - BytesUtils.longToBytes(lastUpdatePosition, data, 8); - outputStream.write(data); - } - } - - private void readMetadata() throws IOException { - // read insert meta-data - insertIO.toTail(); - long position = insertIO.getPos(); - while (position != TsFileIOWriter.magicStringBytes.length) { - insertIO.getReader().position(position - FOOTER_LENGTH); - int metadataLength = insertIO.getReader().readInt(); - byte[] buf = new byte[metadataLength]; - insertIO.getReader().position(position - FOOTER_LENGTH - metadataLength); - insertIO.getReader().read(buf, 0, buf.length); - ByteArrayInputStream inputStream = new ByteArrayInputStream(buf); - TsDeviceMetadata tsDeviceMetadata = TsDeviceMetadata.deserializeFrom(inputStream); - byte[] bytesPosition = new byte[8]; - insertIO.getReader().position(position - FOOTER_LENGTH - metadataLength - POS_LENGTH); - insertIO.getReader().read(bytesPosition, 0, POS_LENGTH); - position = BytesUtils.bytesToLong(bytesPosition); - for (ChunkGroupMetaData rowGroupMetaData : tsDeviceMetadata.getChunkGroupMetaDataList()) { - String deviceId = rowGroupMetaData.getDeviceID(); - if (!insertMetadatas.containsKey(deviceId)) { - insertMetadatas.put(deviceId, new HashMap<>()); - } - for (ChunkMetaData chunkMetaData : rowGroupMetaData.getChunkMetaDataList()) { - chunkMetaData.setVersion(rowGroupMetaData.getVersion()); - String measurementId = chunkMetaData.getMeasurementUid(); - if (!insertMetadatas.get(deviceId).containsKey(measurementId)) { - insertMetadatas.get(deviceId).put(measurementId, new ArrayList<>()); - } - insertMetadatas.get(deviceId).get(measurementId).add(0, chunkMetaData); - } - } - } - } - - public List<ChunkMetaData> getInsertMetadatas(String deviceId, String measurementId, - TSDataType dataType, QueryContext context) { - List<ChunkMetaData> chunkMetaDatas = new ArrayList<>(); - if (insertMetadatas.containsKey(deviceId) && insertMetadatas.get(deviceId) - .containsKey(measurementId)) { - for (ChunkMetaData chunkMetaData : insertMetadatas.get(deviceId).get(measurementId)) { - // filter - if (chunkMetaData.getTsDataType().equals(dataType)) { - chunkMetaDatas.add(chunkMetaData); - } - } - } - try { - List<Modification> modifications = context.getPathModifications(modificationFile, - deviceId + IoTDBConstant.PATH_SEPARATOR + measurementId); - QueryUtils.modifyChunkMetaData(chunkMetaDatas, modifications); - } catch (IOException e) { - LOGGER.error("Cannot access the modification file of Overflow {}, because:", parentPath, - e); - } - return chunkMetaDatas; - } - - public void flush(FileSchema fileSchema, IMemTable memTable, String processorName) - throws IOException { - // insert data - long startPos = insertIO.getPos(); - long startTime = System.currentTimeMillis(); - flush(fileSchema, memTable); - long timeInterval = System.currentTimeMillis() - startTime; - timeInterval = timeInterval == 0 ? 1 : timeInterval; - long insertSize = insertIO.getPos() - startPos; - if (LOGGER.isInfoEnabled()) { - LOGGER.info( - "Overflow processor {} flushes overflow insert data, actual:{}, time consumption:{} ms," - + " flush rate:{}/s", - processorName, MemUtils.bytesCntToStr(insertSize), timeInterval, - MemUtils.bytesCntToStr(insertSize / timeInterval * 1000)); - } - - writePositionInfo(insertIO.getPos(), 0); - } - - public void flush(FileSchema fileSchema, IMemTable memTable) throws IOException { - if (memTable != null && !memTable.isEmpty()) { - insertIO.toTail(); - long lastPosition = insertIO.getPos(); - MemTableFlushUtil.flushMemTable(fileSchema, insertIO, memTable, - versionController.nextVersion()); - List<ChunkGroupMetaData> rowGroupMetaDatas = insertIO.getChunkGroupMetaDatas(); - appendInsertMetadatas.addAll(rowGroupMetaDatas); - if (!rowGroupMetaDatas.isEmpty()) { - insertIO.getWriter().write(BytesUtils.longToBytes(lastPosition)); - TsDeviceMetadata tsDeviceMetadata = new TsDeviceMetadata(); - tsDeviceMetadata.setChunkGroupMetadataList(rowGroupMetaDatas); - long start = insertIO.getPos(); - tsDeviceMetadata.serializeTo(insertIO.getOutputStream()); - long end = insertIO.getPos(); - insertIO.getWriter().write(BytesUtils.intToBytes((int) (end - start))); - // clear the meta-data of insert IO - insertIO.clearRowGroupMetadatas(); - } - } - } - - public void appendMetadatas() { - if (!appendInsertMetadatas.isEmpty()) { - for (ChunkGroupMetaData rowGroupMetaData : appendInsertMetadatas) { - for (ChunkMetaData seriesChunkMetaData : rowGroupMetaData.getChunkMetaDataList()) { - addInsertMetadata(rowGroupMetaData.getDeviceID(), seriesChunkMetaData.getMeasurementUid(), - seriesChunkMetaData); - } - } - appendInsertMetadatas.clear(); - } - } - - public String getInsertFilePath() { - return insertFilePath; - } - - public File getInsertFile() { - return insertFile; - } - - public String getPositionFilePath() { - return positionFilePath; - } - - public void close() throws IOException { - insertMetadatas.clear(); - insertIO.close(); - modificationFile.close(); - } - - public void deleteResource() throws IOException { - FileUtils.forceDelete(new File(parentPath, dataPath)); - } - - private void cleanDir(String dir) throws IOException { - File file = new File(dir); - if (file.exists()) { - if (file.isDirectory()) { - File[] files = file.listFiles(); - if (files == null) { - return; - } - for (File subFile : files) { - cleanDir(subFile.getAbsolutePath()); - } - } - if (!file.delete()) { - throw new IOException(String.format("The file %s can't be deleted", dir)); - } - } - } - - private void addInsertMetadata(String deviceId, String measurementId, - ChunkMetaData chunkMetaData) { - if (!insertMetadatas.containsKey(deviceId)) { - insertMetadatas.put(deviceId, new HashMap<>()); - } - if (!insertMetadatas.get(deviceId).containsKey(measurementId)) { - insertMetadatas.get(deviceId).put(measurementId, new ArrayList<>()); - } - insertMetadatas.get(deviceId).get(measurementId).add(chunkMetaData); - } - - /** - * Delete data of a timeseries whose time ranges from 0 to timestamp. - * - * @param deviceId the deviceId of the timeseries. - * @param measurementId the measurementId of the timeseries. - * @param timestamp the upper-bound of deletion time. - * @param updatedModFiles add successfully updated modificationFile to this list, so that the - * deletion can be aborted when exception is thrown. - */ - public void delete(String deviceId, String measurementId, long timestamp, long version, - List<ModificationFile> updatedModFiles) - throws IOException { - modificationFile.write(new Deletion(deviceId + IoTDBConstant.PATH_SEPARATOR - + measurementId, version, timestamp)); - updatedModFiles.add(modificationFile); - } -} \ No newline at end of file diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowedTsFileIOWriter.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowedTsFileIOWriter.java deleted file mode 100644 index db86a78..0000000 --- a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowedTsFileIOWriter.java +++ /dev/null @@ -1,36 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iotdb.db.engine.overflow.io; - -import java.io.File; -import java.io.FileNotFoundException; -import org.apache.iotdb.tsfile.write.writer.DefaultTsFileOutput; -import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter; - -public class OverflowedTsFileIOWriter extends TsFileIOWriter { - - public OverflowedTsFileIOWriter(File file) throws FileNotFoundException { - super(); - this.out = new DefaultTsFileOutput(file, true); - - } - - -} diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/metadata/OFFileMetadata.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/metadata/OFFileMetadata.java deleted file mode 100644 index 5297aa6..0000000 --- a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/metadata/OFFileMetadata.java +++ /dev/null @@ -1,107 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.iotdb.db.engine.overflow.metadata; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; - -public class OFFileMetadata { - - private long lastFooterOffset; - private List<OFRowGroupListMetadata> rowGroupLists; - - public OFFileMetadata() { - //allowed to do nothing - } - - public OFFileMetadata(long lastFooterOffset, List<OFRowGroupListMetadata> rowGroupLists) { - this.lastFooterOffset = lastFooterOffset; - this.rowGroupLists = rowGroupLists; - } - - /** - * function for deserializing data from input stream. - */ - public static OFFileMetadata deserializeFrom(InputStream inputStream) throws IOException { - long lastFooterOffset = ReadWriteIOUtils.readLong(inputStream); - int size = ReadWriteIOUtils.readInt(inputStream); - List<OFRowGroupListMetadata> list = new ArrayList<>(); - for (int i = 0; i < size; i++) { - list.add(OFRowGroupListMetadata.deserializeFrom(inputStream)); - } - return new OFFileMetadata(lastFooterOffset, list); - } - - public static OFFileMetadata deserializeFrom(ByteBuffer buffer) throws IOException { - throw new IOException("The function has not been implemented."); - } - - /** - * add OFRowGroupListMetadata to list. - */ - public void addRowGroupListMetaData(OFRowGroupListMetadata rowGroupListMetadata) { - if (rowGroupLists == null) { - rowGroupLists = new ArrayList<>(); - } - rowGroupLists.add(rowGroupListMetadata); - } - - public List<OFRowGroupListMetadata> getRowGroupLists() { - return rowGroupLists == null ? null : Collections.unmodifiableList(rowGroupLists); - } - - public long getLastFooterOffset() { - return lastFooterOffset; - } - - public void setLastFooterOffset(long lastFooterOffset) { - this.lastFooterOffset = lastFooterOffset; - } - - @Override - public String toString() { - return String.format("OFFileMetadata{ last offset: %d, RowGroupLists: %s }", lastFooterOffset, - rowGroupLists.toString()); - } - - /** - * function for serializing data to output stream. - */ - public int serializeTo(OutputStream outputStream) throws IOException { - int byteLen = 0; - byteLen += ReadWriteIOUtils.write(lastFooterOffset, outputStream); - int size = rowGroupLists.size(); - byteLen += ReadWriteIOUtils.write(size, outputStream); - for (OFRowGroupListMetadata ofRowGroupListMetadata : rowGroupLists) { - byteLen += ofRowGroupListMetadata.serializeTo(outputStream); - } - return byteLen; - } - - public int serializeTo(ByteBuffer buffer) throws IOException { - throw new IOException("The function has not been implemented."); - } - -} diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/metadata/OFRowGroupListMetadata.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/metadata/OFRowGroupListMetadata.java deleted file mode 100644 index 274fb03..0000000 --- a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/metadata/OFRowGroupListMetadata.java +++ /dev/null @@ -1,109 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.iotdb.db.engine.overflow.metadata; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; - -/** - * Metadata of overflow RowGroup list. - */ -public class OFRowGroupListMetadata { - - private String deviceId; - private List<OFSeriesListMetadata> seriesList; - - private OFRowGroupListMetadata() { - } - - public OFRowGroupListMetadata(String deviceId) { - this.deviceId = deviceId; - seriesList = new ArrayList<>(); - } - - /** - * function for deserializing data from input stream. - */ - public static OFRowGroupListMetadata deserializeFrom(InputStream inputStream) throws IOException { - OFRowGroupListMetadata ofRowGroupListMetadata = new OFRowGroupListMetadata(); - ofRowGroupListMetadata.deviceId = ReadWriteIOUtils.readString(inputStream); - int size = ReadWriteIOUtils.readInt(inputStream); - List<OFSeriesListMetadata> list = new ArrayList<>(); - for (int i = 0; i < size; i++) { - list.add(OFSeriesListMetadata.deserializeFrom(inputStream)); - } - ofRowGroupListMetadata.seriesList = list; - return ofRowGroupListMetadata; - } - - public static OFRowGroupListMetadata deserializeFrom(ByteBuffer buffer) throws IOException { - throw new IOException("The function has not been implemented."); - } - - /** - * add OFSeriesListMetadata metadata to list. - */ - public void addSeriesListMetaData(OFSeriesListMetadata timeSeries) { - if (seriesList == null) { - seriesList = new ArrayList<>(); - } - seriesList.add(timeSeries); - } - - public List<OFSeriesListMetadata> getSeriesList() { - return seriesList == null ? null : Collections.unmodifiableList(seriesList); - } - - @Override - public String toString() { - return String.format("OFRowGroupListMetadata{ deviceId id: %s, series Lists: %s }", deviceId, - seriesList.toString()); - } - - public String getdeviceId() { - return deviceId; - } - - /** - * function for serialing data to output stream. - */ - public int serializeTo(OutputStream outputStream) throws IOException { - int byteLen = 0; - byteLen += ReadWriteIOUtils.write(deviceId, outputStream); - int size = seriesList.size(); - byteLen += ReadWriteIOUtils.write(size, outputStream); - for (OFSeriesListMetadata ofSeriesListMetadata : seriesList) { - byteLen += ofSeriesListMetadata.serializeTo(outputStream); - } - return byteLen; - } - - /** - * function for serializing data to byte buffer. - */ - public int serializeTo(ByteBuffer buffer) throws IOException { - throw new IOException("The function has not been implemented."); - } -} diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/metadata/OFSeriesListMetadata.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/metadata/OFSeriesListMetadata.java deleted file mode 100644 index 40f7302..0000000 --- a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/metadata/OFSeriesListMetadata.java +++ /dev/null @@ -1,107 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.iotdb.db.engine.overflow.metadata; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData; -import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; - -/** - * Metadata of overflow series list. - */ -public class OFSeriesListMetadata { - - private String measurementId; - private List<ChunkMetaData> timeSeriesList; - - private OFSeriesListMetadata() { - } - - public OFSeriesListMetadata(String measurementId, List<ChunkMetaData> timeSeriesList) { - this.measurementId = measurementId; - this.timeSeriesList = timeSeriesList; - } - - /** - * function for deserializing data from input stream. - */ - public static OFSeriesListMetadata deserializeFrom(InputStream inputStream) throws IOException { - OFSeriesListMetadata ofSeriesListMetadata = new OFSeriesListMetadata(); - ofSeriesListMetadata.measurementId = ReadWriteIOUtils.readString(inputStream); - int size = ReadWriteIOUtils.readInt(inputStream); - List<ChunkMetaData> list = new ArrayList<>(); - for (int i = 0; i < size; i++) { - ChunkMetaData chunkMetaData = ChunkMetaData.deserializeFrom(inputStream); - list.add(chunkMetaData); - } - ofSeriesListMetadata.timeSeriesList = list; - return ofSeriesListMetadata; - } - - public static OFSeriesListMetadata deserializeFrom(ByteBuffer buffer) throws IOException { - throw new IOException("The function has not been implemented."); - } - - /** - * add TimeSeriesChunkMetaData to timeSeriesList. - */ - public void addSeriesMetaData(ChunkMetaData timeSeries) { - if (timeSeriesList == null) { - timeSeriesList = new ArrayList<>(); - } - timeSeriesList.add(timeSeries); - } - - public List<ChunkMetaData> getMetaDatas() { - return timeSeriesList == null ? null : Collections.unmodifiableList(timeSeriesList); - } - - @Override - public String toString() { - return String.format("OFSeriesListMetadata{ measurementId id: %s, series: %s }", measurementId, - timeSeriesList.toString()); - } - - public String getMeasurementId() { - return measurementId; - } - - /** - * function for serializing data to output stream. - */ - public int serializeTo(OutputStream outputStream) throws IOException { - int byteLen = 0; - byteLen += ReadWriteIOUtils.write(measurementId, outputStream); - byteLen += ReadWriteIOUtils.write(timeSeriesList.size(), outputStream); - for (ChunkMetaData chunkMetaData : timeSeriesList) { - byteLen += chunkMetaData.serializeTo(outputStream); - } - return byteLen; - } - - public int serializeTo(ByteBuffer buffer) throws IOException { - throw new IOException("The function has not been implemented."); - } -} diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/utils/MergeStatus.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/utils/MergeStatus.java deleted file mode 100644 index 9b9aa85..0000000 --- a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/utils/MergeStatus.java +++ /dev/null @@ -1,29 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.iotdb.db.engine.overflow.utils; - -/** - * Used for IntervalTreeOperation.queryMemory() and IntervalTreeOperation.queryFileBlock(). - * - * <p>DONE means that a time pair is not used or this time pair has been merged into a new - * DynamicOneColumn MERGING means that a time pair is merging into a new DynamicOneColumn - */ -public enum MergeStatus { - DONE, MERGING -} diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/utils/OverflowOpType.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/utils/OverflowOpType.java deleted file mode 100644 index ba4c4f9..0000000 --- a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/utils/OverflowOpType.java +++ /dev/null @@ -1,29 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.iotdb.db.engine.overflow.utils; - -/** - * Include three types: INSERT,UPDATE,DELETE; INSERT is an operation which inserts a time point.<br> - * UPDATE is an operation which updates a time range.<br> DELETE is an operation which deletes a - * time range. Note that DELETE operation could only delete a time which is less than given time T. - * <br> - */ -public enum OverflowOpType { - INSERT, UPDATE, DELETE -} \ No newline at end of file diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/pool/FlushManager.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/pool/FlushManager.java index 7c996aa..d6c2a8c 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/engine/pool/FlushManager.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/pool/FlushManager.java @@ -35,11 +35,10 @@ public class FlushManager { private static final int EXIT_WAIT_TIME = 60 * 1000; private ExecutorService pool; - private int threadCnt; private FlushManager() { IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig(); - this.threadCnt = config.getConcurrentFlushThread(); + int threadCnt = config.getConcurrentFlushThread(); pool = IoTDBThreadPoolFactory.newFixedThreadPool(threadCnt, ThreadName.FLUSH_SERVICE.getName()); } @@ -47,47 +46,6 @@ public class FlushManager { return InstanceHolder.instance; } - /** - * @throws ProcessorException - * if the pool is not terminated. - */ - public void reopen() throws ProcessorException { - if (!pool.isTerminated()) { - throw new ProcessorException("Flush Pool is not terminated!"); - } - IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig(); - pool = Executors.newFixedThreadPool(config.getConcurrentFlushThread()); - } - - public FlushManager(ExecutorService pool) { - this.pool = pool; - } - - /** - * Refuse new flush submits and exit when all RUNNING THREAD in the pool end. - * - * @param block - * if set to true, this method will wait for timeOut milliseconds. - * @param timeOut - * block time out in milliseconds. - * @throws ProcessorException - * if timeOut is reached or being interrupted while waiting to exit. - */ - public void forceClose(boolean block, long timeOut) throws ProcessorException { - pool.shutdownNow(); - if (block) { - try { - if (!pool.awaitTermination(timeOut, TimeUnit.MILLISECONDS)) { - throw new ProcessorException("Flush thread pool doesn't exit after " - + EXIT_WAIT_TIME + " ms"); - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new ProcessorException("Interrupted while waiting flush thread pool to exit. " - , e); - } - } - } /** * Block new flush submits and exit when all RUNNING THREADS AND TASKS IN THE QUEUE end. @@ -114,22 +72,10 @@ public class FlushManager { } } - public synchronized Future<?> submit(Runnable task) { - return pool.submit(task); - } - public synchronized <T>Future<T> submit(Callable<T> task){ return pool.submit(task); } - public int getActiveCnt() { - return ((ThreadPoolExecutor) pool).getActiveCount(); - } - - public int getThreadCnt() { - return threadCnt; - } - private static class InstanceHolder { private InstanceHolder(){ //allowed to do nothing diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/pool/MergeManager.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/pool/MergeManager.java index 81b0dcb..ff795c6 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/engine/pool/MergeManager.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/pool/MergeManager.java @@ -33,11 +33,10 @@ import org.apache.iotdb.db.exception.ProcessorException; public class MergeManager { private ExecutorService pool; - private int threadCnt; private MergeManager() { IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig(); - this.threadCnt = config.getMergeConcurrentThreads(); + int threadCnt = config.getMergeConcurrentThreads(); pool = IoTDBThreadPoolFactory.newFixedThreadPool(threadCnt, ThreadName.MERGE_SERVICE.getName()); } @@ -46,43 +45,6 @@ public class MergeManager { } /** - * reopen function. - * - * @throws ProcessorException if the pool is not terminated. - */ - public void reopen() throws ProcessorException { - if (!pool.isTerminated()) { - throw new ProcessorException("Merge pool is not terminated!"); - } - IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig(); - pool = Executors.newFixedThreadPool(config.getMergeConcurrentThreads()); - } - - /** - * Refuse new merge submits and exit when all RUNNING THREAD in the pool end. - * - * @param block if set block to true, this method will wait for timeOut milliseconds to close the - * merge pool. false, return directly. - * @param timeOut block time out in milliseconds. - * @throws ProcessorException if timeOut reach or interrupted while waiting to exit. - */ - public void forceClose(boolean block, long timeOut) throws ProcessorException { - pool.shutdownNow(); - if (block) { - try { - if (!pool.awaitTermination(timeOut, TimeUnit.MILLISECONDS)) { - throw new ProcessorException( - "Merge thread pool doesn't exit after " + timeOut + " ms"); - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new ProcessorException( - "Interrupted while waiting merge thread pool to exit. ", e); - } - } - } - - /** * Block new merge submits and exit when all RUNNING THREADS AND TASKS IN THE QUEUE end. * * @param block if set to true, this method will wait for timeOut milliseconds. false, return @@ -106,22 +68,10 @@ public class MergeManager { } } - public Future<?> submit(Runnable task) { - return pool.submit(task); - } - public Future<?> submit(Callable task) { return pool.submit(task); } - public int getActiveCnt() { - return ((ThreadPoolExecutor) pool).getActiveCount(); - } - - public int getThreadCnt() { - return threadCnt; - } - private static class InstanceHolder { private InstanceHolder(){ //allowed to do nothing diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/querycontext/MergeSeriesDataSource.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/querycontext/MergeSeriesDataSource.java deleted file mode 100644 index 2e42bac..0000000 --- a/iotdb/src/main/java/org/apache/iotdb/db/engine/querycontext/MergeSeriesDataSource.java +++ /dev/null @@ -1,33 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.iotdb.db.engine.querycontext; - -public class MergeSeriesDataSource { - - private OverflowInsertFile insertFile; - - public MergeSeriesDataSource(OverflowInsertFile insertFile) { - this.insertFile = insertFile; - } - - public OverflowInsertFile getInsertFile() { - return insertFile; - } - -} diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/querycontext/OverflowInsertFile.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/querycontext/OverflowInsertFile.java deleted file mode 100644 index 0c351df..0000000 --- a/iotdb/src/main/java/org/apache/iotdb/db/engine/querycontext/OverflowInsertFile.java +++ /dev/null @@ -1,51 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.iotdb.db.engine.querycontext; - -import java.util.List; -import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData; - -public class OverflowInsertFile { - - private String filePath; - - // seriesChunkMetadata of selected series - private List<ChunkMetaData> timeSeriesChunkMetaData; - - public OverflowInsertFile() { - //allowed to do nothing - } - - public OverflowInsertFile(String path, List<ChunkMetaData> timeSeriesChunkMetaData) { - this.filePath = path; - this.timeSeriesChunkMetaData = timeSeriesChunkMetaData; - } - - public String getFilePath() { - return filePath; - } - - public List<ChunkMetaData> getChunkMetaDataList() { - return timeSeriesChunkMetaData; - } - - public void setTimeSeriesChunkMetaData(List<ChunkMetaData> timeSeriesChunkMetaData) { - this.timeSeriesChunkMetaData = timeSeriesChunkMetaData; - } -} diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/querycontext/OverflowSeriesDataSource.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/querycontext/OverflowSeriesDataSource.java deleted file mode 100644 index 78571f9..0000000 --- a/iotdb/src/main/java/org/apache/iotdb/db/engine/querycontext/OverflowSeriesDataSource.java +++ /dev/null @@ -1,78 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.iotdb.db.engine.querycontext; - -import java.util.List; -import org.apache.iotdb.db.engine.memtable.TimeValuePairSorter; -import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; -import org.apache.iotdb.tsfile.read.common.Path; - -public class OverflowSeriesDataSource { - - private Path seriesPath; - private TSDataType dataType; - // overflow tsfile - private List<OverflowInsertFile> overflowInsertFileList; - // unSeq mem-table - private ReadOnlyMemChunk readableMemChunk; - - public OverflowSeriesDataSource(Path seriesPath) { - this.seriesPath = seriesPath; - } - - public OverflowSeriesDataSource(Path seriesPath, TSDataType dataType, - List<OverflowInsertFile> overflowInsertFileList, ReadOnlyMemChunk readableMemChunk) { - this.seriesPath = seriesPath; - this.dataType = dataType; - this.overflowInsertFileList = overflowInsertFileList; - this.readableMemChunk = readableMemChunk; - } - - public List<OverflowInsertFile> getOverflowInsertFileList() { - return overflowInsertFileList; - } - - public void setOverflowInsertFileList(List<OverflowInsertFile> overflowInsertFileList) { - this.overflowInsertFileList = overflowInsertFileList; - } - - public ReadOnlyMemChunk getReadableMemChunk() { - return readableMemChunk; - } - - public void setReadableMemChunk(ReadOnlyMemChunk rawChunk) { - this.readableMemChunk = rawChunk; - } - - public Path getSeriesPath() { - return seriesPath; - } - - public void setSeriesPath(Path seriesPath) { - this.seriesPath = seriesPath; - } - - public TSDataType getDataType() { - return dataType; - } - - public boolean hasRawChunk() { - return readableMemChunk != null && !readableMemChunk.isEmpty(); - } -} diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/querycontext/OverflowUpdateDeleteFile.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/querycontext/OverflowUpdateDeleteFile.java deleted file mode 100644 index 875c98f..0000000 --- a/iotdb/src/main/java/org/apache/iotdb/db/engine/querycontext/OverflowUpdateDeleteFile.java +++ /dev/null @@ -1,42 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.iotdb.db.engine.querycontext; - -import java.util.List; -import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData; - -public class OverflowUpdateDeleteFile { - - private String filePath; - private List<ChunkMetaData> timeSeriesChunkMetaDataList; - - public OverflowUpdateDeleteFile(String filePath, - List<ChunkMetaData> timeSeriesChunkMetaDataList) { - this.filePath = filePath; - this.timeSeriesChunkMetaDataList = timeSeriesChunkMetaDataList; - } - - public String getFilePath() { - return filePath; - } - - public List<ChunkMetaData> getTimeSeriesChunkMetaDataList() { - return timeSeriesChunkMetaDataList; - } -} diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/sgmanager/StorageGroupManager.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/sgmanager/StorageGroupManager.java index 45d6faa..91440bd 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/engine/sgmanager/StorageGroupManager.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/sgmanager/StorageGroupManager.java @@ -38,7 +38,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.conf.directories.Directories; import org.apache.iotdb.db.engine.DatabaseEngine; import org.apache.iotdb.db.engine.memcontrol.BasicMemController; -import org.apache.iotdb.db.engine.querycontext.QueryDataSource; +import org.apache.iotdb.db.engine.datasource.QueryDataSource; import org.apache.iotdb.db.exception.PathErrorException; import org.apache.iotdb.db.exception.StorageGroupManagerException; import org.apache.iotdb.db.exception.TsFileProcessorException; diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/sgmanager/StorageGroupProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/sgmanager/StorageGroupProcessor.java index 42923ad..eeac144 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/engine/sgmanager/StorageGroupProcessor.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/sgmanager/StorageGroupProcessor.java @@ -54,8 +54,8 @@ import org.apache.iotdb.db.engine.modification.Deletion; import org.apache.iotdb.db.engine.modification.ModificationFile; import org.apache.iotdb.db.engine.overflowdata.OverflowProcessor; import org.apache.iotdb.db.engine.pool.MergeManager; -import org.apache.iotdb.db.engine.querycontext.QueryDataSource; -import org.apache.iotdb.db.engine.querycontext.SeriesDataSource; +import org.apache.iotdb.db.engine.datasource.QueryDataSource; +import org.apache.iotdb.db.engine.datasource.SeriesDataSource; import org.apache.iotdb.db.engine.tsfiledata.TsFileProcessor; import org.apache.iotdb.db.engine.version.SimpleFileVersionController; import org.apache.iotdb.db.engine.version.VersionController; diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/tsfiledata/TsFileProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/tsfiledata/TsFileProcessor.java index d787609..6abff9a 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/engine/tsfiledata/TsFileProcessor.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/tsfiledata/TsFileProcessor.java @@ -53,9 +53,9 @@ import org.apache.iotdb.db.engine.memtable.PrimitiveMemTable; import org.apache.iotdb.db.engine.modification.Deletion; import org.apache.iotdb.db.engine.modification.Modification; import org.apache.iotdb.db.engine.pool.FlushManager; -import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk; -import org.apache.iotdb.db.engine.querycontext.SeriesDataSource; -import org.apache.iotdb.db.engine.querycontext.UnsealedTsFile; +import org.apache.iotdb.db.engine.datasource.ReadOnlyMemChunk; +import org.apache.iotdb.db.engine.datasource.SeriesDataSource; +import org.apache.iotdb.db.engine.datasource.UnsealedTsFile; import org.apache.iotdb.db.engine.sgmanager.OperationResult; import org.apache.iotdb.db.engine.sgmanager.StorageGroupProcessor; import org.apache.iotdb.db.engine.version.VersionController; diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/control/JobFileManager.java b/iotdb/src/main/java/org/apache/iotdb/db/query/control/JobFileManager.java index 84ceeb0..46cce5b 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/query/control/JobFileManager.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/query/control/JobFileManager.java @@ -22,8 +22,8 @@ import java.util.HashSet; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import org.apache.iotdb.db.engine.sgmanager.TsFileResource; -import org.apache.iotdb.db.engine.querycontext.OverflowInsertFile; -import org.apache.iotdb.db.engine.querycontext.QueryDataSource; +import org.apache.iotdb.db.engine.datasource.OverflowInsertFile; +import org.apache.iotdb.db.engine.datasource.QueryDataSource; /** * <p> diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java b/iotdb/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java index 310a850..406eb11 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java @@ -18,9 +18,7 @@ */ package org.apache.iotdb.db.query.control; -import java.io.IOException; import java.util.ArrayList; -import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -28,10 +26,8 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; import org.apache.iotdb.db.engine.DatabaseEngineFactory; -import org.apache.iotdb.db.engine.querycontext.SeriesDataSource; -import org.apache.iotdb.db.engine.querycontext.OverflowSeriesDataSource; -import org.apache.iotdb.db.engine.querycontext.QueryDataSource; -import org.apache.iotdb.db.engine.tsfiledata.TsFileProcessor; +import org.apache.iotdb.db.engine.datasource.OverflowSeriesDataSource; +import org.apache.iotdb.db.engine.datasource.QueryDataSource; import org.apache.iotdb.db.exception.StorageGroupManagerException; import org.apache.iotdb.db.query.context.QueryContext; import org.apache.iotdb.tsfile.read.common.Path; diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/AggregateEngineExecutor.java b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/AggregateEngineExecutor.java index aa494bb..49cd60c 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/AggregateEngineExecutor.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/AggregateEngineExecutor.java @@ -23,7 +23,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; import org.apache.iotdb.db.conf.IoTDBDescriptor; -import org.apache.iotdb.db.engine.querycontext.QueryDataSource; +import org.apache.iotdb.db.engine.datasource.QueryDataSource; import org.apache.iotdb.db.exception.StorageGroupManagerException; import org.apache.iotdb.db.exception.PathErrorException; import org.apache.iotdb.db.exception.ProcessorException; diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineExecutorWithoutTimeGenerator.java b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineExecutorWithoutTimeGenerator.java index 2819915..cdb286c 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineExecutorWithoutTimeGenerator.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineExecutorWithoutTimeGenerator.java @@ -22,8 +22,7 @@ package org.apache.iotdb.db.query.executor; import java.io.IOException; import java.util.ArrayList; import java.util.List; -import org.apache.iotdb.db.engine.querycontext.QueryDataSource; -import org.apache.iotdb.db.engine.tsfiledata.TsFileProcessor; +import org.apache.iotdb.db.engine.datasource.QueryDataSource; import org.apache.iotdb.db.exception.StorageGroupManagerException; import org.apache.iotdb.db.exception.PathErrorException; import org.apache.iotdb.db.metadata.MManager; diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/FillEngineExecutor.java b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/FillEngineExecutor.java index 7dfa31e..d2409a1 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/FillEngineExecutor.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/FillEngineExecutor.java @@ -23,7 +23,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Map; -import org.apache.iotdb.db.engine.querycontext.QueryDataSource; +import org.apache.iotdb.db.engine.datasource.QueryDataSource; import org.apache.iotdb.db.exception.StorageGroupManagerException; import org.apache.iotdb.db.exception.PathErrorException; import org.apache.iotdb.db.metadata.MManager; diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/groupby/GroupByWithOnlyTimeFilterDataSetDataSet.java b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/groupby/GroupByWithOnlyTimeFilterDataSetDataSet.java index c2b85eb..ff7a502 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/groupby/GroupByWithOnlyTimeFilterDataSetDataSet.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/groupby/GroupByWithOnlyTimeFilterDataSetDataSet.java @@ -22,7 +22,7 @@ package org.apache.iotdb.db.query.executor.groupby; import java.io.IOException; import java.util.ArrayList; import java.util.List; -import org.apache.iotdb.db.engine.querycontext.QueryDataSource; +import org.apache.iotdb.db.engine.datasource.QueryDataSource; import org.apache.iotdb.db.exception.StorageGroupManagerException; import org.apache.iotdb.db.exception.PathErrorException; import org.apache.iotdb.db.exception.ProcessorException; diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/factory/SeriesReaderFactory.java b/iotdb/src/main/java/org/apache/iotdb/db/query/factory/SeriesReaderFactory.java index c6d34dc..013c0bf 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/query/factory/SeriesReaderFactory.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/query/factory/SeriesReaderFactory.java @@ -19,16 +19,15 @@ package org.apache.iotdb.db.query.factory; -import java.io.File; import java.io.IOException; import java.util.ArrayList; import java.util.List; import org.apache.iotdb.db.engine.sgmanager.TsFileResource; import org.apache.iotdb.db.engine.modification.Modification; import org.apache.iotdb.db.engine.modification.ModificationFile; -import org.apache.iotdb.db.engine.querycontext.QueryDataSource; -import org.apache.iotdb.db.engine.querycontext.SeriesDataSource; -import org.apache.iotdb.db.engine.querycontext.UnsealedTsFile; +import org.apache.iotdb.db.engine.datasource.QueryDataSource; +import org.apache.iotdb.db.engine.datasource.SeriesDataSource; +import org.apache.iotdb.db.engine.datasource.UnsealedTsFile; import org.apache.iotdb.db.exception.StorageGroupManagerException; import org.apache.iotdb.db.query.context.QueryContext; import org.apache.iotdb.db.query.control.FileReaderManager; diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/fill/IFill.java b/iotdb/src/main/java/org/apache/iotdb/db/query/fill/IFill.java index d64b49a..6292a65 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/query/fill/IFill.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/query/fill/IFill.java @@ -20,7 +20,7 @@ package org.apache.iotdb.db.query.fill; import java.io.IOException; -import org.apache.iotdb.db.engine.querycontext.QueryDataSource; +import org.apache.iotdb.db.engine.datasource.QueryDataSource; import org.apache.iotdb.db.query.context.QueryContext; import org.apache.iotdb.db.query.factory.SeriesReaderFactory; import org.apache.iotdb.db.query.reader.AllDataReader; diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/fill/LinearFill.java b/iotdb/src/main/java/org/apache/iotdb/db/query/fill/LinearFill.java index dc46082..71d9e03 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/query/fill/LinearFill.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/query/fill/LinearFill.java @@ -20,7 +20,7 @@ package org.apache.iotdb.db.query.fill; import java.io.IOException; -import org.apache.iotdb.db.engine.querycontext.QueryDataSource; +import org.apache.iotdb.db.engine.datasource.QueryDataSource; import org.apache.iotdb.db.exception.UnSupportedFillTypeException; import org.apache.iotdb.db.query.context.QueryContext; import org.apache.iotdb.db.query.reader.IPointReader; diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/fill/PreviousFill.java b/iotdb/src/main/java/org/apache/iotdb/db/query/fill/PreviousFill.java index b75fb4f..a893cfc 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/query/fill/PreviousFill.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/query/fill/PreviousFill.java @@ -19,7 +19,7 @@ package org.apache.iotdb.db.query.fill; import java.io.IOException; -import org.apache.iotdb.db.engine.querycontext.QueryDataSource; +import org.apache.iotdb.db.engine.datasource.QueryDataSource; import org.apache.iotdb.db.query.context.QueryContext; import org.apache.iotdb.db.query.reader.IPointReader; import org.apache.iotdb.db.utils.TimeValuePair; diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/reader/mem/MemChunkReader.java b/iotdb/src/main/java/org/apache/iotdb/db/query/reader/mem/MemChunkReader.java index 8c3e9cf..01a2d41 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/query/reader/mem/MemChunkReader.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/query/reader/mem/MemChunkReader.java @@ -20,7 +20,7 @@ package org.apache.iotdb.db.query.reader.mem; import java.util.Iterator; -import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk; +import org.apache.iotdb.db.engine.datasource.ReadOnlyMemChunk; import org.apache.iotdb.db.query.reader.IAggregateReader; import org.apache.iotdb.db.query.reader.IBatchReader; import org.apache.iotdb.db.query.reader.IPointReader; diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/reader/sequence/SequenceDataReader.java b/iotdb/src/main/java/org/apache/iotdb/db/query/reader/sequence/SequenceDataReader.java index 2a9abb4..a84c86b 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/query/reader/sequence/SequenceDataReader.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/query/reader/sequence/SequenceDataReader.java @@ -22,7 +22,7 @@ package org.apache.iotdb.db.query.reader.sequence; import java.io.IOException; import java.util.ArrayList; import java.util.List; -import org.apache.iotdb.db.engine.querycontext.SeriesDataSource; +import org.apache.iotdb.db.engine.datasource.SeriesDataSource; import org.apache.iotdb.db.query.context.QueryContext; import org.apache.iotdb.db.query.reader.IAggregateReader; import org.apache.iotdb.db.query.reader.IBatchReader; diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/reader/sequence/SequenceDataReaderByTimestamp.java b/iotdb/src/main/java/org/apache/iotdb/db/query/reader/sequence/SequenceDataReaderByTimestamp.java index e8a3905..e8aafb1 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/query/reader/sequence/SequenceDataReaderByTimestamp.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/query/reader/sequence/SequenceDataReaderByTimestamp.java @@ -22,7 +22,7 @@ package org.apache.iotdb.db.query.reader.sequence; import java.io.IOException; import java.util.ArrayList; import java.util.List; -import org.apache.iotdb.db.engine.querycontext.SeriesDataSource; +import org.apache.iotdb.db.engine.datasource.SeriesDataSource; import org.apache.iotdb.db.query.context.QueryContext; import org.apache.iotdb.db.query.reader.mem.MemChunkReaderByTimestamp; import org.apache.iotdb.db.query.reader.merge.EngineReaderByTimeStamp; diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/reader/sequence/UnSealedTsFileReader.java b/iotdb/src/main/java/org/apache/iotdb/db/query/reader/sequence/UnSealedTsFileReader.java index d4e75e6..8948488 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/query/reader/sequence/UnSealedTsFileReader.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/query/reader/sequence/UnSealedTsFileReader.java @@ -22,7 +22,7 @@ package org.apache.iotdb.db.query.reader.sequence; import java.io.IOException; import java.util.Collections; import java.util.List; -import org.apache.iotdb.db.engine.querycontext.UnsealedTsFile; +import org.apache.iotdb.db.engine.datasource.UnsealedTsFile; import org.apache.iotdb.db.query.control.FileReaderManager; import org.apache.iotdb.db.query.reader.IAggregateReader; import org.apache.iotdb.db.query.reader.IBatchReader; diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/reader/sequence/UnSealedTsFilesReaderByTimestamp.java b/iotdb/src/main/java/org/apache/iotdb/db/query/reader/sequence/UnSealedTsFilesReaderByTimestamp.java index 4aebdb8..06b36ba 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/query/reader/sequence/UnSealedTsFilesReaderByTimestamp.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/query/reader/sequence/UnSealedTsFilesReaderByTimestamp.java @@ -20,7 +20,7 @@ package org.apache.iotdb.db.query.reader.sequence; import java.io.IOException; -import org.apache.iotdb.db.engine.querycontext.UnsealedTsFile; +import org.apache.iotdb.db.engine.datasource.UnsealedTsFile; import org.apache.iotdb.db.query.control.FileReaderManager; import org.apache.iotdb.db.query.reader.merge.EngineReaderByTimeStamp; import org.apache.iotdb.tsfile.read.TsFileSequenceReader; diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/timegenerator/EngineNodeConstructor.java b/iotdb/src/main/java/org/apache/iotdb/db/query/timegenerator/EngineNodeConstructor.java index 86b434f..4e31bff 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/query/timegenerator/EngineNodeConstructor.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/query/timegenerator/EngineNodeConstructor.java @@ -24,7 +24,7 @@ import static org.apache.iotdb.tsfile.read.expression.ExpressionType.OR; import static org.apache.iotdb.tsfile.read.expression.ExpressionType.SERIES; import java.io.IOException; -import org.apache.iotdb.db.engine.querycontext.QueryDataSource; +import org.apache.iotdb.db.engine.datasource.QueryDataSource; import org.apache.iotdb.db.exception.StorageGroupManagerException; import org.apache.iotdb.db.query.context.QueryContext; import org.apache.iotdb.db.query.control.QueryResourceManager; diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessorNewTest.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessorNewTest.java index 90a101b..ce7ae70 100644 --- a/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessorNewTest.java +++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessorNewTest.java @@ -34,7 +34,7 @@ import java.util.concurrent.TimeUnit; import org.apache.iotdb.db.conf.directories.Directories; import org.apache.iotdb.db.engine.EngingeConstants; import org.apache.iotdb.db.engine.MetadataManagerHelper; -import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk; +import org.apache.iotdb.db.engine.datasource.ReadOnlyMemChunk; import org.apache.iotdb.db.engine.version.SysTimeVersionController; import org.apache.iotdb.db.exception.BufferWriteProcessorException; import org.apache.iotdb.db.utils.EnvironmentUtils; diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessorTest.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessorTest.java index 2ad558e..b934c3f 100644 --- a/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessorTest.java +++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessorTest.java @@ -37,7 +37,7 @@ import org.apache.iotdb.db.conf.directories.Directories; import org.apache.iotdb.db.engine.EngingeConstants; import org.apache.iotdb.db.engine.MetadataManagerHelper; import org.apache.iotdb.db.engine.PathUtils; -import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk; +import org.apache.iotdb.db.engine.datasource.ReadOnlyMemChunk; import org.apache.iotdb.db.engine.version.SysTimeVersionController; import org.apache.iotdb.db.exception.ProcessorException; import org.apache.iotdb.db.utils.EnvironmentUtils; diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/modification/DeletionFileNodeTest.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/modification/DeletionFileNodeTest.java index 2f22670..d08af05 100644 --- a/iotdb/src/test/java/org/apache/iotdb/db/engine/modification/DeletionFileNodeTest.java +++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/modification/DeletionFileNodeTest.java @@ -33,7 +33,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.conf.directories.Directories; import org.apache.iotdb.db.engine.filenode.DatabaseEngine; import org.apache.iotdb.db.engine.modification.io.LocalTextModificationAccessor; -import org.apache.iotdb.db.engine.querycontext.QueryDataSource; +import org.apache.iotdb.db.engine.datasource.QueryDataSource; import org.apache.iotdb.db.exception.StorageGroupManagerException; import org.apache.iotdb.db.exception.MetadataArgsErrorException; import org.apache.iotdb.db.exception.PathErrorException; diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessorTest.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessorTest.java index b366c21..7b0a58d 100644 --- a/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessorTest.java +++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessorTest.java @@ -31,8 +31,8 @@ import org.apache.iotdb.db.engine.PathUtils; import org.apache.iotdb.db.engine.bufferwrite.Action; import org.apache.iotdb.db.engine.bufferwrite.ActionException; import org.apache.iotdb.db.engine.EngingeConstants; -import org.apache.iotdb.db.engine.querycontext.MergeSeriesDataSource; -import org.apache.iotdb.db.engine.querycontext.OverflowSeriesDataSource; +import org.apache.iotdb.db.engine.datasource.MergeSeriesDataSource; +import org.apache.iotdb.db.engine.datasource.OverflowSeriesDataSource; import org.apache.iotdb.db.engine.version.SysTimeVersionController; import org.apache.iotdb.db.exception.OverflowProcessorException; import org.apache.iotdb.db.query.context.QueryContext;
