Author: rohini
Date: Sat Nov 14 23:31:23 2015
New Revision: 1714398

URL: http://svn.apache.org/viewvc?rev=1714398&view=rev
Log:
PIG-4736: Removing empty keys in UDFContext broke one LoadFunc (rohini)

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/impl/util/UDFContext.java
    pig/trunk/src/org/apache/pig/impl/util/UDFContextSeparator.java
    pig/trunk/test/org/apache/pig/test/TestUDFContext.java

Modified: pig/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1714398&r1=1714397&r2=1714398&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Sat Nov 14 23:31:23 2015
@@ -69,6 +69,8 @@ PIG-4639: Add better parser for Apache H
 
 BUG FIXES
 
+PIG-4736: Removing empty keys in UDFContext broke one LoadFunc (rohini)
+
 PIG-4733: Avoid NullPointerException in JVMReuseImpl for builtin classes 
(rohini)
 
 PIG-4722: [Pig on Tez] NPE while running Combiner (rohini)

Modified: pig/trunk/src/org/apache/pig/impl/util/UDFContext.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/util/UDFContext.java?rev=1714398&r1=1714397&r2=1714398&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/util/UDFContext.java (original)
+++ pig/trunk/src/org/apache/pig/impl/util/UDFContext.java Sat Nov 14 23:31:23 
2015
@@ -21,8 +21,6 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.util.Arrays;
 import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map.Entry;
 import java.util.Properties;
 
 import org.apache.hadoop.conf.Configuration;
@@ -202,14 +200,6 @@ public class UDFContext {
      * @throws IOException if underlying serialization throws it
      */
     public void serialize(Configuration conf) throws IOException {
-        // Minor optimziation. Remove empty properties before serialization.
-        Iterator<Entry<UDFContextKey, Properties>> iter = 
udfConfs.entrySet().iterator();
-        while (iter.hasNext()) {
-            Entry<UDFContextKey, Properties> entry = iter.next();
-            if (entry.getValue().isEmpty()) {
-                iter.remove();
-            }
-        }
         conf.set(UDF_CONTEXT, ObjectSerializer.serialize(udfConfs));
         conf.set(CLIENT_SYS_PROPS, ObjectSerializer.serialize(clientSysProps));
     }

Modified: pig/trunk/src/org/apache/pig/impl/util/UDFContextSeparator.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/util/UDFContextSeparator.java?rev=1714398&r1=1714397&r2=1714398&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/util/UDFContextSeparator.java (original)
+++ pig/trunk/src/org/apache/pig/impl/util/UDFContextSeparator.java Sat Nov 14 
23:31:23 2015
@@ -22,6 +22,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -99,6 +100,7 @@ public class UDFContextSeparator extends
                     // as user might be just accessing properties by base 
class name
                     // instead of by Initial, Intermediate and Final classes
                     algebraicUDFKeys.add(key);
+                    knownKeys.add(key);
                 }
             }
         } else {
@@ -199,6 +201,16 @@ public class UDFContextSeparator extends
             HashMap<UDFContextKey, Properties> udfConfsToSerialize)
             throws IOException {
         HashMap<UDFContextKey, Properties> udfConfs = udfContext.getUdfConfs();
+
+        // Drop entries whose Properties are empty so they are not serialized unnecessarily
+        Iterator<Entry<UDFContextKey, Properties>> iter = 
udfConfsToSerialize.entrySet().iterator();
+        while (iter.hasNext()) {
+            Entry<UDFContextKey, Properties> entry = iter.next();
+            if (entry.getValue().isEmpty()) {
+                iter.remove();
+            }
+        }
+
         // Add unknown ones for serialization
         for (UDFContextKey key : getUnKnownKeys()) {
             udfConfsToSerialize.put(key, udfConfs.get(key));

Modified: pig/trunk/test/org/apache/pig/test/TestUDFContext.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestUDFContext.java?rev=1714398&r1=1714397&r2=1714398&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestUDFContext.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestUDFContext.java Sat Nov 14 23:31:23 
2015
@@ -17,28 +17,42 @@
  */
 package org.apache.pig.test;
 
+import static org.apache.pig.builtin.mock.Storage.resetData;
+import static org.apache.pig.builtin.mock.Storage.tuple;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 import java.io.File;
 import java.io.FileWriter;
+import java.io.IOException;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Properties;
 
+import org.apache.hadoop.mapreduce.Job;
 import org.apache.pig.PigServer;
+import org.apache.pig.ResourceSchema;
 import org.apache.pig.builtin.PigStorage;
+import org.apache.pig.builtin.mock.Storage;
+import org.apache.pig.data.DataByteArray;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.io.FileLocalizer;
+import org.apache.pig.impl.util.ObjectSerializer;
 import org.apache.pig.impl.util.UDFContext;
+import org.junit.AfterClass;
 import org.junit.Test;
 
 public class TestUDFContext {
 
+    @AfterClass
+    public static void oneTimeTearDown() throws Exception {
+        FileLocalizer.deleteTempFiles();
+    }
+
     @Test
     public void testUDFContext() throws Exception {
-        File a = Util.createLocalInputFile("a.txt", new String[] { "dumb" });
-        File b = Util.createLocalInputFile("b.txt", new String[] { "dumber" });
-        FileLocalizer.deleteTempFiles();
+        File a = Util.createInputFile("inp1", "txt", new String[] { "dumb" });
+        File b = Util.createInputFile("inp2", "txt", new String[] { "dumber" 
});
         PigServer pig = new PigServer(Util.getLocalTestMode(), new 
Properties());
         String[] statement = { "A = LOAD '" + 
Util.encodeEscape(a.getAbsolutePath()) +
                 "' USING 
org.apache.pig.test.utils.UDFContextTestLoader('joe');",
@@ -101,4 +115,77 @@ public class TestUDFContext {
 
     }
 
+    @Test
+    public void testUDFContextSeparator() throws Exception {
+
+        File inputFile = Util.createInputFile("input", "txt", new String[] { 
"f1\tf2\tf3\tf4\tf5" });
+
+        PigServer pigServer = new PigServer(Util.getLocalTestMode(), new 
Properties());
+        Storage.Data data = resetData(pigServer);
+
+        String inputPath = Util.encodeEscape(inputFile.getAbsolutePath());
+        String query = "A = LOAD '" + inputPath + "' USING PigStorage();"
+                + "B = LOAD '" + inputPath + "' USING PigStorage();"
+                + "B = FOREACH B GENERATE $0, $1;"
+                + "C = LOAD '" + inputPath + "' USING " + 
FieldsByIndexLoader.class.getName() + "('1,2');"
+                // Scalar to force PigStorage to be always visited first in 
LoaderProcessor
+                + "C = FOREACH C GENERATE *, B.$0;"
+                + "STORE A INTO 'A' USING mock.Storage();"
+                + "STORE B INTO 'B' USING mock.Storage();"
+                + "STORE C INTO 'C' USING mock.Storage();";
+
+        pigServer.registerQuery(query);
+
+        List<Tuple> a = data.get("A");
+        List<Tuple> b = data.get("B");
+        List<Tuple> c = data.get("C");
+        assertEquals(1, a.size());
+        assertEquals(1, b.size());
+        assertEquals(1, c.size());
+        DataByteArray f1 = new DataByteArray("f1");
+        DataByteArray f2 = new DataByteArray("f2");
+        DataByteArray f3 = new DataByteArray("f3");
+        DataByteArray f4 = new DataByteArray("f4");
+        DataByteArray f5 = new DataByteArray("f5");
+        assertEquals(tuple(f1, f2, f3, f4, f5), a.get(0));
+        assertEquals(tuple(f1, f2), b.get(0));
+        assertEquals(tuple(f2, f3, f1), c.get(0));
+    }
+
+
+    public static class FieldsByIndexLoader extends PigStorage {
+
+        // Tests the case of one user LoadFunc where UDF properties was a class variable
+        // and getSchema() was used to determine frontend instead of
+        // UDFContext.getUDFContext().isFrontEnd();
+        private boolean frontend = false;
+        private Properties props = 
UDFContext.getUDFContext().getUDFProperties(this.getClass());
+        private boolean[] selectedFields = new boolean[5]; //Assuming data 
always has 5 columns
+
+        public FieldsByIndexLoader(String fieldIndices) {
+            String[] requiredFields = fieldIndices.split(",");
+            for (String index : requiredFields) {
+                selectedFields[Integer.parseInt(index)] = true;
+            }
+        }
+
+        @Override
+        public void setLocation(String location, Job job) throws IOException {
+            if (frontend) {
+                // PigStorage should deserialize this as
+                // mRequiredColumns = 
(boolean[])ObjectSerializer.deserialize(p.getProperty(signature));
+                props.setProperty(signature, 
ObjectSerializer.serialize(selectedFields));
+            }
+            super.setLocation(location, job);
+        }
+
+        @Override
+        public ResourceSchema getSchema(String location, Job job)
+                throws IOException {
+            frontend = true;
+            return super.getSchema(location, job);
+        }
+
+    }
+
 }


Reply via email to