This is an automated email from the ASF dual-hosted git repository.
szehon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 08ae725e1a Spark 3.3, 3.4: Fix always true/false condition in rewrite_data_files procedure (#6760)
08ae725e1a is described below
commit 08ae725e1ac75fb146ec916f27813fc2ab986561
Author: Miao Wang <[email protected]>
AuthorDate: Sat May 20 08:29:21 2023 +0800
Spark 3.3, 3.4: Fix always true/false condition in rewrite_data_files procedure (#6760)
---
.../extensions/TestRewriteDataFilesProcedure.java | 43 ++++++++++++++++++++++
.../datasources/SparkExpressionConverter.scala | 4 ++
.../extensions/TestRewriteDataFilesProcedure.java | 43 ++++++++++++++++++++++
.../datasources/SparkExpressionConverter.scala | 4 ++
4 files changed, 94 insertions(+)
diff --git a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
index 44aca898b6..78fe78742b 100644
--- a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
+++ b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
@@ -245,6 +245,49 @@ public class TestRewriteDataFilesProcedure extends SparkExtensionsTestBase {
assertEquals("Data after compaction should not change", expectedRecords, actualRecords);
}
+ @Test
+ public void testRewriteDataFilesWithDeterministicTrueFilter() {
+ createTable();
+ // create 10 files under non-partitioned table
+ insertData(10);
+ List<Object[]> expectedRecords = currentData();
+ // select all 10 files for compaction
+ List<Object[]> output =
+ sql(
+ "CALL %s.system.rewrite_data_files(table => '%s', where => '1=1')",
+ catalogName, tableIdent);
+ assertEquals(
+ "Action should rewrite 10 data files and add 1 data files",
+ row(10, 1),
+ Arrays.copyOf(output.get(0), 2));
+ // verify rewritten bytes separately
+ assertThat(output.get(0)).hasSize(3);
+ assertThat(output.get(0)[2])
+ .isInstanceOf(Long.class)
+ .isEqualTo(Long.valueOf(snapshotSummary().get(SnapshotSummary.REMOVED_FILE_SIZE_PROP)));
+ List<Object[]> actualRecords = currentData();
+ assertEquals("Data after compaction should not change", expectedRecords, actualRecords);
+ }
+
+ @Test
+ public void testRewriteDataFilesWithDeterministicFalseFilter() {
+ createTable();
+ // create 10 files under non-partitioned table
+ insertData(10);
+ List<Object[]> expectedRecords = currentData();
+ // select no files for compaction
+ List<Object[]> output =
+ sql(
+ "CALL %s.system.rewrite_data_files(table => '%s', where => '0=1')",
+ catalogName, tableIdent);
+ assertEquals(
+ "Action should rewrite 0 data files and add 0 data files",
+ row(0, 0),
+ Arrays.copyOf(output.get(0), 2));
+ List<Object[]> actualRecords = currentData();
+ assertEquals("Data after compaction should not change", expectedRecords, actualRecords);
+ }
+
@Test
public void testRewriteDataFilesWithFilterOnPartitionTable() {
createPartitionTable();
diff --git a/spark/v3.3/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkExpressionConverter.scala b/spark/v3.3/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkExpressionConverter.scala
index 554fa7f66d..9f53eae60a 100644
--- a/spark/v3.3/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkExpressionConverter.scala
+++ b/spark/v3.3/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkExpressionConverter.scala
@@ -24,8 +24,10 @@ import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.catalyst.plans.logical.Filter
import org.apache.spark.sql.catalyst.plans.logical.LeafNode
+import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
object SparkExpressionConverter {
@@ -44,6 +46,8 @@ object SparkExpressionConverter {
val optimizedLogicalPlan = session.sessionState.executePlan(filter).optimizedPlan
optimizedLogicalPlan.collectFirst {
case filter: Filter => filter.condition
+ case dummyRelation: DummyRelation => Literal.TrueLiteral
+ case localRelation: LocalRelation => Literal.FalseLiteral
}.getOrElse(throw new AnalysisException("Failed to find filter expression"))
}
diff --git a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
index 3c07e676a5..3ed47d54d3 100644
--- a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
+++ b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
@@ -253,6 +253,49 @@ public class TestRewriteDataFilesProcedure extends SparkExtensionsTestBase {
assertEquals("Data after compaction should not change", expectedRecords, actualRecords);
}
+ @Test
+ public void testRewriteDataFilesWithDeterministicTrueFilter() {
+ createTable();
+ // create 10 files under non-partitioned table
+ insertData(10);
+ List<Object[]> expectedRecords = currentData();
+ // select all 10 files for compaction
+ List<Object[]> output =
+ sql(
+ "CALL %s.system.rewrite_data_files(table => '%s', where => '1=1')",
+ catalogName, tableIdent);
+ assertEquals(
+ "Action should rewrite 10 data files and add 1 data files",
+ row(10, 1),
+ Arrays.copyOf(output.get(0), 2));
+ // verify rewritten bytes separately
+ assertThat(output.get(0)).hasSize(4);
+ assertThat(output.get(0)[2])
+ .isInstanceOf(Long.class)
+ .isEqualTo(Long.valueOf(snapshotSummary().get(SnapshotSummary.REMOVED_FILE_SIZE_PROP)));
+ List<Object[]> actualRecords = currentData();
+ assertEquals("Data after compaction should not change", expectedRecords, actualRecords);
+ }
+
+ @Test
+ public void testRewriteDataFilesWithDeterministicFalseFilter() {
+ createTable();
+ // create 10 files under non-partitioned table
+ insertData(10);
+ List<Object[]> expectedRecords = currentData();
+ // select no files for compaction
+ List<Object[]> output =
+ sql(
+ "CALL %s.system.rewrite_data_files(table => '%s', where => '0=1')",
+ catalogName, tableIdent);
+ assertEquals(
+ "Action should rewrite 0 data files and add 0 data files",
+ row(0, 0),
+ Arrays.copyOf(output.get(0), 2));
+ List<Object[]> actualRecords = currentData();
+ assertEquals("Data after compaction should not change", expectedRecords, actualRecords);
+ }
+
@Test
public void testRewriteDataFilesWithFilterOnPartitionTable() {
createPartitionTable();
diff --git a/spark/v3.4/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkExpressionConverter.scala b/spark/v3.4/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkExpressionConverter.scala
index 554fa7f66d..9f53eae60a 100644
--- a/spark/v3.4/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkExpressionConverter.scala
+++ b/spark/v3.4/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkExpressionConverter.scala
@@ -24,8 +24,10 @@ import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.catalyst.plans.logical.Filter
import org.apache.spark.sql.catalyst.plans.logical.LeafNode
+import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
object SparkExpressionConverter {
@@ -44,6 +46,8 @@ object SparkExpressionConverter {
val optimizedLogicalPlan = session.sessionState.executePlan(filter).optimizedPlan
optimizedLogicalPlan.collectFirst {
case filter: Filter => filter.condition
+ case dummyRelation: DummyRelation => Literal.TrueLiteral
+ case localRelation: LocalRelation => Literal.FalseLiteral
}.getOrElse(throw new AnalysisException("Failed to find filter expression"))
}