wjones127 commented on a change in pull request #11911:
URL: https://github.com/apache/arrow/pull/11911#discussion_r770011466
##########
File path: python/pyarrow/tests/test_dataset.py
##########
@@ -3621,6 +3621,204 @@ def compare_tables_ignoring_order(t1, t2):
assert not extra_file.exists()
+def _generate_random_int_array(size=4, min=1, max=10):
+ return np.random.randint(min, max, size)
+
+
+def _generate_data_and_columns(num_of_columns, records_per_row,
+ unique_records=None):
+ data = []
+ column_names = []
+ if unique_records is None:
+ unique_records = records_per_row
+ for i in range(num_of_columns):
+ data.append(_generate_random_int_array(size=records_per_row,
+ min=1,
+ max=unique_records))
+ column_names.append("c" + str(i))
+ return data, column_names
+
+
+def _get_num_of_files_generated(base_directory):
+ file_dirs = os.listdir(base_directory)
+ number_of_files = 0
+ for _, file_dir in enumerate(file_dirs):
+ sub_dir_path = base_directory / file_dir
+ number_of_files += len(os.listdir(sub_dir_path))
+ return number_of_files
+
+
+def _get_compare_pair(data_source, record_batch):
+ num_of_files_generated = _get_num_of_files_generated(
+ base_directory=data_source)
+ number_of_unique_rows = len(pa.compute.unique(record_batch[0]))
Review comment:
Why do we care about num unique rows? It's not clear to me that it is
necessary.
##########
File path: python/pyarrow/tests/test_dataset.py
##########
@@ -3621,6 +3621,204 @@ def compare_tables_ignoring_order(t1, t2):
assert not extra_file.exists()
+def _generate_random_int_array(size=4, min=1, max=10):
+ return np.random.randint(min, max, size)
+
+
+def _generate_data_and_columns(num_of_columns, records_per_row,
+ unique_records=None):
+ data = []
+ column_names = []
+ if unique_records is None:
+ unique_records = records_per_row
+ for i in range(num_of_columns):
+ data.append(_generate_random_int_array(size=records_per_row,
+ min=1,
+ max=unique_records))
+ column_names.append("c" + str(i))
+ return data, column_names
+
+
+def _get_num_of_files_generated(base_directory):
+ file_dirs = os.listdir(base_directory)
+ number_of_files = 0
+ for _, file_dir in enumerate(file_dirs):
Review comment:
nit: if you don't need the index, then you shouldn't need the
`enumerate()`
```suggestion
for file_dir in file_dirs:
```
(Also if you want to generalize this function to handle no partitioning or
deeper partitioning, consider using `os.walk()`)
##########
File path: python/pyarrow/tests/test_dataset.py
##########
@@ -3621,6 +3621,204 @@ def compare_tables_ignoring_order(t1, t2):
assert not extra_file.exists()
+def _generate_random_int_array(size=4, min=1, max=10):
+ return np.random.randint(min, max, size)
+
+
+def _generate_data_and_columns(num_of_columns, records_per_row,
+ unique_records=None):
+ data = []
+ column_names = []
+ if unique_records is None:
+ unique_records = records_per_row
+ for i in range(num_of_columns):
+ data.append(_generate_random_int_array(size=records_per_row,
+ min=1,
+ max=unique_records))
+ column_names.append("c" + str(i))
+ return data, column_names
+
+
+def _get_num_of_files_generated(base_directory):
+ file_dirs = os.listdir(base_directory)
+ number_of_files = 0
+ for _, file_dir in enumerate(file_dirs):
+ sub_dir_path = base_directory / file_dir
+ number_of_files += len(os.listdir(sub_dir_path))
+ return number_of_files
+
+
+def _get_compare_pair(data_source, record_batch):
+ num_of_files_generated = _get_num_of_files_generated(
+ base_directory=data_source)
+ number_of_unique_rows = len(pa.compute.unique(record_batch[0]))
+ return num_of_files_generated, number_of_unique_rows
+
+
+def test_write_dataset_max_rows_per_file(tempdir):
Review comment:
I think with a little refactoring, these tests could be combined into
one that uses `pytest.mark.parametrize()` to test different scenarios. What do
you think of that?
##########
File path: python/pyarrow/tests/test_dataset.py
##########
@@ -3621,6 +3621,204 @@ def compare_tables_ignoring_order(t1, t2):
assert not extra_file.exists()
+def _generate_random_int_array(size=4, min=1, max=10):
+ return np.random.randint(min, max, size)
+
+
+def _generate_data_and_columns(num_of_columns, records_per_row,
Review comment:
Is `records_per_row` just the number of rows here? I'm confused by this
name.
##########
File path: python/pyarrow/tests/test_dataset.py
##########
@@ -3621,6 +3621,204 @@ def compare_tables_ignoring_order(t1, t2):
assert not extra_file.exists()
+def _generate_random_int_array(size=4, min=1, max=10):
+ return np.random.randint(min, max, size)
+
+
+def _generate_data_and_columns(num_of_columns, records_per_row,
+ unique_records=None):
+ data = []
+ column_names = []
+ if unique_records is None:
+ unique_records = records_per_row
+ for i in range(num_of_columns):
+ data.append(_generate_random_int_array(size=records_per_row,
+ min=1,
+ max=unique_records))
+ column_names.append("c" + str(i))
+ return data, column_names
+
+
+def _get_num_of_files_generated(base_directory):
+ file_dirs = os.listdir(base_directory)
+ number_of_files = 0
+ for _, file_dir in enumerate(file_dirs):
+ sub_dir_path = base_directory / file_dir
+ number_of_files += len(os.listdir(sub_dir_path))
+ return number_of_files
+
+
+def _get_compare_pair(data_source, record_batch):
+ num_of_files_generated = _get_num_of_files_generated(
+ base_directory=data_source)
+ number_of_unique_rows = len(pa.compute.unique(record_batch[0]))
+ return num_of_files_generated, number_of_unique_rows
+
+
+def test_write_dataset_max_rows_per_file(tempdir):
Review comment:
The `min_rows_per_file` and `min_rows_per_group` are a little harder
harder to test for all cases though, since they have exceptions when there
aren't enough rows.
##########
File path: python/pyarrow/tests/test_dataset.py
##########
@@ -3621,6 +3621,204 @@ def compare_tables_ignoring_order(t1, t2):
assert not extra_file.exists()
+def _generate_random_int_array(size=4, min=1, max=10):
+ return np.random.randint(min, max, size)
+
+
+def _generate_data_and_columns(num_of_columns, records_per_row,
+ unique_records=None):
+ data = []
+ column_names = []
+ if unique_records is None:
+ unique_records = records_per_row
+ for i in range(num_of_columns):
+ data.append(_generate_random_int_array(size=records_per_row,
+ min=1,
+ max=unique_records))
+ column_names.append("c" + str(i))
+ return data, column_names
+
+
+def _get_num_of_files_generated(base_directory):
+ file_dirs = os.listdir(base_directory)
+ number_of_files = 0
+ for _, file_dir in enumerate(file_dirs):
+ sub_dir_path = base_directory / file_dir
+ number_of_files += len(os.listdir(sub_dir_path))
+ return number_of_files
+
+
+def _get_compare_pair(data_source, record_batch):
+ num_of_files_generated = _get_num_of_files_generated(
+ base_directory=data_source)
+ number_of_unique_rows = len(pa.compute.unique(record_batch[0]))
+ return num_of_files_generated, number_of_unique_rows
+
+
+def test_write_dataset_max_rows_per_file(tempdir):
+ directory = tempdir / 'ds'
+ max_rows_per_file = 10
+ max_rows_per_group = 10
+ num_of_columns = 2
+ records_per_row = 35
+
+ data, column_names = _generate_data_and_columns(num_of_columns,
+ records_per_row)
+
+ record_batch = pa.record_batch(data=data, names=column_names)
+
+ sub_directory = directory / 'onewrite'
+
+ ds.write_dataset(record_batch, sub_directory, format="parquet",
+ max_rows_per_file=max_rows_per_file,
+ max_rows_per_group=max_rows_per_group)
+
+ files_in_dir = os.listdir(sub_directory)
+
+ # number of partitions with max_rows and the partition with the remainder
+ expected_partitions = len(data[0]) // max_rows_per_file + 1
+ expected_row_combination = [max_rows_per_file
+ for i in range(expected_partitions - 1)] \
+ + [len(data[0]) - ((expected_partitions - 1) * max_rows_per_file)]
+
+ # test whether the expected amount of files are written
+ assert len(files_in_dir) == expected_partitions
+
+ # compute the number of rows per each file written
+ result_row_combination = []
+ for _, f_file in enumerate(files_in_dir):
+ f_path = sub_directory / str(f_file)
+ dataset = ds.dataset(f_path, format="parquet")
+ result_row_combination.append(dataset.to_table().shape[0])
+
+ # test whether the generated files have the expected number of rows
+ assert len(expected_row_combination) == len(result_row_combination)
+ assert sum(expected_row_combination) == sum(result_row_combination)
+
+
+def test_write_dataset_min_rows_per_group(tempdir):
+ directory = tempdir / 'ds'
+ min_rows_per_group = 10
+ max_rows_per_group = 20
+ num_of_columns = 2
+ records_per_row = 49
+ unique_records = 5
+
+ data, column_names = _generate_data_and_columns(num_of_columns,
+ records_per_row,
+ unique_records)
+
+ record_batch = pa.record_batch(data=data, names=column_names)
+
+ data_source = directory / "min_rows_group"
+
+ ds.write_dataset(data=record_batch, base_dir=data_source,
+ min_rows_per_group=min_rows_per_group,
+ max_rows_per_group=max_rows_per_group,
+ format="parquet")
+
+ files_in_dir = os.listdir(data_source)
+ batched_data = []
+ for _, f_file in enumerate(files_in_dir):
+ f_path = data_source / str(f_file)
+ dataset = ds.dataset(f_path, format="parquet")
+ table = dataset.to_table()
+ batches = table.to_batches()
+ for batch in batches:
+ batched_data.append(batch.num_rows)
+
+ assert batched_data[0] > min_rows_per_group and \
+ batched_data[0] <= max_rows_per_group
+ assert batched_data[1] > min_rows_per_group and \
+ batched_data[1] <= max_rows_per_group
+ assert batched_data[2] <= max_rows_per_group
+
+
+def test_write_dataset_max_rows_per_group(tempdir):
+ directory = tempdir / 'ds'
+ max_rows_per_group = 18
+ num_of_columns = 2
+ records_per_row = 30
+ unique_records = 5
+
+ data, column_names = _generate_data_and_columns(num_of_columns,
+ records_per_row,
+ unique_records)
+
+ record_batch = pa.record_batch(data=data, names=column_names)
+
+ data_source = directory / "max_rows_group"
+
+ ds.write_dataset(data=record_batch, base_dir=data_source,
+ max_rows_per_group=max_rows_per_group,
+ format="parquet")
+
+ files_in_dir = os.listdir(data_source)
+ batched_data = []
+ for _, f_file in enumerate(files_in_dir):
Review comment:
nit: don't need `_` or `enumerate()` here.
##########
File path: python/pyarrow/tests/test_dataset.py
##########
@@ -3621,6 +3621,204 @@ def compare_tables_ignoring_order(t1, t2):
assert not extra_file.exists()
+def _generate_random_int_array(size=4, min=1, max=10):
+ return np.random.randint(min, max, size)
+
+
+def _generate_data_and_columns(num_of_columns, records_per_row,
+ unique_records=None):
+ data = []
+ column_names = []
+ if unique_records is None:
+ unique_records = records_per_row
+ for i in range(num_of_columns):
+ data.append(_generate_random_int_array(size=records_per_row,
+ min=1,
+ max=unique_records))
+ column_names.append("c" + str(i))
+ return data, column_names
+
+
+def _get_num_of_files_generated(base_directory):
+ file_dirs = os.listdir(base_directory)
+ number_of_files = 0
+ for _, file_dir in enumerate(file_dirs):
+ sub_dir_path = base_directory / file_dir
+ number_of_files += len(os.listdir(sub_dir_path))
+ return number_of_files
+
+
+def _get_compare_pair(data_source, record_batch):
+ num_of_files_generated = _get_num_of_files_generated(
+ base_directory=data_source)
+ number_of_unique_rows = len(pa.compute.unique(record_batch[0]))
+ return num_of_files_generated, number_of_unique_rows
+
+
+def test_write_dataset_max_rows_per_file(tempdir):
+ directory = tempdir / 'ds'
+ max_rows_per_file = 10
+ max_rows_per_group = 10
+ num_of_columns = 2
+ records_per_row = 35
+
+ data, column_names = _generate_data_and_columns(num_of_columns,
+ records_per_row)
+
+ record_batch = pa.record_batch(data=data, names=column_names)
+
+ sub_directory = directory / 'onewrite'
+
+ ds.write_dataset(record_batch, sub_directory, format="parquet",
+ max_rows_per_file=max_rows_per_file,
+ max_rows_per_group=max_rows_per_group)
+
+ files_in_dir = os.listdir(sub_directory)
+
+ # number of partitions with max_rows and the partition with the remainder
+ expected_partitions = len(data[0]) // max_rows_per_file + 1
+ expected_row_combination = [max_rows_per_file
+ for i in range(expected_partitions - 1)] \
+ + [len(data[0]) - ((expected_partitions - 1) * max_rows_per_file)]
+
+ # test whether the expected amount of files are written
+ assert len(files_in_dir) == expected_partitions
+
+ # compute the number of rows per each file written
+ result_row_combination = []
+ for _, f_file in enumerate(files_in_dir):
+ f_path = sub_directory / str(f_file)
+ dataset = ds.dataset(f_path, format="parquet")
+ result_row_combination.append(dataset.to_table().shape[0])
+
+ # test whether the generated files have the expected number of rows
+ assert len(expected_row_combination) == len(result_row_combination)
+ assert sum(expected_row_combination) == sum(result_row_combination)
+
+
+def test_write_dataset_min_rows_per_group(tempdir):
+ directory = tempdir / 'ds'
+ min_rows_per_group = 10
+ max_rows_per_group = 20
+ num_of_columns = 2
+ records_per_row = 49
+ unique_records = 5
+
+ data, column_names = _generate_data_and_columns(num_of_columns,
+ records_per_row,
+ unique_records)
+
+ record_batch = pa.record_batch(data=data, names=column_names)
+
+ data_source = directory / "min_rows_group"
+
+ ds.write_dataset(data=record_batch, base_dir=data_source,
+ min_rows_per_group=min_rows_per_group,
+ max_rows_per_group=max_rows_per_group,
+ format="parquet")
+
+ files_in_dir = os.listdir(data_source)
+ batched_data = []
+ for _, f_file in enumerate(files_in_dir):
+ f_path = data_source / str(f_file)
+ dataset = ds.dataset(f_path, format="parquet")
+ table = dataset.to_table()
+ batches = table.to_batches()
+ for batch in batches:
+ batched_data.append(batch.num_rows)
+
+ assert batched_data[0] > min_rows_per_group and \
+ batched_data[0] <= max_rows_per_group
+ assert batched_data[1] > min_rows_per_group and \
+ batched_data[1] <= max_rows_per_group
+ assert batched_data[2] <= max_rows_per_group
+
+
+def test_write_dataset_max_rows_per_group(tempdir):
+ directory = tempdir / 'ds'
+ max_rows_per_group = 18
+ num_of_columns = 2
+ records_per_row = 30
+ unique_records = 5
+
+ data, column_names = _generate_data_and_columns(num_of_columns,
+ records_per_row,
+ unique_records)
+
+ record_batch = pa.record_batch(data=data, names=column_names)
+
+ data_source = directory / "max_rows_group"
+
+ ds.write_dataset(data=record_batch, base_dir=data_source,
+ max_rows_per_group=max_rows_per_group,
+ format="parquet")
+
+ files_in_dir = os.listdir(data_source)
+ batched_data = []
+ for _, f_file in enumerate(files_in_dir):
+ f_path = data_source / str(f_file)
+ dataset = ds.dataset(f_path, format="parquet")
+ table = dataset.to_table()
+ batches = table.to_batches()
+ for batch in batches:
+ batched_data.append(batch.num_rows)
+
+ assert batched_data == [18, 12]
+
+
+def test_write_dataset_max_open_files(tempdir):
+ # TODO: INCOMPLETE TEST CASE WIP
+ directory = tempdir / 'ds'
+ print("Directory : ", directory)
+
+ record_batch_1 = pa.record_batch(data=[[1, 2, 3, 4, 0],
+ ['a', 'b', 'c', 'd', 'e']],
+ names=['c1', 'c2'])
+ record_batch_2 = pa.record_batch(data=[[5, 6, 7, 8, 0],
+ ['a', 'b', 'c', 'd', 'e']],
+ names=['c1', 'c2'])
+ record_batch_3 = pa.record_batch(data=[[9, 10, 11, 12, 0],
+ ['a', 'b', 'c', 'd', 'e']],
+ names=['c1', 'c2'])
+ record_batch_4 = pa.record_batch(data=[[13, 14, 15, 16, 0],
+ ['a', 'b', 'c', 'd', 'e']],
+ names=['c1', 'c2'])
+
+ table = pa.Table.from_batches([record_batch_1, record_batch_2,
+ record_batch_3, record_batch_4])
+
+ partitioning = ds.partitioning(
+ pa.schema([("c2", pa.string())]), flavor="hive")
+
+ data_source_1 = directory / "default"
+
+ ds.write_dataset(data=table, base_dir=data_source_1,
+ partitioning=partitioning, format="parquet")
+
+ # CASE 1: when max_open_files=default & max_open_files >= num_of_partitions
+ # the number of unique rows must be equal to
+ # the number of files generated
Review comment:
I don't think that's true, right? We wouldn't want to create a file per
rows just because `max_open_files` is high.
##########
File path: python/pyarrow/tests/test_dataset.py
##########
@@ -3621,6 +3621,204 @@ def compare_tables_ignoring_order(t1, t2):
assert not extra_file.exists()
+def _generate_random_int_array(size=4, min=1, max=10):
+ return np.random.randint(min, max, size)
+
+
+def _generate_data_and_columns(num_of_columns, records_per_row,
+ unique_records=None):
+ data = []
+ column_names = []
+ if unique_records is None:
+ unique_records = records_per_row
+ for i in range(num_of_columns):
+ data.append(_generate_random_int_array(size=records_per_row,
+ min=1,
+ max=unique_records))
+ column_names.append("c" + str(i))
+ return data, column_names
+
+
+def _get_num_of_files_generated(base_directory):
+ file_dirs = os.listdir(base_directory)
+ number_of_files = 0
+ for _, file_dir in enumerate(file_dirs):
+ sub_dir_path = base_directory / file_dir
+ number_of_files += len(os.listdir(sub_dir_path))
+ return number_of_files
+
+
+def _get_compare_pair(data_source, record_batch):
+ num_of_files_generated = _get_num_of_files_generated(
+ base_directory=data_source)
+ number_of_unique_rows = len(pa.compute.unique(record_batch[0]))
+ return num_of_files_generated, number_of_unique_rows
+
+
+def test_write_dataset_max_rows_per_file(tempdir):
Review comment:
For example, you could have tests like:
```python
assert all(file.nrows <= max_rows_per_file for file in dataset_file)
assert all(group.nrows <= max_rows_per_file for file in dataset_file for
group in file.groups)
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]