[ https://issues.apache.org/jira/browse/BEAM-2810?focusedWorklogId=116689&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-116689 ]

ASF GitHub Bot logged work on BEAM-2810:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 28/Jun/18 00:26
            Start Date: 28/Jun/18 00:26
    Worklog Time Spent: 10m 
      Work Description: ryan-williams commented on a change in pull request #5496: [BEAM-2810] use fastavro in Avro IO
URL: https://github.com/apache/beam/pull/5496#discussion_r198678978
 
 

 ##########
 File path: sdks/python/apache_beam/io/avroio.py
 ##########
 @@ -377,6 +407,56 @@ def split_points_unclaimed(stop_position):
           yield record
 
 
+class _FastAvroSource(filebasedsource.FileBasedSource):
+  """A source for reading Avro files using the `fastavro` library.
+
+  ``_FastAvroSource`` is implemented using the file-based source framework
+  available in module 'filebasedsource'. Hence please refer to module
+  'filebasedsource' to fully understand how this source implements operations
+  common to all file-based sources such as file-pattern expansion and splitting
+  into bundles for parallel processing.
+
+  TODO: remove ``_AvroSource`` in favor of using ``_FastAvroSource``
+  everywhere once it has been more widely tested
+  """
+
+  def read_records(self, file_name, range_tracker):
+    next_block_start = -1
+
+    def split_points_unclaimed(stop_position):
+      if next_block_start >= stop_position:
+        # Next block starts at or after the suggested stop position. Hence
+        # there will not be split points to be claimed for the range ending at
+        # suggested stop position.
+        return 0
+
+      return iobase.RangeTracker.SPLIT_POINTS_UNKNOWN
+
+    range_tracker.set_split_points_unclaimed_callback(split_points_unclaimed)
+
+    start_offset = range_tracker.start_position()
+    if start_offset is None:
+      start_offset = 0
+
+    with self.open_file(file_name) as f:
+      blocks = block_reader(f)
+      sync_marker = blocks._header['sync']
+
+      # We have to start at current position if previous bundle ended at the
+      # end of a sync marker.
+      start_offset = max(0, start_offset - len(sync_marker))
+      f.seek(start_offset)
+      _AvroUtils.advance_file_past_next_sync_marker(f, sync_marker)
+
+      next_block_start = f.tell()
+
+      while range_tracker.try_claim(f.tell()):
 
 Review comment:
   I think there was a bug here, and I've just pushed [a fix](https://github.com/apache/beam/pull/5496/commits/80bca2a0b0452a9c724e000eaec98d7583eb471c) for it, though I can't tell whether it is what you were asking about.
   
   The bug was that after [`block = next(blocks)` below](https://github.com/apache/beam/pull/5496/files/d11c87396b7c21bf75f75eb722a19bafc8ae221e#diff-04fef9e0550df0b0c4e1cd0264406eb5R454), the whole block has already been read into memory, so `f.tell()` is already advanced past the end of that block. [The `+ block.size` below](https://github.com/apache/beam/pull/5496/files/d11c87396b7c21bf75f75eb722a19bafc8ae221e#diff-04fef9e0550df0b0c4e1cd0264406eb5R455) therefore counted the block twice, and `next_block_start` ended up higher than it should have been.
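
To make the double-counting concrete, here is a minimal sketch using a hypothetical toy block format (a 4-byte big-endian length prefix followed by the payload) instead of real Avro blocks; `toy_block_reader` and `encode_blocks` are illustrative stand-ins, not fastavro APIs. After `next(blocks)` returns, `f.tell()` already points at the next block, so adding the block's size again overshoots.

```python
import io
import struct


def toy_block_reader(f):
    """Hypothetical block reader: each block is a 4-byte big-endian length
    prefix followed by that many payload bytes."""
    while True:
        header = f.read(4)
        if len(header) < 4:
            return  # end of file
        (length,) = struct.unpack(">I", header)
        payload = f.read(length)
        # Yield the payload and the block's total on-disk size.
        yield payload, 4 + length


def encode_blocks(payloads):
    """Serialize payloads in the toy block format above."""
    return b"".join(struct.pack(">I", len(p)) + p for p in payloads)


f = io.BytesIO(encode_blocks([b"abc", b"defgh", b"ij"]))
blocks = toy_block_reader(f)

payload, block_size = next(blocks)  # the whole first block is now consumed
fixed_next_start = f.tell()         # already the start of the next block
buggy_next_start = f.tell() + block_size  # counts the first block twice

print(fixed_next_start, buggy_next_start)  # 7 14
```

In this toy file the buggy value (14) is not even a block boundary; the second block actually spans offsets 7 to 16.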
   
   However, as far as I can tell that was relatively inconsequential: `next_block_start` is only used by the callback passed to [`range_tracker`'s `set_split_points_unclaimed_callback` above](https://github.com/apache/beam/pull/5496/files/d11c87396b7c21bf75f75eb722a19bafc8ae221e#diff-04fef9e0550df0b0c4e1cd0264406eb5R435), to decide whether to [return `0`](https://github.com/apache/beam/pull/5496/files/d11c87396b7c21bf75f75eb722a19bafc8ae221e#diff-04fef9e0550df0b0c4e1cd0264406eb5R431) or [`SPLIT_POINTS_UNKNOWN`](https://github.com/apache/beam/pull/5496/files/d11c87396b7c21bf75f75eb722a19bafc8ae221e#diff-04fef9e0550df0b0c4e1cd0264406eb5R433). If it returned `0` too early, would that cause correctness issues, or just a failure to schedule more workers at subsequent points in the file?
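
A minimal sketch of the callback's decision, with `next_block_start` passed in explicitly rather than captured from the enclosing scope. `SPLIT_POINTS_UNKNOWN` here is a local sentinel standing in for `iobase.RangeTracker.SPLIT_POINTS_UNKNOWN`, and the offsets (true next block at 7, overestimate of 14, stop position 10) are illustrative. It only shows what the callback reports; it does not settle whether a premature `0` affects correctness or only further splitting.

```python
# Local stand-in for apache_beam's iobase.RangeTracker.SPLIT_POINTS_UNKNOWN.
SPLIT_POINTS_UNKNOWN = object()


def split_points_unclaimed(next_block_start, stop_position):
    """Mirrors the callback in the diff, with next_block_start passed in
    explicitly instead of read from the enclosing read_records scope."""
    if next_block_start >= stop_position:
        # No block starts before stop_position, so no split points remain.
        return 0
    return SPLIT_POINTS_UNKNOWN


stop_position = 10
# Correct bookkeeping: the next block starts at offset 7, before the stop.
correct = split_points_unclaimed(7, stop_position)
# Overestimated bookkeeping (7 + block size 7 = 14) reports 0 prematurely.
overestimated = split_points_unclaimed(14, stop_position)

print(correct is SPLIT_POINTS_UNKNOWN, overestimated)  # True 0
```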
   
   It seems like you're concerned about [the `blocks = block_reader(f)` line above](https://github.com/apache/beam/pull/5496/files/d11c87396b7c21bf75f75eb722a19bafc8ae221e#diff-04fef9e0550df0b0c4e1cd0264406eb5R442) and whether [`block = next(blocks)` below](https://github.com/apache/beam/pull/5496/files/d11c87396b7c21bf75f75eb722a19bafc8ae221e#diff-04fef9e0550df0b0c4e1cd0264406eb5R454) will correctly read from the position `f` is pointing at. The answer is "yes": the blocks iterator is formed as a closure around the file pointer, and at each iteration step it reads whatever bytes the file pointer is pointing at.
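
The same behavior can be seen with any generator that holds a reference to a file object. In this minimal sketch, `chunk_reader` is a hypothetical stand-in for fastavro's `block_reader` that reads fixed-size chunks; because it keeps a reference to `f` rather than a saved offset, seeks made before the first step or between steps determine where the next read starts.

```python
import io


def chunk_reader(f, chunk_size=4):
    """Hypothetical generator that reads fixed-size chunks from wherever the
    file pointer currently is; it keeps a reference to f, not a position."""
    while True:
        chunk = f.read(chunk_size)
        if not chunk:
            return
        yield chunk


f = io.BytesIO(b"AAAABBBBCCCCDDDD")
chunks = chunk_reader(f)

f.seek(4)              # seek after creating the iterator...
first = next(chunks)   # ...and the first read starts at offset 4
f.seek(12)             # seek again between iteration steps
second = next(chunks)  # reads from offset 12

print(first, second)  # b'BBBB' b'DDDD'
```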
   
   I believe this all mirrors how `_AvroSource` works, as well.
   
   Thanks for digging into this; let me know if it makes sense!


Issue Time Tracking
-------------------

    Worklog Id:     (was: 116689)
    Time Spent: 4h  (was: 3h 50m)

> Consider a faster Avro library in Python
> ----------------------------------------
>
>                 Key: BEAM-2810
>                 URL: https://issues.apache.org/jira/browse/BEAM-2810
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-py-core
>            Reporter: Eugene Kirpichov
>            Assignee: Ryan Williams
>            Priority: Major
>          Time Spent: 4h
>  Remaining Estimate: 0h
>
> https://stackoverflow.com/questions/45870789/bottleneck-on-data-source
> It seems like this job is reading Avro files (exported by BigQuery) at about 2 MB/s.
> We use the standard Python "avro" library, which is apparently known to be very slow (10x+ slower than Java; see http://apache-avro.679487.n3.nabble.com/Avro-decode-very-slow-in-Python-td4034422.html), and there are alternatives, e.g. https://pypi.python.org/pypi/fastavro/



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
