This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch release-0.8 in repository https://gitbox.apache.org/repos/asf/paimon.git
commit 9e63ea6e2bcf1a1587bc3d8b32cda6e6ed76d03d Author: Yann Byron <[email protected]> AuthorDate: Fri May 17 12:58:53 2024 +0800 [spark][core] fix when a table have no snapshot or no statistics (#3341) --- .../apache/paimon/table/system/StatisticTable.java | 26 ++++++++++++++-------- .../commands/PaimonAnalyzeTableColumnCommand.scala | 11 ++++----- .../paimon/spark/sql/AnalyzeTableTestBase.scala | 8 +++++++ 3 files changed, 31 insertions(+), 14 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/StatisticTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/StatisticTable.java index 2f795fd4a..682b5b775 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/StatisticTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/StatisticTable.java @@ -25,6 +25,7 @@ import org.apache.paimon.disk.IOManager; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; import org.apache.paimon.predicate.Predicate; +import org.apache.paimon.reader.EmptyRecordReader; import org.apache.paimon.reader.RecordReader; import org.apache.paimon.stats.Statistics; import org.apache.paimon.table.FileStoreTable; @@ -53,6 +54,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Optional; import static org.apache.paimon.catalog.Catalog.SYSTEM_TABLE_SPLITTER; @@ -204,16 +206,22 @@ public class StatisticTable implements ReadonlyTable { if (!(split instanceof StatisticTable.StatisticSplit)) { throw new IllegalArgumentException("Unsupported split: " + split.getClass()); } - Statistics statistics = dataTable.statistics().get(); - Iterator<Statistics> statisticsIterator = - Collections.singletonList(statistics).iterator(); - Iterator<InternalRow> rows = Iterators.transform(statisticsIterator, this::toRow); - if (projection != null) { - rows = - Iterators.transform( - rows, row -> ProjectedRow.from(projection).replaceRow(row)); + + Optional<Statistics> statisticsOptional = dataTable.statistics(); + if (statisticsOptional.isPresent()) { + Statistics statistics = statisticsOptional.get(); + Iterator<Statistics> statisticsIterator = + Collections.singletonList(statistics).iterator(); + Iterator<InternalRow> rows = Iterators.transform(statisticsIterator, this::toRow); + if (projection != null) { + rows = + Iterators.transform( + rows, row -> ProjectedRow.from(projection).replaceRow(row)); + } + return new IteratorRecordReader<>(rows); + } else { + return new EmptyRecordReader<>(); } - return new IteratorRecordReader<>(rows); } private InternalRow toRow(Statistics statistics) { diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonAnalyzeTableColumnCommand.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonAnalyzeTableColumnCommand.scala index 5f9957dbe..b3eb8059f 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonAnalyzeTableColumnCommand.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonAnalyzeTableColumnCommand.scala @@ -53,10 +53,14 @@ case class PaimonAnalyzeTableColumnCommand( override def run(sparkSession: SparkSession): Seq[Row] = { val relation = DataSourceV2Relation.create(v2Table, Some(catalog), Some(identifier)) + val attributes = getColumnsToAnalyze(relation) + val currentSnapshot = table.snapshotManager().latestSnapshot() + if (currentSnapshot == null) { + return Seq.empty[Row] + } // compute stats - val attributes = getColumnsToAnalyze(relation, columnNames, allColumns) val totalSize = PaimonStatsUtils.calculateTotalSize( sparkSession.sessionState, table.name(), @@ -92,10 +96,7 @@ case class PaimonAnalyzeTableColumnCommand( Seq.empty[Row] } - private def getColumnsToAnalyze( - relation: DataSourceV2Relation, - columnNames: Option[Seq[String]], - allColumns: Boolean): Seq[Attribute] = { + private def getColumnsToAnalyze(relation: DataSourceV2Relation): Seq[Attribute] = { if (columnNames.isDefined && allColumns) { throw new UnsupportedOperationException( "Parameter `columnNames` and `allColumns` are " + diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/AnalyzeTableTestBase.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/AnalyzeTableTestBase.scala index d1e6f7350..9ff252b52 100644 --- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/AnalyzeTableTestBase.scala +++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/AnalyzeTableTestBase.scala @@ -59,6 +59,7 @@ abstract class AnalyzeTableTestBase extends PaimonSparkTestBase { spark.sql(s"INSERT INTO T VALUES ('1', 'a', 1, 1)") spark.sql(s"INSERT INTO T VALUES ('2', 'aaa', 1, 2)") + Assertions.assertEquals(0, spark.sql("select * from `T$statistics`").count()) spark.sql(s"ANALYZE TABLE T COMPUTE STATISTICS") @@ -70,6 +71,13 @@ abstract class AnalyzeTableTestBase extends PaimonSparkTestBase { Row(2, 0, 2, "{ }")) } + test("Paimon analyze: analyze table without snapshot") { + spark.sql(s"CREATE TABLE T (id STRING, name STRING)") + spark.sql(s"ANALYZE TABLE T COMPUTE STATISTICS") + spark.sql(s"ANALYZE TABLE T COMPUTE STATISTICS FOR ALL COLUMNS") + Assertions.assertEquals(0, spark.sql("select * from `T$statistics`").count()) + } + test("Paimon analyze: analyze no scan") { spark.sql(s"CREATE TABLE T (id STRING, name STRING)") assertThatThrownBy(() => spark.sql(s"ANALYZE TABLE T COMPUTE STATISTICS NOSCAN"))
