Author: rohini
Date: Thu Jan 28 15:01:17 2016
New Revision: 1727372

URL: http://svn.apache.org/viewvc?rev=1727372&view=rev
Log:
PIG-4787: Log JSONLoader exception while parsing records (rohini)

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/builtin/JsonLoader.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1727372&r1=1727371&r2=1727372&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Thu Jan 28 15:01:17 2016
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
 
 IMPROVEMENTS
 
+PIG-4787: Log JSONLoader exception while parsing records (rohini)
+
 PIG-4763: Insufficient check for the number of arguments in runpigmix.pl (sekikn via rohini)
 
 PIG-4411: Support for vertex level configuration like speculative execution (rohini)

Modified: pig/trunk/src/org/apache/pig/builtin/JsonLoader.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/JsonLoader.java?rev=1727372&r1=1727371&r2=1727372&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/JsonLoader.java (original)
+++ pig/trunk/src/org/apache/pig/builtin/JsonLoader.java Thu Jan 28 15:01:17 2016
@@ -25,14 +25,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
-import java.math.BigDecimal;
 
-import org.joda.time.format.ISODateTimeFormat;
-import org.joda.time.format.DateTimeFormatter;
-import org.codehaus.jackson.JsonFactory;
-import org.codehaus.jackson.JsonParseException;
-import org.codehaus.jackson.JsonParser;
-import org.codehaus.jackson.JsonToken;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.Job;
@@ -56,37 +49,44 @@ import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.util.UDFContext;
 import org.apache.pig.impl.util.Utils;
+import org.codehaus.jackson.JsonFactory;
+import org.codehaus.jackson.JsonParser;
+import org.codehaus.jackson.JsonToken;
+import org.joda.time.format.DateTimeFormatter;
+import org.joda.time.format.ISODateTimeFormat;
 
 /**
  * A loader for data stored using {@link JsonStorage}.  This is not a generic
  * JSON loader. It depends on the schema being stored with the data when
  * conceivably you could write a loader that determines the schema from the
- * JSON. 
+ * JSON.
  */
 public class JsonLoader extends LoadFunc implements LoadMetadata {
 
     protected RecordReader reader = null;
     protected ResourceSchema schema = null;
-    
+
     private String udfcSignature = null;
     private JsonFactory jsonFactory = null;
     private TupleFactory tupleFactory = TupleFactory.getInstance();
     private BagFactory bagFactory = BagFactory.getInstance();
-    
+
     private static final String SCHEMA_SIGNATURE = "pig.jsonloader.schema";
-    
+
     public JsonLoader() {
     }
-    
+
     public JsonLoader(String schemaString) throws IOException {
         schema = new ResourceSchema(Utils.parseSchema(schemaString));
     }
 
+    @Override
     public void setLocation(String location, Job job) throws IOException {
         // Tell our input format where we will be reading from
         FileInputFormat.setInputPaths(job, location);
     }
-    
+
+    @Override
     @SuppressWarnings("unchecked")
     public InputFormat getInputFormat() throws IOException {
         // We will use TextInputFormat, the default Hadoop input format for
@@ -95,17 +95,19 @@ public class JsonLoader extends LoadFunc
         return new TextInputFormat();
     }
 
+    @Override
     public LoadCaster getLoadCaster() throws IOException {
         // We do not expect to do casting of byte arrays, because we will be
         // returning typed data.
         return null;
     }
 
+    @Override
     @SuppressWarnings("unchecked")
     public void prepareToRead(RecordReader reader, PigSplit split)
     throws IOException {
         this.reader = reader;
-        
+
         // Get the schema string from the UDFContext object.
         UDFContext udfc = UDFContext.getUDFContext();
         Properties p =
@@ -121,6 +123,7 @@ public class JsonLoader extends LoadFunc
         jsonFactory = new JsonFactory();
     }
 
+    @Override
     public Tuple getNext() throws IOException {
         Text val = null;
         try {
@@ -150,31 +153,34 @@ public class JsonLoader extends LoadFunc
         // isn't what we expect we return a tuple with null fields rather than
         // throwing an exception.  That way a few mangled lines don't fail the
         // job.
-        
+
         try {
             if (p.nextToken() != JsonToken.START_OBJECT) {
                 warn("Bad record, could not find start of record " +
                     val.toString(), PigWarning.UDF_WARNING_1);
                 return t;
             }
-    
+
             // Read each field in the record
             for (int i = 0; i < fields.length; i++) {
                 t.set(i, readField(p, fields[i], i));
             }
-    
+
             if (p.nextToken() != JsonToken.END_OBJECT) {
                 warn("Bad record, could not find end of record " +
                     val.toString(), PigWarning.UDF_WARNING_1);
                 return t;
             }
-            
+
         } catch (Exception jpe) {
-            warn("Bad record, returning null for " + val, PigWarning.UDF_WARNING_1);
+            Throwable ex = jpe.getCause() == null ? jpe : jpe.getCause();
+            warn("Encountered exception " + ex.getClass().getName() + ": "
+                    + ex.getMessage() + ". Bad record, returning null for "
+                    + val, PigWarning.UDF_WARNING_1);
         } finally {
             p.close();
         }
-        
+
         return t;
     }
 
@@ -186,44 +192,44 @@ public class JsonLoader extends LoadFunc
             // Read based on our expected type
             case DataType.BOOLEAN:
                 return p.getBooleanValue();
-    
+
             case DataType.INTEGER:
                 return p.getIntValue();
-    
+
             case DataType.LONG:
                 return p.getLongValue();
-    
+
             case DataType.FLOAT:
                 return p.getFloatValue();
-    
+
             case DataType.DOUBLE:
                 return p.getDoubleValue();
-    
+
             case DataType.DATETIME:
                 DateTimeFormatter formatter = ISODateTimeFormat.dateTimeParser();
                 return formatter.withOffsetParsed().parseDateTime(p.getText());
-    
+
             case DataType.BYTEARRAY:
                 byte[] b = p.getText().getBytes();
                 // Use the DBA constructor that copies the bytes so that we own
                 // the memory
                 return new DataByteArray(b, 0, b.length);
-    
+
             case DataType.CHARARRAY:
                 return p.getText();
-    
+
             case DataType.BIGINTEGER:
                 return p.getBigIntegerValue();
-    
+
             case DataType.BIGDECIMAL:
                 return new BigDecimal(p.getText());
-    
+
             default:
                 throw new IOException("Unknown type in input schema: " +
                         field.getType() );
         }
     }
-    
+
     private Object readField(JsonParser p,
                              ResourceFieldSchema field,
                              int fieldnum) throws IOException {
@@ -237,7 +243,7 @@ public class JsonLoader extends LoadFunc
 
         // Check to see if this value was null
         if (tok == JsonToken.VALUE_NULL) return null;
-        
+
         tok = p.nextToken();
 
         // Read based on our expected type
@@ -324,11 +330,13 @@ public class JsonLoader extends LoadFunc
     }
 
     //------------------------------------------------------------------------
-    
+
+    @Override
     public void setUDFContextSignature(String signature) {
         udfcSignature = signature;
     }
 
+    @Override
     public ResourceSchema getSchema(String location, Job job)
     throws IOException {
 
@@ -338,12 +346,12 @@ public class JsonLoader extends LoadFunc
         } else {
             // Parse the schema
             s = (new JsonMetadata()).getSchema(location, job, true);
-    
+
             if (s == null) {
                 throw new IOException("Unable to parse schema found in file in " + location);
             }
         }
-    
+
         // Now that we have determined the schema, store it in our
         // UDFContext properties object so we have it when we need it on the
         // backend
@@ -355,18 +363,21 @@ public class JsonLoader extends LoadFunc
         return s;
     }
 
-    public ResourceStatistics getStatistics(String location, Job job) 
+    @Override
+    public ResourceStatistics getStatistics(String location, Job job)
     throws IOException {
         // We don't implement this one.
         return null;
     }
 
+    @Override
     public String[] getPartitionKeys(String location, Job job)
     throws IOException {
         // We don't have partitions
         return null;
     }
 
+    @Override
     public void setPartitionFilter(Expression partitionFilter)
     throws IOException {
         // We don't have partitions


Reply via email to