vibhatha commented on a change in pull request #11911:
URL: https://github.com/apache/arrow/pull/11911#discussion_r770370963
##########
File path: python/pyarrow/tests/test_dataset.py
##########
@@ -3621,6 +3621,204 @@ def compare_tables_ignoring_order(t1, t2):
     assert not extra_file.exists()
 
 
+def _generate_random_int_array(size=4, min=1, max=10):
+    return np.random.randint(min, max, size)
+
+
+def _generate_data_and_columns(num_of_columns, records_per_row,
+                               unique_records=None):
+    data = []
+    column_names = []
+    if unique_records is None:
+        unique_records = records_per_row
+    for i in range(num_of_columns):
+        data.append(_generate_random_int_array(size=records_per_row,
+                                                min=1,
+                                                max=unique_records))
+        column_names.append("c" + str(i))
+    return data, column_names
+
+
+def _get_num_of_files_generated(base_directory):
+    file_dirs = os.listdir(base_directory)
+    number_of_files = 0
+    for _, file_dir in enumerate(file_dirs):
+        sub_dir_path = base_directory / file_dir
+        number_of_files += len(os.listdir(sub_dir_path))
+    return number_of_files
+
+
+def _get_compare_pair(data_source, record_batch):
+    num_of_files_generated = _get_num_of_files_generated(
+        base_directory=data_source)
+    number_of_unique_rows = len(pa.compute.unique(record_batch[0]))
+    return num_of_files_generated, number_of_unique_rows
+
+
+def test_write_dataset_max_rows_per_file(tempdir):
+    directory = tempdir / 'ds'
+    max_rows_per_file = 10
+    max_rows_per_group = 10
+    num_of_columns = 2
+    records_per_row = 35
+
+    data, column_names = _generate_data_and_columns(num_of_columns,
+                                                    records_per_row)
+
+    record_batch = pa.record_batch(data=data, names=column_names)
+
+    sub_directory = directory / 'onewrite'
+
+    ds.write_dataset(record_batch, sub_directory, format="parquet",
+                     max_rows_per_file=max_rows_per_file,
+                     max_rows_per_group=max_rows_per_group)
+
+    files_in_dir = os.listdir(sub_directory)
+
+    # number of partitions with max_rows and the partition with the remainder
+    expected_partitions = len(data[0]) // max_rows_per_file + 1
+    expected_row_combination = [max_rows_per_file
+                                for i in range(expected_partitions - 1)] \
+        + [len(data[0]) - ((expected_partitions - 1) * max_rows_per_file)]
+
+    # test whether the expected amount of files are written
+    assert len(files_in_dir) == expected_partitions
+
+    # compute the number of rows per each file written
+    result_row_combination = []
+    for _, f_file in enumerate(files_in_dir):
+        f_path = sub_directory / str(f_file)
+        dataset = ds.dataset(f_path, format="parquet")
+        result_row_combination.append(dataset.to_table().shape[0])
+
+    # test whether the generated files have the expected number of rows
+    assert len(expected_row_combination) == len(result_row_combination)
+    assert sum(expected_row_combination) == sum(result_row_combination)
+
+
+def test_write_dataset_min_rows_per_group(tempdir):
+    directory = tempdir / 'ds'
+    min_rows_per_group = 10
+    max_rows_per_group = 20
+    num_of_columns = 2
+    records_per_row = 49
+    unique_records = 5

Review comment:
       Great, that's neat! Btw @westonpace, shouldn't we provide the async write mode to Python users like we have in C++? When writing the test cases, I thought about how to map the cases 1:1, but later felt that the code should be more Pythonic than what C++ can offer. Is this something we should work on (with the existing Process/Thread interfaces in Python)? This is not related to this issue, just curious.
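       As a rough sketch of what "using the existing Thread interfaces" could mean from Python today (this is not an API from this PR; the table, output directory, and pool size below are made-up placeholders), a blocking `ds.write_dataset` call can already be pushed onto a worker thread:

       ```python
       # Hypothetical sketch only: approximating an "async write" from Python by
       # running the blocking ds.write_dataset call on a standard-library thread
       # pool. None of this is part of PR #11911.
       from concurrent.futures import ThreadPoolExecutor
       import tempfile

       import pyarrow as pa
       import pyarrow.dataset as ds

       table = pa.table({"c0": list(range(100)),
                         "c1": list(range(100))})   # placeholder data
       base_dir = tempfile.mkdtemp()                # placeholder output directory

       with ThreadPoolExecutor(max_workers=1) as pool:
           # Submit the write and keep the calling thread free; result() can be
           # called later and re-raises any error from the write.
           future = pool.submit(ds.write_dataset, table, base_dir,
                                format="parquet")
           future.result()
       ```

       Whether anything beyond such a wrapper (for example exposing the C++ async writer directly) is worth adding is exactly the open question raised in the comment above.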