This is an automated email from the ASF dual-hosted git repository.

apitrou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new 64521946ea GH-43057: [C++] Thread-safe AesEncryptor / AesDecryptor 
(#44990)
64521946ea is described below

commit 64521946eafcea99ac941b0005a4010f8bd1c923
Author: Enrico Minack <[email protected]>
AuthorDate: Wed Apr 2 11:23:30 2025 +0200

    GH-43057: [C++] Thread-safe AesEncryptor / AesDecryptor (#44990)
    
    ### Rationale for this change
    
    OpenSSL encryption / decryption is wrapped by AesEncryptor / AesDencryptor, 
which is used by multiple threads of a single scanner or by multiple concurrent 
scanners when scanning a dataset. Some thread may call `WipeOut` while other 
threads still use the instance.
    
    ### What changes are included in this PR?
    
    - Remove the `WipeOut` methods and related datastructures entirely.
    - Each call into `CtrEncrypt` / `CtrDecrypt` and `GcmEncrypt` / 
`GcmDecrypt` uses its own `EVP_CIPHER_CTX` instance, making this thread-safe.
    
    After fixing this `"AesDecryptor was wiped out"` issue, two other 
segmentation faults surfaced: GH-44988. This has also been addressed here as it 
can only be exposed after fixing the wipe-out issue.
    
    Fixes GH-43057.
    Fixes GH-44852.
    Fixes GH-44988.
    
    ### Are these changes tested?
    A unit test that scans a dataset concurrently reproduced the initial issue 
in 30% of the test runs.
    
    ### Are there any user-facing changes?
    No.
    * GitHub Issue: #43057
    
    Lead-authored-by: Enrico Minack <[email protected]>
    Co-authored-by: Antoine Pitrou <[email protected]>
    Co-authored-by: Antoine Pitrou <[email protected]>
    Signed-off-by: Antoine Pitrou <[email protected]>
---
 .../encryption_reader_writer_all_crypto_options.cc |   2 +-
 cpp/src/arrow/dataset/file_parquet.cc              |  21 +-
 .../arrow/dataset/file_parquet_encryption_test.cc  | 314 ++++++++++++++++-----
 cpp/src/parquet/column_reader.cc                   |  66 +++--
 cpp/src/parquet/column_reader.h                    |  13 +-
 cpp/src/parquet/encryption/encryption.cc           |  94 +-----
 cpp/src/parquet/encryption/encryption.h            |  88 +-----
 cpp/src/parquet/encryption/encryption_internal.cc  | 308 +++++++++-----------
 cpp/src/parquet/encryption/encryption_internal.h   |  27 +-
 .../encryption/encryption_internal_nossl.cc        |  15 +-
 .../parquet/encryption/internal_file_decryptor.cc  | 207 +++++---------
 .../parquet/encryption/internal_file_decryptor.h   | 101 ++++---
 .../parquet/encryption/internal_file_encryptor.cc  |  22 +-
 .../parquet/encryption/internal_file_encryptor.h   |   1 -
 cpp/src/parquet/encryption/key_management_test.cc  |   2 +-
 .../parquet/encryption/read_configurations_test.cc |  27 +-
 cpp/src/parquet/encryption/test_encryption_util.cc |   6 +-
 cpp/src/parquet/file_reader.cc                     |  43 ++-
 cpp/src/parquet/file_writer.cc                     |   3 -
 cpp/src/parquet/metadata.cc                        |   2 -
 cpp/src/parquet/page_index.cc                      |  14 +-
 21 files changed, 618 insertions(+), 758 deletions(-)

diff --git 
a/cpp/examples/parquet/low_level_api/encryption_reader_writer_all_crypto_options.cc
 
b/cpp/examples/parquet/low_level_api/encryption_reader_writer_all_crypto_options.cc
index 5b01e02846..290ba87c5f 100644
--- 
a/cpp/examples/parquet/low_level_api/encryption_reader_writer_all_crypto_options.cc
+++ 
b/cpp/examples/parquet/low_level_api/encryption_reader_writer_all_crypto_options.cc
@@ -429,7 +429,7 @@ void InteropTestReadEncryptedParquetFiles(std::string 
root_path) {
 
         // Add the current decryption configuration to ReaderProperties.
         reader_properties.file_decryption_properties(
-            vector_of_decryption_configurations[example_id]->DeepClone());
+            vector_of_decryption_configurations[example_id]);
 
         // Create a ParquetReader instance
         std::unique_ptr<parquet::ParquetFileReader> parquet_reader =
diff --git a/cpp/src/arrow/dataset/file_parquet.cc 
b/cpp/src/arrow/dataset/file_parquet.cc
index cf51ea18d7..e6ac38936e 100644
--- a/cpp/src/arrow/dataset/file_parquet.cc
+++ b/cpp/src/arrow/dataset/file_parquet.cc
@@ -76,17 +76,17 @@ parquet::ReaderProperties MakeReaderProperties(
   }
   
properties.set_buffer_size(parquet_scan_options->reader_properties->buffer_size());
 
+  auto file_decryption_prop =
+      parquet_scan_options->reader_properties->file_decryption_properties();
+
 #ifdef PARQUET_REQUIRE_ENCRYPTION
   auto parquet_decrypt_config = 
parquet_scan_options->parquet_decryption_config;
 
   if (parquet_decrypt_config != nullptr) {
-    auto file_decryption_prop =
+    file_decryption_prop =
         parquet_decrypt_config->crypto_factory->GetFileDecryptionProperties(
             *parquet_decrypt_config->kms_connection_config,
             *parquet_decrypt_config->decryption_config, path, filesystem);
-
-    parquet_scan_options->reader_properties->file_decryption_properties(
-        std::move(file_decryption_prop));
   }
 #else
   if (parquet_scan_options->parquet_decryption_config != nullptr) {
@@ -94,8 +94,7 @@ parquet::ReaderProperties MakeReaderProperties(
   }
 #endif
 
-  properties.file_decryption_properties(
-      parquet_scan_options->reader_properties->file_decryption_properties());
+  properties.file_decryption_properties(file_decryption_prop);
 
   properties.set_thrift_string_size_limit(
       parquet_scan_options->reader_properties->thrift_string_size_limit());
@@ -527,9 +526,11 @@ Future<std::shared_ptr<parquet::arrow::FileReader>> 
ParquetFileFormat::GetReader
   auto self = checked_pointer_cast<const 
ParquetFileFormat>(shared_from_this());
 
   return source.OpenAsync().Then(
-      [=](const std::shared_ptr<io::RandomAccessFile>& input) mutable {
-        return parquet::ParquetFileReader::OpenAsync(input, 
std::move(properties),
-                                                     metadata)
+      [self = self, properties = std::move(properties), source = source,
+       options = options, metadata = metadata,
+       parquet_scan_options = parquet_scan_options](
+          const std::shared_ptr<io::RandomAccessFile>& input) mutable {
+        return parquet::ParquetFileReader::OpenAsync(input, properties, 
metadata)
             .Then(
                 [=](const std::unique_ptr<parquet::ParquetFileReader>& reader) 
mutable
                 -> Result<std::shared_ptr<parquet::arrow::FileReader>> {
@@ -544,7 +545,7 @@ Future<std::shared_ptr<parquet::arrow::FileReader>> 
ParquetFileFormat::GetReader
                       // here we know there are no other waiters on the reader.
                       
std::move(const_cast<std::unique_ptr<parquet::ParquetFileReader>&>(
                           reader)),
-                      std::move(arrow_properties), &arrow_reader));
+                      arrow_properties, &arrow_reader));
 
                   // R build with openSUSE155 requires an explicit shared_ptr 
construction
                   return std::shared_ptr<parquet::arrow::FileReader>(
diff --git a/cpp/src/arrow/dataset/file_parquet_encryption_test.cc 
b/cpp/src/arrow/dataset/file_parquet_encryption_test.cc
index 0287d593d1..d2e1763c62 100644
--- a/cpp/src/arrow/dataset/file_parquet_encryption_test.cc
+++ b/cpp/src/arrow/dataset/file_parquet_encryption_test.cc
@@ -20,18 +20,21 @@
 #include "gtest/gtest.h"
 
 #include "arrow/array.h"
+#include "arrow/compute/api_vector.h"
 #include "arrow/dataset/dataset.h"
 #include "arrow/dataset/file_base.h"
 #include "arrow/dataset/file_parquet.h"
 #include "arrow/dataset/parquet_encryption_config.h"
 #include "arrow/dataset/partition.h"
 #include "arrow/filesystem/mockfs.h"
-#include "arrow/io/api.h"
 #include "arrow/status.h"
 #include "arrow/table.h"
+#include "arrow/testing/future_util.h"
 #include "arrow/testing/gtest_util.h"
 #include "arrow/testing/random.h"
 #include "arrow/type.h"
+#include "arrow/util/future.h"
+#include "arrow/util/thread_pool.h"
 #include "parquet/arrow/reader.h"
 #include "parquet/encryption/crypto_factory.h"
 #include "parquet/encryption/encryption_internal.h"
@@ -51,9 +54,34 @@ using arrow::internal::checked_pointer_cast;
 namespace arrow {
 namespace dataset {
 
+struct EncryptionTestParam {
+  bool uniform_encryption;  // false is using per-column keys
+  bool concurrently;
+  bool use_crypto_factory;
+};
+
+std::ostream& operator<<(std::ostream& os, const EncryptionTestParam& param) {
+  os << (param.uniform_encryption ? "UniformEncryption" : "ColumnKeys") << " ";
+  os << (param.concurrently ? "Threaded" : "Serial") << " ";
+  os << (param.use_crypto_factory ? "CryptoFactory" : "PropertyKeys");
+  return os;
+}
+
+const auto kAllParamValues = ::testing::Values(
+    // non-uniform encryption not supported for property keys by test
+    EncryptionTestParam{true, false, false}, EncryptionTestParam{true, true, 
false},
+    EncryptionTestParam{false, false, true}, EncryptionTestParam{true, false, 
true},
+    EncryptionTestParam{false, true, true}, EncryptionTestParam{true, true, 
true});
+
 // Base class to test writing and reading encrypted dataset.
-class DatasetEncryptionTestBase : public ::testing::Test {
+class DatasetEncryptionTestBase : public 
testing::TestWithParam<EncryptionTestParam> {
  public:
+#ifdef ARROW_VALGRIND
+  static constexpr int kConcurrentIterations = 4;
+#else
+  static constexpr int kConcurrentIterations = 20;
+#endif
+
   // This function creates a mock file system using the current time point, 
creates a
   // directory with the given base directory path, and writes a dataset to it 
using
   // provided Parquet file write options. The function also checks if the 
written files
@@ -73,102 +101,213 @@ class DatasetEncryptionTestBase : public ::testing::Test 
{
 
     // Init dataset and partitioning.
     ASSERT_NO_FATAL_FAILURE(PrepareTableAndPartitioning());
+    ASSERT_OK_AND_ASSIGN(expected_table_, table_->CombineChunks());
+    ASSERT_OK_AND_ASSIGN(expected_table_, SortTable(expected_table_));
 
     // Prepare encryption properties.
     std::unordered_map<std::string, std::string> key_map;
     key_map.emplace(kColumnMasterKeyId, kColumnMasterKey);
     key_map.emplace(kFooterKeyMasterKeyId, kFooterKeyMasterKey);
 
-    crypto_factory_ = std::make_shared<parquet::encryption::CryptoFactory>();
-    auto kms_client_factory =
-        
std::make_shared<parquet::encryption::TestOnlyInMemoryKmsClientFactory>(
-            /*wrap_locally=*/true, key_map);
-    crypto_factory_->RegisterKmsClientFactory(std::move(kms_client_factory));
-    kms_connection_config_ = 
std::make_shared<parquet::encryption::KmsConnectionConfig>();
-
-    // Set write options with encryption configuration.
-    auto encryption_config =
-        std::make_shared<parquet::encryption::EncryptionConfiguration>(
-            std::string(kFooterKeyName));
-    encryption_config->column_keys = kColumnKeyMapping;
-    auto parquet_encryption_config = 
std::make_shared<ParquetEncryptionConfig>();
-    // Directly assign shared_ptr objects to ParquetEncryptionConfig members
-    parquet_encryption_config->crypto_factory = crypto_factory_;
-    parquet_encryption_config->kms_connection_config = kms_connection_config_;
-    parquet_encryption_config->encryption_config = 
std::move(encryption_config);
-
     auto file_format = std::make_shared<ParquetFileFormat>();
     auto parquet_file_write_options =
         
checked_pointer_cast<ParquetFileWriteOptions>(file_format->DefaultWriteOptions());
-    parquet_file_write_options->parquet_encryption_config =
-        std::move(parquet_encryption_config);
+
+    if (GetParam().use_crypto_factory) {
+      // Configure encryption keys via crypto factory.
+      crypto_factory_ = std::make_shared<parquet::encryption::CryptoFactory>();
+      auto kms_client_factory =
+          
std::make_shared<parquet::encryption::TestOnlyInMemoryKmsClientFactory>(
+              /*wrap_locally=*/true, key_map);
+      crypto_factory_->RegisterKmsClientFactory(std::move(kms_client_factory));
+      kms_connection_config_ =
+          std::make_shared<parquet::encryption::KmsConnectionConfig>();
+
+      // Set write options with encryption configuration.
+      auto encryption_config =
+          std::make_shared<parquet::encryption::EncryptionConfiguration>(
+              std::string(kFooterKeyName));
+      encryption_config->uniform_encryption = GetParam().uniform_encryption;
+      if (!GetParam().uniform_encryption) {
+        encryption_config->column_keys = kColumnKeyMapping;
+      }
+
+      auto parquet_encryption_config = 
std::make_shared<ParquetEncryptionConfig>();
+      // Directly assign shared_ptr objects to ParquetEncryptionConfig members
+      parquet_encryption_config->crypto_factory = crypto_factory_;
+      parquet_encryption_config->kms_connection_config = 
kms_connection_config_;
+      parquet_encryption_config->encryption_config = 
std::move(encryption_config);
+      parquet_file_write_options->parquet_encryption_config =
+          std::move(parquet_encryption_config);
+    } else {
+      // Configure encryption keys via writer options / file encryption 
properties.
+      // non-uniform encryption not support by test
+      ASSERT_TRUE(GetParam().uniform_encryption);
+      auto file_encryption_properties =
+          std::make_unique<parquet::FileEncryptionProperties::Builder>(
+              std::string(kFooterKeyMasterKey))
+              ->build();
+      auto writer_properties = 
std::make_unique<parquet::WriterProperties::Builder>()
+                                   ->encryption(file_encryption_properties)
+                                   ->build();
+      parquet_file_write_options->writer_properties = writer_properties;
+    }
 
     // Write dataset.
     auto dataset = std::make_shared<InMemoryDataset>(table_);
     EXPECT_OK_AND_ASSIGN(auto scanner_builder, dataset->NewScan());
+    ARROW_EXPECT_OK(scanner_builder->UseThreads(GetParam().concurrently));
     EXPECT_OK_AND_ASSIGN(auto scanner, scanner_builder->Finish());
 
-    FileSystemDatasetWriteOptions write_options;
-    write_options.file_write_options = parquet_file_write_options;
-    write_options.filesystem = file_system_;
-    write_options.base_dir = kBaseDir;
-    write_options.partitioning = partitioning_;
-    write_options.basename_template = "part{i}.parquet";
-    ASSERT_OK(FileSystemDataset::Write(write_options, std::move(scanner)));
+    if (GetParam().concurrently) {
+      // Have a notable number of threads to exhibit multi-threading issues
+      ASSERT_OK_AND_ASSIGN(auto pool, arrow::internal::ThreadPool::Make(16));
+      std::vector<Future<>> futures;
+
+      // Write dataset above multiple times concurrently to see that is 
thread-safe.
+      for (int i = 1; i <= kConcurrentIterations; ++i) {
+        FileSystemDatasetWriteOptions write_options;
+        write_options.file_write_options = parquet_file_write_options;
+        write_options.filesystem = file_system_;
+        write_options.base_dir = "thread-" + std::to_string(i);
+        write_options.partitioning = partitioning_;
+        write_options.basename_template = "part{i}.parquet";
+        futures.push_back(
+            DeferNotOk(pool->Submit(FileSystemDataset::Write, write_options, 
scanner)));
+      }
+
+      // Assert all jobs succeeded
+      for (auto& future : futures) {
+        ASSERT_FINISHES_OK(future);
+      }
+    } else {
+      FileSystemDatasetWriteOptions write_options;
+      write_options.file_write_options = parquet_file_write_options;
+      write_options.filesystem = file_system_;
+      write_options.base_dir = kBaseDir;
+      write_options.partitioning = partitioning_;
+      write_options.basename_template = "part{i}.parquet";
+      ASSERT_OK(FileSystemDataset::Write(write_options, std::move(scanner)));
+    }
   }
 
   virtual void PrepareTableAndPartitioning() = 0;
 
-  void TestScanDataset() {
-    // Create decryption properties.
-    auto decryption_config =
-        std::make_shared<parquet::encryption::DecryptionConfiguration>();
-    auto parquet_decryption_config = 
std::make_shared<ParquetDecryptionConfig>();
-    parquet_decryption_config->crypto_factory = crypto_factory_;
-    parquet_decryption_config->kms_connection_config = kms_connection_config_;
-    parquet_decryption_config->decryption_config = 
std::move(decryption_config);
-
-    // Set scan options.
-    auto parquet_scan_options = std::make_shared<ParquetFragmentScanOptions>();
-    parquet_scan_options->parquet_decryption_config =
-        std::move(parquet_decryption_config);
-
-    auto file_format = std::make_shared<ParquetFileFormat>();
-    file_format->default_fragment_scan_options = 
std::move(parquet_scan_options);
-
+  Result<std::shared_ptr<Dataset>> OpenDataset(
+      std::string_view base_dir, const std::shared_ptr<ParquetFileFormat>& 
file_format) {
     // Get FileInfo objects for all files under the base directory
     fs::FileSelector selector;
-    selector.base_dir = kBaseDir;
+    selector.base_dir = base_dir;
     selector.recursive = true;
 
     FileSystemFactoryOptions factory_options;
     factory_options.partitioning = partitioning_;
-    factory_options.partition_base_dir = kBaseDir;
-    ASSERT_OK_AND_ASSIGN(auto dataset_factory,
-                         FileSystemDatasetFactory::Make(file_system_, selector,
-                                                        file_format, 
factory_options));
+    factory_options.partition_base_dir = base_dir;
+    ARROW_ASSIGN_OR_RAISE(auto dataset_factory,
+                          FileSystemDatasetFactory::Make(file_system_, 
selector,
+                                                         file_format, 
factory_options));
 
     // Create the dataset
-    ASSERT_OK_AND_ASSIGN(auto dataset, dataset_factory->Finish());
-
-    // Reuse the dataset above to scan it twice to make sure decryption works 
correctly.
-    for (size_t i = 0; i < 2; ++i) {
-      // Read dataset into table
-      ASSERT_OK_AND_ASSIGN(auto scanner_builder, dataset->NewScan());
-      ASSERT_OK_AND_ASSIGN(auto scanner, scanner_builder->Finish());
-      ASSERT_OK_AND_ASSIGN(auto read_table, scanner->ToTable());
-
-      // Verify the data was read correctly
-      ASSERT_OK_AND_ASSIGN(auto combined_table, read_table->CombineChunks());
-      // Validate the table
-      ASSERT_OK(combined_table->ValidateFull());
-      AssertTablesEqual(*combined_table, *table_);
+    return dataset_factory->Finish();
+  }
+
+  void TestScanDataset() {
+    // Set scan options.
+    auto parquet_scan_options = std::make_shared<ParquetFragmentScanOptions>();
+
+    if (GetParam().use_crypto_factory) {
+      // Configure decryption keys via crypto factory.
+      auto decryption_config =
+          std::make_shared<parquet::encryption::DecryptionConfiguration>();
+      auto parquet_decryption_config = 
std::make_shared<ParquetDecryptionConfig>();
+      parquet_decryption_config->crypto_factory = crypto_factory_;
+      parquet_decryption_config->kms_connection_config = 
kms_connection_config_;
+      parquet_decryption_config->decryption_config = 
std::move(decryption_config);
+
+      parquet_scan_options->parquet_decryption_config =
+          std::move(parquet_decryption_config);
+    } else {
+      // Configure decryption keys via reader properties / file decryption 
properties.
+      auto file_decryption_properties =
+          std::make_unique<parquet::FileDecryptionProperties::Builder>()
+              ->footer_key(std::string(kFooterKeyMasterKey))
+              ->build();
+      parquet_scan_options->reader_properties->file_decryption_properties(
+          file_decryption_properties);
+    }
+
+    auto file_format = std::make_shared<ParquetFileFormat>();
+    file_format->default_fragment_scan_options = 
std::move(parquet_scan_options);
+
+    if (GetParam().concurrently) {
+      // Create the dataset
+      ASSERT_OK_AND_ASSIGN(auto dataset, OpenDataset("thread-1", file_format));
+
+      // Have a notable number of threads to exhibit multi-threading issues
+      ASSERT_OK_AND_ASSIGN(auto pool, arrow::internal::ThreadPool::Make(16));
+      std::vector<Future<std::shared_ptr<Table>>> futures;
+
+      // Read dataset above multiple times concurrently to see that is 
thread-safe.
+      for (int i = 0; i < kConcurrentIterations; ++i) {
+        futures.push_back(DeferNotOk(pool->Submit(ReadDataset, dataset)));
+      }
+
+      // Assert correctness of jobs
+      for (auto& future : futures) {
+        ASSERT_OK_AND_ASSIGN(auto read_table, future.result());
+        CheckDatasetResults(read_table);
+      }
+
+      // Finally check datasets written by all other threads are as expected
+      for (int i = 2; i <= kConcurrentIterations; ++i) {
+        ASSERT_OK_AND_ASSIGN(dataset,
+                             OpenDataset("thread-" + std::to_string(i), 
file_format));
+        ASSERT_OK_AND_ASSIGN(auto read_table, ReadDataset(dataset));
+        CheckDatasetResults(read_table);
+      }
+    } else {
+      // Create the dataset
+      ASSERT_OK_AND_ASSIGN(auto dataset, OpenDataset(kBaseDir, file_format));
+
+      // Reuse the dataset above to scan it twice to make sure decryption 
works correctly.
+      for (int i = 0; i < 2; ++i) {
+        ASSERT_OK_AND_ASSIGN(auto read_table, ReadDataset(dataset));
+        CheckDatasetResults(read_table);
+      }
     }
   }
 
+  static Result<std::shared_ptr<Table>> ReadDataset(
+      const std::shared_ptr<Dataset>& dataset) {
+    // Read dataset into table
+    ARROW_ASSIGN_OR_RAISE(auto scanner_builder, dataset->NewScan());
+    ARROW_ASSIGN_OR_RAISE(auto scanner, scanner_builder->Finish());
+    ARROW_EXPECT_OK(scanner_builder->UseThreads(GetParam().concurrently));
+    return scanner->ToTable();
+  }
+
+  void CheckDatasetResults(const std::shared_ptr<Table>& table) {
+    ASSERT_OK(table->ValidateFull());
+    // Make results comparable despite ordering and chunking differences
+    ASSERT_OK_AND_ASSIGN(auto combined_table, table->CombineChunks());
+    ASSERT_OK_AND_ASSIGN(auto sorted_table, SortTable(combined_table));
+    AssertTablesEqual(*sorted_table, *expected_table_);
+  }
+
+  // Sort table for comparability of dataset read results, which may be 
unordered.
+  // This relies on column "a" having statistically unique values.
+  Result<std::shared_ptr<Table>> SortTable(const std::shared_ptr<Table>& 
table) {
+    compute::SortOptions options({compute::SortKey("a")});
+    ARROW_ASSIGN_OR_RAISE(auto indices, compute::SortIndices(table, options));
+    ARROW_ASSIGN_OR_RAISE(auto sorted, compute::Take(table, indices));
+    EXPECT_EQ(sorted.kind(), Datum::TABLE);
+    return sorted.table();
+  }
+
  protected:
+  std::string base_dir_ = GetParam().concurrently ? "thread-1" : 
std::string(kBaseDir);
   std::shared_ptr<fs::FileSystem> file_system_;
-  std::shared_ptr<Table> table_;
+  std::shared_ptr<Table> table_, expected_table_;
   std::shared_ptr<Partitioning> partitioning_;
   std::shared_ptr<parquet::encryption::CryptoFactory> crypto_factory_;
   std::shared_ptr<parquet::encryption::KmsConnectionConfig> 
kms_connection_config_;
@@ -204,14 +343,15 @@ class DatasetEncryptionTest : public 
DatasetEncryptionTestBase {
 // properties are determined based on the selected columns. After writing the 
dataset, the
 // test reads the data back and verifies that it can be successfully decrypted 
and
 // scanned.
-TEST_F(DatasetEncryptionTest, WriteReadDatasetWithEncryption) {
+TEST_P(DatasetEncryptionTest, WriteReadDatasetWithEncryption) {
   ASSERT_NO_FATAL_FAILURE(TestScanDataset());
 }
 
 // Read a single parquet file with and without decryption properties.
-TEST_F(DatasetEncryptionTest, ReadSingleFile) {
+TEST_P(DatasetEncryptionTest, ReadSingleFile) {
   // Open the Parquet file.
-  ASSERT_OK_AND_ASSIGN(auto input, 
file_system_->OpenInputFile("part=a/part0.parquet"));
+  ASSERT_OK_AND_ASSIGN(auto input,
+                       file_system_->OpenInputFile(base_dir_ + 
"/part=a/part0.parquet"));
 
   // Try to read metadata without providing decryption properties
   // when the footer is encrypted.
@@ -220,8 +360,19 @@ TEST_F(DatasetEncryptionTest, ReadSingleFile) {
   // Create the ReaderProperties object using the FileDecryptionProperties 
object
   auto decryption_config =
       std::make_shared<parquet::encryption::DecryptionConfiguration>();
-  auto file_decryption_properties = 
crypto_factory_->GetFileDecryptionProperties(
-      *kms_connection_config_, *decryption_config);
+  std::shared_ptr<parquet::FileDecryptionProperties> 
file_decryption_properties;
+  if (GetParam().use_crypto_factory) {
+    // Configure decryption keys via file decryption properties with crypto 
factory key
+    // retriever.
+    file_decryption_properties = crypto_factory_->GetFileDecryptionProperties(
+        *kms_connection_config_, *decryption_config);
+  } else {
+    // Configure decryption keys via file decryption properties with static 
footer key.
+    file_decryption_properties =
+        std::make_unique<parquet::FileDecryptionProperties::Builder>()
+            ->footer_key(std::string(kFooterKeyMasterKey))
+            ->build();
+  }
   auto reader_properties = parquet::default_reader_properties();
   reader_properties.file_decryption_properties(file_decryption_properties);
 
@@ -240,30 +391,41 @@ TEST_F(DatasetEncryptionTest, ReadSingleFile) {
   
ASSERT_EQ(checked_pointer_cast<Int64Array>(table->column(2)->chunk(0))->GetView(0),
 1);
 }
 
+INSTANTIATE_TEST_SUITE_P(DatasetEncryptionTest, DatasetEncryptionTest, 
kAllParamValues);
+
 // GH-39444: This test covers the case where parquet dataset scanner crashes 
when
 // processing encrypted datasets over 2^15 rows in multi-threaded mode.
-class LargeRowEncryptionTest : public DatasetEncryptionTestBase {
+class LargeRowCountEncryptionTest : public DatasetEncryptionTestBase {
  public:
   // The dataset is partitioned using a Hive partitioning scheme.
   void PrepareTableAndPartitioning() override {
     // Specifically chosen to be greater than batch size for triggering 
prefetch.
     constexpr int kRowCount = 32769;
+    // Number of batches
+    constexpr int kBatchCount = 5;
 
-    // Create a random floating-point array with large number of rows.
+    // Create multiple random floating-point arrays with large number of rows.
     arrow::random::RandomArrayGenerator rand_gen(0);
-    auto array = rand_gen.Float32(kRowCount, 0.0, 1.0, false);
+    auto arrays = std::vector<std::shared_ptr<arrow::Array>>();
+    for (int i = 0; i < kBatchCount; i++) {
+      arrays.push_back(rand_gen.Float32(kRowCount, 0.0, 1.0, false));
+    }
+    ASSERT_OK_AND_ASSIGN(auto column, ChunkedArray::Make(arrays, float32()));
     auto table_schema = schema({field("a", float32())});
 
     // Prepare table and partitioning.
-    table_ = arrow::Table::Make(table_schema, {array});
+    table_ = arrow::Table::Make(table_schema, {column});
     partitioning_ = 
std::make_shared<dataset::DirectoryPartitioning>(arrow::schema({}));
   }
 };
 
 // Test for writing and reading encrypted dataset with large row count.
-TEST_F(LargeRowEncryptionTest, ReadEncryptLargeRows) {
+TEST_P(LargeRowCountEncryptionTest, ReadEncryptLargeRowCount) {
   ASSERT_NO_FATAL_FAILURE(TestScanDataset());
 }
 
+INSTANTIATE_TEST_SUITE_P(LargeRowCountEncryptionTest, 
LargeRowCountEncryptionTest,
+                         kAllParamValues);
+
 }  // namespace dataset
 }  // namespace arrow
diff --git a/cpp/src/parquet/column_reader.cc b/cpp/src/parquet/column_reader.cc
index 3268701bed..aa486b1f46 100644
--- a/cpp/src/parquet/column_reader.cc
+++ b/cpp/src/parquet/column_reader.cc
@@ -273,8 +273,7 @@ class SerializedPageReader : public PageReader {
   void set_max_page_header_size(uint32_t size) override { 
max_page_header_size_ = size; }
 
  private:
-  void UpdateDecryption(const std::shared_ptr<Decryptor>& decryptor, int8_t 
module_type,
-                        std::string* page_aad);
+  void UpdateDecryption(Decryptor* decryptor, int8_t module_type, std::string* 
page_aad);
 
   void InitDecryption();
 
@@ -306,8 +305,13 @@ class SerializedPageReader : public PageReader {
   // Please refer to the encryption specification for more details:
   // 
https://github.com/apache/parquet-format/blob/encryption/Encryption.md#44-additional-authenticated-data
 
-  // The ordinal fields in the context below are used for AAD suffix 
calculation.
+  // The CryptoContext used by this PageReader.
   CryptoContext crypto_ctx_;
+  // This PageReader has its own Decryptor instances in order to be 
thread-safe.
+  std::unique_ptr<Decryptor> meta_decryptor_;
+  std::unique_ptr<Decryptor> data_decryptor_;
+
+  // The ordinal fields in the context below are used for AAD suffix 
calculation.
   int32_t page_ordinal_;  // page ordinal does not count the dictionary page
 
   // Maximum allowed page size
@@ -331,22 +335,28 @@ class SerializedPageReader : public PageReader {
 
 void SerializedPageReader::InitDecryption() {
   // Prepare the AAD for quick update later.
-  if (crypto_ctx_.data_decryptor != nullptr) {
-    ARROW_DCHECK(!crypto_ctx_.data_decryptor->file_aad().empty());
-    data_page_aad_ = encryption::CreateModuleAad(
-        crypto_ctx_.data_decryptor->file_aad(), encryption::kDataPage,
-        crypto_ctx_.row_group_ordinal, crypto_ctx_.column_ordinal, 
kNonPageOrdinal);
-  }
-  if (crypto_ctx_.meta_decryptor != nullptr) {
-    ARROW_DCHECK(!crypto_ctx_.meta_decryptor->file_aad().empty());
-    data_page_header_aad_ = encryption::CreateModuleAad(
-        crypto_ctx_.meta_decryptor->file_aad(), encryption::kDataPageHeader,
-        crypto_ctx_.row_group_ordinal, crypto_ctx_.column_ordinal, 
kNonPageOrdinal);
+  if (crypto_ctx_.data_decryptor_factory) {
+    data_decryptor_ = crypto_ctx_.data_decryptor_factory();
+    if (data_decryptor_) {
+      ARROW_DCHECK(!data_decryptor_->file_aad().empty());
+      data_page_aad_ = encryption::CreateModuleAad(
+          data_decryptor_->file_aad(), encryption::kDataPage,
+          crypto_ctx_.row_group_ordinal, crypto_ctx_.column_ordinal, 
kNonPageOrdinal);
+    }
+  }
+  if (crypto_ctx_.meta_decryptor_factory) {
+    meta_decryptor_ = crypto_ctx_.meta_decryptor_factory();
+    if (meta_decryptor_) {
+      ARROW_DCHECK(!meta_decryptor_->file_aad().empty());
+      data_page_header_aad_ = encryption::CreateModuleAad(
+          meta_decryptor_->file_aad(), encryption::kDataPageHeader,
+          crypto_ctx_.row_group_ordinal, crypto_ctx_.column_ordinal, 
kNonPageOrdinal);
+    }
   }
 }
 
-void SerializedPageReader::UpdateDecryption(const std::shared_ptr<Decryptor>& 
decryptor,
-                                            int8_t module_type, std::string* 
page_aad) {
+void SerializedPageReader::UpdateDecryption(Decryptor* decryptor, int8_t 
module_type,
+                                            std::string* page_aad) {
   ARROW_DCHECK(decryptor != nullptr);
   if (crypto_ctx_.start_decrypt_with_dictionary_page) {
     UpdateDecryptor(decryptor, crypto_ctx_.row_group_ordinal, 
crypto_ctx_.column_ordinal,
@@ -425,15 +435,15 @@ std::shared_ptr<Page> SerializedPageReader::NextPage() {
       // This gets used, then set by DeserializeThriftMsg
       header_size = static_cast<uint32_t>(view.size());
       try {
-        if (crypto_ctx_.meta_decryptor != nullptr) {
-          UpdateDecryption(crypto_ctx_.meta_decryptor, 
encryption::kDictionaryPageHeader,
+        if (meta_decryptor_ != nullptr) {
+          UpdateDecryption(meta_decryptor_.get(), 
encryption::kDictionaryPageHeader,
                            &data_page_header_aad_);
         }
         // Reset current page header to avoid unclearing the __isset flag.
         current_page_header_ = format::PageHeader();
         deserializer.DeserializeMessage(reinterpret_cast<const 
uint8_t*>(view.data()),
                                         &header_size, &current_page_header_,
-                                        crypto_ctx_.meta_decryptor.get());
+                                        meta_decryptor_.get());
         break;
       } catch (std::exception& e) {
         // Failed to deserialize. Double the allowed page header size and try 
again
@@ -461,8 +471,8 @@ std::shared_ptr<Page> SerializedPageReader::NextPage() {
       continue;
     }
 
-    if (crypto_ctx_.data_decryptor != nullptr) {
-      UpdateDecryption(crypto_ctx_.data_decryptor, encryption::kDictionaryPage,
+    if (data_decryptor_ != nullptr) {
+      UpdateDecryption(data_decryptor_.get(), encryption::kDictionaryPage,
                        &data_page_aad_);
     }
 
@@ -491,13 +501,13 @@ std::shared_ptr<Page> SerializedPageReader::NextPage() {
     }
 
     // Decrypt it if we need to
-    if (crypto_ctx_.data_decryptor != nullptr) {
-      PARQUET_THROW_NOT_OK(decryption_buffer_->Resize(
-          crypto_ctx_.data_decryptor->PlaintextLength(compressed_len),
-          /*shrink_to_fit=*/false));
-      compressed_len = crypto_ctx_.data_decryptor->Decrypt(
-          page_buffer->span_as<uint8_t>(),
-          decryption_buffer_->mutable_span_as<uint8_t>());
+    if (data_decryptor_ != nullptr) {
+      PARQUET_THROW_NOT_OK(
+          
decryption_buffer_->Resize(data_decryptor_->PlaintextLength(compressed_len),
+                                     /*shrink_to_fit=*/false));
+      compressed_len =
+          data_decryptor_->Decrypt(page_buffer->span_as<uint8_t>(),
+                                   
decryption_buffer_->mutable_span_as<uint8_t>());
 
       page_buffer = decryption_buffer_;
     }
diff --git a/cpp/src/parquet/column_reader.h b/cpp/src/parquet/column_reader.h
index c31088c96c..80c82acd5e 100644
--- a/cpp/src/parquet/column_reader.h
+++ b/cpp/src/parquet/column_reader.h
@@ -102,20 +102,11 @@ class PARQUET_EXPORT LevelDecoder {
 };
 
 struct CryptoContext {
-  CryptoContext(bool start_with_dictionary_page, int16_t rg_ordinal, int16_t 
col_ordinal,
-                std::shared_ptr<Decryptor> meta, std::shared_ptr<Decryptor> 
data)
-      : start_decrypt_with_dictionary_page(start_with_dictionary_page),
-        row_group_ordinal(rg_ordinal),
-        column_ordinal(col_ordinal),
-        meta_decryptor(std::move(meta)),
-        data_decryptor(std::move(data)) {}
-  CryptoContext() {}
-
   bool start_decrypt_with_dictionary_page = false;
   int16_t row_group_ordinal = -1;
   int16_t column_ordinal = -1;
-  std::shared_ptr<Decryptor> meta_decryptor;
-  std::shared_ptr<Decryptor> data_decryptor;
+  std::function<std::unique_ptr<Decryptor>()> meta_decryptor_factory;
+  std::function<std::unique_ptr<Decryptor>()> data_decryptor_factory;
 };
 
 // Abstract page iterator interface. This way, we can feed column pages to the
diff --git a/cpp/src/parquet/encryption/encryption.cc 
b/cpp/src/parquet/encryption/encryption.cc
index 731120d9a6..f4031a995a 100644
--- a/cpp/src/parquet/encryption/encryption.cc
+++ b/cpp/src/parquet/encryption/encryption.cc
@@ -87,50 +87,10 @@ FileDecryptionProperties::Builder* 
FileDecryptionProperties::Builder::column_key
   if (column_decryption_properties_.size() != 0)
     throw ParquetException("Column properties already set");
 
-  for (const auto& element : column_decryption_properties) {
-    if (element.second->is_utilized()) {
-      throw ParquetException("Column properties utilized in another file");
-    }
-    element.second->set_utilized();
-  }
-
   column_decryption_properties_ = column_decryption_properties;
   return this;
 }
 
-void FileDecryptionProperties::WipeOutDecryptionKeys() {
-  footer_key_.clear();
-
-  for (const auto& element : column_decryption_properties_) {
-    element.second->WipeOutDecryptionKey();
-  }
-}
-
-bool FileDecryptionProperties::is_utilized() {
-  if (footer_key_.empty() && column_decryption_properties_.size() == 0 &&
-      aad_prefix_.empty())
-    return false;
-
-  return utilized_;
-}
-
-std::shared_ptr<FileDecryptionProperties> FileDecryptionProperties::DeepClone(
-    std::string new_aad_prefix) {
-  std::string footer_key_copy = footer_key_;
-  ColumnPathToDecryptionPropertiesMap column_decryption_properties_map_copy;
-
-  for (const auto& element : column_decryption_properties_) {
-    column_decryption_properties_map_copy.insert(
-        {element.second->column_path(), element.second->DeepClone()});
-  }
-
-  if (new_aad_prefix.empty()) new_aad_prefix = aad_prefix_;
-  return std::shared_ptr<FileDecryptionProperties>(new 
FileDecryptionProperties(
-      footer_key_copy, key_retriever_, check_plaintext_footer_integrity_, 
new_aad_prefix,
-      aad_prefix_verifier_, column_decryption_properties_map_copy,
-      plaintext_files_allowed_));
-}
-
 FileDecryptionProperties::Builder* 
FileDecryptionProperties::Builder::footer_key(
     const std::string footer_key) {
   if (footer_key.empty()) {
@@ -183,14 +143,6 @@ std::shared_ptr<ColumnDecryptionProperties> 
ColumnDecryptionProperties::Builder:
       new ColumnDecryptionProperties(column_path_, key_));
 }
 
-void ColumnDecryptionProperties::WipeOutDecryptionKey() { key_.clear(); }
-
-std::shared_ptr<ColumnDecryptionProperties> 
ColumnDecryptionProperties::DeepClone() {
-  std::string key_copy = key_;
-  return std::shared_ptr<ColumnDecryptionProperties>(
-      new ColumnDecryptionProperties(column_path_, key_copy));
-}
-
 FileEncryptionProperties::Builder* 
FileEncryptionProperties::Builder::footer_key_metadata(
     const std::string& footer_key_metadata) {
   if (footer_key_metadata.empty()) return this;
@@ -207,39 +159,10 @@ FileEncryptionProperties::Builder* 
FileEncryptionProperties::Builder::encrypted_
   if (encrypted_columns_.size() != 0)
     throw ParquetException("Column properties already set");
 
-  for (const auto& element : encrypted_columns) {
-    if (element.second->is_utilized()) {
-      throw ParquetException("Column properties utilized in another file");
-    }
-    element.second->set_utilized();
-  }
   encrypted_columns_ = encrypted_columns;
   return this;
 }
 
-void FileEncryptionProperties::WipeOutEncryptionKeys() {
-  footer_key_.clear();
-  for (const auto& element : encrypted_columns_) {
-    element.second->WipeOutEncryptionKey();
-  }
-}
-
-std::shared_ptr<FileEncryptionProperties> FileEncryptionProperties::DeepClone(
-    std::string new_aad_prefix) {
-  std::string footer_key_copy = footer_key_;
-  ColumnPathToEncryptionPropertiesMap encrypted_columns_map_copy;
-
-  for (const auto& element : encrypted_columns_) {
-    encrypted_columns_map_copy.insert(
-        {element.second->column_path(), element.second->DeepClone()});
-  }
-
-  if (new_aad_prefix.empty()) new_aad_prefix = aad_prefix_;
-  return std::shared_ptr<FileEncryptionProperties>(new 
FileEncryptionProperties(
-      algorithm_.algorithm, footer_key_copy, footer_key_metadata_, 
encrypted_footer_,
-      new_aad_prefix, store_aad_prefix_in_file_, encrypted_columns_map_copy));
-}
-
 FileEncryptionProperties::Builder* 
FileEncryptionProperties::Builder::aad_prefix(
     const std::string& aad_prefix) {
   if (aad_prefix.empty()) return this;
@@ -263,12 +186,6 @@ 
ColumnEncryptionProperties::ColumnEncryptionProperties(bool encrypted,
                                                        const std::string& key,
                                                        const std::string& 
key_metadata)
     : column_path_(column_path) {
-  // column encryption properties object (with a column key) can be used for 
writing only
-  // one file.
-  // Upon completion of file writing, the encryption keys in the properties 
will be wiped
-  // out (set to 0 in memory).
-  utilized_ = false;
-
   DCHECK(!column_path.empty());
   if (!encrypted) {
     DCHECK(key.empty() && key_metadata.empty());
@@ -291,7 +208,6 @@ ColumnEncryptionProperties::ColumnEncryptionProperties(bool 
encrypted,
 ColumnDecryptionProperties::ColumnDecryptionProperties(const std::string& 
column_path,
                                                        const std::string& key)
     : column_path_(column_path) {
-  utilized_ = false;
   DCHECK(!column_path.empty());
 
   if (!key.empty()) {
@@ -335,7 +251,6 @@ FileDecryptionProperties::FileDecryptionProperties(
   aad_prefix_ = aad_prefix;
   column_decryption_properties_ = column_decryption_properties;
   plaintext_files_allowed_ = plaintext_files_allowed;
-  utilized_ = false;
 }
 
 FileEncryptionProperties::Builder* 
FileEncryptionProperties::Builder::footer_key_id(
@@ -378,11 +293,6 @@ FileEncryptionProperties::FileEncryptionProperties(
       aad_prefix_(aad_prefix),
       store_aad_prefix_in_file_(store_aad_prefix_in_file),
       encrypted_columns_(encrypted_columns) {
-  // file encryption properties object can be used for writing only one file.
-  // Upon completion of file writing, the encryption keys in the properties 
will be wiped
-  // out (set to 0 in memory).
-  utilized_ = false;
-
   DCHECK(!footer_key.empty());
   // footer_key must be either 16, 24 or 32 bytes.
   DCHECK(footer_key.length() == 16 || footer_key.length() == 24 ||
@@ -398,12 +308,12 @@ FileEncryptionProperties::FileEncryptionProperties(
     file_aad_ = aad_file_unique_str;
   } else {
     file_aad_ = aad_prefix + aad_file_unique_str;
-    if (!store_aad_prefix_in_file) supply_aad_prefix = true;
+    if (!store_aad_prefix_in_file_) supply_aad_prefix = true;
   }
   algorithm_.algorithm = cipher;
   algorithm_.aad.aad_file_unique = aad_file_unique_str;
   algorithm_.aad.supply_aad_prefix = supply_aad_prefix;
-  if (!aad_prefix.empty() && store_aad_prefix_in_file) {
+  if (!aad_prefix.empty() && store_aad_prefix_in_file_) {
     algorithm_.aad.aad_prefix = aad_prefix;
   }
 }
diff --git a/cpp/src/parquet/encryption/encryption.h 
b/cpp/src/parquet/encryption/encryption.h
index 1ddef9e823..6604e32991 100644
--- a/cpp/src/parquet/encryption/encryption.h
+++ b/cpp/src/parquet/encryption/encryption.h
@@ -112,8 +112,6 @@ class PARQUET_EXPORT ColumnEncryptionProperties {
     /// If key is not set on an encrypted column, the column will
     /// be encrypted with the footer key.
     /// keyBytes Key length must be either 16, 24 or 32 bytes.
-    /// The key is cloned, and will be wiped out (array values set to 0) upon 
completion
-    /// of file writing.
     /// Caller is responsible for wiping out the input key array.
     Builder* key(std::string column_key);
 
@@ -148,39 +146,18 @@ class PARQUET_EXPORT ColumnEncryptionProperties {
   std::string key() const { return key_; }
   std::string key_metadata() const { return key_metadata_; }
 
-  /// Upon completion of file writing, the encryption key
-  /// will be wiped out.
-  void WipeOutEncryptionKey() { key_.clear(); }
-
-  bool is_utilized() {
-    if (key_.empty())
-      return false;  // can re-use column properties without encryption keys
-    return utilized_;
-  }
-
-  /// ColumnEncryptionProperties object can be used for writing one file only.
-  /// Mark ColumnEncryptionProperties as utilized once it is used in
-  /// FileEncryptionProperties as the encryption key will be wiped out upon
-  /// completion of file writing.
-  void set_utilized() { utilized_ = true; }
-
-  std::shared_ptr<ColumnEncryptionProperties> DeepClone() {
-    std::string key_copy = key_;
-    return std::shared_ptr<ColumnEncryptionProperties>(new 
ColumnEncryptionProperties(
-        encrypted_, column_path_, key_copy, key_metadata_));
-  }
-
   ColumnEncryptionProperties() = default;
   ColumnEncryptionProperties(const ColumnEncryptionProperties& other) = 
default;
   ColumnEncryptionProperties(ColumnEncryptionProperties&& other) = default;
 
+  ~ColumnEncryptionProperties() { key_.clear(); }
+
  private:
   const std::string column_path_;
   bool encrypted_;
   bool encrypted_with_footer_key_;
   std::string key_;
   std::string key_metadata_;
-  bool utilized_;
   explicit ColumnEncryptionProperties(bool encrypted, const std::string& 
column_path,
                                       const std::string& key,
                                       const std::string& key_metadata);
@@ -212,26 +189,14 @@ class PARQUET_EXPORT ColumnDecryptionProperties {
   ColumnDecryptionProperties(const ColumnDecryptionProperties& other) = 
default;
   ColumnDecryptionProperties(ColumnDecryptionProperties&& other) = default;
 
+  ~ColumnDecryptionProperties() { key_.clear(); }
+
   std::string column_path() const { return column_path_; }
   std::string key() const { return key_; }
-  bool is_utilized() { return utilized_; }
-
-  /// ColumnDecryptionProperties object can be used for reading one file only.
-  /// Mark ColumnDecryptionProperties as utilized once it is used in
-  /// FileDecryptionProperties as the encryption key will be wiped out upon
-  /// completion of file reading.
-  void set_utilized() { utilized_ = true; }
-
-  /// Upon completion of file reading, the encryption key
-  /// will be wiped out.
-  void WipeOutDecryptionKey();
-
-  std::shared_ptr<ColumnDecryptionProperties> DeepClone();
 
  private:
   const std::string column_path_;
   std::string key_;
-  bool utilized_;
 
   /// This class is only required for setting explicit column decryption keys -
   /// to override key retriever (or to provide keys when key metadata and/or
@@ -339,6 +304,8 @@ class PARQUET_EXPORT FileDecryptionProperties {
     bool plaintext_files_allowed_;
   };
 
+  ~FileDecryptionProperties() { footer_key_.clear(); }
+
   std::string column_key(const std::string& column_path) const;
 
   std::string footer_key() const { return footer_key_; }
@@ -359,26 +326,6 @@ class PARQUET_EXPORT FileDecryptionProperties {
     return aad_prefix_verifier_;
   }
 
-  /// Upon completion of file reading, the encryption keys in the properties
-  /// will be wiped out (array values set to 0).
-  void WipeOutDecryptionKeys();
-
-  bool is_utilized();
-
-  /// FileDecryptionProperties object can be used for reading one file only.
-  /// Mark FileDecryptionProperties as utilized once it is used to read a file 
as the
-  /// encryption keys will be wiped out upon completion of file reading.
-  void set_utilized() { utilized_ = true; }
-
-  /// FileDecryptionProperties object can be used for reading one file only.
-  /// (unless this object keeps the keyRetrieval callback only, and no explicit
-  /// keys or aadPrefix).
-  /// At the end, keys are wiped out in the memory.
-  /// This method allows to clone identical properties for another file,
-  /// with an option to update the aadPrefix (if newAadPrefix is null,
-  /// aadPrefix will be cloned too)
-  std::shared_ptr<FileDecryptionProperties> DeepClone(std::string 
new_aad_prefix = "");
-
  private:
   std::string footer_key_;
   std::string aad_prefix_;
@@ -390,7 +337,6 @@ class PARQUET_EXPORT FileDecryptionProperties {
   std::shared_ptr<DecryptionKeyRetriever> key_retriever_;
   bool check_plaintext_footer_integrity_;
   bool plaintext_files_allowed_;
-  bool utilized_;
 
   FileDecryptionProperties(
       const std::string& footer_key,
@@ -463,6 +409,9 @@ class PARQUET_EXPORT FileEncryptionProperties {
     bool store_aad_prefix_in_file_;
     ColumnPathToEncryptionPropertiesMap encrypted_columns_;
   };
+
+  ~FileEncryptionProperties() { footer_key_.clear(); }
+
   bool encrypted_footer() const { return encrypted_footer_; }
 
   EncryptionAlgorithm algorithm() const { return algorithm_; }
@@ -476,24 +425,6 @@ class PARQUET_EXPORT FileEncryptionProperties {
   std::shared_ptr<ColumnEncryptionProperties> column_encryption_properties(
       const std::string& column_path);
 
-  bool is_utilized() const { return utilized_; }
-
-  /// FileEncryptionProperties object can be used for writing one file only.
-  /// Mark FileEncryptionProperties as utilized once it is used to write a 
file as the
-  /// encryption keys will be wiped out upon completion of file writing.
-  void set_utilized() { utilized_ = true; }
-
-  /// Upon completion of file writing, the encryption keys
-  /// will be wiped out (array values set to 0).
-  void WipeOutEncryptionKeys();
-
-  /// FileEncryptionProperties object can be used for writing one file only.
-  /// (at the end, keys are wiped out in the memory).
-  /// This method allows to clone identical properties for another file,
-  /// with an option to update the aadPrefix (if newAadPrefix is null,
-  /// aadPrefix will be cloned too)
-  std::shared_ptr<FileEncryptionProperties> DeepClone(std::string 
new_aad_prefix = "");
-
   ColumnPathToEncryptionPropertiesMap encrypted_columns() const {
     return encrypted_columns_;
   }
@@ -505,7 +436,6 @@ class PARQUET_EXPORT FileEncryptionProperties {
   bool encrypted_footer_;
   std::string file_aad_;
   std::string aad_prefix_;
-  bool utilized_;
   bool store_aad_prefix_in_file_;
   ColumnPathToEncryptionPropertiesMap encrypted_columns_;
 
diff --git a/cpp/src/parquet/encryption/encryption_internal.cc 
b/cpp/src/parquet/encryption/encryption_internal.cc
index 31cad130a1..657198f170 100644
--- a/cpp/src/parquet/encryption/encryption_internal.cc
+++ b/cpp/src/parquet/encryption/encryption_internal.cc
@@ -52,25 +52,68 @@ constexpr int32_t kBufferSizeLength = 4;
     throw ParquetException("Couldn't init ALG decryption");           \
   }
 
-class AesEncryptor::AesEncryptorImpl {
+class AesCryptoContext {
+ public:
+  AesCryptoContext(ParquetCipher::type alg_id, int32_t key_len, bool metadata,
+                   bool include_length) {
+    openssl::EnsureInitialized();
+
+    length_buffer_length_ = include_length ? kBufferSizeLength : 0;
+    ciphertext_size_delta_ = length_buffer_length_ + kNonceLength;
+
+    if (ParquetCipher::AES_GCM_V1 != alg_id && ParquetCipher::AES_GCM_CTR_V1 
!= alg_id) {
+      std::stringstream ss;
+      ss << "Crypto algorithm " << alg_id << " is not supported";
+      throw ParquetException(ss.str());
+    }
+    if (16 != key_len && 24 != key_len && 32 != key_len) {
+      std::stringstream ss;
+      ss << "Wrong key length: " << key_len;
+      throw ParquetException(ss.str());
+    }
+
+    if (metadata || (ParquetCipher::AES_GCM_V1 == alg_id)) {
+      aes_mode_ = kGcmMode;
+      ciphertext_size_delta_ += kGcmTagLength;
+    } else {
+      aes_mode_ = kCtrMode;
+    }
+
+    key_length_ = key_len;
+  }
+
+  virtual ~AesCryptoContext() = default;
+
+ protected:
+  static void DeleteCipherContext(EVP_CIPHER_CTX* ctx) { 
EVP_CIPHER_CTX_free(ctx); }
+
+  using CipherContext = std::unique_ptr<EVP_CIPHER_CTX, 
decltype(&DeleteCipherContext)>;
+
+  static CipherContext NewCipherContext() {
+    auto ctx = CipherContext(EVP_CIPHER_CTX_new(), DeleteCipherContext);
+    if (!ctx) {
+      throw ParquetException("Couldn't init cipher context");
+    }
+    return ctx;
+  }
+
+  int32_t aes_mode_;
+  int32_t key_length_;
+  int32_t ciphertext_size_delta_;
+  int32_t length_buffer_length_;
+};
+
+class AesEncryptor::AesEncryptorImpl : public AesCryptoContext {
  public:
   explicit AesEncryptorImpl(ParquetCipher::type alg_id, int32_t key_len, bool 
metadata,
                             bool write_length);
 
-  ~AesEncryptorImpl() { WipeOut(); }
-
   int32_t Encrypt(span<const uint8_t> plaintext, span<const uint8_t> key,
                   span<const uint8_t> aad, span<uint8_t> ciphertext);
 
   int32_t SignedFooterEncrypt(span<const uint8_t> footer, span<const uint8_t> 
key,
                               span<const uint8_t> aad, span<const uint8_t> 
nonce,
                               span<uint8_t> encrypted_footer);
-  void WipeOut() {
-    if (nullptr != ctx_) {
-      EVP_CIPHER_CTX_free(ctx_);
-      ctx_ = nullptr;
-    }
-  }
 
   [[nodiscard]] int32_t CiphertextLength(int64_t plaintext_len) const {
     if (plaintext_len < 0) {
@@ -89,17 +132,7 @@ class AesEncryptor::AesEncryptorImpl {
   }
 
  private:
-  void CheckValid() const {
-    if (ctx_ == nullptr) {
-      throw ParquetException("AesEncryptor was wiped out");
-    }
-  }
-
-  EVP_CIPHER_CTX* ctx_;
-  int32_t aes_mode_;
-  int32_t key_length_;
-  int32_t ciphertext_size_delta_;
-  int32_t length_buffer_length_;
+  [[nodiscard]] CipherContext MakeCipherContext() const;
 
   int32_t GcmEncrypt(span<const uint8_t> plaintext, span<const uint8_t> key,
                      span<const uint8_t> nonce, span<const uint8_t> aad,
@@ -111,59 +144,37 @@ class AesEncryptor::AesEncryptorImpl {
 
 AesEncryptor::AesEncryptorImpl::AesEncryptorImpl(ParquetCipher::type alg_id,
                                                  int32_t key_len, bool 
metadata,
-                                                 bool write_length) {
-  openssl::EnsureInitialized();
-
-  ctx_ = nullptr;
-
-  length_buffer_length_ = write_length ? kBufferSizeLength : 0;
-  ciphertext_size_delta_ = length_buffer_length_ + kNonceLength;
-  if (metadata || (ParquetCipher::AES_GCM_V1 == alg_id)) {
-    aes_mode_ = kGcmMode;
-    ciphertext_size_delta_ += kGcmTagLength;
-  } else {
-    aes_mode_ = kCtrMode;
-  }
-
-  if (16 != key_len && 24 != key_len && 32 != key_len) {
-    std::stringstream ss;
-    ss << "Wrong key length: " << key_len;
-    throw ParquetException(ss.str());
-  }
-
-  key_length_ = key_len;
-
-  ctx_ = EVP_CIPHER_CTX_new();
-  if (nullptr == ctx_) {
-    throw ParquetException("Couldn't init cipher context");
-  }
+                                                 bool write_length)
+    : AesCryptoContext(alg_id, key_len, metadata, write_length) {}
 
+AesCryptoContext::CipherContext 
AesEncryptor::AesEncryptorImpl::MakeCipherContext()
+    const {
+  auto ctx = NewCipherContext();
   if (kGcmMode == aes_mode_) {
     // Init AES-GCM with specified key length
-    if (16 == key_len) {
-      ENCRYPT_INIT(ctx_, EVP_aes_128_gcm());
-    } else if (24 == key_len) {
-      ENCRYPT_INIT(ctx_, EVP_aes_192_gcm());
-    } else if (32 == key_len) {
-      ENCRYPT_INIT(ctx_, EVP_aes_256_gcm());
+    if (16 == key_length_) {
+      ENCRYPT_INIT(ctx.get(), EVP_aes_128_gcm());
+    } else if (24 == key_length_) {
+      ENCRYPT_INIT(ctx.get(), EVP_aes_192_gcm());
+    } else if (32 == key_length_) {
+      ENCRYPT_INIT(ctx.get(), EVP_aes_256_gcm());
     }
   } else {
     // Init AES-CTR with specified key length
-    if (16 == key_len) {
-      ENCRYPT_INIT(ctx_, EVP_aes_128_ctr());
-    } else if (24 == key_len) {
-      ENCRYPT_INIT(ctx_, EVP_aes_192_ctr());
-    } else if (32 == key_len) {
-      ENCRYPT_INIT(ctx_, EVP_aes_256_ctr());
+    if (16 == key_length_) {
+      ENCRYPT_INIT(ctx.get(), EVP_aes_128_ctr());
+    } else if (24 == key_length_) {
+      ENCRYPT_INIT(ctx.get(), EVP_aes_192_ctr());
+    } else if (32 == key_length_) {
+      ENCRYPT_INIT(ctx.get(), EVP_aes_256_ctr());
     }
   }
+  return ctx;
 }
 
 int32_t AesEncryptor::AesEncryptorImpl::SignedFooterEncrypt(
     span<const uint8_t> footer, span<const uint8_t> key, span<const uint8_t> 
aad,
     span<const uint8_t> nonce, span<uint8_t> encrypted_footer) {
-  CheckValid();
-
   if (static_cast<size_t>(key_length_) != key.size()) {
     std::stringstream ss;
     ss << "Wrong key length " << key.size() << ". Should be " << key_length_;
@@ -188,8 +199,6 @@ int32_t AesEncryptor::AesEncryptorImpl::Encrypt(span<const 
uint8_t> plaintext,
                                                 span<const uint8_t> key,
                                                 span<const uint8_t> aad,
                                                 span<uint8_t> ciphertext) {
-  CheckValid();
-
   if (static_cast<size_t>(key_length_) != key.size()) {
     std::stringstream ss;
     ss << "Wrong key length " << key.size() << ". Should be " << key_length_;
@@ -231,8 +240,10 @@ int32_t 
AesEncryptor::AesEncryptorImpl::GcmEncrypt(span<const uint8_t> plaintext
     throw ParquetException(ss.str());
   }
 
+  auto ctx = MakeCipherContext();
+
   // Setting key and IV (nonce)
-  if (1 != EVP_EncryptInit_ex(ctx_, nullptr, nullptr, key.data(), 
nonce.data())) {
+  if (1 != EVP_EncryptInit_ex(ctx.get(), nullptr, nullptr, key.data(), 
nonce.data())) {
     throw ParquetException("Couldn't set key and nonce");
   }
 
@@ -242,7 +253,7 @@ int32_t 
AesEncryptor::AesEncryptorImpl::GcmEncrypt(span<const uint8_t> plaintext
     ss << "AAD size " << aad.size() << " overflows int";
     throw ParquetException(ss.str());
   }
-  if ((!aad.empty()) && (1 != EVP_EncryptUpdate(ctx_, nullptr, &len, 
aad.data(),
+  if ((!aad.empty()) && (1 != EVP_EncryptUpdate(ctx.get(), nullptr, &len, 
aad.data(),
                                                 
static_cast<int>(aad.size())))) {
     throw ParquetException("Couldn't set AAD");
   }
@@ -253,25 +264,26 @@ int32_t 
AesEncryptor::AesEncryptorImpl::GcmEncrypt(span<const uint8_t> plaintext
     ss << "Plaintext size " << plaintext.size() << " overflows int";
     throw ParquetException(ss.str());
   }
-  if (1 !=
-      EVP_EncryptUpdate(ctx_, ciphertext.data() + length_buffer_length_ + 
kNonceLength,
-                        &len, plaintext.data(), 
static_cast<int>(plaintext.size()))) {
+  if (1 != EVP_EncryptUpdate(
+               ctx.get(), ciphertext.data() + length_buffer_length_ + 
kNonceLength, &len,
+               plaintext.data(), static_cast<int>(plaintext.size()))) {
     throw ParquetException("Failed encryption update");
   }
 
   ciphertext_len = len;
 
   // Finalization
-  if (1 !=
-      EVP_EncryptFinal_ex(
-          ctx_, ciphertext.data() + length_buffer_length_ + kNonceLength + 
len, &len)) {
+  if (1 != EVP_EncryptFinal_ex(
+               ctx.get(), ciphertext.data() + length_buffer_length_ + 
kNonceLength + len,
+               &len)) {
     throw ParquetException("Failed encryption finalization");
   }
 
   ciphertext_len += len;
 
   // Getting the tag
-  if (1 != EVP_CIPHER_CTX_ctrl(ctx_, EVP_CTRL_GCM_GET_TAG, kGcmTagLength, 
tag.data())) {
+  if (1 !=
+      EVP_CIPHER_CTX_ctrl(ctx.get(), EVP_CTRL_GCM_GET_TAG, kGcmTagLength, 
tag.data())) {
     throw ParquetException("Couldn't get AES-GCM tag");
   }
 
@@ -312,8 +324,10 @@ int32_t 
AesEncryptor::AesEncryptorImpl::CtrEncrypt(span<const uint8_t> plaintext
   std::copy(nonce.begin(), nonce.begin() + kNonceLength, iv.begin());
   iv[kCtrIvLength - 1] = 1;
 
+  auto ctx = MakeCipherContext();
+
   // Setting key and IV
-  if (1 != EVP_EncryptInit_ex(ctx_, nullptr, nullptr, key.data(), iv.data())) {
+  if (1 != EVP_EncryptInit_ex(ctx.get(), nullptr, nullptr, key.data(), 
iv.data())) {
     throw ParquetException("Couldn't set key and IV");
   }
 
@@ -323,18 +337,18 @@ int32_t 
AesEncryptor::AesEncryptorImpl::CtrEncrypt(span<const uint8_t> plaintext
     ss << "Plaintext size " << plaintext.size() << " overflows int";
     throw ParquetException(ss.str());
   }
-  if (1 !=
-      EVP_EncryptUpdate(ctx_, ciphertext.data() + length_buffer_length_ + 
kNonceLength,
-                        &len, plaintext.data(), 
static_cast<int>(plaintext.size()))) {
+  if (1 != EVP_EncryptUpdate(
+               ctx.get(), ciphertext.data() + length_buffer_length_ + 
kNonceLength, &len,
+               plaintext.data(), static_cast<int>(plaintext.size()))) {
     throw ParquetException("Failed encryption update");
   }
 
   ciphertext_len = len;
 
   // Finalization
-  if (1 !=
-      EVP_EncryptFinal_ex(
-          ctx_, ciphertext.data() + length_buffer_length_ + kNonceLength + 
len, &len)) {
+  if (1 != EVP_EncryptFinal_ex(
+               ctx.get(), ciphertext.data() + length_buffer_length_ + 
kNonceLength + len,
+               &len)) {
     throw ParquetException("Failed encryption finalization");
   }
 
@@ -354,7 +368,7 @@ int32_t 
AesEncryptor::AesEncryptorImpl::CtrEncrypt(span<const uint8_t> plaintext
   return length_buffer_length_ + buffer_size;
 }
 
-AesEncryptor::~AesEncryptor() {}
+AesEncryptor::~AesEncryptor() = default;
 
 int32_t AesEncryptor::SignedFooterEncrypt(span<const uint8_t> footer,
                                           span<const uint8_t> key,
@@ -364,8 +378,6 @@ int32_t AesEncryptor::SignedFooterEncrypt(span<const 
uint8_t> footer,
   return impl_->SignedFooterEncrypt(footer, key, aad, nonce, encrypted_footer);
 }
 
-void AesEncryptor::WipeOut() { impl_->WipeOut(); }
-
 int32_t AesEncryptor::CiphertextLength(int64_t plaintext_len) const {
   return impl_->CiphertextLength(plaintext_len);
 }
@@ -380,23 +392,14 @@ AesEncryptor::AesEncryptor(ParquetCipher::type alg_id, 
int32_t key_len, bool met
     : impl_{std::unique_ptr<AesEncryptorImpl>(
           new AesEncryptorImpl(alg_id, key_len, metadata, write_length))} {}
 
-class AesDecryptor::AesDecryptorImpl {
+class AesDecryptor::AesDecryptorImpl : AesCryptoContext {
  public:
   explicit AesDecryptorImpl(ParquetCipher::type alg_id, int32_t key_len, bool 
metadata,
                             bool contains_length);
 
-  ~AesDecryptorImpl() { WipeOut(); }
-
   int32_t Decrypt(span<const uint8_t> ciphertext, span<const uint8_t> key,
                   span<const uint8_t> aad, span<uint8_t> plaintext);
 
-  void WipeOut() {
-    if (nullptr != ctx_) {
-      EVP_CIPHER_CTX_free(ctx_);
-      ctx_ = nullptr;
-    }
-  }
-
   [[nodiscard]] int32_t PlaintextLength(int32_t ciphertext_len) const {
     if (ciphertext_len < ciphertext_size_delta_) {
       std::stringstream ss;
@@ -423,17 +426,7 @@ class AesDecryptor::AesDecryptorImpl {
   }
 
  private:
-  void CheckValid() const {
-    if (ctx_ == nullptr) {
-      throw ParquetException("AesDecryptor was wiped out");
-    }
-  }
-
-  EVP_CIPHER_CTX* ctx_;
-  int32_t aes_mode_;
-  int32_t key_length_;
-  int32_t ciphertext_size_delta_;
-  int32_t length_buffer_length_;
+  [[nodiscard]] CipherContext MakeCipherContext() const;
 
   /// Get the actual ciphertext length, inclusive of the length buffer length,
   /// and validate that the provided buffer size is large enough.
@@ -451,95 +444,52 @@ int32_t AesDecryptor::Decrypt(span<const uint8_t> 
ciphertext, span<const uint8_t
   return impl_->Decrypt(ciphertext, key, aad, plaintext);
 }
 
-void AesDecryptor::WipeOut() { impl_->WipeOut(); }
-
 AesDecryptor::~AesDecryptor() {}
 
 AesDecryptor::AesDecryptorImpl::AesDecryptorImpl(ParquetCipher::type alg_id,
                                                  int32_t key_len, bool 
metadata,
-                                                 bool contains_length) {
-  openssl::EnsureInitialized();
-
-  ctx_ = nullptr;
-  length_buffer_length_ = contains_length ? kBufferSizeLength : 0;
-  ciphertext_size_delta_ = length_buffer_length_ + kNonceLength;
-  if (metadata || (ParquetCipher::AES_GCM_V1 == alg_id)) {
-    aes_mode_ = kGcmMode;
-    ciphertext_size_delta_ += kGcmTagLength;
-  } else {
-    aes_mode_ = kCtrMode;
-  }
-
-  if (16 != key_len && 24 != key_len && 32 != key_len) {
-    std::stringstream ss;
-    ss << "Wrong key length: " << key_len;
-    throw ParquetException(ss.str());
-  }
-
-  key_length_ = key_len;
-
-  ctx_ = EVP_CIPHER_CTX_new();
-  if (nullptr == ctx_) {
-    throw ParquetException("Couldn't init cipher context");
-  }
+                                                 bool contains_length)
+    : AesCryptoContext(alg_id, key_len, metadata, contains_length) {}
 
+AesCryptoContext::CipherContext 
AesDecryptor::AesDecryptorImpl::MakeCipherContext()
+    const {
+  auto ctx = NewCipherContext();
   if (kGcmMode == aes_mode_) {
     // Init AES-GCM with specified key length
-    if (16 == key_len) {
-      DECRYPT_INIT(ctx_, EVP_aes_128_gcm());
-    } else if (24 == key_len) {
-      DECRYPT_INIT(ctx_, EVP_aes_192_gcm());
-    } else if (32 == key_len) {
-      DECRYPT_INIT(ctx_, EVP_aes_256_gcm());
+    if (16 == key_length_) {
+      DECRYPT_INIT(ctx.get(), EVP_aes_128_gcm());
+    } else if (24 == key_length_) {
+      DECRYPT_INIT(ctx.get(), EVP_aes_192_gcm());
+    } else if (32 == key_length_) {
+      DECRYPT_INIT(ctx.get(), EVP_aes_256_gcm());
     }
   } else {
     // Init AES-CTR with specified key length
-    if (16 == key_len) {
-      DECRYPT_INIT(ctx_, EVP_aes_128_ctr());
-    } else if (24 == key_len) {
-      DECRYPT_INIT(ctx_, EVP_aes_192_ctr());
-    } else if (32 == key_len) {
-      DECRYPT_INIT(ctx_, EVP_aes_256_ctr());
+    if (16 == key_length_) {
+      DECRYPT_INIT(ctx.get(), EVP_aes_128_ctr());
+    } else if (24 == key_length_) {
+      DECRYPT_INIT(ctx.get(), EVP_aes_192_ctr());
+    } else if (32 == key_length_) {
+      DECRYPT_INIT(ctx.get(), EVP_aes_256_ctr());
     }
   }
-}
-
-std::unique_ptr<AesEncryptor> AesEncryptor::Make(ParquetCipher::type alg_id,
-                                                 int32_t key_len, bool 
metadata) {
-  return Make(alg_id, key_len, metadata, true /*write_length*/);
+  return ctx;
 }
 
 std::unique_ptr<AesEncryptor> AesEncryptor::Make(ParquetCipher::type alg_id,
                                                  int32_t key_len, bool 
metadata,
                                                  bool write_length) {
-  if (ParquetCipher::AES_GCM_V1 != alg_id && ParquetCipher::AES_GCM_CTR_V1 != 
alg_id) {
-    std::stringstream ss;
-    ss << "Crypto algorithm " << alg_id << " is not supported";
-    throw ParquetException(ss.str());
-  }
-
   return std::make_unique<AesEncryptor>(alg_id, key_len, metadata, 
write_length);
 }
 
 AesDecryptor::AesDecryptor(ParquetCipher::type alg_id, int32_t key_len, bool 
metadata,
                            bool contains_length)
-    : impl_{std::unique_ptr<AesDecryptorImpl>(
-          new AesDecryptorImpl(alg_id, key_len, metadata, contains_length))} {}
-
-std::shared_ptr<AesDecryptor> AesDecryptor::Make(
-    ParquetCipher::type alg_id, int32_t key_len, bool metadata,
-    std::vector<std::weak_ptr<AesDecryptor>>* all_decryptors) {
-  if (ParquetCipher::AES_GCM_V1 != alg_id && ParquetCipher::AES_GCM_CTR_V1 != 
alg_id) {
-    std::stringstream ss;
-    ss << "Crypto algorithm " << alg_id << " is not supported";
-    throw ParquetException(ss.str());
-  }
+    : impl_{std::make_unique<AesDecryptorImpl>(alg_id, key_len, metadata,
+                                               contains_length)} {}
 
-  auto decryptor = std::make_shared<AesDecryptor>(alg_id, key_len, metadata);
-  if (all_decryptors != nullptr) {
-    all_decryptors->push_back(decryptor);
-  }
-  return decryptor;
+std::unique_ptr<AesDecryptor> AesDecryptor::Make(ParquetCipher::type alg_id,
+                                                 int32_t key_len, bool 
metadata) {
+  return std::make_unique<AesDecryptor>(alg_id, key_len, metadata);
 }
 
 int32_t AesDecryptor::PlaintextLength(int32_t ciphertext_len) const {
@@ -628,8 +578,10 @@ int32_t 
AesDecryptor::AesDecryptorImpl::GcmDecrypt(span<const uint8_t> ciphertex
   std::copy(ciphertext.begin() + ciphertext_len - kGcmTagLength,
             ciphertext.begin() + ciphertext_len, tag.begin());
 
+  auto ctx = MakeCipherContext();
+
   // Setting key and IV
-  if (1 != EVP_DecryptInit_ex(ctx_, nullptr, nullptr, key.data(), 
nonce.data())) {
+  if (1 != EVP_DecryptInit_ex(ctx.get(), nullptr, nullptr, key.data(), 
nonce.data())) {
     throw ParquetException("Couldn't set key and IV");
   }
 
@@ -639,7 +591,7 @@ int32_t 
AesDecryptor::AesDecryptorImpl::GcmDecrypt(span<const uint8_t> ciphertex
     ss << "AAD size " << aad.size() << " overflows int";
     throw ParquetException(ss.str());
   }
-  if ((!aad.empty()) && (1 != EVP_DecryptUpdate(ctx_, nullptr, &len, 
aad.data(),
+  if ((!aad.empty()) && (1 != EVP_DecryptUpdate(ctx.get(), nullptr, &len, 
aad.data(),
                                                 
static_cast<int>(aad.size())))) {
     throw ParquetException("Couldn't set AAD");
   }
@@ -647,7 +599,7 @@ int32_t 
AesDecryptor::AesDecryptorImpl::GcmDecrypt(span<const uint8_t> ciphertex
   // Decryption
   int decryption_length =
       ciphertext_len - length_buffer_length_ - kNonceLength - kGcmTagLength;
-  if (!EVP_DecryptUpdate(ctx_, plaintext.data(), &len,
+  if (!EVP_DecryptUpdate(ctx.get(), plaintext.data(), &len,
                          ciphertext.data() + length_buffer_length_ + 
kNonceLength,
                          decryption_length)) {
     throw ParquetException("Failed decryption update");
@@ -656,12 +608,12 @@ int32_t 
AesDecryptor::AesDecryptorImpl::GcmDecrypt(span<const uint8_t> ciphertex
   plaintext_len = len;
 
   // Checking the tag (authentication)
-  if (!EVP_CIPHER_CTX_ctrl(ctx_, EVP_CTRL_GCM_SET_TAG, kGcmTagLength, 
tag.data())) {
+  if (!EVP_CIPHER_CTX_ctrl(ctx.get(), EVP_CTRL_GCM_SET_TAG, kGcmTagLength, 
tag.data())) {
     throw ParquetException("Failed authentication");
   }
 
   // Finalization
-  if (1 != EVP_DecryptFinal_ex(ctx_, plaintext.data() + len, &len)) {
+  if (1 != EVP_DecryptFinal_ex(ctx.get(), plaintext.data() + len, &len)) {
     throw ParquetException("Failed decryption finalization");
   }
 
@@ -702,14 +654,16 @@ int32_t 
AesDecryptor::AesDecryptorImpl::CtrDecrypt(span<const uint8_t> ciphertex
   // is set to 1.
   iv[kCtrIvLength - 1] = 1;
 
+  auto ctx = MakeCipherContext();
+
   // Setting key and IV
-  if (1 != EVP_DecryptInit_ex(ctx_, nullptr, nullptr, key.data(), iv.data())) {
+  if (1 != EVP_DecryptInit_ex(ctx.get(), nullptr, nullptr, key.data(), 
iv.data())) {
     throw ParquetException("Couldn't set key and IV");
   }
 
   // Decryption
   int decryption_length = ciphertext_len - length_buffer_length_ - 
kNonceLength;
-  if (!EVP_DecryptUpdate(ctx_, plaintext.data(), &len,
+  if (!EVP_DecryptUpdate(ctx.get(), plaintext.data(), &len,
                          ciphertext.data() + length_buffer_length_ + 
kNonceLength,
                          decryption_length)) {
     throw ParquetException("Failed decryption update");
@@ -718,7 +672,7 @@ int32_t 
AesDecryptor::AesDecryptorImpl::CtrDecrypt(span<const uint8_t> ciphertex
   plaintext_len = len;
 
   // Finalization
-  if (1 != EVP_DecryptFinal_ex(ctx_, plaintext.data() + len, &len)) {
+  if (1 != EVP_DecryptFinal_ex(ctx.get(), plaintext.data() + len, &len)) {
     throw ParquetException("Failed decryption finalization");
   }
 
@@ -730,8 +684,6 @@ int32_t AesDecryptor::AesDecryptorImpl::Decrypt(span<const 
uint8_t> ciphertext,
                                                 span<const uint8_t> key,
                                                 span<const uint8_t> aad,
                                                 span<uint8_t> plaintext) {
-  CheckValid();
-
   if (static_cast<size_t>(key_length_) != key.size()) {
     std::stringstream ss;
     ss << "Wrong key length " << key.size() << ". Should be " << key_length_;
diff --git a/cpp/src/parquet/encryption/encryption_internal.h 
b/cpp/src/parquet/encryption/encryption_internal.h
index d79ff56ad4..0625274956 100644
--- a/cpp/src/parquet/encryption/encryption_internal.h
+++ b/cpp/src/parquet/encryption/encryption_internal.h
@@ -53,10 +53,7 @@ class PARQUET_EXPORT AesEncryptor {
                         bool write_length = true);
 
   static std::unique_ptr<AesEncryptor> Make(ParquetCipher::type alg_id, 
int32_t key_len,
-                                            bool metadata);
-
-  static std::unique_ptr<AesEncryptor> Make(ParquetCipher::type alg_id, 
int32_t key_len,
-                                            bool metadata, bool write_length);
+                                            bool metadata, bool write_length = 
true);
 
   ~AesEncryptor();
 
@@ -77,8 +74,6 @@ class PARQUET_EXPORT AesEncryptor {
                               ::arrow::util::span<const uint8_t> nonce,
                               ::arrow::util::span<uint8_t> encrypted_footer);
 
-  void WipeOut();
-
  private:
   // PIMPL Idiom
   class AesEncryptorImpl;
@@ -88,25 +83,19 @@ class PARQUET_EXPORT AesEncryptor {
 /// Performs AES decryption operations with GCM or CTR ciphers.
 class PARQUET_EXPORT AesDecryptor {
  public:
-  /// Can serve one key length only. Possible values: 16, 24, 32 bytes.
-  /// If contains_length is true, expect ciphertext length prepended to the 
ciphertext
-  explicit AesDecryptor(ParquetCipher::type alg_id, int32_t key_len, bool 
metadata,
-                        bool contains_length = true);
-
-  /// \brief Factory function to create an AesDecryptor
+  /// \brief Construct an AesDecryptor
   ///
   /// \param alg_id the encryption algorithm to use
   /// \param key_len key length. Possible values: 16, 24, 32 bytes.
   /// \param metadata if true then this is a metadata decryptor
-  /// \param all_decryptors A weak reference to all decryptors that need to be 
wiped
-  /// out when decryption is finished
-  /// \return shared pointer to a new AesDecryptor
-  static std::shared_ptr<AesDecryptor> Make(
-      ParquetCipher::type alg_id, int32_t key_len, bool metadata,
-      std::vector<std::weak_ptr<AesDecryptor>>* all_decryptors);
+  /// \param contains_length if true, expect ciphertext length prepended to 
the ciphertext
+  explicit AesDecryptor(ParquetCipher::type alg_id, int32_t key_len, bool 
metadata,
+                        bool contains_length = true);
+
+  static std::unique_ptr<AesDecryptor> Make(ParquetCipher::type alg_id, 
int32_t key_len,
+                                            bool metadata);
 
   ~AesDecryptor();
-  void WipeOut();
 
   /// The size of the plaintext, for this cipher and the specified ciphertext 
length.
   [[nodiscard]] int32_t PlaintextLength(int32_t ciphertext_len) const;
diff --git a/cpp/src/parquet/encryption/encryption_internal_nossl.cc 
b/cpp/src/parquet/encryption/encryption_internal_nossl.cc
index 2a8162ed39..2448d9efa9 100644
--- a/cpp/src/parquet/encryption/encryption_internal_nossl.cc
+++ b/cpp/src/parquet/encryption/encryption_internal_nossl.cc
@@ -38,8 +38,6 @@ int32_t 
AesEncryptor::SignedFooterEncrypt(::arrow::util::span<const uint8_t> foo
   return -1;
 }
 
-void AesEncryptor::WipeOut() { ThrowOpenSSLRequiredException(); }
-
 int32_t AesEncryptor::CiphertextLength(int64_t plaintext_len) const {
   ThrowOpenSSLRequiredException();
   return -1;
@@ -68,16 +66,8 @@ int32_t AesDecryptor::Decrypt(::arrow::util::span<const 
uint8_t> ciphertext,
   return -1;
 }
 
-void AesDecryptor::WipeOut() { ThrowOpenSSLRequiredException(); }
-
 AesDecryptor::~AesDecryptor() {}
 
-std::unique_ptr<AesEncryptor> AesEncryptor::Make(ParquetCipher::type alg_id,
-                                                 int32_t key_len, bool 
metadata) {
-  ThrowOpenSSLRequiredException();
-  return NULLPTR;
-}
-
 std::unique_ptr<AesEncryptor> AesEncryptor::Make(ParquetCipher::type alg_id,
                                                  int32_t key_len, bool 
metadata,
                                                  bool write_length) {
@@ -90,9 +80,8 @@ AesDecryptor::AesDecryptor(ParquetCipher::type alg_id, 
int32_t key_len, bool met
   ThrowOpenSSLRequiredException();
 }
 
-std::shared_ptr<AesDecryptor> AesDecryptor::Make(
-    ParquetCipher::type alg_id, int32_t key_len, bool metadata,
-    std::vector<std::weak_ptr<AesDecryptor>>* all_decryptors) {
+std::unique_ptr<AesDecryptor> AesDecryptor::Make(ParquetCipher::type alg_id,
+                                                 int32_t key_len, bool 
metadata) {
   ThrowOpenSSLRequiredException();
   return NULLPTR;
 }
diff --git a/cpp/src/parquet/encryption/internal_file_decryptor.cc 
b/cpp/src/parquet/encryption/internal_file_decryptor.cc
index 53a2f8c021..715807b426 100644
--- a/cpp/src/parquet/encryption/internal_file_decryptor.cc
+++ b/cpp/src/parquet/encryption/internal_file_decryptor.cc
@@ -16,6 +16,7 @@
 // under the License.
 
 #include "parquet/encryption/internal_file_decryptor.h"
+
 #include "arrow/util/logging.h"
 #include "parquet/encryption/encryption.h"
 #include "parquet/encryption/encryption_internal.h"
@@ -24,7 +25,7 @@
 namespace parquet {
 
 // Decryptor
-Decryptor::Decryptor(std::shared_ptr<encryption::AesDecryptor> aes_decryptor,
+Decryptor::Decryptor(std::unique_ptr<encryption::AesDecryptor> aes_decryptor,
                      const std::string& key, const std::string& file_aad,
                      const std::string& aad, ::arrow::MemoryPool* pool)
     : aes_decryptor_(std::move(aes_decryptor)),
@@ -33,6 +34,8 @@ 
Decryptor::Decryptor(std::shared_ptr<encryption::AesDecryptor> aes_decryptor,
       aad_(aad),
       pool_(pool) {}
 
+Decryptor::~Decryptor() = default;
+
 int32_t Decryptor::PlaintextLength(int32_t ciphertext_len) const {
   return aes_decryptor_->PlaintextLength(ciphertext_len);
 }
@@ -47,34 +50,22 @@ int32_t Decryptor::Decrypt(::arrow::util::span<const 
uint8_t> ciphertext,
 }
 
 // InternalFileDecryptor
-InternalFileDecryptor::InternalFileDecryptor(FileDecryptionProperties* 
properties,
-                                             const std::string& file_aad,
-                                             ParquetCipher::type algorithm,
-                                             const std::string& 
footer_key_metadata,
-                                             ::arrow::MemoryPool* pool)
-    : properties_(properties),
+InternalFileDecryptor::InternalFileDecryptor(
+    std::shared_ptr<FileDecryptionProperties> properties, const std::string& 
file_aad,
+    ParquetCipher::type algorithm, const std::string& footer_key_metadata,
+    ::arrow::MemoryPool* pool)
+    : properties_(std::move(properties)),
       file_aad_(file_aad),
       algorithm_(algorithm),
       footer_key_metadata_(footer_key_metadata),
-      pool_(pool) {
-  if (properties_->is_utilized()) {
-    throw ParquetException(
-        "Re-using decryption properties with explicit keys for another file");
-  }
-  properties_->set_utilized();
-}
+      pool_(pool) {}
 
-void InternalFileDecryptor::WipeOutDecryptionKeys() {
-  std::lock_guard<std::mutex> lock(mutex_);
-  properties_->WipeOutDecryptionKeys();
-  for (auto const& i : all_decryptors_) {
-    if (auto aes_decryptor = i.lock()) {
-      aes_decryptor->WipeOut();
-    }
+std::string InternalFileDecryptor::GetFooterKey() {
+  std::unique_lock lock(mutex_);
+  if (!footer_key_.empty()) {
+    return footer_key_;
   }
-}
 
-std::string InternalFileDecryptor::GetFooterKey() {
   std::string footer_key = properties_->footer_key();
   // ignore footer key metadata if footer key is explicitly set via API
   if (footer_key.empty()) {
@@ -95,93 +86,31 @@ std::string InternalFileDecryptor::GetFooterKey() {
         "Footer key unavailable. Could not verify "
         "plaintext footer metadata");
   }
+
+  // cache footer key to avoid repeated retrieval of key from the key_retriever
+  footer_key_ = footer_key;
   return footer_key;
 }
 
-std::shared_ptr<Decryptor> InternalFileDecryptor::GetFooterDecryptor() {
+std::unique_ptr<Decryptor> InternalFileDecryptor::GetFooterDecryptor() {
   std::string aad = encryption::CreateFooterAad(file_aad_);
   return GetFooterDecryptor(aad, true);
 }
 
-std::shared_ptr<Decryptor> 
InternalFileDecryptor::GetFooterDecryptorForColumnMeta(
-    const std::string& aad) {
-  return GetFooterDecryptor(aad, true);
-}
-
-std::shared_ptr<Decryptor> 
InternalFileDecryptor::GetFooterDecryptorForColumnData(
-    const std::string& aad) {
-  return GetFooterDecryptor(aad, false);
-}
-
-std::shared_ptr<Decryptor> InternalFileDecryptor::GetFooterDecryptor(
+std::unique_ptr<Decryptor> InternalFileDecryptor::GetFooterDecryptor(
     const std::string& aad, bool metadata) {
-  if (metadata) {
-    if (footer_metadata_decryptor_ != nullptr) return 
footer_metadata_decryptor_;
-  } else {
-    if (footer_data_decryptor_ != nullptr) return footer_data_decryptor_;
-  }
+  std::string footer_key = GetFooterKey();
 
-  std::string footer_key = properties_->footer_key();
-  if (footer_key.empty()) {
-    if (footer_key_metadata_.empty())
-      throw ParquetException("No footer key or key metadata");
-    if (properties_->key_retriever() == nullptr)
-      throw ParquetException("No footer key or key retriever");
-    try {
-      footer_key = properties_->key_retriever()->GetKey(footer_key_metadata_);
-    } catch (KeyAccessDeniedException& e) {
-      std::stringstream ss;
-      ss << "Footer key: access denied " << e.what() << "\n";
-      throw ParquetException(ss.str());
-    }
-  }
-  if (footer_key.empty()) {
-    throw ParquetException(
-        "Invalid footer encryption key. "
-        "Could not parse footer metadata");
-  }
-
-  // Create both data and metadata decryptors to avoid redundant retrieval of 
key
-  // from the key_retriever.
   auto key_len = static_cast<int32_t>(footer_key.size());
-  std::shared_ptr<encryption::AesDecryptor> aes_metadata_decryptor;
-  std::shared_ptr<encryption::AesDecryptor> aes_data_decryptor;
-
-  {
-    std::lock_guard<std::mutex> lock(mutex_);
-    aes_metadata_decryptor = encryption::AesDecryptor::Make(
-        algorithm_, key_len, /*metadata=*/true, &all_decryptors_);
-    aes_data_decryptor = encryption::AesDecryptor::Make(
-        algorithm_, key_len, /*metadata=*/false, &all_decryptors_);
-  }
-
-  footer_metadata_decryptor_ = std::make_shared<Decryptor>(
-      std::move(aes_metadata_decryptor), footer_key, file_aad_, aad, pool_);
-  footer_data_decryptor_ = 
std::make_shared<Decryptor>(std::move(aes_data_decryptor),
-                                                       footer_key, file_aad_, 
aad, pool_);
-
-  if (metadata) return footer_metadata_decryptor_;
-  return footer_data_decryptor_;
-}
-
-std::shared_ptr<Decryptor> InternalFileDecryptor::GetColumnMetaDecryptor(
-    const std::string& column_path, const std::string& column_key_metadata,
-    const std::string& aad) {
-  return GetColumnDecryptor(column_path, column_key_metadata, aad, true);
-}
-
-std::shared_ptr<Decryptor> InternalFileDecryptor::GetColumnDataDecryptor(
-    const std::string& column_path, const std::string& column_key_metadata,
-    const std::string& aad) {
-  return GetColumnDecryptor(column_path, column_key_metadata, aad, false);
+  auto aes_decryptor = encryption::AesDecryptor::Make(algorithm_, key_len, 
metadata);
+  return std::make_unique<Decryptor>(std::move(aes_decryptor), footer_key, 
file_aad_, aad,
+                                     pool_);
 }
 
-std::shared_ptr<Decryptor> InternalFileDecryptor::GetColumnDecryptor(
-    const std::string& column_path, const std::string& column_key_metadata,
-    const std::string& aad, bool metadata) {
+std::string InternalFileDecryptor::GetColumnKey(const std::string& column_path,
+                                                const std::string& 
column_key_metadata) {
   std::string column_key = properties_->column_key(column_path);
 
-  column_key = properties_->column_key(column_path);
   // No explicit column key given via API. Retrieve via key metadata.
   if (column_key.empty() && !column_key_metadata.empty() &&
       properties_->key_retriever() != nullptr) {
@@ -196,61 +125,71 @@ std::shared_ptr<Decryptor> 
InternalFileDecryptor::GetColumnDecryptor(
   if (column_key.empty()) {
     throw HiddenColumnException("HiddenColumnException, path=" + column_path);
   }
+  return column_key;
+}
 
+std::unique_ptr<Decryptor> InternalFileDecryptor::GetColumnDecryptor(
+    const std::string& column_path, const std::string& column_key_metadata,
+    const std::string& aad, bool metadata) {
+  std::string column_key = GetColumnKey(column_path, column_key_metadata);
   auto key_len = static_cast<int32_t>(column_key.size());
-  std::lock_guard<std::mutex> lock(mutex_);
-  auto aes_decryptor =
-      encryption::AesDecryptor::Make(algorithm_, key_len, metadata, 
&all_decryptors_);
-  return std::make_shared<Decryptor>(std::move(aes_decryptor), column_key, 
file_aad_, aad,
+  auto aes_decryptor = encryption::AesDecryptor::Make(algorithm_, key_len, 
metadata);
+  return std::make_unique<Decryptor>(std::move(aes_decryptor), column_key, 
file_aad_, aad,
                                      pool_);
 }
 
-namespace {
-
-std::shared_ptr<Decryptor> GetColumnDecryptor(
-    const ColumnCryptoMetaData* crypto_metadata, InternalFileDecryptor* 
file_decryptor,
-    const std::function<std::shared_ptr<Decryptor>(
-        InternalFileDecryptor* file_decryptor, const std::string& column_path,
-        const std::string& column_key_metadata, const std::string& aad)>& func,
-    bool metadata) {
-  if (crypto_metadata == nullptr) {
-    return nullptr;
-  }
-
-  if (file_decryptor == nullptr) {
-    throw ParquetException("RowGroup is noted as encrypted but no file 
decryptor");
-  }
-
+std::function<std::unique_ptr<Decryptor>()>
+InternalFileDecryptor::GetColumnDecryptorFactory(
+    const ColumnCryptoMetaData* crypto_metadata, const std::string& aad, bool 
metadata) {
   if (crypto_metadata->encrypted_with_footer_key()) {
-    return metadata ? file_decryptor->GetFooterDecryptorForColumnMeta()
-                    : file_decryptor->GetFooterDecryptorForColumnData();
+    return [this, aad, metadata]() { return GetFooterDecryptor(aad, metadata); 
};
   }
 
   // The column is encrypted with its own key
   const std::string& column_key_metadata = crypto_metadata->key_metadata();
   const std::string column_path = 
crypto_metadata->path_in_schema()->ToDotString();
-  return func(file_decryptor, column_path, column_key_metadata, /*aad=*/"");
-}
+  std::string column_key = GetColumnKey(column_path, column_key_metadata);
 
-}  // namespace
+  return [this, aad, metadata, column_key = std::move(column_key)]() {
+    auto key_len = static_cast<int32_t>(column_key.size());
+    auto aes_decryptor = encryption::AesDecryptor::Make(algorithm_, key_len, 
metadata);
+    return std::make_unique<Decryptor>(std::move(aes_decryptor), column_key, 
file_aad_,
+                                       aad, pool_);
+  };
+}
 
-std::shared_ptr<Decryptor> GetColumnMetaDecryptor(
-    const ColumnCryptoMetaData* crypto_metadata, InternalFileDecryptor* 
file_decryptor) {
-  return GetColumnDecryptor(crypto_metadata, file_decryptor,
-                            &InternalFileDecryptor::GetColumnMetaDecryptor,
-                            /*metadata=*/true);
+std::function<std::unique_ptr<Decryptor>()>
+InternalFileDecryptor::GetColumnMetaDecryptorFactory(
+    InternalFileDecryptor* file_descryptor, const ColumnCryptoMetaData* 
crypto_metadata,
+    const std::string& aad) {
+  if (crypto_metadata == nullptr) {
+    // Column is not encrypted
+    return [] { return nullptr; };
+  }
+  if (file_descryptor == nullptr) {
+    throw ParquetException("Column is noted as encrypted but no file 
decryptor");
+  }
+  return file_descryptor->GetColumnDecryptorFactory(crypto_metadata, aad,
+                                                    /*metadata=*/true);
 }
 
-std::shared_ptr<Decryptor> GetColumnDataDecryptor(
-    const ColumnCryptoMetaData* crypto_metadata, InternalFileDecryptor* 
file_decryptor) {
-  return GetColumnDecryptor(crypto_metadata, file_decryptor,
-                            &InternalFileDecryptor::GetColumnDataDecryptor,
-                            /*metadata=*/false);
+std::function<std::unique_ptr<Decryptor>()>
+InternalFileDecryptor::GetColumnDataDecryptorFactory(
+    InternalFileDecryptor* file_descryptor, const ColumnCryptoMetaData* 
crypto_metadata,
+    const std::string& aad) {
+  if (crypto_metadata == nullptr) {
+    // Column is not encrypted
+    return [] { return nullptr; };
+  }
+  if (file_descryptor == nullptr) {
+    throw ParquetException("Column is noted as encrypted but no file 
decryptor");
+  }
+  return file_descryptor->GetColumnDecryptorFactory(crypto_metadata, aad,
+                                                    /*metadata=*/false);
 }
 
-void UpdateDecryptor(const std::shared_ptr<Decryptor>& decryptor,
-                     int16_t row_group_ordinal, int16_t column_ordinal,
-                     int8_t module_type) {
+void UpdateDecryptor(Decryptor* decryptor, int16_t row_group_ordinal,
+                     int16_t column_ordinal, int8_t module_type) {
   ARROW_DCHECK(!decryptor->file_aad().empty());
   const std::string aad =
       encryption::CreateModuleAad(decryptor->file_aad(), module_type, 
row_group_ordinal,
diff --git a/cpp/src/parquet/encryption/internal_file_decryptor.h 
b/cpp/src/parquet/encryption/internal_file_decryptor.h
index 08423de7fe..cc0e315e02 100644
--- a/cpp/src/parquet/encryption/internal_file_decryptor.h
+++ b/cpp/src/parquet/encryption/internal_file_decryptor.h
@@ -17,7 +17,6 @@
 
 #pragma once
 
-#include <map>
 #include <memory>
 #include <mutex>
 #include <string>
@@ -35,11 +34,15 @@ class AesEncryptor;
 class ColumnCryptoMetaData;
 class FileDecryptionProperties;
 
+// An object handling decryption using well-known encryption parameters
+//
+// CAUTION: Decryptor objects are not thread-safe.
 class PARQUET_EXPORT Decryptor {
  public:
-  Decryptor(std::shared_ptr<encryption::AesDecryptor> decryptor, const 
std::string& key,
+  Decryptor(std::unique_ptr<encryption::AesDecryptor> decryptor, const 
std::string& key,
             const std::string& file_aad, const std::string& aad,
             ::arrow::MemoryPool* pool);
+  ~Decryptor();
 
   const std::string& file_aad() const { return file_aad_; }
   void UpdateAad(const std::string& aad) { aad_ = aad; }
@@ -51,7 +54,7 @@ class PARQUET_EXPORT Decryptor {
                   ::arrow::util::span<uint8_t> plaintext);
 
  private:
-  std::shared_ptr<encryption::AesDecryptor> aes_decryptor_;
+  std::unique_ptr<encryption::AesDecryptor> aes_decryptor_;
   std::string key_;
   std::string file_aad_;
   std::string aad_;
@@ -60,70 +63,86 @@ class PARQUET_EXPORT Decryptor {
 
 class InternalFileDecryptor {
  public:
-  explicit InternalFileDecryptor(FileDecryptionProperties* properties,
+  explicit InternalFileDecryptor(std::shared_ptr<FileDecryptionProperties> 
properties,
                                  const std::string& file_aad,
                                  ParquetCipher::type algorithm,
                                  const std::string& footer_key_metadata,
                                  ::arrow::MemoryPool* pool);
 
-  std::string& file_aad() { return file_aad_; }
+  const std::string& file_aad() const { return file_aad_; }
 
   std::string GetFooterKey();
 
-  ParquetCipher::type algorithm() { return algorithm_; }
+  ParquetCipher::type algorithm() const { return algorithm_; }
 
-  std::string& footer_key_metadata() { return footer_key_metadata_; }
+  const std::string& footer_key_metadata() const { return 
footer_key_metadata_; }
 
-  FileDecryptionProperties* properties() { return properties_; }
+  const std::shared_ptr<FileDecryptionProperties>& properties() const {
+    return properties_;
+  }
 
-  void WipeOutDecryptionKeys();
+  ::arrow::MemoryPool* pool() const { return pool_; }
 
-  ::arrow::MemoryPool* pool() { return pool_; }
+  // Get a Decryptor instance for the Parquet footer
+  std::unique_ptr<Decryptor> GetFooterDecryptor();
 
-  std::shared_ptr<Decryptor> GetFooterDecryptor();
-  std::shared_ptr<Decryptor> GetFooterDecryptorForColumnMeta(const 
std::string& aad = "");
-  std::shared_ptr<Decryptor> GetFooterDecryptorForColumnData(const 
std::string& aad = "");
-  std::shared_ptr<Decryptor> GetColumnMetaDecryptor(
+  // Get a Decryptor instance for column chunk metadata.
+  std::unique_ptr<Decryptor> GetColumnMetaDecryptor(
       const std::string& column_path, const std::string& column_key_metadata,
-      const std::string& aad = "");
-  std::shared_ptr<Decryptor> GetColumnDataDecryptor(
+      const std::string& aad = "") {
+    return GetColumnDecryptor(column_path, column_key_metadata, aad, 
/*metadata=*/true);
+  }
+
+  // Get a Decryptor instance for column chunk data.
+  std::unique_ptr<Decryptor> GetColumnDataDecryptor(
       const std::string& column_path, const std::string& column_key_metadata,
+      const std::string& aad = "") {
+    return GetColumnDecryptor(column_path, column_key_metadata, aad, 
/*metadata=*/false);
+  }
+
+  // Get a Decryptor factory for column chunk metadata.
+  //
+  // This is typically useful if multi-threaded decryption is expected.
+  // This is a static function as it accepts a null `InternalFileDecryptor*`
+  // argument if the column is not encrypted.
+  static std::function<std::unique_ptr<Decryptor>()> 
GetColumnMetaDecryptorFactory(
+      InternalFileDecryptor*, const ColumnCryptoMetaData* crypto_metadata,
+      const std::string& aad = "");
+  // Get a Decryptor factory for column chunk data.
+  //
+  // This is typically useful if multi-threaded decryption is expected.
+  // This is a static function as it accepts a null `InternalFileDecryptor*`
+  // argument if the column is not encrypted.
+  static std::function<std::unique_ptr<Decryptor>()> 
GetColumnDataDecryptorFactory(
+      InternalFileDecryptor*, const ColumnCryptoMetaData* crypto_metadata,
       const std::string& aad = "");
 
  private:
-  FileDecryptionProperties* properties_;
+  std::shared_ptr<FileDecryptionProperties> properties_;
   // Concatenation of aad_prefix (if exists) and aad_file_unique
   std::string file_aad_;
-
-  std::shared_ptr<Decryptor> footer_metadata_decryptor_;
-  std::shared_ptr<Decryptor> footer_data_decryptor_;
   ParquetCipher::type algorithm_;
   std::string footer_key_metadata_;
-  // Mutex to guard access to all_decryptors_
-  mutable std::mutex mutex_;
-  // A weak reference to all decryptors that need to be wiped out when 
decryption is
-  // finished, guarded by mutex_ for thread safety
-  std::vector<std::weak_ptr<encryption::AesDecryptor>> all_decryptors_;
-
   ::arrow::MemoryPool* pool_;
 
-  std::shared_ptr<Decryptor> GetFooterDecryptor(const std::string& aad, bool 
metadata);
-  std::shared_ptr<Decryptor> GetColumnDecryptor(const std::string& column_path,
-                                                const std::string& 
column_key_metadata,
-                                                const std::string& aad,
-                                                bool metadata = false);
-};
+  // Protects footer_key_ updates
+  std::mutex mutex_;
+  std::string footer_key_;
+
+  std::string GetColumnKey(const std::string& column_path,
+                           const std::string& column_key_metadata);
 
-/// Utility to get column meta decryptor of an encrypted column.
-std::shared_ptr<Decryptor> GetColumnMetaDecryptor(
-    const ColumnCryptoMetaData* crypto_metadata, InternalFileDecryptor* 
file_decryptor);
+  std::unique_ptr<Decryptor> GetFooterDecryptor(const std::string& aad, bool 
metadata);
 
-/// Utility to get column data decryptor of an encrypted column.
-std::shared_ptr<Decryptor> GetColumnDataDecryptor(
-    const ColumnCryptoMetaData* crypto_metadata, InternalFileDecryptor* 
file_decryptor);
+  std::unique_ptr<Decryptor> GetColumnDecryptor(const std::string& column_path,
+                                                const std::string& 
column_key_metadata,
+                                                const std::string& aad, bool 
metadata);
+
+  std::function<std::unique_ptr<Decryptor>()> GetColumnDecryptorFactory(
+      const ColumnCryptoMetaData* crypto_metadata, const std::string& aad, 
bool metadata);
+};
 
-void UpdateDecryptor(const std::shared_ptr<Decryptor>& decryptor,
-                     int16_t row_group_ordinal, int16_t column_ordinal,
-                     int8_t module_type);
+void UpdateDecryptor(Decryptor* decryptor, int16_t row_group_ordinal,
+                     int16_t column_ordinal, int8_t module_type);
 
 }  // namespace parquet
diff --git a/cpp/src/parquet/encryption/internal_file_encryptor.cc 
b/cpp/src/parquet/encryption/internal_file_encryptor.cc
index 94094e6aca..9210ffba9c 100644
--- a/cpp/src/parquet/encryption/internal_file_encryptor.cc
+++ b/cpp/src/parquet/encryption/internal_file_encryptor.cc
@@ -43,27 +43,7 @@ int32_t Encryptor::Encrypt(::arrow::util::span<const 
uint8_t> plaintext,
 // InternalFileEncryptor
 InternalFileEncryptor::InternalFileEncryptor(FileEncryptionProperties* 
properties,
                                              ::arrow::MemoryPool* pool)
-    : properties_(properties), pool_(pool) {
-  if (properties_->is_utilized()) {
-    throw ParquetException("Re-using encryption properties for another file");
-  }
-  properties_->set_utilized();
-}
-
-void InternalFileEncryptor::WipeOutEncryptionKeys() {
-  properties_->WipeOutEncryptionKeys();
-
-  for (auto const& i : meta_encryptor_) {
-    if (i != nullptr) {
-      i->WipeOut();
-    }
-  }
-  for (auto const& i : data_encryptor_) {
-    if (i != nullptr) {
-      i->WipeOut();
-    }
-  }
-}
+    : properties_(properties), pool_(pool) {}
 
 std::shared_ptr<Encryptor> InternalFileEncryptor::GetFooterEncryptor() {
   if (footer_encryptor_ != nullptr) {
diff --git a/cpp/src/parquet/encryption/internal_file_encryptor.h 
b/cpp/src/parquet/encryption/internal_file_encryptor.h
index 5a3d743ce5..a7108ab66f 100644
--- a/cpp/src/parquet/encryption/internal_file_encryptor.h
+++ b/cpp/src/parquet/encryption/internal_file_encryptor.h
@@ -77,7 +77,6 @@ class InternalFileEncryptor {
   std::shared_ptr<Encryptor> GetFooterSigningEncryptor();
   std::shared_ptr<Encryptor> GetColumnMetaEncryptor(const std::string& 
column_path);
   std::shared_ptr<Encryptor> GetColumnDataEncryptor(const std::string& 
column_path);
-  void WipeOutEncryptionKeys();
 
  private:
   FileEncryptionProperties* properties_;
diff --git a/cpp/src/parquet/encryption/key_management_test.cc 
b/cpp/src/parquet/encryption/key_management_test.cc
index 1506a00a14..2e43edee53 100644
--- a/cpp/src/parquet/encryption/key_management_test.cc
+++ b/cpp/src/parquet/encryption/key_management_test.cc
@@ -405,7 +405,7 @@ TEST_F(TestEncryptionKeyManagement, 
ReadParquetMRExternalKeyMaterialFile) {
       kms_connection_config_, decryption_config, file_path, file_system);
 
   parquet::ReaderProperties reader_properties = 
parquet::default_reader_properties();
-  
reader_properties.file_decryption_properties(file_decryption_properties->DeepClone());
+  reader_properties.file_decryption_properties(file_decryption_properties);
 
   std::shared_ptr<::arrow::io::RandomAccessFile> source;
   PARQUET_ASSIGN_OR_THROW(source, ::arrow::io::ReadableFile::Open(
diff --git a/cpp/src/parquet/encryption/read_configurations_test.cc 
b/cpp/src/parquet/encryption/read_configurations_test.cc
index f450f9274c..2612229028 100644
--- a/cpp/src/parquet/encryption/read_configurations_test.cc
+++ b/cpp/src/parquet/encryption/read_configurations_test.cc
@@ -172,34 +172,27 @@ class TestDecryptionConfiguration
       std::function<void(const std::string& file,
                          const std::shared_ptr<FileDecryptionProperties>&)>
           decrypt_func) {
-    std::string exception_msg;
     std::shared_ptr<FileDecryptionProperties> file_decryption_properties;
-    // if we get decryption_config_num = x then it means the actual number is 
x+1
-    // and since we want decryption_config_num=4 we set the condition to 3
-    if (decryption_config_num != 3) {
+    if (vector_of_decryption_configurations_[decryption_config_num]) {
       file_decryption_properties =
-          
vector_of_decryption_configurations_[decryption_config_num]->DeepClone();
+          vector_of_decryption_configurations_[decryption_config_num];
     }
 
     decrypt_func(std::move(file), std::move(file_decryption_properties));
   }
 
+  std::shared_ptr<FileDecryptionProperties> GetDecryptionProperties(
+      int decryption_config_num) {
+    const auto props = 
vector_of_decryption_configurations_[decryption_config_num];
+    return props;
+  }
+
   void DecryptFile(const std::string& file, int decryption_config_num) {
-    DecryptFileInternal(
-        file, decryption_config_num,
-        [&](const std::string& file,
-            const std::shared_ptr<FileDecryptionProperties>& 
file_decryption_properties) {
-          decryptor_.DecryptFile(file, file_decryption_properties);
-        });
+    decryptor_.DecryptFile(file, 
GetDecryptionProperties(decryption_config_num));
   }
 
   void DecryptPageIndex(const std::string& file, int decryption_config_num) {
-    DecryptFileInternal(
-        file, decryption_config_num,
-        [&](const std::string& file,
-            const std::shared_ptr<FileDecryptionProperties>& 
file_decryption_properties) {
-          decryptor_.DecryptPageIndex(file, file_decryption_properties);
-        });
+    decryptor_.DecryptPageIndex(file, 
GetDecryptionProperties(decryption_config_num));
   }
 
   // Check that the decryption result is as expected.
diff --git a/cpp/src/parquet/encryption/test_encryption_util.cc 
b/cpp/src/parquet/encryption/test_encryption_util.cc
index cf863da60a..1864e86f34 100644
--- a/cpp/src/parquet/encryption/test_encryption_util.cc
+++ b/cpp/src/parquet/encryption/test_encryption_util.cc
@@ -349,7 +349,7 @@ void FileDecryptor::DecryptFile(
   std::string exception_msg;
   parquet::ReaderProperties reader_properties = 
parquet::default_reader_properties();
   if (file_decryption_properties) {
-    
reader_properties.file_decryption_properties(file_decryption_properties->DeepClone());
+    reader_properties.file_decryption_properties(file_decryption_properties);
   }
 
   std::shared_ptr<::arrow::io::RandomAccessFile> source;
@@ -360,7 +360,7 @@ void FileDecryptor::DecryptFile(
   CheckFile(file_reader.get(), file_decryption_properties);
 
   if (file_decryption_properties) {
-    
reader_properties.file_decryption_properties(file_decryption_properties->DeepClone());
+    reader_properties.file_decryption_properties(file_decryption_properties);
   }
   auto fut = parquet::ParquetFileReader::OpenAsync(source, reader_properties);
   ASSERT_FINISHES_OK(fut);
@@ -520,7 +520,7 @@ void FileDecryptor::DecryptPageIndex(
   std::string exception_msg;
   parquet::ReaderProperties reader_properties = 
parquet::default_reader_properties();
   if (file_decryption_properties) {
-    
reader_properties.file_decryption_properties(file_decryption_properties->DeepClone());
+    reader_properties.file_decryption_properties(file_decryption_properties);
   }
 
   std::shared_ptr<::arrow::io::RandomAccessFile> source;
diff --git a/cpp/src/parquet/file_reader.cc b/cpp/src/parquet/file_reader.cc
index ed0879c1f1..54df6922a1 100644
--- a/cpp/src/parquet/file_reader.cc
+++ b/cpp/src/parquet/file_reader.cc
@@ -259,12 +259,11 @@ class SerializedRowGroup : public 
RowGroupReader::Contents {
     }
 
     // The column is encrypted
-    std::shared_ptr<Decryptor> meta_decryptor = GetColumnMetaDecryptor(
-        crypto_metadata.get(), file_metadata_->file_decryptor().get());
-    std::shared_ptr<Decryptor> data_decryptor = GetColumnDataDecryptor(
-        crypto_metadata.get(), file_metadata_->file_decryptor().get());
-    ARROW_DCHECK_NE(meta_decryptor, nullptr);
-    ARROW_DCHECK_NE(data_decryptor, nullptr);
+    auto* file_decryptor = file_metadata_->file_decryptor().get();
+    auto meta_decryptor_factory = 
InternalFileDecryptor::GetColumnMetaDecryptorFactory(
+        file_decryptor, crypto_metadata.get());
+    auto data_decryptor_factory = 
InternalFileDecryptor::GetColumnDataDecryptorFactory(
+        file_decryptor, crypto_metadata.get());
 
     constexpr auto kEncryptedOrdinalLimit = 32767;
     if (ARROW_PREDICT_FALSE(row_group_ordinal_ > kEncryptedOrdinalLimit)) {
@@ -274,9 +273,10 @@ class SerializedRowGroup : public RowGroupReader::Contents 
{
       throw ParquetException("Encrypted files cannot contain more than 32767 
columns");
     }
 
-    CryptoContext ctx(col->has_dictionary_page(),
+    CryptoContext ctx{col->has_dictionary_page(),
                       static_cast<int16_t>(row_group_ordinal_), 
static_cast<int16_t>(i),
-                      meta_decryptor, data_decryptor);
+                      std::move(meta_decryptor_factory),
+                      std::move(data_decryptor_factory)};
     return PageReader::Open(stream, col->num_values(), col->compression(), 
properties_,
                             always_compressed, &ctx);
   }
@@ -314,11 +314,7 @@ class SerializedFile : public ParquetFileReader::Contents {
     }
   }
 
-  void Close() override {
-    if (file_metadata_ && file_metadata_->file_decryptor()) {
-      file_metadata_->file_decryptor()->WipeOutDecryptionKeys();
-    }
-  }
+  void Close() override {}
 
   std::shared_ptr<RowGroupReader> GetRowGroup(int i) override {
     std::shared_ptr<Buffer> prebuffered_column_chunks_bitmap;
@@ -474,7 +470,7 @@ class SerializedFile : public ParquetFileReader::Contents {
 
     const uint32_t read_metadata_len = ParseUnencryptedFileMetadata(
         metadata_buffer, metadata_len, std::move(file_decryptor));
-    auto file_decryption_properties = 
properties_.file_decryption_properties().get();
+    auto file_decryption_properties = properties_.file_decryption_properties();
     if (is_encrypted_footer) {
       // Nothing else to do here.
       return;
@@ -583,7 +579,8 @@ class SerializedFile : public ParquetFileReader::Contents {
       int64_t metadata_start = read_size.first;
       metadata_len = read_size.second;
       return source_->ReadAsync(metadata_start, metadata_len)
-          .Then([this, metadata_len, is_encrypted_footer, file_decryptor](
+          .Then([this, metadata_len, is_encrypted_footer,
+                 file_decryptor = std::move(file_decryptor)](
                     const std::shared_ptr<::arrow::Buffer>& metadata_buffer) {
             // Continue and read the file footer
             return ParseMetaDataFinal(metadata_buffer, metadata_len, 
is_encrypted_footer,
@@ -602,7 +599,7 @@ class SerializedFile : public ParquetFileReader::Contents {
     BEGIN_PARQUET_CATCH_EXCEPTIONS
     const uint32_t read_metadata_len = ParseUnencryptedFileMetadata(
         metadata_buffer, metadata_len, std::move(file_decryptor));
-    auto file_decryption_properties = 
properties_.file_decryption_properties().get();
+    auto file_decryption_properties = properties_.file_decryption_properties();
     if (is_encrypted_footer) {
       // Nothing else to do here.
       return ::arrow::Status::OK();
@@ -638,11 +635,12 @@ class SerializedFile : public ParquetFileReader::Contents 
{
       const std::shared_ptr<Buffer>& footer_buffer, const uint32_t 
metadata_len,
       std::shared_ptr<InternalFileDecryptor> file_decryptor);
 
-  std::string HandleAadPrefix(FileDecryptionProperties* 
file_decryption_properties,
-                              EncryptionAlgorithm& algo);
+  std::string HandleAadPrefix(
+      const std::shared_ptr<FileDecryptionProperties>& 
file_decryption_properties,
+      const EncryptionAlgorithm& algo);
 
   void ParseMetaDataOfEncryptedFileWithPlaintextFooter(
-      FileDecryptionProperties* file_decryption_properties,
+      const std::shared_ptr<FileDecryptionProperties>& 
file_decryption_properties,
       const std::shared_ptr<Buffer>& metadata_buffer, uint32_t metadata_len,
       uint32_t read_metadata_len);
 
@@ -679,7 +677,7 @@ 
SerializedFile::ParseMetaDataOfEncryptedFileWithEncryptedFooter(
                            std::to_string(footer_len) + " bytes but got " +
                            std::to_string(crypto_metadata_buffer->size()) + " 
bytes)");
   }
-  auto file_decryption_properties = 
properties_.file_decryption_properties().get();
+  auto file_decryption_properties = properties_.file_decryption_properties();
   if (file_decryption_properties == nullptr) {
     throw ParquetException(
         "Could not read encrypted metadata, no decryption found in reader's 
properties");
@@ -700,7 +698,7 @@ 
SerializedFile::ParseMetaDataOfEncryptedFileWithEncryptedFooter(
 }
 
 void SerializedFile::ParseMetaDataOfEncryptedFileWithPlaintextFooter(
-    FileDecryptionProperties* file_decryption_properties,
+    const std::shared_ptr<FileDecryptionProperties>& 
file_decryption_properties,
     const std::shared_ptr<Buffer>& metadata_buffer, uint32_t metadata_len,
     uint32_t read_metadata_len) {
   // Providing decryption properties in plaintext footer mode is not 
mandatory, for
@@ -734,7 +732,8 @@ void 
SerializedFile::ParseMetaDataOfEncryptedFileWithPlaintextFooter(
 }
 
 std::string SerializedFile::HandleAadPrefix(
-    FileDecryptionProperties* file_decryption_properties, EncryptionAlgorithm& 
algo) {
+    const std::shared_ptr<FileDecryptionProperties>& 
file_decryption_properties,
+    const EncryptionAlgorithm& algo) {
   std::string aad_prefix_in_properties = 
file_decryption_properties->aad_prefix();
   std::string aad_prefix = aad_prefix_in_properties;
   bool file_has_aad_prefix = algo.aad.aad_prefix.empty() ? false : true;
diff --git a/cpp/src/parquet/file_writer.cc b/cpp/src/parquet/file_writer.cc
index f80a095a13..10c6cb95a5 100644
--- a/cpp/src/parquet/file_writer.cc
+++ b/cpp/src/parquet/file_writer.cc
@@ -448,9 +448,6 @@ class FileSerializer : public ParquetFileWriter::Contents {
       WriteEncryptedFileMetadata(*file_metadata_, sink_.get(), 
footer_signing_encryptor,
                                  false);
     }
-    if (file_encryptor_) {
-      file_encryptor_->WipeOutEncryptionKeys();
-    }
   }
 
   void WritePageIndex() {
diff --git a/cpp/src/parquet/metadata.cc b/cpp/src/parquet/metadata.cc
index 398ff761bd..9f825a54ae 100644
--- a/cpp/src/parquet/metadata.cc
+++ b/cpp/src/parquet/metadata.cc
@@ -749,8 +749,6 @@ class FileMetaData::FileMetaDataImpl {
     int32_t encrypted_len = aes_encryptor->SignedFooterEncrypt(
         serialized_data_span, str2span(key), str2span(aad), nonce,
         encrypted_buffer->mutable_span_as<uint8_t>());
-    // Delete AES encryptor object. It was created only to verify the footer 
signature.
-    aes_encryptor->WipeOut();
     return 0 ==
            memcmp(encrypted_buffer->data() + encrypted_len - 
encryption::kGcmTagLength,
                   tag, encryption::kGcmTagLength);
diff --git a/cpp/src/parquet/page_index.cc b/cpp/src/parquet/page_index.cc
index 8cc819f10c..f24032ded5 100644
--- a/cpp/src/parquet/page_index.cc
+++ b/cpp/src/parquet/page_index.cc
@@ -256,10 +256,11 @@ class RowGroupPageIndexReaderImpl : public 
RowGroupPageIndexReader {
     auto descr = row_group_metadata_->schema()->Column(i);
 
     // Get decryptor of column index if encrypted.
-    std::shared_ptr<Decryptor> decryptor = parquet::GetColumnMetaDecryptor(
-        col_chunk->crypto_metadata().get(), file_decryptor_);
+    std::unique_ptr<Decryptor> decryptor =
+        InternalFileDecryptor::GetColumnMetaDecryptorFactory(
+            file_decryptor_, col_chunk->crypto_metadata().get())();
     if (decryptor != nullptr) {
-      UpdateDecryptor(decryptor, row_group_ordinal_, /*column_ordinal=*/i,
+      UpdateDecryptor(decryptor.get(), row_group_ordinal_, 
/*column_ordinal=*/i,
                       encryption::kColumnIndex);
     }
 
@@ -295,10 +296,11 @@ class RowGroupPageIndexReaderImpl : public 
RowGroupPageIndexReader {
     uint32_t length = static_cast<uint32_t>(offset_index_location->length);
 
     // Get decryptor of offset index if encrypted.
-    std::shared_ptr<Decryptor> decryptor =
-        GetColumnMetaDecryptor(col_chunk->crypto_metadata().get(), 
file_decryptor_);
+    std::unique_ptr<Decryptor> decryptor =
+        InternalFileDecryptor::GetColumnMetaDecryptorFactory(
+            file_decryptor_, col_chunk->crypto_metadata().get())();
     if (decryptor != nullptr) {
-      UpdateDecryptor(decryptor, row_group_ordinal_, /*column_ordinal=*/i,
+      UpdateDecryptor(decryptor.get(), row_group_ordinal_, 
/*column_ordinal=*/i,
                       encryption::kOffsetIndex);
     }
 

Reply via email to