ajantha-bhat commented on a change in pull request #3375:
URL: https://github.com/apache/iceberg/pull/3375#discussion_r742476789



##########
File path: site/docs/spark-procedures.md
##########
@@ -240,6 +240,57 @@ Remove any files in the `tablelocation/data` folder which 
are not known to the t
 CALL catalog_name.system.remove_orphan_files(table => 'db.sample', location => 
'tablelocation/data')
 ```
 
+### `rewrite_data_files`
+
+Iceberg tracks each data file in a table. More data files leads to more 
metadata stored in manifest files, and small data files causes an unnecessary 
amount of metadata and less efficient queries from file open costs.
+
+Iceberg can compact data files in parallel using Spark with the 
`rewriteDataFiles` action. This will combine small files into larger files to 
reduce metadata overhead and runtime file open cost.
+
+#### Usage
+
+| Argument Name | Required? | Type | Description |
+|---------------|-----------|------|-------------|
+| `table`       | ✔️  | string | Name of the table to update |
+| `strategy`    |    | string | Name of the strategy - binpack or sort. 
Defaults to binpack strategy |
+| `sort_order`  |    | string | Comma separated sort_order_column. Where 
sort_order_column is a space separated sort order info per column (ColumnName 
SortDirection NullOrder). <br/> All three members are mandatory to provide for 
sort_order_column. SortDirection can be ASC or DESC. NullOrder can be FIRST or 
LAST |
+| `options`     | ️   | map<string, string> | Options to be used for actions|
+| `where`       | ️   | string | predicate as a string used for filtering the 
files.|
+
+
+See the [`RewriteDataFiles` Javadoc](./javadoc/{{ versions.iceberg 
}}/org/apache/iceberg/actions/RewriteDataFiles.html#field.summary),
+<br/>  [`BinPackStrategy` Javadoc](./javadoc/{{ versions.iceberg 
}}/org/apache/iceberg/actions/BinPackStrategy.html#field.summary)
+and <br/> [`SortStrategy` Javadoc](./javadoc/{{ versions.iceberg 
}}/org/apache/iceberg/actions/SortStrategy.html#field.summary)
+for list of all the supported options for this action.
+
+#### Output
+
+| Output Name | Type | Description |
+| ------------|------|-------------|
+| `rewritten_data_files_count` | int | Number of data which were re-written by 
this command |
+| `added_data_files_count`     | int | Number of new data files which were 
written by this command |
+
+#### Examples
+
+Rewrite the data files in table `db.sample` and align data files with table 
partitioning.
+```sql
+CALL catalog_name.system.rewrite_data_files('db.sample')
+```
+
+Rewrite the data files in table `db.sample` and use sort strategy with id 
column as sort order column.
+```sql
+CALL catalog_name.system.rewrite_data_files(table => 'db.sample', strategy => 
'sort', sort_order => 'id DESC LAST')

Review comment:
       I know. But my procedure argument is already space delimited. so having 
NULLS LAST will cause parsing problem. So, I called as LAST. Any suggestions ?

##########
File path: site/docs/spark-procedures.md
##########
@@ -240,6 +240,57 @@ Remove any files in the `tablelocation/data` folder which 
are not known to the t
 CALL catalog_name.system.remove_orphan_files(table => 'db.sample', location => 
'tablelocation/data')
 ```
 
+### `rewrite_data_files`
+
+Iceberg tracks each data file in a table. More data files leads to more 
metadata stored in manifest files, and small data files causes an unnecessary 
amount of metadata and less efficient queries from file open costs.
+
+Iceberg can compact data files in parallel using Spark with the 
`rewriteDataFiles` action. This will combine small files into larger files to 
reduce metadata overhead and runtime file open cost.
+
+#### Usage
+
+| Argument Name | Required? | Type | Description |
+|---------------|-----------|------|-------------|
+| `table`       | ✔️  | string | Name of the table to update |
+| `strategy`    |    | string | Name of the strategy - binpack or sort. 
Defaults to binpack strategy |
+| `sort_order`  |    | string | Comma separated sort_order_column. Where 
sort_order_column is a space separated sort order info per column (ColumnName 
SortDirection NullOrder). <br/> All three members are mandatory to provide for 
sort_order_column. SortDirection can be ASC or DESC. NullOrder can be FIRST or 
LAST |
+| `options`     | ️   | map<string, string> | Options to be used for actions|
+| `where`       | ️   | string | predicate as a string used for filtering the 
files.|
+
+
+See the [`RewriteDataFiles` Javadoc](./javadoc/{{ versions.iceberg 
}}/org/apache/iceberg/actions/RewriteDataFiles.html#field.summary),
+<br/>  [`BinPackStrategy` Javadoc](./javadoc/{{ versions.iceberg 
}}/org/apache/iceberg/actions/BinPackStrategy.html#field.summary)
+and <br/> [`SortStrategy` Javadoc](./javadoc/{{ versions.iceberg 
}}/org/apache/iceberg/actions/SortStrategy.html#field.summary)
+for list of all the supported options for this action.
+
+#### Output
+
+| Output Name | Type | Description |
+| ------------|------|-------------|
+| `rewritten_data_files_count` | int | Number of data which were re-written by 
this command |
+| `added_data_files_count`     | int | Number of new data files which were 
written by this command |
+
+#### Examples
+
+Rewrite the data files in table `db.sample` and align data files with table 
partitioning.
+```sql
+CALL catalog_name.system.rewrite_data_files('db.sample')
+```
+
+Rewrite the data files in table `db.sample` and use sort strategy with id 
column as sort order column.
+```sql
+CALL catalog_name.system.rewrite_data_files(table => 'db.sample', strategy => 
'sort', sort_order => 'id DESC LAST')

Review comment:
       I also made sort direction and null order mandatory. so that code looks 
clean at the parsing side. Else as it is string parsing, optional arguments 
needs lot of checking and validation. 
   
   Multicolumn example, I can update it. 

##########
File path: 
spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
##########
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.extensions;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.spark.sql.AnalysisException;
+import org.apache.spark.sql.catalyst.analysis.NoSuchProcedureException;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+public class TestRewriteDataFilesProcedure extends SparkExtensionsTestBase {
+
+  public TestRewriteDataFilesProcedure(String catalogName, String 
implementation, Map<String, String> config) {
+    super(catalogName, implementation, config);
+  }
+
+  @After
+  public void removeTable() {
+    sql("DROP TABLE IF EXISTS %s", tableName);
+  }
+
+  @Test
+  public void testRewriteDataFilesInEmptyTable() {
+    sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", 
tableName);
+    List<Object[]> output = sql(
+        "CALL %s.system.rewrite_data_files('%s')", catalogName, tableIdent);
+    assertEquals("Procedure output must match",
+        ImmutableList.of(row(0, 0)),
+        output);
+  }
+
+  @Test
+  public void testRewriteDataFilesOnPartitionTable() {
+    sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg 
PARTITIONED BY (data)", tableName);
+
+    // create 5 files for each partition (data = 'a' and data = 'b')
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);

Review comment:
       Thanks for sharing. I will use it If I need bulk data later on. 
   
   For my scenarios single row insert was enough as my logic is mainly to 
create multiple files. Doesn't really matter about number of rows in each file 
for current test cases. 

##########
File path: 
spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
##########
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.extensions;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.spark.sql.AnalysisException;
+import org.apache.spark.sql.catalyst.analysis.NoSuchProcedureException;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+public class TestRewriteDataFilesProcedure extends SparkExtensionsTestBase {
+
+  public TestRewriteDataFilesProcedure(String catalogName, String 
implementation, Map<String, String> config) {
+    super(catalogName, implementation, config);
+  }
+
+  @After
+  public void removeTable() {
+    sql("DROP TABLE IF EXISTS %s", tableName);
+  }
+
+  @Test
+  public void testRewriteDataFilesInEmptyTable() {
+    sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", 
tableName);
+    List<Object[]> output = sql(
+        "CALL %s.system.rewrite_data_files('%s')", catalogName, tableIdent);
+    assertEquals("Procedure output must match",
+        ImmutableList.of(row(0, 0)),
+        output);
+  }
+
+  @Test
+  public void testRewriteDataFilesOnPartitionTable() {
+    sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg 
PARTITIONED BY (data)", tableName);
+
+    // create 5 files for each partition (data = 'a' and data = 'b')
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+    sql("INSERT INTO TABLE %s VALUES (2, 'b')", tableName);
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+    sql("INSERT INTO TABLE %s VALUES (2, 'b')", tableName);
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+    sql("INSERT INTO TABLE %s VALUES (2, 'b')", tableName);
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+    sql("INSERT INTO TABLE %s VALUES (2, 'b')", tableName);
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+    sql("INSERT INTO TABLE %s VALUES (2, 'b')", tableName);
+
+    List<Object[]> df1 = sql("SELECT * FROM %s order by id", tableName);
+
+    List<Object[]> output = sql(
+            "CALL %s.system.rewrite_data_files(tAbLe => '%s')", catalogName, 
tableIdent);

Review comment:
       haha. Yes. It is test for case sensitivity.
   
   I can have a separate case of case sensitivity. 

##########
File path: 
spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
##########
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.extensions;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.spark.sql.AnalysisException;
+import org.apache.spark.sql.catalyst.analysis.NoSuchProcedureException;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+public class TestRewriteDataFilesProcedure extends SparkExtensionsTestBase {
+
+  public TestRewriteDataFilesProcedure(String catalogName, String 
implementation, Map<String, String> config) {
+    super(catalogName, implementation, config);
+  }
+
+  @After
+  public void removeTable() {
+    sql("DROP TABLE IF EXISTS %s", tableName);
+  }
+
+  @Test
+  public void testRewriteDataFilesInEmptyTable() {
+    sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", 
tableName);
+    List<Object[]> output = sql(
+        "CALL %s.system.rewrite_data_files('%s')", catalogName, tableIdent);
+    assertEquals("Procedure output must match",
+        ImmutableList.of(row(0, 0)),
+        output);
+  }
+
+  @Test
+  public void testRewriteDataFilesOnPartitionTable() {
+    sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg 
PARTITIONED BY (data)", tableName);
+
+    // create 5 files for each partition (data = 'a' and data = 'b')
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+    sql("INSERT INTO TABLE %s VALUES (2, 'b')", tableName);
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+    sql("INSERT INTO TABLE %s VALUES (2, 'b')", tableName);
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+    sql("INSERT INTO TABLE %s VALUES (2, 'b')", tableName);
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+    sql("INSERT INTO TABLE %s VALUES (2, 'b')", tableName);
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+    sql("INSERT INTO TABLE %s VALUES (2, 'b')", tableName);
+
+    List<Object[]> df1 = sql("SELECT * FROM %s order by id", tableName);
+
+    List<Object[]> output = sql(
+            "CALL %s.system.rewrite_data_files(tAbLe => '%s')", catalogName, 
tableIdent);

Review comment:
       haha. Yes. It is test for case sensitivity.
   
   I can have a separate case for case sensitivity. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to