This is an automated email from the ASF dual-hosted git repository.

areeve pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new 5112de2322 GH-31869: [Python][Parquet] Implement external key material 
features in Python (#48009)
5112de2322 is described below

commit 5112de23222dda6fa449f0488265ee0679fd227b
Author: Patrick Parsons <[email protected]>
AuthorDate: Tue Nov 11 22:46:58 2025 -0500

    GH-31869: [Python][Parquet] Implement external key material features in 
Python (#48009)
    
    ### Rationale for this change
    Enables external key material and master key rotation for individual
Parquet files in PyArrow. This change does not address Parquet dataset
encryption functionality.
    
    ### What changes are included in this PR?
    This PR enables external key material for parquet encryption from PyArrow:
    Optional parquet_file_path and FileSystem parameters to CryptoFactory,
mirroring the interface of CryptoFactory in C++.
    
    1. Exposes the rotate_master_keys method of CryptoFactory
    
    2. Adds Cython declarations for FileKeyMaterialStore and Cython classes for
FileSystemKeyMaterialStore and KeyMaterial, but does not expose these from the
public pyarrow.parquet.encryption module. I included these changes only so that
a unit test can verify an external store without leaking the store's
implementation details into the test.
    
    ### Are these changes tested?
    Yes. I've modified an existing test (previously marked pytest.xfail) to do
a basic read/write round trip and verify creation of the external key material
store, and added a test for CryptoFactory.rotate_master_keys.
    
    ### Are there any user-facing changes?
    
    1. Users may optionally supply a Parquet file path and FileSystem to the
CryptoFactory methods file_encryption_properties and file_decryption_properties.
Doing so in conjunction with setting
EncryptionConfiguration.internal_key_material=False enables external key
material from PyArrow.
    2. PyArrow CryptoFactory now has a rotate_master_keys method exposing key 
rotation functionality from C++ CryptoFactory.
    
    * GitHub Issue: #31869
    
    Lead-authored-by: Patrick Parsons 
<[email protected]>
    Co-authored-by: Adam Reeve <[email protected]>
    Signed-off-by: Adam Reeve <[email protected]>
---
 .../encryption/file_system_key_material_store.cc   |   1 -
 .../encryption/file_system_key_material_store.h    |   4 +
 python/pyarrow/_parquet_encryption.pxd             |   8 +
 python/pyarrow/_parquet_encryption.pyx             | 218 ++++++++++++++++++++-
 python/pyarrow/includes/libparquet_encryption.pxd  |  63 +++++-
 .../pyarrow/src/arrow/python/parquet_encryption.cc |  27 ++-
 .../pyarrow/src/arrow/python/parquet_encryption.h  |  18 +-
 python/pyarrow/tests/parquet/conftest.py           |   5 +
 python/pyarrow/tests/parquet/encryption.py         |  66 ++++++-
 python/pyarrow/tests/parquet/test_encryption.py    | 154 ++++++++++++---
 10 files changed, 519 insertions(+), 45 deletions(-)

diff --git a/cpp/src/parquet/encryption/file_system_key_material_store.cc 
b/cpp/src/parquet/encryption/file_system_key_material_store.cc
index cbecf2645d..7a7db3fa62 100644
--- a/cpp/src/parquet/encryption/file_system_key_material_store.cc
+++ b/cpp/src/parquet/encryption/file_system_key_material_store.cc
@@ -25,7 +25,6 @@
 
 #include "parquet/encryption/file_system_key_material_store.h"
 #include "parquet/encryption/key_material.h"
-#include "parquet/exception.h"
 
 namespace parquet::encryption {
 
diff --git a/cpp/src/parquet/encryption/file_system_key_material_store.h 
b/cpp/src/parquet/encryption/file_system_key_material_store.h
index 3babfdbf82..ecbadb90a9 100644
--- a/cpp/src/parquet/encryption/file_system_key_material_store.h
+++ b/cpp/src/parquet/encryption/file_system_key_material_store.h
@@ -24,6 +24,7 @@
 #include "arrow/filesystem/filesystem.h"
 
 #include "parquet/encryption/file_key_material_store.h"
+#include "parquet/exception.h"
 
 namespace parquet::encryption {
 
@@ -59,6 +60,9 @@ class PARQUET_EXPORT FileSystemKeyMaterialStore : public 
FileKeyMaterialStore {
       LoadKeyMaterialMap();
     }
     auto found = key_material_map_.find(key_id_in_file);
+    if (found == key_material_map_.end()) {
+      throw ParquetException("Invalid key id");
+    }
     return found->second;
   }
 
diff --git a/python/pyarrow/_parquet_encryption.pxd 
b/python/pyarrow/_parquet_encryption.pxd
index d52669501a..48939fe277 100644
--- a/python/pyarrow/_parquet_encryption.pxd
+++ b/python/pyarrow/_parquet_encryption.pxd
@@ -49,6 +49,14 @@ cdef class KmsConnectionConfig(_Weakrefable):
     @staticmethod
     cdef wrap(const CKmsConnectionConfig& config)
 
+cdef class KeyMaterial(_Weakrefable):
+    cdef shared_ptr[CKeyMaterial] key_material
+
+    @staticmethod
+    cdef inline KeyMaterial wrap(shared_ptr[CKeyMaterial] key_material)
+
+cdef class FileSystemKeyMaterialStore(_Weakrefable):
+    cdef shared_ptr[CFileSystemKeyMaterialStore] store
 
 cdef shared_ptr[CCryptoFactory] pyarrow_unwrap_cryptofactory(object 
crypto_factory) except *
 cdef shared_ptr[CKmsConnectionConfig] 
pyarrow_unwrap_kmsconnectionconfig(object kmsconnectionconfig) except *
diff --git a/python/pyarrow/_parquet_encryption.pyx 
b/python/pyarrow/_parquet_encryption.pyx
index f95464e303..6185d5f239 100644
--- a/python/pyarrow/_parquet_encryption.pyx
+++ b/python/pyarrow/_parquet_encryption.pyx
@@ -25,9 +25,11 @@ from cython.operator cimport dereference as deref
 
 from pyarrow.includes.common cimport *
 from pyarrow.includes.libarrow cimport *
+from pyarrow.lib cimport check_status
 from pyarrow.lib cimport _Weakrefable
 from pyarrow.lib import tobytes, frombytes
-
+from pyarrow._fs cimport FileSystem
+from pyarrow.fs import _resolve_filesystem_and_path
 
 cdef ParquetCipher cipher_from_name(name):
     name = name.upper()
@@ -364,6 +366,13 @@ cdef void _cb_create_kms_client(
     out[0] = (<KmsClient> result).unwrap()
 
 
+cdef inline shared_ptr[CFileSystem] _unwrap_fs(filesystem: FileSystem | None):
+    if isinstance(filesystem, FileSystem):
+        return filesystem.unwrap()
+    else:
+        return <shared_ptr[CFileSystem]>nullptr
+
+
 cdef class CryptoFactory(_Weakrefable):
     """ A factory that produces the low-level FileEncryptionProperties and
     FileDecryptionProperties objects, from the high-level parameters."""
@@ -402,7 +411,9 @@ cdef class CryptoFactory(_Weakrefable):
 
     def file_encryption_properties(self,
                                    KmsConnectionConfig kms_connection_config,
-                                   EncryptionConfiguration encryption_config):
+                                   EncryptionConfiguration encryption_config,
+                                   parquet_file_path=None,
+                                   FileSystem filesystem=None):
         """Create file encryption properties.
 
         Parameters
@@ -413,6 +424,17 @@ cdef class CryptoFactory(_Weakrefable):
         encryption_config : EncryptionConfiguration
             Configuration of the encryption, such as which columns to encrypt
 
+        parquet_file_path : str, pathlib.Path, or None, default None
+            Path to the parquet file to be encrypted. Only required when the
+            internal_key_material attribute of EncryptionConfiguration is set
+            to False. Used to derive the path for storing key material 
+            specific to this parquet file.
+
+        filesystem : FileSystem or None, default None
+            Used only when internal_key_material is set to False on 
+            EncryptionConfiguration. If None, the file system will be inferred
+            based on parquet_file_path. 
+
         Returns
         -------
         file_encryption_properties : FileEncryptionProperties
@@ -421,11 +443,23 @@ cdef class CryptoFactory(_Weakrefable):
         cdef:
             CResult[shared_ptr[CFileEncryptionProperties]] \
                 file_encryption_properties_result
+            c_string c_parquet_file_path
+            shared_ptr[CFileSystem] c_filesystem
+
+        filesystem, parquet_file_path = _resolve_filesystem_and_path(
+            parquet_file_path, filesystem)
+        if parquet_file_path is not None:
+            c_parquet_file_path = tobytes(parquet_file_path)
+        else:
+            c_parquet_file_path = tobytes("")
+        c_filesystem = _unwrap_fs(filesystem)
+
         with nogil:
             file_encryption_properties_result = \
                 self.factory.get().SafeGetFileEncryptionProperties(
                     deref(kms_connection_config.unwrap().get()),
-                    deref(encryption_config.unwrap().get()))
+                    deref(encryption_config.unwrap().get()),
+                    c_parquet_file_path, c_filesystem)
         file_encryption_properties = GetResultValue(
             file_encryption_properties_result)
         return FileEncryptionProperties.wrap(file_encryption_properties)
@@ -433,7 +467,9 @@ cdef class CryptoFactory(_Weakrefable):
     def file_decryption_properties(
             self,
             KmsConnectionConfig kms_connection_config,
-            DecryptionConfiguration decryption_config=None):
+            DecryptionConfiguration decryption_config=None,
+            parquet_file_path=None,
+            FileSystem filesystem=None):
         """Create file decryption properties.
 
         Parameters
@@ -445,6 +481,15 @@ cdef class CryptoFactory(_Weakrefable):
             Configuration of the decryption, such as cache timeout.
             Can be None.
 
+        parquet_file_path : str, pathlib.Path, or None, default None
+            Path to the parquet file to be decrypted. Only required when
+            the parquet file uses external key material.  Used to derive
+            the path to the external key material file.
+
+        filesystem : FileSystem or None, default None
+            Used only when the parquet file uses external key material. If
+            None, the file system will be inferred based on parquet_file_path. 
+
         Returns
         -------
         file_decryption_properties : FileDecryptionProperties
@@ -454,6 +499,17 @@ cdef class CryptoFactory(_Weakrefable):
             CDecryptionConfiguration c_decryption_config
             CResult[shared_ptr[CFileDecryptionProperties]] \
                 c_file_decryption_properties
+            c_string c_parquet_file_path
+            shared_ptr[CFileSystem] c_filesystem
+
+        filesystem, parquet_file_path = _resolve_filesystem_and_path(
+            parquet_file_path, filesystem)
+        if parquet_file_path is not None:
+            c_parquet_file_path = tobytes(parquet_file_path)
+        else:
+            c_parquet_file_path = tobytes("")
+        c_filesystem = _unwrap_fs(filesystem)
+
         if decryption_config is None:
             c_decryption_config = CDecryptionConfiguration()
         else:
@@ -462,7 +518,7 @@ cdef class CryptoFactory(_Weakrefable):
             c_file_decryption_properties = \
                 self.factory.get().SafeGetFileDecryptionProperties(
                     deref(kms_connection_config.unwrap().get()),
-                    c_decryption_config)
+                    c_decryption_config, c_parquet_file_path, c_filesystem)
         file_decryption_properties = GetResultValue(
             c_file_decryption_properties)
         return FileDecryptionProperties.wrap(file_decryption_properties)
@@ -473,9 +529,161 @@ cdef class CryptoFactory(_Weakrefable):
     def remove_cache_entries_for_all_tokens(self):
         self.factory.get().RemoveCacheEntriesForAllTokens()
 
+    def rotate_master_keys(
+            self,
+            KmsConnectionConfig kms_connection_config,
+            parquet_file_path,
+            FileSystem filesystem=None,
+            double_wrapping=True,
+            cache_lifetime_seconds=600):
+        """ Rotates master encryption keys for a Parquet file that uses
+        external key material.
+
+        Parameters
+        ----------
+        kms_connection_config : KmsConnectionConfig
+            Configuration of connection to KMS
+
+        parquet_file_path : str or pathlib.Path
+            Path to a parquet file using external key material.
+
+        filesystem : FileSystem or None, default None
+            Used only when the parquet file uses external key material. If
+            None, the file system will be inferred based on parquet_file_path. 
+
+        double_wrapping : bool, default True
+            In the single wrapping mode, encrypts data encryption keys with
+            new master keys. In the double wrapping mode, generates new
+            KEKs (key encryption keys) and uses these to encrypt the data keys,
+            and encrypts the KEKs with the new master keys.
+
+        cache_lifetime_seconds : int or float, default 600
+            During key rotation, KMS Client and Key Encryption Keys will be
+            cached for this duration.
+        """
+        cdef:
+            c_string c_parquet_file_path
+            shared_ptr[CFileSystem] c_filesystem
+
+        if parquet_file_path != "":
+            filesystem, parquet_file_path = _resolve_filesystem_and_path(
+                parquet_file_path, filesystem)
+
+        c_parquet_file_path = tobytes(parquet_file_path)
+        c_filesystem = _unwrap_fs(filesystem)
+
+        status = self.factory.get().SafeRotateMasterKeys(
+            deref(kms_connection_config.unwrap().get()),
+            c_parquet_file_path,
+            c_filesystem,
+            double_wrapping,
+            cache_lifetime_seconds)
+
+        check_status(status)
+
     cdef inline shared_ptr[CPyCryptoFactory] unwrap(self):
         return self.factory
 
+cdef class KeyMaterial(_Weakrefable):
+
+    @property
+    def is_footer_key(self):
+        return self.key_material.get().is_footer_key()
+
+    @property
+    def is_double_wrapped(self):
+        return self.key_material.get().is_double_wrapped()
+
+    @property
+    def master_key_id(self):
+        return frombytes(self.key_material.get().master_key_id())
+
+    @property
+    def wrapped_dek(self):
+        return frombytes(self.key_material.get().wrapped_dek())
+
+    @property
+    def kek_id(self):
+        return frombytes(self.key_material.get().kek_id())
+
+    @property
+    def wrapped_kek(self):
+        return frombytes(self.key_material.get().wrapped_kek())
+
+    @property
+    def kms_instance_id(self):
+        return frombytes(self.key_material.get().kms_instance_id())
+
+    @property
+    def kms_instance_url(self):
+        return frombytes(self.key_material.get().kms_instance_url())
+
+    @staticmethod
+    cdef inline KeyMaterial wrap(shared_ptr[CKeyMaterial] key_material):
+        wrapper = KeyMaterial()
+        wrapper.key_material = key_material
+        return wrapper
+
+    @staticmethod
+    def parse(
+            const c_string key_material_string):
+        cdef:
+            shared_ptr[CKeyMaterial] c_key_material
+        c_key_material = make_shared[CKeyMaterial](move(
+            CKeyMaterial.Parse(key_material_string)
+        ))
+        return KeyMaterial.wrap(c_key_material)
+
+cdef class FileSystemKeyMaterialStore(_Weakrefable):
+
+    def get_key_material(self, key_id):
+        cdef:
+            c_string c_key_id = tobytes(key_id)
+            c_string c_key_material_string
+
+        c_key_material_string = self.store.get().GetKeyMaterial(c_key_id)
+        if c_key_material_string.empty():
+            raise KeyError("Invalid key id")
+        return KeyMaterial.parse(c_key_material_string)
+
+    def get_key_id_set(self):
+        return self.store.get().GetKeyIDSet()
+
+    @classmethod
+    def for_file(cls, parquet_file_path,
+                 FileSystem filesystem=None):
+        """Creates a FileSystemKeyMaterialStore for a parquet file that
+        was created with external key material.
+
+        Parameters
+        ----------
+        parquet_file_path : str or pathlib.Path
+            Path to a parquet file using external key material.
+
+        filesystem : FileSystem, default None
+            FileSystem where the parquet file is located. If None,
+            will be inferred based on parquet_file_path. 
+
+        Returns
+        -------
+        FileSystemKeyMaterialStore
+            A FileSystemKeyMaterialStore wrapping the external key material.
+        """
+        cdef:
+            c_string c_parquet_file_path
+            shared_ptr[CFileSystem] c_filesystem
+            shared_ptr[CFileSystemKeyMaterialStore] c_store
+            FileSystemKeyMaterialStore store = cls()
+
+        filesystem, parquet_file_path = _resolve_filesystem_and_path(
+            parquet_file_path, filesystem)
+        c_parquet_file_path = tobytes(parquet_file_path)
+        c_filesystem = _unwrap_fs(filesystem)
+
+        c_store = CFileSystemKeyMaterialStore.Make(
+            c_parquet_file_path, c_filesystem, False)
+        store.store = c_store
+        return store
 
 cdef shared_ptr[CCryptoFactory] pyarrow_unwrap_cryptofactory(object 
crypto_factory) except *:
     if isinstance(crypto_factory, CryptoFactory):
diff --git a/python/pyarrow/includes/libparquet_encryption.pxd 
b/python/pyarrow/includes/libparquet_encryption.pxd
index 7e031925af..7024f14ac2 100644
--- a/python/pyarrow/includes/libparquet_encryption.pxd
+++ b/python/pyarrow/includes/libparquet_encryption.pxd
@@ -19,6 +19,7 @@
 
 from pyarrow.includes.common cimport *
 from pyarrow.includes.libarrow cimport CSecureString
+from pyarrow.includes.libarrow_fs cimport CFileSystem
 from pyarrow._parquet cimport (ParquetCipher,
                                CFileEncryptionProperties,
                                CFileDecryptionProperties,
@@ -91,12 +92,57 @@ cdef extern from "parquet/encryption/crypto_factory.h" \
             shared_ptr[CKmsClientFactory] kms_client_factory) except +
         shared_ptr[CFileEncryptionProperties] GetFileEncryptionProperties(
             const CKmsConnectionConfig& kms_connection_config,
-            const CEncryptionConfiguration& encryption_config) except +*
+            const CEncryptionConfiguration& encryption_config,
+            const c_string parquet_file_path,
+            const shared_ptr[CFileSystem] file_system) except +*
         shared_ptr[CFileDecryptionProperties] GetFileDecryptionProperties(
             const CKmsConnectionConfig& kms_connection_config,
-            const CDecryptionConfiguration& decryption_config) except +*
+            const CDecryptionConfiguration& decryption_config,
+            const c_string parquet_file_path,
+            const shared_ptr[CFileSystem] file_system) except +*
         void RemoveCacheEntriesForToken(const c_string& access_token) except +
         void RemoveCacheEntriesForAllTokens() except +
+        void RotateMasterKeys(const CKmsConnectionConfig& 
kms_connection_config,
+                              const c_string parquet_file_path,
+                              const shared_ptr[CFileSystem] file_system,
+                              c_bool double_wrapping,
+                              double cache_lifetime_seconds)
+
+cdef extern from "parquet/encryption/file_key_material_store.h" \
+        namespace "parquet::encryption" nogil:
+    cdef cppclass CFileKeyMaterialStore\
+            "parquet::encryption::FileKeyMaterialStore":
+        @staticmethod
+        c_string GetKeyMaterial(c_string key_id_in_file) except +
+        vector[c_string] GetKeyIDSet() except +
+
+cdef extern from "parquet/encryption/file_system_key_material_store.h" \
+        namespace "parquet::encryption" nogil:
+    cdef cppclass CFileSystemKeyMaterialStore\
+            "parquet::encryption::FileSystemKeyMaterialStore":
+
+        @staticmethod
+        shared_ptr[CFileSystemKeyMaterialStore] Make(c_string 
parquet_file_path,
+                                                     shared_ptr[CFileSystem] 
file_system,
+                                                     c_bool use_tmp_prefix) 
except +
+
+        c_string GetKeyMaterial(c_string key_id_in_file) except +
+
+        vector[c_string] GetKeyIDSet() except +
+
+cdef extern from "parquet/encryption/key_material.h" \
+        namespace "parquet::encryption" nogil:
+    cdef cppclass CKeyMaterial "parquet::encryption::KeyMaterial":
+        @staticmethod
+        CKeyMaterial Parse(const c_string& key_material_string)
+        c_bool is_footer_key()
+        c_bool is_double_wrapped()
+        const c_string& master_key_id()
+        const c_string& wrapped_dek()
+        const c_string& kek_id()
+        const c_string& wrapped_kek()
+        const c_string& kms_instance_id()
+        const c_string& kms_instance_url()
 
 cdef extern from "arrow/python/parquet_encryption.h" \
         namespace "arrow::py::parquet::encryption" nogil:
@@ -125,8 +171,17 @@ cdef extern from "arrow/python/parquet_encryption.h" \
         CResult[shared_ptr[CFileEncryptionProperties]] \
             SafeGetFileEncryptionProperties(
             const CKmsConnectionConfig& kms_connection_config,
-            const CEncryptionConfiguration& encryption_config)
+            const CEncryptionConfiguration& encryption_config,
+            const c_string parquet_file_path,
+            const shared_ptr[CFileSystem] filesystem)
         CResult[shared_ptr[CFileDecryptionProperties]] \
             SafeGetFileDecryptionProperties(
             const CKmsConnectionConfig& kms_connection_config,
-            const CDecryptionConfiguration& decryption_config)
+            const CDecryptionConfiguration& decryption_config,
+            const c_string parquet_file_path,
+            const shared_ptr[CFileSystem] filesystem)
+        CStatus SafeRotateMasterKeys(const CKmsConnectionConfig& 
kms_connection_config,
+                                     const c_string parquet_file_path,
+                                     const shared_ptr[CFileSystem] filesystem,
+                                     c_bool double_wrapping,
+                                     double cache_lifetime_seconds)
diff --git a/python/pyarrow/src/arrow/python/parquet_encryption.cc 
b/python/pyarrow/src/arrow/python/parquet_encryption.cc
index 1016cdd3a3..4fcce64cdb 100644
--- a/python/pyarrow/src/arrow/python/parquet_encryption.cc
+++ b/python/pyarrow/src/arrow/python/parquet_encryption.cc
@@ -79,17 +79,32 @@ std::shared_ptr<::parquet::encryption::KmsClient> 
PyKmsClientFactory::CreateKmsC
 arrow::Result<std::shared_ptr<::parquet::FileEncryptionProperties>>
 PyCryptoFactory::SafeGetFileEncryptionProperties(
     const ::parquet::encryption::KmsConnectionConfig& kms_connection_config,
-    const ::parquet::encryption::EncryptionConfiguration& encryption_config) {
-  PARQUET_CATCH_AND_RETURN(
-      this->GetFileEncryptionProperties(kms_connection_config, 
encryption_config));
+    const ::parquet::encryption::EncryptionConfiguration& encryption_config,
+    const std::string& parquet_file_path,
+    const std::shared_ptr<::arrow::fs::FileSystem>& filesystem) {
+  PARQUET_CATCH_AND_RETURN(this->GetFileEncryptionProperties(
+      kms_connection_config, encryption_config, parquet_file_path, 
filesystem));
 }
 
 arrow::Result<std::shared_ptr<::parquet::FileDecryptionProperties>>
 PyCryptoFactory::SafeGetFileDecryptionProperties(
     const ::parquet::encryption::KmsConnectionConfig& kms_connection_config,
-    const ::parquet::encryption::DecryptionConfiguration& decryption_config) {
-  PARQUET_CATCH_AND_RETURN(
-      this->GetFileDecryptionProperties(kms_connection_config, 
decryption_config));
+    const ::parquet::encryption::DecryptionConfiguration& decryption_config,
+    const std::string& parquet_file_path,
+    const std::shared_ptr<::arrow::fs::FileSystem>& filesystem) {
+  PARQUET_CATCH_AND_RETURN(this->GetFileDecryptionProperties(
+      kms_connection_config, decryption_config, parquet_file_path, 
filesystem));
+}
+
+arrow::Status PyCryptoFactory::SafeRotateMasterKeys(
+    const ::parquet::encryption::KmsConnectionConfig& kms_connection_config,
+    const std::string& parquet_file_path,
+    const std::shared_ptr<::arrow::fs::FileSystem>& filesystem, bool 
double_wrapping,
+    double cache_lifetime_seconds) {
+  PARQUET_CATCH_NOT_OK(this->RotateMasterKeys(kms_connection_config, 
parquet_file_path,
+                                              filesystem, double_wrapping,
+                                              cache_lifetime_seconds));
+  return arrow::Status::OK();
 }
 
 }  // namespace encryption
diff --git a/python/pyarrow/src/arrow/python/parquet_encryption.h 
b/python/pyarrow/src/arrow/python/parquet_encryption.h
index 3e57a76194..b485b8b115 100644
--- a/python/pyarrow/src/arrow/python/parquet_encryption.h
+++ b/python/pyarrow/src/arrow/python/parquet_encryption.h
@@ -18,12 +18,14 @@
 #pragma once
 
 #include <string>
-
 #include "arrow/python/common.h"
 #include "arrow/python/visibility.h"
+#include "arrow/result.h"
 #include "arrow/util/macros.h"
 #include "arrow/util/secure_string.h"
 #include "parquet/encryption/crypto_factory.h"
+#include "parquet/encryption/file_system_key_material_store.h"
+#include "parquet/encryption/key_material.h"
 #include "parquet/encryption/kms_client.h"
 #include "parquet/encryption/kms_client_factory.h"
 
@@ -116,7 +118,9 @@ class ARROW_PYTHON_PARQUET_ENCRYPTION_EXPORT PyCryptoFactory
   arrow::Result<std::shared_ptr<::parquet::FileEncryptionProperties>>
   SafeGetFileEncryptionProperties(
       const ::parquet::encryption::KmsConnectionConfig& kms_connection_config,
-      const ::parquet::encryption::EncryptionConfiguration& encryption_config);
+      const ::parquet::encryption::EncryptionConfiguration& encryption_config,
+      const std::string& parquet_file_path,
+      const std::shared_ptr<::arrow::fs::FileSystem>& filesystem);
 
   /// The returned FileDecryptionProperties object will use the cache inside 
this
   /// CryptoFactory object, so please keep this
@@ -125,7 +129,15 @@ class ARROW_PYTHON_PARQUET_ENCRYPTION_EXPORT 
PyCryptoFactory
   arrow::Result<std::shared_ptr<::parquet::FileDecryptionProperties>>
   SafeGetFileDecryptionProperties(
       const ::parquet::encryption::KmsConnectionConfig& kms_connection_config,
-      const ::parquet::encryption::DecryptionConfiguration& decryption_config);
+      const ::parquet::encryption::DecryptionConfiguration& decryption_config,
+      const std::string& parquet_file_path,
+      const std::shared_ptr<::arrow::fs::FileSystem>& filesystem);
+
+  arrow::Status SafeRotateMasterKeys(
+      const ::parquet::encryption::KmsConnectionConfig& kms_connection_config,
+      const std::string& parquet_file_path,
+      const std::shared_ptr<::arrow::fs::FileSystem>& filesystem, bool 
double_wrapping,
+      double cache_lifetime_seconds);
 };
 
 }  // namespace encryption
diff --git a/python/pyarrow/tests/parquet/conftest.py 
b/python/pyarrow/tests/parquet/conftest.py
index b5d2216d70..d9685d6b8b 100644
--- a/python/pyarrow/tests/parquet/conftest.py
+++ b/python/pyarrow/tests/parquet/conftest.py
@@ -103,3 +103,8 @@ def s3_example_fs(s3_server):
     fs.create_dir("mybucket")
 
     yield fs, uri, path
+
+
[email protected](scope="class")
+def reusable_tempdir(tmp_path_factory):
+    return tmp_path_factory.mktemp('pyarrow-parquet')
diff --git a/python/pyarrow/tests/parquet/encryption.py 
b/python/pyarrow/tests/parquet/encryption.py
index d07f8ae273..a103404995 100644
--- a/python/pyarrow/tests/parquet/encryption.py
+++ b/python/pyarrow/tests/parquet/encryption.py
@@ -15,8 +15,9 @@
 # specific language governing permissions and limitations
 # under the License.
 import base64
-
 import pyarrow.parquet.encryption as pe
+from pyarrow._parquet_encryption import FileSystemKeyMaterialStore
+import re
 
 
 class InMemoryKmsClient(pe.KmsClient):
@@ -51,6 +52,58 @@ class InMemoryKmsClient(pe.KmsClient):
                          master_key_bytes, decrypted_key)
 
 
+def parse_wrapped_key(wrapped_key: str) -> tuple[str, int, bytes]:
+    """Parses a wrapped key string into a tuple: (key id, version, key) given
+    input in the form: <key id>:v<version>:<bas64 encoded key>"""
+    ptn = re.compile("(.+?):v([0-9]+?):(.+)")
+    if m := ptn.fullmatch(wrapped_key):
+        id, version, b64key = m.groups()
+        version = int(version)
+        key = base64.b64decode(b64key)
+        return (id, version, key)
+    else:
+        raise ValueError("Cannot parse wrapped key", wrapped_key)
+
+
+MASTER_KEY_VERSION = "master_key_version"
+
+
+class MockVersioningKmsClient(pe.KmsClient):
+    """This is a mock class implementation of KmsClient, built for testing
+    only.
+
+    During tests that involve CryptoFactory.rotate_master_keys, separate
+    instances of this client will be created when writing, rotating keys, and
+    reading back parquet data. To help unit tests verify that external key
+    material was stored correctly at each step, this client wraps keys with a
+    master_key_identifier and a version number. To ensure each client wraps
+    with the correct version, the current version is persisted in the
+    key_access_token attribute of the KmsConnectionConfig shared by all clients
+    """
+
+    def __init__(self, connection_config) -> None:
+        pe.KmsClient.__init__(self)
+        self.connection_config = connection_config
+
+    @property
+    def master_key_version(self) -> int:
+        return int(self.connection_config.key_access_token)
+
+    def wrap_key(self, key_bytes: bytes, master_key_identifier: str) -> str:
+        b64key = base64.b64encode(key_bytes).decode('utf-8')
+        return f"{master_key_identifier}:v{self.master_key_version}:{b64key}"
+
+    def unwrap_key(
+            self,
+            wrapped_key: str,
+            master_key_identifier: str) -> bytes:
+        key_id, _, key = parse_wrapped_key(wrapped_key)
+        if key_id != master_key_identifier:
+            raise ValueError("Mismatched master key identifiers:",
+                             key_id, master_key_identifier)
+        return key
+
+
 def verify_file_encrypted(path):
     """Verify that the file is encrypted by looking at its first 4 bytes.
     If it's the magic string PARE
@@ -59,3 +112,14 @@ def verify_file_encrypted(path):
         magic_str = file.read(4)
         # Verify magic string for parquet with encrypted footer is PARE
         assert magic_str == b'PARE'
+
+
+def read_external_keys_to_dict(path):
+    """Reads an external key material store given a parquet file path and
+    returns a dict mapping master_key_id to KeyMaterial objects"""
+    store = FileSystemKeyMaterialStore.for_file(path)
+    keys = dict()
+    for id in store.get_key_id_set():
+        key_material = store.get_key_material(id)
+        keys[key_material.master_key_id] = key_material
+    return keys
diff --git a/python/pyarrow/tests/parquet/test_encryption.py 
b/python/pyarrow/tests/parquet/test_encryption.py
index a11a4935a1..4e2fb069bd 100644
--- a/python/pyarrow/tests/parquet/test_encryption.py
+++ b/python/pyarrow/tests/parquet/test_encryption.py
@@ -16,7 +16,6 @@
 # under the License.
 import pytest
 from datetime import timedelta
-
 import pyarrow as pa
 try:
     import pyarrow.parquet as pq
@@ -25,8 +24,11 @@ except ImportError:
     pq = None
     pe = None
 else:
-    from pyarrow.tests.parquet.encryption import (
-        InMemoryKmsClient, verify_file_encrypted)
+    from pyarrow.tests.parquet.encryption import (InMemoryKmsClient,
+                                                  MockVersioningKmsClient,
+                                                  verify_file_encrypted,
+                                                  read_external_keys_to_dict,
+                                                  parse_wrapped_key)
 
 
 PARQUET_NAME = 'encrypted_table.in_mem.parquet'
@@ -65,6 +67,17 @@ def basic_encryption_config():
     return basic_encryption_config
 
 
+@pytest.fixture(scope='module')
+def external_encryption_config():
+    """Footer + columns a, b encryption config with external key material."""
+    return pe.EncryptionConfiguration(
+        footer_key=FOOTER_KEY_NAME,
+        column_keys={
+            COL_KEY_NAME: ["a", "b"],
+        },
+        internal_key_material=False)
+
+
 def setup_encryption_environment(custom_kms_conf):
     """
     Sets up and returns the KMS connection configuration and crypto factory
@@ -163,8 +176,12 @@ def test_uniform_encrypted_parquet_write_read(tempdir, data_table):
 
 def write_encrypted_parquet(path, table, encryption_config,
                             kms_connection_config, crypto_factory):
-    file_encryption_properties = crypto_factory.file_encryption_properties(
-        kms_connection_config, encryption_config)
+    if encryption_config.internal_key_material:
+        file_encryption_properties = crypto_factory.file_encryption_properties(
+            kms_connection_config, encryption_config)
+    else:
+        file_encryption_properties = crypto_factory.file_encryption_properties(
+            kms_connection_config, encryption_config, path)
     assert file_encryption_properties is not None
     with pq.ParquetWriter(
             path, table.schema,
@@ -173,9 +190,15 @@ def write_encrypted_parquet(path, table, encryption_config,
 
 
 def read_encrypted_parquet(path, decryption_config,
-                           kms_connection_config, crypto_factory):
-    file_decryption_properties = crypto_factory.file_decryption_properties(
-        kms_connection_config, decryption_config)
+                           kms_connection_config, crypto_factory,
+                           internal_key_material=True):
+    if internal_key_material:
+        file_decryption_properties = crypto_factory.file_decryption_properties(
+            kms_connection_config, decryption_config)
+    else:
+        file_decryption_properties = crypto_factory.file_decryption_properties(
+            kms_connection_config, decryption_config, path)
+
     assert file_decryption_properties is not None
     meta = pq.read_metadata(
         path, decryption_properties=file_decryption_properties)
@@ -514,31 +537,112 @@ def test_encrypted_parquet_write_read_plain_footer_single_wrapping(
     # assert table.num_rows == result_table.num_rows
 
 
-@pytest.mark.xfail(reason="External key material not supported yet")
-def test_encrypted_parquet_write_external(tempdir, data_table):
-    """Write an encrypted parquet, with external key
-    material.
-    Currently it's not implemented, so should throw
-    an exception"""
+def test_encrypted_parquet_write_read_external(tempdir, data_table,
+                                               external_encryption_config):
+    """Write an encrypted parquet file with external key material, verify
+    it is encrypted, then read back both the table and the external store.
+    """
     path = tempdir / PARQUET_NAME
 
-    # Encrypt the file with the footer key
+    kms_connection_config, crypto_factory = write_encrypted_file(
+        path, data_table, FOOTER_KEY_NAME, COL_KEY_NAME, FOOTER_KEY, COL_KEY,
+        external_encryption_config)
+
+    verify_file_encrypted(path)
+
+    decryption_config = pe.DecryptionConfiguration()
+    result_table = read_encrypted_parquet(
+        path, decryption_config, kms_connection_config, crypto_factory,
+        internal_key_material=False)
+    store = pa._parquet_encryption.FileSystemKeyMaterialStore.for_file(path)
+    key_ids = store.get_key_id_set()
+    n_columns = len(external_encryption_config.column_keys[COL_KEY_NAME])
+    assert len(key_ids) == n_columns + 1  # one per column plus the footer
+    assert all(store.get_key_material(k) is not None for k in key_ids)
+    assert data_table.equals(result_table)
+
+
+@pytest.mark.parametrize(
+    ("double_wrap_initial", "double_wrap_rotated"), [
+        pytest.param(True, True, id="double wrapping"),
+        pytest.param(False, True, id="single to double wrapped"),
+        pytest.param(True, False, id="double to single wrapped"),
+        pytest.param(False, False, id="single wrapping")])
+def test_external_key_material_rotation(
+        reusable_tempdir,
+        data_table,
+        double_wrap_initial,
+        double_wrap_rotated):
+    """Test CryptoFactory.rotate_master_keys with external key material.
+
+    Note: the double_wrapping keyword of CryptoFactory.rotate_master_keys()
+    may be True (the default) or False regardless of whether
+    EncryptionConfiguration.double_wrapping (also True by default) was set
+    when the external key material store was written, so double wrapping
+    may be applied or removed during rotation.
+    """
+    path = reusable_tempdir / PARQUET_NAME
     encryption_config = pe.EncryptionConfiguration(
         footer_key=FOOTER_KEY_NAME,
-        column_keys={},
-        internal_key_material=False)
+        column_keys={COL_KEY_NAME: ["a", "b"]},
+        internal_key_material=False,
+        double_wrapping=double_wrap_initial)
 
-    kms_connection_config = pe.KmsConnectionConfig(
-        custom_kms_conf={FOOTER_KEY_NAME: FOOTER_KEY.decode("UTF-8")}
-    )
+    # initial master key version - see MockVersioningKmsClient docstring
+    kms_connection_config = pe.KmsConnectionConfig(key_access_token="1")
 
     def kms_factory(kms_connection_configuration):
-        return InMemoryKmsClient(kms_connection_configuration)
-
+        return MockVersioningKmsClient(kms_connection_configuration)
     crypto_factory = pe.CryptoFactory(kms_factory)
-    # Write with encryption properties
-    write_encrypted_parquet(path, data_table, encryption_config,
-                            kms_connection_config, crypto_factory)
+    write_encrypted_parquet(
+        path,
+        data_table,
+        encryption_config,
+        kms_connection_config,
+        crypto_factory)
+    before_keys = read_external_keys_to_dict(path)
+
+    # "rotate" the KMS master key to version 2 (see MockVersioningKmsClient)
+    kms_connection_config.refresh_key_access_token("2")
+
+    crypto_factory.rotate_master_keys(
+        kms_connection_config,
+        path,
+        double_wrapping=double_wrap_rotated)
+
+    after_keys = read_external_keys_to_dict(path)
+    verify_file_encrypted(path)
+    table_read_after_rotation = read_encrypted_parquet(
+        path,
+        pe.DecryptionConfiguration(),
+        kms_connection_config,
+        crypto_factory,
+        internal_key_material=False)
+    assert FOOTER_KEY_NAME in before_keys
+    assert COL_KEY_NAME in before_keys
+    assert FOOTER_KEY_NAME in after_keys
+    assert COL_KEY_NAME in after_keys
+
+    def check_rotated_external_keys(master_key_id: str) -> None:
+        before_key_mat = before_keys[master_key_id]
+        if double_wrap_initial:
+            before_key_wrapped = before_key_mat.wrapped_kek
+        else:
+            before_key_wrapped = before_key_mat.wrapped_dek
+        _, before_ver, _ = parse_wrapped_key(before_key_wrapped)
+
+        after_key_mat = after_keys[master_key_id]
+        if double_wrap_rotated:
+            after_key_wrapped = after_key_mat.wrapped_kek
+        else:
+            after_key_wrapped = after_key_mat.wrapped_dek
+        _, after_ver, _ = parse_wrapped_key(after_key_wrapped)
+
+        # rotation must have re-wrapped with a newer master key version
+        assert before_ver < after_ver
+    check_rotated_external_keys(FOOTER_KEY_NAME)
+    check_rotated_external_keys(COL_KEY_NAME)
+    assert data_table.equals(table_read_after_rotation)
 
 
 def test_encrypted_parquet_loop(tempdir, data_table, basic_encryption_config):


Reply via email to