liferoad commented on issue #35363:
URL: https://github.com/apache/beam/issues/35363#issuecomment-2994369056

   This should work:
   ```
   import apache_beam as beam
   from apache_beam.io.fileio import FileSink, default_file_naming
   from apache_beam.options.pipeline_options import PipelineOptions
   
   import gzip
   import os
   import logging
   import shutil
   
   # --- 1. Define the Custom Sink ---
   # This sink now inherits from FileSink and handles Gzip compression
   # directly to avoid potential stream-state issues with the runner's
   # built-in compression handling.
   
   class GzipFileSink(FileSink):
       """
       A custom sink to write Gzip-compressed files.
       This implementation handles the Gzip compression directly.
       """
       def __init__(self):
           self._writer = None
   
       def open(self, fh):
           """Opens a Gzip writer."""
           self._writer = gzip.GzipFile(fileobj=fh, mode='wb')
   
       def write(self, record: str):
           """Writes a record to the Gzip file, encoding it to UTF-8."""
           self._writer.write(record.encode('utf-8') + b'\n')
   
       def flush(self):
           """
            Closes the Gzip writer to finalize the stream. WriteToFiles calls
            flush() once before closing the underlying file handle, so this is
            the correct place to ensure the Gzip trailer is written.
           """
           if self._writer:
               self._writer.close()
               self._writer = None
   
   
   # --- 2. Main Pipeline Logic ---
   
   def run_pipeline():
       """Sets up and runs the Beam pipeline."""
       output_dir = './output_gzip'
   
       # Clean up previous run's output directory for a clean slate
       if os.path.exists(output_dir):
           logging.info(f"Removing existing output directory: {output_dir}")
           shutil.rmtree(output_dir)
   
       # We use default pipeline options for this local run.
       options = PipelineOptions()
   
       with beam.Pipeline(options=options) as p:
           # Create some sample data to write to files.
           lines = [
               'This is the first line of our dataset.',
                'Apache Beam is a unified model for defining both batch and '
                'streaming data-parallel processing pipelines.',
                'This example demonstrates a custom sink inheriting from the '
                'correct FileSink.',
               'The data will be written to compressed files.',
               'Each line will be an entry in the output file.',
               'Let\'s add a few more lines for good measure.',
               'This helps ensure the output is not trivial.',
               'Final line of the sample data.'
           ]
   
           # The PCollection that will be written to the files.
           data_to_write = p | 'CreateData' >> beam.Create(lines)
   
           # The core of the example: using WriteToFiles with our custom sink.
           # We also provide a file_naming function to ensure the output files
           # have the correct '.txt.gz' suffix for the verification step.
            _ = data_to_write | 'WriteToGzipFiles' >> beam.io.fileio.WriteToFiles(
                path=output_dir,
                sink=GzipFileSink(),
                file_naming=default_file_naming(prefix='output', suffix='.txt.gz'))
   
       logging.info(f"Pipeline finished. Output files are in '{output_dir}'.")
       verify_output(output_dir)
   
   # --- 3. Verification Step ---
   
   def verify_output(directory):
       """
       Reads the compressed output files to verify the content.
       """
       logging.info("\n--- Verifying Output ---")
       try:
            output_files = [
                os.path.join(directory, f)
                for f in os.listdir(directory)
                if f.endswith('.txt.gz')
            ]
           if not output_files:
               logging.error("No output files found!")
               return
   
           for filepath in output_files:
               logging.info(f"Reading content from: {filepath}")
               content_count = 0
               with gzip.open(filepath, 'rt', encoding='utf-8') as f:
                   for line in f:
                       print(f"  > {line.strip()}")
                       content_count += 1
               logging.info(f"Successfully read {content_count} lines from 
{os.path.basename(filepath)}.")
   
       except Exception as e:
           logging.error(f"An error occurred during verification: {e}")
   
   
   if __name__ == '__main__':
       logging.getLogger().setLevel(logging.INFO)
       run_pipeline()
   ```
    In general, you can use https://beam.apache.org/releases/pydoc/current/apache_beam.io.textio.html#apache_beam.io.textio.WriteToText instead. `compression_type` can be set explicitly, or left at its default (`CompressionTypes.AUTO`), in which case it is inferred from the file name suffix.
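    A minimal sketch of that route (the path, labels, and sample data below are placeholders, not from the issue):
    ```
    import apache_beam as beam

    with beam.Pipeline() as p:
        (p
         | 'CreateData' >> beam.Create(['first line', 'second line'])
         # AUTO (the default) infers gzip compression from the '.gz' suffix.
         | 'WriteCompressed' >> beam.io.WriteToText(
             './output_gzip/output', file_name_suffix='.txt.gz'))
    ```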
   

