pitrou commented on code in PR #14355:
URL: https://github.com/apache/arrow/pull/14355#discussion_r1041103879


##########
cpp/src/arrow/json/reader.cc:
##########
@@ -183,35 +493,47 @@ Result<std::shared_ptr<TableReader>> TableReader::Make(
   return ptr;
 }
 
+Future<std::shared_ptr<StreamingReader>> StreamingReader::MakeAsync(
+    std::shared_ptr<io::InputStream> stream, const ReadOptions& read_options,
+    const ParseOptions& parse_options, const io::IOContext& io_context,
+    Executor* cpu_executor) {
+  auto future = StreamingReaderImpl::MakeAsync(
+      std::make_shared<DecodeContext>(parse_options, io_context.pool()),
+      std::move(stream), io_context, cpu_executor, read_options);
+  return future.Then([](const std::shared_ptr<StreamingReaderImpl>& reader) {
+    return std::static_pointer_cast<StreamingReader>(reader);
+  });
+}
+
+Result<std::shared_ptr<StreamingReader>> StreamingReader::Make(
+    std::shared_ptr<io::InputStream> stream, const ReadOptions& read_options,
+    const ParseOptions& parse_options, const io::IOContext& io_context,
+    Executor* cpu_executor) {
+  auto future =
+      MakeAsync(std::move(stream), read_options, parse_options, io_context, 
cpu_executor);
+  return future.result();

Review Comment:
   Q: how hard would it be to make `StreamingReader::Make` return its result 
without needing to read the first block (which might incur a non-trivial 
latency, e.g. if loading from S3)?
   
   AFAICT, the main complication would be in the `StreamingReader::schema()` 
implementation needing to wait on the first block's Future...



##########
cpp/src/arrow/json/reader_test.cc:
##########
@@ -320,5 +325,546 @@ TEST(ReaderTest, FailOnInvalidEOF) {
   }
 }
 
+class StreamingReaderTest : public ::testing::TestWithParam<bool> {
+ protected:
+  void SetUp() override { read_options_.use_threads = GetParam(); }
+
+  static std::shared_ptr<io::InputStream> MakeTestStream(const std::string& 
str) {
+    auto buffer = std::make_shared<Buffer>(str);
+    return std::make_shared<io::BufferReader>(std::move(buffer));
+  }
+  // Stream with simulated latency
+  static std::shared_ptr<io::InputStream> MakeTestStream(const std::string& 
str,
+                                                         double latency) {
+    return std::make_shared<io::SlowInputStream>(MakeTestStream(str), latency);
+  }
+
+  Result<std::shared_ptr<StreamingReader>> MakeReader(
+      std::shared_ptr<io::InputStream> stream) {
+    return StreamingReader::Make(std::move(stream), read_options_, 
parse_options_,
+                                 io_context_, executor_);
+  }
+  template <typename... Args>
+  Result<std::shared_ptr<StreamingReader>> MakeReader(Args&&... args) {
+    return MakeReader(MakeTestStream(std::forward<Args>(args)...));
+  }
+
+  AsyncGenerator<std::shared_ptr<RecordBatch>> MakeGenerator(
+      std::shared_ptr<StreamingReader> reader) {
+    return [reader = std::move(reader)] { return reader->ReadNextAsync(); };
+  }
+  template <typename... Args>
+  Result<AsyncGenerator<std::shared_ptr<RecordBatch>>> MakeGenerator(Args&&... 
args) {
+    ARROW_ASSIGN_OR_RAISE(auto reader, 
MakeReader(std::forward<Args>(args)...));
+    return MakeGenerator(std::move(reader));
+  }
+
+  static void AssertReadNext(const std::shared_ptr<StreamingReader>& reader,
+                             std::shared_ptr<RecordBatch>* out) {
+    ASSERT_OK(reader->ReadNext(out));
+    ASSERT_FALSE(IsIterationEnd(*out));
+  }
+  static void AssertReadEnd(const std::shared_ptr<StreamingReader>& reader) {
+    std::shared_ptr<RecordBatch> out;
+    ASSERT_OK(reader->ReadNext(&out));
+    ASSERT_TRUE(IsIterationEnd(out));
+  }
+
+  struct TestCase {
+    std::string json;
+    int json_size;
+    int block_size;
+    int num_rows;
+    int num_batches;
+    std::shared_ptr<Schema> schema;
+    RecordBatchVector batches;
+    std::shared_ptr<Table> table;
+  };
+
+  // Creates a test case from valid JSON objects with a human-readable index 
field and a
+  // struct field of random data. `block_size_multiplier` is applied to the 
largest
+  // generated row length to determine the target block_size. i.e - higher 
multiplier
+  // means fewer batches
+  static TestCase GenerateTestCase(int num_rows, double block_size_multiplier 
= 3.0) {
+    FieldVector data_fields = {field("s", utf8()), field("f", float64()),
+                               field("b", boolean())};
+    FieldVector fields = {field("i", int64()), field("d", 
struct_({data_fields}))};
+    TestCase out;
+    out.schema = schema(fields);
+    out.num_rows = num_rows;
+
+    constexpr int kSeed = 0x432432;
+    std::default_random_engine engine(kSeed);
+    std::vector<std::string> rows(num_rows);
+    size_t max_row_size = 1;
+
+    auto options = GenerateOptions::Defaults();
+    options.null_probability = 0;
+    for (int i = 0; i < num_rows; ++i) {
+      StringBuffer string_buffer;
+      Writer writer(string_buffer);
+      ABORT_NOT_OK(Generate(data_fields, engine, &writer, options));
+      std::string json = string_buffer.GetString();
+      rows[i] = Join({"{\"i\":", std::to_string(i), ",\"d\":", json, "}\n"});
+      max_row_size = std::max(max_row_size, rows[i].size());
+    }
+
+    auto block_size = static_cast<size_t>(max_row_size * 
block_size_multiplier);
+    // Deduce the expected record batches from the target block size.
+    std::vector<std::string> batch_rows;
+    size_t pos = 0;
+    for (const auto& row : rows) {
+      pos += row.size();
+      if (pos > block_size) {
+        out.batches.push_back(
+            RecordBatchFromJSON(out.schema, Join({"[", Join(batch_rows, ","), 
"]"})));
+        batch_rows.clear();
+        pos -= block_size;
+      }
+      batch_rows.push_back(row);
+      out.json += row;
+    }
+    if (!batch_rows.empty()) {
+      out.batches.push_back(
+          RecordBatchFromJSON(out.schema, Join({"[", Join(batch_rows, ","), 
"]"})));
+    }
+
+    out.json_size = static_cast<int>(out.json.size());
+    out.block_size = static_cast<int>(block_size);
+    out.num_batches = static_cast<int>(out.batches.size());
+    out.table = *Table::FromRecordBatches(out.batches);
+
+    return out;
+  }
+
+  static std::string Join(const std::vector<std::string>& strings,
+                          const std::string& delim = "", bool trailing_delim = 
false) {
+    std::string out;
+    for (size_t i = 0; i < strings.size();) {
+      out += strings[i++];
+      if (i != strings.size() || trailing_delim) {
+        out += delim;
+      }
+    }
+    return out;
+  }
+
+  internal::Executor* executor_ = nullptr;
+  ParseOptions parse_options_ = ParseOptions::Defaults();
+  ReadOptions read_options_ = ReadOptions::Defaults();
+  io::IOContext io_context_ = io::default_io_context();
+};
+
+INSTANTIATE_TEST_SUITE_P(StreamingReaderTest, StreamingReaderTest,
+                         ::testing::Values(false, true));
+
+TEST_P(StreamingReaderTest, ErrorOnEmptyStream) {
+  ASSERT_RAISES(Invalid, MakeReader(""));
+  std::string data(100, '\n');
+  for (auto block_size : {25, 49, 50, 100, 200}) {
+    read_options_.block_size = block_size;
+    ASSERT_RAISES(Invalid, MakeReader(data));
+  }
+}
+
+TEST_P(StreamingReaderTest, PropagateChunkingErrors) {
+  constexpr double kIoLatency = 1e-3;
+
+  auto test_schema = schema({field("i", int64())});
+  // Object straddles multiple blocks
+  auto bad_first_chunk = Join(
+      {
+          R"({"i": 0            })",
+          R"({"i": 1})",
+      },
+      "\n");
+  auto bad_middle_chunk = Join(
+      {
+          R"({"i": 0})",
+          R"({"i":    1})",
+          R"({"i": 2})",
+      },
+      "\n");
+
+  read_options_.block_size = 10;
+  ASSERT_RAISES(Invalid, MakeReader(bad_first_chunk));
+
+  ASSERT_OK_AND_ASSIGN(auto reader, MakeReader(bad_middle_chunk, kIoLatency));
+
+  std::shared_ptr<RecordBatch> batch;
+  AssertReadNext(reader, &batch);
+  EXPECT_EQ(reader->bytes_processed(), 9);
+  ASSERT_BATCHES_EQUAL(*RecordBatchFromJSON(test_schema, "[{\"i\":0}]"), 
*batch);
+
+  ASSERT_RAISES(Invalid, reader->ReadNext(&batch));
+  EXPECT_EQ(reader->bytes_processed(), 9);
+  AssertReadEnd(reader);
+  AssertReadEnd(reader);
+  EXPECT_EQ(reader->bytes_processed(), 9);
+}
+
+TEST_P(StreamingReaderTest, PropagateParsingErrors) {
+  auto test_schema = schema({field("n", int64())});
+  auto bad_first_block = Join(
+      {
+          R"({"n": })",
+          R"({"n": 10000})",
+      },
+      "\n");
+  auto bad_first_block_after_empty = Join(
+      {
+          R"(            )",
+          R"({"n": })",
+          R"({"n": 10000})",
+      },
+      "\n");
+  auto bad_middle_block = Join(
+      {
+          R"({"n": 10000})",
+          R"({"n": 200 0})",
+          R"({"n": 30000})",
+      },
+      "\n");
+
+  read_options_.block_size = 16;
+  ASSERT_RAISES(Invalid, MakeReader(bad_first_block));
+  ASSERT_RAISES(Invalid, MakeReader(bad_first_block_after_empty));
+
+  std::shared_ptr<RecordBatch> batch;
+  ASSERT_OK_AND_ASSIGN(auto reader, MakeReader(bad_middle_block));
+  EXPECT_EQ(reader->bytes_processed(), 0);
+  AssertSchemaEqual(reader->schema(), test_schema);
+
+  AssertReadNext(reader, &batch);
+  EXPECT_EQ(reader->bytes_processed(), 13);
+  ASSERT_BATCHES_EQUAL(*RecordBatchFromJSON(test_schema, R"([{"n":10000}])"), 
*batch);
+
+  ASSERT_RAISES(Invalid, reader->ReadNext(&batch));
+  EXPECT_EQ(reader->bytes_processed(), 13);
+  AssertReadEnd(reader);
+  EXPECT_EQ(reader->bytes_processed(), 13);
+}
+
+TEST_P(StreamingReaderTest, PropagateErrorsNonLinewiseChunker) {
+  auto test_schema = schema({field("i", int64())});
+  auto bad_first_block = Join(
+      {
+          R"({"i":0}{1})",
+          R"({"i":2})",
+      },
+      "\n");
+  auto bad_middle_blocks = Join(
+      {
+          R"({"i": 0})",
+          R"({"i":    1})",
+          R"({}"i":2})",
+          R"({"i": 3})",
+      },
+      "\n");
+
+  std::shared_ptr<RecordBatch> batch;
+  std::shared_ptr<StreamingReader> reader;
+  Status status;
+  read_options_.block_size = 10;
+  parse_options_.newlines_in_values = true;
+
+  ASSERT_OK_AND_ASSIGN(reader, MakeReader(bad_first_block));
+  AssertReadNext(reader, &batch);
+  EXPECT_EQ(reader->bytes_processed(), 7);
+  ASSERT_BATCHES_EQUAL(*RecordBatchFromJSON(test_schema, "[{\"i\":0}]"), 
*batch);
+  status = reader->ReadNext(&batch);
+  EXPECT_EQ(reader->bytes_processed(), 7);
+  ASSERT_RAISES(Invalid, status);
+  EXPECT_THAT(status.message(), ::testing::StartsWith("JSON parse error"));
+  AssertReadEnd(reader);
+
+  ASSERT_OK_AND_ASSIGN(reader, MakeReader(bad_middle_blocks));
+  AssertReadNext(reader, &batch);
+  EXPECT_EQ(reader->bytes_processed(), 9);
+  ASSERT_BATCHES_EQUAL(*RecordBatchFromJSON(test_schema, "[{\"i\":0}]"), 
*batch);
+  // Chunker doesn't require newline delimiters, so this should be valid
+  AssertReadNext(reader, &batch);
+  EXPECT_EQ(reader->bytes_processed(), 20);
+  ASSERT_BATCHES_EQUAL(*RecordBatchFromJSON(test_schema, "[{\"i\":1}]"), 
*batch);
+
+  status = reader->ReadNext(&batch);
+  EXPECT_EQ(reader->bytes_processed(), 20);
+  // Should fail to parse "{}\"i\""
+  ASSERT_RAISES(Invalid, status);
+  EXPECT_THAT(status.message(), ::testing::StartsWith("JSON parse error"));
+  // Incoming chunker error from ":2}" shouldn't leak through after the first 
failure,
+  // which is a possibility if async tasks are still outstanding due to 
readahead.
+  AssertReadEnd(reader);
+  AssertReadEnd(reader);
+  EXPECT_EQ(reader->bytes_processed(), 20);
+}
+
+TEST_P(StreamingReaderTest, IgnoreLeadingEmptyBlocks) {
+  std::string test_json(32, '\n');
+  test_json += R"({"b": true, "s": "foo"})";
+  ASSERT_EQ(test_json.length(), 55);
+
+  parse_options_.explicit_schema = schema({field("b", boolean()), field("s", 
utf8())});
+  read_options_.block_size = 24;
+  ASSERT_OK_AND_ASSIGN(auto reader, MakeReader(test_json));
+  EXPECT_EQ(reader->bytes_processed(), 0);
+
+  auto expected_schema = parse_options_.explicit_schema;
+  auto expected_batch = RecordBatchFromJSON(expected_schema, 
R"([{"b":true,"s":"foo"}])");
+
+  AssertSchemaEqual(reader->schema(), expected_schema);
+
+  std::shared_ptr<RecordBatch> actual_batch;
+  AssertReadNext(reader, &actual_batch);
+  EXPECT_EQ(reader->bytes_processed(), 55);
+  ASSERT_BATCHES_EQUAL(*expected_batch, *actual_batch);
+
+  AssertReadEnd(reader);
+}
+
+TEST_P(StreamingReaderTest, ExplicitSchemaErrorOnUnexpectedFields) {
+  std::string test_json =
+      Join({R"({"s": "foo", "t": "2022-01-01"})", R"({"s": "foo", "t": 
"2022-01-01"})",
+            R"({"s": "foo", "t": "2022-01-01", "b": true})"},
+           "\n");
+
+  FieldVector expected_fields = {field("s", utf8())};
+  std::shared_ptr<Schema> expected_schema = schema(expected_fields);
+  std::shared_ptr<RecordBatch> expected_batch;
+  std::shared_ptr<RecordBatch> actual_batch;
+  std::shared_ptr<StreamingReader> reader;
+
+  parse_options_.explicit_schema = expected_schema;
+  parse_options_.unexpected_field_behavior = UnexpectedFieldBehavior::Error;
+  read_options_.block_size = 48;
+  ASSERT_RAISES(Invalid, MakeReader(test_json));
+
+  expected_fields.push_back(field("t", utf8()));
+  expected_schema = schema(expected_fields);
+  expected_batch =
+      RecordBatchFromJSON(expected_schema, 
R"([{"s":"foo","t":"2022-01-01"}])");
+
+  parse_options_.explicit_schema = expected_schema;
+  ASSERT_OK_AND_ASSIGN(reader, MakeReader(test_json));
+  AssertSchemaEqual(reader->schema(), expected_schema);
+
+  AssertReadNext(reader, &actual_batch);
+  ASSERT_BATCHES_EQUAL(*expected_batch, *actual_batch);
+  EXPECT_EQ(reader->bytes_processed(), 32);
+
+  AssertReadNext(reader, &actual_batch);
+  ASSERT_BATCHES_EQUAL(*expected_batch, *actual_batch);
+  EXPECT_EQ(reader->bytes_processed(), 64);
+
+  ASSERT_RAISES(Invalid, reader->ReadNext(&actual_batch));

Review Comment:
   Same here.



##########
cpp/src/arrow/json/reader_test.cc:
##########
@@ -320,5 +325,546 @@ TEST(ReaderTest, FailOnInvalidEOF) {
   }
 }
 
+class StreamingReaderTest : public ::testing::TestWithParam<bool> {
+ protected:
+  void SetUp() override { read_options_.use_threads = GetParam(); }
+
+  static std::shared_ptr<io::InputStream> MakeTestStream(const std::string& 
str) {
+    auto buffer = std::make_shared<Buffer>(str);
+    return std::make_shared<io::BufferReader>(std::move(buffer));
+  }
+  // Stream with simulated latency
+  static std::shared_ptr<io::InputStream> MakeTestStream(const std::string& 
str,
+                                                         double latency) {
+    return std::make_shared<io::SlowInputStream>(MakeTestStream(str), latency);
+  }
+
+  Result<std::shared_ptr<StreamingReader>> MakeReader(
+      std::shared_ptr<io::InputStream> stream) {
+    return StreamingReader::Make(std::move(stream), read_options_, 
parse_options_,
+                                 io_context_, executor_);
+  }
+  template <typename... Args>
+  Result<std::shared_ptr<StreamingReader>> MakeReader(Args&&... args) {
+    return MakeReader(MakeTestStream(std::forward<Args>(args)...));
+  }
+
+  AsyncGenerator<std::shared_ptr<RecordBatch>> MakeGenerator(
+      std::shared_ptr<StreamingReader> reader) {
+    return [reader = std::move(reader)] { return reader->ReadNextAsync(); };
+  }
+  template <typename... Args>
+  Result<AsyncGenerator<std::shared_ptr<RecordBatch>>> MakeGenerator(Args&&... 
args) {
+    ARROW_ASSIGN_OR_RAISE(auto reader, 
MakeReader(std::forward<Args>(args)...));
+    return MakeGenerator(std::move(reader));
+  }
+
+  static void AssertReadNext(const std::shared_ptr<StreamingReader>& reader,
+                             std::shared_ptr<RecordBatch>* out) {
+    ASSERT_OK(reader->ReadNext(out));
+    ASSERT_FALSE(IsIterationEnd(*out));

Review Comment:
   Let's validate the emitted batch as well.
   ```suggestion
       ASSERT_FALSE(IsIterationEnd(*out));
       ASSERT_OK((**out).ValidateFull());
   ```



##########
cpp/src/arrow/json/reader_test.cc:
##########
@@ -320,5 +325,546 @@ TEST(ReaderTest, FailOnInvalidEOF) {
   }
 }
 
+class StreamingReaderTest : public ::testing::TestWithParam<bool> {
+ protected:
+  void SetUp() override { read_options_.use_threads = GetParam(); }
+
+  static std::shared_ptr<io::InputStream> MakeTestStream(const std::string& 
str) {
+    auto buffer = std::make_shared<Buffer>(str);
+    return std::make_shared<io::BufferReader>(std::move(buffer));
+  }
+  // Stream with simulated latency
+  static std::shared_ptr<io::InputStream> MakeTestStream(const std::string& 
str,
+                                                         double latency) {
+    return std::make_shared<io::SlowInputStream>(MakeTestStream(str), latency);
+  }
+
+  Result<std::shared_ptr<StreamingReader>> MakeReader(
+      std::shared_ptr<io::InputStream> stream) {
+    return StreamingReader::Make(std::move(stream), read_options_, 
parse_options_,
+                                 io_context_, executor_);
+  }
+  template <typename... Args>
+  Result<std::shared_ptr<StreamingReader>> MakeReader(Args&&... args) {
+    return MakeReader(MakeTestStream(std::forward<Args>(args)...));
+  }
+
+  AsyncGenerator<std::shared_ptr<RecordBatch>> MakeGenerator(
+      std::shared_ptr<StreamingReader> reader) {
+    return [reader = std::move(reader)] { return reader->ReadNextAsync(); };
+  }
+  template <typename... Args>
+  Result<AsyncGenerator<std::shared_ptr<RecordBatch>>> MakeGenerator(Args&&... 
args) {
+    ARROW_ASSIGN_OR_RAISE(auto reader, 
MakeReader(std::forward<Args>(args)...));
+    return MakeGenerator(std::move(reader));
+  }
+
+  static void AssertReadNext(const std::shared_ptr<StreamingReader>& reader,
+                             std::shared_ptr<RecordBatch>* out) {
+    ASSERT_OK(reader->ReadNext(out));
+    ASSERT_FALSE(IsIterationEnd(*out));
+  }
+  static void AssertReadEnd(const std::shared_ptr<StreamingReader>& reader) {
+    std::shared_ptr<RecordBatch> out;
+    ASSERT_OK(reader->ReadNext(&out));
+    ASSERT_TRUE(IsIterationEnd(out));
+  }
+
+  struct TestCase {
+    std::string json;
+    int json_size;
+    int block_size;
+    int num_rows;
+    int num_batches;
+    std::shared_ptr<Schema> schema;
+    RecordBatchVector batches;
+    std::shared_ptr<Table> table;
+  };
+
+  // Creates a test case from valid JSON objects with a human-readable index 
field and a
+  // struct field of random data. `block_size_multiplier` is applied to the 
largest
+  // generated row length to determine the target block_size. i.e - higher 
multiplier
+  // means fewer batches
+  static TestCase GenerateTestCase(int num_rows, double block_size_multiplier 
= 3.0) {
+    FieldVector data_fields = {field("s", utf8()), field("f", float64()),
+                               field("b", boolean())};
+    FieldVector fields = {field("i", int64()), field("d", 
struct_({data_fields}))};
+    TestCase out;
+    out.schema = schema(fields);
+    out.num_rows = num_rows;
+
+    constexpr int kSeed = 0x432432;
+    std::default_random_engine engine(kSeed);
+    std::vector<std::string> rows(num_rows);
+    size_t max_row_size = 1;
+
+    auto options = GenerateOptions::Defaults();
+    options.null_probability = 0;
+    for (int i = 0; i < num_rows; ++i) {
+      StringBuffer string_buffer;
+      Writer writer(string_buffer);
+      ABORT_NOT_OK(Generate(data_fields, engine, &writer, options));
+      std::string json = string_buffer.GetString();
+      rows[i] = Join({"{\"i\":", std::to_string(i), ",\"d\":", json, "}\n"});
+      max_row_size = std::max(max_row_size, rows[i].size());
+    }
+
+    auto block_size = static_cast<size_t>(max_row_size * 
block_size_multiplier);
+    // Deduce the expected record batches from the target block size.
+    std::vector<std::string> batch_rows;
+    size_t pos = 0;
+    for (const auto& row : rows) {
+      pos += row.size();
+      if (pos > block_size) {
+        out.batches.push_back(
+            RecordBatchFromJSON(out.schema, Join({"[", Join(batch_rows, ","), 
"]"})));
+        batch_rows.clear();
+        pos -= block_size;
+      }
+      batch_rows.push_back(row);
+      out.json += row;
+    }
+    if (!batch_rows.empty()) {
+      out.batches.push_back(
+          RecordBatchFromJSON(out.schema, Join({"[", Join(batch_rows, ","), 
"]"})));
+    }
+
+    out.json_size = static_cast<int>(out.json.size());
+    out.block_size = static_cast<int>(block_size);
+    out.num_batches = static_cast<int>(out.batches.size());
+    out.table = *Table::FromRecordBatches(out.batches);
+
+    return out;
+  }
+
+  static std::string Join(const std::vector<std::string>& strings,
+                          const std::string& delim = "", bool trailing_delim = 
false) {
+    std::string out;
+    for (size_t i = 0; i < strings.size();) {
+      out += strings[i++];
+      if (i != strings.size() || trailing_delim) {
+        out += delim;
+      }
+    }
+    return out;
+  }
+
+  internal::Executor* executor_ = nullptr;
+  ParseOptions parse_options_ = ParseOptions::Defaults();
+  ReadOptions read_options_ = ReadOptions::Defaults();
+  io::IOContext io_context_ = io::default_io_context();
+};
+
+INSTANTIATE_TEST_SUITE_P(StreamingReaderTest, StreamingReaderTest,
+                         ::testing::Values(false, true));
+
+TEST_P(StreamingReaderTest, ErrorOnEmptyStream) {
+  ASSERT_RAISES(Invalid, MakeReader(""));
+  std::string data(100, '\n');
+  for (auto block_size : {25, 49, 50, 100, 200}) {
+    read_options_.block_size = block_size;
+    ASSERT_RAISES(Invalid, MakeReader(data));
+  }
+}
+
+TEST_P(StreamingReaderTest, PropagateChunkingErrors) {
+  constexpr double kIoLatency = 1e-3;
+
+  auto test_schema = schema({field("i", int64())});
+  // Object straddles multiple blocks
+  auto bad_first_chunk = Join(
+      {
+          R"({"i": 0            })",
+          R"({"i": 1})",
+      },
+      "\n");
+  auto bad_middle_chunk = Join(
+      {
+          R"({"i": 0})",
+          R"({"i":    1})",
+          R"({"i": 2})",
+      },
+      "\n");
+
+  read_options_.block_size = 10;
+  ASSERT_RAISES(Invalid, MakeReader(bad_first_chunk));
+
+  ASSERT_OK_AND_ASSIGN(auto reader, MakeReader(bad_middle_chunk, kIoLatency));
+
+  std::shared_ptr<RecordBatch> batch;
+  AssertReadNext(reader, &batch);
+  EXPECT_EQ(reader->bytes_processed(), 9);
+  ASSERT_BATCHES_EQUAL(*RecordBatchFromJSON(test_schema, "[{\"i\":0}]"), 
*batch);
+
+  ASSERT_RAISES(Invalid, reader->ReadNext(&batch));
+  EXPECT_EQ(reader->bytes_processed(), 9);
+  AssertReadEnd(reader);
+  AssertReadEnd(reader);
+  EXPECT_EQ(reader->bytes_processed(), 9);
+}
+
+TEST_P(StreamingReaderTest, PropagateParsingErrors) {
+  auto test_schema = schema({field("n", int64())});
+  auto bad_first_block = Join(
+      {
+          R"({"n": })",
+          R"({"n": 10000})",
+      },
+      "\n");
+  auto bad_first_block_after_empty = Join(
+      {
+          R"(            )",
+          R"({"n": })",
+          R"({"n": 10000})",
+      },
+      "\n");
+  auto bad_middle_block = Join(
+      {
+          R"({"n": 10000})",
+          R"({"n": 200 0})",
+          R"({"n": 30000})",
+      },
+      "\n");
+
+  read_options_.block_size = 16;
+  ASSERT_RAISES(Invalid, MakeReader(bad_first_block));
+  ASSERT_RAISES(Invalid, MakeReader(bad_first_block_after_empty));
+
+  std::shared_ptr<RecordBatch> batch;
+  ASSERT_OK_AND_ASSIGN(auto reader, MakeReader(bad_middle_block));
+  EXPECT_EQ(reader->bytes_processed(), 0);
+  AssertSchemaEqual(reader->schema(), test_schema);
+
+  AssertReadNext(reader, &batch);
+  EXPECT_EQ(reader->bytes_processed(), 13);
+  ASSERT_BATCHES_EQUAL(*RecordBatchFromJSON(test_schema, R"([{"n":10000}])"), 
*batch);
+
+  ASSERT_RAISES(Invalid, reader->ReadNext(&batch));
+  EXPECT_EQ(reader->bytes_processed(), 13);
+  AssertReadEnd(reader);
+  EXPECT_EQ(reader->bytes_processed(), 13);
+}
+
+TEST_P(StreamingReaderTest, PropagateErrorsNonLinewiseChunker) {
+  auto test_schema = schema({field("i", int64())});
+  auto bad_first_block = Join(
+      {
+          R"({"i":0}{1})",
+          R"({"i":2})",
+      },
+      "\n");
+  auto bad_middle_blocks = Join(
+      {
+          R"({"i": 0})",
+          R"({"i":    1})",
+          R"({}"i":2})",
+          R"({"i": 3})",
+      },
+      "\n");
+
+  std::shared_ptr<RecordBatch> batch;
+  std::shared_ptr<StreamingReader> reader;
+  Status status;
+  read_options_.block_size = 10;
+  parse_options_.newlines_in_values = true;
+
+  ASSERT_OK_AND_ASSIGN(reader, MakeReader(bad_first_block));
+  AssertReadNext(reader, &batch);
+  EXPECT_EQ(reader->bytes_processed(), 7);
+  ASSERT_BATCHES_EQUAL(*RecordBatchFromJSON(test_schema, "[{\"i\":0}]"), 
*batch);
+  status = reader->ReadNext(&batch);
+  EXPECT_EQ(reader->bytes_processed(), 7);
+  ASSERT_RAISES(Invalid, status);
+  EXPECT_THAT(status.message(), ::testing::StartsWith("JSON parse error"));
+  AssertReadEnd(reader);
+
+  ASSERT_OK_AND_ASSIGN(reader, MakeReader(bad_middle_blocks));
+  AssertReadNext(reader, &batch);
+  EXPECT_EQ(reader->bytes_processed(), 9);
+  ASSERT_BATCHES_EQUAL(*RecordBatchFromJSON(test_schema, "[{\"i\":0}]"), 
*batch);
+  // Chunker doesn't require newline delimiters, so this should be valid
+  AssertReadNext(reader, &batch);
+  EXPECT_EQ(reader->bytes_processed(), 20);
+  ASSERT_BATCHES_EQUAL(*RecordBatchFromJSON(test_schema, "[{\"i\":1}]"), 
*batch);
+
+  status = reader->ReadNext(&batch);
+  EXPECT_EQ(reader->bytes_processed(), 20);
+  // Should fail to parse "{}\"i\""
+  ASSERT_RAISES(Invalid, status);
+  EXPECT_THAT(status.message(), ::testing::StartsWith("JSON parse error"));
+  // Incoming chunker error from ":2}" shouldn't leak through after the first 
failure,
+  // which is a possibility if async tasks are still outstanding due to 
readahead.
+  AssertReadEnd(reader);
+  AssertReadEnd(reader);
+  EXPECT_EQ(reader->bytes_processed(), 20);
+}
+
+TEST_P(StreamingReaderTest, IgnoreLeadingEmptyBlocks) {
+  std::string test_json(32, '\n');
+  test_json += R"({"b": true, "s": "foo"})";
+  ASSERT_EQ(test_json.length(), 55);
+
+  parse_options_.explicit_schema = schema({field("b", boolean()), field("s", 
utf8())});
+  read_options_.block_size = 24;
+  ASSERT_OK_AND_ASSIGN(auto reader, MakeReader(test_json));
+  EXPECT_EQ(reader->bytes_processed(), 0);
+
+  auto expected_schema = parse_options_.explicit_schema;
+  auto expected_batch = RecordBatchFromJSON(expected_schema, 
R"([{"b":true,"s":"foo"}])");
+
+  AssertSchemaEqual(reader->schema(), expected_schema);
+
+  std::shared_ptr<RecordBatch> actual_batch;
+  AssertReadNext(reader, &actual_batch);
+  EXPECT_EQ(reader->bytes_processed(), 55);
+  ASSERT_BATCHES_EQUAL(*expected_batch, *actual_batch);
+
+  AssertReadEnd(reader);
+}
+
+TEST_P(StreamingReaderTest, ExplicitSchemaErrorOnUnexpectedFields) {
+  std::string test_json =
+      Join({R"({"s": "foo", "t": "2022-01-01"})", R"({"s": "foo", "t": 
"2022-01-01"})",
+            R"({"s": "foo", "t": "2022-01-01", "b": true})"},
+           "\n");
+
+  FieldVector expected_fields = {field("s", utf8())};
+  std::shared_ptr<Schema> expected_schema = schema(expected_fields);
+  std::shared_ptr<RecordBatch> expected_batch;
+  std::shared_ptr<RecordBatch> actual_batch;
+  std::shared_ptr<StreamingReader> reader;
+
+  parse_options_.explicit_schema = expected_schema;
+  parse_options_.unexpected_field_behavior = UnexpectedFieldBehavior::Error;
+  read_options_.block_size = 48;
+  ASSERT_RAISES(Invalid, MakeReader(test_json));
+
+  expected_fields.push_back(field("t", utf8()));
+  expected_schema = schema(expected_fields);
+  expected_batch =
+      RecordBatchFromJSON(expected_schema, 
R"([{"s":"foo","t":"2022-01-01"}])");
+
+  parse_options_.explicit_schema = expected_schema;
+  ASSERT_OK_AND_ASSIGN(reader, MakeReader(test_json));
+  AssertSchemaEqual(reader->schema(), expected_schema);
+
+  AssertReadNext(reader, &actual_batch);
+  ASSERT_BATCHES_EQUAL(*expected_batch, *actual_batch);
+  EXPECT_EQ(reader->bytes_processed(), 32);
+
+  AssertReadNext(reader, &actual_batch);
+  ASSERT_BATCHES_EQUAL(*expected_batch, *actual_batch);
+  EXPECT_EQ(reader->bytes_processed(), 64);
+
+  ASSERT_RAISES(Invalid, reader->ReadNext(&actual_batch));
+  EXPECT_EQ(reader->bytes_processed(), 64);
+  AssertReadEnd(reader);
+}
+
+TEST_P(StreamingReaderTest, ExplicitSchemaIgnoreUnexpectedFields) {
+  std::string test_json =
+      Join({R"({"s": "foo", "u": "2022-01-01"})", R"({"s": "foo", "t": 
"2022-01-01"})",
+            R"({"s": "foo", "t": "2022-01-01", "b": true})"},
+           "\n");
+
+  FieldVector expected_fields = {field("s", utf8()), field("t", utf8())};
+  std::shared_ptr<Schema> expected_schema = schema(expected_fields);
+  std::shared_ptr<RecordBatch> expected_batch;
+  std::shared_ptr<RecordBatch> actual_batch;
+  std::shared_ptr<StreamingReader> reader;
+
+  parse_options_.explicit_schema = expected_schema;
+  parse_options_.unexpected_field_behavior = UnexpectedFieldBehavior::Ignore;
+  read_options_.block_size = 48;
+
+  ASSERT_OK_AND_ASSIGN(reader, MakeReader(test_json));
+  AssertSchemaEqual(reader->schema(), expected_schema);
+
+  expected_batch = RecordBatchFromJSON(expected_schema, 
R"([{"s":"foo","t":null}])");
+  AssertReadNext(reader, &actual_batch);
+  ASSERT_BATCHES_EQUAL(*expected_batch, *actual_batch);
+  EXPECT_EQ(reader->bytes_processed(), 32);
+
+  expected_batch =
+      RecordBatchFromJSON(expected_schema, 
R"([{"s":"foo","t":"2022-01-01"}])");
+  AssertReadNext(reader, &actual_batch);
+  ASSERT_BATCHES_EQUAL(*expected_batch, *actual_batch);
+  EXPECT_EQ(reader->bytes_processed(), 64);
+
+  AssertReadNext(reader, &actual_batch);
+  ASSERT_BATCHES_EQUAL(*expected_batch, *actual_batch);
+  EXPECT_EQ(reader->bytes_processed(), 106);
+  AssertReadEnd(reader);
+}
+
+TEST_P(StreamingReaderTest, InferredSchema) {
+  auto test_json = Join(
+      {
+          R"({"a": 0, "b": "foo"       })",
+          R"({"a": 1, "c": true        })",
+          R"({"a": 2, "d": "2022-01-01"})",
+      },
+      "\n", true);
+
+  std::shared_ptr<StreamingReader> reader;
+  std::shared_ptr<Schema> expected_schema;
+  std::shared_ptr<RecordBatch> expected_batch;
+  std::shared_ptr<RecordBatch> actual_batch;
+
+  FieldVector fields = {field("a", int64()), field("b", utf8())};
+  parse_options_.unexpected_field_behavior = 
UnexpectedFieldBehavior::InferType;
+  parse_options_.explicit_schema = nullptr;
+
+  // Schema derived from the first line
+  expected_schema = schema(fields);
+
+  read_options_.block_size = 32;
+  ASSERT_OK_AND_ASSIGN(reader, MakeReader(test_json));
+  AssertSchemaEqual(reader->schema(), expected_schema);
+
+  expected_batch = RecordBatchFromJSON(expected_schema, R"([{"a": 0, "b": 
"foo"}])");
+  AssertReadNext(reader, &actual_batch);
+  EXPECT_EQ(reader->bytes_processed(), 28);
+  ASSERT_BATCHES_EQUAL(*expected_batch, *actual_batch);
+
+  ASSERT_RAISES(Invalid, reader->ReadNext(&actual_batch));
+
+  // Schema derived from the first 2 lines
+  fields.push_back(field("c", boolean()));
+  expected_schema = schema(fields);
+
+  read_options_.block_size = 64;
+  ASSERT_OK_AND_ASSIGN(reader, MakeReader(test_json));
+  AssertSchemaEqual(reader->schema(), expected_schema);
+
+  expected_batch = RecordBatchFromJSON(expected_schema, R"([
+    {"a": 0, "b": "foo", "c": null},
+    {"a": 1, "b":  null, "c": true}
+  ])");
+  AssertReadNext(reader, &actual_batch);
+  EXPECT_EQ(reader->bytes_processed(), 56);
+  ASSERT_BATCHES_EQUAL(*expected_batch, *actual_batch);
+
+  ASSERT_RAISES(Invalid, reader->ReadNext(&actual_batch));
+
+  // Schema derived from all 3 lines
+  fields.push_back(field("d", timestamp(TimeUnit::SECOND)));
+  expected_schema = schema(fields);
+
+  read_options_.block_size = 96;
+  ASSERT_OK_AND_ASSIGN(reader, MakeReader(test_json));
+  AssertSchemaEqual(reader->schema(), expected_schema);
+
+  expected_batch = RecordBatchFromJSON(expected_schema, R"([
+    {"a": 0, "b": "foo", "c": null, "d":  null},
+    {"a": 1, "b":  null, "c": true, "d":  null},
+    {"a": 2, "b":  null, "c": null, "d":  "2022-01-01"}
+  ])");
+  AssertReadNext(reader, &actual_batch);
+  EXPECT_EQ(reader->bytes_processed(), 84);
+  ASSERT_BATCHES_EQUAL(*expected_batch, *actual_batch);
+
+  AssertReadEnd(reader);
+}
+
+TEST_P(StreamingReaderTest, AsyncReentrancy) {
+  constexpr int kNumRows = 16;
+  constexpr double kIoLatency = 1e-2;
+
+  auto expected = GenerateTestCase(kNumRows);
+  parse_options_.explicit_schema = expected.schema;
+  parse_options_.unexpected_field_behavior = UnexpectedFieldBehavior::Error;
+  read_options_.block_size = expected.block_size;
+
+  std::vector<Future<std::shared_ptr<RecordBatch>>> 
futures(expected.num_batches + 2);
+  ASSERT_OK_AND_ASSIGN(auto reader, MakeReader(expected.json, kIoLatency));
+  EXPECT_EQ(reader->bytes_processed(), 0);
+  for (auto& future : futures) {
+    future = reader->ReadNextAsync();
+  }
+
+  ASSERT_FINISHES_OK_AND_ASSIGN(auto results, All(std::move(futures)));
+  EXPECT_EQ(reader->bytes_processed(), expected.json_size);
+  ASSERT_OK_AND_ASSIGN(auto batches, 
internal::UnwrapOrRaise(std::move(results)));
+  batches.erase(std::remove(batches.begin(), batches.end(), nullptr), 
batches.end());
+  EXPECT_EQ(batches.size(), static_cast<size_t>(expected.num_batches));
+
+  ASSERT_OK_AND_ASSIGN(auto table, Table::FromRecordBatches(batches));
+  ASSERT_TABLES_EQUAL(*expected.table, *table);
+}
+
+TEST_P(StreamingReaderTest, FuturesOutliveReader) {
+  constexpr int kNumRows = 16;
+  constexpr double kIoLatency = 1e-2;
+
+  auto expected = GenerateTestCase(kNumRows);
+  parse_options_.explicit_schema = expected.schema;
+  parse_options_.unexpected_field_behavior = UnexpectedFieldBehavior::Error;
+  read_options_.block_size = expected.block_size;
+
+  auto stream = MakeTestStream(expected.json, kIoLatency);
+  std::vector<Future<std::shared_ptr<RecordBatch>>> 
futures(expected.num_batches + 2);
+  {
+    ASSERT_OK_AND_ASSIGN(auto reader, MakeReader(stream));
+    EXPECT_EQ(reader->bytes_processed(), 0);
+    for (auto& future : futures) {
+      future = reader->ReadNextAsync();
+    }
+  }
+
+  ASSERT_FINISHES_OK_AND_ASSIGN(auto results, All(std::move(futures)));
+  ASSERT_OK_AND_ASSIGN(auto batches, 
internal::UnwrapOrRaise(std::move(results)));
+  batches.erase(std::remove(batches.begin(), batches.end(), nullptr), 
batches.end());

Review Comment:
   Same thing here: this might be a bit stricter?



##########
docs/source/cpp/json.rst:
##########
@@ -66,6 +73,45 @@ A JSON file is read from a :class:`~arrow::io::InputStream`.
       }
    }
 
+StreamingReader
+===============
+
+Reads a file incrementally in fixed-size blocks, each yielding a

Review Comment:
   ```suggestion
   :class:`~StreamingReader` reads a file incrementally from blocks of a 
roughly equal byte size, each yielding a
   ```



##########
docs/source/cpp/json.rst:
##########
@@ -66,6 +73,45 @@ A JSON file is read from a :class:`~arrow::io::InputStream`.
       }
    }
 
+StreamingReader
+===============
+
+Reads a file incrementally in fixed-size blocks, each yielding a
+:class:`~arrow::RecordBatch`. Each independent JSON object in a block
+is converted to a row in the output batch.
+
+All batches adhere to a consistent :class:`~arrow:Schema`, which is
+derived from the first loaded batch. Alternatively, an explicit schema
+may be passed via :class:`~ParseOptions`.
+
+.. code-block:: cpp
+
+   #include "arrow/json/api.h"
+
+   {
+      // ...
+      auto read_options = arrow::json::ReadOptions::Defaults();
+      auto parse_options = arrow::json::ParseOptions::Defaults();
+
+      std::shared_ptr<arrow::io::InputStream> stream;
+      auto result = arrow::json::StreamingReader::Make(stream,
+                                                       read_options,
+                                                       parse_options);
+      if (!result.ok()) {
+         // Handle instantiation error
+      }
+      std::shared_ptr<arrow::json::StreamingReader> reader = *result;
+
+      std::shared_ptr<arrow::RecordBatch> batch;

Review Comment:
   I would move this declaration inside the loop, which will avoid holding to 
memory too long:
   ```c++
            std::shared_ptr<arrow::RecordBatch> batch = *maybe_batch;
            // Operate on each batch...
   ```
   



##########
cpp/src/arrow/json/reader_test.cc:
##########
@@ -320,5 +325,546 @@ TEST(ReaderTest, FailOnInvalidEOF) {
   }
 }
 
+class StreamingReaderTest : public ::testing::TestWithParam<bool> {
+ protected:
+  void SetUp() override { read_options_.use_threads = GetParam(); }
+
+  static std::shared_ptr<io::InputStream> MakeTestStream(const std::string& 
str) {
+    auto buffer = std::make_shared<Buffer>(str);
+    return std::make_shared<io::BufferReader>(std::move(buffer));
+  }
+  // Stream with simulated latency
+  static std::shared_ptr<io::InputStream> MakeTestStream(const std::string& 
str,
+                                                         double latency) {
+    return std::make_shared<io::SlowInputStream>(MakeTestStream(str), latency);
+  }
+
+  Result<std::shared_ptr<StreamingReader>> MakeReader(
+      std::shared_ptr<io::InputStream> stream) {
+    return StreamingReader::Make(std::move(stream), read_options_, 
parse_options_,
+                                 io_context_, executor_);
+  }
+  template <typename... Args>
+  Result<std::shared_ptr<StreamingReader>> MakeReader(Args&&... args) {
+    return MakeReader(MakeTestStream(std::forward<Args>(args)...));
+  }
+
+  AsyncGenerator<std::shared_ptr<RecordBatch>> MakeGenerator(
+      std::shared_ptr<StreamingReader> reader) {
+    return [reader = std::move(reader)] { return reader->ReadNextAsync(); };
+  }
+  template <typename... Args>
+  Result<AsyncGenerator<std::shared_ptr<RecordBatch>>> MakeGenerator(Args&&... 
args) {
+    ARROW_ASSIGN_OR_RAISE(auto reader, 
MakeReader(std::forward<Args>(args)...));
+    return MakeGenerator(std::move(reader));
+  }
+
+  static void AssertReadNext(const std::shared_ptr<StreamingReader>& reader,
+                             std::shared_ptr<RecordBatch>* out) {
+    ASSERT_OK(reader->ReadNext(out));
+    ASSERT_FALSE(IsIterationEnd(*out));
+  }
+  static void AssertReadEnd(const std::shared_ptr<StreamingReader>& reader) {
+    std::shared_ptr<RecordBatch> out;
+    ASSERT_OK(reader->ReadNext(&out));
+    ASSERT_TRUE(IsIterationEnd(out));
+  }
+
+  struct TestCase {
+    std::string json;
+    int json_size;
+    int block_size;
+    int num_rows;
+    int num_batches;
+    std::shared_ptr<Schema> schema;
+    RecordBatchVector batches;
+    std::shared_ptr<Table> table;
+  };
+
+  // Creates a test case from valid JSON objects with a human-readable index 
field and a
+  // struct field of random data. `block_size_multiplier` is applied to the 
largest
+  // generated row length to determine the target block_size. i.e - higher 
multiplier
+  // means fewer batches
+  static TestCase GenerateTestCase(int num_rows, double block_size_multiplier 
= 3.0) {
+    FieldVector data_fields = {field("s", utf8()), field("f", float64()),
+                               field("b", boolean())};
+    FieldVector fields = {field("i", int64()), field("d", 
struct_({data_fields}))};
+    TestCase out;
+    out.schema = schema(fields);
+    out.num_rows = num_rows;
+
+    constexpr int kSeed = 0x432432;
+    std::default_random_engine engine(kSeed);
+    std::vector<std::string> rows(num_rows);
+    size_t max_row_size = 1;
+
+    auto options = GenerateOptions::Defaults();
+    options.null_probability = 0;
+    for (int i = 0; i < num_rows; ++i) {
+      StringBuffer string_buffer;
+      Writer writer(string_buffer);
+      ABORT_NOT_OK(Generate(data_fields, engine, &writer, options));
+      std::string json = string_buffer.GetString();
+      rows[i] = Join({"{\"i\":", std::to_string(i), ",\"d\":", json, "}\n"});
+      max_row_size = std::max(max_row_size, rows[i].size());
+    }
+
+    auto block_size = static_cast<size_t>(max_row_size * 
block_size_multiplier);
+    // Deduce the expected record batches from the target block size.
+    std::vector<std::string> batch_rows;
+    size_t pos = 0;
+    for (const auto& row : rows) {
+      pos += row.size();
+      if (pos > block_size) {
+        out.batches.push_back(
+            RecordBatchFromJSON(out.schema, Join({"[", Join(batch_rows, ","), 
"]"})));
+        batch_rows.clear();
+        pos -= block_size;
+      }
+      batch_rows.push_back(row);
+      out.json += row;
+    }
+    if (!batch_rows.empty()) {
+      out.batches.push_back(
+          RecordBatchFromJSON(out.schema, Join({"[", Join(batch_rows, ","), 
"]"})));
+    }
+
+    out.json_size = static_cast<int>(out.json.size());
+    out.block_size = static_cast<int>(block_size);
+    out.num_batches = static_cast<int>(out.batches.size());
+    out.table = *Table::FromRecordBatches(out.batches);
+
+    return out;
+  }
+
+  static std::string Join(const std::vector<std::string>& strings,
+                          const std::string& delim = "", bool trailing_delim = 
false) {
+    std::string out;
+    for (size_t i = 0; i < strings.size();) {
+      out += strings[i++];
+      if (i != strings.size() || trailing_delim) {
+        out += delim;
+      }
+    }
+    return out;
+  }
+
+  internal::Executor* executor_ = nullptr;
+  ParseOptions parse_options_ = ParseOptions::Defaults();
+  ReadOptions read_options_ = ReadOptions::Defaults();
+  io::IOContext io_context_ = io::default_io_context();
+};
+
+INSTANTIATE_TEST_SUITE_P(StreamingReaderTest, StreamingReaderTest,
+                         ::testing::Values(false, true));
+
+TEST_P(StreamingReaderTest, ErrorOnEmptyStream) {
+  ASSERT_RAISES(Invalid, MakeReader(""));
+  std::string data(100, '\n');
+  for (auto block_size : {25, 49, 50, 100, 200}) {
+    read_options_.block_size = block_size;
+    ASSERT_RAISES(Invalid, MakeReader(data));
+  }
+}
+
+TEST_P(StreamingReaderTest, PropagateChunkingErrors) {
+  constexpr double kIoLatency = 1e-3;
+
+  auto test_schema = schema({field("i", int64())});
+  // Object straddles multiple blocks
+  auto bad_first_chunk = Join(
+      {
+          R"({"i": 0            })",
+          R"({"i": 1})",
+      },
+      "\n");
+  auto bad_middle_chunk = Join(
+      {
+          R"({"i": 0})",
+          R"({"i":    1})",
+          R"({"i": 2})",
+      },
+      "\n");
+
+  read_options_.block_size = 10;
+  ASSERT_RAISES(Invalid, MakeReader(bad_first_chunk));
+
+  ASSERT_OK_AND_ASSIGN(auto reader, MakeReader(bad_middle_chunk, kIoLatency));
+
+  std::shared_ptr<RecordBatch> batch;
+  AssertReadNext(reader, &batch);
+  EXPECT_EQ(reader->bytes_processed(), 9);
+  ASSERT_BATCHES_EQUAL(*RecordBatchFromJSON(test_schema, "[{\"i\":0}]"), 
*batch);
+
+  ASSERT_RAISES(Invalid, reader->ReadNext(&batch));
+  EXPECT_EQ(reader->bytes_processed(), 9);
+  AssertReadEnd(reader);
+  AssertReadEnd(reader);
+  EXPECT_EQ(reader->bytes_processed(), 9);
+}
+
+TEST_P(StreamingReaderTest, PropagateParsingErrors) {
+  auto test_schema = schema({field("n", int64())});
+  auto bad_first_block = Join(
+      {
+          R"({"n": })",
+          R"({"n": 10000})",
+      },
+      "\n");
+  auto bad_first_block_after_empty = Join(
+      {
+          R"(            )",
+          R"({"n": })",
+          R"({"n": 10000})",
+      },
+      "\n");
+  auto bad_middle_block = Join(
+      {
+          R"({"n": 10000})",
+          R"({"n": 200 0})",
+          R"({"n": 30000})",
+      },
+      "\n");
+
+  read_options_.block_size = 16;
+  ASSERT_RAISES(Invalid, MakeReader(bad_first_block));
+  ASSERT_RAISES(Invalid, MakeReader(bad_first_block_after_empty));
+
+  std::shared_ptr<RecordBatch> batch;
+  ASSERT_OK_AND_ASSIGN(auto reader, MakeReader(bad_middle_block));
+  EXPECT_EQ(reader->bytes_processed(), 0);
+  AssertSchemaEqual(reader->schema(), test_schema);
+
+  AssertReadNext(reader, &batch);
+  EXPECT_EQ(reader->bytes_processed(), 13);
+  ASSERT_BATCHES_EQUAL(*RecordBatchFromJSON(test_schema, R"([{"n":10000}])"), 
*batch);
+
+  ASSERT_RAISES(Invalid, reader->ReadNext(&batch));
+  EXPECT_EQ(reader->bytes_processed(), 13);
+  AssertReadEnd(reader);
+  EXPECT_EQ(reader->bytes_processed(), 13);
+}
+
+TEST_P(StreamingReaderTest, PropagateErrorsNonLinewiseChunker) {
+  auto test_schema = schema({field("i", int64())});
+  auto bad_first_block = Join(
+      {
+          R"({"i":0}{1})",
+          R"({"i":2})",
+      },
+      "\n");
+  auto bad_middle_blocks = Join(
+      {
+          R"({"i": 0})",
+          R"({"i":    1})",
+          R"({}"i":2})",
+          R"({"i": 3})",
+      },
+      "\n");
+
+  std::shared_ptr<RecordBatch> batch;
+  std::shared_ptr<StreamingReader> reader;
+  Status status;
+  read_options_.block_size = 10;
+  parse_options_.newlines_in_values = true;
+
+  ASSERT_OK_AND_ASSIGN(reader, MakeReader(bad_first_block));
+  AssertReadNext(reader, &batch);
+  EXPECT_EQ(reader->bytes_processed(), 7);
+  ASSERT_BATCHES_EQUAL(*RecordBatchFromJSON(test_schema, "[{\"i\":0}]"), 
*batch);
+  status = reader->ReadNext(&batch);
+  EXPECT_EQ(reader->bytes_processed(), 7);
+  ASSERT_RAISES(Invalid, status);
+  EXPECT_THAT(status.message(), ::testing::StartsWith("JSON parse error"));
+  AssertReadEnd(reader);
+
+  ASSERT_OK_AND_ASSIGN(reader, MakeReader(bad_middle_blocks));
+  AssertReadNext(reader, &batch);
+  EXPECT_EQ(reader->bytes_processed(), 9);
+  ASSERT_BATCHES_EQUAL(*RecordBatchFromJSON(test_schema, "[{\"i\":0}]"), 
*batch);
+  // Chunker doesn't require newline delimiters, so this should be valid
+  AssertReadNext(reader, &batch);
+  EXPECT_EQ(reader->bytes_processed(), 20);
+  ASSERT_BATCHES_EQUAL(*RecordBatchFromJSON(test_schema, "[{\"i\":1}]"), 
*batch);
+
+  status = reader->ReadNext(&batch);
+  EXPECT_EQ(reader->bytes_processed(), 20);
+  // Should fail to parse "{}\"i\""
+  ASSERT_RAISES(Invalid, status);
+  EXPECT_THAT(status.message(), ::testing::StartsWith("JSON parse error"));
+  // Incoming chunker error from ":2}" shouldn't leak through after the first 
failure,
+  // which is a possibility if async tasks are still outstanding due to 
readahead.
+  AssertReadEnd(reader);
+  AssertReadEnd(reader);
+  EXPECT_EQ(reader->bytes_processed(), 20);
+}
+
+TEST_P(StreamingReaderTest, IgnoreLeadingEmptyBlocks) {
+  std::string test_json(32, '\n');
+  test_json += R"({"b": true, "s": "foo"})";
+  ASSERT_EQ(test_json.length(), 55);
+
+  parse_options_.explicit_schema = schema({field("b", boolean()), field("s", 
utf8())});
+  read_options_.block_size = 24;
+  ASSERT_OK_AND_ASSIGN(auto reader, MakeReader(test_json));
+  EXPECT_EQ(reader->bytes_processed(), 0);
+
+  auto expected_schema = parse_options_.explicit_schema;
+  auto expected_batch = RecordBatchFromJSON(expected_schema, 
R"([{"b":true,"s":"foo"}])");
+
+  AssertSchemaEqual(reader->schema(), expected_schema);
+
+  std::shared_ptr<RecordBatch> actual_batch;
+  AssertReadNext(reader, &actual_batch);
+  EXPECT_EQ(reader->bytes_processed(), 55);
+  ASSERT_BATCHES_EQUAL(*expected_batch, *actual_batch);
+
+  AssertReadEnd(reader);
+}
+
+TEST_P(StreamingReaderTest, ExplicitSchemaErrorOnUnexpectedFields) {
+  std::string test_json =
+      Join({R"({"s": "foo", "t": "2022-01-01"})", R"({"s": "foo", "t": 
"2022-01-01"})",
+            R"({"s": "foo", "t": "2022-01-01", "b": true})"},
+           "\n");
+
+  FieldVector expected_fields = {field("s", utf8())};
+  std::shared_ptr<Schema> expected_schema = schema(expected_fields);
+  std::shared_ptr<RecordBatch> expected_batch;
+  std::shared_ptr<RecordBatch> actual_batch;
+  std::shared_ptr<StreamingReader> reader;
+
+  parse_options_.explicit_schema = expected_schema;
+  parse_options_.unexpected_field_behavior = UnexpectedFieldBehavior::Error;
+  read_options_.block_size = 48;
+  ASSERT_RAISES(Invalid, MakeReader(test_json));

Review Comment:
   Can we also check the error message here?



##########
cpp/src/arrow/json/reader_test.cc:
##########
@@ -320,5 +325,546 @@ TEST(ReaderTest, FailOnInvalidEOF) {
   }
 }
 
+class StreamingReaderTest : public ::testing::TestWithParam<bool> {
+ protected:
+  void SetUp() override { read_options_.use_threads = GetParam(); }
+
+  static std::shared_ptr<io::InputStream> MakeTestStream(const std::string& 
str) {
+    auto buffer = std::make_shared<Buffer>(str);
+    return std::make_shared<io::BufferReader>(std::move(buffer));
+  }
+  // Stream with simulated latency
+  static std::shared_ptr<io::InputStream> MakeTestStream(const std::string& 
str,
+                                                         double latency) {
+    return std::make_shared<io::SlowInputStream>(MakeTestStream(str), latency);
+  }
+
+  Result<std::shared_ptr<StreamingReader>> MakeReader(
+      std::shared_ptr<io::InputStream> stream) {
+    return StreamingReader::Make(std::move(stream), read_options_, 
parse_options_,
+                                 io_context_, executor_);
+  }
+  template <typename... Args>
+  Result<std::shared_ptr<StreamingReader>> MakeReader(Args&&... args) {
+    return MakeReader(MakeTestStream(std::forward<Args>(args)...));
+  }
+
+  AsyncGenerator<std::shared_ptr<RecordBatch>> MakeGenerator(
+      std::shared_ptr<StreamingReader> reader) {
+    return [reader = std::move(reader)] { return reader->ReadNextAsync(); };
+  }
+  template <typename... Args>
+  Result<AsyncGenerator<std::shared_ptr<RecordBatch>>> MakeGenerator(Args&&... 
args) {
+    ARROW_ASSIGN_OR_RAISE(auto reader, 
MakeReader(std::forward<Args>(args)...));
+    return MakeGenerator(std::move(reader));
+  }
+
+  static void AssertReadNext(const std::shared_ptr<StreamingReader>& reader,
+                             std::shared_ptr<RecordBatch>* out) {
+    ASSERT_OK(reader->ReadNext(out));
+    ASSERT_FALSE(IsIterationEnd(*out));
+  }
+  static void AssertReadEnd(const std::shared_ptr<StreamingReader>& reader) {
+    std::shared_ptr<RecordBatch> out;
+    ASSERT_OK(reader->ReadNext(&out));
+    ASSERT_TRUE(IsIterationEnd(out));
+  }
+
+  struct TestCase {
+    std::string json;
+    int json_size;
+    int block_size;
+    int num_rows;
+    int num_batches;
+    std::shared_ptr<Schema> schema;
+    RecordBatchVector batches;
+    std::shared_ptr<Table> table;
+  };
+
+  // Creates a test case from valid JSON objects with a human-readable index 
field and a
+  // struct field of random data. `block_size_multiplier` is applied to the 
largest
+  // generated row length to determine the target block_size. i.e - higher 
multiplier
+  // means fewer batches
+  static TestCase GenerateTestCase(int num_rows, double block_size_multiplier 
= 3.0) {
+    FieldVector data_fields = {field("s", utf8()), field("f", float64()),
+                               field("b", boolean())};
+    FieldVector fields = {field("i", int64()), field("d", 
struct_({data_fields}))};
+    TestCase out;
+    out.schema = schema(fields);
+    out.num_rows = num_rows;
+
+    constexpr int kSeed = 0x432432;
+    std::default_random_engine engine(kSeed);
+    std::vector<std::string> rows(num_rows);
+    size_t max_row_size = 1;
+
+    auto options = GenerateOptions::Defaults();
+    options.null_probability = 0;
+    for (int i = 0; i < num_rows; ++i) {
+      StringBuffer string_buffer;
+      Writer writer(string_buffer);
+      ABORT_NOT_OK(Generate(data_fields, engine, &writer, options));
+      std::string json = string_buffer.GetString();
+      rows[i] = Join({"{\"i\":", std::to_string(i), ",\"d\":", json, "}\n"});
+      max_row_size = std::max(max_row_size, rows[i].size());
+    }
+
+    auto block_size = static_cast<size_t>(max_row_size * 
block_size_multiplier);
+    // Deduce the expected record batches from the target block size.
+    std::vector<std::string> batch_rows;
+    size_t pos = 0;
+    for (const auto& row : rows) {
+      pos += row.size();
+      if (pos > block_size) {
+        out.batches.push_back(
+            RecordBatchFromJSON(out.schema, Join({"[", Join(batch_rows, ","), 
"]"})));
+        batch_rows.clear();
+        pos -= block_size;
+      }
+      batch_rows.push_back(row);
+      out.json += row;
+    }
+    if (!batch_rows.empty()) {
+      out.batches.push_back(
+          RecordBatchFromJSON(out.schema, Join({"[", Join(batch_rows, ","), 
"]"})));
+    }
+
+    out.json_size = static_cast<int>(out.json.size());
+    out.block_size = static_cast<int>(block_size);
+    out.num_batches = static_cast<int>(out.batches.size());
+    out.table = *Table::FromRecordBatches(out.batches);
+
+    return out;
+  }
+
+  static std::string Join(const std::vector<std::string>& strings,
+                          const std::string& delim = "", bool trailing_delim = 
false) {
+    std::string out;
+    for (size_t i = 0; i < strings.size();) {
+      out += strings[i++];
+      if (i != strings.size() || trailing_delim) {
+        out += delim;
+      }
+    }
+    return out;
+  }
+
+  internal::Executor* executor_ = nullptr;
+  ParseOptions parse_options_ = ParseOptions::Defaults();
+  ReadOptions read_options_ = ReadOptions::Defaults();
+  io::IOContext io_context_ = io::default_io_context();
+};
+
+INSTANTIATE_TEST_SUITE_P(StreamingReaderTest, StreamingReaderTest,
+                         ::testing::Values(false, true));
+
+TEST_P(StreamingReaderTest, ErrorOnEmptyStream) {
+  ASSERT_RAISES(Invalid, MakeReader(""));
+  std::string data(100, '\n');
+  for (auto block_size : {25, 49, 50, 100, 200}) {
+    read_options_.block_size = block_size;
+    ASSERT_RAISES(Invalid, MakeReader(data));
+  }
+}
+
+TEST_P(StreamingReaderTest, PropagateChunkingErrors) {
+  constexpr double kIoLatency = 1e-3;
+
+  auto test_schema = schema({field("i", int64())});
+  // Object straddles multiple blocks
+  auto bad_first_chunk = Join(
+      {
+          R"({"i": 0            })",
+          R"({"i": 1})",
+      },
+      "\n");
+  auto bad_middle_chunk = Join(
+      {
+          R"({"i": 0})",
+          R"({"i":    1})",
+          R"({"i": 2})",
+      },
+      "\n");
+
+  read_options_.block_size = 10;
+  ASSERT_RAISES(Invalid, MakeReader(bad_first_chunk));
+
+  ASSERT_OK_AND_ASSIGN(auto reader, MakeReader(bad_middle_chunk, kIoLatency));
+
+  std::shared_ptr<RecordBatch> batch;
+  AssertReadNext(reader, &batch);
+  EXPECT_EQ(reader->bytes_processed(), 9);
+  ASSERT_BATCHES_EQUAL(*RecordBatchFromJSON(test_schema, "[{\"i\":0}]"), 
*batch);
+
+  ASSERT_RAISES(Invalid, reader->ReadNext(&batch));
+  EXPECT_EQ(reader->bytes_processed(), 9);
+  AssertReadEnd(reader);
+  AssertReadEnd(reader);
+  EXPECT_EQ(reader->bytes_processed(), 9);
+}
+
+TEST_P(StreamingReaderTest, PropagateParsingErrors) {
+  auto test_schema = schema({field("n", int64())});
+  auto bad_first_block = Join(
+      {
+          R"({"n": })",
+          R"({"n": 10000})",
+      },
+      "\n");
+  auto bad_first_block_after_empty = Join(
+      {
+          R"(            )",
+          R"({"n": })",
+          R"({"n": 10000})",
+      },
+      "\n");
+  auto bad_middle_block = Join(
+      {
+          R"({"n": 10000})",
+          R"({"n": 200 0})",
+          R"({"n": 30000})",
+      },
+      "\n");
+
+  read_options_.block_size = 16;
+  ASSERT_RAISES(Invalid, MakeReader(bad_first_block));
+  ASSERT_RAISES(Invalid, MakeReader(bad_first_block_after_empty));
+
+  std::shared_ptr<RecordBatch> batch;
+  ASSERT_OK_AND_ASSIGN(auto reader, MakeReader(bad_middle_block));
+  EXPECT_EQ(reader->bytes_processed(), 0);
+  AssertSchemaEqual(reader->schema(), test_schema);
+
+  AssertReadNext(reader, &batch);
+  EXPECT_EQ(reader->bytes_processed(), 13);
+  ASSERT_BATCHES_EQUAL(*RecordBatchFromJSON(test_schema, R"([{"n":10000}])"), 
*batch);
+
+  ASSERT_RAISES(Invalid, reader->ReadNext(&batch));
+  EXPECT_EQ(reader->bytes_processed(), 13);
+  AssertReadEnd(reader);
+  EXPECT_EQ(reader->bytes_processed(), 13);
+}
+
+TEST_P(StreamingReaderTest, PropagateErrorsNonLinewiseChunker) {
+  auto test_schema = schema({field("i", int64())});
+  auto bad_first_block = Join(
+      {
+          R"({"i":0}{1})",
+          R"({"i":2})",
+      },
+      "\n");
+  auto bad_middle_blocks = Join(
+      {
+          R"({"i": 0})",
+          R"({"i":    1})",
+          R"({}"i":2})",
+          R"({"i": 3})",
+      },
+      "\n");
+
+  std::shared_ptr<RecordBatch> batch;
+  std::shared_ptr<StreamingReader> reader;
+  Status status;
+  read_options_.block_size = 10;
+  parse_options_.newlines_in_values = true;
+
+  ASSERT_OK_AND_ASSIGN(reader, MakeReader(bad_first_block));
+  AssertReadNext(reader, &batch);
+  EXPECT_EQ(reader->bytes_processed(), 7);
+  ASSERT_BATCHES_EQUAL(*RecordBatchFromJSON(test_schema, "[{\"i\":0}]"), 
*batch);
+  status = reader->ReadNext(&batch);
+  EXPECT_EQ(reader->bytes_processed(), 7);
+  ASSERT_RAISES(Invalid, status);
+  EXPECT_THAT(status.message(), ::testing::StartsWith("JSON parse error"));
+  AssertReadEnd(reader);
+
+  ASSERT_OK_AND_ASSIGN(reader, MakeReader(bad_middle_blocks));
+  AssertReadNext(reader, &batch);
+  EXPECT_EQ(reader->bytes_processed(), 9);
+  ASSERT_BATCHES_EQUAL(*RecordBatchFromJSON(test_schema, "[{\"i\":0}]"), 
*batch);
+  // Chunker doesn't require newline delimiters, so this should be valid
+  AssertReadNext(reader, &batch);
+  EXPECT_EQ(reader->bytes_processed(), 20);
+  ASSERT_BATCHES_EQUAL(*RecordBatchFromJSON(test_schema, "[{\"i\":1}]"), 
*batch);
+
+  status = reader->ReadNext(&batch);
+  EXPECT_EQ(reader->bytes_processed(), 20);
+  // Should fail to parse "{}\"i\""
+  ASSERT_RAISES(Invalid, status);
+  EXPECT_THAT(status.message(), ::testing::StartsWith("JSON parse error"));
+  // Incoming chunker error from ":2}" shouldn't leak through after the first 
failure,
+  // which is a possibility if async tasks are still outstanding due to 
readahead.
+  AssertReadEnd(reader);
+  AssertReadEnd(reader);
+  EXPECT_EQ(reader->bytes_processed(), 20);
+}
+
+TEST_P(StreamingReaderTest, IgnoreLeadingEmptyBlocks) {
+  std::string test_json(32, '\n');
+  test_json += R"({"b": true, "s": "foo"})";
+  ASSERT_EQ(test_json.length(), 55);
+
+  parse_options_.explicit_schema = schema({field("b", boolean()), field("s", 
utf8())});
+  read_options_.block_size = 24;
+  ASSERT_OK_AND_ASSIGN(auto reader, MakeReader(test_json));
+  EXPECT_EQ(reader->bytes_processed(), 0);
+
+  auto expected_schema = parse_options_.explicit_schema;
+  auto expected_batch = RecordBatchFromJSON(expected_schema, 
R"([{"b":true,"s":"foo"}])");
+
+  AssertSchemaEqual(reader->schema(), expected_schema);
+
+  std::shared_ptr<RecordBatch> actual_batch;
+  AssertReadNext(reader, &actual_batch);
+  EXPECT_EQ(reader->bytes_processed(), 55);
+  ASSERT_BATCHES_EQUAL(*expected_batch, *actual_batch);
+
+  AssertReadEnd(reader);
+}
+
+TEST_P(StreamingReaderTest, ExplicitSchemaErrorOnUnexpectedFields) {
+  std::string test_json =
+      Join({R"({"s": "foo", "t": "2022-01-01"})", R"({"s": "foo", "t": 
"2022-01-01"})",
+            R"({"s": "foo", "t": "2022-01-01", "b": true})"},
+           "\n");
+
+  FieldVector expected_fields = {field("s", utf8())};
+  std::shared_ptr<Schema> expected_schema = schema(expected_fields);
+  std::shared_ptr<RecordBatch> expected_batch;
+  std::shared_ptr<RecordBatch> actual_batch;
+  std::shared_ptr<StreamingReader> reader;
+
+  parse_options_.explicit_schema = expected_schema;
+  parse_options_.unexpected_field_behavior = UnexpectedFieldBehavior::Error;
+  read_options_.block_size = 48;
+  ASSERT_RAISES(Invalid, MakeReader(test_json));
+
+  expected_fields.push_back(field("t", utf8()));
+  expected_schema = schema(expected_fields);
+  expected_batch =
+      RecordBatchFromJSON(expected_schema, 
R"([{"s":"foo","t":"2022-01-01"}])");
+
+  parse_options_.explicit_schema = expected_schema;
+  ASSERT_OK_AND_ASSIGN(reader, MakeReader(test_json));
+  AssertSchemaEqual(reader->schema(), expected_schema);
+
+  AssertReadNext(reader, &actual_batch);
+  ASSERT_BATCHES_EQUAL(*expected_batch, *actual_batch);
+  EXPECT_EQ(reader->bytes_processed(), 32);
+
+  AssertReadNext(reader, &actual_batch);
+  ASSERT_BATCHES_EQUAL(*expected_batch, *actual_batch);
+  EXPECT_EQ(reader->bytes_processed(), 64);
+
+  ASSERT_RAISES(Invalid, reader->ReadNext(&actual_batch));
+  EXPECT_EQ(reader->bytes_processed(), 64);
+  AssertReadEnd(reader);
+}
+
+TEST_P(StreamingReaderTest, ExplicitSchemaIgnoreUnexpectedFields) {
+  std::string test_json =
+      Join({R"({"s": "foo", "u": "2022-01-01"})", R"({"s": "foo", "t": 
"2022-01-01"})",
+            R"({"s": "foo", "t": "2022-01-01", "b": true})"},

Review Comment:
   Same here.



##########
cpp/src/arrow/json/reader.cc:
##########
@@ -42,132 +42,442 @@ namespace arrow {
 using std::string_view;
 
 using internal::checked_cast;
+using internal::Executor;
 using internal::GetCpuThreadPool;
 using internal::TaskGroup;
 using internal::ThreadPool;
 
 namespace json {
+namespace {
+
+struct ChunkedBlock {
+  std::shared_ptr<Buffer> partial;
+  std::shared_ptr<Buffer> completion;
+  std::shared_ptr<Buffer> whole;
+  int64_t index = -1;
+};
+
+struct DecodedBlock {
+  std::shared_ptr<RecordBatch> record_batch;
+  int64_t num_bytes = 0;
+};
+
+}  // namespace
+}  // namespace json
+
+template <>
+struct IterationTraits<json::ChunkedBlock> {
+  static json::ChunkedBlock End() { return json::ChunkedBlock{}; }
+  static bool IsEnd(const json::ChunkedBlock& val) { return val.index < 0; }
+};
+
+template <>
+struct IterationTraits<json::DecodedBlock> {
+  static json::DecodedBlock End() { return json::DecodedBlock{}; }
+  static bool IsEnd(const json::DecodedBlock& val) { return !val.record_batch; 
}
+};
+
+namespace json {
+namespace {
+
+// Holds related parameters for parsing and type conversion
+class DecodeContext {
+ public:
+  explicit DecodeContext(MemoryPool* pool)
+      : DecodeContext(ParseOptions::Defaults(), pool) {}
+  explicit DecodeContext(ParseOptions options = ParseOptions::Defaults(),
+                         MemoryPool* pool = default_memory_pool())
+      : pool_(pool) {
+    SetParseOptions(std::move(options));
+  }
+
+  void SetParseOptions(ParseOptions options) {
+    parse_options_ = std::move(options);
+    if (parse_options_.explicit_schema) {
+      conversion_type_ = struct_(parse_options_.explicit_schema->fields());
+    } else {
+      parse_options_.unexpected_field_behavior = 
UnexpectedFieldBehavior::InferType;
+      conversion_type_ = struct_({});
+    }
+    promotion_graph_ =
+        parse_options_.unexpected_field_behavior == 
UnexpectedFieldBehavior::InferType
+            ? GetPromotionGraph()
+            : nullptr;
+  }
+
+  void SetSchema(std::shared_ptr<Schema> explicit_schema,
+                 UnexpectedFieldBehavior unexpected_field_behavior) {
+    parse_options_.explicit_schema = std::move(explicit_schema);
+    parse_options_.unexpected_field_behavior = unexpected_field_behavior;
+    SetParseOptions(std::move(parse_options_));
+  }
+  void SetSchema(std::shared_ptr<Schema> explicit_schema) {
+    SetSchema(std::move(explicit_schema), 
parse_options_.unexpected_field_behavior);
+  }
+  // Set the schema but ensure unexpected fields won't be accepted
+  void SetStrictSchema(std::shared_ptr<Schema> explicit_schema) {
+    auto unexpected_field_behavior = parse_options_.unexpected_field_behavior;
+    if (unexpected_field_behavior == UnexpectedFieldBehavior::InferType) {
+      unexpected_field_behavior = UnexpectedFieldBehavior::Error;
+    }
+    SetSchema(std::move(explicit_schema), unexpected_field_behavior);
+  }
+
+  [[nodiscard]] MemoryPool* pool() const { return pool_; }
+  [[nodiscard]] const ParseOptions& parse_options() const { return 
parse_options_; }
+  [[nodiscard]] const PromotionGraph* promotion_graph() const { return 
promotion_graph_; }
+  [[nodiscard]] const std::shared_ptr<DataType>& conversion_type() const {
+    return conversion_type_;
+  }
+
+ private:
+  ParseOptions parse_options_;
+  std::shared_ptr<DataType> conversion_type_;
+  const PromotionGraph* promotion_graph_;
+  MemoryPool* pool_;
+};
+
+Result<std::shared_ptr<Array>> ParseBlock(const ChunkedBlock& block,
+                                          const ParseOptions& parse_options,
+                                          MemoryPool* pool, int64_t* out_size 
= nullptr) {
+  std::unique_ptr<BlockParser> parser;
+  RETURN_NOT_OK(BlockParser::Make(pool, parse_options, &parser));
+
+  int64_t size = block.partial->size() + block.completion->size() + 
block.whole->size();
+  RETURN_NOT_OK(parser->ReserveScalarStorage(size));
+
+  if (block.partial->size() || block.completion->size()) {
+    std::shared_ptr<Buffer> straddling;
+    if (!block.completion->size()) {
+      straddling = block.partial;
+    } else if (!block.partial->size()) {
+      straddling = block.completion;
+    } else {
+      ARROW_ASSIGN_OR_RAISE(straddling,
+                            ConcatenateBuffers({block.partial, 
block.completion}, pool));
+    }
+    RETURN_NOT_OK(parser->Parse(straddling));
+  }
+  if (block.whole->size()) {
+    RETURN_NOT_OK(parser->Parse(block.whole));
+  }
+
+  std::shared_ptr<Array> parsed;
+  RETURN_NOT_OK(parser->Finish(&parsed));
+
+  if (out_size) *out_size = size;
+
+  return parsed;
+}
+
+class ChunkingTransformer {
+ public:
+  explicit ChunkingTransformer(std::unique_ptr<Chunker> chunker)
+      : chunker_(std::move(chunker)) {}
+
+  template <typename... Args>
+  static Transformer<std::shared_ptr<Buffer>, ChunkedBlock> Make(Args&&... 
args) {
+    return [self = 
std::make_shared<ChunkingTransformer>(std::forward<Args>(args)...)](
+               std::shared_ptr<Buffer> buffer) { return 
(*self)(std::move(buffer)); };
+  }
+
+ private:
+  Result<TransformFlow<ChunkedBlock>> operator()(std::shared_ptr<Buffer> 
next_buffer) {
+    if (!buffer_) {
+      if (ARROW_PREDICT_TRUE(!next_buffer)) {
+        DCHECK_EQ(partial_, nullptr) << "Logic error: non-null partial with 
null buffer";
+        return TransformFinish();
+      }
+      partial_ = std::make_shared<Buffer>("");
+      buffer_ = std::move(next_buffer);
+      return TransformSkip();
+    }
+    DCHECK_NE(partial_, nullptr);
+
+    std::shared_ptr<Buffer> whole, completion, next_partial;
+    if (!next_buffer) {
+      // End of file reached => compute completion from penultimate block
+      RETURN_NOT_OK(chunker_->ProcessFinal(partial_, buffer_, &completion, 
&whole));
+    } else {
+      std::shared_ptr<Buffer> starts_with_whole;
+      // Get completion of partial from previous block.
+      RETURN_NOT_OK(chunker_->ProcessWithPartial(partial_, buffer_, 
&completion,
+                                                 &starts_with_whole));
+      // Get all whole objects entirely inside the current buffer
+      RETURN_NOT_OK(chunker_->Process(starts_with_whole, &whole, 
&next_partial));
+    }
+
+    buffer_ = std::move(next_buffer);
+    return TransformYield(ChunkedBlock{std::exchange(partial_, next_partial),
+                                       std::move(completion), std::move(whole),
+                                       index_++});
+  }
+
+  std::unique_ptr<Chunker> chunker_;
+  std::shared_ptr<Buffer> partial_;
+  std::shared_ptr<Buffer> buffer_;
+  int64_t index_ = 0;
+};
+
+template <typename... Args>
+Iterator<ChunkedBlock> MakeChunkingIterator(Iterator<std::shared_ptr<Buffer>> 
source,
+                                            Args&&... args) {
+  return MakeTransformedIterator(std::move(source),
+                                 
ChunkingTransformer::Make(std::forward<Args>(args)...));
+}
+
+// NOTE: Not reentrant. Incoming buffers are processed sequentially and the 
transformer's
+// internal state gets updated on each call.
+template <typename... Args>
+AsyncGenerator<ChunkedBlock> MakeChunkingGenerator(
+    AsyncGenerator<std::shared_ptr<Buffer>> source, Args&&... args) {
+  return MakeTransformedGenerator(std::move(source),
+                                  
ChunkingTransformer::Make(std::forward<Args>(args)...));
+}
 
 class TableReaderImpl : public TableReader,
                         public std::enable_shared_from_this<TableReaderImpl> {
  public:
   TableReaderImpl(MemoryPool* pool, const ReadOptions& read_options,
                   const ParseOptions& parse_options,
                   std::shared_ptr<TaskGroup> task_group)
-      : pool_(pool),
+      : decode_context_(parse_options, pool),
         read_options_(read_options),
-        parse_options_(parse_options),
-        chunker_(MakeChunker(parse_options_)),
         task_group_(std::move(task_group)) {}
 
   Status Init(std::shared_ptr<io::InputStream> input) {
     ARROW_ASSIGN_OR_RAISE(auto it,
                           io::MakeInputStreamIterator(input, 
read_options_.block_size));
     return MakeReadaheadIterator(std::move(it), task_group_->parallelism())
-        .Value(&block_iterator_);
+        .Value(&buffer_iterator_);
   }
 
   Result<std::shared_ptr<Table>> Read() override {
-    RETURN_NOT_OK(MakeBuilder());
-
-    ARROW_ASSIGN_OR_RAISE(auto block, block_iterator_.Next());
-    if (block == nullptr) {
+    auto block_it = MakeChunkingIterator(std::move(buffer_iterator_),
+                                         
MakeChunker(decode_context_.parse_options()));
+
+    bool did_read = false;
+    while (true) {
+      ARROW_ASSIGN_OR_RAISE(auto block, block_it.Next());
+      if (IsIterationEnd(block)) break;
+      if (!did_read) {
+        did_read = true;
+        RETURN_NOT_OK(MakeBuilder());
+      }
+      task_group_->Append(
+          [self = shared_from_this(), block] { return 
self->ParseAndInsert(block); });
+    }
+    if (!did_read) {
       return Status::Invalid("Empty JSON file");
     }
 
-    auto self = shared_from_this();
-    auto empty = std::make_shared<Buffer>("");
+    std::shared_ptr<ChunkedArray> array;
+    RETURN_NOT_OK(builder_->Finish(&array));
+    return Table::FromChunkedStructArray(array);
+  }
 
-    int64_t block_index = 0;
-    std::shared_ptr<Buffer> partial = empty;
+ private:
+  Status MakeBuilder() {
+    return MakeChunkedArrayBuilder(task_group_, decode_context_.pool(),
+                                   decode_context_.promotion_graph(),
+                                   decode_context_.conversion_type(), 
&builder_);
+  }
 
-    while (block != nullptr) {
-      std::shared_ptr<Buffer> next_block, whole, completion, next_partial;
+  Status ParseAndInsert(const ChunkedBlock& block) {
+    ARROW_ASSIGN_OR_RAISE(auto parsed, ParseBlock(block, 
decode_context_.parse_options(),
+                                                  decode_context_.pool()));
+    builder_->Insert(block.index, field("", parsed->type()), parsed);
+    return Status::OK();
+  }
 
-      ARROW_ASSIGN_OR_RAISE(next_block, block_iterator_.Next());
+  DecodeContext decode_context_;
+  ReadOptions read_options_;
+  std::shared_ptr<TaskGroup> task_group_;
+  Iterator<std::shared_ptr<Buffer>> buffer_iterator_;
+  std::shared_ptr<ChunkedArrayBuilder> builder_;
+};
 
-      if (next_block == nullptr) {
-        // End of file reached => compute completion from penultimate block
-        RETURN_NOT_OK(chunker_->ProcessFinal(partial, block, &completion, 
&whole));
-      } else {
-        std::shared_ptr<Buffer> starts_with_whole;
-        // Get completion of partial from previous block.
-        RETURN_NOT_OK(chunker_->ProcessWithPartial(partial, block, &completion,
-                                                   &starts_with_whole));
+// Callable object for parsing/converting individual JSON blocks. The class 
itself can be
+// called concurrently but reads from the `DecodeContext` aren't synchronized
+class DecodingOperator {
+ public:
+  explicit DecodingOperator(std::shared_ptr<const DecodeContext> context)
+      : context_(std::move(context)) {}
 
-        // Get all whole objects entirely inside the current buffer
-        RETURN_NOT_OK(chunker_->Process(starts_with_whole, &whole, 
&next_partial));
-      }
+  Result<DecodedBlock> operator()(const ChunkedBlock& block) const {
+    int64_t num_bytes;
+    ARROW_ASSIGN_OR_RAISE(auto unconverted, ParseBlock(block, 
context_->parse_options(),
+                                                       context_->pool(), 
&num_bytes));
 
-      // Launch parse task
-      task_group_->Append([self, partial, completion, whole, block_index] {
-        return self->ParseAndInsert(partial, completion, whole, block_index);
-      });
-      block_index++;
+    std::shared_ptr<ChunkedArrayBuilder> builder;
+    RETURN_NOT_OK(MakeChunkedArrayBuilder(TaskGroup::MakeSerial(), 
context_->pool(),
+                                          context_->promotion_graph(),
+                                          context_->conversion_type(), 
&builder));
+    builder->Insert(0, field("", unconverted->type()), unconverted);
 
-      partial = next_partial;
-      block = next_block;
-    }
+    std::shared_ptr<ChunkedArray> chunked;
+    RETURN_NOT_OK(builder->Finish(&chunked));
+    ARROW_ASSIGN_OR_RAISE(auto batch, 
RecordBatch::FromStructArray(chunked->chunk(0)));
 
-    std::shared_ptr<ChunkedArray> array;
-    RETURN_NOT_OK(builder_->Finish(&array));
-    return Table::FromChunkedStructArray(array);
+    return DecodedBlock{std::move(batch), num_bytes};
   }
 
  private:
-  Status MakeBuilder() {
-    auto type = parse_options_.explicit_schema
-                    ? struct_(parse_options_.explicit_schema->fields())
-                    : struct_({});
+  std::shared_ptr<const DecodeContext> context_;
+};
 
-    auto promotion_graph =
-        parse_options_.unexpected_field_behavior == 
UnexpectedFieldBehavior::InferType
-            ? GetPromotionGraph()
-            : nullptr;
+// TODO(benibus): Replace with `MakeApplyGenerator` from
+// github.com/apache/arrow/pull/14269 if/when it gets merged
+//
+// Reads from the source and spawns fan-out decoding tasks on the given 
executor
+AsyncGenerator<DecodedBlock> MakeDecodingGenerator(
+    AsyncGenerator<ChunkedBlock> source,
+    std::function<Result<DecodedBlock>(const ChunkedBlock&)> decoder,
+    Executor* executor) {
+  struct State {
+    AsyncGenerator<ChunkedBlock> source;
+    std::function<Result<DecodedBlock>(const ChunkedBlock&)> decoder;
+    Executor* executor;
+  } state{std::move(source), std::move(decoder), executor};
+
+  return [state = std::make_shared<State>(std::move(state))] {
+    auto options = CallbackOptions::Defaults();
+    options.executor = state->executor;
+    // Since the decode step is heavy we want to schedule it as
+    // a separate task so as to maximize task distribution accross CPU cores
+    options.should_schedule = ShouldSchedule::Always;
+
+    return state->source().Then(
+        [state](const ChunkedBlock& block) -> Result<DecodedBlock> {
+          if (IsIterationEnd(block)) {
+            return IterationEnd<DecodedBlock>();
+          } else {
+            return state->decoder(block);
+          }
+        },
+        {}, options);
+  };
+}
 
-    return MakeChunkedArrayBuilder(task_group_, pool_, promotion_graph, type, 
&builder_);
-  }
-
-  Status ParseAndInsert(const std::shared_ptr<Buffer>& partial,
-                        const std::shared_ptr<Buffer>& completion,
-                        const std::shared_ptr<Buffer>& whole, int64_t 
block_index) {
-    std::unique_ptr<BlockParser> parser;
-    RETURN_NOT_OK(BlockParser::Make(pool_, parse_options_, &parser));
-    RETURN_NOT_OK(parser->ReserveScalarStorage(partial->size() + 
completion->size() +
-                                               whole->size()));
-
-    if (partial->size() != 0 || completion->size() != 0) {
-      std::shared_ptr<Buffer> straddling;
-      if (partial->size() == 0) {
-        straddling = completion;
-      } else if (completion->size() == 0) {
-        straddling = partial;
-      } else {
-        ARROW_ASSIGN_OR_RAISE(straddling,
-                              ConcatenateBuffers({partial, completion}, 
pool_));
-      }
-      RETURN_NOT_OK(parser->Parse(straddling));
+// Adds async-reentrancy to `source` by submitting tasks to a single-threaded 
executor
+// (FIFO order) - ensuring, at most, one future is pending at a time
+template <typename T>
+Result<AsyncGenerator<T>> MakeReentrantGenerator(AsyncGenerator<T> source) {
+  struct State {
+    AsyncGenerator<T> source;
+    std::shared_ptr<ThreadPool> thread_pool;
+  } state{std::move(source), nullptr};
+  ARROW_ASSIGN_OR_RAISE(state.thread_pool, ThreadPool::Make(1));
+
+  return [state = std::make_shared<State>(std::move(state))]() -> Future<T> {
+    auto maybe_future =
+        state->thread_pool->Submit([state] { return state->source().result(); 
});
+    return DeferNotOk(std::move(maybe_future));
+  };
+}
+
+class StreamingReaderImpl : public StreamingReader {
+ public:
+  StreamingReaderImpl(DecodedBlock first_block, AsyncGenerator<DecodedBlock> 
source,
+                      const std::shared_ptr<DecodeContext>& context, int 
max_readahead)
+      : first_block_(std::move(first_block)),
+        schema_(first_block_->record_batch->schema()),
+        bytes_processed_(std::make_shared<std::atomic<int64_t>>(0)) {
+    // Set the final schema for future invocations of the source generator
+    context->SetStrictSchema(schema_);
+    if (max_readahead > 0) {
+      source = MakeReadaheadGenerator(std::move(source), max_readahead);
     }
+    generator_ = MakeMappedGenerator(
+        std::move(source), [counter = bytes_processed_](const DecodedBlock& 
out) {
+          counter->fetch_add(out.num_bytes);
+          return out.record_batch;
+        });
+  }
 
-    if (whole->size() != 0) {
-      RETURN_NOT_OK(parser->Parse(whole));
+  static Future<std::shared_ptr<StreamingReaderImpl>> MakeAsync(
+      std::shared_ptr<DecodeContext> context, std::shared_ptr<io::InputStream> 
stream,
+      io::IOContext io_context, Executor* cpu_executor, const ReadOptions& 
read_options) {
+    if (!cpu_executor) {
+      cpu_executor = GetCpuThreadPool();
     }
 
-    std::shared_ptr<Array> parsed;
-    RETURN_NOT_OK(parser->Finish(&parsed));
-    builder_->Insert(block_index, field("", parsed->type()), parsed);
-    return Status::OK();
+    ARROW_ASSIGN_OR_RAISE(
+        auto buffer_it,
+        io::MakeInputStreamIterator(std::move(stream), 
read_options.block_size));
+    ARROW_ASSIGN_OR_RAISE(
+        auto buffer_gen,
+        MakeBackgroundGenerator(std::move(buffer_it), io_context.executor()));
+    buffer_gen = MakeTransferredGenerator(std::move(buffer_gen), cpu_executor);
+
+    auto chunking_gen = MakeChunkingGenerator(std::move(buffer_gen),
+                                              
MakeChunker(context->parse_options()));
+    ARROW_ASSIGN_OR_RAISE(chunking_gen, 
MakeReentrantGenerator(std::move(chunking_gen)));
+
+    auto decoding_gen = MakeDecodingGenerator(std::move(chunking_gen),
+                                              DecodingOperator(context), 
cpu_executor);

Review Comment:
   I didn't notice this on the previous review, but the CPU executor should 
ideally not be used if `use_threads` is false, i.e. every CPU-heavy task should 
happen on the thread calling `ReadNext()` (and, I suppose, `ReadNextAsync()` 
would then be blocking...).



##########
docs/source/cpp/json.rst:
##########
@@ -66,6 +73,45 @@ A JSON file is read from a :class:`~arrow::io::InputStream`.
       }
    }
 
+StreamingReader
+===============
+
+Reads a file incrementally in fixed-size blocks, each yielding a
+:class:`~arrow::RecordBatch`. Each independent JSON object in a block
+is converted to a row in the output batch.
+
+All batches adhere to a consistent :class:`~arrow:Schema`, which is
+derived from the first loaded batch. Alternatively, an explicit schema
+may be passed via :class:`~ParseOptions`.
+
+.. code-block:: cpp
+
+   #include "arrow/json/api.h"
+
+   {
+      // ...
+      auto read_options = arrow::json::ReadOptions::Defaults();
+      auto parse_options = arrow::json::ParseOptions::Defaults();
+
+      std::shared_ptr<arrow::io::InputStream> stream;
+      auto result = arrow::json::StreamingReader::Make(stream,
+                                                       read_options,
+                                                       parse_options);
+      if (!result.ok()) {
+         // Handle instantiation error
+      }
+      std::shared_ptr<arrow::json::StreamingReader> reader = *result;
+
+      std::shared_ptr<arrow::RecordBatch> batch;
+      for (arrow::Result<std::shared_ptr<arrow::RecordBatch>> maybe_batch : 
*reader) {
+         if (!result.ok()) {

Review Comment:
   ```suggestion
            if (!maybe_batch.ok()) {
   ```



##########
cpp/src/arrow/json/reader_test.cc:
##########
@@ -320,5 +325,546 @@ TEST(ReaderTest, FailOnInvalidEOF) {
   }
 }
 
+class StreamingReaderTest : public ::testing::TestWithParam<bool> {
+ protected:
+  void SetUp() override { read_options_.use_threads = GetParam(); }
+
+  static std::shared_ptr<io::InputStream> MakeTestStream(const std::string& 
str) {
+    auto buffer = std::make_shared<Buffer>(str);
+    return std::make_shared<io::BufferReader>(std::move(buffer));
+  }
+  // Stream with simulated latency
+  static std::shared_ptr<io::InputStream> MakeTestStream(const std::string& 
str,
+                                                         double latency) {
+    return std::make_shared<io::SlowInputStream>(MakeTestStream(str), latency);
+  }
+
+  Result<std::shared_ptr<StreamingReader>> MakeReader(
+      std::shared_ptr<io::InputStream> stream) {
+    return StreamingReader::Make(std::move(stream), read_options_, 
parse_options_,
+                                 io_context_, executor_);
+  }
+  template <typename... Args>
+  Result<std::shared_ptr<StreamingReader>> MakeReader(Args&&... args) {
+    return MakeReader(MakeTestStream(std::forward<Args>(args)...));
+  }
+
+  AsyncGenerator<std::shared_ptr<RecordBatch>> MakeGenerator(
+      std::shared_ptr<StreamingReader> reader) {
+    return [reader = std::move(reader)] { return reader->ReadNextAsync(); };
+  }
+  template <typename... Args>
+  Result<AsyncGenerator<std::shared_ptr<RecordBatch>>> MakeGenerator(Args&&... 
args) {
+    ARROW_ASSIGN_OR_RAISE(auto reader, 
MakeReader(std::forward<Args>(args)...));
+    return MakeGenerator(std::move(reader));
+  }
+
+  static void AssertReadNext(const std::shared_ptr<StreamingReader>& reader,
+                             std::shared_ptr<RecordBatch>* out) {
+    ASSERT_OK(reader->ReadNext(out));
+    ASSERT_FALSE(IsIterationEnd(*out));
+  }
+  static void AssertReadEnd(const std::shared_ptr<StreamingReader>& reader) {
+    std::shared_ptr<RecordBatch> out;
+    ASSERT_OK(reader->ReadNext(&out));
+    ASSERT_TRUE(IsIterationEnd(out));
+  }
+
+  struct TestCase {
+    std::string json;
+    int json_size;
+    int block_size;
+    int num_rows;
+    int num_batches;
+    std::shared_ptr<Schema> schema;
+    RecordBatchVector batches;
+    std::shared_ptr<Table> table;
+  };
+
+  // Creates a test case from valid JSON objects with a human-readable index 
field and a
+  // struct field of random data. `block_size_multiplier` is applied to the 
largest
+  // generated row length to determine the target block_size. i.e - higher 
multiplier
+  // means fewer batches
+  static TestCase GenerateTestCase(int num_rows, double block_size_multiplier 
= 3.0) {
+    FieldVector data_fields = {field("s", utf8()), field("f", float64()),
+                               field("b", boolean())};
+    FieldVector fields = {field("i", int64()), field("d", 
struct_({data_fields}))};
+    TestCase out;
+    out.schema = schema(fields);
+    out.num_rows = num_rows;
+
+    constexpr int kSeed = 0x432432;
+    std::default_random_engine engine(kSeed);
+    std::vector<std::string> rows(num_rows);
+    size_t max_row_size = 1;
+
+    auto options = GenerateOptions::Defaults();
+    options.null_probability = 0;
+    for (int i = 0; i < num_rows; ++i) {
+      StringBuffer string_buffer;
+      Writer writer(string_buffer);
+      ABORT_NOT_OK(Generate(data_fields, engine, &writer, options));
+      std::string json = string_buffer.GetString();
+      rows[i] = Join({"{\"i\":", std::to_string(i), ",\"d\":", json, "}\n"});
+      max_row_size = std::max(max_row_size, rows[i].size());
+    }
+
+    auto block_size = static_cast<size_t>(max_row_size * 
block_size_multiplier);
+    // Deduce the expected record batches from the target block size.
+    std::vector<std::string> batch_rows;
+    size_t pos = 0;
+    for (const auto& row : rows) {
+      pos += row.size();
+      if (pos > block_size) {
+        out.batches.push_back(
+            RecordBatchFromJSON(out.schema, Join({"[", Join(batch_rows, ","), 
"]"})));
+        batch_rows.clear();
+        pos -= block_size;
+      }
+      batch_rows.push_back(row);
+      out.json += row;
+    }
+    if (!batch_rows.empty()) {
+      out.batches.push_back(
+          RecordBatchFromJSON(out.schema, Join({"[", Join(batch_rows, ","), 
"]"})));
+    }
+
+    out.json_size = static_cast<int>(out.json.size());
+    out.block_size = static_cast<int>(block_size);
+    out.num_batches = static_cast<int>(out.batches.size());
+    out.table = *Table::FromRecordBatches(out.batches);
+
+    return out;
+  }
+
+  static std::string Join(const std::vector<std::string>& strings,
+                          const std::string& delim = "", bool trailing_delim = 
false) {
+    std::string out;
+    for (size_t i = 0; i < strings.size();) {
+      out += strings[i++];
+      if (i != strings.size() || trailing_delim) {
+        out += delim;
+      }
+    }
+    return out;
+  }
+
+  internal::Executor* executor_ = nullptr;
+  ParseOptions parse_options_ = ParseOptions::Defaults();
+  ReadOptions read_options_ = ReadOptions::Defaults();
+  io::IOContext io_context_ = io::default_io_context();
+};
+
+INSTANTIATE_TEST_SUITE_P(StreamingReaderTest, StreamingReaderTest,
+                         ::testing::Values(false, true));
+
+TEST_P(StreamingReaderTest, ErrorOnEmptyStream) {
+  ASSERT_RAISES(Invalid, MakeReader(""));
+  std::string data(100, '\n');
+  for (auto block_size : {25, 49, 50, 100, 200}) {
+    read_options_.block_size = block_size;
+    ASSERT_RAISES(Invalid, MakeReader(data));
+  }
+}
+
+TEST_P(StreamingReaderTest, PropagateChunkingErrors) {
+  constexpr double kIoLatency = 1e-3;
+
+  auto test_schema = schema({field("i", int64())});
+  // Object straddles multiple blocks
+  auto bad_first_chunk = Join(
+      {
+          R"({"i": 0            })",
+          R"({"i": 1})",
+      },
+      "\n");
+  auto bad_middle_chunk = Join(
+      {
+          R"({"i": 0})",
+          R"({"i":    1})",
+          R"({"i": 2})",
+      },
+      "\n");
+
+  read_options_.block_size = 10;
+  ASSERT_RAISES(Invalid, MakeReader(bad_first_chunk));
+
+  ASSERT_OK_AND_ASSIGN(auto reader, MakeReader(bad_middle_chunk, kIoLatency));
+
+  std::shared_ptr<RecordBatch> batch;
+  AssertReadNext(reader, &batch);
+  EXPECT_EQ(reader->bytes_processed(), 9);
+  ASSERT_BATCHES_EQUAL(*RecordBatchFromJSON(test_schema, "[{\"i\":0}]"), 
*batch);
+
+  ASSERT_RAISES(Invalid, reader->ReadNext(&batch));
+  EXPECT_EQ(reader->bytes_processed(), 9);
+  AssertReadEnd(reader);
+  AssertReadEnd(reader);
+  EXPECT_EQ(reader->bytes_processed(), 9);
+}
+
+TEST_P(StreamingReaderTest, PropagateParsingErrors) {
+  auto test_schema = schema({field("n", int64())});
+  auto bad_first_block = Join(
+      {
+          R"({"n": })",
+          R"({"n": 10000})",
+      },
+      "\n");
+  auto bad_first_block_after_empty = Join(
+      {
+          R"(            )",
+          R"({"n": })",
+          R"({"n": 10000})",
+      },
+      "\n");
+  auto bad_middle_block = Join(
+      {
+          R"({"n": 10000})",
+          R"({"n": 200 0})",
+          R"({"n": 30000})",
+      },
+      "\n");
+
+  read_options_.block_size = 16;
+  ASSERT_RAISES(Invalid, MakeReader(bad_first_block));
+  ASSERT_RAISES(Invalid, MakeReader(bad_first_block_after_empty));
+
+  std::shared_ptr<RecordBatch> batch;
+  ASSERT_OK_AND_ASSIGN(auto reader, MakeReader(bad_middle_block));
+  EXPECT_EQ(reader->bytes_processed(), 0);
+  AssertSchemaEqual(reader->schema(), test_schema);
+
+  AssertReadNext(reader, &batch);
+  EXPECT_EQ(reader->bytes_processed(), 13);
+  ASSERT_BATCHES_EQUAL(*RecordBatchFromJSON(test_schema, R"([{"n":10000}])"), 
*batch);
+
+  ASSERT_RAISES(Invalid, reader->ReadNext(&batch));
+  EXPECT_EQ(reader->bytes_processed(), 13);
+  AssertReadEnd(reader);
+  EXPECT_EQ(reader->bytes_processed(), 13);
+}
+
+TEST_P(StreamingReaderTest, PropagateErrorsNonLinewiseChunker) {
+  auto test_schema = schema({field("i", int64())});
+  auto bad_first_block = Join(
+      {
+          R"({"i":0}{1})",
+          R"({"i":2})",
+      },
+      "\n");
+  auto bad_middle_blocks = Join(
+      {
+          R"({"i": 0})",
+          R"({"i":    1})",
+          R"({}"i":2})",
+          R"({"i": 3})",
+      },
+      "\n");
+
+  std::shared_ptr<RecordBatch> batch;
+  std::shared_ptr<StreamingReader> reader;
+  Status status;
+  read_options_.block_size = 10;
+  parse_options_.newlines_in_values = true;
+
+  ASSERT_OK_AND_ASSIGN(reader, MakeReader(bad_first_block));
+  AssertReadNext(reader, &batch);
+  EXPECT_EQ(reader->bytes_processed(), 7);
+  ASSERT_BATCHES_EQUAL(*RecordBatchFromJSON(test_schema, "[{\"i\":0}]"), 
*batch);
+  status = reader->ReadNext(&batch);
+  EXPECT_EQ(reader->bytes_processed(), 7);
+  ASSERT_RAISES(Invalid, status);
+  EXPECT_THAT(status.message(), ::testing::StartsWith("JSON parse error"));
+  AssertReadEnd(reader);
+
+  ASSERT_OK_AND_ASSIGN(reader, MakeReader(bad_middle_blocks));
+  AssertReadNext(reader, &batch);
+  EXPECT_EQ(reader->bytes_processed(), 9);
+  ASSERT_BATCHES_EQUAL(*RecordBatchFromJSON(test_schema, "[{\"i\":0}]"), 
*batch);
+  // Chunker doesn't require newline delimiters, so this should be valid
+  AssertReadNext(reader, &batch);
+  EXPECT_EQ(reader->bytes_processed(), 20);
+  ASSERT_BATCHES_EQUAL(*RecordBatchFromJSON(test_schema, "[{\"i\":1}]"), 
*batch);
+
+  status = reader->ReadNext(&batch);
+  EXPECT_EQ(reader->bytes_processed(), 20);
+  // Should fail to parse "{}\"i\""
+  ASSERT_RAISES(Invalid, status);
+  EXPECT_THAT(status.message(), ::testing::StartsWith("JSON parse error"));
+  // Incoming chunker error from ":2}" shouldn't leak through after the first 
failure,
+  // which is a possibility if async tasks are still outstanding due to 
readahead.
+  AssertReadEnd(reader);
+  AssertReadEnd(reader);
+  EXPECT_EQ(reader->bytes_processed(), 20);
+}
+
+TEST_P(StreamingReaderTest, IgnoreLeadingEmptyBlocks) {
+  std::string test_json(32, '\n');
+  test_json += R"({"b": true, "s": "foo"})";
+  ASSERT_EQ(test_json.length(), 55);
+
+  parse_options_.explicit_schema = schema({field("b", boolean()), field("s", 
utf8())});
+  read_options_.block_size = 24;
+  ASSERT_OK_AND_ASSIGN(auto reader, MakeReader(test_json));
+  EXPECT_EQ(reader->bytes_processed(), 0);
+
+  auto expected_schema = parse_options_.explicit_schema;
+  auto expected_batch = RecordBatchFromJSON(expected_schema, 
R"([{"b":true,"s":"foo"}])");
+
+  AssertSchemaEqual(reader->schema(), expected_schema);
+
+  std::shared_ptr<RecordBatch> actual_batch;
+  AssertReadNext(reader, &actual_batch);
+  EXPECT_EQ(reader->bytes_processed(), 55);
+  ASSERT_BATCHES_EQUAL(*expected_batch, *actual_batch);
+
+  AssertReadEnd(reader);
+}
+
+TEST_P(StreamingReaderTest, ExplicitSchemaErrorOnUnexpectedFields) {
+  std::string test_json =
+      Join({R"({"s": "foo", "t": "2022-01-01"})", R"({"s": "foo", "t": 
"2022-01-01"})",
+            R"({"s": "foo", "t": "2022-01-01", "b": true})"},
+           "\n");
+
+  FieldVector expected_fields = {field("s", utf8())};
+  std::shared_ptr<Schema> expected_schema = schema(expected_fields);
+  std::shared_ptr<RecordBatch> expected_batch;
+  std::shared_ptr<RecordBatch> actual_batch;
+  std::shared_ptr<StreamingReader> reader;
+
+  parse_options_.explicit_schema = expected_schema;
+  parse_options_.unexpected_field_behavior = UnexpectedFieldBehavior::Error;
+  read_options_.block_size = 48;
+  ASSERT_RAISES(Invalid, MakeReader(test_json));
+
+  expected_fields.push_back(field("t", utf8()));
+  expected_schema = schema(expected_fields);
+  expected_batch =
+      RecordBatchFromJSON(expected_schema, 
R"([{"s":"foo","t":"2022-01-01"}])");
+
+  parse_options_.explicit_schema = expected_schema;
+  ASSERT_OK_AND_ASSIGN(reader, MakeReader(test_json));
+  AssertSchemaEqual(reader->schema(), expected_schema);
+
+  AssertReadNext(reader, &actual_batch);
+  ASSERT_BATCHES_EQUAL(*expected_batch, *actual_batch);
+  EXPECT_EQ(reader->bytes_processed(), 32);
+
+  AssertReadNext(reader, &actual_batch);
+  ASSERT_BATCHES_EQUAL(*expected_batch, *actual_batch);
+  EXPECT_EQ(reader->bytes_processed(), 64);
+
+  ASSERT_RAISES(Invalid, reader->ReadNext(&actual_batch));
+  EXPECT_EQ(reader->bytes_processed(), 64);
+  AssertReadEnd(reader);
+}
+
+TEST_P(StreamingReaderTest, ExplicitSchemaIgnoreUnexpectedFields) {
+  std::string test_json =
+      Join({R"({"s": "foo", "u": "2022-01-01"})", R"({"s": "foo", "t": 
"2022-01-01"})",
+            R"({"s": "foo", "t": "2022-01-01", "b": true})"},
+           "\n");
+
+  FieldVector expected_fields = {field("s", utf8()), field("t", utf8())};
+  std::shared_ptr<Schema> expected_schema = schema(expected_fields);
+  std::shared_ptr<RecordBatch> expected_batch;
+  std::shared_ptr<RecordBatch> actual_batch;
+  std::shared_ptr<StreamingReader> reader;
+
+  parse_options_.explicit_schema = expected_schema;
+  parse_options_.unexpected_field_behavior = UnexpectedFieldBehavior::Ignore;
+  read_options_.block_size = 48;
+
+  ASSERT_OK_AND_ASSIGN(reader, MakeReader(test_json));
+  AssertSchemaEqual(reader->schema(), expected_schema);
+
+  expected_batch = RecordBatchFromJSON(expected_schema, 
R"([{"s":"foo","t":null}])");
+  AssertReadNext(reader, &actual_batch);
+  ASSERT_BATCHES_EQUAL(*expected_batch, *actual_batch);
+  EXPECT_EQ(reader->bytes_processed(), 32);
+
+  expected_batch =
+      RecordBatchFromJSON(expected_schema, 
R"([{"s":"foo","t":"2022-01-01"}])");
+  AssertReadNext(reader, &actual_batch);
+  ASSERT_BATCHES_EQUAL(*expected_batch, *actual_batch);
+  EXPECT_EQ(reader->bytes_processed(), 64);
+
+  AssertReadNext(reader, &actual_batch);
+  ASSERT_BATCHES_EQUAL(*expected_batch, *actual_batch);
+  EXPECT_EQ(reader->bytes_processed(), 106);
+  AssertReadEnd(reader);
+}
+
+TEST_P(StreamingReaderTest, InferredSchema) {
+  auto test_json = Join(
+      {
+          R"({"a": 0, "b": "foo"       })",
+          R"({"a": 1, "c": true        })",
+          R"({"a": 2, "d": "2022-01-01"})",
+      },
+      "\n", true);
+
+  std::shared_ptr<StreamingReader> reader;
+  std::shared_ptr<Schema> expected_schema;
+  std::shared_ptr<RecordBatch> expected_batch;
+  std::shared_ptr<RecordBatch> actual_batch;
+
+  FieldVector fields = {field("a", int64()), field("b", utf8())};
+  parse_options_.unexpected_field_behavior = 
UnexpectedFieldBehavior::InferType;
+  parse_options_.explicit_schema = nullptr;
+
+  // Schema derived from the first line
+  expected_schema = schema(fields);
+
+  read_options_.block_size = 32;
+  ASSERT_OK_AND_ASSIGN(reader, MakeReader(test_json));
+  AssertSchemaEqual(reader->schema(), expected_schema);
+
+  expected_batch = RecordBatchFromJSON(expected_schema, R"([{"a": 0, "b": 
"foo"}])");
+  AssertReadNext(reader, &actual_batch);
+  EXPECT_EQ(reader->bytes_processed(), 28);
+  ASSERT_BATCHES_EQUAL(*expected_batch, *actual_batch);
+
+  ASSERT_RAISES(Invalid, reader->ReadNext(&actual_batch));
+
+  // Schema derived from the first 2 lines
+  fields.push_back(field("c", boolean()));
+  expected_schema = schema(fields);
+
+  read_options_.block_size = 64;
+  ASSERT_OK_AND_ASSIGN(reader, MakeReader(test_json));
+  AssertSchemaEqual(reader->schema(), expected_schema);
+
+  expected_batch = RecordBatchFromJSON(expected_schema, R"([
+    {"a": 0, "b": "foo", "c": null},
+    {"a": 1, "b":  null, "c": true}
+  ])");
+  AssertReadNext(reader, &actual_batch);
+  EXPECT_EQ(reader->bytes_processed(), 56);
+  ASSERT_BATCHES_EQUAL(*expected_batch, *actual_batch);
+
+  ASSERT_RAISES(Invalid, reader->ReadNext(&actual_batch));
+
+  // Schema derived from all 3 lines
+  fields.push_back(field("d", timestamp(TimeUnit::SECOND)));
+  expected_schema = schema(fields);
+
+  read_options_.block_size = 96;
+  ASSERT_OK_AND_ASSIGN(reader, MakeReader(test_json));
+  AssertSchemaEqual(reader->schema(), expected_schema);
+
+  expected_batch = RecordBatchFromJSON(expected_schema, R"([
+    {"a": 0, "b": "foo", "c": null, "d":  null},
+    {"a": 1, "b":  null, "c": true, "d":  null},
+    {"a": 2, "b":  null, "c": null, "d":  "2022-01-01"}
+  ])");
+  AssertReadNext(reader, &actual_batch);
+  EXPECT_EQ(reader->bytes_processed(), 84);
+  ASSERT_BATCHES_EQUAL(*expected_batch, *actual_batch);
+
+  AssertReadEnd(reader);
+}
+
+TEST_P(StreamingReaderTest, AsyncReentrancy) {
+  constexpr int kNumRows = 16;
+  constexpr double kIoLatency = 1e-2;
+
+  auto expected = GenerateTestCase(kNumRows);
+  parse_options_.explicit_schema = expected.schema;
+  parse_options_.unexpected_field_behavior = UnexpectedFieldBehavior::Error;
+  read_options_.block_size = expected.block_size;
+
+  std::vector<Future<std::shared_ptr<RecordBatch>>> 
futures(expected.num_batches + 2);
+  ASSERT_OK_AND_ASSIGN(auto reader, MakeReader(expected.json, kIoLatency));
+  EXPECT_EQ(reader->bytes_processed(), 0);
+  for (auto& future : futures) {
+    future = reader->ReadNextAsync();
+  }
+
+  ASSERT_FINISHES_OK_AND_ASSIGN(auto results, All(std::move(futures)));
+  EXPECT_EQ(reader->bytes_processed(), expected.json_size);
+  ASSERT_OK_AND_ASSIGN(auto batches, 
internal::UnwrapOrRaise(std::move(results)));
+  batches.erase(std::remove(batches.begin(), batches.end(), nullptr), 
batches.end());

Review Comment:
   This would seem to accept some nulls scattered in the results, while they 
should only occur at the end, right? Can we make this a bit stricter?



##########
docs/source/cpp/json.rst:
##########
@@ -24,17 +24,24 @@
 Reading JSON files
 ==================
 
-Arrow allows reading line-separated JSON files as Arrow tables.  Each
-independent JSON object in the input file is converted to a row in
-the target Arrow table.
+Line-separated JSON files can either be read as a single Arrow Table
+with a :class:`~TableReader` or streamed as RecordBatches with a
+:class:`~StreamingReader`.
+
+Both of these readers require an :class:`arrow::io::InputStream` instance
+representing the input file. Their behavior can be customized using a
+combination of :class:`~ReadOptions`, :class:`~ParseOptions`, and
+other parameters.
 
 .. seealso::
    :ref:`JSON reader API reference <cpp-api-json>`.
 
-Basic usage
+TableReader
 ===========
 
-A JSON file is read from a :class:`~arrow::io::InputStream`.
+Reads an entire file in one shot as a :class:`~arrow::Table`. Each

Review Comment:
   ```suggestion
   :class:`~TableReader` reads an entire file in one shot as a 
:class:`~arrow::Table`. Each
   ```



##########
cpp/src/arrow/json/reader_test.cc:
##########
@@ -320,5 +325,546 @@ TEST(ReaderTest, FailOnInvalidEOF) {
   }
 }
 
+class StreamingReaderTest : public ::testing::TestWithParam<bool> {
+ protected:
+  void SetUp() override { read_options_.use_threads = GetParam(); }
+
+  static std::shared_ptr<io::InputStream> MakeTestStream(const std::string& 
str) {
+    auto buffer = std::make_shared<Buffer>(str);
+    return std::make_shared<io::BufferReader>(std::move(buffer));
+  }
+  // Stream with simulated latency
+  static std::shared_ptr<io::InputStream> MakeTestStream(const std::string& 
str,
+                                                         double latency) {
+    return std::make_shared<io::SlowInputStream>(MakeTestStream(str), latency);
+  }
+
+  Result<std::shared_ptr<StreamingReader>> MakeReader(
+      std::shared_ptr<io::InputStream> stream) {
+    return StreamingReader::Make(std::move(stream), read_options_, 
parse_options_,
+                                 io_context_, executor_);
+  }
+  template <typename... Args>
+  Result<std::shared_ptr<StreamingReader>> MakeReader(Args&&... args) {
+    return MakeReader(MakeTestStream(std::forward<Args>(args)...));
+  }
+
+  AsyncGenerator<std::shared_ptr<RecordBatch>> MakeGenerator(
+      std::shared_ptr<StreamingReader> reader) {
+    return [reader = std::move(reader)] { return reader->ReadNextAsync(); };
+  }
+  template <typename... Args>
+  Result<AsyncGenerator<std::shared_ptr<RecordBatch>>> MakeGenerator(Args&&... 
args) {
+    ARROW_ASSIGN_OR_RAISE(auto reader, 
MakeReader(std::forward<Args>(args)...));
+    return MakeGenerator(std::move(reader));
+  }
+
+  static void AssertReadNext(const std::shared_ptr<StreamingReader>& reader,
+                             std::shared_ptr<RecordBatch>* out) {
+    ASSERT_OK(reader->ReadNext(out));
+    ASSERT_FALSE(IsIterationEnd(*out));
+  }
+  static void AssertReadEnd(const std::shared_ptr<StreamingReader>& reader) {
+    std::shared_ptr<RecordBatch> out;
+    ASSERT_OK(reader->ReadNext(&out));
+    ASSERT_TRUE(IsIterationEnd(out));
+  }
+
+  struct TestCase {
+    std::string json;
+    int json_size;
+    int block_size;
+    int num_rows;
+    int num_batches;
+    std::shared_ptr<Schema> schema;
+    RecordBatchVector batches;
+    std::shared_ptr<Table> table;
+  };
+
+  // Creates a test case from valid JSON objects with a human-readable index 
field and a
+  // struct field of random data. `block_size_multiplier` is applied to the 
largest
+  // generated row length to determine the target block_size. i.e - higher 
multiplier
+  // means fewer batches
+  static TestCase GenerateTestCase(int num_rows, double block_size_multiplier 
= 3.0) {
+    FieldVector data_fields = {field("s", utf8()), field("f", float64()),
+                               field("b", boolean())};
+    FieldVector fields = {field("i", int64()), field("d", 
struct_({data_fields}))};
+    TestCase out;
+    out.schema = schema(fields);
+    out.num_rows = num_rows;
+
+    constexpr int kSeed = 0x432432;
+    std::default_random_engine engine(kSeed);
+    std::vector<std::string> rows(num_rows);
+    size_t max_row_size = 1;
+
+    auto options = GenerateOptions::Defaults();
+    options.null_probability = 0;
+    for (int i = 0; i < num_rows; ++i) {
+      StringBuffer string_buffer;
+      Writer writer(string_buffer);
+      ABORT_NOT_OK(Generate(data_fields, engine, &writer, options));
+      std::string json = string_buffer.GetString();
+      rows[i] = Join({"{\"i\":", std::to_string(i), ",\"d\":", json, "}\n"});
+      max_row_size = std::max(max_row_size, rows[i].size());
+    }
+
+    auto block_size = static_cast<size_t>(max_row_size * 
block_size_multiplier);
+    // Deduce the expected record batches from the target block size.
+    std::vector<std::string> batch_rows;
+    size_t pos = 0;
+    for (const auto& row : rows) {
+      pos += row.size();
+      if (pos > block_size) {
+        out.batches.push_back(
+            RecordBatchFromJSON(out.schema, Join({"[", Join(batch_rows, ","), 
"]"})));
+        batch_rows.clear();
+        pos -= block_size;
+      }
+      batch_rows.push_back(row);
+      out.json += row;
+    }
+    if (!batch_rows.empty()) {
+      out.batches.push_back(
+          RecordBatchFromJSON(out.schema, Join({"[", Join(batch_rows, ","), 
"]"})));
+    }
+
+    out.json_size = static_cast<int>(out.json.size());
+    out.block_size = static_cast<int>(block_size);
+    out.num_batches = static_cast<int>(out.batches.size());
+    out.table = *Table::FromRecordBatches(out.batches);
+
+    return out;
+  }
+
+  static std::string Join(const std::vector<std::string>& strings,
+                          const std::string& delim = "", bool trailing_delim = 
false) {
+    std::string out;
+    for (size_t i = 0; i < strings.size();) {
+      out += strings[i++];
+      if (i != strings.size() || trailing_delim) {
+        out += delim;
+      }
+    }
+    return out;
+  }
+
+  internal::Executor* executor_ = nullptr;
+  ParseOptions parse_options_ = ParseOptions::Defaults();
+  ReadOptions read_options_ = ReadOptions::Defaults();
+  io::IOContext io_context_ = io::default_io_context();
+};
+
+INSTANTIATE_TEST_SUITE_P(StreamingReaderTest, StreamingReaderTest,
+                         ::testing::Values(false, true));
+
+TEST_P(StreamingReaderTest, ErrorOnEmptyStream) {
+  ASSERT_RAISES(Invalid, MakeReader(""));
+  std::string data(100, '\n');
+  for (auto block_size : {25, 49, 50, 100, 200}) {
+    read_options_.block_size = block_size;
+    ASSERT_RAISES(Invalid, MakeReader(data));
+  }
+}
+
+TEST_P(StreamingReaderTest, PropagateChunkingErrors) {
+  constexpr double kIoLatency = 1e-3;
+
+  auto test_schema = schema({field("i", int64())});
+  // Object straddles multiple blocks
+  auto bad_first_chunk = Join(
+      {
+          R"({"i": 0            })",
+          R"({"i": 1})",
+      },
+      "\n");
+  auto bad_middle_chunk = Join(
+      {
+          R"({"i": 0})",
+          R"({"i":    1})",
+          R"({"i": 2})",
+      },
+      "\n");
+
+  read_options_.block_size = 10;
+  ASSERT_RAISES(Invalid, MakeReader(bad_first_chunk));
+
+  ASSERT_OK_AND_ASSIGN(auto reader, MakeReader(bad_middle_chunk, kIoLatency));
+
+  std::shared_ptr<RecordBatch> batch;
+  AssertReadNext(reader, &batch);
+  EXPECT_EQ(reader->bytes_processed(), 9);
+  ASSERT_BATCHES_EQUAL(*RecordBatchFromJSON(test_schema, "[{\"i\":0}]"), 
*batch);
+
+  ASSERT_RAISES(Invalid, reader->ReadNext(&batch));
+  EXPECT_EQ(reader->bytes_processed(), 9);
+  AssertReadEnd(reader);
+  AssertReadEnd(reader);
+  EXPECT_EQ(reader->bytes_processed(), 9);
+}
+
+TEST_P(StreamingReaderTest, PropagateParsingErrors) {
+  auto test_schema = schema({field("n", int64())});
+  auto bad_first_block = Join(
+      {
+          R"({"n": })",
+          R"({"n": 10000})",
+      },
+      "\n");
+  auto bad_first_block_after_empty = Join(
+      {
+          R"(            )",
+          R"({"n": })",
+          R"({"n": 10000})",
+      },
+      "\n");
+  auto bad_middle_block = Join(
+      {
+          R"({"n": 10000})",
+          R"({"n": 200 0})",
+          R"({"n": 30000})",
+      },
+      "\n");
+
+  read_options_.block_size = 16;
+  ASSERT_RAISES(Invalid, MakeReader(bad_first_block));
+  ASSERT_RAISES(Invalid, MakeReader(bad_first_block_after_empty));
+
+  std::shared_ptr<RecordBatch> batch;
+  ASSERT_OK_AND_ASSIGN(auto reader, MakeReader(bad_middle_block));
+  EXPECT_EQ(reader->bytes_processed(), 0);
+  AssertSchemaEqual(reader->schema(), test_schema);
+
+  AssertReadNext(reader, &batch);
+  EXPECT_EQ(reader->bytes_processed(), 13);
+  ASSERT_BATCHES_EQUAL(*RecordBatchFromJSON(test_schema, R"([{"n":10000}])"), 
*batch);
+
+  ASSERT_RAISES(Invalid, reader->ReadNext(&batch));
+  EXPECT_EQ(reader->bytes_processed(), 13);
+  AssertReadEnd(reader);
+  EXPECT_EQ(reader->bytes_processed(), 13);
+}
+
+TEST_P(StreamingReaderTest, PropagateErrorsNonLinewiseChunker) {
+  auto test_schema = schema({field("i", int64())});
+  auto bad_first_block = Join(
+      {
+          R"({"i":0}{1})",
+          R"({"i":2})",
+      },
+      "\n");
+  auto bad_middle_blocks = Join(
+      {
+          R"({"i": 0})",
+          R"({"i":    1})",
+          R"({}"i":2})",
+          R"({"i": 3})",
+      },
+      "\n");
+
+  std::shared_ptr<RecordBatch> batch;
+  std::shared_ptr<StreamingReader> reader;
+  Status status;
+  read_options_.block_size = 10;
+  parse_options_.newlines_in_values = true;
+
+  ASSERT_OK_AND_ASSIGN(reader, MakeReader(bad_first_block));
+  AssertReadNext(reader, &batch);
+  EXPECT_EQ(reader->bytes_processed(), 7);
+  ASSERT_BATCHES_EQUAL(*RecordBatchFromJSON(test_schema, "[{\"i\":0}]"), 
*batch);
+  status = reader->ReadNext(&batch);
+  EXPECT_EQ(reader->bytes_processed(), 7);
+  ASSERT_RAISES(Invalid, status);
+  EXPECT_THAT(status.message(), ::testing::StartsWith("JSON parse error"));
+  AssertReadEnd(reader);
+
+  ASSERT_OK_AND_ASSIGN(reader, MakeReader(bad_middle_blocks));
+  AssertReadNext(reader, &batch);
+  EXPECT_EQ(reader->bytes_processed(), 9);
+  ASSERT_BATCHES_EQUAL(*RecordBatchFromJSON(test_schema, "[{\"i\":0}]"), 
*batch);
+  // Chunker doesn't require newline delimiters, so this should be valid
+  AssertReadNext(reader, &batch);
+  EXPECT_EQ(reader->bytes_processed(), 20);
+  ASSERT_BATCHES_EQUAL(*RecordBatchFromJSON(test_schema, "[{\"i\":1}]"), 
*batch);
+
+  status = reader->ReadNext(&batch);
+  EXPECT_EQ(reader->bytes_processed(), 20);
+  // Should fail to parse "{}\"i\""
+  ASSERT_RAISES(Invalid, status);
+  EXPECT_THAT(status.message(), ::testing::StartsWith("JSON parse error"));
+  // Incoming chunker error from ":2}" shouldn't leak through after the first 
failure,
+  // which is a possibility if async tasks are still outstanding due to 
readahead.
+  AssertReadEnd(reader);
+  AssertReadEnd(reader);
+  EXPECT_EQ(reader->bytes_processed(), 20);
+}
+
+TEST_P(StreamingReaderTest, IgnoreLeadingEmptyBlocks) {
+  std::string test_json(32, '\n');
+  test_json += R"({"b": true, "s": "foo"})";
+  ASSERT_EQ(test_json.length(), 55);
+
+  parse_options_.explicit_schema = schema({field("b", boolean()), field("s", 
utf8())});
+  read_options_.block_size = 24;
+  ASSERT_OK_AND_ASSIGN(auto reader, MakeReader(test_json));
+  EXPECT_EQ(reader->bytes_processed(), 0);
+
+  auto expected_schema = parse_options_.explicit_schema;
+  auto expected_batch = RecordBatchFromJSON(expected_schema, 
R"([{"b":true,"s":"foo"}])");
+
+  AssertSchemaEqual(reader->schema(), expected_schema);
+
+  std::shared_ptr<RecordBatch> actual_batch;
+  AssertReadNext(reader, &actual_batch);
+  EXPECT_EQ(reader->bytes_processed(), 55);
+  ASSERT_BATCHES_EQUAL(*expected_batch, *actual_batch);
+
+  AssertReadEnd(reader);
+}
+
+TEST_P(StreamingReaderTest, ExplicitSchemaErrorOnUnexpectedFields) {
+  std::string test_json =
+      Join({R"({"s": "foo", "t": "2022-01-01"})", R"({"s": "foo", "t": 
"2022-01-01"})",
+            R"({"s": "foo", "t": "2022-01-01", "b": true})"},
+           "\n");
+
+  FieldVector expected_fields = {field("s", utf8())};
+  std::shared_ptr<Schema> expected_schema = schema(expected_fields);
+  std::shared_ptr<RecordBatch> expected_batch;
+  std::shared_ptr<RecordBatch> actual_batch;
+  std::shared_ptr<StreamingReader> reader;
+
+  parse_options_.explicit_schema = expected_schema;
+  parse_options_.unexpected_field_behavior = UnexpectedFieldBehavior::Error;
+  read_options_.block_size = 48;
+  ASSERT_RAISES(Invalid, MakeReader(test_json));
+
+  expected_fields.push_back(field("t", utf8()));
+  expected_schema = schema(expected_fields);
+  expected_batch =
+      RecordBatchFromJSON(expected_schema, 
R"([{"s":"foo","t":"2022-01-01"}])");
+
+  parse_options_.explicit_schema = expected_schema;
+  ASSERT_OK_AND_ASSIGN(reader, MakeReader(test_json));
+  AssertSchemaEqual(reader->schema(), expected_schema);
+
+  AssertReadNext(reader, &actual_batch);
+  ASSERT_BATCHES_EQUAL(*expected_batch, *actual_batch);
+  EXPECT_EQ(reader->bytes_processed(), 32);
+
+  AssertReadNext(reader, &actual_batch);
+  ASSERT_BATCHES_EQUAL(*expected_batch, *actual_batch);
+  EXPECT_EQ(reader->bytes_processed(), 64);
+
+  ASSERT_RAISES(Invalid, reader->ReadNext(&actual_batch));
+  EXPECT_EQ(reader->bytes_processed(), 64);
+  AssertReadEnd(reader);
+}
+
+TEST_P(StreamingReaderTest, ExplicitSchemaIgnoreUnexpectedFields) {
+  std::string test_json =
+      Join({R"({"s": "foo", "u": "2022-01-01"})", R"({"s": "foo", "t": 
"2022-01-01"})",
+            R"({"s": "foo", "t": "2022-01-01", "b": true})"},
+           "\n");
+
+  FieldVector expected_fields = {field("s", utf8()), field("t", utf8())};
+  std::shared_ptr<Schema> expected_schema = schema(expected_fields);
+  std::shared_ptr<RecordBatch> expected_batch;
+  std::shared_ptr<RecordBatch> actual_batch;
+  std::shared_ptr<StreamingReader> reader;
+
+  parse_options_.explicit_schema = expected_schema;
+  parse_options_.unexpected_field_behavior = UnexpectedFieldBehavior::Ignore;
+  read_options_.block_size = 48;
+
+  ASSERT_OK_AND_ASSIGN(reader, MakeReader(test_json));
+  AssertSchemaEqual(reader->schema(), expected_schema);
+
+  expected_batch = RecordBatchFromJSON(expected_schema, 
R"([{"s":"foo","t":null}])");
+  AssertReadNext(reader, &actual_batch);
+  ASSERT_BATCHES_EQUAL(*expected_batch, *actual_batch);
+  EXPECT_EQ(reader->bytes_processed(), 32);
+
+  expected_batch =
+      RecordBatchFromJSON(expected_schema, 
R"([{"s":"foo","t":"2022-01-01"}])");
+  AssertReadNext(reader, &actual_batch);
+  ASSERT_BATCHES_EQUAL(*expected_batch, *actual_batch);
+  EXPECT_EQ(reader->bytes_processed(), 64);
+
+  AssertReadNext(reader, &actual_batch);
+  ASSERT_BATCHES_EQUAL(*expected_batch, *actual_batch);
+  EXPECT_EQ(reader->bytes_processed(), 106);
+  AssertReadEnd(reader);
+}
+
+TEST_P(StreamingReaderTest, InferredSchema) {
+  auto test_json = Join(
+      {
+          R"({"a": 0, "b": "foo"       })",
+          R"({"a": 1, "c": true        })",
+          R"({"a": 2, "d": "2022-01-01"})",
+      },
+      "\n", true);
+
+  std::shared_ptr<StreamingReader> reader;
+  std::shared_ptr<Schema> expected_schema;
+  std::shared_ptr<RecordBatch> expected_batch;
+  std::shared_ptr<RecordBatch> actual_batch;
+
+  FieldVector fields = {field("a", int64()), field("b", utf8())};
+  parse_options_.unexpected_field_behavior = 
UnexpectedFieldBehavior::InferType;
+  parse_options_.explicit_schema = nullptr;
+
+  // Schema derived from the first line
+  expected_schema = schema(fields);
+
+  read_options_.block_size = 32;
+  ASSERT_OK_AND_ASSIGN(reader, MakeReader(test_json));
+  AssertSchemaEqual(reader->schema(), expected_schema);
+
+  expected_batch = RecordBatchFromJSON(expected_schema, R"([{"a": 0, "b": 
"foo"}])");
+  AssertReadNext(reader, &actual_batch);
+  EXPECT_EQ(reader->bytes_processed(), 28);
+  ASSERT_BATCHES_EQUAL(*expected_batch, *actual_batch);
+
+  ASSERT_RAISES(Invalid, reader->ReadNext(&actual_batch));
+
+  // Schema derived from the first 2 lines
+  fields.push_back(field("c", boolean()));
+  expected_schema = schema(fields);
+
+  read_options_.block_size = 64;
+  ASSERT_OK_AND_ASSIGN(reader, MakeReader(test_json));
+  AssertSchemaEqual(reader->schema(), expected_schema);
+
+  expected_batch = RecordBatchFromJSON(expected_schema, R"([
+    {"a": 0, "b": "foo", "c": null},
+    {"a": 1, "b":  null, "c": true}
+  ])");
+  AssertReadNext(reader, &actual_batch);
+  EXPECT_EQ(reader->bytes_processed(), 56);
+  ASSERT_BATCHES_EQUAL(*expected_batch, *actual_batch);
+
+  ASSERT_RAISES(Invalid, reader->ReadNext(&actual_batch));
+
+  // Schema derived from all 3 lines
+  fields.push_back(field("d", timestamp(TimeUnit::SECOND)));
+  expected_schema = schema(fields);
+
+  read_options_.block_size = 96;
+  ASSERT_OK_AND_ASSIGN(reader, MakeReader(test_json));
+  AssertSchemaEqual(reader->schema(), expected_schema);
+
+  expected_batch = RecordBatchFromJSON(expected_schema, R"([
+    {"a": 0, "b": "foo", "c": null, "d":  null},
+    {"a": 1, "b":  null, "c": true, "d":  null},
+    {"a": 2, "b":  null, "c": null, "d":  "2022-01-01"}
+  ])");
+  AssertReadNext(reader, &actual_batch);
+  EXPECT_EQ(reader->bytes_processed(), 84);
+  ASSERT_BATCHES_EQUAL(*expected_batch, *actual_batch);
+
+  AssertReadEnd(reader);
+}
+
+TEST_P(StreamingReaderTest, AsyncReentrancy) {
+  constexpr int kNumRows = 16;
+  constexpr double kIoLatency = 1e-2;
+
+  auto expected = GenerateTestCase(kNumRows);
+  parse_options_.explicit_schema = expected.schema;
+  parse_options_.unexpected_field_behavior = UnexpectedFieldBehavior::Error;
+  read_options_.block_size = expected.block_size;
+
+  std::vector<Future<std::shared_ptr<RecordBatch>>> 
futures(expected.num_batches + 2);
+  ASSERT_OK_AND_ASSIGN(auto reader, MakeReader(expected.json, kIoLatency));
+  EXPECT_EQ(reader->bytes_processed(), 0);
+  for (auto& future : futures) {
+    future = reader->ReadNextAsync();
+  }
+
+  ASSERT_FINISHES_OK_AND_ASSIGN(auto results, All(std::move(futures)));
+  EXPECT_EQ(reader->bytes_processed(), expected.json_size);
+  ASSERT_OK_AND_ASSIGN(auto batches, 
internal::UnwrapOrRaise(std::move(results)));
+  batches.erase(std::remove(batches.begin(), batches.end(), nullptr), 
batches.end());
+  EXPECT_EQ(batches.size(), static_cast<size_t>(expected.num_batches));
+
+  ASSERT_OK_AND_ASSIGN(auto table, Table::FromRecordBatches(batches));

Review Comment:
   Also validate.
   ```suggestion
     ASSERT_OK_AND_ASSIGN(auto table, Table::FromRecordBatches(batches));
     ASSERT_OK(table->ValidateFull());
   ```



##########
cpp/src/arrow/json/reader_test.cc:
##########
@@ -320,5 +325,546 @@ TEST(ReaderTest, FailOnInvalidEOF) {
   }
 }
 
+class StreamingReaderTest : public ::testing::TestWithParam<bool> {
+ protected:
+  void SetUp() override { read_options_.use_threads = GetParam(); }
+
+  static std::shared_ptr<io::InputStream> MakeTestStream(const std::string& 
str) {
+    auto buffer = std::make_shared<Buffer>(str);
+    return std::make_shared<io::BufferReader>(std::move(buffer));
+  }
+  // Stream with simulated latency
+  static std::shared_ptr<io::InputStream> MakeTestStream(const std::string& 
str,
+                                                         double latency) {
+    return std::make_shared<io::SlowInputStream>(MakeTestStream(str), latency);
+  }
+
+  Result<std::shared_ptr<StreamingReader>> MakeReader(
+      std::shared_ptr<io::InputStream> stream) {
+    return StreamingReader::Make(std::move(stream), read_options_, 
parse_options_,
+                                 io_context_, executor_);
+  }
+  template <typename... Args>
+  Result<std::shared_ptr<StreamingReader>> MakeReader(Args&&... args) {
+    return MakeReader(MakeTestStream(std::forward<Args>(args)...));
+  }
+
+  AsyncGenerator<std::shared_ptr<RecordBatch>> MakeGenerator(
+      std::shared_ptr<StreamingReader> reader) {
+    return [reader = std::move(reader)] { return reader->ReadNextAsync(); };
+  }
+  template <typename... Args>
+  Result<AsyncGenerator<std::shared_ptr<RecordBatch>>> MakeGenerator(Args&&... 
args) {
+    ARROW_ASSIGN_OR_RAISE(auto reader, 
MakeReader(std::forward<Args>(args)...));
+    return MakeGenerator(std::move(reader));
+  }
+
+  static void AssertReadNext(const std::shared_ptr<StreamingReader>& reader,
+                             std::shared_ptr<RecordBatch>* out) {
+    ASSERT_OK(reader->ReadNext(out));
+    ASSERT_FALSE(IsIterationEnd(*out));
+  }
+  static void AssertReadEnd(const std::shared_ptr<StreamingReader>& reader) {
+    std::shared_ptr<RecordBatch> out;
+    ASSERT_OK(reader->ReadNext(&out));
+    ASSERT_TRUE(IsIterationEnd(out));
+  }
+
+  struct TestCase {
+    std::string json;
+    int json_size;
+    int block_size;
+    int num_rows;
+    int num_batches;
+    std::shared_ptr<Schema> schema;
+    RecordBatchVector batches;
+    std::shared_ptr<Table> table;
+  };
+
+  // Creates a test case from valid JSON objects with a human-readable index 
field and a
+  // struct field of random data. `block_size_multiplier` is applied to the 
largest
+  // generated row length to determine the target block_size. i.e - higher 
multiplier
+  // means fewer batches
+  static TestCase GenerateTestCase(int num_rows, double block_size_multiplier 
= 3.0) {
+    FieldVector data_fields = {field("s", utf8()), field("f", float64()),
+                               field("b", boolean())};
+    FieldVector fields = {field("i", int64()), field("d", 
struct_({data_fields}))};
+    TestCase out;
+    out.schema = schema(fields);
+    out.num_rows = num_rows;
+
+    constexpr int kSeed = 0x432432;
+    std::default_random_engine engine(kSeed);
+    std::vector<std::string> rows(num_rows);
+    size_t max_row_size = 1;
+
+    auto options = GenerateOptions::Defaults();
+    options.null_probability = 0;
+    for (int i = 0; i < num_rows; ++i) {
+      StringBuffer string_buffer;
+      Writer writer(string_buffer);
+      ABORT_NOT_OK(Generate(data_fields, engine, &writer, options));
+      std::string json = string_buffer.GetString();
+      rows[i] = Join({"{\"i\":", std::to_string(i), ",\"d\":", json, "}\n"});
+      max_row_size = std::max(max_row_size, rows[i].size());
+    }
+
+    auto block_size = static_cast<size_t>(max_row_size * 
block_size_multiplier);
+    // Deduce the expected record batches from the target block size.
+    std::vector<std::string> batch_rows;
+    size_t pos = 0;
+    for (const auto& row : rows) {
+      pos += row.size();
+      if (pos > block_size) {
+        out.batches.push_back(
+            RecordBatchFromJSON(out.schema, Join({"[", Join(batch_rows, ","), 
"]"})));
+        batch_rows.clear();
+        pos -= block_size;
+      }
+      batch_rows.push_back(row);
+      out.json += row;
+    }
+    if (!batch_rows.empty()) {
+      out.batches.push_back(
+          RecordBatchFromJSON(out.schema, Join({"[", Join(batch_rows, ","), 
"]"})));
+    }
+
+    out.json_size = static_cast<int>(out.json.size());
+    out.block_size = static_cast<int>(block_size);
+    out.num_batches = static_cast<int>(out.batches.size());
+    out.table = *Table::FromRecordBatches(out.batches);
+
+    return out;
+  }
+
+  static std::string Join(const std::vector<std::string>& strings,
+                          const std::string& delim = "", bool trailing_delim = 
false) {
+    std::string out;
+    for (size_t i = 0; i < strings.size();) {
+      out += strings[i++];
+      if (i != strings.size() || trailing_delim) {
+        out += delim;
+      }
+    }
+    return out;
+  }
+
+  internal::Executor* executor_ = nullptr;
+  ParseOptions parse_options_ = ParseOptions::Defaults();
+  ReadOptions read_options_ = ReadOptions::Defaults();
+  io::IOContext io_context_ = io::default_io_context();
+};
+
+INSTANTIATE_TEST_SUITE_P(StreamingReaderTest, StreamingReaderTest,
+                         ::testing::Values(false, true));
+
+TEST_P(StreamingReaderTest, ErrorOnEmptyStream) {
+  ASSERT_RAISES(Invalid, MakeReader(""));
+  std::string data(100, '\n');
+  for (auto block_size : {25, 49, 50, 100, 200}) {
+    read_options_.block_size = block_size;
+    ASSERT_RAISES(Invalid, MakeReader(data));
+  }
+}
+
+TEST_P(StreamingReaderTest, PropagateChunkingErrors) {
+  constexpr double kIoLatency = 1e-3;
+
+  auto test_schema = schema({field("i", int64())});
+  // Object straddles multiple blocks
+  auto bad_first_chunk = Join(
+      {
+          R"({"i": 0            })",
+          R"({"i": 1})",
+      },
+      "\n");
+  auto bad_middle_chunk = Join(
+      {
+          R"({"i": 0})",
+          R"({"i":    1})",
+          R"({"i": 2})",
+      },
+      "\n");
+
+  read_options_.block_size = 10;
+  ASSERT_RAISES(Invalid, MakeReader(bad_first_chunk));
+
+  ASSERT_OK_AND_ASSIGN(auto reader, MakeReader(bad_middle_chunk, kIoLatency));
+
+  std::shared_ptr<RecordBatch> batch;
+  AssertReadNext(reader, &batch);
+  EXPECT_EQ(reader->bytes_processed(), 9);
+  ASSERT_BATCHES_EQUAL(*RecordBatchFromJSON(test_schema, "[{\"i\":0}]"), 
*batch);
+
+  ASSERT_RAISES(Invalid, reader->ReadNext(&batch));
+  EXPECT_EQ(reader->bytes_processed(), 9);
+  AssertReadEnd(reader);
+  AssertReadEnd(reader);
+  EXPECT_EQ(reader->bytes_processed(), 9);
+}
+
+TEST_P(StreamingReaderTest, PropagateParsingErrors) {
+  auto test_schema = schema({field("n", int64())});
+  auto bad_first_block = Join(
+      {
+          R"({"n": })",
+          R"({"n": 10000})",
+      },
+      "\n");
+  auto bad_first_block_after_empty = Join(
+      {
+          R"(            )",
+          R"({"n": })",
+          R"({"n": 10000})",
+      },
+      "\n");
+  auto bad_middle_block = Join(
+      {
+          R"({"n": 10000})",
+          R"({"n": 200 0})",
+          R"({"n": 30000})",
+      },
+      "\n");
+
+  read_options_.block_size = 16;
+  ASSERT_RAISES(Invalid, MakeReader(bad_first_block));
+  ASSERT_RAISES(Invalid, MakeReader(bad_first_block_after_empty));
+
+  std::shared_ptr<RecordBatch> batch;
+  ASSERT_OK_AND_ASSIGN(auto reader, MakeReader(bad_middle_block));
+  EXPECT_EQ(reader->bytes_processed(), 0);
+  AssertSchemaEqual(reader->schema(), test_schema);
+
+  AssertReadNext(reader, &batch);
+  EXPECT_EQ(reader->bytes_processed(), 13);
+  ASSERT_BATCHES_EQUAL(*RecordBatchFromJSON(test_schema, R"([{"n":10000}])"), 
*batch);
+
+  ASSERT_RAISES(Invalid, reader->ReadNext(&batch));
+  EXPECT_EQ(reader->bytes_processed(), 13);
+  AssertReadEnd(reader);
+  EXPECT_EQ(reader->bytes_processed(), 13);
+}
+
+TEST_P(StreamingReaderTest, PropagateErrorsNonLinewiseChunker) {
+  auto test_schema = schema({field("i", int64())});
+  auto bad_first_block = Join(
+      {
+          R"({"i":0}{1})",
+          R"({"i":2})",
+      },
+      "\n");
+  auto bad_middle_blocks = Join(
+      {
+          R"({"i": 0})",
+          R"({"i":    1})",
+          R"({}"i":2})",
+          R"({"i": 3})",
+      },
+      "\n");
+
+  std::shared_ptr<RecordBatch> batch;
+  std::shared_ptr<StreamingReader> reader;
+  Status status;
+  read_options_.block_size = 10;
+  parse_options_.newlines_in_values = true;
+
+  ASSERT_OK_AND_ASSIGN(reader, MakeReader(bad_first_block));
+  AssertReadNext(reader, &batch);
+  EXPECT_EQ(reader->bytes_processed(), 7);
+  ASSERT_BATCHES_EQUAL(*RecordBatchFromJSON(test_schema, "[{\"i\":0}]"), 
*batch);
+  status = reader->ReadNext(&batch);
+  EXPECT_EQ(reader->bytes_processed(), 7);
+  ASSERT_RAISES(Invalid, status);
+  EXPECT_THAT(status.message(), ::testing::StartsWith("JSON parse error"));
+  AssertReadEnd(reader);
+
+  ASSERT_OK_AND_ASSIGN(reader, MakeReader(bad_middle_blocks));
+  AssertReadNext(reader, &batch);
+  EXPECT_EQ(reader->bytes_processed(), 9);
+  ASSERT_BATCHES_EQUAL(*RecordBatchFromJSON(test_schema, "[{\"i\":0}]"), 
*batch);
+  // Chunker doesn't require newline delimiters, so this should be valid
+  AssertReadNext(reader, &batch);
+  EXPECT_EQ(reader->bytes_processed(), 20);
+  ASSERT_BATCHES_EQUAL(*RecordBatchFromJSON(test_schema, "[{\"i\":1}]"), 
*batch);
+
+  status = reader->ReadNext(&batch);
+  EXPECT_EQ(reader->bytes_processed(), 20);
+  // Should fail to parse "{}\"i\""
+  ASSERT_RAISES(Invalid, status);
+  EXPECT_THAT(status.message(), ::testing::StartsWith("JSON parse error"));
+  // Incoming chunker error from ":2}" shouldn't leak through after the first 
failure,
+  // which is a possibility if async tasks are still outstanding due to 
readahead.
+  AssertReadEnd(reader);
+  AssertReadEnd(reader);
+  EXPECT_EQ(reader->bytes_processed(), 20);
+}
+
+TEST_P(StreamingReaderTest, IgnoreLeadingEmptyBlocks) {
+  std::string test_json(32, '\n');
+  test_json += R"({"b": true, "s": "foo"})";
+  ASSERT_EQ(test_json.length(), 55);
+
+  parse_options_.explicit_schema = schema({field("b", boolean()), field("s", 
utf8())});
+  read_options_.block_size = 24;
+  ASSERT_OK_AND_ASSIGN(auto reader, MakeReader(test_json));
+  EXPECT_EQ(reader->bytes_processed(), 0);
+
+  auto expected_schema = parse_options_.explicit_schema;
+  auto expected_batch = RecordBatchFromJSON(expected_schema, 
R"([{"b":true,"s":"foo"}])");
+
+  AssertSchemaEqual(reader->schema(), expected_schema);
+
+  std::shared_ptr<RecordBatch> actual_batch;
+  AssertReadNext(reader, &actual_batch);
+  EXPECT_EQ(reader->bytes_processed(), 55);
+  ASSERT_BATCHES_EQUAL(*expected_batch, *actual_batch);
+
+  AssertReadEnd(reader);
+}
+
+TEST_P(StreamingReaderTest, ExplicitSchemaErrorOnUnexpectedFields) {
+  std::string test_json =
+      Join({R"({"s": "foo", "t": "2022-01-01"})", R"({"s": "foo", "t": 
"2022-01-01"})",
+            R"({"s": "foo", "t": "2022-01-01", "b": true})"},
+           "\n");
+
+  FieldVector expected_fields = {field("s", utf8())};
+  std::shared_ptr<Schema> expected_schema = schema(expected_fields);
+  std::shared_ptr<RecordBatch> expected_batch;
+  std::shared_ptr<RecordBatch> actual_batch;
+  std::shared_ptr<StreamingReader> reader;
+
+  parse_options_.explicit_schema = expected_schema;
+  parse_options_.unexpected_field_behavior = UnexpectedFieldBehavior::Error;
+  read_options_.block_size = 48;
+  ASSERT_RAISES(Invalid, MakeReader(test_json));
+
+  expected_fields.push_back(field("t", utf8()));
+  expected_schema = schema(expected_fields);
+  expected_batch =
+      RecordBatchFromJSON(expected_schema, 
R"([{"s":"foo","t":"2022-01-01"}])");
+
+  parse_options_.explicit_schema = expected_schema;
+  ASSERT_OK_AND_ASSIGN(reader, MakeReader(test_json));
+  AssertSchemaEqual(reader->schema(), expected_schema);
+
+  AssertReadNext(reader, &actual_batch);
+  ASSERT_BATCHES_EQUAL(*expected_batch, *actual_batch);
+  EXPECT_EQ(reader->bytes_processed(), 32);
+
+  AssertReadNext(reader, &actual_batch);
+  ASSERT_BATCHES_EQUAL(*expected_batch, *actual_batch);
+  EXPECT_EQ(reader->bytes_processed(), 64);
+
+  ASSERT_RAISES(Invalid, reader->ReadNext(&actual_batch));
+  EXPECT_EQ(reader->bytes_processed(), 64);
+  AssertReadEnd(reader);
+}
+
+TEST_P(StreamingReaderTest, ExplicitSchemaIgnoreUnexpectedFields) {
+  std::string test_json =
+      Join({R"({"s": "foo", "u": "2022-01-01"})", R"({"s": "foo", "t": 
"2022-01-01"})",
+            R"({"s": "foo", "t": "2022-01-01", "b": true})"},
+           "\n");
+
+  FieldVector expected_fields = {field("s", utf8()), field("t", utf8())};
+  std::shared_ptr<Schema> expected_schema = schema(expected_fields);
+  std::shared_ptr<RecordBatch> expected_batch;
+  std::shared_ptr<RecordBatch> actual_batch;
+  std::shared_ptr<StreamingReader> reader;
+
+  parse_options_.explicit_schema = expected_schema;
+  parse_options_.unexpected_field_behavior = UnexpectedFieldBehavior::Ignore;
+  read_options_.block_size = 48;
+
+  ASSERT_OK_AND_ASSIGN(reader, MakeReader(test_json));
+  AssertSchemaEqual(reader->schema(), expected_schema);
+
+  expected_batch = RecordBatchFromJSON(expected_schema, 
R"([{"s":"foo","t":null}])");
+  AssertReadNext(reader, &actual_batch);
+  ASSERT_BATCHES_EQUAL(*expected_batch, *actual_batch);
+  EXPECT_EQ(reader->bytes_processed(), 32);
+
+  expected_batch =
+      RecordBatchFromJSON(expected_schema, 
R"([{"s":"foo","t":"2022-01-01"}])");
+  AssertReadNext(reader, &actual_batch);
+  ASSERT_BATCHES_EQUAL(*expected_batch, *actual_batch);
+  EXPECT_EQ(reader->bytes_processed(), 64);
+
+  AssertReadNext(reader, &actual_batch);
+  ASSERT_BATCHES_EQUAL(*expected_batch, *actual_batch);
+  EXPECT_EQ(reader->bytes_processed(), 106);
+  AssertReadEnd(reader);
+}
+
+TEST_P(StreamingReaderTest, InferredSchema) {
+  auto test_json = Join(
+      {
+          R"({"a": 0, "b": "foo"       })",
+          R"({"a": 1, "c": true        })",
+          R"({"a": 2, "d": "2022-01-01"})",
+      },
+      "\n", true);
+
+  std::shared_ptr<StreamingReader> reader;
+  std::shared_ptr<Schema> expected_schema;
+  std::shared_ptr<RecordBatch> expected_batch;
+  std::shared_ptr<RecordBatch> actual_batch;
+
+  FieldVector fields = {field("a", int64()), field("b", utf8())};
+  parse_options_.unexpected_field_behavior = 
UnexpectedFieldBehavior::InferType;
+  parse_options_.explicit_schema = nullptr;
+
+  // Schema derived from the first line
+  expected_schema = schema(fields);
+
+  read_options_.block_size = 32;
+  ASSERT_OK_AND_ASSIGN(reader, MakeReader(test_json));
+  AssertSchemaEqual(reader->schema(), expected_schema);
+
+  expected_batch = RecordBatchFromJSON(expected_schema, R"([{"a": 0, "b": 
"foo"}])");
+  AssertReadNext(reader, &actual_batch);
+  EXPECT_EQ(reader->bytes_processed(), 28);
+  ASSERT_BATCHES_EQUAL(*expected_batch, *actual_batch);
+
+  ASSERT_RAISES(Invalid, reader->ReadNext(&actual_batch));
+
+  // Schema derived from the first 2 lines
+  fields.push_back(field("c", boolean()));
+  expected_schema = schema(fields);
+
+  read_options_.block_size = 64;
+  ASSERT_OK_AND_ASSIGN(reader, MakeReader(test_json));
+  AssertSchemaEqual(reader->schema(), expected_schema);
+
+  expected_batch = RecordBatchFromJSON(expected_schema, R"([
+    {"a": 0, "b": "foo", "c": null},
+    {"a": 1, "b":  null, "c": true}
+  ])");
+  AssertReadNext(reader, &actual_batch);
+  EXPECT_EQ(reader->bytes_processed(), 56);
+  ASSERT_BATCHES_EQUAL(*expected_batch, *actual_batch);
+
+  ASSERT_RAISES(Invalid, reader->ReadNext(&actual_batch));
+
+  // Schema derived from all 3 lines
+  fields.push_back(field("d", timestamp(TimeUnit::SECOND)));
+  expected_schema = schema(fields);
+
+  read_options_.block_size = 96;
+  ASSERT_OK_AND_ASSIGN(reader, MakeReader(test_json));
+  AssertSchemaEqual(reader->schema(), expected_schema);
+
+  expected_batch = RecordBatchFromJSON(expected_schema, R"([
+    {"a": 0, "b": "foo", "c": null, "d":  null},
+    {"a": 1, "b":  null, "c": true, "d":  null},
+    {"a": 2, "b":  null, "c": null, "d":  "2022-01-01"}
+  ])");
+  AssertReadNext(reader, &actual_batch);
+  EXPECT_EQ(reader->bytes_processed(), 84);
+  ASSERT_BATCHES_EQUAL(*expected_batch, *actual_batch);
+
+  AssertReadEnd(reader);
+}
+
+TEST_P(StreamingReaderTest, AsyncReentrancy) {
+  constexpr int kNumRows = 16;
+  constexpr double kIoLatency = 1e-2;
+
+  auto expected = GenerateTestCase(kNumRows);
+  parse_options_.explicit_schema = expected.schema;
+  parse_options_.unexpected_field_behavior = UnexpectedFieldBehavior::Error;
+  read_options_.block_size = expected.block_size;
+
+  std::vector<Future<std::shared_ptr<RecordBatch>>> 
futures(expected.num_batches + 2);
+  ASSERT_OK_AND_ASSIGN(auto reader, MakeReader(expected.json, kIoLatency));
+  EXPECT_EQ(reader->bytes_processed(), 0);
+  for (auto& future : futures) {
+    future = reader->ReadNextAsync();
+  }
+
+  ASSERT_FINISHES_OK_AND_ASSIGN(auto results, All(std::move(futures)));
+  EXPECT_EQ(reader->bytes_processed(), expected.json_size);
+  ASSERT_OK_AND_ASSIGN(auto batches, 
internal::UnwrapOrRaise(std::move(results)));
+  batches.erase(std::remove(batches.begin(), batches.end(), nullptr), 
batches.end());
+  EXPECT_EQ(batches.size(), static_cast<size_t>(expected.num_batches));
+
+  ASSERT_OK_AND_ASSIGN(auto table, Table::FromRecordBatches(batches));
+  ASSERT_TABLES_EQUAL(*expected.table, *table);
+}
+
+TEST_P(StreamingReaderTest, FuturesOutliveReader) {
+  constexpr int kNumRows = 16;
+  constexpr double kIoLatency = 1e-2;
+
+  auto expected = GenerateTestCase(kNumRows);
+  parse_options_.explicit_schema = expected.schema;
+  parse_options_.unexpected_field_behavior = UnexpectedFieldBehavior::Error;
+  read_options_.block_size = expected.block_size;
+
+  auto stream = MakeTestStream(expected.json, kIoLatency);
+  std::vector<Future<std::shared_ptr<RecordBatch>>> 
futures(expected.num_batches + 2);
+  {
+    ASSERT_OK_AND_ASSIGN(auto reader, MakeReader(stream));
+    EXPECT_EQ(reader->bytes_processed(), 0);
+    for (auto& future : futures) {
+      future = reader->ReadNextAsync();
+    }
+  }
+
+  ASSERT_FINISHES_OK_AND_ASSIGN(auto results, All(std::move(futures)));
+  ASSERT_OK_AND_ASSIGN(auto batches, 
internal::UnwrapOrRaise(std::move(results)));
+  batches.erase(std::remove(batches.begin(), batches.end(), nullptr), 
batches.end());
+  EXPECT_EQ(batches.size(), static_cast<size_t>(expected.num_batches));
+
+  ASSERT_OK_AND_ASSIGN(auto table, Table::FromRecordBatches(batches));

Review Comment:
   Validate the result here as well?



##########
cpp/src/arrow/json/reader_test.cc:
##########
@@ -320,5 +325,546 @@ TEST(ReaderTest, FailOnInvalidEOF) {
   }
 }
 
+class StreamingReaderTest : public ::testing::TestWithParam<bool> {
+ protected:
+  void SetUp() override { read_options_.use_threads = GetParam(); }
+
+  static std::shared_ptr<io::InputStream> MakeTestStream(const std::string& 
str) {
+    auto buffer = std::make_shared<Buffer>(str);
+    return std::make_shared<io::BufferReader>(std::move(buffer));
+  }
+  // Stream with simulated latency
+  static std::shared_ptr<io::InputStream> MakeTestStream(const std::string& 
str,
+                                                         double latency) {
+    return std::make_shared<io::SlowInputStream>(MakeTestStream(str), latency);
+  }
+
+  Result<std::shared_ptr<StreamingReader>> MakeReader(
+      std::shared_ptr<io::InputStream> stream) {
+    return StreamingReader::Make(std::move(stream), read_options_, 
parse_options_,
+                                 io_context_, executor_);
+  }
+  template <typename... Args>
+  Result<std::shared_ptr<StreamingReader>> MakeReader(Args&&... args) {
+    return MakeReader(MakeTestStream(std::forward<Args>(args)...));
+  }
+
+  AsyncGenerator<std::shared_ptr<RecordBatch>> MakeGenerator(
+      std::shared_ptr<StreamingReader> reader) {
+    return [reader = std::move(reader)] { return reader->ReadNextAsync(); };
+  }
+  template <typename... Args>
+  Result<AsyncGenerator<std::shared_ptr<RecordBatch>>> MakeGenerator(Args&&... 
args) {
+    ARROW_ASSIGN_OR_RAISE(auto reader, 
MakeReader(std::forward<Args>(args)...));
+    return MakeGenerator(std::move(reader));
+  }
+
+  static void AssertReadNext(const std::shared_ptr<StreamingReader>& reader,
+                             std::shared_ptr<RecordBatch>* out) {
+    ASSERT_OK(reader->ReadNext(out));
+    ASSERT_FALSE(IsIterationEnd(*out));
+  }
+  static void AssertReadEnd(const std::shared_ptr<StreamingReader>& reader) {
+    std::shared_ptr<RecordBatch> out;
+    ASSERT_OK(reader->ReadNext(&out));
+    ASSERT_TRUE(IsIterationEnd(out));
+  }
+
+  struct TestCase {
+    std::string json;
+    int json_size;
+    int block_size;
+    int num_rows;
+    int num_batches;
+    std::shared_ptr<Schema> schema;
+    RecordBatchVector batches;
+    std::shared_ptr<Table> table;
+  };
+
+  // Creates a test case from valid JSON objects with a human-readable index 
field and a
+  // struct field of random data. `block_size_multiplier` is applied to the 
largest
+  // generated row length to determine the target block_size. i.e - higher 
multiplier
+  // means fewer batches
+  static TestCase GenerateTestCase(int num_rows, double block_size_multiplier 
= 3.0) {
+    FieldVector data_fields = {field("s", utf8()), field("f", float64()),
+                               field("b", boolean())};
+    FieldVector fields = {field("i", int64()), field("d", 
struct_({data_fields}))};
+    TestCase out;
+    out.schema = schema(fields);
+    out.num_rows = num_rows;
+
+    constexpr int kSeed = 0x432432;
+    std::default_random_engine engine(kSeed);
+    std::vector<std::string> rows(num_rows);
+    size_t max_row_size = 1;
+
+    auto options = GenerateOptions::Defaults();
+    options.null_probability = 0;
+    for (int i = 0; i < num_rows; ++i) {
+      StringBuffer string_buffer;
+      Writer writer(string_buffer);
+      ABORT_NOT_OK(Generate(data_fields, engine, &writer, options));
+      std::string json = string_buffer.GetString();
+      rows[i] = Join({"{\"i\":", std::to_string(i), ",\"d\":", json, "}\n"});
+      max_row_size = std::max(max_row_size, rows[i].size());
+    }
+
+    auto block_size = static_cast<size_t>(max_row_size * 
block_size_multiplier);
+    // Deduce the expected record batches from the target block size.
+    std::vector<std::string> batch_rows;
+    size_t pos = 0;
+    for (const auto& row : rows) {
+      pos += row.size();
+      if (pos > block_size) {
+        out.batches.push_back(
+            RecordBatchFromJSON(out.schema, Join({"[", Join(batch_rows, ","), 
"]"})));
+        batch_rows.clear();
+        pos -= block_size;
+      }
+      batch_rows.push_back(row);
+      out.json += row;
+    }
+    if (!batch_rows.empty()) {
+      out.batches.push_back(
+          RecordBatchFromJSON(out.schema, Join({"[", Join(batch_rows, ","), 
"]"})));
+    }
+
+    out.json_size = static_cast<int>(out.json.size());
+    out.block_size = static_cast<int>(block_size);
+    out.num_batches = static_cast<int>(out.batches.size());
+    out.table = *Table::FromRecordBatches(out.batches);
+
+    return out;
+  }
+
+  static std::string Join(const std::vector<std::string>& strings,
+                          const std::string& delim = "", bool trailing_delim = 
false) {
+    std::string out;
+    for (size_t i = 0; i < strings.size();) {
+      out += strings[i++];
+      if (i != strings.size() || trailing_delim) {
+        out += delim;
+      }
+    }
+    return out;
+  }
+
+  internal::Executor* executor_ = nullptr;
+  ParseOptions parse_options_ = ParseOptions::Defaults();
+  ReadOptions read_options_ = ReadOptions::Defaults();
+  io::IOContext io_context_ = io::default_io_context();
+};
+
+INSTANTIATE_TEST_SUITE_P(StreamingReaderTest, StreamingReaderTest,
+                         ::testing::Values(false, true));
+
+TEST_P(StreamingReaderTest, ErrorOnEmptyStream) {
+  ASSERT_RAISES(Invalid, MakeReader(""));
+  std::string data(100, '\n');
+  for (auto block_size : {25, 49, 50, 100, 200}) {
+    read_options_.block_size = block_size;
+    ASSERT_RAISES(Invalid, MakeReader(data));
+  }
+}
+
+TEST_P(StreamingReaderTest, PropagateChunkingErrors) {
+  constexpr double kIoLatency = 1e-3;
+
+  auto test_schema = schema({field("i", int64())});
+  // Object straddles multiple blocks
+  auto bad_first_chunk = Join(
+      {
+          R"({"i": 0            })",
+          R"({"i": 1})",
+      },
+      "\n");
+  auto bad_middle_chunk = Join(
+      {
+          R"({"i": 0})",
+          R"({"i":    1})",
+          R"({"i": 2})",
+      },
+      "\n");
+
+  read_options_.block_size = 10;
+  ASSERT_RAISES(Invalid, MakeReader(bad_first_chunk));
+
+  ASSERT_OK_AND_ASSIGN(auto reader, MakeReader(bad_middle_chunk, kIoLatency));
+
+  std::shared_ptr<RecordBatch> batch;
+  AssertReadNext(reader, &batch);
+  EXPECT_EQ(reader->bytes_processed(), 9);
+  ASSERT_BATCHES_EQUAL(*RecordBatchFromJSON(test_schema, "[{\"i\":0}]"), 
*batch);
+
+  ASSERT_RAISES(Invalid, reader->ReadNext(&batch));
+  EXPECT_EQ(reader->bytes_processed(), 9);
+  AssertReadEnd(reader);
+  AssertReadEnd(reader);
+  EXPECT_EQ(reader->bytes_processed(), 9);
+}
+
+TEST_P(StreamingReaderTest, PropagateParsingErrors) {
+  auto test_schema = schema({field("n", int64())});
+  auto bad_first_block = Join(
+      {
+          R"({"n": })",
+          R"({"n": 10000})",
+      },
+      "\n");
+  auto bad_first_block_after_empty = Join(
+      {
+          R"(            )",
+          R"({"n": })",
+          R"({"n": 10000})",
+      },
+      "\n");
+  auto bad_middle_block = Join(
+      {
+          R"({"n": 10000})",
+          R"({"n": 200 0})",
+          R"({"n": 30000})",
+      },
+      "\n");
+
+  read_options_.block_size = 16;
+  ASSERT_RAISES(Invalid, MakeReader(bad_first_block));
+  ASSERT_RAISES(Invalid, MakeReader(bad_first_block_after_empty));
+
+  std::shared_ptr<RecordBatch> batch;
+  ASSERT_OK_AND_ASSIGN(auto reader, MakeReader(bad_middle_block));
+  EXPECT_EQ(reader->bytes_processed(), 0);
+  AssertSchemaEqual(reader->schema(), test_schema);
+
+  AssertReadNext(reader, &batch);
+  EXPECT_EQ(reader->bytes_processed(), 13);
+  ASSERT_BATCHES_EQUAL(*RecordBatchFromJSON(test_schema, R"([{"n":10000}])"), 
*batch);
+
+  ASSERT_RAISES(Invalid, reader->ReadNext(&batch));
+  EXPECT_EQ(reader->bytes_processed(), 13);
+  AssertReadEnd(reader);
+  EXPECT_EQ(reader->bytes_processed(), 13);
+}
+
+TEST_P(StreamingReaderTest, PropagateErrorsNonLinewiseChunker) {
+  auto test_schema = schema({field("i", int64())});
+  auto bad_first_block = Join(
+      {
+          R"({"i":0}{1})",
+          R"({"i":2})",
+      },
+      "\n");
+  auto bad_middle_blocks = Join(
+      {
+          R"({"i": 0})",
+          R"({"i":    1})",
+          R"({}"i":2})",
+          R"({"i": 3})",
+      },
+      "\n");
+
+  std::shared_ptr<RecordBatch> batch;
+  std::shared_ptr<StreamingReader> reader;
+  Status status;
+  read_options_.block_size = 10;
+  parse_options_.newlines_in_values = true;
+
+  ASSERT_OK_AND_ASSIGN(reader, MakeReader(bad_first_block));
+  AssertReadNext(reader, &batch);
+  EXPECT_EQ(reader->bytes_processed(), 7);
+  ASSERT_BATCHES_EQUAL(*RecordBatchFromJSON(test_schema, "[{\"i\":0}]"), 
*batch);
+  status = reader->ReadNext(&batch);
+  EXPECT_EQ(reader->bytes_processed(), 7);
+  ASSERT_RAISES(Invalid, status);
+  EXPECT_THAT(status.message(), ::testing::StartsWith("JSON parse error"));
+  AssertReadEnd(reader);
+
+  ASSERT_OK_AND_ASSIGN(reader, MakeReader(bad_middle_blocks));
+  AssertReadNext(reader, &batch);
+  EXPECT_EQ(reader->bytes_processed(), 9);
+  ASSERT_BATCHES_EQUAL(*RecordBatchFromJSON(test_schema, "[{\"i\":0}]"), 
*batch);
+  // Chunker doesn't require newline delimiters, so this should be valid
+  AssertReadNext(reader, &batch);
+  EXPECT_EQ(reader->bytes_processed(), 20);
+  ASSERT_BATCHES_EQUAL(*RecordBatchFromJSON(test_schema, "[{\"i\":1}]"), 
*batch);
+
+  status = reader->ReadNext(&batch);
+  EXPECT_EQ(reader->bytes_processed(), 20);
+  // Should fail to parse "{}\"i\""
+  ASSERT_RAISES(Invalid, status);
+  EXPECT_THAT(status.message(), ::testing::StartsWith("JSON parse error"));
+  // Incoming chunker error from ":2}" shouldn't leak through after the first 
failure,
+  // which is a possibility if async tasks are still outstanding due to 
readahead.
+  AssertReadEnd(reader);
+  AssertReadEnd(reader);
+  EXPECT_EQ(reader->bytes_processed(), 20);
+}
+
+TEST_P(StreamingReaderTest, IgnoreLeadingEmptyBlocks) {
+  std::string test_json(32, '\n');
+  test_json += R"({"b": true, "s": "foo"})";
+  ASSERT_EQ(test_json.length(), 55);
+
+  parse_options_.explicit_schema = schema({field("b", boolean()), field("s", 
utf8())});
+  read_options_.block_size = 24;
+  ASSERT_OK_AND_ASSIGN(auto reader, MakeReader(test_json));
+  EXPECT_EQ(reader->bytes_processed(), 0);
+
+  auto expected_schema = parse_options_.explicit_schema;
+  auto expected_batch = RecordBatchFromJSON(expected_schema, 
R"([{"b":true,"s":"foo"}])");
+
+  AssertSchemaEqual(reader->schema(), expected_schema);
+
+  std::shared_ptr<RecordBatch> actual_batch;
+  AssertReadNext(reader, &actual_batch);
+  EXPECT_EQ(reader->bytes_processed(), 55);
+  ASSERT_BATCHES_EQUAL(*expected_batch, *actual_batch);
+
+  AssertReadEnd(reader);
+}
+
+TEST_P(StreamingReaderTest, ExplicitSchemaErrorOnUnexpectedFields) {
+  std::string test_json =
+      Join({R"({"s": "foo", "t": "2022-01-01"})", R"({"s": "foo", "t": 
"2022-01-01"})",
+            R"({"s": "foo", "t": "2022-01-01", "b": true})"},

Review Comment:
   Let's not use the same values for each row?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to