This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch lmh/FixIntoOperator1.0
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 25bf877dbb03751c792772f1c56488b855578d32 Author: Minghui Liu <[email protected]> AuthorDate: Tue Nov 29 22:09:37 2022 +0800 fix --- .../operator/process/AbstractIntoOperator.java | 62 ++++++++++------------ .../operator/process/DeviceViewIntoOperator.java | 7 ++- .../execution/operator/process/IntoOperator.java | 7 ++- 3 files changed, 37 insertions(+), 39 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java index 9ec9b0d5bc..6181dab228 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java @@ -146,48 +146,40 @@ public abstract class AbstractIntoOperator implements ProcessOperator { isBlocked = SettableFuture.create(); writeOperationFuture = - writeOperationExecutor.submit( - () -> { - LOGGER.info(""); - return client.insertTablets(insertMultiTabletsStatement); - }); - + writeOperationExecutor.submit(() -> client.insertTablets(insertMultiTabletsStatement)); writeOperationFuture.addListener( - () -> { - LOGGER.info(""); - ((SettableFuture<Void>) isBlocked).set(null); - }, - writeOperationExecutor); + () -> ((SettableFuture<Void>) isBlocked).set(null), writeOperationExecutor); } - protected boolean handleFuture() { - if (writeOperationFuture != null) { - if (writeOperationFuture.isDone()) { - try { - TSStatus executionStatus = writeOperationFuture.get(); - if (executionStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode() - && executionStatus.getCode() != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) { - String message = - String.format( - "Error occurred while inserting tablets in SELECT INTO: %s", - executionStatus.getMessage()); - throw new IntoProcessException(message); - } - - for (InsertTabletStatementGenerator 
generator : insertTabletStatementGenerators) { - generator.reset(); - } + protected boolean writeOperationDone() { + if (writeOperationFuture == null) { + return true; + } - writeOperationFuture = null; - return true; - } catch (ExecutionException | InterruptedException e) { - throw new IntoProcessException(e.getMessage()); - } - } else { + try { + if (!writeOperationFuture.isDone()) { return false; } + + TSStatus executionStatus = writeOperationFuture.get(); + if (executionStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode() + && executionStatus.getCode() != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) { + String message = + String.format( + "Error occurred while inserting tablets in SELECT INTO: %s", + executionStatus.getMessage()); + throw new IntoProcessException(message); + } + + for (InsertTabletStatementGenerator generator : insertTabletStatementGenerators) { + generator.reset(); + } + + writeOperationFuture = null; + return true; + } catch (ExecutionException | InterruptedException e) { + throw new IntoProcessException(e.getMessage()); } - return true; } private boolean existFullStatement( diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java index 7ef65c9e16..e36bb49168 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java @@ -76,7 +76,7 @@ public class DeviceViewIntoOperator extends AbstractIntoOperator { @Override public TsBlock next() { - if (!handleFuture()) { + if (!writeOperationDone()) { return null; } @@ -86,7 +86,10 @@ public class DeviceViewIntoOperator extends AbstractIntoOperator { cachedTsBlock = null; if (child.hasNext()) { - processTsBlock(child.next()); + TsBlock inputTsBlock = child.next(); + 
processTsBlock(inputTsBlock); + + // call child.next only once return null; } else { InsertMultiTabletsStatement insertMultiTabletsStatement = diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java index 9e5c46e9fa..67a4fb0b0e 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java @@ -60,7 +60,7 @@ public class IntoOperator extends AbstractIntoOperator { @Override public TsBlock next() { - if (!handleFuture()) { + if (!writeOperationDone()) { return null; } @@ -70,7 +70,10 @@ public class IntoOperator extends AbstractIntoOperator { cachedTsBlock = null; if (child.hasNext()) { - processTsBlock(child.next()); + TsBlock inputTsBlock = child.next(); + processTsBlock(inputTsBlock); + + // call child.next only once return null; } else { if (insertMultiTabletsInternally(false)) {
