This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch stable-mpp in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit cdc3846852cca337ed8918aeb85e07fcca1a37e1 Author: JackieTien97 <[email protected]> AuthorDate: Tue May 3 15:33:16 2022 +0800 Return Optional<TsBlock> instead of TsBlock so that callers are clearly aware that the method may return no value (previously null) --- .../db/mpp/plan/analyze/ClusterSchemaFetcher.java | 7 ++++--- .../db/mpp/plan/execution/IQueryExecution.java | 4 +++- .../db/mpp/plan/execution/QueryExecution.java | 9 +++++---- .../mpp/plan/execution/config/ConfigExecution.java | 7 ++++--- .../apache/iotdb/db/utils/QueryDataSetUtils.java | 6 ++++-- .../db/mpp/execution/ConfigExecutionTest.java | 23 +++++++++++----------- 6 files changed, 32 insertions(+), 24 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java index 30cba02443..781e01eb50 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java @@ -41,6 +41,7 @@ import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; public class ClusterSchemaFetcher implements ISchemaFetcher { @@ -79,14 +80,14 @@ public class ClusterSchemaFetcher implements ISchemaFetcher { } SchemaTree result = new SchemaTree(); while (coordinator.getQueryExecution(queryId).hasNextResult()) { - TsBlock tsBlock = coordinator.getQueryExecution(queryId).getBatchResult(); - if (tsBlock == null) { + Optional<TsBlock> tsBlock = coordinator.getQueryExecution(queryId).getBatchResult(); + if (!tsBlock.isPresent()) { break; } result.setStorageGroups(storageGroups); Binary binary; SchemaTree fetchedSchemaTree; - Column column = tsBlock.getColumn(0); + Column column = tsBlock.get().getColumn(0); for (int i = 0; i < column.getPositionCount(); i++) { binary = column.getBinary(i); fetchedSchemaTree = 
SchemaTree.deserialize(ByteBuffer.wrap(binary.getValues())); diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/IQueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/IQueryExecution.java index ad21e71531..49434afec5 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/IQueryExecution.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/IQueryExecution.java @@ -22,6 +22,8 @@ package org.apache.iotdb.db.mpp.plan.execution; import org.apache.iotdb.db.mpp.common.header.DatasetHeader; import org.apache.iotdb.tsfile.read.common.block.TsBlock; +import java.util.Optional; + public interface IQueryExecution { void start(); @@ -32,7 +34,7 @@ public interface IQueryExecution { ExecutionResult getStatus(); - TsBlock getBatchResult(); + Optional<TsBlock> getBatchResult(); boolean hasNextResult(); diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java index 776a75d58e..4ba48805bf 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java @@ -56,6 +56,7 @@ import org.slf4j.LoggerFactory; import java.sql.SQLException; import java.util.ArrayList; import java.util.List; +import java.util.Optional; import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; @@ -221,18 +222,18 @@ public class QueryExecution implements IQueryExecution { * implemented with DataStreamManager) */ @Override - public TsBlock getBatchResult() { + public Optional<TsBlock> getBatchResult() { try { if (resultHandle.isAborted() || resultHandle.isFinished()) { - return null; + return Optional.empty(); } ListenableFuture<Void> blocked = resultHandle.isBlocked(); blocked.get(); if (resultHandle.isFinished()) { 
releaseResource(); - return null; + return Optional.empty(); } - return resultHandle.receive(); + return Optional.of(resultHandle.receive()); } catch (ExecutionException | CancellationException e) { stateMachine.transitionToFailed(e); throwIfUnchecked(e.getCause()); diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigExecution.java index eccb8e19d6..907b7ff122 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigExecution.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigExecution.java @@ -37,6 +37,7 @@ import com.google.common.util.concurrent.ListenableFuture; import jersey.repackaged.com.google.common.util.concurrent.SettableFuture; import org.jetbrains.annotations.NotNull; +import java.util.Optional; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; @@ -129,12 +130,12 @@ public class ConfigExecution implements IQueryExecution { } @Override - public TsBlock getBatchResult() { + public Optional<TsBlock> getBatchResult() { if (!resultSetConsumed) { resultSetConsumed = true; - return resultSet; + return Optional.of(resultSet); } - return null; + return Optional.empty(); } // According to the execution process of ConfigExecution, there is only one TsBlock for diff --git a/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java index a50bc92564..3b8d2bd608 100644 --- a/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java +++ b/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java @@ -39,6 +39,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.LinkedList; import java.util.List; +import java.util.Optional; /** TimeValuePairUtils to convert between thrift format and TsFile format. 
*/ public class QueryDataSetUtils { @@ -196,10 +197,11 @@ public class QueryDataSetUtils { // used to record a bitmap for every 8 points int[] bitmaps = new int[columnNum]; while (rowCount < fetchSize) { - TsBlock tsBlock = queryExecution.getBatchResult(); - if (tsBlock == null) { + Optional<TsBlock> optionalTsBlock = queryExecution.getBatchResult(); + if (!optionalTsBlock.isPresent()) { break; } + TsBlock tsBlock = optionalTsBlock.get(); int currentCount = tsBlock.getPositionCount(); // serialize time column for (int i = 0; i < currentCount; i++) { diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/ConfigExecutionTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/ConfigExecutionTest.java index 306bb21d07..e51e324290 100644 --- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/ConfigExecutionTest.java +++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/ConfigExecutionTest.java @@ -37,7 +37,6 @@ import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; -import org.junit.Assert; import org.junit.Test; import java.util.Collections; @@ -45,6 +44,8 @@ import java.util.Optional; import java.util.concurrent.ExecutorService; import static com.google.common.util.concurrent.Futures.immediateFuture; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; public class ConfigExecutionTest { @@ -55,7 +56,7 @@ public class ConfigExecutionTest { new ConfigExecution(genMPPQueryContext(), null, getExecutor(), task); execution.start(); ExecutionResult result = execution.getStatus(); - Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), result.status.code); + assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), result.status.code); } @Test @@ -77,10 +78,12 @@ public class ConfigExecutionTest { ExecutionResult result = execution.getStatus(); TsBlock 
tsBlockFromExecution = null; if (execution.hasNextResult()) { - tsBlockFromExecution = execution.getBatchResult(); + Optional<TsBlock> optionalTsBlock = execution.getBatchResult(); + assertTrue(optionalTsBlock.isPresent()); + tsBlockFromExecution = optionalTsBlock.get(); } - Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), result.status.code); - Assert.assertEquals(tsBlock, tsBlockFromExecution); + assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), result.status.code); + assertEquals(tsBlock, tsBlockFromExecution); } @Test @@ -93,7 +96,7 @@ public class ConfigExecutionTest { new ConfigExecution(genMPPQueryContext(), null, getExecutor(), task); execution.start(); ExecutionResult result = execution.getStatus(); - Assert.assertEquals(TSStatusCode.QUERY_PROCESS_ERROR.getStatusCode(), result.status.code); + assertEquals(TSStatusCode.QUERY_PROCESS_ERROR.getStatusCode(), result.status.code); } @Test @@ -120,8 +123,7 @@ public class ConfigExecutionTest { new Thread( () -> { ExecutionResult result = execution.getStatus(); - Assert.assertEquals( - TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode(), result.status.code); + assertEquals(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode(), result.status.code); }); resultThread.start(); taskResult.cancel(true); @@ -140,8 +142,7 @@ public class ConfigExecutionTest { new Thread( () -> { ExecutionResult result = execution.getStatus(); - Assert.assertEquals( - TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode(), result.status.code); + assertEquals(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode(), result.status.code); }); resultThread.start(); execution.start(); @@ -154,7 +155,7 @@ public class ConfigExecutionTest { // Assert.fail("InterruptedException should be threw here"); } catch (InterruptedException e) { ExecutionResult result = execution.getStatus(); - Assert.assertEquals(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode(), result.status.code); + 
assertEquals(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode(), result.status.code); execution.stop(); } }
