This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-0.8
in repository https://gitbox.apache.org/repos/asf/paimon.git

commit 9e63ea6e2bcf1a1587bc3d8b32cda6e6ed76d03d
Author: Yann Byron <[email protected]>
AuthorDate: Fri May 17 12:58:53 2024 +0800

    [spark][core] fix when a table has no snapshot or no statistics (#3341)
---
 .../apache/paimon/table/system/StatisticTable.java | 26 ++++++++++++++--------
 .../commands/PaimonAnalyzeTableColumnCommand.scala | 11 ++++-----
 .../paimon/spark/sql/AnalyzeTableTestBase.scala    |  8 +++++++
 3 files changed, 31 insertions(+), 14 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/system/StatisticTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/system/StatisticTable.java
index 2f795fd4a..682b5b775 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/system/StatisticTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/system/StatisticTable.java
@@ -25,6 +25,7 @@ import org.apache.paimon.disk.IOManager;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.reader.EmptyRecordReader;
 import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.stats.Statistics;
 import org.apache.paimon.table.FileStoreTable;
@@ -53,6 +54,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 
 import static org.apache.paimon.catalog.Catalog.SYSTEM_TABLE_SPLITTER;
 
@@ -204,16 +206,22 @@ public class StatisticTable implements ReadonlyTable {
             if (!(split instanceof StatisticTable.StatisticSplit)) {
                 throw new IllegalArgumentException("Unsupported split: " + 
split.getClass());
             }
-            Statistics statistics = dataTable.statistics().get();
-            Iterator<Statistics> statisticsIterator =
-                    Collections.singletonList(statistics).iterator();
-            Iterator<InternalRow> rows = 
Iterators.transform(statisticsIterator, this::toRow);
-            if (projection != null) {
-                rows =
-                        Iterators.transform(
-                                rows, row -> 
ProjectedRow.from(projection).replaceRow(row));
+
+            Optional<Statistics> statisticsOptional = dataTable.statistics();
+            if (statisticsOptional.isPresent()) {
+                Statistics statistics = statisticsOptional.get();
+                Iterator<Statistics> statisticsIterator =
+                        Collections.singletonList(statistics).iterator();
+                Iterator<InternalRow> rows = 
Iterators.transform(statisticsIterator, this::toRow);
+                if (projection != null) {
+                    rows =
+                            Iterators.transform(
+                                    rows, row -> 
ProjectedRow.from(projection).replaceRow(row));
+                }
+                return new IteratorRecordReader<>(rows);
+            } else {
+                return new EmptyRecordReader<>();
             }
-            return new IteratorRecordReader<>(rows);
         }
 
         private InternalRow toRow(Statistics statistics) {
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonAnalyzeTableColumnCommand.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonAnalyzeTableColumnCommand.scala
index 5f9957dbe..b3eb8059f 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonAnalyzeTableColumnCommand.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonAnalyzeTableColumnCommand.scala
@@ -53,10 +53,14 @@ case class PaimonAnalyzeTableColumnCommand(
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val relation = DataSourceV2Relation.create(v2Table, Some(catalog), 
Some(identifier))
+    val attributes = getColumnsToAnalyze(relation)
+
     val currentSnapshot = table.snapshotManager().latestSnapshot()
+    if (currentSnapshot == null) {
+      return Seq.empty[Row]
+    }
 
     // compute stats
-    val attributes = getColumnsToAnalyze(relation, columnNames, allColumns)
     val totalSize = PaimonStatsUtils.calculateTotalSize(
       sparkSession.sessionState,
       table.name(),
@@ -92,10 +96,7 @@ case class PaimonAnalyzeTableColumnCommand(
     Seq.empty[Row]
   }
 
-  private def getColumnsToAnalyze(
-      relation: DataSourceV2Relation,
-      columnNames: Option[Seq[String]],
-      allColumns: Boolean): Seq[Attribute] = {
+  private def getColumnsToAnalyze(relation: DataSourceV2Relation): 
Seq[Attribute] = {
     if (columnNames.isDefined && allColumns) {
       throw new UnsupportedOperationException(
         "Parameter `columnNames` and `allColumns` are " +
diff --git 
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/AnalyzeTableTestBase.scala
 
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/AnalyzeTableTestBase.scala
index d1e6f7350..9ff252b52 100644
--- 
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/AnalyzeTableTestBase.scala
+++ 
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/AnalyzeTableTestBase.scala
@@ -59,6 +59,7 @@ abstract class AnalyzeTableTestBase extends 
PaimonSparkTestBase {
 
     spark.sql(s"INSERT INTO T VALUES ('1', 'a', 1, 1)")
     spark.sql(s"INSERT INTO T VALUES ('2', 'aaa', 1, 2)")
+    Assertions.assertEquals(0, spark.sql("select * from 
`T$statistics`").count())
 
     spark.sql(s"ANALYZE TABLE T COMPUTE STATISTICS")
 
@@ -70,6 +71,13 @@ abstract class AnalyzeTableTestBase extends 
PaimonSparkTestBase {
       Row(2, 0, 2, "{ }"))
   }
 
+  test("Paimon analyze: analyze table without snapshot") {
+    spark.sql(s"CREATE TABLE T (id STRING, name STRING)")
+    spark.sql(s"ANALYZE TABLE T COMPUTE STATISTICS")
+    spark.sql(s"ANALYZE TABLE T COMPUTE STATISTICS FOR ALL COLUMNS")
+    Assertions.assertEquals(0, spark.sql("select * from 
`T$statistics`").count())
+  }
+
   test("Paimon analyze: analyze no scan") {
     spark.sql(s"CREATE TABLE T (id STRING, name STRING)")
     assertThatThrownBy(() => spark.sql(s"ANALYZE TABLE T COMPUTE STATISTICS 
NOSCAN"))

Reply via email to