This is an automated email from the ASF dual-hosted git repository.

wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new c522bea  ARROW-2555: [C++/Python] Allow Parquet-Arrow writer to 
truncate timestamps instead of failing
c522bea is described below

commit c522bea0528591120306a69b3224821d7d4167fd
Author: Eric Conlon <[email protected]>
AuthorDate: Mon Oct 1 06:05:03 2018 -0400

    ARROW-2555: [C++/Python] Allow Parquet-Arrow writer to truncate timestamps 
instead of failing
    
    Plumbs an option `allow_truncated_timestamps` through ParquetWriter and the 
like.  If this option is set, it allows timestamps to be coerced from us to ms 
without complaining or crashing.
    
    Author: Eric Conlon <[email protected]>
    Author: Wes McKinney <[email protected]>
    
    Closes #2629 from ejconlon/econlon/pycoerce and squashes the following 
commits:
    
    79ee48373 <Wes McKinney> Add documentation for allow_truncated_timestamps, 
explicit parameter to write_table
    a4497fb2d <Eric Conlon> comment lint
    8068dbe48 <Eric Conlon> pylint
    7c0fe6faa <Eric Conlon> format
    791fe7dcc <Eric Conlon> consistent truncated naming
    67f032d26 <Eric Conlon> add python test for truncated timestamps
    046ac3791 <Eric Conlon> flake8 line length fixed
    56ed9ebbd <Eric Conlon> revert removal of clang format version
    cd424b814 <Eric Conlon> fix clang-format issues
    48a842cbc <Eric Conlon> cpp test for truncation
    2c4bdae87 <Eric Conlon> allow truncation
    169b87394 <Eric Conlon> add option to allow truncating timestamps
---
 cpp/src/parquet/arrow/arrow-reader-writer-test.cc | 10 +++
 cpp/src/parquet/arrow/writer.cc                   | 13 ++--
 cpp/src/parquet/arrow/writer.h                    | 28 ++++++--
 python/doc/source/parquet.rst                     | 79 ++++++++++++++++-------
 python/pyarrow/_parquet.pxd                       |  2 +
 python/pyarrow/_parquet.pyx                       | 13 +++-
 python/pyarrow/parquet.py                         |  6 ++
 python/pyarrow/tests/test_parquet.py              | 29 +++++++++
 8 files changed, 148 insertions(+), 32 deletions(-)

diff --git a/cpp/src/parquet/arrow/arrow-reader-writer-test.cc 
b/cpp/src/parquet/arrow/arrow-reader-writer-test.cc
index 30dbf4a..0866727 100644
--- a/cpp/src/parquet/arrow/arrow-reader-writer-test.cc
+++ b/cpp/src/parquet/arrow/arrow-reader-writer-test.cc
@@ -1391,6 +1391,16 @@ TEST(TestArrowReadWrite, CoerceTimestampsLosePrecision) {
   ASSERT_RAISES(Invalid, WriteTable(*t4, ::arrow::default_memory_pool(), sink, 
10,
                                     default_writer_properties(), 
coerce_millis));
 
+  // OK to lose precision if we explicitly allow it
+  auto allow_truncation = (ArrowWriterProperties::Builder()
+                               .coerce_timestamps(TimeUnit::MILLI)
+                               ->allow_truncated_timestamps()
+                               ->build());
+  ASSERT_OK_NO_THROW(WriteTable(*t3, ::arrow::default_memory_pool(), sink, 10,
+                                default_writer_properties(), 
allow_truncation));
+  ASSERT_OK_NO_THROW(WriteTable(*t4, ::arrow::default_memory_pool(), sink, 10,
+                                default_writer_properties(), 
allow_truncation));
+
   // OK to write micros to micros
   auto coerce_micros =
       
(ArrowWriterProperties::Builder().coerce_timestamps(TimeUnit::MICRO)->build());
diff --git a/cpp/src/parquet/arrow/writer.cc b/cpp/src/parquet/arrow/writer.cc
index 9247b84..923f132 100644
--- a/cpp/src/parquet/arrow/writer.cc
+++ b/cpp/src/parquet/arrow/writer.cc
@@ -366,8 +366,9 @@ class ArrowColumnWriter {
   Status WriteTimestamps(const Array& data, int64_t num_levels, const int16_t* 
def_levels,
                          const int16_t* rep_levels);
 
-  Status WriteTimestampsCoerce(const Array& data, int64_t num_levels,
-                               const int16_t* def_levels, const int16_t* 
rep_levels);
+  Status WriteTimestampsCoerce(const bool truncated_timestamps_allowed, const 
Array& data,
+                               int64_t num_levels, const int16_t* def_levels,
+                               const int16_t* rep_levels);
 
   template <typename ParquetType, typename ArrowType>
   Status WriteNonNullableBatch(const ArrowType& type, int64_t num_values,
@@ -626,7 +627,8 @@ Status ArrowColumnWriter::WriteTimestamps(const Array& 
values, int64_t num_level
     // Casting is required. This covers several cases
     // * Nanoseconds -> cast to microseconds
     // * coerce_timestamps_enabled_, cast all timestamps to requested unit
-    return WriteTimestampsCoerce(values, num_levels, def_levels, rep_levels);
+    return 
WriteTimestampsCoerce(ctx_->properties->truncated_timestamps_allowed(), values,
+                                 num_levels, def_levels, rep_levels);
   } else {
     // No casting of timestamps is required, take the fast path
     return TypedWriteBatch<Int64Type, ::arrow::TimestampType>(values, 
num_levels,
@@ -634,7 +636,8 @@ Status ArrowColumnWriter::WriteTimestamps(const Array& 
values, int64_t num_level
   }
 }
 
-Status ArrowColumnWriter::WriteTimestampsCoerce(const Array& array, int64_t 
num_levels,
+Status ArrowColumnWriter::WriteTimestampsCoerce(const bool 
truncated_timestamps_allowed,
+                                                const Array& array, int64_t 
num_levels,
                                                 const int16_t* def_levels,
                                                 const int16_t* rep_levels) {
   int64_t* buffer;
@@ -652,7 +655,7 @@ Status ArrowColumnWriter::WriteTimestampsCoerce(const 
Array& array, int64_t num_
 
   auto DivideBy = [&](const int64_t factor) {
     for (int64_t i = 0; i < array.length(); i++) {
-      if (!data.IsNull(i) && (values[i] % factor != 0)) {
+      if (!truncated_timestamps_allowed && !data.IsNull(i) && (values[i] % 
factor != 0)) {
         std::stringstream ss;
         ss << "Casting from " << type.ToString() << " to " << 
target_type->ToString()
            << " would lose data: " << values[i];
diff --git a/cpp/src/parquet/arrow/writer.h b/cpp/src/parquet/arrow/writer.h
index ad6f1d5..7e4b228 100644
--- a/cpp/src/parquet/arrow/writer.h
+++ b/cpp/src/parquet/arrow/writer.h
@@ -44,7 +44,10 @@ class PARQUET_EXPORT ArrowWriterProperties {
  public:
   class Builder {
    public:
-    Builder() : write_nanos_as_int96_(false), 
coerce_timestamps_enabled_(false) {}
+    Builder()
+        : write_nanos_as_int96_(false),
+          coerce_timestamps_enabled_(false),
+          truncated_timestamps_allowed_(false) {}
     virtual ~Builder() {}
 
     Builder* disable_deprecated_int96_timestamps() {
@@ -63,9 +66,20 @@ class PARQUET_EXPORT ArrowWriterProperties {
       return this;
     }
 
+    Builder* allow_truncated_timestamps() {
+      truncated_timestamps_allowed_ = true;
+      return this;
+    }
+
+    Builder* disallow_truncated_timestamps() {
+      truncated_timestamps_allowed_ = false;
+      return this;
+    }
+
     std::shared_ptr<ArrowWriterProperties> build() {
       return std::shared_ptr<ArrowWriterProperties>(new ArrowWriterProperties(
-          write_nanos_as_int96_, coerce_timestamps_enabled_, 
coerce_timestamps_unit_));
+          write_nanos_as_int96_, coerce_timestamps_enabled_, 
coerce_timestamps_unit_,
+          truncated_timestamps_allowed_));
     }
 
    private:
@@ -73,6 +87,7 @@ class PARQUET_EXPORT ArrowWriterProperties {
 
     bool coerce_timestamps_enabled_;
     ::arrow::TimeUnit::type coerce_timestamps_unit_;
+    bool truncated_timestamps_allowed_;
   };
 
   bool support_deprecated_int96_timestamps() const { return 
write_nanos_as_int96_; }
@@ -82,17 +97,22 @@ class PARQUET_EXPORT ArrowWriterProperties {
     return coerce_timestamps_unit_;
   }
 
+  bool truncated_timestamps_allowed() const { return 
truncated_timestamps_allowed_; }
+
  private:
   explicit ArrowWriterProperties(bool write_nanos_as_int96,
                                  bool coerce_timestamps_enabled,
-                                 ::arrow::TimeUnit::type 
coerce_timestamps_unit)
+                                 ::arrow::TimeUnit::type 
coerce_timestamps_unit,
+                                 bool truncated_timestamps_allowed)
       : write_nanos_as_int96_(write_nanos_as_int96),
         coerce_timestamps_enabled_(coerce_timestamps_enabled),
-        coerce_timestamps_unit_(coerce_timestamps_unit) {}
+        coerce_timestamps_unit_(coerce_timestamps_unit),
+        truncated_timestamps_allowed_(truncated_timestamps_allowed) {}
 
   const bool write_nanos_as_int96_;
   const bool coerce_timestamps_enabled_;
   const ::arrow::TimeUnit::type coerce_timestamps_unit_;
+  const bool truncated_timestamps_allowed_;
 };
 
 std::shared_ptr<ArrowWriterProperties> PARQUET_EXPORT 
default_arrow_writer_properties();
diff --git a/python/doc/source/parquet.rst b/python/doc/source/parquet.rst
index dfbb318..5422ebe 100644
--- a/python/doc/source/parquet.rst
+++ b/python/doc/source/parquet.rst
@@ -47,10 +47,10 @@ support bundled:
 
    import pyarrow.parquet as pq
 
-If you are building ``pyarrow`` from source, you must also build `parquet-cpp
-<http://github.com/apache/parquet-cpp>`_ and enable the Parquet extensions when
-building ``pyarrow``. See the :ref:`Development <development>` page for more
-details.
+If you are building ``pyarrow`` from source, you must use
+``-DARROW_PARQUET=ON`` when compiling the C++ libraries and enable the Parquet
+extensions when building ``pyarrow``. See the :ref:`Development <development>`
+page for more details.
 
 Reading and Writing Single Files
 --------------------------------
@@ -190,6 +190,30 @@ Alternatively python ``with`` syntax can also be used:
    !rm example2.parquet
    !rm example3.parquet
 
+Data Type Handling
+------------------
+
+Storing timestamps
+~~~~~~~~~~~~~~~~~~
+
+Some Parquet readers may only support timestamps stored in millisecond
+(``'ms'``) or microsecond (``'us'``) resolution. Since pandas uses nanoseconds
+to represent timestamps, this can occasionally be a nuisance. We provide the
+``coerce_timestamps`` option to allow you to select the desired resolution:
+
+.. code-block:: python
+
+   pq.write_table(table, where, coerce_timestamps='ms')
+
+If a cast to a lower resolution value may result in a loss of data, by default
+an exception will be raised. This can be suppressed by passing
+``allow_truncated_timestamps=True``:
+
+.. code-block:: python
+
+   pq.write_table(table, where, coerce_timestamps='ms',
+                  allow_truncated_timestamps=True)
+
 Compression, Encoding, and File Compatibility
 ---------------------------------------------
 
@@ -256,29 +280,35 @@ A dataset partitioned by year and month may look like on 
disk:
 Writing to Partitioned Datasets
 ------------------------------------------------
 
-You can write a partitioned dataset for any ``pyarrow`` file system that is a 
file-store (e.g. local, HDFS, S3). The
-default behaviour when no filesystem is added is to use the local filesystem.
+You can write a partitioned dataset for any ``pyarrow`` file system that is a
+file-store (e.g. local, HDFS, S3). The default behaviour when no filesystem is
+added is to use the local filesystem.
 
 .. code-block:: python
 
    # Local dataset write
-   pq.write_to_dataset(table, root_path='dataset_name', partition_cols=['one', 
'two'])
+   pq.write_to_dataset(table, root_path='dataset_name',
+                       partition_cols=['one', 'two'])
 
-The root path in this case specifies the parent directory to which data will 
be saved. The partition columns are the
-column names by which to partition the dataset. Columns are partitioned in the 
order they are given. The partition
+The root path in this case specifies the parent directory to which data will be
+saved. The partition columns are the column names by which to partition the
+dataset. Columns are partitioned in the order they are given. The partition
 splits are determined by the unique values in the partition columns.
 
-To use another filesystem you only need to add the filesystem parameter, the 
individual table writes are wrapped
-using ``with`` statements so the ``pq.write_to_dataset`` function does not 
need to be.
+To use another filesystem you only need to add the filesystem parameter, the
+individual table writes are wrapped using ``with`` statements so the
+``pq.write_to_dataset`` function does not need to be.
 
 .. code-block:: python
 
    # Remote file-system example
    fs = pa.hdfs.connect(host, port, user=user, kerb_ticket=ticket_cache_path)
-   pq.write_to_dataset(table, root_path='dataset_name', partition_cols=['one', 
'two'], filesystem=fs)
+   pq.write_to_dataset(table, root_path='dataset_name',
+                       partition_cols=['one', 'two'], filesystem=fs)
 
-Compatibility Note: if using ``pq.write_to_dataset`` to create a table that 
will then be used by HIVE then partition
-column values must be compatible with the allowed character set of the HIVE 
version you are running.
+Compatibility Note: if using ``pq.write_to_dataset`` to create a table that
+will then be used by HIVE then partition column values must be compatible with
+the allowed character set of the HIVE version you are running.
 
 Reading from Partitioned Datasets
 ------------------------------------------------
@@ -292,17 +322,20 @@ such as those produced by Hive:
    dataset = pq.ParquetDataset('dataset_name/')
    table = dataset.read()
 
-You can also use the convenience function ``read_table`` exposed by 
``pyarrow.parquet``
-that avoids the need for an additional Dataset object creation step.
+You can also use the convenience function ``read_table`` exposed by
+``pyarrow.parquet`` that avoids the need for an additional Dataset object
+creation step.
 
 .. code-block:: python
 
    table = pq.read_table('dataset_name')
 
-Note: the partition columns in the original table will have their types 
converted to Arrow dictionary types
-(pandas categorical) on load. Ordering of partition columns is not preserved 
through the save/load process. If reading
-from a remote filesystem into a pandas dataframe you may need to run 
``sort_index`` to maintain row ordering
-(as long as the ``preserve_index`` option was enabled on write).
+Note: the partition columns in the original table will have their types
+converted to Arrow dictionary types (pandas categorical) on load. Ordering of
+partition columns is not preserved through the save/load process. If reading
+from a remote filesystem into a pandas dataframe you may need to run
+``sort_index`` to maintain row ordering (as long as the ``preserve_index``
+option was enabled on write).
 
 Using with Spark
 ----------------
@@ -362,6 +395,8 @@ Dependencies:
 
 Notes:
 
-* The ``account_key`` can be found under ``Settings -> Access keys`` in the 
Microsoft Azure portal for a given container
-* The code above works for a container with private access, Lease State = 
Available, Lease Status = Unlocked
+* The ``account_key`` can be found under ``Settings -> Access keys`` in the
+  Microsoft Azure portal for a given container
+* The code above works for a container with private access, Lease State =
+  Available, Lease Status = Unlocked
 * The parquet file was Blob Type = Block blob
diff --git a/python/pyarrow/_parquet.pxd b/python/pyarrow/_parquet.pxd
index b2f0a46..575ffbc 100644
--- a/python/pyarrow/_parquet.pxd
+++ b/python/pyarrow/_parquet.pxd
@@ -323,5 +323,7 @@ cdef extern from "parquet/arrow/writer.h" namespace 
"parquet::arrow" nogil:
             Builder* disable_deprecated_int96_timestamps()
             Builder* enable_deprecated_int96_timestamps()
             Builder* coerce_timestamps(TimeUnit unit)
+            Builder* allow_truncated_timestamps()
+            Builder* disallow_truncated_timestamps()
             shared_ptr[ArrowWriterProperties] build()
         c_bool support_deprecated_int96_timestamps()
diff --git a/python/pyarrow/_parquet.pyx b/python/pyarrow/_parquet.pyx
index 937e70a..5ea5c18 100644
--- a/python/pyarrow/_parquet.pyx
+++ b/python/pyarrow/_parquet.pyx
@@ -804,6 +804,7 @@ cdef class ParquetWriter:
         object use_dictionary
         object use_deprecated_int96_timestamps
         object coerce_timestamps
+        object allow_truncated_timestamps
         object compression
         object version
         int row_group_size
@@ -812,7 +813,8 @@ cdef class ParquetWriter:
                   compression=None, version=None,
                   MemoryPool memory_pool=None,
                   use_deprecated_int96_timestamps=False,
-                  coerce_timestamps=None):
+                  coerce_timestamps=None,
+                  allow_truncated_timestamps=False):
         cdef:
             shared_ptr[WriterProperties] properties
             c_string c_where
@@ -835,6 +837,7 @@ cdef class ParquetWriter:
         self.version = version
         self.use_deprecated_int96_timestamps = use_deprecated_int96_timestamps
         self.coerce_timestamps = coerce_timestamps
+        self.allow_truncated_timestamps = allow_truncated_timestamps
 
         cdef WriterProperties.Builder properties_builder
         self._set_version(&properties_builder)
@@ -845,6 +848,7 @@ cdef class ParquetWriter:
         cdef ArrowWriterProperties.Builder arrow_properties_builder
         self._set_int96_support(&arrow_properties_builder)
         self._set_coerce_timestamps(&arrow_properties_builder)
+        self._set_allow_truncated_timestamps(&arrow_properties_builder)
         arrow_properties = arrow_properties_builder.build()
 
         pool = maybe_unbox_memory_pool(memory_pool)
@@ -870,6 +874,13 @@ cdef class ParquetWriter:
             raise ValueError('Invalid value for coerce_timestamps: {0}'
                              .format(self.coerce_timestamps))
 
+    cdef void _set_allow_truncated_timestamps(
+            self, ArrowWriterProperties.Builder* props):
+        if self.allow_truncated_timestamps:
+            props.allow_truncated_timestamps()
+        else:
+            props.disallow_truncated_timestamps()
+
     cdef void _set_version(self, WriterProperties.Builder* props):
         if self.version is not None:
             if self.version == "1.0":
diff --git a/python/pyarrow/parquet.py b/python/pyarrow/parquet.py
index 9fa97b4..5fa9132 100644
--- a/python/pyarrow/parquet.py
+++ b/python/pyarrow/parquet.py
@@ -246,6 +246,10 @@ use_deprecated_int96_timestamps : boolean, default None
 coerce_timestamps : string, default None
     Cast timestamps to a particular resolution.
     Valid values: {None, 'ms', 'us'}
+allow_truncated_timestamps : boolean, default False
+    Allow loss of data when coercing timestamps to a particular
+    resolution. E.g. if microsecond or nanosecond data is lost when coercing to
+    'ms', do not raise an exception
 compression : str or dict
     Specify the compression codec, either on a general basis or per-column.
     Valid values: {'NONE', 'SNAPPY', 'GZIP', 'LZO', 'BROTLI', 'LZ4', 'ZSTD'}
@@ -1049,6 +1053,7 @@ def write_table(table, where, row_group_size=None, 
version='1.0',
                 use_dictionary=True, compression='snappy',
                 use_deprecated_int96_timestamps=None,
                 coerce_timestamps=None,
+                allow_truncated_timestamps=False,
                 flavor=None, **kwargs):
     row_group_size = kwargs.pop('chunk_size', row_group_size)
     use_int96 = use_deprecated_int96_timestamps
@@ -1059,6 +1064,7 @@ def write_table(table, where, row_group_size=None, 
version='1.0',
                 flavor=flavor,
                 use_dictionary=use_dictionary,
                 coerce_timestamps=coerce_timestamps,
+                allow_truncated_timestamps=allow_truncated_timestamps,
                 compression=compression,
                 use_deprecated_int96_timestamps=use_int96,
                 **kwargs) as writer:
diff --git a/python/pyarrow/tests/test_parquet.py 
b/python/pyarrow/tests/test_parquet.py
index 4e50e64..4f54366 100644
--- a/python/pyarrow/tests/test_parquet.py
+++ b/python/pyarrow/tests/test_parquet.py
@@ -767,6 +767,35 @@ def test_coerce_timestamps(tempdir):
                      coerce_timestamps='unknown')
 
 
+def test_coerce_timestamps_truncated(tempdir):
+    """
+    ARROW-2555: Test that we can truncate timestamps when coercing if
+    explicitly allowed.
+    """
+    dt_us = datetime.datetime(year=2017, month=1, day=1, hour=1, minute=1,
+                              second=1, microsecond=1)
+    dt_ms = datetime.datetime(year=2017, month=1, day=1, hour=1, minute=1,
+                              second=1)
+
+    fields_us = [pa.field('datetime64', pa.timestamp('us'))]
+    arrays_us = {'datetime64': [dt_us, dt_ms]}
+
+    df_us = pd.DataFrame(arrays_us)
+    schema_us = pa.schema(fields_us)
+
+    filename = tempdir / 'pandas_truncated.parquet'
+    table_us = pa.Table.from_pandas(df_us, schema=schema_us)
+
+    _write_table(table_us, filename, version="2.0", coerce_timestamps='ms',
+                 allow_truncated_timestamps=True)
+    table_ms = _read_table(filename)
+    df_ms = table_ms.to_pandas()
+
+    arrays_expected = {'datetime64': [dt_ms, dt_ms]}
+    df_expected = pd.DataFrame(arrays_expected)
+    tm.assert_frame_equal(df_expected, df_ms)
+
+
 def test_column_of_lists(tempdir):
     df, schema = dataframe_with_lists()
 

Reply via email to