Author: rohini
Date: Sat Nov 14 23:31:23 2015
New Revision: 1714398
URL: http://svn.apache.org/viewvc?rev=1714398&view=rev
Log:
PIG-4736: Removing empty keys in UDFContext broke one LoadFunc (rohini)
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/impl/util/UDFContext.java
pig/trunk/src/org/apache/pig/impl/util/UDFContextSeparator.java
pig/trunk/test/org/apache/pig/test/TestUDFContext.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1714398&r1=1714397&r2=1714398&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Sat Nov 14 23:31:23 2015
@@ -69,6 +69,8 @@ PIG-4639: Add better parser for Apache H
BUG FIXES
+PIG-4736: Removing empty keys in UDFContext broke one LoadFunc (rohini)
+
PIG-4733: Avoid NullPointerException in JVMReuseImpl for builtin classes
(rohini)
PIG-4722: [Pig on Tez] NPE while running Combiner (rohini)
Modified: pig/trunk/src/org/apache/pig/impl/util/UDFContext.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/util/UDFContext.java?rev=1714398&r1=1714397&r2=1714398&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/util/UDFContext.java (original)
+++ pig/trunk/src/org/apache/pig/impl/util/UDFContext.java Sat Nov 14 23:31:23 2015
@@ -21,8 +21,6 @@ import java.io.IOException;
import java.io.Serializable;
import java.util.Arrays;
import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map.Entry;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
@@ -202,14 +200,6 @@ public class UDFContext {
* @throws IOException if underlying serialization throws it
*/
public void serialize(Configuration conf) throws IOException {
- // Minor optimziation. Remove empty properties before serialization.
- Iterator<Entry<UDFContextKey, Properties>> iter = udfConfs.entrySet().iterator();
- while (iter.hasNext()) {
- Entry<UDFContextKey, Properties> entry = iter.next();
- if (entry.getValue().isEmpty()) {
- iter.remove();
- }
- }
conf.set(UDF_CONTEXT, ObjectSerializer.serialize(udfConfs));
conf.set(CLIENT_SYS_PROPS, ObjectSerializer.serialize(clientSysProps));
}
Modified: pig/trunk/src/org/apache/pig/impl/util/UDFContextSeparator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/util/UDFContextSeparator.java?rev=1714398&r1=1714397&r2=1714398&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/util/UDFContextSeparator.java (original)
+++ pig/trunk/src/org/apache/pig/impl/util/UDFContextSeparator.java Sat Nov 14 23:31:23 2015
@@ -22,6 +22,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -99,6 +100,7 @@ public class UDFContextSeparator extends
// as user might be just accessing properties by base class name
// instead of by Initial, Intermediate and Final classes
algebraicUDFKeys.add(key);
+ knownKeys.add(key);
}
}
} else {
@@ -199,6 +201,16 @@ public class UDFContextSeparator extends
HashMap<UDFContextKey, Properties> udfConfsToSerialize)
throws IOException {
HashMap<UDFContextKey, Properties> udfConfs = udfContext.getUdfConfs();
+
+ // Save empty values from being serialized unnecessarily
+ Iterator<Entry<UDFContextKey, Properties>> iter = udfConfsToSerialize.entrySet().iterator();
+ while (iter.hasNext()) {
+ Entry<UDFContextKey, Properties> entry = iter.next();
+ if (entry.getValue().isEmpty()) {
+ iter.remove();
+ }
+ }
+
// Add unknown ones for serialization
for (UDFContextKey key : getUnKnownKeys()) {
udfConfsToSerialize.put(key, udfConfs.get(key));
Modified: pig/trunk/test/org/apache/pig/test/TestUDFContext.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestUDFContext.java?rev=1714398&r1=1714397&r2=1714398&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestUDFContext.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestUDFContext.java Sat Nov 14 23:31:23 2015
@@ -17,28 +17,42 @@
*/
package org.apache.pig.test;
+import static org.apache.pig.builtin.mock.Storage.resetData;
+import static org.apache.pig.builtin.mock.Storage.tuple;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.FileWriter;
+import java.io.IOException;
import java.util.Iterator;
+import java.util.List;
import java.util.Properties;
+import org.apache.hadoop.mapreduce.Job;
import org.apache.pig.PigServer;
+import org.apache.pig.ResourceSchema;
import org.apache.pig.builtin.PigStorage;
+import org.apache.pig.builtin.mock.Storage;
+import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.io.FileLocalizer;
+import org.apache.pig.impl.util.ObjectSerializer;
import org.apache.pig.impl.util.UDFContext;
+import org.junit.AfterClass;
import org.junit.Test;
public class TestUDFContext {
+ @AfterClass
+ public static void oneTimeTearDown() throws Exception {
+ FileLocalizer.deleteTempFiles();
+ }
+
@Test
public void testUDFContext() throws Exception {
- File a = Util.createLocalInputFile("a.txt", new String[] { "dumb" });
- File b = Util.createLocalInputFile("b.txt", new String[] { "dumber" });
- FileLocalizer.deleteTempFiles();
+ File a = Util.createInputFile("inp1", "txt", new String[] { "dumb" });
+ File b = Util.createInputFile("inp2", "txt", new String[] { "dumber" });
PigServer pig = new PigServer(Util.getLocalTestMode(), new Properties());
String[] statement = { "A = LOAD '" + Util.encodeEscape(a.getAbsolutePath()) +
"' USING org.apache.pig.test.utils.UDFContextTestLoader('joe');",
@@ -101,4 +115,77 @@ public class TestUDFContext {
}
+ @Test
+ public void testUDFContextSeparator() throws Exception {
+
+ File inputFile = Util.createInputFile("input", "txt", new String[] { "f1\tf2\tf3\tf4\tf5" });
+
+ PigServer pigServer = new PigServer(Util.getLocalTestMode(), new Properties());
+ Storage.Data data = resetData(pigServer);
+
+ String inputPath = Util.encodeEscape(inputFile.getAbsolutePath());
+ String query = "A = LOAD '" + inputPath + "' USING PigStorage();"
+ + "B = LOAD '" + inputPath + "' USING PigStorage();"
+ + "B = FOREACH B GENERATE $0, $1;"
+ + "C = LOAD '" + inputPath + "' USING " + FieldsByIndexLoader.class.getName() + "('1,2');"
+ // Scalar to force PigStorage to be always visited first in LoaderProcessor
+ + "C = FOREACH C GENERATE *, B.$0;"
+ + "STORE A INTO 'A' USING mock.Storage();"
+ + "STORE B INTO 'B' USING mock.Storage();"
+ + "STORE C INTO 'C' USING mock.Storage();";
+
+ pigServer.registerQuery(query);
+
+ List<Tuple> a = data.get("A");
+ List<Tuple> b = data.get("B");
+ List<Tuple> c = data.get("C");
+ assertEquals(1, a.size());
+ assertEquals(1, b.size());
+ assertEquals(1, c.size());
+ DataByteArray f1 = new DataByteArray("f1");
+ DataByteArray f2 = new DataByteArray("f2");
+ DataByteArray f3 = new DataByteArray("f3");
+ DataByteArray f4 = new DataByteArray("f4");
+ DataByteArray f5 = new DataByteArray("f5");
+ assertEquals(tuple(f1, f2, f3, f4, f5), a.get(0));
+ assertEquals(tuple(f1, f2), b.get(0));
+ assertEquals(tuple(f2, f3, f1), c.get(0));
+ }
+
+
+ public static class FieldsByIndexLoader extends PigStorage {
+
+ // Tests the case of one user LoadFunc where UDF properties was a class variable
+ // and getSchema() was used to determined frontend instead of
+ // UDFContext.getUDFContext().isFrontEnd();
+ private boolean frontend = false;
+ private Properties props = UDFContext.getUDFContext().getUDFProperties(this.getClass());
+ private boolean[] selectedFields = new boolean[5]; //Assuming data always has 5 columns
+
+ public FieldsByIndexLoader(String fieldIndices) {
+ String[] requiredFields = fieldIndices.split(",");
+ for (String index : requiredFields) {
+ selectedFields[Integer.parseInt(index)] = true;
+ }
+ }
+
+ @Override
+ public void setLocation(String location, Job job) throws IOException {
+ if (frontend) {
+ // PigStorage should deserialize this as
+ // mRequiredColumns = (boolean[])ObjectSerializer.deserialize(p.getProperty(signature));
+ props.setProperty(signature, ObjectSerializer.serialize(selectedFields));
+ }
+ super.setLocation(location, job);
+ }
+
+ @Override
+ public ResourceSchema getSchema(String location, Job job)
+ throws IOException {
+ frontend = true;
+ return super.getSchema(location, job);
+ }
+
+ }
+
}