Modified: pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java?rev=1917723&r1=1917722&r2=1917723&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java 
(original)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java Tue 
May 14 20:37:37 2024
@@ -33,7 +33,6 @@ import org.apache.pig.backend.hadoop.exe
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
 import 
org.apache.pig.backend.hadoop.executionengine.spark.JobStatisticCollector;
-import org.apache.pig.backend.hadoop.executionengine.spark.SparkShims;
 import 
org.apache.pig.backend.hadoop.executionengine.spark.operator.NativeSparkOperator;
 import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperPlan;
 import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator;
@@ -73,7 +72,7 @@ public class SparkPigStats extends PigSt
                             JobStatisticCollector jobStatisticCollector,
                             JavaSparkContext sparkContext) {
         boolean isSuccess = SparkStatsUtil.isJobSuccess(jobId, sparkContext);
-        SparkJobStats jobStats = SparkShims.getInstance().sparkJobStats(jobId, 
jobPlan, conf);
+        SparkJobStats jobStats = new SparkJobStats(jobId, jobPlan, conf);
         jobStats.setSuccessful(isSuccess);
         jobStats.collectStats(jobStatisticCollector);
         jobStats.addOutputInfo(poStore, isSuccess, jobStatisticCollector);
@@ -90,7 +89,7 @@ public class SparkPigStats extends PigSt
                                 JavaSparkContext sparkContext,
                                 Exception e) {
         boolean isSuccess = false;
-        SparkJobStats jobStats = SparkShims.getInstance().sparkJobStats(jobId, 
jobPlan, conf);
+        SparkJobStats jobStats = new SparkJobStats(jobId, jobPlan, conf);
         jobStats.setSuccessful(isSuccess);
         jobStats.collectStats(jobStatisticCollector);
         jobStats.addOutputInfo(poStore, isSuccess, jobStatisticCollector);
@@ -101,7 +100,7 @@ public class SparkPigStats extends PigSt
     }
 
     public void addNativeJobStats(NativeSparkOperator sparkOperator, String 
jobId, boolean isSuccess, Exception e) {
-        SparkJobStats jobStats = SparkShims.getInstance().sparkJobStats(jobId, 
jobPlan, conf);
+        SparkJobStats jobStats = new SparkJobStats(jobId, jobPlan, conf);
         jobStats.setSuccessful(isSuccess);
         jobSparkOperatorMap.put(jobStats, sparkOperator);
         jobPlan.add(jobStats);

Modified: pig/trunk/test/org/apache/pig/test/TestStoreBase.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestStoreBase.java?rev=1917723&r1=1917722&r2=1917723&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestStoreBase.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestStoreBase.java Tue May 14 20:37:37 
2024
@@ -143,8 +143,6 @@ public abstract class TestStoreBase {
         String outputFileName1 = TESTDIR + "/TestStore-output-" + new 
Random().nextLong() + ".txt";
         String outputFileName2 = TESTDIR + "/TestStore-output-" + new 
Random().nextLong() + ".txt";
 
-        boolean isSpark2_2_plus = Util.isSpark2_2_plus();
-        
         Map<String, Boolean> filesToVerify = new HashMap<String, Boolean>();
         if (mode.toString().startsWith("SPARK")) {
             filesToVerify.put(outputFileName1 + 
"_cleanupOnFailure_succeeded1", Boolean.TRUE);
@@ -176,21 +174,12 @@ public abstract class TestStoreBase {
             filesToVerify.put(DummyOutputCommitter.FILE_SETUPTASK_CALLED + 
"2", Boolean.FALSE);
             filesToVerify.put(DummyOutputCommitter.FILE_COMMITTASK_CALLED + 
"1", Boolean.FALSE);
             filesToVerify.put(DummyOutputCommitter.FILE_COMMITTASK_CALLED + 
"2", Boolean.FALSE);
-            if (isSpark2_2_plus) {
-                filesToVerify.put(DummyOutputCommitter.FILE_ABORTTASK_CALLED + 
"1", Boolean.TRUE);
-            } else {
-                // OutputCommitter.abortTask will not be invoked in spark mode 
before spark 2.2.x. Detail see SPARK-7953
-                filesToVerify.put(DummyOutputCommitter.FILE_ABORTTASK_CALLED + 
"1", Boolean.FALSE);
-            }
+            filesToVerify.put(DummyOutputCommitter.FILE_ABORTTASK_CALLED + 
"1", Boolean.TRUE);
             filesToVerify.put(DummyOutputCommitter.FILE_ABORTTASK_CALLED + 
"2", Boolean.FALSE);
             filesToVerify.put(DummyOutputCommitter.FILE_COMMITJOB_CALLED + 
"1", Boolean.FALSE);
             filesToVerify.put(DummyOutputCommitter.FILE_COMMITJOB_CALLED + 
"2", Boolean.FALSE);
-            if (isSpark2_2_plus) {
-                filesToVerify.put(DummyOutputCommitter.FILE_ABORTJOB_CALLED + 
"1", Boolean.TRUE);
-            } else {
-                // OutputCommitter.abortJob will not be invoked in spark mode 
before spark 2.2.x. Detail see SPARK-7953
-                filesToVerify.put(DummyOutputCommitter.FILE_ABORTJOB_CALLED + 
"1", Boolean.FALSE);
-            }
+            filesToVerify.put(DummyOutputCommitter.FILE_ABORTJOB_CALLED + "1", 
Boolean.TRUE);
+
             filesToVerify.put(DummyOutputCommitter.FILE_ABORTJOB_CALLED + "2", 
Boolean.FALSE);
             filesToVerify.put(DummyOutputCommitter.FILE_CLEANUPJOB_CALLED + 
"1", Boolean.FALSE);
             filesToVerify.put(DummyOutputCommitter.FILE_CLEANUPJOB_CALLED + 
"2", Boolean.FALSE);
@@ -229,7 +218,7 @@ public abstract class TestStoreBase {
         if(mode.isLocal()) {
             // MR LocalJobRunner does not call abortTask
             if (!Util.isTezExecType(mode)) {
-                if (Util.isSparkExecType(mode) && isSpark2_2_plus) {
+                if (Util.isSparkExecType(mode)) {
                     
filesToVerify.put(DummyOutputCommitter.FILE_ABORTTASK_CALLED + "1", 
Boolean.TRUE);
                 } else {
                     
filesToVerify.put(DummyOutputCommitter.FILE_ABORTTASK_CALLED + "1", 
Boolean.FALSE);

Modified: pig/trunk/test/org/apache/pig/test/Util.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/Util.java?rev=1917723&r1=1917722&r2=1917723&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/Util.java (original)
+++ pig/trunk/test/org/apache/pig/test/Util.java Tue May 14 20:37:37 2024
@@ -1336,11 +1336,6 @@ public class Util {
         return false;
     }
 
-    public static boolean isSpark2_2_plus() throws IOException {
-        String sparkVersion = package$.MODULE$.SPARK_VERSION();
-        return sparkVersion != null && 
sparkVersion.matches("2\\.([\\d&&[^01]]|[\\d]{2,})\\..*");
-    }
-
     public static void sortQueryOutputsIfNeed(List<Tuple> actualResList, 
boolean toSort){
         if( toSort == true) {
             for (Tuple t : actualResList) {


Reply via email to