This is an automated email from the ASF dual-hosted git repository.
areeve pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new 5112de2322 GH-31869: [Python][Parquet] Implement external key material
features in Python (#48009)
5112de2322 is described below
commit 5112de23222dda6fa449f0488265ee0679fd227b
Author: Patrick Parsons <[email protected]>
AuthorDate: Tue Nov 11 22:46:58 2025 -0500
GH-31869: [Python][Parquet] Implement external key material features in
Python (#48009)
### Rationale for this change
Enables external key material and rotation for individual parquet files in
PyArrow. This change does not address any parquet dataset encryption
functionality.
### What changes are included in this PR?
This PR enables external key material for parquet encryption from PyArrow:
Optional parquet_file_path and FileSystem parameters to CryptoFactory -
mirroring the interface for CryptoFactory in C++
1. Exposes the rotate_master_keys method of CryptoFactory
2. Adds Cython classes for FileKeyMaterialStore,
FileSystemKeyMaterialStore, and KeyMaterial - but does not expose these from
PyArrow encryption. I included these changes only so that a unit test may
verify an external store without leaking the implementation details for the
store into the test.
### Are these changes tested?
Yes - I've modified an existing test (previously marked pytest.mark.xfail) to do
a basic read/write test and verify creation of the external key material store,
and added a test for CryptoFactory.rotate_master_keys.
### Are there any user-facing changes?
1. Users may optionally supply a parquet file path and FileSystem to
CryptoFactory methods that provide en/decryption_properties. Doing so in
conjunction with setting EncryptionConfiguration.internal_key_material=False
enables external key material from pyarrow.
2. PyArrow CryptoFactory now has a rotate_master_keys method exposing key
rotation functionality from C++ CryptoFactory.
* GitHub Issue: #31869
Lead-authored-by: Patrick Parsons
<[email protected]>
Co-authored-by: Adam Reeve <[email protected]>
Signed-off-by: Adam Reeve <[email protected]>
---
.../encryption/file_system_key_material_store.cc | 1 -
.../encryption/file_system_key_material_store.h | 4 +
python/pyarrow/_parquet_encryption.pxd | 8 +
python/pyarrow/_parquet_encryption.pyx | 218 ++++++++++++++++++++-
python/pyarrow/includes/libparquet_encryption.pxd | 63 +++++-
.../pyarrow/src/arrow/python/parquet_encryption.cc | 27 ++-
.../pyarrow/src/arrow/python/parquet_encryption.h | 18 +-
python/pyarrow/tests/parquet/conftest.py | 5 +
python/pyarrow/tests/parquet/encryption.py | 66 ++++++-
python/pyarrow/tests/parquet/test_encryption.py | 154 ++++++++++++---
10 files changed, 519 insertions(+), 45 deletions(-)
diff --git a/cpp/src/parquet/encryption/file_system_key_material_store.cc
b/cpp/src/parquet/encryption/file_system_key_material_store.cc
index cbecf2645d..7a7db3fa62 100644
--- a/cpp/src/parquet/encryption/file_system_key_material_store.cc
+++ b/cpp/src/parquet/encryption/file_system_key_material_store.cc
@@ -25,7 +25,6 @@
#include "parquet/encryption/file_system_key_material_store.h"
#include "parquet/encryption/key_material.h"
-#include "parquet/exception.h"
namespace parquet::encryption {
diff --git a/cpp/src/parquet/encryption/file_system_key_material_store.h
b/cpp/src/parquet/encryption/file_system_key_material_store.h
index 3babfdbf82..ecbadb90a9 100644
--- a/cpp/src/parquet/encryption/file_system_key_material_store.h
+++ b/cpp/src/parquet/encryption/file_system_key_material_store.h
@@ -24,6 +24,7 @@
#include "arrow/filesystem/filesystem.h"
#include "parquet/encryption/file_key_material_store.h"
+#include "parquet/exception.h"
namespace parquet::encryption {
@@ -59,6 +60,9 @@ class PARQUET_EXPORT FileSystemKeyMaterialStore : public
FileKeyMaterialStore {
LoadKeyMaterialMap();
}
auto found = key_material_map_.find(key_id_in_file);
+ if (found == key_material_map_.end()) {
+ throw ParquetException("Invalid key id");
+ }
return found->second;
}
diff --git a/python/pyarrow/_parquet_encryption.pxd
b/python/pyarrow/_parquet_encryption.pxd
index d52669501a..48939fe277 100644
--- a/python/pyarrow/_parquet_encryption.pxd
+++ b/python/pyarrow/_parquet_encryption.pxd
@@ -49,6 +49,14 @@ cdef class KmsConnectionConfig(_Weakrefable):
@staticmethod
cdef wrap(const CKmsConnectionConfig& config)
+cdef class KeyMaterial(_Weakrefable):
+ cdef shared_ptr[CKeyMaterial] key_material
+
+ @staticmethod
+ cdef inline KeyMaterial wrap(shared_ptr[CKeyMaterial] key_material)
+
+cdef class FileSystemKeyMaterialStore(_Weakrefable):
+ cdef shared_ptr[CFileSystemKeyMaterialStore] store
cdef shared_ptr[CCryptoFactory] pyarrow_unwrap_cryptofactory(object
crypto_factory) except *
cdef shared_ptr[CKmsConnectionConfig]
pyarrow_unwrap_kmsconnectionconfig(object kmsconnectionconfig) except *
diff --git a/python/pyarrow/_parquet_encryption.pyx
b/python/pyarrow/_parquet_encryption.pyx
index f95464e303..6185d5f239 100644
--- a/python/pyarrow/_parquet_encryption.pyx
+++ b/python/pyarrow/_parquet_encryption.pyx
@@ -25,9 +25,11 @@ from cython.operator cimport dereference as deref
from pyarrow.includes.common cimport *
from pyarrow.includes.libarrow cimport *
+from pyarrow.lib cimport check_status
from pyarrow.lib cimport _Weakrefable
from pyarrow.lib import tobytes, frombytes
-
+from pyarrow._fs cimport FileSystem
+from pyarrow.fs import _resolve_filesystem_and_path
cdef ParquetCipher cipher_from_name(name):
name = name.upper()
@@ -364,6 +366,13 @@ cdef void _cb_create_kms_client(
out[0] = (<KmsClient> result).unwrap()
+cdef inline shared_ptr[CFileSystem] _unwrap_fs(filesystem: FileSystem | None):
+ if isinstance(filesystem, FileSystem):
+ return filesystem.unwrap()
+ else:
+ return <shared_ptr[CFileSystem]>nullptr
+
+
cdef class CryptoFactory(_Weakrefable):
""" A factory that produces the low-level FileEncryptionProperties and
FileDecryptionProperties objects, from the high-level parameters."""
@@ -402,7 +411,9 @@ cdef class CryptoFactory(_Weakrefable):
def file_encryption_properties(self,
KmsConnectionConfig kms_connection_config,
- EncryptionConfiguration encryption_config):
+ EncryptionConfiguration encryption_config,
+ parquet_file_path=None,
+ FileSystem filesystem=None):
"""Create file encryption properties.
Parameters
@@ -413,6 +424,17 @@ cdef class CryptoFactory(_Weakrefable):
encryption_config : EncryptionConfiguration
Configuration of the encryption, such as which columns to encrypt
+ parquet_file_path : str, pathlib.Path, or None, default None
+ Path to the parquet file to be encrypted. Only required when the
+ internal_key_material attribute of EncryptionConfiguration is set
+ to False. Used to derive the path for storing key material
+ specific to this parquet file.
+
+ filesystem : FileSystem or None, default None
+ Used only when internal_key_material is set to False on
+ EncryptionConfiguration. If None, the file system will be inferred
+ based on parquet_file_path.
+
Returns
-------
file_encryption_properties : FileEncryptionProperties
@@ -421,11 +443,23 @@ cdef class CryptoFactory(_Weakrefable):
cdef:
CResult[shared_ptr[CFileEncryptionProperties]] \
file_encryption_properties_result
+ c_string c_parquet_file_path
+ shared_ptr[CFileSystem] c_filesystem
+
+ filesystem, parquet_file_path = _resolve_filesystem_and_path(
+ parquet_file_path, filesystem)
+ if parquet_file_path is not None:
+ c_parquet_file_path = tobytes(parquet_file_path)
+ else:
+ c_parquet_file_path = tobytes("")
+ c_filesystem = _unwrap_fs(filesystem)
+
with nogil:
file_encryption_properties_result = \
self.factory.get().SafeGetFileEncryptionProperties(
deref(kms_connection_config.unwrap().get()),
- deref(encryption_config.unwrap().get()))
+ deref(encryption_config.unwrap().get()),
+ c_parquet_file_path, c_filesystem)
file_encryption_properties = GetResultValue(
file_encryption_properties_result)
return FileEncryptionProperties.wrap(file_encryption_properties)
@@ -433,7 +467,9 @@ cdef class CryptoFactory(_Weakrefable):
def file_decryption_properties(
self,
KmsConnectionConfig kms_connection_config,
- DecryptionConfiguration decryption_config=None):
+ DecryptionConfiguration decryption_config=None,
+ parquet_file_path=None,
+ FileSystem filesystem=None):
"""Create file decryption properties.
Parameters
@@ -445,6 +481,15 @@ cdef class CryptoFactory(_Weakrefable):
Configuration of the decryption, such as cache timeout.
Can be None.
+ parquet_file_path : str, pathlib.Path, or None, default None
+ Path to the parquet file to be decrypted. Only required when
+ the parquet file uses external key material. Used to derive
+ the path to the external key material file.
+
+ filesystem : FileSystem or None, default None
+ Used only when the parquet file uses external key material. If
+ None, the file system will be inferred based on parquet_file_path.
+
Returns
-------
file_decryption_properties : FileDecryptionProperties
@@ -454,6 +499,17 @@ cdef class CryptoFactory(_Weakrefable):
CDecryptionConfiguration c_decryption_config
CResult[shared_ptr[CFileDecryptionProperties]] \
c_file_decryption_properties
+ c_string c_parquet_file_path
+ shared_ptr[CFileSystem] c_filesystem
+
+ filesystem, parquet_file_path = _resolve_filesystem_and_path(
+ parquet_file_path, filesystem)
+ if parquet_file_path is not None:
+ c_parquet_file_path = tobytes(parquet_file_path)
+ else:
+ c_parquet_file_path = tobytes("")
+ c_filesystem = _unwrap_fs(filesystem)
+
if decryption_config is None:
c_decryption_config = CDecryptionConfiguration()
else:
@@ -462,7 +518,7 @@ cdef class CryptoFactory(_Weakrefable):
c_file_decryption_properties = \
self.factory.get().SafeGetFileDecryptionProperties(
deref(kms_connection_config.unwrap().get()),
- c_decryption_config)
+ c_decryption_config, c_parquet_file_path, c_filesystem)
file_decryption_properties = GetResultValue(
c_file_decryption_properties)
return FileDecryptionProperties.wrap(file_decryption_properties)
@@ -473,9 +529,161 @@ cdef class CryptoFactory(_Weakrefable):
def remove_cache_entries_for_all_tokens(self):
self.factory.get().RemoveCacheEntriesForAllTokens()
+ def rotate_master_keys(
+ self,
+ KmsConnectionConfig kms_connection_config,
+ parquet_file_path,
+ FileSystem filesystem=None,
+ double_wrapping=True,
+ cache_lifetime_seconds=600):
+ """ Rotates master encryption keys for a Parquet file that uses
+ external key material.
+
+ Parameters
+ ----------
+ kms_connection_config : KmsConnectionConfig
+ Configuration of connection to KMS
+
+ parquet_file_path : str or pathlib.Path
+ Path to a parquet file using external key material.
+
+ filesystem : FileSystem or None, default None
+ Used only when the parquet file uses external key material. If
+ None, the file system will be inferred based on parquet_file_path.
+
+ double_wrapping : bool, default True
+ In the single wrapping mode, encrypts data encryption keys with
+ new master keys. In the double wrapping mode, generates new
+ KEKs (key encryption keys) and uses these to encrypt the data keys,
+ and encrypts the KEKs with the new master keys.
+
+ cache_lifetime_seconds : int or float, default 600
+ During key rotation, KMS Client and Key Encryption Keys will be
+ cached for this duration.
+ """
+ cdef:
+ c_string c_parquet_file_path
+ shared_ptr[CFileSystem] c_filesystem
+
+ if parquet_file_path != "":
+ filesystem, parquet_file_path = _resolve_filesystem_and_path(
+ parquet_file_path, filesystem)
+
+ c_parquet_file_path = tobytes(parquet_file_path)
+ c_filesystem = _unwrap_fs(filesystem)
+
+ status = self.factory.get().SafeRotateMasterKeys(
+ deref(kms_connection_config.unwrap().get()),
+ c_parquet_file_path,
+ c_filesystem,
+ double_wrapping,
+ cache_lifetime_seconds)
+
+ check_status(status)
+
cdef inline shared_ptr[CPyCryptoFactory] unwrap(self):
return self.factory
+cdef class KeyMaterial(_Weakrefable):
+
+ @property
+ def is_footer_key(self):
+ return self.key_material.get().is_footer_key()
+
+ @property
+ def is_double_wrapped(self):
+ return self.key_material.get().is_double_wrapped()
+
+ @property
+ def master_key_id(self):
+ return frombytes(self.key_material.get().master_key_id())
+
+ @property
+ def wrapped_dek(self):
+ return frombytes(self.key_material.get().wrapped_dek())
+
+ @property
+ def kek_id(self):
+ return frombytes(self.key_material.get().kek_id())
+
+ @property
+ def wrapped_kek(self):
+ return frombytes(self.key_material.get().wrapped_kek())
+
+ @property
+ def kms_instance_id(self):
+ return frombytes(self.key_material.get().kms_instance_id())
+
+ @property
+ def kms_instance_url(self):
+ return frombytes(self.key_material.get().kms_instance_url())
+
+ @staticmethod
+ cdef inline KeyMaterial wrap(shared_ptr[CKeyMaterial] key_material):
+ wrapper = KeyMaterial()
+ wrapper.key_material = key_material
+ return wrapper
+
+ @staticmethod
+ def parse(
+ const c_string key_material_string):
+ cdef:
+ shared_ptr[CKeyMaterial] c_key_material
+ c_key_material = make_shared[CKeyMaterial](move(
+ CKeyMaterial.Parse(key_material_string)
+ ))
+ return KeyMaterial.wrap(c_key_material)
+
+cdef class FileSystemKeyMaterialStore(_Weakrefable):
+
+ def get_key_material(self, key_id):
+ cdef:
+ c_string c_key_id = tobytes(key_id)
+ c_string c_key_material_string
+
+ c_key_material_string = self.store.get().GetKeyMaterial(c_key_id)
+ if c_key_material_string.empty():
+ raise KeyError("Invalid key id")
+ return KeyMaterial.parse(c_key_material_string)
+
+ def get_key_id_set(self):
+ return self.store.get().GetKeyIDSet()
+
+ @classmethod
+ def for_file(cls, parquet_file_path,
+ FileSystem filesystem=None):
+ """Creates a FileSystemKeyMaterialStore for a parquet file that
+ was created with external key material.
+
+ Parameters
+ ----------
+ parquet_file_path : str or pathlib.Path
+ Path to a parquet file using external key material.
+
+ filesystem : FileSystem, default None
+ FileSystem where the parquet file is located. If None,
+ will be inferred based on parquet_file_path.
+
+ Returns
+ -------
+ FileSystemKeyMaterialStore
+ A FileSystemKeyMaterialStore wrapping the external key material.
+ """
+ cdef:
+ c_string c_parquet_file_path
+ shared_ptr[CFileSystem] c_filesystem
+ shared_ptr[CFileSystemKeyMaterialStore] c_store
+ FileSystemKeyMaterialStore store = cls()
+
+ filesystem, parquet_file_path = _resolve_filesystem_and_path(
+ parquet_file_path, filesystem)
+ c_parquet_file_path = tobytes(parquet_file_path)
+ c_filesystem = _unwrap_fs(filesystem)
+
+ c_store = CFileSystemKeyMaterialStore.Make(
+ c_parquet_file_path, c_filesystem, False)
+ store.store = c_store
+ return store
cdef shared_ptr[CCryptoFactory] pyarrow_unwrap_cryptofactory(object
crypto_factory) except *:
if isinstance(crypto_factory, CryptoFactory):
diff --git a/python/pyarrow/includes/libparquet_encryption.pxd
b/python/pyarrow/includes/libparquet_encryption.pxd
index 7e031925af..7024f14ac2 100644
--- a/python/pyarrow/includes/libparquet_encryption.pxd
+++ b/python/pyarrow/includes/libparquet_encryption.pxd
@@ -19,6 +19,7 @@
from pyarrow.includes.common cimport *
from pyarrow.includes.libarrow cimport CSecureString
+from pyarrow.includes.libarrow_fs cimport CFileSystem
from pyarrow._parquet cimport (ParquetCipher,
CFileEncryptionProperties,
CFileDecryptionProperties,
@@ -91,12 +92,57 @@ cdef extern from "parquet/encryption/crypto_factory.h" \
shared_ptr[CKmsClientFactory] kms_client_factory) except +
shared_ptr[CFileEncryptionProperties] GetFileEncryptionProperties(
const CKmsConnectionConfig& kms_connection_config,
- const CEncryptionConfiguration& encryption_config) except +*
+ const CEncryptionConfiguration& encryption_config,
+ const c_string parquet_file_path,
+ const shared_ptr[CFileSystem] file_system) except +*
shared_ptr[CFileDecryptionProperties] GetFileDecryptionProperties(
const CKmsConnectionConfig& kms_connection_config,
- const CDecryptionConfiguration& decryption_config) except +*
+ const CDecryptionConfiguration& decryption_config,
+ const c_string parquet_file_path,
+ const shared_ptr[CFileSystem] file_system) except +*
void RemoveCacheEntriesForToken(const c_string& access_token) except +
void RemoveCacheEntriesForAllTokens() except +
+ void RotateMasterKeys(const CKmsConnectionConfig&
kms_connection_config,
+ const c_string parquet_file_path,
+ const shared_ptr[CFileSystem] file_system,
+ c_bool double_wrapping,
+ double cache_lifetime_seconds)
+
+cdef extern from "parquet/encryption/file_key_material_store.h" \
+ namespace "parquet::encryption" nogil:
+ cdef cppclass CFileKeyMaterialStore\
+ "parquet::encryption::FileKeyMaterialStore":
+ @staticmethod
+ c_string GetKeyMaterial(c_string key_id_in_file) except +
+ vector[c_string] GetKeyIDSet() except +
+
+cdef extern from "parquet/encryption/file_system_key_material_store.h" \
+ namespace "parquet::encryption" nogil:
+ cdef cppclass CFileSystemKeyMaterialStore\
+ "parquet::encryption::FileSystemKeyMaterialStore":
+
+ @staticmethod
+ shared_ptr[CFileSystemKeyMaterialStore] Make(c_string
parquet_file_path,
+ shared_ptr[CFileSystem]
file_system,
+ c_bool use_tmp_prefix)
except +
+
+ c_string GetKeyMaterial(c_string key_id_in_file) except +
+
+ vector[c_string] GetKeyIDSet() except +
+
+cdef extern from "parquet/encryption/key_material.h" \
+ namespace "parquet::encryption" nogil:
+ cdef cppclass CKeyMaterial "parquet::encryption::KeyMaterial":
+ @staticmethod
+ CKeyMaterial Parse(const c_string& key_material_string)
+ c_bool is_footer_key()
+ c_bool is_double_wrapped()
+ const c_string& master_key_id()
+ const c_string& wrapped_dek()
+ const c_string& kek_id()
+ const c_string& wrapped_kek()
+ const c_string& kms_instance_id()
+ const c_string& kms_instance_url()
cdef extern from "arrow/python/parquet_encryption.h" \
namespace "arrow::py::parquet::encryption" nogil:
@@ -125,8 +171,17 @@ cdef extern from "arrow/python/parquet_encryption.h" \
CResult[shared_ptr[CFileEncryptionProperties]] \
SafeGetFileEncryptionProperties(
const CKmsConnectionConfig& kms_connection_config,
- const CEncryptionConfiguration& encryption_config)
+ const CEncryptionConfiguration& encryption_config,
+ const c_string parquet_file_path,
+ const shared_ptr[CFileSystem] filesystem)
CResult[shared_ptr[CFileDecryptionProperties]] \
SafeGetFileDecryptionProperties(
const CKmsConnectionConfig& kms_connection_config,
- const CDecryptionConfiguration& decryption_config)
+ const CDecryptionConfiguration& decryption_config,
+ const c_string parquet_file_path,
+ const shared_ptr[CFileSystem] filesystem)
+ CStatus SafeRotateMasterKeys(const CKmsConnectionConfig&
kms_connection_config,
+ const c_string parquet_file_path,
+ const shared_ptr[CFileSystem] filesystem,
+ c_bool double_wrapping,
+ double cache_lifetime_seconds)
diff --git a/python/pyarrow/src/arrow/python/parquet_encryption.cc
b/python/pyarrow/src/arrow/python/parquet_encryption.cc
index 1016cdd3a3..4fcce64cdb 100644
--- a/python/pyarrow/src/arrow/python/parquet_encryption.cc
+++ b/python/pyarrow/src/arrow/python/parquet_encryption.cc
@@ -79,17 +79,32 @@ std::shared_ptr<::parquet::encryption::KmsClient>
PyKmsClientFactory::CreateKmsC
arrow::Result<std::shared_ptr<::parquet::FileEncryptionProperties>>
PyCryptoFactory::SafeGetFileEncryptionProperties(
const ::parquet::encryption::KmsConnectionConfig& kms_connection_config,
- const ::parquet::encryption::EncryptionConfiguration& encryption_config) {
- PARQUET_CATCH_AND_RETURN(
- this->GetFileEncryptionProperties(kms_connection_config,
encryption_config));
+ const ::parquet::encryption::EncryptionConfiguration& encryption_config,
+ const std::string& parquet_file_path,
+ const std::shared_ptr<::arrow::fs::FileSystem>& filesystem) {
+ PARQUET_CATCH_AND_RETURN(this->GetFileEncryptionProperties(
+ kms_connection_config, encryption_config, parquet_file_path,
filesystem));
}
arrow::Result<std::shared_ptr<::parquet::FileDecryptionProperties>>
PyCryptoFactory::SafeGetFileDecryptionProperties(
const ::parquet::encryption::KmsConnectionConfig& kms_connection_config,
- const ::parquet::encryption::DecryptionConfiguration& decryption_config) {
- PARQUET_CATCH_AND_RETURN(
- this->GetFileDecryptionProperties(kms_connection_config,
decryption_config));
+ const ::parquet::encryption::DecryptionConfiguration& decryption_config,
+ const std::string& parquet_file_path,
+ const std::shared_ptr<::arrow::fs::FileSystem>& filesystem) {
+ PARQUET_CATCH_AND_RETURN(this->GetFileDecryptionProperties(
+ kms_connection_config, decryption_config, parquet_file_path,
filesystem));
+}
+
+arrow::Status PyCryptoFactory::SafeRotateMasterKeys(
+ const ::parquet::encryption::KmsConnectionConfig& kms_connection_config,
+ const std::string& parquet_file_path,
+ const std::shared_ptr<::arrow::fs::FileSystem>& filesystem, bool
double_wrapping,
+ double cache_lifetime_seconds) {
+ PARQUET_CATCH_NOT_OK(this->RotateMasterKeys(kms_connection_config,
parquet_file_path,
+ filesystem, double_wrapping,
+ cache_lifetime_seconds));
+ return arrow::Status::OK();
}
} // namespace encryption
diff --git a/python/pyarrow/src/arrow/python/parquet_encryption.h
b/python/pyarrow/src/arrow/python/parquet_encryption.h
index 3e57a76194..b485b8b115 100644
--- a/python/pyarrow/src/arrow/python/parquet_encryption.h
+++ b/python/pyarrow/src/arrow/python/parquet_encryption.h
@@ -18,12 +18,14 @@
#pragma once
#include <string>
-
#include "arrow/python/common.h"
#include "arrow/python/visibility.h"
+#include "arrow/result.h"
#include "arrow/util/macros.h"
#include "arrow/util/secure_string.h"
#include "parquet/encryption/crypto_factory.h"
+#include "parquet/encryption/file_system_key_material_store.h"
+#include "parquet/encryption/key_material.h"
#include "parquet/encryption/kms_client.h"
#include "parquet/encryption/kms_client_factory.h"
@@ -116,7 +118,9 @@ class ARROW_PYTHON_PARQUET_ENCRYPTION_EXPORT PyCryptoFactory
arrow::Result<std::shared_ptr<::parquet::FileEncryptionProperties>>
SafeGetFileEncryptionProperties(
const ::parquet::encryption::KmsConnectionConfig& kms_connection_config,
- const ::parquet::encryption::EncryptionConfiguration& encryption_config);
+ const ::parquet::encryption::EncryptionConfiguration& encryption_config,
+ const std::string& parquet_file_path,
+ const std::shared_ptr<::arrow::fs::FileSystem>& filesystem);
/// The returned FileDecryptionProperties object will use the cache inside
this
/// CryptoFactory object, so please keep this
@@ -125,7 +129,15 @@ class ARROW_PYTHON_PARQUET_ENCRYPTION_EXPORT
PyCryptoFactory
arrow::Result<std::shared_ptr<::parquet::FileDecryptionProperties>>
SafeGetFileDecryptionProperties(
const ::parquet::encryption::KmsConnectionConfig& kms_connection_config,
- const ::parquet::encryption::DecryptionConfiguration& decryption_config);
+ const ::parquet::encryption::DecryptionConfiguration& decryption_config,
+ const std::string& parquet_file_path,
+ const std::shared_ptr<::arrow::fs::FileSystem>& filesystem);
+
+ arrow::Status SafeRotateMasterKeys(
+ const ::parquet::encryption::KmsConnectionConfig& kms_connection_config,
+ const std::string& parquet_file_path,
+ const std::shared_ptr<::arrow::fs::FileSystem>& filesystem, bool
double_wrapping,
+ double cache_lifetime_seconds);
};
} // namespace encryption
diff --git a/python/pyarrow/tests/parquet/conftest.py
b/python/pyarrow/tests/parquet/conftest.py
index b5d2216d70..d9685d6b8b 100644
--- a/python/pyarrow/tests/parquet/conftest.py
+++ b/python/pyarrow/tests/parquet/conftest.py
@@ -103,3 +103,8 @@ def s3_example_fs(s3_server):
fs.create_dir("mybucket")
yield fs, uri, path
+
+
+@pytest.fixture(scope="class")
+def reusable_tempdir(tmp_path_factory):
+ return tmp_path_factory.mktemp('pyarrow-parquet')
diff --git a/python/pyarrow/tests/parquet/encryption.py
b/python/pyarrow/tests/parquet/encryption.py
index d07f8ae273..a103404995 100644
--- a/python/pyarrow/tests/parquet/encryption.py
+++ b/python/pyarrow/tests/parquet/encryption.py
@@ -15,8 +15,9 @@
# specific language governing permissions and limitations
# under the License.
import base64
-
import pyarrow.parquet.encryption as pe
+from pyarrow._parquet_encryption import FileSystemKeyMaterialStore
+import re
class InMemoryKmsClient(pe.KmsClient):
@@ -51,6 +52,58 @@ class InMemoryKmsClient(pe.KmsClient):
master_key_bytes, decrypted_key)
+def parse_wrapped_key(wrapped_key: str) -> tuple[str, int, bytes]:
+ """Parses a wrapped key string into a tuple: (key id, version, key) given
+ input in the form: <key id>:v<version>:<base64 encoded key>"""
+ ptn = re.compile("(.+?):v([0-9]+?):(.+)")
+ if m := ptn.fullmatch(wrapped_key):
+ id, version, b64key = m.groups()
+ version = int(version)
+ key = base64.b64decode(b64key)
+ return (id, version, key)
+ else:
+ raise ValueError("Cannot parse wrapped key", wrapped_key)
+
+
+MASTER_KEY_VERSION = "master_key_version"
+
+
+class MockVersioningKmsClient(pe.KmsClient):
+ """This is a mock class implementation of KmsClient, built for testing
+ only.
+
+ During tests that involve CryptoFactory.rotate_master_keys, separate
+ instances of this client will be created when writing, rotating keys, and
+ reading back parquet data. To help unit tests verify that external key
+ material was stored correctly at each step, this client wraps keys with a
+ master_key_identifier and a version number. To ensure each client wraps
+ with the correct version, the current version is persisted in the
+ key_access_token attribute of the KmsConnectionConfig shared by all clients
+ """
+
+ def __init__(self, connection_config) -> None:
+ pe.KmsClient.__init__(self)
+ self.connection_config = connection_config
+
+ @property
+ def master_key_version(self) -> int:
+ return int(self.connection_config.key_access_token)
+
+ def wrap_key(self, key_bytes: bytes, master_key_identifier: str) -> str:
+ b64key = base64.b64encode(key_bytes).decode('utf-8')
+ return f"{master_key_identifier}:v{self.master_key_version}:{b64key}"
+
+ def unwrap_key(
+ self,
+ wrapped_key: str,
+ master_key_identifier: str) -> bytes:
+ key_id, _, key = parse_wrapped_key(wrapped_key)
+ if key_id != master_key_identifier:
+ raise ValueError("Mismatched master key identifiers:",
+ key_id, master_key_identifier)
+ return key
+
+
def verify_file_encrypted(path):
"""Verify that the file is encrypted by looking at its first 4 bytes.
If it's the magic string PARE
@@ -59,3 +112,14 @@ def verify_file_encrypted(path):
magic_str = file.read(4)
# Verify magic string for parquet with encrypted footer is PARE
assert magic_str == b'PARE'
+
+
+def read_external_keys_to_dict(path):
+ """Reads an external key material store given a parquet file path and
+ returns a dict mapping master_key_id to KeyMaterial objects"""
+ store = FileSystemKeyMaterialStore.for_file(path)
+ keys = dict()
+ for id in store.get_key_id_set():
+ key_material = store.get_key_material(id)
+ keys[key_material.master_key_id] = key_material
+ return keys
diff --git a/python/pyarrow/tests/parquet/test_encryption.py
b/python/pyarrow/tests/parquet/test_encryption.py
index a11a4935a1..4e2fb069bd 100644
--- a/python/pyarrow/tests/parquet/test_encryption.py
+++ b/python/pyarrow/tests/parquet/test_encryption.py
@@ -16,7 +16,6 @@
# under the License.
import pytest
from datetime import timedelta
-
import pyarrow as pa
try:
import pyarrow.parquet as pq
@@ -25,8 +24,11 @@ except ImportError:
pq = None
pe = None
else:
- from pyarrow.tests.parquet.encryption import (
- InMemoryKmsClient, verify_file_encrypted)
+ from pyarrow.tests.parquet.encryption import (InMemoryKmsClient,
+ MockVersioningKmsClient,
+ verify_file_encrypted,
+ read_external_keys_to_dict,
+ parse_wrapped_key)
PARQUET_NAME = 'encrypted_table.in_mem.parquet'
@@ -65,6 +67,17 @@ def basic_encryption_config():
return basic_encryption_config
+@pytest.fixture(scope='module')
+def external_encryption_config():
+ external_encryption_config = pe.EncryptionConfiguration(
+ footer_key=FOOTER_KEY_NAME,
+ column_keys={
+ COL_KEY_NAME: ["a", "b"],
+ },
+ internal_key_material=False)
+ return external_encryption_config
+
+
def setup_encryption_environment(custom_kms_conf):
"""
Sets up and returns the KMS connection configuration and crypto factory
@@ -163,8 +176,12 @@ def test_uniform_encrypted_parquet_write_read(tempdir,
data_table):
def write_encrypted_parquet(path, table, encryption_config,
kms_connection_config, crypto_factory):
- file_encryption_properties = crypto_factory.file_encryption_properties(
- kms_connection_config, encryption_config)
+ if encryption_config.internal_key_material:
+ file_encryption_properties = crypto_factory.file_encryption_properties(
+ kms_connection_config, encryption_config)
+ else:
+ file_encryption_properties = crypto_factory.file_encryption_properties(
+ kms_connection_config, encryption_config, path)
assert file_encryption_properties is not None
with pq.ParquetWriter(
path, table.schema,
@@ -173,9 +190,15 @@ def write_encrypted_parquet(path, table, encryption_config,
def read_encrypted_parquet(path, decryption_config,
- kms_connection_config, crypto_factory):
- file_decryption_properties = crypto_factory.file_decryption_properties(
- kms_connection_config, decryption_config)
+ kms_connection_config, crypto_factory,
+ internal_key_material=True):
+ if internal_key_material:
+ file_decryption_properties = crypto_factory.file_decryption_properties(
+ kms_connection_config, decryption_config)
+ else:
+ file_decryption_properties = crypto_factory.file_decryption_properties(
+ kms_connection_config, decryption_config, path)
+
assert file_decryption_properties is not None
meta = pq.read_metadata(
path, decryption_properties=file_decryption_properties)
@@ -514,31 +537,112 @@ def
test_encrypted_parquet_write_read_plain_footer_single_wrapping(
# assert table.num_rows == result_table.num_rows
-@pytest.mark.xfail(reason="External key material not supported yet")
-def test_encrypted_parquet_write_external(tempdir, data_table):
- """Write an encrypted parquet, with external key
- material.
- Currently it's not implemented, so should throw
- an exception"""
+def test_encrypted_parquet_write_read_external(tempdir, data_table,
+ external_encryption_config):
+ """Write an encrypted parquet file with external key material, verify
+ it's encrypted, then read both the table and external store.
+ """
path = tempdir / PARQUET_NAME
- # Encrypt the file with the footer key
+ kms_connection_config, crypto_factory = write_encrypted_file(
+ path, data_table, FOOTER_KEY_NAME, COL_KEY_NAME, FOOTER_KEY, COL_KEY,
+ external_encryption_config)
+
+ verify_file_encrypted(path)
+
+ decryption_config = pe.DecryptionConfiguration()
+ result_table = read_encrypted_parquet(
+ path, decryption_config, kms_connection_config, crypto_factory,
+ internal_key_material=False)
+ store = pa._parquet_encryption.FileSystemKeyMaterialStore.for_file(path)
+
+ assert len(key_ids := store.get_key_id_set()) == (
+ len(external_encryption_config.column_keys[COL_KEY_NAME]) + 1)
+ assert all([store.get_key_material(k) is not None for k in key_ids])
+ assert data_table.equals(result_table)
+
+
+@pytest.mark.parametrize(
+ ("double_wrap_initial", "double_wrap_rotated"), [
+ pytest.param(True, True, id="double wrapping"),
+ pytest.param(False, True, id="single to double wrapped"),
+ pytest.param(True, False, id="double to singe wrapped"),
+ pytest.param(False, False, id="single wrapping")])
+def test_external_key_material_rotation(
+ reusable_tempdir,
+ data_table,
+ double_wrap_initial,
+ double_wrap_rotated):
+ """Tests CryptoFactory.rotate_master_keys
+
+ Note: The CryptoFactory.rotate_master_keys() double_wrapping keyword arg
+ may be either True (the default) or False regardless of whether
+ EncryptionConfig.double_wrapping was set to true (also the default) when
+ the external key material store was written. This means double wrapping may
+ be set one way initially and then applied or removed during rotation.
+ """
+ path = reusable_tempdir / PARQUET_NAME
encryption_config = pe.EncryptionConfiguration(
footer_key=FOOTER_KEY_NAME,
- column_keys={},
- internal_key_material=False)
+ column_keys={COL_KEY_NAME: ["a", "b"]},
+ internal_key_material=False,
+ double_wrapping=double_wrap_initial)
- kms_connection_config = pe.KmsConnectionConfig(
- custom_kms_conf={FOOTER_KEY_NAME: FOOTER_KEY.decode("UTF-8")}
- )
+ # initial master key version - see MockVersioningKmsClient docstring
+ kms_connection_config = pe.KmsConnectionConfig(key_access_token="1")
def kms_factory(kms_connection_configuration):
- return InMemoryKmsClient(kms_connection_configuration)
-
+ return MockVersioningKmsClient(kms_connection_configuration)
crypto_factory = pe.CryptoFactory(kms_factory)
- # Write with encryption properties
- write_encrypted_parquet(path, data_table, encryption_config,
- kms_connection_config, crypto_factory)
+ write_encrypted_parquet(
+ path,
+ data_table,
+ encryption_config,
+ kms_connection_config,
+ crypto_factory)
+ before_keys = read_external_keys_to_dict(path)
+
+ # "rotate" kms master key
+ kms_connection_config.refresh_key_access_token("2")
+
+ crypto_factory.rotate_master_keys(
+ kms_connection_config,
+ path,
+ double_wrapping=double_wrap_rotated)
+
+ after_keys = read_external_keys_to_dict(path)
+ verify_file_encrypted(path)
+ table_read_after_rotation = read_encrypted_parquet(
+ path,
+ pe.DecryptionConfiguration(),
+ kms_connection_config,
+ crypto_factory,
+ internal_key_material=False)
+ assert FOOTER_KEY_NAME in before_keys
+ assert COL_KEY_NAME in before_keys
+ assert FOOTER_KEY_NAME in after_keys
+ assert COL_KEY_NAME in after_keys
+
+ def check_rotated_external_keys(master_key_id: str) -> None:
+ before_key_mat = before_keys[master_key_id]
+ if double_wrap_initial:
+ before_key_wrapped = before_key_mat.wrapped_kek
+ else:
+ before_key_wrapped = before_key_mat.wrapped_dek
+ _, before_ver, _ = parse_wrapped_key(before_key_wrapped)
+
+ after_key_mat = after_keys[master_key_id]
+ if double_wrap_rotated:
+ after_key_wrapped = after_key_mat.wrapped_kek
+ else:
+ after_key_wrapped = after_key_mat.wrapped_dek
+ _, after_ver, _ = parse_wrapped_key(after_key_wrapped)
+
+ # CryptoFactory rewrapped keys if after version is later than before
+ assert before_ver < after_ver
+ check_rotated_external_keys(FOOTER_KEY_NAME)
+ check_rotated_external_keys(COL_KEY_NAME)
+ assert data_table.equals(table_read_after_rotation)
def test_encrypted_parquet_loop(tempdir, data_table, basic_encryption_config):