This is an automated email from the ASF dual-hosted git repository. jiangtian pushed a commit to branch refactor_overflow in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 1ea3bfff7b892c696e93d7cbb74e695f07efc96d Author: 江天 <[email protected]> AuthorDate: Tue May 28 12:43:38 2019 +0800 abstract StorageGroupManager as StorageEngine --- .../db/engine/memcontrol/FlushPartialPolicy.java | 3 +- .../db/engine/overflow/io/OverflowProcessor.java | 1 - .../iotdb/db/engine/sgmanager/StorageEngine.java | 167 +++++++++++++++++++++ .../db/engine/sgmanager/StorageEngineFactory.java | 27 ++++ .../db/engine/sgmanager/StorageGroupManager.java | 136 ++++++----------- 5 files changed, 237 insertions(+), 97 deletions(-) diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memcontrol/FlushPartialPolicy.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memcontrol/FlushPartialPolicy.java index 31a5860..a3604a3 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memcontrol/FlushPartialPolicy.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memcontrol/FlushPartialPolicy.java @@ -20,13 +20,12 @@ package org.apache.iotdb.db.engine.memcontrol; import org.apache.iotdb.db.concurrent.ThreadName; import org.apache.iotdb.db.conf.IoTDBDescriptor; -import org.apache.iotdb.db.engine.filenode.FileNodeManager; import org.apache.iotdb.db.utils.MemUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * This class only gives a hint to FilenodeManager that it may flush some data to avoid rush hour. + * This class only gives a hint to StorageGroupManager that it may flush some data to avoid rush hour. 
*/ public class FlushPartialPolicy implements Policy { diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java index f052876..466138f 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java @@ -38,7 +38,6 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.engine.Processor; import org.apache.iotdb.db.engine.bufferwrite.Action; import org.apache.iotdb.db.engine.bufferwrite.FileNodeConstants; -import org.apache.iotdb.db.engine.filenode.FileNodeManager; import org.apache.iotdb.db.engine.memcontrol.BasicMemController; import org.apache.iotdb.db.engine.memcontrol.BasicMemController.UsageLevel; import org.apache.iotdb.db.engine.memtable.MemSeriesLazyMerger; diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/sgmanager/StorageEngine.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/sgmanager/StorageEngine.java new file mode 100644 index 0000000..72c7dcf --- /dev/null +++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/sgmanager/StorageEngine.java @@ -0,0 +1,167 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.engine.sgmanager; + +import java.util.List; +import java.util.Map; +import org.apache.iotdb.db.engine.filenode.TsFileResource; +import org.apache.iotdb.db.engine.memcontrol.BasicMemController; +import org.apache.iotdb.db.engine.querycontext.QueryDataSource; +import org.apache.iotdb.db.exception.StorageGroupManagerException; +import org.apache.iotdb.db.qp.physical.crud.InsertPlan; +import org.apache.iotdb.db.query.context.QueryContext; +import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; +import org.apache.iotdb.tsfile.read.common.Path; +import org.apache.iotdb.tsfile.read.expression.impl.SingleSeriesExpression; + +/** + * StorageEngine is an abstraction of IoTDB storage-level interfaces. + */ +public interface StorageEngine { + + /** + * This function is just for unit test. + */ + void reset(); + + /** + * Execute an insertion. + * + * @param plan an insert plan + * @param isMonitor if true, the insertion is done by StatMonitor and the statistic Info will not + * be recorded. if false, the statParamsHashMap will be updated. + */ + void insert(InsertPlan plan, boolean isMonitor) throws StorageGroupManagerException; + + /** + * update data. + */ + void update(String deviceId, String measurementId, long startTime, long endTime, + TSDataType type, String v) + throws StorageGroupManagerException; + + /** + * Delete data whose timestamp <= 'timestamp' of time series 'deviceId'.'measurementId'. + * @param deviceId + * @param measurementId + * @param timestamp + * @throws StorageGroupManagerException + */ + void deleteData(String deviceId, String measurementId, long timestamp) + throws StorageGroupManagerException; + + /** + * Similar to deleteData(), but only deletes data in sequence files. 
Only used by WAL recovery. + */ + void deleteInSeqFile(String deviceId, String measurementId, long timestamp) + throws StorageGroupManagerException; + + /** + * Similar to deleteData(), but only deletes data in Overflow. Only used by WAL recovery. + */ + void deleteInOverflow(String deviceId, String measurementId, long timestamp) + throws StorageGroupManagerException; + + /** + * Get a StorageGroup-level token for this query so that the StorageGroupProcessor may know which + * queries are occupying resources. + * + * @param deviceId queried deviceId + * @return a query token for the device. + */ + int beginQuery(String deviceId) throws StorageGroupManagerException; + + /** + * Notify the storage group of 'deviceId' that query 'token' has ended and its resource can be + * released. + */ + void endQuery(String deviceId, int token) throws StorageGroupManagerException; + + /** + * Find sealed files, unsealed file and memtable data in SeqFiles and OverflowFiles that contain the + * given series. + * @param seriesExpression provides the path of the series. + * @param context provides shared modifications across a query. + * @return sealed files, unsealed file and memtable data in SeqFiles or OverflowFiles + * @throws StorageGroupManagerException + */ + QueryDataSource query(SingleSeriesExpression seriesExpression, QueryContext context) + throws StorageGroupManagerException; + + /** + * Append one specified tsfile to the storage group. <b>This method is only provided for + * transmission module</b> + * + * @param storageGroupName the seriesPath of storage group + * @param appendFile the appended tsfile information + */ + boolean appendFileToStorageGroup(String storageGroupName, TsFileResource appendFile, + String appendFilePath) throws StorageGroupManagerException; + + /** + * Get all overlapping tsfiles that conflict with the appendFile. 
+ * + * @param storageGroupName the seriesPath of storage group + * @param appendFile the appended tsfile information + */ + List<String> getOverlapFilesFromStorageGroup(String storageGroupName, TsFileResource appendFile, + String uuid) throws StorageGroupManagerException; + + /** + * merge all overflowed storage groups. + * + * @throws StorageGroupManagerException StorageGroupManagerException + */ + void mergeAll() throws StorageGroupManagerException; + + /** + * delete one storage group. + */ + void deleteOneStorageGroup(String processorName) throws StorageGroupManagerException; + + /** + * add time series. + */ + void addTimeSeries(Path path, TSDataType dataType, TSEncoding encoding, + CompressionType compressor, + Map<String, String> props) throws StorageGroupManagerException; + + /** + * Force to close the storage group processor. + */ + void closeStorageGroup(String processorName) throws StorageGroupManagerException; + + /** + * delete all storage groups. + */ + boolean deleteAll() throws StorageGroupManagerException; + + /** + * try to close all storage group processors. + */ + void closeAll() throws StorageGroupManagerException; + + /** + * force flush to control memory usage. + */ + void forceFlush(BasicMemController.UsageLevel level); +} diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/sgmanager/StorageEngineFactory.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/sgmanager/StorageEngineFactory.java new file mode 100644 index 0000000..85218a7 --- /dev/null +++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/sgmanager/StorageEngineFactory.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.engine.sgmanager; + +public class StorageEngineFactory { + + public static StorageEngine getCurrent() { + return StorageGroupManager.getInstance(); + } +} diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/sgmanager/StorageGroupManager.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/sgmanager/StorageGroupManager.java index d313c38..22d37b8 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/engine/sgmanager/StorageGroupManager.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/sgmanager/StorageGroupManager.java @@ -36,15 +36,11 @@ import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBConstant; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.conf.directories.Directories; -import org.apache.iotdb.db.engine.filenode.FileNodeProcessor; import org.apache.iotdb.db.engine.filenode.TsFileResource; import org.apache.iotdb.db.engine.memcontrol.BasicMemController; -import org.apache.iotdb.db.engine.pool.FlushManager; import org.apache.iotdb.db.engine.querycontext.QueryDataSource; -import org.apache.iotdb.db.exception.StorageGroupManagerException; -import org.apache.iotdb.db.exception.StorageGroupProcessorException; import org.apache.iotdb.db.exception.PathErrorException; -import org.apache.iotdb.db.exception.ProcessorException; +import org.apache.iotdb.db.exception.StorageGroupManagerException; import org.apache.iotdb.db.exception.TsFileProcessorException; import org.apache.iotdb.db.metadata.MManager; import 
org.apache.iotdb.db.monitor.IStatistic; @@ -71,7 +67,7 @@ import org.slf4j.LoggerFactory; * StorageGroupManager provides top-level interfaces to access IoTDB storage engine. It decides * which StorageGroup(s) to access in order to complete a query. */ -public class StorageGroupManager implements IStatistic, IService { +public class StorageGroupManager implements IStatistic, IService, StorageEngine { private static final Logger LOGGER = LoggerFactory.getLogger(StorageGroupManager.class); private static final IoTDBConfig TsFileDBConf = IoTDBDescriptor.getInstance().getConfig(); @@ -92,9 +88,10 @@ public class StorageGroupManager implements IStatistic, IService { private HashMap<String, AtomicLong> statParamsHashMap; - private StorageGroupManager() { + private StorageGroupManager() throws StorageGroupManagerException { processorMap = new ConcurrentHashMap<>(); initStat(); + recover(); } private void initStat() { @@ -112,14 +109,14 @@ public class StorageGroupManager implements IStatistic, IService { } public static StorageGroupManager getInstance() { - return StorageGroupManagerHolder.INSTANCE; + return StorageGroupManagerHolder.instance; } - private void updateStatHashMapWhenFail(TSRecord tsRecord) { + private void updateStatHashMapWhenFail(InsertPlan plan) { statParamsHashMap.get(StorageGroupManagerStatConstants.TOTAL_REQ_FAIL.name()) .incrementAndGet(); statParamsHashMap.get(StorageGroupManagerStatConstants.TOTAL_POINTS_FAIL.name()) - .addAndGet(tsRecord.dataPointList.size()); + .addAndGet(plan.getValues().length); } /** @@ -172,7 +169,8 @@ public class StorageGroupManager implements IStatistic, IService { /** * This function is just for unit test. 
*/ - public synchronized void resetStorageGroupManager() { + @Override + public synchronized void reset() { for (String key : statParamsHashMap.keySet()) { statParamsHashMap.put(key, new AtomicLong()); } @@ -232,7 +230,7 @@ public class StorageGroupManager implements IStatistic, IService { /** * recover the StorageGroupProcessors. */ - public void recover() throws StorageGroupManagerException { + private void recover() throws StorageGroupManagerException { List<String> storageGroupNames; try { storageGroupNames = MManager.getInstance().getAllStorageGroups(); @@ -254,13 +252,7 @@ public class StorageGroupManager implements IStatistic, IService { } } - /** - * insert TsRecord into storage group. - * - * @param plan an insert plan - * @param isMonitor if true, the insertion is done by StatMonitor and the statistic Info will not - * be recorded. if false, the statParamsHashMap will be updated. - */ + @Override public void insert(InsertPlan plan, boolean isMonitor) throws StorageGroupManagerException { long timestamp = plan.getTime(); @@ -277,6 +269,7 @@ public class StorageGroupManager implements IStatistic, IService { result.toString())); } } catch (TsFileProcessorException e) { + updateStatHashMapWhenFail(plan); throw new StorageGroupManagerException(String.format("Fail to write in SG %s", processor.getProcessorName()), e); } finally { @@ -313,23 +306,15 @@ public class StorageGroupManager implements IStatistic, IService { } - /** - * update data. - */ + @Override public void update(String deviceId, String measurementId, long startTime, long endTime, TSDataType type, String v) throws StorageGroupManagerException { throw new UnsupportedOperationException("Method unimplemented"); } - /** - * Delete data whose timestamp <= 'timestamp' of time series 'deviceId'.'measurementId'. 
- * @param deviceId - * @param measurementId - * @param timestamp - * @throws StorageGroupManagerException - */ - public void deleteProcessor(String deviceId, String measurementId, long timestamp) + @Override + public void deleteData(String deviceId, String measurementId, long timestamp) throws StorageGroupManagerException { StorageGroupProcessor processor = getProcessor(deviceId, true); @@ -392,9 +377,7 @@ public class StorageGroupManager implements IStatistic, IService { } } - /** - * Similar to delete(), but only deletes data in sequence files. Only used by WAL recovery. - */ + @Override public void deleteInSeqFile(String deviceId, String measurementId, long timestamp) throws StorageGroupManagerException { StorageGroupProcessor processor = getProcessor(deviceId, true); @@ -405,9 +388,7 @@ public class StorageGroupManager implements IStatistic, IService { } } - /** - * Similar to delete(), but only deletes data in Overflow. Only used by WAL recovery. - */ + @Override public void deleteInOverflow(String deviceId, String measurementId, long timestamp) throws StorageGroupManagerException { StorageGroupProcessor processor = getProcessor(deviceId, true); @@ -418,13 +399,7 @@ public class StorageGroupManager implements IStatistic, IService { } } - /** - * Get a StorageGroup-level token for this query so that the StorageGroupProcessor may know which - * queries are occupying resources. - * - * @param deviceId queried deviceId - * @return a query token for the device. - */ + @Override public int beginQuery(String deviceId) throws StorageGroupManagerException { StorageGroupProcessor processor = getProcessor(deviceId, true); try { @@ -438,10 +413,7 @@ public class StorageGroupManager implements IStatistic, IService { } } - /** - * Notify the storage group of 'deviceId' that query 'token' has ended and its resource can be - * released. 
- */ + @Override public void endQuery(String deviceId, int token) throws StorageGroupManagerException { StorageGroupProcessor processorrocessor = getProcessor(deviceId, true); @@ -456,14 +428,7 @@ public class StorageGroupManager implements IStatistic, IService { } } - /** - * Find sealed files, unsealed file and memtable data in SeqFiles and OverflowFiles that contains the - * given series. - * @param seriesExpression provides the path of the series. - * @param context provides shared modifications across a query. - * @return sealed files, unsealed file and memtable data in SeqFiles or OverflowFiles - * @throws StorageGroupManagerException - */ + @Override public QueryDataSource query(SingleSeriesExpression seriesExpression, QueryContext context) throws StorageGroupManagerException { String deviceId = seriesExpression.getSeriesPath().getDevice(); @@ -486,13 +451,7 @@ public class StorageGroupManager implements IStatistic, IService { } } - /** - * Append one specified tsfile to the storage group. <b>This method is only provided for - * transmission module</b> - * - * @param storageGroupName the seriesPath of storage group - * @param appendFile the appended tsfile information - */ + @Override public boolean appendFileToStorageGroup(String storageGroupName, TsFileResource appendFile, String appendFilePath) throws StorageGroupManagerException { StorageGroupProcessor processor = getProcessor(storageGroupName, true); @@ -516,13 +475,9 @@ public class StorageGroupManager implements IStatistic, IService { return true; } - /** - * get all overlap tsfiles which are conflict with the appendFile. 
- * - * @param storageGroupName the seriesPath of storage group - * @param appendFile the appended tsfile information - */ - public List<String> getOverlapFilesFromStorageGroup(String storageGroupName, TsFileResource appendFile, + @Override + public List<String> getOverlapFilesFromStorageGroup(String storageGroupName, + TsFileResource appendFile, String uuid) throws StorageGroupManagerException { StorageGroupProcessor processor = getProcessor(storageGroupName, true); List<String> overlapFiles; @@ -536,11 +491,8 @@ public class StorageGroupManager implements IStatistic, IService { return overlapFiles; } - /** - * merge all overflowed storage group. - * - * @throws StorageGroupManagerException StorageGroupManagerException - */ + + @Override public void mergeAll() throws StorageGroupManagerException { if (storageGroupManagerStatus != StorageGroupManagerStatus.NONE) { LOGGER.warn("Unable to merge all storage groups when the status is {}", @@ -649,9 +601,7 @@ public class StorageGroupManager implements IStatistic, IService { } } - /** - * delete one storage group. - */ + @Override public void deleteOneStorageGroup(String processorName) throws StorageGroupManagerException { if (storageGroupManagerStatus != StorageGroupManagerStatus.NONE) { return; @@ -750,9 +700,7 @@ public class StorageGroupManager implements IStatistic, IService { return res; } - /** - * add time series. - */ + @Override public void addTimeSeries(Path path, TSDataType dataType, TSEncoding encoding, CompressionType compressor, Map<String, String> props) throws StorageGroupManagerException { @@ -764,10 +712,7 @@ public class StorageGroupManager implements IStatistic, IService { } } - - /** - * Force to close the storage group processor. 
- */ + @Override public void closeStorageGroup(String processorName) throws StorageGroupManagerException { if (storageGroupManagerStatus != StorageGroupManagerStatus.NONE) { return; @@ -824,9 +769,7 @@ public class StorageGroupManager implements IStatistic, IService { } } - /** - * delete all storage groups. - */ + @Override public synchronized boolean deleteAll() throws StorageGroupManagerException { LOGGER.info("Start deleting all storage group"); if (storageGroupManagerStatus != StorageGroupManagerStatus.NONE) { @@ -849,9 +792,7 @@ public class StorageGroupManager implements IStatistic, IService { } } - /** - * Try to close All. - */ + @Override public void closeAll() throws StorageGroupManagerException { LOGGER.info("Start closing all storage group processor"); if (storageGroupManagerStatus != StorageGroupManagerStatus.NONE) { @@ -869,9 +810,7 @@ public class StorageGroupManager implements IStatistic, IService { } } - /** - * force flush to control memory usage. - */ + @Override public void forceFlush(BasicMemController.UsageLevel level) { switch (level) { // only select the most urgent (most active or biggest in size) @@ -981,7 +920,16 @@ public class StorageGroupManager implements IStatistic, IService { private StorageGroupManagerHolder() { } - private static final StorageGroupManager INSTANCE = new StorageGroupManager(); + private static StorageGroupManager instance; + + static { + try { + instance = new StorageGroupManager(); + } catch (StorageGroupManagerException e) { + LOGGER.error("Failed to initialize StorageGroupManager due to a FATAL error", e); + instance = null; + } + } } } \ No newline at end of file
