pitrou commented on code in PR #49880:
URL: https://github.com/apache/arrow/pull/49880#discussion_r3235110136


##########
cpp/src/parquet/bloom_filter.cc:
##########
@@ -345,6 +346,48 @@ void BlockSplitBloomFilter::WriteTo(ArrowOutputStream* 
sink) const {
   PARQUET_THROW_NOT_OK(sink->Write(data_->data(), num_bytes_));
 }
 
+void BlockSplitBloomFilter::WriteEncrypted(ArrowOutputStream* sink, Encryptor* 
encryptor,
+                                           int16_t row_group_ordinal,
+                                           int16_t column_ordinal) const {
+  DCHECK(sink != nullptr);
+  if (encryptor == nullptr) {
+    throw ParquetException("Bloom filter encryptor must be provided");
+  }
+
+  format::BloomFilterHeader header;
+  if (ARROW_PREDICT_FALSE(algorithm_ != BloomFilter::Algorithm::BLOCK)) {
+    throw ParquetException("BloomFilter does not support Algorithm other than 
BLOCK");
+  }
+  header.algorithm.__set_BLOCK(format::SplitBlockAlgorithm());
+  if (ARROW_PREDICT_FALSE(hash_strategy_ != HashStrategy::XXHASH)) {
+    throw ParquetException("BloomFilter does not support Hash other than 
XXHASH");
+  }
+  header.hash.__set_XXHASH(format::XxHash());
+  if (ARROW_PREDICT_FALSE(compression_strategy_ != 
CompressionStrategy::UNCOMPRESSED)) {
+    throw ParquetException(
+        "BloomFilter does not support Compression other than UNCOMPRESSED");
+  }
+  header.compression.__set_UNCOMPRESSED(format::Uncompressed());
+  header.__set_numBytes(num_bytes_);
+
+  // Bloom filter header and bitset are separate encrypted modules with 
different AADs.
+  encryptor->UpdateAad(
+      encryption::CreateModuleAad(encryptor->file_aad(), 
encryption::kBloomFilterHeader,
+                                  row_group_ordinal, column_ordinal, -1));
+  ThriftSerializer serializer;
+  serializer.Serialize(&header, sink, encryptor);

Review Comment:
   Most of this is identical to `BlockSplitBloomFilter::WriteTo`. Can you 
factor things out into a common subroutine? 
   
   Perhaps something like:
   ```c++
     struct EncryptionParams {
       Encryptor* encryptor;
       int16_t row_group_ordinal;
       int16_t column_ordinal;
     };
     void WriteInternal(ArrowOutputStream*, std::optional<EncryptionParams>) 
const;
   ```
   



##########
cpp/src/parquet/bloom_filter_writer.cc:
##########
@@ -225,14 +230,47 @@ IndexLocations 
BloomFilterBuilderImpl::WriteTo(::arrow::io::OutputStream* sink)
   }
   finished_ = true;
 
+  // Bloom filter ordinals are encoded as int16 in the AAD when encryption is 
enabled.
+  constexpr size_t kEncryptedOrdinalLimit = 
std::numeric_limits<int16_t>::max();  // 32767
+
   IndexLocations locations;
 
   for (size_t i = 0; i != bloom_filters_.size(); ++i) {
     auto& row_group_bloom_filters = bloom_filters_[i];
     for (const auto& [column_id, filter] : row_group_bloom_filters) {
       // TODO(GH-43138): Determine the quality of bloom filter before writing 
it.
       PARQUET_ASSIGN_OR_THROW(int64_t offset, sink->Tell());
-      filter->WriteTo(sink);
+
+      const auto column_path = 
schema_->Column(column_id)->path()->ToDotString();
+      std::shared_ptr<Encryptor> meta_encryptor =
+          file_encryptor_ != nullptr
+              ? file_encryptor_->GetColumnMetaEncryptor(column_path)
+              : nullptr;
+      if (meta_encryptor != nullptr) {
+        const auto& column_props = 
properties_->column_encryption_properties(column_path);
+        if (column_props != nullptr && column_props->is_encrypted() &&
+            !column_props->is_encrypted_with_footer_key()) {
+          ParquetException::NYI("Bloom filter writing with a dedicated column 
key");

Review Comment:
   I don't understand why this isn't implemented. You're using the column 
encryptor already, no?
   
   The Parquet encryption spec [says 
this](https://github.com/apache/parquet-format/blob/master/Encryption.md#3-technical-approach):
   > For encrypted columns, the following modules are always encrypted, with 
the same column key: pages and page headers (both dictionary and data), column 
indexes, offset indexes, bloom filter headers and bitsets.
   



##########
cpp/src/parquet/encryption/bloom_filter_encryption_test.cc:
##########
@@ -91,4 +94,104 @@ TEST(EncryptedBloomFilterReader, ReadEncryptedBloomFilter) {
   }
 }
 
+namespace {
+
+std::shared_ptr<schema::GroupNode> SingleInt64Schema(const std::string& 
field_name) {
+  auto field = schema::PrimitiveNode::Make(field_name, Repetition::REQUIRED, 
Type::INT64,
+                                           ConvertedType::NONE);
+  return std::static_pointer_cast<schema::GroupNode>(
+      schema::GroupNode::Make("schema", Repetition::REQUIRED, {field}));
+}
+
+std::shared_ptr<FileEncryptionProperties> BuildEncryptionProperties(
+    const std::string& /*field_name*/) {
+  FileEncryptionProperties::Builder builder(kFooterEncryptionKey);
+  return builder.build();
+}
+
+std::shared_ptr<FileDecryptionProperties> 
BuildDecryptionPropertiesWithExplicitKeys(
+    const std::string& /*field_name*/) {
+  FileDecryptionProperties::Builder builder;
+  return builder.footer_key(kFooterEncryptionKey)->build();
+}
+
+}  // namespace
+
+// Round trip, write a small encrypted file with a Bloom filter on the 
encrypted
+// column, then read it back and verify the Bloom filter contains the inserted
+// values and rejects values that were never inserted.
+TEST(EncryptedBloomFilterWriter, RoundTripEncryptedBloomFilter) {
+  const std::string field_name = "id";
+  constexpr int kNumValues = 64;
+
+  auto schema = SingleInt64Schema(field_name);
+
+  WriterProperties::Builder prop_builder;
+  prop_builder.compression(Compression::UNCOMPRESSED);
+  prop_builder.enable_bloom_filter(field_name, {});
+  prop_builder.encryption(BuildEncryptionProperties(field_name));
+  auto writer_properties = prop_builder.build();
+
+  PARQUET_ASSIGN_OR_THROW(auto sink, 
::arrow::io::BufferOutputStream::Create());
+  auto file_writer = ParquetFileWriter::Open(sink, schema, writer_properties);
+  auto* row_group_writer = file_writer->AppendRowGroup();

Review Comment:
   Ideally we would write at least two row groups to check that the ordinal is 
passed properly.



##########
cpp/src/parquet/bloom_filter_writer.cc:
##########
@@ -225,14 +230,47 @@ IndexLocations 
BloomFilterBuilderImpl::WriteTo(::arrow::io::OutputStream* sink)
   }
   finished_ = true;
 
+  // Bloom filter ordinals are encoded as int16 in the AAD when encryption is 
enabled.
+  constexpr size_t kEncryptedOrdinalLimit = 
std::numeric_limits<int16_t>::max();  // 32767
+
   IndexLocations locations;
 
   for (size_t i = 0; i != bloom_filters_.size(); ++i) {
     auto& row_group_bloom_filters = bloom_filters_[i];
     for (const auto& [column_id, filter] : row_group_bloom_filters) {
       // TODO(GH-43138): Determine the quality of bloom filter before writing 
it.
       PARQUET_ASSIGN_OR_THROW(int64_t offset, sink->Tell());
-      filter->WriteTo(sink);
+
+      const auto column_path = 
schema_->Column(column_id)->path()->ToDotString();
+      std::shared_ptr<Encryptor> meta_encryptor =
+          file_encryptor_ != nullptr
+              ? file_encryptor_->GetColumnMetaEncryptor(column_path)
+              : nullptr;
+      if (meta_encryptor != nullptr) {
+        const auto& column_props = 
properties_->column_encryption_properties(column_path);
+        if (column_props != nullptr && column_props->is_encrypted() &&
+            !column_props->is_encrypted_with_footer_key()) {
+          ParquetException::NYI("Bloom filter writing with a dedicated column 
key");
+        }
+        if (ARROW_PREDICT_FALSE(i > kEncryptedOrdinalLimit)) {
+          throw ParquetException(
+              "Encrypted files cannot contain more than 32767 row groups");
+        }
+        if (ARROW_PREDICT_FALSE(static_cast<size_t>(column_id) >
+                                kEncryptedOrdinalLimit)) {
+          throw ParquetException(
+              "Encrypted files cannot contain more than 32767 columns");
+        }
+        auto* block_filter = 
dynamic_cast<BlockSplitBloomFilter*>(filter.get());

Review Comment:
   The `dynamic_cast` should not be required; just call `WriteEncrypted` on the 
base `BloomFilter` class.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to