nmtruong93 opened a new issue, #8008:
URL: https://github.com/apache/iceberg/issues/8008

   ### Query engine
   
   Spark 3.3
   Glue 4.0
   
   
   ### Question
   
   The code below:
   ```
    def create_iceberg_table(self, source_table, target_path, partition_cols=None,
                             ordered_by_cols=None, identifier_cols=None):
        """
        Create an Iceberg table in Athena.

        :param source_table: str, comes from coalesce_df.createOrReplaceTempView(source_table)
        :param target_path: str, format: catalog_name.db.table_name.
            Example: AwsDataCatalog.prod_pfs_lakehouse_iceberg.staging_aiml_transform_products
        :param partition_cols: str | list, the partition columns
        :param ordered_by_cols: str | list, the global order-by columns. Default is ASC NULLS FIRST.
                Reference: https://iceberg.apache.org/docs/latest/spark-ddl/#alter-table--write-ordered-by
                For example:
                            - 'order_id'
                            - 'order_id ASC NULLS LAST'
                            - ['order_id', 'product_id']
                            - ['order_id ASC NULLS LAST', 'product_id DESC NULLS LAST']
        :param identifier_cols: str | list, the identifier columns. Default is None.
                Reference: https://iceberg.apache.org/docs/latest/spark-ddl/#alter-table--set-identifier-fields
                For example:
                            - 'order_id'
                            - ['order_id', 'product_id']
        :return: None
        """
        partition_str = ""
        if partition_cols:
            cols = partition_cols if isinstance(partition_cols, str) else ', '.join(partition_cols)
            partition_str = f"PARTITIONED BY ({cols})"

        # CTAS with LIMIT 0 creates an empty table that copies the source schema.
        created_sql = f"""
                        CREATE TABLE {target_path}
                        USING iceberg
                        {partition_str}
                        TBLPROPERTIES (
                            'format-version' = '2',
                            'read.split.target-size' = '134217728',  -- 128MB
                            'read.split.planning-lookback' = '10',
                            'read.split.open-file-cost' = '4194304',  -- 4MB
                            'read.parquet.vectorization.enabled' = 'true',
                            'read.parquet.vectorization.batch-size' = '5000',
                            'read.orc.vectorization.enabled' = 'true',
                            'write.format.default' = 'parquet',
                            'write.parquet.compression-codec' = 'snappy',
                            'write.parquet.row-group-size-bytes' = '134217728',  -- 128MB
                            'write.parquet.page-size-bytes' = '1048576',  -- 1MB
                            'write.target-file-size-bytes' = '536870912',  -- 512MB
                            'write.distribution-mode' = 'range',
                            'write.delete.distribution-mode' = 'range',
                            'write.update.distribution-mode' = 'range',
                            'write.merge.distribution-mode' = 'range',
                            'write.metadata.delete-after-commit.enabled' = 'true',
                            'write.metadata.previous-versions-max' = '3',
                            'write.spark.fanout.enabled' = 'false',
                            'write.object-storage.enabled' = 'true',
                            'write.delete.mode' = 'copy-on-write',
                            'write.update.mode' = 'merge-on-read',
                            'write.merge.mode' = 'merge-on-read',
                            'commit.retry.total-timeout-ms' = '1800000',  -- 30 minutes
                            'commit.manifest.target-size-bytes' = '8388608',  -- 8MB
                            'commit.manifest.min-count-to-merge' = '100',
                            'commit.manifest-merge.enabled' = 'true',
                            'history.expire.max-snapshot-age-ms' = '259200000'  -- 3 days
                        )
                        AS SELECT * FROM {source_table} LIMIT 0
                        """
        self.spark.sql(created_sql)

        if ordered_by_cols:
            ordered_by_str = ordered_by_cols if isinstance(ordered_by_cols, str) else ', '.join(ordered_by_cols)
            # Sort direction defaults to ASC NULLS FIRST
            self.spark.sql(f"ALTER TABLE {target_path} WRITE ORDERED BY {ordered_by_str}")

        if identifier_cols:
            identifier_str = identifier_cols if isinstance(identifier_cols, str) else ', '.join(identifier_cols)
            self.spark.sql(f"ALTER TABLE {target_path} SET IDENTIFIER FIELDS {identifier_str}")

        self.spark.sql(f"INSERT INTO {target_path} SELECT * FROM {source_table}")

        return None
   ```
   Using the code:
   ```
        self.create_iceberg_table(source_table, self.target_path, partition_cols='tenant_id',
                                  ordered_by_cols='product_id ASC NULLS FIRST')
   ```
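
   where `source_table` is a temp view registered beforehand, roughly like this (the DataFrame and view name here are illustrative placeholders for my actual pipeline objects):
   ```
        # Reduce the number of output partitions, then expose the DataFrame to Spark SQL.
        # `df` and the view name are placeholders.
        coalesce_df = df.coalesce(10)
        source_table = "staging_products_src"
        coalesce_df.createOrReplaceTempView(source_table)
   ```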
   I expected the data to be physically sorted by `product_id` within each `tenant_id` partition, and each `tenant_id` value to appear in only one folder, but a given `tenant_id` partition shows up in many different folders, as the listings below show.
   What I expected in the physical S3 folder:
   - data/hash_id_1/tenant_id=example1/...parquet.
   - data/hash_id_2/tenant_id=example2/...parquet.
   - data/hash_id_3/tenant_id=example3/...parquet.
   
   What I see:
   
   - data/hash_id_1/tenant_id=example1/...parquet.
   - data/hash_id_2/tenant_id=example1/...parquet.
   - data/hash_id_3/tenant_id=example1/...parquet.
   - data/hash_id_4/tenant_id=example2/...parquet.
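
   To double-check the layout, I can also list the data file paths through Iceberg's `files` metadata table instead of browsing S3 (a minimal sketch, using the same `target_path` as above):
   ```
        # List the physical data file locations and partitions known to Iceberg.
        files_df = self.spark.sql(f"SELECT file_path, partition FROM {self.target_path}.files")
        files_df.show(truncate=False)
   ```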
   
   Additional information: I chose `'write.distribution-mode' = 'range'` because I need to upsert data using MERGE, and I think that sorting within partitions will speed up the MERGE operation.
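
   The upsert itself will look roughly like this (the table alias, join keys, and the `updates_view` temp view are illustrative):
   ```
        # Illustrative MERGE upsert; updates_view is a temp view over the incoming batch.
        self.spark.sql(f"""
            MERGE INTO {self.target_path} AS t
            USING updates_view AS s
            ON t.tenant_id = s.tenant_id AND t.product_id = s.product_id
            WHEN MATCHED THEN UPDATE SET *
            WHEN NOT MATCHED THEN INSERT *
        """)
   ```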
   
   How can I get my expected results?

