This is an automated email from the ASF dual-hosted git repository.
wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new c522bea ARROW-2555: [C++/Python] Allow Parquet-Arrow writer to
truncate timestamps instead of failing
c522bea is described below
commit c522bea0528591120306a69b3224821d7d4167fd
Author: Eric Conlon <[email protected]>
AuthorDate: Mon Oct 1 06:05:03 2018 -0400
ARROW-2555: [C++/Python] Allow Parquet-Arrow writer to truncate timestamps
instead of failing
Plumbs an option `allow_truncated_timestamps` through ParquetWriter and the
underlying ArrowWriterProperties builder. When this option is set, timestamps
can be coerced from us to ms by silently truncating sub-millisecond precision
instead of raising an error.
Author: Eric Conlon <[email protected]>
Author: Wes McKinney <[email protected]>
Closes #2629 from ejconlon/econlon/pycoerce and squashes the following
commits:
79ee48373 <Wes McKinney> Add documentation for allow_truncated_timestamps,
explicit parameter to write_table
a4497fb2d <Eric Conlon> comment lint
8068dbe48 <Eric Conlon> pylint
7c0fe6faa <Eric Conlon> format
791fe7dcc <Eric Conlon> consistent truncated naming
67f032d26 <Eric Conlon> add python test for truncated timestamps
046ac3791 <Eric Conlon> flake8 line length fixed
56ed9ebbd <Eric Conlon> revert removal of clang format version
cd424b814 <Eric Conlon> fix clang-format issues
48a842cbc <Eric Conlon> cpp test for truncation
2c4bdae87 <Eric Conlon> allow truncation
169b87394 <Eric Conlon> add option to allow truncating timestamps
---
cpp/src/parquet/arrow/arrow-reader-writer-test.cc | 10 +++
cpp/src/parquet/arrow/writer.cc | 13 ++--
cpp/src/parquet/arrow/writer.h | 28 ++++++--
python/doc/source/parquet.rst | 79 ++++++++++++++++-------
python/pyarrow/_parquet.pxd | 2 +
python/pyarrow/_parquet.pyx | 13 +++-
python/pyarrow/parquet.py | 6 ++
python/pyarrow/tests/test_parquet.py | 29 +++++++++
8 files changed, 148 insertions(+), 32 deletions(-)
diff --git a/cpp/src/parquet/arrow/arrow-reader-writer-test.cc
b/cpp/src/parquet/arrow/arrow-reader-writer-test.cc
index 30dbf4a..0866727 100644
--- a/cpp/src/parquet/arrow/arrow-reader-writer-test.cc
+++ b/cpp/src/parquet/arrow/arrow-reader-writer-test.cc
@@ -1391,6 +1391,16 @@ TEST(TestArrowReadWrite, CoerceTimestampsLosePrecision) {
ASSERT_RAISES(Invalid, WriteTable(*t4, ::arrow::default_memory_pool(), sink,
10,
default_writer_properties(),
coerce_millis));
+ // OK to lose precision if we explicitly allow it
+ auto allow_truncation = (ArrowWriterProperties::Builder()
+ .coerce_timestamps(TimeUnit::MILLI)
+ ->allow_truncated_timestamps()
+ ->build());
+ ASSERT_OK_NO_THROW(WriteTable(*t3, ::arrow::default_memory_pool(), sink, 10,
+ default_writer_properties(),
allow_truncation));
+ ASSERT_OK_NO_THROW(WriteTable(*t4, ::arrow::default_memory_pool(), sink, 10,
+ default_writer_properties(),
allow_truncation));
+
// OK to write micros to micros
auto coerce_micros =
(ArrowWriterProperties::Builder().coerce_timestamps(TimeUnit::MICRO)->build());
diff --git a/cpp/src/parquet/arrow/writer.cc b/cpp/src/parquet/arrow/writer.cc
index 9247b84..923f132 100644
--- a/cpp/src/parquet/arrow/writer.cc
+++ b/cpp/src/parquet/arrow/writer.cc
@@ -366,8 +366,9 @@ class ArrowColumnWriter {
Status WriteTimestamps(const Array& data, int64_t num_levels, const int16_t*
def_levels,
const int16_t* rep_levels);
- Status WriteTimestampsCoerce(const Array& data, int64_t num_levels,
- const int16_t* def_levels, const int16_t*
rep_levels);
+ Status WriteTimestampsCoerce(const bool truncated_timestamps_allowed, const
Array& data,
+ int64_t num_levels, const int16_t* def_levels,
+ const int16_t* rep_levels);
template <typename ParquetType, typename ArrowType>
Status WriteNonNullableBatch(const ArrowType& type, int64_t num_values,
@@ -626,7 +627,8 @@ Status ArrowColumnWriter::WriteTimestamps(const Array&
values, int64_t num_level
// Casting is required. This covers several cases
// * Nanoseconds -> cast to microseconds
// * coerce_timestamps_enabled_, cast all timestamps to requested unit
- return WriteTimestampsCoerce(values, num_levels, def_levels, rep_levels);
+ return
WriteTimestampsCoerce(ctx_->properties->truncated_timestamps_allowed(), values,
+ num_levels, def_levels, rep_levels);
} else {
// No casting of timestamps is required, take the fast path
return TypedWriteBatch<Int64Type, ::arrow::TimestampType>(values,
num_levels,
@@ -634,7 +636,8 @@ Status ArrowColumnWriter::WriteTimestamps(const Array&
values, int64_t num_level
}
}
-Status ArrowColumnWriter::WriteTimestampsCoerce(const Array& array, int64_t
num_levels,
+Status ArrowColumnWriter::WriteTimestampsCoerce(const bool
truncated_timestamps_allowed,
+ const Array& array, int64_t
num_levels,
const int16_t* def_levels,
const int16_t* rep_levels) {
int64_t* buffer;
@@ -652,7 +655,7 @@ Status ArrowColumnWriter::WriteTimestampsCoerce(const
Array& array, int64_t num_
auto DivideBy = [&](const int64_t factor) {
for (int64_t i = 0; i < array.length(); i++) {
- if (!data.IsNull(i) && (values[i] % factor != 0)) {
+ if (!truncated_timestamps_allowed && !data.IsNull(i) && (values[i] %
factor != 0)) {
std::stringstream ss;
ss << "Casting from " << type.ToString() << " to " <<
target_type->ToString()
<< " would lose data: " << values[i];
diff --git a/cpp/src/parquet/arrow/writer.h b/cpp/src/parquet/arrow/writer.h
index ad6f1d5..7e4b228 100644
--- a/cpp/src/parquet/arrow/writer.h
+++ b/cpp/src/parquet/arrow/writer.h
@@ -44,7 +44,10 @@ class PARQUET_EXPORT ArrowWriterProperties {
public:
class Builder {
public:
- Builder() : write_nanos_as_int96_(false),
coerce_timestamps_enabled_(false) {}
+ Builder()
+ : write_nanos_as_int96_(false),
+ coerce_timestamps_enabled_(false),
+ truncated_timestamps_allowed_(false) {}
virtual ~Builder() {}
Builder* disable_deprecated_int96_timestamps() {
@@ -63,9 +66,20 @@ class PARQUET_EXPORT ArrowWriterProperties {
return this;
}
+ Builder* allow_truncated_timestamps() {
+ truncated_timestamps_allowed_ = true;
+ return this;
+ }
+
+ Builder* disallow_truncated_timestamps() {
+ truncated_timestamps_allowed_ = false;
+ return this;
+ }
+
std::shared_ptr<ArrowWriterProperties> build() {
return std::shared_ptr<ArrowWriterProperties>(new ArrowWriterProperties(
- write_nanos_as_int96_, coerce_timestamps_enabled_,
coerce_timestamps_unit_));
+ write_nanos_as_int96_, coerce_timestamps_enabled_,
coerce_timestamps_unit_,
+ truncated_timestamps_allowed_));
}
private:
@@ -73,6 +87,7 @@ class PARQUET_EXPORT ArrowWriterProperties {
bool coerce_timestamps_enabled_;
::arrow::TimeUnit::type coerce_timestamps_unit_;
+ bool truncated_timestamps_allowed_;
};
bool support_deprecated_int96_timestamps() const { return
write_nanos_as_int96_; }
@@ -82,17 +97,22 @@ class PARQUET_EXPORT ArrowWriterProperties {
return coerce_timestamps_unit_;
}
+ bool truncated_timestamps_allowed() const { return
truncated_timestamps_allowed_; }
+
private:
explicit ArrowWriterProperties(bool write_nanos_as_int96,
bool coerce_timestamps_enabled,
- ::arrow::TimeUnit::type
coerce_timestamps_unit)
+ ::arrow::TimeUnit::type
coerce_timestamps_unit,
+ bool truncated_timestamps_allowed)
: write_nanos_as_int96_(write_nanos_as_int96),
coerce_timestamps_enabled_(coerce_timestamps_enabled),
- coerce_timestamps_unit_(coerce_timestamps_unit) {}
+ coerce_timestamps_unit_(coerce_timestamps_unit),
+ truncated_timestamps_allowed_(truncated_timestamps_allowed) {}
const bool write_nanos_as_int96_;
const bool coerce_timestamps_enabled_;
const ::arrow::TimeUnit::type coerce_timestamps_unit_;
+ const bool truncated_timestamps_allowed_;
};
std::shared_ptr<ArrowWriterProperties> PARQUET_EXPORT
default_arrow_writer_properties();
diff --git a/python/doc/source/parquet.rst b/python/doc/source/parquet.rst
index dfbb318..5422ebe 100644
--- a/python/doc/source/parquet.rst
+++ b/python/doc/source/parquet.rst
@@ -47,10 +47,10 @@ support bundled:
import pyarrow.parquet as pq
-If you are building ``pyarrow`` from source, you must also build `parquet-cpp
-<http://github.com/apache/parquet-cpp>`_ and enable the Parquet extensions when
-building ``pyarrow``. See the :ref:`Development <development>` page for more
-details.
+If you are building ``pyarrow`` from source, you must use
+``-DARROW_PARQUET=ON`` when compiling the C++ libraries and enable the Parquet
+extensions when building ``pyarrow``. See the :ref:`Development <development>`
+page for more details.
Reading and Writing Single Files
--------------------------------
@@ -190,6 +190,30 @@ Alternatively python ``with`` syntax can also be use:
!rm example2.parquet
!rm example3.parquet
+Data Type Handling
+------------------
+
+Storing timestamps
+~~~~~~~~~~~~~~~~~~
+
+Some Parquet readers may only support timestamps stored in millisecond
+(``'ms'``) or microsecond (``'us'``) resolution. Since pandas uses nanoseconds
+to represent timestamps, this can occasionally be a nuisance. We provide the
+``coerce_timestamps`` option to allow you to select the desired resolution:
+
+.. code-block:: python
+
+ pq.write_table(table, where, coerce_timestamps='ms')
+
+If a cast to a lower resolution value may result in a loss of data, by default
+an exception will be raised. This can be suppressed by passing
+``allow_truncated_timestamps=True``:
+
+.. code-block:: python
+
+ pq.write_table(table, where, coerce_timestamps='ms',
+ allow_truncated_timestamps=True)
+
Compression, Encoding, and File Compatibility
---------------------------------------------
@@ -256,29 +280,35 @@ A dataset partitioned by year and month may look like on
disk:
Writing to Partitioned Datasets
------------------------------------------------
-You can write a partitioned dataset for any ``pyarrow`` file system that is a
file-store (e.g. local, HDFS, S3). The
-default behaviour when no filesystem is added is to use the local filesystem.
+You can write a partitioned dataset for any ``pyarrow`` file system that is a
+file-store (e.g. local, HDFS, S3). The default behaviour when no filesystem is
+added is to use the local filesystem.
.. code-block:: python
# Local dataset write
- pq.write_to_dataset(table, root_path='dataset_name', partition_cols=['one',
'two'])
+ pq.write_to_dataset(table, root_path='dataset_name',
+ partition_cols=['one', 'two'])
-The root path in this case specifies the parent directory to which data will
be saved. The partition columns are the
-column names by which to partition the dataset. Columns are partitioned in the
order they are given. The partition
+The root path in this case specifies the parent directory to which data will be
+saved. The partition columns are the column names by which to partition the
+dataset. Columns are partitioned in the order they are given. The partition
splits are determined by the unique values in the partition columns.
-To use another filesystem you only need to add the filesystem parameter, the
individual table writes are wrapped
-using ``with`` statements so the ``pq.write_to_dataset`` function does not
need to be.
+To use another filesystem you only need to add the filesystem parameter, the
+individual table writes are wrapped using ``with`` statements so the
+``pq.write_to_dataset`` function does not need to be.
.. code-block:: python
# Remote file-system example
fs = pa.hdfs.connect(host, port, user=user, kerb_ticket=ticket_cache_path)
- pq.write_to_dataset(table, root_path='dataset_name', partition_cols=['one',
'two'], filesystem=fs)
+ pq.write_to_dataset(table, root_path='dataset_name',
+ partition_cols=['one', 'two'], filesystem=fs)
-Compatibility Note: if using ``pq.write_to_dataset`` to create a table that
will then be used by HIVE then partition
-column values must be compatible with the allowed character set of the HIVE
version you are running.
+Compatibility Note: if using ``pq.write_to_dataset`` to create a table that
+will then be used by HIVE then partition column values must be compatible with
+the allowed character set of the HIVE version you are running.
Reading from Partitioned Datasets
------------------------------------------------
@@ -292,17 +322,20 @@ such as those produced by Hive:
dataset = pq.ParquetDataset('dataset_name/')
table = dataset.read()
-You can also use the convenience function ``read_table`` exposed by
``pyarrow.parquet``
-that avoids the need for an additional Dataset object creation step.
+You can also use the convenience function ``read_table`` exposed by
+``pyarrow.parquet`` that avoids the need for an additional Dataset object
+creation step.
.. code-block:: python
table = pq.read_table('dataset_name')
-Note: the partition columns in the original table will have their types
converted to Arrow dictionary types
-(pandas categorical) on load. Ordering of partition columns is not preserved
through the save/load process. If reading
-from a remote filesystem into a pandas dataframe you may need to run
``sort_index`` to maintain row ordering
-(as long as the ``preserve_index`` option was enabled on write).
+Note: the partition columns in the original table will have their types
+converted to Arrow dictionary types (pandas categorical) on load. Ordering of
+partition columns is not preserved through the save/load process. If reading
+from a remote filesystem into a pandas dataframe you may need to run
+``sort_index`` to maintain row ordering (as long as the ``preserve_index``
+option was enabled on write).
Using with Spark
----------------
@@ -362,6 +395,8 @@ Dependencies:
Notes:
-* The ``account_key`` can be found under ``Settings -> Access keys`` in the
Microsoft Azure portal for a given container
-* The code above works for a container with private access, Lease State =
Available, Lease Status = Unlocked
+* The ``account_key`` can be found under ``Settings -> Access keys`` in the
+ Microsoft Azure portal for a given container
+* The code above works for a container with private access, Lease State =
+ Available, Lease Status = Unlocked
* The parquet file was Blob Type = Block blob
diff --git a/python/pyarrow/_parquet.pxd b/python/pyarrow/_parquet.pxd
index b2f0a46..575ffbc 100644
--- a/python/pyarrow/_parquet.pxd
+++ b/python/pyarrow/_parquet.pxd
@@ -323,5 +323,7 @@ cdef extern from "parquet/arrow/writer.h" namespace
"parquet::arrow" nogil:
Builder* disable_deprecated_int96_timestamps()
Builder* enable_deprecated_int96_timestamps()
Builder* coerce_timestamps(TimeUnit unit)
+ Builder* allow_truncated_timestamps()
+ Builder* disallow_truncated_timestamps()
shared_ptr[ArrowWriterProperties] build()
c_bool support_deprecated_int96_timestamps()
diff --git a/python/pyarrow/_parquet.pyx b/python/pyarrow/_parquet.pyx
index 937e70a..5ea5c18 100644
--- a/python/pyarrow/_parquet.pyx
+++ b/python/pyarrow/_parquet.pyx
@@ -804,6 +804,7 @@ cdef class ParquetWriter:
object use_dictionary
object use_deprecated_int96_timestamps
object coerce_timestamps
+ object allow_truncated_timestamps
object compression
object version
int row_group_size
@@ -812,7 +813,8 @@ cdef class ParquetWriter:
compression=None, version=None,
MemoryPool memory_pool=None,
use_deprecated_int96_timestamps=False,
- coerce_timestamps=None):
+ coerce_timestamps=None,
+ allow_truncated_timestamps=False):
cdef:
shared_ptr[WriterProperties] properties
c_string c_where
@@ -835,6 +837,7 @@ cdef class ParquetWriter:
self.version = version
self.use_deprecated_int96_timestamps = use_deprecated_int96_timestamps
self.coerce_timestamps = coerce_timestamps
+ self.allow_truncated_timestamps = allow_truncated_timestamps
cdef WriterProperties.Builder properties_builder
self._set_version(&properties_builder)
@@ -845,6 +848,7 @@ cdef class ParquetWriter:
cdef ArrowWriterProperties.Builder arrow_properties_builder
self._set_int96_support(&arrow_properties_builder)
self._set_coerce_timestamps(&arrow_properties_builder)
+ self._set_allow_truncated_timestamps(&arrow_properties_builder)
arrow_properties = arrow_properties_builder.build()
pool = maybe_unbox_memory_pool(memory_pool)
@@ -870,6 +874,13 @@ cdef class ParquetWriter:
raise ValueError('Invalid value for coerce_timestamps: {0}'
.format(self.coerce_timestamps))
+ cdef void _set_allow_truncated_timestamps(
+ self, ArrowWriterProperties.Builder* props):
+ if self.allow_truncated_timestamps:
+ props.allow_truncated_timestamps()
+ else:
+ props.disallow_truncated_timestamps()
+
cdef void _set_version(self, WriterProperties.Builder* props):
if self.version is not None:
if self.version == "1.0":
diff --git a/python/pyarrow/parquet.py b/python/pyarrow/parquet.py
index 9fa97b4..5fa9132 100644
--- a/python/pyarrow/parquet.py
+++ b/python/pyarrow/parquet.py
@@ -246,6 +246,10 @@ use_deprecated_int96_timestamps : boolean, default None
coerce_timestamps : string, default None
Cast timestamps a particular resolution.
Valid values: {None, 'ms', 'us'}
+allow_truncated_timestamps : boolean, default False
+ Allow loss of data when coercing timestamps to a particular
+ resolution. E.g. if microsecond or nanosecond data is lost when coercing to
+ 'ms', do not raise an exception
compression : str or dict
Specify the compression codec, either on a general basis or per-column.
Valid values: {'NONE', 'SNAPPY', 'GZIP', 'LZO', 'BROTLI', 'LZ4', 'ZSTD'}
@@ -1049,6 +1053,7 @@ def write_table(table, where, row_group_size=None,
version='1.0',
use_dictionary=True, compression='snappy',
use_deprecated_int96_timestamps=None,
coerce_timestamps=None,
+ allow_truncated_timestamps=False,
flavor=None, **kwargs):
row_group_size = kwargs.pop('chunk_size', row_group_size)
use_int96 = use_deprecated_int96_timestamps
@@ -1059,6 +1064,7 @@ def write_table(table, where, row_group_size=None,
version='1.0',
flavor=flavor,
use_dictionary=use_dictionary,
coerce_timestamps=coerce_timestamps,
+ allow_truncated_timestamps=allow_truncated_timestamps,
compression=compression,
use_deprecated_int96_timestamps=use_int96,
**kwargs) as writer:
diff --git a/python/pyarrow/tests/test_parquet.py
b/python/pyarrow/tests/test_parquet.py
index 4e50e64..4f54366 100644
--- a/python/pyarrow/tests/test_parquet.py
+++ b/python/pyarrow/tests/test_parquet.py
@@ -767,6 +767,35 @@ def test_coerce_timestamps(tempdir):
coerce_timestamps='unknown')
+def test_coerce_timestamps_truncated(tempdir):
+ """
+ ARROW-2555: Test that we can truncate timestamps when coercing if
+ explicitly allowed.
+ """
+ dt_us = datetime.datetime(year=2017, month=1, day=1, hour=1, minute=1,
+ second=1, microsecond=1)
+ dt_ms = datetime.datetime(year=2017, month=1, day=1, hour=1, minute=1,
+ second=1)
+
+ fields_us = [pa.field('datetime64', pa.timestamp('us'))]
+ arrays_us = {'datetime64': [dt_us, dt_ms]}
+
+ df_us = pd.DataFrame(arrays_us)
+ schema_us = pa.schema(fields_us)
+
+ filename = tempdir / 'pandas_truncated.parquet'
+ table_us = pa.Table.from_pandas(df_us, schema=schema_us)
+
+ _write_table(table_us, filename, version="2.0", coerce_timestamps='ms',
+ allow_truncated_timestamps=True)
+ table_ms = _read_table(filename)
+ df_ms = table_ms.to_pandas()
+
+ arrays_expected = {'datetime64': [dt_ms, dt_ms]}
+ df_expected = pd.DataFrame(arrays_expected)
+ tm.assert_frame_equal(df_expected, df_ms)
+
+
def test_column_of_lists(tempdir):
df, schema = dataframe_with_lists()