This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 41a33ffcad Return Option<TsBlock> instead of TsBlock to let caller
clearly be aware of that the method will return null (#5777)
41a33ffcad is described below
commit 41a33ffcad099cd5f5a4b6f56ac242ace18f10f8
Author: Jackie Tien <[email protected]>
AuthorDate: Thu May 5 12:01:09 2022 +0800
Return Option<TsBlock> instead of TsBlock to let caller clearly be aware of
that the method will return null (#5777)
---
.../db/mpp/plan/analyze/ClusterSchemaFetcher.java | 7 ++++---
.../db/mpp/plan/execution/IQueryExecution.java | 4 +++-
.../db/mpp/plan/execution/QueryExecution.java | 9 +++++----
.../mpp/plan/execution/config/ConfigExecution.java | 7 ++++---
.../apache/iotdb/db/utils/QueryDataSetUtils.java | 6 ++++--
.../db/mpp/execution/ConfigExecutionTest.java | 23 +++++++++++-----------
6 files changed, 32 insertions(+), 24 deletions(-)
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java
index 256b53779e..e9de9a302e 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java
@@ -41,6 +41,7 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
public class ClusterSchemaFetcher implements ISchemaFetcher {
@@ -79,14 +80,14 @@ public class ClusterSchemaFetcher implements ISchemaFetcher
{
}
SchemaTree result = new SchemaTree();
while (coordinator.getQueryExecution(queryId).hasNextResult()) {
- TsBlock tsBlock =
coordinator.getQueryExecution(queryId).getBatchResult();
- if (tsBlock == null) {
+ Optional<TsBlock> tsBlock =
coordinator.getQueryExecution(queryId).getBatchResult();
+ if (!tsBlock.isPresent()) {
break;
}
result.setStorageGroups(storageGroups);
Binary binary;
SchemaTree fetchedSchemaTree;
- Column column = tsBlock.getColumn(0);
+ Column column = tsBlock.get().getColumn(0);
for (int i = 0; i < column.getPositionCount(); i++) {
binary = column.getBinary(i);
fetchedSchemaTree =
SchemaTree.deserialize(ByteBuffer.wrap(binary.getValues()));
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/IQueryExecution.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/IQueryExecution.java
index ad21e71531..49434afec5 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/IQueryExecution.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/IQueryExecution.java
@@ -22,6 +22,8 @@ package org.apache.iotdb.db.mpp.plan.execution;
import org.apache.iotdb.db.mpp.common.header.DatasetHeader;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import java.util.Optional;
+
public interface IQueryExecution {
void start();
@@ -32,7 +34,7 @@ public interface IQueryExecution {
ExecutionResult getStatus();
- TsBlock getBatchResult();
+ Optional<TsBlock> getBatchResult();
boolean hasNextResult();
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
index dce5dccc71..763ca26a82 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
@@ -56,6 +56,7 @@ import org.slf4j.LoggerFactory;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
+import java.util.Optional;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
@@ -229,18 +230,18 @@ public class QueryExecution implements IQueryExecution {
* implemented with DataStreamManager)
*/
@Override
- public TsBlock getBatchResult() {
+ public Optional<TsBlock> getBatchResult() {
try {
if (resultHandle == null || resultHandle.isAborted() ||
resultHandle.isFinished()) {
- return null;
+ return Optional.empty();
}
ListenableFuture<Void> blocked = resultHandle.isBlocked();
blocked.get();
if (resultHandle.isFinished()) {
releaseResource();
- return null;
+ return Optional.empty();
}
- return resultHandle.receive();
+ return Optional.of(resultHandle.receive());
} catch (ExecutionException | CancellationException e) {
stateMachine.transitionToFailed(e);
throwIfUnchecked(e.getCause());
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigExecution.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigExecution.java
index eccb8e19d6..907b7ff122 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigExecution.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigExecution.java
@@ -37,6 +37,7 @@ import com.google.common.util.concurrent.ListenableFuture;
import jersey.repackaged.com.google.common.util.concurrent.SettableFuture;
import org.jetbrains.annotations.NotNull;
+import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
@@ -129,12 +130,12 @@ public class ConfigExecution implements IQueryExecution {
}
@Override
- public TsBlock getBatchResult() {
+ public Optional<TsBlock> getBatchResult() {
if (!resultSetConsumed) {
resultSetConsumed = true;
- return resultSet;
+ return Optional.of(resultSet);
}
- return null;
+ return Optional.empty();
}
// According to the execution process of ConfigExecution, there is only one
TsBlock for
diff --git
a/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
b/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
index a50bc92564..3b8d2bd608 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
@@ -39,6 +39,7 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.LinkedList;
import java.util.List;
+import java.util.Optional;
/** TimeValuePairUtils to convert between thrift format and TsFile format. */
public class QueryDataSetUtils {
@@ -196,10 +197,11 @@ public class QueryDataSetUtils {
// used to record a bitmap for every 8 points
int[] bitmaps = new int[columnNum];
while (rowCount < fetchSize) {
- TsBlock tsBlock = queryExecution.getBatchResult();
- if (tsBlock == null) {
+ Optional<TsBlock> optionalTsBlock = queryExecution.getBatchResult();
+ if (!optionalTsBlock.isPresent()) {
break;
}
+ TsBlock tsBlock = optionalTsBlock.get();
int currentCount = tsBlock.getPositionCount();
// serialize time column
for (int i = 0; i < currentCount; i++) {
diff --git
a/server/src/test/java/org/apache/iotdb/db/mpp/execution/ConfigExecutionTest.java
b/server/src/test/java/org/apache/iotdb/db/mpp/execution/ConfigExecutionTest.java
index 306bb21d07..e51e324290 100644
---
a/server/src/test/java/org/apache/iotdb/db/mpp/execution/ConfigExecutionTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/mpp/execution/ConfigExecutionTest.java
@@ -37,7 +37,6 @@ import
org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
-import org.junit.Assert;
import org.junit.Test;
import java.util.Collections;
@@ -45,6 +44,8 @@ import java.util.Optional;
import java.util.concurrent.ExecutorService;
import static com.google.common.util.concurrent.Futures.immediateFuture;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
public class ConfigExecutionTest {
@@ -55,7 +56,7 @@ public class ConfigExecutionTest {
new ConfigExecution(genMPPQueryContext(), null, getExecutor(), task);
execution.start();
ExecutionResult result = execution.getStatus();
- Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(),
result.status.code);
+ assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(),
result.status.code);
}
@Test
@@ -77,10 +78,12 @@ public class ConfigExecutionTest {
ExecutionResult result = execution.getStatus();
TsBlock tsBlockFromExecution = null;
if (execution.hasNextResult()) {
- tsBlockFromExecution = execution.getBatchResult();
+ Optional<TsBlock> optionalTsBlock = execution.getBatchResult();
+ assertTrue(optionalTsBlock.isPresent());
+ tsBlockFromExecution = optionalTsBlock.get();
}
- Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(),
result.status.code);
- Assert.assertEquals(tsBlock, tsBlockFromExecution);
+ assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(),
result.status.code);
+ assertEquals(tsBlock, tsBlockFromExecution);
}
@Test
@@ -93,7 +96,7 @@ public class ConfigExecutionTest {
new ConfigExecution(genMPPQueryContext(), null, getExecutor(), task);
execution.start();
ExecutionResult result = execution.getStatus();
- Assert.assertEquals(TSStatusCode.QUERY_PROCESS_ERROR.getStatusCode(),
result.status.code);
+ assertEquals(TSStatusCode.QUERY_PROCESS_ERROR.getStatusCode(),
result.status.code);
}
@Test
@@ -120,8 +123,7 @@ public class ConfigExecutionTest {
new Thread(
() -> {
ExecutionResult result = execution.getStatus();
- Assert.assertEquals(
- TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode(),
result.status.code);
+ assertEquals(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode(),
result.status.code);
});
resultThread.start();
taskResult.cancel(true);
@@ -140,8 +142,7 @@ public class ConfigExecutionTest {
new Thread(
() -> {
ExecutionResult result = execution.getStatus();
- Assert.assertEquals(
- TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode(),
result.status.code);
+ assertEquals(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode(),
result.status.code);
});
resultThread.start();
execution.start();
@@ -154,7 +155,7 @@ public class ConfigExecutionTest {
// Assert.fail("InterruptedException should be threw here");
} catch (InterruptedException e) {
ExecutionResult result = execution.getStatus();
- Assert.assertEquals(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode(),
result.status.code);
+ assertEquals(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode(),
result.status.code);
execution.stop();
}
}