This is an automated email from the ASF dual-hosted git repository.
kou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new a376968089 ARROW-13388 [C++][Parquet] Enable DELTA_LENGTH_BYTE_ARRAY decoder (#13386)
a376968089 is described below
commit a376968089d7310f4a88d054822fa1eaf96c46f5
Author: Muthunagappan Muthuraman <[email protected]>
AuthorDate: Mon Jun 27 20:35:28 2022 -0700
ARROW-13388 [C++][Parquet] Enable DELTA_LENGTH_BYTE_ARRAY decoder (#13386)
Looks like we have DeltaLengthByteArrayDecoder implemented. Enabling it in
this commit to support DELTA_LENGTH_BYTE_ARRAY decoding
Authored-by: Muthunagappan Muthuraman <[email protected]>
Signed-off-by: Sutou Kouhei <[email protected]>
---
cpp/src/parquet/column_reader.cc | 9 ++++-
cpp/src/parquet/encoding.cc | 5 +++
cpp/src/parquet/reader_test.cc | 84 ++++++++++++++++++++++++++++++++++++++++
cpp/submodules/parquet-testing | 2 +-
4 files changed, 97 insertions(+), 3 deletions(-)
diff --git a/cpp/src/parquet/column_reader.cc b/cpp/src/parquet/column_reader.cc
index 8982de7032..b8d3b767b0 100644
--- a/cpp/src/parquet/column_reader.cc
+++ b/cpp/src/parquet/column_reader.cc
@@ -804,8 +804,13 @@ class ColumnReaderImplBase {
decoders_[static_cast<int>(encoding)] = std::move(decoder);
break;
}
- case Encoding::DELTA_LENGTH_BYTE_ARRAY:
- ParquetException::NYI("Unsupported encoding");
+ case Encoding::DELTA_LENGTH_BYTE_ARRAY: {
+ auto decoder =
+ MakeTypedDecoder<DType>(Encoding::DELTA_LENGTH_BYTE_ARRAY, descr_);
+ current_decoder_ = decoder.get();
+ decoders_[static_cast<int>(encoding)] = std::move(decoder);
+ break;
+ }
default:
throw ParquetException("Unknown encoding type.");
diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc
index 083117c201..5a0184b186 100644
--- a/cpp/src/parquet/encoding.cc
+++ b/cpp/src/parquet/encoding.cc
@@ -2757,6 +2757,11 @@ std::unique_ptr<Decoder> MakeDecoder(Type::type type_num, Encoding::type encodin
return std::unique_ptr<Decoder>(new DeltaByteArrayDecoder(descr));
}
throw ParquetException("DELTA_BYTE_ARRAY only supports BYTE_ARRAY");
+ } else if (encoding == Encoding::DELTA_LENGTH_BYTE_ARRAY) {
+ if (type_num == Type::BYTE_ARRAY) {
+ return std::unique_ptr<Decoder>(new DeltaLengthByteArrayDecoder(descr));
+ }
+ throw ParquetException("DELTA_LENGTH_BYTE_ARRAY only supports BYTE_ARRAY");
} else {
ParquetException::NYI("Selected encoding is not supported");
}
diff --git a/cpp/src/parquet/reader_test.cc b/cpp/src/parquet/reader_test.cc
index 4b2db178f3..7776d995c0 100644
--- a/cpp/src/parquet/reader_test.cc
+++ b/cpp/src/parquet/reader_test.cc
@@ -127,6 +127,90 @@ void CheckRowGroupMetadata(const RowGroupMetaData* rg_metadata,
}
}
+class TestTextDeltaLengthByteArray : public ::testing::Test {
+ public:
+ void SetUp() {
+ reader_ = ParquetFileReader::OpenFile(data_file("delta_length_byte_array.parquet"));
+ }
+
+ void TearDown() {}
+
+ protected:
+ std::unique_ptr<ParquetFileReader> reader_;
+};
+
+TEST_F(TestTextDeltaLengthByteArray, TestTextScanner) {
+ auto group = reader_->RowGroup(0);
+
+ // column 0, id
+ auto scanner = std::make_shared<ByteArrayScanner>(group->Column(0));
+ ByteArray val;
+ bool is_null;
+ std::string expected_prefix("apple_banana_mango");
+ for (int i = 0; i < 1000; ++i) {
+ ASSERT_TRUE(scanner->HasNext());
+ ASSERT_TRUE(scanner->NextValue(&val, &is_null));
+ ASSERT_FALSE(is_null);
+ std::string expected = expected_prefix + std::to_string(i * i);
+ ASSERT_TRUE(val.len == expected.length());
+ ASSERT_EQ(::arrow::util::string_view(reinterpret_cast<const char*>(val.ptr), val.len),
+ expected);
+ }
+ ASSERT_FALSE(scanner->HasNext());
+ ASSERT_FALSE(scanner->NextValue(&val, &is_null));
+}
+
+TEST_F(TestTextDeltaLengthByteArray, TestBatchRead) {
+ auto group = reader_->RowGroup(0);
+
+ // column 0, id
+ auto col = std::dynamic_pointer_cast<ByteArrayReader>(group->Column(0));
+
+ // This file only has 1000 rows
+ ASSERT_EQ(1000, reader_->metadata()->num_rows());
+ // This file only has 1 row group
+ ASSERT_EQ(1, reader_->metadata()->num_row_groups());
+ // Size of the metadata is 105 bytes
+ ASSERT_EQ(105, reader_->metadata()->size());
+ // This row group must have 1000 rows
+ ASSERT_EQ(1000, group->metadata()->num_rows());
+
+ // Check if the column is encoded with DELTA_LENGTH_BYTE_ARRAY
+ auto col_chunk = group->metadata()->ColumnChunk(0);
+
+ ASSERT_TRUE(std::find(col_chunk->encodings().begin(), col_chunk->encodings().end(),
+ Encoding::DELTA_LENGTH_BYTE_ARRAY) !=
+ col_chunk->encodings().end());
+
+ ASSERT_TRUE(col->HasNext());
+ int64_t values_read = 0;
+ int64_t curr_batch_read;
+ std::string expected_prefix("apple_banana_mango");
+ while (values_read < 1000) {
+ const int16_t batch_size = 25;
+ int16_t def_levels[batch_size];
+ int16_t rep_levels[batch_size];
+ ByteArray values[batch_size];
+
+ auto levels_read =
+ col->ReadBatch(batch_size, def_levels, rep_levels, values, &curr_batch_read);
+ ASSERT_EQ(batch_size, levels_read);
+ ASSERT_EQ(batch_size, curr_batch_read);
+ for (int16_t i = 0; i < batch_size; i++) {
+ auto expected =
+ expected_prefix + std::to_string((i + values_read) * (i + values_read));
+ ASSERT_TRUE(values[i].len == expected.length());
+ ASSERT_EQ(::arrow::util::string_view(reinterpret_cast<const char*>(values[i].ptr),
+ values[i].len),
+ expected);
+ }
+ values_read += curr_batch_read;
+ }
+
+ // Now read past the end of the file
+ ASSERT_FALSE(col->HasNext());
+}
+
class TestAllTypesPlain : public ::testing::Test {
public:
void SetUp() { reader_ = ParquetFileReader::OpenFile(alltypes_plain()); }
diff --git a/cpp/submodules/parquet-testing b/cpp/submodules/parquet-testing
index acd375eb86..b76cde43ba 160000
--- a/cpp/submodules/parquet-testing
+++ b/cpp/submodules/parquet-testing
@@ -1 +1 @@
-Subproject commit acd375eb86a81cd856476fca0f52ba6036a067ff
+Subproject commit b76cde43bad62ebf531ae3736d7a59cf645d3a6f