arw2019 commented on a change in pull request #8816: URL: https://github.com/apache/arrow/pull/8816#discussion_r536199751
##########
File path: python/pyarrow/tests/parquet/common.py
##########
@@ -0,0 +1,317 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import io
+import os
+
+import numpy as np
+import pytest
+
+import pyarrow as pa
+from pyarrow.filesystem import LocalFileSystem
+from pyarrow.tests import util
+
+parametrize_legacy_dataset = pytest.mark.parametrize(
+    "use_legacy_dataset",
+    [True, pytest.param(False, marks=pytest.mark.dataset)])
+parametrize_legacy_dataset_not_supported = pytest.mark.parametrize(
+    "use_legacy_dataset", [True, pytest.param(False, marks=pytest.mark.skip)])
+parametrize_legacy_dataset_fixed = pytest.mark.parametrize(
+    "use_legacy_dataset", [pytest.param(True, marks=pytest.mark.xfail),
+                           pytest.param(False, marks=pytest.mark.dataset)])
+
+# Marks all of the tests in this module
+# Ignore these with pytest ... -m 'not parquet'
+pytestmark = pytest.mark.parquet
+
+
+def _write_table(table, path, **kwargs):
+    # So we see the ImportError somewhere
+    import pyarrow.parquet as pq
+    from pyarrow.pandas_compat import _pandas_api
+
+    if _pandas_api.is_data_frame(table):
+        table = pa.Table.from_pandas(table)
+
+    pq.write_table(table, path, **kwargs)
+    return table
+
+
+def _read_table(*args, **kwargs):
+    import pyarrow.parquet as pq
+
+    table = pq.read_table(*args, **kwargs)
+    table.validate(full=True)
+    return table
+
+
+def _roundtrip_table(table, read_table_kwargs=None,
+                     write_table_kwargs=None, use_legacy_dataset=True):
+    read_table_kwargs = read_table_kwargs or {}
+    write_table_kwargs = write_table_kwargs or {}
+
+    writer = pa.BufferOutputStream()
+    _write_table(table, writer, **write_table_kwargs)
+    reader = pa.BufferReader(writer.getvalue())
+    return _read_table(reader, use_legacy_dataset=use_legacy_dataset,
+                       **read_table_kwargs)
+
+
+def _check_roundtrip(table, expected=None, read_table_kwargs=None,
+                     use_legacy_dataset=True, **write_table_kwargs):
+    if expected is None:
+        expected = table
+
+    read_table_kwargs = read_table_kwargs or {}
+
+    # intentionally check twice
+    result = _roundtrip_table(table, read_table_kwargs=read_table_kwargs,
+                              write_table_kwargs=write_table_kwargs,
+                              use_legacy_dataset=use_legacy_dataset)
+    assert result.equals(expected)
+    result = _roundtrip_table(result, read_table_kwargs=read_table_kwargs,
+                              write_table_kwargs=write_table_kwargs,
+                              use_legacy_dataset=use_legacy_dataset)
+    assert result.equals(expected)
+
+
+def _roundtrip_pandas_dataframe(df, write_kwargs, use_legacy_dataset=True):
+    table = pa.Table.from_pandas(df)
+    result = _roundtrip_table(
+        table, write_table_kwargs=write_kwargs,
+        use_legacy_dataset=use_legacy_dataset)
+    return result.to_pandas()
+
+
+def _test_read_common_metadata_files(fs, base_path):
+    import pandas as pd
+
+    import pyarrow.parquet as pq
+
+    N = 100
+    df = pd.DataFrame({
+        'index': np.arange(N),
+        'values': np.random.randn(N)
+    }, columns=['index', 'values'])
+
+    base_path = str(base_path)
+    data_path = os.path.join(base_path, 'data.parquet')
+
+    table = pa.Table.from_pandas(df)
+
+    with fs.open(data_path, 'wb') as f:
+        _write_table(table, f)
+
+    metadata_path = os.path.join(base_path, '_common_metadata')
+    with fs.open(metadata_path, 'wb') as f:
+        pq.write_metadata(table.schema, f)
+
+    dataset = pq.ParquetDataset(base_path, filesystem=fs)
+    assert dataset.common_metadata_path == str(metadata_path)
+
+    with fs.open(data_path) as f:
+        common_schema = pq.read_metadata(f).schema
+    assert dataset.schema.equals(common_schema)
+
+    # handle list of one directory
+    dataset2 = pq.ParquetDataset([base_path], filesystem=fs)
+    assert dataset2.schema.equals(dataset.schema)
+
+
+def _random_integers(size, dtype):
+    # We do not generate integers outside the int64 range
+    platform_int_info = np.iinfo('int_')
+    iinfo = np.iinfo(dtype)
+    return np.random.randint(max(iinfo.min, platform_int_info.min),
+                             min(iinfo.max, platform_int_info.max),
+                             size=size).astype(dtype)
+
+
+def _test_dataframe(size=10000, seed=0):
+    import pandas as pd
+
+    np.random.seed(seed)
+    df = pd.DataFrame({
+        'uint8': _random_integers(size, np.uint8),
+        'uint16': _random_integers(size, np.uint16),
+        'uint32': _random_integers(size, np.uint32),
+        'uint64': _random_integers(size, np.uint64),
+        'int8': _random_integers(size, np.int8),
+        'int16': _random_integers(size, np.int16),
+        'int32': _random_integers(size, np.int32),
+        'int64': _random_integers(size, np.int64),
+        'float32': np.random.randn(size).astype(np.float32),
+        'float64': np.arange(size, dtype=np.float64),
+        'bool': np.random.randn(size) > 0,
+        'strings': [util.rands(10) for i in range(size)],
+        'all_none': [None] * size,
+        'all_none_category': [None] * size
+    })
+
+    # TODO(PARQUET-1015)
+    # df['all_none_category'] = df['all_none_category'].astype('category')
+    return df
+
+
+def _test_write_to_dataset_with_partitions(base_path,

Review comment:
   Also reused in `test_hdfs.py`

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]
