Author: daijy
Date: Tue Apr 7 21:24:48 2015
New Revision: 1671956
URL: http://svn.apache.org/r1671956
Log:
PIG-3294: Allow Pig use Hive UDFs
Added:
pig/trunk/src/org/apache/hadoop/
pig/trunk/src/org/apache/hadoop/hive/
pig/trunk/src/org/apache/hadoop/hive/serde2/
pig/trunk/src/org/apache/hadoop/hive/serde2/objectinspector/
pig/trunk/src/org/apache/hadoop/hive/serde2/objectinspector/primitive/
pig/trunk/src/org/apache/hadoop/hive/serde2/objectinspector/primitive/JavaConstantBooleanObjectInspector.java
pig/trunk/src/org/apache/hadoop/hive/serde2/objectinspector/primitive/JavaConstantDoubleObjectInspector.java
pig/trunk/src/org/apache/hadoop/hive/serde2/objectinspector/primitive/JavaConstantFloatObjectInspector.java
pig/trunk/src/org/apache/hadoop/hive/serde2/objectinspector/primitive/JavaConstantIntObjectInspector.java
pig/trunk/src/org/apache/hadoop/hive/serde2/objectinspector/primitive/JavaConstantLongObjectInspector.java
pig/trunk/src/org/apache/hadoop/hive/serde2/objectinspector/primitive/JavaConstantStringObjectInspector.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/UDFEndOfAllInputNeededVisitor.java
pig/trunk/src/org/apache/pig/builtin/HiveUDAF.java
pig/trunk/src/org/apache/pig/builtin/HiveUDF.java
pig/trunk/src/org/apache/pig/builtin/HiveUDFBase.java
pig/trunk/src/org/apache/pig/builtin/HiveUDTF.java
pig/trunk/src/org/apache/pig/data/UnlimitedNullTuple.java
pig/trunk/src/org/apache/pig/impl/util/hive/
pig/trunk/src/org/apache/pig/impl/util/hive/HiveUtils.java
pig/trunk/test/e2e/pig/udfs/java/org/apache/pig/test/udf/evalfunc/DummyContextUDF.java
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/EvalFunc.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/CombinerOptimizerUtil.java
pig/trunk/src/org/apache/pig/builtin/OrcStorage.java
pig/trunk/src/org/apache/pig/impl/util/Utils.java
pig/trunk/src/org/apache/pig/impl/util/orc/OrcUtils.java
pig/trunk/src/org/apache/pig/newplan/logical/expression/ExpToPhyTranslationVisitor.java
pig/trunk/src/org/apache/pig/newplan/logical/expression/UserFuncExpression.java
pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java
pig/trunk/test/e2e/pig/build.xml
pig/trunk/test/e2e/pig/tests/nightly.conf
Modified: pig/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1671956&r1=1671955&r2=1671956&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Tue Apr 7 21:24:48 2015
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
IMPROVEMENTS
+PIG-3294: Allow Pig use Hive UDFs (daijy)
+
PIG-4476: Fix logging in AvroStorage* classes and SchemaTuple class (rdsr via
rohini)
PIG-4458: Support UDFs in a FOREACH Before a Merge Join (wattsinabox via daijy)
Added:
pig/trunk/src/org/apache/hadoop/hive/serde2/objectinspector/primitive/JavaConstantBooleanObjectInspector.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/hadoop/hive/serde2/objectinspector/primitive/JavaConstantBooleanObjectInspector.java?rev=1671956&view=auto
==============================================================================
---
pig/trunk/src/org/apache/hadoop/hive/serde2/objectinspector/primitive/JavaConstantBooleanObjectInspector.java
(added)
+++
pig/trunk/src/org/apache/hadoop/hive/serde2/objectinspector/primitive/JavaConstantBooleanObjectInspector.java
Tue Apr 7 21:24:48 2015
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.serde2.objectinspector.primitive;
+
+import org.apache.hadoop.hive.serde2.objectinspector.ConstantObjectInspector;
+import org.apache.hadoop.io.BooleanWritable;
+
+/**
+ * This class will be in Hive code base begin 1.2.0 (HIVE-9766).
+ * Will remove once we switch to use Hive 1.2.0
+ */
+public class JavaConstantBooleanObjectInspector extends
+ JavaBooleanObjectInspector implements ConstantObjectInspector {
+ private Boolean value;
+
+ public JavaConstantBooleanObjectInspector(Boolean value) {
+ super();
+ this.value = value;
+ }
+
+ @Override
+ public Object getWritableConstantValue() {
+ if (value==null) {
+ return null;
+ }
+ return new BooleanWritable(value);
+ }
+}
Added:
pig/trunk/src/org/apache/hadoop/hive/serde2/objectinspector/primitive/JavaConstantDoubleObjectInspector.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/hadoop/hive/serde2/objectinspector/primitive/JavaConstantDoubleObjectInspector.java?rev=1671956&view=auto
==============================================================================
---
pig/trunk/src/org/apache/hadoop/hive/serde2/objectinspector/primitive/JavaConstantDoubleObjectInspector.java
(added)
+++
pig/trunk/src/org/apache/hadoop/hive/serde2/objectinspector/primitive/JavaConstantDoubleObjectInspector.java
Tue Apr 7 21:24:48 2015
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.serde2.objectinspector.primitive;
+
+import org.apache.hadoop.hive.serde2.io.DoubleWritable;
+import org.apache.hadoop.hive.serde2.objectinspector.ConstantObjectInspector;
+
+/**
+ * This class will be in Hive code base begin 1.2.0 (HIVE-9766).
+ * Will remove once we switch to use Hive 1.2.0
+ */
+public class JavaConstantDoubleObjectInspector extends
+ JavaDoubleObjectInspector implements ConstantObjectInspector {
+ private Double value;
+
+ public JavaConstantDoubleObjectInspector(Double value) {
+ super();
+ this.value = value;
+ }
+
+ @Override
+ public Object getWritableConstantValue() {
+ if (value==null) {
+ return null;
+ }
+ return new DoubleWritable(value);
+ }
+}
Added:
pig/trunk/src/org/apache/hadoop/hive/serde2/objectinspector/primitive/JavaConstantFloatObjectInspector.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/hadoop/hive/serde2/objectinspector/primitive/JavaConstantFloatObjectInspector.java?rev=1671956&view=auto
==============================================================================
---
pig/trunk/src/org/apache/hadoop/hive/serde2/objectinspector/primitive/JavaConstantFloatObjectInspector.java
(added)
+++
pig/trunk/src/org/apache/hadoop/hive/serde2/objectinspector/primitive/JavaConstantFloatObjectInspector.java
Tue Apr 7 21:24:48 2015
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.serde2.objectinspector.primitive;
+
+import org.apache.hadoop.hive.serde2.objectinspector.ConstantObjectInspector;
+import org.apache.hadoop.io.FloatWritable;
+
+/**
+ * This class will be in Hive code base begin 1.2.0 (HIVE-9766).
+ * Will remove once we switch to use Hive 1.2.0
+ */
+public class JavaConstantFloatObjectInspector extends JavaFloatObjectInspector
+ implements ConstantObjectInspector {
+ private Float value;
+
+ public JavaConstantFloatObjectInspector(Float value) {
+ super();
+ this.value = value;
+ }
+
+ @Override
+ public Object getWritableConstantValue() {
+ if (value==null) {
+ return null;
+ }
+ return new FloatWritable(value);
+ }
+}
Added:
pig/trunk/src/org/apache/hadoop/hive/serde2/objectinspector/primitive/JavaConstantIntObjectInspector.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/hadoop/hive/serde2/objectinspector/primitive/JavaConstantIntObjectInspector.java?rev=1671956&view=auto
==============================================================================
---
pig/trunk/src/org/apache/hadoop/hive/serde2/objectinspector/primitive/JavaConstantIntObjectInspector.java
(added)
+++
pig/trunk/src/org/apache/hadoop/hive/serde2/objectinspector/primitive/JavaConstantIntObjectInspector.java
Tue Apr 7 21:24:48 2015
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.serde2.objectinspector.primitive;
+
+import org.apache.hadoop.hive.serde2.objectinspector.ConstantObjectInspector;
+import org.apache.hadoop.io.IntWritable;
+
+/**
+ * This class will be in Hive code base begin 1.2.0 (HIVE-9766).
+ * Will remove once we switch to use Hive 1.2.0
+ */
+public class JavaConstantIntObjectInspector extends JavaIntObjectInspector
+ implements ConstantObjectInspector {
+ private Integer value;
+
+ public JavaConstantIntObjectInspector(Integer value) {
+ super();
+ this.value = value;
+ }
+
+ @Override
+ public Object getWritableConstantValue() {
+ if (value==null) {
+ return null;
+ }
+ return new IntWritable(value);
+ }
+}
Added:
pig/trunk/src/org/apache/hadoop/hive/serde2/objectinspector/primitive/JavaConstantLongObjectInspector.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/hadoop/hive/serde2/objectinspector/primitive/JavaConstantLongObjectInspector.java?rev=1671956&view=auto
==============================================================================
---
pig/trunk/src/org/apache/hadoop/hive/serde2/objectinspector/primitive/JavaConstantLongObjectInspector.java
(added)
+++
pig/trunk/src/org/apache/hadoop/hive/serde2/objectinspector/primitive/JavaConstantLongObjectInspector.java
Tue Apr 7 21:24:48 2015
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.serde2.objectinspector.primitive;
+
+import org.apache.hadoop.hive.serde2.objectinspector.ConstantObjectInspector;
+import org.apache.hadoop.io.LongWritable;
+
+/**
+ * This class will be in Hive code base begin 1.2.0 (HIVE-9766).
+ * Will remove once we switch to use Hive 1.2.0
+ */
+public class JavaConstantLongObjectInspector extends JavaLongObjectInspector
+ implements ConstantObjectInspector {
+ private Long value;
+
+ public JavaConstantLongObjectInspector(Long value) {
+ super();
+ this.value = value;
+ }
+
+ @Override
+ public Object getWritableConstantValue() {
+ if (value==null) {
+ return null;
+ }
+ return new LongWritable(value);
+ }
+}
Added:
pig/trunk/src/org/apache/hadoop/hive/serde2/objectinspector/primitive/JavaConstantStringObjectInspector.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/hadoop/hive/serde2/objectinspector/primitive/JavaConstantStringObjectInspector.java?rev=1671956&view=auto
==============================================================================
---
pig/trunk/src/org/apache/hadoop/hive/serde2/objectinspector/primitive/JavaConstantStringObjectInspector.java
(added)
+++
pig/trunk/src/org/apache/hadoop/hive/serde2/objectinspector/primitive/JavaConstantStringObjectInspector.java
Tue Apr 7 21:24:48 2015
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.serde2.objectinspector.primitive;
+
+/**
+ * This class will be in Hive code base begin 1.2.0 (HIVE-9766).
+ * Will remove once we switch to use Hive 1.2.0
+ */
+import org.apache.hadoop.hive.serde2.objectinspector.ConstantObjectInspector;
+import org.apache.hadoop.io.Text;
+
+public class JavaConstantStringObjectInspector extends
+ JavaStringObjectInspector implements ConstantObjectInspector {
+ private String value;
+
+ public JavaConstantStringObjectInspector(String value) {
+ super();
+ this.value = value;
+ }
+
+ @Override
+ public Object getWritableConstantValue() {
+ if (value==null) {
+ return null;
+ }
+ return new Text(value);
+ }
+}
Modified: pig/trunk/src/org/apache/pig/EvalFunc.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/EvalFunc.java?rev=1671956&r1=1671955&r2=1671956&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/EvalFunc.java (original)
+++ pig/trunk/src/org/apache/pig/EvalFunc.java Tue Apr 7 21:24:48 2015
@@ -362,4 +362,11 @@ public abstract class EvalFunc<T> {
public boolean allowCompileTimeCalculation() {
return false;
}
+
+ public boolean needEndOfAllInputProcessing() {
+ return false;
+ }
+
+ public void setEndOfAllInput(boolean endOfAllInput) {
+ }
}
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=1671956&r1=1671955&r2=1671956&view=diff
==============================================================================
---
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
(original)
+++
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
Tue Apr 7 21:24:48 2015
@@ -643,7 +643,11 @@ public class JobControlCompiler{
}
}
if (!predeployed) {
-
putJarOnClassPathThroughDistributedCache(pigContext, conf, jar);
+ if (jar.getFile().toLowerCase().endsWith(".jar")) {
+
putJarOnClassPathThroughDistributedCache(pigContext, conf, jar);
+ } else {
+ setupDistributedCache(pigContext, conf, new
String[] {jar.getPath()}, true);
+ }
}
}
@@ -804,6 +808,7 @@ public class JobControlCompiler{
// set parent plan in all operators in map and reduce plans
// currently the parent plan is really used only when POStream is
present in the plan
new PhyPlanSetter(mro.mapPlan).visit();
+ new PhyPlanSetter(mro.combinePlan).visit();
new PhyPlanSetter(mro.reducePlan).visit();
// this call modifies the ReplFiles names of POFRJoin operators
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java?rev=1671956&r1=1671955&r2=1671956&view=diff
==============================================================================
---
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java
(original)
+++
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java
Tue Apr 7 21:24:48 2015
@@ -38,6 +38,7 @@ import org.apache.pig.backend.hadoop.exe
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.JoinPackager;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
+import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.NullableTuple;
@@ -98,6 +99,7 @@ public class PigCombiner {
pigContext =
(PigContext)ObjectSerializer.deserialize(jConf.get("pig.pigContext"));
if (pigContext.getLog4jProperties()!=null)
PropertyConfigurator.configure(pigContext.getLog4jProperties());
+ MapRedUtil.setupUDFContext(context.getConfiguration());
cp = (PhysicalPlan) ObjectSerializer.deserialize(jConf
.get("pig.combinePlan"));
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java?rev=1671956&r1=1671955&r2=1671956&view=diff
==============================================================================
---
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java
(original)
+++
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java
Tue Apr 7 21:24:48 2015
@@ -112,7 +112,7 @@ public abstract class PigGenericMapBase
return;
}
-
if(PigMapReduce.sJobConfInternal.get().get(JobControlCompiler.END_OF_INP_IN_MAP,
"false").equals("true")) {
+
if(PigMapReduce.sJobConfInternal.get().get(JobControlCompiler.END_OF_INP_IN_MAP,
"false").equals("true") && !mp.isEmpty()) {
// If there is a stream in the pipeline or if this map job belongs
to merge-join we could
// potentially have more to process - so lets
// set the flag stating that all map input has been sent
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java?rev=1671956&r1=1671955&r2=1671956&view=diff
==============================================================================
---
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java
(original)
+++
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java
Tue Apr 7 21:24:48 2015
@@ -609,7 +609,7 @@ public class PigGenericMapReduce {
return;
}
- if(PigMapReduce.sJobConfInternal.get().get("pig.stream.in.reduce",
"false").equals("true")) {
+ if(PigMapReduce.sJobConfInternal.get().get("pig.stream.in.reduce",
"false").equals("true") && !rp.isEmpty()) {
// If there is a stream in the pipeline we could
// potentially have more to process - so lets
// set the flag stating that all map input has been sent
Added:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/UDFEndOfAllInputNeededVisitor.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/UDFEndOfAllInputNeededVisitor.java?rev=1671956&view=auto
==============================================================================
---
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/UDFEndOfAllInputNeededVisitor.java
(added)
+++
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/UDFEndOfAllInputNeededVisitor.java
Tue Apr 7 21:24:48 2015
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
+
+import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
+import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.impl.plan.DependencyOrderWalker;
+import org.apache.pig.impl.plan.VisitorException;
+
+public class UDFEndOfAllInputNeededVisitor extends PhyPlanVisitor {
+
+ private boolean needed = false;
+
+ public UDFEndOfAllInputNeededVisitor(PhysicalPlan plan) {
+ super(plan, new DependencyOrderWalker<PhysicalOperator,
PhysicalPlan>(plan));
+ }
+
+ @Override
+ public void visitUserFunc(POUserFunc userFunc) throws VisitorException {
+ super.visitUserFunc(userFunc);
+ if (userFunc.needEndOfAllInputProcessing()) {
+ needed = true;
+ }
+ }
+
+ public boolean needEndOfAllInputProcessing() {
+ return needed;
+ }
+}
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java?rev=1671956&r1=1671955&r2=1671956&view=diff
==============================================================================
---
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java
(original)
+++
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java
Tue Apr 7 21:24:48 2015
@@ -21,6 +21,7 @@ import org.apache.pig.backend.hadoop.exe
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeCogroup;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPartialAgg;
@@ -110,6 +111,18 @@ public class EndOfAllInputSetter extends
endOfAllInputFlag = true;
}
+ @Override
+ public void visitPOForEach(POForEach foreach) throws VisitorException {
+ try {
+ if (foreach.needEndOfAllInputProcessing()) {
+ endOfAllInputFlag = true;
+ }
+ } catch (Exception e) {
+ throw new VisitorException(e);
+ }
+ }
+
+
/**
* @return if end of all input is present
*/
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java?rev=1671956&r1=1671955&r2=1671956&view=diff
==============================================================================
---
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java
(original)
+++
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java
Tue Apr 7 21:24:48 2015
@@ -351,6 +351,9 @@ public class POUserFunc extends Expressi
}
}
} else {
+ if (parentPlan!=null && parentPlan.endOfAllInput &&
needEndOfAllInputProcessing()) {
+ func.setEndOfAllInput(true);
+ }
if (executor != null) {
result.result = executor.monitorExec((Tuple)
result.result);
} else {
@@ -655,4 +658,7 @@ public class POUserFunc extends Expressi
}
}
+ public boolean needEndOfAllInputProcessing() {
+ return getFunc().needEndOfAllInputProcessing();
+ }
}
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java?rev=1671956&r1=1671955&r2=1671956&view=diff
==============================================================================
---
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java
(original)
+++
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java
Tue Apr 7 21:24:48 2015
@@ -25,6 +25,7 @@ import java.util.List;
import org.apache.pig.PigException;
import org.apache.pig.backend.executionengine.ExecException;
+import
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.UDFEndOfAllInputNeededVisitor;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
@@ -41,6 +42,7 @@ import org.apache.pig.data.SchemaTupleFa
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.data.TupleMaker;
+import org.apache.pig.data.UnlimitedNullTuple;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.plan.DependencyOrderWalker;
import org.apache.pig.impl.plan.NodeIdGenerator;
@@ -92,10 +94,14 @@ public class POForEach extends PhysicalO
protected Tuple inpTuple;
+ protected boolean endOfAllInputProcessed = false;
+
// Indicate the foreach statement can only in map side
// Currently only used in MR cross (See PIG-4175)
protected boolean mapSideOnly = false;
+ protected Boolean endOfAllInputProcessing = false;
+
private Schema schema;
public POForEach(OperatorKey k) {
@@ -244,13 +250,21 @@ public class POForEach extends PhysicalO
//read
while (true) {
inp = processInput();
- if (inp.returnStatus == POStatus.STATUS_EOP ||
- inp.returnStatus == POStatus.STATUS_ERR) {
+
+ if (inp.returnStatus == POStatus.STATUS_ERR) {
return inp;
}
if (inp.returnStatus == POStatus.STATUS_NULL) {
continue;
}
+ if (inp.returnStatus == POStatus.STATUS_EOP) {
+ if (parentPlan!=null && parentPlan.endOfAllInput &&
!endOfAllInputProcessed && endOfAllInputProcessing) {
+ // continue pull one more output
+ inp = new Result(POStatus.STATUS_OK, new
UnlimitedNullTuple());
+ } else {
+ return inp;
+ }
+ }
attachInputToPlans((Tuple) inp.result);
inpTuple = (Tuple)inp.result;
@@ -357,6 +371,9 @@ public class POForEach extends PhysicalO
if(its == null) {
+ if (endOfAllInputProcessed) {
+ return new Result(POStatus.STATUS_EOP, null);
+ }
//getNext being called for the first time OR starting with a set
of new data from inputs
its = new Iterator[noItems];
bags = new Object[noItems];
@@ -424,6 +441,9 @@ public class POForEach extends PhysicalO
its[i] = null;
}
}
+ if (parentPlan!=null && parentPlan.endOfAllInput &&
endOfAllInputProcessing) {
+ endOfAllInputProcessed = true;
+ }
}
// if accumulating, we haven't got data yet for some fields, just
return
@@ -794,4 +814,21 @@ public class POForEach extends PhysicalO
public boolean isMapSideOnly() {
return mapSideOnly;
}
+
+ public boolean needEndOfAllInputProcessing() throws ExecException {
+ try {
+ for (PhysicalPlan innerPlan : inputPlans) {
+ UDFEndOfAllInputNeededVisitor endOfAllInputNeededVisitor
+ = new UDFEndOfAllInputNeededVisitor(innerPlan);
+ endOfAllInputNeededVisitor.visit();
+ if (endOfAllInputNeededVisitor.needEndOfAllInputProcessing()) {
+ endOfAllInputProcessing = true;
+ return true;
+ }
+ }
+ return false;
+ } catch (Exception e) {
+ throw new ExecException(e);
+ }
+ }
}
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java?rev=1671956&r1=1671955&r2=1671956&view=diff
==============================================================================
---
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
(original)
+++
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
Tue Apr 7 21:24:48 2015
@@ -354,6 +354,7 @@ public class TezDagBuilder extends TezOp
OutputDescriptor out = OutputDescriptor.create(edge.outputClassName);
Configuration conf =
ConfigurationUtil.toConfiguration(pc.getProperties(), false);
+ UDFContext.getUDFContext().serialize(conf);
if (!combinePlan.isEmpty()) {
addCombiner(combinePlan, to, conf);
}
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/CombinerOptimizerUtil.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/CombinerOptimizerUtil.java?rev=1671956&r1=1671955&r2=1671956&view=diff
==============================================================================
---
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/CombinerOptimizerUtil.java
(original)
+++
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/CombinerOptimizerUtil.java
Tue Apr 7 21:24:48 2015
@@ -206,7 +206,9 @@ public class CombinerOptimizerUtil {
// fix projection and function time for algebraic functions in
reduce foreach
for (Pair<PhysicalOperator, PhysicalPlan> op2plan :
algebraicOps) {
setProjectInput(op2plan.first, op2plan.second,
op2newpos.get(op2plan.first));
+ byte resultType = op2plan.first.getResultType();
((POUserFunc)op2plan.first).setAlgebraicFunction(POUserFunc.FINAL);
+ op2plan.first.setResultType(resultType);
}
// we have modified the foreach inner plans - so set them again
Added: pig/trunk/src/org/apache/pig/builtin/HiveUDAF.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/HiveUDAF.java?rev=1671956&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/HiveUDAF.java (added)
+++ pig/trunk/src/org/apache/pig/builtin/HiveUDAF.java Tue Apr 7 21:24:48 2015
@@ -0,0 +1,344 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.builtin;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.hive.ql.exec.UDAF;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFBridge;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
+import
org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.AggregationBuffer;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.Mode;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFParameterInfo;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFResolver;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFResolver2;
+import org.apache.hadoop.hive.ql.udf.generic.SimpleGenericUDAFParameterInfo;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.pig.Algebraic;
+import org.apache.pig.EvalFunc;
+import org.apache.pig.ResourceSchema;
+import org.apache.pig.ResourceSchema.ResourceFieldSchema;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
+import org.apache.pig.impl.util.hive.HiveUtils;
+
+/**
+ * Use Hive UDAF or GenericUDAF.
+ * Example:
+ * define avg HiveUDAF('avg');
+ * A = load 'mydata' as (name:chararray, num:double);
+ * B = group A by name;
+ * C = foreach B generate group, avg(A.num);
+ */
+public class HiveUDAF extends HiveUDFBase implements Algebraic {
+
+ private boolean inited = false;
+ private String funcName;
+ private String params;
+ private GenericUDAFResolver udaf;
+
+ static class SchemaAndEvaluatorInfo {
+ private TypeInfo inputTypeInfo;
+ private TypeInfo outputTypeInfo;
+ private TypeInfo intermediateOutputTypeInfo;
+ private ObjectInspector[] inputObjectInspectorAsArray;
+ private ObjectInspector[] intermediateInputObjectInspectorAsArray;
+ private StructObjectInspector inputObjectInspector;
+ private ObjectInspector intermediateInputObjectInspector;
+ private ObjectInspector intermediateOutputObjectInspector;
+ private ObjectInspector outputObjectInspector;
+ private GenericUDAFEvaluator evaluator;
+
+ private static TypeInfo getInputTypeInfo(Schema inputSchema) throws
IOException {
+ FieldSchema innerFieldSchema =
inputSchema.getField(0).schema.getField(0);
+ ResourceFieldSchema rfs = new
ResourceFieldSchema(innerFieldSchema);
+
+ TypeInfo inputTypeInfo = HiveUtils.getTypeInfo(rfs);
+ return inputTypeInfo;
+ }
+
+ private static ObjectInspector[]
getInputObjectInspectorAsArray(TypeInfo inputTypeInfo,
+ ConstantObjectInspectInfo constantsInfo) throws IOException {
+
+ StructObjectInspector inputObjectInspector =
(StructObjectInspector)HiveUtils.createObjectInspector(inputTypeInfo);
+
+ ObjectInspector[] arguments = new
ObjectInspector[inputObjectInspector.getAllStructFieldRefs().size()];
+ for (int
i=0;i<inputObjectInspector.getAllStructFieldRefs().size();i++) {
+ if (constantsInfo!=null && constantsInfo.get(i)!=null) {
+ arguments[i] = constantsInfo.get(i);
+ } else {
+ arguments[i] =
inputObjectInspector.getAllStructFieldRefs().get(i).getFieldObjectInspector();
+ }
+ }
+ return arguments;
+ }
+
+ private static GenericUDAFEvaluator getEvaluator(TypeInfo
inputTypeInfo, GenericUDAFResolver udaf,
+ ConstantObjectInspectInfo constantsInfo) throws IOException {
+ try {
+ GenericUDAFEvaluator evaluator;
+ ObjectInspector[] arguments =
getInputObjectInspectorAsArray(inputTypeInfo, constantsInfo);
+
+ if (udaf instanceof GenericUDAFResolver2) {
+ GenericUDAFParameterInfo paramInfo =
+ new SimpleGenericUDAFParameterInfo(
+ arguments, false, false);
+ evaluator =
((GenericUDAFResolver2)udaf).getEvaluator(paramInfo);
+ } else {
+ TypeInfo[] params = ((StructTypeInfo)inputTypeInfo)
+ .getAllStructFieldTypeInfos().toArray(new
TypeInfo[0]);
+ evaluator = udaf.getEvaluator(params);
+ }
+ return evaluator;
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
+
+ private void init(Schema inputSchema, GenericUDAFResolver udaf, Mode
m, ConstantObjectInspectInfo constantsInfo) throws IOException {
+ try {
+ inputTypeInfo = getInputTypeInfo(inputSchema);
+ inputObjectInspector =
(StructObjectInspector)HiveUtils.createObjectInspector(inputTypeInfo);
+ if (constantsInfo!=null) {
+
constantsInfo.injectConstantObjectInspector(inputObjectInspector);
+ }
+ inputObjectInspectorAsArray =
getInputObjectInspectorAsArray(inputTypeInfo, constantsInfo);
+ evaluator = getEvaluator(inputTypeInfo, udaf, constantsInfo);
+
+ if (m == Mode.COMPLETE) {
+ outputObjectInspector = evaluator.init(Mode.COMPLETE,
inputObjectInspectorAsArray);
+ outputTypeInfo =
TypeInfoUtils.getTypeInfoFromObjectInspector(outputObjectInspector);
+ return;
+ }
+
+ if (m == Mode.PARTIAL1 || m == Mode.FINAL) {
+ intermediateOutputObjectInspector =
evaluator.init(Mode.PARTIAL1, inputObjectInspectorAsArray);
+ intermediateOutputTypeInfo =
TypeInfoUtils.getTypeInfoFromObjectInspector(intermediateOutputObjectInspector);
+
+ if (m == Mode.FINAL) {
+ intermediateInputObjectInspector =
HiveUtils.createObjectInspector(intermediateOutputTypeInfo);
+ intermediateInputObjectInspectorAsArray = new
ObjectInspector[] {intermediateInputObjectInspector};
+ outputObjectInspector = evaluator.init(Mode.FINAL,
intermediateInputObjectInspectorAsArray);
+ outputTypeInfo =
TypeInfoUtils.getTypeInfoFromObjectInspector(outputObjectInspector);
+ }
+ }
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
+ }
+
+ SchemaAndEvaluatorInfo schemaAndEvaluatorInfo = new
SchemaAndEvaluatorInfo();
+
+ ConstantObjectInspectInfo constantsInfo;
+
+ public HiveUDAF(String funcName) throws IOException,
InstantiationException, IllegalAccessException {
+ this.funcName = funcName;
+ this.udaf = instantiateUDAF(funcName);
+ }
+
+ public HiveUDAF(String funcName, String params) throws IOException,
InstantiationException, IllegalAccessException {
+ this(funcName);
+ constantsInfo = ConstantObjectInspectInfo.parse(params);
+ this.params = params;
+ }
+
+ private static GenericUDAFResolver instantiateUDAF(String funcName) throws
IOException, InstantiationException, IllegalAccessException {
+ GenericUDAFResolver udaf;
+ Class hiveUDAFClass = resolveFunc(funcName);
+ if (UDAF.class.isAssignableFrom(hiveUDAFClass)) {
+ udaf = new GenericUDAFBridge((UDAF)hiveUDAFClass.newInstance());
+ } else if (GenericUDAFResolver.class.isAssignableFrom(hiveUDAFClass)){
+ udaf = (GenericUDAFResolver)hiveUDAFClass.newInstance();
+ } else {
+ throw new IOException(getErrorMessage(hiveUDAFClass));
+ }
+ return udaf;
+ }
+
+ @Override
+ public String getInitial() {
+ if (params == null) {
+ return Initial.class.getName() + "('" + funcName + "')";
+ } else {
+ return Initial.class.getName() + "('" + funcName + "," + params +
"')";
+ }
+ }
+
+ @Override
+ public String getIntermed() {
+ if (params == null) {
+ return Intermediate.class.getName() + "('" + funcName + "')";
+ } else {
+ return Intermediate.class.getName() + "('" + funcName + "," +
params + "')";
+ }
+ }
+
+ @Override
+ public String getFinal() {
+ if (params == null) {
+ return Final.class.getName() + "('" + funcName + "')";
+ } else {
+ return Final.class.getName() + "('" + funcName + "," + params +
"')";
+ }
+ }
+
+ static public class Initial extends EvalFunc<Tuple> {
+ public Initial(String funcName) {
+ }
+ public Initial(String funcName, String params) {
+ }
+ @Override
+ public Tuple exec(Tuple input) throws IOException {
+
+ DataBag bg = (DataBag) input.get(0);
+ Tuple tp = null;
+ if(bg.iterator().hasNext()) {
+ tp = bg.iterator().next();
+ }
+
+ return tp;
+ }
+ }
+
+ static public class Intermediate extends EvalFunc<Tuple> {
+
+ private boolean inited = false;
+ private String funcName;
+ ConstantObjectInspectInfo constantsInfo;
+ private SchemaAndEvaluatorInfo schemaAndEvaluatorInfo = new
SchemaAndEvaluatorInfo();
+ private static TupleFactory tf = TupleFactory.getInstance();
+
+ public Intermediate(String funcName) {
+ this.funcName = funcName;
+ }
+ public Intermediate(String funcName, String params) throws IOException
{
+ this.funcName = funcName;
+ constantsInfo = ConstantObjectInspectInfo.parse(params);
+ }
+ @Override
+ public Tuple exec(Tuple input) throws IOException {
+ try {
+ if (!inited) {
+ schemaAndEvaluatorInfo.init(getInputSchema(),
instantiateUDAF(funcName), Mode.PARTIAL1, constantsInfo);
+ inited = true;
+ }
+ DataBag b = (DataBag)input.get(0);
+ AggregationBuffer agg =
schemaAndEvaluatorInfo.evaluator.getNewAggregationBuffer();
+ for (Iterator<Tuple> it = b.iterator(); it.hasNext();) {
+ Tuple t = it.next();
+ List inputs =
schemaAndEvaluatorInfo.inputObjectInspector.getStructFieldsDataAsList(t);
+ schemaAndEvaluatorInfo.evaluator.iterate(agg,
inputs.toArray());
+ }
+ Object returnValue =
schemaAndEvaluatorInfo.evaluator.terminatePartial(agg);
+ Tuple result = tf.newTuple();
+ result.append(HiveUtils.convertHiveToPig(returnValue,
schemaAndEvaluatorInfo.intermediateOutputObjectInspector, null));
+ return result;
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
+ }
+
+ static public class Final extends EvalFunc<Object> {
+
+ private boolean inited = false;
+ private String funcName;
+ ConstantObjectInspectInfo constantsInfo;
+ private SchemaAndEvaluatorInfo schemaAndEvaluatorInfo = new
SchemaAndEvaluatorInfo();
+
+ public Final(String funcName) {
+ this.funcName = funcName;
+ }
+ public Final(String funcName, String params) throws IOException {
+ this.funcName = funcName;
+ constantsInfo = ConstantObjectInspectInfo.parse(params);
+ }
+ @Override
+ public Object exec(Tuple input) throws IOException {
+ try {
+ if (!inited) {
+ schemaAndEvaluatorInfo.init(getInputSchema(),
instantiateUDAF(funcName), Mode.FINAL, constantsInfo);
+
schemaAndEvaluatorInfo.evaluator.configure(instantiateMapredContext());
+ inited = true;
+ }
+ DataBag b = (DataBag)input.get(0);
+ AggregationBuffer agg =
schemaAndEvaluatorInfo.evaluator.getNewAggregationBuffer();
+ for (Iterator<Tuple> it = b.iterator(); it.hasNext();) {
+ Tuple t = it.next();
+ schemaAndEvaluatorInfo.evaluator.merge(agg, t.get(0));
+ }
+
+ Object returnValue =
schemaAndEvaluatorInfo.evaluator.terminate(agg);
+ Object result = HiveUtils.convertHiveToPig(returnValue,
schemaAndEvaluatorInfo.outputObjectInspector, null);
+ return result;
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
+ }
+
+ @Override
+ public Object exec(Tuple input) throws IOException {
+ try {
+ if (!inited) {
+ schemaAndEvaluatorInfo.init(getInputSchema(),
instantiateUDAF(funcName), Mode.COMPLETE, constantsInfo);
+ inited = true;
+ }
+ AggregationBuffer agg =
schemaAndEvaluatorInfo.evaluator.getNewAggregationBuffer();
+ DataBag bg = (DataBag) input.get(0);
+ Tuple tp = null;
+ for (Iterator<Tuple> it = bg.iterator(); it.hasNext();) {
+ tp = it.next();
+ List inputs =
schemaAndEvaluatorInfo.inputObjectInspector.getStructFieldsDataAsList(tp);
+ schemaAndEvaluatorInfo.evaluator.iterate(agg,
inputs.toArray());
+ }
+ Object returnValue =
schemaAndEvaluatorInfo.evaluator.terminate(agg);
+ Object result = HiveUtils.convertHiveToPig(returnValue,
schemaAndEvaluatorInfo.outputObjectInspector, null);
+ return result;
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ public Schema outputSchema(Schema input) {
+ try {
+ if (!inited) {
+ schemaAndEvaluatorInfo.init(getInputSchema(),
instantiateUDAF(funcName), Mode.COMPLETE, constantsInfo);
+ inited = true;
+ }
+
+ ResourceFieldSchema rfs =
HiveUtils.getResourceFieldSchema(schemaAndEvaluatorInfo.outputTypeInfo);
+ ResourceSchema outputSchema = new ResourceSchema();
+ outputSchema.setFields(new ResourceFieldSchema[] {rfs});
+ return Schema.getPigSchema(outputSchema);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
Added: pig/trunk/src/org/apache/pig/builtin/HiveUDF.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/HiveUDF.java?rev=1671956&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/HiveUDF.java (added)
+++ pig/trunk/src/org/apache/pig/builtin/HiveUDF.java Tue Apr 7 21:24:48 2015
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.builtin;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.hadoop.hive.ql.exec.UDF;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDF.DeferredJavaObject;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDF.DeferredObject;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFBridge;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.pig.ResourceSchema;
+import org.apache.pig.ResourceSchema.ResourceFieldSchema;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.util.hive.HiveUtils;
+
+/**
+ * Use Hive UDF or GenericUDF.
+ * Example:
+ * define sin HiveUDF('sin');
+ * A = load 'mydata' as (num:double);
+ * B = foreach A generate sin(num);
+ * HiveUDF takes an optional second parameter if the Hive UDF require constant
parameters
+ * define in_file HiveUDF('in_file', '(null, "names.txt")');
+ */
/**
 * Use Hive UDF or GenericUDF.
 * Example:
 * define sin HiveUDF('sin');
 * A = load 'mydata' as (num:double);
 * B = foreach A generate sin(num);
 * HiveUDF takes an optional second parameter if the Hive UDF require constant parameters
 * define in_file HiveUDF('in_file', '(null, "names.txt")');
 *
 * Old-style {@code UDF} classes are adapted through {@code GenericUDFBridge};
 * {@code GenericUDF} classes are instantiated directly.
 */
public class HiveUDF extends HiveUDFBase {

    // one-time lazy initialization guard for exec()/getShipFiles()/outputSchema()
    private boolean inited = false;
    // the wrapped Hive function (possibly a GenericUDFBridge around a UDF)
    private GenericUDF evalUDF;

    /**
     * Caches the input/output ObjectInspectors derived from the Pig input
     * schema once the UDF has been initialized.
     */
    static class SchemaInfo {
        private StructObjectInspector inputObjectInspector;
        private ObjectInspector outputObjectInspector;

        // Wraps the Pig input schema in a tuple field schema, converts it to a
        // Hive TypeInfo/StructObjectInspector, builds the per-argument
        // inspector array (substituting constant inspectors where declared),
        // and calls GenericUDF.initialize() to learn the output inspector.
        private void init(Schema inputSchema, GenericUDF evalUDF, ConstantObjectInspectInfo constantsInfo) throws IOException {
            ResourceSchema rs = new ResourceSchema(inputSchema);
            ResourceFieldSchema wrappedTupleFieldSchema = new ResourceFieldSchema();
            wrappedTupleFieldSchema.setType(DataType.TUPLE);
            wrappedTupleFieldSchema.setSchema(rs);

            TypeInfo ti = HiveUtils.getTypeInfo(wrappedTupleFieldSchema);
            inputObjectInspector = (StructObjectInspector)HiveUtils.createObjectInspector(ti);

            try {
                ObjectInspector[] arguments = new ObjectInspector[inputSchema.size()];
                for (int i=0;i<inputSchema.size();i++) {
                    // constant arguments declared in the DEFINE statement win
                    // over the schema-derived field inspector
                    if (constantsInfo!=null && !constantsInfo.isEmpty() && constantsInfo.get(i)!=null) {
                        arguments[i] = constantsInfo.get(i);
                    } else {
                        arguments[i] = inputObjectInspector.getAllStructFieldRefs().get(i).getFieldObjectInspector();
                    }
                }
                outputObjectInspector = evalUDF.initialize(arguments);
            } catch (Exception e) {
                throw new IOException(e);
            }
        }
    }

    SchemaInfo schemaInfo = new SchemaInfo();

    // parsed constant arguments from the optional second DEFINE parameter
    ConstantObjectInspectInfo constantsInfo;

    /**
     * @param funcName Hive registry name or fully qualified class name of the UDF
     */
    public HiveUDF(String funcName) throws IOException, InstantiationException, IllegalAccessException {
        Class hiveUDFClass = resolveFunc(funcName);
        if (UDF.class.isAssignableFrom(hiveUDFClass)) {
            // adapt old-style UDF via the bridge (second arg: not an operator)
            evalUDF = new GenericUDFBridge(funcName, false, hiveUDFClass.getName());
        } else if (GenericUDF.class.isAssignableFrom(hiveUDFClass)){
            evalUDF = (GenericUDF)hiveUDFClass.newInstance();
        } else {
            throw new IOException(getErrorMessage(hiveUDFClass));
        }
    }

    /**
     * @param params constant-argument spec, e.g. '(null, "names.txt")' — null
     *               marks positions that come from the data, literals are
     *               passed to the UDF as Hive constants
     */
    public HiveUDF(String funcName, String params) throws IOException, InstantiationException, IllegalAccessException {
        this(funcName);
        constantsInfo = ConstantObjectInspectInfo.parse(params);
    }

    /**
     * Evaluates the Hive UDF on one input tuple. On first call, configures the
     * UDF with a MapredContext and initializes the inspectors; then wraps each
     * field in a DeferredJavaObject and converts the Hive result back to Pig.
     */
    @Override
    public Object exec(Tuple input) throws IOException {
        if (!inited) {
            evalUDF.configure(instantiateMapredContext());
            schemaInfo.init(getInputSchema(), evalUDF, constantsInfo);
            inited = true;
        }
        List inputs = schemaInfo.inputObjectInspector.getStructFieldsDataAsList(input);
        DeferredObject[] arguments = new DeferredObject[inputs.size()];
        for (int i=0 ; i<inputs.size() ; i++) {
            arguments[i] = new DeferredJavaObject(inputs.get(i));
        }
        try {
            Object returnValue = evalUDF.evaluate(arguments);
            return HiveUtils.convertHiveToPig(returnValue, schemaInfo.outputObjectInspector, null);
        } catch (HiveException e) {
            throw new IOException(e);
        }
    }

    /**
     * Ships the base Hive jars plus any extra files/jars the UDF itself
     * declares via getRequiredFiles()/getRequiredJars(). Initializes the UDF
     * first because those methods may depend on initialization.
     */
    @Override
    public List<String> getShipFiles() {
        try {
            if (!inited) {
                schemaInfo.init(getInputSchema(), evalUDF, constantsInfo);
                inited = true;
            }
        } catch(Exception e) {
            throw new RuntimeException(e);
        }
        List<String> files = super.getShipFiles();
        if (evalUDF.getRequiredFiles() != null) {
            files.addAll(Arrays.asList(evalUDF.getRequiredFiles()));
        }
        if (evalUDF.getRequiredJars() != null) {
            files.addAll(Arrays.asList(evalUDF.getRequiredJars()));
        }

        return files;
    }

    /**
     * Derives the Pig output schema from the UDF's output ObjectInspector
     * (obtained by initializing the UDF against the input schema).
     */
    @Override
    public Schema outputSchema(Schema input) {
        try {
            if (!inited) {
                schemaInfo.init(getInputSchema(), evalUDF, constantsInfo);
                inited = true;
            }
            ResourceFieldSchema rfs = HiveUtils.getResourceFieldSchema(
                    TypeInfoUtils.getTypeInfoFromObjectInspector(schemaInfo.outputObjectInspector));
            ResourceSchema outputSchema = new ResourceSchema();
            outputSchema.setFields(new ResourceFieldSchema[] {rfs});
            return Schema.getPigSchema(outputSchema);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Closes the wrapped Hive UDF when Pig is done with this instance.
     * EvalFunc.finish() cannot throw, so the IOException is wrapped.
     */
    @Override
    public void finish() {
        try {
            evalUDF.close();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}
Added: pig/trunk/src/org/apache/pig/builtin/HiveUDFBase.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/HiveUDFBase.java?rev=1671956&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/HiveUDFBase.java (added)
+++ pig/trunk/src/org/apache/pig/builtin/HiveUDFBase.java Tue Apr 7 21:24:48
2015
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.builtin;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.ql.exec.FunctionInfo;
+import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
+import org.apache.hadoop.hive.ql.exec.MapredContext;
+import org.apache.hadoop.hive.ql.exec.UDAF;
+import org.apache.hadoop.hive.ql.exec.UDF;
+import org.apache.hadoop.hive.ql.udf.generic.Collector;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFResolver;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
+import org.apache.hadoop.hive.serde2.objectinspector.ConstantObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.shims.HadoopShimsSecure;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.mapred.Counters;
+import org.apache.hadoop.mapred.Counters.Counter;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.pig.EvalFunc;
+import
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.util.UDFContext;
+import org.apache.pig.impl.util.Utils;
+import org.apache.pig.impl.util.hive.HiveUtils;
+import org.apache.pig.tools.pigstats.PigStatusReporter;
+
+import com.esotericsoftware.kryo.Serializer;
+
/**
 * Common machinery shared by {@code HiveUDF}, {@code HiveUDAF} and
 * {@code HiveUDTF}: resolving the Hive function class, parsing constant
 * arguments, bridging Pig's reporter into Hive's, building a MapredContext,
 * and computing the Hive jars to ship with the job.
 */
abstract class HiveUDFBase extends EvalFunc<Object> {

    /**
     * Parsed constant arguments from the optional second DEFINE parameter.
     * Each non-null position holds a Hive ConstantObjectInspector to be used
     * instead of the schema-derived field inspector.
     */
    static protected class ConstantObjectInspectInfo {
        ConstantObjectInspector[] constants;

        /**
         * Parses a constant spec such as '(null, "names.txt")'. Double quotes
         * are normalized to single quotes for Pig's constant parser; a tuple
         * spec yields one slot per position (null positions stay null), any
         * other constant yields a single-slot array.
         */
        static ConstantObjectInspectInfo parse(String params) throws IOException {
            ConstantObjectInspectInfo info = new ConstantObjectInspectInfo();
            params = params.replaceAll("\"", "'");
            Object constant = Utils.parseConstant(params);
            if (DataType.findType(constant) == DataType.TUPLE) {
                Tuple t = (Tuple)constant;
                info.constants = new ConstantObjectInspector[t.size()];
                for (int i=0;i<t.size();i++) {
                    if (t.get(i) != null) {
                        info.constants[i] = HiveUtils.getConstantObjectInspector(t.get(i));
                    }
                }
            } else {
                info.constants = new ConstantObjectInspector[1];
                info.constants[0] = HiveUtils.getConstantObjectInspector(constant);
            }
            return info;
        }
        boolean isEmpty() {
            return (constants == null);
        }
        int size() {
            return constants.length;
        }
        ConstantObjectInspector get(int i) {
            return constants[i];
        }
        /**
         * Replaces the struct fields of the given inspector with constant
         * fields wherever a constant was declared, mutating the inspector's
         * field-ref list in place.
         */
        void injectConstantObjectInspector(StructObjectInspector inputObjectInspector) {
            if (!isEmpty()) {
                for (int i=0;i<size();i++) {
                    if (get(i)!=null) {
                        StructField origField = inputObjectInspector.getAllStructFieldRefs().get(i);
                        StructField newfield = new HiveUtils.Field(origField.getFieldName(), get(i), i);
                        // NOTE(review): relies on getAllStructFieldRefs()
                        // returning a mutable List of HiveUtils.Field
                        ((List<HiveUtils.Field>)inputObjectInspector.getAllStructFieldRefs()).set(i, (HiveUtils.Field)newfield);
                    }
                }
            }
        }
    }

    /**
     * Resolves a function name to a class: first through Hive's built-in
     * FunctionRegistry (e.g. 'sin', 'avg'), otherwise as a class name via
     * Pig's classloader.
     *
     * @throws IOException if the name is neither a registered Hive function
     *         nor a resolvable class
     */
    static protected Class resolveFunc(String funcName) throws IOException {
        String className = funcName;
        Class udfClass;
        if (FunctionRegistry.getFunctionNames().contains(funcName)) {
            FunctionInfo func = FunctionRegistry.getFunctionInfo(funcName);
            udfClass = func.getFunctionClass();
        } else {
            udfClass = PigContext.resolveClassName(className);
            if (udfClass == null) {
                throw new IOException("Cannot find Hive UDF " + funcName);
            }
        }
        return udfClass;
    }

    /**
     * Adapts Pig's PigStatusReporter to Hadoop's old-API Reporter so Hive
     * UDFs can report progress/status and update counters through it.
     */
    static protected class HiveReporter implements Reporter {
        PigStatusReporter rep;
        HiveReporter(PigStatusReporter rep) {
            this.rep = rep;
        }
        public void setStatus(String s) {
            rep.setStatus(s);
        }
        public void progress() {
            rep.progress();
        }
        // Counter reads go through a throwaway Counters object seeded with the
        // current Pig counter value; writes delegate straight to Pig.
        public Counter getCounter(Enum<?> name) {
            try {
                Counters counters = new Counters();
                counters.incrCounter(name, rep.getCounter(name).getValue());
                return counters.findCounter(name);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
        public Counter getCounter(String group, String name) {
            try {
                Counters counters = new Counters();
                counters.incrCounter(group, name, rep.getCounter(group, name).getValue());
                return counters.findCounter(group, name);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
        public void incrCounter(Enum<?> key, long amount) {
            rep.incrCounter(key, amount);
        }
        public void incrCounter(String group, String counter, long amount) {
            rep.incrCounter(group, counter, amount);
        }
        public InputSplit getInputSplit() throws UnsupportedOperationException {
            // no meaningful split is available to a UDF
            throw new UnsupportedOperationException("NULL reporter has no input");
        }
        public float getProgress() {
            return 0;
        }
    };

    /**
     * Builds the Hive MapredContext a UDF's configure() expects: map/reduce
     * side is taken from the task conf (forced to map side and execution
     * engine 'tez' when Pig runs on Tez), with the Pig reporter attached.
     */
    protected static MapredContext instantiateMapredContext() {
        Configuration conf = UDFContext.getUDFContext().getJobConf();
        boolean isMap = conf.getBoolean(MRConfiguration.TASK_IS_MAP, false);
        if (conf.get("exectype").startsWith("TEZ")) {
            isMap = true;
            HiveConf.setVar(conf, ConfVars.HIVE_EXECUTION_ENGINE, "tez");
        }
        MapredContext context = MapredContext.init(isMap, new JobConf(UDFContext.getUDFContext().getJobConf()));
        context.setReporter(new HiveReporter(PigStatusReporter.getInstance()));
        return context;
    }

    /**
     * Returns the Hive/shim jars the job must ship, located via a set of
     * representative classes (one per needed jar). The Hadoop shims class is
     * chosen by runtime Hadoop version ("23" for Hadoop 2.x, else "20S").
     */
    @Override
    public List<String> getShipFiles() {
        String hadoopVersion = "20S";
        if (Utils.isHadoop23() || Utils.isHadoop2()) {
            hadoopVersion = "23";
        }
        Class hadoopVersionShimsClass;
        try {
            hadoopVersionShimsClass = Class.forName("org.apache.hadoop.hive.shims.Hadoop" +
                    hadoopVersion + "Shims");
        } catch (ClassNotFoundException e) {
            throw new RuntimeException("Cannot find Hadoop" + hadoopVersion + "ShimsClass in classpath");
        }
        List<String> files = FuncUtils.getShipFiles(new Class[] {GenericUDF.class,
                PrimitiveObjectInspector.class, HiveConf.class, Serializer.class, ShimLoader.class,
                hadoopVersionShimsClass, HadoopShimsSecure.class, Collector.class});
        return files;
    }

    /**
     * Builds the error message shown when a function is declared with the
     * wrong wrapper class (e.g. a UDTF declared as HiveUDF), pointing the
     * user at the wrapper that matches the class's actual Hive supertype.
     */
    static protected String getErrorMessage(Class c) {
        StringBuffer message = new StringBuffer("Please declare " + c.getName() + " as ");
        if (UDF.class.isAssignableFrom(c) || GenericUDF.class.isAssignableFrom(c)) {
            message.append(HiveUDF.class.getName());
        } else if (GenericUDTF.class.isAssignableFrom(c)) {
            message.append(HiveUDTF.class.getName());
        } else if (UDAF.class.isAssignableFrom(c) || GenericUDAFResolver.class.isAssignableFrom(c)) {
            message.append(HiveUDAF.class.getName());
        } else {
            message = new StringBuffer(c.getName() + " is not Hive UDF");
        }
        return message.toString();
    }
}
Added: pig/trunk/src/org/apache/pig/builtin/HiveUDTF.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/HiveUDTF.java?rev=1671956&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/HiveUDTF.java (added)
+++ pig/trunk/src/org/apache/pig/builtin/HiveUDTF.java Tue Apr 7 21:24:48 2015
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.builtin;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.udf.generic.Collector;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.pig.ResourceSchema;
+import org.apache.pig.ResourceSchema.ResourceFieldSchema;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.util.hive.HiveUtils;
+
/**
 * Use Hive GenericUDTF.
 * Example:
 * define explode HiveUDTF('explode');
 * A = load 'mydata' as (a0:{(b0:chararray)});
 * B = foreach A generate flatten(explode(a0));
 *
 * Each call to {@link #exec(Tuple)} feeds one input tuple to the wrapped
 * UDTF and returns the rows it emitted, collected into a bag. When the
 * runtime signals end-of-all-input, exec() instead calls the UDTF's
 * close() so any final buffered rows are flushed into the bag.
 */
public class HiveUDTF extends HiveUDFBase {

    // True once the wrapped UDTF has been initialized against the input schema.
    private boolean inited = false;
    // The wrapped Hive UDTF instance, resolved from the constructor's funcName.
    private GenericUDTF udtf;
    // Set via setEndOfAllInput(); switches exec() from process() to close().
    private boolean endOfAllInput = false;

    /**
     * Holds the Hive object inspectors derived from the Pig input schema:
     * the input struct inspector fed to the UDTF and the output inspector
     * the UDTF reports from initialize().
     */
    static class SchemaInfo {
        StructObjectInspector inputObjectInspector;
        ObjectInspector outputObjectInspector;
        private void init(Schema inputSchema, GenericUDTF udtf, ConstantObjectInspectInfo constantsInfo) throws IOException {
            // Wrap the whole Pig input schema in a single tuple field so it
            // maps to one Hive struct.
            ResourceSchema rs = new ResourceSchema(inputSchema);
            ResourceFieldSchema wrappedTupleFieldSchema = new ResourceFieldSchema();
            wrappedTupleFieldSchema.setType(DataType.TUPLE);
            wrappedTupleFieldSchema.setSchema(rs);

            TypeInfo ti = HiveUtils.getTypeInfo(wrappedTupleFieldSchema);
            inputObjectInspector = (StructObjectInspector)HiveUtils.createObjectInspector(ti);
            if (constantsInfo!=null) {
                // Replace selected fields with constant inspectors parsed
                // from the second constructor argument.
                constantsInfo.injectConstantObjectInspector(inputObjectInspector);
            }

            try {
                // initialize() tells the UDTF its input shape and yields the
                // inspector describing the rows it will emit.
                outputObjectInspector = udtf.initialize(inputObjectInspector);
            } catch (Exception e) {
                throw new IOException(e);
            }
        }
    }

    SchemaInfo schemaInfo = new SchemaInfo();

    // Optional constant-argument descriptors; null when the single-arg
    // constructor was used.
    ConstantObjectInspectInfo constantsInfo;

    private static BagFactory bf = BagFactory.getInstance();
    // Created lazily on first exec(); reused (and cleared) on later calls.
    private HiveUDTFCollector collector = null;

    public HiveUDTF(String funcName) throws InstantiationException, IllegalAccessException, IOException {
        Class hiveUDTFClass = resolveFunc(funcName);
        if (GenericUDTF.class.isAssignableFrom(hiveUDTFClass)) {
            udtf = (GenericUDTF)hiveUDTFClass.newInstance();
        } else {
            // Not a UDTF: point the user at the correct wrapper class.
            throw new IOException(getErrorMessage(hiveUDTFClass));
        }
    }

    public HiveUDTF(String funcName, String params) throws InstantiationException, IllegalAccessException, IOException {
        this(funcName);
        // params declares which arguments are constants (e.g. for UDTFs that
        // require compile-time-constant parameters).
        constantsInfo = ConstantObjectInspectInfo.parse(params);
    }
    @Override
    public Object exec(Tuple input) throws IOException {
        if (!inited) {
            // Lazy one-time setup on the backend: give the UDTF its
            // MapredContext, then bind it to the input schema.
            udtf.configure(instantiateMapredContext());
            schemaInfo.init(getInputSchema(), udtf, constantsInfo);
            inited = true;
        }

        if (collector == null) {
            collector = new HiveUDTFCollector();
            udtf.setCollector(collector);
        } else {
            // Reuse the collector but drop rows from the previous call.
            collector.init();
        }

        try {
            if (!endOfAllInput) {
                udtf.process(input.getAll().toArray());
            } else {
                // Final call: flush any rows the UDTF buffered internally.
                udtf.close();
            }
        } catch (Exception e) {
            throw new IOException(e);
        }
        return collector.getBag();
    }

    @Override
    public Schema outputSchema(Schema input) {
        try {
            if (!inited) {
                // Frontend path: initialize without a MapredContext just to
                // learn the output shape.
                schemaInfo.init(getInputSchema(), udtf, constantsInfo);
                inited = true;
            }
            ResourceFieldSchema rfs = HiveUtils.getResourceFieldSchema(
                    TypeInfoUtils.getTypeInfoFromObjectInspector(schemaInfo.outputObjectInspector));

            // The UDTF's per-row struct becomes the tuple schema of a bag:
            // exec() returns a bag of emitted rows.
            ResourceSchema tupleSchema = new ResourceSchema();
            tupleSchema.setFields(new ResourceFieldSchema[] {rfs});

            ResourceFieldSchema bagFieldSchema = new ResourceFieldSchema();
            bagFieldSchema.setType(DataType.BAG);
            bagFieldSchema.setSchema(tupleSchema);

            ResourceSchema bagSchema = new ResourceSchema();
            bagSchema.setFields(new ResourceFieldSchema[] {bagFieldSchema});
            return Schema.getPigSchema(bagSchema);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Hive Collector that accumulates the rows the UDTF emits into a Pig
     * bag for the current exec() call.
     */
    class HiveUDTFCollector implements Collector {
        DataBag bag = bf.newDefaultBag();

        public void init() {
            bag.clear();
        }

        @Override
        public void collect(Object input) throws HiveException {
            try {
                Tuple outputTuple = (Tuple)HiveUtils.convertHiveToPig(input,
                        schemaInfo.outputObjectInspector, null);
                if (outputTuple.size()==1 && outputTuple.get(0) instanceof Tuple) {
                    // Unwrap a single nested tuple so the bag holds the row
                    // directly rather than a one-field wrapper.
                    bag.add((Tuple)outputTuple.get(0));
                } else {
                    bag.add(outputTuple);
                }
            } catch(Exception e) {
                throw new HiveException(e);
            }
        }

        public DataBag getBag() {
            return bag;
        }
    }

    @Override
    public boolean needEndOfAllInputProcessing() {
        // Ask the runtime to call exec() once more after the last input so
        // close() can flush buffered rows.
        return true;
    }

    @Override
    public void setEndOfAllInput(boolean endOfAllInput) {
        this.endOfAllInput = endOfAllInput;
    }
}
Modified: pig/trunk/src/org/apache/pig/builtin/OrcStorage.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/OrcStorage.java?rev=1671956&r1=1671955&r2=1671956&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/OrcStorage.java (original)
+++ pig/trunk/src/org/apache/pig/builtin/OrcStorage.java Tue Apr 7 21:24:48
2015
@@ -91,7 +91,7 @@ import org.apache.pig.impl.logicalLayer.
import org.apache.pig.impl.util.ObjectSerializer;
import org.apache.pig.impl.util.UDFContext;
import org.apache.pig.impl.util.Utils;
-import org.apache.pig.impl.util.orc.OrcUtils;
+import org.apache.pig.impl.util.hive.HiveUtils;
import org.joda.time.DateTime;
import com.esotericsoftware.kryo.io.Input;
@@ -235,7 +235,7 @@ public class OrcStorage extends LoadFunc
typeInfo =
(TypeInfo)ObjectSerializer.deserialize(p.getProperty(signature +
SchemaSignatureSuffix));
}
if (oi==null) {
- oi = OrcUtils.createObjectInspector(typeInfo);
+ oi = HiveUtils.createObjectInspector(typeInfo);
}
}
@@ -244,7 +244,7 @@ public class OrcStorage extends LoadFunc
ResourceFieldSchema fs = new ResourceFieldSchema();
fs.setType(DataType.TUPLE);
fs.setSchema(rs);
- typeInfo = OrcUtils.getTypeInfo(fs);
+ typeInfo = HiveUtils.getTypeInfo(fs);
Properties p =
UDFContext.getUDFContext().getUDFProperties(this.getClass());
p.setProperty(signature + SchemaSignatureSuffix,
ObjectSerializer.serialize(typeInfo));
}
@@ -376,7 +376,7 @@ public class OrcStorage extends LoadFunc
}
Object value = in.getCurrentValue();
- Tuple t = (Tuple)OrcUtils.convertOrcToPig(value, oi,
mRequiredColumns);
+ Tuple t = (Tuple)HiveUtils.convertHiveToPig(value, oi,
mRequiredColumns);
return t;
} catch (InterruptedException e) {
int errCode = 6018;
@@ -438,7 +438,7 @@ public class OrcStorage extends LoadFunc
}
}
- ResourceFieldSchema fs = OrcUtils.getResourceFieldSchema(typeInfo);
+ ResourceFieldSchema fs = HiveUtils.getResourceFieldSchema(typeInfo);
return fs.getSchema();
}
Added: pig/trunk/src/org/apache/pig/data/UnlimitedNullTuple.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/data/UnlimitedNullTuple.java?rev=1671956&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/data/UnlimitedNullTuple.java (added)
+++ pig/trunk/src/org/apache/pig/data/UnlimitedNullTuple.java Tue Apr 7
21:24:48 2015
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.data;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.pig.backend.executionengine.ExecException;
+
+public class UnlimitedNullTuple extends AbstractTuple {
+
+ @Override
+ public int size() {
+ throw new RuntimeException("Unimplemented");
+ }
+
+ @Override
+ public Object get(int fieldNum) throws ExecException {
+ return null;
+ }
+
+ @Override
+ public List<Object> getAll() {
+ throw new RuntimeException("Unimplemented");
+ }
+
+ @Override
+ public void set(int fieldNum, Object val) throws ExecException {
+ throw new ExecException("Unimplemented");
+ }
+
+ @Override
+ public void append(Object val) {
+ throw new RuntimeException("Unimplemented");
+ }
+
+ @Override
+ public long getMemorySize() {
+ throw new RuntimeException("Unimplemented");
+ }
+
+ @Override
+ public void readFields(DataInput arg0) throws IOException {
+ throw new IOException("Unimplemented");
+ }
+
+ @Override
+ public void write(DataOutput arg0) throws IOException {
+ throw new IOException("Unimplemented");
+ }
+
+ @Override
+ public int compareTo(Object o) {
+ throw new RuntimeException("Unimplemented");
+ }
+
+}
Modified: pig/trunk/src/org/apache/pig/impl/util/Utils.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/util/Utils.java?rev=1671956&r1=1671955&r2=1671956&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/util/Utils.java (original)
+++ pig/trunk/src/org/apache/pig/impl/util/Utils.java Tue Apr 7 21:24:48 2015
@@ -255,6 +255,13 @@ public class Utils {
return schema;
}
+ public static Object parseConstant(String constantString) throws
ParserException {
+ QueryParserDriver queryParser = new QueryParserDriver( new
PigContext(),
+ "util", new HashMap<String, String>() ) ;
+ Object constant = queryParser.parseConstant(constantString);
+ return constant;
+ }
+
/**
* This method adds FieldSchema of 'input source tag/path' as the first
* field. This will be called only when PigStorage is invoked with