Author: daijy
Date: Wed Aug  5 21:04:43 2015
New Revision: 1694329

URL: http://svn.apache.org/r1694329
Log:
PIG-4623: Fixed CSV parsing failure caused by a newline character inside a double-quoted field

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/CSVLoader.java
    pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestCSVStorage.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1694329&r1=1694328&r2=1694329&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Wed Aug  5 21:04:43 2015
@@ -38,6 +38,8 @@ PIG-4639: Add better parser for Apache H
 
 BUG FIXES
 
+PIG-4623: Fixed CSV parsing failure caused by a newline character inside a double-quoted field (ken11223 via daijy)
+
 PIG-4649: [Pig on Tez] Union followed by HCatStorer misses some data (rohini)
 
 PIG-4636: Occurred spelled incorrectly in error message for Launcher and POMergeCogroup (stevenmz via daijy)

Modified: pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/CSVLoader.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/CSVLoader.java?rev=1694329&r1=1694328&r2=1694329&view=diff
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/CSVLoader.java (original)
+++ pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/CSVLoader.java Wed Aug  5 21:04:43 2015
@@ -87,6 +87,25 @@ public class CSVLoader extends FileInput
     public CSVLoader() {
     }
 
+    /*
+     * Copy the first a.position() bytes of a into a new buffer with room for at least extraLen
+     * more bytes, then append a space if putSpaceInBetween. a must be non-null and is not modified.
+     */
+    private ByteBuffer enlargeBuffer(ByteBuffer a, int extraLen, boolean 
putSpaceInBetween) {
+       int totalLen = a.capacity() + extraLen;    // sized from a's capacity, not just its used bytes
+       if (putSpaceInBetween) {
+               totalLen++;                        // one extra byte for the separating space
+       }
+       ByteBuffer c = ByteBuffer.allocate(totalLen);
+       for(int i=0; i<a.position(); i++) {
+               c.put(a.get(i));                   // absolute get(i): a's position is left unchanged
+       }
+       if (putSpaceInBetween) {
+               c.put((byte) 32);       // space; getNext uses it in place of the line break inside a quoted field
+       }
+       return c;                                  // positioned after the copied bytes, ready for more put() calls
+    }
+    
     @Override
     public Tuple getNext() throws IOException {
         mProtoTuple = new ArrayList<Object>();
@@ -103,46 +122,64 @@ public class CSVLoader extends FileInput
             mRequiredColumnsInitialized = true;
         }
         try {
-            if (!in.nextKeyValue()) {
-                return null;
-            }                                                                  
                         
-            Text value = (Text) in.getCurrentValue();
-            byte[] buf = value.getBytes();
-            int len = value.getLength();
-            int fieldID = 0;
-
-            ByteBuffer fieldBuffer = ByteBuffer.allocate(len);
-
-            for (int i = 0; i < len; i++) {
-                byte b = buf[i];
-                inField = true;
-                if (inQuotedField) {
-                    if (b == DOUBLE_QUOTE) {
-                        evenQuotesSeen = !evenQuotesSeen;
-                        if (evenQuotesSeen) {
-                            fieldBuffer.put(DOUBLE_QUOTE);
-                        }
-                    } else
-                        if (!evenQuotesSeen &&
-                                (b == FIELD_DEL || b == RECORD_DEL)) {
-                            inQuotedField = false;
-                            inField = false;
-                            readField(fieldBuffer, fieldID++);
-                        } else {
-                            fieldBuffer.put(b);
-                        }
-                } else if (b == DOUBLE_QUOTE) {
-                    inQuotedField = true;
-                    evenQuotesSeen = true;
-                } else if (b == FIELD_DEL) {
-                    inField = false;
-                    readField(fieldBuffer, fieldID++); // end of the field
-                } else {
-                    evenQuotesSeen = true;
-                    fieldBuffer.put(b);
-                }
-            }
-            if (inField) readField(fieldBuffer, fieldID++);
+               boolean doneThisLineLogically = false;
+               ByteBuffer previousBufferToBeRead = null;
+               int fieldID = 0;
+               while(!doneThisLineLogically)   {
+                   if (!in.nextKeyValue()) {
+                       return null;
+                   }
+                   Text value = (Text) in.getCurrentValue();
+                   byte[] buf = value.getBytes();
+                   int len = value.getLength();
+                   ByteBuffer fieldBuffer = null;
+                   if (previousBufferToBeRead != null) {
+                       fieldBuffer = enlargeBuffer(previousBufferToBeRead, 
len, true);
+                   } else {
+                       fieldBuffer = ByteBuffer.allocate(len); 
+                   }
+       
+                   for (int i = 0; i < len; i++) {
+                       byte b = buf[i];
+                       inField = true;
+                       if (inQuotedField) {
+                           if (b == DOUBLE_QUOTE) {
+                               evenQuotesSeen = !evenQuotesSeen;
+                               if (evenQuotesSeen) {
+                                   fieldBuffer.put(DOUBLE_QUOTE);
+                               }
+                           } else
+                               if (!evenQuotesSeen &&
+                                       (b == FIELD_DEL || b == RECORD_DEL)) {
+                                   inQuotedField = false;
+                                   inField = false;
+                                   readField(fieldBuffer, fieldID++);
+                               } else {
+                                   fieldBuffer.put(b);
+                               }
+                       } else if (b == DOUBLE_QUOTE) {
+                           inQuotedField = true;
+                           evenQuotesSeen = true;
+                       } else if (b == FIELD_DEL) {
+                           inField = false;
+                           readField(fieldBuffer, fieldID++); // end of the 
field
+                       } else {
+                           evenQuotesSeen = true;
+                           fieldBuffer.put(b);
+                       }
+                   }
+                   doneThisLineLogically = true;
+                   
+                   if (inField)  {
+                       if (inQuotedField && evenQuotesSeen) {
+                               doneThisLineLogically = false;
+                               previousBufferToBeRead = fieldBuffer;
+                       } else {
+                               readField(fieldBuffer, fieldID++);
+                       }
+                   }
+                   
+               }       //End of while loo\
         } catch (InterruptedException e) {
             int errCode = 6018;
             String errMsg = "Error while reading input";
@@ -153,7 +190,7 @@ public class CSVLoader extends FileInput
         Tuple t =  mTupleFactory.newTupleNoCopy(mProtoTuple);
         return t;
     }
-
+    
     private void readField(ByteBuffer buf, int fieldID) {
         if (mRequiredColumns==null || (mRequiredColumns.length>fieldID && 
mRequiredColumns[fieldID])) {
             byte[] bytes = new byte[buf.position()];

Modified: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestCSVStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestCSVStorage.java?rev=1694329&r1=1694328&r2=1694329&view=diff
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestCSVStorage.java (original)
+++ pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestCSVStorage.java Wed Aug  5 21:04:43 2015
@@ -30,7 +30,6 @@ import org.apache.pig.PigServer;
 import org.apache.pig.backend.executionengine.ExecException;
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
 import org.apache.pig.data.Tuple;
-import org.apache.pig.test.MiniCluster;
 import org.apache.pig.test.Util;
 import org.junit.Test;
 
@@ -38,10 +37,8 @@ public class TestCSVStorage {
     protected static final Log LOG = LogFactory.getLog(TestCSVStorage.class);
     
     private PigServer pigServer;
-    private MiniCluster cluster;
     
     public TestCSVStorage() throws ExecException, IOException {
-        cluster = MiniCluster.buildCluster();
         pigServer = new PigServer(ExecType.LOCAL, new Properties());
         pigServer.getPigContext().getProperties()
                 .setProperty(MRConfiguration.MAP_MAX_ATTEMPTS, "1");
@@ -85,4 +82,20 @@ public class TestCSVStorage {
         assertEquals(Util.createTuple(new String[] {"\"\"\""}), it.next());
     }
     
+    /*
+     * PIG-4623: a line break inside a double-quoted field continues the field and is read back as a space.
+     */
+    @Test
+    public void testQuotedQuotesWithNewLines() throws IOException {
+        String inputFileName = "TestCSVLoader-quotedquotes.txt";
+        Util.createLocalInputFile(inputFileName, 
+                new String[] {"\"foo \n ,\"\"bar\"\",baz\"", 
"\"\"\"\"\"\"\"\""}); // the embedded \n puts the first quoted record on two lines
+        String script = "a = load '" + inputFileName + "' using 
org.apache.pig.piggybank.storage.CSVLoader() " +
+        "   as (a:chararray); ";
+        Util.registerMultiLineQuery(pigServer, script);
+        Iterator<Tuple> it = pigServer.openIterator("a");
+        assertEquals(Util.createTuple(new String[] {"foo   ,\"bar\",baz"}), 
it.next()); // "foo " + space replacing the line break + " ,..."; each "" decodes to "
+        assertEquals(Util.createTuple(new String[] {"\"\"\""}), it.next()); // eight quotes: outer pair plus three escaped ""
+    }
+    
 }


Reply via email to