This is an automated email from the ASF dual-hosted git repository.

apitrou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new a6e577d031 GH-39857: [C++] Improve error message for "chunker out of sync" condition (#39892)
a6e577d031 is described below

commit a6e577d031d20a1a7d3dd01536b9a77db5d1bff8
Author: Antoine Pitrou <[email protected]>
AuthorDate: Tue Feb 6 16:19:03 2024 +0100

    GH-39857: [C++] Improve error message for "chunker out of sync" condition (#39892)
    
    ### Rationale for this change
    
    When writing the CSV reader, we thought that the parser not finding the same line limits as the chunker should never happen, hence the terse "chunker out of sync" error message.
    
    It turns out that, if the input contains multiline cell values and the `newlines_in_values` option was not enabled, the chunker can happily delimit a block on a newline that's inside a quoted string. The parser will then see truncated data and will stop parsing, yielding a parsed size that's smaller than the first block (see added comment in the code).
    
    ### What changes are included in this PR?
    
    * Add some parser tests that showcase the condition encountered in GH-39857
    * Improve error message to guide users towards the solution
    
    ### Are these changes tested?
    
    There's no functional change; the error message itself isn't tested.
    
    ### Are there any user-facing changes?
    
    No.
    
    * Closes: #39857
    
    Authored-by: Antoine Pitrou <[email protected]>
    Signed-off-by: Antoine Pitrou <[email protected]>
---
 cpp/src/arrow/csv/parser_test.cc | 22 ++++++++++++++++++++++
 cpp/src/arrow/csv/reader.cc      | 34 +++++++++++++++++++++++++++++-----
 python/pyarrow/tests/test_csv.py | 25 +++++++++++++++++++++++++
 3 files changed, 76 insertions(+), 5 deletions(-)

diff --git a/cpp/src/arrow/csv/parser_test.cc b/cpp/src/arrow/csv/parser_test.cc
index 960a69c59d..dd3d025202 100644
--- a/cpp/src/arrow/csv/parser_test.cc
+++ b/cpp/src/arrow/csv/parser_test.cc
@@ -175,6 +175,13 @@ void AssertParsePartial(BlockParser& parser, const std::string& str,
   ASSERT_EQ(parsed_size, expected_size);
 }
 
+void AssertParsePartial(BlockParser& parser, const std::vector<std::string_view>& data,
+                        uint32_t expected_size) {
+  uint32_t parsed_size = static_cast<uint32_t>(-1);
+  ASSERT_OK(parser.Parse(data, &parsed_size));
+  ASSERT_EQ(parsed_size, expected_size);
+}
+
 void AssertLastRowEq(const BlockParser& parser,
                      const std::vector<std::string>& expected) {
   std::vector<std::string> values;
@@ -376,6 +383,21 @@ TEST(BlockParser, TruncatedData) {
   }
 }
 
+TEST(BlockParser, TruncatedDataViews) {
+  // The BlockParser API mandates that, when passing a vector of views,
+  // only the last view may be a truncated CSV block.
+  // In the current implementation, receiving a truncated non-last view
+  // simply stops parsing after that view.
+  BlockParser parser(ParseOptions::Defaults(), /*num_cols=*/3);
+  AssertParsePartial(parser, Views({"a,b,", "c\n"}), 0);
+  AssertParsePartial(parser, Views({"a,b,c\nd,", "e,f\n"}), 6);
+
+  // More sophisticated: non-last block ends on some newline inside a quoted string
+  // (terse reproducer of gh-39857)
+  AssertParsePartial(parser, Views({"a,b,\"c\n", "\"\n"}), 0);
+  AssertParsePartial(parser, Views({"a,b,c\n\"d\n", "\",e,f\n"}), 6);
+}
+
 TEST(BlockParser, Final) {
   // Tests for ParseFinal()
   BlockParser parser(ParseOptions::Defaults());
diff --git a/cpp/src/arrow/csv/reader.cc b/cpp/src/arrow/csv/reader.cc
index 332fad054f..1ac25e290a 100644
--- a/cpp/src/arrow/csv/reader.cc
+++ b/cpp/src/arrow/csv/reader.cc
@@ -261,11 +261,10 @@ class SerialBlockReader : public BlockReader {
     auto consume_bytes = [this, bytes_before_buffer,
                           next_buffer](int64_t nbytes) -> Status {
       DCHECK_GE(nbytes, 0);
-      auto offset = nbytes - bytes_before_buffer;
-      if (offset < 0) {
-        // Should not happen
-        return Status::Invalid("CSV parser got out of sync with chunker");
-      }
+      int64_t offset = nbytes - bytes_before_buffer;
+      // All data before the buffer should have been consumed.
+      // This is checked in Parse() and BlockParsingOperator::operator().
+      DCHECK_GE(offset, 0);
       partial_ = SliceBuffer(buffer_, offset);
       buffer_ = next_buffer;
       return Status::OK();
@@ -400,6 +399,7 @@ class BlockParsingOperator {
         count_rows_(first_row >= 0),
         num_rows_seen_(first_row) {}
 
+  // TODO: this is almost entirely the same as ReaderMixin::Parse(). Refactor?
   Result<ParsedBlock> operator()(const CSVBlock& block) {
     constexpr int32_t max_num_rows = std::numeric_limits<int32_t>::max();
     auto parser = std::make_shared<BlockParser>(
@@ -427,9 +427,24 @@ class BlockParsingOperator {
     } else {
       RETURN_NOT_OK(parser->Parse(views, &parsed_size));
     }
+
+    // `partial + completion` should have been entirely consumed.
+    const int64_t bytes_before_buffer = block.partial->size() + block.completion->size();
+    if (static_cast<int64_t>(parsed_size) < bytes_before_buffer) {
+      // This can happen if `newlines_in_values` is not enabled and
+      // `partial + completion` ends with a newline inside a quoted string.
+      // In this case, the BlockParser stops at the truncated data in the first
+      // block (see gh-39857).
+      return Status::Invalid(
+          "CSV parser got out of sync with chunker. This can mean the data 
file "
+          "contains cell values spanning multiple lines; please consider 
enabling "
+          "the option 'newlines_in_values'.");
+    }
+
     if (count_rows_) {
       num_rows_seen_ += parser->total_num_rows();
     }
+
     RETURN_NOT_OK(block.consume_bytes(parsed_size));
     return ParsedBlock{std::move(parser), block.block_index,
                        static_cast<int64_t>(parsed_size) + 
block.bytes_skipped};
@@ -738,6 +753,15 @@ class ReaderMixin {
     } else {
       RETURN_NOT_OK(parser->Parse(views, &parsed_size));
     }
+    // See BlockParsingOperator for explanation.
+    const int64_t bytes_before_buffer = partial->size() + completion->size();
+    if (static_cast<int64_t>(parsed_size) < bytes_before_buffer) {
+      return Status::Invalid(
+          "CSV parser got out of sync with chunker. This can mean the data 
file "
+          "contains cell values spanning multiple lines; please consider 
enabling "
+          "the option 'newlines_in_values'.");
+    }
+
     if (count_rows_) {
       num_rows_seen_ += parser->total_num_rows();
     }
diff --git a/python/pyarrow/tests/test_csv.py b/python/pyarrow/tests/test_csv.py
index 31f24187e3..bc1dd8a09a 100644
--- a/python/pyarrow/tests/test_csv.py
+++ b/python/pyarrow/tests/test_csv.py
@@ -667,6 +667,31 @@ class BaseTestCSV(abc.ABC):
             'b': ["e", "j"],
         }
 
+    def test_chunker_out_of_sync(self):
+        # GH-39892: if there are newlines in values, the parser may become
+        # out of sync with the chunker. In this case, we try to produce an
+        # informative error message.
+        rows = b"""a,b,c\nd,e,"f\n"\ng,h,i\n"""
+        expected = {
+            'a': ["d", "g"],
+            'b': ["e", "h"],
+            'c': ["f\n", "i"],
+        }
+        for block_size in range(8, 15):
+            # Sanity check: parsing works with newlines_in_values=True
+            d = self.read_bytes(
+                rows, parse_options=ParseOptions(newlines_in_values=True),
+                read_options=ReadOptions(block_size=block_size)).to_pydict()
+            assert d == expected
+        # With these block sizes, a block would end on the physical newline
+        # inside the quoted cell value, leading to a mismatch between
+        # CSV chunker and parser.
+        for block_size in range(8, 11):
+            with pytest.raises(ValueError,
+                               match="cell values spanning multiple lines"):
+                self.read_bytes(
+                    rows, read_options=ReadOptions(block_size=block_size))
+
 
 class BaseCSVTableRead(BaseTestCSV):
 

Reply via email to