pan-x-c commented on code in PR #45084:
URL: https://github.com/apache/arrow/pull/45084#discussion_r1893544865
##########
python/pyarrow/tests/test_json.py:
##########
@@ -339,6 +334,300 @@ def test_stress_block_sizes(self):
assert table.to_pydict() == expected.to_pydict()
+class BaseTestJSONRead(BaseTestJSON):
+
+ def read_bytes(self, b, **kwargs):
+ return self.read_json(pa.py_buffer(b), **kwargs)
+
+ def test_file_object(self):
+ data = b'{"a": 1, "b": 2}\n'
+ expected_data = {'a': [1], 'b': [2]}
+ bio = io.BytesIO(data)
+ table = self.read_json(bio)
+ assert table.to_pydict() == expected_data
+ # Text files not allowed
+ sio = io.StringIO(data.decode())
+ with pytest.raises(TypeError):
+ self.read_json(sio)
+
+ def test_reconcile_accross_blocks(self):
+ # ARROW-12065: reconciling inferred types across blocks
+ first_row = b'{ }\n'
+ read_options = ReadOptions(block_size=len(first_row))
+ for next_rows, expected_pylist in [
+ (b'{"a": 0}', [None, 0]),
+ (b'{"a": []}', [None, []]),
+ (b'{"a": []}\n{"a": [[1]]}', [None, [], [[1]]]),
+ (b'{"a": {}}', [None, {}]),
+ (b'{"a": {}}\n{"a": {"b": {"c": 1}}}',
+ [None, {"b": None}, {"b": {"c": 1}}]),
+ ]:
+ table = self.read_bytes(first_row + next_rows,
+ read_options=read_options)
+ expected = {"a": expected_pylist}
+ assert table.to_pydict() == expected
+ # Check that the issue was exercised
+ assert table.column("a").num_chunks > 1
+
+
+class BaseTestStreamingJSONRead(BaseTestJSON):
+ def open_json(self, json, *args, **kwargs):
+ """
+ Reads the JSON file into memory using pyarrow's open_json
+ json The JSON bytes
+ args Positional arguments to be forwarded to pyarrow's open_json
+ kwargs Keyword arguments to be forwarded to pyarrow's open_json
+ """
+ read_options = kwargs.setdefault('read_options', ReadOptions())
+ read_options.use_threads = self.use_threads
+ return open_json(json, *args, **kwargs)
+
+ def open_bytes(self, b, **kwargs):
+ return self.open_json(pa.py_buffer(b), **kwargs)
+
+ def check_reader(self, reader, expected_schema, expected_data):
+ assert reader.schema == expected_schema
+ batches = list(reader)
+ assert len(batches) == len(expected_data)
+ for batch, expected_batch in zip(batches, expected_data):
+ batch.validate(full=True)
+ assert batch.schema == expected_schema
+ assert batch.to_pydict() == expected_batch
+
+ def read_bytes(self, b, **kwargs):
+ return self.open_bytes(b, **kwargs).read_all()
+
+ def test_file_object(self):
+ data = b'{"a": 1, "b": 2}\n'
+ expected_data = {'a': [1], 'b': [2]}
+ bio = io.BytesIO(data)
+ reader = self.open_json(bio)
+ expected_schema = pa.schema([('a', pa.int64()),
+ ('b', pa.int64())])
+ self.check_reader(reader, expected_schema, [expected_data])
+
+ def test_bad_first_chunk(self):
+ bad_first_chunk = b'{"i": 0 }\n{"i": 1}'
+ read_options = ReadOptions()
+ read_options.block_size = 3
+ with pytest.raises(
+ pa.ArrowInvalid,
+ match="straddling object straddles two block boundaries*"
+ ):
+ self.open_bytes(bad_first_chunk, read_options=read_options)
+
+ def test_bad_middle_chunk(self):
+ bad_middle_chunk = b'{"i": 0}\n{"i": 1}\n{"i": 2}'
+ read_options = ReadOptions()
+ read_options.block_size = 10
+ expected_schema = pa.schema([('i', pa.int64())])
+
+ reader = self.open_bytes(bad_middle_chunk, read_options=read_options)
+ assert reader.schema == expected_schema
+ assert reader.read_next_batch().to_pydict() == {
+ 'i': [0]
+ }
+ with pytest.raises(
+ pa.ArrowInvalid,
+ match="straddling object straddles two block boundaries*"
+ ):
+ reader.read_next_batch()
+
+ with pytest.raises(StopIteration):
+ reader.read_next_batch()
+
+ def test_bad_first_parse(self):
+ bad_first_block = b'{"n": }\n{"n": 10000}'
+ read_options = ReadOptions()
+ read_options.block_size = 16
+ with pytest.raises(pa.ArrowInvalid,
+ match="JSON parse error: Invalid value.*"):
+ self.open_bytes(bad_first_block, read_options=read_options)
+
+ def test_bad_middle_parse_after_empty(self):
+ bad_first_block = b'{ }{"n": }\n{"n": 10000}'
+ read_options = ReadOptions()
+ read_options.block_size = 16
+ with pytest.raises(pa.ArrowInvalid,
+ match="JSON parse error: Invalid value.*"):
+ self.open_bytes(bad_first_block, read_options=read_options)
+
+ def test_bad_middle_parse(self):
+ bad_middle_chunk = b'{"n": 1000}\n{"n": 200 00}\n{"n": 3000}'
+ read_options = ReadOptions()
+ read_options.block_size = 10
+ expected_schema = pa.schema([('n', pa.int64())])
+
+ reader = self.open_bytes(bad_middle_chunk, read_options=read_options)
+ assert reader.schema == expected_schema
+ assert reader.read_next_batch().to_pydict() == {
+ 'n': [1000]
+ }
+ with pytest.raises(
+ pa.ArrowInvalid,
+ match="JSON parse error:\
+ Missing a comma or '}' after an object member*"
+ ):
+ reader.read_next_batch()
+
+ with pytest.raises(StopIteration):
+ reader.read_next_batch()
+
+ def test_non_linewise_chunker_first_block(self):
+ bad_middle_chunk = b'{"n": 0}{1}\n{"n": 2}'
+ read_options = ReadOptions(block_size=10)
+ parse_options = ParseOptions(newlines_in_values=True)
+ expected_schema = pa.schema([('n', pa.int64())])
+
+ reader = self.open_bytes(
+ bad_middle_chunk,
+ read_options=read_options,
+ parse_options=parse_options)
+ assert reader.schema == expected_schema
+ assert reader.read_next_batch().to_pydict() == {
+ 'n': [0]
+ }
+ with pytest.raises(pa.ArrowInvalid,
+ match="JSON parse error *"):
+ reader.read_next_batch()
+
+ with pytest.raises(StopIteration):
+ reader.read_next_batch()
+
+ def test_non_linewise_chunker_bad_first_block(self):
+ bad_middle_chunk = b'{"n": 0}{1}\n{"n": 2}'
+ read_options = ReadOptions(block_size=10)
+ parse_options = ParseOptions(newlines_in_values=True)
+ expected_schema = pa.schema([('n', pa.int64())])
+
+ reader = self.open_bytes(
+ bad_middle_chunk,
+ read_options=read_options,
+ parse_options=parse_options)
+ assert reader.schema == expected_schema
+ assert reader.read_next_batch().to_pydict() == {
+ 'n': [0]
+ }
+ with pytest.raises(pa.ArrowInvalid,
+ match="JSON parse error *"):
+ reader.read_next_batch()
+
+ with pytest.raises(StopIteration):
+ reader.read_next_batch()
+
+ def test_non_linewise_chunker_bad_middle_block(self):
+ bad_middle_chunk = b'{"n": 0}\n{"n": 1}\n{}"n":2}\n{"n": 3}'
+ read_options = ReadOptions(block_size=10)
+ parse_options = ParseOptions(newlines_in_values=True)
+ expected_schema = pa.schema([('n', pa.int64())])
+
+ reader = self.open_bytes(
+ bad_middle_chunk,
+ read_options=read_options,
+ parse_options=parse_options)
+ assert reader.schema == expected_schema
+ assert reader.read_next_batch().to_pydict() == {
+ 'n': [0]
+ }
+ assert reader.read_next_batch().to_pydict() == {
+ 'n': [1]
+ }
+
+ with pytest.raises(pa.ArrowInvalid,
+ match="JSON parse error *"):
+ reader.read_next_batch()
+
+ with pytest.raises(StopIteration):
+ reader.read_next_batch()
+
+ def test_ignore_leading_empty_blocks(self):
+ leading_empty_chunk = b' \n{"b": true, "s": "foo"}'
+ explicit_schema = pa.schema([
+ ('b', pa.bool_()),
+ ('s', pa.utf8())
+ ])
+ read_options = ReadOptions(block_size=24)
+ parse_options = ParseOptions(explicit_schema=explicit_schema)
+ expected_data = {
+ 'b': [True], 's': ["foo"]
+ }
+
+ reader = self.open_bytes(
+ leading_empty_chunk,
+ read_options=read_options,
+ parse_options=parse_options)
+ self.check_reader(reader, explicit_schema, [expected_data])
+
+ def test_inference(self):
+ rows = b'{"a": 0, "b": "foo" }\n\
+ {"a": 1, "c": true }\n{"a": 2, "d": 4.0}'
+ expected_schema = pa.schema([
+ ('a', pa.int64()),
+ ('b', pa.utf8())
+ ])
+ expected_data = {'a': [0], 'b': ["foo"]}
+
+ read_options = ReadOptions(block_size=32)
+ parse_options = ParseOptions(unexpected_field_behavior="infer")
+ reader = self.open_bytes(
+ rows,
+ read_options=read_options,
+ parse_options=parse_options)
+ assert reader.schema == expected_schema
+ assert reader.read_next_batch().to_pydict() == expected_data
+ with pytest.raises(pa.ArrowInvalid,
+ match="JSON parse error: unexpected field"):
+ reader.read_next_batch()
+
+ expected_schema = pa.schema([
+ ('a', pa.int64()),
+ ('b', pa.utf8()),
+ ('c', pa.bool_()),
+ ])
+ expected_data = {'a': [0, 1], 'b': ["foo", None], 'c': [None, True]}
+ read_options = ReadOptions(block_size=64)
+ reader = self.open_bytes(rows, read_options=read_options,
+ parse_options=parse_options)
+ assert reader.schema == expected_schema
+ assert reader.read_next_batch().to_pydict() == expected_data
+ with pytest.raises(pa.ArrowInvalid,
+ match="JSON parse error: unexpected field"):
+ reader.read_next_batch()
+
+ expected_schema = pa.schema([
+ ('a', pa.int64()),
+ ('b', pa.utf8()),
+ ('c', pa.bool_()),
+ ('d', pa.float64()),
+ ])
+ expected_data = {'a': [0, 1, 2], 'b': ["foo", None, None],
+ 'c': [None, True, None], 'd': [None, None, 4.0]}
+ read_options = ReadOptions(block_size=96)
+ reader = self.open_bytes(rows, read_options=read_options,
+ parse_options=parse_options)
+ assert reader.schema == expected_schema
+ assert reader.read_next_batch().to_pydict() == expected_data
+
+ def test_reconcile_across_blocks(self):
Review Comment:
The original test case in `BaseTestJSON.test_reconcile_across_blocks` is not
compatible with the streaming JSON reader.
I made some changes to it, but I am not sure whether they are reasonable.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]