ajantha-bhat commented on a change in pull request #3375:
URL: https://github.com/apache/iceberg/pull/3375#discussion_r743357630
##########
File path: site/docs/spark-procedures.md
##########
@@ -240,6 +240,57 @@ Remove any files in the `tablelocation/data` folder which
are not known to the t
CALL catalog_name.system.remove_orphan_files(table => 'db.sample', location =>
'tablelocation/data')
```
+### `rewrite_data_files`
+
+Iceberg tracks each data file in a table. More data files leads to more
metadata stored in manifest files, and small data files causes an unnecessary
amount of metadata and less efficient queries from file open costs.
+
+Iceberg can compact data files in parallel using Spark with the
`rewriteDataFiles` action. This will combine small files into larger files to
reduce metadata overhead and runtime file open cost.
+
+#### Usage
+
+| Argument Name | Required? | Type | Description |
+|---------------|-----------|------|-------------|
+| `table` | ✔️ | string | Name of the table to update |
+| `strategy` | | string | Name of the strategy - binpack or sort.
Defaults to binpack strategy |
+| `sort_order` | | string | Comma separated sort_order_column. Where
sort_order_column is a space separated sort order info per column (ColumnName
SortDirection NullOrder). <br/> All three members are mandatory to provide for
sort_order_column. SortDirection can be ASC or DESC. NullOrder can be FIRST or
LAST |
+| `options` | ️ | map<string, string> | Options to be used for actions|
+| `where` | ️ | string | predicate as a string used for filtering the
files.|
+
+
+See the [`RewriteDataFiles` Javadoc](./javadoc/{{ versions.iceberg
}}/org/apache/iceberg/actions/RewriteDataFiles.html#field.summary),
+<br/> [`BinPackStrategy` Javadoc](./javadoc/{{ versions.iceberg
}}/org/apache/iceberg/actions/BinPackStrategy.html#field.summary)
+and <br/> [`SortStrategy` Javadoc](./javadoc/{{ versions.iceberg
}}/org/apache/iceberg/actions/SortStrategy.html#field.summary)
+for list of all the supported options for this action.
+
+#### Output
+
+| Output Name | Type | Description |
+| ------------|------|-------------|
+| `rewritten_data_files_count` | int | Number of data which were re-written by
this command |
+| `added_data_files_count` | int | Number of new data files which were
written by this command |
+
+#### Examples
+
+Rewrite the data files in table `db.sample` and align data files with table
partitioning.
+```sql
+CALL catalog_name.system.rewrite_data_files('db.sample')
+```
+
+Rewrite the data files in table `db.sample` and use sort strategy with id
column as sort order column.
+```sql
+CALL catalog_name.system.rewrite_data_files(table => 'db.sample', strategy =>
'sort', sort_order => 'id DESC LAST')
Review comment:
I can still have it as NULLS_LAST and NULLS_FIRST (instead of space, use
underscore) and have document mention these syntax.
##########
File path: site/docs/spark-procedures.md
##########
@@ -240,6 +240,57 @@ Remove any files in the `tablelocation/data` folder which
are not known to the t
CALL catalog_name.system.remove_orphan_files(table => 'db.sample', location =>
'tablelocation/data')
```
+### `rewrite_data_files`
+
+Iceberg tracks each data file in a table. More data files leads to more
metadata stored in manifest files, and small data files causes an unnecessary
amount of metadata and less efficient queries from file open costs.
+
+Iceberg can compact data files in parallel using Spark with the
`rewriteDataFiles` action. This will combine small files into larger files to
reduce metadata overhead and runtime file open cost.
+
+#### Usage
+
+| Argument Name | Required? | Type | Description |
+|---------------|-----------|------|-------------|
+| `table` | ✔️ | string | Name of the table to update |
+| `strategy` | | string | Name of the strategy - binpack or sort.
Defaults to binpack strategy |
+| `sort_order` | | string | Comma separated sort_order_column. Where
sort_order_column is a space separated sort order info per column (ColumnName
SortDirection NullOrder). <br/> All three members are mandatory to provide for
sort_order_column. SortDirection can be ASC or DESC. NullOrder can be FIRST or
LAST |
+| `options` | ️ | map<string, string> | Options to be used for actions|
+| `where` | ️ | string | predicate as a string used for filtering the
files.|
+
+
+See the [`RewriteDataFiles` Javadoc](./javadoc/{{ versions.iceberg
}}/org/apache/iceberg/actions/RewriteDataFiles.html#field.summary),
+<br/> [`BinPackStrategy` Javadoc](./javadoc/{{ versions.iceberg
}}/org/apache/iceberg/actions/BinPackStrategy.html#field.summary)
+and <br/> [`SortStrategy` Javadoc](./javadoc/{{ versions.iceberg
}}/org/apache/iceberg/actions/SortStrategy.html#field.summary)
+for list of all the supported options for this action.
+
+#### Output
+
+| Output Name | Type | Description |
+| ------------|------|-------------|
+| `rewritten_data_files_count` | int | Number of data which were re-written by
this command |
+| `added_data_files_count` | int | Number of new data files which were
written by this command |
+
+#### Examples
+
+Rewrite the data files in table `db.sample` and align data files with table
partitioning.
+```sql
+CALL catalog_name.system.rewrite_data_files('db.sample')
+```
+
+Rewrite the data files in table `db.sample` and use sort strategy with id
column as sort order column.
+```sql
+CALL catalog_name.system.rewrite_data_files(table => 'db.sample', strategy =>
'sort', sort_order => 'id DESC LAST')
Review comment:
Done. I have modified and pushed as NULLS_FIRST and NULLS_LAST
##########
File path:
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkExpressions.java
##########
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark;
+
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+import java.time.temporal.ChronoUnit;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import org.apache.iceberg.expressions.Expression.Operation;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types.TimestampType;
+import org.apache.iceberg.util.Pair;
+import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute;
+import org.apache.spark.sql.catalyst.expressions.And;
+import org.apache.spark.sql.catalyst.expressions.Attribute;
+import org.apache.spark.sql.catalyst.expressions.AttributeReference;
+import org.apache.spark.sql.catalyst.expressions.BinaryExpression;
+import org.apache.spark.sql.catalyst.expressions.Cast;
+import org.apache.spark.sql.catalyst.expressions.EqualNullSafe;
+import org.apache.spark.sql.catalyst.expressions.EqualTo;
+import org.apache.spark.sql.catalyst.expressions.Expression;
+import org.apache.spark.sql.catalyst.expressions.GreaterThan;
+import org.apache.spark.sql.catalyst.expressions.GreaterThanOrEqual;
+import org.apache.spark.sql.catalyst.expressions.In;
+import org.apache.spark.sql.catalyst.expressions.InSet;
+import org.apache.spark.sql.catalyst.expressions.IsNotNull;
+import org.apache.spark.sql.catalyst.expressions.IsNull;
+import org.apache.spark.sql.catalyst.expressions.LessThan;
+import org.apache.spark.sql.catalyst.expressions.LessThanOrEqual;
+import org.apache.spark.sql.catalyst.expressions.Literal;
+import org.apache.spark.sql.catalyst.expressions.Not;
+import org.apache.spark.sql.catalyst.expressions.Or;
+import org.apache.spark.sql.catalyst.expressions.ParseToDate;
+import org.apache.spark.sql.catalyst.expressions.UnaryExpression;
+import org.apache.spark.sql.catalyst.expressions.Year;
+import org.apache.spark.sql.types.DateType$;
+import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.unsafe.types.UTF8String;
+import scala.collection.JavaConverters;
+
+import static org.apache.iceberg.expressions.Expressions.alwaysFalse;
+import static org.apache.iceberg.expressions.Expressions.and;
+import static org.apache.iceberg.expressions.Expressions.equal;
+import static org.apache.iceberg.expressions.Expressions.not;
+import static org.apache.iceberg.expressions.Expressions.or;
+import static org.apache.iceberg.expressions.Expressions.predicate;
+
+
+/**
+ * To convert spark catalyst Expression to Iceberg expression
+ *
+ */
+public class SparkExpressions {
+ private static final Map<Class<? extends Expression>, Operation> FILTERS =
ImmutableMap
+ .<Class<? extends Expression>, Operation>builder()
+ .put(EqualTo.class, Operation.EQ)
+ .put(EqualNullSafe.class, Operation.EQ)
+ .put(GreaterThan.class, Operation.GT)
+ .put(GreaterThanOrEqual.class, Operation.GT_EQ)
+ .put(LessThan.class, Operation.LT)
+ .put(LessThanOrEqual.class, Operation.LT_EQ)
+ .put(In.class, Operation.IN)
+ .put(InSet.class, Operation.IN)
+ .put(IsNull.class, Operation.IS_NULL)
+ .put(IsNotNull.class, Operation.NOT_NULL)
+ .put(And.class, Operation.AND)
+ .put(Or.class, Operation.OR)
+ .put(Not.class, Operation.NOT)
+ .build();
+ private static final Map<Class<? extends Expression>, Transform> TRANSFORMS
= ImmutableMap
+ .<Class<? extends Expression>, Transform>builder()
+ .put(UnresolvedAttribute.class, Transform.IDENTITY)
+ .put(AttributeReference.class, Transform.IDENTITY)
+ .put(Year.class, Transform.YEAR)
+ .put(ParseToDate.class, Transform.DAY)
+ .put(Cast.class, Transform.DAY)
+ .build();
+ private static final OffsetDateTime EPOCH =
Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC);
+ private static final LocalDate EPOCH_DAY = EPOCH.toLocalDate();
+
+ private SparkExpressions() {
+ }
+
+ public static org.apache.iceberg.expressions.Expression convert(Expression
expr) {
+ Class<? extends Expression> exprClass = expr.getClass();
+ Operation op = FILTERS.get(exprClass);
+ if (op != null) {
+ switch (op) {
+ case IS_NULL:
+ case NOT_NULL:
+ UnaryExpression unary = (UnaryExpression) expr;
+ if (unary.child() instanceof Attribute) {
+ Attribute attr = (Attribute) unary.child();
+ return predicate(op, attr.name());
+ }
+ return null;
+ case LT:
+ case LT_EQ:
+ case GT:
+ case GT_EQ:
+ case EQ:
+ case NOT_EQ:
+ BinaryExpression binary = (BinaryExpression) expr;
+ return convert(op, binary.left(), binary.right());
+ case NOT:
+ org.apache.iceberg.expressions.Expression child = convert(((Not)
expr).child());
+ if (child != null) {
+ return not(child);
+ }
+ return null;
+ case AND:
+ And andExpr = (And) expr;
+ org.apache.iceberg.expressions.Expression andLeft =
convert(andExpr.left());
+ org.apache.iceberg.expressions.Expression andRight =
convert(andExpr.right());
+ if (andLeft != null && andRight != null) {
+ return and(convert(andExpr.left()), convert(andExpr.right()));
+ }
+ return null;
+ case OR:
+ Or orExpr = (Or) expr;
+ org.apache.iceberg.expressions.Expression orLeft =
convert(orExpr.left());
+ org.apache.iceberg.expressions.Expression orRight =
convert(orExpr.right());
+ if (orLeft != null && orRight != null) {
+ return or(orLeft, orRight);
+ }
+ return null;
+ case IN:
+ if (expr instanceof In) {
+ In inExpr = (In) expr;
+ List<Object> literals =
convertLiterals(JavaConverters.seqAsJavaListConverter(inExpr.list()).asJava());
+ if (literals != null) {
+ return convertIn(inExpr.value(), literals);
+ } else {
+ // if the list contained a non-literal, it can't be converted
+ return null;
+ }
+ } else if (expr instanceof InSet) {
+ InSet inExpr = (InSet) expr;
+ // expressions are already converted to Java objects
+ Set<Object> literals =
JavaConverters.setAsJavaSetConverter(inExpr.hset()).asJava();
+ return convertIn(inExpr.child(), literals);
+ }
+ return null;
+ default:
+ }
+ }
+ // can't convert
+ return null;
Review comment:
ok. I can throw unsupported exception with reason. So that the users can
know.
##########
File path:
spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
##########
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.extensions;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.spark.sql.AnalysisException;
+import org.apache.spark.sql.catalyst.analysis.NoSuchProcedureException;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+public class TestRewriteDataFilesProcedure extends SparkExtensionsTestBase {
+
+ public TestRewriteDataFilesProcedure(String catalogName, String
implementation, Map<String, String> config) {
+ super(catalogName, implementation, config);
+ }
+
+ @After
+ public void removeTable() {
+ sql("DROP TABLE IF EXISTS %s", tableName);
+ }
+
+ @Test
+ public void testRewriteDataFilesInEmptyTable() {
+ sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg",
tableName);
+ List<Object[]> output = sql(
+ "CALL %s.system.rewrite_data_files('%s')", catalogName, tableIdent);
+ assertEquals("Procedure output must match",
+ ImmutableList.of(row(0, 0)),
+ output);
+ }
+
+ @Test
+ public void testRewriteDataFilesOnPartitionTable() {
+ sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg
PARTITIONED BY (data)", tableName);
+
+ // create 5 files for each partition (data = 'a' and data = 'b')
+ sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+ sql("INSERT INTO TABLE %s VALUES (2, 'b')", tableName);
+ sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+ sql("INSERT INTO TABLE %s VALUES (2, 'b')", tableName);
+ sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+ sql("INSERT INTO TABLE %s VALUES (2, 'b')", tableName);
+ sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+ sql("INSERT INTO TABLE %s VALUES (2, 'b')", tableName);
+ sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+ sql("INSERT INTO TABLE %s VALUES (2, 'b')", tableName);
+
+ List<Object[]> df1 = sql("SELECT * FROM %s order by id", tableName);
+
+ List<Object[]> output = sql(
+ "CALL %s.system.rewrite_data_files(table => '%s')", catalogName,
tableIdent);
+
+ assertEquals("Action should rewrite 10 data files and add 2 data files
(one per partition) ",
+ ImmutableList.of(row(10, 2)),
+ output);
+
+ List<Object[]> df2 = sql("SELECT * FROM %s ", tableName);
+
+ Assert.assertEquals("data count should match", df1.size(), df2.size());
+ }
+
+ @Test
+ public void testRewriteDataFilesOnNonPartitionTable() {
+ sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg",
tableName);
+
+ // create 10 files under non-partitioned table
+ sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+ sql("INSERT INTO TABLE %s VALUES (2, 'b')", tableName);
+ sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+ sql("INSERT INTO TABLE %s VALUES (2, 'b')", tableName);
+ sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+ sql("INSERT INTO TABLE %s VALUES (2, 'b')", tableName);
+ sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+ sql("INSERT INTO TABLE %s VALUES (2, 'b')", tableName);
+ sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+ sql("INSERT INTO TABLE %s VALUES (2, 'b')", tableName);
+
+ List<Object[]> df1 = sql("SELECT * FROM %s ", tableName);
+
+ List<Object[]> output = sql(
+ "CALL %s.system.rewrite_data_files(table => '%s')", catalogName,
tableIdent);
+
+ assertEquals("Action should rewrite 10 data files and add 1 data files",
+ ImmutableList.of(row(10, 1)),
+ output);
+
+ List<Object[]> df2 = sql("SELECT * FROM %s ", tableName);
+
+ Assert.assertEquals("data count should match", df1.size(), df2.size());
+ }
+
+ @Test
+ public void testRewriteDataFilesWithOptions() {
+ sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg",
tableName);
+
+ // create 2 files under non-partitioned table
+ sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+ sql("INSERT INTO TABLE %s VALUES (2, 'b')", tableName);
+
+ List<Object[]> df1 = sql("SELECT * FROM %s ", tableName);
+
+ // set the min-input-files = 2, instead of default 5 to allow compaction
+ List<Object[]> output = sql(
+ "CALL %s.system.rewrite_data_files(table => '%s', options =>
map('min-input-files','2'))",
+ catalogName, tableIdent);
+
+ assertEquals("Action should rewrite 2 data files and add 1 data files",
+ ImmutableList.of(row(2, 1)),
+ output);
+
+ List<Object[]> df2 = sql("SELECT * FROM %s ", tableName);
+
+ Assert.assertEquals("data count should match", df1.size(), df2.size());
+ }
+
+ @Test
+ public void testRewriteDataFilesWithSortStrategy() {
+ sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg",
tableName);
+
+ // create 2 files under non-partitioned table
+ sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+ sql("INSERT INTO TABLE %s VALUES (2, 'b')", tableName);
+
+ List<Object[]> df1 = sql("SELECT * FROM %s ", tableName);
+
+ // set the min-input-files = 2, instead of default 5 to allow compaction
+ // set sort_order = id DESC LAST
+ List<Object[]> output = sql(
+ "CALL %s.system.rewrite_data_files(table => '%s', options =>
map('min-input-files','2'), " +
+ "strategy => 'sort', sort_order => 'id DESC NULLS_LAST')",
+ catalogName, tableIdent);
+
+ assertEquals("Action should rewrite 2 data files and add 1 data files",
+ ImmutableList.of(row(2, 1)),
+ output);
+
+ List<Object[]> df2 = sql("SELECT * FROM %s ", tableName);
+
+ Assert.assertEquals("data count should match", df1.size(), df2.size());
+ }
+
+ @Test
+ public void testRewriteDataFilesWithFilter() {
+ sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg",
tableName);
+
+ // create 10 files under non-partitioned table
+ sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+ sql("INSERT INTO TABLE %s VALUES (2, 'b')", tableName);
+ sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+ sql("INSERT INTO TABLE %s VALUES (2, 'b')", tableName);
+ sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+ sql("INSERT INTO TABLE %s VALUES (2, 'b')", tableName);
+ sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+ sql("INSERT INTO TABLE %s VALUES (2, 'b')", tableName);
+ sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+ sql("INSERT INTO TABLE %s VALUES (2, 'b')", tableName);
+
+ List<Object[]> df1 = sql("SELECT * FROM %s ", tableName);
+
+ // select only 5 files for compaction (files having id = 1)
+ List<Object[]> output = sql(
+ "CALL %s.system.rewrite_data_files(table => '%s'," +
+ " where => 'id = 1 and data is not null')", catalogName,
tableIdent);
Review comment:
Agree. May be better to restrict to only partition spec columns.
##########
File path:
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteDataFilesProcedure.java
##########
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.procedures;
+
+import java.util.Map;
+import org.apache.iceberg.NullOrder;
+import org.apache.iceberg.SortOrder;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.actions.RewriteDataFiles;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.spark.SparkExpressions;
+import org.apache.iceberg.spark.procedures.SparkProcedures.ProcedureBuilder;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.Expression;
+import org.apache.spark.sql.catalyst.parser.ParseException;
+import org.apache.spark.sql.catalyst.parser.ParserInterface;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import scala.runtime.BoxedUnit;
+
+/**
+ * A procedure that rewrites datafiles in a table.
+ *
+ * @see org.apache.iceberg.spark.actions.SparkActions#rewriteDataFiles(Table)
+ */
+class RewriteDataFilesProcedure extends BaseProcedure {
+
+ private static final ProcedureParameter[] PARAMETERS = new
ProcedureParameter[]{
+ ProcedureParameter.required("table", DataTypes.StringType),
+ ProcedureParameter.optional("strategy", DataTypes.StringType),
+ ProcedureParameter.optional("sort_order", DataTypes.StringType),
+ ProcedureParameter.optional("options", STRING_MAP),
+ ProcedureParameter.optional("where", DataTypes.StringType)
+ };
+
+ // counts are not nullable since the action result is never null
+ private static final StructType OUTPUT_TYPE = new StructType(new
StructField[]{
+ new StructField("rewritten_data_files_count", DataTypes.IntegerType,
false, Metadata.empty()),
+ new StructField("added_data_files_count", DataTypes.IntegerType, false,
Metadata.empty())
+ });
+
+ public static ProcedureBuilder builder() {
+ return new Builder<RewriteDataFilesProcedure>() {
+ @Override
+ protected RewriteDataFilesProcedure doBuild() {
+ return new RewriteDataFilesProcedure(tableCatalog());
+ }
+ };
+ }
+
+ private RewriteDataFilesProcedure(TableCatalog tableCatalog) {
+ super(tableCatalog);
+ }
+
+ @Override
+ public ProcedureParameter[] parameters() {
+ return PARAMETERS;
+ }
+
+ @Override
+ public StructType outputType() {
+ return OUTPUT_TYPE;
+ }
+
+ @Override
+ public InternalRow[] call(InternalRow args) {
+ Identifier tableIdent = toIdentifier(args.getString(0),
PARAMETERS[0].name());
+
+ return modifyIcebergTable(tableIdent, table -> {
+ RewriteDataFiles action = actions().rewriteDataFiles(table);
+
+ String strategy = args.isNullAt(1) ? null : args.getString(1);
+ String sortOrders = args.isNullAt(2) ? null : args.getString(2);
+ if (sortOrders != null && (strategy == null ||
strategy.equalsIgnoreCase("binpack"))) {
+ throw new IllegalArgumentException(
+ "sort_order is not accepted for binpack strategy. Please
configure sort strategy to use sort_order");
+ }
+ SortOrder sortOrder = collectSortOrders(strategy, table, action,
sortOrders);
+ action = checkAndApplyStrategy(action, strategy, sortOrder);
+
+ if (!args.isNullAt(3)) {
+ action = checkAndApplyOptions(args, action);
+ }
+
+ String where = args.isNullAt(4) ? null : args.getString(4);
+ action = checkAndApplyFilter(action, where);
+
+ RewriteDataFiles.Result result = action.execute();
+
+ return toOutputRows(result);
+ });
+ }
+
+ private RewriteDataFiles checkAndApplyFilter(RewriteDataFiles action, String
where) {
+ if (where != null) {
+ ParserInterface sqlParser = spark().sessionState().sqlParser();
+ try {
+ Expression expression = sqlParser.parseExpression(where);
+ return action.filter(SparkExpressions.convert(expression));
+ } catch (ParseException e) {
+ throw new IllegalArgumentException("Cannot parse predicates in where
option: " + where);
+ }
+ }
+ return action;
+ }
+
+ private RewriteDataFiles checkAndApplyOptions(InternalRow args,
RewriteDataFiles action) {
+ Map<String, String> options = Maps.newHashMap();
+ args.getMap(3).foreach(DataTypes.StringType, DataTypes.StringType,
+ (k, v) -> {
+ options.put(k.toString(), v.toString());
+ return BoxedUnit.UNIT;
Review comment:
Referred from `SnapshotTableProcedure` :)
##########
File path:
spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
##########
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.extensions;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.spark.sql.AnalysisException;
+import org.apache.spark.sql.catalyst.analysis.NoSuchProcedureException;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+public class TestRewriteDataFilesProcedure extends SparkExtensionsTestBase {
+
+ public TestRewriteDataFilesProcedure(String catalogName, String
implementation, Map<String, String> config) {
+ super(catalogName, implementation, config);
+ }
+
+ @After
+ public void removeTable() {
+ sql("DROP TABLE IF EXISTS %s", tableName);
+ }
+
+ @Test
+ public void testRewriteDataFilesInEmptyTable() {
+ sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg",
tableName);
+ List<Object[]> output = sql(
+ "CALL %s.system.rewrite_data_files('%s')", catalogName, tableIdent);
+ assertEquals("Procedure output must match",
+ ImmutableList.of(row(0, 0)),
+ output);
+ }
+
+ @Test
+ public void testRewriteDataFilesOnPartitionTable() {
+ sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg
PARTITIONED BY (data)", tableName);
+
+ // create 5 files for each partition (data = 'a' and data = 'b')
+ sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
Review comment:
we should have a common util for this ? Instead of duplicating this code
in test classes ?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]