Author: rohini
Date: Thu Jan 28 15:01:17 2016
New Revision: 1727372
URL: http://svn.apache.org/viewvc?rev=1727372&view=rev
Log:
PIG-4787: Log JSONLoader exception while parsing records (rohini)
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/builtin/JsonLoader.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1727372&r1=1727371&r2=1727372&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Thu Jan 28 15:01:17 2016
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
IMPROVEMENTS
+PIG-4787: Log JSONLoader exception while parsing records (rohini)
+
PIG-4763: Insufficient check for the number of arguments in runpigmix.pl
(sekikn via rohini)
PIG-4411: Support for vertex level configuration like speculative execution
(rohini)
Modified: pig/trunk/src/org/apache/pig/builtin/JsonLoader.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/JsonLoader.java?rev=1727372&r1=1727371&r2=1727372&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/JsonLoader.java (original)
+++ pig/trunk/src/org/apache/pig/builtin/JsonLoader.java Thu Jan 28 15:01:17 2016
@@ -25,14 +25,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
-import java.math.BigDecimal;
-import org.joda.time.format.ISODateTimeFormat;
-import org.joda.time.format.DateTimeFormatter;
-import org.codehaus.jackson.JsonFactory;
-import org.codehaus.jackson.JsonParseException;
-import org.codehaus.jackson.JsonParser;
-import org.codehaus.jackson.JsonToken;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
@@ -56,37 +49,44 @@ import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.util.UDFContext;
import org.apache.pig.impl.util.Utils;
+import org.codehaus.jackson.JsonFactory;
+import org.codehaus.jackson.JsonParser;
+import org.codehaus.jackson.JsonToken;
+import org.joda.time.format.DateTimeFormatter;
+import org.joda.time.format.ISODateTimeFormat;
/**
* A loader for data stored using {@link JsonStorage}. This is not a generic
* JSON loader. It depends on the schema being stored with the data when
* conceivably you could write a loader that determines the schema from the
- * JSON.
+ * JSON.
*/
public class JsonLoader extends LoadFunc implements LoadMetadata {
protected RecordReader reader = null;
protected ResourceSchema schema = null;
-
+
private String udfcSignature = null;
private JsonFactory jsonFactory = null;
private TupleFactory tupleFactory = TupleFactory.getInstance();
private BagFactory bagFactory = BagFactory.getInstance();
-
+
private static final String SCHEMA_SIGNATURE = "pig.jsonloader.schema";
-
+
public JsonLoader() {
}
-
+
public JsonLoader(String schemaString) throws IOException {
schema = new ResourceSchema(Utils.parseSchema(schemaString));
}
+ @Override
public void setLocation(String location, Job job) throws IOException {
// Tell our input format where we will be reading from
FileInputFormat.setInputPaths(job, location);
}
-
+
+ @Override
@SuppressWarnings("unchecked")
public InputFormat getInputFormat() throws IOException {
// We will use TextInputFormat, the default Hadoop input format for
@@ -95,17 +95,19 @@ public class JsonLoader extends LoadFunc
return new TextInputFormat();
}
+ @Override
public LoadCaster getLoadCaster() throws IOException {
// We do not expect to do casting of byte arrays, because we will be
// returning typed data.
return null;
}
+ @Override
@SuppressWarnings("unchecked")
public void prepareToRead(RecordReader reader, PigSplit split)
throws IOException {
this.reader = reader;
-
+
// Get the schema string from the UDFContext object.
UDFContext udfc = UDFContext.getUDFContext();
Properties p =
@@ -121,6 +123,7 @@ public class JsonLoader extends LoadFunc
jsonFactory = new JsonFactory();
}
+ @Override
public Tuple getNext() throws IOException {
Text val = null;
try {
@@ -150,31 +153,34 @@ public class JsonLoader extends LoadFunc
// isn't what we expect we return a tuple with null fields rather than
// throwing an exception. That way a few mangled lines don't fail the
// job.
-
+
try {
if (p.nextToken() != JsonToken.START_OBJECT) {
warn("Bad record, could not find start of record " +
val.toString(), PigWarning.UDF_WARNING_1);
return t;
}
-
+
// Read each field in the record
for (int i = 0; i < fields.length; i++) {
t.set(i, readField(p, fields[i], i));
}
-
+
if (p.nextToken() != JsonToken.END_OBJECT) {
warn("Bad record, could not find end of record " +
val.toString(), PigWarning.UDF_WARNING_1);
return t;
}
-
+
} catch (Exception jpe) {
-            warn("Bad record, returning null for " + val,
-                    PigWarning.UDF_WARNING_1);
+ Throwable ex = jpe.getCause() == null ? jpe : jpe.getCause();
+ warn("Encountered exception " + ex.getClass().getName() + ": "
+ + ex.getMessage() + ". Bad record, returning null for "
+ + val, PigWarning.UDF_WARNING_1);
} finally {
p.close();
}
-
+
return t;
}
@@ -186,44 +192,44 @@ public class JsonLoader extends LoadFunc
// Read based on our expected type
case DataType.BOOLEAN:
return p.getBooleanValue();
-
+
case DataType.INTEGER:
return p.getIntValue();
-
+
case DataType.LONG:
return p.getLongValue();
-
+
case DataType.FLOAT:
return p.getFloatValue();
-
+
case DataType.DOUBLE:
return p.getDoubleValue();
-
+
case DataType.DATETIME:
DateTimeFormatter formatter =
ISODateTimeFormat.dateTimeParser();
return formatter.withOffsetParsed().parseDateTime(p.getText());
-
+
case DataType.BYTEARRAY:
byte[] b = p.getText().getBytes();
// Use the DBA constructor that copies the bytes so that we own
// the memory
return new DataByteArray(b, 0, b.length);
-
+
case DataType.CHARARRAY:
return p.getText();
-
+
case DataType.BIGINTEGER:
return p.getBigIntegerValue();
-
+
case DataType.BIGDECIMAL:
return new BigDecimal(p.getText());
-
+
default:
throw new IOException("Unknown type in input schema: " +
field.getType() );
}
}
-
+
private Object readField(JsonParser p,
ResourceFieldSchema field,
int fieldnum) throws IOException {
@@ -237,7 +243,7 @@ public class JsonLoader extends LoadFunc
// Check to see if this value was null
if (tok == JsonToken.VALUE_NULL) return null;
-
+
tok = p.nextToken();
// Read based on our expected type
@@ -324,11 +330,13 @@ public class JsonLoader extends LoadFunc
}
//------------------------------------------------------------------------
-
+
+ @Override
public void setUDFContextSignature(String signature) {
udfcSignature = signature;
}
+ @Override
public ResourceSchema getSchema(String location, Job job)
throws IOException {
@@ -338,12 +346,12 @@ public class JsonLoader extends LoadFunc
} else {
// Parse the schema
s = (new JsonMetadata()).getSchema(location, job, true);
-
+
if (s == null) {
throw new IOException("Unable to parse schema found in file in " + location);
}
}
-
+
// Now that we have determined the schema, store it in our
// UDFContext properties object so we have it when we need it on the
// backend
@@ -355,18 +363,21 @@ public class JsonLoader extends LoadFunc
return s;
}
- public ResourceStatistics getStatistics(String location, Job job)
+ @Override
+ public ResourceStatistics getStatistics(String location, Job job)
throws IOException {
// We don't implement this one.
return null;
}
+ @Override
public String[] getPartitionKeys(String location, Job job)
throws IOException {
// We don't have partitions
return null;
}
+ @Override
public void setPartitionFilter(Expression partitionFilter)
throws IOException {
// We don't have partitions