[ 
https://issues.apache.org/jira/browse/BEAM-2857?focusedWorklogId=238803&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-238803
 ]

ASF GitHub Bot logged work on BEAM-2857:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 07/May/19 21:08
            Start Date: 07/May/19 21:08
    Worklog Time Spent: 10m 
      Work Description: pabloem commented on pull request #8394: [BEAM-2857] Implementing WriteToFiles transform for fileio (Python)
URL: https://github.com/apache/beam/pull/8394#discussion_r281830351
 
 

 ##########
 File path: sdks/python/apache_beam/io/fileio.py
 ##########
 @@ -169,3 +179,476 @@ def __init__(self, compression=None, skip_directories=True):
   def expand(self, pcoll):
     return pcoll | beam.ParDo(_ReadMatchesFn(self._compression,
                                              self._skip_directories))
+
+
+class FileSink(object):
+  """Specifies how to write elements to individual files in ``WriteToFiles``.
+
+  **NOTE: THIS CLASS IS EXPERIMENTAL.**
+
+  A Sink class must implement the following:
+
+   - The ``open`` method, which initializes writing to a file handler (it is not
+     responsible for opening the file handler itself).
+   - The ``write`` method, which writes an element to the file that was passed
+     in ``open``.
+   - The ``flush`` method, which flushes any buffered state. This is most often
+     called before closing a file (but not exclusively called in that
+     situation). The sink is not responsible for closing the file handler.
+   """
+
+  def open(self, fh):
+    raise NotImplementedError
+
+  def write(self, record):
+    raise NotImplementedError
+
+  def flush(self):
+    raise NotImplementedError
+
+
+class DefaultSink(FileSink):
+  """A sink that writes elements into file handlers as they come.
+
+  **NOTE: THIS CLASS IS EXPERIMENTAL.**
+
+  This sink simply calls file_handler.write(record) on all records that come
+  into it.
+  """
+
+  def open(self, fh):
+    self._fh = fh
+
+  def write(self, record):
+    self._fh.write(record)
+
+  def flush(self):
+    self._fh.flush()
+
+
+def prefix_naming(prefix):
+  return default_file_naming(prefix)
+
+
+_DEFAULT_FILE_NAME_TEMPLATE = (
+    '{prefix}-{start}-{end}-{pane}-'
+    '{shard:05d}-{total_shards:05d}'
+    '{suffix}{compression}')
+
+
+def destination_prefix_naming():
+
+  def _inner(window, pane, shard_index, total_shards, compression, 
destination):
+    args = {'prefix': str(destination),
+            'start': '',
+            'end': '',
+            'pane': '',
+            'shard': 0,
+            'total_shards': 0,
+            'suffix': '',
+            'compression': ''}
+    if total_shards is not None and shard_index is not None:
+      args['shard'] = int(shard_index)
+      args['total_shards'] = int(total_shards)
+
+    if window != GlobalWindow():
+      args['start'] = window.start.to_utc_datetime().isoformat()
+      args['end'] = window.end.to_utc_datetime().isoformat()
+
+    # If the PANE is the ONLY firing in the window, we don't add it.
+    if pane and not (pane.is_first and pane.is_last):
+      args['pane'] = pane.index
+
+    if compression:
+      args['compression'] = '.%s' % compression
+
+    return _DEFAULT_FILE_NAME_TEMPLATE.format(**args)
+
+  return _inner
+
+
+def default_file_naming(prefix, suffix=None):
+
+  def _inner(window, pane, shard_index, total_shards, compression, 
destination):
+    args = {'prefix': prefix,
+            'start': '',
+            'end': '',
+            'pane': '',
+            'shard': 0,
+            'total_shards': 0,
+            'suffix': '',
+            'compression': ''}
+    if total_shards is not None and shard_index is not None:
+      args['shard'] = int(shard_index)
+      args['total_shards'] = int(total_shards)
+
+    if window != GlobalWindow():
+      args['start'] = window.start.to_utc_datetime().isoformat()
+      args['end'] = window.end.to_utc_datetime().isoformat()
+
+    # If the PANE is the ONLY firing in the window, we don't add it.
+    if pane and not (pane.is_first and pane.is_last):
+      args['pane'] = pane.index
+
+    if compression:
+      args['compression'] = '.%s' % compression
+    if suffix:
+      args['suffix'] = suffix
+
+    return _DEFAULT_FILE_NAME_TEMPLATE.format(**args)
+
+  return _inner
+
+
+class FileResult(object):
+  """A descriptor of a file that has been written."""
+
+  def __init__(self,
+               file_name,
+               shard_index,
+               total_shards,
+               window,
+               pane,
+               destination):
+    self.file_name = file_name
+    self.shard_index = int(shard_index)
+    self.total_shards = int(total_shards)
+    self.window = window
+    self.pane = pane
+    self.destination = destination
+
+  def _tuple(self):
+    return (self.file_name,
+            self.shard_index,
+            self.total_shards,
+            self.window,
+            self.pane,
+            self.destination)
+
+  def __hash__(self):
+    return hash(self._tuple())
+
+  def __eq__(self, other):
+    return isinstance(other, FileResult) and self._tuple() == other._tuple()
+
+  def __str__(self):
+    return '_FileResult(%s)' % self.__dict__
+
+  def __repr__(self):
+    return '<%s at 0x%x>' % (self.__str__(), id(self))
+
+
+@experimental()
+class WriteToFiles(beam.PTransform):
+  """Write the incoming PCollection to a set of output files.
+
+  The incoming ``PCollection`` may be bounded or unbounded.
+
+  **Note:** For unbounded ``PCollection``s, this transform does not support
+  multiple firings per Window (due to the fact that files are named only by
+  their destination, and window, at the moment).
+  """
+
+  # We allow up to 20 different destinations to be written in a single bundle.
+  # Too many files will add memory pressure to the worker, so we let it be 20.
+  MAX_NUM_WRITERS_PER_BUNDLE = 20
+
+  DEFAULT_SHARDING = 5
+
+  def __init__(self,
+               path,
+               file_naming=None,
+               destination=None,
+               temp_directory=None,
+               sink=None,
+               shards=None,
+               output_fn=None,
+               max_writers_per_bundle=MAX_NUM_WRITERS_PER_BUNDLE):
+    """Initializes a WriteToFiles transform.
+
+    Args:
+      path (str, ValueProvider): The directory to write files into.
+      file_naming (callable): A callable that takes in a window, pane,
+        shard_index, total_shards and compression; and returns a file name.
+      destination (callable): If this argument is provided, the sink parameter
+        must also be a callable.
+      temp_directory (str, ValueProvider): To ensure atomicity in the transform,
+        the output is written into temporary files, which are written to a
+        directory that is meant to be temporary as well. Once the whole output
+        has been written, the files are moved into their final destination, and
+        given their final names. By default, the temporary directory will be
+         within the temp_location of your pipeline.
+      sink (callable, FileSink): The sink to use to write into a file. It should
+        implement the methods of a ``FileSink``. If none is provided, a
+        ``DefaultSink`` is used.
+      shards (int): The number of shards per destination and trigger firing.
+      max_writers_per_bundle (int): The number of writers that can be open
+        concurrently in a single worker that's processing one bundle.
+    """
+    self.path = (
+        path if isinstance(path, ValueProvider) else StaticValueProvider(str,
+                                                                         path))
+    self.file_naming_fn = file_naming or default_file_naming('output')
+    self.destination_fn = self._get_destination_fn(destination)
+    self._temp_directory = temp_directory
+    self.sink_fn = self._get_sink_fn(sink)
+    self.shards = shards or WriteToFiles.DEFAULT_SHARDING
+    self.output_fn = output_fn or (lambda x: x)
+
+    self._max_num_writers_per_bundle = max_writers_per_bundle
+
+  @staticmethod
+  def _get_sink_fn(input_sink):
+    if isinstance(input_sink, FileSink):
+      return lambda x: input_sink
+    elif callable(input_sink):
+      return input_sink
+    else:
+      return lambda x: DefaultSink()
+
+  @staticmethod
+  def _get_destination_fn(destination):
+    if isinstance(destination, ValueProvider):
+      return lambda elm: destination.get()
+    elif callable(destination):
+      return destination
+    else:
+      return lambda elm: destination
+
+  def expand(self, pcoll):
+    p = pcoll.pipeline
+
+    if not self._temp_directory:
+      temp_location = (
+          p.options.view_as(GoogleCloudOptions).temp_location
+          or self.path.get())
+      dir_uid = str(uuid.uuid4())
+      self._temp_directory = StaticValueProvider(
+          str,
+          filesystems.FileSystems.join(temp_location,
+                                       '.temp%s' % dir_uid))
+      logging.info('Added temporary directory %s', self._temp_directory.get())
 
 Review comment:
   No. The transform may be run in streaming, so I don't think there's a good moment in the pipeline execution to perform a cleanup of the temp directory.
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 238803)
    Time Spent: 2h 40m  (was: 2.5h)

> Create FileIO in Python
> -----------------------
>
>                 Key: BEAM-2857
>                 URL: https://issues.apache.org/jira/browse/BEAM-2857
>             Project: Beam
>          Issue Type: New Feature
>          Components: sdk-py-core
>            Reporter: Eugene Kirpichov
>            Assignee: Pablo Estrada
>            Priority: Major
>              Labels: gsoc, gsoc2019, mentor
>          Time Spent: 2h 40m
>  Remaining Estimate: 0h
>
> Beam Java has a FileIO with operations: match()/matchAll(), readMatches(), 
> which together cover the majority of needs for general-purpose file 
> ingestion. Beam Python should have something similar.
> An early design document for this: https://s.apache.org/fileio-beam-python



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to