Author: szita
Date: Mon May 29 15:19:17 2017
New Revision: 1796647
URL: http://svn.apache.org/viewvc?rev=1796647&view=rev
Log:
PIG-5194: HiveUDF fails with Spark exec type (szita)
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
pig/trunk/src/org/apache/pig/builtin/HiveUDAF.java
Modified: pig/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1796647&r1=1796646&r2=1796647&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Mon May 29 15:19:17 2017
@@ -109,6 +109,8 @@ OPTIMIZATIONS
 
BUG FIXES
+PIG-5194: HiveUDF fails with Spark exec type (szita)
+
PIG-5231: PigStorage with -schema may produce inconsistent outputs with more
fields (knoguchi)
PIG-5224: Extra foreach from ColumnPrune preventing Accumulator usage
(knoguchi)
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java?rev=1796647&r1=1796646&r2=1796647&view=diff
==============================================================================
---
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
(original)
+++
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
Mon May 29 15:19:17 2017
@@ -20,6 +20,7 @@ package org.apache.pig.backend.hadoop.ex
import java.io.File;
import java.io.IOException;
import java.io.PrintStream;
+import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
@@ -387,8 +388,8 @@ public class SparkLauncher extends Launc
for (String file : shipFiles.split(",")) {
File shipFile = new File(file.trim());
if (shipFile.exists()) {
- addResourceToSparkJobWorkingDirectory(shipFile,
- shipFile.getName(), ResourceType.FILE);
+ addResourceToSparkJobWorkingDirectory(shipFile,
shipFile.getName(),
+ shipFile.getName().endsWith(".jar") ?
ResourceType.JAR : ResourceType.FILE );
}
}
}
@@ -429,7 +430,7 @@ public class SparkLauncher extends Launc
Set<String> allJars = new HashSet<String>();
LOG.info("Add default jars to Spark Job");
allJars.addAll(JarManager.getDefaultJars());
- LOG.info("Add extra jars to Spark Job");
+ LOG.info("Add script jars to Spark Job");
for (String scriptJar : pigContext.scriptJars) {
allJars.add(scriptJar);
}
@@ -448,6 +449,11 @@ public class SparkLauncher extends Launc
allJars.add(scriptUDFJarFile.getAbsolutePath().toString());
}
+ LOG.info("Add extra jars to Spark job");
+ for (URL extraJarUrl : pigContext.extraJars) {
+ allJars.add(extraJarUrl.getFile());
+ }
+
//Upload all jars to spark working directory
for (String jar : allJars) {
File jarFile = new File(jar);
Modified: pig/trunk/src/org/apache/pig/builtin/HiveUDAF.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/HiveUDAF.java?rev=1796647&r1=1796646&r2=1796647&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/HiveUDAF.java (original)
+++ pig/trunk/src/org/apache/pig/builtin/HiveUDAF.java Mon May 29 15:19:17 2017
@@ -135,11 +135,11 @@ public class HiveUDAF extends HiveUDFBas
return;
}
- if (m == Mode.PARTIAL1 || m == Mode.FINAL) {
+ if (m == Mode.PARTIAL1 || m == Mode.PARTIAL2 || m ==
Mode.FINAL) {
intermediateOutputObjectInspector =
evaluator.init(Mode.PARTIAL1, inputObjectInspectorAsArray);
intermediateOutputTypeInfo =
TypeInfoUtils.getTypeInfoFromObjectInspector(intermediateOutputObjectInspector);
- if (m == Mode.FINAL) {
+ if (m == Mode.PARTIAL2 || m == Mode.FINAL) {
intermediateInputObjectInspector =
HiveUtils.createObjectInspector(intermediateOutputTypeInfo);
intermediateInputObjectInspectorAsArray = new
ObjectInspector[] {intermediateInputObjectInspector};
outputObjectInspector = evaluator.init(Mode.FINAL,
intermediateInputObjectInspectorAsArray);
@@ -208,20 +208,41 @@ public class HiveUDAF extends HiveUDFBas
}
static public class Initial extends EvalFunc<Tuple> {
+
+ private boolean inited = false;
+ private String funcName;
+ ConstantObjectInspectInfo constantsInfo;
+ private SchemaAndEvaluatorInfo schemaAndEvaluatorInfo = new
SchemaAndEvaluatorInfo();
+ private static TupleFactory tf = TupleFactory.getInstance();
+
public Initial(String funcName) {
+ this.funcName = funcName;
}
- public Initial(String funcName, String params) {
+ public Initial(String funcName, String params) throws IOException {
+ this.funcName = funcName;
+ constantsInfo = ConstantObjectInspectInfo.parse(params);
}
@Override
public Tuple exec(Tuple input) throws IOException {
-
- DataBag bg = (DataBag) input.get(0);
- Tuple tp = null;
- if(bg.iterator().hasNext()) {
- tp = bg.iterator().next();
+ try {
+ if (!inited) {
+ schemaAndEvaluatorInfo.init(getInputSchema(),
instantiateUDAF(funcName), Mode.PARTIAL1, constantsInfo);
+ inited = true;
+ }
+ DataBag b = (DataBag)input.get(0);
+ AggregationBuffer agg =
schemaAndEvaluatorInfo.evaluator.getNewAggregationBuffer();
+ for (Iterator<Tuple> it = b.iterator(); it.hasNext();) {
+ Tuple t = it.next();
+ List inputs =
schemaAndEvaluatorInfo.inputObjectInspector.getStructFieldsDataAsList(t);
+ schemaAndEvaluatorInfo.evaluator.iterate(agg,
inputs.toArray());
+ }
+ Object returnValue =
schemaAndEvaluatorInfo.evaluator.terminatePartial(agg);
+ Tuple result = tf.newTuple();
+ result.append(HiveUtils.convertHiveToPig(returnValue,
schemaAndEvaluatorInfo.intermediateOutputObjectInspector, null));
+ return result;
+ } catch (Exception e) {
+ throw new IOException(e);
}
-
- return tp;
}
}
@@ -244,15 +265,14 @@ public class HiveUDAF extends HiveUDFBas
public Tuple exec(Tuple input) throws IOException {
try {
if (!inited) {
- schemaAndEvaluatorInfo.init(getInputSchema(),
instantiateUDAF(funcName), Mode.PARTIAL1, constantsInfo);
+ schemaAndEvaluatorInfo.init(getInputSchema(),
instantiateUDAF(funcName), Mode.PARTIAL2, constantsInfo);
inited = true;
}
DataBag b = (DataBag)input.get(0);
AggregationBuffer agg =
schemaAndEvaluatorInfo.evaluator.getNewAggregationBuffer();
for (Iterator<Tuple> it = b.iterator(); it.hasNext();) {
Tuple t = it.next();
- List inputs =
schemaAndEvaluatorInfo.inputObjectInspector.getStructFieldsDataAsList(t);
- schemaAndEvaluatorInfo.evaluator.iterate(agg,
inputs.toArray());
+ schemaAndEvaluatorInfo.evaluator.merge(agg, t.get(0));
}
Object returnValue =
schemaAndEvaluatorInfo.evaluator.terminatePartial(agg);
Tuple result = tf.newTuple();