pitrou commented on code in PR #49880:
URL: https://github.com/apache/arrow/pull/49880#discussion_r3235110136
##########
cpp/src/parquet/bloom_filter.cc:
##########
@@ -345,6 +346,48 @@ void BlockSplitBloomFilter::WriteTo(ArrowOutputStream*
sink) const {
PARQUET_THROW_NOT_OK(sink->Write(data_->data(), num_bytes_));
}
+void BlockSplitBloomFilter::WriteEncrypted(ArrowOutputStream* sink, Encryptor*
encryptor,
+ int16_t row_group_ordinal,
+ int16_t column_ordinal) const {
+ DCHECK(sink != nullptr);
+ if (encryptor == nullptr) {
+ throw ParquetException("Bloom filter encryptor must be provided");
+ }
+
+ format::BloomFilterHeader header;
+ if (ARROW_PREDICT_FALSE(algorithm_ != BloomFilter::Algorithm::BLOCK)) {
+ throw ParquetException("BloomFilter does not support Algorithm other than
BLOCK");
+ }
+ header.algorithm.__set_BLOCK(format::SplitBlockAlgorithm());
+ if (ARROW_PREDICT_FALSE(hash_strategy_ != HashStrategy::XXHASH)) {
+ throw ParquetException("BloomFilter does not support Hash other than
XXHASH");
+ }
+ header.hash.__set_XXHASH(format::XxHash());
+ if (ARROW_PREDICT_FALSE(compression_strategy_ !=
CompressionStrategy::UNCOMPRESSED)) {
+ throw ParquetException(
+ "BloomFilter does not support Compression other than UNCOMPRESSED");
+ }
+ header.compression.__set_UNCOMPRESSED(format::Uncompressed());
+ header.__set_numBytes(num_bytes_);
+
+ // Bloom filter header and bitset are separate encrypted modules with
different AADs.
+ encryptor->UpdateAad(
+ encryption::CreateModuleAad(encryptor->file_aad(),
encryption::kBloomFilterHeader,
+ row_group_ordinal, column_ordinal, -1));
+ ThriftSerializer serializer;
+ serializer.Serialize(&header, sink, encryptor);
Review Comment:
Most of this is identical to `BlockSplitBloomFilter::WriteTo`; can you
factor things out into a common subroutine?
Perhaps something like:
```c++
struct EncryptionParams {
Encryptor* encryptor;
int16_t row_group_ordinal;
int16_t column_ordinal;
};
void WriteInternal(ArrowOutputStream*, std::optional<EncryptionParams>)
const;
```
##########
cpp/src/parquet/bloom_filter_writer.cc:
##########
@@ -225,14 +230,47 @@ IndexLocations
BloomFilterBuilderImpl::WriteTo(::arrow::io::OutputStream* sink)
}
finished_ = true;
+ // Bloom filter ordinals are encoded as int16 in the AAD when encryption is
enabled.
+ constexpr size_t kEncryptedOrdinalLimit =
std::numeric_limits<int16_t>::max(); // 32767
+
IndexLocations locations;
for (size_t i = 0; i != bloom_filters_.size(); ++i) {
auto& row_group_bloom_filters = bloom_filters_[i];
for (const auto& [column_id, filter] : row_group_bloom_filters) {
// TODO(GH-43138): Determine the quality of bloom filter before writing
it.
PARQUET_ASSIGN_OR_THROW(int64_t offset, sink->Tell());
- filter->WriteTo(sink);
+
+ const auto column_path =
schema_->Column(column_id)->path()->ToDotString();
+ std::shared_ptr<Encryptor> meta_encryptor =
+ file_encryptor_ != nullptr
+ ? file_encryptor_->GetColumnMetaEncryptor(column_path)
+ : nullptr;
+ if (meta_encryptor != nullptr) {
+ const auto& column_props =
properties_->column_encryption_properties(column_path);
+ if (column_props != nullptr && column_props->is_encrypted() &&
+ !column_props->is_encrypted_with_footer_key()) {
+ ParquetException::NYI("Bloom filter writing with a dedicated column
key");
Review Comment:
I don't understand why this isn't implemented. You're using the column
encryptor already, no?
The Parquet encryption spec [says
this](https://github.com/apache/parquet-format/blob/master/Encryption.md#3-technical-approach):
> For encrypted columns, the following modules are always encrypted, with
the same column key: pages and page headers (both dictionary and data), column
indexes, offset indexes, bloom filter headers and bitsets.
##########
cpp/src/parquet/encryption/bloom_filter_encryption_test.cc:
##########
@@ -91,4 +94,104 @@ TEST(EncryptedBloomFilterReader, ReadEncryptedBloomFilter) {
}
}
+namespace {
+
+std::shared_ptr<schema::GroupNode> SingleInt64Schema(const std::string&
field_name) {
+ auto field = schema::PrimitiveNode::Make(field_name, Repetition::REQUIRED,
Type::INT64,
+ ConvertedType::NONE);
+ return std::static_pointer_cast<schema::GroupNode>(
+ schema::GroupNode::Make("schema", Repetition::REQUIRED, {field}));
+}
+
+std::shared_ptr<FileEncryptionProperties> BuildEncryptionProperties(
+ const std::string& /*field_name*/) {
+ FileEncryptionProperties::Builder builder(kFooterEncryptionKey);
+ return builder.build();
+}
+
+std::shared_ptr<FileDecryptionProperties>
BuildDecryptionPropertiesWithExplicitKeys(
+ const std::string& /*field_name*/) {
+ FileDecryptionProperties::Builder builder;
+ return builder.footer_key(kFooterEncryptionKey)->build();
+}
+
+} // namespace
+
+// Round trip, write a small encrypted file with a Bloom filter on the
encrypted
+// column, then read it back and verify the Bloom filter contains the inserted
+// values and rejects values that were never inserted.
+TEST(EncryptedBloomFilterWriter, RoundTripEncryptedBloomFilter) {
+ const std::string field_name = "id";
+ constexpr int kNumValues = 64;
+
+ auto schema = SingleInt64Schema(field_name);
+
+ WriterProperties::Builder prop_builder;
+ prop_builder.compression(Compression::UNCOMPRESSED);
+ prop_builder.enable_bloom_filter(field_name, {});
+ prop_builder.encryption(BuildEncryptionProperties(field_name));
+ auto writer_properties = prop_builder.build();
+
+ PARQUET_ASSIGN_OR_THROW(auto sink,
::arrow::io::BufferOutputStream::Create());
+ auto file_writer = ParquetFileWriter::Open(sink, schema, writer_properties);
+ auto* row_group_writer = file_writer->AppendRowGroup();
Review Comment:
Ideally we would write at least two row groups to check that the ordinal is
passed properly.
##########
cpp/src/parquet/bloom_filter_writer.cc:
##########
@@ -225,14 +230,47 @@ IndexLocations
BloomFilterBuilderImpl::WriteTo(::arrow::io::OutputStream* sink)
}
finished_ = true;
+ // Bloom filter ordinals are encoded as int16 in the AAD when encryption is
enabled.
+ constexpr size_t kEncryptedOrdinalLimit =
std::numeric_limits<int16_t>::max(); // 32767
+
IndexLocations locations;
for (size_t i = 0; i != bloom_filters_.size(); ++i) {
auto& row_group_bloom_filters = bloom_filters_[i];
for (const auto& [column_id, filter] : row_group_bloom_filters) {
// TODO(GH-43138): Determine the quality of bloom filter before writing
it.
PARQUET_ASSIGN_OR_THROW(int64_t offset, sink->Tell());
- filter->WriteTo(sink);
+
+ const auto column_path =
schema_->Column(column_id)->path()->ToDotString();
+ std::shared_ptr<Encryptor> meta_encryptor =
+ file_encryptor_ != nullptr
+ ? file_encryptor_->GetColumnMetaEncryptor(column_path)
+ : nullptr;
+ if (meta_encryptor != nullptr) {
+ const auto& column_props =
properties_->column_encryption_properties(column_path);
+ if (column_props != nullptr && column_props->is_encrypted() &&
+ !column_props->is_encrypted_with_footer_key()) {
+ ParquetException::NYI("Bloom filter writing with a dedicated column
key");
+ }
+ if (ARROW_PREDICT_FALSE(i > kEncryptedOrdinalLimit)) {
+ throw ParquetException(
+ "Encrypted files cannot contain more than 32767 row groups");
+ }
+ if (ARROW_PREDICT_FALSE(static_cast<size_t>(column_id) >
+ kEncryptedOrdinalLimit)) {
+ throw ParquetException(
+ "Encrypted files cannot contain more than 32767 columns");
+ }
+ auto* block_filter =
dynamic_cast<BlockSplitBloomFilter*>(filter.get());
Review Comment:
The `dynamic_cast` should not be required; just call `WriteEncrypted` on the
base `BloomFilter` class.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]