Author: szita
Date: Mon May 29 15:19:17 2017
New Revision: 1796647

URL: http://svn.apache.org/viewvc?rev=1796647&view=rev
Log:
PIG-5194: HiveUDF fails with Spark exec type (szita)

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
    pig/trunk/src/org/apache/pig/builtin/HiveUDAF.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1796647&r1=1796646&r2=1796647&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Mon May 29 15:19:17 2017
@@ -109,6 +109,8 @@ OPTIMIZATIONS
  
 BUG FIXES
 
+PIG-5194: HiveUDF fails with Spark exec type (szita)
+
 PIG-5231: PigStorage with -schema may produce inconsistent outputs with more fields (knoguchi)
 
 PIG-5224: Extra foreach from ColumnPrune preventing Accumulator usage (knoguchi)

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java?rev=1796647&r1=1796646&r2=1796647&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java Mon May 29 15:19:17 2017
@@ -20,6 +20,7 @@ package org.apache.pig.backend.hadoop.ex
 import java.io.File;
 import java.io.IOException;
 import java.io.PrintStream;
+import java.net.URL;
 import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.util.ArrayList;
@@ -387,8 +388,8 @@ public class SparkLauncher extends Launc
             for (String file : shipFiles.split(",")) {
                 File shipFile = new File(file.trim());
                 if (shipFile.exists()) {
-                    addResourceToSparkJobWorkingDirectory(shipFile,
-                            shipFile.getName(), ResourceType.FILE);
+                    addResourceToSparkJobWorkingDirectory(shipFile, shipFile.getName(),
+                            shipFile.getName().endsWith(".jar") ? ResourceType.JAR : ResourceType.FILE );
                 }
             }
         }
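
For context: until this change the ship-file loop registered every shipped resource with Spark as a plain FILE, so a shipped jar never reached the executor classpath. The fix classifies resources by extension. A minimal sketch of just that decision, with the class and method names invented for illustration (ResourceType stands in for the launcher's enum):

    import java.io.File;

    // Illustrative sketch only: extension-based classification as in the hunk
    // above. Class, method, and enum here are invented for the example.
    class ShipResourceClassifier {
        enum ResourceType { JAR, FILE }

        static ResourceType classify(File shipFile) {
            // Jars must be registered as JARs so Spark puts them on the
            // executor classpath; everything else ships as a plain file.
            return shipFile.getName().endsWith(".jar") ? ResourceType.JAR : ResourceType.FILE;
        }
    }
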
@@ -429,7 +430,7 @@ public class SparkLauncher extends Launc
         Set<String> allJars = new HashSet<String>();
         LOG.info("Add default jars to Spark Job");
         allJars.addAll(JarManager.getDefaultJars());
-        LOG.info("Add extra jars to Spark Job");
+        LOG.info("Add script jars to Spark Job");
         for (String scriptJar : pigContext.scriptJars) {
             allJars.add(scriptJar);
         }
@@ -448,6 +449,11 @@ public class SparkLauncher extends Launc
             allJars.add(scriptUDFJarFile.getAbsolutePath().toString());
         }
 
+        LOG.info("Add extra jars to Spark job");
+        for (URL extraJarUrl : pigContext.extraJars) {
+            allJars.add(extraJarUrl.getFile());
+        }
+
         //Upload all jars to spark working directory
         for (String jar : allJars) {
             File jarFile = new File(jar);
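
For context: jars accumulated on pigContext.extraJars (typically via Pig's REGISTER statement, which is how the jar containing a Hive UDF usually arrives) were previously never uploaded to the Spark working directory, so executors could not load the UDF class. extraJars holds URLs, so URL.getFile() recovers the local path. An illustrative sketch of that conversion, assuming plain file:// URLs (class, method, and jar names invented for the example):

    import java.net.MalformedURLException;
    import java.net.URL;
    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.Set;

    // Illustrative sketch only: turning registered file:// URLs into local
    // paths for upload, as the new loop above does.
    class ExtraJarPaths {
        static Set<String> collect(Iterable<URL> extraJars) {
            Set<String> allJars = new HashSet<String>();
            for (URL extraJarUrl : extraJars) {
                allJars.add(extraJarUrl.getFile()); // e.g. /tmp/my-hive-udfs.jar
            }
            return allJars;
        }

        public static void main(String[] args) throws MalformedURLException {
            URL jar = new URL("file:///tmp/my-hive-udfs.jar"); // hypothetical jar
            System.out.println(collect(Arrays.asList(jar)));   // [/tmp/my-hive-udfs.jar]
        }
    }
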

Modified: pig/trunk/src/org/apache/pig/builtin/HiveUDAF.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/HiveUDAF.java?rev=1796647&r1=1796646&r2=1796647&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/HiveUDAF.java (original)
+++ pig/trunk/src/org/apache/pig/builtin/HiveUDAF.java Mon May 29 15:19:17 2017
@@ -135,11 +135,11 @@ public class HiveUDAF extends HiveUDFBas
                     return;
                 }
 
-                if (m == Mode.PARTIAL1 || m == Mode.FINAL) {
+                if (m == Mode.PARTIAL1 || m == Mode.PARTIAL2 || m == Mode.FINAL) {
                     intermediateOutputObjectInspector = evaluator.init(Mode.PARTIAL1, inputObjectInspectorAsArray);
                     intermediateOutputTypeInfo = TypeInfoUtils.getTypeInfoFromObjectInspector(intermediateOutputObjectInspector);
 
-                    if (m == Mode.FINAL) {
+                    if (m == Mode.PARTIAL2 || m == Mode.FINAL) {
                         intermediateInputObjectInspector = HiveUtils.createObjectInspector(intermediateOutputTypeInfo);
                         intermediateInputObjectInspectorAsArray = new ObjectInspector[] {intermediateInputObjectInspector};
                         outputObjectInspector = evaluator.init(Mode.FINAL, intermediateInputObjectInspectorAsArray);
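
For context: Hive's GenericUDAFEvaluator.Mode decides which evaluator methods run and therefore which object inspectors init() needs. The hunk above extends the PARTIAL1-style initialization to cover PARTIAL2, the mode the reworked Intermediate stage below now uses. A sketch summarizing the contract as documented for GenericUDAFEvaluator (the helper class is illustrative only):

    import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.Mode;

    // Summary of the GenericUDAFEvaluator contract (illustrative helper only):
    //   PARTIAL1: iterate() + terminatePartial()  -- raw rows -> partial
    //   PARTIAL2: merge()   + terminatePartial()  -- partials -> partial
    //   FINAL:    merge()   + terminate()         -- partials -> final result
    //   COMPLETE: iterate() + terminate()         -- raw rows -> final result
    class UdafModes {
        static String methodsFor(Mode m) {
            switch (m) {
                case PARTIAL1: return "iterate + terminatePartial";
                case PARTIAL2: return "merge + terminatePartial";
                case FINAL:    return "merge + terminate";
                case COMPLETE: return "iterate + terminate";
                default:       throw new IllegalArgumentException(String.valueOf(m));
            }
        }
    }
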
@@ -208,20 +208,41 @@ public class HiveUDAF extends HiveUDFBas
     }
 
     static public class Initial extends EvalFunc<Tuple> {
+
+        private boolean inited = false;
+        private String funcName;
+        ConstantObjectInspectInfo constantsInfo;
+        private SchemaAndEvaluatorInfo schemaAndEvaluatorInfo = new SchemaAndEvaluatorInfo();
+        private static TupleFactory tf = TupleFactory.getInstance();
+
         public Initial(String funcName) {
+            this.funcName = funcName;
         }
-        public Initial(String funcName, String params) {
+        public Initial(String funcName, String params) throws IOException {
+            this.funcName = funcName;
+            constantsInfo = ConstantObjectInspectInfo.parse(params);
         }
         @Override
         public Tuple exec(Tuple input) throws IOException {
-
-            DataBag bg = (DataBag) input.get(0);
-            Tuple tp = null;
-            if(bg.iterator().hasNext()) {
-                tp = bg.iterator().next();
+            try {
+                if (!inited) {
+                    schemaAndEvaluatorInfo.init(getInputSchema(), instantiateUDAF(funcName), Mode.PARTIAL1, constantsInfo);
+                    inited = true;
+                }
+                DataBag b = (DataBag)input.get(0);
+                AggregationBuffer agg = schemaAndEvaluatorInfo.evaluator.getNewAggregationBuffer();
+                for (Iterator<Tuple> it = b.iterator(); it.hasNext();) {
+                    Tuple t = it.next();
+                    List inputs = schemaAndEvaluatorInfo.inputObjectInspector.getStructFieldsDataAsList(t);
+                    schemaAndEvaluatorInfo.evaluator.iterate(agg, inputs.toArray());
+                }
+                Object returnValue = schemaAndEvaluatorInfo.evaluator.terminatePartial(agg);
+                Tuple result = tf.newTuple();
+                result.append(HiveUtils.convertHiveToPig(returnValue, schemaAndEvaluatorInfo.intermediateOutputObjectInspector, null));
+                return result;
+            } catch (Exception e) {
+                throw new IOException(e);
             }
-
-            return tp;
         }
     }
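
The old Initial.exec() merely forwarded the first tuple of its input bag, which is not a valid partial aggregate when this stage actually executes, as it does under the Spark exec type. The new body performs a real PARTIAL1 pass. Stripped of the Pig plumbing, its shape is roughly the following sketch, assuming an evaluator already initialized for Mode.PARTIAL1 and rows already unwrapped to Object[] arguments:

    import org.apache.hadoop.hive.ql.metadata.HiveException;
    import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;

    // Illustrative sketch of the PARTIAL1 loop in the exec() above.
    class Partial1Pass {
        static Object run(GenericUDAFEvaluator evaluator, Iterable<Object[]> rows)
                throws HiveException {
            GenericUDAFEvaluator.AggregationBuffer agg = evaluator.getNewAggregationBuffer();
            for (Object[] row : rows) {
                evaluator.iterate(agg, row);          // fold each raw row into the buffer
            }
            return evaluator.terminatePartial(agg);   // emit the partial aggregate
        }
    }
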
 
@@ -244,15 +265,14 @@ public class HiveUDAF extends HiveUDFBas
         public Tuple exec(Tuple input) throws IOException {
             try {
                 if (!inited) {
-                    schemaAndEvaluatorInfo.init(getInputSchema(), instantiateUDAF(funcName), Mode.PARTIAL1, constantsInfo);
+                    schemaAndEvaluatorInfo.init(getInputSchema(), instantiateUDAF(funcName), Mode.PARTIAL2, constantsInfo);
                     inited = true;
                 }
                 DataBag b = (DataBag)input.get(0);
                 AggregationBuffer agg = schemaAndEvaluatorInfo.evaluator.getNewAggregationBuffer();
                 for (Iterator<Tuple> it = b.iterator(); it.hasNext();) {
                     Tuple t = it.next();
-                    List inputs = schemaAndEvaluatorInfo.inputObjectInspector.getStructFieldsDataAsList(t);
-                    schemaAndEvaluatorInfo.evaluator.iterate(agg, inputs.toArray());
+                    schemaAndEvaluatorInfo.evaluator.merge(agg, t.get(0));
                 }
                 Object returnValue = schemaAndEvaluatorInfo.evaluator.terminatePartial(agg);
                 Tuple result = tf.newTuple();
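
Intermediate is the matching change: it now initializes the evaluator in Mode.PARTIAL2 and calls merge() on the partials produced by Initial, where it previously re-ran iterate() as if they were raw input rows. Roughly, as a sketch under the same assumptions as above:

    import org.apache.hadoop.hive.ql.metadata.HiveException;
    import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;

    // Illustrative sketch of the PARTIAL2 loop in the exec() above: combine
    // previously emitted partial aggregates and re-emit a partial.
    class Partial2Pass {
        static Object run(GenericUDAFEvaluator evaluator, Iterable<Object> partials)
                throws HiveException {
            GenericUDAFEvaluator.AggregationBuffer agg = evaluator.getNewAggregationBuffer();
            for (Object partial : partials) {
                evaluator.merge(agg, partial);        // merge, not iterate, on partials
            }
            return evaluator.terminatePartial(agg);   // still a partial at this stage
        }
    }
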

