This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch ty/sonar in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 63353d1fdffd69ff271a153638e20effe422c04e Author: JackieTien97 <[email protected]> AuthorDate: Mon Jun 19 17:32:05 2023 +0800 server/src/main/java/org/apache/iotdb/db/query done --- .../execution/operator/source/SeriesScanUtil.java | 9 +- .../iotdb/db/query/context/QueryContext.java | 58 +--- .../iotdb/db/query/control/FileReaderManager.java | 23 +- .../iotdb/db/query/control/QueryFileManager.java | 16 +- .../db/query/control/QueryResourceManager.java | 1 + .../iotdb/db/query/control/SessionManager.java | 34 +-- .../db/query/control/SessionManagerMBean.java | 3 + .../query/control/clientsession/ClientSession.java | 24 +- .../control/clientsession/IClientSession.java | 4 +- .../clientsession/InternalClientSession.java | 7 +- .../control/clientsession/MqttClientSession.java | 5 +- .../query/reader/chunk/DiskAlignedChunkLoader.java | 5 +- .../db/query/reader/chunk/DiskChunkLoader.java | 2 +- .../query/reader/chunk/MemAlignedChunkLoader.java | 3 +- .../query/reader/chunk/MemAlignedChunkReader.java | 3 +- .../query/reader/chunk/MemAlignedPageReader.java | 108 ++++--- .../db/query/reader/chunk/MemChunkLoader.java | 2 +- .../db/query/reader/chunk/MemChunkReader.java | 10 +- .../iotdb/db/query/reader/chunk/MemPageReader.java | 310 +++++++++++++-------- .../metadata/DiskAlignedChunkMetadataLoader.java | 7 +- .../chunk/metadata/DiskChunkMetadataLoader.java | 7 +- .../metadata/MemAlignedChunkMetadataLoader.java | 1 + .../chunk/metadata/MemChunkMetadataLoader.java | 1 + .../universal/AlignedDescPriorityMergeReader.java | 1 + .../universal/AlignedPriorityMergeReader.java | 15 +- .../reader/universal/DescPriorityMergeReader.java | 4 +- .../iotdb/db/query/reader/universal/Element.java | 1 + .../reader/universal/PriorityMergeReader.java | 28 +- .../tsfile/file/metadata/AlignedChunkMetadata.java | 11 +- .../iotdb/tsfile/file/metadata/IChunkMetadata.java | 2 +- .../iotdb/tsfile/read/reader/IPageReader.java | 3 +- .../tsfile/read/reader/page/AlignedPageReader.java | 5 +- .../iotdb/tsfile/read/reader/page/PageReader.java | 3 +- 33 files changed, 375 insertions(+), 341 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java index fd36d153706..27b111654ba 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java @@ -730,8 +730,7 @@ public class SeriesScanUtil { getPointReader( firstPageReader.getAllSatisfiedPageData(orderUtils.getAscending())), firstPageReader.version, - orderUtils.getOverlapCheckTime(firstPageReader.getStatistics()), - context); + orderUtils.getOverlapCheckTime(firstPageReader.getStatistics())); currentPageEndPointTime = updateEndPointTime(currentPageEndPointTime, firstPageReader); firstPageReader = null; @@ -755,8 +754,7 @@ public class SeriesScanUtil { mergeReader.addReader( getPointReader(pageReader.getAllSatisfiedPageData(orderUtils.getAscending())), pageReader.version, - orderUtils.getOverlapCheckTime(pageReader.getStatistics()), - context); + orderUtils.getOverlapCheckTime(pageReader.getStatistics())); currentPageEndPointTime = updateEndPointTime(currentPageEndPointTime, pageReader); } } @@ -954,8 +952,7 @@ public class SeriesScanUtil { mergeReader.addReader( getPointReader(pageReader.getAllSatisfiedPageData(orderUtils.getAscending())), pageReader.version, - orderUtils.getOverlapCheckTime(pageReader.getStatistics()), - context); + orderUtils.getOverlapCheckTime(pageReader.getStatistics())); } private TsBlock nextOverlappedPage() throws IOException { diff --git a/server/src/main/java/org/apache/iotdb/db/query/context/QueryContext.java b/server/src/main/java/org/apache/iotdb/db/query/context/QueryContext.java index 2a44762fddd..38a13bd5ccf 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/context/QueryContext.java +++ b/server/src/main/java/org/apache/iotdb/db/query/context/QueryContext.java @@ -59,16 +59,7 @@ public class QueryContext { private boolean debug; - /** - * To reduce the cost of memory, we only keep the a certain size statement. For statement whose - * length is over this, we keep its head and tail. - */ - private static final int MAX_STATEMENT_LENGTH = 64; - private long startTime; - - private String statement; - private long timeout; private volatile boolean isInterrupted = false; @@ -84,7 +75,6 @@ public class QueryContext { this.queryId = queryId; this.debug = debug; this.startTime = startTime; - this.statement = statement; this.timeout = timeout; } @@ -93,8 +83,6 @@ public class QueryContext { * them from 'modFile' and put then into the cache. */ public List<Modification> getPathModifications(ModificationFile modFile, PartialPath path) { - // TODO change a way to do the existing check to avoid this IO call each time. - // if the mods file does not exist, do not add it to the cache if (!modFile.exists()) { return Collections.emptyList(); @@ -117,6 +105,19 @@ public class QueryContext { }); } + /** + * Find the modifications of all aligned 'paths' in 'modFile'. If they are not in the cache, read + * them from 'modFile' and put then into the cache. + */ + public List<List<Modification>> getPathModifications(ModificationFile modFile, AlignedPath path) { + int n = path.getMeasurementList().size(); + List<List<Modification>> ans = new ArrayList<>(n); + for (int i = 0; i < n; i++) { + ans.add(getPathModifications(modFile, path.getPathWithMeasurement(i))); + } + return ans; + } + private List<Modification> sortAndMerge(List<Modification> modifications) { modifications.sort( (o1, o2) -> { @@ -152,19 +153,6 @@ public class QueryContext { return result; } - /** - * Find the modifications of all aligned 'paths' in 'modFile'. If they are not in the cache, read - * them from 'modFile' and put then into the cache. - */ - public List<List<Modification>> getPathModifications(ModificationFile modFile, AlignedPath path) { - int n = path.getMeasurementList().size(); - List<List<Modification>> ans = new ArrayList<>(n); - for (int i = 0; i < n; i++) { - ans.add(getPathModifications(modFile, path.getPathWithMeasurement(i))); - } - return ans; - } - public long getQueryId() { return queryId; } @@ -189,19 +177,11 @@ public class QueryContext { return startTime; } - public String getStatement() { - return statement; - } - public QueryContext setStartTime(long startTime) { this.startTime = startTime; return this; } - public void getStatement(String statement) { - this.statement = statement; - } - public long getTimeout() { return timeout; } @@ -211,18 +191,6 @@ public class QueryContext { return this; } - public QueryContext setStatement(String statement) { - if (statement.length() <= 64) { - this.statement = statement; - } else { - this.statement = - statement.substring(0, MAX_STATEMENT_LENGTH / 2) - + "..." - + statement.substring(statement.length() - MAX_STATEMENT_LENGTH / 2); - } - return this; - } - public void setInterrupted(boolean interrupted) { isInterrupted = interrupted; } diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/FileReaderManager.java b/server/src/main/java/org/apache/iotdb/db/query/control/FileReaderManager.java index a3e55893c9c..7f4154998c1 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/control/FileReaderManager.java +++ b/server/src/main/java/org/apache/iotdb/db/query/control/FileReaderManager.java @@ -16,6 +16,7 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.iotdb.db.query.control; import org.apache.iotdb.commons.utils.TestOnly; @@ -49,7 +50,7 @@ public class FileReaderManager { /** * When number of file streams reached MAX_CACHED_FILE_SIZE, then we will print a warning log each - * PRINT_INTERVAL + * PRINT_INTERVAL. */ private static final int PRINT_INTERVAL = 10000; @@ -174,10 +175,9 @@ public class FileReaderManager { if (unclosedReferenceMap.get(tsFile.getTsFilePath()).decrementAndGet() == 0) { closeUnUsedReaderAndRemoveRef(tsFile.getTsFilePath(), false); } - } else if (closedReferenceMap.containsKey(tsFile.getTsFilePath())) { - if (closedReferenceMap.get(tsFile.getTsFilePath()).decrementAndGet() == 0) { - closeUnUsedReaderAndRemoveRef(tsFile.getTsFilePath(), true); - } + } else if (closedReferenceMap.containsKey(tsFile.getTsFilePath()) + && (closedReferenceMap.get(tsFile.getTsFilePath()).decrementAndGet() == 0)) { + closeUnUsedReaderAndRemoveRef(tsFile.getTsFilePath(), true); } } tsFile.readUnlock(); @@ -212,6 +212,8 @@ public class FileReaderManager { /** * Only for <code>EnvironmentUtils.cleanEnv</code> method. To make sure that unit tests and * integration tests will not conflict with each other. + * + * @throws IOException if failed to close file handlers, IOException will be thrown */ public synchronized void closeAndRemoveAllOpenedReaders() throws IOException { Iterator<Map.Entry<String, TsFileSequenceReader>> iterator = @@ -243,17 +245,6 @@ public class FileReaderManager { || (!isClosed && unclosedFileReaderMap.containsKey(tsFile.getTsFilePath())); } - public synchronized void writeFileReferenceInfo() { - DEBUG_LOGGER.info("[closedReferenceMap]\n"); - for (Map.Entry<String, AtomicInteger> entry : closedReferenceMap.entrySet()) { - DEBUG_LOGGER.info(String.format("\t%s: %d%n", entry.getKey(), entry.getValue().get())); - } - DEBUG_LOGGER.info("[unclosedReferenceMap]\n"); - for (Map.Entry<String, AtomicInteger> entry : unclosedReferenceMap.entrySet()) { - DEBUG_LOGGER.info(String.format("\t%s: %d", entry.getKey(), entry.getValue().get())); - } - } - @TestOnly public Map<String, TsFileSequenceReader> getClosedFileReaderMap() { return closedFileReaderMap; diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/QueryFileManager.java b/server/src/main/java/org/apache/iotdb/db/query/control/QueryFileManager.java index 58af4134731..95a839f6b3d 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/control/QueryFileManager.java +++ b/server/src/main/java/org/apache/iotdb/db/query/control/QueryFileManager.java @@ -16,6 +16,7 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.iotdb.db.query.control; import org.apache.iotdb.db.engine.querycontext.QueryDataSource; @@ -30,19 +31,18 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -/** - * QueryFileManager records the paths of files that every query uses for QueryResourceManager. - * - * <p> - */ +/** QueryFileManager records the paths of files that every query uses for QueryResourceManager. */ public class QueryFileManager { private static final Logger DEBUG_LOGGER = LoggerFactory.getLogger("QUERY_DEBUG"); - /** Map<queryId, Map<filePath,filePath>> */ - private Map<Long, Map<TsFileResource, TsFileResource>> sealedFilePathsMap; + /** + * QueryId -> TsFileResource -> TsFileResource. Inner Map's key and value are actually the same + * object. + */ + private final Map<Long, Map<TsFileResource, TsFileResource>> sealedFilePathsMap; - private Map<Long, Map<TsFileResource, TsFileResource>> unsealedFilePathsMap; + private final Map<Long, Map<TsFileResource, TsFileResource>> unsealedFilePathsMap; QueryFileManager() { sealedFilePathsMap = new ConcurrentHashMap<>(); diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java b/server/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java index 2a83413ed10..aca3623e157 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java +++ b/server/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java @@ -16,6 +16,7 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.iotdb.db.query.control; import java.util.concurrent.atomic.AtomicLong; diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java b/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java index 005d6f401ae..1d66c0f2f0b 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java +++ b/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java @@ -43,7 +43,6 @@ import org.apache.iotdb.service.rpc.thrift.TSConnectionInfoResp; import org.apache.iotdb.service.rpc.thrift.TSProtocolVersion; import org.apache.commons.lang.StringUtils; -import org.apache.thrift.TException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -54,7 +53,7 @@ import java.util.Set; import java.util.TimeZone; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; -import java.util.function.Consumer; +import java.util.function.LongConsumer; import java.util.stream.Collectors; import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onNPEOrUnexpectedException; @@ -82,7 +81,7 @@ public class SessionManager implements SessionManagerMBean { public static final TSProtocolVersion CURRENT_RPC_VERSION = TSProtocolVersion.IOTDB_SERVICE_PROTOCOL_V3; - private static final boolean enableAuditLog = + private static final boolean ENABLE_AUDIT_LOG = IoTDBDescriptor.getInstance().getConfig().isEnableAuditLog(); protected SessionManager() { @@ -99,8 +98,7 @@ public class SessionManager implements SessionManagerMBean { String password, String zoneId, TSProtocolVersion tsProtocolVersion, - IoTDBConstant.ClientVersion clientVersion) - throws TException { + IoTDBConstant.ClientVersion clientVersion) { TSStatus loginStatus; BasicOpenSessionResp openSessionResp = new BasicOpenSessionResp(); @@ -126,7 +124,7 @@ public class SessionManager implements SessionManagerMBean { openSessionResp.getMessage(), username, session); - if (enableAuditLog) { + if (ENABLE_AUDIT_LOG) { AuditLogger.log( String.format( "%s: Login status: %s. User : %s, opens Session-%s", @@ -135,7 +133,7 @@ public class SessionManager implements SessionManagerMBean { } } } else { - if (enableAuditLog) { + if (ENABLE_AUDIT_LOG) { AuditLogger.log( String.format("User %s opens Session failed with an incorrect password", username), AUTHOR_STATEMENT); @@ -146,7 +144,7 @@ public class SessionManager implements SessionManagerMBean { return openSessionResp; } - public boolean closeSession(IClientSession session, Consumer<Long> releaseByQueryId) { + public boolean closeSession(IClientSession session, LongConsumer releaseByQueryId) { releaseSessionResource(session, releaseByQueryId); MetricService.getInstance() .remove( @@ -154,16 +152,10 @@ public class SessionManager implements SessionManagerMBean { Metric.SESSION_IDLE_TIME.toString(), Tag.NAME.toString(), String.valueOf(session.getId())); - // TODO we only need to do so when query is killed by time out - // // close the socket. - // // currently, we only focus on RPC service. - // // TODO do we need to consider MQTT ClientSession and Internal Client? - // if (session instanceof ClientSession) { - // ((ClientSession) session).shutdownStream(); - // } + // TODO we only need to do so when query is killed by time out close the socket. IClientSession session1 = currSession.get(); if (session1 != null && session != session1) { - if (enableAuditLog) { + if (ENABLE_AUDIT_LOG) { AuditLogger.log( String.format( "The client-%s is trying to close another session %s, pls check if it's a bug", @@ -172,14 +164,14 @@ public class SessionManager implements SessionManagerMBean { } return false; } else { - if (enableAuditLog) { + if (ENABLE_AUDIT_LOG) { AuditLogger.log(String.format("Session-%s is closing", session), AUTHOR_STATEMENT); } return true; } } - private void releaseSessionResource(IClientSession session, Consumer<Long> releaseQueryResource) { + private void releaseSessionResource(IClientSession session, LongConsumer releaseQueryResource) { Iterable<Long> statementIds = session.getStatementIds(); if (statementIds != null) { for (Long statementId : statementIds) { @@ -199,7 +191,7 @@ public class SessionManager implements SessionManagerMBean { long statementId, boolean haveStatementId, boolean haveSetQueryId, - Consumer<Long> releaseByQueryId) { + LongConsumer releaseByQueryId) { if (!checkLogin(session)) { return RpcUtils.getStatus( TSStatusCode.NOT_LOGIN, @@ -246,7 +238,7 @@ public class SessionManager implements SessionManagerMBean { } public void closeStatement( - IClientSession session, long statementId, Consumer<Long> releaseByQueryId) { + IClientSession session, long statementId, LongConsumer releaseByQueryId) { Set<Long> queryIdSet = session.removeStatementId(statementId); if (queryIdSet != null) { for (Long queryId : queryIdSet) { @@ -349,7 +341,7 @@ public class SessionManager implements SessionManagerMBean { } public void closeDataset( - IClientSession session, Long statementId, Long queryId, Consumer<Long> releaseByQueryId) { + IClientSession session, Long statementId, Long queryId, LongConsumer releaseByQueryId) { releaseByQueryId.accept(queryId); session.removeQueryId(statementId, queryId); } diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/SessionManagerMBean.java b/server/src/main/java/org/apache/iotdb/db/query/control/SessionManagerMBean.java index 74adfc27523..ddc939ee0f1 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/control/SessionManagerMBean.java +++ b/server/src/main/java/org/apache/iotdb/db/query/control/SessionManagerMBean.java @@ -16,6 +16,7 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.iotdb.db.query.control; import java.util.Set; @@ -23,6 +24,8 @@ import java.util.Set; public interface SessionManagerMBean { /** + * Get all currently connected clients + * * @return client's reqId-username:ip:port <br> * reqId may be deprecated in 0.14 */ diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/clientsession/ClientSession.java b/server/src/main/java/org/apache/iotdb/db/query/control/clientsession/ClientSession.java index 49bb8e790fc..3984d84bcf8 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/control/clientsession/ClientSession.java +++ b/server/src/main/java/org/apache/iotdb/db/query/control/clientsession/ClientSession.java @@ -16,11 +16,11 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.iotdb.db.query.control.clientsession; import org.apache.iotdb.service.rpc.thrift.TSConnectionType; -import java.io.IOException; import java.net.Socket; import java.util.Map; import java.util.Set; @@ -89,26 +89,4 @@ public class ClientSession extends IClientSession { queryIds.remove(queryId); } } - - /** - * shutdownStream will close the socket stream directly, which cause a TTransportException with - * type = TTransportException.END_OF_FILE. In this case, thrift client thread will be finished - * asap. - */ - public void shutdownStream() { - if (!clientSocket.isInputShutdown()) { - try { - clientSocket.shutdownInput(); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - if (!clientSocket.isOutputShutdown()) { - try { - clientSocket.shutdownOutput(); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - } } diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/clientsession/IClientSession.java b/server/src/main/java/org/apache/iotdb/db/query/control/clientsession/IClientSession.java index 2e5b6466347..0644e6f4568 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/control/clientsession/IClientSession.java +++ b/server/src/main/java/org/apache/iotdb/db/query/control/clientsession/IClientSession.java @@ -16,6 +16,7 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.iotdb.db.query.control.clientsession; import org.apache.iotdb.commons.conf.IoTDBConstant.ClientVersion; @@ -34,7 +35,6 @@ public abstract class IClientSession { private ZoneId zoneId; - // TODO: why some Statement Plans use timeZone while others use ZoneId? private TimeZone timeZone; private String username; @@ -49,7 +49,7 @@ public abstract class IClientSession { abstract TSConnectionType getConnectionType(); - /** ip:port for thrift-based service and client id for mqtt-based service */ + /** ip:port for thrift-based service and client id for mqtt-based service. */ abstract String getConnectionId(); public void setClientVersion(ClientVersion clientVersion) { diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/clientsession/InternalClientSession.java b/server/src/main/java/org/apache/iotdb/db/query/control/clientsession/InternalClientSession.java index b532a0c95f3..1292e91933e 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/control/clientsession/InternalClientSession.java +++ b/server/src/main/java/org/apache/iotdb/db/query/control/clientsession/InternalClientSession.java @@ -16,6 +16,7 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.iotdb.db.query.control.clientsession; import org.apache.iotdb.service.rpc.thrift.TSConnectionType; @@ -25,7 +26,7 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArraySet; -/** For Internal usage, like CQ and Select Into */ +/** For Internal usage, like CQ and Select Into. */ public class InternalClientSession extends IClientSession { // For CQ, it will be cq_id @@ -38,10 +39,6 @@ public class InternalClientSession extends IClientSession { this.clientID = clientID; } - public String getClientID() { - return clientID; - } - @Override public String getClientAddress() { return clientID; diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/clientsession/MqttClientSession.java b/server/src/main/java/org/apache/iotdb/db/query/control/clientsession/MqttClientSession.java index 97fe4ec13e7..8daf49e48b9 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/control/clientsession/MqttClientSession.java +++ b/server/src/main/java/org/apache/iotdb/db/query/control/clientsession/MqttClientSession.java @@ -16,6 +16,7 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.iotdb.db.query.control.clientsession; import org.apache.iotdb.service.rpc.thrift.TSConnectionType; @@ -31,10 +32,6 @@ public class MqttClientSession extends IClientSession { this.clientID = clientID; } - public String getClientID() { - return clientID; - } - @Override public String getClientAddress() { return clientID; diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/DiskAlignedChunkLoader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/DiskAlignedChunkLoader.java index 6029bbc1d95..47ed02bc729 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/DiskAlignedChunkLoader.java +++ b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/DiskAlignedChunkLoader.java @@ -16,6 +16,7 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.iotdb.db.query.reader.chunk; import org.apache.iotdb.db.engine.cache.ChunkCache; @@ -52,7 +53,9 @@ public class DiskAlignedChunkLoader implements IChunkLoader { } @Override - public void close() throws IOException {} + public void close() throws IOException { + // there is no resource need to be closed + } @Override public IChunkReader getChunkReader(IChunkMetadata chunkMetaData, Filter timeFilter) diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/DiskChunkLoader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/DiskChunkLoader.java index 71c26523ba3..5707184321b 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/DiskChunkLoader.java +++ b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/DiskChunkLoader.java @@ -34,7 +34,7 @@ import java.io.IOException; import static org.apache.iotdb.db.mpp.metric.SeriesScanCostMetricSet.CONSTRUCT_CHUNK_READER_NONALIGNED_DISK; import static org.apache.iotdb.db.mpp.metric.SeriesScanCostMetricSet.INIT_CHUNK_READER_NONALIGNED_DISK; -/** To read one chunk from disk, and only used in iotdb server module */ +/** To read one chunk from disk, and only used in iotdb server module. */ public class DiskChunkLoader implements IChunkLoader { private final boolean debug; diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemAlignedChunkLoader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemAlignedChunkLoader.java index fb9d30e0924..0c1199f6783 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemAlignedChunkLoader.java +++ b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemAlignedChunkLoader.java @@ -16,6 +16,7 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.iotdb.db.query.reader.chunk; import org.apache.iotdb.db.engine.querycontext.AlignedReadOnlyMemChunk; @@ -30,7 +31,7 @@ import org.apache.iotdb.tsfile.read.reader.IChunkReader; import static org.apache.iotdb.db.mpp.metric.SeriesScanCostMetricSet.CONSTRUCT_CHUNK_READER_ALIGNED_MEM; import static org.apache.iotdb.db.mpp.metric.SeriesScanCostMetricSet.INIT_CHUNK_READER_ALIGNED_MEM; -/** To read one aligned chunk from memory, and only used in iotdb server module */ +/** To read one aligned chunk from memory, and only used in iotdb server module. */ public class MemAlignedChunkLoader implements IChunkLoader { private final AlignedReadOnlyMemChunk chunk; diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemAlignedChunkReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemAlignedChunkReader.java index 65ce131a969..c3867a36067 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemAlignedChunkReader.java +++ b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemAlignedChunkReader.java @@ -16,6 +16,7 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.iotdb.db.query.reader.chunk; import org.apache.iotdb.db.engine.querycontext.AlignedReadOnlyMemChunk; @@ -29,7 +30,7 @@ import java.io.IOException; import java.util.Collections; import java.util.List; -/** To read aligned chunk data in memory */ +/** To read aligned chunk data in memory. */ public class MemAlignedChunkReader implements IChunkReader { private List<IPageReader> pageReaderList; diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemAlignedPageReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemAlignedPageReader.java index 6dfa1b2ff20..0a5d2b32097 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemAlignedPageReader.java +++ b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemAlignedPageReader.java @@ -16,6 +16,7 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.iotdb.db.query.reader.chunk; import org.apache.iotdb.tsfile.file.metadata.AlignedChunkMetadata; @@ -35,6 +36,7 @@ import org.apache.iotdb.tsfile.read.reader.series.PaginationController; import org.apache.iotdb.tsfile.utils.TsPrimitiveType; import java.io.IOException; +import java.io.Serializable; import java.util.List; import static org.apache.iotdb.tsfile.read.reader.series.PaginationController.UNLIMITED_PAGINATION_CONTROLLER; @@ -65,35 +67,39 @@ public class MemAlignedPageReader implements IPageReader, IAlignedPageReader { BatchData batchData = BatchDataFactory.createBatchData(TSDataType.VECTOR, ascending, false); for (int row = 0; row < tsBlock.getPositionCount(); row++) { // save the first not null value of each row - Object firstNotNullObject = null; - for (int column = 0; column < tsBlock.getValueColumnCount(); column++) { - if (!tsBlock.getColumn(column).isNull(row)) { - firstNotNullObject = tsBlock.getColumn(column).getObject(row); - break; - } - } + Object firstNotNullObject = getFirstNotNullObject(row); // if all the sub sensors' value are null in current time // or current row is not satisfied with the filter, just discard it - // TODO fix value filter firstNotNullObject, currently, if it's a value filter, it will only - // accept AlignedPath with only one sub sensor - if (firstNotNullObject != null - && (valueFilter == null - || valueFilter.satisfy(tsBlock.getTimeByIndex(row), firstNotNullObject))) { - TsPrimitiveType[] values = new TsPrimitiveType[tsBlock.getValueColumnCount()]; - for (int column = 0; column < tsBlock.getValueColumnCount(); column++) { - if (tsBlock.getColumn(column) != null && !tsBlock.getColumn(column).isNull(row)) { - values[column] = tsBlock.getColumn(column).getTsPrimitiveType(row); - } - } - batchData.putVector(tsBlock.getTimeByIndex(row), values); + if (firstNotNullObject != null) { + doFilter(firstNotNullObject, row, batchData); } } return batchData.flip(); } + private Object getFirstNotNullObject(int rowIndex) { + for (int column = 0; column < tsBlock.getValueColumnCount(); column++) { + if (!tsBlock.getColumn(column).isNull(rowIndex)) { + return tsBlock.getColumn(column).getObject(rowIndex); + } + } + return null; + } + + private void doFilter(Object row, int rowIndex, BatchData batchData) { + if (valueFilter == null || valueFilter.satisfy(tsBlock.getTimeByIndex(rowIndex), row)) { + TsPrimitiveType[] values = new TsPrimitiveType[tsBlock.getValueColumnCount()]; + for (int column = 0; column < tsBlock.getValueColumnCount(); column++) { + if (tsBlock.getColumn(column) != null && !tsBlock.getColumn(column).isNull(rowIndex)) { + values[column] = tsBlock.getColumn(column).getTsPrimitiveType(rowIndex); + } + } + batchData.putVector(tsBlock.getTimeByIndex(rowIndex), values); + } + } + private boolean pageSatisfy() { if (valueFilter != null) { - // TODO accept valueStatisticsList to filter return valueFilter.satisfy(getStatistics()); } else { // For aligned series, When we only query some measurements under an aligned device, if the @@ -102,7 +108,7 @@ public class MemAlignedPageReader implements IPageReader, IAlignedPageReader { // NOTE: if we change the query semantic in the future for aligned series, we need to remove // this check here. long rowCount = getTimeStatistics().getCount(); - for (Statistics statistics : getValueStatisticsList()) { + for (Statistics<Serializable> statistics : getValueStatisticsList()) { if (statistics == null || statistics.hasNullValue(rowCount)) { return true; } @@ -124,8 +130,21 @@ public class MemAlignedPageReader implements IPageReader, IAlignedPageReader { return builder.build(); } - boolean[] satisfyInfo = new boolean[tsBlock.getPositionCount()]; + boolean[] satisfyInfo = buildSatisfyInfoArray(); + + boolean[] hasValue = buildHasValueArray(); + + // build time column + int readEndIndex = buildTimeColumn(satisfyInfo, hasValue); + + // build value column + buildValueColumns(satisfyInfo, hasValue, readEndIndex); + + return builder.build(); + } + private boolean[] buildSatisfyInfoArray() { + boolean[] satisfyInfo = new boolean[tsBlock.getPositionCount()]; for (int row = 0; row < tsBlock.getPositionCount(); row++) { long time = tsBlock.getTimeByIndex(row); // ValueFilter in MPP will only contain time filter now. @@ -133,7 +152,10 @@ public class MemAlignedPageReader implements IPageReader, IAlignedPageReader { satisfyInfo[row] = true; } } + return satisfyInfo; + } + private boolean[] buildHasValueArray() { boolean[] hasValue = new boolean[tsBlock.getPositionCount()]; // other value column for (int column = 0; column < tsBlock.getValueColumnCount(); column++) { @@ -142,29 +164,41 @@ public class MemAlignedPageReader implements IPageReader, IAlignedPageReader { hasValue[row] = hasValue[row] || !valueColumn.isNull(row); } } + return hasValue; + } - // build time column + private int buildTimeColumn(boolean[] satisfyInfo, boolean[] hasValue) { int readEndIndex = tsBlock.getPositionCount(); - for (int row = 0; row < tsBlock.getPositionCount(); row++) { - if (!satisfyInfo[row] || !hasValue[row]) { - continue; - } - if (paginationController.hasCurOffset()) { - paginationController.consumeOffset(); - satisfyInfo[row] = false; + for (int row = 0; row < readEndIndex; row++) { + + if (needSkipCurrentRow(satisfyInfo, hasValue, row)) { continue; } + if (paginationController.hasCurLimit()) { builder.getTimeColumnBuilder().writeLong(tsBlock.getTimeByIndex(row)); builder.declarePosition(); paginationController.consumeLimit(); } else { readEndIndex = row; - break; } } + return readEndIndex; + } - // build value column + private boolean needSkipCurrentRow(boolean[] satisfyInfo, boolean[] hasValue, int rowIndex) { + if (!satisfyInfo[rowIndex] || !hasValue[rowIndex]) { + return true; + } + if (paginationController.hasCurOffset()) { + paginationController.consumeOffset(); + satisfyInfo[rowIndex] = false; + return true; + } + return false; + } + + private void buildValueColumns(boolean[] satisfyInfo, boolean[] hasValue, int readEndIndex) { for (int column = 0; column < tsBlock.getValueColumnCount(); column++) { Column valueColumn = tsBlock.getColumn(column); ColumnBuilder valueBuilder = builder.getColumnBuilder(column); @@ -178,26 +212,24 @@ public class MemAlignedPageReader implements IPageReader, IAlignedPageReader { } } } - - return builder.build(); } @Override - public Statistics getStatistics() { + public Statistics<Serializable> getStatistics() { return chunkMetadata.getStatistics(); } @Override - public Statistics getStatistics(int index) { + public Statistics<Serializable> getStatistics(int index) { return chunkMetadata.getStatistics(index); } @Override - public Statistics getTimeStatistics() { + public Statistics<Serializable> getTimeStatistics() { return chunkMetadata.getTimeStatistics(); } - private List<Statistics> getValueStatisticsList() { + private List<Statistics<Serializable>> getValueStatisticsList() { return chunkMetadata.getValueStatisticsList(); } diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemChunkLoader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemChunkLoader.java index 1f90f228726..8b8a51e4e28 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemChunkLoader.java +++ b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemChunkLoader.java @@ -31,7 +31,7 @@ import org.apache.iotdb.tsfile.read.reader.IChunkReader; import static org.apache.iotdb.db.mpp.metric.SeriesScanCostMetricSet.CONSTRUCT_CHUNK_READER_NONALIGNED_MEM; import static org.apache.iotdb.db.mpp.metric.SeriesScanCostMetricSet.INIT_CHUNK_READER_NONALIGNED_MEM; -/** To read one chunk from memory, and only used in iotdb server module */ +/** To read one chunk from memory, and only used in iotdb server module. */ public class MemChunkLoader implements IChunkLoader { private final ReadOnlyMemChunk chunk; diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemChunkReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemChunkReader.java index 2d8394f4af7..c0d0ab3375b 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemChunkReader.java +++ b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemChunkReader.java @@ -16,6 +16,7 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.iotdb.db.query.reader.chunk; import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk; @@ -30,14 +31,15 @@ import java.io.IOException; import java.util.Collections; import java.util.List; -/** To read chunk data in memory */ +/** To read chunk data in memory. */ public class MemChunkReader implements IChunkReader, IPointReader { - private IPointReader timeValuePairIterator; - private Filter filter; + private final IPointReader timeValuePairIterator; + private final Filter filter; + private final List<IPageReader> pageReaderList; + private boolean hasCachedTimeValuePair; private TimeValuePair cachedTimeValuePair; - private List<IPageReader> pageReaderList; public MemChunkReader(ReadOnlyMemChunk readableChunk, Filter filter) { timeValuePairIterator = readableChunk.getPointReader(); diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemPageReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemPageReader.java index 4e4c4916ed6..5439ca5e72b 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemPageReader.java +++ b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemPageReader.java @@ -16,6 +16,7 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.iotdb.db.query.reader.chunk; import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException; @@ -35,6 +36,7 @@ import org.apache.iotdb.tsfile.read.reader.series.PaginationController; import org.apache.iotdb.tsfile.utils.Binary; import java.io.IOException; +import java.io.Serializable; import java.util.Collections; import java.util.List; @@ -94,7 +96,7 @@ public class MemPageReader implements IPageReader { } private boolean pageSatisfy() { - Statistics statistics = getStatistics(); + Statistics<Serializable> statistics = getStatistics(); if (valueFilter == null || valueFilter.allSatisfy(statistics)) { long rowCount = statistics.getCount(); if (paginationController.hasCurOffset(rowCount)) { @@ -117,130 +119,22 @@ public class MemPageReader implements IPageReader { if (pageSatisfy()) { switch (dataType) { case BOOLEAN: - for (int i = 0; i < tsBlock.getPositionCount(); i++) { - long time = tsBlock.getTimeColumn().getLong(i); - boolean value = tsBlock.getColumn(0).getBoolean(i); - if (valueFilter != null && !valueFilter.satisfy(time, value)) { - continue; - } - if (paginationController.hasCurOffset()) { - paginationController.consumeOffset(); - continue; - } - if (paginationController.hasCurLimit()) { - timeBuilder.writeLong(time); - valueBuilder.writeBoolean(value); - builder.declarePosition(); - paginationController.consumeLimit(); - } else { - break; - } - } + doWithBoolean(builder, timeBuilder, valueBuilder); break; case INT32: - for (int i = 0; i < tsBlock.getPositionCount(); i++) { - long time = tsBlock.getTimeColumn().getLong(i); - int value = tsBlock.getColumn(0).getInt(i); - if (valueFilter != null && !valueFilter.satisfy(time, value)) { - continue; - } - if (paginationController.hasCurOffset()) { - paginationController.consumeOffset(); - continue; - } - if (paginationController.hasCurLimit()) { - timeBuilder.writeLong(time); - valueBuilder.writeInt(value); - builder.declarePosition(); - paginationController.consumeLimit(); - } else { - break; - } - } + doWithInt32(builder, timeBuilder, valueBuilder); break; case INT64: - for (int i = 0; i < tsBlock.getPositionCount(); i++) { - long time = tsBlock.getTimeColumn().getLong(i); - long value = tsBlock.getColumn(0).getLong(i); - if (valueFilter != null && !valueFilter.satisfy(time, value)) { - continue; - } - if (paginationController.hasCurOffset()) { - paginationController.consumeOffset(); - continue; - } - if (paginationController.hasCurLimit()) { - timeBuilder.writeLong(time); - valueBuilder.writeLong(value); - builder.declarePosition(); - paginationController.consumeLimit(); - } else { - break; - } - } + doWithInt64(builder, timeBuilder, valueBuilder); break; case FLOAT: - for (int i = 0; i < tsBlock.getPositionCount(); i++) { - long time = tsBlock.getTimeColumn().getLong(i); - float value = tsBlock.getColumn(0).getFloat(i); - if (valueFilter != null && !valueFilter.satisfy(time, value)) { - continue; - } - if (paginationController.hasCurOffset()) { - paginationController.consumeOffset(); - continue; - } - if (paginationController.hasCurLimit()) { - timeBuilder.writeLong(time); - valueBuilder.writeFloat(value); - builder.declarePosition(); - paginationController.consumeLimit(); - } else { - break; - } - } + doWithFloat(builder, timeBuilder, valueBuilder); break; case DOUBLE: - for (int i = 0; i < tsBlock.getPositionCount(); i++) { - long time = tsBlock.getTimeColumn().getLong(i); - double value = tsBlock.getColumn(0).getDouble(i); - if (valueFilter != null && !valueFilter.satisfy(time, value)) { - continue; - } - if (paginationController.hasCurOffset()) { - paginationController.consumeOffset(); - continue; - } - if (paginationController.hasCurLimit()) { - timeBuilder.writeLong(time); - valueBuilder.writeDouble(value); - builder.declarePosition(); - paginationController.consumeLimit(); - } else { - break; - } - } + doWithDouble(builder, timeBuilder, valueBuilder); break; case TEXT: - for (int i = 0; i < tsBlock.getPositionCount(); i++) { - long time = tsBlock.getTimeColumn().getLong(i); - Binary value = tsBlock.getColumn(0).getBinary(i); - if (valueFilter != null && !valueFilter.satisfy(time, value)) { - continue; - } - if (paginationController.hasCurOffset()) { - paginationController.consumeOffset(); - continue; - } - if (paginationController.hasCurLimit()) { - timeBuilder.writeLong(time); - valueBuilder.writeBinary(value); - builder.declarePosition(); - paginationController.consumeLimit(); - } else { - break; - } - } + doWithText(builder, timeBuilder, valueBuilder); break; default: throw new UnSupportedDataTypeException(String.valueOf(dataType)); @@ -249,8 +143,188 @@ public class MemPageReader implements IPageReader { return builder.build(); } + private void doWithBoolean( + TsBlockBuilder builder, TimeColumnBuilder timeBuilder, ColumnBuilder valueBuilder) { + int endIndex = tsBlock.getPositionCount(); + for (int i = 0; i < endIndex; i++) { + long time = tsBlock.getTimeColumn().getLong(i); + boolean value = tsBlock.getColumn(0).getBoolean(i); + if (needCurrentBooleanRow(time, value)) { + if (paginationController.hasCurLimit()) { + timeBuilder.writeLong(time); + valueBuilder.writeBoolean(value); + builder.declarePosition(); + paginationController.consumeLimit(); + } else { + endIndex = i; + } + } + } + } + + private boolean needCurrentBooleanRow(long time, boolean value) { + if (valueFilter != null && !valueFilter.satisfy(time, value)) { + return false; + } + if (paginationController.hasCurOffset()) { + paginationController.consumeOffset(); + return false; + } + return true; + } + + private void doWithInt32( + TsBlockBuilder builder, TimeColumnBuilder timeBuilder, ColumnBuilder valueBuilder) { + int endIndex = tsBlock.getPositionCount(); + for (int i = 0; i < endIndex; i++) { + long time = tsBlock.getTimeColumn().getLong(i); + int value = tsBlock.getColumn(0).getInt(i); + if (needCurrentInt32Row(time, value)) { + if (paginationController.hasCurLimit()) { + timeBuilder.writeLong(time); + valueBuilder.writeInt(value); + builder.declarePosition(); + paginationController.consumeLimit(); + } else { + endIndex = i; + } + } + } + } + + private boolean needCurrentInt32Row(long time, int value) { + if (valueFilter != null && !valueFilter.satisfy(time, value)) { + return false; + } + if (paginationController.hasCurOffset()) { + paginationController.consumeOffset(); + return false; + } + return true; + } + + private void doWithInt64( + TsBlockBuilder builder, TimeColumnBuilder timeBuilder, ColumnBuilder valueBuilder) { + int endIndex = tsBlock.getPositionCount(); + for (int i = 0; i < endIndex; i++) { + long time = tsBlock.getTimeColumn().getLong(i); + long value = tsBlock.getColumn(0).getLong(i); + if (needCurrentInt64Row(time, value)) { + if (paginationController.hasCurLimit()) { + timeBuilder.writeLong(time); + valueBuilder.writeLong(value); + builder.declarePosition(); + paginationController.consumeLimit(); + } else { + endIndex = i; + } + } + } + } + + private boolean needCurrentInt64Row(long time, long value) { + if (valueFilter != null && !valueFilter.satisfy(time, value)) { + return false; + } + if (paginationController.hasCurOffset()) { + paginationController.consumeOffset(); + return false; + } + return true; + } + + private void doWithFloat( + TsBlockBuilder builder, TimeColumnBuilder timeBuilder, ColumnBuilder valueBuilder) { + int endIndex = tsBlock.getPositionCount(); + for (int i = 0; i < endIndex; i++) { + long time = tsBlock.getTimeColumn().getLong(i); + float value = tsBlock.getColumn(0).getFloat(i); + if (needCurrentFloatRow(time, value)) { + if (paginationController.hasCurLimit()) { + timeBuilder.writeLong(time); + valueBuilder.writeFloat(value); + builder.declarePosition(); + paginationController.consumeLimit(); + } else { + endIndex = i; + } + } + } + } + + private boolean needCurrentFloatRow(long time, float value) { + if (valueFilter != null && !valueFilter.satisfy(time, value)) { + return false; + } + if (paginationController.hasCurOffset()) { + paginationController.consumeOffset(); + return false; + } + return true; + } + + private void doWithDouble( + TsBlockBuilder builder, TimeColumnBuilder timeBuilder, ColumnBuilder valueBuilder) { + int endIndex = tsBlock.getPositionCount(); + for (int i = 0; i < endIndex; i++) { + long time = tsBlock.getTimeColumn().getLong(i); + double value = tsBlock.getColumn(0).getDouble(i); + if (needCurrentDoubleRow(time, value)) { + if (paginationController.hasCurLimit()) { + timeBuilder.writeLong(time); + valueBuilder.writeDouble(value); + builder.declarePosition(); + paginationController.consumeLimit(); + } else { + endIndex = i; + } + } + } + } + + private boolean needCurrentDoubleRow(long time, double value) { + if (valueFilter != null && !valueFilter.satisfy(time, value)) { + return false; + } + if (paginationController.hasCurOffset()) { + paginationController.consumeOffset(); + return false; + } + return true; + } + + private void doWithText( + TsBlockBuilder builder, TimeColumnBuilder timeBuilder, ColumnBuilder valueBuilder) { + int endIndex = tsBlock.getPositionCount(); + for (int i = 0; i < endIndex; i++) { + long time = tsBlock.getTimeColumn().getLong(i); + Binary value = tsBlock.getColumn(0).getBinary(i); + if (needCurrentTextRow(time, value)) { + if (paginationController.hasCurLimit()) { + timeBuilder.writeLong(time); + valueBuilder.writeBinary(value); + builder.declarePosition(); + paginationController.consumeLimit(); + } else { + endIndex = i; + } + } + } + } + + private boolean needCurrentTextRow(long time, Binary value) { + if (valueFilter != null && !valueFilter.satisfy(time, value)) { + return false; + } + if (paginationController.hasCurOffset()) { + paginationController.consumeOffset(); + return false; + } + return true; + } + @Override - public Statistics getStatistics() { + public Statistics<Serializable> getStatistics() { return chunkMetadata.getStatistics(); } @@ -274,5 +348,7 @@ public class MemPageReader implements IPageReader { } @Override - public void initTsBlockBuilder(List<TSDataType> dataTypes) {} + public void initTsBlockBuilder(List<TSDataType> dataTypes) { + // non-aligned page reader don't need to init TsBlockBuilder at the very beginning + } } diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/metadata/DiskAlignedChunkMetadataLoader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/metadata/DiskAlignedChunkMetadataLoader.java index 165b41d374f..7c7f1d6160c 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/metadata/DiskAlignedChunkMetadataLoader.java +++ b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/metadata/DiskAlignedChunkMetadataLoader.java @@ -16,6 +16,7 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.iotdb.db.query.reader.chunk.metadata; import org.apache.iotdb.commons.path.AlignedPath; @@ -64,12 +65,12 @@ public class DiskAlignedChunkMetadataLoader implements IChunkMetadataLoader { @Override public List<IChunkMetadata> loadChunkMetadataList(ITimeSeriesMetadata timeSeriesMetadata) { - long t1 = System.nanoTime(); + final long t1 = System.nanoTime(); try { List<AlignedChunkMetadata> alignedChunkMetadataList = ((AlignedTimeSeriesMetadata) timeSeriesMetadata).getCopiedChunkMetadataList(); - long t2 = System.nanoTime(); + final long t2 = System.nanoTime(); // get all sub sensors' modifications List<List<Modification>> pathModifications = context.getPathModifications(resource.getModFile(), seriesPath); @@ -93,7 +94,7 @@ public class DiskAlignedChunkMetadataLoader implements IChunkMetadataLoader { CHUNK_METADATA_MODIFICATION_ALIGNED_DISK, System.nanoTime() - t2); // remove not satisfied ChunkMetaData - long t3 = System.nanoTime(); + final long t3 = System.nanoTime(); alignedChunkMetadataList.removeIf( alignedChunkMetaData -> (filter != null diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/metadata/DiskChunkMetadataLoader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/metadata/DiskChunkMetadataLoader.java index 407f12d705f..d60e9079ac6 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/metadata/DiskChunkMetadataLoader.java +++ b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/metadata/DiskChunkMetadataLoader.java @@ -16,6 +16,7 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.iotdb.db.query.reader.chunk.metadata; import org.apache.iotdb.commons.path.PartialPath; @@ -62,12 +63,12 @@ public class DiskChunkMetadataLoader implements IChunkMetadataLoader { @Override public List<IChunkMetadata> loadChunkMetadataList(ITimeSeriesMetadata timeSeriesMetadata) { - long t1 = System.nanoTime(); + final long t1 = System.nanoTime(); try { List<IChunkMetadata> chunkMetadataList = ((TimeseriesMetadata) timeSeriesMetadata).getCopiedChunkMetadataList(); - long t2 = System.nanoTime(); + final long t2 = System.nanoTime(); List<Modification> pathModifications = context.getPathModifications(resource.getModFile(), seriesPath); @@ -103,7 +104,7 @@ public class DiskChunkMetadataLoader implements IChunkMetadataLoader { }); // remove not satisfied ChunkMetaData - long t3 = System.nanoTime(); + final long t3 = System.nanoTime(); chunkMetadataList.removeIf( chunkMetaData -> (filter != null diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/metadata/MemAlignedChunkMetadataLoader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/metadata/MemAlignedChunkMetadataLoader.java index c3efc79fd2c..4804fa2ae7d 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/metadata/MemAlignedChunkMetadataLoader.java +++ b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/metadata/MemAlignedChunkMetadataLoader.java @@ -16,6 +16,7 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.iotdb.db.query.reader.chunk.metadata; import org.apache.iotdb.commons.path.PartialPath; diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/metadata/MemChunkMetadataLoader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/metadata/MemChunkMetadataLoader.java index 57dd83dd04e..92c8d14a3ba 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/metadata/MemChunkMetadataLoader.java +++ b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/metadata/MemChunkMetadataLoader.java @@ -16,6 +16,7 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.iotdb.db.query.reader.chunk.metadata; import org.apache.iotdb.commons.path.PartialPath; diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/universal/AlignedDescPriorityMergeReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/universal/AlignedDescPriorityMergeReader.java index 822a3749a79..4b2498dc294 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/reader/universal/AlignedDescPriorityMergeReader.java +++ b/server/src/main/java/org/apache/iotdb/db/query/reader/universal/AlignedDescPriorityMergeReader.java @@ -16,6 +16,7 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.iotdb.db.query.reader.universal; import org.apache.iotdb.tsfile.read.TimeValuePair; diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/universal/AlignedPriorityMergeReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/universal/AlignedPriorityMergeReader.java index 9d5875abdee..6bc49219554 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/reader/universal/AlignedPriorityMergeReader.java +++ b/server/src/main/java/org/apache/iotdb/db/query/reader/universal/AlignedPriorityMergeReader.java @@ -16,6 +16,7 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.iotdb.db.query.reader.universal; import org.apache.iotdb.tsfile.read.TimeValuePair; @@ -38,13 +39,13 @@ public class AlignedPriorityMergeReader extends PriorityMergeReader { fillNullValueInAligned(v, c); } - static void fillNullValueInAligned(TimeValuePair v, TimeValuePair c) { - TsPrimitiveType[] vArray = v.getValue().getVector(); - TsPrimitiveType[] cArray = c.getValue().getVector(); - for (int i = 0; i < vArray.length; i++) { - if ((vArray[i] == null || vArray[i].getValue() == null) - && (cArray[i] != null && cArray[i].getValue() != null)) { - vArray[i] = cArray[i]; + static void fillNullValueInAligned(TimeValuePair to, TimeValuePair from) { + TsPrimitiveType[] toArray = to.getValue().getVector(); + TsPrimitiveType[] fromArray = from.getValue().getVector(); + for (int i = 0, size = toArray.length; i < size; i++) { + if ((toArray[i] == null || toArray[i].getValue() == null) + && (fromArray[i] != null && fromArray[i].getValue() != null)) { + toArray[i] = fromArray[i]; } } } diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/universal/DescPriorityMergeReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/universal/DescPriorityMergeReader.java index 08ae2c21c9c..3b49db7415c 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/reader/universal/DescPriorityMergeReader.java +++ b/server/src/main/java/org/apache/iotdb/db/query/reader/universal/DescPriorityMergeReader.java @@ -19,7 +19,6 @@ package org.apache.iotdb.db.query.reader.universal; -import org.apache.iotdb.db.query.context.QueryContext; import org.apache.iotdb.tsfile.read.reader.IPointReader; import java.io.IOException; @@ -38,8 +37,7 @@ public class DescPriorityMergeReader extends PriorityMergeReader { } @Override - public void addReader( - IPointReader reader, MergeReaderPriority priority, long endTime, QueryContext context) + public void addReader(IPointReader reader, MergeReaderPriority priority, long endTime) throws IOException { if (reader.hasNextTimeValuePair()) { heap.add(new Element(reader, reader.nextTimeValuePair(), priority)); diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/universal/Element.java b/server/src/main/java/org/apache/iotdb/db/query/reader/universal/Element.java index c5b3ad5caac..52c4b10ff4e 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/reader/universal/Element.java +++ b/server/src/main/java/org/apache/iotdb/db/query/reader/universal/Element.java @@ -16,6 +16,7 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.iotdb.db.query.reader.universal; import org.apache.iotdb.tsfile.read.TimeValuePair; diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/universal/PriorityMergeReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/universal/PriorityMergeReader.java index 471090bb627..cbfeca45447 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/reader/universal/PriorityMergeReader.java +++ b/server/src/main/java/org/apache/iotdb/db/query/reader/universal/PriorityMergeReader.java @@ -16,14 +16,13 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.iotdb.db.query.reader.universal; -import org.apache.iotdb.db.query.context.QueryContext; import org.apache.iotdb.tsfile.read.TimeValuePair; import org.apache.iotdb.tsfile.read.reader.IPointReader; import java.io.IOException; -import java.util.List; import java.util.Objects; import java.util.PriorityQueue; @@ -47,21 +46,6 @@ public class PriorityMergeReader implements IPointReader { }); } - // only used in external sort, need to refactor later - public PriorityMergeReader(List<IPointReader> prioritySeriesReaders, int startPriority) - throws IOException { - heap = - new PriorityQueue<>( - (o1, o2) -> { - int timeCompare = - Long.compare(o1.timeValuePair.getTimestamp(), o2.timeValuePair.getTimestamp()); - return timeCompare != 0 ? timeCompare : o2.priority.compareTo(o1.priority); - }); - for (IPointReader reader : prioritySeriesReaders) { - addReader(reader, startPriority++); - } - } - public void addReader(IPointReader reader, long priority) throws IOException { if (reader.hasNextTimeValuePair()) { heap.add( @@ -71,8 +55,7 @@ public class PriorityMergeReader implements IPointReader { } } - public void addReader( - IPointReader reader, MergeReaderPriority priority, long endTime, QueryContext context) + public void addReader(IPointReader reader, MergeReaderPriority priority, long endTime) throws IOException { if (reader.hasNextTimeValuePair()) { heap.add(new Element(reader, reader.nextTimeValuePair(), priority)); @@ -116,7 +99,10 @@ public class PriorityMergeReader implements IPointReader { /** * remove all the TimeValuePair that shares the same timestamp if it's an aligned path we may need * to use those records that share the same timestamp to fill the null sub sensor value in current - * TimeValuePair + * TimeValuePair. + * + * @throws IOException while reading next value and close the Element while there is no value, + * there may throw IOException */ protected void updateHeap(TimeValuePair ret, TimeValuePair topNext) throws IOException { long topTime = ret.getTimestamp(); @@ -145,7 +131,7 @@ public class PriorityMergeReader implements IPointReader { } } - /** this method only take effect for aligned time series, so the override version */ + /** this method only take effect for aligned time series, so the override version. */ protected void fillNullValue(TimeValuePair v, TimeValuePair c) { // do nothing for non-aligned time series } diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/AlignedChunkMetadata.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/AlignedChunkMetadata.java index fd7d9d02f15..1e427a8da83 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/AlignedChunkMetadata.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/AlignedChunkMetadata.java @@ -24,6 +24,7 @@ import org.apache.iotdb.tsfile.read.common.TimeRange; import org.apache.iotdb.tsfile.read.controller.IChunkLoader; import java.io.OutputStream; +import java.io.Serializable; import java.util.ArrayList; import java.util.List; @@ -44,26 +45,26 @@ public class AlignedChunkMetadata implements IChunkMetadata { } @Override - public Statistics getStatistics() { + public Statistics<Serializable> getStatistics() { return valueChunkMetadataList.size() == 1 && valueChunkMetadataList.get(0) != null ? valueChunkMetadataList.get(0).getStatistics() : timeChunkMetadata.getStatistics(); } - public Statistics getStatistics(int index) { + public Statistics<Serializable> getStatistics(int index) { IChunkMetadata v = valueChunkMetadataList.get(index); return v == null ? null : v.getStatistics(); } - public List<Statistics> getValueStatisticsList() { - List<Statistics> valueStatisticsList = new ArrayList<>(); + public List<Statistics<Serializable>> getValueStatisticsList() { + List<Statistics<Serializable>> valueStatisticsList = new ArrayList<>(); for (IChunkMetadata v : valueChunkMetadataList) { valueStatisticsList.add(v == null ? null : v.getStatistics()); } return valueStatisticsList; } - public Statistics getTimeStatistics() { + public Statistics<Serializable> getTimeStatistics() { return timeChunkMetadata.getStatistics(); } diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/IChunkMetadata.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/IChunkMetadata.java index e2e654eeb10..dca2e25cfa2 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/IChunkMetadata.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/IChunkMetadata.java @@ -31,7 +31,7 @@ import java.util.List; public interface IChunkMetadata { - Statistics<? extends Serializable> getStatistics(); + Statistics<Serializable> getStatistics(); boolean isModified(); diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/IPageReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/IPageReader.java index 27827f5c32b..b0a3600c8c6 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/IPageReader.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/IPageReader.java @@ -26,6 +26,7 @@ import org.apache.iotdb.tsfile.read.filter.basic.Filter; import org.apache.iotdb.tsfile.read.reader.series.PaginationController; import java.io.IOException; +import java.io.Serializable; import java.util.List; public interface IPageReader { @@ -38,7 +39,7 @@ public interface IPageReader { TsBlock getAllSatisfiedData() throws IOException; - Statistics getStatistics(); + Statistics<Serializable> getStatistics(); void setFilter(Filter filter); diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/AlignedPageReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/AlignedPageReader.java index 17e33cf8486..5f4566c8cb9 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/AlignedPageReader.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/AlignedPageReader.java @@ -35,6 +35,7 @@ import org.apache.iotdb.tsfile.read.reader.series.PaginationController; import org.apache.iotdb.tsfile.utils.TsPrimitiveType; import java.io.IOException; +import java.io.Serializable; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; @@ -262,8 +263,8 @@ public class AlignedPageReader implements IPageReader, IAlignedPageReader { return timePageReader.getStatistics(); } - private List<Statistics> getValueStatisticsList() { - List<Statistics> valueStatisticsList = new ArrayList<>(); + private List<Statistics<Serializable>> getValueStatisticsList() { + List<Statistics<Serializable>> valueStatisticsList = new ArrayList<>(); for (ValuePageReader v : valuePageReaderList) { valueStatisticsList.add(v == null ? null : v.getStatistics()); } diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java index 60b74a18703..1abbf873624 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java @@ -38,6 +38,7 @@ import org.apache.iotdb.tsfile.utils.Binary; import org.apache.iotdb.tsfile.utils.ReadWriteForEncodingUtils; import java.io.IOException; +import java.io.Serializable; import java.nio.ByteBuffer; import java.util.Collections; import java.util.List; @@ -318,7 +319,7 @@ public class PageReader implements IPageReader { } @Override - public Statistics getStatistics() { + public Statistics<Serializable> getStatistics() { return pageHeader.getStatistics(); }
