Author: daijy
Date: Fri Apr  3 18:51:10 2015
New Revision: 1671126

URL: http://svn.apache.org/r1671126
Log:
PIG-4491: Streaming Python Bytearray Bugs

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/impl/streaming/OutputHandler.java
    pig/trunk/src/python/streaming/controller.py
    pig/trunk/test/org/apache/pig/impl/builtin/TestStreamingUDF.java
    pig/trunk/test/org/apache/pig/impl/streaming/TestStreamingUDFOutputHandler.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1671126&r1=1671125&r2=1671126&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Fri Apr  3 18:51:10 2015
@@ -58,6 +58,8 @@ PIG-4333: Split BigData tests into multi
  
 BUG FIXES
 
+PIG-4491: Streaming Python Bytearray Bugs (jeremykarn via daijy)
+
 PIG-4487: Pig on Tez gives wrong success message on failure in case of multiple outputs (rohini)
 
 PIG-4483: Pig on Tez output statistics shows storing to same directory twice for union (rohini)

Modified: pig/trunk/src/org/apache/pig/impl/streaming/OutputHandler.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/streaming/OutputHandler.java?rev=1671126&r1=1671125&r2=1671126&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/streaming/OutputHandler.java (original)
+++ pig/trunk/src/org/apache/pig/impl/streaming/OutputHandler.java Fri Apr  3 
18:51:10 2015
@@ -153,9 +153,15 @@ public abstract class OutputHandler {
             recordDelimLength = recordDelimBa.length - 1; //Ignore trailing \n
             recordDelimStr = new String(recordDelimBa, 0, recordDelimLength,  
Charsets.UTF_8);
         }
-        if (recordDelimLength == 0 || currValue.getLength() < 
recordDelimLength) {
+
+        if (recordDelimLength == 0) {
             return true;
         }
+        //If our current section is less than the delim length, then its not 
the end of the row.
+        if (currValue.getLength() < recordDelimLength) {
+            return false;
+        }
+
         return currValue.find(recordDelimStr, currValue.getLength() - 
recordDelimLength) >= 0;
     }
     

Modified: pig/trunk/src/python/streaming/controller.py
URL: http://svn.apache.org/viewvc/pig/trunk/src/python/streaming/controller.py?rev=1671126&r1=1671125&r2=1671126&view=diff
==============================================================================
--- pig/trunk/src/python/streaming/controller.py (original)
+++ pig/trunk/src/python/streaming/controller.py Fri Apr  3 18:51:10 2015
@@ -125,7 +125,12 @@ class PythonStreamingController:
                 try:
                     func_output = func(*inputs)
                     if should_log:
-                        log_message("Row %s: UDF Output: %s" % 
(self.input_count, unicode(func_output)))
+                        try:
+                            log_message("Row %s: UDF Output: %s" % 
(self.input_count, unicode(func_output)))
+                        except:
+                            #This is probably an error with unicoding the 
output.  Calling unicode on bytearray will
+                            #throw an exception.  Since its just a log 
statement, just skip and carry on.
+                            logging.exception("Couldn't log output.  Try to 
continue.")
                 except:
                     #These errors should always be caused by user code.
                     write_user_exception(module_name, self.stream_error, 
NUM_LINES_OFFSET_TRACE)

Modified: pig/trunk/test/org/apache/pig/impl/builtin/TestStreamingUDF.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/impl/builtin/TestStreamingUDF.java?rev=1671126&r1=1671125&r2=1671126&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/impl/builtin/TestStreamingUDF.java (original)
+++ pig/trunk/test/org/apache/pig/impl/builtin/TestStreamingUDF.java Fri Apr  3 
18:51:10 2015
@@ -29,6 +29,7 @@ import java.util.List;
 import org.apache.pig.PigServer;
 import org.apache.pig.builtin.mock.Storage.Data;
 import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
 import org.apache.pig.test.MiniGenericCluster;
@@ -46,6 +47,7 @@ import org.junit.runner.RunWith;
 @RunWith(OrderedJUnit4Runner.class)
 @TestOrder({
     "testPythonUDF_onCluster",
+    "testPythonUDF_withBytearrayAndBytes_onCluster",
     "testPythonUDF__allTypes",
     "testPythonUDF__withBigDecimal",
     "testPythonUDF",
@@ -111,6 +113,41 @@ public class TestStreamingUDF {
         assertEquals(expected0, actual0);
         assertEquals(expected1, actual1);
     }
+    
+    @Test
+    public void testPythonUDF_withBytearrayAndBytes_onCluster() throws 
Exception {
+        pigServerMapReduce = new PigServer(cluster.getExecType(), 
cluster.getProperties());
+
+        
+        String[] pythonScript = {
+            "from pig_util import outputSchema",
+            "import os",
+            "@outputSchema('f:bytearray')",
+            "def foo(bar):",
+            "    return bytearray(os.urandom(1000))"
+        };
+        
+        Util.createLocalInputFile( "pyfilewBaB.py", pythonScript);
+
+        String[] input = {
+            "field1"
+        };
+        Util.createLocalInputFile("testTupleBaB", input);
+        Util.copyFromLocalToCluster(cluster, "testTupleBaB", "testTupleBaB");
+
+        pigServerMapReduce.registerQuery("REGISTER 'pyfilewBaB.py' USING 
streaming_python AS pf;");
+        pigServerMapReduce.registerQuery("A = LOAD 'testTupleBaB' as 
(b:chararray);");
+        pigServerMapReduce.registerQuery("B = FOREACH A generate pf.foo(b);");
+
+        Iterator<Tuple> iter = pigServerMapReduce.openIterator("B");
+        assertTrue(iter.hasNext());
+        Object result = iter.next().get(0);
+
+        //Mostly we're happy we got a result w/o throwing an exception, but 
we'll
+        //do a basic check.
+        assertTrue(result instanceof DataByteArray);
+        assertEquals(1000, ((DataByteArray)result).size());
+    }
 
     @Test
     public void testPythonUDF() throws Exception {
@@ -159,7 +196,6 @@ public class TestStreamingUDF {
         };
         Util.createLocalInputFile( "pyfileNL.py", pythonScript);
 
-        
         Data data = resetData(pigServerLocal);
         Tuple t0 = tf.newTuple(2);
         t0.set(0, "field10");

Modified: pig/trunk/test/org/apache/pig/impl/streaming/TestStreamingUDFOutputHandler.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/impl/streaming/TestStreamingUDFOutputHandler.java?rev=1671126&r1=1671125&r2=1671126&view=diff
==============================================================================
--- 
pig/trunk/test/org/apache/pig/impl/streaming/TestStreamingUDFOutputHandler.java 
(original)
+++ 
pig/trunk/test/org/apache/pig/impl/streaming/TestStreamingUDFOutputHandler.java 
Fri Apr  3 18:51:10 2015
@@ -65,6 +65,20 @@ public class TestStreamingUDFOutputHandl
         Assert.assertEquals(tf.newTuple("abc\ndef\nghi\njkl"), t);
     }
     
+    @Test
+    public void testGetValue__earlyNewLine() throws Exception{
+        FieldSchema fs = new FieldSchema("", DataType.CHARARRAY);
+        String data = "\na|_\n";
+        
+        PigStreamingUDF deserializer = new PigStreamingUDF(fs);
+        OutputHandler outty = new StreamingUDFOutputHandler(deserializer);
+        outty.bindTo(null, getIn(data), 0, 0);
+        
+        Tuple t = outty.getNext();
+        
+        Assert.assertEquals(tf.newTuple("\na"), t);
+    }
+    
     private BufferedPositionedInputStream getIn(String input) throws 
UnsupportedEncodingException {
         InputStream stream = new ByteArrayInputStream(input.getBytes("UTF-8"));
         BufferedPositionedInputStream result = new 
BufferedPositionedInputStream(stream);


Reply via email to