Author: daijy
Date: Fri Apr 3 18:51:10 2015
New Revision: 1671126
URL: http://svn.apache.org/r1671126
Log:
PIG-4491: Streaming Python Bytearray Bugs
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/impl/streaming/OutputHandler.java
pig/trunk/src/python/streaming/controller.py
pig/trunk/test/org/apache/pig/impl/builtin/TestStreamingUDF.java
pig/trunk/test/org/apache/pig/impl/streaming/TestStreamingUDFOutputHandler.java
Modified: pig/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1671126&r1=1671125&r2=1671126&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Fri Apr 3 18:51:10 2015
@@ -58,6 +58,8 @@ PIG-4333: Split BigData tests into multi
BUG FIXES
+PIG-4491: Streaming Python Bytearray Bugs (jeremykarn via daijy)
+
PIG-4487: Pig on Tez gives wrong success message on failure in case of
multiple outputs (rohini)
PIG-4483: Pig on Tez output statistics shows storing to same directory twice
for union (rohini)
Modified: pig/trunk/src/org/apache/pig/impl/streaming/OutputHandler.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/streaming/OutputHandler.java?rev=1671126&r1=1671125&r2=1671126&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/streaming/OutputHandler.java (original)
+++ pig/trunk/src/org/apache/pig/impl/streaming/OutputHandler.java Fri Apr 3 18:51:10 2015
@@ -153,9 +153,15 @@ public abstract class OutputHandler {
recordDelimLength = recordDelimBa.length - 1; //Ignore trailing \n
recordDelimStr = new String(recordDelimBa, 0, recordDelimLength,
Charsets.UTF_8);
}
- if (recordDelimLength == 0 || currValue.getLength() < recordDelimLength) {
+
+ if (recordDelimLength == 0) {
return true;
}
+ //If our current section is less than the delim length, then its not the end of the row.
+ if (currValue.getLength() < recordDelimLength) {
+ return false;
+ }
+
return currValue.find(recordDelimStr, currValue.getLength() -
recordDelimLength) >= 0;
}
Modified: pig/trunk/src/python/streaming/controller.py
URL:
http://svn.apache.org/viewvc/pig/trunk/src/python/streaming/controller.py?rev=1671126&r1=1671125&r2=1671126&view=diff
==============================================================================
--- pig/trunk/src/python/streaming/controller.py (original)
+++ pig/trunk/src/python/streaming/controller.py Fri Apr 3 18:51:10 2015
@@ -125,7 +125,12 @@ class PythonStreamingController:
try:
func_output = func(*inputs)
if should_log:
- log_message("Row %s: UDF Output: %s" % (self.input_count, unicode(func_output)))
+ try:
+ log_message("Row %s: UDF Output: %s" % (self.input_count, unicode(func_output)))
+ except:
+ #This is probably an error with unicoding the output. Calling unicode on bytearray will
+ #throw an exception. Since its just a log statement, just skip and carry on.
+ logging.exception("Couldn't log output. Try to continue.")
except:
#These errors should always be caused by user code.
write_user_exception(module_name, self.stream_error,
NUM_LINES_OFFSET_TRACE)
Modified: pig/trunk/test/org/apache/pig/impl/builtin/TestStreamingUDF.java
URL:
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/impl/builtin/TestStreamingUDF.java?rev=1671126&r1=1671125&r2=1671126&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/impl/builtin/TestStreamingUDF.java (original)
+++ pig/trunk/test/org/apache/pig/impl/builtin/TestStreamingUDF.java Fri Apr 3 18:51:10 2015
@@ -29,6 +29,7 @@ import java.util.List;
import org.apache.pig.PigServer;
import org.apache.pig.builtin.mock.Storage.Data;
import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.test.MiniGenericCluster;
@@ -46,6 +47,7 @@ import org.junit.runner.RunWith;
@RunWith(OrderedJUnit4Runner.class)
@TestOrder({
"testPythonUDF_onCluster",
+ "testPythonUDF_withBytearrayAndBytes_onCluster",
"testPythonUDF__allTypes",
"testPythonUDF__withBigDecimal",
"testPythonUDF",
@@ -111,6 +113,41 @@ public class TestStreamingUDF {
assertEquals(expected0, actual0);
assertEquals(expected1, actual1);
}
+
+ @Test
+ public void testPythonUDF_withBytearrayAndBytes_onCluster() throws Exception {
+ pigServerMapReduce = new PigServer(cluster.getExecType(), cluster.getProperties());
+
+
+ String[] pythonScript = {
+ "from pig_util import outputSchema",
+ "import os",
+ "@outputSchema('f:bytearray')",
+ "def foo(bar):",
+ " return bytearray(os.urandom(1000))"
+ };
+
+ Util.createLocalInputFile( "pyfilewBaB.py", pythonScript);
+
+ String[] input = {
+ "field1"
+ };
+ Util.createLocalInputFile("testTupleBaB", input);
+ Util.copyFromLocalToCluster(cluster, "testTupleBaB", "testTupleBaB");
+
+ pigServerMapReduce.registerQuery("REGISTER 'pyfilewBaB.py' USING streaming_python AS pf;");
+ pigServerMapReduce.registerQuery("A = LOAD 'testTupleBaB' as (b:chararray);");
+ pigServerMapReduce.registerQuery("B = FOREACH A generate pf.foo(b);");
+
+ Iterator<Tuple> iter = pigServerMapReduce.openIterator("B");
+ assertTrue(iter.hasNext());
+ Object result = iter.next().get(0);
+
+ //Mostly we're happy we got a result w/o throwing an exception, but we'll
+ //do a basic check.
+ assertTrue(result instanceof DataByteArray);
+ assertEquals(1000, ((DataByteArray)result).size());
+ }
@Test
public void testPythonUDF() throws Exception {
@@ -159,7 +196,6 @@ public class TestStreamingUDF {
};
Util.createLocalInputFile( "pyfileNL.py", pythonScript);
-
Data data = resetData(pigServerLocal);
Tuple t0 = tf.newTuple(2);
t0.set(0, "field10");
Modified:
pig/trunk/test/org/apache/pig/impl/streaming/TestStreamingUDFOutputHandler.java
URL:
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/impl/streaming/TestStreamingUDFOutputHandler.java?rev=1671126&r1=1671125&r2=1671126&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/impl/streaming/TestStreamingUDFOutputHandler.java (original)
+++ pig/trunk/test/org/apache/pig/impl/streaming/TestStreamingUDFOutputHandler.java Fri Apr 3 18:51:10 2015
@@ -65,6 +65,20 @@ public class TestStreamingUDFOutputHandl
Assert.assertEquals(tf.newTuple("abc\ndef\nghi\njkl"), t);
}
+ @Test
+ public void testGetValue__earlyNewLine() throws Exception{
+ FieldSchema fs = new FieldSchema("", DataType.CHARARRAY);
+ String data = "\na|_\n";
+
+ PigStreamingUDF deserializer = new PigStreamingUDF(fs);
+ OutputHandler outty = new StreamingUDFOutputHandler(deserializer);
+ outty.bindTo(null, getIn(data), 0, 0);
+
+ Tuple t = outty.getNext();
+
+ Assert.assertEquals(tf.newTuple("\na"), t);
+ }
+
private BufferedPositionedInputStream getIn(String input) throws
UnsupportedEncodingException {
InputStream stream = new ByteArrayInputStream(input.getBytes("UTF-8"));
BufferedPositionedInputStream result = new
BufferedPositionedInputStream(stream);