This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new 9f56afb  parquet: improve BOOLEAN writing logic and report error on encoding fail (#443)
9f56afb is described below

commit 9f56afb2d2347310184706f7d5e46af583557bea
Author: Gary Pennington <[email protected]>
AuthorDate: Wed Jun 16 17:37:02 2021 +0100

    parquet: improve BOOLEAN writing logic and report error on encoding fail (#443)
    
    * improve BOOLEAN writing logic and report error on encoding fail
    
    When writing BOOLEAN data, writing more than 2048 rows of data will
    overflow the hard-coded 256 byte buffer set for the bit-writer in the
    PlainEncoder. Once this occurs, further attempts to write to the encoder
    fail because capacity is exceeded, but the errors are silently ignored.
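    
    For reference, the arithmetic behind the 2048-row threshold (assuming
    1 bit per encoded BOOLEAN value, as in the plain encoding below):
    2048 values * 1 bit = 2048 bits = 256 bytes, so the 2049th value no
    longer fits in the 256 byte buffer.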
    
    This fix improves the error detection and reporting at the point of
    encoding and modifies the bit-writing logic for BOOLEANs. The
    bit_writer is initially allocated 256 bytes (as at present), then each
    time the capacity is exceeded the buffer is grown by another 256
    bytes.
    
    This certainly resolves the current problem, but it's not exactly a
    great fix because the capacity of the bit_writer could now grow
    substantially.
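    
    Roughly, the grow-on-demand pattern looks like this (a standalone
    sketch rather than the exact encoder change; extend() and capacity()
    are the BitWriter helpers added by this commit, and `values` stands
    in for a slice of booleans to encode):
    
        let mut bit_writer = BitWriter::new(256);      // 256 byte buffer
        for value in values {
            // grow by another 256 bytes whenever the buffer is full
            if bit_writer.bytes_written() >= bit_writer.capacity() {
                bit_writer.extend(256);
            }
            // put_value returns false if the value could not be written
            assert!(bit_writer.put_value(*value as u64, 1));
        }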
    
    Other data types seem to have a more sophisticated mechanism for writing
    data which doesn't involve growing or having a fixed size buffer. It
    would be desirable to make the BOOLEAN type use this same mechanism if
    possible, but that level of change is more intrusive and probably
    requires greater knowledge of the implementation than I possess.
    
    resolves: #349
    
    * only manipulate the bit_writer for BOOLEAN data
    
    Tacky, but I can't think of a better way to do this without
    specialization.
    
    * better isolation of changes
    
    Remove the byte tracking from the PlainEncoder and use the existing
    bytes_written() method in BitWriter.
    
    This is neater.
    
    * add test for boolean writer
    
    The test ensures that we can write > 2048 rows to a parquet file and
    that when we read the data back, it finishes without hanging (defined as
    taking < 5 seconds).
    
    If we don't want that extra complexity, we could remove the
    thread/channel stuff and just try to read the file and let the test
    runner terminate hanging tests.
    
    * fix capacity calculation error in bool encoding
    
    values.len() reports the number of values to be encoded, so it must
    be divided by 8 (bits in a byte) to determine the effect on the byte
    capacity of the bit_writer.
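    
    Worked example: writing a batch of 2049 boolean values into an empty
    bit_writer gives bytes_written() = 0 and values.len() / 8 = 256, so
    0 + 256 >= 256 holds and extend(256) is called before any bits are
    written.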
---
 parquet/src/data_type.rs        |   9 +++-
 parquet/src/util/bit_util.rs    |  14 ++++++
 parquet/tests/boolean_writer.rs | 100 ++++++++++++++++++++++++++++++++++++++++
 3 files changed, 122 insertions(+), 1 deletion(-)

diff --git a/parquet/src/data_type.rs b/parquet/src/data_type.rs
index aa1def3..f97df3c 100644
--- a/parquet/src/data_type.rs
+++ b/parquet/src/data_type.rs
@@ -661,8 +661,15 @@ pub(crate) mod private {
             _: &mut W,
             bit_writer: &mut BitWriter,
         ) -> Result<()> {
+            if bit_writer.bytes_written() + values.len() / 8 >= bit_writer.capacity() {
+                bit_writer.extend(256);
+            }
             for value in values {
-                bit_writer.put_value(*value as u64, 1);
+                if !bit_writer.put_value(*value as u64, 1) {
+                    return Err(ParquetError::EOF(
+                        "unable to put boolean value".to_string(),
+                    ));
+                }
             }
             Ok(())
         }
diff --git a/parquet/src/util/bit_util.rs b/parquet/src/util/bit_util.rs
index 8dfb631..45cfe2b 100644
--- a/parquet/src/util/bit_util.rs
+++ b/parquet/src/util/bit_util.rs
@@ -223,6 +223,20 @@ impl BitWriter {
         }
     }
 
+    /// Extend buffer size
+    #[inline]
+    pub fn extend(&mut self, increment: usize) {
+        self.max_bytes += increment;
+        let extra = vec![0; increment];
+        self.buffer.extend(extra);
+    }
+
+    /// Report buffer size
+    #[inline]
+    pub fn capacity(&mut self) -> usize {
+        self.max_bytes
+    }
+
     /// Consumes and returns the current buffer.
     #[inline]
     pub fn consume(mut self) -> Vec<u8> {
diff --git a/parquet/tests/boolean_writer.rs b/parquet/tests/boolean_writer.rs
new file mode 100644
index 0000000..b9d757e
--- /dev/null
+++ b/parquet/tests/boolean_writer.rs
@@ -0,0 +1,100 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use parquet::column::writer::ColumnWriter;
+use parquet::file::properties::WriterProperties;
+use parquet::file::reader::FileReader;
+use parquet::file::serialized_reader::SerializedFileReader;
+use parquet::file::writer::FileWriter;
+use parquet::file::writer::SerializedFileWriter;
+use parquet::schema::parser::parse_message_type;
+use std::fs;
+use std::path::Path;
+use std::sync::{mpsc, Arc};
+use std::thread;
+use std::time::Duration;
+
+#[test]
+fn it_writes_data_without_hanging() {
+    let path = Path::new("it_writes_data_without_hanging.parquet");
+
+    let message_type = "
+  message BooleanType {
+    REQUIRED BOOLEAN DIM0;
+  }
+";
+    let schema = Arc::new(parse_message_type(message_type).expect("parse schema"));
+    let props = Arc::new(WriterProperties::builder().build());
+    let file = fs::File::create(&path).expect("create file");
+    let mut writer =
+        SerializedFileWriter::new(file, schema, props).expect("create parquet writer");
+    for _group in 0..1 {
+        let mut row_group_writer = writer.next_row_group().expect("get row group writer");
+        let values: Vec<i64> = vec![0; 2049];
+        let my_bool_values: Vec<bool> = values
+            .iter()
+            .enumerate()
+            .map(|(count, _x)| count % 2 == 0)
+            .collect();
+        while let Some(mut col_writer) =
+            row_group_writer.next_column().expect("next column")
+        {
+            match col_writer {
+                ColumnWriter::BoolColumnWriter(ref mut typed_writer) => {
+                    typed_writer
+                        .write_batch(&my_bool_values, None, None)
+                        .expect("writing bool column");
+                }
+                _ => {
+                    panic!("only test boolean values");
+                }
+            }
+            row_group_writer
+                .close_column(col_writer)
+                .expect("close column");
+        }
+        let rg_md = row_group_writer.close().expect("close row group");
+        println!("total rows written: {}", rg_md.num_rows());
+        writer
+            .close_row_group(row_group_writer)
+            .expect("close row groups");
+    }
+    writer.close().expect("close writer");
+
+    let bytes = fs::read(&path).expect("read file");
+    assert_eq!(&bytes[0..4], &[b'P', b'A', b'R', b'1']);
+
+    // Now that we have written our data and are happy with it, make
+    // sure we can read it back in < 5 seconds...
+    let (sender, receiver) = mpsc::channel();
+    let _t = thread::spawn(move || {
+        let file = fs::File::open(&Path::new("it_writes_data_without_hanging.parquet"))
+            .expect("open file");
+        let reader = SerializedFileReader::new(file).expect("get serialized reader");
+        let iter = reader.get_row_iter(None).expect("get iterator");
+        for record in iter {
+            println!("reading: {}", record);
+        }
+        println!("finished reading");
+        if let Ok(()) = sender.send(true) {}
+    });
+    assert_ne!(
+        Err(mpsc::RecvTimeoutError::Timeout),
+        receiver.recv_timeout(Duration::from_millis(5000))
+    );
+    fs::remove_file("it_writes_data_without_hanging.parquet").expect("remove file");
+}
