This is an automated email from the ASF dual-hosted git repository.
xiangweiwei pushed a commit to branch IOTDB-5547
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/IOTDB-5547 by this push:
new 1c6d823d57 Fix concurrent modify bug
1c6d823d57 is described below
commit 1c6d823d57ef7bdf7e4db1367bf2691eff400ccf
Author: Alima777 <[email protected]>
AuthorDate: Fri Feb 17 12:12:56 2023 +0800
Fix concurrent modify bug
---
.../iotdb/db/mpp/execution/driver/DataDriver.java | 101 ++-------------------
.../db/mpp/execution/driver/DataDriverContext.java | 3 +-
.../fragment/FragmentInstanceContext.java | 84 ++++++++++++++++-
.../db/mpp/plan/planner/LocalExecutionPlanner.java | 3 +-
4 files changed, 95 insertions(+), 96 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DataDriver.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DataDriver.java
index 2153af9e74..3a4e1fdcf2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DataDriver.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DataDriver.java
@@ -19,20 +19,14 @@
package org.apache.iotdb.db.mpp.execution.driver;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
-import org.apache.iotdb.db.engine.storagegroup.IDataRegionForQuery;
-import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.mpp.execution.operator.Operator;
import org.apache.iotdb.db.mpp.execution.operator.source.DataSourceOperator;
-import org.apache.iotdb.db.query.control.FileReaderManager;
import com.google.common.util.concurrent.SettableFuture;
-
import javax.annotation.concurrent.NotThreadSafe;
-import java.util.HashSet;
-import java.util.Iterator;
import java.util.List;
-import java.util.Set;
import static org.apache.iotdb.db.mpp.metric.QueryExecutionMetricSet.QUERY_RESOURCE_INIT;
@@ -45,15 +39,8 @@ public class DataDriver extends Driver {
private boolean init;
- /** closed tsfile used in this fragment instance */
- private Set<TsFileResource> closedFilePaths;
- /** unClosed tsfile used in this fragment instance */
- private Set<TsFileResource> unClosedFilePaths;
-
public DataDriver(Operator root, DriverContext driverContext) {
super(root, driverContext);
- this.closedFilePaths = new HashSet<>();
- this.unClosedFilePaths = new HashSet<>();
}
@Override
@@ -72,27 +59,11 @@ public class DataDriver extends Driver {
return true;
}
- /**
- * All file paths used by this fragment instance must be cleared and thus
the usage reference must
- * be decreased.
- */
- @Override
- protected void releaseResource() {
- for (TsFileResource tsFile : closedFilePaths) {
- FileReaderManager.getInstance().decreaseFileReaderReference(tsFile,
true);
- }
- closedFilePaths = null;
- for (TsFileResource tsFile : unClosedFilePaths) {
- FileReaderManager.getInstance().decreaseFileReaderReference(tsFile,
false);
- }
- unClosedFilePaths = null;
- }
-
/**
* init seq file list and unseq file list in QueryDataSource and set it into
each SourceNode TODO
* we should change all the blocked lock operation into tryLock
*/
- private void initialize() {
+ private void initialize() throws QueryProcessException {
long startTime = System.nanoTime();
try {
List<DataSourceOperator> sourceOperators =
@@ -117,70 +88,16 @@ public class DataDriver extends Driver {
}
}
- /**
- * The method is called in mergeLock() when executing query. This method
will get all the
- * QueryDataSource needed for this query
- */
- private QueryDataSource initQueryDataSource() {
- DataDriverContext context = (DataDriverContext) driverContext;
- IDataRegionForQuery dataRegion = context.getDataRegion();
- dataRegion.readLock();
- try {
-
- QueryDataSource dataSource = ((DataDriverContext)
driverContext).getSharedQueryDataSource();
-
- // used files should be added before mergeLock is unlocked, or they may
be deleted by
- // running merge
- if (dataSource != null) {
- addUsedFilesForQuery(dataSource);
- }
-
- return dataSource;
- } finally {
- dataRegion.readUnlock();
- }
- }
-
- /** Add the unique file paths to closeddFilePathsMap and
unClosedFilePathsMap. */
- private void addUsedFilesForQuery(QueryDataSource dataSource) {
-
- // sequence data
- addUsedFilesForQuery(dataSource.getSeqResources());
-
- // unsequence data
- addUsedFilesForQuery(dataSource.getUnseqResources());
- }
-
- private void addUsedFilesForQuery(List<TsFileResource> resources) {
- Iterator<TsFileResource> iterator = resources.iterator();
- while (iterator.hasNext()) {
- TsFileResource tsFileResource = iterator.next();
- boolean isClosed = tsFileResource.isClosed();
- addFilePathToMap(tsFileResource, isClosed);
-
- // this file may be deleted just before we lock it
- if (tsFileResource.isDeleted()) {
- Set<TsFileResource> pathSet = isClosed ? closedFilePaths :
unClosedFilePaths;
- // This resource may be removed by other threads of this query.
- if (pathSet.remove(tsFileResource)) {
-
FileReaderManager.getInstance().decreaseFileReaderReference(tsFileResource,
isClosed);
- }
- iterator.remove();
- }
- }
+ @Override
+ protected void releaseResource() {
+ // do nothing
}
/**
- * Increase the usage reference of filePath of job id. Before the invoking
of this method, <code>
- * this.setqueryIdForCurrentRequestThread</code> has been invoked, so <code>
- * sealedFilePathsMap.get(queryId)</code> or
<code>unsealedFilePathsMap.get(queryId)</code> must
- * not return null.
+ * The method is called in mergeLock() when executing query. This method
will get all the
+ * QueryDataSource needed for this query
*/
- private void addFilePathToMap(TsFileResource tsFile, boolean isClosed) {
- Set<TsFileResource> pathSet = isClosed ? closedFilePaths :
unClosedFilePaths;
- if (!pathSet.contains(tsFile)) {
- pathSet.add(tsFile);
- FileReaderManager.getInstance().increaseFileReaderReference(tsFile,
isClosed);
- }
+ private QueryDataSource initQueryDataSource() throws QueryProcessException {
+ return ((DataDriverContext) driverContext).getSharedQueryDataSource();
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DataDriverContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DataDriverContext.java
index 8dca4dcfe5..0b0ad4332f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DataDriverContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DataDriverContext.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.mpp.execution.driver;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.engine.storagegroup.IDataRegionForQuery;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
import org.apache.iotdb.db.mpp.execution.operator.source.DataSourceOperator;
@@ -61,7 +62,7 @@ public class DataDriverContext extends DriverContext {
return getFragmentInstanceContext().getDataRegion();
}
- public QueryDataSource getSharedQueryDataSource() {
+ public QueryDataSource getSharedQueryDataSource() throws
QueryProcessException {
return getFragmentInstanceContext().getSharedQueryDataSource();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
index b1921ba307..46432df1be 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
@@ -22,11 +22,13 @@ import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.engine.storagegroup.IDataRegionForQuery;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.idtable.IDTable;
import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.common.SessionInfo;
import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.control.FileReaderManager;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.slf4j.Logger;
@@ -35,6 +37,7 @@ import org.slf4j.LoggerFactory;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Set;
@@ -52,8 +55,13 @@ public class FragmentInstanceContext extends QueryContext {
private IDataRegionForQuery dataRegion;
private Filter timeFilter;
+ List<PartialPath> sourcePaths;
// Shared by all scan operators in this fragment instance to avoid memory
problem
private QueryDataSource sharedQueryDataSource;
+ /** closed tsfile used in this fragment instance */
+ private Set<TsFileResource> closedFilePaths;
+ /** unClosed tsfile used in this fragment instance */
+ private Set<TsFileResource> unClosedFilePaths;
private final long createNanos = System.nanoTime();
@@ -253,7 +261,14 @@ public class FragmentInstanceContext extends QueryContext {
return dataRegion;
}
+ public void setSourcePaths(List<PartialPath> sourcePaths) {
+ this.sourcePaths = sourcePaths;
+ }
+
public void initQueryDataSource(List<PartialPath> sourcePaths) throws
QueryProcessException {
+ if (sourcePaths == null) {
+ return;
+ }
dataRegion.readLock();
try {
List<PartialPath> pathList = new ArrayList<>();
@@ -272,12 +287,79 @@ public class FragmentInstanceContext extends QueryContext {
selectedDeviceIdSet.size() == 1 ?
selectedDeviceIdSet.iterator().next() : null,
this,
timeFilter != null ? timeFilter.copy() : null);
+
+ // used files should be added before mergeLock is unlocked, or they may
be deleted by
+ // running merge
+ if (sharedQueryDataSource != null) {
+ addUsedFilesForQuery(sharedQueryDataSource);
+ }
} finally {
dataRegion.readUnlock();
}
}
- public QueryDataSource getSharedQueryDataSource() {
+ public synchronized QueryDataSource getSharedQueryDataSource() throws
QueryProcessException {
+ if (sharedQueryDataSource == null) {
+ initQueryDataSource(sourcePaths);
+ }
return sharedQueryDataSource;
}
+
+ /** Add the unique file paths to closeddFilePathsMap and
unClosedFilePathsMap. */
+ private void addUsedFilesForQuery(QueryDataSource dataSource) {
+
+ // sequence data
+ addUsedFilesForQuery(dataSource.getSeqResources());
+
+ // unsequence data
+ addUsedFilesForQuery(dataSource.getUnseqResources());
+ }
+
+ private void addUsedFilesForQuery(List<TsFileResource> resources) {
+ Iterator<TsFileResource> iterator = resources.iterator();
+ while (iterator.hasNext()) {
+ TsFileResource tsFileResource = iterator.next();
+ boolean isClosed = tsFileResource.isClosed();
+ addFilePathToMap(tsFileResource, isClosed);
+
+ // this file may be deleted just before we lock it
+ if (tsFileResource.isDeleted()) {
+ Set<TsFileResource> pathSet = isClosed ? closedFilePaths :
unClosedFilePaths;
+ // This resource may be removed by other threads of this query.
+ if (pathSet.remove(tsFileResource)) {
+
FileReaderManager.getInstance().decreaseFileReaderReference(tsFileResource,
isClosed);
+ }
+ iterator.remove();
+ }
+ }
+ }
+
+ /**
+ * Increase the usage reference of filePath of job id. Before the invoking
of this method, <code>
+ * this.setqueryIdForCurrentRequestThread</code> has been invoked, so <code>
+ * sealedFilePathsMap.get(queryId)</code> or
<code>unsealedFilePathsMap.get(queryId)</code> must
+ * not return null.
+ */
+ private void addFilePathToMap(TsFileResource tsFile, boolean isClosed) {
+ Set<TsFileResource> pathSet = isClosed ? closedFilePaths :
unClosedFilePaths;
+ if (!pathSet.contains(tsFile)) {
+ pathSet.add(tsFile);
+ FileReaderManager.getInstance().increaseFileReaderReference(tsFile,
isClosed);
+ }
+ }
+
+ /**
+ * All file paths used by this fragment instance must be cleared and thus
the usage reference must
+ * be decreased.
+ */
+ protected void releaseResource() {
+ for (TsFileResource tsFile : closedFilePaths) {
+ FileReaderManager.getInstance().decreaseFileReaderReference(tsFile,
true);
+ }
+ closedFilePaths = null;
+ for (TsFileResource tsFile : unClosedFilePaths) {
+ FileReaderManager.getInstance().decreaseFileReaderReference(tsFile,
false);
+ }
+ unClosedFilePaths = null;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
index 91b3c5187f..d7b28b8cc8 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
@@ -69,8 +69,7 @@ public class LocalExecutionPlanner {
context.addPipelineDriverFactory(root, context.getDriverContext());
- List<PartialPath> sourcePaths = collectSourcePaths(context);
- instanceContext.initQueryDataSource(sourcePaths);
+ instanceContext.setSourcePaths(collectSourcePaths(context));
// set maxBytes one SourceHandle can reserve after visiting the whole tree
context.setMaxBytesOneHandleCanReserve();