This is an automated email from the ASF dual-hosted git repository.

szehon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 08ae725e1a Spark 3.3, 3.4: Fix always true/false condition in rewrite_data_files procedure (#6760)
08ae725e1a is described below

commit 08ae725e1ac75fb146ec916f27813fc2ab986561
Author: Miao Wang <[email protected]>
AuthorDate: Sat May 20 08:29:21 2023 +0800

    Spark 3.3, 3.4: Fix always true/false condition in rewrite_data_files procedure (#6760)
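
    Context (editor's gloss, inferred from the diff below, not part of the
    original commit message): the converter builds a Filter over the parsed
    'where' clause and then collects the Filter node back out of the
    optimized plan. A trivially true predicate such as '1=1' is
    constant-folded and the Filter is pruned away, while a trivially false
    one such as '0=1' is optimized into an empty LocalRelation; in both
    cases no Filter node remains, so the converter threw "Failed to find
    filter expression". The two new cases map those optimized plan shapes
    back to Literal.TrueLiteral and Literal.FalseLiteral.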
---
 .../extensions/TestRewriteDataFilesProcedure.java  | 43 ++++++++++++++++++++++
 .../datasources/SparkExpressionConverter.scala     |  4 ++
 .../extensions/TestRewriteDataFilesProcedure.java  | 43 ++++++++++++++++++++++
 .../datasources/SparkExpressionConverter.scala     |  4 ++
 4 files changed, 94 insertions(+)

diff --git a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
index 44aca898b6..78fe78742b 100644
--- a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
+++ b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
@@ -245,6 +245,49 @@ public class TestRewriteDataFilesProcedure extends SparkExtensionsTestBase {
     assertEquals("Data after compaction should not change", expectedRecords, actualRecords);
   }
 
+  @Test
+  public void testRewriteDataFilesWithDeterministicTrueFilter() {
+    createTable();
+    // create 10 files under non-partitioned table
+    insertData(10);
+    List<Object[]> expectedRecords = currentData();
+    // select all 10 files for compaction
+    List<Object[]> output =
+        sql(
+            "CALL %s.system.rewrite_data_files(table => '%s', where => '1=1')",
+            catalogName, tableIdent);
+    assertEquals(
+        "Action should rewrite 10 data files and add 1 data files",
+        row(10, 1),
+        Arrays.copyOf(output.get(0), 2));
+    // verify rewritten bytes separately
+    assertThat(output.get(0)).hasSize(3);
+    assertThat(output.get(0)[2])
+        .isInstanceOf(Long.class)
+        .isEqualTo(Long.valueOf(snapshotSummary().get(SnapshotSummary.REMOVED_FILE_SIZE_PROP)));
+    List<Object[]> actualRecords = currentData();
+    assertEquals("Data after compaction should not change", expectedRecords, actualRecords);
+  }
+
+  @Test
+  public void testRewriteDataFilesWithDeterministicFalseFilter() {
+    createTable();
+    // create 10 files under non-partitioned table
+    insertData(10);
+    List<Object[]> expectedRecords = currentData();
+    // select no files for compaction
+    List<Object[]> output =
+        sql(
+            "CALL %s.system.rewrite_data_files(table => '%s', where => '0=1')",
+            catalogName, tableIdent);
+    assertEquals(
+        "Action should rewrite 0 data files and add 0 data files",
+        row(0, 0),
+        Arrays.copyOf(output.get(0), 2));
+    List<Object[]> actualRecords = currentData();
+    assertEquals("Data after compaction should not change", expectedRecords, actualRecords);
+  }
+
   @Test
   public void testRewriteDataFilesWithFilterOnPartitionTable() {
     createPartitionTable();
diff --git a/spark/v3.3/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkExpressionConverter.scala b/spark/v3.3/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkExpressionConverter.scala
index 554fa7f66d..9f53eae60a 100644
--- a/spark/v3.3/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkExpressionConverter.scala
+++ b/spark/v3.3/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkExpressionConverter.scala
@@ -24,8 +24,10 @@ import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.expressions.Literal
 import org.apache.spark.sql.catalyst.plans.logical.Filter
 import org.apache.spark.sql.catalyst.plans.logical.LeafNode
+import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
 
 object SparkExpressionConverter {
 
@@ -44,6 +46,8 @@ object SparkExpressionConverter {
     val optimizedLogicalPlan = session.sessionState.executePlan(filter).optimizedPlan
     optimizedLogicalPlan.collectFirst {
       case filter: Filter => filter.condition
+      case dummyRelation: DummyRelation => Literal.TrueLiteral
+      case localRelation: LocalRelation => Literal.FalseLiteral
     }.getOrElse(throw new AnalysisException("Failed to find filter expression"))
   }
 
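The two new cases above hinge on how Catalyst optimizes constant predicates. Below is a standalone sketch (not part of this commit; it uses a plain in-memory DataFrame and a local session in place of Iceberg's internal DummyRelation, and the object name is made up for illustration) showing the two plan shapes the converter now recognizes:

// Sketch only (not from this commit): demonstrates how Catalyst treats
// trivially true/false filters, assuming Spark 3.x on the classpath.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LocalRelation}

object TrivialFilterDemo extends App {
  val spark = SparkSession.builder()
    .master("local[1]")
    .appName("trivial-filter-demo")
    .getOrCreate()
  import spark.implicits._

  val df = Seq((1, "a"), (2, "b")).toDF("id", "data")

  // '1=1' is constant-folded to true and the Filter node is pruned away,
  // so there is no Filter left for collectFirst to find.
  val truePlan = df.where("1 = 1").queryExecution.optimizedPlan
  println(truePlan.collectFirst { case f: Filter => f.condition }) // None

  // '0=1' is constant-folded to false and the subtree is replaced with an
  // empty LocalRelation, which is what the new FalseLiteral case matches.
  val falsePlan = df.where("0 = 1").queryExecution.optimizedPlan
  println(falsePlan.isInstanceOf[LocalRelation]) // true

  spark.stop()
}
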
diff --git a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
index 3c07e676a5..3ed47d54d3 100644
--- a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
+++ b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
@@ -253,6 +253,49 @@ public class TestRewriteDataFilesProcedure extends SparkExtensionsTestBase {
     assertEquals("Data after compaction should not change", expectedRecords, actualRecords);
   }
 
+  @Test
+  public void testRewriteDataFilesWithDeterministicTrueFilter() {
+    createTable();
+    // create 10 files under non-partitioned table
+    insertData(10);
+    List<Object[]> expectedRecords = currentData();
+    // select all 10 files for compaction
+    List<Object[]> output =
+        sql(
+            "CALL %s.system.rewrite_data_files(table => '%s', where => '1=1')",
+            catalogName, tableIdent);
+    assertEquals(
+        "Action should rewrite 10 data files and add 1 data files",
+        row(10, 1),
+        Arrays.copyOf(output.get(0), 2));
+    // verify rewritten bytes separately
+    assertThat(output.get(0)).hasSize(4);
+    assertThat(output.get(0)[2])
+        .isInstanceOf(Long.class)
+        .isEqualTo(Long.valueOf(snapshotSummary().get(SnapshotSummary.REMOVED_FILE_SIZE_PROP)));
+    List<Object[]> actualRecords = currentData();
+    assertEquals("Data after compaction should not change", expectedRecords, actualRecords);
+  }
+
+  @Test
+  public void testRewriteDataFilesWithDeterministicFalseFilter() {
+    createTable();
+    // create 10 files under non-partitioned table
+    insertData(10);
+    List<Object[]> expectedRecords = currentData();
+    // select no files for compaction
+    List<Object[]> output =
+        sql(
+            "CALL %s.system.rewrite_data_files(table => '%s', where => '0=1')",
+            catalogName, tableIdent);
+    assertEquals(
+        "Action should rewrite 0 data files and add 0 data files",
+        row(0, 0),
+        Arrays.copyOf(output.get(0), 2));
+    List<Object[]> actualRecords = currentData();
+    assertEquals("Data after compaction should not change", expectedRecords, actualRecords);
+  }
+
   @Test
   public void testRewriteDataFilesWithFilterOnPartitionTable() {
     createPartitionTable();
diff --git a/spark/v3.4/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkExpressionConverter.scala b/spark/v3.4/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkExpressionConverter.scala
index 554fa7f66d..9f53eae60a 100644
--- a/spark/v3.4/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkExpressionConverter.scala
+++ b/spark/v3.4/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkExpressionConverter.scala
@@ -24,8 +24,10 @@ import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.expressions.Literal
 import org.apache.spark.sql.catalyst.plans.logical.Filter
 import org.apache.spark.sql.catalyst.plans.logical.LeafNode
+import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
 
 object SparkExpressionConverter {
 
@@ -44,6 +46,8 @@ object SparkExpressionConverter {
     val optimizedLogicalPlan = session.sessionState.executePlan(filter).optimizedPlan
     optimizedLogicalPlan.collectFirst {
       case filter: Filter => filter.condition
+      case dummyRelation: DummyRelation => Literal.TrueLiteral
+      case localRelation: LocalRelation => Literal.FalseLiteral
     }.getOrElse(throw new AnalysisException("Failed to find filter expression"))
   }
 
