gates
Thu, 19 Nov 2009 14:35:12 -0800
Author: gates Date: Thu Nov 19 22:34:46 2009 New Revision: 882339 URL: http://svn.apache.org/viewvc?rev=882339&view=rev Log: Pass JobConf and UDF specific configuration information to UDFs. Added: hadoop/pig/branches/branch-0.6/src/org/apache/pig/impl/util/UDFContext.java hadoop/pig/branches/branch-0.6/test/org/apache/pig/test/TestUDFContext.java hadoop/pig/branches/branch-0.6/test/org/apache/pig/test/utils/UDFContextTestEvalFunc.java hadoop/pig/branches/branch-0.6/test/org/apache/pig/test/utils/UDFContextTestEvalFunc2.java hadoop/pig/branches/branch-0.6/test/org/apache/pig/test/utils/UDFContextTestLoader.java Modified: hadoop/pig/branches/branch-0.6/CHANGES.txt hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java Modified: hadoop/pig/branches/branch-0.6/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/CHANGES.txt?rev=882339&r1=882338&r2=882339&view=diff ============================================================================== --- hadoop/pig/branches/branch-0.6/CHANGES.txt (original) +++ hadoop/pig/branches/branch-0.6/CHANGES.txt Thu Nov 19 22:34:46 2009 @@ -24,6 +24,9 @@ IMPROVEMENTS +PIG-1085: Pass JobConf and UDF specific configuration information to UDFs + (gates) + PIG-1089: Pig 0.6.0 Documentation (chandec via olgan) PIG-958: Splitting output data on key field (ankur via pradeepkth) Modified: hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=882339&r1=882338&r2=882339&view=diff 
============================================================================== --- hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original) +++ hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Thu Nov 19 22:34:46 2009 @@ -81,6 +81,7 @@ import org.apache.pig.impl.util.JarManager; import org.apache.pig.impl.util.ObjectSerializer; import org.apache.pig.impl.util.Pair; +import org.apache.pig.impl.util.UDFContext; /** * This is compiler class that takes an MROperPlan and converts @@ -596,6 +597,9 @@ jobConf.setOutputCommitter(PigOutputCommitter.class); Job job = new Job(jobConf); jobStoreMap.put(job,new Pair<List<POStore>, Path>(storeLocations, tmpLocation)); + + // Serialize the UDF specific context info. + UDFContext.getUDFContext().serialize(jobConf); return job; } catch (JobCreationException jce) { throw jce; Modified: hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java?rev=882339&r1=882338&r2=882339&view=diff ============================================================================== --- hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java (original) +++ hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java Thu Nov 19 22:34:46 2009 @@ -49,6 +49,7 @@ import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper; import org.apache.pig.impl.util.ObjectSerializer; import org.apache.pig.impl.util.SpillableMemoryManager; +import org.apache.pig.impl.util.UDFContext; public abstract class PigMapBase extends MapReduceBase{ private static final Tuple DUMMYTUPLE = null; @@ -166,6 +167,12 @@ 
keyType = ((byte[])ObjectSerializer.deserialize(job.get("pig.map.keytype")))[0]; pigReporter = new ProgressableReporter(); + + // Get the UDF specific context + UDFContext udfc = UDFContext.getUDFContext(); + udfc.addJobConf(job); + udfc.deserialize(); + if(!(mp.isEmpty())) { List<OperatorKey> targetOpKeys = (ArrayList<OperatorKey>)ObjectSerializer.deserialize(job.get("map.target.ops")); @@ -178,7 +185,6 @@ } - } catch (IOException ioe) { String msg = "Problem while configuring map plan."; throw new RuntimeException(msg, ioe); Modified: hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java?rev=882339&r1=882338&r2=882339&view=diff ============================================================================== --- hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java (original) +++ hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java Thu Nov 19 22:34:46 2009 @@ -57,6 +57,7 @@ import org.apache.pig.impl.plan.VisitorException; import org.apache.pig.impl.util.ObjectSerializer; import org.apache.pig.impl.util.SpillableMemoryManager; +import org.apache.pig.impl.util.UDFContext; import org.apache.pig.impl.util.WrappedIOException; import org.apache.pig.data.DataBag; @@ -301,6 +302,12 @@ roots = rp.getRoots().toArray(new PhysicalOperator[1]); leaf = rp.getLeaves().get(0); } + + // Get the UDF specific context + UDFContext udfc = UDFContext.getUDFContext(); + udfc.addJobConf(jConf); + udfc.deserialize(); + } catch (IOException ioe) { String msg = "Problem while configuring reduce plan."; throw new RuntimeException(msg, ioe); Added: hadoop/pig/branches/branch-0.6/src/org/apache/pig/impl/util/UDFContext.java URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/src/org/apache/pig/impl/util/UDFContext.java?rev=882339&view=auto
==============================================================================
--- hadoop/pig/branches/branch-0.6/src/org/apache/pig/impl/util/UDFContext.java (added)
+++ hadoop/pig/branches/branch-0.6/src/org/apache/pig/impl/util/UDFContext.java Thu Nov 19 22:34:46 2009
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.util;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Properties;
+
+import org.apache.hadoop.mapred.JobConf;
+
+/**
+ * Holder for UDF specific configuration that has to travel from the
+ * front end to the back end.
+ *
+ * <p>On the front end, UDFs record information in the Properties objects
+ * returned by {@link #getUDFProperties}, and {@link #serialize(JobConf)}
+ * stores all of them in a JobConf.  On the back end,
+ * {@link #addJobConf(JobConf)} followed by {@link #deserialize()} makes
+ * those properties, and the JobConf itself, available to the UDFs again.
+ *
+ * <p>All callers share one instance held in a static field; access
+ * to its state is not synchronized.
+ */
+@SuppressWarnings("deprecation")
+public class UDFContext {
+
+    /** JobConf key under which the serialized UDF properties are stored. */
+    private static final String UDF_CONTEXT_KEY = "pig.UDFContext";
+
+    // Created eagerly: lazy, unsynchronized initialization could hand out
+    // two different instances if first called from two threads at once.
+    private static final UDFContext INSTANCE = new UDFContext();
+
+    private JobConf jconf = null;
+    private HashMap<UDFContextKey, Properties> udfConfs;
+
+    private UDFContext() {
+        udfConfs = new HashMap<UDFContextKey, Properties>();
+    }
+
+    /**
+     * Get the shared UDFContext instance.
+     * @return the singleton UDFContext
+     */
+    public static UDFContext getUDFContext() {
+        return INSTANCE;
+    }
+
+    /**
+     * Adds the JobConf to this singleton.  Will be
+     * called on the backend by the Map and Reduce
+     * functions so that UDFs can obtain the JobConf
+     * on the backend.
+     * @param conf JobConf of the running job
+     */
+    public void addJobConf(JobConf conf) {
+        jconf = conf;
+    }
+
+    /**
+     * Get the JobConf.  This should only be called on
+     * the backend; it returns null until addJobConf has
+     * been called, which is the case on the frontend.
+     * @return JobConf for this job, or null.  This is a copy of the
+     * JobConf.  Nothing written here will be kept by the system.
+     * getUDFProperties should be used for recording UDF specific
+     * information.
+     */
+    public JobConf getJobConf() {
+        if (jconf == null) {
+            return null;
+        }
+        return new JobConf(jconf);
+    }
+
+    /**
+     * Get a properties object that is specific to this UDF and
+     * argument list.  If a given UDF is called multiple times in a
+     * script and each instance passes different arguments, then each
+     * will be provided with a different properties object.
+     * This can be used by loaders to pass their input object path
+     * or URI and separate themselves from other instances of the
+     * same loader.  Constructor arguments could also be used,
+     * as they are available on both the front and back end.
+     *
+     * Note that this can only be used to share information
+     * across instantiations of the same function in the front end
+     * and between front end and back end.  It cannot be used to
+     * share information between instantiations (that is, between
+     * map and/or reduce instances) on the back end at runtime.
+     * @param c class of the UDF obtaining the properties object.
+     * @param args String arguments that make this instance of
+     * the UDF unique.  A null or empty array selects the same
+     * object as {@link #getUDFProperties(Class)}.
+     * @return A reference to the properties object specific to
+     * the calling UDF.  This is a reference, not a copy.
+     * Any changes to this object will automatically be
+     * propagated to other instances of the UDF calling this
+     * function with the same arguments.
+     */
+    public Properties getUDFProperties(Class<?> c, String[] args) {
+        return getOrCreate(new UDFContextKey(c.getName(), args));
+    }
+
+    /**
+     * Get a properties object that is specific to this UDF.
+     * Note that if a given UDF is called multiple times in a script,
+     * they will all be provided the same configuration object.  It
+     * is up to the UDF to make sure the multiple instances do not
+     * stomp on each other.
+     *
+     * It is guaranteed that this properties object will be separate
+     * from that provided to any other UDF.
+     *
+     * Note that this can only be used to share information
+     * across instantiations of the same function in the front end
+     * and between front end and back end.  It cannot be used to
+     * share information between instantiations (that is, between
+     * map and/or reduce instances) on the back end at runtime.
+     * @param c class of the UDF obtaining the properties object.
+     * @return A reference to the properties object specific to
+     * the calling UDF.  This is a reference, not a copy.
+     * Any changes to this object will automatically be
+     * propagated to other instances of the UDF calling this
+     * function.
+     */
+    public Properties getUDFProperties(Class<?> c) {
+        return getOrCreate(new UDFContextKey(c.getName(), null));
+    }
+
+    /**
+     * Serialize the UDF specific information into an instance
+     * of JobConf.  This function is intended to be called on
+     * the front end in preparation for sending the data to the
+     * backend.
+     * @param conf JobConf to serialize into
+     * @throws IOException if underlying serialization throws it
+     */
+    public void serialize(JobConf conf) throws IOException {
+        conf.set(UDF_CONTEXT_KEY, ObjectSerializer.serialize(udfConfs));
+    }
+
+    /**
+     * Populate the udfConfs field from the JobConf given to
+     * addJobConf.  This function is intended to be called by
+     * Map.configure or Reduce.configure on the backend.  If that
+     * JobConf holds no serialized UDF information, the current
+     * properties are left untouched.
+     * @throws IllegalStateException if addJobConf has not been called
+     * @throws IOException if underlying deserialization throws it
+     */
+    @SuppressWarnings("unchecked")
+    public void deserialize() throws IOException {
+        if (jconf == null) {
+            throw new IllegalStateException(
+                "addJobConf must be called before deserialize");
+        }
+        String serialized = jconf.get(UDF_CONTEXT_KEY);
+        if (serialized == null) {
+            // Nothing was serialized for this job.  Keep the current map
+            // rather than replacing it with null, which would make every
+            // later getUDFProperties call fail.
+            return;
+        }
+        HashMap<UDFContextKey, Properties> fromConf =
+            (HashMap<UDFContextKey, Properties>) ObjectSerializer.deserialize(serialized);
+        if (fromConf != null) {
+            udfConfs = fromConf;
+        }
+    }
+
+    /**
+     * Return the properties object stored under key, creating and
+     * registering an empty one the first time the key is seen.
+     */
+    private Properties getOrCreate(UDFContextKey key) {
+        Properties p = udfConfs.get(key);
+        if (p == null) {
+            p = new Properties();
+            udfConfs.put(key, p);
+        }
+        return p;
+    }
+
+    /**
+     * Key identifying one UDF: its class name plus the arguments it passed
+     * to getUDFProperties.  Keys are compared by value rather than reduced
+     * to an int hash, so two different UDFs can never end up sharing a
+     * properties object through a hash collision.
+     */
+    private static final class UDFContextKey implements Serializable {
+        private static final long serialVersionUID = 1L;
+
+        private final String className;
+        private final String[] args;
+
+        UDFContextKey(String className, String[] args) {
+            this.className = className;
+            // An empty array selects the same object as getUDFProperties(Class);
+            // null is treated the same way.  The array is copied so that a
+            // caller changing it later cannot corrupt the map.
+            this.args = (args == null) ? new String[0] : args.clone();
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (this == obj) {
+                return true;
+            }
+            if (!(obj instanceof UDFContextKey)) {
+                return false;
+            }
+            UDFContextKey other = (UDFContextKey) obj;
+            return className.equals(other.className)
+                && Arrays.equals(args, other.args);
+        }
+
+        @Override
+        public int hashCode() {
+            return 31 * className.hashCode() + Arrays.hashCode(args);
+        }
+    }
+}
Added: hadoop/pig/branches/branch-0.6/test/org/apache/pig/test/TestUDFContext.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/test/org/apache/pig/test/TestUDFContext.java?rev=882339&view=auto
==============================================================================
--- hadoop/pig/branches/branch-0.6/test/org/apache/pig/test/TestUDFContext.java (added)
+++ hadoop/pig/branches/branch-0.6/test/org/apache/pig/test/TestUDFContext.java Thu Nov 19 22:34:46 2009
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.util.Iterator;
+
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.io.FileLocalizer;
+import org.junit.Test;
+
+import junit.framework.TestCase;
+
+/** Checks that UDF properties set on the front end, and the JobConf, reach the back end. */
+public class TestUDFContext extends TestCase {
+    static MiniCluster cluster = null;
+
+    @Override
+    protected void setUp() throws Exception {
+        cluster = MiniCluster.buildCluster();
+    }
+
+    @Test
+    public void testUDFContext() throws Exception {
+        Util.createInputFile(cluster, "a.txt", new String[] { "dumb" });
+        Util.createInputFile(cluster, "b.txt", new String[] { "dumber" });
+        FileLocalizer.deleteTempFiles();
+        PigServer pig = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+        String[] statement = { "A = LOAD 'a.txt' USING org.apache.pig.test.utils.UDFContextTestLoader('joe');",
+            "B = LOAD 'b.txt' USING org.apache.pig.test.utils.UDFContextTestLoader('jane');",
+            "C = union A, B;",
+            "D = FOREACH C GENERATE $0, $1, org.apache.pig.test.utils.UDFContextTestEvalFunc($0), org.apache.pig.test.utils.UDFContextTestEvalFunc2($0);" };
+
+        File tmpFile = File.createTempFile("temp_pig_1085", ".pig");
+        tmpFile.deleteOnExit();
+        FileWriter writer = new FileWriter(tmpFile);
+        try {
+            for (String line : statement) {
+                writer.write(line + "\n");
+            }
+        } finally {
+            writer.close();
+        }
+
+        pig.registerScript(tmpFile.getAbsolutePath());
+        Iterator<Tuple> iterator = pig.openIterator("D");
+        int rows = 0;
+        while (iterator.hasNext()) {
+            Tuple tuple = iterator.next();
+            rows++;
+            if ("dumb".equals(tuple.get(0).toString())) {
+                assertEquals("joe", tuple.get(1).toString());
+            } else if ("dumber".equals(tuple.get(0).toString())) {
+                assertEquals("jane", tuple.get(1).toString());
+            } else {
+                fail("unexpected row " + tuple);
+            }
+            assertEquals(5, Integer.parseInt(tuple.get(2).toString()));
+            assertEquals("five", tuple.get(3).toString());
+        }
+        assertEquals("one row per input file", 2, rows);
+    }
+}
Added: hadoop/pig/branches/branch-0.6/test/org/apache/pig/test/utils/UDFContextTestEvalFunc.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/test/org/apache/pig/test/utils/UDFContextTestEvalFunc.java?rev=882339&view=auto
==============================================================================
--- hadoop/pig/branches/branch-0.6/test/org/apache/pig/test/utils/UDFContextTestEvalFunc.java (added)
+++ hadoop/pig/branches/branch-0.6/test/org/apache/pig/test/utils/UDFContextTestEvalFunc.java Thu Nov 19 22:34:46 2009
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test.utils;
+
+import java.io.IOException;
+import java.util.Properties;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.util.UDFContext;
+
+/** Returns, as an Integer, the "key1" value its constructor stored in UDFContext. */
+public class UDFContextTestEvalFunc extends EvalFunc<Integer> {
+    public UDFContextTestEvalFunc() {
+        UDFContext.getUDFContext().getUDFProperties(getClass()).setProperty("key1", "5");
+    }
+
+    @Override
+    public Integer exec(Tuple input) throws IOException {
+        Properties props = UDFContext.getUDFContext().getUDFProperties(getClass());
+        return Integer.valueOf(props.getProperty("key1"));
+    }
+
+}
Added: hadoop/pig/branches/branch-0.6/test/org/apache/pig/test/utils/UDFContextTestEvalFunc2.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/test/org/apache/pig/test/utils/UDFContextTestEvalFunc2.java?rev=882339&view=auto
==============================================================================
--- hadoop/pig/branches/branch-0.6/test/org/apache/pig/test/utils/UDFContextTestEvalFunc2.java (added)
+++ hadoop/pig/branches/branch-0.6/test/org/apache/pig/test/utils/UDFContextTestEvalFunc2.java Thu Nov 19 22:34:46 2009
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test.utils;
+
+import java.io.IOException;
+import java.util.Properties;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.util.UDFContext;
+
+public class UDFContextTestEvalFunc2 extends EvalFunc<String> {
+
+    public UDFContextTestEvalFunc2() {
+        Properties p = UDFContext.getUDFContext().getUDFProperties(this.getClass());
+        p.setProperty("key1", "five");
+    }
+
+    @Override
+    public String exec(Tuple input) throws IOException {
+        if (UDFContext.getUDFContext().getJobConf() == null) return "JobConf is null!";
+        else return UDFContext.getUDFContext().getUDFProperties(this.getClass()).getProperty("key1");
+    }
+
+}
Added: hadoop/pig/branches/branch-0.6/test/org/apache/pig/test/utils/UDFContextTestLoader.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/test/org/apache/pig/test/utils/UDFContextTestLoader.java?rev=882339&view=auto
==============================================================================
--- hadoop/pig/branches/branch-0.6/test/org/apache/pig/test/utils/UDFContextTestLoader.java (added)
+++ hadoop/pig/branches/branch-0.6/test/org/apache/pig/test/utils/UDFContextTestLoader.java Thu Nov 19 22:34:46 2009
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test.utils;
+
+import java.io.IOException;
+import java.util.Properties;
+
+import org.apache.pig.builtin.PigStorage;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.util.UDFContext;
+
+/**
+ * PigStorage subclass that appends to every tuple the "key1" property its
+ * constructor stored under its own argument, or "JobConf is null!" when
+ * UDFContext has no JobConf.
+ */
+public class UDFContextTestLoader extends PigStorage {
+
+    private final String[] vals = new String[1];
+
+    public UDFContextTestLoader(String v1) {
+        vals[0] = v1;
+        Properties p = UDFContext.getUDFContext().getUDFProperties(this.getClass(), vals);
+        p.setProperty("key1", vals[0]);
+    }
+
+    @Override
+    public Tuple getNext() throws IOException {
+        Tuple t = super.getNext();
+        if (t != null) {
+            if (UDFContext.getUDFContext().getJobConf() == null) {
+                t.append("JobConf is null!");
+            } else {
+                Properties p = UDFContext.getUDFContext().getUDFProperties(this.getClass(), vals);
+                t.append(p.getProperty("key1"));
+            }
+        }
+        return t;
+    }
+}