[
https://issues.apache.org/jira/browse/PIG-4243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15133852#comment-15133852
]
liyunzhang_intel commented on PIG-4243:
---------------------------------------
In https://builds.apache.org/job/Pig-spark/298/#showFailuresLink, the
following unit tests fail:
org.apache.pig.test.TestStore.testCleanupOnFailureMultiStore
org.apache.pig.test.TestStore.testCleanupOnFailure
PIG-4243.patch fixes these two failures.
Changes in PIG-4243.patch:
1. add cleanup for all of the stores (call PigStorage#cleanupOnFailure)
2. add checks so that the expected results differ per execution engine
(TestStoreBase#testCleanupOnFailureMultiStore)
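The engine-dependent expectation can be sketched in a few lines (the method and class names below are illustrative, not the actual patch code):

```java
// Illustrative sketch, not the actual TestStoreBase code: in MR mode the
// OutputCommitters of all stores run setupJob up front, so the second
// store's FILE_SETUPJOB_CALLED marker exists even when the first job
// fails; in Spark mode each store is a separate job run sequentially, so
// the second store's marker is never written after the first job fails.
class EngineExpectation {
    static boolean expectSecondStoreSetupMarker(String execType) {
        return !"spark".equalsIgnoreCase(execType);
    }

    public static void main(String[] args) {
        System.out.println(expectSecondStoreSetupMarker("mr"));    // true
        System.out.println(expectSecondStoreSetupMarker("spark")); // false
    }
}
```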
More detail on TestStoreBase#testCleanupOnFailureMultiStore:
The script looks like the following:
{code}
A = load xx;
store A into '1.out' using DummyStore('true','1'); -- first job should fail
store A into '2.out' using DummyStore('false','1'); -- second job should succeed
{code}
After multiquery optimization, the Spark plan is:
{code}
Split - scope-14
| |
| a: Store(hdfs://1.out:myudfs.DummyStore('true','1')) - scope-4
| |
| a: Store(hdfs://2.out:myudfs.DummyStore('false','1')) - scope-7
|
|---a:
Load(hdfs://zly2.sh.intel.com:8020/user/root/multiStore.txt:org.apache.pig.builtin.PigStorage)
- scope-0------
{code}
In Spark mode, when there are two POStores in the sub-plan of a POSplit, once
the first job fails and throws an exception, the second job is not executed,
so FILE_SETUPJOB_CALLED (or FILE_SETUPTASK_CALLED) of the second job is not
generated. *But why is FILE_SETUPJOB_CALLED (or FILE_SETUPTASK_CALLED) of the
second job generated in MR mode, even though the second job is not executed
there either?*
In MR mode:
FILE_SETUPJOB_CALLED is generated in
org.apache.pig.test.TestStore.DummyOutputCommitter#setupJob.
DummyOutputCommitter#setupJob stacktrace:
{code}
DummyOutputCommitter.setupJob
->org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigOutputCommitter.setupJob(PigOutputCommitter.java:407)
->org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:511)
{code}
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigOutputCommitter#PigOutputCommitter
{code}
public PigOutputCommitter(TaskAttemptContext context,
        List<POStore> mapStores, List<POStore> reduceStores)
        throws IOException {
    // create and store the map and reduce output committers
    // Kelly's comment: in the above case there will be 2 mapOutputCommitters;
    // later DummyOutputCommitter#setupJob is invoked, so FILE_SETUPJOB_CALLED
    // of both the first and the second store is generated before the MR job
    // starts to compute.
    mapOutputCommitters = getCommitters(context, mapStores);
    reduceOutputCommitters = getCommitters(context, reduceStores);
    recoverySupported = context.getConfiguration().getBoolean(
            PigConfiguration.PIG_OUTPUT_COMMITTER_RECOVERY, false);
}
{code}
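The fan-out described in that comment can be modeled in a few lines (this is a hypothetical stand-in, not the actual Hadoop or Pig classes):

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of PigOutputCommitter in MR mode: one inner committer
// per POStore, and setupJob fans out to all of them before any map/reduce
// task runs, so every store's marker appears up front.
class FanOutDemo {
    interface Committer { void setupJob(); }

    static List<String> runSetup(int numStores) {
        List<String> markers = new ArrayList<>();
        List<Committer> committers = new ArrayList<>();
        for (int i = 1; i <= numStores; i++) {
            final int store = i;
            // stands in for DummyOutputCommitter writing FILE_SETUPJOB_CALLED
            committers.add(() -> markers.add("FILE_SETUPJOB_CALLED_" + store));
        }
        // like PigOutputCommitter.setupJob: call every inner committer
        for (Committer c : committers) {
            c.setupJob();
        }
        return markers;
    }

    public static void main(String[] args) {
        // both stores' markers exist before the job computes anything
        System.out.println(runSetup(2));
    }
}
```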
In spark mode:
DummyOutputCommitter#setupJob stacktrace:
{code}
DummyOutputCommitter.setupJob
->org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigOutputCommitter.setupJob(PigOutputCommitter.java:407)
->org.apache.spark.rdd.PairRDDFunctions#saveAsNewAPIHadoopDataset
{code}
In Spark mode, one store generates one Spark job, and the PigOutputCommitter
has only one reduceOutputCommitter for each Spark job.
StoreConverter#configureStorer:
{code}
// Kelly's comment: we only set the location of the current store as
// JobControlCompiler.PIG_REDUCE_STORES, even when there is more than one
// POStore in the script. In Spark, store is an action, so one store
// generates one job. In the above case there are two jobs, executed one by
// one; when the first job fails, the second job is stopped, and
// FILE_SETUPJOB_CALLED (FILE_SETUPTASK_CALLED) of the second job is not
// generated.
private static POStore configureStorer(JobConf jobConf,
        PhysicalOperator op) throws IOException {
    ....
    jobConf.set(JobControlCompiler.PIG_MAP_STORES,
            ObjectSerializer.serialize(Lists.newArrayList()));
    jobConf.set(JobControlCompiler.PIG_REDUCE_STORES,
            ObjectSerializer.serialize(storeLocations));
    ....
}
{code}
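By contrast, the Spark-side behavior described in that comment can be sketched as sequential jobs that short-circuit on failure (again a hypothetical model, not the StoreConverter code):

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of Spark mode: each POStore is its own job; jobs run
// one after another, and a failure stops the sequence, so later jobs never
// reach setupJob and their FILE_SETUPJOB_CALLED markers are never written.
class SequentialJobsDemo {
    static List<String> runJobs(boolean... shouldFail) {
        List<String> markers = new ArrayList<>();
        for (int i = 0; i < shouldFail.length; i++) {
            // setupJob runs only when this job actually starts
            markers.add("FILE_SETUPJOB_CALLED_" + (i + 1));
            if (shouldFail[i]) {
                // the exception from the failed job stops the later jobs
                break;
            }
        }
        return markers;
    }

    public static void main(String[] args) {
        // first job fails -> the second job's marker is never generated
        System.out.println(runJobs(true, false));
    }
}
```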
[~pallavi.rao], [~mohitsabharwal], [~kexianda]: please help review
PIG-4243.patch, thanks.
> Fix "TestStore" for Spark engine
> --------------------------------
>
> Key: PIG-4243
> URL: https://issues.apache.org/jira/browse/PIG-4243
> Project: Pig
> Issue Type: Sub-task
> Components: spark
> Reporter: liyunzhang_intel
> Assignee: liyunzhang_intel
> Fix For: spark-branch
>
> Attachments: PIG-4243.patch, TEST-org.apache.pig.test.TestStore.txt
>
>
> 1. Build spark and pig env according to PIG-4168
> 2. add TestStore to $PIG_HOME/test/spark-tests
> cat $PIG_HOME/test/spark-tests
> **/TestStore
> 3. run unit test TestStore
> ant test-spark
> 4. the unit test fails
> error log is attached
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)