RussellSpitzer commented on a change in pull request #3375:
URL: https://github.com/apache/iceberg/pull/3375#discussion_r743771414
##########
File path: site/docs/spark-procedures.md
##########
@@ -240,6 +240,57 @@ Remove any files in the `tablelocation/data` folder which
are not known to the t
CALL catalog_name.system.remove_orphan_files(table => 'db.sample', location =>
'tablelocation/data')
```
+### `rewrite_data_files`
+
+Iceberg tracks each data file in a table. More data files leads to more
metadata stored in manifest files, and small data files causes an unnecessary
amount of metadata and less efficient queries from file open costs.
+
+Iceberg can compact data files in parallel using Spark with the
`rewriteDataFiles` action. This will combine small files into larger files to
reduce metadata overhead and runtime file open cost.
+
+#### Usage
+
+| Argument Name | Required? | Type | Description |
+|---------------|-----------|------|-------------|
+| `table` | ✔️ | string | Name of the table to update |
+| `strategy` | | string | Name of the strategy - binpack or sort.
Defaults to binpack strategy |
+| `sort_order` | | string | Comma separated sort_order_column. Where
sort_order_column is a space separated sort order info per column (ColumnName
SortDirection NullOrder). <br/> All three members are mandatory to provide for
sort_order_column. SortDirection can be ASC or DESC. NullOrder can be
NULLS_FIRST or NULLS_LAST |
+| `options` | ️ | map<string, string> | Options to be used for actions|
+| `where` | ️ | string | predicate as a string used for filtering the
files.|
+
+
+See the [`RewriteDataFiles` Javadoc](./javadoc/{{ versions.iceberg
}}/org/apache/iceberg/actions/RewriteDataFiles.html#field.summary),
+<br/> [`BinPackStrategy` Javadoc](./javadoc/{{ versions.iceberg
}}/org/apache/iceberg/actions/BinPackStrategy.html#field.summary)
+and <br/> [`SortStrategy` Javadoc](./javadoc/{{ versions.iceberg
}}/org/apache/iceberg/actions/SortStrategy.html#field.summary)
+for list of all the supported options for this action.
+
+#### Output
+
+| Output Name | Type | Description |
+| ------------|------|-------------|
+| `rewritten_data_files_count` | int | Number of data which were re-written by
this command |
+| `added_data_files_count` | int | Number of new data files which were
written by this command |
+
+#### Examples
+
+Rewrite the data files in table `db.sample` and align data files with table
partitioning.
Review comment:
   I do not believe we have added the code for aligning with partitioning yet. Here I would probably describe it as:
   "Rewrite the data files in table 'x' using the default rewrite algorithm of bin-packing to combine small files and split large files, writing new files at the default write size of the table"
   Or something like that.
##########
File path: site/docs/spark-procedures.md
##########
@@ -240,6 +240,57 @@ Remove any files in the `tablelocation/data` folder which
are not known to the t
CALL catalog_name.system.remove_orphan_files(table => 'db.sample', location =>
'tablelocation/data')
```
+### `rewrite_data_files`
+
+Iceberg tracks each data file in a table. More data files leads to more
metadata stored in manifest files, and small data files causes an unnecessary
amount of metadata and less efficient queries from file open costs.
+
+Iceberg can compact data files in parallel using Spark with the
`rewriteDataFiles` action. This will combine small files into larger files to
reduce metadata overhead and runtime file open cost.
+
+#### Usage
+
+| Argument Name | Required? | Type | Description |
+|---------------|-----------|------|-------------|
+| `table` | ✔️ | string | Name of the table to update |
+| `strategy` | | string | Name of the strategy - binpack or sort.
Defaults to binpack strategy |
+| `sort_order` | | string | Comma separated sort_order_column. Where
sort_order_column is a space separated sort order info per column (ColumnName
SortDirection NullOrder). <br/> All three members are mandatory to provide for
sort_order_column. SortDirection can be ASC or DESC. NullOrder can be
NULLS_FIRST or NULLS_LAST |
+| `options` | ️ | map<string, string> | Options to be used for actions|
+| `where` | ️ | string | predicate as a string used for filtering the
files.|
+
+
+See the [`RewriteDataFiles` Javadoc](./javadoc/{{ versions.iceberg
}}/org/apache/iceberg/actions/RewriteDataFiles.html#field.summary),
+<br/> [`BinPackStrategy` Javadoc](./javadoc/{{ versions.iceberg
}}/org/apache/iceberg/actions/BinPackStrategy.html#field.summary)
+and <br/> [`SortStrategy` Javadoc](./javadoc/{{ versions.iceberg
}}/org/apache/iceberg/actions/SortStrategy.html#field.summary)
+for list of all the supported options for this action.
+
+#### Output
+
+| Output Name | Type | Description |
+| ------------|------|-------------|
+| `rewritten_data_files_count` | int | Number of data which were re-written by
this command |
+| `added_data_files_count` | int | Number of new data files which were
written by this command |
+
+#### Examples
+
+Rewrite the data files in table `db.sample` and align data files with table
partitioning.
+```sql
+CALL catalog_name.system.rewrite_data_files('db.sample')
+```
+
+Rewrite the data files in table `db.sample` and use sort strategy with id,
name column as sort order column.
Review comment:
Rewrites the data files by sorting all the data on id and name using the
same defaults as bin-pack to determine which files to rewrite.
##########
File path: site/docs/spark-procedures.md
##########
@@ -240,6 +240,57 @@ Remove any files in the `tablelocation/data` folder which
are not known to the t
CALL catalog_name.system.remove_orphan_files(table => 'db.sample', location =>
'tablelocation/data')
```
+### `rewrite_data_files`
+
+Iceberg tracks each data file in a table. More data files leads to more
metadata stored in manifest files, and small data files causes an unnecessary
amount of metadata and less efficient queries from file open costs.
+
+Iceberg can compact data files in parallel using Spark with the
`rewriteDataFiles` action. This will combine small files into larger files to
reduce metadata overhead and runtime file open cost.
+
+#### Usage
+
+| Argument Name | Required? | Type | Description |
+|---------------|-----------|------|-------------|
+| `table` | ✔️ | string | Name of the table to update |
+| `strategy` | | string | Name of the strategy - binpack or sort.
Defaults to binpack strategy |
+| `sort_order` | | string | Comma separated sort_order_column. Where
sort_order_column is a space separated sort order info per column (ColumnName
SortDirection NullOrder). <br/> All three members are mandatory to provide for
sort_order_column. SortDirection can be ASC or DESC. NullOrder can be
NULLS_FIRST or NULLS_LAST |
+| `options` | ️ | map<string, string> | Options to be used for actions|
+| `where` | ️ | string | predicate as a string used for filtering the
files.|
+
+
+See the [`RewriteDataFiles` Javadoc](./javadoc/{{ versions.iceberg
}}/org/apache/iceberg/actions/RewriteDataFiles.html#field.summary),
+<br/> [`BinPackStrategy` Javadoc](./javadoc/{{ versions.iceberg
}}/org/apache/iceberg/actions/BinPackStrategy.html#field.summary)
+and <br/> [`SortStrategy` Javadoc](./javadoc/{{ versions.iceberg
}}/org/apache/iceberg/actions/SortStrategy.html#field.summary)
+for list of all the supported options for this action.
+
+#### Output
+
+| Output Name | Type | Description |
+| ------------|------|-------------|
+| `rewritten_data_files_count` | int | Number of data which were re-written by
this command |
+| `added_data_files_count` | int | Number of new data files which were
written by this command |
+
+#### Examples
+
+Rewrite the data files in table `db.sample` and align data files with table
partitioning.
+```sql
+CALL catalog_name.system.rewrite_data_files('db.sample')
+```
+
+Rewrite the data files in table `db.sample` and use sort strategy with id,
name column as sort order column.
+```sql
+CALL catalog_name.system.rewrite_data_files(table => 'db.sample', strategy =>
'sort', sort_order => 'id DESC NULLS_LAST,name ASC NULLS_FIRST')
+```
+
+Rewrite the data files in table `db.sample` and use default binpack strategy
with option of `min-input-files` as 2.
Review comment:
       Rewrites the data files in the table using the bin-pack strategy in any partition where 2 or more files need to be rewritten.
##########
File path: site/docs/spark-procedures.md
##########
@@ -240,6 +240,57 @@ Remove any files in the `tablelocation/data` folder which
are not known to the t
CALL catalog_name.system.remove_orphan_files(table => 'db.sample', location =>
'tablelocation/data')
```
+### `rewrite_data_files`
+
+Iceberg tracks each data file in a table. More data files leads to more
metadata stored in manifest files, and small data files causes an unnecessary
amount of metadata and less efficient queries from file open costs.
+
+Iceberg can compact data files in parallel using Spark with the
`rewriteDataFiles` action. This will combine small files into larger files to
reduce metadata overhead and runtime file open cost.
+
+#### Usage
+
+| Argument Name | Required? | Type | Description |
+|---------------|-----------|------|-------------|
+| `table` | ✔️ | string | Name of the table to update |
+| `strategy` | | string | Name of the strategy - binpack or sort.
Defaults to binpack strategy |
+| `sort_order` | | string | Comma separated sort_order_column. Where
sort_order_column is a space separated sort order info per column (ColumnName
SortDirection NullOrder). <br/> All three members are mandatory to provide for
sort_order_column. SortDirection can be ASC or DESC. NullOrder can be
NULLS_FIRST or NULLS_LAST |
+| `options` | ️ | map<string, string> | Options to be used for actions|
+| `where` | ️ | string | predicate as a string used for filtering the
files.|
+
+
+See the [`RewriteDataFiles` Javadoc](./javadoc/{{ versions.iceberg
}}/org/apache/iceberg/actions/RewriteDataFiles.html#field.summary),
+<br/> [`BinPackStrategy` Javadoc](./javadoc/{{ versions.iceberg
}}/org/apache/iceberg/actions/BinPackStrategy.html#field.summary)
+and <br/> [`SortStrategy` Javadoc](./javadoc/{{ versions.iceberg
}}/org/apache/iceberg/actions/SortStrategy.html#field.summary)
+for list of all the supported options for this action.
+
+#### Output
+
+| Output Name | Type | Description |
+| ------------|------|-------------|
+| `rewritten_data_files_count` | int | Number of data which were re-written by
this command |
+| `added_data_files_count` | int | Number of new data files which were
written by this command |
+
+#### Examples
+
+Rewrite the data files in table `db.sample` and align data files with table
partitioning.
+```sql
+CALL catalog_name.system.rewrite_data_files('db.sample')
+```
+
+Rewrite the data files in table `db.sample` and use sort strategy with id,
name column as sort order column.
+```sql
+CALL catalog_name.system.rewrite_data_files(table => 'db.sample', strategy =>
'sort', sort_order => 'id DESC NULLS_LAST,name ASC NULLS_FIRST')
+```
+
+Rewrite the data files in table `db.sample` and use default binpack strategy
with option of `min-input-files` as 2.
+```sql
+CALL catalog_name.system.rewrite_data_files(table => 'db.sample', options =>
map('min-input-files','2'))
+```
+
+Rewrite the data files in table `db.sample` and select the files that are only
having id = 3 and name = 'foo' for rewriting.
Review comment:
       Rewrite files only in partitions where id = 3 and name = 'foo', since we can't apply a generic predicate here.
##########
File path:
spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
##########
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.extensions;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.spark.sql.AnalysisException;
+import org.apache.spark.sql.catalyst.analysis.NoSuchProcedureException;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+public class TestRewriteDataFilesProcedure extends SparkExtensionsTestBase {
+
+ public TestRewriteDataFilesProcedure(String catalogName, String
implementation, Map<String, String> config) {
+ super(catalogName, implementation, config);
+ }
+
+ @After
+ public void removeTable() {
+ sql("DROP TABLE IF EXISTS %s", tableName);
+ }
+
+ @Test
+ public void testRewriteDataFilesInEmptyTable() {
+ sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg",
tableName);
+ List<Object[]> output = sql(
+ "CALL %s.system.rewrite_data_files('%s')", catalogName, tableIdent);
+ assertEquals("Procedure output must match",
+ ImmutableList.of(row(0, 0)),
+ output);
+ }
+
+ @Test
+ public void testRewriteDataFilesOnPartitionTable() {
+ sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg
PARTITIONED BY (data)", tableName);
+
+ // create 5 files for each partition (data = 'a' and data = 'b')
+ sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
Review comment:
       I would agree that we should just do a similar "createTable" sort of thing. In some internal tests I wrote, I just copied the helper functions I added for the rewrite tests:
   ```
  // unpartitioned table with 20 small data files of shuffled rows
  private void createTable() {
    sql("DROP TABLE IF EXISTS %s", tableName);
    sql("CREATE TABLE %s (c1 int, c2 string, c3 string) using iceberg", tableName);
    List<ThreeColumnRecord> records1 = Lists.newArrayList();
    List<Integer> data = IntStream.range(0, 2000).boxed().collect(Collectors.toList());
    Collections.shuffle(data, new Random(42));
    data.forEach(i -> records1.add(new ThreeColumnRecord(i, "foo" + i, "bar" + i)));
    Dataset<Row> df = spark.createDataFrame(records1, ThreeColumnRecord.class).repartition(20);
    try {
      df.writeTo(tableName).append();
    } catch (NoSuchTableException e) {
      throw new RuntimeException(e);
    }
  }

  // identity-partitioned table (5 partitions on c1) with 10 files of shuffled rows
  private void createPartitionedTable() {
    sql("DROP TABLE IF EXISTS %s", tableName);
    sql("CREATE TABLE %s (c1 int, c2 string, c3 string) using iceberg PARTITIONED BY (c1)", tableName);
    List<ThreeColumnRecord> records1 = Lists.newArrayList();
    List<Integer> data = IntStream.range(0, 2000).boxed().collect(Collectors.toList());
    Collections.shuffle(data, new Random(42));
    data.forEach(i -> records1.add(new ThreeColumnRecord(i % 5, "foo" + i, "bar" + i)));
    Dataset<Row> df = spark.createDataFrame(records1, ThreeColumnRecord.class).repartition(10);
    try {
      df.writeTo(tableName).append();
    } catch (NoSuchTableException e) {
      throw new RuntimeException(e);
    }
  }

  // bucket-partitioned table (bucket(4, c3)) with 10 files of shuffled rows
  private void createBucketedTable() {
    sql("DROP TABLE IF EXISTS %s", tableName);
    sql("CREATE TABLE %s (c1 int, c2 string, c3 string) using iceberg PARTITIONED BY (bucket(4, c3))", tableName);
    List<ThreeColumnRecord> records1 = Lists.newArrayList();
    List<Integer> data = IntStream.range(0, 2000).boxed().collect(Collectors.toList());
    Collections.shuffle(data, new Random(42));
    data.forEach(i -> records1.add(new ThreeColumnRecord(i % 5, "foo" + i, "bar" + i)));
    Dataset<Row> df = spark.createDataFrame(records1, ThreeColumnRecord.class).repartition(10);
    try {
      df.writeTo(tableName).append();
    } catch (NoSuchTableException e) {
      throw new RuntimeException(e);
    }
  }
   ```
   Although I don't think bucketed would work here.
##########
File path:
spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
##########
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.extensions;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.spark.sql.AnalysisException;
+import org.apache.spark.sql.catalyst.analysis.NoSuchProcedureException;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+public class TestRewriteDataFilesProcedure extends SparkExtensionsTestBase {
+
+ public TestRewriteDataFilesProcedure(String catalogName, String
implementation, Map<String, String> config) {
+ super(catalogName, implementation, config);
+ }
+
+ @After
+ public void removeTable() {
+ sql("DROP TABLE IF EXISTS %s", tableName);
+ }
+
+ @Test
+ public void testRewriteDataFilesInEmptyTable() {
+ sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg",
tableName);
+ List<Object[]> output = sql(
+ "CALL %s.system.rewrite_data_files('%s')", catalogName, tableIdent);
+ assertEquals("Procedure output must match",
+ ImmutableList.of(row(0, 0)),
+ output);
+ }
+
+ @Test
+ public void testRewriteDataFilesOnPartitionTable() {
+ sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg
PARTITIONED BY (data)", tableName);
+
+ // create 5 files for each partition (data = 'a' and data = 'b')
+ sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+ sql("INSERT INTO TABLE %s VALUES (2, 'b')", tableName);
+ sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+ sql("INSERT INTO TABLE %s VALUES (2, 'b')", tableName);
+ sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+ sql("INSERT INTO TABLE %s VALUES (2, 'b')", tableName);
+ sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+ sql("INSERT INTO TABLE %s VALUES (2, 'b')", tableName);
+ sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+ sql("INSERT INTO TABLE %s VALUES (2, 'b')", tableName);
+
+ List<Object[]> df1 = sql("SELECT * FROM %s order by id", tableName);
+
+ List<Object[]> output = sql(
+ "CALL %s.system.rewrite_data_files(table => '%s')", catalogName,
tableIdent);
+
+ assertEquals("Action should rewrite 10 data files and add 2 data files
(one per partition) ",
+ ImmutableList.of(row(10, 2)),
+ output);
+
+ List<Object[]> df2 = sql("SELECT * FROM %s ", tableName);
+
+ Assert.assertEquals("data count should match", df1.size(), df2.size());
Review comment:
We could just compare the actual rows here like is done in the Rewrite
action
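   A minimal sketch of that row-level comparison, using the same test helpers already in this class (the ordering column and message text are illustrative):
   ```java
  // capture the rows before compaction, run the procedure, then assert the
  // row contents (not just the counts) are unchanged
  List<Object[]> expectedRecords = sql("SELECT * FROM %s ORDER BY id", tableName);

  sql("CALL %s.system.rewrite_data_files(table => '%s')", catalogName, tableIdent);

  List<Object[]> actualRecords = sql("SELECT * FROM %s ORDER BY id", tableName);
  assertEquals("Data should not change after compaction", expectedRecords, actualRecords);
   ```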
##########
File path:
spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
##########
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.extensions;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.spark.sql.AnalysisException;
+import org.apache.spark.sql.catalyst.analysis.NoSuchProcedureException;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+public class TestRewriteDataFilesProcedure extends SparkExtensionsTestBase {
+
+ public TestRewriteDataFilesProcedure(String catalogName, String
implementation, Map<String, String> config) {
+ super(catalogName, implementation, config);
+ }
+
+ @After
+ public void removeTable() {
+ sql("DROP TABLE IF EXISTS %s", tableName);
+ }
+
+ @Test
+ public void testRewriteDataFilesInEmptyTable() {
+ sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg",
tableName);
+ List<Object[]> output = sql(
+ "CALL %s.system.rewrite_data_files('%s')", catalogName, tableIdent);
+ assertEquals("Procedure output must match",
+ ImmutableList.of(row(0, 0)),
+ output);
+ }
+
+ @Test
+ public void testRewriteDataFilesOnPartitionTable() {
+ sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg
PARTITIONED BY (data)", tableName);
+
+ // create 5 files for each partition (data = 'a' and data = 'b')
+ sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+ sql("INSERT INTO TABLE %s VALUES (2, 'b')", tableName);
+ sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+ sql("INSERT INTO TABLE %s VALUES (2, 'b')", tableName);
+ sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+ sql("INSERT INTO TABLE %s VALUES (2, 'b')", tableName);
+ sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+ sql("INSERT INTO TABLE %s VALUES (2, 'b')", tableName);
+ sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+ sql("INSERT INTO TABLE %s VALUES (2, 'b')", tableName);
+
+ List<Object[]> df1 = sql("SELECT * FROM %s order by id", tableName);
+
+ List<Object[]> output = sql(
+ "CALL %s.system.rewrite_data_files(table => '%s')", catalogName,
tableIdent);
+
+ assertEquals("Action should rewrite 10 data files and add 2 data files
(one per partition) ",
+ ImmutableList.of(row(10, 2)),
+ output);
+
+ List<Object[]> df2 = sql("SELECT * FROM %s ", tableName);
+
+ Assert.assertEquals("data count should match", df1.size(), df2.size());
+ }
+
+ @Test
+ public void testRewriteDataFilesOnNonPartitionTable() {
+ sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg",
tableName);
+
+ // create 10 files under non-partitioned table
+ sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+ sql("INSERT INTO TABLE %s VALUES (2, 'b')", tableName);
+ sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+ sql("INSERT INTO TABLE %s VALUES (2, 'b')", tableName);
+ sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+ sql("INSERT INTO TABLE %s VALUES (2, 'b')", tableName);
+ sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+ sql("INSERT INTO TABLE %s VALUES (2, 'b')", tableName);
+ sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+ sql("INSERT INTO TABLE %s VALUES (2, 'b')", tableName);
+
+ List<Object[]> df1 = sql("SELECT * FROM %s ", tableName);
+
+ List<Object[]> output = sql(
+ "CALL %s.system.rewrite_data_files(table => '%s')", catalogName,
tableIdent);
+
+ assertEquals("Action should rewrite 10 data files and add 1 data files",
+ ImmutableList.of(row(10, 1)),
+ output);
+
+ List<Object[]> df2 = sql("SELECT * FROM %s ", tableName);
+
+ Assert.assertEquals("data count should match", df1.size(), df2.size());
+ }
+
+ @Test
+ public void testRewriteDataFilesWithOptions() {
+ sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg",
tableName);
+
+ // create 2 files under non-partitioned table
+ sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+ sql("INSERT INTO TABLE %s VALUES (2, 'b')", tableName);
+
+ List<Object[]> df1 = sql("SELECT * FROM %s ", tableName);
+
+ // set the min-input-files = 2, instead of default 5 to allow compaction
+ List<Object[]> output = sql(
+ "CALL %s.system.rewrite_data_files(table => '%s', options =>
map('min-input-files','2'))",
+ catalogName, tableIdent);
+
+ assertEquals("Action should rewrite 2 data files and add 1 data files",
+ ImmutableList.of(row(2, 1)),
+ output);
+
+ List<Object[]> df2 = sql("SELECT * FROM %s ", tableName);
+
+ Assert.assertEquals("data count should match", df1.size(), df2.size());
+ }
+
+ @Test
+ public void testRewriteDataFilesWithSortStrategy() {
+ sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg",
tableName);
+
+ // create 2 files under non-partitioned table
+ sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+ sql("INSERT INTO TABLE %s VALUES (2, 'b')", tableName);
+
+ List<Object[]> df1 = sql("SELECT * FROM %s ", tableName);
+
+ // set the min-input-files = 2, instead of default 5 to allow compaction
+ // set sort_order = id DESC LAST
+ List<Object[]> output = sql(
+ "CALL %s.system.rewrite_data_files(table => '%s', options =>
map('min-input-files','2'), " +
+ "strategy => 'sort', sort_order => 'id DESC NULLS_LAST')",
+ catalogName, tableIdent);
+
+ assertEquals("Action should rewrite 2 data files and add 1 data files",
+ ImmutableList.of(row(2, 1)),
+ output);
+
+ List<Object[]> df2 = sql("SELECT * FROM %s ", tableName);
+
+ Assert.assertEquals("data count should match", df1.size(), df2.size());
+ }
+
+ @Test
+ public void testRewriteDataFilesWithFilter() {
+ sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg",
tableName);
+
+ // create 10 files under non-partitioned table
+ sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+ sql("INSERT INTO TABLE %s VALUES (2, 'b')", tableName);
+ sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+ sql("INSERT INTO TABLE %s VALUES (2, 'b')", tableName);
+ sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+ sql("INSERT INTO TABLE %s VALUES (2, 'b')", tableName);
+ sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+ sql("INSERT INTO TABLE %s VALUES (2, 'b')", tableName);
+ sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+ sql("INSERT INTO TABLE %s VALUES (2, 'b')", tableName);
+
+ List<Object[]> df1 = sql("SELECT * FROM %s ", tableName);
+
+ // select only 5 files for compaction (files having id = 1)
+ List<Object[]> output = sql(
+ "CALL %s.system.rewrite_data_files(table => '%s'," +
+ " where => 'id = 1 and data is not null')", catalogName,
tableIdent);
+
+ assertEquals("Action should rewrite 5 data files (containing id = 1) and
add 1 data files",
+ ImmutableList.of(row(5, 1)),
+ output);
+
+ List<Object[]> df2 = sql("SELECT * FROM %s ", tableName);
+
+ Assert.assertEquals("data count should match", df1.size(), df2.size());
+ }
+
Review comment:
       I think we need tests for partitioned tables as well, and tests which filter on partition values.
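   A rough sketch of what a partition-filter test could look like (the table layout, expected counts, and test name are illustrative rather than taken from the PR; the expected result assumes the default min-input-files of 5):
   ```java
  @Test
  public void testRewriteDataFilesWithPartitionFilter() {
    sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg PARTITIONED BY (id)", tableName);

    // create 5 small files in each of the two partitions (id = 1 and id = 2)
    for (int i = 0; i < 5; i++) {
      sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
      sql("INSERT INTO TABLE %s VALUES (2, 'b')", tableName);
    }

    // filter on a partition column so only the id = 1 partition is compacted
    List<Object[]> output = sql(
        "CALL %s.system.rewrite_data_files(table => '%s', where => 'id = 1')",
        catalogName, tableIdent);

    assertEquals("Action should rewrite the 5 files in partition id = 1 and add 1 data file",
        ImmutableList.of(row(5, 1)),
        output);
  }
   ```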
##########
File path:
spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
##########
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.extensions;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.spark.sql.AnalysisException;
+import org.apache.spark.sql.catalyst.analysis.NoSuchProcedureException;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+public class TestRewriteDataFilesProcedure extends SparkExtensionsTestBase {
+
+ public TestRewriteDataFilesProcedure(String catalogName, String
implementation, Map<String, String> config) {
+ super(catalogName, implementation, config);
+ }
+
+ @After
+ public void removeTable() {
+ sql("DROP TABLE IF EXISTS %s", tableName);
+ }
+
+ @Test
+ public void testRewriteDataFilesInEmptyTable() {
+ sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg",
tableName);
+ List<Object[]> output = sql(
+ "CALL %s.system.rewrite_data_files('%s')", catalogName, tableIdent);
+ assertEquals("Procedure output must match",
+ ImmutableList.of(row(0, 0)),
+ output);
+ }
+
+ @Test
+ public void testRewriteDataFilesOnPartitionTable() {
+ sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg
PARTITIONED BY (data)", tableName);
+
+ // create 5 files for each partition (data = 'a' and data = 'b')
+ sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+ sql("INSERT INTO TABLE %s VALUES (2, 'b')", tableName);
+ sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+ sql("INSERT INTO TABLE %s VALUES (2, 'b')", tableName);
+ sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+ sql("INSERT INTO TABLE %s VALUES (2, 'b')", tableName);
+ sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+ sql("INSERT INTO TABLE %s VALUES (2, 'b')", tableName);
+ sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+ sql("INSERT INTO TABLE %s VALUES (2, 'b')", tableName);
+
+ List<Object[]> df1 = sql("SELECT * FROM %s order by id", tableName);
+
+ List<Object[]> output = sql(
+ "CALL %s.system.rewrite_data_files(table => '%s')", catalogName,
tableIdent);
+
+ assertEquals("Action should rewrite 10 data files and add 2 data files
(one per partition) ",
+ ImmutableList.of(row(10, 2)),
+ output);
+
+ List<Object[]> df2 = sql("SELECT * FROM %s ", tableName);
+
+ Assert.assertEquals("data count should match", df1.size(), df2.size());
+ }
+
+ @Test
+ public void testRewriteDataFilesOnNonPartitionTable() {
+ sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg",
tableName);
+
+ // create 10 files under non-partitioned table
+ sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+ sql("INSERT INTO TABLE %s VALUES (2, 'b')", tableName);
+ sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+ sql("INSERT INTO TABLE %s VALUES (2, 'b')", tableName);
+ sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+ sql("INSERT INTO TABLE %s VALUES (2, 'b')", tableName);
+ sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+ sql("INSERT INTO TABLE %s VALUES (2, 'b')", tableName);
+ sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+ sql("INSERT INTO TABLE %s VALUES (2, 'b')", tableName);
+
+ List<Object[]> df1 = sql("SELECT * FROM %s ", tableName);
+
+ List<Object[]> output = sql(
+ "CALL %s.system.rewrite_data_files(table => '%s')", catalogName,
tableIdent);
+
+ assertEquals("Action should rewrite 10 data files and add 1 data files",
+ ImmutableList.of(row(10, 1)),
+ output);
+
+ List<Object[]> df2 = sql("SELECT * FROM %s ", tableName);
+
+ Assert.assertEquals("data count should match", df1.size(), df2.size());
+ }
+
+ @Test
+ public void testRewriteDataFilesWithOptions() {
+ sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg",
tableName);
+
+ // create 2 files under non-partitioned table
+ sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+ sql("INSERT INTO TABLE %s VALUES (2, 'b')", tableName);
+
+ List<Object[]> df1 = sql("SELECT * FROM %s ", tableName);
+
+ // set the min-input-files = 2, instead of default 5 to allow compaction
+ List<Object[]> output = sql(
+ "CALL %s.system.rewrite_data_files(table => '%s', options =>
map('min-input-files','2'))",
+ catalogName, tableIdent);
+
+ assertEquals("Action should rewrite 2 data files and add 1 data files",
+ ImmutableList.of(row(2, 1)),
+ output);
+
+ List<Object[]> df2 = sql("SELECT * FROM %s ", tableName);
+
+ Assert.assertEquals("data count should match", df1.size(), df2.size());
+ }
+
+ @Test
+ public void testRewriteDataFilesWithSortStrategy() {
+ sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg",
tableName);
+
+ // create 2 files under non-partitioned table
+ sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+ sql("INSERT INTO TABLE %s VALUES (2, 'b')", tableName);
+
+ List<Object[]> df1 = sql("SELECT * FROM %s ", tableName);
+
+ // set the min-input-files = 2, instead of default 5 to allow compaction
+ // set sort_order = id DESC LAST
+ List<Object[]> output = sql(
+ "CALL %s.system.rewrite_data_files(table => '%s', options =>
map('min-input-files','2'), " +
+ "strategy => 'sort', sort_order => 'id DESC NULLS_LAST')",
+ catalogName, tableIdent);
+
+ assertEquals("Action should rewrite 2 data files and add 1 data files",
+ ImmutableList.of(row(2, 1)),
+ output);
+
+ List<Object[]> df2 = sql("SELECT * FROM %s ", tableName);
+
+ Assert.assertEquals("data count should match", df1.size(), df2.size());
+ }
+
+ @Test
+ public void testRewriteDataFilesWithFilter() {
+ sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg",
tableName);
+
+ // create 10 files under non-partitioned table
+ sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+ sql("INSERT INTO TABLE %s VALUES (2, 'b')", tableName);
+ sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+ sql("INSERT INTO TABLE %s VALUES (2, 'b')", tableName);
+ sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+ sql("INSERT INTO TABLE %s VALUES (2, 'b')", tableName);
+ sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+ sql("INSERT INTO TABLE %s VALUES (2, 'b')", tableName);
+ sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+ sql("INSERT INTO TABLE %s VALUES (2, 'b')", tableName);
+
+ List<Object[]> df1 = sql("SELECT * FROM %s ", tableName);
+
+ // select only 5 files for compaction (files having id = 1)
+ List<Object[]> output = sql(
+ "CALL %s.system.rewrite_data_files(table => '%s'," +
+ " where => 'id = 1 and data is not null')", catalogName,
tableIdent);
Review comment:
       I think this will actually rewrite all data files which *may* contain 1, since the planning portion is done just on the column metrics and it wouldn't know whether 1 is actually present unless it's a partition value.
##########
File path:
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteDataFilesProcedure.java
##########
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.procedures;
+
+import java.util.Map;
+import org.apache.iceberg.NullOrder;
+import org.apache.iceberg.SortOrder;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.actions.RewriteDataFiles;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.spark.SparkExpressions;
+import org.apache.iceberg.spark.procedures.SparkProcedures.ProcedureBuilder;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.Expression;
+import org.apache.spark.sql.catalyst.parser.ParseException;
+import org.apache.spark.sql.catalyst.parser.ParserInterface;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import scala.runtime.BoxedUnit;
+
+/**
+ * A procedure that rewrites datafiles in a table.
+ *
+ * @see org.apache.iceberg.spark.actions.SparkActions#rewriteDataFiles(Table)
+ */
+class RewriteDataFilesProcedure extends BaseProcedure {
+
+ private static final ProcedureParameter[] PARAMETERS = new
ProcedureParameter[]{
+ ProcedureParameter.required("table", DataTypes.StringType),
+ ProcedureParameter.optional("strategy", DataTypes.StringType),
+ ProcedureParameter.optional("sort_order", DataTypes.StringType),
+ ProcedureParameter.optional("options", STRING_MAP),
+ ProcedureParameter.optional("where", DataTypes.StringType)
+ };
+
+ // counts are not nullable since the action result is never null
+ private static final StructType OUTPUT_TYPE = new StructType(new
StructField[]{
+ new StructField("rewritten_data_files_count", DataTypes.IntegerType,
false, Metadata.empty()),
+ new StructField("added_data_files_count", DataTypes.IntegerType, false,
Metadata.empty())
+ });
+
+ public static ProcedureBuilder builder() {
+ return new Builder<RewriteDataFilesProcedure>() {
+ @Override
+ protected RewriteDataFilesProcedure doBuild() {
+ return new RewriteDataFilesProcedure(tableCatalog());
+ }
+ };
+ }
+
+ private RewriteDataFilesProcedure(TableCatalog tableCatalog) {
+ super(tableCatalog);
+ }
+
+ @Override
+ public ProcedureParameter[] parameters() {
+ return PARAMETERS;
+ }
+
+ @Override
+ public StructType outputType() {
+ return OUTPUT_TYPE;
+ }
+
+ @Override
+ public InternalRow[] call(InternalRow args) {
+ Identifier tableIdent = toIdentifier(args.getString(0),
PARAMETERS[0].name());
+
+ return modifyIcebergTable(tableIdent, table -> {
+ RewriteDataFiles action = actions().rewriteDataFiles(table);
+
+ String strategy = args.isNullAt(1) ? null : args.getString(1);
+ String sortOrders = args.isNullAt(2) ? null : args.getString(2);
+ if (sortOrders != null && (strategy == null ||
strategy.equalsIgnoreCase("binpack"))) {
Review comment:
I don't know if you need to check this here since the underlying action
should throw the appropriate exception
##########
File path:
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteDataFilesProcedure.java
##########
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.procedures;
+
+import java.util.Map;
+import org.apache.iceberg.NullOrder;
+import org.apache.iceberg.SortOrder;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.actions.RewriteDataFiles;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.spark.SparkExpressions;
+import org.apache.iceberg.spark.procedures.SparkProcedures.ProcedureBuilder;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.Expression;
+import org.apache.spark.sql.catalyst.parser.ParseException;
+import org.apache.spark.sql.catalyst.parser.ParserInterface;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import scala.runtime.BoxedUnit;
+
+/**
+ * A procedure that rewrites datafiles in a table.
+ *
+ * @see org.apache.iceberg.spark.actions.SparkActions#rewriteDataFiles(Table)
+ */
+class RewriteDataFilesProcedure extends BaseProcedure {
+
+ private static final ProcedureParameter[] PARAMETERS = new
ProcedureParameter[]{
+ ProcedureParameter.required("table", DataTypes.StringType),
+ ProcedureParameter.optional("strategy", DataTypes.StringType),
+ ProcedureParameter.optional("sort_order", DataTypes.StringType),
+ ProcedureParameter.optional("options", STRING_MAP),
+ ProcedureParameter.optional("where", DataTypes.StringType)
+ };
+
+ // counts are not nullable since the action result is never null
+ private static final StructType OUTPUT_TYPE = new StructType(new
StructField[]{
+ new StructField("rewritten_data_files_count", DataTypes.IntegerType,
false, Metadata.empty()),
+ new StructField("added_data_files_count", DataTypes.IntegerType, false,
Metadata.empty())
+ });
+
+ public static ProcedureBuilder builder() {
+ return new Builder<RewriteDataFilesProcedure>() {
+ @Override
+ protected RewriteDataFilesProcedure doBuild() {
+ return new RewriteDataFilesProcedure(tableCatalog());
+ }
+ };
+ }
+
+ private RewriteDataFilesProcedure(TableCatalog tableCatalog) {
+ super(tableCatalog);
+ }
+
+ @Override
+ public ProcedureParameter[] parameters() {
+ return PARAMETERS;
+ }
+
+ @Override
+ public StructType outputType() {
+ return OUTPUT_TYPE;
+ }
+
+ @Override
+ public InternalRow[] call(InternalRow args) {
+ Identifier tableIdent = toIdentifier(args.getString(0),
PARAMETERS[0].name());
+
+ return modifyIcebergTable(tableIdent, table -> {
+ RewriteDataFiles action = actions().rewriteDataFiles(table);
+
+ String strategy = args.isNullAt(1) ? null : args.getString(1);
+ String sortOrders = args.isNullAt(2) ? null : args.getString(2);
+ if (sortOrders != null && (strategy == null ||
strategy.equalsIgnoreCase("binpack"))) {
+ throw new IllegalArgumentException(
+ "sort_order is not accepted for binpack strategy. Please
configure sort strategy to use sort_order");
+ }
+ SortOrder sortOrder = collectSortOrders(strategy, table, action,
sortOrders);
+ action = checkAndApplyStrategy(action, strategy, sortOrder);
+
+ if (!args.isNullAt(3)) {
+ action = checkAndApplyOptions(args, action);
+ }
+
+ String where = args.isNullAt(4) ? null : args.getString(4);
+ action = checkAndApplyFilter(action, where);
+
+ RewriteDataFiles.Result result = action.execute();
+
+ return toOutputRows(result);
+ });
+ }
+
+ private RewriteDataFiles checkAndApplyFilter(RewriteDataFiles action, String
where) {
+ if (where != null) {
+ ParserInterface sqlParser = spark().sessionState().sqlParser();
+ try {
+ Expression expression = sqlParser.parseExpression(where);
+ return action.filter(SparkExpressions.convert(expression));
Review comment:
I think we should only allow filters based on the partition spec here,
but I'm fine if we want it to be generic
##########
File path:
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteDataFilesProcedure.java
##########
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.procedures;
+
+import java.util.Map;
+import org.apache.iceberg.NullOrder;
+import org.apache.iceberg.SortOrder;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.actions.RewriteDataFiles;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.spark.SparkExpressions;
+import org.apache.iceberg.spark.procedures.SparkProcedures.ProcedureBuilder;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.Expression;
+import org.apache.spark.sql.catalyst.parser.ParseException;
+import org.apache.spark.sql.catalyst.parser.ParserInterface;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import scala.runtime.BoxedUnit;
+
+/**
+ * A procedure that rewrites datafiles in a table.
+ *
+ * @see org.apache.iceberg.spark.actions.SparkActions#rewriteDataFiles(Table)
+ */
+class RewriteDataFilesProcedure extends BaseProcedure {
+
+ private static final ProcedureParameter[] PARAMETERS = new
ProcedureParameter[]{
+ ProcedureParameter.required("table", DataTypes.StringType),
+ ProcedureParameter.optional("strategy", DataTypes.StringType),
+ ProcedureParameter.optional("sort_order", DataTypes.StringType),
+ ProcedureParameter.optional("options", STRING_MAP),
+ ProcedureParameter.optional("where", DataTypes.StringType)
+ };
+
+ // counts are not nullable since the action result is never null
+ private static final StructType OUTPUT_TYPE = new StructType(new
StructField[]{
+ new StructField("rewritten_data_files_count", DataTypes.IntegerType,
false, Metadata.empty()),
+ new StructField("added_data_files_count", DataTypes.IntegerType, false,
Metadata.empty())
+ });
+
+ public static ProcedureBuilder builder() {
+ return new Builder<RewriteDataFilesProcedure>() {
+ @Override
+ protected RewriteDataFilesProcedure doBuild() {
+ return new RewriteDataFilesProcedure(tableCatalog());
+ }
+ };
+ }
+
+ private RewriteDataFilesProcedure(TableCatalog tableCatalog) {
+ super(tableCatalog);
+ }
+
+ @Override
+ public ProcedureParameter[] parameters() {
+ return PARAMETERS;
+ }
+
+ @Override
+ public StructType outputType() {
+ return OUTPUT_TYPE;
+ }
+
+ @Override
+ public InternalRow[] call(InternalRow args) {
+ Identifier tableIdent = toIdentifier(args.getString(0),
PARAMETERS[0].name());
+
+ return modifyIcebergTable(tableIdent, table -> {
+ RewriteDataFiles action = actions().rewriteDataFiles(table);
+
+ String strategy = args.isNullAt(1) ? null : args.getString(1);
+ String sortOrders = args.isNullAt(2) ? null : args.getString(2);
+ if (sortOrders != null && (strategy == null ||
strategy.equalsIgnoreCase("binpack"))) {
+ throw new IllegalArgumentException(
+ "sort_order is not accepted for binpack strategy. Please
configure sort strategy to use sort_order");
+ }
+ SortOrder sortOrder = collectSortOrders(strategy, table, action,
sortOrders);
+ action = checkAndApplyStrategy(action, strategy, sortOrder);
+
+ if (!args.isNullAt(3)) {
+ action = checkAndApplyOptions(args, action);
+ }
+
+ String where = args.isNullAt(4) ? null : args.getString(4);
+ action = checkAndApplyFilter(action, where);
+
+ RewriteDataFiles.Result result = action.execute();
+
+ return toOutputRows(result);
+ });
+ }
+
+ private RewriteDataFiles checkAndApplyFilter(RewriteDataFiles action, String
where) {
+ if (where != null) {
+ ParserInterface sqlParser = spark().sessionState().sqlParser();
+ try {
+ Expression expression = sqlParser.parseExpression(where);
+ return action.filter(SparkExpressions.convert(expression));
+ } catch (ParseException e) {
+ throw new IllegalArgumentException("Cannot parse predicates in where
option: " + where);
+ }
+ }
+ return action;
+ }
+
+ private RewriteDataFiles checkAndApplyOptions(InternalRow args,
RewriteDataFiles action) {
+ Map<String, String> options = Maps.newHashMap();
+ args.getMap(3).foreach(DataTypes.StringType, DataTypes.StringType,
+ (k, v) -> {
+ options.put(k.toString(), v.toString());
+ return BoxedUnit.UNIT;
Review comment:
Not sure why this is here?
##########
File path:
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteDataFilesProcedure.java
##########
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.procedures;
+
+import java.util.Map;
+import org.apache.iceberg.NullOrder;
+import org.apache.iceberg.SortOrder;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.actions.RewriteDataFiles;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.spark.SparkExpressions;
+import org.apache.iceberg.spark.procedures.SparkProcedures.ProcedureBuilder;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.Expression;
+import org.apache.spark.sql.catalyst.parser.ParseException;
+import org.apache.spark.sql.catalyst.parser.ParserInterface;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import scala.runtime.BoxedUnit;
+
+/**
+ * A procedure that rewrites datafiles in a table.
+ *
+ * @see org.apache.iceberg.spark.actions.SparkActions#rewriteDataFiles(Table)
+ */
+class RewriteDataFilesProcedure extends BaseProcedure {
+
+ private static final ProcedureParameter[] PARAMETERS = new
ProcedureParameter[]{
+ ProcedureParameter.required("table", DataTypes.StringType),
+ ProcedureParameter.optional("strategy", DataTypes.StringType),
+ ProcedureParameter.optional("sort_order", DataTypes.StringType),
+ ProcedureParameter.optional("options", STRING_MAP),
+ ProcedureParameter.optional("where", DataTypes.StringType)
+ };
+
+ // counts are not nullable since the action result is never null
+ private static final StructType OUTPUT_TYPE = new StructType(new
StructField[]{
+ new StructField("rewritten_data_files_count", DataTypes.IntegerType,
false, Metadata.empty()),
+ new StructField("added_data_files_count", DataTypes.IntegerType, false,
Metadata.empty())
+ });
+
+ public static ProcedureBuilder builder() {
+ return new Builder<RewriteDataFilesProcedure>() {
+ @Override
+ protected RewriteDataFilesProcedure doBuild() {
+ return new RewriteDataFilesProcedure(tableCatalog());
+ }
+ };
+ }
+
+ private RewriteDataFilesProcedure(TableCatalog tableCatalog) {
+ super(tableCatalog);
+ }
+
+ @Override
+ public ProcedureParameter[] parameters() {
+ return PARAMETERS;
+ }
+
+ @Override
+ public StructType outputType() {
+ return OUTPUT_TYPE;
+ }
+
+ @Override
+ public InternalRow[] call(InternalRow args) {
+ Identifier tableIdent = toIdentifier(args.getString(0),
PARAMETERS[0].name());
+
+ return modifyIcebergTable(tableIdent, table -> {
+ RewriteDataFiles action = actions().rewriteDataFiles(table);
+
+ String strategy = args.isNullAt(1) ? null : args.getString(1);
+ String sortOrders = args.isNullAt(2) ? null : args.getString(2);
+ if (sortOrders != null && (strategy == null ||
strategy.equalsIgnoreCase("binpack"))) {
+ throw new IllegalArgumentException(
+ "sort_order is not accepted for binpack strategy. Please
configure sort strategy to use sort_order");
+ }
+ SortOrder sortOrder = collectSortOrders(strategy, table, action,
sortOrders);
+ action = checkAndApplyStrategy(action, strategy, sortOrder);
+
+ if (!args.isNullAt(3)) {
+ action = checkAndApplyOptions(args, action);
+ }
+
+ String where = args.isNullAt(4) ? null : args.getString(4);
+ action = checkAndApplyFilter(action, where);
+
+ RewriteDataFiles.Result result = action.execute();
+
+ return toOutputRows(result);
+ });
+ }
+
+ private RewriteDataFiles checkAndApplyFilter(RewriteDataFiles action, String
where) {
+ if (where != null) {
+ ParserInterface sqlParser = spark().sessionState().sqlParser();
+ try {
+ Expression expression = sqlParser.parseExpression(where);
+ return action.filter(SparkExpressions.convert(expression));
+ } catch (ParseException e) {
+ throw new IllegalArgumentException("Cannot parse predicates in where
option: " + where);
+ }
+ }
+ return action;
+ }
+
+ private RewriteDataFiles checkAndApplyOptions(InternalRow args,
RewriteDataFiles action) {
+ Map<String, String> options = Maps.newHashMap();
+ args.getMap(3).foreach(DataTypes.StringType, DataTypes.StringType,
Review comment:
Could this be putAll?
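   For context, `args.getMap(3)` returns Spark's `MapData` rather than a `java.util.Map`, so a direct `putAll` isn't available without converting first; a hedged alternative to the Scala `foreach` callback could look roughly like this:
   ```java
  // sketch only: materialize the MapData into a Java map by index
  MapData optionsMap = args.getMap(3);
  Map<String, String> options = Maps.newHashMapWithExpectedSize(optionsMap.numElements());
  for (int i = 0; i < optionsMap.numElements(); i++) {
    options.put(
        optionsMap.keyArray().getUTF8String(i).toString(),
        optionsMap.valueArray().getUTF8String(i).toString());
  }
  return action.options(options);
   ```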
##########
File path:
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteDataFilesProcedure.java
##########
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.procedures;
+
+import java.util.Map;
+import org.apache.iceberg.NullOrder;
+import org.apache.iceberg.SortOrder;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.actions.RewriteDataFiles;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.spark.SparkExpressions;
+import org.apache.iceberg.spark.procedures.SparkProcedures.ProcedureBuilder;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.Expression;
+import org.apache.spark.sql.catalyst.parser.ParseException;
+import org.apache.spark.sql.catalyst.parser.ParserInterface;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import scala.runtime.BoxedUnit;
+
+/**
+ * A procedure that rewrites datafiles in a table.
+ *
+ * @see org.apache.iceberg.spark.actions.SparkActions#rewriteDataFiles(Table)
+ */
+class RewriteDataFilesProcedure extends BaseProcedure {
+
+ private static final ProcedureParameter[] PARAMETERS = new ProcedureParameter[]{
+ ProcedureParameter.required("table", DataTypes.StringType),
+ ProcedureParameter.optional("strategy", DataTypes.StringType),
+ ProcedureParameter.optional("sort_order", DataTypes.StringType),
+ ProcedureParameter.optional("options", STRING_MAP),
+ ProcedureParameter.optional("where", DataTypes.StringType)
+ };
+
+ // counts are not nullable since the action result is never null
+ private static final StructType OUTPUT_TYPE = new StructType(new StructField[]{
+ new StructField("rewritten_data_files_count", DataTypes.IntegerType, false, Metadata.empty()),
+ new StructField("added_data_files_count", DataTypes.IntegerType, false, Metadata.empty())
+ });
+
+ public static ProcedureBuilder builder() {
+ return new Builder<RewriteDataFilesProcedure>() {
+ @Override
+ protected RewriteDataFilesProcedure doBuild() {
+ return new RewriteDataFilesProcedure(tableCatalog());
+ }
+ };
+ }
+
+ private RewriteDataFilesProcedure(TableCatalog tableCatalog) {
+ super(tableCatalog);
+ }
+
+ @Override
+ public ProcedureParameter[] parameters() {
+ return PARAMETERS;
+ }
+
+ @Override
+ public StructType outputType() {
+ return OUTPUT_TYPE;
+ }
+
+ @Override
+ public InternalRow[] call(InternalRow args) {
+ Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name());
+
+ return modifyIcebergTable(tableIdent, table -> {
+ RewriteDataFiles action = actions().rewriteDataFiles(table);
+
+ String strategy = args.isNullAt(1) ? null : args.getString(1);
+ String sortOrders = args.isNullAt(2) ? null : args.getString(2);
+ if (sortOrders != null && (strategy == null || strategy.equalsIgnoreCase("binpack"))) {
+ throw new IllegalArgumentException(
+ "sort_order is not accepted for binpack strategy. Please configure sort strategy to use sort_order");
+ }
+ SortOrder sortOrder = collectSortOrders(strategy, table, action, sortOrders);
+ action = checkAndApplyStrategy(action, strategy, sortOrder);
+
+ if (!args.isNullAt(3)) {
+ action = checkAndApplyOptions(args, action);
+ }
+
+ String where = args.isNullAt(4) ? null : args.getString(4);
+ action = checkAndApplyFilter(action, where);
+
+ RewriteDataFiles.Result result = action.execute();
+
+ return toOutputRows(result);
+ });
+ }
+
+ private RewriteDataFiles checkAndApplyFilter(RewriteDataFiles action, String where) {
+ if (where != null) {
+ ParserInterface sqlParser = spark().sessionState().sqlParser();
+ try {
+ Expression expression = sqlParser.parseExpression(where);
+ return action.filter(SparkExpressions.convert(expression));
+ } catch (ParseException e) {
+ throw new IllegalArgumentException("Cannot parse predicates in where
option: " + where);
+ }
+ }
+ return action;
+ }
+
+ private RewriteDataFiles checkAndApplyOptions(InternalRow args, RewriteDataFiles action) {
+ Map<String, String> options = Maps.newHashMap();
+ args.getMap(3).foreach(DataTypes.StringType, DataTypes.StringType,
+ (k, v) -> {
+ options.put(k.toString(), v.toString());
+ return BoxedUnit.UNIT;
+ });
+ return action.options(options);
+ }
+
+ private RewriteDataFiles checkAndApplyStrategy(RewriteDataFiles action, String strategy, SortOrder sortOrder) {
+ if (strategy != null) {
+ if (strategy.equalsIgnoreCase("binpack")) {
+ return action.binPack();
+ } else if (strategy.equalsIgnoreCase("sort")) {
+ if (sortOrder == null) {
+ return action.sort();
+ }
+ return action.sort(sortOrder);
+ } else {
+ throw new IllegalArgumentException("unsupported strategy: " + strategy
+ ". Only binpack,sort is supported");
+ }
+ }
+ return action;
+ }
+
+ private SortOrder collectSortOrders(
+ String strategy, Table table, RewriteDataFiles action, String sortOrderStr) {
+ if (strategy != null && strategy.equalsIgnoreCase("sort") && sortOrderStr != null) {
Review comment:
Instead of writing custom parsing code here, is it possible to use the Spark parser's ordered identifier list? If not, I would add this to our own parser code and use that rather than writing a manual parser here
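
For reference, whichever parser ends up being used (a shared grammar rule or the manual split above), the `sort_order` string ultimately has to resolve into an Iceberg `SortOrder` built against the table schema. The sketch below only illustrates that target shape, assuming the documented "ColumnName SortDirection NullOrder" entries and plain top-level column names; the `SortOrderSketch` class is illustrative and not part of the PR.

```java
// Sketch of the SortOrder that any sort_order parser needs to produce.
// Assumes each comma-separated entry looks like "id DESC NULLS_LAST".
import org.apache.iceberg.NullOrder;
import org.apache.iceberg.SortDirection;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.Table;

class SortOrderSketch {
  static SortOrder fromString(Table table, String sortOrderStr) {
    SortOrder.Builder builder = SortOrder.builderFor(table.schema());
    for (String field : sortOrderStr.split(",")) {
      String[] parts = field.trim().split("\\s+");  // [column, direction, null order]
      NullOrder nullOrder = NullOrder.valueOf(parts[2].toUpperCase());
      if (SortDirection.valueOf(parts[1].toUpperCase()) == SortDirection.ASC) {
        builder.asc(parts[0], nullOrder);
      } else {
        builder.desc(parts[0], nullOrder);
      }
    }
    return builder.build();
  }
}
```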
##########
File path: spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkExpressions.java
##########
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark;
+
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+import java.time.temporal.ChronoUnit;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import org.apache.iceberg.expressions.Expression.Operation;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types.TimestampType;
+import org.apache.iceberg.util.Pair;
+import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute;
+import org.apache.spark.sql.catalyst.expressions.And;
+import org.apache.spark.sql.catalyst.expressions.Attribute;
+import org.apache.spark.sql.catalyst.expressions.AttributeReference;
+import org.apache.spark.sql.catalyst.expressions.BinaryExpression;
+import org.apache.spark.sql.catalyst.expressions.Cast;
+import org.apache.spark.sql.catalyst.expressions.EqualNullSafe;
+import org.apache.spark.sql.catalyst.expressions.EqualTo;
+import org.apache.spark.sql.catalyst.expressions.Expression;
+import org.apache.spark.sql.catalyst.expressions.GreaterThan;
+import org.apache.spark.sql.catalyst.expressions.GreaterThanOrEqual;
+import org.apache.spark.sql.catalyst.expressions.In;
+import org.apache.spark.sql.catalyst.expressions.InSet;
+import org.apache.spark.sql.catalyst.expressions.IsNotNull;
+import org.apache.spark.sql.catalyst.expressions.IsNull;
+import org.apache.spark.sql.catalyst.expressions.LessThan;
+import org.apache.spark.sql.catalyst.expressions.LessThanOrEqual;
+import org.apache.spark.sql.catalyst.expressions.Literal;
+import org.apache.spark.sql.catalyst.expressions.Not;
+import org.apache.spark.sql.catalyst.expressions.Or;
+import org.apache.spark.sql.catalyst.expressions.ParseToDate;
+import org.apache.spark.sql.catalyst.expressions.UnaryExpression;
+import org.apache.spark.sql.catalyst.expressions.Year;
+import org.apache.spark.sql.types.DateType$;
+import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.unsafe.types.UTF8String;
+import scala.collection.JavaConverters;
+
+import static org.apache.iceberg.expressions.Expressions.alwaysFalse;
+import static org.apache.iceberg.expressions.Expressions.and;
+import static org.apache.iceberg.expressions.Expressions.equal;
+import static org.apache.iceberg.expressions.Expressions.not;
+import static org.apache.iceberg.expressions.Expressions.or;
+import static org.apache.iceberg.expressions.Expressions.predicate;
+
+
+/**
+ * To convert spark catalyst Expression to Iceberg expression
+ *
+ */
+public class SparkExpressions {
Review comment:
I think another option here may be to attempt to convert
spark.expression into spark.sql.connector.filter
##########
File path: spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkExpressions.java
##########
Review comment:
Then converting the filters -> iceberg expressions
##########
File path: spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkExpressions.java
##########
Review comment:
Although I believe that may be private ...
##########
File path: spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkExpressions.java
##########
Review comment:
DataSourceV2Strategy.toDataSourceFilters ... may work if we put it in the right package
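
A rough sketch of that route, with the caveats above: it leans on the public `DataSourceStrategy.translateFilter` helper (since the `DataSourceV2Strategy` variant may indeed be private) together with Iceberg's existing `SparkFilters` converter, so the where clause would not need a bespoke catalyst-to-Iceberg translator. Whether `translateFilter` is the right entry point here is an assumption for discussion, not something verified in this PR.

```java
// Sketch only: catalyst Expression -> data source Filter -> Iceberg Expression.
// DataSourceStrategy.translateFilter lives on a Scala companion object; calling it
// from Java is assumed to work via the generated static forwarder
// (otherwise DataSourceStrategy$.MODULE$ would be needed).
import org.apache.iceberg.spark.SparkFilters;
import org.apache.spark.sql.catalyst.expressions.Expression;
import org.apache.spark.sql.execution.datasources.DataSourceStrategy;
import org.apache.spark.sql.sources.Filter;
import scala.Option;

class WhereClauseSketch {
  static org.apache.iceberg.expressions.Expression convert(Expression sparkPredicate) {
    // Translate the catalyst predicate into a data source Filter first ...
    Option<Filter> filter = DataSourceStrategy.translateFilter(sparkPredicate, true);
    if (filter.isEmpty()) {
      throw new IllegalArgumentException("Cannot translate predicate: " + sparkPredicate);
    }
    // ... then reuse the existing Filter -> Iceberg expression conversion
    // instead of a new SparkExpressions class.
    return SparkFilters.convert(filter.get());
  }
}
```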
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]