Author: rohini
Date: Mon Apr  6 23:56:14 2015
New Revision: 1671714

URL: http://svn.apache.org/r1671714
Log:
PIG-4498: AvroStorage in Piggybank does not handle bad records and fails (viraj via rohini)

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroRecordReader.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1671714&r1=1671713&r2=1671714&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Mon Apr  6 23:56:14 2015
@@ -58,6 +58,8 @@ PIG-4333: Split BigData tests into multi
  
 BUG FIXES
 
+PIG-4498: AvroStorage in Piggybank does not handle bad records and fails (viraj via rohini)
+
 PIG-4499: mvn-build miss tez classes in pig-h2.jar (daijy)
 
 PIG-4488: Pig on tez mask tez.queue.name (daijy)

Modified: pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroRecordReader.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroRecordReader.java?rev=1671714&r1=1671713&r2=1671714&view=diff
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroRecordReader.java (original)
+++ pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroRecordReader.java Mon Apr  6 23:56:14 2015
@@ -194,30 +194,43 @@ public class PigAvroRecordReader extends
 
     @Override
     public Writable getCurrentValue() throws IOException, InterruptedException {
-        Object obj = reader.next();
-        Tuple result = null;
-        if (obj instanceof Tuple) {
-            AvroStorageLog.details("Class =" + obj.getClass());
-            result = (Tuple) obj;
-        } else {
-            if (obj != null) {
-                AvroStorageLog.details("Wrap class " + obj.getClass() + " as a tuple.");
+        try {
+            Object obj = reader.next();
+            Tuple result = null;
+            if (obj instanceof Tuple) {
+                AvroStorageLog.details("Class =" + obj.getClass());
+                result = (Tuple) obj;
+            } else {
+                if (obj != null) {
+                    AvroStorageLog.details("Wrap class " + obj.getClass() + " as a tuple.");
+                }
+                else {
+                    AvroStorageLog.details("Wrap null as a tuple.");
+                }
+                result = wrapAsTuple(obj);
             }
-            else {
-                AvroStorageLog.details("Wrap null as a tuple.");
+            if (schemaToMergedSchemaMap != null) {
+                // remap the position of fields to the merged schema
+                Map<Integer, Integer> map = schemaToMergedSchemaMap.get(path);
+                if (map == null) {
+                    throw new IOException("The schema of '" + path + "' " +
+                            "is not merged by AvroStorage.");
+                }
+                result = remap(result, map);
             }
-            result = wrapAsTuple(obj);
+            return result;
         }
-        if (schemaToMergedSchemaMap != null) {
-            // remap the position of fields to the merged schema
-            Map<Integer, Integer> map = schemaToMergedSchemaMap.get(path);
-            if (map == null) {
-                throw new IOException("The schema of '" + path + "' " +
-                                      "is not merged by AvroStorage.");
+        catch(Exception e) {
+            if (ignoreBadFiles) {
+                LOG.warn("Ignoring bad record for '" + path + "'.");
+                return null;
+            }
+            else {
+                //re-throw exception
+                LOG.error("Bad record for '" + path + "'.");
+                throw new IOException(e);
             }
-            result = remap(result, map);
         }
-        return result;
     }
 
     /**


Reply via email to