Author: xuefu
Date: Wed May 13 03:55:31 2015
New Revision: 1679131

URL: http://svn.apache.org/r1679131
Log:
PIG-4542: OutputConsumerIterator should flush buffered records (Mohit via Xuefu)

Added:
    
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/OutputConsumerIterator.java
Removed:
    
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/POOutputConsumerIterator.java
    
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POStreamSpark.java
Modified:
    
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java
    
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java
    
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
    
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/CollectedGroupConverter.java
    
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ForEachConverter.java
    
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LimitConverter.java
    
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StreamConverter.java
    
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java
    pig/branches/spark/test/org/apache/pig/test/TestCollectedGroup.java

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java?rev=1679131&r1=1679130&r2=1679131&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java
 Wed May 13 03:55:31 2015
@@ -484,6 +484,10 @@ public abstract class PhysicalOperator e
        parentPlan = physicalPlan;
     }
 
+    public PhysicalPlan getParentPlan() {
+        return parentPlan;
+    }
+
     public Log getLogger() {
         return log;
     }

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java?rev=1679131&r1=1679130&r2=1679131&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java
 Wed May 13 03:55:31 2015
@@ -213,104 +213,6 @@ public class POCollectedGroup extends Ph
         return inp;
     }
 
-    public Result getNextTuple(boolean proceed) throws ExecException {
-
-        Result inp = null;
-        Result res = null;
-
-        while (true) {
-            inp = processInput();
-            if (inp.returnStatus == POStatus.STATUS_EOP ||
-                    inp.returnStatus == POStatus.STATUS_ERR) {
-                break;
-            }
-
-            if (inp.returnStatus == POStatus.STATUS_NULL) {
-                continue;
-            }
-
-            for (PhysicalPlan ep : plans) {
-                ep.attachInput((Tuple)inp.result);
-            }
-
-            List<Result> resLst = new ArrayList<Result>();
-            for (ExpressionOperator op : leafOps) {
-                res = op.getNext(op.getResultType());
-                if (res.returnStatus != POStatus.STATUS_OK) {
-                    return new Result();
-                }
-                resLst.add(res);
-            }
-
-            Tuple tup = constructOutput(resLst,(Tuple)inp.result);
-            Object curKey = tup.get(0);
-
-            // the first time, just create a new buffer and continue.
-            if (prevKey == null && outputBag == null) {
-
-                if (PigMapReduce.sJobConfInternal.get() != null) {
-                    String bagType = 
PigMapReduce.sJobConfInternal.get().get("pig.cachedbag.type");
-                    if (bagType != null && 
bagType.equalsIgnoreCase("default")) {
-                        useDefaultBag = true;
-                    }
-                }
-                prevKey = curKey;
-                outputBag = useDefaultBag ? 
BagFactory.getInstance().newDefaultBag()
-                // In a very rare case if there is a POStream after this
-                // POCollectedGroup in the pipeline and is also blocking the 
pipeline;
-                // constructor argument should be 2. But for one obscure
-                // case we don't want to pay the penalty all the time.
-
-                // Additionally, if there is a merge join(on a different key) 
following POCollectedGroup
-                // default bags should be used. But since we don't allow 
anything
-                // before Merge Join currently we are good.
-                        : new InternalCachedBag(1);
-                outputBag.add((Tuple)tup.get(1));
-                continue;
-            }
-
-            // no key change
-            if (prevKey == null && curKey == null) {
-                outputBag.add((Tuple)tup.get(1));
-                continue;
-            }
-
-            // no key change
-            if (prevKey != null && curKey != null && 
((Comparable)curKey).compareTo(prevKey) == 0) {
-                outputBag.add((Tuple)tup.get(1));
-                continue;
-            }
-
-            // key change
-            Tuple tup2 = mTupleFactory.newTuple(2);
-            tup2.set(0, prevKey);
-            tup2.set(1, outputBag);
-            res.result = tup2;
-
-            prevKey = curKey;
-            outputBag = useDefaultBag ? 
BagFactory.getInstance().newDefaultBag()
-                    : new InternalCachedBag(1);
-            outputBag.add((Tuple)tup.get(1));
-            return res;
-        }
-
-        // Since the output is buffered, we need to flush the last
-        // set of records when the close method is called by mapper.
-        if (this.parentPlan.endOfAllInput || proceed) {
-            if (outputBag != null) {
-                Tuple tup = mTupleFactory.newTuple(2);
-                tup.set(0, prevKey);
-                tup.set(1, outputBag);
-                outputBag = null;
-                return new Result(POStatus.STATUS_OK, tup);
-            }
-
-            return new Result(POStatus.STATUS_EOP, null);
-        }
-
-        return inp;
-    }
-
     protected Tuple constructOutput(List<Result> resLst, Tuple value) throws 
ExecException{
 
         // Construct key

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java?rev=1679131&r1=1679130&r2=1679131&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
 Wed May 13 03:55:31 2015
@@ -46,6 +46,7 @@ import org.apache.pig.PigConstants;
 import org.apache.pig.PigException;
 import org.apache.pig.backend.BackendException;
 import org.apache.pig.backend.hadoop.executionengine.Launcher;
+import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PhyPlanSetter;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup;
@@ -62,6 +63,7 @@ import org.apache.pig.backend.hadoop.exe
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSkewedJoin;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
@@ -83,7 +85,6 @@ import org.apache.pig.backend.hadoop.exe
 import 
org.apache.pig.backend.hadoop.executionengine.spark.converter.StoreConverter;
 import 
org.apache.pig.backend.hadoop.executionengine.spark.converter.StreamConverter;
 import 
org.apache.pig.backend.hadoop.executionengine.spark.converter.UnionConverter;
-import 
org.apache.pig.backend.hadoop.executionengine.spark.operator.POStreamSpark;
 import 
org.apache.pig.backend.hadoop.executionengine.spark.optimizer.AccumulatorOptimizer;
 import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkCompiler;
 import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperPlan;
@@ -180,7 +181,7 @@ public class SparkLauncher extends Launc
                convertMap.put(POCollectedGroup.class, new 
CollectedGroupConverter());
                convertMap.put(POCounter.class, new CounterConverter());
                convertMap.put(PORank.class, new RankConverter());
-               convertMap.put(POStreamSpark.class, new 
StreamConverter(confBytes));
+               convertMap.put(POStream.class, new StreamConverter(confBytes));
 
                sparkPlanToRDD(sparkplan, convertMap, sparkStats, jobConf);
                cleanUpSparkJob(pigContext, currentDirectoryPath);
@@ -447,6 +448,7 @@ public class SparkLauncher extends Launc
                        if (LOG.isDebugEnabled())
                            LOG.debug("Converting " + leaves.size() + " Spark 
Operators");
                        for (SparkOperator leaf : leaves) {
+                               new PhyPlanSetter(leaf.physicalPlan).visit();
                                Map<OperatorKey, RDD<Tuple>> physicalOpToRdds = 
new HashMap();
                                sparkOperToRDD(sparkPlan, leaf, sparkOpToRdds,
                                                physicalOpToRdds, convertMap, 
seenJobIDs, sparkStats,

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/CollectedGroupConverter.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/CollectedGroupConverter.java?rev=1679131&r1=1679130&r2=1679131&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/CollectedGroupConverter.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/CollectedGroupConverter.java
 Wed May 13 03:55:31 2015
@@ -29,7 +29,6 @@ import org.apache.pig.data.Tuple;
 import org.apache.spark.api.java.function.FlatMapFunction;
 import org.apache.spark.rdd.RDD;
 
-
 @SuppressWarnings({ "serial"})
 public class CollectedGroupConverter implements POConverter<Tuple, Tuple, 
POCollectedGroup> {
 
@@ -38,72 +37,52 @@ public class CollectedGroupConverter imp
       POCollectedGroup physicalOperator) throws IOException {
     SparkUtil.assertPredecessorSize(predecessors, physicalOperator, 1);
     RDD<Tuple> rdd = predecessors.get(0);
-    // return predecessors.get(0);
-    RDD<Tuple> rdd2 = rdd.coalesce(1, false, null);
-    long count = 0;
-    try {
-
-      count = rdd2.count();
-
-    } catch (Exception e) {
-
-    }
     CollectedGroupFunction collectedGroupFunction
-        = new CollectedGroupFunction(physicalOperator, count);
-    return rdd.toJavaRDD().mapPartitions(collectedGroupFunction, true).rdd();
+        = new CollectedGroupFunction(physicalOperator);
+    return rdd.toJavaRDD().mapPartitions(collectedGroupFunction, true)
+                       .rdd();
   }
 
-       private static class CollectedGroupFunction implements 
FlatMapFunction<Iterator<Tuple>, Tuple> {
+       private static class CollectedGroupFunction
+                       implements FlatMapFunction<Iterator<Tuple>, Tuple> {
 
-               /**
-                *
-                */
                private POCollectedGroup poCollectedGroup;
 
-               public long total_limit;
                public long current_val;
                public boolean proceed;
 
-               private CollectedGroupFunction(POCollectedGroup 
poCollectedGroup, long count) {
+               private CollectedGroupFunction(POCollectedGroup 
poCollectedGroup) {
                        this.poCollectedGroup = poCollectedGroup;
-                       this.total_limit = count;
                        this.current_val = 0;
                }
 
                public Iterable<Tuple> call(final Iterator<Tuple> input) {
 
-                 return new Iterable<Tuple>() {
+                         return new Iterable<Tuple>() {
+
+                               @Override
+                               public Iterator<Tuple> iterator() {
+
+                                 return new OutputConsumerIterator(input) {
 
-                   @Override
-                   public Iterator<Tuple> iterator() {
-                     return new POOutputConsumerIterator(input) {
-                       protected void attach(Tuple tuple) {
-                         poCollectedGroup.setInputs(null);
-                         poCollectedGroup.attachInput(tuple);
-                         
poCollectedGroup.setParentPlan(poCollectedGroup.getPlans().get(0));
-
-                         try{
-
-                           current_val = current_val + 1;
-                           //System.out.println("Row: =>" + current_val);
-                           if (current_val == total_limit) {
-                             proceed = true;
-                           } else {
-                             proceed = false;
-                           }
-
-                         } catch(Exception e){
-                           System.out.println("Crashhh in 
CollectedGroupConverter :" + e);
-                           e.printStackTrace();
-                         }
-                       }
-
-                       protected Result getNextResult() throws ExecException {
-                         return poCollectedGroup.getNextTuple(proceed);
-                       }
-                     };
-                   }
-      };
+                                         @Override
+                                         protected void attach(Tuple tuple) {
+                                                 
poCollectedGroup.setInputs(null);
+                                                 
poCollectedGroup.attachInput(tuple);
+                                         }
+
+                                         @Override
+                                         protected Result getNextResult() 
throws ExecException {
+                                             return 
poCollectedGroup.getNextTuple();
+                                         }
+
+                                         @Override
+                                         protected void endOfInput() {
+                                                 
poCollectedGroup.getParentPlan().endOfAllInput = true;
+                                         }
+                                 };
+                               }
+                 };
                }
        }
 }
\ No newline at end of file

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ForEachConverter.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ForEachConverter.java?rev=1679131&r1=1679130&r2=1679131&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ForEachConverter.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ForEachConverter.java
 Wed May 13 03:55:31 2015
@@ -22,9 +22,7 @@ import java.io.Serializable;
 import java.util.Iterator;
 import java.util.List;
 
-
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.pig.FuncSpec;
 import org.apache.pig.backend.executionengine.ExecException;
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
@@ -46,6 +44,7 @@ import org.apache.spark.rdd.RDD;
  */
 @SuppressWarnings({"serial" })
 public class ForEachConverter implements POConverter<Tuple, Tuple, POForEach> {
+
     private byte[] confBytes;
 
     public ForEachConverter(byte[] confBytes) {
@@ -106,16 +105,22 @@ public class ForEachConverter implements
 
                 @Override
                 public Iterator<Tuple> iterator() {
-                    return new POOutputConsumerIterator(input) {
+                    return new OutputConsumerIterator(input) {
 
+                        @Override
                         protected void attach(Tuple tuple) {
                             poForEach.setInputs(null);
                             poForEach.attachInput(tuple);
                         }
 
+                        @Override
                         protected Result getNextResult() throws ExecException {
                             return poForEach.getNextTuple();
                         }
+
+                        @Override
+                        protected void endOfInput() {
+                        }
                     };
                 }
             };

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LimitConverter.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LimitConverter.java?rev=1679131&r1=1679130&r2=1679131&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LimitConverter.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LimitConverter.java
 Wed May 13 03:55:31 2015
@@ -56,16 +56,22 @@ public class LimitConverter implements P
             return new Iterable<Tuple>() {
 
                 public Iterator<Tuple> iterator() {
-                    return new POOutputConsumerIterator(tuples) {
+                    return new OutputConsumerIterator(tuples) {
 
+                        @Override
                         protected void attach(Tuple tuple) {
                             poLimit.setInputs(null);
                             poLimit.attachInput(tuple);
                         }
 
+                        @Override
                         protected Result getNextResult() throws ExecException {
                             return poLimit.getNextTuple();
                         }
+
+                        @Override
+                        protected void endOfInput() {
+                        }
                     };
                 }
             };

Added: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/OutputConsumerIterator.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/OutputConsumerIterator.java?rev=1679131&view=auto
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/OutputConsumerIterator.java
 (added)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/OutputConsumerIterator.java
 Wed May 13 03:55:31 2015
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark.converter;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.data.Tuple;
+
+abstract class OutputConsumerIterator implements java.util.Iterator<Tuple> {
+    private final java.util.Iterator<Tuple> input;
+    private Result result = null;
+    private boolean returned = true;
+    private boolean done = false;
+
+    OutputConsumerIterator(java.util.Iterator<Tuple> input) {
+        this.input = input;
+    }
+
+    abstract protected void attach(Tuple tuple);
+
+    abstract protected Result getNextResult() throws ExecException;
+
+    /**
+     * Certain operators may buffer their output.
+     * This hook is invoked once the last input record has been attached,
+     * before each subsequent call to getNextResult(), so that such operators
+     * can flush their remaining records. It may be invoked more than once.
+     */
+    abstract protected void endOfInput();
+
+    private void readNext() {
+        while (true) {
+            try {
+                // result is set in hasNext() call and returned
+                // to the user in next() call
+                if (result != null && !returned) {
+                    return;
+                }
+
+                if (result == null) {
+                    if (!input.hasNext()) {
+                        done = true;
+                        return;
+                    }
+                    Tuple v1 = input.next();
+                    attach(v1);
+                }
+
+                if (!input.hasNext()) {
+                    endOfInput();
+                }
+
+                result = getNextResult();
+                returned = false;
+                switch (result.returnStatus) {
+                    case POStatus.STATUS_OK:
+                        returned = false;
+                        break;
+                    case POStatus.STATUS_NULL:
+                        returned = true;
+                        break;
+                    case POStatus.STATUS_EOP:
+                        done = !input.hasNext();
+                        if (!done) {
+                            result = null;
+                        }
+                        break;
+                    case POStatus.STATUS_ERR:
+                        throw new RuntimeException("Error while processing " + 
result);
+                }
+
+            } catch (ExecException e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    @Override
+    public boolean hasNext() {
+        readNext();
+        return !done;
+    }
+
+    @Override
+    public Tuple next() {
+        readNext();
+        if (done) {
+            throw new RuntimeException("Past the end. Call hasNext() before 
calling next()");
+        }
+        if (result == null || result.returnStatus != POStatus.STATUS_OK) {
+            throw new RuntimeException("Unexpected response code from 
operator: "
+                    + result);
+        }
+        returned = true;
+        return (Tuple) result.result;
+    }
+
+    @Override
+    public void remove() {
+        throw new UnsupportedOperationException();
+    }
+}
\ No newline at end of file

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StreamConverter.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StreamConverter.java?rev=1679131&r1=1679130&r2=1679131&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StreamConverter.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StreamConverter.java
 Wed May 13 03:55:31 2015
@@ -17,44 +17,28 @@
  */
 package org.apache.pig.backend.hadoop.executionengine.spark.converter;
 
-import org.apache.hadoop.conf.Configuration;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Iterator;
+import java.util.List;
+
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.pig.backend.executionengine.ExecException;
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
 import org.apache.pig.backend.hadoop.executionengine.spark.KryoSerializer;
 import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
-import 
org.apache.pig.backend.hadoop.executionengine.spark.operator.POStreamSpark;
 import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
 import org.apache.pig.data.SchemaTupleBackend;
+import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.util.ObjectSerializer;
-import org.apache.pig.impl.util.UDFContext;
-import scala.Function1;
-import scala.Tuple2;
-import scala.runtime.AbstractFunction1;
 import org.apache.spark.rdd.RDD;
-import org.apache.spark.SparkContext;
 import org.apache.spark.api.java.function.FlatMapFunction;
-import org.apache.pig.data.Tuple;
-
-import java.io.BufferedWriter;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.io.Serializable;
-import java.util.Iterator;
-import java.util.List;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.spark.SparkContext;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 
 public class StreamConverter implements
-               POConverter<Tuple, Tuple, POStreamSpark> {
-       private static Log LOG = LogFactory.getLog(StreamConverter.class);
+               POConverter<Tuple, Tuple, POStream> {
        private byte[] confBytes;
 
        public StreamConverter(byte[] confBytes) {
@@ -63,36 +47,21 @@ public class StreamConverter implements
 
        @Override
        public RDD<Tuple> convert(List<RDD<Tuple>> predecessors,
-                       POStreamSpark poStream) throws IOException {
+                       POStream poStream) throws IOException {
                SparkUtil.assertPredecessorSize(predecessors, poStream, 1);
                RDD<Tuple> rdd = predecessors.get(0);
-               RDD<Tuple> rdd2 = rdd.coalesce(1, false, null);
-               long count = 0;
-               try {
-                       count = rdd2.count();
-               } catch (Exception e) {
-                       System.out.println("Crash in StreamConverter :" + e);
-                       LOG.info("Crash in StreamConverter ", e);
-               }
-               StreamFunction streamFunction = new StreamFunction(poStream, 
count,
-                               confBytes);
-               return rdd2.toJavaRDD().mapPartitions(streamFunction, 
true).rdd();
+               StreamFunction streamFunction = new StreamFunction(poStream, 
confBytes);
+               return rdd.toJavaRDD().mapPartitions(streamFunction, 
true).rdd();
        }
 
        private static class StreamFunction implements
                        FlatMapFunction<Iterator<Tuple>, Tuple>, Serializable {
-               private POStreamSpark poStream;
-               private long total_limit;
-               private long current_val;
-               private boolean proceed = false;
+               private POStream poStream;
                private transient JobConf jobConf;
                private byte[] confBytes;
 
-               private StreamFunction(POStreamSpark poStream, long total_limit,
-                               byte[] confBytes) {
+               private StreamFunction(POStream poStream, byte[] confBytes) {
                        this.poStream = poStream;
-                       this.total_limit = total_limit;
-                       this.current_val = 0;
                        this.confBytes = confBytes;
                }
 
@@ -119,35 +88,27 @@ public class StreamConverter implements
                        return new Iterable<Tuple>() {
                                @Override
                                public Iterator<Tuple> iterator() {
-                                       return new 
POOutputConsumerIterator(input) {
+                                       return new 
OutputConsumerIterator(input) {
 
                                                @Override
                                                protected void attach(Tuple 
tuple) {
                                                        
poStream.setInputs(null);
                                                        
poStream.attachInput(tuple);
-                                                       try {
-                                                               current_val = 
current_val + 1;
-                                                               if (current_val 
== total_limit) {
-                                                                       proceed 
= true;
-                                                               } else {
-                                                                       proceed 
= false;
-                                                               }
-
-                                                       } catch (Exception e) {
-                                                               
System.out.println("Crash in StreamConverter :"
-                                                                               
+ e);
-                                                               LOG.info("Crash 
in StreamConverter ", e);
-                                                       }
                                                }
 
                                                @Override
                                                protected Result 
getNextResult() throws ExecException {
-                                                       Result result = 
poStream.getNextTuple(proceed);
+                                                       Result result = 
poStream.getNextTuple();
                                                        return result;
                                                }
+
+                                               @Override
+                                               protected void endOfInput() {
+                                                       
poStream.setFetchable(true);
+                                               }
                                        };
                                }
                        };
                }
        }
-}
+}
\ No newline at end of file

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java?rev=1679131&r1=1679130&r2=1679131&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java
 Wed May 13 03:55:31 2015
@@ -57,7 +57,6 @@ import org.apache.pig.backend.hadoop.exe
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.Packager;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
 import 
org.apache.pig.backend.hadoop.executionengine.spark.operator.NativeSparkOperator;
-import 
org.apache.pig.backend.hadoop.executionengine.spark.operator.POStreamSpark;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.FileSpec;
 import org.apache.pig.impl.plan.DepthFirstWalker;
@@ -421,8 +420,7 @@ public class SparkCompiler extends PhyPl
        @Override
        public void visitStream(POStream op) throws VisitorException {
                try {
-                       POStreamSpark poStreamSpark = new POStreamSpark(op);
-                       addToPlan(poStreamSpark);
+                       addToPlan(op);
                        phyToSparkOpMap.put(op, curSparkOp);
                } catch (Exception e) {
                        int errCode = 2034;

Modified: pig/branches/spark/test/org/apache/pig/test/TestCollectedGroup.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestCollectedGroup.java?rev=1679131&r1=1679130&r2=1679131&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestCollectedGroup.java 
(original)
+++ pig/branches/spark/test/org/apache/pig/test/TestCollectedGroup.java Wed May 
13 03:55:31 2015
@@ -25,7 +25,6 @@ import java.util.Iterator;
 import java.util.List;
 
 import org.apache.pig.CollectableLoadFunc;
-import org.apache.pig.ExecType;
 import org.apache.pig.PigServer;
 import org.apache.pig.backend.executionengine.ExecException;
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompilerException;
@@ -337,4 +336,4 @@ public class TestCollectedGroup {
         }
 
     }
-}
+}
\ No newline at end of file


Reply via email to