rdblue commented on code in PR #6775: URL: https://github.com/apache/iceberg/pull/6775#discussion_r1182748972
########## python/pyiceberg/io/pyarrow.py: ########## @@ -498,6 +503,96 @@ def expression_to_pyarrow(expr: BooleanExpression) -> pc.Expression: return boolean_expression_visit(expr, _ConvertToArrowExpression()) +@lru_cache +def _get_file_format(file_format: FileFormat, **kwargs: Dict[str, Any]) -> ds.FileFormat: + if file_format == FileFormat.PARQUET: + return ds.ParquetFileFormat(**kwargs) + else: + raise ValueError(f"Unsupported file format: {file_format}") + + +def _construct_fragment(fs: FileSystem, data_file: DataFile, file_format_kwargs: Dict[str, Any] = EMPTY_DICT) -> ds.Fragment: + _, path = PyArrowFileIO.parse_location(data_file.file_path) + return _get_file_format(data_file.file_format, **file_format_kwargs).make_fragment(path, fs) + + +def _read_deletes(fs: FileSystem, data_file: DataFile) -> Dict[str, pa.ChunkedArray]: + delete_fragment = _construct_fragment( + fs, data_file, file_format_kwargs={"dictionary_columns": ("file_path",), "pre_buffer": True, "buffer_size": ONE_MEGABYTE} + ) + table = ds.Scanner.from_fragment(fragment=delete_fragment).to_table() + table.unify_dictionaries() + return { + file.as_py(): table.filter(pc.field("file_path") == file).column("pos") + for file in table.column("file_path").chunks[0].dictionary + } + + +class _OrderedChunkedArrayConsumer: + """ + A wrapper to consume multiple individually ordered chunked-arrays + simultaneously as an ordered iterator + """ + + arrays: Tuple[pa.ChunkedArray, ...] + arrays_len: Tuple[int, ...] + arrays_pos: List[int] + + def _reset(self) -> None: + self.arrays_pos = [0] * len(self.arrays) + self.arrays_len = tuple(len(array) for array in self.arrays) + + def __init__(self, *arrays: pa.ChunkedArray) -> None: + self.arrays = arrays + self._reset() + + def __iter__(self) -> _OrderedChunkedArrayConsumer: + self._reset() + return self + + def __next__(self) -> int: Review Comment: This seems like a very expensive way to iterate through a list of arrays, since you need to consider each array every time you look for a new value. I would expect the iterator's state to keep the index of the current array, which is updated when it finishes consuming an array. Then it would also keep a position within the current array. That's much easier because you just need to keep track of two offsets, rather than `len(arrays)*2` offsets. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org