adamreeve commented on code in PR #44990:
URL: https://github.com/apache/arrow/pull/44990#discussion_r1882859516
##########
cpp/src/arrow/dataset/file_parquet_encryption_test.cc:
##########
@@ -51,8 +53,14 @@ using arrow::internal::checked_pointer_cast;
namespace arrow {
namespace dataset {
+// Tests come in these variations
+enum CompressionParam {
Review Comment:
I think you mean Encryption :smile:
##########
cpp/src/parquet/column_reader.cc:
##########
@@ -307,8 +307,15 @@ class SerializedPageReader : public PageReader {
// Please refer to the encryption specification for more details:
// https://github.com/apache/parquet-format/blob/encryption/Encryption.md#44-additional-authenticated-data
- // The ordinal fields in the context below are used for AAD suffix calculation.
+ // The CryptoContext used by this PageReader.
CryptoContext crypto_ctx_;
+ // This PageReader has its own copy of crypto_ctx_->meta_decryptor and
+ // crypto_ctx_->data_decryptor in order to be thread-safe. Do not mutate (update) the
+ // instances of crypto_ctx_.
Review Comment:
There are also uses of `UpdateDecryptor` in `RowGroupPageIndexReaderImpl`.
I'm not sure whether that class is used concurrently anywhere within Arrow code,
but it's part of the public API, so it could be used concurrently by users, and it
would make sense to make a similar change there.
##########
cpp/src/arrow/dataset/file_parquet_encryption_test.cc:
##########
@@ -151,21 +167,53 @@ class DatasetEncryptionTestBase : public ::testing::Test {
// Create the dataset
ASSERT_OK_AND_ASSIGN(auto dataset, dataset_factory->Finish());
- // Reuse the dataset above to scan it twice to make sure decryption works correctly.
- for (size_t i = 0; i < 2; ++i) {
- // Read dataset into table
- ASSERT_OK_AND_ASSIGN(auto scanner_builder, dataset->NewScan());
- ASSERT_OK_AND_ASSIGN(auto scanner, scanner_builder->Finish());
- ASSERT_OK_AND_ASSIGN(auto read_table, scanner->ToTable());
-
- // Verify the data was read correctly
- ASSERT_OK_AND_ASSIGN(auto combined_table, read_table->CombineChunks());
- // Validate the table
- ASSERT_OK(combined_table->ValidateFull());
- AssertTablesEqual(*combined_table, *table_);
+ if (concurrently) {
+ // start with a single thread so we are more likely to build up a queue of jobs
+ ASSERT_OK_AND_ASSIGN(auto pool, arrow::internal::ThreadPool::Make(1));
+ std::vector<Future<std::shared_ptr<Table>>> threads;
+
+ // Read dataset above multiple times concurrently to see that is thread-safe.
+ for (size_t i = 0; i < 100; ++i) {
+ threads.push_back(
+ DeferNotOk(pool->Submit(DatasetEncryptionTestBase::read, dataset)));
+ }
+
+ // ramp up parallelism
+ ASSERT_OK(pool->SetCapacity(16));
+ // ensure there are sufficient jobs to see concurrent processing
+ ASSERT_GT(pool->GetNumTasks(), 16);
Review Comment:
Is there a potential race condition here if some tasks have already
finished? I guess it's unlikely to be an issue as the tasks should take a while
to run, but I don't think the check is necessary.
##########
cpp/src/arrow/dataset/file_parquet_encryption_test.cc:
##########
@@ -151,21 +167,53 @@ class DatasetEncryptionTestBase : public ::testing::Test {
// Create the dataset
ASSERT_OK_AND_ASSIGN(auto dataset, dataset_factory->Finish());
- // Reuse the dataset above to scan it twice to make sure decryption works correctly.
- for (size_t i = 0; i < 2; ++i) {
- // Read dataset into table
- ASSERT_OK_AND_ASSIGN(auto scanner_builder, dataset->NewScan());
- ASSERT_OK_AND_ASSIGN(auto scanner, scanner_builder->Finish());
- ASSERT_OK_AND_ASSIGN(auto read_table, scanner->ToTable());
-
- // Verify the data was read correctly
- ASSERT_OK_AND_ASSIGN(auto combined_table, read_table->CombineChunks());
- // Validate the table
- ASSERT_OK(combined_table->ValidateFull());
- AssertTablesEqual(*combined_table, *table_);
+ if (concurrently) {
+ // start with a single thread so we are more likely to build up a queue of jobs
+ ASSERT_OK_AND_ASSIGN(auto pool, arrow::internal::ThreadPool::Make(1));
+ std::vector<Future<std::shared_ptr<Table>>> threads;
+
+ // Read dataset above multiple times concurrently to see that is thread-safe.
+ for (size_t i = 0; i < 100; ++i) {
+ threads.push_back(
+ DeferNotOk(pool->Submit(DatasetEncryptionTestBase::read, dataset)));
+ }
+
+ // ramp up parallelism
+ ASSERT_OK(pool->SetCapacity(16));
+ // ensure there are sufficient jobs to see concurrent processing
+ ASSERT_GT(pool->GetNumTasks(), 16);
+ printf("%d", pool->GetNumTasks());
Review Comment:
This printf should be removed
##########
cpp/src/arrow/dataset/file_parquet_encryption_test.cc:
##########
@@ -90,7 +98,15 @@ class DatasetEncryptionTestBase : public ::testing::Test {
auto encryption_config =
std::make_shared<parquet::encryption::EncryptionConfiguration>(
std::string(kFooterKeyName));
- encryption_config->column_keys = kColumnKeyMapping;
+
+ if (GetParam() == COLUMN_KEY) {
+ encryption_config->column_keys = kColumnKeyMapping;
+ } else if (GetParam() == UNIFORM) {
+ encryption_config->uniform_encryption = true;
+ } else {
+ FAIL() << "Unsupported compression type " << GetParam();
Review Comment:
```suggestion
FAIL() << "Unsupported encryption type " << GetParam();
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]