pitrou commented on code in PR #34616:
URL: https://github.com/apache/arrow/pull/34616#discussion_r1346108609


##########
cpp/src/arrow/dataset/file_parquet_test.cc:
##########
@@ -424,6 +425,34 @@ TEST_F(TestParquetFileSystemDataset, 
WriteWithEmptyPartitioningSchema) {
   TestWriteWithEmptyPartitioningSchema();
 }
 
+TEST_F(TestParquetFileSystemDataset, WriteWithEncryptionConfigNotSupported) {
+#ifndef PARQUET_REQUIRE_ENCRYPTION
+  // Create a dummy ParquetEncryptionConfig
+  std::shared_ptr<ParquetEncryptionConfig> encryption_config =
+      std::make_shared<ParquetEncryptionConfig>();
+
+  auto options =
+      
checked_pointer_cast<ParquetFileWriteOptions>(format_->DefaultWriteOptions());
+
+  // Set the encryption config in the options
+  options->parquet_encryption_config = encryption_config;
+
+  // Setup mock filesystem and test data
+  auto mock_fs = std::make_shared<fs::internal::MockFileSystem>(fs::kNoTime);
+  std::shared_ptr<Schema> test_schema = schema({field("x", int32())});
+  std::shared_ptr<RecordBatch> batch = RecordBatchFromJSON(test_schema, 
"[[0]]");
+  ASSERT_OK_AND_ASSIGN(std::shared_ptr<io::OutputStream> out_stream,
+                       mock_fs->OpenOutputStream("/foo.parquet"));
+  std::cout << "B" << std::endl;

Review Comment:
   I suppose these are debug prints; could you remove them (there's another 
one below)? :-)



##########
cpp/src/arrow/dataset/file_parquet.h:
##########
@@ -226,6 +229,8 @@ class ARROW_DS_EXPORT ParquetFragmentScanOptions : public 
FragmentScanOptions {
   /// ScanOptions. Additionally, dictionary columns come from
   /// ParquetFileFormat::ReaderOptions::dict_columns.
   std::shared_ptr<parquet::ArrowReaderProperties> arrow_reader_properties;
+  /// A configuration structure that provides encryption properties for a 
dataset

Review Comment:
   ```suggestion
     /// A configuration structure that provides decryption properties for a 
dataset
   ```



##########
python/pyarrow/_dataset_parquet.pyx:
##########
@@ -711,6 +889,20 @@ cdef class ParquetFragmentScanOptions(FragmentScanOptions):
     cdef ArrowReaderProperties* arrow_reader_properties(self):
         return self.parquet_options.arrow_reader_properties.get()
 
+    IF PARQUET_ENCRYPTION_ENABLED:
+        @property
+        def parquet_decryption_config(self):

Review Comment:
   > PARQUET_ENCRYPTION_ENABLED is a compile-time flag that would be set 
intentionally, having the code throw an exception at runtime could lead to 
confusion.
   
   With the current code, a runtime exception would be raised anyway; it would 
just be a generic `AttributeError`. A specific exception could have a 
descriptive error message telling the user why the attributes are not available.



##########
cpp/src/arrow/dataset/file_parquet.cc:
##########
@@ -67,8 +72,24 @@ parquet::ReaderProperties MakeReaderProperties(
     properties.disable_buffered_stream();
   }
   
properties.set_buffer_size(parquet_scan_options->reader_properties->buffer_size());
+
+#ifdef PARQUET_REQUIRE_ENCRYPTION
+  auto parquet_decrypt_config = 
parquet_scan_options->parquet_decryption_config;
+
+  if (parquet_decrypt_config != nullptr) {
+    auto file_decryption_prop =
+        parquet_decrypt_config->crypto_factory->GetFileDecryptionProperties(
+            *parquet_decrypt_config->kms_connection_config,
+            *parquet_decrypt_config->decryption_config, path, filesystem);
+
+    parquet_scan_options->reader_properties->file_decryption_properties(
+        std::move(file_decryption_prop));
+  }
+#endif

Review Comment:
   Should we raise `NotImplemented` in the same way as for encryption if 
Parquet encryption was not enabled?
   For example (untested):
   ```suggestion
   #else
     if (parquet_scan_options->parquet_decryption_config != nullptr) {
       return Status::NotImplemented("Encryption is not supported in this 
build.");
     }
   #endif
   ```



##########
python/pyarrow/_dataset_parquet.pyx:
##########
@@ -56,9 +63,180 @@ from pyarrow._parquet cimport (
 
 cdef Expression _true = Expression._scalar(True)
 
-
 ctypedef CParquetFileWriter* _CParquetFileWriterPtr
 
+IF PARQUET_ENCRYPTION_ENABLED:
+    cdef class ParquetEncryptionConfig(_Weakrefable):
+        """
+        Core configuration class encapsulating parameters for high-level 
encryption
+        within the Parquet framework.
+
+        The ParquetEncryptionConfig class serves as a bridge for passing 
encryption-related
+        parameters to the appropriate components within the Parquet library. 
It maintains references
+        to objects that define the encryption strategy, Key Management Service 
(KMS) configuration,
+        and specific encryption configurations for Parquet data.
+
+        Parameters
+        ----------
+        crypto_factory : pyarrow.parquet.encryption.CryptoFactory
+            Shared pointer to a `CryptoFactory` object. The `CryptoFactory` is 
responsible for
+            creating cryptographic components, such as encryptors and 
decryptors.
+        kms_connection_config : pyarrow.parquet.encryption.KmsConnectionConfig
+            Shared pointer to a `KmsConnectionConfig` object. This object 
holds the configuration
+            parameters necessary for connecting to a Key Management Service 
(KMS).
+        encryption_config : pyarrow.parquet.encryption.EncryptionConfiguration
+            Shared pointer to an `EncryptionConfiguration` object. This object 
defines specific
+            encryption settings for Parquet data, including the keys assigned 
to different columns.
+
+        Raises
+        ------
+        ValueError
+            Raised if `encryption_config` is None.
+        """
+        cdef:
+            shared_ptr[CParquetEncryptionConfig] c_config
+
+        # Avoid mistakenly creating attributes
+        __slots__ = ()
+
+        def __cinit__(self, CryptoFactory crypto_factory, KmsConnectionConfig 
kms_connection_config,
+                      EncryptionConfiguration encryption_config):
+
+            cdef shared_ptr[CEncryptionConfiguration] c_encryption_config
+
+            if crypto_factory is None:
+                raise ValueError("crypto_factory cannot be None")
+
+            if kms_connection_config is None:
+                raise ValueError("kms_connection_config cannot be None")
+
+            if encryption_config is None:
+                raise ValueError("encryption_config cannot be None")
+
+            self.c_config.reset(new CParquetEncryptionConfig())
+
+            c_encryption_config = 
ParquetEncryptionConfig.unwrap_encryptionconfig(
+                encryption_config)
+
+            self.c_config.get().crypto_factory = 
ParquetEncryptionConfig.unwrap_cryptofactory(crypto_factory)
+            self.c_config.get().kms_connection_config = 
ParquetEncryptionConfig.unwrap_kmsconnectionconfig(
+                kms_connection_config)
+            self.c_config.get().encryption_config = c_encryption_config
+
+        @staticmethod
+        cdef wrap(shared_ptr[CParquetEncryptionConfig] c_config):
+            cdef ParquetEncryptionConfig python_config = 
ParquetEncryptionConfig.__new__(ParquetEncryptionConfig)
+            python_config.c_config = c_config
+            return python_config
+
+        cdef shared_ptr[CParquetEncryptionConfig] unwrap(self):
+            return self.c_config
+
+        @staticmethod
+        cdef shared_ptr[CCryptoFactory] unwrap_cryptofactory(object 
crypto_factory) except *:
+            if isinstance(crypto_factory, CryptoFactory):
+                pycf = (<CryptoFactory> crypto_factory).unwrap()
+                return static_pointer_cast[CCryptoFactory, 
CPyCryptoFactory](pycf)
+            raise TypeError("Expected CryptoFactory, got %s" % 
type(crypto_factory))
+
+        @staticmethod
+        cdef shared_ptr[CKmsConnectionConfig] 
unwrap_kmsconnectionconfig(object kmsconnectionconfig):
+            if isinstance(kmsconnectionconfig, KmsConnectionConfig):
+                return (<KmsConnectionConfig> kmsconnectionconfig).unwrap()
+            raise TypeError("Expected KmsConnectionConfig, got %s" %
+                            type(kmsconnectionconfig))
+
+        @staticmethod
+        cdef shared_ptr[CEncryptionConfiguration] 
unwrap_encryptionconfig(object encryptionconfig):
+            if isinstance(encryptionconfig, EncryptionConfiguration):
+                return (<EncryptionConfiguration> encryptionconfig).unwrap()
+            raise TypeError("Expected EncryptionConfiguration, got %s" %
+                            type(encryptionconfig))
+
+    cdef class ParquetDecryptionConfig(_Weakrefable):
+        """
+        Core configuration class encapsulating parameters for high-level 
decryption
+        within the Parquet framework.
+
+        ParquetDecryptionConfig is designed to pass decryption-related 
parameters to
+        the appropriate decryption components within the Parquet library. It 
holds references to
+        objects that define the decryption strategy, Key Management Service 
(KMS) configuration,
+        and specific decryption configurations for reading encrypted Parquet 
data.
+
+        Parameters
+        ----------
+        crypto_factory : pyarrow.parquet.encryption.CryptoFactory
+            Shared pointer to a `CryptoFactory` object, pivotal in creating 
cryptographic
+            components for the decryption process.
+        kms_connection_config : pyarrow.parquet.encryption.KmsConnectionConfig
+            Shared pointer to a `KmsConnectionConfig` object, containing 
parameters necessary
+            for connecting to a Key Management Service (KMS) during decryption.
+        decryption_config : pyarrow.parquet.encryption.DecryptionConfiguration
+            Shared pointer to a `DecryptionConfiguration` object, specifying 
decryption settings
+            for reading encrypted Parquet data.
+
+        Raises
+        ------
+        ValueError
+            Raised if `decryption_config` is None.
+        """
+
+        cdef:
+            shared_ptr[CParquetDecryptionConfig] c_config
+
+        # Avoid mistakingly creating attributes
+        __slots__ = ()
+
+        def __cinit__(self, CryptoFactory crypto_factory, KmsConnectionConfig 
kms_connection_config,
+                      DecryptionConfiguration decryption_config):
+
+            cdef shared_ptr[CDecryptionConfiguration] c_decryption_config
+
+            if decryption_config is None:
+                raise ValueError(
+                    "decryption_config cannot be None")
+
+            self.c_config.reset(new CParquetDecryptionConfig())
+
+            c_decryption_config = 
ParquetDecryptionConfig.unwrap_decryptionconfig(
+                decryption_config)
+
+            self.c_config.get().crypto_factory = 
ParquetDecryptionConfig.unwrap_cryptofactory(crypto_factory)
+            self.c_config.get().kms_connection_config = 
ParquetDecryptionConfig.unwrap_kmsconnectionconfig(
+                kms_connection_config)
+            self.c_config.get().decryption_config = c_decryption_config
+
+        @staticmethod
+        cdef wrap(shared_ptr[CParquetDecryptionConfig] c_config):
+            cdef ParquetDecryptionConfig python_config = 
ParquetDecryptionConfig.__new__(ParquetDecryptionConfig)
+            python_config.c_config = c_config
+            return python_config
+
+        cdef shared_ptr[CParquetDecryptionConfig] unwrap(self):
+            return self.c_config
+
+        @staticmethod
+        cdef shared_ptr[CCryptoFactory] unwrap_cryptofactory(object 
crypto_factory) except *:
+            if isinstance(crypto_factory, CryptoFactory):
+                pycf = (<CryptoFactory> crypto_factory).unwrap()
+                return static_pointer_cast[CCryptoFactory, 
CPyCryptoFactory](pycf)
+            raise TypeError("Expected CryptoFactory, got %s" % 
type(crypto_factory))
+
+        @staticmethod
+        cdef shared_ptr[CKmsConnectionConfig] 
unwrap_kmsconnectionconfig(object kmsconnectionconfig) except *:
+            if isinstance(kmsconnectionconfig, KmsConnectionConfig):
+                return (<KmsConnectionConfig> kmsconnectionconfig).unwrap()
+            raise TypeError("Expected KmsConnectionConfig, got %s" %
+                            type(kmsconnectionconfig))
+
+        @staticmethod
+        cdef shared_ptr[CDecryptionConfiguration] 
unwrap_decryptionconfig(object decryptionconfig) except *:
+            if isinstance(decryptionconfig, DecryptionConfiguration):
+                return (<DecryptionConfiguration> decryptionconfig).unwrap()
+
+            raise TypeError("Expected DecryptionConfiguration, got %s" %
+                            type(decryptionconfig))

Review Comment:
   I may be mistaken, but it seems these three static methods are the same as 
in `ParquetEncryptionConfig`? To avoid duplication, I think you can simply 
make them top-level functions instead of methods.



##########
cpp/src/arrow/dataset/parquet_encryption_config.h:
##########
@@ -0,0 +1,79 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "arrow/dataset/type_fwd.h"
+
+namespace parquet::encryption {
+class CryptoFactory;
+struct KmsConnectionConfig;
+struct EncryptionConfiguration;
+struct DecryptionConfiguration;
+}  // namespace parquet::encryption
+
+namespace arrow {
+namespace dataset {
+
+struct ARROW_DS_EXPORT ParquetEncryptionConfig {
+  std::shared_ptr<parquet::encryption::CryptoFactory> crypto_factory;
+  std::shared_ptr<parquet::encryption::KmsConnectionConfig> 
kms_connection_config;
+  std::shared_ptr<parquet::encryption::EncryptionConfiguration> 
encryption_config;
+  /// \brief Core configuration class encapsulating parameters for high-level 
encryption

Review Comment:
   Nice docstrings, thank you!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to