This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch tsbs in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 589f7eb6fbe5262c3bc380069e7456764982901c Author: JackieTien97 <[email protected]> AuthorDate: Sun Dec 18 09:48:24 2022 +0800 remove modification and async dispatch --- .../scheduler/FragmentInstanceDispatcherImpl.java | 31 ++++++++-------- .../iotdb/db/query/context/QueryContext.java | 41 +++++++++++----------- .../metadata/DiskAlignedChunkMetadataLoader.java | 28 +++++++-------- .../chunk/metadata/DiskChunkMetadataLoader.java | 30 ++++++++-------- 4 files changed, 63 insertions(+), 67 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java index 1ba07ccb79..711beb58e8 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java @@ -95,22 +95,21 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher { // unsafe for current FragmentInstance scheduler framework. We need to implement the // topological dispatch according to dependency relations between FragmentInstances private Future<FragInstanceDispatchResult> dispatchRead(List<FragmentInstance> instances) { - return executor.submit( - () -> { - for (FragmentInstance instance : instances) { - try (SetThreadName threadName = new SetThreadName(instance.getId().getFullId())) { - dispatchOneInstance(instance); - } catch (FragmentInstanceDispatchException e) { - return new FragInstanceDispatchResult(e.getFailureStatus()); - } catch (Throwable t) { - logger.warn("[DispatchFailed]", t); - return new FragInstanceDispatchResult( - RpcUtils.getStatus( - TSStatusCode.INTERNAL_SERVER_ERROR, "Unexpected errors: " + t.getMessage())); - } - } - return new FragInstanceDispatchResult(true); - }); + + for (FragmentInstance instance : instances) { + try (SetThreadName threadName = new SetThreadName(instance.getId().getFullId())) { + dispatchOneInstance(instance); + } catch (FragmentInstanceDispatchException e) { + return immediateFuture(new FragInstanceDispatchResult(e.getFailureStatus())); + } catch (Throwable t) { + logger.warn("[DispatchFailed]", t); + return immediateFuture( + new FragInstanceDispatchResult( + RpcUtils.getStatus( + TSStatusCode.INTERNAL_SERVER_ERROR, "Unexpected errors: " + t.getMessage()))); + } + } + return immediateFuture(new FragInstanceDispatchResult(true)); } private Future<FragInstanceDispatchResult> dispatchWriteSync(List<FragmentInstance> instances) { diff --git a/server/src/main/java/org/apache/iotdb/db/query/context/QueryContext.java b/server/src/main/java/org/apache/iotdb/db/query/context/QueryContext.java index 60cffd8226..33ebb5c7a2 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/context/QueryContext.java +++ b/server/src/main/java/org/apache/iotdb/db/query/context/QueryContext.java @@ -25,7 +25,6 @@ import org.apache.iotdb.commons.path.PatternTreeMap; import org.apache.iotdb.db.engine.modification.Deletion; import org.apache.iotdb.db.engine.modification.Modification; import org.apache.iotdb.db.engine.modification.ModificationFile; -import org.apache.iotdb.db.metadata.path.PatternTreeMapFactory; import org.apache.iotdb.db.metadata.path.PatternTreeMapFactory.ModsSerializer; import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata; @@ -94,26 +93,28 @@ public class QueryContext { * them from 'modFile' and put then into the cache. */ public List<Modification> getPathModifications(ModificationFile modFile, PartialPath path) { + return Collections.emptyList(); // if the mods file does not exist, do not add it to the cache - if (!modFile.exists()) { - return Collections.emptyList(); - } - Map<String, List<Modification>> fileModifications = - filePathModCache.computeIfAbsent(modFile.getFilePath(), k -> new ConcurrentHashMap<>()); - return fileModifications.computeIfAbsent( - path.getFullPath(), - k -> { - PatternTreeMap<Modification, ModsSerializer> allModifications = - fileModCache.get(modFile.getFilePath()); - if (allModifications == null) { - allModifications = PatternTreeMapFactory.getModsPatternTreeMap(); - for (Modification modification : modFile.getModifications()) { - allModifications.append(modification.getPath(), modification); - } - fileModCache.put(modFile.getFilePath(), allModifications); - } - return sortAndMerge(allModifications.getOverlapped(path)); - }); + // if (!modFile.exists()) { + // return Collections.emptyList(); + // } + // Map<String, List<Modification>> fileModifications = + // filePathModCache.computeIfAbsent(modFile.getFilePath(), k -> new + // ConcurrentHashMap<>()); + // return fileModifications.computeIfAbsent( + // path.getFullPath(), + // k -> { + // PatternTreeMap<Modification, ModsSerializer> allModifications = + // fileModCache.get(modFile.getFilePath()); + // if (allModifications == null) { + // allModifications = PatternTreeMapFactory.getModsPatternTreeMap(); + // for (Modification modification : modFile.getModifications()) { + // allModifications.append(modification.getPath(), modification); + // } + // fileModCache.put(modFile.getFilePath(), allModifications); + // } + // return sortAndMerge(allModifications.getOverlapped(path)); + // }); } private List<Modification> sortAndMerge(List<Modification> modifications) { diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/metadata/DiskAlignedChunkMetadataLoader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/metadata/DiskAlignedChunkMetadataLoader.java index 58d4a1c98a..0165630e1c 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/metadata/DiskAlignedChunkMetadataLoader.java +++ b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/metadata/DiskAlignedChunkMetadataLoader.java @@ -19,11 +19,9 @@ package org.apache.iotdb.db.query.reader.chunk.metadata; import org.apache.iotdb.commons.path.AlignedPath; -import org.apache.iotdb.db.engine.modification.Modification; import org.apache.iotdb.db.engine.storagegroup.TsFileResource; import org.apache.iotdb.db.query.context.QueryContext; import org.apache.iotdb.db.query.reader.chunk.DiskAlignedChunkLoader; -import org.apache.iotdb.db.utils.QueryUtils; import org.apache.iotdb.tsfile.file.metadata.AlignedChunkMetadata; import org.apache.iotdb.tsfile.file.metadata.AlignedTimeSeriesMetadata; import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata; @@ -61,19 +59,19 @@ public class DiskAlignedChunkMetadataLoader implements IChunkMetadataLoader { ((AlignedTimeSeriesMetadata) timeSeriesMetadata).getCopiedChunkMetadataList(); // get all sub sensors' modifications - List<List<Modification>> pathModifications = - context.getPathModifications(resource.getModFile(), seriesPath); - - if (context.isDebug()) { - DEBUG_LOGGER.info( - "Modifications size is {} for file Path: {} ", - pathModifications.size(), - resource.getTsFilePath()); - pathModifications.forEach(c -> DEBUG_LOGGER.info(c.toString())); - } - - // remove ChunkMetadata that have been deleted - QueryUtils.modifyAlignedChunkMetaData(alignedChunkMetadataList, pathModifications); + // List<List<Modification>> pathModifications = + // context.getPathModifications(resource.getModFile(), seriesPath); + // + // if (context.isDebug()) { + // DEBUG_LOGGER.info( + // "Modifications size is {} for file Path: {} ", + // pathModifications.size(), + // resource.getTsFilePath()); + // pathModifications.forEach(c -> DEBUG_LOGGER.info(c.toString())); + // } + // + // // remove ChunkMetadata that have been deleted + // QueryUtils.modifyAlignedChunkMetaData(alignedChunkMetadataList, pathModifications); if (context.isDebug()) { DEBUG_LOGGER.info("After modification Chunk meta data list is: "); diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/metadata/DiskChunkMetadataLoader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/metadata/DiskChunkMetadataLoader.java index 7b8856ec69..b913308e28 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/metadata/DiskChunkMetadataLoader.java +++ b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/metadata/DiskChunkMetadataLoader.java @@ -19,11 +19,9 @@ package org.apache.iotdb.db.query.reader.chunk.metadata; import org.apache.iotdb.commons.path.PartialPath; -import org.apache.iotdb.db.engine.modification.Modification; import org.apache.iotdb.db.engine.storagegroup.TsFileResource; import org.apache.iotdb.db.query.context.QueryContext; import org.apache.iotdb.db.query.reader.chunk.DiskChunkLoader; -import org.apache.iotdb.db.utils.QueryUtils; import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata; import org.apache.iotdb.tsfile.file.metadata.ITimeSeriesMetadata; import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata; @@ -59,20 +57,20 @@ public class DiskChunkMetadataLoader implements IChunkMetadataLoader { List<IChunkMetadata> chunkMetadataList = ((TimeseriesMetadata) timeSeriesMetadata).getCopiedChunkMetadataList(); - List<Modification> pathModifications = - context.getPathModifications(resource.getModFile(), seriesPath); - - if (context.isDebug()) { - DEBUG_LOGGER.info( - "Modifications size is {} for file Path: {} ", - pathModifications.size(), - resource.getTsFilePath()); - pathModifications.forEach(c -> DEBUG_LOGGER.info(c.toString())); - } - - if (!pathModifications.isEmpty()) { - QueryUtils.modifyChunkMetaData(chunkMetadataList, pathModifications); - } + // List<Modification> pathModifications = + // context.getPathModifications(resource.getModFile(), seriesPath); + // + // if (context.isDebug()) { + // DEBUG_LOGGER.info( + // "Modifications size is {} for file Path: {} ", + // pathModifications.size(), + // resource.getTsFilePath()); + // pathModifications.forEach(c -> DEBUG_LOGGER.info(c.toString())); + // } + // + // if (!pathModifications.isEmpty()) { + // QueryUtils.modifyChunkMetaData(chunkMetadataList, pathModifications); + // } if (context.isDebug()) { DEBUG_LOGGER.info("After modification Chunk meta data list is: ");
