This is an automated email from the ASF dual-hosted git repository.

mblow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git

commit 30c5a054c69b60618bdc5cc5ac032ad8da0c67ba
Author: Murtadha Hubail <[email protected]>
AuthorDate: Thu Apr 23 00:03:43 2020 +0300

    [NO ISSUE][RT] Ensure Fail is Called on RunFileWriter
    
    - user model changes: no
    - storage format changes: no
    - interface changes: no
    
    Details:
    - When OptimizedHybridHashJoin fails, ensure that fail is
      called on any RunFileWriter that was initialized. This
      will ensure that any open run files are closed.
    
    Change-Id: I27fa54367045e90540ef571a4cf33723aca66c53
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/5924
    Integration-Tests: Jenkins <[email protected]>
    Tested-by: Jenkins <[email protected]>
    Reviewed-by: Ali Alsuliman <[email protected]>
---
 .../org/apache/hyracks/api/util/CleanupUtils.java   |  4 +++-
 .../dataflow/std/join/OptimizedHybridHashJoin.java  | 14 ++++++++++++++
 .../OptimizedHybridHashJoinOperatorDescriptor.java  | 21 +++++++++++++++------
 3 files changed, 32 insertions(+), 7 deletions(-)

diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/CleanupUtils.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/CleanupUtils.java
index 6e6d342..220311e 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/CleanupUtils.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/CleanupUtils.java
@@ -96,7 +96,9 @@ public class CleanupUtils {
             } catch (Throwable loggingFailure) { // NOSONAR: Ignore catching 
Throwable
                 // NOSONAR ignore logging failure
             }
-            root.addSuppressed(th);
+            if (root != null) {
+                root.addSuppressed(th);
+            }
         }
     }
 
diff --git 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
index 68b4b7b..59bb7ba 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
@@ -34,6 +34,7 @@ import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.ErrorCode;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.util.CleanupUtils;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import org.apache.hyracks.dataflow.common.io.RunFileReader;
@@ -219,6 +220,19 @@ public class OptimizedHybridHashJoin {
         }
     }
 
+    public void fail() throws HyracksDataException {
+        for (RunFileWriter writer : buildRFWriters) {
+            if (writer != null) {
+                CleanupUtils.fail(writer, null);
+            }
+        }
+        for (RunFileWriter writer : probeRFWriters) {
+            if (writer != null) {
+                CleanupUtils.fail(writer, null);
+            }
+        }
+    }
+
     private void closeAllSpilledPartitions(RunFileWriter[] runFileWriters, 
String refName) throws HyracksDataException {
         try {
             for (int pid = spilledStatus.nextSetBit(0); pid >= 0 && pid < 
numOfPartitions; pid =
diff --git 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
index 1819b8d..97f9c24 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
@@ -273,7 +273,7 @@ public class OptimizedHybridHashJoinOperatorDescriptor 
extends AbstractOperatorD
                 ITuplePartitionComputer buildHpc =
                         new FieldHashPartitionComputerFamily(buildKeys, 
buildHashFunctionFactories)
                                 .createPartitioner(0);
-                boolean isFailed = false;
+                boolean failed = false;
 
                 @Override
                 public void open() throws HyracksDataException {
@@ -302,21 +302,24 @@ public class OptimizedHybridHashJoinOperatorDescriptor 
extends AbstractOperatorD
                 @Override
                 public void close() throws HyracksDataException {
                     if (state.hybridHJ != null) {
-                        state.hybridHJ.closeBuild();
-                        if (isFailed) {
-                            state.hybridHJ.clearBuildTempFiles();
-                        } else {
+                        if (!failed) {
+                            state.hybridHJ.closeBuild();
                             ctx.setStateObject(state);
                             if (LOGGER.isTraceEnabled()) {
                                 LOGGER.trace("OptimizedHybridHashJoin closed 
its build phase");
                             }
+                        } else {
+                            state.hybridHJ.clearBuildTempFiles();
                         }
                     }
                 }
 
                 @Override
                 public void fail() throws HyracksDataException {
-                    isFailed = true;
+                    failed = true;
+                    if (state.hybridHJ != null) {
+                        state.hybridHJ.fail();
+                    }
                 }
 
                 @Override
@@ -401,6 +404,9 @@ public class OptimizedHybridHashJoinOperatorDescriptor 
extends AbstractOperatorD
                 @Override
                 public void fail() throws HyracksDataException {
                     failed = true;
+                    if (state.hybridHJ != null) {
+                        state.hybridHJ.fail();
+                    }
                     writer.fail();
                 }
 
@@ -447,6 +453,9 @@ public class OptimizedHybridHashJoinOperatorDescriptor 
extends AbstractOperatorD
                             joinPartitionPair(bReader, pReader, bSize, pSize, 
1);
                         }
                     } catch (Exception e) {
+                        if (state.hybridHJ != null) {
+                            state.hybridHJ.fail();
+                        }
                         // Since writer.nextFrame() is called in the above 
"try" body, we have to call writer.fail()
                         // to send the failure signal to the downstream, when 
there is a throwable thrown.
                         writer.fail();

Reply via email to