This is an automated email from the ASF dual-hosted git repository.

xiangweiwei pushed a commit to branch IOTDB-5547
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/IOTDB-5547 by this push:
     new 1c6d823d57 Fix concurrent modify bug
1c6d823d57 is described below

commit 1c6d823d57ef7bdf7e4db1367bf2691eff400ccf
Author: Alima777 <[email protected]>
AuthorDate: Fri Feb 17 12:12:56 2023 +0800

    Fix concurrent modify bug
---
 .../iotdb/db/mpp/execution/driver/DataDriver.java  | 101 ++-------------------
 .../db/mpp/execution/driver/DataDriverContext.java |   3 +-
 .../fragment/FragmentInstanceContext.java          |  84 ++++++++++++++++-
 .../db/mpp/plan/planner/LocalExecutionPlanner.java |   3 +-
 4 files changed, 95 insertions(+), 96 deletions(-)

diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DataDriver.java 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DataDriver.java
index 2153af9e74..3a4e1fdcf2 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DataDriver.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DataDriver.java
@@ -19,20 +19,14 @@
 package org.apache.iotdb.db.mpp.execution.driver;
 
 import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
-import org.apache.iotdb.db.engine.storagegroup.IDataRegionForQuery;
-import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.mpp.execution.operator.Operator;
 import org.apache.iotdb.db.mpp.execution.operator.source.DataSourceOperator;
-import org.apache.iotdb.db.query.control.FileReaderManager;
 
 import com.google.common.util.concurrent.SettableFuture;
-
 import javax.annotation.concurrent.NotThreadSafe;
 
-import java.util.HashSet;
-import java.util.Iterator;
 import java.util.List;
-import java.util.Set;
 
 import static 
org.apache.iotdb.db.mpp.metric.QueryExecutionMetricSet.QUERY_RESOURCE_INIT;
 
@@ -45,15 +39,8 @@ public class DataDriver extends Driver {
 
   private boolean init;
 
-  /** closed tsfile used in this fragment instance */
-  private Set<TsFileResource> closedFilePaths;
-  /** unClosed tsfile used in this fragment instance */
-  private Set<TsFileResource> unClosedFilePaths;
-
   public DataDriver(Operator root, DriverContext driverContext) {
     super(root, driverContext);
-    this.closedFilePaths = new HashSet<>();
-    this.unClosedFilePaths = new HashSet<>();
   }
 
   @Override
@@ -72,27 +59,11 @@ public class DataDriver extends Driver {
     return true;
   }
 
-  /**
-   * All file paths used by this fragment instance must be cleared and thus 
the usage reference must
-   * be decreased.
-   */
-  @Override
-  protected void releaseResource() {
-    for (TsFileResource tsFile : closedFilePaths) {
-      FileReaderManager.getInstance().decreaseFileReaderReference(tsFile, 
true);
-    }
-    closedFilePaths = null;
-    for (TsFileResource tsFile : unClosedFilePaths) {
-      FileReaderManager.getInstance().decreaseFileReaderReference(tsFile, 
false);
-    }
-    unClosedFilePaths = null;
-  }
-
   /**
    * init seq file list and unseq file list in QueryDataSource and set it into 
each SourceNode TODO
    * we should change all the blocked lock operation into tryLock
    */
-  private void initialize() {
+  private void initialize() throws QueryProcessException {
     long startTime = System.nanoTime();
     try {
       List<DataSourceOperator> sourceOperators =
@@ -117,70 +88,16 @@ public class DataDriver extends Driver {
     }
   }
 
-  /**
-   * The method is called in mergeLock() when executing query. This method 
will get all the
-   * QueryDataSource needed for this query
-   */
-  private QueryDataSource initQueryDataSource() {
-    DataDriverContext context = (DataDriverContext) driverContext;
-    IDataRegionForQuery dataRegion = context.getDataRegion();
-    dataRegion.readLock();
-    try {
-
-      QueryDataSource dataSource = ((DataDriverContext) 
driverContext).getSharedQueryDataSource();
-
-      // used files should be added before mergeLock is unlocked, or they may 
be deleted by
-      // running merge
-      if (dataSource != null) {
-        addUsedFilesForQuery(dataSource);
-      }
-
-      return dataSource;
-    } finally {
-      dataRegion.readUnlock();
-    }
-  }
-
-  /** Add the unique file paths to closeddFilePathsMap and 
unClosedFilePathsMap. */
-  private void addUsedFilesForQuery(QueryDataSource dataSource) {
-
-    // sequence data
-    addUsedFilesForQuery(dataSource.getSeqResources());
-
-    // unsequence data
-    addUsedFilesForQuery(dataSource.getUnseqResources());
-  }
-
-  private void addUsedFilesForQuery(List<TsFileResource> resources) {
-    Iterator<TsFileResource> iterator = resources.iterator();
-    while (iterator.hasNext()) {
-      TsFileResource tsFileResource = iterator.next();
-      boolean isClosed = tsFileResource.isClosed();
-      addFilePathToMap(tsFileResource, isClosed);
-
-      // this file may be deleted just before we lock it
-      if (tsFileResource.isDeleted()) {
-        Set<TsFileResource> pathSet = isClosed ? closedFilePaths : 
unClosedFilePaths;
-        // This resource may be removed by other threads of this query.
-        if (pathSet.remove(tsFileResource)) {
-          
FileReaderManager.getInstance().decreaseFileReaderReference(tsFileResource, 
isClosed);
-        }
-        iterator.remove();
-      }
-    }
+  @Override
+  protected void releaseResource() {
+    // do nothing
   }
 
   /**
-   * Increase the usage reference of filePath of job id. Before the invoking 
of this method, <code>
-   * this.setqueryIdForCurrentRequestThread</code> has been invoked, so <code>
-   * sealedFilePathsMap.get(queryId)</code> or 
<code>unsealedFilePathsMap.get(queryId)</code> must
-   * not return null.
+   * The method is called in mergeLock() when executing query. This method 
will get all the
+   * QueryDataSource needed for this query
    */
-  private void addFilePathToMap(TsFileResource tsFile, boolean isClosed) {
-    Set<TsFileResource> pathSet = isClosed ? closedFilePaths : 
unClosedFilePaths;
-    if (!pathSet.contains(tsFile)) {
-      pathSet.add(tsFile);
-      FileReaderManager.getInstance().increaseFileReaderReference(tsFile, 
isClosed);
-    }
+  private QueryDataSource initQueryDataSource() throws QueryProcessException {
+    return ((DataDriverContext) driverContext).getSharedQueryDataSource();
   }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DataDriverContext.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DataDriverContext.java
index 8dca4dcfe5..0b0ad4332f 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DataDriverContext.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DataDriverContext.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.mpp.execution.driver;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.engine.storagegroup.IDataRegionForQuery;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
 import org.apache.iotdb.db.mpp.execution.operator.source.DataSourceOperator;
 
@@ -61,7 +62,7 @@ public class DataDriverContext extends DriverContext {
     return getFragmentInstanceContext().getDataRegion();
   }
 
-  public QueryDataSource getSharedQueryDataSource() {
+  public QueryDataSource getSharedQueryDataSource() throws 
QueryProcessException {
     return getFragmentInstanceContext().getSharedQueryDataSource();
   }
 
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
index b1921ba307..46432df1be 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
@@ -22,11 +22,13 @@ import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.engine.storagegroup.IDataRegionForQuery;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.idtable.IDTable;
 import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.common.SessionInfo;
 import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.control.FileReaderManager;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 
 import org.slf4j.Logger;
@@ -35,6 +37,7 @@ import org.slf4j.LoggerFactory;
 import java.time.ZoneId;
 import java.util.ArrayList;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Optional;
 import java.util.Set;
@@ -52,8 +55,13 @@ public class FragmentInstanceContext extends QueryContext {
 
   private IDataRegionForQuery dataRegion;
   private Filter timeFilter;
+  List<PartialPath> sourcePaths;
   // Shared by all scan operators in this fragment instance to avoid memory 
problem
   private QueryDataSource sharedQueryDataSource;
+  /** closed tsfile used in this fragment instance */
+  private Set<TsFileResource> closedFilePaths;
+  /** unClosed tsfile used in this fragment instance */
+  private Set<TsFileResource> unClosedFilePaths;
 
   private final long createNanos = System.nanoTime();
 
@@ -253,7 +261,14 @@ public class FragmentInstanceContext extends QueryContext {
     return dataRegion;
   }
 
+  public void setSourcePaths(List<PartialPath> sourcePaths) {
+    this.sourcePaths = sourcePaths;
+  }
+
   public void initQueryDataSource(List<PartialPath> sourcePaths) throws 
QueryProcessException {
+    if (sourcePaths == null) {
+      return;
+    }
     dataRegion.readLock();
     try {
       List<PartialPath> pathList = new ArrayList<>();
@@ -272,12 +287,79 @@ public class FragmentInstanceContext extends QueryContext 
{
               selectedDeviceIdSet.size() == 1 ? 
selectedDeviceIdSet.iterator().next() : null,
               this,
               timeFilter != null ? timeFilter.copy() : null);
+
+      // used files should be added before mergeLock is unlocked, or they may 
be deleted by
+      // running merge
+      if (sharedQueryDataSource != null) {
+        addUsedFilesForQuery(sharedQueryDataSource);
+      }
     } finally {
       dataRegion.readUnlock();
     }
   }
 
-  public QueryDataSource getSharedQueryDataSource() {
+  public synchronized QueryDataSource getSharedQueryDataSource() throws 
QueryProcessException {
+    if (sharedQueryDataSource == null) {
+      initQueryDataSource(sourcePaths);
+    }
     return sharedQueryDataSource;
   }
+
+  /** Add the unique file paths to closeddFilePathsMap and 
unClosedFilePathsMap. */
+  private void addUsedFilesForQuery(QueryDataSource dataSource) {
+
+    // sequence data
+    addUsedFilesForQuery(dataSource.getSeqResources());
+
+    // unsequence data
+    addUsedFilesForQuery(dataSource.getUnseqResources());
+  }
+
+  private void addUsedFilesForQuery(List<TsFileResource> resources) {
+    Iterator<TsFileResource> iterator = resources.iterator();
+    while (iterator.hasNext()) {
+      TsFileResource tsFileResource = iterator.next();
+      boolean isClosed = tsFileResource.isClosed();
+      addFilePathToMap(tsFileResource, isClosed);
+
+      // this file may be deleted just before we lock it
+      if (tsFileResource.isDeleted()) {
+        Set<TsFileResource> pathSet = isClosed ? closedFilePaths : 
unClosedFilePaths;
+        // This resource may be removed by other threads of this query.
+        if (pathSet.remove(tsFileResource)) {
+          
FileReaderManager.getInstance().decreaseFileReaderReference(tsFileResource, 
isClosed);
+        }
+        iterator.remove();
+      }
+    }
+  }
+
+  /**
+   * Increase the usage reference of filePath of job id. Before the invoking 
of this method, <code>
+   * this.setqueryIdForCurrentRequestThread</code> has been invoked, so <code>
+   * sealedFilePathsMap.get(queryId)</code> or 
<code>unsealedFilePathsMap.get(queryId)</code> must
+   * not return null.
+   */
+  private void addFilePathToMap(TsFileResource tsFile, boolean isClosed) {
+    Set<TsFileResource> pathSet = isClosed ? closedFilePaths : 
unClosedFilePaths;
+    if (!pathSet.contains(tsFile)) {
+      pathSet.add(tsFile);
+      FileReaderManager.getInstance().increaseFileReaderReference(tsFile, 
isClosed);
+    }
+  }
+
+  /**
+   * All file paths used by this fragment instance must be cleared and thus 
the usage reference must
+   * be decreased.
+   */
+  protected void releaseResource() {
+    for (TsFileResource tsFile : closedFilePaths) {
+      FileReaderManager.getInstance().decreaseFileReaderReference(tsFile, 
true);
+    }
+    closedFilePaths = null;
+    for (TsFileResource tsFile : unClosedFilePaths) {
+      FileReaderManager.getInstance().decreaseFileReaderReference(tsFile, 
false);
+    }
+    unClosedFilePaths = null;
+  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
index 91b3c5187f..d7b28b8cc8 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
@@ -69,8 +69,7 @@ public class LocalExecutionPlanner {
 
     context.addPipelineDriverFactory(root, context.getDriverContext());
 
-    List<PartialPath> sourcePaths = collectSourcePaths(context);
-    instanceContext.initQueryDataSource(sourcePaths);
+    instanceContext.setSourcePaths(collectSourcePaths(context));
 
     // set maxBytes one SourceHandle can reserve after visiting the whole tree
     context.setMaxBytesOneHandleCanReserve();

Reply via email to