This is an automated email from the ASF dual-hosted git repository.

apitrou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new 425666c8b7 GH-49448: [C++][CI] Detect mismatching schema in 
differential IPC fuzzing (#49451)
425666c8b7 is described below

commit 425666c8b726dd81401d56305bc5138f755f3663
Author: Antoine Pitrou <[email protected]>
AuthorDate: Tue Mar 10 16:28:02 2026 +0100

    GH-49448: [C++][CI] Detect mismatching schema in differential IPC fuzzing 
(#49451)
    
    ### Rationale for this change
    
    In https://github.com/apache/arrow/issues/49311 we added differential 
fuzzing for the IPC file fuzzer, where we compare the contents as read by the 
IPC file reader and the IPC stream reader, respectively.
    
    However, as the IPC file footer carries a duplicate definition of the 
file's schema, the fuzzer could mutate it and make it mismatch the IPC stream 
schema. The fuzz target would currently fail on such a file, even though it's 
technically not a bug in the IPC implementation.
    
    This was detected by OSS-Fuzz in 
https://issues.oss-fuzz.com/issues/486127061
    
    ### What changes are included in this PR?
    
    * Detect a mismatching schema between IPC stream and IPC file, and skip the 
batches comparison in that case.
    * Compare results between IPC file reads with pre-buffering enabled/disabled.
    * When comparing IPC read results, also compare custom metadata, not just 
the data batches themselves
    
    ### Are these changes tested?
    
    Yes, by a new regression test.
    
    ### Are there any user-facing changes?
    
    No.
    
    * GitHub Issue: #49448
    
    Authored-by: Antoine Pitrou <[email protected]>
    Signed-off-by: Antoine Pitrou <[email protected]>
---
 cpp/src/arrow/ipc/reader.cc | 112 ++++++++++++++++++++++++++++++++++----------
 testing                     |   2 +-
 2 files changed, 88 insertions(+), 26 deletions(-)

diff --git a/cpp/src/arrow/ipc/reader.cc b/cpp/src/arrow/ipc/reader.cc
index 991d238240..6aa502dc27 100644
--- a/cpp/src/arrow/ipc/reader.cc
+++ b/cpp/src/arrow/ipc/reader.cc
@@ -23,6 +23,7 @@
 #include <memory>
 #include <numeric>
 #include <optional>
+#include <sstream>
 #include <string>
 #include <type_traits>
 #include <unordered_map>
@@ -42,6 +43,7 @@
 #include "arrow/ipc/metadata_internal.h"
 #include "arrow/ipc/reader_internal.h"
 #include "arrow/ipc/writer.h"
+#include "arrow/pretty_print.h"
 #include "arrow/record_batch.h"
 #include "arrow/sparse_tensor.h"
 #include "arrow/status.h"
@@ -2694,13 +2696,49 @@ Status ValidateFuzzBatch(const RecordBatchWithMetadata& 
batch) {
 
 Status CompareFuzzBatches(const RecordBatchWithMetadata& left,
                           const RecordBatchWithMetadata& right) {
+  if ((left.custom_metadata != nullptr) != (right.custom_metadata != nullptr) 
||
+      (left.custom_metadata && 
!left.custom_metadata->Equals(*right.custom_metadata))) {
+    std::stringstream ss;
+    auto print_metadata = [&](const RecordBatchWithMetadata& batch) {
+      if (batch.custom_metadata) {
+        ss << batch.custom_metadata->ToString();
+      } else {
+        ss << "nullptr";
+      }
+    };
+    ss << "Custom metadata unequal: left = ";
+    print_metadata(left);
+    ss << "\nright = ";
+    print_metadata(right);
+    return Status::Invalid(ss.str());
+  }
+
   bool ok = true;
   if ((left.batch != nullptr) != (right.batch != nullptr)) {
     ok = false;
   } else if (left.batch) {
     ok &= left.batch->Equals(*right.batch, EqualOptions{}.nans_equal(true));
   }
-  return ok ? Status::OK() : Status::Invalid("Batches unequal");
+  if (ok) {
+    return Status::OK();
+  }
+  std::stringstream ss;
+  PrettyPrintOptions options{};
+  options.show_field_metadata = true;
+  options.show_schema_metadata = true;
+  auto print_batch = [&](const RecordBatchWithMetadata& batch) {
+    if (batch.batch) {
+      ss << "\n";
+      ARROW_UNUSED(PrettyPrint(*batch.batch, options, &ss));
+    } else {
+      ss << "nullptr";
+    }
+  };
+  ss << "Batches unequal: left = ";
+  print_batch(left);
+  ss << "\nright = ";
+  print_batch(right);
+  return Status::Invalid(ss.str());
 }
 
 Status CompareFuzzBatches(const std::vector<RecordBatchWithMetadata>& left,
@@ -2748,8 +2786,13 @@ Status FuzzIpcFile(const uint8_t* data, int64_t size) {
 
   Status final_status;
 
+  struct IpcReadResult {
+    std::shared_ptr<Schema> schema;
+    std::vector<RecordBatchWithMetadata> batches = {};
+  };
+
   // Try to read the IPC file as a stream to compare the results (differential 
fuzzing)
-  auto do_stream_read = [&]() -> Result<std::vector<RecordBatchWithMetadata>> {
+  auto do_stream_read = [&]() -> Result<IpcReadResult> {
     io::BufferReader buffer_reader(buffer);
     // Skip magic bytes at the beginning
     RETURN_NOT_OK(
@@ -2766,11 +2809,10 @@ Status FuzzIpcFile(const uint8_t* data, int64_t size) {
       }
       batches.push_back(batch);
     }
-    return batches;
+    return IpcReadResult{batch_reader->schema(), batches};
   };
 
-  auto do_file_read =
-      [&](bool pre_buffer) -> Result<std::vector<RecordBatchWithMetadata>> {
+  auto do_file_read = [&](bool pre_buffer) -> Result<IpcReadResult> {
     io::BufferReader buffer_reader(buffer);
     ARROW_ASSIGN_OR_RAISE(auto batch_reader,
                           RecordBatchFileReader::Open(&buffer_reader, 
FuzzingOptions()));
@@ -2779,46 +2821,66 @@ Status FuzzIpcFile(const uint8_t* data, int64_t size) {
       RETURN_NOT_OK(batch_reader->PreBufferMetadata(/*indices=*/{}));
     }
 
+    IpcReadResult result = {.schema = batch_reader->schema()};
     const int n_batches = batch_reader->num_record_batches();
-    std::vector<RecordBatchWithMetadata> batches;
     // Delay error return until the end, as we want to access all record 
batches
     Status st;
     for (int i = 0; i < n_batches; ++i) {
       RecordBatchWithMetadata batch;
       st &= batch_reader->ReadRecordBatchWithCustomMetadata(i).Value(&batch);
       st &= ValidateFuzzBatch(batch);
-      batches.push_back(batch);
+      result.batches.push_back(batch);
     }
     RETURN_NOT_OK(st);
-    return batches;
+    return result;
   };
 
-  // Lazily-initialized if the IPC reader succeeds
-  std::optional<Result<std::vector<RecordBatchWithMetadata>>> 
maybe_stream_batches;
+  // Persistent read result for differential fuzzing.
+  std::optional<IpcReadResult> maybe_read_result;
+
+  auto compare_result = [&](IpcReadResult new_result) {
+    // Differential fuzzing: compare results read using different APIs or 
options
+    // with one another.
+    if (maybe_read_result.has_value()) {
+      ARROW_CHECK_OK(CompareFuzzBatches(maybe_read_result->batches, 
new_result.batches));
+    } else {
+      maybe_read_result = std::move(new_result);
+    }
+  };
 
   for (const bool pre_buffer : {false, true}) {
-    auto maybe_file_batches = do_file_read(pre_buffer);
-    final_status &= maybe_file_batches.status();
-    if (maybe_file_batches.ok()) {
-      // IPC file read successful: differential fuzzing with IPC stream reader,
-      // if possible.
-      // NOTE: some valid IPC files may not be readable as IPC streams,
-      // for example because of excess spacing between IPC messages.
-      // A regular IPC file writer would not produce them, but fuzzing might.
-      if (!maybe_stream_batches.has_value()) {
-        maybe_stream_batches = do_stream_read();
-        final_status &= maybe_stream_batches->status();
-      }
-      if (maybe_stream_batches->ok()) {
+    auto maybe_file_result = do_file_read(pre_buffer);
+    final_status &= maybe_file_result.status();
+    if (maybe_file_result.ok()) {
+      compare_result(*std::move(maybe_file_result));
+    }
+  }
+
+  if (maybe_read_result.has_value()) {
+    // IPC file read successful: compare results with IPC stream reader,
+    // if possible.
+    // NOTE: some valid IPC files may not be readable as IPC streams,
+    // for example because of excess spacing between IPC messages.
+    // A regular IPC file writer would not produce them, but fuzzing might.
+    auto maybe_stream_result = do_stream_read();
+    final_status &= maybe_stream_result.status();
+    if (maybe_stream_result.ok()) {
+      if (maybe_read_result->schema->Equals(maybe_stream_result->schema,
+                                            /*check_metadata=*/true)) {
         // XXX: in some rare cases, an IPC file might read unequal to the 
enclosed
         // IPC stream, for example if the footer skips some batches or orders 
the
         // batches differently. We should revisit this if the fuzzer generates 
such
         // files.
-        ARROW_CHECK_OK(CompareFuzzBatches(*maybe_file_batches, 
**maybe_stream_batches));
+        compare_result(*maybe_stream_result);
+      } else {
+        // The fuzzer might have mutated the schema definition that is 
duplicated
+        // in the IPC file footer, in which case the comparison above would 
fail.
+        final_status &= Status::TypeError(
+            "Schema mismatch between IPC stream and IPC file footer, skipping "
+            "comparison");
       }
     }
   }
-
   return final_status;
 }
 
diff --git a/testing b/testing
index ca49b7795c..fff99f68b7 160000
--- a/testing
+++ b/testing
@@ -1 +1 @@
-Subproject commit ca49b7795c09181c2915b0a5e762a8fac70f9556
+Subproject commit fff99f68b7085b7ac1c210f79f1087f6a085a06b

Reply via email to