This is an automated email from the ASF dual-hosted git repository. xiangweiwei pushed a commit to branch advancePipeline in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 2868432447cf31928acbb54c7e454a9ee08dd475 Author: Alima777 <[email protected]> AuthorDate: Sat Feb 11 20:00:27 2023 +0800 Fix DriverSchedulerTest --- .../main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java | 7 +------ .../java/org/apache/iotdb/db/mpp/execution/driver/IDriver.java | 2 ++ .../apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java | 5 ++--- .../apache/iotdb/db/mpp/execution/schedule/task/DriverTask.java | 5 +++++ .../iotdb/db/mpp/execution/schedule/DriverSchedulerTest.java | 4 ++++ 5 files changed, 14 insertions(+), 9 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java index c580ab047e..71fe7ccea2 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java @@ -29,11 +29,10 @@ import com.google.common.collect.ImmutableList; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; import io.airlift.units.Duration; +import javax.annotation.concurrent.GuardedBy; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.annotation.concurrent.GuardedBy; - import java.util.List; import java.util.Optional; import java.util.concurrent.TimeUnit; @@ -105,10 +104,6 @@ public abstract class Driver implements IDriver { /** release resource this driver used */ protected abstract void releaseResource(); - public boolean hasDependency() { - return driverContext.getDependencyDriverIndex() != -1; - } - public int getDependencyDriverIndex() { return driverContext.getDependencyDriverIndex(); } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/IDriver.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/IDriver.java index 8a55d098de..ff55d5456c 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/IDriver.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/IDriver.java @@ -71,4 +71,6 @@ public interface IDriver { /** @return get SinkHandle of current IDriver */ ISinkHandle getSinkHandle(); + + int getDependencyDriverIndex(); } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java index ef191ec5cb..7be42a9859 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java @@ -25,7 +25,6 @@ import org.apache.iotdb.commons.utils.TestOnly; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.mpp.common.FragmentInstanceId; import org.apache.iotdb.db.mpp.common.QueryId; -import org.apache.iotdb.db.mpp.execution.driver.Driver; import org.apache.iotdb.db.mpp.execution.driver.IDriver; import org.apache.iotdb.db.mpp.execution.exchange.IMPPDataExchangeManager; import org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeService; @@ -189,8 +188,8 @@ public class DriverScheduler implements IDriverScheduler, IService { List<DriverTask> submittedTasks = new ArrayList<>(); for (DriverTask task : tasks) { - Driver driver = (Driver) task.getDriver(); - if (driver.hasDependency()) { + IDriver driver = task.getDriver(); + if (driver.getDependencyDriverIndex() != -1) { SettableFuture<?> blockedDependencyFuture = tasks.get(driver.getDependencyDriverIndex()).getBlockedDependencyDriver(); blockedDependencyFuture.addListener( diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/task/DriverTask.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/task/DriverTask.java index f692451cd1..66ce5ff677 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/task/DriverTask.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/task/DriverTask.java @@ -267,5 +267,10 @@ public class DriverTask implements IDIndexedAccessible { public ISinkHandle getSinkHandle() { return null; } + + @Override + public int getDependencyDriverIndex() { + return -1; + } } } diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DriverSchedulerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DriverSchedulerTest.java index 7e3612e499..ff8d0fdebf 100644 --- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DriverSchedulerTest.java +++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DriverSchedulerTest.java @@ -63,10 +63,12 @@ public class DriverSchedulerTest { DriverTaskId driverTaskId1 = new DriverTaskId(instanceId1, 0); IDriver mockDriver1 = Mockito.mock(IDriver.class); Mockito.when(mockDriver1.getDriverTaskId()).thenReturn(driverTaskId1); + Mockito.when(mockDriver1.getDependencyDriverIndex()).thenReturn(-1); FragmentInstanceId instanceId2 = new FragmentInstanceId(fragmentId, "inst-1"); DriverTaskId driverTaskId2 = new DriverTaskId(instanceId2, 0); IDriver mockDriver2 = Mockito.mock(IDriver.class); Mockito.when(mockDriver2.getDriverTaskId()).thenReturn(driverTaskId2); + Mockito.when(mockDriver2.getDependencyDriverIndex()).thenReturn(-1); List<IDriver> instances = Arrays.asList(mockDriver1, mockDriver2); manager.submitDrivers(queryId, instances, QUERY_TIMEOUT_MS); Assert.assertTrue(manager.getBlockedTasks().isEmpty()); @@ -91,6 +93,7 @@ public class DriverSchedulerTest { FragmentInstanceId instanceId3 = new FragmentInstanceId(fragmentId, "inst-2"); DriverTaskId driverTaskId3 = new DriverTaskId(instanceId3, 0); Mockito.when(mockDriver3.getDriverTaskId()).thenReturn(driverTaskId3); + Mockito.when(mockDriver3.getDependencyDriverIndex()).thenReturn(-1); manager.submitDrivers(queryId, Collections.singletonList(mockDriver3), QUERY_TIMEOUT_MS); Assert.assertTrue(manager.getBlockedTasks().isEmpty()); Assert.assertEquals(1, manager.getQueryMap().size()); @@ -111,6 +114,7 @@ public class DriverSchedulerTest { DriverTaskId driverTaskId4 = new DriverTaskId(instanceId4, 0); IDriver mockDriver4 = Mockito.mock(IDriver.class); Mockito.when(mockDriver4.getDriverTaskId()).thenReturn(driverTaskId4); + Mockito.when(mockDriver4.getDependencyDriverIndex()).thenReturn(-1); manager.submitDrivers(queryId2, Collections.singletonList(mockDriver4), QUERY_TIMEOUT_MS); Assert.assertTrue(manager.getBlockedTasks().isEmpty()); Assert.assertEquals(2, manager.getQueryMap().size());
