This is an automated email from the ASF dual-hosted git repository.

apitrou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new 853d8491ad GH-34950: [C++][Parquet] Support encryption for page index 
(#36574)
853d8491ad is described below

commit 853d8491addff3a10fc40950823a2942bb9fbf98
Author: Gang Wu <[email protected]>
AuthorDate: Fri Sep 29 00:07:16 2023 +0800

    GH-34950: [C++][Parquet] Support encryption for page index (#36574)
    
    ### Rationale for this change
    
    Parquet modular encryption requires page index to be encrypted if the 
column chunk is encrypted. This feature is missing for now.
    
    ### What changes are included in this PR?
    
    Support both encryption and decryption for parquet page index.
    
    ### Are these changes tested?
    
    Added round trip tests in write_configurations_test.cc and 
read_configurations_test.cc.
    
    ### Are there any user-facing changes?
    
    NO.
    * Closes: #34950
    
    Authored-by: Gang Wu <[email protected]>
    Signed-off-by: Antoine Pitrou <[email protected]>
---
 cpp/src/parquet/column_reader.cc                   |   8 +-
 cpp/src/parquet/column_writer.cc                   |   4 +-
 cpp/src/parquet/encryption/encryption_internal.h   |   2 +
 .../parquet/encryption/internal_file_decryptor.cc  |  55 +++++++
 .../parquet/encryption/internal_file_decryptor.h   |  13 ++
 .../parquet/encryption/read_configurations_test.cc |  58 +++++--
 cpp/src/parquet/encryption/test_encryption_util.cc | 178 ++++++++++++++++++++-
 cpp/src/parquet/encryption/test_encryption_util.h  |  16 +-
 cpp/src/parquet/encryption/type_fwd.h              |  28 ++++
 .../encryption/write_configurations_test.cc        |   2 +-
 cpp/src/parquet/file_reader.cc                     |  34 +---
 cpp/src/parquet/file_writer.cc                     |   6 +-
 cpp/src/parquet/metadata.cc                        |   7 +-
 cpp/src/parquet/metadata.h                         |   8 +-
 cpp/src/parquet/page_index.cc                      | 105 ++++++++----
 cpp/src/parquet/page_index.h                       |  26 +--
 cpp/src/parquet/thrift_internal.h                  |   7 +-
 cpp/src/parquet/type_fwd.h                         |   3 +
 18 files changed, 439 insertions(+), 121 deletions(-)

diff --git a/cpp/src/parquet/column_reader.cc b/cpp/src/parquet/column_reader.cc
index 6fe1ce9da6..fa013dd2ea 100644
--- a/cpp/src/parquet/column_reader.cc
+++ b/cpp/src/parquet/column_reader.cc
@@ -363,10 +363,8 @@ void SerializedPageReader::UpdateDecryption(const 
std::shared_ptr<Decryptor>& de
                                             int8_t module_type, std::string* 
page_aad) {
   ARROW_DCHECK(decryptor != nullptr);
   if (crypto_ctx_.start_decrypt_with_dictionary_page) {
-    std::string aad = encryption::CreateModuleAad(
-        decryptor->file_aad(), module_type, crypto_ctx_.row_group_ordinal,
-        crypto_ctx_.column_ordinal, kNonPageOrdinal);
-    decryptor->UpdateAad(aad);
+    UpdateDecryptor(decryptor, crypto_ctx_.row_group_ordinal, 
crypto_ctx_.column_ordinal,
+                    module_type);
   } else {
     encryption::QuickUpdatePageAad(page_ordinal_, page_aad);
     decryptor->UpdateAad(*page_aad);
@@ -449,7 +447,7 @@ std::shared_ptr<Page> SerializedPageReader::NextPage() {
         current_page_header_ = format::PageHeader();
         deserializer.DeserializeMessage(reinterpret_cast<const 
uint8_t*>(view.data()),
                                         &header_size, &current_page_header_,
-                                        crypto_ctx_.meta_decryptor);
+                                        crypto_ctx_.meta_decryptor.get());
         break;
       } catch (std::exception& e) {
         // Failed to deserialize. Double the allowed page header size and try 
again
diff --git a/cpp/src/parquet/column_writer.cc b/cpp/src/parquet/column_writer.cc
index ae9216ba7c..a0aedeee9e 100644
--- a/cpp/src/parquet/column_writer.cc
+++ b/cpp/src/parquet/column_writer.cc
@@ -330,7 +330,7 @@ class SerializedPageWriter : public PageWriter {
       UpdateEncryption(encryption::kDictionaryPageHeader);
     }
     const int64_t header_size =
-        thrift_serializer_->Serialize(&page_header, sink_.get(), 
meta_encryptor_);
+        thrift_serializer_->Serialize(&page_header, sink_.get(), 
meta_encryptor_.get());
 
     PARQUET_THROW_NOT_OK(sink_->Write(output_data_buffer, output_data_len));
 
@@ -422,7 +422,7 @@ class SerializedPageWriter : public PageWriter {
       UpdateEncryption(encryption::kDataPageHeader);
     }
     const int64_t header_size =
-        thrift_serializer_->Serialize(&page_header, sink_.get(), 
meta_encryptor_);
+        thrift_serializer_->Serialize(&page_header, sink_.get(), 
meta_encryptor_.get());
     PARQUET_THROW_NOT_OK(sink_->Write(output_data_buffer, output_data_len));
 
     /// Collect page index
diff --git a/cpp/src/parquet/encryption/encryption_internal.h 
b/cpp/src/parquet/encryption/encryption_internal.h
index 4ed5b5cf61..77921d8731 100644
--- a/cpp/src/parquet/encryption/encryption_internal.h
+++ b/cpp/src/parquet/encryption/encryption_internal.h
@@ -40,6 +40,8 @@ constexpr int8_t kDataPageHeader = 4;
 constexpr int8_t kDictionaryPageHeader = 5;
 constexpr int8_t kColumnIndex = 6;
 constexpr int8_t kOffsetIndex = 7;
+constexpr int8_t kBloomFilterHeader = 8;
+constexpr int8_t kBloomFilterBitset = 9;
 
 /// Performs AES encryption operations with GCM or CTR ciphers.
 class AesEncryptor {
diff --git a/cpp/src/parquet/encryption/internal_file_decryptor.cc 
b/cpp/src/parquet/encryption/internal_file_decryptor.cc
index 87bfc2bd12..19e4845c87 100644
--- a/cpp/src/parquet/encryption/internal_file_decryptor.cc
+++ b/cpp/src/parquet/encryption/internal_file_decryptor.cc
@@ -16,8 +16,10 @@
 // under the License.
 
 #include "parquet/encryption/internal_file_decryptor.h"
+#include "arrow/util/logging.h"
 #include "parquet/encryption/encryption.h"
 #include "parquet/encryption/encryption_internal.h"
+#include "parquet/metadata.h"
 
 namespace parquet {
 
@@ -215,4 +217,57 @@ std::shared_ptr<Decryptor> 
InternalFileDecryptor::GetColumnDecryptor(
   return column_data_map_[column_path];
 }
 
+namespace {
+
+std::shared_ptr<Decryptor> GetColumnDecryptor(
+    const ColumnCryptoMetaData* crypto_metadata, InternalFileDecryptor* 
file_decryptor,
+    const std::function<std::shared_ptr<Decryptor>(
+        InternalFileDecryptor* file_decryptor, const std::string& column_path,
+        const std::string& column_key_metadata, const std::string& aad)>& func,
+    bool metadata) {
+  if (crypto_metadata == nullptr) {
+    return nullptr;
+  }
+
+  if (file_decryptor == nullptr) {
+    throw ParquetException("RowGroup is noted as encrypted but no file 
decryptor");
+  }
+
+  if (crypto_metadata->encrypted_with_footer_key()) {
+    return metadata ? file_decryptor->GetFooterDecryptorForColumnMeta()
+                    : file_decryptor->GetFooterDecryptorForColumnData();
+  }
+
+  // The column is encrypted with its own key
+  const std::string& column_key_metadata = crypto_metadata->key_metadata();
+  const std::string column_path = 
crypto_metadata->path_in_schema()->ToDotString();
+  return func(file_decryptor, column_path, column_key_metadata, /*aad=*/"");
+}
+
+}  // namespace
+
+std::shared_ptr<Decryptor> GetColumnMetaDecryptor(
+    const ColumnCryptoMetaData* crypto_metadata, InternalFileDecryptor* 
file_decryptor) {
+  return GetColumnDecryptor(crypto_metadata, file_decryptor,
+                            &InternalFileDecryptor::GetColumnMetaDecryptor,
+                            /*metadata=*/true);
+}
+
+std::shared_ptr<Decryptor> GetColumnDataDecryptor(
+    const ColumnCryptoMetaData* crypto_metadata, InternalFileDecryptor* 
file_decryptor) {
+  return GetColumnDecryptor(crypto_metadata, file_decryptor,
+                            &InternalFileDecryptor::GetColumnDataDecryptor,
+                            /*metadata=*/false);
+}
+
+void UpdateDecryptor(const std::shared_ptr<Decryptor>& decryptor,
+                     int16_t row_group_ordinal, int16_t column_ordinal,
+                     int8_t module_type) {
+  ARROW_DCHECK(!decryptor->file_aad().empty());
+  const std::string aad =
+      encryption::CreateModuleAad(decryptor->file_aad(), module_type, 
row_group_ordinal,
+                                  column_ordinal, kNonPageOrdinal);
+  decryptor->UpdateAad(aad);
+}
+
 }  // namespace parquet
diff --git a/cpp/src/parquet/encryption/internal_file_decryptor.h 
b/cpp/src/parquet/encryption/internal_file_decryptor.h
index 2f9c3952af..0b27effda8 100644
--- a/cpp/src/parquet/encryption/internal_file_decryptor.h
+++ b/cpp/src/parquet/encryption/internal_file_decryptor.h
@@ -31,6 +31,7 @@ class AesDecryptor;
 class AesEncryptor;
 }  // namespace encryption
 
+class ColumnCryptoMetaData;
 class FileDecryptionProperties;
 
 class PARQUET_EXPORT Decryptor {
@@ -110,4 +111,16 @@ class InternalFileDecryptor {
                                                 bool metadata = false);
 };
 
+/// Utility to get column meta decryptor of an encrypted column.
+std::shared_ptr<Decryptor> GetColumnMetaDecryptor(
+    const ColumnCryptoMetaData* crypto_metadata, InternalFileDecryptor* 
file_decryptor);
+
+/// Utility to get column data decryptor of an encrypted column.
+std::shared_ptr<Decryptor> GetColumnDataDecryptor(
+    const ColumnCryptoMetaData* crypto_metadata, InternalFileDecryptor* 
file_decryptor);
+
+void UpdateDecryptor(const std::shared_ptr<Decryptor>& decryptor,
+                     int16_t row_group_ordinal, int16_t column_ordinal,
+                     int8_t module_type);
+
 }  // namespace parquet
diff --git a/cpp/src/parquet/encryption/read_configurations_test.cc 
b/cpp/src/parquet/encryption/read_configurations_test.cc
index 10de7198ac..695696db29 100644
--- a/cpp/src/parquet/encryption/read_configurations_test.cc
+++ b/cpp/src/parquet/encryption/read_configurations_test.cc
@@ -36,7 +36,7 @@
  * The unit-test is called multiple times, each time to decrypt parquet files 
using
  * different decryption configuration as described below.
  * In each call two encrypted files are read: one temporary file that was 
generated using
- * encryption-write-configurations-test.cc test and will be deleted upon
+ * write_configurations_test.cc test and will be deleted upon
  * reading it, while the second resides in
  * parquet-testing/data repository. Those two encrypted files were encrypted 
using the
  * same encryption configuration.
@@ -59,8 +59,8 @@
  *                                  read the footer + all non-encrypted 
columns.
  *                                  (pairs with encryption configuration 3)
  *
- * The encrypted parquet files that is read was encrypted using one of the 
configurations
- * below:
+ * The encrypted parquet files that are read were encrypted using one of the
+ * configurations below:
  *
  *  - Encryption configuration 1:   Encrypt all columns and the footer with 
the same key.
  *                                  (uniform encryption)
@@ -166,7 +166,11 @@ class TestDecryptionConfiguration
     vector_of_decryption_configurations_.push_back(NULL);
   }
 
-  void DecryptFile(std::string file, int decryption_config_num) {
+  void DecryptFileInternal(
+      const std::string& file, int decryption_config_num,
+      std::function<void(const std::string& file,
+                         const std::shared_ptr<FileDecryptionProperties>&)>
+          decrypt_func) {
     std::string exception_msg;
     std::shared_ptr<FileDecryptionProperties> file_decryption_properties;
     // if we get decryption_config_num = x then it means the actual number is 
x+1
@@ -176,18 +180,40 @@ class TestDecryptionConfiguration
           
vector_of_decryption_configurations_[decryption_config_num]->DeepClone();
     }
 
-    decryptor_.DecryptFile(file, file_decryption_properties);
+    decrypt_func(std::move(file), std::move(file_decryption_properties));
+  }
+
+  void DecryptFile(const std::string& file, int decryption_config_num) {
+    DecryptFileInternal(
+        file, decryption_config_num,
+        [&](const std::string& file,
+            const std::shared_ptr<FileDecryptionProperties>& 
file_decryption_properties) {
+          decryptor_.DecryptFile(file, file_decryption_properties);
+        });
+  }
+
+  void DecryptPageIndex(const std::string& file, int decryption_config_num) {
+    DecryptFileInternal(
+        file, decryption_config_num,
+        [&](const std::string& file,
+            const std::shared_ptr<FileDecryptionProperties>& 
file_decryption_properties) {
+          decryptor_.DecryptPageIndex(file, file_decryption_properties);
+        });
   }
 
   // Check that the decryption result is as expected.
-  void CheckResults(const std::string file_name, unsigned 
decryption_config_num,
-                    unsigned encryption_config_num) {
+  void CheckResults(const std::string& file_name, unsigned 
decryption_config_num,
+                    unsigned encryption_config_num, bool file_has_page_index) {
     // Encryption_configuration number five contains aad_prefix and
     // disable_aad_prefix_storage.
     // An exception is expected to be thrown if the file is not decrypted with 
aad_prefix.
     if (encryption_config_num == 5) {
       if (decryption_config_num == 1 || decryption_config_num == 3) {
         EXPECT_THROW(DecryptFile(file_name, decryption_config_num - 1), 
ParquetException);
+        if (file_has_page_index) {
+          EXPECT_THROW(DecryptPageIndex(file_name, decryption_config_num - 1),
+                       ParquetException);
+        }
         return;
       }
     }
@@ -196,6 +222,10 @@ class TestDecryptionConfiguration
     if (decryption_config_num == 2) {
       if (encryption_config_num != 5 && encryption_config_num != 4) {
         EXPECT_THROW(DecryptFile(file_name, decryption_config_num - 1), 
ParquetException);
+        if (file_has_page_index) {
+          EXPECT_THROW(DecryptPageIndex(file_name, decryption_config_num - 1),
+                       ParquetException);
+        }
         return;
       }
     }
@@ -205,6 +235,9 @@ class TestDecryptionConfiguration
       return;
     }
     EXPECT_NO_THROW(DecryptFile(file_name, decryption_config_num - 1));
+    if (file_has_page_index) {
+      EXPECT_NO_THROW(DecryptPageIndex(file_name, decryption_config_num - 1));
+    }
   }
 
   // Returns true if file exists. Otherwise returns false.
@@ -217,14 +250,13 @@ class TestDecryptionConfiguration
 // Read encrypted parquet file.
 // The test reads two parquet files that were encrypted using the same 
encryption
 // configuration:
-// one was generated in encryption-write-configurations-test.cc tests and is 
deleted
+// one was generated in write_configurations_test.cc tests and is deleted
 // once the file is read and the second exists in parquet-testing/data folder.
 // The name of the files are passed as parameters to the unit-test.
 TEST_P(TestDecryptionConfiguration, TestDecryption) {
   int encryption_config_num = std::get<0>(GetParam());
   const char* param_file_name = std::get<1>(GetParam());
-  // Decrypt parquet file that was generated in 
encryption-write-configurations-test.cc
-  // test.
+  // Decrypt parquet file that was generated in write_configurations_test.cc 
test.
   std::string tmp_file_name = "tmp_" + std::string(param_file_name);
   std::string file_name = temp_dir->path().ToString() + tmp_file_name;
   if (!fexists(file_name)) {
@@ -237,7 +269,8 @@ TEST_P(TestDecryptionConfiguration, TestDecryption) {
   // parquet file.
   for (unsigned index = 0; index < 
vector_of_decryption_configurations_.size(); ++index) {
     unsigned decryption_config_num = index + 1;
-    CheckResults(file_name, decryption_config_num, encryption_config_num);
+    CheckResults(file_name, decryption_config_num, encryption_config_num,
+                 /*file_has_page_index=*/true);
   }
   // Delete temporary test file.
   ASSERT_EQ(std::remove(file_name.c_str()), 0);
@@ -255,7 +288,8 @@ TEST_P(TestDecryptionConfiguration, TestDecryption) {
   // parquet file.
   for (unsigned index = 0; index < 
vector_of_decryption_configurations_.size(); ++index) {
     unsigned decryption_config_num = index + 1;
-    CheckResults(file_name, decryption_config_num, encryption_config_num);
+    CheckResults(file_name, decryption_config_num, encryption_config_num,
+                 /*file_has_page_index=*/false);
   }
 }
 
diff --git a/cpp/src/parquet/encryption/test_encryption_util.cc 
b/cpp/src/parquet/encryption/test_encryption_util.cc
index 694ed3cf42..4fa215312f 100644
--- a/cpp/src/parquet/encryption/test_encryption_util.cc
+++ b/cpp/src/parquet/encryption/test_encryption_util.cc
@@ -19,14 +19,17 @@
 // Parquet column chunk within a row group. It could be extended in the future
 // to iterate through all data pages in all chunks in a file.
 
+#include <numeric>
 #include <sstream>
 
-#include <arrow/io/file.h>
-
+#include "arrow/io/file.h"
 #include "arrow/testing/future_util.h"
+#include "arrow/util/unreachable.h"
+
 #include "parquet/encryption/test_encryption_util.h"
 #include "parquet/file_reader.h"
 #include "parquet/file_writer.h"
+#include "parquet/page_index.h"
 #include "parquet/test_util.h"
 
 using ::arrow::io::FileOutputStream;
@@ -206,6 +209,7 @@ void FileEncryptor::EncryptFile(
   WriterProperties::Builder prop_builder;
   prop_builder.compression(parquet::Compression::UNCOMPRESSED);
   prop_builder.encryption(encryption_configurations);
+  prop_builder.enable_write_page_index();
   std::shared_ptr<WriterProperties> writer_properties = prop_builder.build();
 
   PARQUET_ASSIGN_OR_THROW(auto out_file, FileOutputStream::Open(file));
@@ -340,8 +344,8 @@ void ReadAndVerifyColumn(RowGroupReader* rg_reader, 
RowGroupMetadata* rg_md,
 }
 
 void FileDecryptor::DecryptFile(
-    std::string file,
-    std::shared_ptr<FileDecryptionProperties> file_decryption_properties) {
+    const std::string& file,
+    const std::shared_ptr<FileDecryptionProperties>& 
file_decryption_properties) {
   std::string exception_msg;
   parquet::ReaderProperties reader_properties = 
parquet::default_reader_properties();
   if (file_decryption_properties) {
@@ -353,7 +357,7 @@ void FileDecryptor::DecryptFile(
       source, ::arrow::io::ReadableFile::Open(file, 
reader_properties.memory_pool()));
 
   auto file_reader = parquet::ParquetFileReader::Open(source, 
reader_properties);
-  CheckFile(file_reader.get(), file_decryption_properties.get());
+  CheckFile(file_reader.get(), file_decryption_properties);
 
   if (file_decryption_properties) {
     
reader_properties.file_decryption_properties(file_decryption_properties->DeepClone());
@@ -361,14 +365,15 @@ void FileDecryptor::DecryptFile(
   auto fut = parquet::ParquetFileReader::OpenAsync(source, reader_properties);
   ASSERT_FINISHES_OK(fut);
   ASSERT_OK_AND_ASSIGN(file_reader, fut.MoveResult());
-  CheckFile(file_reader.get(), file_decryption_properties.get());
+  CheckFile(file_reader.get(), file_decryption_properties);
 
   file_reader->Close();
   PARQUET_THROW_NOT_OK(source->Close());
 }
 
-void FileDecryptor::CheckFile(parquet::ParquetFileReader* file_reader,
-                              FileDecryptionProperties* 
file_decryption_properties) {
+void FileDecryptor::CheckFile(
+    parquet::ParquetFileReader* file_reader,
+    const std::shared_ptr<FileDecryptionProperties>& 
file_decryption_properties) {
   // Get the File MetaData
   std::shared_ptr<parquet::FileMetaData> file_metadata = 
file_reader->metadata();
 
@@ -509,4 +514,161 @@ void FileDecryptor::CheckFile(parquet::ParquetFileReader* 
file_reader,
   }
 }
 
+void FileDecryptor::DecryptPageIndex(
+    const std::string& file,
+    const std::shared_ptr<FileDecryptionProperties>& 
file_decryption_properties) {
+  std::string exception_msg;
+  parquet::ReaderProperties reader_properties = 
parquet::default_reader_properties();
+  if (file_decryption_properties) {
+    
reader_properties.file_decryption_properties(file_decryption_properties->DeepClone());
+  }
+
+  std::shared_ptr<::arrow::io::RandomAccessFile> source;
+  PARQUET_ASSIGN_OR_THROW(
+      source, ::arrow::io::ReadableFile::Open(file, 
reader_properties.memory_pool()));
+
+  auto file_reader = parquet::ParquetFileReader::Open(source, 
reader_properties);
+  CheckPageIndex(file_reader.get(), file_decryption_properties);
+
+  ASSERT_NO_FATAL_FAILURE(file_reader->Close());
+  PARQUET_THROW_NOT_OK(source->Close());
+}
+
+template <typename DType, typename c_type = typename DType::c_type>
+void AssertColumnIndex(const std::shared_ptr<ColumnIndex>& column_index,
+                       const std::vector<int64_t>& expected_null_counts,
+                       const std::vector<c_type>& expected_min_values,
+                       const std::vector<c_type>& expected_max_values) {
+  auto typed_column_index =
+      std::dynamic_pointer_cast<TypedColumnIndex<DType>>(column_index);
+  ASSERT_NE(typed_column_index, nullptr);
+  ASSERT_EQ(typed_column_index->null_counts(), expected_null_counts);
+  if constexpr (std::is_same_v<FLBAType, DType>) {
+    ASSERT_EQ(typed_column_index->min_values().size(), 
expected_min_values.size());
+    ASSERT_EQ(typed_column_index->max_values().size(), 
expected_max_values.size());
+    for (size_t i = 0; i < expected_min_values.size(); ++i) {
+      ASSERT_EQ(
+          FixedLenByteArrayToString(typed_column_index->min_values()[i], 
kFixedLength),
+          FixedLenByteArrayToString(expected_min_values[i], kFixedLength));
+    }
+    for (size_t i = 0; i < expected_max_values.size(); ++i) {
+      ASSERT_EQ(
+          FixedLenByteArrayToString(typed_column_index->max_values()[i], 
kFixedLength),
+          FixedLenByteArrayToString(expected_max_values[i], kFixedLength));
+    }
+  } else {
+    ASSERT_EQ(typed_column_index->min_values(), expected_min_values);
+    ASSERT_EQ(typed_column_index->max_values(), expected_max_values);
+  }
+}
+
+void FileDecryptor::CheckPageIndex(
+    parquet::ParquetFileReader* file_reader,
+    const std::shared_ptr<FileDecryptionProperties>& 
file_decryption_properties) {
+  std::shared_ptr<PageIndexReader> page_index_reader = 
file_reader->GetPageIndexReader();
+  ASSERT_NE(page_index_reader, nullptr);
+
+  const std::shared_ptr<parquet::FileMetaData> file_metadata = 
file_reader->metadata();
+  const int num_row_groups = file_metadata->num_row_groups();
+  const int num_columns = file_metadata->num_columns();
+  ASSERT_EQ(num_columns, 8);
+
+  // We cannot read page index of encrypted columns in the plaintext mode
+  std::vector<int32_t> need_row_groups(num_row_groups);
+  std::iota(need_row_groups.begin(), need_row_groups.end(), 0);
+  std::vector<int32_t> need_columns;
+  if (file_decryption_properties == nullptr) {
+    need_columns = {0, 1, 2, 3, 6, 7};
+  } else {
+    need_columns = {0, 1, 2, 3, 4, 5, 6, 7};
+  }
+
+  // Provide hint of requested columns to avoid accessing encrypted columns 
without
+  // decryption properties.
+  page_index_reader->WillNeed(
+      need_row_groups, need_columns,
+      PageIndexSelection{/*column_index=*/true, /*offset_index=*/true});
+
+  // Iterate over all the RowGroups in the file.
+  for (int r = 0; r < num_row_groups; ++r) {
+    auto row_group_page_index_reader = page_index_reader->RowGroup(r);
+    ASSERT_NE(row_group_page_index_reader, nullptr);
+
+    for (int c = 0; c < num_columns; ++c) {
+      // Skip reading encrypted columns without decryption properties.
+      if (file_decryption_properties == nullptr && (c == 4 || c == 5)) {
+        continue;
+      }
+
+      constexpr size_t kExpectedNumPages = 1;
+
+      // Check offset index.
+      auto offset_index = row_group_page_index_reader->GetOffsetIndex(c);
+      ASSERT_NE(offset_index, nullptr);
+      ASSERT_EQ(offset_index->page_locations().size(), kExpectedNumPages);
+      const auto& first_page = offset_index->page_locations()[0];
+      ASSERT_EQ(first_page.first_row_index, 0);
+      ASSERT_GT(first_page.compressed_page_size, 0);
+
+      // Int96 column does not have column index.
+      if (c == 3) {
+        continue;
+      }
+
+      // Check column index
+      auto column_index = row_group_page_index_reader->GetColumnIndex(c);
+      ASSERT_NE(column_index, nullptr);
+      ASSERT_EQ(column_index->null_pages().size(), kExpectedNumPages);
+      ASSERT_EQ(column_index->null_pages()[0], false);
+      ASSERT_EQ(column_index->encoded_min_values().size(), kExpectedNumPages);
+      ASSERT_EQ(column_index->encoded_max_values().size(), kExpectedNumPages);
+      ASSERT_TRUE(column_index->has_null_counts());
+
+      switch (c) {
+        case 0: {
+          AssertColumnIndex<BooleanType>(column_index, 
/*expected_null_counts=*/{0},
+                                         /*expected_min_values=*/{false},
+                                         /*expected_max_values=*/{true});
+        } break;
+        case 1: {
+          AssertColumnIndex<Int32Type>(column_index, 
/*expected_null_counts=*/{0},
+                                       /*expected_min_values=*/{0},
+                                       /*expected_max_values=*/{49});
+        } break;
+        case 2: {
+          AssertColumnIndex<Int64Type>(column_index, 
/*expected_null_counts=*/{0},
+                                       /*expected_min_values=*/{0},
+                                       
/*expected_max_values=*/{99000000000000});
+        } break;
+        case 4: {
+          AssertColumnIndex<FloatType>(column_index, 
/*expected_null_counts=*/{0},
+                                       /*expected_min_values=*/{0.0F},
+                                       /*expected_max_values=*/{53.9F});
+        } break;
+        case 5: {
+          AssertColumnIndex<DoubleType>(column_index, 
/*expected_null_counts=*/{0},
+                                        /*expected_min_values=*/{0.0},
+                                        /*expected_max_values=*/{54.4444439});
+        } break;
+        case 6: {
+          AssertColumnIndex<ByteArrayType>(
+              column_index, /*expected_null_counts=*/{25},
+              /*expected_min_values=*/{ByteArray("parquet000")},
+              /*expected_max_values=*/{ByteArray("parquet048")});
+        } break;
+        case 7: {
+          const std::vector<uint8_t> kExpectedMinValue(kFixedLength, 0);
+          const std::vector<uint8_t> kExpectedMaxValue(kFixedLength, 49);
+          AssertColumnIndex<FLBAType>(
+              column_index, /*expected_null_counts=*/{0},
+              /*expected_min_values=*/{FLBA(kExpectedMinValue.data())},
+              /*expected_max_values=*/{FLBA(kExpectedMaxValue.data())});
+        } break;
+        default:
+          ::arrow::Unreachable("Unexpected column index " + std::to_string(c));
+      }
+    }
+  }
+}
+
 }  // namespace parquet::encryption::test
diff --git a/cpp/src/parquet/encryption/test_encryption_util.h 
b/cpp/src/parquet/encryption/test_encryption_util.h
index 19c230ee5f..86aa0ff07c 100644
--- a/cpp/src/parquet/encryption/test_encryption_util.h
+++ b/cpp/src/parquet/encryption/test_encryption_util.h
@@ -113,12 +113,20 @@ class FileEncryptor {
 
 class FileDecryptor {
  public:
-  void DecryptFile(std::string file_name,
-                   std::shared_ptr<FileDecryptionProperties> 
file_decryption_properties);
+  void DecryptFile(
+      const std::string& file_name,
+      const std::shared_ptr<FileDecryptionProperties>& 
file_decryption_properties);
+  void DecryptPageIndex(
+      const std::string& file_name,
+      const std::shared_ptr<FileDecryptionProperties>& 
file_decryption_properties);
 
  private:
-  void CheckFile(parquet::ParquetFileReader* file_reader,
-                 FileDecryptionProperties* file_decryption_properties);
+  void CheckFile(
+      parquet::ParquetFileReader* file_reader,
+      const std::shared_ptr<FileDecryptionProperties>& 
file_decryption_properties);
+  void CheckPageIndex(
+      parquet::ParquetFileReader* file_reader,
+      const std::shared_ptr<FileDecryptionProperties>& 
file_decryption_properties);
 };
 
 }  // namespace encryption::test
diff --git a/cpp/src/parquet/encryption/type_fwd.h 
b/cpp/src/parquet/encryption/type_fwd.h
new file mode 100644
index 0000000000..6238117184
--- /dev/null
+++ b/cpp/src/parquet/encryption/type_fwd.h
@@ -0,0 +1,28 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+namespace parquet {
+
+class Decryptor;
+class Encryptor;
+
+class InternalFileDecryptor;
+class InternalFileEncryptor;
+
+}  // namespace parquet
diff --git a/cpp/src/parquet/encryption/write_configurations_test.cc 
b/cpp/src/parquet/encryption/write_configurations_test.cc
index e262003db3..f27da82694 100644
--- a/cpp/src/parquet/encryption/write_configurations_test.cc
+++ b/cpp/src/parquet/encryption/write_configurations_test.cc
@@ -33,7 +33,7 @@
  * This file contains unit-tests for writing encrypted Parquet files with
  * different encryption configurations.
  * The files are saved in temporary folder and will be deleted after reading
- * them in encryption-read-configurations-test.cc test.
+ * them in read_configurations_test.cc test.
  *
  * A detailed description of the Parquet Modular Encryption specification can 
be found
  * here:
diff --git a/cpp/src/parquet/file_reader.cc b/cpp/src/parquet/file_reader.cc
index 08d493b0bc..5247b9d4b5 100644
--- a/cpp/src/parquet/file_reader.cc
+++ b/cpp/src/parquet/file_reader.cc
@@ -227,37 +227,19 @@ class SerializedRowGroup : public 
RowGroupReader::Contents {
                               always_compressed);
     }
 
-    if (file_decryptor_ == nullptr) {
-      throw ParquetException("RowGroup is noted as encrypted but no file 
decryptor");
-    }
+    // The column is encrypted
+    std::shared_ptr<Decryptor> meta_decryptor =
+        GetColumnMetaDecryptor(crypto_metadata.get(), file_decryptor_.get());
+    std::shared_ptr<Decryptor> data_decryptor =
+        GetColumnDataDecryptor(crypto_metadata.get(), file_decryptor_.get());
+    ARROW_DCHECK_NE(meta_decryptor, nullptr);
+    ARROW_DCHECK_NE(data_decryptor, nullptr);
 
     constexpr auto kEncryptedRowGroupsLimit = 32767;
     if (i > kEncryptedRowGroupsLimit) {
       throw ParquetException("Encrypted files cannot contain more than 32767 
row groups");
     }
 
-    // The column is encrypted
-    std::shared_ptr<Decryptor> meta_decryptor;
-    std::shared_ptr<Decryptor> data_decryptor;
-    // The column is encrypted with footer key
-    if (crypto_metadata->encrypted_with_footer_key()) {
-      meta_decryptor = file_decryptor_->GetFooterDecryptorForColumnMeta();
-      data_decryptor = file_decryptor_->GetFooterDecryptorForColumnData();
-      CryptoContext ctx(col->has_dictionary_page(), row_group_ordinal_,
-                        static_cast<int16_t>(i), meta_decryptor, 
data_decryptor);
-      return PageReader::Open(stream, col->num_values(), col->compression(), 
properties_,
-                              always_compressed, &ctx);
-    }
-
-    // The column is encrypted with its own key
-    std::string column_key_metadata = crypto_metadata->key_metadata();
-    const std::string column_path = 
crypto_metadata->path_in_schema()->ToDotString();
-
-    meta_decryptor =
-        file_decryptor_->GetColumnMetaDecryptor(column_path, 
column_key_metadata);
-    data_decryptor =
-        file_decryptor_->GetColumnDataDecryptor(column_path, 
column_key_metadata);
-
     CryptoContext ctx(col->has_dictionary_page(), row_group_ordinal_,
                       static_cast<int16_t>(i), meta_decryptor, data_decryptor);
     return PageReader::Open(stream, col->num_values(), col->compression(), 
properties_,
@@ -330,7 +312,7 @@ class SerializedFile : public ParquetFileReader::Contents {
     }
     if (!page_index_reader_) {
       page_index_reader_ = PageIndexReader::Make(source_.get(), file_metadata_,
-                                                 properties_, file_decryptor_);
+                                                 properties_, 
file_decryptor_.get());
     }
     return page_index_reader_;
   }
diff --git a/cpp/src/parquet/file_writer.cc b/cpp/src/parquet/file_writer.cc
index 2a6a88df2d..9a92d4525d 100644
--- a/cpp/src/parquet/file_writer.cc
+++ b/cpp/src/parquet/file_writer.cc
@@ -471,10 +471,6 @@ class FileSerializer : public ParquetFileWriter::Contents {
 
   void WritePageIndex() {
     if (page_index_builder_ != nullptr) {
-      if (properties_->file_encryption_properties()) {
-        throw ParquetException("Encryption is not supported with page index");
-      }
-
       // Serialize page index after all row groups have been written and report
       // location to the file metadata.
       PageIndexLocation page_index_location;
@@ -533,7 +529,7 @@ class FileSerializer : public ParquetFileWriter::Contents {
     }
 
     if (properties_->page_index_enabled()) {
-      page_index_builder_ = PageIndexBuilder::Make(&schema_);
+      page_index_builder_ = PageIndexBuilder::Make(&schema_, 
file_encryptor_.get());
     }
   }
 };
diff --git a/cpp/src/parquet/metadata.cc b/cpp/src/parquet/metadata.cc
index 8aedf5b926..4ef2151fee 100644
--- a/cpp/src/parquet/metadata.cc
+++ b/cpp/src/parquet/metadata.cc
@@ -211,7 +211,7 @@ class ColumnChunkMetaData::ColumnChunkMetaDataImpl {
           ThriftDeserializer deserializer(properties_);
           deserializer.DeserializeMessage(
               reinterpret_cast<const 
uint8_t*>(column->encrypted_column_metadata.c_str()),
-              &len, &decrypted_metadata_, decryptor);
+              &len, &decrypted_metadata_, decryptor.get());
           column_metadata_ = &decrypted_metadata_;
         } else {
           throw ParquetException(
@@ -603,7 +603,8 @@ class FileMetaData::FileMetaDataImpl {
 
     ThriftDeserializer deserializer(properties_);
     deserializer.DeserializeMessage(reinterpret_cast<const uint8_t*>(metadata),
-                                    metadata_len, metadata_.get(), 
footer_decryptor);
+                                    metadata_len, metadata_.get(),
+                                    footer_decryptor.get());
     metadata_len_ = *metadata_len;
 
     if (metadata_->__isset.created_by) {
@@ -705,7 +706,7 @@ class FileMetaData::FileMetaDataImpl {
                      encryption::kGcmTagLength));
     } else {  // either plaintext file (when encryptor is null)
       // or encrypted file with encrypted footer
-      serializer.Serialize(metadata_.get(), dst, encryptor);
+      serializer.Serialize(metadata_.get(), dst, encryptor.get());
     }
   }
 
diff --git a/cpp/src/parquet/metadata.h b/cpp/src/parquet/metadata.h
index e62b2d187a..6609cff48b 100644
--- a/cpp/src/parquet/metadata.h
+++ b/cpp/src/parquet/metadata.h
@@ -25,6 +25,7 @@
 #include <utility>
 #include <vector>
 
+#include "parquet/encryption/type_fwd.h"
 #include "parquet/platform.h"
 #include "parquet/properties.h"
 #include "parquet/schema.h"
@@ -34,15 +35,10 @@ namespace parquet {
 
 class ColumnDescriptor;
 class EncodedStatistics;
+class FileCryptoMetaData;
 class Statistics;
 class SchemaDescriptor;
 
-class FileCryptoMetaData;
-class InternalFileDecryptor;
-class Decryptor;
-class Encryptor;
-class FooterSigningEncryptor;
-
 namespace schema {
 
 class ColumnPath;
diff --git a/cpp/src/parquet/page_index.cc b/cpp/src/parquet/page_index.cc
index 9bae90e554..ec99af17f0 100644
--- a/cpp/src/parquet/page_index.cc
+++ b/cpp/src/parquet/page_index.cc
@@ -17,6 +17,9 @@
 
 #include "parquet/page_index.h"
 #include "parquet/encoding.h"
+#include "parquet/encryption/encryption_internal.h"
+#include "parquet/encryption/internal_file_decryptor.h"
+#include "parquet/encryption/internal_file_encryptor.h"
 #include "parquet/exception.h"
 #include "parquet/metadata.h"
 #include "parquet/schema.h"
@@ -192,13 +195,13 @@ class RowGroupPageIndexReaderImpl : public 
RowGroupPageIndexReader {
                               const ReaderProperties& properties,
                               int32_t row_group_ordinal,
                               const RowGroupIndexReadRange& index_read_range,
-                              std::shared_ptr<InternalFileDecryptor> 
file_decryptor)
+                              InternalFileDecryptor* file_decryptor)
       : input_(input),
         row_group_metadata_(std::move(row_group_metadata)),
         properties_(properties),
         row_group_ordinal_(row_group_ordinal),
         index_read_range_(index_read_range),
-        file_decryptor_(std::move(file_decryptor)) {}
+        file_decryptor_(file_decryptor) {}
 
   /// Read column index of a column chunk.
   std::shared_ptr<ColumnIndex> GetColumnIndex(int32_t i) override {
@@ -207,11 +210,6 @@ class RowGroupPageIndexReaderImpl : public 
RowGroupPageIndexReader {
     }
 
     auto col_chunk = row_group_metadata_->ColumnChunk(i);
-    std::unique_ptr<ColumnCryptoMetaData> crypto_metadata = 
col_chunk->crypto_metadata();
-    if (crypto_metadata != nullptr) {
-      ParquetException::NYI("Cannot read encrypted column index yet");
-    }
-
     auto column_index_location = col_chunk->GetColumnIndexLocation();
     if (!column_index_location.has_value()) {
       return nullptr;
@@ -232,8 +230,17 @@ class RowGroupPageIndexReaderImpl : public 
RowGroupPageIndexReader {
     // uint32_t
     uint32_t length = static_cast<uint32_t>(column_index_location->length);
     auto descr = row_group_metadata_->schema()->Column(i);
+
+    // Get decryptor of column index if encrypted.
+    std::shared_ptr<Decryptor> decryptor = parquet::GetColumnMetaDecryptor(
+        col_chunk->crypto_metadata().get(), file_decryptor_);
+    if (decryptor != nullptr) {
+      UpdateDecryptor(decryptor, row_group_ordinal_, /*column_ordinal=*/i,
+                      encryption::kColumnIndex);
+    }
+
     return ColumnIndex::Make(*descr, column_index_buffer_->data() + 
buffer_offset, length,
-                             properties_);
+                             properties_, decryptor.get());
   }
 
   /// Read offset index of a column chunk.
@@ -243,11 +250,6 @@ class RowGroupPageIndexReaderImpl : public 
RowGroupPageIndexReader {
     }
 
     auto col_chunk = row_group_metadata_->ColumnChunk(i);
-    std::unique_ptr<ColumnCryptoMetaData> crypto_metadata = 
col_chunk->crypto_metadata();
-    if (crypto_metadata != nullptr) {
-      ParquetException::NYI("Cannot read encrypted offset index yet");
-    }
-
     auto offset_index_location = col_chunk->GetOffsetIndexLocation();
     if (!offset_index_location.has_value()) {
       return nullptr;
@@ -267,8 +269,17 @@ class RowGroupPageIndexReaderImpl : public 
RowGroupPageIndexReader {
     // OffsetIndex::Make() requires the type of serialized thrift message to be
     // uint32_t
     uint32_t length = static_cast<uint32_t>(offset_index_location->length);
+
+    // Get decryptor of offset index if encrypted.
+    std::shared_ptr<Decryptor> decryptor =
+        GetColumnMetaDecryptor(col_chunk->crypto_metadata().get(), 
file_decryptor_);
+    if (decryptor != nullptr) {
+      UpdateDecryptor(decryptor, row_group_ordinal_, /*column_ordinal=*/i,
+                      encryption::kOffsetIndex);
+    }
+
     return OffsetIndex::Make(offset_index_buffer_->data() + buffer_offset, 
length,
-                             properties_);
+                             properties_, decryptor.get());
   }
 
  private:
@@ -325,7 +336,7 @@ class RowGroupPageIndexReaderImpl : public 
RowGroupPageIndexReader {
   RowGroupIndexReadRange index_read_range_;
 
   /// File-level decryptor.
-  std::shared_ptr<InternalFileDecryptor> file_decryptor_;
+  InternalFileDecryptor* file_decryptor_;
 
   /// Buffer to hold the raw bytes of the page index.
   /// Will be set lazily when the corresponding page index is accessed for the 
1st time.
@@ -338,11 +349,11 @@ class PageIndexReaderImpl : public PageIndexReader {
   PageIndexReaderImpl(::arrow::io::RandomAccessFile* input,
                       std::shared_ptr<FileMetaData> file_metadata,
                       const ReaderProperties& properties,
-                      std::shared_ptr<InternalFileDecryptor> file_decryptor)
+                      InternalFileDecryptor* file_decryptor)
       : input_(input),
         file_metadata_(std::move(file_metadata)),
         properties_(properties),
-        file_decryptor_(std::move(file_decryptor)) {}
+        file_decryptor_(file_decryptor) {}
 
   std::shared_ptr<RowGroupPageIndexReader> RowGroup(int i) override {
     if (i < 0 || i >= file_metadata_->num_row_groups()) {
@@ -418,7 +429,7 @@ class PageIndexReaderImpl : public PageIndexReader {
   const ReaderProperties& properties_;
 
   /// File-level decrypter.
-  std::shared_ptr<InternalFileDecryptor> file_decryptor_;
+  InternalFileDecryptor* file_decryptor_;
 
   /// Coalesced read ranges of page index of row groups that have been 
suggested by
   /// WillNeed(). Key is the row group ordinal.
@@ -524,9 +535,9 @@ class ColumnIndexBuilderImpl final : public 
ColumnIndexBuilder {
     column_index_.__set_boundary_order(ToThrift(boundary_order));
   }
 
-  void WriteTo(::arrow::io::OutputStream* sink) const override {
+  void WriteTo(::arrow::io::OutputStream* sink, Encryptor* encryptor) const 
override {
     if (state_ == BuilderState::kFinished) {
-      ThriftSerializer{}.Serialize(&column_index_, sink);
+      ThriftSerializer{}.Serialize(&column_index_, sink, encryptor);
     }
   }
 
@@ -634,9 +645,9 @@ class OffsetIndexBuilderImpl final : public 
OffsetIndexBuilder {
     }
   }
 
-  void WriteTo(::arrow::io::OutputStream* sink) const override {
+  void WriteTo(::arrow::io::OutputStream* sink, Encryptor* encryptor) const 
override {
     if (state_ == BuilderState::kFinished) {
-      ThriftSerializer{}.Serialize(&offset_index_, sink);
+      ThriftSerializer{}.Serialize(&offset_index_, sink, encryptor);
     }
   }
 
@@ -654,7 +665,9 @@ class OffsetIndexBuilderImpl final : public 
OffsetIndexBuilder {
 
 class PageIndexBuilderImpl final : public PageIndexBuilder {
  public:
-  explicit PageIndexBuilderImpl(const SchemaDescriptor* schema) : 
schema_(schema) {}
+  explicit PageIndexBuilderImpl(const SchemaDescriptor* schema,
+                                InternalFileEncryptor* file_encryptor)
+      : schema_(schema), file_encryptor_(file_encryptor) {}
 
   void AppendRowGroup() override {
     if (finished_) {
@@ -724,12 +737,31 @@ class PageIndexBuilderImpl final : public 
PageIndexBuilder {
     }
   }
 
+  std::shared_ptr<Encryptor> GetColumnMetaEncryptor(int row_group_ordinal,
+                                                    int column_ordinal,
+                                                    int8_t module_type) const {
+    std::shared_ptr<Encryptor> encryptor;
+    if (file_encryptor_ != nullptr) {
+      const auto column_path = 
schema_->Column(column_ordinal)->path()->ToDotString();
+      encryptor = file_encryptor_->GetColumnMetaEncryptor(column_path);
+      if (encryptor != nullptr) {
+        encryptor->UpdateAad(encryption::CreateModuleAad(
+            encryptor->file_aad(), module_type, row_group_ordinal, 
column_ordinal,
+            kNonPageOrdinal));
+      }
+    }
+    return encryptor;
+  }
+
   template <typename Builder>
   void SerializeIndex(
       const std::vector<std::vector<std::unique_ptr<Builder>>>& 
page_index_builders,
       ::arrow::io::OutputStream* sink,
       std::map<size_t, std::vector<std::optional<IndexLocation>>>* location) 
const {
     const auto num_columns = static_cast<size_t>(schema_->num_columns());
+    constexpr int8_t module_type = std::is_same_v<Builder, ColumnIndexBuilder>
+                                       ? encryption::kColumnIndex
+                                       : encryption::kOffsetIndex;
 
     /// Serialize the same kind of page index row group by row group.
     for (size_t row_group = 0; row_group < page_index_builders.size(); 
++row_group) {
@@ -743,9 +775,13 @@ class PageIndexBuilderImpl final : public PageIndexBuilder 
{
       for (size_t column = 0; column < num_columns; ++column) {
         const auto& column_page_index_builder = 
row_group_page_index_builders[column];
         if (column_page_index_builder != nullptr) {
+          /// Get encryptor if encryption is enabled.
+          std::shared_ptr<Encryptor> encryptor = GetColumnMetaEncryptor(
+              static_cast<int>(row_group), static_cast<int>(column), 
module_type);
+
           /// Try serializing the page index.
           PARQUET_ASSIGN_OR_THROW(int64_t pos_before_write, sink->Tell());
-          column_page_index_builder->WriteTo(sink);
+          column_page_index_builder->WriteTo(sink, encryptor.get());
           PARQUET_ASSIGN_OR_THROW(int64_t pos_after_write, sink->Tell());
           int64_t len = pos_after_write - pos_before_write;
 
@@ -769,6 +805,7 @@ class PageIndexBuilderImpl final : public PageIndexBuilder {
   }
 
   const SchemaDescriptor* schema_;
+  InternalFileEncryptor* file_encryptor_;
   std::vector<std::vector<std::unique_ptr<ColumnIndexBuilder>>> 
column_index_builders_;
   std::vector<std::vector<std::unique_ptr<OffsetIndexBuilder>>> 
offset_index_builders_;
   bool finished_ = false;
@@ -832,11 +869,12 @@ RowGroupIndexReadRange 
PageIndexReader::DeterminePageIndexRangesInRowGroup(
 std::unique_ptr<ColumnIndex> ColumnIndex::Make(const ColumnDescriptor& descr,
                                                const void* serialized_index,
                                                uint32_t index_len,
-                                               const ReaderProperties& 
properties) {
+                                               const ReaderProperties& 
properties,
+                                               Decryptor* decryptor) {
   format::ColumnIndex column_index;
   ThriftDeserializer deserializer(properties);
   deserializer.DeserializeMessage(reinterpret_cast<const 
uint8_t*>(serialized_index),
-                                  &index_len, &column_index);
+                                  &index_len, &column_index, decryptor);
   switch (descr.physical_type()) {
     case Type::BOOLEAN:
       return std::make_unique<TypedColumnIndexImpl<BooleanType>>(descr,
@@ -871,20 +909,20 @@ std::unique_ptr<ColumnIndex> ColumnIndex::Make(const 
ColumnDescriptor& descr,
 
 std::unique_ptr<OffsetIndex> OffsetIndex::Make(const void* serialized_index,
                                                uint32_t index_len,
-                                               const ReaderProperties& 
properties) {
+                                               const ReaderProperties& 
properties,
+                                               Decryptor* decryptor) {
   format::OffsetIndex offset_index;
   ThriftDeserializer deserializer(properties);
   deserializer.DeserializeMessage(reinterpret_cast<const 
uint8_t*>(serialized_index),
-                                  &index_len, &offset_index);
+                                  &index_len, &offset_index, decryptor);
   return std::make_unique<OffsetIndexImpl>(offset_index);
 }
 
 std::shared_ptr<PageIndexReader> PageIndexReader::Make(
     ::arrow::io::RandomAccessFile* input, std::shared_ptr<FileMetaData> 
file_metadata,
-    const ReaderProperties& properties,
-    std::shared_ptr<InternalFileDecryptor> file_decryptor) {
+    const ReaderProperties& properties, InternalFileDecryptor* file_decryptor) 
{
   return std::make_shared<PageIndexReaderImpl>(input, std::move(file_metadata),
-                                               properties, 
std::move(file_decryptor));
+                                               properties, file_decryptor);
 }
 
 std::unique_ptr<ColumnIndexBuilder> ColumnIndexBuilder::Make(
@@ -917,8 +955,9 @@ std::unique_ptr<OffsetIndexBuilder> 
OffsetIndexBuilder::Make() {
   return std::make_unique<OffsetIndexBuilderImpl>();
 }
 
-std::unique_ptr<PageIndexBuilder> PageIndexBuilder::Make(const 
SchemaDescriptor* schema) {
-  return std::make_unique<PageIndexBuilderImpl>(schema);
+std::unique_ptr<PageIndexBuilder> PageIndexBuilder::Make(
+    const SchemaDescriptor* schema, InternalFileEncryptor* file_encryptor) {
+  return std::make_unique<PageIndexBuilderImpl>(schema, file_encryptor);
 }
 
 std::ostream& operator<<(std::ostream& out, const PageIndexSelection& 
selection) {
diff --git a/cpp/src/parquet/page_index.h b/cpp/src/parquet/page_index.h
index b6ea5fd6ab..f2ed77cb97 100644
--- a/cpp/src/parquet/page_index.h
+++ b/cpp/src/parquet/page_index.h
@@ -18,6 +18,7 @@
 #pragma once
 
 #include "arrow/io/interfaces.h"
+#include "parquet/encryption/type_fwd.h"
 #include "parquet/types.h"
 
 #include <optional>
@@ -25,14 +26,8 @@
 
 namespace parquet {
 
-class ColumnDescriptor;
 class EncodedStatistics;
-class FileMetaData;
-class InternalFileDecryptor;
 struct PageIndexLocation;
-class ReaderProperties;
-class RowGroupMetaData;
-class RowGroupPageIndexReader;
 
 /// \brief ColumnIndex is a proxy around format::ColumnIndex.
 class PARQUET_EXPORT ColumnIndex {
@@ -41,7 +36,8 @@ class PARQUET_EXPORT ColumnIndex {
   static std::unique_ptr<ColumnIndex> Make(const ColumnDescriptor& descr,
                                            const void* serialized_index,
                                            uint32_t index_len,
-                                           const ReaderProperties& properties);
+                                           const ReaderProperties& properties,
+                                           Decryptor* decryptor = NULLPTR);
 
   virtual ~ColumnIndex() = default;
 
@@ -126,7 +122,8 @@ class PARQUET_EXPORT OffsetIndex {
   /// \brief Create a OffsetIndex from a serialized thrift message.
   static std::unique_ptr<OffsetIndex> Make(const void* serialized_index,
                                            uint32_t index_len,
-                                           const ReaderProperties& properties);
+                                           const ReaderProperties& properties,
+                                           Decryptor* decryptor = NULLPTR);
 
   virtual ~OffsetIndex() = default;
 
@@ -187,7 +184,7 @@ class PARQUET_EXPORT PageIndexReader {
   static std::shared_ptr<PageIndexReader> Make(
       ::arrow::io::RandomAccessFile* input, std::shared_ptr<FileMetaData> 
file_metadata,
       const ReaderProperties& properties,
-      std::shared_ptr<InternalFileDecryptor> file_decryptor = NULLPTR);
+      InternalFileDecryptor* file_decryptor = NULLPTR);
 
   /// \brief Get the page index reader of a specific row group.
   /// \param[in] i row group ordinal to get page index reader.
@@ -283,7 +280,9 @@ class PARQUET_EXPORT ColumnIndexBuilder {
   /// not write any data to the sink.
   ///
   /// \param[out] sink output stream to write the serialized message.
-  virtual void WriteTo(::arrow::io::OutputStream* sink) const = 0;
+  /// \param[in] encryptor encryptor to encrypt the serialized column index.
+  virtual void WriteTo(::arrow::io::OutputStream* sink,
+                       Encryptor* encryptor = NULLPTR) const = 0;
 
   /// \brief Create a ColumnIndex directly.
   ///
@@ -322,7 +321,9 @@ class PARQUET_EXPORT OffsetIndexBuilder {
   /// \brief Serialize the offset index thrift message.
   ///
   /// \param[out] sink output stream to write the serialized message.
-  virtual void WriteTo(::arrow::io::OutputStream* sink) const = 0;
+  /// \param[in] encryptor encryptor to encrypt the serialized offset index.
+  virtual void WriteTo(::arrow::io::OutputStream* sink,
+                       Encryptor* encryptor = NULLPTR) const = 0;
 
   /// \brief Create an OffsetIndex directly.
   virtual std::unique_ptr<OffsetIndex> Build() const = 0;
@@ -332,7 +333,8 @@ class PARQUET_EXPORT OffsetIndexBuilder {
 class PARQUET_EXPORT PageIndexBuilder {
  public:
   /// \brief API convenience to create a PageIndexBuilder.
-  static std::unique_ptr<PageIndexBuilder> Make(const SchemaDescriptor* 
schema);
+  static std::unique_ptr<PageIndexBuilder> Make(
+      const SchemaDescriptor* schema, InternalFileEncryptor* file_encryptor = 
NULLPTR);
 
   virtual ~PageIndexBuilder() = default;
 
diff --git a/cpp/src/parquet/thrift_internal.h 
b/cpp/src/parquet/thrift_internal.h
index 5824a82d5b..7491f118d3 100644
--- a/cpp/src/parquet/thrift_internal.h
+++ b/cpp/src/parquet/thrift_internal.h
@@ -403,7 +403,7 @@ class ThriftDeserializer {
   // set to the actual length of the header.
   template <class T>
   void DeserializeMessage(const uint8_t* buf, uint32_t* len, T* 
deserialized_msg,
-                          const std::shared_ptr<Decryptor>& decryptor = 
NULLPTR) {
+                          Decryptor* decryptor = NULLPTR) {
     if (decryptor == NULLPTR) {
       // thrift message is not encrypted
       DeserializeUnencryptedMessage(buf, len, deserialized_msg);
@@ -495,7 +495,7 @@ class ThriftSerializer {
 
   template <class T>
   int64_t Serialize(const T* obj, ArrowOutputStream* out,
-                    const std::shared_ptr<Encryptor>& encryptor = NULLPTR) {
+                    Encryptor* encryptor = NULLPTR) {
     uint8_t* out_buffer;
     uint32_t out_length;
     SerializeToBuffer(obj, &out_length, &out_buffer);
@@ -523,8 +523,7 @@ class ThriftSerializer {
   }
 
   int64_t SerializeEncryptedObj(ArrowOutputStream* out, uint8_t* out_buffer,
-                                uint32_t out_length,
-                                const std::shared_ptr<Encryptor>& encryptor) {
+                                uint32_t out_length, Encryptor* encryptor) {
     auto cipher_buffer = 
std::static_pointer_cast<ResizableBuffer>(AllocateBuffer(
         encryptor->pool(),
         static_cast<int64_t>(encryptor->CiphertextSizeDelta() + out_length)));
diff --git a/cpp/src/parquet/type_fwd.h b/cpp/src/parquet/type_fwd.h
index 3e66f32fc0..da0d0f7bde 100644
--- a/cpp/src/parquet/type_fwd.h
+++ b/cpp/src/parquet/type_fwd.h
@@ -69,6 +69,9 @@ struct ParquetVersion {
 };
 
 class FileMetaData;
+class RowGroupMetaData;
+
+class ColumnDescriptor;
 class SchemaDescriptor;
 
 class ReaderProperties;

Reply via email to