jackye1995 commented on a change in pull request #3375: URL: https://github.com/apache/iceberg/pull/3375#discussion_r742473649
########## File path: spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java ########## @@ -0,0 +1,275 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.spark.extensions; + +import java.util.List; +import java.util.Map; +import org.apache.iceberg.AssertHelpers; +import org.apache.iceberg.exceptions.ValidationException; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.spark.sql.AnalysisException; +import org.apache.spark.sql.catalyst.analysis.NoSuchProcedureException; +import org.junit.After; +import org.junit.Assert; +import org.junit.Test; + + +public class TestRewriteDataFilesProcedure extends SparkExtensionsTestBase { + + public TestRewriteDataFilesProcedure(String catalogName, String implementation, Map<String, String> config) { + super(catalogName, implementation, config); + } + + @After + public void removeTable() { + sql("DROP TABLE IF EXISTS %s", tableName); + } + + @Test + public void testRewriteDataFilesInEmptyTable() { + sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", tableName); + List<Object[]> output = sql( + "CALL 
%s.system.rewrite_data_files('%s')", catalogName, tableIdent); + assertEquals("Procedure output must match", + ImmutableList.of(row(0, 0)), + output); + } + + @Test + public void testRewriteDataFilesOnPartitionTable() { + sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg PARTITIONED BY (data)", tableName); + + // create 5 files for each partition (data = 'a' and data = 'b') + sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName); Review comment: I would suggest ingesting data using https://github.com/apache/iceberg/blob/master/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestNewRewriteDataFilesAction.java#L923-L992 instead of insert statements so that we can test with some decent-sized data files. ########## File path: site/docs/spark-procedures.md ########## @@ -240,6 +240,57 @@ Remove any files in the `tablelocation/data` folder which are not known to the t CALL catalog_name.system.remove_orphan_files(table => 'db.sample', location => 'tablelocation/data') ``` +### `rewrite_data_files` + +Iceberg tracks each data file in a table. More data files leads to more metadata stored in manifest files, and small data files causes an unnecessary amount of metadata and less efficient queries from file open costs. + +Iceberg can compact data files in parallel using Spark with the `rewriteDataFiles` action. This will combine small files into larger files to reduce metadata overhead and runtime file open cost. + +#### Usage + +| Argument Name | Required? | Type | Description | +|---------------|-----------|------|-------------| +| `table` | ✔️ | string | Name of the table to update | +| `strategy` | | string | Name of the strategy - binpack or sort. Defaults to binpack strategy | +| `sort_order` | | string | Comma separated sort_order_column. Where sort_order_column is a space separated sort order info per column (ColumnName SortDirection NullOrder). <br/> All three members are mandatory to provide for sort_order_column. 
SortDirection can be ASC or DESC. NullOrder can be FIRST or LAST | +| `options` | ️ | map<string, string> | Options to be used for actions| +| `where` | ️ | string | predicate as a string used for filtering the files.| + + +See the [`RewriteDataFiles` Javadoc](./javadoc/{{ versions.iceberg }}/org/apache/iceberg/actions/RewriteDataFiles.html#field.summary), +<br/> [`BinPackStrategy` Javadoc](./javadoc/{{ versions.iceberg }}/org/apache/iceberg/actions/BinPackStrategy.html#field.summary) +and <br/> [`SortStrategy` Javadoc](./javadoc/{{ versions.iceberg }}/org/apache/iceberg/actions/SortStrategy.html#field.summary) +for list of all the supported options for this action. + +#### Output + +| Output Name | Type | Description | +| ------------|------|-------------| +| `rewritten_data_files_count` | int | Number of data which were re-written by this command | +| `added_data_files_count` | int | Number of new data files which were written by this command | + +#### Examples + +Rewrite the data files in table `db.sample` and align data files with table partitioning. +```sql +CALL catalog_name.system.rewrite_data_files('db.sample') +``` + +Rewrite the data files in table `db.sample` and use sort strategy with id column as sort order column. +```sql +CALL catalog_name.system.rewrite_data_files(table => 'db.sample', strategy => 'sort', sort_order => 'id DESC LAST') Review comment: Can we make this example use a multi-column sort order, like `category ASC NULLS LAST, id DESC`? By the way, I think it should be `NULLS LAST` instead of `LAST`. ########## File path: spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java ########## @@ -0,0 +1,275 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.spark.extensions; + +import java.util.List; +import java.util.Map; +import org.apache.iceberg.AssertHelpers; +import org.apache.iceberg.exceptions.ValidationException; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.spark.sql.AnalysisException; +import org.apache.spark.sql.catalyst.analysis.NoSuchProcedureException; +import org.junit.After; +import org.junit.Assert; +import org.junit.Test; + + +public class TestRewriteDataFilesProcedure extends SparkExtensionsTestBase { + + public TestRewriteDataFilesProcedure(String catalogName, String implementation, Map<String, String> config) { + super(catalogName, implementation, config); + } + + @After + public void removeTable() { + sql("DROP TABLE IF EXISTS %s", tableName); + } + + @Test + public void testRewriteDataFilesInEmptyTable() { + sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", tableName); + List<Object[]> output = sql( + "CALL %s.system.rewrite_data_files('%s')", catalogName, tableIdent); + assertEquals("Procedure output must match", + ImmutableList.of(row(0, 0)), + output); + } + + @Test + public void testRewriteDataFilesOnPartitionTable() { + sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg PARTITIONED BY (data)", tableName); + + // create 5 files for each partition (data = 'a' and data = 'b') + sql("INSERT 
INTO TABLE %s VALUES (1, 'a')", tableName); + sql("INSERT INTO TABLE %s VALUES (2, 'b')", tableName); + sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName); + sql("INSERT INTO TABLE %s VALUES (2, 'b')", tableName); + sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName); + sql("INSERT INTO TABLE %s VALUES (2, 'b')", tableName); + sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName); + sql("INSERT INTO TABLE %s VALUES (2, 'b')", tableName); + sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName); + sql("INSERT INTO TABLE %s VALUES (2, 'b')", tableName); + + List<Object[]> df1 = sql("SELECT * FROM %s order by id", tableName); + + List<Object[]> output = sql( + "CALL %s.system.rewrite_data_files(tAbLe => '%s')", catalogName, tableIdent); Review comment: Is this `tAbLe` trying to test case insensitivity? If so, I think we should have a dedicated test case for it and use normal lowercase in the other tests. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
