pitrou commented on code in PR #34616:
URL: https://github.com/apache/arrow/pull/34616#discussion_r1317167793


##########
cpp/src/arrow/dataset/file_parquet.cc:
##########
@@ -67,8 +69,24 @@ parquet::ReaderProperties MakeReaderProperties(
     properties.disable_buffered_stream();
   }
   
properties.set_buffer_size(parquet_scan_options->reader_properties->buffer_size());
+
+#ifdef PARQUET_REQUIRE_ENCRYPTION

Review Comment:
   Not as critical as for encryption, but if `PARQUET_REQUIRE_ENCRYPTION` is 
not defined we might want to disallow passing a `parquet_decryption_config`? 
Or is there a reason to be lenient?



##########
cpp/src/arrow/dataset/file_parquet.h:
##########
@@ -236,6 +241,9 @@ class ARROW_DS_EXPORT ParquetFileWriteOptions : public 
FileWriteOptions {
   /// \brief Parquet Arrow writer properties.
   std::shared_ptr<parquet::ArrowWriterProperties> arrow_writer_properties;
 
+  // A configuration structure that provides per file encryption properties 
for a dataset

Review Comment:
   Same.



##########
cpp/src/arrow/dataset/CMakeLists.txt:
##########
@@ -175,6 +175,13 @@ endif()
 
 if(ARROW_PARQUET)
   add_arrow_dataset_test(file_parquet_test)
+  if(PARQUET_REQUIRE_ENCRYPTION AND ARROW_DATASET)
+    add_arrow_dataset_test(dataset_encryption_test
+                           SOURCES
+                           dataset_encryption_test.cc

Review Comment:
   I assume this is specific to Parquet, so can we perhaps call it 
`file_parquet_encryption_test.cc`?
   



##########
cpp/src/arrow/dataset/parquet_encryption_config.h:
##########
@@ -0,0 +1,61 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "arrow/dataset/type_fwd.h"
+#include "parquet/encryption/crypto_factory.h"
+#include "parquet/encryption/encryption.h"
+#include "parquet/encryption/kms_client.h"

Review Comment:
   I don't think we need to include these, forward declarations would be 
enough, right?
   
   Perhaps we can create and populate a `parquet/encryption/type_fwd.h` that we 
would include here.



##########
python/pyarrow/_parquet_encryption.pxd:
##########
@@ -131,3 +131,22 @@ cdef extern from "arrow/python/parquet_encryption.h" \
             SafeGetFileDecryptionProperties(
             const CKmsConnectionConfig& kms_connection_config,
             const CDecryptionConfiguration& decryption_config)
+
+cdef extern from "arrow/dataset/parquet_encryption_config.h" namespace 
"arrow::dataset" nogil:
+    cdef cppclass CParquetEncryptionConfig 
"arrow::dataset::ParquetEncryptionConfig":
+        CParquetEncryptionConfig() except +
+        void Setup(shared_ptr[CCryptoFactory] crypto_factory,
+                   shared_ptr[CKmsConnectionConfig] kms_connection_config,
+                   shared_ptr[CEncryptionConfiguration] encryption_config)
+
+    cdef cppclass CParquetDecryptionConfig 
"arrow::dataset::ParquetDecryptionConfig":
+        CParquetDecryptionConfig() except +
+        void Setup(shared_ptr[CCryptoFactory] crypto_factory,
+                   shared_ptr[CKmsConnectionConfig] kms_connection_config,
+                   shared_ptr[CDecryptionConfiguration] decryption_config)
+
+
+cdef public shared_ptr[CCryptoFactory] pyarrow_unwrap_cryptofactory(object 
crypto_factory)

Review Comment:
   It doesn't seem necessary to create and expose public Cython/C APIs for 
this. We could just add the necessary Cython declarations for the caller code 
to unwrap those objects directly, like we do for example here:
   
https://github.com/apache/arrow/blob/ad7f6ef3a97be6a3eb6fe37d0df1b1634db057d2/python/pyarrow/_parquet.pxd#L550-L564
   
   
   



##########
python/pyarrow/_parquet_encryption.pyx:
##########
@@ -466,3 +466,47 @@ cdef class CryptoFactory(_Weakrefable):
 
     def remove_cache_entries_for_all_tokens(self):
         self.factory.get().RemoveCacheEntriesForAllTokens()
+
+    cdef inline shared_ptr[CPyCryptoFactory] unwrap(self) nogil:

Review Comment:
   This is accessing a Python object (`self`) so `nogil` doesn't look right. 
Also, it's not productive to pay the cost of releasing the GIL for such a 
trivial method.



##########
python/pyarrow/_dataset_parquet.pyx:
##########
@@ -33,6 +33,12 @@ from pyarrow.includes.libarrow_dataset cimport *
 from pyarrow.includes.libarrow_dataset_parquet cimport *
 from pyarrow._fs cimport FileSystem
 
+IF PARQUET_ENCRYPTION_ENABLED:

Review Comment:
   Oh, interesting. I didn't know that Cython allowed compile-time conditionals 
and that definitely makes things easier here :-)



##########
python/pyarrow/_parquet_encryption.pxd:
##########
@@ -131,3 +131,22 @@ cdef extern from "arrow/python/parquet_encryption.h" \
             SafeGetFileDecryptionProperties(
             const CKmsConnectionConfig& kms_connection_config,
             const CDecryptionConfiguration& decryption_config)
+
+cdef extern from "arrow/dataset/parquet_encryption_config.h" namespace 
"arrow::dataset" nogil:
+    cdef cppclass CParquetEncryptionConfig 
"arrow::dataset::ParquetEncryptionConfig":
+        CParquetEncryptionConfig() except +

Review Comment:
   You don't need to declare a default constructor, I think.
   Also, I don't think the constructor would raise any C++ exception, so the 
`except +` seems pointless.



##########
python/pyarrow/includes/libarrow_parquet_readwrite.pxd:
##########
@@ -0,0 +1,32 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# distutils: language = c++
+
+from pyarrow.includes.libarrow_dataset cimport *
+from pyarrow._parquet cimport *
+
+cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil:

Review Comment:
   Hmm... I'm not sure what the point is of moving these declarations to a new 
file? 



##########
cpp/src/arrow/dataset/dataset_encryption_test.cc:
##########
@@ -0,0 +1,216 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <string_view>
+
+#include "gtest/gtest.h"
+
+#include "arrow/api.h"
+#include "arrow/dataset/api.h"

Review Comment:
   `api.h` files are generally heavy-weight and can blow up compile times, so 
it would be preferable to use more specific inclusions depending on what is 
really needed here.



##########
python/pyarrow/_dataset_parquet.pyx:
##########
@@ -693,6 +825,10 @@ cdef class ParquetFragmentScanOptions(FragmentScanOptions):
         if thrift_container_size_limit is not None:
             self.thrift_container_size_limit = thrift_container_size_limit
 
+        IF PARQUET_ENCRYPTION_ENABLED:

Review Comment:
   Is the parameter ignored otherwise? That doesn't seem like a good idea.



##########
cpp/src/arrow/dataset/parquet_encryption_config.h:
##########
@@ -0,0 +1,61 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "arrow/dataset/type_fwd.h"
+#include "parquet/encryption/crypto_factory.h"
+#include "parquet/encryption/encryption.h"
+#include "parquet/encryption/kms_client.h"

Review Comment:
   And if we do that, it probably means we can move the 
`ParquetEncryptionConfig` and `ParquetDecryptionConfig` directly into 
`dataset/file_parquet.h`?



##########
python/pyarrow/_dataset_parquet.pyx:
##########
@@ -597,6 +709,15 @@ cdef class ParquetFileWriteOptions(FileWriteOptions):
             data_page_version=self._properties["data_page_version"],
         )
 
+        IF PARQUET_ENCRYPTION_ENABLED:
+            cdef shared_ptr[CParquetEncryptionConfig] c_config
+            if self._properties["encryption_config"]:
+                config = self._properties["encryption_config"]
+                if not isinstance(config, ParquetEncryptionConfig):
+                    raise ValueError("config must be a 
ParquetEncryptionConfig")

Review Comment:
   1. raise TypeError
   2. make the error message more precise (which config?)
   ```suggestion
                       raise TypeError("encryption_config must be a 
ParquetEncryptionConfig")
   ```



##########
cpp/src/parquet/properties.h:
##########
@@ -770,6 +791,11 @@ class PARQUET_EXPORT WriterProperties {
     }
   }
 
+  // \brief Returns the default column properties

Review Comment:
   Nit: use the infinitive ("Return"), not the third-person form ("Returns")
   ```suggestion
     // \brief Return the default column properties
   ```



##########
python/examples/dataset/write_dataset_encrypted.py:
##########
@@ -0,0 +1,93 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+import pyarrow as pa
+import pyarrow.dataset as ds
+import pyarrow.parquet.encryption as pe
+from pyarrow.tests.parquet.encryption import InMemoryKmsClient
+from datetime import timedelta
+import shutil
+import os

Review Comment:
   Hmm, please let's follow the conventional import order (even though we might 
not always observe it faithfully :-)):
   1. stdlib imports
   2. a blank line
   3. third-party imports
   4. a blank line
   5. pyarrow imports
   



##########
python/pyarrow/_dataset_parquet.pyx:
##########
@@ -56,9 +62,113 @@ from pyarrow._parquet cimport (
 
 cdef Expression _true = Expression._scalar(True)
 
-
 ctypedef CParquetFileWriter* _CParquetFileWriterPtr
 
+IF PARQUET_ENCRYPTION_ENABLED:
+
+    from pyarrow._parquet_encryption cimport *
+
+    cdef class ParquetEncryptionConfig(_Weakrefable):
+        """
+        Configuration for Parquet Encryption.
+
+        Parameters
+        ----------
+        crypto_factory : object

Review Comment:
   It would be more informative to give the expected type here, for example:
   ```suggestion
           crypto_factory : CryptoFactory
   ```
   
   (same for other parameters below)



##########
cpp/src/arrow/dataset/file_parquet.h:
##########
@@ -226,6 +229,8 @@ class ARROW_DS_EXPORT ParquetFragmentScanOptions : public 
FragmentScanOptions {
   /// ScanOptions. Additionally, dictionary columns come from
   /// ParquetFileFormat::ReaderOptions::dict_columns.
   std::shared_ptr<parquet::ArrowReaderProperties> arrow_reader_properties;
+  /// A configuration structure that provides per file encryption properties 
for a dataset

Review Comment:
   "per file" seems misleading since `ParquetDecryptionConfig` is 
file-agnostic. The per-file part is derived by the dataset machinery, not 
provided by the user.



##########
cpp/src/arrow/dataset/parquet_encryption_config.h:
##########
@@ -0,0 +1,61 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "arrow/dataset/type_fwd.h"
+#include "parquet/encryption/crypto_factory.h"
+#include "parquet/encryption/encryption.h"
+#include "parquet/encryption/kms_client.h"
+
+namespace arrow {
+namespace dataset {
+
+/// core class, that translates the parameters of high level encryption
+struct ARROW_DS_EXPORT ParquetEncryptionConfig {
+  void Setup(

Review Comment:
   I don't understand what this `Setup` method is for, since:
   1. this seems trivially the same as [aggregate 
initialization](https://en.cppreference.com/w/cpp/language/aggregate_initialization),
 right?
   2. one can also set the attribute values individually anyway
   



##########
cpp/src/arrow/dataset/file_parquet.cc:
##########
@@ -622,10 +640,36 @@ Result<std::shared_ptr<FileWriter>> 
ParquetFileFormat::MakeWriter(
   auto parquet_options = 
checked_pointer_cast<ParquetFileWriteOptions>(options);
 
   std::unique_ptr<parquet::arrow::FileWriter> parquet_writer;
-  ARROW_ASSIGN_OR_RAISE(parquet_writer, parquet::arrow::FileWriter::Open(
-                                            *schema, default_memory_pool(), 
destination,
-                                            parquet_options->writer_properties,
-                                            
parquet_options->arrow_writer_properties));
+
+#ifdef PARQUET_REQUIRE_ENCRYPTION

Review Comment:
   Hmm, so what happens if `PARQUET_REQUIRE_ENCRYPTION` is false and the user 
sets a `parquet_encryption_config`? We should certainly not silently produce 
unencrypted files, so it should perhaps return a `Status::NotImplemented`.
   
   Also, it should probably be tested somewhere.



##########
python/pyarrow/_dataset_parquet.pyx:
##########
@@ -56,9 +62,113 @@ from pyarrow._parquet cimport (
 
 cdef Expression _true = Expression._scalar(True)
 
-
 ctypedef CParquetFileWriter* _CParquetFileWriterPtr
 
+IF PARQUET_ENCRYPTION_ENABLED:
+
+    from pyarrow._parquet_encryption cimport *
+
+    cdef class ParquetEncryptionConfig(_Weakrefable):
+        """
+        Configuration for Parquet Encryption.
+
+        Parameters
+        ----------
+        crypto_factory : object
+            Factory for creating cryptographic instances.
+        kms_connection_config : object
+            Configuration for connecting to Key Management Service.
+        encryption_config : object
+            Configuration for encryption settings.
+
+        Raises
+        ------
+        ValueError
+            If encryption_config is None.
+        """
+        cdef:
+            shared_ptr[CParquetEncryptionConfig] c_config
+
+        # Avoid mistakenly creating attributes
+        __slots__ = ()
+
+        def __cinit__(self, object crypto_factory, object 
kms_connection_config,
+                      object encryption_config):
+
+            cdef shared_ptr[CEncryptionConfiguration] c_encryption_config
+
+            if encryption_config is None:
+                raise ValueError(
+                    "encryption_config cannot be None")
+
+            self.c_config.reset(new CParquetEncryptionConfig())
+
+            c_encryption_config = 
pyarrow_unwrap_encryptionconfig(encryption_config)
+
+            
self.c_config.get().Setup(pyarrow_unwrap_cryptofactory(crypto_factory),
+                                      pyarrow_unwrap_kmsconnectionconfig(
+                kms_connection_config),
+                c_encryption_config)
+
+        @staticmethod
+        cdef wrap(shared_ptr[CParquetEncryptionConfig] c_config):
+            cdef ParquetEncryptionConfig python_config = 
ParquetEncryptionConfig.__new__(ParquetEncryptionConfig)
+            python_config.c_config = c_config
+            return python_config
+
+        cdef shared_ptr[CParquetEncryptionConfig] unwrap(self):
+            return self.c_config
+
+    cdef class ParquetDecryptionConfig(_Weakrefable):
+        """

Review Comment:
   Similar comments below as for `ParquetEncryptionConfig`.



##########
python/pyarrow/includes/libarrow_parquet_readwrite_encryption.pxd:
##########
@@ -0,0 +1,35 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# distutils: language = c++
+
+from pyarrow.includes.libarrow_dataset cimport *
+from pyarrow._parquet cimport *
+from pyarrow._parquet_encryption cimport *
+
+cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil:
+    cdef cppclass CParquetFileWriteOptions \

Review Comment:
   Same question: why not declare these in `libarrow_dataset_parquet.pxd`?



##########
python/pyarrow/_dataset_parquet.pyx:
##########
@@ -56,9 +62,113 @@ from pyarrow._parquet cimport (
 
 cdef Expression _true = Expression._scalar(True)
 
-
 ctypedef CParquetFileWriter* _CParquetFileWriterPtr
 
+IF PARQUET_ENCRYPTION_ENABLED:
+
+    from pyarrow._parquet_encryption cimport *
+
+    cdef class ParquetEncryptionConfig(_Weakrefable):
+        """
+        Configuration for Parquet Encryption.
+
+        Parameters
+        ----------
+        crypto_factory : object
+            Factory for creating cryptographic instances.
+        kms_connection_config : object
+            Configuration for connecting to Key Management Service.
+        encryption_config : object
+            Configuration for encryption settings.
+
+        Raises
+        ------
+        ValueError
+            If encryption_config is None.

Review Comment:
   But why only for encryption_config? Why not check all input args for None?



##########
cpp/src/arrow/dataset/dataset_encryption_test.cc:
##########
@@ -0,0 +1,216 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <string_view>
+
+#include "gtest/gtest.h"
+
+#include "arrow/api.h"
+#include "arrow/dataset/api.h"
+#include "arrow/dataset/parquet_encryption_config.h"
+#include "arrow/dataset/partition.h"
+#include "arrow/filesystem/mockfs.h"
+#include "arrow/io/api.h"
+#include "arrow/status.h"
+#include "arrow/table.h"
+#include "arrow/testing/gtest_util.h"
+#include "parquet/arrow/reader.h"
+#include "parquet/encryption/test_in_memory_kms.h"
+
+constexpr std::string_view kFooterKeyMasterKey = "0123456789012345";
+constexpr std::string_view kFooterKeyMasterKeyId = "footer_key";
+constexpr std::string_view kFooterKeyName = "footer_key";
+constexpr std::string_view kColumnMasterKey = "1234567890123450";
+constexpr std::string_view kColumnMasterKeysId = "col_key";
+constexpr std::string_view kColumnKeyMapping = "col_key: a";
+constexpr std::string_view kBaseDir = "";
+
+using arrow::internal::checked_pointer_cast;
+
+namespace arrow {
+namespace dataset {
+
+class DatasetEncryptionTest : public ::testing::Test {
+ public:
+  // This function creates a mock file system using the current time point, 
creates a
+  // directory with the given base directory path, and writes a dataset to it 
using
+  // provided Parquet file write options. The dataset is partitioned using a 
Hive
+  // partitioning scheme. The function also checks if the written files exist 
in the file
+  // system.
+  static void SetUpTestSuite() {
+    // Creates a mock file system using the current time point.
+    EXPECT_OK_AND_ASSIGN(file_system, fs::internal::MockFileSystem::Make(
+                                          std::chrono::system_clock::now(), 
{}));
+    ASSERT_OK(file_system->CreateDir(std::string(kBaseDir)));
+
+    // Prepare table data.
+    auto table_schema = schema({field("a", int64()), field("b", int64()),
+                                field("c", int64()), field("part", utf8())});
+    table = TableFromJSON(table_schema, {R"([
+       [ 0, 9, 1, "a" ],
+       [ 1, 8, 2, "b" ],
+       [ 2, 7, 1, "c" ],
+       [ 3, 6, 2, "d" ],
+       [ 4, 5, 1, "e" ],
+       [ 5, 4, 2, "f" ],
+       [ 6, 3, 1, "g" ],
+       [ 7, 2, 2, "h" ],
+       [ 8, 1, 1, "i" ],
+       [ 9, 0, 2, "j" ]
+     ])"});
+
+    // Use a Hive-style partitioning scheme.
+    partitioning = std::make_shared<HivePartitioning>(schema({field("part", 
utf8())}));
+
+    // Prepare encryption properties.
+    std::unordered_map<std::string, std::string> key_map;
+    key_map.emplace(kColumnMasterKeysId, kColumnMasterKey);
+    key_map.emplace(kFooterKeyMasterKeyId, kFooterKeyMasterKey);
+
+    crypto_factory = std::make_shared<parquet::encryption::CryptoFactory>();
+    auto kms_client_factory =
+        
std::make_shared<parquet::encryption::TestOnlyInMemoryKmsClientFactory>(
+            /*wrap_locally=*/true, key_map);
+    crypto_factory->RegisterKmsClientFactory(std::move(kms_client_factory));
+    kms_connection_config = 
std::make_shared<parquet::encryption::KmsConnectionConfig>();
+
+    // Set write options with encryption configuration.
+    auto encryption_config =
+        std::make_shared<parquet::encryption::EncryptionConfiguration>(
+            std::string(kFooterKeyName));
+    encryption_config->column_keys = kColumnKeyMapping;
+    auto parquet_encryption_config = 
std::make_shared<ParquetEncryptionConfig>();
+    parquet_encryption_config->Setup(crypto_factory, kms_connection_config,
+                                     std::move(encryption_config));
+
+    auto file_format = std::make_shared<ParquetFileFormat>();
+    auto parquet_file_write_options =
+        
checked_pointer_cast<ParquetFileWriteOptions>(file_format->DefaultWriteOptions());
+    parquet_file_write_options->parquet_encryption_config =
+        std::move(parquet_encryption_config);
+
+    // Write dataset.
+    auto dataset = std::make_shared<InMemoryDataset>(table);
+    EXPECT_OK_AND_ASSIGN(auto scanner_builder, dataset->NewScan());
+    EXPECT_OK_AND_ASSIGN(auto scanner, scanner_builder->Finish());
+
+    FileSystemDatasetWriteOptions write_options;
+    write_options.file_write_options = parquet_file_write_options;
+    write_options.filesystem = file_system;
+    write_options.base_dir = kBaseDir;
+    write_options.partitioning = partitioning;
+    write_options.basename_template = "part{i}.parquet";
+    ASSERT_OK(FileSystemDataset::Write(write_options, std::move(scanner)));
+
+    // Verify that the files exist
+    std::vector<std::string> files = {"part=a/part0.parquet", 
"part=b/part0.parquet",
+                                      "part=c/part0.parquet", 
"part=d/part0.parquet",
+                                      "part=e/part0.parquet", 
"part=f/part0.parquet",
+                                      "part=g/part0.parquet", 
"part=h/part0.parquet",
+                                      "part=i/part0.parquet", 
"part=j/part0.parquet"};
+    for (const auto& file_path : files) {
+      ASSERT_OK_AND_ASSIGN(auto result, file_system->GetFileInfo(file_path));
+      ASSERT_EQ(result.type(), fs::FileType::File);
+    }
+  }
+
+ protected:
+  inline static std::shared_ptr<fs::FileSystem> file_system;

Review Comment:
   Our convention is to append underscores in members of classes (except for 
plain light-weight structs). So `file_system_` here, for example.



##########
cpp/src/arrow/dataset/dataset_encryption_test.cc:
##########
@@ -0,0 +1,216 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <string_view>
+
+#include "gtest/gtest.h"
+
+#include "arrow/api.h"
+#include "arrow/dataset/api.h"
+#include "arrow/dataset/parquet_encryption_config.h"
+#include "arrow/dataset/partition.h"
+#include "arrow/filesystem/mockfs.h"
+#include "arrow/io/api.h"
+#include "arrow/status.h"
+#include "arrow/table.h"
+#include "arrow/testing/gtest_util.h"
+#include "parquet/arrow/reader.h"
+#include "parquet/encryption/test_in_memory_kms.h"
+
+constexpr std::string_view kFooterKeyMasterKey = "0123456789012345";
+constexpr std::string_view kFooterKeyMasterKeyId = "footer_key";
+constexpr std::string_view kFooterKeyName = "footer_key";
+constexpr std::string_view kColumnMasterKey = "1234567890123450";
+constexpr std::string_view kColumnMasterKeysId = "col_key";

Review Comment:
   Nit, but should this be `kColumnMasterKeyId` (not "keys")?



##########
python/pyarrow/_dataset_parquet.pyx:
##########
@@ -56,9 +62,113 @@ from pyarrow._parquet cimport (
 
 cdef Expression _true = Expression._scalar(True)
 
-
 ctypedef CParquetFileWriter* _CParquetFileWriterPtr
 
+IF PARQUET_ENCRYPTION_ENABLED:
+
+    from pyarrow._parquet_encryption cimport *
+
+    cdef class ParquetEncryptionConfig(_Weakrefable):
+        """
+        Configuration for Parquet Encryption.
+
+        Parameters
+        ----------
+        crypto_factory : object
+            Factory for creating cryptographic instances.
+        kms_connection_config : object
+            Configuration for connecting to Key Management Service.
+        encryption_config : object
+            Configuration for encryption settings.
+
+        Raises
+        ------
+        ValueError
+            If encryption_config is None.
+        """
+        cdef:
+            shared_ptr[CParquetEncryptionConfig] c_config
+
+        # Avoid mistakenly creating attributes
+        __slots__ = ()
+
+        def __cinit__(self, object crypto_factory, object 
kms_connection_config,
+                      object encryption_config):

Review Comment:
   "object" is superfluous in this context as it's the default.
   



##########
cpp/src/arrow/dataset/dataset_encryption_test.cc:
##########
@@ -0,0 +1,216 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <string_view>
+
+#include "gtest/gtest.h"
+
+#include "arrow/api.h"
+#include "arrow/dataset/api.h"
+#include "arrow/dataset/parquet_encryption_config.h"
+#include "arrow/dataset/partition.h"
+#include "arrow/filesystem/mockfs.h"
+#include "arrow/io/api.h"
+#include "arrow/status.h"
+#include "arrow/table.h"
+#include "arrow/testing/gtest_util.h"
+#include "parquet/arrow/reader.h"
+#include "parquet/encryption/test_in_memory_kms.h"
+
+constexpr std::string_view kFooterKeyMasterKey = "0123456789012345";
+constexpr std::string_view kFooterKeyMasterKeyId = "footer_key";
+constexpr std::string_view kFooterKeyName = "footer_key";
+constexpr std::string_view kColumnMasterKey = "1234567890123450";
+constexpr std::string_view kColumnMasterKeysId = "col_key";
+constexpr std::string_view kColumnKeyMapping = "col_key: a";
+constexpr std::string_view kBaseDir = "";
+
+using arrow::internal::checked_pointer_cast;
+
+namespace arrow {
+namespace dataset {
+
+class DatasetEncryptionTest : public ::testing::Test {
+ public:
+  // This function creates a mock file system using the current time point, 
creates a
+  // directory with the given base directory path, and writes a dataset to it 
using
+  // provided Parquet file write options. The dataset is partitioned using a 
Hive
+  // partitioning scheme. The function also checks if the written files exist 
in the file
+  // system.
+  static void SetUpTestSuite() {
+    // Creates a mock file system using the current time point.
+    EXPECT_OK_AND_ASSIGN(file_system, fs::internal::MockFileSystem::Make(
+                                          std::chrono::system_clock::now(), 
{}));
+    ASSERT_OK(file_system->CreateDir(std::string(kBaseDir)));
+
+    // Prepare table data.
+    auto table_schema = schema({field("a", int64()), field("b", int64()),
+                                field("c", int64()), field("part", utf8())});
+    table = TableFromJSON(table_schema, {R"([
+       [ 0, 9, 1, "a" ],
+       [ 1, 8, 2, "b" ],
+       [ 2, 7, 1, "c" ],
+       [ 3, 6, 2, "d" ],
+       [ 4, 5, 1, "e" ],
+       [ 5, 4, 2, "f" ],
+       [ 6, 3, 1, "g" ],
+       [ 7, 2, 2, "h" ],
+       [ 8, 1, 1, "i" ],
+       [ 9, 0, 2, "j" ]

Review Comment:
   Perhaps we can make this less trivial by using non-unique `part` values,
   so that at least some partitions contain more than one row?
   (Not sure that would change much for this test, but still.)



##########
cpp/src/arrow/dataset/dataset_encryption_test.cc:
##########
@@ -0,0 +1,216 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <string_view>
+
+#include "gtest/gtest.h"
+
+#include "arrow/api.h"
+#include "arrow/dataset/api.h"
+#include "arrow/dataset/parquet_encryption_config.h"
+#include "arrow/dataset/partition.h"
+#include "arrow/filesystem/mockfs.h"
+#include "arrow/io/api.h"
+#include "arrow/status.h"
+#include "arrow/table.h"
+#include "arrow/testing/gtest_util.h"
+#include "parquet/arrow/reader.h"
+#include "parquet/encryption/test_in_memory_kms.h"
+
+constexpr std::string_view kFooterKeyMasterKey = "0123456789012345";
+constexpr std::string_view kFooterKeyMasterKeyId = "footer_key";
+constexpr std::string_view kFooterKeyName = "footer_key";
+constexpr std::string_view kColumnMasterKey = "1234567890123450";
+constexpr std::string_view kColumnMasterKeysId = "col_key";
+constexpr std::string_view kColumnKeyMapping = "col_key: a";
+constexpr std::string_view kBaseDir = "";
+
+using arrow::internal::checked_pointer_cast;
+
+namespace arrow {
+namespace dataset {
+
+class DatasetEncryptionTest : public ::testing::Test {
+ public:
+  // This function creates a mock file system using the current time point, 
creates a
+  // directory with the given base directory path, and writes a dataset to it 
using
+  // provided Parquet file write options. The dataset is partitioned using a 
Hive
+  // partitioning scheme. The function also checks if the written files exist 
in the file
+  // system.
+  static void SetUpTestSuite() {
+    // Creates a mock file system using the current time point.
+    EXPECT_OK_AND_ASSIGN(file_system, fs::internal::MockFileSystem::Make(
+                                          std::chrono::system_clock::now(), 
{}));
+    ASSERT_OK(file_system->CreateDir(std::string(kBaseDir)));
+
+    // Prepare table data.
+    auto table_schema = schema({field("a", int64()), field("b", int64()),
+                                field("c", int64()), field("part", utf8())});
+    table = TableFromJSON(table_schema, {R"([
+       [ 0, 9, 1, "a" ],
+       [ 1, 8, 2, "b" ],
+       [ 2, 7, 1, "c" ],
+       [ 3, 6, 2, "d" ],
+       [ 4, 5, 1, "e" ],
+       [ 5, 4, 2, "f" ],
+       [ 6, 3, 1, "g" ],
+       [ 7, 2, 2, "h" ],
+       [ 8, 1, 1, "i" ],
+       [ 9, 0, 2, "j" ]
+     ])"});
+
+    // Use a Hive-style partitioning scheme.
+    partitioning = std::make_shared<HivePartitioning>(schema({field("part", 
utf8())}));
+
+    // Prepare encryption properties.
+    std::unordered_map<std::string, std::string> key_map;
+    key_map.emplace(kColumnMasterKeysId, kColumnMasterKey);
+    key_map.emplace(kFooterKeyMasterKeyId, kFooterKeyMasterKey);
+
+    crypto_factory = std::make_shared<parquet::encryption::CryptoFactory>();
+    auto kms_client_factory =
+        
std::make_shared<parquet::encryption::TestOnlyInMemoryKmsClientFactory>(
+            /*wrap_locally=*/true, key_map);
+    crypto_factory->RegisterKmsClientFactory(std::move(kms_client_factory));
+    kms_connection_config = 
std::make_shared<parquet::encryption::KmsConnectionConfig>();
+
+    // Set write options with encryption configuration.
+    auto encryption_config =
+        std::make_shared<parquet::encryption::EncryptionConfiguration>(
+            std::string(kFooterKeyName));
+    encryption_config->column_keys = kColumnKeyMapping;
+    auto parquet_encryption_config = 
std::make_shared<ParquetEncryptionConfig>();
+    parquet_encryption_config->Setup(crypto_factory, kms_connection_config,
+                                     std::move(encryption_config));
+
+    auto file_format = std::make_shared<ParquetFileFormat>();
+    auto parquet_file_write_options =
+        
checked_pointer_cast<ParquetFileWriteOptions>(file_format->DefaultWriteOptions());
+    parquet_file_write_options->parquet_encryption_config =
+        std::move(parquet_encryption_config);
+
+    // Write dataset.
+    auto dataset = std::make_shared<InMemoryDataset>(table);
+    EXPECT_OK_AND_ASSIGN(auto scanner_builder, dataset->NewScan());
+    EXPECT_OK_AND_ASSIGN(auto scanner, scanner_builder->Finish());
+
+    FileSystemDatasetWriteOptions write_options;
+    write_options.file_write_options = parquet_file_write_options;
+    write_options.filesystem = file_system;
+    write_options.base_dir = kBaseDir;
+    write_options.partitioning = partitioning;
+    write_options.basename_template = "part{i}.parquet";
+    ASSERT_OK(FileSystemDataset::Write(write_options, std::move(scanner)));
+
+    // Verify that the files exist
+    std::vector<std::string> files = {"part=a/part0.parquet", 
"part=b/part0.parquet",
+                                      "part=c/part0.parquet", 
"part=d/part0.parquet",
+                                      "part=e/part0.parquet", 
"part=f/part0.parquet",
+                                      "part=g/part0.parquet", 
"part=h/part0.parquet",
+                                      "part=i/part0.parquet", 
"part=j/part0.parquet"};
+    for (const auto& file_path : files) {
+      ASSERT_OK_AND_ASSIGN(auto result, file_system->GetFileInfo(file_path));
+      ASSERT_EQ(result.type(), fs::FileType::File);
+    }
+  }
+
+ protected:
+  inline static std::shared_ptr<fs::FileSystem> file_system;
+  inline static std::shared_ptr<Table> table;
+  inline static std::shared_ptr<HivePartitioning> partitioning;
+  inline static std::shared_ptr<parquet::encryption::CryptoFactory> 
crypto_factory;
+  inline static std::shared_ptr<parquet::encryption::KmsConnectionConfig>
+      kms_connection_config;
+};
+
+// This test demonstrates the process of writing a partitioned Parquet file 
with the same
+// encryption properties applied to each file within the dataset. The 
encryption
+// properties are determined based on the selected columns. After writing the 
dataset, the
+// test reads the data back and verifies that it can be successfully decrypted 
and
+// scanned.
+TEST_F(DatasetEncryptionTest, WriteReadDatasetWithEncryption) {
+  // Create decryption properties.
+  auto decryption_config =
+      std::make_shared<parquet::encryption::DecryptionConfiguration>();
+  auto parquet_decryption_config = std::make_shared<ParquetDecryptionConfig>();
+  parquet_decryption_config->Setup(crypto_factory, kms_connection_config,
+                                   std::move(decryption_config));
+
+  // Set scan options.
+  auto parquet_scan_options = std::make_shared<ParquetFragmentScanOptions>();
+  parquet_scan_options->parquet_decryption_config = 
std::move(parquet_decryption_config);
+
+  auto file_format = std::make_shared<ParquetFileFormat>();
+  file_format->default_fragment_scan_options = std::move(parquet_scan_options);
+
+  // Get FileInfo objects for all files under the base directory
+  fs::FileSelector selector;
+  selector.base_dir = kBaseDir;
+  selector.recursive = true;
+
+  FileSystemFactoryOptions factory_options;
+  factory_options.partitioning = partitioning;
+  factory_options.partition_base_dir = kBaseDir;
+  ASSERT_OK_AND_ASSIGN(auto dataset_factory,
+                       FileSystemDatasetFactory::Make(file_system, selector, 
file_format,
+                                                      factory_options));
+
+  // Read dataset into table
+  ASSERT_OK_AND_ASSIGN(auto dataset, dataset_factory->Finish());
+  ASSERT_OK_AND_ASSIGN(auto scanner_builder, dataset->NewScan());
+  ASSERT_OK_AND_ASSIGN(auto scanner, scanner_builder->Finish());
+  ASSERT_OK_AND_ASSIGN(auto read_table, scanner->ToTable());
+
+  // Verify the data was read correctly
+  ASSERT_OK_AND_ASSIGN(auto combined_table, read_table->CombineChunks());

Review Comment:
   Let's also validate the table.
   ```suggestion
     ASSERT_OK_AND_ASSIGN(auto combined_table, read_table->CombineChunks());
     ASSERT_OK(combined_table->ValidateFull());
   ```



##########
python/pyarrow/_dataset_parquet.pyx:
##########
@@ -56,9 +62,113 @@ from pyarrow._parquet cimport (
 
 cdef Expression _true = Expression._scalar(True)
 
-
 ctypedef CParquetFileWriter* _CParquetFileWriterPtr
 
+IF PARQUET_ENCRYPTION_ENABLED:
+
+    from pyarrow._parquet_encryption cimport *
+
+    cdef class ParquetEncryptionConfig(_Weakrefable):
+        """
+        Configuration for Parquet Encryption.
+
+        Parameters
+        ----------
+        crypto_factory : object
+            Factory for creating cryptographic instances.
+        kms_connection_config : object
+            Configuration for connecting to Key Management Service.
+        encryption_config : object
+            Configuration for encryption settings.

Review Comment:
   "Configuration for encryption settings" is redundant and also a bit vague.
   
   



##########
python/pyarrow/_dataset_parquet.pyx:
##########
@@ -631,7 +752,13 @@ cdef class ParquetFileWriteOptions(FileWriteOptions):
             coerce_timestamps=None,
             allow_truncated_timestamps=False,
             use_compliant_nested_type=True,
+            encryption_config=None,
         )
+        IF PARQUET_ENCRYPTION_ENABLED:
+            if self._properties["encryption_config"] is not None:
+                print("Encryption is not enabled in this build of pyarrow. "
+                      "Please reinstall pyarrow with encryption enabled.")
+

Review Comment:
   Also, let's make sure this is tested to avoid any regressions.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to