Author: knoguchi
Date: Tue May 14 20:31:37 2024
New Revision: 1917721

URL: http://svn.apache.org/viewvc?rev=1917721&view=rev
Log:
PIG-5416: Spark unit tests failing randomly with "java.lang.RuntimeException: Unexpected job execution status RUNNING" (knoguchi)

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java

Modified: pig/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1917721&r1=1917720&r2=1917721&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Tue May 14 20:31:37 2024
@@ -27,6 +27,7 @@ IMPROVEMENTS
 OPTIMIZATIONS
  
 BUG FIXES
+PIG-5416: Spark unit tests failing randomly with "java.lang.RuntimeException: Unexpected job execution status RUNNING" (knoguchi)
 
 
 Release 0.18.0 - Unreleased

Modified: pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java?rev=1917721&r1=1917720&r2=1917721&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java (original)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java Tue May 14 20:31:37 2024
@@ -56,6 +56,20 @@ public class SparkStatsUtil {
         // this driver thread calling SparkStatusTracker.
         // To workaround this, we will wait for this job to "finish".
         jobStatisticCollector.waitForJobToEnd(jobID);
+
+        // Horrible hack.  After spark 3.2, somehow even after calling waitForJobToEnd
+        // job is still being reported as RUNNING.  Here, adding a poll to wait till the
+        // job gets out of the RUNNING state.
+        // Not sure if this issue is limited to local unit testings or not
+        for( int i=0; i < 10; i++) {
+            if( getJobInfo(jobID, sparkContext).status() != JobExecutionStatus.RUNNING ) {
+                break;
+            }
+            try {
+                Thread.sleep(1000);
+            } catch (InterruptedException e) {}
+        }
+
         sparkPigStats.addJobStats(poStore, sparkOperator, jobID, jobStatisticCollector,
                 sparkContext);
         jobStatisticCollector.cleanup(jobID);
@@ -160,4 +174,4 @@ public class SparkStatsUtil {
     public static void addFailedNativeJobStats(PigStats ps, NativeSparkOperator nativeSparkOperator, Exception e) {
         ((SparkPigStats) ps).addNativeJobStats(nativeSparkOperator, nativeSparkOperator.getJobId(), false, e);
     }
-}
\ No newline at end of file
+}


Reply via email to