This is an automated email from the ASF dual-hosted git repository. xingtanzjr pushed a commit to branch xingtanzjr/config-exe-result in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 7de6a157ba19203d6715c4885b8bcc184571b960 Author: Jinrui.Zhang <[email protected]> AuthorDate: Wed Apr 20 16:51:18 2022 +0800 add TsBlock as the returned result of IConfigTask --- .../mpp/execution/config/AuthorizerConfigTask.java | 6 ++-- .../db/mpp/execution/config/ConfigExecution.java | 33 +++++++++--------- ...SampleConfigTask.java => ConfigTaskResult.java} | 38 ++++++++++++--------- .../iotdb/db/mpp/execution/config/IConfigTask.java | 2 +- .../mpp/execution/config/SetStorageGroupTask.java | 6 ++-- .../db/mpp/execution/ConfigExecutionTest.java | 39 ++++++++++++++++++---- 6 files changed, 79 insertions(+), 45 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/AuthorizerConfigTask.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/AuthorizerConfigTask.java index 9b27ad3b9d..8cc891bf38 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/AuthorizerConfigTask.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/AuthorizerConfigTask.java @@ -48,8 +48,8 @@ public class AuthorizerConfigTask implements IConfigTask { } @Override - public ListenableFuture<Void> execute() { - SettableFuture<Void> future = SettableFuture.create(); + public ListenableFuture<ConfigTaskResult> execute() { + SettableFuture<ConfigTaskResult> future = SettableFuture.create(); ConfigNodeClient configNodeClient = null; try { // Construct request using statement @@ -75,7 +75,7 @@ public class AuthorizerConfigTask implements IConfigTask { tsStatus); future.setException(new StatementExecutionException(tsStatus)); } else { - future.set(null); + future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS)); } } catch (IoTDBConnectionException | BadNodeUrlException e) { LOGGER.error("Failed to connect to config node."); diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/ConfigExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/ConfigExecution.java 
index fe63991dfc..b13d3006c6 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/ConfigExecution.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/ConfigExecution.java @@ -47,7 +47,8 @@ public class ConfigExecution implements IQueryExecution { private final ExecutorService executor; private final QueryStateMachine stateMachine; - private final SettableFuture<Boolean> result; + private final SettableFuture<ConfigTaskResult> taskFuture; + private TsBlock resultSet; private final IConfigTask task; @@ -56,7 +57,7 @@ public class ConfigExecution implements IQueryExecution { this.statement = statement; this.executor = executor; this.stateMachine = new QueryStateMachine(context.getQueryId(), executor); - this.result = SettableFuture.create(); + this.taskFuture = SettableFuture.create(); this.task = statement.accept(new ConfigTaskVisitor(), new ConfigTaskVisitor.TaskContext()); } @@ -67,21 +68,21 @@ public class ConfigExecution implements IQueryExecution { this.statement = statement; this.executor = executor; this.stateMachine = new QueryStateMachine(context.getQueryId(), executor); - this.result = SettableFuture.create(); + this.taskFuture = SettableFuture.create(); this.task = task; } @Override public void start() { try { - ListenableFuture<Void> future = task.execute(); + ListenableFuture<ConfigTaskResult> future = task.execute(); Futures.addCallback( future, - new FutureCallback<Void>() { + new FutureCallback<ConfigTaskResult>() { @Override - public void onSuccess(Void success) { + public void onSuccess(ConfigTaskResult taskRet) { stateMachine.transitionToFinished(); - result.set(true); + taskFuture.set(taskRet); } @Override @@ -98,7 +99,7 @@ public class ConfigExecution implements IQueryExecution { public void fail(Throwable cause) { stateMachine.transitionToFailed(cause); - result.set(false); + taskFuture.set(new ConfigTaskResult(TSStatusCode.INTERNAL_SERVER_ERROR)); } @Override @@ -110,10 +111,11 @@ public class 
ConfigExecution implements IQueryExecution { @Override public ExecutionResult getStatus() { try { - Boolean success = result.get(); - TSStatusCode statusCode = - success ? TSStatusCode.SUCCESS_STATUS : TSStatusCode.QUERY_PROCESS_ERROR; - String message = success ? "" : stateMachine.getFailureMessage(); + ConfigTaskResult taskResult = taskFuture.get(); + TSStatusCode statusCode = taskResult.getStatusCode(); + resultSet = taskResult.getResultSet(); + String message = + statusCode == TSStatusCode.SUCCESS_STATUS ? "" : stateMachine.getFailureMessage(); return new ExecutionResult(context.getQueryId(), RpcUtils.getStatus(statusCode, message)); } catch (InterruptedException | ExecutionException e) { Thread.currentThread().interrupt(); @@ -125,13 +127,14 @@ public class ConfigExecution implements IQueryExecution { @Override public TsBlock getBatchResult() { - // TODO - return null; + return resultSet; } + // In the execution flow of ConfigExecution, getStatus() has already been invoked by the time + // hasNextResult() is invoked, so we always return true here. 
@Override public boolean hasNextResult() { - return false; + return true; } @Override diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/SampleConfigTask.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/ConfigTaskResult.java similarity index 54% rename from server/src/main/java/org/apache/iotdb/db/mpp/execution/config/SampleConfigTask.java rename to server/src/main/java/org/apache/iotdb/db/mpp/execution/config/ConfigTaskResult.java index 408b8cd448..7a8248b50e 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/SampleConfigTask.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/ConfigTaskResult.java @@ -19,29 +19,35 @@ package org.apache.iotdb.db.mpp.execution.config; -import org.apache.iotdb.db.mpp.sql.statement.Statement; +import org.apache.iotdb.rpc.TSStatusCode; +import org.apache.iotdb.tsfile.read.common.block.TsBlock; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; +public class ConfigTaskResult { + private TSStatusCode statusCode; + private TsBlock resultSet; -public class SampleConfigTask implements IConfigTask { - - private Statement statement; + public ConfigTaskResult(TSStatusCode statusCode) { + this.statusCode = statusCode; + } - public SampleConfigTask(Statement statement) { - this.statement = statement; + public ConfigTaskResult(TSStatusCode statusCode, TsBlock resultSet) { + this.statusCode = statusCode; + this.resultSet = resultSet; } - @Override - public ListenableFuture<Void> execute() { - // Construct request using statement + public TSStatusCode getStatusCode() { + return statusCode; + } - // Send request to some API server + public void setStatusCode(TSStatusCode statusCode) { + this.statusCode = statusCode; + } - // Get response or throw exception + public TsBlock getResultSet() { + return resultSet; + } - // If the action is executed successfully, return the Future. 
- // If your operation is async, you can return the corresponding future directly. - return Futures.immediateVoidFuture(); + public void setResultSet(TsBlock resultSet) { + this.resultSet = resultSet; } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/IConfigTask.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/IConfigTask.java index fac0575fc3..a9c506d256 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/IConfigTask.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/IConfigTask.java @@ -22,5 +22,5 @@ package org.apache.iotdb.db.mpp.execution.config; import com.google.common.util.concurrent.ListenableFuture; public interface IConfigTask { - ListenableFuture<Void> execute() throws InterruptedException; + ListenableFuture<ConfigTaskResult> execute() throws InterruptedException; } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/SetStorageGroupTask.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/SetStorageGroupTask.java index cdfd6376cd..934d671e27 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/SetStorageGroupTask.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/SetStorageGroupTask.java @@ -42,8 +42,8 @@ public class SetStorageGroupTask implements IConfigTask { } @Override - public ListenableFuture<Void> execute() { - SettableFuture<Void> future = SettableFuture.create(); + public ListenableFuture<ConfigTaskResult> execute() { + SettableFuture<ConfigTaskResult> future = SettableFuture.create(); // Construct request using statement TSetStorageGroupReq req = new TSetStorageGroupReq(setStorageGroupStatement.getStorageGroupPath().getFullPath()); @@ -61,7 +61,7 @@ public class SetStorageGroupTask implements IConfigTask { tsStatus); future.setException(new StatementExecutionException(tsStatus)); } else { - future.set(null); + future.set(new 
ConfigTaskResult(TSStatusCode.SUCCESS_STATUS)); } } catch (IoTDBConnectionException | BadNodeUrlException e) { LOGGER.error("Failed to connect to config node."); diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/ConfigExecutionTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/ConfigExecutionTest.java index 7ee608d7a0..9a1c36beb5 100644 --- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/ConfigExecutionTest.java +++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/ConfigExecutionTest.java @@ -23,15 +23,20 @@ import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory; import org.apache.iotdb.db.mpp.common.MPPQueryContext; import org.apache.iotdb.db.mpp.common.QueryId; import org.apache.iotdb.db.mpp.execution.config.ConfigExecution; +import org.apache.iotdb.db.mpp.execution.config.ConfigTaskResult; import org.apache.iotdb.db.mpp.execution.config.IConfigTask; import org.apache.iotdb.db.mpp.sql.analyze.QueryType; import org.apache.iotdb.rpc.TSStatusCode; +import org.apache.iotdb.tsfile.read.common.block.TsBlock; +import org.apache.iotdb.tsfile.read.common.block.column.IntColumn; +import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; import org.junit.Assert; import org.junit.Test; +import java.util.Optional; import java.util.concurrent.ExecutorService; import static com.google.common.util.concurrent.Futures.immediateFuture; @@ -40,7 +45,7 @@ public class ConfigExecutionTest { @Test public void normalConfigTaskTest() { - IConfigTask task = () -> immediateFuture(null); + IConfigTask task = () -> immediateFuture(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS)); ConfigExecution execution = new ConfigExecution(genMPPQueryContext(), null, getExecutor(), task); execution.start(); @@ -48,6 +53,26 @@ public class ConfigExecutionTest { 
Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), result.status.code); } + @Test + public void normalConfigTaskWithResultTest() { + TsBlock tsBlock = + new TsBlock( + new TimeColumn(1, new long[] {0}), + new IntColumn(1, Optional.of(new boolean[] {false}), new int[] {1})); + IConfigTask task = + () -> immediateFuture(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS, tsBlock)); + ConfigExecution execution = + new ConfigExecution(genMPPQueryContext(), null, getExecutor(), task); + execution.start(); + ExecutionResult result = execution.getStatus(); + TsBlock tsBlockFromExecution = null; + if (execution.hasNextResult()) { + tsBlockFromExecution = execution.getBatchResult(); + } + Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), result.status.code); + Assert.assertEquals(tsBlock, tsBlockFromExecution); + } + @Test public void exceptionConfigTaskTest() { IConfigTask task = @@ -63,16 +88,16 @@ public class ConfigExecutionTest { @Test public void configTaskCancelledTest() throws InterruptedException { - SettableFuture<Void> taskResult = SettableFuture.create(); + SettableFuture<ConfigTaskResult> taskResult = SettableFuture.create(); class SimpleTask implements IConfigTask { - private final ListenableFuture<Void> result; + private final ListenableFuture<ConfigTaskResult> result; - public SimpleTask(ListenableFuture<Void> future) { + public SimpleTask(ListenableFuture<ConfigTaskResult> future) { this.result = future; } @Override - public ListenableFuture<Void> execute() throws InterruptedException { + public ListenableFuture<ConfigTaskResult> execute() throws InterruptedException { return result; } } @@ -86,7 +111,7 @@ public class ConfigExecutionTest { () -> { ExecutionResult result = execution.getStatus(); Assert.assertEquals( - TSStatusCode.QUERY_PROCESS_ERROR.getStatusCode(), result.status.code); + TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode(), result.status.code); }); resultThread.start(); taskResult.cancel(true); @@ -106,7 +131,7 @@ 
public class ConfigExecutionTest { () -> { ExecutionResult result = execution.getStatus(); Assert.assertEquals( - TSStatusCode.QUERY_PROCESS_ERROR.getStatusCode(), result.status.code); + TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode(), result.status.code); }); resultThread.start(); execution.start();
