This is an automated email from the ASF dual-hosted git repository. mblow pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
commit 30c5a054c69b60618bdc5cc5ac032ad8da0c67ba Author: Murtadha Hubail <[email protected]> AuthorDate: Thu Apr 23 00:03:43 2020 +0300 [NO ISSUE][RT] Ensure Fail is Called on RunFileWriter - user model changes: no - storage format changes: no - interface changes: no Details: - When OptimizedHybridHashJoin fails, ensure that fail is called on any RunFileWriter that was initialized. This will ensure that any open run files are closed. Change-Id: I27fa54367045e90540ef571a4cf33723aca66c53 Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/5924 Integration-Tests: Jenkins <[email protected]> Tested-by: Jenkins <[email protected]> Reviewed-by: Ali Alsuliman <[email protected]> --- .../org/apache/hyracks/api/util/CleanupUtils.java | 4 +++- .../dataflow/std/join/OptimizedHybridHashJoin.java | 14 ++++++++++++++ .../OptimizedHybridHashJoinOperatorDescriptor.java | 21 +++++++++++++++------ 3 files changed, 32 insertions(+), 7 deletions(-) diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/CleanupUtils.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/CleanupUtils.java index 6e6d342..220311e 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/CleanupUtils.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/CleanupUtils.java @@ -96,7 +96,9 @@ public class CleanupUtils { } catch (Throwable loggingFailure) { // NOSONAR: Ignore catching Throwable // NOSONAR ignore logging failure } - root.addSuppressed(th); + if (root != null) { + root.addSuppressed(th); + } } } diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java index 68b4b7b..59bb7ba 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java @@ -34,6 +34,7 @@ import org.apache.hyracks.api.dataflow.value.RecordDescriptor; import org.apache.hyracks.api.exceptions.ErrorCode; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.io.FileReference; +import org.apache.hyracks.api.util.CleanupUtils; import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor; import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender; import org.apache.hyracks.dataflow.common.io.RunFileReader; @@ -219,6 +220,19 @@ public class OptimizedHybridHashJoin { } } + public void fail() throws HyracksDataException { + for (RunFileWriter writer : buildRFWriters) { + if (writer != null) { + CleanupUtils.fail(writer, null); + } + } + for (RunFileWriter writer : probeRFWriters) { + if (writer != null) { + CleanupUtils.fail(writer, null); + } + } + } + private void closeAllSpilledPartitions(RunFileWriter[] runFileWriters, String refName) throws HyracksDataException { try { for (int pid = spilledStatus.nextSetBit(0); pid >= 0 && pid < numOfPartitions; pid = diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java index 1819b8d..97f9c24 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java @@ -273,7 +273,7 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD ITuplePartitionComputer buildHpc = new FieldHashPartitionComputerFamily(buildKeys, buildHashFunctionFactories) .createPartitioner(0); - boolean isFailed = false; + boolean failed = false; @Override public void open() throws HyracksDataException { @@ -302,21 +302,24 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD @Override public void close() throws HyracksDataException { if (state.hybridHJ != null) { - state.hybridHJ.closeBuild(); - if (isFailed) { - state.hybridHJ.clearBuildTempFiles(); - } else { + if (!failed) { + state.hybridHJ.closeBuild(); ctx.setStateObject(state); if (LOGGER.isTraceEnabled()) { LOGGER.trace("OptimizedHybridHashJoin closed its build phase"); } + } else { + state.hybridHJ.clearBuildTempFiles(); } } } @Override public void fail() throws HyracksDataException { - isFailed = true; + failed = true; + if (state.hybridHJ != null) { + state.hybridHJ.fail(); + } } @Override @@ -401,6 +404,9 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD @Override public void fail() throws HyracksDataException { failed = true; + if (state.hybridHJ != null) { + state.hybridHJ.fail(); + } writer.fail(); } @@ -447,6 +453,9 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD joinPartitionPair(bReader, pReader, bSize, pSize, 1); } } catch (Exception e) { + if (state.hybridHJ != null) { + state.hybridHJ.fail(); + } // Since writer.nextFrame() is called in the above "try" body, we have to call writer.fail() // to send the failure signal to the downstream, when there is a throwable thrown. writer.fail();
