[ 
https://issues.apache.org/jira/browse/PIG-4969?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15570555#comment-15570555
 ] 

liyunzhang_intel commented on PIG-4969:
---------------------------------------

[~kexianda]:  the differences between PIG-4969_3.patch and PIG-4969_2.patch:
1. Remove LocalRearrangeFunction.java.
2. In PIG-4969_2.patch, we extracted JoinGroupSparkConverter#LocalRearrangeFunction into a new class, LocalRearrangeFunction.java, so that it could be reused by both JoinGroupSparkConverter and ReduceByConverter. Actually we cannot do this, because the two converters need different conversions:
 in JoinGroupSparkConverter, we use LocalRearrangeFunction to convert a tuple (key, value) to Tuple2<IndexedKey, value>;
 in ReduceByConverter, we use LocalRearrangeFunction to convert a tuple (key, value) to Tuple2<IndexedKey, value with key>.
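The two conversions can be sketched with plain Java collections. IndexedKey below is a minimal hypothetical stand-in, not the real Pig class; the point is only the shape of the produced value:

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

public class RearrangeSketch {
    // Minimal stand-in for IndexedKey: an input index plus the grouping key.
    static class IndexedKey {
        final byte index;
        final Object key;
        IndexedKey(byte index, Object key) { this.index = index; this.key = key; }
    }

    // JoinGroupSparkConverter style: (key, value) -> Tuple2<IndexedKey, value>
    static Map.Entry<IndexedKey, List<Object>> forJoinGroup(Object key, List<Object> value) {
        return new SimpleEntry<>(new IndexedKey((byte) 0, key), value);
    }

    // ReduceByConverter style: (key, value) -> Tuple2<IndexedKey, value with key>,
    // so the key travels with the value and stays available during the merge.
    static Map.Entry<IndexedKey, List<Object>> forReduceBy(Object key, List<Object> value) {
        List<Object> valueWithKey = new ArrayList<>();
        valueWithKey.add(key);
        valueWithKey.addAll(value);
        return new SimpleEntry<>(new IndexedKey((byte) 0, key), valueWithKey);
    }

    public static void main(String[] args) {
        List<Object> value = Arrays.asList((Object) 1L);
        System.out.println(forJoinGroup("u1", value).getValue()); // [1]
        System.out.println(forReduceBy("u1", value).getValue());  // [u1, 1]
    }
}
```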

After patch PIG-4969_3.patch:
In ReduceByConverter.java#MergeValuesFunction#apply, v1 and v2 are values that carry the key, because poReduce.getPkg().getPkgr().attachInput needs the key to compute the result of getNext().
{code}
@Override
public Tuple apply(Tuple v1, Tuple v2) {
    LOG.debug("MergeValuesFunction in : " + v1 + " , " + v2);
    Tuple result = tf.newTuple(2);
    DataBag bag = DefaultBagFactory.getInstance().newDefaultBag();
    Tuple t = new DefaultTuple();
    try {
        // Package the input tuples so they can be processed by Algebraic functions.
        Object key = v1.get(0);
        if (key == null) {
            key = "";
        } else {
            result.set(0, key);
        }
        bag.add((Tuple) v1.get(1));
        bag.add((Tuple) v2.get(1));
        t.append(key);
        t.append(bag);

        poReduce.getPkg().getPkgr().attachInput(key,
                new DataBag[]{(DataBag) t.get(1)}, new boolean[]{true});
        Tuple packagedTuple = (Tuple) poReduce.getPkg().getPkgr().getNext().result;

        // Perform the operation
        LOG.debug("MergeValuesFunction packagedTuple : " + packagedTuple);
        poReduce.attachInput(packagedTuple);
        Result r = poReduce.getNext(poReduce.getResultType());

        // Ensure output is consistent with the output of KeyValueFunction.
        // If we return r.result, the result will be something like this:
        // (ABC,(2),(3)) - a tuple with the key followed by values.
        // But we want the result to look like this:
        // (ABC,((2),(3))) - a tuple with the key and a value tuple (containing values).
        // Hence the construction of a new value tuple.
        Tuple valueTuple = tf.newTuple();
        for (Object o : ((Tuple) r.result).getAll()) {
            if (!o.equals(key)) {
                valueTuple.append(o);
            }
        }
        result.set(1, valueTuple);
        LOG.debug("MergeValuesFunction out : " + result);
        return result;
    } catch (ExecException e) {
        throw new RuntimeException(e);
    }
}
{code}
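The reshaping described in the comments above, from (ABC,(2),(3)) to (ABC,((2),(3))), can be illustrated with plain Java lists as a hypothetical stand-in for the Pig Tuple API:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class MergeShapeSketch {
    // Reshape a reduced tuple like (ABC,(2),(3)) into (ABC,((2),(3))):
    // field 0 stays the key; every non-key field is collected into one value tuple,
    // mirroring the valueTuple loop in MergeValuesFunction#apply.
    static List<Object> reshape(List<Object> reduced) {
        Object key = reduced.get(0);
        List<Object> valueTuple = new ArrayList<>();
        for (Object o : reduced) {
            if (!o.equals(key)) {
                valueTuple.add(o);
            }
        }
        return Arrays.asList(key, valueTuple);
    }

    public static void main(String[] args) {
        List<Object> reduced = Arrays.asList((Object) "ABC", Arrays.asList(2), Arrays.asList(3));
        System.out.println(reshape(reduced)); // [ABC, [[2], [3]]]
    }
}
```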


> Optimize combine case for spark mode
> ------------------------------------
>
>                 Key: PIG-4969
>                 URL: https://issues.apache.org/jira/browse/PIG-4969
>             Project: Pig
>          Issue Type: Sub-task
>          Components: spark
>            Reporter: liyunzhang_intel
>            Assignee: liyunzhang_intel
>             Fix For: spark-branch
>
>         Attachments: PIG-4969_2.patch
>
>
> In our test of the 1 TB pigmix benchmark, the combine case runs slower in spark mode:
> ||Script||MR||Spark||
> |L_1|8089|10064|
> L1.pig
> {code}
> register pigperf.jar;
> A = load '/user/pig/tests/data/pigmix/page_views' using org.apache.pig.test.udf.storefunc.PigPerformanceLoader()
>     as (user, action, timespent, query_term, ip_addr, timestamp,
>         estimated_revenue, page_info, page_links);
> B = foreach A generate user, (int)action as action, (map[])page_info as page_info,
>     flatten((bag{tuple(map[])})page_links) as page_links;
> C = foreach B generate user,
>     (action == 1 ? page_info#'a' : page_links#'b') as header;
> D = group C by user parallel 40;
> E = foreach D generate group, COUNT(C) as cnt;
> store E into 'L1out';
> {code}
> Then the spark plan:
> {code}
>      [exec] #--------------------------------------------------
>      [exec] # Spark Plan
>      [exec] #--------------------------------------------------
>      [exec] 
>      [exec] Spark node scope-38
>      [exec] E: Store(hdfs://bdpe81:8020/user/root/output/pig/L1out:org.apache.pig.builtin.PigStorage) - scope-37
>      [exec] |
>      [exec] |---E: New For Each(false,false)[tuple] - scope-42
>      [exec]     |   |
>      [exec]     |   Project[bytearray][0] - scope-39
>      [exec]     |   |
>      [exec]     |   Project[bag][1] - scope-40
>      [exec]     |   
>      [exec]     |   POUserFunc(org.apache.pig.builtin.COUNT$Final)[long] - scope-41
>      [exec]     |   |
>      [exec]     |   |---Project[bag][1] - scope-57
>      [exec]     |
>      [exec]     |---Reduce By(false,false)[tuple] - scope-47
>      [exec]         |   |
>      [exec]         |   Project[bytearray][0] - scope-48
>      [exec]         |   |
>      [exec]         |   POUserFunc(org.apache.pig.builtin.COUNT$Intermediate)[tuple] - scope-49
>      [exec]         |   |
>      [exec]         |   |---Project[bag][1] - scope-50
>      [exec]         |
>      [exec]         |---D: Local Rearrange[tuple]{bytearray}(false) - scope-53
>      [exec]             |   |
>      [exec]             |   Project[bytearray][0] - scope-55
>      [exec]             |
>      [exec]             |---E: New For Each(false,false)[bag] - scope-43
>      [exec]                 |   |
>      [exec]                 |   Project[bytearray][0] - scope-44
>      [exec]                 |   |
>      [exec]                 |   POUserFunc(org.apache.pig.builtin.COUNT$Initial)[tuple] - scope-45
>      [exec]                 |   |
>      [exec]                 |   |---Project[bag][1] - scope-46
>      [exec]                 |
>      [exec]                 |---Pre Combiner Local Rearrange[tuple]{Unknown} - scope-56
>      [exec]                     |
>      [exec]                     |---C: New For Each(false,false)[bag] - scope-26
>      [exec]                         |   |
>      [exec]                         |   Project[bytearray][0] - scope-13
>      [exec]                         |   |
>      [exec]                         |   POBinCond[bytearray] - scope-22
>      [exec]                         |   |
>      [exec]                         |   |---Equal To[boolean] - scope-17
>      [exec]                         |   |   |
>      [exec]                         |   |   |---Project[int][1] - scope-15
>      [exec]                         |   |   |
>      [exec]                         |   |   |---Constant(1) - scope-16
>      [exec]                         |   |
>      [exec]                         |   |---POMapLookUp[bytearray] - scope-19
>      [exec]                         |   |   |
>      [exec]                         |   |   |---Project[map][2] - scope-18
>      [exec]                         |   |
>      [exec]                         |   |---POMapLookUp[bytearray] - scope-21
>      [exec]                         |       |
>      [exec]                         |       |---Project[map][3] - scope-20
>      [exec]                         |
>      [exec]                         |---B: New For Each(false,false,false,true)[bag] - scope-12
>      [exec]                             |   |
>      [exec]                             |   Project[bytearray][0] - scope-1
>      [exec]                             |   |
>      [exec]                             |   Cast[int] - scope-4
>      [exec]                             |   |
>      [exec]                             |   |---Project[bytearray][1] - scope-3
>      [exec]                             |   |
>      [exec]                             |   Cast[map:[]] - scope-7
>      [exec]                             |   |
>      [exec]                             |   |---Project[bytearray][2] - scope-6
>      [exec]                             |   |
>      [exec]                             |   Cast[bag:{([])}] - scope-10
>      [exec]                             |   |
>      [exec]                             |   |---Project[bytearray][3] - scope-9
>      [exec]                             |
>      [exec]                             |---A: Load(/user/pig/tests/data/pigmix/page_views:org.apache.pig.test.pigmix.udf.PigPerformanceLoader) - scope-0
> {code}
> We can combine LocalRearrange (scope-53) and ReduceBy (scope-47) into one physical
> operator to remove the redundant map operations, as we did in
> PIG-4797 (optimization for the join/group case in spark mode).
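The fusion proposed above can be sketched with plain Java collections. This is a hypothetical illustration of the idea, not the actual Pig operators: instead of first building keyed pairs in a separate rearrange step and then reducing them, a single pass projects the key and merges immediately, so no intermediate keyed collection is materialized.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

public class FusedReduceBySketch {
    // Fuse "local rearrange" (project the grouping key) and "reduce by"
    // (merge values per key) into one pass over the input tuples.
    static Map<Object, Long> fusedReduceBy(List<List<Object>> tuples,
                                           BinaryOperator<Long> merge) {
        Map<Object, Long> partial = new HashMap<>();
        for (List<Object> t : tuples) {
            Object key = t.get(0);            // local rearrange: project the key
            Long value = (Long) t.get(1);     // the value to aggregate
            partial.merge(key, value, merge); // reduce by: merge immediately
        }
        return partial;
    }

    public static void main(String[] args) {
        List<List<Object>> tuples = Arrays.asList(
                Arrays.asList((Object) "u1", 1L),
                Arrays.asList((Object) "u2", 1L),
                Arrays.asList((Object) "u1", 1L));
        System.out.println(fusedReduceBy(tuples, Long::sum));
    }
}
```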



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
