AudriusButkevicius commented on PR #41821:
URL: https://github.com/apache/arrow/pull/41821#issuecomment-2143911405
It seems that something is missing.
The metadata file can be read, but it seems that none of the data files are
actually read, so the table ends up empty (but with the right schema).
```
(arrow) root@base ~/code/arrow # python testx.py
/tmp/tmp8qr3xv_t
Writing
pyarrow.Table
col1: int64
col2: int64
year: int64
----
col1: [[1,2,3]]
col2: [[1,2,3]]
year: [[2020,2020,2021]]
write done
Reading
pyarrow.Table
col1: int64
col2: int64
year: int16
----
col1: []
col2: []
year: []
```
Code that reproduces the issue
<details>
<summary>Test code</summary>
```python
import os
import tempfile
import pyarrow.parquet.encryption as pe
import pyarrow.parquet as pq
import pyarrow.dataset as ds
import pyarrow as pa
import base64
import polars as pl
class KmsClient(pe.KmsClient):
    """Stand-in KMS client for the reproduction: keys are only base64-coded.

    The master key identifier is accepted but never used by either method.
    """

    def wrap_key(self, key_bytes, master_key_identifier):
        """Return *key_bytes* base64-encoded."""
        wrapped = base64.b64encode(key_bytes)
        return wrapped

    def unwrap_key(self, wrapped_key, master_key_identifier):
        """Return the bytes obtained by base64-decoding *wrapped_key*."""
        key_bytes = base64.b64decode(wrapped_key)
        return key_bytes
def write(location):
    """Write a small encrypted, hive-partitioned Parquet dataset.

    A three-row table is written under *location* with ``year`` as the
    partition column, then a ``_metadata`` file is written next to it using
    the metadata collected during the dataset write.

    Parameters
    ----------
    location : str
        Directory that receives the dataset files and ``_metadata``.
    """
    cf = pe.CryptoFactory(lambda *a, **k: KmsClient())
    df = pl.DataFrame({
        "col1": [1, 2, 3],
        "col2": [1, 2, 3],
        "year": [2020, 2020, 2021]
    })
    ecfg = pe.EncryptionConfiguration(
        footer_key="TEST",
        column_keys={
            "TEST": ["col2"]
        },
        double_wrapping=False,
        plaintext_footer=False,
    )
    table = df.to_arrow()
    print("Writing")
    print(table)

    parquet_encryption_cfg = ds.ParquetEncryptionConfig(
        cf, pe.KmsConnectionConfig(), ecfg
    )
    # Handed to write_to_dataset and afterwards to write_metadata below.
    metadata_collector = []
    pq.write_to_dataset(
        table,
        location,
        partitioning=ds.partitioning(
            schema=pa.schema([
                pa.field("year", pa.int16())
            ]),
            flavor="hive"
        ),
        encryption_config=parquet_encryption_cfg,
        metadata_collector=metadata_collector
    )

    # The assignment must stay on one logical line; a bare
    # ``encryption_properties =`` followed by a newline is a syntax error.
    encryption_properties = cf.file_encryption_properties(
        pe.KmsConnectionConfig(), ecfg
    )
    pq.write_metadata(
        # Schema passed for _metadata leaves out the partition column "year".
        pa.schema(
            field
            for field in table.schema
            if field.name != "year"
        ),
        os.path.join(location, "_metadata"),
        metadata_collector,
        encryption_properties=encryption_properties,
    )
    print("write done")
def read(location):
    """Read the encrypted dataset back through its ``_metadata`` file.

    Builds the decryption configuration, opens the dataset with
    ``ds.parquet_dataset`` using hive partitioning on ``year`` and prints the
    resulting table.

    Parameters
    ----------
    location : str
        Directory containing the dataset and its ``_metadata`` file.
    """
    decryption_config = pe.DecryptionConfiguration(cache_lifetime=300)
    kms_connection_config = pe.KmsConnectionConfig()
    cf = pe.CryptoFactory(lambda *a, **k: KmsClient())
    parquet_decryption_cfg = ds.ParquetDecryptionConfig(
        cf, kms_connection_config, decryption_config
    )
    decryption_properties = cf.file_decryption_properties(
        kms_connection_config, decryption_config)
    pq_scan_opts = ds.ParquetFragmentScanOptions(
        decryption_config=parquet_decryption_cfg,
        # If using build from master
        decryption_properties=decryption_properties
    )
    # Kept as a single statement: splitting after ``pformat =`` is a syntax
    # error. ``ds`` is the alias the rest of the script uses for
    # ``pyarrow.dataset``.
    pformat = ds.ParquetFileFormat(
        default_fragment_scan_options=pq_scan_opts
    )
    dataset = ds.parquet_dataset(
        os.path.join(location, "_metadata"),
        format=pformat,
        partitioning=ds.partitioning(
            schema=pa.schema([
                pa.field("year", pa.int16())
            ]),
            flavor="hive"
        )
    )
    print("Reading")
    print(dataset.to_table())
if __name__ == '__main__':
    # Round trip: write the encrypted dataset into a fresh temporary
    # directory, then read it back.
    # mkdtemp's suffix, prefix and dir all default to None.
    location = tempfile.mkdtemp()
    print(location)
    os.makedirs(location, exist_ok=True)

    write(location)
    print("\n")
    read(location)
```
</details>
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]