Author: daijy
Date: Tue Apr  7 21:24:48 2015
New Revision: 1671956

URL: http://svn.apache.org/r1671956
Log:
PIG-3294: Allow Pig use Hive UDFs

Added:
    pig/trunk/src/org/apache/hadoop/
    pig/trunk/src/org/apache/hadoop/hive/
    pig/trunk/src/org/apache/hadoop/hive/serde2/
    pig/trunk/src/org/apache/hadoop/hive/serde2/objectinspector/
    pig/trunk/src/org/apache/hadoop/hive/serde2/objectinspector/primitive/
    
pig/trunk/src/org/apache/hadoop/hive/serde2/objectinspector/primitive/JavaConstantBooleanObjectInspector.java
    
pig/trunk/src/org/apache/hadoop/hive/serde2/objectinspector/primitive/JavaConstantDoubleObjectInspector.java
    
pig/trunk/src/org/apache/hadoop/hive/serde2/objectinspector/primitive/JavaConstantFloatObjectInspector.java
    
pig/trunk/src/org/apache/hadoop/hive/serde2/objectinspector/primitive/JavaConstantIntObjectInspector.java
    
pig/trunk/src/org/apache/hadoop/hive/serde2/objectinspector/primitive/JavaConstantLongObjectInspector.java
    
pig/trunk/src/org/apache/hadoop/hive/serde2/objectinspector/primitive/JavaConstantStringObjectInspector.java
    
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/UDFEndOfAllInputNeededVisitor.java
    pig/trunk/src/org/apache/pig/builtin/HiveUDAF.java
    pig/trunk/src/org/apache/pig/builtin/HiveUDF.java
    pig/trunk/src/org/apache/pig/builtin/HiveUDFBase.java
    pig/trunk/src/org/apache/pig/builtin/HiveUDTF.java
    pig/trunk/src/org/apache/pig/data/UnlimitedNullTuple.java
    pig/trunk/src/org/apache/pig/impl/util/hive/
    pig/trunk/src/org/apache/pig/impl/util/hive/HiveUtils.java
    
pig/trunk/test/e2e/pig/udfs/java/org/apache/pig/test/udf/evalfunc/DummyContextUDF.java
Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/EvalFunc.java
    
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
    
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java
    
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java
    
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java
    
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java
    
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java
    
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java
    
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
    
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/CombinerOptimizerUtil.java
    pig/trunk/src/org/apache/pig/builtin/OrcStorage.java
    pig/trunk/src/org/apache/pig/impl/util/Utils.java
    pig/trunk/src/org/apache/pig/impl/util/orc/OrcUtils.java
    
pig/trunk/src/org/apache/pig/newplan/logical/expression/ExpToPhyTranslationVisitor.java
    
pig/trunk/src/org/apache/pig/newplan/logical/expression/UserFuncExpression.java
    
pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java
    pig/trunk/test/e2e/pig/build.xml
    pig/trunk/test/e2e/pig/tests/nightly.conf

Modified: pig/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1671956&r1=1671955&r2=1671956&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Tue Apr  7 21:24:48 2015
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
  
 IMPROVEMENTS
 
+PIG-3294: Allow Pig use Hive UDFs (daijy)
+
 PIG-4476: Fix logging in AvroStorage* classes and SchemaTuple class (rdsr via 
rohini)
 
 PIG-4458: Support UDFs in a FOREACH Before a Merge Join (wattsinabox via daijy)

Added: 
pig/trunk/src/org/apache/hadoop/hive/serde2/objectinspector/primitive/JavaConstantBooleanObjectInspector.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/hadoop/hive/serde2/objectinspector/primitive/JavaConstantBooleanObjectInspector.java?rev=1671956&view=auto
==============================================================================
--- 
pig/trunk/src/org/apache/hadoop/hive/serde2/objectinspector/primitive/JavaConstantBooleanObjectInspector.java
 (added)
+++ 
pig/trunk/src/org/apache/hadoop/hive/serde2/objectinspector/primitive/JavaConstantBooleanObjectInspector.java
 Tue Apr  7 21:24:48 2015
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.serde2.objectinspector.primitive;
+
+import org.apache.hadoop.hive.serde2.objectinspector.ConstantObjectInspector;
+import org.apache.hadoop.io.BooleanWritable;
+
+/**
+ * This class will be in Hive code base begin 1.2.0 (HIVE-9766).
+ * Will remove once we switch to use Hive 1.2.0
+ */
+public class JavaConstantBooleanObjectInspector extends
+    JavaBooleanObjectInspector implements ConstantObjectInspector {
+  private Boolean value;
+
+  public JavaConstantBooleanObjectInspector(Boolean value) {
+    super();
+    this.value = value;
+  }
+
+  @Override
+  public Object getWritableConstantValue() {
+    if (value==null) {
+      return null;
+    }
+    return new BooleanWritable(value);
+  }
+}

Added: 
pig/trunk/src/org/apache/hadoop/hive/serde2/objectinspector/primitive/JavaConstantDoubleObjectInspector.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/hadoop/hive/serde2/objectinspector/primitive/JavaConstantDoubleObjectInspector.java?rev=1671956&view=auto
==============================================================================
--- 
pig/trunk/src/org/apache/hadoop/hive/serde2/objectinspector/primitive/JavaConstantDoubleObjectInspector.java
 (added)
+++ 
pig/trunk/src/org/apache/hadoop/hive/serde2/objectinspector/primitive/JavaConstantDoubleObjectInspector.java
 Tue Apr  7 21:24:48 2015
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.serde2.objectinspector.primitive;
+
+import org.apache.hadoop.hive.serde2.io.DoubleWritable;
+import org.apache.hadoop.hive.serde2.objectinspector.ConstantObjectInspector;
+
+/**
+ * This class will be in Hive code base begin 1.2.0 (HIVE-9766).
+ * Will remove once we switch to use Hive 1.2.0
+ */
+public class JavaConstantDoubleObjectInspector extends
+    JavaDoubleObjectInspector implements ConstantObjectInspector {
+  private Double value;
+
+  public JavaConstantDoubleObjectInspector(Double value) {
+    super();
+    this.value = value;
+  }
+
+  @Override
+  public Object getWritableConstantValue() {
+    if (value==null) {
+      return null;
+    }
+    return new DoubleWritable(value);
+  }
+}

Added: 
pig/trunk/src/org/apache/hadoop/hive/serde2/objectinspector/primitive/JavaConstantFloatObjectInspector.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/hadoop/hive/serde2/objectinspector/primitive/JavaConstantFloatObjectInspector.java?rev=1671956&view=auto
==============================================================================
--- 
pig/trunk/src/org/apache/hadoop/hive/serde2/objectinspector/primitive/JavaConstantFloatObjectInspector.java
 (added)
+++ 
pig/trunk/src/org/apache/hadoop/hive/serde2/objectinspector/primitive/JavaConstantFloatObjectInspector.java
 Tue Apr  7 21:24:48 2015
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.serde2.objectinspector.primitive;
+
+import org.apache.hadoop.hive.serde2.objectinspector.ConstantObjectInspector;
+import org.apache.hadoop.io.FloatWritable;
+
+/**
+ * This class will be in Hive code base begin 1.2.0 (HIVE-9766).
+ * Will remove once we switch to use Hive 1.2.0
+ */
+public class JavaConstantFloatObjectInspector extends JavaFloatObjectInspector
+    implements ConstantObjectInspector {
+  private Float value;
+
+  public JavaConstantFloatObjectInspector(Float value) {
+    super();
+    this.value = value;
+  }
+
+  @Override
+  public Object getWritableConstantValue() {
+    if (value==null) {
+      return null;
+    }
+    return new FloatWritable(value);
+  }
+}

Added: 
pig/trunk/src/org/apache/hadoop/hive/serde2/objectinspector/primitive/JavaConstantIntObjectInspector.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/hadoop/hive/serde2/objectinspector/primitive/JavaConstantIntObjectInspector.java?rev=1671956&view=auto
==============================================================================
--- 
pig/trunk/src/org/apache/hadoop/hive/serde2/objectinspector/primitive/JavaConstantIntObjectInspector.java
 (added)
+++ 
pig/trunk/src/org/apache/hadoop/hive/serde2/objectinspector/primitive/JavaConstantIntObjectInspector.java
 Tue Apr  7 21:24:48 2015
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.serde2.objectinspector.primitive;
+
+import org.apache.hadoop.hive.serde2.objectinspector.ConstantObjectInspector;
+import org.apache.hadoop.io.IntWritable;
+
+/**
+ * This class will be in Hive code base begin 1.2.0 (HIVE-9766).
+ * Will remove once we switch to use Hive 1.2.0
+ */
+public class JavaConstantIntObjectInspector extends JavaIntObjectInspector
+    implements ConstantObjectInspector {
+  private Integer value;
+
+  public JavaConstantIntObjectInspector(Integer value) {
+    super();
+    this.value = value;
+  }
+
+  @Override
+  public Object getWritableConstantValue() {
+    if (value==null) {
+      return null;
+    }
+    return new IntWritable(value);
+  }
+}

Added: 
pig/trunk/src/org/apache/hadoop/hive/serde2/objectinspector/primitive/JavaConstantLongObjectInspector.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/hadoop/hive/serde2/objectinspector/primitive/JavaConstantLongObjectInspector.java?rev=1671956&view=auto
==============================================================================
--- 
pig/trunk/src/org/apache/hadoop/hive/serde2/objectinspector/primitive/JavaConstantLongObjectInspector.java
 (added)
+++ 
pig/trunk/src/org/apache/hadoop/hive/serde2/objectinspector/primitive/JavaConstantLongObjectInspector.java
 Tue Apr  7 21:24:48 2015
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.serde2.objectinspector.primitive;
+
+import org.apache.hadoop.hive.serde2.objectinspector.ConstantObjectInspector;
+import org.apache.hadoop.io.LongWritable;
+
+/**
+ * This class will be in Hive code base begin 1.2.0 (HIVE-9766).
+ * Will remove once we switch to use Hive 1.2.0
+ */
+public class JavaConstantLongObjectInspector extends JavaLongObjectInspector
+    implements ConstantObjectInspector {
+  private Long value;
+
+  public JavaConstantLongObjectInspector(Long value) {
+    super();
+    this.value = value;
+  }
+
+  @Override
+  public Object getWritableConstantValue() {
+    if (value==null) {
+      return null;
+    }
+    return new LongWritable(value);
+  }
+}

Added: 
pig/trunk/src/org/apache/hadoop/hive/serde2/objectinspector/primitive/JavaConstantStringObjectInspector.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/hadoop/hive/serde2/objectinspector/primitive/JavaConstantStringObjectInspector.java?rev=1671956&view=auto
==============================================================================
--- 
pig/trunk/src/org/apache/hadoop/hive/serde2/objectinspector/primitive/JavaConstantStringObjectInspector.java
 (added)
+++ 
pig/trunk/src/org/apache/hadoop/hive/serde2/objectinspector/primitive/JavaConstantStringObjectInspector.java
 Tue Apr  7 21:24:48 2015
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.serde2.objectinspector.primitive;
+
+/**
+ * This class will be in Hive code base begin 1.2.0 (HIVE-9766).
+ * Will remove once we switch to use Hive 1.2.0
+ */
+import org.apache.hadoop.hive.serde2.objectinspector.ConstantObjectInspector;
+import org.apache.hadoop.io.Text;
+
+public class JavaConstantStringObjectInspector extends
+    JavaStringObjectInspector implements ConstantObjectInspector {
+  private String value;
+
+  public JavaConstantStringObjectInspector(String value) {
+    super();
+    this.value = value;
+  }
+
+  @Override
+  public Object getWritableConstantValue() {
+    if (value==null) {
+      return null;
+    }
+    return new Text(value);
+  }
+}

Modified: pig/trunk/src/org/apache/pig/EvalFunc.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/EvalFunc.java?rev=1671956&r1=1671955&r2=1671956&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/EvalFunc.java (original)
+++ pig/trunk/src/org/apache/pig/EvalFunc.java Tue Apr  7 21:24:48 2015
@@ -362,4 +362,11 @@ public abstract class EvalFunc<T>  {
     public boolean allowCompileTimeCalculation() {
         return false;
     }
+
+    public boolean needEndOfAllInputProcessing() {
+        return false;
+    }
+
+    public void setEndOfAllInput(boolean endOfAllInput) {
+    }
 }

Modified: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=1671956&r1=1671955&r2=1671956&view=diff
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
 (original)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
 Tue Apr  7 21:24:48 2015
@@ -643,7 +643,11 @@ public class JobControlCompiler{
                             }
                         }
                         if (!predeployed) {
-                            putJarOnClassPathThroughDistributedCache(pigContext, conf, jar);
+                            if (jar.getFile().toLowerCase().endsWith(".jar")) {
+                                putJarOnClassPathThroughDistributedCache(pigContext, conf, jar);
+                            } else {
+                                setupDistributedCache(pigContext, conf, new String[] {jar.getPath()}, true);
+                            }
+                            }
                         }
                     }
 
@@ -804,6 +808,7 @@ public class JobControlCompiler{
             // set parent plan in all operators in map and reduce plans
             // currently the parent plan is really used only when POStream is 
present in the plan
             new PhyPlanSetter(mro.mapPlan).visit();
+            new PhyPlanSetter(mro.combinePlan).visit();
             new PhyPlanSetter(mro.reducePlan).visit();
 
             // this call modifies the ReplFiles names of POFRJoin operators

Modified: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java?rev=1671956&r1=1671955&r2=1671956&view=diff
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java
 (original)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java
 Tue Apr  7 21:24:48 2015
@@ -38,6 +38,7 @@ import org.apache.pig.backend.hadoop.exe
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.JoinPackager;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
+import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.NullableTuple;
@@ -98,6 +99,7 @@ public class PigCombiner {
                 pigContext = 
(PigContext)ObjectSerializer.deserialize(jConf.get("pig.pigContext"));
                 if (pigContext.getLog4jProperties()!=null)
                     
PropertyConfigurator.configure(pigContext.getLog4jProperties());
+                MapRedUtil.setupUDFContext(context.getConfiguration());
 
                 cp = (PhysicalPlan) ObjectSerializer.deserialize(jConf
                         .get("pig.combinePlan"));

Modified: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java?rev=1671956&r1=1671955&r2=1671956&view=diff
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java
 (original)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java
 Tue Apr  7 21:24:48 2015
@@ -112,7 +112,7 @@ public abstract class PigGenericMapBase
             return;
         }
 
-        if(PigMapReduce.sJobConfInternal.get().get(JobControlCompiler.END_OF_INP_IN_MAP, "false").equals("true")) {
+        if(PigMapReduce.sJobConfInternal.get().get(JobControlCompiler.END_OF_INP_IN_MAP, "false").equals("true") && !mp.isEmpty()) {
             // If there is a stream in the pipeline or if this map job belongs 
to merge-join we could
             // potentially have more to process - so lets
             // set the flag stating that all map input has been sent

Modified: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java?rev=1671956&r1=1671955&r2=1671956&view=diff
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java
 (original)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java
 Tue Apr  7 21:24:48 2015
@@ -609,7 +609,7 @@ public class PigGenericMapReduce {
                 return;
             }
 
-            if(PigMapReduce.sJobConfInternal.get().get("pig.stream.in.reduce", "false").equals("true")) {
+            if(PigMapReduce.sJobConfInternal.get().get("pig.stream.in.reduce", "false").equals("true") && !rp.isEmpty()) {
                 // If there is a stream in the pipeline we could
                 // potentially have more to process - so lets
                 // set the flag stating that all map input has been sent

Added: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/UDFEndOfAllInputNeededVisitor.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/UDFEndOfAllInputNeededVisitor.java?rev=1671956&view=auto
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/UDFEndOfAllInputNeededVisitor.java
 (added)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/UDFEndOfAllInputNeededVisitor.java
 Tue Apr  7 21:24:48 2015
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
+
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.impl.plan.DependencyOrderWalker;
+import org.apache.pig.impl.plan.VisitorException;
+
+public class UDFEndOfAllInputNeededVisitor extends PhyPlanVisitor {
+
+    private boolean needed = false;
+
+    public UDFEndOfAllInputNeededVisitor(PhysicalPlan plan) {
+        super(plan, new DependencyOrderWalker<PhysicalOperator, 
PhysicalPlan>(plan));
+    }
+
+    @Override
+    public void visitUserFunc(POUserFunc userFunc) throws VisitorException {
+        super.visitUserFunc(userFunc);
+        if (userFunc.needEndOfAllInputProcessing()) {
+            needed = true;
+        }
+    }
+
+    public boolean needEndOfAllInputProcessing() {
+        return needed;
+    }
+}

Modified: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java?rev=1671956&r1=1671955&r2=1671956&view=diff
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java
 (original)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java
 Tue Apr  7 21:24:48 2015
@@ -21,6 +21,7 @@ import org.apache.pig.backend.hadoop.exe
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeCogroup;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPartialAgg;
@@ -110,6 +111,18 @@ public class EndOfAllInputSetter extends
             endOfAllInputFlag = true;
         }
 
+        @Override
+        public void visitPOForEach(POForEach foreach) throws VisitorException {
+            try {
+                if (foreach.needEndOfAllInputProcessing()) {
+                    endOfAllInputFlag = true;
+                }
+            } catch (Exception e) {
+                throw new VisitorException(e);
+            }
+        }
+
+
         /**
          * @return if end of all input is present
          */

Modified: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java?rev=1671956&r1=1671955&r2=1671956&view=diff
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java
 (original)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java
 Tue Apr  7 21:24:48 2015
@@ -351,6 +351,9 @@ public class POUserFunc extends Expressi
                         }
                     }
                 } else {
+                    if (parentPlan!=null && parentPlan.endOfAllInput && needEndOfAllInputProcessing()) {
+                        func.setEndOfAllInput(true);
+                    }
                     if (executor != null) {
                         result.result = executor.monitorExec((Tuple) 
result.result);
                     } else {
@@ -655,4 +658,7 @@ public class POUserFunc extends Expressi
         }
     }
 
+    public boolean needEndOfAllInputProcessing() {
+        return getFunc().needEndOfAllInputProcessing();
+    }
 }

Modified: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java?rev=1671956&r1=1671955&r2=1671956&view=diff
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java
 (original)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java
 Tue Apr  7 21:24:48 2015
@@ -25,6 +25,7 @@ import java.util.List;
 
 import org.apache.pig.PigException;
 import org.apache.pig.backend.executionengine.ExecException;
+import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.UDFEndOfAllInputNeededVisitor;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
@@ -41,6 +42,7 @@ import org.apache.pig.data.SchemaTupleFa
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
 import org.apache.pig.data.TupleMaker;
+import org.apache.pig.data.UnlimitedNullTuple;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.impl.plan.DependencyOrderWalker;
 import org.apache.pig.impl.plan.NodeIdGenerator;
@@ -92,10 +94,14 @@ public class POForEach extends PhysicalO
 
     protected Tuple inpTuple;
 
+    protected boolean endOfAllInputProcessed = false;
+
     // Indicate the foreach statement can only in map side
     // Currently only used in MR cross (See PIG-4175)
     protected boolean mapSideOnly = false;
 
+    protected Boolean endOfAllInputProcessing = false;
+
     private Schema schema;
 
     public POForEach(OperatorKey k) {
@@ -244,13 +250,21 @@ public class POForEach extends PhysicalO
             //read
             while (true) {
                 inp = processInput();
-                if (inp.returnStatus == POStatus.STATUS_EOP ||
-                        inp.returnStatus == POStatus.STATUS_ERR) {
+
+                if (inp.returnStatus == POStatus.STATUS_ERR) {
                     return inp;
                 }
                 if (inp.returnStatus == POStatus.STATUS_NULL) {
                     continue;
                 }
+                if (inp.returnStatus == POStatus.STATUS_EOP) {
+                    if (parentPlan!=null && parentPlan.endOfAllInput && !endOfAllInputProcessed && endOfAllInputProcessing) {
+                        // continue pull one more output
+                        inp = new Result(POStatus.STATUS_OK, new UnlimitedNullTuple());
+                    } else {
+                        return inp;
+                    }
+                }
 
                 attachInputToPlans((Tuple) inp.result);
                 inpTuple = (Tuple)inp.result;
@@ -357,6 +371,9 @@ public class POForEach extends PhysicalO
 
 
         if(its == null) {
+            if (endOfAllInputProcessed) {
+                return new Result(POStatus.STATUS_EOP, null);
+            }
             //getNext being called for the first time OR starting with a set of new data from inputs
             its = new Iterator[noItems];
             bags = new Object[noItems];
@@ -424,6 +441,9 @@ public class POForEach extends PhysicalO
                     its[i] = null;
                 }
             }
+            if (parentPlan!=null && parentPlan.endOfAllInput && endOfAllInputProcessing) {
+                endOfAllInputProcessed = true;
+            }
         }
 
         // if accumulating, we haven't got data yet for some fields, just return
@@ -794,4 +814,21 @@ public class POForEach extends PhysicalO
     public boolean isMapSideOnly() {
         return mapSideOnly;
     }
+
+    public boolean needEndOfAllInputProcessing() throws ExecException {
+        try {
+            for (PhysicalPlan innerPlan : inputPlans) {
+                UDFEndOfAllInputNeededVisitor endOfAllInputNeededVisitor
+                     = new UDFEndOfAllInputNeededVisitor(innerPlan);
+                endOfAllInputNeededVisitor.visit();
+                if (endOfAllInputNeededVisitor.needEndOfAllInputProcessing()) {
+                    endOfAllInputProcessing = true;
+                    return true;
+                }
+            }
+            return false;
+        } catch (Exception e) {
+            throw new ExecException(e);
+        }
+    }
 }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java?rev=1671956&r1=1671955&r2=1671956&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java Tue Apr  7 21:24:48 2015
@@ -354,6 +354,7 @@ public class TezDagBuilder extends TezOp
         OutputDescriptor out = OutputDescriptor.create(edge.outputClassName);
 
         Configuration conf = ConfigurationUtil.toConfiguration(pc.getProperties(), false);
+        UDFContext.getUDFContext().serialize(conf);
         if (!combinePlan.isEmpty()) {
             addCombiner(combinePlan, to, conf);
         }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/CombinerOptimizerUtil.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/CombinerOptimizerUtil.java?rev=1671956&r1=1671955&r2=1671956&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/CombinerOptimizerUtil.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/CombinerOptimizerUtil.java Tue Apr  7 21:24:48 2015
@@ -206,7 +206,9 @@ public class CombinerOptimizerUtil {
                 // fix projection and function time for algebraic functions in reduce foreach
                 for (Pair<PhysicalOperator, PhysicalPlan> op2plan : algebraicOps) {
                     setProjectInput(op2plan.first, op2plan.second, op2newpos.get(op2plan.first));
+                    byte resultType = op2plan.first.getResultType();
                     ((POUserFunc)op2plan.first).setAlgebraicFunction(POUserFunc.FINAL);
+                    op2plan.first.setResultType(resultType);
                 }
 
                 // we have modified the foreach inner plans - so set them again

Added: pig/trunk/src/org/apache/pig/builtin/HiveUDAF.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/HiveUDAF.java?rev=1671956&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/HiveUDAF.java (added)
+++ pig/trunk/src/org/apache/pig/builtin/HiveUDAF.java Tue Apr  7 21:24:48 2015
@@ -0,0 +1,344 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.builtin;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.hive.ql.exec.UDAF;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFBridge;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.AggregationBuffer;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.Mode;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFParameterInfo;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFResolver;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFResolver2;
+import org.apache.hadoop.hive.ql.udf.generic.SimpleGenericUDAFParameterInfo;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.pig.Algebraic;
+import org.apache.pig.EvalFunc;
+import org.apache.pig.ResourceSchema;
+import org.apache.pig.ResourceSchema.ResourceFieldSchema;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
+import org.apache.pig.impl.util.hive.HiveUtils;
+
+/**
+ * Use Hive UDAF or GenericUDAF.
+ * Example:
+ *      define avg HiveUDAF('avg');
+ *      A = load 'mydata' as (name:chararray, num:double);
+ *      B = group A by name;
+ *      C = foreach B generate group, avg(A.num);
+ */
+public class HiveUDAF extends HiveUDFBase implements Algebraic {
+
+    private boolean inited = false;
+    private String funcName;
+    private String params;
+    private GenericUDAFResolver udaf;
+
+    static class SchemaAndEvaluatorInfo {
+        private TypeInfo inputTypeInfo;
+        private TypeInfo outputTypeInfo;
+        private TypeInfo intermediateOutputTypeInfo;
+        private ObjectInspector[] inputObjectInspectorAsArray;
+        private ObjectInspector[] intermediateInputObjectInspectorAsArray;
+        private StructObjectInspector inputObjectInspector;
+        private ObjectInspector intermediateInputObjectInspector;
+        private ObjectInspector intermediateOutputObjectInspector;
+        private ObjectInspector outputObjectInspector;
+        private GenericUDAFEvaluator evaluator;
+
+        private static TypeInfo getInputTypeInfo(Schema inputSchema) throws IOException {
+            FieldSchema innerFieldSchema = inputSchema.getField(0).schema.getField(0);
+            ResourceFieldSchema rfs = new ResourceFieldSchema(innerFieldSchema);
+
+            TypeInfo inputTypeInfo = HiveUtils.getTypeInfo(rfs);
+            return inputTypeInfo;
+        }
+
+        private static ObjectInspector[] 
getInputObjectInspectorAsArray(TypeInfo inputTypeInfo,
+                ConstantObjectInspectInfo constantsInfo) throws IOException {
+
+            StructObjectInspector inputObjectInspector = 
(StructObjectInspector)HiveUtils.createObjectInspector(inputTypeInfo);
+
+            ObjectInspector[] arguments = new 
ObjectInspector[inputObjectInspector.getAllStructFieldRefs().size()];
+            for (int 
i=0;i<inputObjectInspector.getAllStructFieldRefs().size();i++) {
+                if (constantsInfo!=null && constantsInfo.get(i)!=null) {
+                    arguments[i] = constantsInfo.get(i);
+                } else {
+                    arguments[i] = 
inputObjectInspector.getAllStructFieldRefs().get(i).getFieldObjectInspector();
+                }
+            }
+            return arguments;
+        }
+
+        private static GenericUDAFEvaluator getEvaluator(TypeInfo 
inputTypeInfo, GenericUDAFResolver udaf,
+                ConstantObjectInspectInfo constantsInfo) throws IOException {
+            try {
+                GenericUDAFEvaluator evaluator;
+                ObjectInspector[] arguments = 
getInputObjectInspectorAsArray(inputTypeInfo, constantsInfo);
+                
+                if (udaf instanceof GenericUDAFResolver2) {
+                    GenericUDAFParameterInfo paramInfo =
+                            new SimpleGenericUDAFParameterInfo(
+                                    arguments, false, false);
+                    evaluator = 
((GenericUDAFResolver2)udaf).getEvaluator(paramInfo);
+                } else {
+                    TypeInfo[] params = ((StructTypeInfo)inputTypeInfo)
+                            .getAllStructFieldTypeInfos().toArray(new 
TypeInfo[0]);
+                    evaluator = udaf.getEvaluator(params);
+                }
+                return evaluator;
+            } catch (Exception e) {
+                throw new IOException(e);
+            }
+        }
+
+        private void init(Schema inputSchema, GenericUDAFResolver udaf, Mode 
m, ConstantObjectInspectInfo constantsInfo) throws IOException {
+            try {
+                inputTypeInfo = getInputTypeInfo(inputSchema);
+                inputObjectInspector = 
(StructObjectInspector)HiveUtils.createObjectInspector(inputTypeInfo);
+                if (constantsInfo!=null) {
+                    
constantsInfo.injectConstantObjectInspector(inputObjectInspector);
+                }
+                inputObjectInspectorAsArray = 
getInputObjectInspectorAsArray(inputTypeInfo, constantsInfo);
+                evaluator = getEvaluator(inputTypeInfo, udaf, constantsInfo);
+
+                if (m == Mode.COMPLETE) {
+                    outputObjectInspector = evaluator.init(Mode.COMPLETE, 
inputObjectInspectorAsArray);
+                    outputTypeInfo = 
TypeInfoUtils.getTypeInfoFromObjectInspector(outputObjectInspector);
+                    return;
+                }
+
+                if (m == Mode.PARTIAL1 || m == Mode.FINAL) {
+                    intermediateOutputObjectInspector = 
evaluator.init(Mode.PARTIAL1, inputObjectInspectorAsArray);
+                    intermediateOutputTypeInfo = 
TypeInfoUtils.getTypeInfoFromObjectInspector(intermediateOutputObjectInspector);
+
+                    if (m == Mode.FINAL) {
+                        intermediateInputObjectInspector = 
HiveUtils.createObjectInspector(intermediateOutputTypeInfo);
+                        intermediateInputObjectInspectorAsArray = new 
ObjectInspector[] {intermediateInputObjectInspector};
+                        outputObjectInspector = evaluator.init(Mode.FINAL, 
intermediateInputObjectInspectorAsArray);
+                        outputTypeInfo = 
TypeInfoUtils.getTypeInfoFromObjectInspector(outputObjectInspector);
+                    }
+                }
+            } catch (Exception e) {
+                throw new IOException(e);
+            }
+        }
+    }
+
+    SchemaAndEvaluatorInfo schemaAndEvaluatorInfo = new 
SchemaAndEvaluatorInfo();
+
+    ConstantObjectInspectInfo constantsInfo;
+
+    public HiveUDAF(String funcName) throws IOException, 
InstantiationException, IllegalAccessException {
+        this.funcName = funcName;
+        this.udaf = instantiateUDAF(funcName);
+    }
+
+    public HiveUDAF(String funcName, String params) throws IOException, 
InstantiationException, IllegalAccessException {
+        this(funcName);
+        constantsInfo = ConstantObjectInspectInfo.parse(params);
+        this.params = params;
+    }
+
+    private static GenericUDAFResolver instantiateUDAF(String funcName) throws 
IOException, InstantiationException, IllegalAccessException {
+        GenericUDAFResolver udaf;
+        Class hiveUDAFClass = resolveFunc(funcName);
+        if (UDAF.class.isAssignableFrom(hiveUDAFClass)) {
+            udaf =  new GenericUDAFBridge((UDAF)hiveUDAFClass.newInstance());
+        } else if (GenericUDAFResolver.class.isAssignableFrom(hiveUDAFClass)){
+            udaf = (GenericUDAFResolver)hiveUDAFClass.newInstance();
+        } else {
+            throw new IOException(getErrorMessage(hiveUDAFClass));
+        }
+        return udaf;
+    }
+
+    @Override
+    public String getInitial() {
+        if (params == null) {
+            return Initial.class.getName() + "('" + funcName + "')";
+        } else {
+            return Initial.class.getName() + "('" + funcName + "," + params + 
"')";
+        }
+    }
+
+    @Override
+    public String getIntermed() {
+        if (params == null) {
+            return Intermediate.class.getName() + "('" + funcName + "')";
+        } else {
+            return Intermediate.class.getName() + "('" + funcName + "," + 
params + "')";
+        }
+    }
+
+    @Override
+    public String getFinal() {
+        if (params == null) {
+            return Final.class.getName() + "('" + funcName + "')";
+        } else {
+            return Final.class.getName() + "('" + funcName + "," + params + 
"')";
+        }
+    }
+
+    static public class Initial extends EvalFunc<Tuple> {
+        public Initial(String funcName) {
+        }
+        public Initial(String funcName, String params) {
+        }
+        @Override
+        public Tuple exec(Tuple input) throws IOException {
+
+            DataBag bg = (DataBag) input.get(0);
+            Tuple tp = null;
+            if(bg.iterator().hasNext()) {
+                tp = bg.iterator().next();
+            }
+
+            return tp;
+        }
+    }
+
+    static public class Intermediate extends EvalFunc<Tuple> {
+
+        private boolean inited = false;
+        private String funcName;
+        ConstantObjectInspectInfo constantsInfo;
+        private SchemaAndEvaluatorInfo schemaAndEvaluatorInfo = new 
SchemaAndEvaluatorInfo();
+        private static TupleFactory tf = TupleFactory.getInstance();
+
+        public Intermediate(String funcName) {
+            this.funcName = funcName;
+        }
+        public Intermediate(String funcName, String params) throws IOException 
{
+            this.funcName = funcName;
+            constantsInfo = ConstantObjectInspectInfo.parse(params);
+        }
+        @Override
+        public Tuple exec(Tuple input) throws IOException {
+            try {
+                if (!inited) {
+                    schemaAndEvaluatorInfo.init(getInputSchema(), 
instantiateUDAF(funcName), Mode.PARTIAL1, constantsInfo);
+                    inited = true;
+                }
+                DataBag b = (DataBag)input.get(0);
+                AggregationBuffer agg = 
schemaAndEvaluatorInfo.evaluator.getNewAggregationBuffer();
+                for (Iterator<Tuple> it = b.iterator(); it.hasNext();) {
+                    Tuple t = it.next();
+                    List inputs = 
schemaAndEvaluatorInfo.inputObjectInspector.getStructFieldsDataAsList(t);
+                    schemaAndEvaluatorInfo.evaluator.iterate(agg, 
inputs.toArray());
+                }
+                Object returnValue = 
schemaAndEvaluatorInfo.evaluator.terminatePartial(agg);
+                Tuple result = tf.newTuple();
+                result.append(HiveUtils.convertHiveToPig(returnValue, 
schemaAndEvaluatorInfo.intermediateOutputObjectInspector, null));
+                return result;
+            } catch (Exception e) {
+                throw new IOException(e);
+            }
+        }
+    }
+
+    static public class Final extends EvalFunc<Object> {
+
+        private boolean inited = false;
+        private String funcName;
+        ConstantObjectInspectInfo constantsInfo;
+        private SchemaAndEvaluatorInfo schemaAndEvaluatorInfo = new 
SchemaAndEvaluatorInfo();
+
+        public Final(String funcName) {
+            this.funcName = funcName;
+        }
+        public Final(String funcName, String params) throws IOException {
+            this.funcName = funcName;
+            constantsInfo = ConstantObjectInspectInfo.parse(params);
+        }
+        @Override
+        public Object exec(Tuple input) throws IOException {
+            try {
+                if (!inited) {
+                    schemaAndEvaluatorInfo.init(getInputSchema(), 
instantiateUDAF(funcName), Mode.FINAL, constantsInfo);
+                    
schemaAndEvaluatorInfo.evaluator.configure(instantiateMapredContext());
+                    inited = true;
+                }
+                DataBag b = (DataBag)input.get(0);
+                AggregationBuffer agg = 
schemaAndEvaluatorInfo.evaluator.getNewAggregationBuffer();
+                for (Iterator<Tuple> it = b.iterator(); it.hasNext();) {
+                    Tuple t = it.next();
+                    schemaAndEvaluatorInfo.evaluator.merge(agg, t.get(0));
+                }
+
+                Object returnValue = 
schemaAndEvaluatorInfo.evaluator.terminate(agg);
+                Object result = HiveUtils.convertHiveToPig(returnValue, 
schemaAndEvaluatorInfo.outputObjectInspector, null);
+                return result;
+            } catch (Exception e) {
+                throw new IOException(e);
+            }
+        }
+    }
+
+    @Override
+    public Object exec(Tuple input) throws IOException {
+        try {
+            if (!inited) {
+                schemaAndEvaluatorInfo.init(getInputSchema(), 
instantiateUDAF(funcName), Mode.COMPLETE, constantsInfo);
+                inited = true;
+            }
+            AggregationBuffer agg = 
schemaAndEvaluatorInfo.evaluator.getNewAggregationBuffer();
+            DataBag bg = (DataBag) input.get(0);
+            Tuple tp = null;
+            for (Iterator<Tuple> it = bg.iterator(); it.hasNext();) {
+                tp = it.next();
+                List inputs = 
schemaAndEvaluatorInfo.inputObjectInspector.getStructFieldsDataAsList(tp);
+                schemaAndEvaluatorInfo.evaluator.iterate(agg, 
inputs.toArray());
+            }
+            Object returnValue = 
schemaAndEvaluatorInfo.evaluator.terminate(agg);
+            Object result = HiveUtils.convertHiveToPig(returnValue, 
schemaAndEvaluatorInfo.outputObjectInspector, null);
+            return result;
+        } catch (Exception e) {
+            throw new IOException(e);
+        }
+    }
+
+    @Override
+    public Schema outputSchema(Schema input) {
+        try {
+            if (!inited) {
+                schemaAndEvaluatorInfo.init(getInputSchema(), 
instantiateUDAF(funcName), Mode.COMPLETE, constantsInfo);
+                inited = true;
+            }
+
+            ResourceFieldSchema rfs = 
HiveUtils.getResourceFieldSchema(schemaAndEvaluatorInfo.outputTypeInfo);
+            ResourceSchema outputSchema = new ResourceSchema();
+            outputSchema.setFields(new ResourceFieldSchema[] {rfs});
+            return Schema.getPigSchema(outputSchema);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+}

Added: pig/trunk/src/org/apache/pig/builtin/HiveUDF.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/HiveUDF.java?rev=1671956&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/HiveUDF.java (added)
+++ pig/trunk/src/org/apache/pig/builtin/HiveUDF.java Tue Apr  7 21:24:48 2015
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.builtin;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.hadoop.hive.ql.exec.UDF;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDF.DeferredJavaObject;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDF.DeferredObject;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFBridge;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.pig.ResourceSchema;
+import org.apache.pig.ResourceSchema.ResourceFieldSchema;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.util.hive.HiveUtils;
+
+/**
+ * Use Hive UDF or GenericUDF.
+ * Example:
+ *      define sin HiveUDF('sin');
+ *      A = load 'mydata' as (num:double);
+ *      B = foreach A generate sin(num);
+ * HiveUDF takes an optional second parameter if the Hive UDF require constant 
parameters
+ *      define in_file HiveUDF('in_file', '(null, "names.txt")');
+ */
+public class HiveUDF extends HiveUDFBase {
+
+    private boolean inited = false;
+    private GenericUDF evalUDF;
+
+    static class SchemaInfo {
+        private StructObjectInspector inputObjectInspector;
+        private ObjectInspector outputObjectInspector;
+
+        private void init(Schema inputSchema, GenericUDF evalUDF, 
ConstantObjectInspectInfo constantsInfo) throws IOException {
+            ResourceSchema rs = new ResourceSchema(inputSchema);
+            ResourceFieldSchema wrappedTupleFieldSchema = new 
ResourceFieldSchema();
+            wrappedTupleFieldSchema.setType(DataType.TUPLE);
+            wrappedTupleFieldSchema.setSchema(rs);
+
+            TypeInfo ti = HiveUtils.getTypeInfo(wrappedTupleFieldSchema);
+            inputObjectInspector = 
(StructObjectInspector)HiveUtils.createObjectInspector(ti);
+
+            try {
+                ObjectInspector[] arguments = new 
ObjectInspector[inputSchema.size()];
+                for (int i=0;i<inputSchema.size();i++) {
+                    if (constantsInfo!=null && !constantsInfo.isEmpty() && 
constantsInfo.get(i)!=null) {
+                        arguments[i] = constantsInfo.get(i);
+                    } else {
+                        arguments[i] = 
inputObjectInspector.getAllStructFieldRefs().get(i).getFieldObjectInspector();
+                    }
+                }
+                outputObjectInspector = evalUDF.initialize(arguments);
+            } catch (Exception e) {
+                throw new IOException(e);
+            }
+        }
+    }
+
+    SchemaInfo schemaInfo = new SchemaInfo();
+
+    ConstantObjectInspectInfo constantsInfo;
+
+    public HiveUDF(String funcName) throws IOException, 
InstantiationException, IllegalAccessException {
+        Class hiveUDFClass = resolveFunc(funcName);
+        if (UDF.class.isAssignableFrom(hiveUDFClass)) {
+            evalUDF =  new GenericUDFBridge(funcName, false, 
hiveUDFClass.getName());
+        } else if (GenericUDF.class.isAssignableFrom(hiveUDFClass)){
+            evalUDF = (GenericUDF)hiveUDFClass.newInstance();
+        } else {
+            throw new IOException(getErrorMessage(hiveUDFClass));
+        }
+    }
+
+    public HiveUDF(String funcName, String params) throws IOException, 
InstantiationException, IllegalAccessException {
+        this(funcName);
+        constantsInfo = ConstantObjectInspectInfo.parse(params);
+    }
+
+    @Override
+    public Object exec(Tuple input) throws IOException {
+        if (!inited) {
+            evalUDF.configure(instantiateMapredContext());
+            schemaInfo.init(getInputSchema(), evalUDF, constantsInfo);
+            inited = true;
+        }
+        List inputs = 
schemaInfo.inputObjectInspector.getStructFieldsDataAsList(input);
+        DeferredObject[] arguments = new DeferredObject[inputs.size()];
+        for (int i=0 ; i<inputs.size() ; i++) {
+            arguments[i] = new DeferredJavaObject(inputs.get(i));
+        }
+        try {
+            Object returnValue = evalUDF.evaluate(arguments);
+            return HiveUtils.convertHiveToPig(returnValue, 
schemaInfo.outputObjectInspector, null);
+        } catch (HiveException e) {
+            throw new IOException(e);
+        }
+    }
+
+    @Override
+    public List<String> getShipFiles() {
+        try {
+            if (!inited) {
+                schemaInfo.init(getInputSchema(), evalUDF, constantsInfo);
+                inited = true;
+            }
+        } catch(Exception e) {
+            throw new RuntimeException(e);
+        }
+        List<String> files = super.getShipFiles();
+        if (evalUDF.getRequiredFiles() != null) {
+            files.addAll(Arrays.asList(evalUDF.getRequiredFiles()));
+        }
+        if (evalUDF.getRequiredJars() != null) {
+            files.addAll(Arrays.asList(evalUDF.getRequiredJars()));
+        }
+
+        return files;
+    }
+
+    @Override
+    public Schema outputSchema(Schema input) {
+        try {
+            if (!inited) {
+                schemaInfo.init(getInputSchema(), evalUDF, constantsInfo);
+                inited = true;
+            }
+            ResourceFieldSchema rfs = HiveUtils.getResourceFieldSchema(
+                    
TypeInfoUtils.getTypeInfoFromObjectInspector(schemaInfo.outputObjectInspector));
+            ResourceSchema outputSchema = new ResourceSchema();
+            outputSchema.setFields(new ResourceFieldSchema[] {rfs});
+            return Schema.getPigSchema(outputSchema);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public void finish() {
+        try {
+            evalUDF.close();
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+}

Added: pig/trunk/src/org/apache/pig/builtin/HiveUDFBase.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/HiveUDFBase.java?rev=1671956&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/HiveUDFBase.java (added)
+++ pig/trunk/src/org/apache/pig/builtin/HiveUDFBase.java Tue Apr  7 21:24:48 
2015
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.builtin;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.ql.exec.FunctionInfo;
+import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
+import org.apache.hadoop.hive.ql.exec.MapredContext;
+import org.apache.hadoop.hive.ql.exec.UDAF;
+import org.apache.hadoop.hive.ql.exec.UDF;
+import org.apache.hadoop.hive.ql.udf.generic.Collector;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFResolver;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
+import org.apache.hadoop.hive.serde2.objectinspector.ConstantObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.shims.HadoopShimsSecure;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.mapred.Counters;
+import org.apache.hadoop.mapred.Counters.Counter;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.pig.EvalFunc;
+import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.util.UDFContext;
+import org.apache.pig.impl.util.Utils;
+import org.apache.pig.impl.util.hive.HiveUtils;
+import org.apache.pig.tools.pigstats.PigStatusReporter;
+
+import com.esotericsoftware.kryo.Serializer;
+
+abstract class HiveUDFBase extends EvalFunc<Object> {
+
+    static protected class ConstantObjectInspectInfo {
+        ConstantObjectInspector[] constants;
+        static ConstantObjectInspectInfo parse(String params) throws 
IOException {
+            ConstantObjectInspectInfo info = new ConstantObjectInspectInfo();
+            params = params.replaceAll("\"", "'");
+            Object constant = Utils.parseConstant(params);
+            if (DataType.findType(constant) == DataType.TUPLE) {
+                Tuple t = (Tuple)constant;
+                info.constants = new ConstantObjectInspector[t.size()];
+                for (int i=0;i<t.size();i++) {
+                    if (t.get(i) != null) {
+                        info.constants[i] = 
HiveUtils.getConstantObjectInspector(t.get(i));
+                    }
+                }
+            } else {
+                info.constants = new ConstantObjectInspector[1];
+                info.constants[0] = 
HiveUtils.getConstantObjectInspector(constant);
+            }
+            return info;
+        }
+        boolean isEmpty() {
+            return (constants == null);
+        }
+        int size() {
+            return constants.length;
+        }
+        ConstantObjectInspector get(int i) {
+            return constants[i];
+        }
+        void injectConstantObjectInspector(StructObjectInspector 
inputObjectInspector) {
+            if (!isEmpty()) {
+                for (int i=0;i<size();i++) {
+                    if (get(i)!=null) {
+                        StructField origField = 
inputObjectInspector.getAllStructFieldRefs().get(i);
+                        StructField newfield = new 
HiveUtils.Field(origField.getFieldName(), get(i), i);
+                        
((List<HiveUtils.Field>)inputObjectInspector.getAllStructFieldRefs()).set(i, 
(HiveUtils.Field)newfield);
+                    }
+                }
+            }
+        }
+    }
+
+    static protected Class resolveFunc(String funcName) throws IOException {
+        String className = funcName;
+        Class udfClass;
+        if (FunctionRegistry.getFunctionNames().contains(funcName)) {
+            FunctionInfo func = FunctionRegistry.getFunctionInfo(funcName);
+            udfClass = func.getFunctionClass();
+        } else {
+            udfClass = PigContext.resolveClassName(className);
+            if (udfClass == null) {
+                throw new IOException("Cannot find Hive UDF " + funcName);
+            }
+        }
+        return udfClass;
+    }
+
+    /**
+     * A constant of Reporter type that does nothing.
+     */
+    static protected class HiveReporter implements Reporter {
+        PigStatusReporter rep;
+        HiveReporter(PigStatusReporter rep) {
+            this.rep = rep;
+        }
+        public void setStatus(String s) {
+            rep.setStatus(s);
+        }
+        public void progress() {
+            rep.progress();
+        }
+        public Counter getCounter(Enum<?> name) {
+            try {
+                Counters counters = new Counters();
+                counters.incrCounter(name, rep.getCounter(name).getValue());
+                return counters.findCounter(name);
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        }
+        public Counter getCounter(String group, String name) {
+            try {
+                Counters counters = new Counters();
+                counters.incrCounter(group, name, rep.getCounter(group, 
name).getValue());
+                return counters.findCounter(group, name);
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        }
+        public void incrCounter(Enum<?> key, long amount) {
+            rep.incrCounter(key, amount);
+        }
+        public void incrCounter(String group, String counter, long amount) {
+            rep.incrCounter(group, counter, amount);
+        }
+        public InputSplit getInputSplit() throws UnsupportedOperationException 
{
+          throw new UnsupportedOperationException("NULL reporter has no 
input");
+        }
+        public float getProgress() {
+          return 0;
+        }
+    };
+
+    protected static MapredContext instantiateMapredContext() {
+        Configuration conf = UDFContext.getUDFContext().getJobConf();
+        boolean isMap = conf.getBoolean(MRConfiguration.TASK_IS_MAP, false);
+        if (conf.get("exectype").startsWith("TEZ")) {
+            isMap = true;
+            HiveConf.setVar(conf, ConfVars.HIVE_EXECUTION_ENGINE, "tez");
+        }
+        MapredContext context = MapredContext.init(isMap, new 
JobConf(UDFContext.getUDFContext().getJobConf()));
+        context.setReporter(new HiveReporter(PigStatusReporter.getInstance()));
+        return context;
+    }
+
+    @Override
+    public List<String> getShipFiles() {
+        String hadoopVersion = "20S";
+        if (Utils.isHadoop23() || Utils.isHadoop2()) {
+            hadoopVersion = "23";
+        }
+        Class hadoopVersionShimsClass;
+        try {
+            hadoopVersionShimsClass = 
Class.forName("org.apache.hadoop.hive.shims.Hadoop" +
+                    hadoopVersion + "Shims");
+        } catch (ClassNotFoundException e) {
+            throw new RuntimeException("Cannot find Hadoop" + hadoopVersion + 
"ShimsClass in classpath");
+        }
+        List<String> files = FuncUtils.getShipFiles(new Class[] 
{GenericUDF.class,
+                PrimitiveObjectInspector.class, HiveConf.class, 
Serializer.class, ShimLoader.class, 
+                hadoopVersionShimsClass, HadoopShimsSecure.class, 
Collector.class});
+        return files;
+    }
+
+    static protected String getErrorMessage(Class c) {
+        StringBuffer message = new StringBuffer("Please declare " + 
c.getName() + " as ");
+        if (UDF.class.isAssignableFrom(c) || 
GenericUDF.class.isAssignableFrom(c)) {
+            message.append(HiveUDF.class.getName());
+        } else if (GenericUDTF.class.isAssignableFrom(c)) {
+            message.append(HiveUDTF.class.getName());
+        } else if (UDAF.class.isAssignableFrom(c) || 
GenericUDAFResolver.class.isAssignableFrom(c)) {
+            message.append(HiveUDAF.class.getName());
+        } else {
+            message = new StringBuffer(c.getName() + " is not Hive UDF");
+        }
+        return message.toString();
+    }
+}

Added: pig/trunk/src/org/apache/pig/builtin/HiveUDTF.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/HiveUDTF.java?rev=1671956&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/HiveUDTF.java (added)
+++ pig/trunk/src/org/apache/pig/builtin/HiveUDTF.java Tue Apr  7 21:24:48 2015
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.builtin;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.udf.generic.Collector;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.pig.ResourceSchema;
+import org.apache.pig.ResourceSchema.ResourceFieldSchema;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.util.hive.HiveUtils;
+
+/**
+ * Use Hive GenericUDTF.
+ * Example:
+ *      define explode HiveUDTF('explode');
+ *      A = load 'mydata' as (a0:{(b0:chararray)});
+ *      B = foreach A generate flatten(explode(a0));
+ */
+public class HiveUDTF extends HiveUDFBase {
+
+    private boolean inited = false;
+    private GenericUDTF udtf;
+    private boolean endOfAllInput = false;
+
+    static class SchemaInfo {
+        StructObjectInspector inputObjectInspector;
+        ObjectInspector outputObjectInspector;
+        private void init(Schema inputSchema, GenericUDTF udtf, 
ConstantObjectInspectInfo constantsInfo) throws IOException {
+            ResourceSchema rs = new ResourceSchema(inputSchema);
+            ResourceFieldSchema wrappedTupleFieldSchema = new 
ResourceFieldSchema();
+            wrappedTupleFieldSchema.setType(DataType.TUPLE);
+            wrappedTupleFieldSchema.setSchema(rs);
+
+            TypeInfo ti = HiveUtils.getTypeInfo(wrappedTupleFieldSchema);
+            inputObjectInspector = 
(StructObjectInspector)HiveUtils.createObjectInspector(ti);
+            if (constantsInfo!=null) {
+                
constantsInfo.injectConstantObjectInspector(inputObjectInspector);
+            }
+
+            try {
+                outputObjectInspector = udtf.initialize(inputObjectInspector);
+            } catch (Exception e) {
+                throw new IOException(e);
+            }
+        }
+    }
+
+    SchemaInfo schemaInfo = new SchemaInfo();
+
+    ConstantObjectInspectInfo constantsInfo;
+
+    private static BagFactory bf = BagFactory.getInstance();
+    private HiveUDTFCollector collector = null;
+
+    public HiveUDTF(String funcName) throws InstantiationException, 
IllegalAccessException, IOException {
+        Class hiveUDTFClass = resolveFunc(funcName);
+        if (GenericUDTF.class.isAssignableFrom(hiveUDTFClass)) {
+            udtf =  (GenericUDTF)hiveUDTFClass.newInstance();
+        } else {
+            throw new IOException(getErrorMessage(hiveUDTFClass));
+        }
+    }
+
+    public HiveUDTF(String funcName, String params) throws 
InstantiationException, IllegalAccessException, IOException {
+        this(funcName);
+        constantsInfo = ConstantObjectInspectInfo.parse(params);
+    }
+    @Override
+    public Object exec(Tuple input) throws IOException {
+        if (!inited) {
+            udtf.configure(instantiateMapredContext());
+            schemaInfo.init(getInputSchema(), udtf, constantsInfo);
+            inited = true;
+        }
+
+        if (collector == null) {
+            collector = new HiveUDTFCollector();
+            udtf.setCollector(collector);
+        } else {
+            collector.init();
+        }
+
+        try {
+            if (!endOfAllInput) {
+                udtf.process(input.getAll().toArray());
+            } else {
+                udtf.close();
+            }
+        } catch (Exception e) {
+            throw new IOException(e);
+        }
+        return collector.getBag();
+    }
+
+    @Override
+    public Schema outputSchema(Schema input) {
+        try {
+            if (!inited) {
+                schemaInfo.init(getInputSchema(), udtf, constantsInfo);
+                inited = true;
+            }
+            ResourceFieldSchema rfs = HiveUtils.getResourceFieldSchema(
+                    
TypeInfoUtils.getTypeInfoFromObjectInspector(schemaInfo.outputObjectInspector));
+
+            ResourceSchema tupleSchema = new ResourceSchema();
+            tupleSchema.setFields(new ResourceFieldSchema[] {rfs});
+
+            ResourceFieldSchema bagFieldSchema = new ResourceFieldSchema();
+            bagFieldSchema.setType(DataType.BAG);
+            bagFieldSchema.setSchema(tupleSchema);
+
+            ResourceSchema bagSchema = new ResourceSchema();
+            bagSchema.setFields(new ResourceFieldSchema[] {bagFieldSchema});
+            return Schema.getPigSchema(bagSchema);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    class HiveUDTFCollector implements Collector {
+        DataBag bag = bf.newDefaultBag();
+
+        public void init() {
+            bag.clear();
+        }
+
+        @Override
+        public void collect(Object input) throws HiveException {
+            try {
+                Tuple outputTuple = (Tuple)HiveUtils.convertHiveToPig(input, 
schemaInfo.outputObjectInspector, null);
+                if (outputTuple.size()==1 && outputTuple.get(0) instanceof 
Tuple) {
+                    bag.add((Tuple)outputTuple.get(0));
+                } else {
+                    bag.add(outputTuple);
+                }
+            } catch(Exception e) {
+                throw new HiveException(e);
+            }
+        }
+
+        public DataBag getBag() {
+            return bag;
+        }
+    }
+
+    @Override
+    public boolean needEndOfAllInputProcessing() {
+        return true;
+    }
+
+    @Override
+    public void setEndOfAllInput(boolean endOfAllInput) {
+        this.endOfAllInput = endOfAllInput;
+    }
+}

Modified: pig/trunk/src/org/apache/pig/builtin/OrcStorage.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/OrcStorage.java?rev=1671956&r1=1671955&r2=1671956&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/OrcStorage.java (original)
+++ pig/trunk/src/org/apache/pig/builtin/OrcStorage.java Tue Apr  7 21:24:48 
2015
@@ -91,7 +91,7 @@ import org.apache.pig.impl.logicalLayer.
 import org.apache.pig.impl.util.ObjectSerializer;
 import org.apache.pig.impl.util.UDFContext;
 import org.apache.pig.impl.util.Utils;
-import org.apache.pig.impl.util.orc.OrcUtils;
+import org.apache.pig.impl.util.hive.HiveUtils;
 import org.joda.time.DateTime;
 
 import com.esotericsoftware.kryo.io.Input;
@@ -235,7 +235,7 @@ public class OrcStorage extends LoadFunc
             typeInfo = 
(TypeInfo)ObjectSerializer.deserialize(p.getProperty(signature + 
SchemaSignatureSuffix));
         }
         if (oi==null) {
-            oi = OrcUtils.createObjectInspector(typeInfo);
+            oi = HiveUtils.createObjectInspector(typeInfo);
         }
     }
 
@@ -244,7 +244,7 @@ public class OrcStorage extends LoadFunc
         ResourceFieldSchema fs = new ResourceFieldSchema();
         fs.setType(DataType.TUPLE);
         fs.setSchema(rs);
-        typeInfo = OrcUtils.getTypeInfo(fs);
+        typeInfo = HiveUtils.getTypeInfo(fs);
         Properties p = 
UDFContext.getUDFContext().getUDFProperties(this.getClass());
         p.setProperty(signature + SchemaSignatureSuffix, 
ObjectSerializer.serialize(typeInfo));
     }
@@ -376,7 +376,7 @@ public class OrcStorage extends LoadFunc
             }
             Object value = in.getCurrentValue();
 
-            Tuple t = (Tuple)OrcUtils.convertOrcToPig(value, oi, 
mRequiredColumns);
+            Tuple t = (Tuple)HiveUtils.convertHiveToPig(value, oi, 
mRequiredColumns);
             return t;
         } catch (InterruptedException e) {
             int errCode = 6018;
@@ -438,7 +438,7 @@ public class OrcStorage extends LoadFunc
             }
         }
 
-        ResourceFieldSchema fs = OrcUtils.getResourceFieldSchema(typeInfo);
+        ResourceFieldSchema fs = HiveUtils.getResourceFieldSchema(typeInfo);
         return fs.getSchema();
     }
 

Added: pig/trunk/src/org/apache/pig/data/UnlimitedNullTuple.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/data/UnlimitedNullTuple.java?rev=1671956&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/data/UnlimitedNullTuple.java (added)
+++ pig/trunk/src/org/apache/pig/data/UnlimitedNullTuple.java Tue Apr  7 
21:24:48 2015
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.data;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.pig.backend.executionengine.ExecException;
+
+public class UnlimitedNullTuple extends AbstractTuple {
+
+    @Override
+    public int size() {
+        throw new RuntimeException("Unimplemented");
+    }
+
+    @Override
+    public Object get(int fieldNum) throws ExecException {
+        return null;
+    }
+
+    @Override
+    public List<Object> getAll() {
+        throw new RuntimeException("Unimplemented");
+    }
+
+    @Override
+    public void set(int fieldNum, Object val) throws ExecException {
+        throw new ExecException("Unimplemented");
+    }
+
+    @Override
+    public void append(Object val) {
+        throw new RuntimeException("Unimplemented");
+    }
+
+    @Override
+    public long getMemorySize() {
+        throw new RuntimeException("Unimplemented");
+    }
+
+    @Override
+    public void readFields(DataInput arg0) throws IOException {
+        throw new IOException("Unimplemented");
+    }
+
+    @Override
+    public void write(DataOutput arg0) throws IOException {
+        throw new IOException("Unimplemented");
+    }
+
+    @Override
+    public int compareTo(Object o) {
+        throw new RuntimeException("Unimplemented");
+    }
+
+}

Modified: pig/trunk/src/org/apache/pig/impl/util/Utils.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/util/Utils.java?rev=1671956&r1=1671955&r2=1671956&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/util/Utils.java (original)
+++ pig/trunk/src/org/apache/pig/impl/util/Utils.java Tue Apr  7 21:24:48 2015
@@ -255,6 +255,13 @@ public class Utils {
         return schema;
     }
 
+    public static Object parseConstant(String constantString) throws 
ParserException {
+        QueryParserDriver queryParser = new QueryParserDriver( new 
PigContext(),
+                "util", new HashMap<String, String>() ) ;
+        Object constant = queryParser.parseConstant(constantString);
+        return constant;
+    }
+
     /**
      * This method adds FieldSchema of 'input source tag/path' as the first
      * field. This will be called only when PigStorage is invoked with


Reply via email to