Modified: pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java?rev=1917723&r1=1917722&r2=1917723&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java (original) +++ pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java Tue May 14 20:37:37 2024 @@ -33,7 +33,6 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper; import org.apache.pig.backend.hadoop.executionengine.spark.JobStatisticCollector; -import org.apache.pig.backend.hadoop.executionengine.spark.SparkShims; import org.apache.pig.backend.hadoop.executionengine.spark.operator.NativeSparkOperator; import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperPlan; import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator; @@ -73,7 +72,7 @@ public class SparkPigStats extends PigSt JobStatisticCollector jobStatisticCollector, JavaSparkContext sparkContext) { boolean isSuccess = SparkStatsUtil.isJobSuccess(jobId, sparkContext); - SparkJobStats jobStats = SparkShims.getInstance().sparkJobStats(jobId, jobPlan, conf); + SparkJobStats jobStats = new SparkJobStats(jobId, jobPlan, conf); jobStats.setSuccessful(isSuccess); jobStats.collectStats(jobStatisticCollector); jobStats.addOutputInfo(poStore, isSuccess, jobStatisticCollector); @@ -90,7 +89,7 @@ public class SparkPigStats extends PigSt JavaSparkContext sparkContext, Exception e) { boolean isSuccess = false; - SparkJobStats jobStats = SparkShims.getInstance().sparkJobStats(jobId, jobPlan, conf); + SparkJobStats jobStats = new SparkJobStats(jobId, jobPlan, conf); jobStats.setSuccessful(isSuccess); jobStats.collectStats(jobStatisticCollector); jobStats.addOutputInfo(poStore, isSuccess, jobStatisticCollector); @@ -101,7 +100,7 @@ public class SparkPigStats extends PigSt } public void addNativeJobStats(NativeSparkOperator sparkOperator, String jobId, boolean isSuccess, Exception e) { - SparkJobStats jobStats = SparkShims.getInstance().sparkJobStats(jobId, jobPlan, conf); + SparkJobStats jobStats = new SparkJobStats(jobId, jobPlan, conf); jobStats.setSuccessful(isSuccess); jobSparkOperatorMap.put(jobStats, sparkOperator); jobPlan.add(jobStats);
Modified: pig/trunk/test/org/apache/pig/test/TestStoreBase.java URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestStoreBase.java?rev=1917723&r1=1917722&r2=1917723&view=diff ============================================================================== --- pig/trunk/test/org/apache/pig/test/TestStoreBase.java (original) +++ pig/trunk/test/org/apache/pig/test/TestStoreBase.java Tue May 14 20:37:37 2024 @@ -143,8 +143,6 @@ public abstract class TestStoreBase { String outputFileName1 = TESTDIR + "/TestStore-output-" + new Random().nextLong() + ".txt"; String outputFileName2 = TESTDIR + "/TestStore-output-" + new Random().nextLong() + ".txt"; - boolean isSpark2_2_plus = Util.isSpark2_2_plus(); - Map<String, Boolean> filesToVerify = new HashMap<String, Boolean>(); if (mode.toString().startsWith("SPARK")) { filesToVerify.put(outputFileName1 + "_cleanupOnFailure_succeeded1", Boolean.TRUE); @@ -176,21 +174,12 @@ public abstract class TestStoreBase { filesToVerify.put(DummyOutputCommitter.FILE_SETUPTASK_CALLED + "2", Boolean.FALSE); filesToVerify.put(DummyOutputCommitter.FILE_COMMITTASK_CALLED + "1", Boolean.FALSE); filesToVerify.put(DummyOutputCommitter.FILE_COMMITTASK_CALLED + "2", Boolean.FALSE); - if (isSpark2_2_plus) { - filesToVerify.put(DummyOutputCommitter.FILE_ABORTTASK_CALLED + "1", Boolean.TRUE); - } else { - // OutputCommitter.abortTask will not be invoked in spark mode before spark 2.2.x. Detail see SPARK-7953 - filesToVerify.put(DummyOutputCommitter.FILE_ABORTTASK_CALLED + "1", Boolean.FALSE); - } + filesToVerify.put(DummyOutputCommitter.FILE_ABORTTASK_CALLED + "1", Boolean.TRUE); filesToVerify.put(DummyOutputCommitter.FILE_ABORTTASK_CALLED + "2", Boolean.FALSE); filesToVerify.put(DummyOutputCommitter.FILE_COMMITJOB_CALLED + "1", Boolean.FALSE); filesToVerify.put(DummyOutputCommitter.FILE_COMMITJOB_CALLED + "2", Boolean.FALSE); - if (isSpark2_2_plus) { - filesToVerify.put(DummyOutputCommitter.FILE_ABORTJOB_CALLED + "1", Boolean.TRUE); - } else { - // OutputCommitter.abortJob will not be invoked in spark mode before spark 2.2.x. Detail see SPARK-7953 - filesToVerify.put(DummyOutputCommitter.FILE_ABORTJOB_CALLED + "1", Boolean.FALSE); - } + filesToVerify.put(DummyOutputCommitter.FILE_ABORTJOB_CALLED + "1", Boolean.TRUE); + filesToVerify.put(DummyOutputCommitter.FILE_ABORTJOB_CALLED + "2", Boolean.FALSE); filesToVerify.put(DummyOutputCommitter.FILE_CLEANUPJOB_CALLED + "1", Boolean.FALSE); filesToVerify.put(DummyOutputCommitter.FILE_CLEANUPJOB_CALLED + "2", Boolean.FALSE); @@ -229,7 +218,7 @@ public abstract class TestStoreBase { if(mode.isLocal()) { // MR LocalJobRunner does not call abortTask if (!Util.isTezExecType(mode)) { - if (Util.isSparkExecType(mode) && isSpark2_2_plus) { + if (Util.isSparkExecType(mode)) { filesToVerify.put(DummyOutputCommitter.FILE_ABORTTASK_CALLED + "1", Boolean.TRUE); } else { filesToVerify.put(DummyOutputCommitter.FILE_ABORTTASK_CALLED + "1", Boolean.FALSE); Modified: pig/trunk/test/org/apache/pig/test/Util.java URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/Util.java?rev=1917723&r1=1917722&r2=1917723&view=diff ============================================================================== --- pig/trunk/test/org/apache/pig/test/Util.java (original) +++ pig/trunk/test/org/apache/pig/test/Util.java Tue May 14 20:37:37 2024 @@ -1336,11 +1336,6 @@ public class Util { return false; } - public static boolean isSpark2_2_plus() throws IOException { - String sparkVersion = package$.MODULE$.SPARK_VERSION(); - return sparkVersion != null && sparkVersion.matches("2\\.([\\d&&[^01]]|[\\d]{2,})\\..*"); - } - public static void sortQueryOutputsIfNeed(List<Tuple> actualResList, boolean toSort){ if( toSort == true) { for (Tuple t : actualResList) {