[ 
https://issues.apache.org/jira/browse/BEAM-4062?focusedWorklogId=92980&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-92980
 ]

ASF GitHub Bot logged work on BEAM-4062:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 20/Apr/18 00:39
            Start Date: 20/Apr/18 00:39
    Worklog Time Spent: 10m 
      Work Description: chamikaramj closed pull request #5158: [BEAM-4062] Fix 
performance regression in FileBasedSink.
URL: https://github.com/apache/beam/pull/5158
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/sdks/python/apache_beam/io/filebasedsink.py 
b/sdks/python/apache_beam/io/filebasedsink.py
index ab3ab5fd5b3..126eb868abf 100644
--- a/sdks/python/apache_beam/io/filebasedsink.py
+++ b/sdks/python/apache_beam/io/filebasedsink.py
@@ -98,6 +98,8 @@ def __init__(self,
     self.num_shards = num_shards
     self.coder = coder
     self.shard_name_format = self._template_to_format(shard_name_template)
+    self.shard_name_glob_format = self._template_to_glob_format(
+        shard_name_template)
     self.compression_type = compression_type
     self.mime_type = mime_type
 
@@ -188,36 +190,58 @@ def _get_final_name(self, shard_num, num_shards):
         self.file_name_suffix.get()
     ])
 
-  def pre_finalize(self, init_result, writer_results):
-    writer_results = sorted(writer_results)
-    num_shards = len(writer_results)
-    existing_files = []
-    for shard_num in range(len(writer_results)):
-      final_name = self._get_final_name(shard_num, num_shards)
-      if FileSystems.exists(final_name):
-        existing_files.append(final_name)
-    if existing_files:
-      logging.info('Deleting existing files in target path: %d',
-                   len(existing_files))
-      FileSystems.delete(existing_files)
+  @check_accessible(['file_path_prefix', 'file_name_suffix'])
+  def _get_final_name_glob(self, num_shards):
+    return ''.join([
+        self.file_path_prefix.get(),
+        self.shard_name_glob_format % dict(num_shards=num_shards),
+        self.file_name_suffix.get()
+    ])
 
-  @check_accessible(['file_path_prefix'])
-  def finalize_write(self, init_result, writer_results,
-                     unused_pre_finalize_results):
-    writer_results = sorted(writer_results)
-    num_shards = len(writer_results)
+  def pre_finalize(self, init_result, writer_results):
+    num_shards = len(list(writer_results))
+    dst_glob = self._get_final_name_glob(num_shards)
+    dst_glob_files = [file_metadata.path
+                      for mr in FileSystems.match([dst_glob])
+                      for file_metadata in mr.metadata_list]
+
+    if dst_glob_files:
+      logging.warn('Deleting %d existing files in target path matching: %s',
+                   len(dst_glob_files), self.shard_name_glob_format)
+      FileSystems.delete(dst_glob_files)
+
+  def _check_state_for_finalize_write(self, writer_results, num_shards):
+    """Checks writer output files' states.
+
+    Returns:
+      src_files, dst_files: Lists of files to rename. For each i, 
finalize_write
+        should rename(src_files[i], dst_files[i]).
+      delete_files: Src files to delete. These could be leftovers from an
+        incomplete (non-atomic) rename operation.
+      num_skipped: Tally of writer results files already renamed, such as from
+        a previous run of finalize_write().
+    """
+    if not writer_results:
+      return [], [], [], 0
+
+    src_glob = FileSystems.join(FileSystems.split(writer_results[0])[0], '*')
+    dst_glob = self._get_final_name_glob(num_shards)
+    src_glob_files = set(file_metadata.path
+                         for mr in FileSystems.match([src_glob])
+                         for file_metadata in mr.metadata_list)
+    dst_glob_files = set(file_metadata.path
+                         for mr in FileSystems.match([dst_glob])
+                         for file_metadata in mr.metadata_list)
 
     src_files = []
     dst_files = []
     delete_files = []
-    chunk_size = FileSystems.get_chunk_size(self.file_path_prefix.get())
     num_skipped = 0
-    for shard_num, shard in enumerate(writer_results):
+    for shard_num, src in enumerate(writer_results):
       final_name = self._get_final_name(shard_num, num_shards)
-      src = shard
       dst = final_name
-      src_exists = FileSystems.exists(src)
-      dst_exists = FileSystems.exists(dst)
+      src_exists = src in src_glob_files
+      dst_exists = dst in dst_glob_files
       if not src_exists and not dst_exists:
         raise BeamIOError('src and dst files do not exist. src: %s, dst: %s' % 
(
             src, dst))
@@ -233,13 +257,23 @@ def finalize_write(self, init_result, writer_results,
 
       src_files.append(src)
       dst_files.append(dst)
+    return src_files, dst_files, delete_files, num_skipped
+
+  @check_accessible(['file_path_prefix'])
+  def finalize_write(self, init_result, writer_results,
+                     unused_pre_finalize_results):
+    writer_results = sorted(writer_results)
+    num_shards = len(writer_results)
 
-    num_skipped = len(delete_files)
+    src_files, dst_files, delete_files, num_skipped = (
+        self._check_state_for_finalize_write(writer_results, num_shards))
+    num_skipped += len(delete_files)
     FileSystems.delete(delete_files)
     num_shards_to_finalize = len(src_files)
     min_threads = min(num_shards_to_finalize, 
FileBasedSink._MAX_RENAME_THREADS)
     num_threads = max(1, min_threads)
 
+    chunk_size = FileSystems.get_chunk_size(self.file_path_prefix.get())
     source_file_batch = [src_files[i:i + chunk_size]
                          for i in range(0, len(src_files), chunk_size)]
     destination_file_batch = [dst_files[i:i + chunk_size]
@@ -299,21 +333,38 @@ def _rename_batch(batch):
       # May have already been removed.
       pass
 
+  @staticmethod
+  def _template_replace_num_shards(shard_name_template):
+    match = re.search('N+', shard_name_template)
+    if match:
+      shard_name_template = shard_name_template.replace(
+          match.group(0), '%%(num_shards)0%dd' % len(match.group(0)))
+    return shard_name_template
+
   @staticmethod
   def _template_to_format(shard_name_template):
     if not shard_name_template:
       return ''
-    m = re.search('S+', shard_name_template)
-    if m is None:
-      raise ValueError("Shard number pattern S+ not found in template '%s'" %
-                       shard_name_template)
+    match = re.search('S+', shard_name_template)
+    if match is None:
+      raise ValueError(
+          "Shard number pattern S+ not found in shard_name_template: %s" %
+          shard_name_template)
     shard_name_format = shard_name_template.replace(
-        m.group(0), '%%(shard_num)0%dd' % len(m.group(0)))
-    m = re.search('N+', shard_name_format)
-    if m:
-      shard_name_format = shard_name_format.replace(
-          m.group(0), '%%(num_shards)0%dd' % len(m.group(0)))
-    return shard_name_format
+        match.group(0), '%%(shard_num)0%dd' % len(match.group(0)))
+    return FileBasedSink._template_replace_num_shards(shard_name_format)
+
+  @staticmethod
+  def _template_to_glob_format(shard_name_template):
+    if not shard_name_template:
+      return ''
+    match = re.search('S+', shard_name_template)
+    if match is None:
+      raise ValueError(
+          "Shard number pattern S+ not found in shard_name_template: %s" %
+          shard_name_template)
+    shard_name_format = shard_name_template.replace(match.group(0), '*')
+    return FileBasedSink._template_replace_num_shards(shard_name_format)
 
   def __eq__(self, other):
     # TODO: Clean up workitem_test which uses this.


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 92980)
    Time Spent: 2.5h  (was: 2h 20m)

> Performance regression in FileBasedSink
> ---------------------------------------
>
>                 Key: BEAM-4062
>                 URL: https://issues.apache.org/jira/browse/BEAM-4062
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-py-core
>            Reporter: Udi Meiri
>            Assignee: Udi Meiri
>            Priority: Blocker
>             Fix For: 2.5.0
>
>          Time Spent: 2.5h
>  Remaining Estimate: 0h
>
> [https://github.com/apache/beam/pull/4648] has added:
>  * 3 or more stat() calls per output file (in pre_finalize and 
> finalize_writes)
>  * serial unbatched delete()s (in pre_finalize)
> The solution will be to list files in a batch operation (match()), and to 
> delete() in batch mode, or to use multiple threads if that's not possible.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to