[ 
https://issues.apache.org/jira/browse/PIG-4193?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14346127#comment-14346127
 ] 

liyunzhang_intel commented on PIG-4193:
---------------------------------------

Hi [~praveenr019]:
   If the only problem for PIG-4193-1.patch is changing code in 
POCollectedGroup.java, following way can solve the problem:
1. Revert changes in POCollectedGroup.java
2. Create POCollectedGroupSpark.java in 
org.apache.pig.backend.hadoop.executionengine.spark.operator
  code maybe like following:
{code}      
    public class POCollectedGroupSpark extends POCollectedGroup{
    public POCollectedGroupSpark(POCollectedGroup copy){
        super(copy.getOperatorKey());
    }
   ...
    public Result getNextTuple(boolean proceed) throws ExecException {
    ...
    }
{code}

3. Change SparkCompiler#visitCollectedGroup
{code}
 public void visitCollectedGroup(POCollectedGroup op) throws VisitorException {
        POCollectedGroupSpark newOp =  new 
POCollectedGroupSpark(op.getKeyType())
        ...
}
{code}
4. Change CollectedGroupConverter.java
from
{code}
 public class CollectedGroupConverter implements POConverter<Tuple, Tuple, 
POCollectedGroup> 
{code}
to
{code}
 public class CollectedGroupConverter implements POConverter<Tuple, Tuple, 
POCollectedGroupSpark> 
{code}

> Make collected group work with Spark
> ------------------------------------
>
>                 Key: PIG-4193
>                 URL: https://issues.apache.org/jira/browse/PIG-4193
>             Project: Pig
>          Issue Type: Sub-task
>          Components: spark
>            Reporter: Praveen Rachabattuni
>            Assignee: Praveen Rachabattuni
>             Fix For: spark-branch
>
>         Attachments: PIG-4193-1.patch
>
>
> Related e2e tests: CollectedGroup_1 - CollectedGroup_6
> Sample script:
> a = load '/user/pig/tests/data/singlefile/studenttab10k'; 
>                             b = order a by $0;
>                             store b into 
> '/user/pig/out/praveenr-1411383735-nightly.conf/CollectedGroup_1.out.intermediate';
>                             exec;
>                                   register ./lib/java/testudf.jar;         
>                           c = load 
> '/user/pig/out/praveenr-1411383735-nightly.conf/CollectedGroup_1.out.intermediate'
>  using org.apache.pig.test.udf.storefunc.SimpleCollectableLoader();
>                             d = group c by $0 using 'collected';
>                             e = foreach d generate group, COUNT(c);
>                             store e into 
> '/user/pig/out/praveenr-1411383735-nightly.conf/CollectedGroup_1.out';
> Pig Stack Trace
> ---------------
> ERROR 0: java.lang.IllegalArgumentException: Spork unsupported 
> PhysicalOperator: (Name: d: Map side group [tuple]{bytearray} - scope-28 
> Operator Key: scope-28)
> org.apache.pig.backend.executionengine.ExecException: ERROR 0: 
> java.lang.IllegalArgumentException: Spork unsupported PhysicalOperator: 
> (Name: d: Map side group [tuple]{bytearray} - scope-28 Operator Key: scope-28)
>       at 
> org.apache.pig.backend.hadoop.executionengine.HExecutionEngine.launchPig(HExecutionEngine.java:285)
>       at org.apache.pig.PigServer.launchPlan(PigServer.java:1378)
>       at 
> org.apache.pig.PigServer.executeCompiledLogicalPlan(PigServer.java:1363)
>       at org.apache.pig.PigServer.execute(PigServer.java:1352)
>       at org.apache.pig.PigServer.executeBatch(PigServer.java:403)
>       at org.apache.pig.PigServer.executeBatch(PigServer.java:386)
>       at 
> org.apache.pig.tools.grunt.GruntParser.executeBatch(GruntParser.java:170)
>       at 
> org.apache.pig.tools.grunt.GruntParser.parseStopOnError(GruntParser.java:233)
>       at 
> org.apache.pig.tools.grunt.GruntParser.parseStopOnError(GruntParser.java:204)
>       at org.apache.pig.tools.grunt.Grunt.exec(Grunt.java:81)
>       at org.apache.pig.Main.run(Main.java:611)
>       at org.apache.pig.Main.main(Main.java:164)
> Caused by: java.lang.IllegalArgumentException: Spork unsupported 
> PhysicalOperator: (Name: d: Map side group [tuple]{bytearray} - scope-28 
> Operator Key: scope-28)
>       at 
> org.apache.pig.backend.hadoop.executionengine.spark.SparkLauncher.physicalToRDD(SparkLauncher.java:239)
>       at 
> org.apache.pig.backend.hadoop.executionengine.spark.SparkLauncher.physicalToRDD(SparkLauncher.java:232)
>       at 
> org.apache.pig.backend.hadoop.executionengine.spark.SparkLauncher.physicalToRDD(SparkLauncher.java:232)
>       at 
> org.apache.pig.backend.hadoop.executionengine.spark.SparkLauncher.launchPig(SparkLauncher.java:140)
>       at 
> org.apache.pig.backend.hadoop.executionengine.HExecutionEngine.launchPig(HExecutionEngine.java:279)
>       ... 11 more
> ================================================================================



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to