[ 
https://issues.apache.org/jira/browse/PIG-4243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15133852#comment-15133852
 ] 

liyunzhang_intel commented on PIG-4243:
---------------------------------------

In https://builds.apache.org/job/Pig-spark/298/#showFailuresLink, it shows  
following unit tests fail:
org.apache.pig.test.TestStore.testCleanupOnFailureMultiStore
org.apache.pig.test.TestStore.testCleanupOnFailure

PIG-4243.patch fixes these two failures.

Changes in PIG-4243.patch:
1. add "clean up  for all of the stores"(call PigStorage#cleanupOnFailure) 
2. add some judgements to give different results in different engine 
mode(TestStoreBase#testCleanupOnFailureMultiStore)

Explain more about TestStoreBase#testCleanupOnFailureMultiStore
The script like following:
{code}
A = load xx;
store A into '1.out' using DummyStore('true','1');   -- first job should fail
store A into '2.out' using DummyStore('false','1');  -- second job should 
success
{code}

the spark plan will be after multiquery optimization:
{code}
        Split - scope-14
        |   |
        |   a: Store(hdfs://1.out:myudfs.DummyStore('true','1')) - scope-4
        |   |
        |   a: Store(hdfs://2.out:myudfs.DummyStore('false','1')) - scope-7
        |
        |---a: 
Load(hdfs://zly2.sh.intel.com:8020/user/root/multiStore.txt:org.apache.pig.builtin.PigStorage)
 - scope-0------
{code}    
  In spark mode ,when there are two POStore in the sub plan of POSplit, once 
the first job fails and throws exception,  the second job will not be executed. 
 FILE_SETUPJOB_CALLED( or  FILE_SETUPTASK_CALLED) of second job will not be 
generated.  *But why FILE_SETUPJOB_CALLED(or FILE_SETUPTASK_CALLED) of second 
job is generated even the second job is also not executed in mr mode?*
in MR mode:
  FILE_SETUPJOB_CALLED is genereated in 
org.apache.pig.test.TestStore.DummyOutputCommitter#setupJob.
          DummyOutputCommitter#setupJob stacktrace:
           {code}
           DummyOutputCommitter.setupJob
             
->org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigOutputCommitter.setupJob(PigOutputCommitter.java:407)
               -> 
org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:511)
          {code}
         
          
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigOutputCommitter#PigOutputCommitter
          {code}
           public PigOutputCommitter(TaskAttemptContext context,
            List<POStore> mapStores, List<POStore> reduceStores)
            throws IOException {
        // create and store the map and reduce output committers
        mapOutputCommitters = getCommitters(context, mapStores);  // Kelly's 
comment: there will be 2 mapOutputCommitters in above case and later 
DummyOutputCommitter#setupJob will be invoked and  FILE_SETUPJOB_CALLED of 
first store and second store will be generated before the  mr job starts to 
compute.
        reduceOutputCommitters = getCommitters(context, reduceStores);
        recoverySupported = 
context.getConfiguration().getBoolean(PigConfiguration.PIG_OUTPUT_COMMITTER_RECOVERY,
 false);
    }
            {code}
           
In spark mode:
 DummyOutputCommitter#setupJob stacktrace
         {code}
                  DummyOutputCommitter.setupJob
               
->org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigOutputCommitter.setupJob(PigOutputCommitter.java:407)
                      
->org.apache.spark.rdd.PairRDDFunctions#saveAsNewAPIHadoopDataset
          {code}          
                  
        In spark mode, 1 store generates 1 spark job and  the 
PigOutputCommitter only has 1 reduceOutputCommitter for the spark job. 
StoreConverter#configureStorer: 
{code}  
  //Kelly's comment:We only set the location of current store as 
JobControlCompiler.PIG_REDUCE_STORES even there are more than 1 POStore in  the 
script. In spark, store is an action, 1 store generates 1 job.  So in above 
case, there will be two jobs and we execute jobs one by one, when first job 
fails and second job will be stopped and 
FILE_SETUPJOB_CALLED(FILE_SETUPTASK_CALLED) of second job is not generated
        private static POStore configureStorer(JobConf jobConf,
            PhysicalOperator op) throws IOException {
         ....
        jobConf.set(JobControlCompiler.PIG_MAP_STORES,
                ObjectSerializer.serialize(Lists.newArrayList()));
        jobConf.set(JobControlCompiler.PIG_REDUCE_STORES,
                ObjectSerializer.serialize(storeLocations));
         ....
    }
        {code}

[~pallavi.rao], [~mohitsabharwal],[~kexianda]: help review PIG-4243.patch, 
thanks


> Fix "TestStore" for Spark engine
> --------------------------------
>
>                 Key: PIG-4243
>                 URL: https://issues.apache.org/jira/browse/PIG-4243
>             Project: Pig
>          Issue Type: Sub-task
>          Components: spark
>            Reporter: liyunzhang_intel
>            Assignee: liyunzhang_intel
>             Fix For: spark-branch
>
>         Attachments: PIG-4243.patch, TEST-org.apache.pig.test.TestStore.txt
>
>
> 1. Build spark and pig env according to PIG-4168
> 2. add TestStore to $PIG_HOME/test/spark-tests
> cat  $PIG_HOME/test/spark-tests
> **/TestStore
> 3. run unit test TestStore
> ant test-spark
> 4. the unit test fails
> error log is attached



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to