liferoad commented on issue #35363: URL: https://github.com/apache/beam/issues/35363#issuecomment-2994369056
This should work:

```
import apache_beam as beam
from apache_beam.io.fileio import FileSink, default_file_naming
from apache_beam.options.pipeline_options import PipelineOptions
import gzip
import os
import logging
import shutil


# --- 1. Define the Custom Sink ---
# This sink inherits from FileSink and handles Gzip compression directly,
# to avoid potential stream-state issues with the runner's built-in
# compression handling.
class GzipFileSink(FileSink):
    """A custom sink that writes Gzip-compressed files.

    This implementation handles the Gzip compression directly.
    """

    def __init__(self):
        self._writer = None

    def open(self, fh):
        """Opens a Gzip writer on top of the file handle Beam provides."""
        self._writer = gzip.GzipFile(fileobj=fh, mode='wb')

    def write(self, record: str):
        """Writes a record to the Gzip file, encoding it to UTF-8."""
        self._writer.write(record.encode('utf-8') + b'\n')

    def flush(self):
        """Closes the Gzip writer to finalize the stream.

        This is called by the runner at the end of a bundle and is the
        correct place to ensure the Gzip trailer is written.
        """
        if self._writer:
            self._writer.close()
            self._writer = None


# --- 2. Main Pipeline Logic ---
def run_pipeline():
    """Sets up and runs the Beam pipeline."""
    output_dir = './output_gzip'

    # Clean up the previous run's output directory for a clean slate.
    if os.path.exists(output_dir):
        logging.info(f"Removing existing output directory: {output_dir}")
        shutil.rmtree(output_dir)

    # We use default pipeline options for this local run.
    options = PipelineOptions()

    with beam.Pipeline(options=options) as p:
        # Create some sample data to write to files.
        lines = [
            'This is the first line of our dataset.',
            'Apache Beam is a unified model for defining both batch and streaming data-parallel processing pipelines.',
            'This example demonstrates a custom sink inheriting from FileSink.',
            'The data will be written to compressed files.',
            'Each line will be an entry in the output file.',
            "Let's add a few more lines for good measure.",
            'This helps ensure the output is not trivial.',
            'Final line of the sample data.',
        ]

        # The PCollection that will be written to the files.
        data_to_write = p | 'CreateData' >> beam.Create(lines)

        # The core of the example: using WriteToFiles with our custom sink.
        # We also provide a file_naming function so the output files get the
        # '.txt.gz' suffix that the verification step looks for.
        _ = data_to_write | 'WriteToGzipFiles' >> beam.io.fileio.WriteToFiles(
            path=output_dir,
            sink=GzipFileSink(),
            file_naming=default_file_naming(prefix='output', suffix='.txt.gz'))

    logging.info(f"Pipeline finished. Output files are in '{output_dir}'.")
    verify_output(output_dir)


# --- 3. Verification Step ---
def verify_output(directory):
    """Reads the compressed output files to verify the content."""
    logging.info("\n--- Verifying Output ---")
    try:
        output_files = [
            os.path.join(directory, f)
            for f in os.listdir(directory)
            if f.endswith('.txt.gz')
        ]
        if not output_files:
            logging.error("No output files found!")
            return

        for filepath in output_files:
            logging.info(f"Reading content from: {filepath}")
            content_count = 0
            with gzip.open(filepath, 'rt', encoding='utf-8') as f:
                for line in f:
                    print(f"  > {line.strip()}")
                    content_count += 1
            logging.info(
                f"Successfully read {content_count} lines from "
                f"{os.path.basename(filepath)}.")
    except Exception as e:
        logging.error(f"An error occurred during verification: {e}")


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run_pipeline()
```

In general, you can use https://beam.apache.org/releases/pydoc/current/apache_beam.io.textio.html#apache_beam.io.textio.WriteToText instead: `compression_type` can be set explicitly, or left at the default `AUTO`, in which case the compression is inferred from the file name suffix.