[
https://issues.apache.org/jira/browse/BEAM-4444?focusedWorklogId=167583&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-167583
]
ASF GitHub Bot logged work on BEAM-4444:
----------------------------------------
Author: ASF GitHub Bot
Created on: 20/Nov/18 01:17
Start Date: 20/Nov/18 01:17
Worklog Time Spent: 10m
Work Description: udim commented on a change in pull request #6763:
[BEAM-4444] Parquet IO for Python SDK
URL: https://github.com/apache/beam/pull/6763#discussion_r234820693
##########
File path: sdks/python/apache_beam/io/parquetio.py
##########
@@ -0,0 +1,465 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+"""``PTransforms`` for reading from and writing to Parquet files.
+
+Provides two read ``PTransform`` s, ``ReadFromParquet`` and
+``ReadAllFromParquet``, that produce a ``PCollection`` of records.
+Each record of this ``PCollection`` will contain a single record read from
+a Parquet file. Records that are of simple types will be mapped into
+corresponding Python types. The actual parquet file operations are done by
+pyarrow. Source splitting is supported at row group granularity.
+
+Additionally, this module provides a write ``PTransform`` ``WriteToParquet``
+that can be used to write a given ``PCollection`` of Python objects to a
+Parquet file.
+"""
+from __future__ import absolute_import
+
+from functools import partial
+
+import pyarrow as pa
+from pyarrow.parquet import ParquetFile
+from pyarrow.parquet import ParquetWriter
+
+from apache_beam.io import filebasedsink
+from apache_beam.io import filebasedsource
+from apache_beam.io.filesystem import CompressionTypes
+from apache_beam.io.iobase import RangeTracker
+from apache_beam.io.iobase import Read
+from apache_beam.io.iobase import Write
+from apache_beam.transforms import PTransform
+
+__all__ = ['ReadFromParquet', 'ReadAllFromParquet', 'WriteToParquet']
+
+
+class ReadFromParquet(PTransform):
+ """A :class:`~apache_beam.transforms.ptransform.PTransform` for reading
+ Parquet files. This `PTransform` is currently experimental. No
+ backward-compatibility guarantees."""
+
+ def __init__(self, file_pattern=None, min_bundle_size=0,
+ validate=True, columns=None):
+ """Initializes :class:`ReadFromParquet`.
+
+ Uses source ``_ParquetSource`` to read a set of Parquet files defined by
+ a given file pattern.
+
+ If ``/mypath/myparquetfiles*`` is a file-pattern that points to a set of
+ Parquet files, a :class:`~apache_beam.pvalue.PCollection` for the records in
+ these Parquet files can be created in the following manner.
+
+ .. testcode::
+
+ with beam.Pipeline() as p:
+ records = p | 'Read' >> ReadFromParquet('/mypath/myparquetfiles*')
+
+ .. NOTE: We're not actually interested in this error; but if we get here,
+ it means that the way of calling this transform hasn't changed.
+
+ .. testoutput::
+ :hide:
+
+ Traceback (most recent call last):
+ ...
+ IOError: No files found based on the file pattern
+
+ Each element of this :class:`~apache_beam.pvalue.PCollection` will contain
+ a single record read from a source. The element is a Python dictionary whose
+ keys contain the corresponding column names and are of type :class:`str`,
+ while the values are of the type defined in the corresponding Parquet schema.
+ Records that are of simple types will be mapped into corresponding Python
+ types. Records that are of complex types like list and struct will be mapped
+ to Python list and dictionary respectively. For more information on supported
+ types and schemas, please see the pyarrow documentation.
+
+
+ Args:
+ file_pattern (str): the file glob to read
+ min_bundle_size (int): the minimum size in bytes, to be considered when
+ splitting the input into bundles.
+ validate (bool): flag to verify that the files exist during the pipeline
+ creation time.
+ columns (List[str]): list of columns that will be read from files.
+ A column name may be a prefix of a nested field, e.g. 'a' will select
+ 'a.b', 'a.c', and 'a.d.e'
+"""
+ super(ReadFromParquet, self).__init__()
+ self._source = _create_parquet_source(
+ file_pattern,
+ min_bundle_size,
+ validate=validate,
+ columns=columns
+ )
+
+ def expand(self, pvalue):
+ return pvalue.pipeline | Read(self._source)
+
+ def display_data(self):
+ return {'source_dd': self._source}
+
+
+class ReadAllFromParquet(PTransform):
+ """A ``PTransform`` for reading ``PCollection`` of Parquet files.
+
+ Uses source ``_ParquetSource`` to read a ``PCollection`` of Parquet files or
+ file patterns and produce a ``PCollection`` of Parquet records. This
+ ``PTransform`` is currently experimental. No backward-compatibility
+ guarantees.
+ """
+
+ DEFAULT_DESIRED_BUNDLE_SIZE = 64 * 1024 * 1024 # 64MB
+
+ def __init__(self, min_bundle_size=0,
+ desired_bundle_size=DEFAULT_DESIRED_BUNDLE_SIZE,
+ columns=None,
+ label='ReadAllFiles'):
+ """Initializes ``ReadAllFromParquet``.
+
+ Args:
+ min_bundle_size: the minimum size in bytes, to be considered when
+ splitting the input into bundles.
+ desired_bundle_size: the desired size in bytes, to be considered when
+ splitting the input into bundles.
+ columns: list of columns that will be read from files. A column name
+ may be a prefix of a nested field, e.g. 'a' will select
+ 'a.b', 'a.c', and 'a.d.e'
+ """
+ super(ReadAllFromParquet, self).__init__()
+ source_from_file = partial(
+ _create_parquet_source,
+ min_bundle_size=min_bundle_size,
+ columns=columns
+ )
+ self._read_all_files = filebasedsource.ReadAllFiles(
+ True, CompressionTypes.UNCOMPRESSED, desired_bundle_size, min_bundle_size,
+ source_from_file)
+
+ self.label = label
+
+ def expand(self, pvalue):
+ return pvalue | self.label >> self._read_all_files
+
+
+def _create_parquet_source(file_pattern=None,
+ min_bundle_size=0,
+ validate=False,
+ columns=None):
+ return \
+ _ParquetSource(
+ file_pattern=file_pattern,
+ min_bundle_size=min_bundle_size,
+ validate=validate,
+ columns=columns
+ )
+
+
+class _ParquetUtils(object):
+ @staticmethod
+ def find_first_row_group_index(pf, start_offset):
+ for i in range(_ParquetUtils.get_number_of_row_groups(pf)):
+ row_group_start_offset = _ParquetUtils.get_offset(pf, i)
+ if row_group_start_offset >= start_offset:
+ return i
+ return -1
+
+ @staticmethod
+ def get_offset(pf, row_group_index):
+ first_column_metadata =\
+ pf.metadata.row_group(row_group_index).column(0)
+ if first_column_metadata.has_dictionary_page:
+ return first_column_metadata.dictionary_page_offset
+ else:
+ return first_column_metadata.data_page_offset
+
+ @staticmethod
+ def get_number_of_row_groups(pf):
+ return pf.metadata.num_row_groups
+
+
+class _ParquetSource(filebasedsource.FileBasedSource):
+ """A source for reading Parquet files.
+ """
+ def __init__(self, file_pattern, min_bundle_size, validate, columns):
+ super(_ParquetSource, self).__init__(
+ file_pattern=file_pattern,
+ min_bundle_size=min_bundle_size,
+ validate=validate
+ )
+ self._columns = columns
+
+ def read_records(self, file_name, range_tracker):
+ next_block_start = -1
+
+ def split_points_unclaimed(stop_position):
+ if next_block_start >= stop_position:
+ # Next block starts at or after the suggested stop position. Hence
+ # there will not be split points to be claimed for the range ending at
+ # suggested stop position.
+ return 0
+ return RangeTracker.SPLIT_POINTS_UNKNOWN
+
+ range_tracker.set_split_points_unclaimed_callback(split_points_unclaimed)
+
+ start_offset = range_tracker.start_position()
+ if start_offset is None:
+ start_offset = 0
+
+ with self.open_file(file_name) as f:
+ pf = ParquetFile(f)
+
+ # find the first dictionary page (or data page if there's no dictionary
+ # page available) offset after the given start_offset. This offset is also
+ # the starting offset of any row group since the Parquet specification
+ # describes that data pages always come before the metadata in each
+ # row group.
+ index = _ParquetUtils.find_first_row_group_index(pf, start_offset)
+ if index != -1:
+ next_block_start = _ParquetUtils.get_offset(pf, index)
+ else:
+ next_block_start = range_tracker.stop_position()
+ number_of_row_groups = _ParquetUtils.get_number_of_row_groups(pf)
+
+ while range_tracker.try_claim(next_block_start):
+ table = pf.read_row_group(index, self._columns)
+
+ if index + 1 < number_of_row_groups:
+ index = index + 1
+ next_block_start = _ParquetUtils.get_offset(pf, index)
+ else:
+ next_block_start = range_tracker.stop_position()
+
+ num_rows = table.num_rows
+ data_items = table.to_pydict().items()
+ for n in range(num_rows):
+ row = {}
+ for column, values in data_items:
+ row[column] = values[n]
+ yield row
+
+
+class WriteToParquet(PTransform):
+ """A ``PTransform`` for writing parquet files.
+
+ This ``PTransform`` is currently experimental. No backward-compatibility
+ guarantees.
+ """
+
+ def __init__(self,
+ file_path_prefix,
+ schema,
+ row_group_buffer_size=64*1024*1024,
+ record_batch_size=1000,
+ codec='none',
+ use_deprecated_int96_timestamps=False,
+ file_name_suffix='',
+ num_shards=0,
+ shard_name_template=None,
+ mime_type='application/x-parquet'):
+ """Initialize a WriteToParquet transform.
+
+ Writes parquet files from a :class:`~apache_beam.pvalue.PCollection` of
+ records. Each record is a dictionary with string keys that represent
+ column names. A schema must be specified, as in the example below. ::
+
+ with beam.Pipeline() as p:
Review comment:
Try putting a `.. testcode::` above this line to test that this code snippet
is somewhat correct and kept up-to-date.
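For illustration, a minimal sketch of what such a `.. testcode::` block could
look like, based on the `WriteToParquet` signature in this file (the
`pa.schema` definition, the sample record, and the output prefix are
assumptions for the sketch, not taken from the PR):

    .. testcode::

       import pyarrow as pa

       # Hypothetical schema and output prefix, for illustration only.
       schema = pa.schema([('name', pa.string()), ('age', pa.int64())])
       with beam.Pipeline() as p:
         records = p | beam.Create([{'name': 'foo', 'age': 10}])
         _ = records | WriteToParquet('/tmp/myoutput', schema)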
----------------------------------------------------------------
Issue Time Tracking
-------------------
Worklog Id: (was: 167583)
Time Spent: 10h 10m (was: 10h)
> Parquet IO for Python SDK
> -------------------------
>
> Key: BEAM-4444
> URL: https://issues.apache.org/jira/browse/BEAM-4444
> Project: Beam
> Issue Type: New Feature
> Components: sdk-py-core
> Reporter: Bruce Arctor
> Assignee: Heejong Lee
> Priority: Major
> Time Spent: 10h 10m
> Remaining Estimate: 0h
>
> Add Parquet Support for the Python SDK.