This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch TyBugFix in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 7fa60f1a9896c3cee4771369013612dbac0711b5 Author: JackieTien97 <[email protected]> AuthorDate: Thu Mar 12 11:34:38 2020 +0800 Release query resource while exception happened in query producer thread --- .../iotdb/db/engine/cache/TsFileMetaDataCache.java | 9 ++-- .../engine/storagegroup/StorageGroupProcessor.java | 51 +++++++++------------- .../dataset/RawQueryDataSetWithoutValueFilter.java | 46 +++++++++++++------ .../db/query/executor/RawDataQueryExecutor.java | 10 +++-- .../org/apache/iotdb/db/service/TSServiceImpl.java | 15 ++++++- .../tsfile/read/common/ExceptionBatchData.java | 16 +++++++ 6 files changed, 96 insertions(+), 51 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/engine/cache/TsFileMetaDataCache.java b/server/src/main/java/org/apache/iotdb/db/engine/cache/TsFileMetaDataCache.java index fd93b32..d5301e7 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/cache/TsFileMetaDataCache.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/cache/TsFileMetaDataCache.java @@ -18,8 +18,6 @@ */ package org.apache.iotdb.db.engine.cache; -import java.io.IOException; -import java.util.concurrent.atomic.AtomicLong; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.engine.storagegroup.TsFileResource; @@ -27,6 +25,9 @@ import org.apache.iotdb.tsfile.file.metadata.TsFileMetaData; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.util.concurrent.atomic.AtomicLong; + /** * This class is used to cache <code>TsFileMetaData</code> of tsfile in IoTDB. */ @@ -103,10 +104,10 @@ public class TsFileMetaDataCache { } synchronized (internPath) { synchronized (cache) { - if (cache.containsKey(path)) { + if (cache.containsKey(tsFileResource)) { cacheHitNum.incrementAndGet(); printCacheLog(true); - return cache.get(path); + return cache.get(tsFileResource); } } printCacheLog(false); diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java index 16bfb4a..72e05b1 100755 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java @@ -18,28 +18,6 @@ */ package org.apache.iotdb.db.engine.storagegroup; -import static org.apache.iotdb.db.engine.merge.task.MergeTask.MERGE_SUFFIX; -import static org.apache.iotdb.db.engine.storagegroup.TsFileResource.TEMP_SUFFIX; -import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFFIX; - -import java.io.File; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.Date; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; -import java.util.TreeMap; -import java.util.TreeSet; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.commons.io.FileUtils; import org.apache.iotdb.db.conf.IoTDBConstant; import org.apache.iotdb.db.conf.IoTDBDescriptor; @@ -62,14 +40,10 @@ import org.apache.iotdb.db.engine.querycontext.QueryDataSource; import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk; import org.apache.iotdb.db.engine.version.SimpleFileVersionController; import org.apache.iotdb.db.engine.version.VersionController; -import org.apache.iotdb.db.exception.DiskSpaceInsufficientException; -import org.apache.iotdb.db.exception.MergeException; -import org.apache.iotdb.db.exception.TsFileProcessorException; +import org.apache.iotdb.db.exception.*; import org.apache.iotdb.db.exception.metadata.MetadataException; import org.apache.iotdb.db.exception.query.OutOfTTLException; import org.apache.iotdb.db.exception.query.QueryProcessException; -import org.apache.iotdb.db.exception.StorageGroupProcessorException; -import org.apache.iotdb.db.exception.WriteProcessException; import org.apache.iotdb.db.metadata.MManager; import org.apache.iotdb.db.metadata.mnode.LeafMNode; import org.apache.iotdb.db.metadata.mnode.MNode; @@ -99,6 +73,17 @@ import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.File; +import java.io.IOException; +import java.util.*; +import java.util.Map.Entry; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import static org.apache.iotdb.db.engine.merge.task.MergeTask.MERGE_SUFFIX; +import static org.apache.iotdb.db.engine.storagegroup.TsFileResource.TEMP_SUFFIX; +import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFFIX; + /** * For sequence data, a StorageGroupProcessor has some TsFileProcessors, in which there is only one @@ -1690,9 +1675,15 @@ public class StorageGroupProcessor { if (resource.getHistoricalVersions().containsAll(seqFile.getHistoricalVersions()) && !resource.getHistoricalVersions().equals(seqFile.getHistoricalVersions()) && seqFile.getWriteQueryLock().writeLock().tryLock()) { - iterator.remove(); - seqFile.remove(); - seqFile.getWriteQueryLock().writeLock().unlock(); + try { + iterator.remove(); + seqFile.remove(); + } catch (Exception e) { + logger.error("Something gets wrong while removing FullyOverlapFiles ", e); + throw e; + } finally { + seqFile.getWriteQueryLock().writeLock().unlock(); + } } } } diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java index 1d5734a..9273c87 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java +++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java @@ -95,14 +95,24 @@ public class RawQueryDataSetWithoutValueFilter extends QueryDataSet { Thread.currentThread().interrupt(); reader.setHasRemaining(false); } catch (IOException e) { - LOGGER.error(String - .format("Something gets wrong while reading from the series reader %s: ", pathName), e); - reader.setHasRemaining(false); + putExceptionBatchData(e, String.format("Something gets wrong while reading from the series reader %s: ", pathName)); } catch (Exception e) { - LOGGER.error("Something gets wrong: ", e); + putExceptionBatchData(e, "Something gets wrong: "); + } + } + + private void putExceptionBatchData(Exception e, String logMessage) { + try { + LOGGER.error(logMessage, e); reader.setHasRemaining(false); + blockingQueue.put(new ExceptionBatchData(e)); + } catch (InterruptedException ex) { + ex.printStackTrace(); + LOGGER.error("Interrupted while putting ExceptionBatchData into the blocking queue: ", ex); + Thread.currentThread().interrupt(); } } + } private List<ManagedSeriesReader> seriesReaderList; @@ -141,7 +151,7 @@ public class RawQueryDataSetWithoutValueFilter extends QueryDataSet { * @param readers readers in List(IPointReader) structure */ public RawQueryDataSetWithoutValueFilter(List<Path> paths, List<TSDataType> dataTypes, - List<ManagedSeriesReader> readers) throws InterruptedException { + List<ManagedSeriesReader> readers) throws IOException, InterruptedException { super(paths, dataTypes); this.seriesReaderList = readers; blockingQueueArray = new BlockingQueue[readers.size()]; @@ -153,7 +163,7 @@ public class RawQueryDataSetWithoutValueFilter extends QueryDataSet { init(); } - private void init() throws InterruptedException { + private void init() throws IOException, InterruptedException { timeHeap = new TreeSet<>(); for (int i = 0; i < seriesReaderList.size(); i++) { ManagedSeriesReader reader = seriesReaderList.get(i); @@ -177,8 +187,7 @@ public class RawQueryDataSetWithoutValueFilter extends QueryDataSet { * for RPC in RawData query between client and server fill time buffer, value buffers and bitmap * buffers */ - public TSQueryDataSet fillBuffer(int fetchSize, WatermarkEncoder encoder) - throws IOException, InterruptedException { + public TSQueryDataSet fillBuffer(int fetchSize, WatermarkEncoder encoder) throws IOException, InterruptedException { int seriesNum = seriesReaderList.size(); TSQueryDataSet tsQueryDataSet = new TSQueryDataSet(); @@ -339,14 +348,22 @@ public class RawQueryDataSetWithoutValueFilter extends QueryDataSet { return tsQueryDataSet; } - private void fillCache(int seriesIndex) throws InterruptedException { + private void fillCache(int seriesIndex) throws IOException, InterruptedException { BatchData batchData = blockingQueueArray[seriesIndex].take(); // no more batch data in this time series queue if (batchData instanceof SignalBatchData) { noMoreDataInQueueArray[seriesIndex] = true; - } - // there are more batch data in this time series queue - else { + } else if (batchData instanceof ExceptionBatchData) { + // exception happened in producer thread + ExceptionBatchData exceptionBatchData = (ExceptionBatchData) batchData; + LOGGER.error("exception happened in producer thread", exceptionBatchData.exception); + if (exceptionBatchData.exception instanceof IOException) { + throw (IOException)exceptionBatchData.exception; + } else if (exceptionBatchData.exception instanceof RuntimeException) { + throw (RuntimeException)exceptionBatchData.exception; + } + + } else { // there are more batch data in this time series queue cachedBatchDataArray[seriesIndex] = batchData; synchronized (seriesReaderList.get(seriesIndex)) { @@ -387,7 +404,7 @@ public class RawQueryDataSetWithoutValueFilter extends QueryDataSet { * for spark/hadoop/hive integration and test */ @Override - protected RowRecord nextWithoutConstraint() { + protected RowRecord nextWithoutConstraint() throws IOException { int seriesNum = seriesReaderList.size(); long minTime = timeHeap.pollFirst(); @@ -414,6 +431,9 @@ public class RawQueryDataSetWithoutValueFilter extends QueryDataSet { } catch (InterruptedException e) { LOGGER.error("Interrupted while taking from the blocking queue: ", e); Thread.currentThread().interrupt(); + } catch (IOException e) { + LOGGER.error("Got IOException", e); + throw e; } } diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/RawDataQueryExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/RawDataQueryExecutor.java index 4f9ea11..4eb2cbd 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/executor/RawDataQueryExecutor.java +++ b/server/src/main/java/org/apache/iotdb/db/query/executor/RawDataQueryExecutor.java @@ -18,8 +18,6 @@ */ package org.apache.iotdb.db.query.executor; -import java.util.ArrayList; -import java.util.List; import org.apache.iotdb.db.engine.querycontext.QueryDataSource; import org.apache.iotdb.db.exception.StorageEngineException; import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan; @@ -41,6 +39,10 @@ import org.apache.iotdb.tsfile.read.filter.basic.Filter; import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet; import org.apache.iotdb.tsfile.read.query.timegenerator.TimeGenerator; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + /** * IoTDB query executor. */ @@ -60,7 +62,7 @@ public class RawDataQueryExecutor { * without filter or with global time filter. */ public QueryDataSet executeWithoutValueFilter(QueryContext context) - throws StorageEngineException { + throws StorageEngineException { List<ManagedSeriesReader> readersOfSelectedSeries = initManagedSeriesReader(context); try { @@ -69,6 +71,8 @@ public class RawDataQueryExecutor { } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new StorageEngineException(e.getMessage()); + } catch (IOException e) { + throw new StorageEngineException(e.getMessage()); } } diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java index 3ad965c..130dd6d 100644 --- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java +++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java @@ -592,6 +592,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { private TSExecuteStatementResp internalExecuteQueryStatement( long statementId, PhysicalPlan plan, int fetchSize, String username) { long t1 = System.currentTimeMillis(); + long queryId = -1; try { TSExecuteStatementResp resp = getQueryResp(plan, username); // column headers @@ -611,7 +612,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { } // else default ignoreTimeStamp is false resp.setOperationType(plan.getOperatorType().toString()); // generate the queryId for the operation - long queryId = generateQueryId(true); + queryId = generateQueryId(true); // put it into the corresponding Set statementId2QueryId.computeIfAbsent(statementId, k -> new HashSet<>()).add(queryId); @@ -629,6 +630,13 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { return resp; } catch (Exception e) { logger.error("{}: Internal server error: ", IoTDBConstant.GLOBAL_DB_NAME, e); + if (queryId != -1) { + try { + releaseQueryResource(queryId); + } catch (StorageEngineException ex) { + logger.error("Error happened while releasing query resource: ", ex); + } + } return RpcUtils.getTSExecuteStatementResp(TSStatusCode.INTERNAL_SERVER_ERROR, e.getMessage()); } finally { Measurement.INSTANCE.addOperationLatency(Operation.EXECUTE_QUERY, t1); @@ -861,6 +869,11 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { } } catch (Exception e) { logger.error("{}: Internal server error: ", IoTDBConstant.GLOBAL_DB_NAME, e); + try { + releaseQueryResource(req.queryId); + } catch (StorageEngineException ex) { + logger.error("Error happened while releasing query resource: ", ex); + } return RpcUtils.getTSFetchResultsResp(TSStatusCode.INTERNAL_SERVER_ERROR, e.getMessage()); } } diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/ExceptionBatchData.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/ExceptionBatchData.java new file mode 100644 index 0000000..0d7eb57 --- /dev/null +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/ExceptionBatchData.java @@ -0,0 +1,16 @@ +package org.apache.iotdb.tsfile.read.common; + + +public class ExceptionBatchData extends BatchData { + + public Exception exception; + + public ExceptionBatchData(Exception exception) { + this.exception = exception; + } + + @Override + public boolean hasCurrent() { + throw new UnsupportedOperationException("hasCurrent is not supported for ExceptionBatchData"); + } +}
