This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 6d30aa2022 Fix a memory free bug and delete useless O(N^2) check in DriverContext
6d30aa2022 is described below
commit 6d30aa202235e438e26788d976c476e34e557e3f
Author: Jackie Tien <[email protected]>
AuthorDate: Thu Apr 6 08:57:36 2023 +0800
Fix a memory free bug and delete useless O(N^2) check in DriverContext
---
.../iotdb/db/mpp/common/FragmentInstanceId.java | 2 +-
.../db/mpp/execution/driver/DriverContext.java | 10 ----
.../execution/exchange/MPPDataExchangeManager.java | 57 +++++++++++++---------
3 files changed, 36 insertions(+), 33 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/FragmentInstanceId.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/FragmentInstanceId.java
index 24164c4e89..1d5ca26b22 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/FragmentInstanceId.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/FragmentInstanceId.java
@@ -58,7 +58,7 @@ public class FragmentInstanceId {
}
public String getFragmentInstanceId() {
- return fragmentId + "." + instanceId;
+ return fragmentId.getId() + "." + instanceId;
}
public String toString() {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DriverContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DriverContext.java
index 58a131545f..9b0ae75f28 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DriverContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DriverContext.java
@@ -29,8 +29,6 @@ import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
-import static com.google.common.base.Preconditions.checkArgument;
-
public class DriverContext {
private boolean inputDriver = true;
@@ -51,14 +49,6 @@ public class DriverContext {
public OperatorContext addOperatorContext(
int operatorId, PlanNodeId planNodeId, String operatorType) {
- checkArgument(operatorId >= 0, "operatorId is negative");
-
- for (OperatorContext operatorContext : operatorContexts) {
- checkArgument(
- operatorId != operatorContext.getOperatorId(),
- "A context already exists for operatorId %s",
- operatorId);
- }
OperatorContext operatorContext =
new OperatorContext(operatorId, planNodeId, operatorType, this);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
index 9a58904c79..73729fe0ec 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
@@ -485,11 +485,13 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
// FragmentInstanceContext
FragmentInstanceContext instanceContext) {
- LOGGER.debug(
- "Create local sink handle to plan node {} of {} for {}",
- remotePlanNodeId,
- remoteFragmentInstanceId,
- localFragmentInstanceId);
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug(
+ "Create local sink handle to plan node {} of {} for {}",
+ remotePlanNodeId,
+ remoteFragmentInstanceId,
+ localFragmentInstanceId);
+ }
SharedTsBlockQueue queue;
Map<String, ISourceHandle> sourceHandleMap =
sourceHandles.get(remoteFragmentInstanceId);
@@ -515,7 +517,9 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
*/
public ISinkChannel createLocalSinkChannelForPipeline(
DriverContext driverContext, String planNodeId) {
- LOGGER.debug("Create local sink handle for {}", driverContext.getDriverTaskID());
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("Create local sink handle for {}", driverContext.getDriverTaskID());
+ }
SharedTsBlockQueue queue =
new SharedTsBlockQueue(
driverContext.getDriverTaskID().getFragmentInstanceId().toThrift(),
@@ -537,11 +541,13 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
// FragmentInstanceContext
FragmentInstanceContext instanceContext) {
- LOGGER.debug(
- "Create sink handle to plan node {} of {} for {}",
- remotePlanNodeId,
- remoteFragmentInstanceId,
- localFragmentInstanceId);
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug(
+ "Create sink handle to plan node {} of {} for {}",
+ remotePlanNodeId,
+ remoteFragmentInstanceId,
+ localFragmentInstanceId);
+ }
return new SinkChannel(
remoteEndpoint,
@@ -623,7 +629,9 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
*/
public ISourceHandle createLocalSourceHandleForPipeline(
SharedTsBlockQueue queue, DriverContext context) {
- LOGGER.debug("Create local source handle for {}", context.getDriverTaskID());
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("Create local source handle for {}", context.getDriverTaskID());
+ }
return new LocalSourceHandle(
queue,
new PipelineSourceHandleListenerImpl(context::failed),
@@ -647,11 +655,14 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
+ " exists.");
}
- LOGGER.debug(
- "Create local source handle from {} for plan node {} of {}",
- remoteFragmentInstanceId,
- localPlanNodeId,
- localFragmentInstanceId);
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug(
+ "Create local source handle from {} for plan node {} of {}",
+ remoteFragmentInstanceId,
+ localPlanNodeId,
+ localFragmentInstanceId);
+ }
+
SharedTsBlockQueue queue;
ISinkHandle sinkHandle = shuffleSinkHandles.get(remoteFragmentInstanceId);
if (sinkHandle != null) {
@@ -692,11 +703,13 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
+ " exists.");
}
- LOGGER.debug(
- "Create source handle from {} for plan node {} of {}",
- remoteFragmentInstanceId,
- localPlanNodeId,
- localFragmentInstanceId);
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug(
+ "Create source handle from {} for plan node {} of {}",
+ remoteFragmentInstanceId,
+ localPlanNodeId,
+ localFragmentInstanceId);
+ }
SourceHandle sourceHandle =
new SourceHandle(