Author: daijy
Date: Wed Aug 5 21:04:43 2015
New Revision: 1694329
URL: http://svn.apache.org/r1694329
Log:
PIG-4623: Fixed the 'new line' character inside double-quote causing the csv
parsing failure
Modified:
pig/trunk/CHANGES.txt
pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/CSVLoader.java
pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestCSVStorage.java
Modified: pig/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1694329&r1=1694328&r2=1694329&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Wed Aug 5 21:04:43 2015
@@ -38,6 +38,8 @@ PIG-4639: Add better parser for Apache H
BUG FIXES
+PIG-4623: Fixed the 'new line' character inside double-quote causing the csv parsing failure (ken11223 via daijy)
+
PIG-4649: [Pig on Tez] Union followed by HCatStorer misses some data (rohini)
PIG-4636: Occurred spelled incorrectly in error message for Launcher and
POMergeCogroup (stevenmz via daijy)
Modified:
pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/CSVLoader.java
URL:
http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/CSVLoader.java?rev=1694329&r1=1694328&r2=1694329&view=diff
==============================================================================
---
pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/CSVLoader.java
(original)
+++
pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/CSVLoader.java
Wed Aug 5 21:04:43 2015
@@ -87,6 +87,25 @@ public class CSVLoader extends FileInput
public CSVLoader() {
}
+ /*
+ * Copy the bytes already written to a into a new ByteBuffer with extraLen more
+ * capacity, optionally followed by a space. Precondition: a must not be null
+ */
+ private ByteBuffer enlargeBuffer(ByteBuffer a, int extraLen, boolean
putSpaceInBetween) {
+ int totalLen = a.capacity() + extraLen;
+ if (putSpaceInBetween) {
+ totalLen++;
+ }
+ ByteBuffer c = ByteBuffer.allocate(totalLen);
+ for(int i=0; i<a.position(); i++) {
+ c.put(a.get(i));
+ }
+ if (putSpaceInBetween) {
+ c.put((byte) 32); //Put a space afterwards
+ }
+ return c;
+ }
+
@Override
public Tuple getNext() throws IOException {
mProtoTuple = new ArrayList<Object>();
@@ -103,46 +122,64 @@ public class CSVLoader extends FileInput
mRequiredColumnsInitialized = true;
}
try {
- if (!in.nextKeyValue()) {
- return null;
- }
- Text value = (Text) in.getCurrentValue();
- byte[] buf = value.getBytes();
- int len = value.getLength();
- int fieldID = 0;
-
- ByteBuffer fieldBuffer = ByteBuffer.allocate(len);
-
- for (int i = 0; i < len; i++) {
- byte b = buf[i];
- inField = true;
- if (inQuotedField) {
- if (b == DOUBLE_QUOTE) {
- evenQuotesSeen = !evenQuotesSeen;
- if (evenQuotesSeen) {
- fieldBuffer.put(DOUBLE_QUOTE);
- }
- } else
- if (!evenQuotesSeen &&
- (b == FIELD_DEL || b == RECORD_DEL)) {
- inQuotedField = false;
- inField = false;
- readField(fieldBuffer, fieldID++);
- } else {
- fieldBuffer.put(b);
- }
- } else if (b == DOUBLE_QUOTE) {
- inQuotedField = true;
- evenQuotesSeen = true;
- } else if (b == FIELD_DEL) {
- inField = false;
- readField(fieldBuffer, fieldID++); // end of the field
- } else {
- evenQuotesSeen = true;
- fieldBuffer.put(b);
- }
- }
- if (inField) readField(fieldBuffer, fieldID++);
+ boolean doneThisLineLogically = false;
+ ByteBuffer previousBufferToBeRead = null;
+ int fieldID = 0;
+ while(!doneThisLineLogically) {
+ if (!in.nextKeyValue()) {
+ return null;
+ }
+ Text value = (Text) in.getCurrentValue();
+ byte[] buf = value.getBytes();
+ int len = value.getLength();
+ ByteBuffer fieldBuffer = null;
+ if (previousBufferToBeRead != null) {
+ fieldBuffer = enlargeBuffer(previousBufferToBeRead,
len, true);
+ } else {
+ fieldBuffer = ByteBuffer.allocate(len);
+ }
+
+ for (int i = 0; i < len; i++) {
+ byte b = buf[i];
+ inField = true;
+ if (inQuotedField) {
+ if (b == DOUBLE_QUOTE) {
+ evenQuotesSeen = !evenQuotesSeen;
+ if (evenQuotesSeen) {
+ fieldBuffer.put(DOUBLE_QUOTE);
+ }
+ } else
+ if (!evenQuotesSeen &&
+ (b == FIELD_DEL || b == RECORD_DEL)) {
+ inQuotedField = false;
+ inField = false;
+ readField(fieldBuffer, fieldID++);
+ } else {
+ fieldBuffer.put(b);
+ }
+ } else if (b == DOUBLE_QUOTE) {
+ inQuotedField = true;
+ evenQuotesSeen = true;
+ } else if (b == FIELD_DEL) {
+ inField = false;
+ readField(fieldBuffer, fieldID++); // end of the field
+ } else {
+ evenQuotesSeen = true;
+ fieldBuffer.put(b);
+ }
+ }
+ doneThisLineLogically = true;
+
+ if (inField) {
+ if (inQuotedField && evenQuotesSeen) {
+ doneThisLineLogically = false;
+ previousBufferToBeRead = fieldBuffer;
+ } else {
+ readField(fieldBuffer, fieldID++);
+ }
+ }
+
+ } //End of while loop
} catch (InterruptedException e) {
int errCode = 6018;
String errMsg = "Error while reading input";
@@ -153,7 +190,7 @@ public class CSVLoader extends FileInput
Tuple t = mTupleFactory.newTupleNoCopy(mProtoTuple);
return t;
}
-
+
private void readField(ByteBuffer buf, int fieldID) {
if (mRequiredColumns==null || (mRequiredColumns.length>fieldID &&
mRequiredColumns[fieldID])) {
byte[] bytes = new byte[buf.position()];
Modified:
pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestCSVStorage.java
URL:
http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestCSVStorage.java?rev=1694329&r1=1694328&r2=1694329&view=diff
==============================================================================
---
pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestCSVStorage.java
(original)
+++
pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestCSVStorage.java
Wed Aug 5 21:04:43 2015
@@ -30,7 +30,6 @@ import org.apache.pig.PigServer;
import org.apache.pig.backend.executionengine.ExecException;
import
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
import org.apache.pig.data.Tuple;
-import org.apache.pig.test.MiniCluster;
import org.apache.pig.test.Util;
import org.junit.Test;
@@ -38,10 +37,8 @@ public class TestCSVStorage {
protected static final Log LOG = LogFactory.getLog(TestCSVStorage.class);
private PigServer pigServer;
- private MiniCluster cluster;
public TestCSVStorage() throws ExecException, IOException {
- cluster = MiniCluster.buildCluster();
pigServer = new PigServer(ExecType.LOCAL, new Properties());
pigServer.getPigContext().getProperties()
.setProperty(MRConfiguration.MAP_MAX_ATTEMPTS, "1");
@@ -85,4 +82,20 @@ public class TestCSVStorage {
assertEquals(Util.createTuple(new String[] {"\"\"\""}), it.next());
}
+ /*
+ * See PIG-4623
+ */
+ @Test
+ public void testQuotedQuotesWithNewLines() throws IOException {
+ String inputFileName = "TestCSVLoader-quotedquotes.txt";
+ Util.createLocalInputFile(inputFileName,
+ new String[] {"\"foo \n ,\"\"bar\"\",baz\"",
"\"\"\"\"\"\"\"\""});
+ String script = "a = load '" + inputFileName + "' using org.apache.pig.piggybank.storage.CSVLoader() " +
+ " as (a:chararray); ";
+ Util.registerMultiLineQuery(pigServer, script);
+ Iterator<Tuple> it = pigServer.openIterator("a");
+ assertEquals(Util.createTuple(new String[] {"foo ,\"bar\",baz"}),
it.next());
+ assertEquals(Util.createTuple(new String[] {"\"\"\""}), it.next());
+ }
+
}