[
https://issues.apache.org/jira/browse/BEAM-2810?focusedWorklogId=116784&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-116784
]
ASF GitHub Bot logged work on BEAM-2810:
----------------------------------------
Author: ASF GitHub Bot
Created on: 28/Jun/18 05:10
Start Date: 28/Jun/18 05:10
Worklog Time Spent: 10m
Work Description: chamikaramj closed pull request #5496: [BEAM-2810] use
fastavro in Avro IO
URL: https://github.com/apache/beam/pull/5496
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git a/.test-infra/jenkins/verify_performance_test_results.py
b/.test-infra/jenkins/verify_performance_test_results.py
index cf3be579f73..5f673efcced 100644
--- a/.test-infra/jenkins/verify_performance_test_results.py
+++ b/.test-infra/jenkins/verify_performance_test_results.py
@@ -21,7 +21,7 @@
# --mode=report - In this mode script iterates over list of BigQuery tables
and
# analyses the data. This mode is intended to be run on a regulars basis,
e.g. daily.
# Report will contain average tests execution time of given metric, its
comparison with
-# with average calculated from historical data, recent standard deviation
and standard
+# average calculated from historical data, recent standard deviation and
standard
# deviation calculated based on historical data.
# --mode=validation - In this mode script will analyse single BigQuery table
and check
# recent results.
diff --git a/sdks/python/apache_beam/examples/avro_bitcoin.py
b/sdks/python/apache_beam/examples/avro_bitcoin.py
new file mode 100644
index 00000000000..d05b73553b0
--- /dev/null
+++ b/sdks/python/apache_beam/examples/avro_bitcoin.py
@@ -0,0 +1,169 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Collect statistics on transactions in a public bitcoin dataset that was
+exported to avro
+
+Usage:
+export GOOGLE_APPLICATION_CREDENTIALS=/path/to/credentials.json
+python -m apache_beam.examples.bitcoin \
+ --compress --fastavro --output fastavro-compressed
+"""
+
+from __future__ import absolute_import
+
+import argparse
+import logging
+
+import avro
+
+import apache_beam as beam
+from apache_beam.io.avroio import ReadFromAvro
+from apache_beam.io.avroio import WriteToAvro
+from apache_beam.metrics import Metrics
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.options.pipeline_options import SetupOptions
+
+
+class BitcoinTxnCountDoFn(beam.DoFn):
+ """Count inputs and outputs per transaction"""
+
+ def __init__(self):
+ super(BitcoinTxnCountDoFn, self).__init__()
+ self.txn_counter = Metrics.counter(self.__class__, 'txns')
+ self.inputs_dist = Metrics.distribution(self.__class__, 'inputs_per_txn')
+ self.outputs_dist = Metrics.distribution(self.__class__, 'outputs_per_txn')
+ self.output_amts_dist = Metrics.distribution(self.__class__, 'output_amts')
+ self.txn_amts_dist = Metrics.distribution(self.__class__, 'txn_amts')
+
+ def process(self, elem):
+ """Update counters and distributions, and filter and sum some fields"""
+
+ inputs = elem['inputs']
+ outputs = elem['outputs']
+
+ self.txn_counter.inc()
+
+ num_inputs = len(inputs)
+ num_outputs = len(outputs)
+
+ self.inputs_dist.update(num_inputs)
+ self.outputs_dist.update(num_outputs)
+
+ total = 0
+ for output in outputs:
+ amt = output['output_satoshis']
+ self.output_amts_dist.update(amt)
+ total += amt
+
+ self.txn_amts_dist.update(total)
+
+ return [
+ {
+ "transaction_id": elem["transaction_id"],
+ "timestamp": elem["timestamp"],
+ "block_id": elem["block_id"],
+ "previous_block": elem["previous_block"],
+ "num_inputs": num_inputs,
+ "num_outputs": num_outputs,
+ "sum_output": total,
+ }
+ ]
+
+
+SCHEMA = avro.schema.parse('''
+ {
+ "namespace": "example.avro",
+ "type": "record",
+ "name": "Transaction",
+ "fields": [
+ {"name": "transaction_id", "type": "string"},
+ {"name": "timestamp", "type": "long"},
+ {"name": "block_id", "type": "string"},
+ {"name": "previous_block", "type": "string"},
+ {"name": "num_inputs", "type": "int"},
+ {"name": "num_outputs", "type": "int"},
+ {"name": "sum_output", "type": "long"}
+ ]
+ }
+ ''')
+
+
+def run(argv=None):
+ """Test Avro IO (backed by fastavro or Apache Avro) on a simple pipeline
+ that transforms bitcoin transactions"""
+ parser = argparse.ArgumentParser()
+ parser.add_argument('--input',
+ dest='input',
+ default='gs://beam-avro-test/bitcoin/txns/*',
+ help='Input file(s) to process.')
+ parser.add_argument('--output',
+ dest='output',
+ required=True,
+ help='Output file to write results to.')
+ parser.add_argument('--compress',
+ dest='compress',
+ required=False,
+ action='store_true',
+ help='When set, compress the output data')
+ parser.add_argument('--fastavro',
+ dest='use_fastavro',
+ required=False,
+ action='store_true',
+ help='When set, use fastavro for Avro I/O')
+
+ opts, pipeline_args = parser.parse_known_args(argv)
+
+ # We use the save_main_session option because one or more DoFn's in this
+ # workflow rely on global context (e.g., a module imported at module level).
+ pipeline_options = PipelineOptions(pipeline_args)
+ pipeline_options.view_as(SetupOptions).save_main_session = True
+ p = beam.Pipeline(options=pipeline_options)
+
+ # Read the avro file[pattern] into a PCollection.
+ records = \
+ p | 'read' >> ReadFromAvro(opts.input, use_fastavro=opts.use_fastavro)
+
+ measured = records | 'scan' >> beam.ParDo(BitcoinTxnCountDoFn())
+
+ # pylint: disable=expression-not-assigned
+ measured | 'write' >> \
+ WriteToAvro(
+ opts.output,
+ schema=SCHEMA,
+ codec=('deflate' if opts.compress else 'null'),
+ use_fastavro=opts.use_fastavro
+ )
+
+ result = p.run()
+ result.wait_until_finish()
+
+ # Do not query metrics when creating a template which doesn't run
+ if (not hasattr(result, 'has_job') # direct runner
+ or result.has_job): # not just a template creation
+ metrics = result.metrics().query()
+
+ for counter in metrics['counters']:
+ logging.info("Counter: %s", counter)
+
+ for dist in metrics['distributions']:
+ logging.info("Distribution: %s", dist)
+
+
+if __name__ == '__main__':
+ logging.getLogger().setLevel(logging.INFO)
+ run()
diff --git a/sdks/python/apache_beam/io/avroio.py
b/sdks/python/apache_beam/io/avroio.py
index de883e15b27..1368734f17d 100644
--- a/sdks/python/apache_beam/io/avroio.py
+++ b/sdks/python/apache_beam/io/avroio.py
@@ -51,6 +51,8 @@
from avro import io as avroio
from avro import datafile
from avro import schema
+from fastavro.read import block_reader
+from fastavro.write import Writer
import apache_beam as beam
from apache_beam.io import filebasedsink
@@ -67,7 +69,8 @@ class ReadFromAvro(PTransform):
"""A :class:`~apache_beam.transforms.ptransform.PTransform` for reading avro
files."""
- def __init__(self, file_pattern=None, min_bundle_size=0, validate=True):
+ def __init__(self, file_pattern=None, min_bundle_size=0, validate=True,
+ use_fastavro=False):
"""Initializes :class:`ReadFromAvro`.
Uses source :class:`~apache_beam.io._AvroSource` to read a set of Avro
@@ -132,9 +135,16 @@ def __init__(self, file_pattern=None, min_bundle_size=0,
validate=True):
splitting the input into bundles.
validate (bool): flag to verify that the files exist during the pipeline
creation time.
+ use_fastavro (bool); when set, use the `fastavro` library for IO, which
+ is significantly faster, and will likely become the default
"""
super(ReadFromAvro, self).__init__()
- self._source = _AvroSource(file_pattern, min_bundle_size,
validate=validate)
+ self._source = _create_avro_source(
+ file_pattern,
+ min_bundle_size,
+ validate=validate,
+ use_fastavro=use_fastavro
+ )
def expand(self, pvalue):
return pvalue.pipeline | Read(self._source)
@@ -153,7 +163,9 @@ class ReadAllFromAvro(PTransform):
DEFAULT_DESIRED_BUNDLE_SIZE = 64 * 1024 * 1024 # 64MB
def __init__(self, min_bundle_size=0,
- desired_bundle_size=DEFAULT_DESIRED_BUNDLE_SIZE):
+ desired_bundle_size=DEFAULT_DESIRED_BUNDLE_SIZE,
+ use_fastavro=False,
+ label='ReadAllFiles'):
"""Initializes ``ReadAllFromAvro``.
Args:
@@ -163,13 +175,18 @@ def __init__(self, min_bundle_size=0,
splitting the input into bundles.
"""
source_from_file = partial(
- _create_avro_source, min_bundle_size=min_bundle_size)
+ _create_avro_source,
+ min_bundle_size=min_bundle_size,
+ use_fastavro=use_fastavro
+ )
self._read_all_files = filebasedsource.ReadAllFiles(
True, CompressionTypes.AUTO, desired_bundle_size, min_bundle_size,
source_from_file)
+ self.label = label
+
def expand(self, pvalue):
- return pvalue | 'ReadAllFiles' >> self._read_all_files
+ return pvalue | self.label >> self._read_all_files
class _AvroUtils(object):
@@ -264,10 +281,23 @@ def advance_file_past_next_sync_marker(f, sync_marker):
data = f.read(buf_size)
-def _create_avro_source(file_pattern=None, min_bundle_size=None):
- return _AvroSource(
- file_pattern=file_pattern, min_bundle_size=min_bundle_size,
- validate=False)
+def _create_avro_source(file_pattern=None,
+ min_bundle_size=None,
+ validate=False,
+ use_fastavro=False):
+ return \
+ _FastAvroSource(
+ file_pattern=file_pattern,
+ min_bundle_size=min_bundle_size,
+ validate=validate
+ ) \
+ if use_fastavro \
+ else \
+ _AvroSource(
+ file_pattern=file_pattern,
+ min_bundle_size=min_bundle_size,
+ validate=validate
+ )
class _AvroBlock(object):
@@ -276,8 +306,8 @@ class _AvroBlock(object):
def __init__(self, block_bytes, num_records, codec, schema_string,
offset, size):
# Decompress data early on (if needed) and thus decrease the number of
- # parallel copies of the data in memory at any given in time during
- # block iteration.
+ # parallel copies of the data in memory at any given time during block
+ # iteration.
self._decompressed_block_bytes = self._decompress_bytes(block_bytes, codec)
self._num_records = num_records
self._schema = schema.parse(schema_string)
@@ -360,8 +390,8 @@ def split_points_unclaimed(stop_position):
start_offset = 0
with self.open_file(file_name) as f:
- codec, schema_string, sync_marker = _AvroUtils.read_meta_data_from_file(
- f)
+ codec, schema_string, sync_marker = \
+ _AvroUtils.read_meta_data_from_file(f)
# We have to start at current position if previous bundle ended at the
# end of a sync marker.
@@ -369,7 +399,9 @@ def split_points_unclaimed(stop_position):
f.seek(start_offset)
_AvroUtils.advance_file_past_next_sync_marker(f, sync_marker)
- while range_tracker.try_claim(f.tell()):
+ next_block_start = f.tell()
+
+ while range_tracker.try_claim(next_block_start):
block = _AvroUtils.read_block_from_file(f, codec, schema_string,
sync_marker)
next_block_start = block.offset() + block.size()
@@ -377,6 +409,56 @@ def split_points_unclaimed(stop_position):
yield record
+class _FastAvroSource(filebasedsource.FileBasedSource):
+ """A source for reading Avro files using the `fastavro` library.
+
+ ``_FastAvroSource`` is implemented using the file-based source framework
+ available in module 'filebasedsource'. Hence please refer to module
+ 'filebasedsource' to fully understand how this source implements operations
+ common to all file-based sources such as file-pattern expansion and splitting
+ into bundles for parallel processing.
+
+ TODO: remove ``_AvroSource`` in favor of using ``_FastAvroSource``
+ everywhere once it has been more widely tested
+ """
+
+ def read_records(self, file_name, range_tracker):
+ next_block_start = -1
+
+ def split_points_unclaimed(stop_position):
+ if next_block_start >= stop_position:
+ # Next block starts at or after the suggested stop position. Hence
+ # there will not be split points to be claimed for the range ending at
+ # suggested stop position.
+ return 0
+
+ return iobase.RangeTracker.SPLIT_POINTS_UNKNOWN
+
+ range_tracker.set_split_points_unclaimed_callback(split_points_unclaimed)
+
+ start_offset = range_tracker.start_position()
+ if start_offset is None:
+ start_offset = 0
+
+ with self.open_file(file_name) as f:
+ blocks = block_reader(f)
+ sync_marker = blocks._header['sync']
+
+ # We have to start at current position if previous bundle ended at the
+ # end of a sync marker.
+ start_offset = max(0, start_offset - len(sync_marker))
+ f.seek(start_offset)
+ _AvroUtils.advance_file_past_next_sync_marker(f, sync_marker)
+
+ next_block_start = f.tell()
+
+ while range_tracker.try_claim(next_block_start):
+ block = next(blocks)
+ next_block_start = block.offset + block.size
+ for record in block:
+ yield record
+
+
class WriteToAvro(beam.transforms.PTransform):
"""A ``PTransform`` for writing avro files."""
@@ -387,7 +469,8 @@ def __init__(self,
file_name_suffix='',
num_shards=0,
shard_name_template=None,
- mime_type='application/x-avro'):
+ mime_type='application/x-avro',
+ use_fastavro=False):
"""Initialize a WriteToAvro transform.
Args:
@@ -414,12 +497,22 @@ def __init__(self,
is '-SSSSS-of-NNNNN' if None is passed as the shard_name_template.
mime_type: The MIME type to use for the produced files, if the filesystem
supports specifying MIME types.
+ use_fastavro: when set, use the `fastavro` library for IO
Returns:
A WriteToAvro transform usable for writing.
"""
- self._sink = _AvroSink(file_path_prefix, schema, codec, file_name_suffix,
- num_shards, shard_name_template, mime_type)
+ self._sink = \
+ _create_avro_sink(
+ file_path_prefix,
+ schema,
+ codec,
+ file_name_suffix,
+ num_shards,
+ shard_name_template,
+ mime_type,
+ use_fastavro
+ )
def expand(self, pcoll):
return pcoll | beam.io.iobase.Write(self._sink)
@@ -428,8 +521,39 @@ def display_data(self):
return {'sink_dd': self._sink}
+def _create_avro_sink(file_path_prefix,
+ schema,
+ codec,
+ file_name_suffix,
+ num_shards,
+ shard_name_template,
+ mime_type,
+ use_fastavro):
+ return \
+ _FastAvroSink(
+ file_path_prefix,
+ schema,
+ codec,
+ file_name_suffix,
+ num_shards,
+ shard_name_template,
+ mime_type
+ ) \
+ if use_fastavro \
+ else \
+ _AvroSink(
+ file_path_prefix,
+ schema,
+ codec,
+ file_name_suffix,
+ num_shards,
+ shard_name_template,
+ mime_type
+ )
+
+
class _AvroSink(filebasedsink.FileBasedSink):
- """A sink to avro files."""
+ """A sink for avro files."""
def __init__(self,
file_path_prefix,
@@ -461,7 +585,21 @@ def write_record(self, writer, value):
writer.append(value)
def display_data(self):
- res = super(self.__class__, self).display_data()
+ res = super(_AvroSink, self).display_data()
res['codec'] = str(self._codec)
res['schema'] = str(self._schema)
return res
+
+
+class _FastAvroSink(_AvroSink):
+ """A sink for avro files that uses the `fastavro` library"""
+ def open(self, temp_path):
+ file_handle = super(_AvroSink, self).open(temp_path)
+ return Writer(file_handle, self._schema.to_json(), self._codec)
+
+ def write_record(self, writer, value):
+ writer.write(value)
+
+ def close(self, writer):
+ writer.flush()
+ super(_FastAvroSink, self).close(writer.fo)
diff --git a/sdks/python/apache_beam/io/avroio_test.py
b/sdks/python/apache_beam/io/avroio_test.py
index 8a344275a1a..9b9a855fcdf 100644
--- a/sdks/python/apache_beam/io/avroio_test.py
+++ b/sdks/python/apache_beam/io/avroio_test.py
@@ -33,8 +33,8 @@
from apache_beam.io import filebasedsource
from apache_beam.io import iobase
from apache_beam.io import source_test_utils
-from apache_beam.io.avroio import _AvroSink as AvroSink # For testing
-from apache_beam.io.avroio import _AvroSource as AvroSource # For testing
+from apache_beam.io.avroio import _create_avro_sink # For testing
+from apache_beam.io.avroio import _create_avro_source # For testing
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to
@@ -53,6 +53,10 @@ class TestAvro(unittest.TestCase):
_temp_files = []
+ def __init__(self, methodName='runTest'):
+ super(TestAvro, self).__init__(methodName)
+ self.use_fastavro = False
+
def setUp(self):
# Reducing the size of thread pools. Without this test execution may fail
in
# environments with limited amount of resources.
@@ -123,7 +127,7 @@ def _write_pattern(self, num_files):
def _run_avro_test(self, pattern, desired_bundle_size, perform_splitting,
expected_result):
- source = AvroSource(pattern)
+ source = _create_avro_source(pattern, use_fastavro=self.use_fastavro)
read_records = []
if perform_splitting:
@@ -158,7 +162,12 @@ def test_read_with_splitting(self):
def test_source_display_data(self):
file_name = 'some_avro_source'
- source = AvroSource(file_name, validate=False)
+ source = \
+ _create_avro_source(
+ file_name,
+ validate=False,
+ use_fastavro=self.use_fastavro
+ )
dd = DisplayData.create_from(source)
# No extra avro parameters for AvroSource.
@@ -169,7 +178,11 @@ def test_source_display_data(self):
def test_read_display_data(self):
file_name = 'some_avro_source'
- read = avroio.ReadFromAvro(file_name, validate=False)
+ read = \
+ avroio.ReadFromAvro(
+ file_name,
+ validate=False,
+ use_fastavro=self.use_fastavro)
dd = DisplayData.create_from(read)
# No extra avro parameters for AvroSource.
@@ -180,13 +193,15 @@ def test_read_display_data(self):
def test_sink_display_data(self):
file_name = 'some_avro_sink'
- sink = AvroSink(file_name,
- self.SCHEMA,
- 'null',
- '.end',
- 0,
- None,
- 'application/x-avro')
+ sink = _create_avro_sink(
+ file_name,
+ self.SCHEMA,
+ 'null',
+ '.end',
+ 0,
+ None,
+ 'application/x-avro',
+ use_fastavro=self.use_fastavro)
dd = DisplayData.create_from(sink)
expected_items = [
DisplayDataItemMatcher(
@@ -206,7 +221,8 @@ def test_sink_display_data(self):
def test_write_display_data(self):
file_name = 'some_avro_sink'
write = avroio.WriteToAvro(file_name,
- self.SCHEMA)
+ self.SCHEMA,
+ use_fastavro=self.use_fastavro)
dd = DisplayData.create_from(write)
expected_items = [
DisplayDataItemMatcher(
@@ -225,12 +241,12 @@ def test_write_display_data(self):
def test_read_reentrant_without_splitting(self):
file_name = self._write_data()
- source = AvroSource(file_name)
+ source = _create_avro_source(file_name, use_fastavro=self.use_fastavro)
source_test_utils.assert_reentrant_reads_succeed((source, None, None))
def test_read_reantrant_with_splitting(self):
file_name = self._write_data()
- source = AvroSource(file_name)
+ source = _create_avro_source(file_name, use_fastavro=self.use_fastavro)
splits = [
split for split in source.split(desired_bundle_size=100000)]
assert len(splits) == 1
@@ -249,7 +265,7 @@ def test_read_with_splitting_multiple_blocks(self):
def test_split_points(self):
file_name = self._write_data(count=12000)
- source = AvroSource(file_name)
+ source = _create_avro_source(file_name, use_fastavro=self.use_fastavro)
splits = [
split
@@ -316,7 +332,7 @@ def test_dynamic_work_rebalancing_exhaustive(self):
try:
avro.datafile.SYNC_INTERVAL = 2
file_name = self._write_data(count=5)
- source = AvroSource(file_name)
+ source = _create_avro_source(file_name, use_fastavro=self.use_fastavro)
splits = [split
for split in source.split(desired_bundle_size=float('inf'))]
assert len(splits) == 1
@@ -339,7 +355,8 @@ def test_corrupted_file(self):
f.write(corrupted_data)
corrupted_file_name = f.name
- source = AvroSource(corrupted_file_name)
+ source = _create_avro_source(
+ corrupted_file_name, use_fastavro=self.use_fastavro)
with self.assertRaises(ValueError) as exn:
source_test_utils.read_from_source(source, None, None)
self.assertEqual(0, exn.exception.message.find('Unexpected sync marker'))
@@ -347,47 +364,64 @@ def test_corrupted_file(self):
def test_read_from_avro(self):
path = self._write_data()
with TestPipeline() as p:
- assert_that(p | avroio.ReadFromAvro(path), equal_to(self.RECORDS))
+ assert_that(
+ p | avroio.ReadFromAvro(path, use_fastavro=self.use_fastavro),
+ equal_to(self.RECORDS))
def test_read_all_from_avro_single_file(self):
path = self._write_data()
with TestPipeline() as p:
- assert_that(p | Create([path]) | avroio.ReadAllFromAvro(),
- equal_to(self.RECORDS))
+ assert_that(
+ p \
+ | Create([path]) \
+ | avroio.ReadAllFromAvro(use_fastavro=self.use_fastavro),
+ equal_to(self.RECORDS))
def test_read_all_from_avro_many_single_files(self):
path1 = self._write_data()
path2 = self._write_data()
path3 = self._write_data()
with TestPipeline() as p:
- assert_that(p | Create([path1, path2, path3]) | avroio.ReadAllFromAvro(),
- equal_to(self.RECORDS * 3))
+ assert_that(
+ p \
+ | Create([path1, path2, path3]) \
+ | avroio.ReadAllFromAvro(use_fastavro=self.use_fastavro),
+ equal_to(self.RECORDS * 3))
def test_read_all_from_avro_file_pattern(self):
file_pattern = self._write_pattern(5)
with TestPipeline() as p:
- assert_that(p | Create([file_pattern]) | avroio.ReadAllFromAvro(),
- equal_to(self.RECORDS * 5))
+ assert_that(
+ p \
+ | Create([file_pattern]) \
+ | avroio.ReadAllFromAvro(use_fastavro=self.use_fastavro),
+ equal_to(self.RECORDS * 5))
def test_read_all_from_avro_many_file_patterns(self):
file_pattern1 = self._write_pattern(5)
file_pattern2 = self._write_pattern(2)
file_pattern3 = self._write_pattern(3)
with TestPipeline() as p:
- assert_that(p
- | Create([file_pattern1, file_pattern2, file_pattern3])
- | avroio.ReadAllFromAvro(),
- equal_to(self.RECORDS * 10))
+ assert_that(
+ p \
+ | Create([file_pattern1, file_pattern2, file_pattern3]) \
+ | avroio.ReadAllFromAvro(use_fastavro=self.use_fastavro),
+ equal_to(self.RECORDS * 10))
def test_sink_transform(self):
with tempfile.NamedTemporaryFile() as dst:
path = dst.name
with TestPipeline() as p:
# pylint: disable=expression-not-assigned
- p | beam.Create(self.RECORDS) | avroio.WriteToAvro(path, self.SCHEMA)
+ p \
+ | beam.Create(self.RECORDS) \
+ | avroio.WriteToAvro(path, self.SCHEMA, use_fastavro=self.use_fastavro)
with TestPipeline() as p:
# json used for stable sortability
- readback = p | avroio.ReadFromAvro(path + '*') | beam.Map(json.dumps)
+ readback = \
+ p \
+ | avroio.ReadFromAvro(path + '*', use_fastavro=self.use_fastavro) \
+ | beam.Map(json.dumps)
assert_that(readback, equal_to([json.dumps(r) for r in self.RECORDS]))
@unittest.skipIf(snappy is None, 'snappy not installed.')
@@ -396,14 +430,28 @@ def test_sink_transform_snappy(self):
path = dst.name
with TestPipeline() as p:
# pylint: disable=expression-not-assigned
- p | beam.Create(self.RECORDS) | avroio.WriteToAvro(
- path, self.SCHEMA, codec='snappy')
+ p \
+ | beam.Create(self.RECORDS) \
+ | avroio.WriteToAvro(
+ path,
+ self.SCHEMA,
+ codec='snappy',
+ use_fastavro=self.use_fastavro)
with TestPipeline() as p:
# json used for stable sortability
- readback = p | avroio.ReadFromAvro(path + '*') | beam.Map(json.dumps)
+ readback = \
+ p \
+ | avroio.ReadFromAvro(path + '*', use_fastavro=self.use_fastavro) \
+ | beam.Map(json.dumps)
assert_that(readback, equal_to([json.dumps(r) for r in self.RECORDS]))
+class TestFastAvro(TestAvro):
+ def __init__(self, methodName='runTest'):
+ super(TestFastAvro, self).__init__(methodName)
+ self.use_fastavro = True
+
+
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
unittest.main()
diff --git a/sdks/python/apache_beam/io/filebasedsource.py
b/sdks/python/apache_beam/io/filebasedsource.py
index 134800c3fff..4509a3616b5 100644
--- a/sdks/python/apache_beam/io/filebasedsource.py
+++ b/sdks/python/apache_beam/io/filebasedsource.py
@@ -383,8 +383,8 @@ def __init__(
source_from_file):
"""
Args:
- splittable: If True, files won't be split into sub-ranges. If False,
files
- may or may not be split into data ranges.
+ splittable: If False, files won't be split into sub-ranges. If True,
+ files may or may not be split into data ranges.
compression_type: A ``CompressionType`` object that specifies the
compression type of the files that will be processed. If
``CompressionType.AUTO``, system will try to automatically
diff --git a/sdks/python/apache_beam/io/filesystem.py
b/sdks/python/apache_beam/io/filesystem.py
index 8ebb6b5f029..0b99793dca0 100644
--- a/sdks/python/apache_beam/io/filesystem.py
+++ b/sdks/python/apache_beam/io/filesystem.py
@@ -312,7 +312,7 @@ def seek(self, offset, whence=os.SEEK_SET):
Seeking behavior:
* seeking from the end :data:`os.SEEK_END` the whole file is decompressed
- once to determine it's size. Therefore it is preferred to use
+ once to determine its size. Therefore it is preferred to use
:data:`os.SEEK_SET` or :data:`os.SEEK_CUR` to avoid the processing
overhead
* seeking backwards from the current position rewinds the file to ``0``
diff --git a/sdks/python/setup.py b/sdks/python/setup.py
index 026a7ec6bef..b4d6cdc900a 100644
--- a/sdks/python/setup.py
+++ b/sdks/python/setup.py
@@ -98,6 +98,7 @@ def get_version():
'avro>=1.8.1,<2.0.0',
'crcmod>=1.7,<2.0',
'dill==0.2.6',
+ 'fastavro==0.19.7',
'grpcio>=1.8,<2',
'hdfs>=2.1.0,<3.0.0',
'httplib2>=0.8,<=0.11.3',
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 116784)
Time Spent: 5h (was: 4h 50m)
> Consider a faster Avro library in Python
> ----------------------------------------
>
> Key: BEAM-2810
> URL: https://issues.apache.org/jira/browse/BEAM-2810
> Project: Beam
> Issue Type: Bug
> Components: sdk-py-core
> Reporter: Eugene Kirpichov
> Assignee: Ryan Williams
> Priority: Major
> Time Spent: 5h
> Remaining Estimate: 0h
>
> https://stackoverflow.com/questions/45870789/bottleneck-on-data-source
> Seems like this job is reading Avro files (exported by BigQuery) at about 2
> MB/s.
> We use the standard Python "avro" library which is apparently known to be
> very slow (10x+ slower than Java)
> http://apache-avro.679487.n3.nabble.com/Avro-decode-very-slow-in-Python-td4034422.html,
> and there are alternatives e.g. https://pypi.python.org/pypi/fastavro/
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)