Author: rohini
Date: Mon Apr 6 23:56:14 2015
New Revision: 1671714
URL: http://svn.apache.org/r1671714
Log:
PIG-4498: AvroStorage in Piggybank does not handle bad records and fails (viraj
via rohini)
Modified:
pig/trunk/CHANGES.txt
pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroRecordReader.java
Modified: pig/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1671714&r1=1671713&r2=1671714&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Mon Apr 6 23:56:14 2015
@@ -58,6 +58,8 @@ PIG-4333: Split BigData tests into multi
BUG FIXES
+PIG-4498: AvroStorage in Piggbank does not handle bad records and fails (viraj
via rohini)
+
PIG-4499: mvn-build miss tez classes in pig-h2.jar (daijy)
PIG-4488: Pig on tez mask tez.queue.name (daijy)
Modified:
pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroRecordReader.java
URL:
http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroRecordReader.java?rev=1671714&r1=1671713&r2=1671714&view=diff
==============================================================================
---
pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroRecordReader.java
(original)
+++
pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroRecordReader.java
Mon Apr 6 23:56:14 2015
@@ -194,30 +194,43 @@ public class PigAvroRecordReader extends
@Override
public Writable getCurrentValue() throws IOException, InterruptedException
{
- Object obj = reader.next();
- Tuple result = null;
- if (obj instanceof Tuple) {
- AvroStorageLog.details("Class =" + obj.getClass());
- result = (Tuple) obj;
- } else {
- if (obj != null) {
- AvroStorageLog.details("Wrap class " + obj.getClass() + " as a tuple.");
+ try {
+ Object obj = reader.next();
+ Tuple result = null;
+ if (obj instanceof Tuple) {
+ AvroStorageLog.details("Class =" + obj.getClass());
+ result = (Tuple) obj;
+ } else {
+ if (obj != null) {
+ AvroStorageLog.details("Wrap class " + obj.getClass() + " as a tuple.");
+ }
+ else {
+ AvroStorageLog.details("Wrap null as a tuple.");
+ }
+ result = wrapAsTuple(obj);
}
- else {
- AvroStorageLog.details("Wrap null as a tuple.");
+ if (schemaToMergedSchemaMap != null) {
+ // remap the position of fields to the merged schema
+ Map<Integer, Integer> map = schemaToMergedSchemaMap.get(path);
+ if (map == null) {
+ throw new IOException("The schema of '" + path + "' " +
+ "is not merged by AvroStorage.");
+ }
+ result = remap(result, map);
}
- result = wrapAsTuple(obj);
+ return result;
}
- if (schemaToMergedSchemaMap != null) {
- // remap the position of fields to the merged schema
- Map<Integer, Integer> map = schemaToMergedSchemaMap.get(path);
- if (map == null) {
- throw new IOException("The schema of '" + path + "' " +
- "is not merged by AvroStorage.");
+ catch(Exception e) {
+ if (ignoreBadFiles) {
+ LOG.warn("Ignoring bad record for '" + path + "'.");
+ return null;
+ }
+ else {
+ //re-throw exception
+ LOG.error("Bad record for '" + path + "'.");
+ throw new IOException(e);
}
- result = remap(result, map);
}
- return result;
}
/**