olga
Sat, 03 May 2008 14:50:04 -0700
Author: olga Date: Sat May 3 14:49:42 2008 New Revision: 653148 URL: http://svn.apache.org/viewvc?rev=653148&view=rev Log: PIG-229: proper error handling for invalid streaming deserializer Modified: incubator/pig/trunk/CHANGES.txt incubator/pig/trunk/src/org/apache/pig/impl/io/BufferedPositionedInputStream.java incubator/pig/trunk/src/org/apache/pig/impl/streaming/DefaultOutputHandler.java incubator/pig/trunk/src/org/apache/pig/impl/streaming/ExecutableManager.java Modified: incubator/pig/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/incubator/pig/trunk/CHANGES.txt?rev=653148&r1=653147&r2=653148&view=diff ============================================================================== --- incubator/pig/trunk/CHANGES.txt (original) +++ incubator/pig/trunk/CHANGES.txt Sat May 3 14:49:42 2008 @@ -273,3 +273,4 @@ PIG-215: Cleanup a few dangling ends left by PIG-111 (pi_song via gates). + PIG-229: Proper error handling in case of deserializer failure Modified: incubator/pig/trunk/src/org/apache/pig/impl/io/BufferedPositionedInputStream.java URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/io/BufferedPositionedInputStream.java?rev=653148&r1=653147&r2=653148&view=diff ============================================================================== --- incubator/pig/trunk/src/org/apache/pig/impl/io/BufferedPositionedInputStream.java (original) +++ incubator/pig/trunk/src/org/apache/pig/impl/io/BufferedPositionedInputStream.java Sat May 3 14:49:42 2008 @@ -160,4 +160,9 @@ sb.append(carray, 0, cbuff.position()); return sb.toString(); } + + public void close() throws IOException { + super.close(); + in.close(); + } } Modified: incubator/pig/trunk/src/org/apache/pig/impl/streaming/DefaultOutputHandler.java URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/streaming/DefaultOutputHandler.java?rev=653148&r1=653147&r2=653148&view=diff ============================================================================== --- 
incubator/pig/trunk/src/org/apache/pig/impl/streaming/DefaultOutputHandler.java (original) +++ incubator/pig/trunk/src/org/apache/pig/impl/streaming/DefaultOutputHandler.java Sat May 3 14:49:42 2008 @@ -17,9 +17,12 @@ */ package org.apache.pig.impl.streaming; +import java.io.IOException; + import org.apache.pig.LoadFunc; import org.apache.pig.builtin.PigStorage; import org.apache.pig.impl.PigContext; +import org.apache.pig.impl.io.BufferedPositionedInputStream; import org.apache.pig.impl.streaming.StreamingCommand.HandleSpec; /** @@ -28,6 +31,7 @@ * via its <code>stdout</code>. */ public class DefaultOutputHandler extends OutputHandler { + BufferedPositionedInputStream stdout; public DefaultOutputHandler() { deserializer = new PigStorage(); @@ -40,4 +44,16 @@ public OutputType getOutputType() { return OutputType.SYNCHRONOUS; } + + public void bindTo(String fileName, BufferedPositionedInputStream is, + long offset, long end) throws IOException { + stdout = is; + super.bindTo(fileName, stdout, offset, end); + } + + public void close() throws IOException { + super.close(); + stdout.close(); + stdout = null; + } } Modified: incubator/pig/trunk/src/org/apache/pig/impl/streaming/ExecutableManager.java URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/streaming/ExecutableManager.java?rev=653148&r1=653147&r2=653148&view=diff ============================================================================== --- incubator/pig/trunk/src/org/apache/pig/impl/streaming/ExecutableManager.java (original) +++ incubator/pig/trunk/src/org/apache/pig/impl/streaming/ExecutableManager.java Sat May 3 14:49:42 2008 @@ -87,6 +87,8 @@ protected long outputRecords = 0; protected long outputBytes = 0; + protected volatile Throwable outerrThreadsError; + /** * Create a new {@link ExecutableManager}. 
*/ @@ -202,6 +204,13 @@ stderrThread.interrupt(); } } + + // Check if there was a problem with the managed process + if (outerrThreadsError != null) { + throw new IOException("Output/Error thread failed with: " + + outerrThreadsError); + } + } /** @@ -304,6 +313,12 @@ * @throws IOException */ public void add(Datum d) throws IOException { + // Check if there was a problem with the managed process + if (outerrThreadsError != null) { + throw new IOException("Output/Error thread failed with: " + + outerrThreadsError); + } + // Pass the serialized tuple to the executable via the InputHandler Tuple t = (Tuple)d; inputHandler.putNext(t); @@ -344,6 +359,9 @@ outputHandler.close(); } catch (Throwable t) { + // Note that an error occurred + outerrThreadsError = t; + LOG.warn(t); try { outputHandler.close(); @@ -387,16 +405,19 @@ stderr.close(); LOG.debug("ProcessErrorThread done"); } - } catch (Throwable th) { - LOG.warn(th); + } catch (Throwable t) { + // Note that an error occurred + outerrThreadsError = t; + + LOG.warn(t); try { if (stderr != null) { stderr.close(); } } catch (IOException ioe) { LOG.info(ioe); - throw new RuntimeException(th); } + throw new RuntimeException(t); } } }