Author: knoguchi
Date: Tue May 14 20:31:37 2024
New Revision: 1917721

URL: http://svn.apache.org/viewvc?rev=1917721&view=rev
Log:
PIG-5416: Spark unit tests failing randomly with "java.lang.RuntimeException: Unexpected job execution status RUNNING" (knoguchi)

Modified: pig/trunk/CHANGES.txt pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java Modified: pig/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1917721&r1=1917720&r2=1917721&view=diff ============================================================================== --- pig/trunk/CHANGES.txt (original) +++ pig/trunk/CHANGES.txt Tue May 14 20:31:37 2024 @@ -27,6 +27,7 @@ IMPROVEMENTS OPTIMIZATIONS BUG FIXES +PIG-5416: Spark unit tests failing randomly with "java.lang.RuntimeException: Unexpected job execution status RUNNING" (knoguchi) Release 0.18.0 - Unreleased Modified: pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java?rev=1917721&r1=1917720&r2=1917721&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java (original) +++ pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java Tue May 14 20:31:37 2024 @@ -56,6 +56,20 @@ public class SparkStatsUtil { // this driver thread calling SparkStatusTracker. // To workaround this, we will wait for this job to "finish". jobStatisticCollector.waitForJobToEnd(jobID); + + // Horrible hack. After spark 3.2, somehow even after calling waitForJobToEnd + // job is still being reported as RUNNING. Here, adding a poll to wait till the + // job gets out of the RUNNING state. 
+ // Not sure if this issue is limited to local unit testings or not + for( int i=0; i < 10; i++) { + if( getJobInfo(jobID, sparkContext).status() != JobExecutionStatus.RUNNING ) { + break; + } + try { + Thread.sleep(1000); + } catch (InterruptedException e) {} + } + sparkPigStats.addJobStats(poStore, sparkOperator, jobID, jobStatisticCollector, sparkContext); jobStatisticCollector.cleanup(jobID); @@ -160,4 +174,4 @@ public class SparkStatsUtil { public static void addFailedNativeJobStats(PigStats ps, NativeSparkOperator nativeSparkOperator, Exception e) { ((SparkPigStats) ps).addNativeJobStats(nativeSparkOperator, nativeSparkOperator.getJobId(), false, e); } -} \ No newline at end of file +}