nmtruong93 opened a new issue, #8008:
URL: https://github.com/apache/iceberg/issues/8008
### Query engine
Spark 3.3
Glue 4.0
### Question
The code below:
```
def create_iceberg_table(self, source_table, target_path,
                         partition_cols=None, ordered_by_cols=None,
                         identifier_cols=None):
    """
    Create an Iceberg table in Athena.

    :param source_table: str, comes from
        coalesce_df.createOrReplaceTempView(source_table)
    :param target_path: str, format: catalog_name.db.table_name. Example:
        AwsDataCatalog.prod_pfs_lakehouse_iceberg.staging_aiml_transform_products
    :param partition_cols: str | list, the partition columns
    :param ordered_by_cols: str | list, the global order-by columns.
        Default is ASC NULLS FIRST. Reference:
        https://iceberg.apache.org/docs/latest/spark-ddl/#alter-table--write-ordered-by
        Examples:
        - 'order_id'
        - 'order_id ASC NULLS LAST'
        - ['order_id', 'product_id']
        - ['order_id ASC NULLS LAST', 'product_id DESC NULLS LAST']
    :param identifier_cols: str | list, the identifier columns. Default is None.
        Reference:
        https://iceberg.apache.org/docs/latest/spark-ddl/#alter-table--set-identifier-fields
        Examples:
        - 'order_id'
        - ['order_id', 'product_id']
    :return: None
    """
    def as_sql_list(cols):
        # Accept a single column spec or a list of column specs.
        return cols if isinstance(cols, str) else ', '.join(cols)

    partition_str = ""
    if partition_cols:
        partition_str = f"PARTITIONED BY ({as_sql_list(partition_cols)})"
    created_sql = f"""
        CREATE TABLE {target_path}
        USING iceberg
        {partition_str}
        TBLPROPERTIES (
            'format-version' = '2',
            'read.split.target-size' = '134217728',               -- 128 MB
            'read.split.planning-lookback' = '10',
            'read.split.open-file-cost' = '4194304',              -- 4 MB
            'read.parquet.vectorization.enabled' = 'true',
            'read.parquet.vectorization.batch-size' = '5000',
            'read.orc.vectorization.enabled' = 'true',
            'write.format.default' = 'parquet',
            'write.parquet.compression-codec' = 'snappy',
            'write.parquet.row-group-size-bytes' = '134217728',   -- 128 MB
            'write.parquet.page-size-bytes' = '1048576',          -- 1 MB
            'write.target-file-size-bytes' = '536870912',         -- 512 MB
            'write.distribution-mode' = 'range',
            'write.delete.distribution-mode' = 'range',
            'write.update.distribution-mode' = 'range',
            'write.merge.distribution-mode' = 'range',
            'write.metadata.delete-after-commit.enabled' = 'true',
            'write.metadata.previous-versions-max' = '3',
            'write.spark.fanout.enabled' = 'false',
            'write.object-storage.enabled' = 'true',
            'write.delete.mode' = 'copy-on-write',
            'write.update.mode' = 'merge-on-read',
            'write.merge.mode' = 'merge-on-read',
            'commit.retry.total-timeout-ms' = '1800000',          -- 30 minutes
            'commit.manifest.target-size-bytes' = '8388608',      -- 8 MB
            'commit.manifest.min-count-to-merge' = '100',
            'commit.manifest-merge.enabled' = 'true',
            'history.expire.max-snapshot-age-ms' = '259200000'    -- 3 days
        )
        AS SELECT * FROM {source_table} LIMIT 0
    """
    self.spark.sql(created_sql)
    if ordered_by_cols:
        # WRITE ORDERED BY defaults to ASC NULLS FIRST.
        self.spark.sql(f"ALTER TABLE {target_path} "
                       f"WRITE ORDERED BY {as_sql_list(ordered_by_cols)}")
    if identifier_cols:
        self.spark.sql(f"ALTER TABLE {target_path} "
                       f"SET IDENTIFIER FIELDS {as_sql_list(identifier_cols)}")
    self.spark.sql(f"INSERT INTO {target_path} SELECT * FROM {source_table}")
    return None
```
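The same str-or-list branch appears three times in the function (partitions, sort order, identifier fields). A minimal standalone sketch of just that normalization step; the helper name `as_sql_list` is my own label, not something defined elsewhere in the snippet:

```python
def as_sql_list(cols):
    """Render a column spec as a SQL column list.

    Accepts either a single string ('order_id') or a list of strings
    (['order_id', 'product_id']), mirroring the inline ternary used
    in the function above.
    """
    return cols if isinstance(cols, str) else ", ".join(cols)


# A single column passes through unchanged; a list is comma-joined.
partition_str = f"PARTITIONED BY ({as_sql_list('tenant_id')})"
ordered_by_str = as_sql_list(["order_id", "product_id"])
```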
Using the code:
```
self.create_iceberg_table(source_table, self.target_path,
                          partition_cols='tenant_id',
                          ordered_by_cols='product_id ASC NULLS FIRST')
```
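For that call, the sort-order DDL the helper issues can be reconstructed from its f-strings; `catalog.db.table` below is a placeholder standing in for `self.target_path`:

```python
# Placeholder values for illustration; the real call uses self.target_path.
target_path = "catalog.db.table"
ordered_by_cols = "product_id ASC NULLS FIRST"

# Reconstructed from the f-string in the function above.
alter_stmt = f"ALTER TABLE {target_path} WRITE ORDERED BY {ordered_by_cols}"
print(alter_stmt)
# ALTER TABLE catalog.db.table WRITE ORDERED BY product_id ASC NULLS FIRST
```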
I expected the data to be physically sorted by `product_id` within each `tenant_id` partition, and each specific `tenant_id` value to appear in only one folder, but a given `tenant_id` partition appears in many different folders.
What I expected in the physical S3 folder:
- data/hash_id_1/tenant_id=example1/...parquet.
- data/hash_id_2/tenant_id=example2/...parquet.
- data/hash_id_3/tenant_id=example3/...parquet.
What I see:
- data/hash_id_1/tenant_id=example1/...parquet.
- data/hash_id_2/tenant_id=example1/...parquet.
- data/hash_id_3/tenant_id=example1/...parquet.
- data/hash_id_4/tenant_id=example2/...parquet.
Additional information: I chose `'write.distribution-mode' = 'range'` because I need to upsert data using MERGE, and I expect that data sorted within partitions will speed up the MERGE operation.
How can I get my expected results?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]