razvanculea commented on code in PR #35253: URL: https://github.com/apache/beam/pull/35253#discussion_r2240192243
########## sdks/python/apache_beam/io/filebasedsink.py: ########## @@ -385,13 +431,131 @@ def _rename_batch(batch): # This error is not serious, we simply log it. _LOGGER.info('Unable to delete file: %s', init_result) + @check_accessible(['file_path_prefix']) + def finalize_windowed_write( + self, init_result, writer_results, unused_pre_finalize_results, w=None): + writer_results = sorted(writer_results) + num_shards = len(writer_results) + + src_files, dst_files, delete_files, num_skipped = ( + self._check_state_for_finalize_write(writer_results, num_shards, w)) + num_skipped += len(delete_files) + FileSystems.delete(delete_files) + num_shards_to_finalize = len(src_files) + min_threads = min(num_shards_to_finalize, FileBasedSink._MAX_RENAME_THREADS) + num_threads = max(1, min_threads) + + chunk_size = FileSystems.get_chunk_size(self.file_path_prefix.get()) + source_file_batch = [ + src_files[i:i + chunk_size] + for i in range(0, len(src_files), chunk_size) + ] + destination_file_batch = [ + dst_files[i:i + chunk_size] + for i in range(0, len(dst_files), chunk_size) + ] + + if num_shards_to_finalize: + start_time = time.time() + + def _rename_batch(batch): + """_rename_batch executes batch rename operations.""" + source_files, destination_files = batch + exceptions = [] + try: + FileSystems.rename(source_files, destination_files) + return exceptions + except BeamIOError as exp: + if exp.exception_details is None: + raise + for (src, dst), exception in exp.exception_details.items(): + if exception: + _LOGGER.error( + ('Exception in _rename_batch. src: %s, ' + 'dst: %s, err: %s'), + src, + dst, + exception) + exceptions.append(exception) + else: + _LOGGER.debug('Rename successful: %s -> %s', src, dst) + return exceptions + + if w is None or isinstance(w, window.GlobalWindow): + # bounded input is handled by finalize_write + # this should not be executed + # Use a thread pool for renaming operations. 
+ exception_batches = util.run_using_threadpool( + _rename_batch, + list(zip(source_file_batch, destination_file_batch)), + num_threads) + + all_exceptions = [ + e for exception_batch in exception_batches for e in exception_batch + ] + if all_exceptions: + raise Exception( + 'Encountered exceptions in finalize_write: %s' % all_exceptions) + + yield from dst_files + else: + # unbounded input + batch = list([src_files, dst_files]) + exception_batches = _rename_batch(batch) + + all_exceptions = [ + e for exception_batch in exception_batches for e in exception_batch + ] + if all_exceptions: + raise Exception( + 'Encountered exceptions in finalize_write: %s' % all_exceptions) + + yield from dst_files + + _LOGGER.info( + 'Renamed %d shards in %.2f seconds.', + num_shards_to_finalize, + time.time() - start_time) + else: + _LOGGER.warning( + 'No shards found to finalize. num_shards: %d, skipped: %d', + num_shards, + num_skipped) + + try: + FileSystems.delete([init_result]) + except IOError: + # This error is not serious, we simply log it. 
+ _LOGGER.info('Unable to delete file: %s', init_result) + + @staticmethod + def _template_replace_window(shard_name_template): + match = re.search('W+', shard_name_template) + if match: + shard_name_template = shard_name_template.replace( + match.group(0), '%%(window)0%ds' % len(match.group(0))) + match = re.search('V+', shard_name_template) + if match: + shard_name_template = shard_name_template.replace( + match.group(0), '%%(window_utc)0%ds' % len(match.group(0))) + return shard_name_template + + @staticmethod + def _template_replace_uuid(shard_name_template): + match = re.search('U+', shard_name_template) + if match: + shard_name_template = shard_name_template.replace( + match.group(0), '%%(uuid)0%dd' % len(match.group(0))) + return FileBasedSink._template_replace_window(shard_name_template) + @staticmethod def _template_replace_num_shards(shard_name_template): match = re.search('N+', shard_name_template) if match: shard_name_template = shard_name_template.replace( match.group(0), '%%(num_shards)0%dd' % len(match.group(0))) - return shard_name_template + #return shard_name_template + return FileBasedSink._template_replace_uuid(shard_name_template) Review Comment: Yes, I can refactor the code as above without side effects (I kept the legacy way of chaining, but it's not the most readable) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@beam.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org