This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new d7c70577a [spark] Analyze size through scan splits instead of listing files (#4119)
d7c70577a is described below

commit d7c70577ab2c7a7b25a3b4044f8c22e460ac57ac
Author: Zouxxyy <[email protected]>
AuthorDate: Tue Sep 3 17:00:51 2024 +0800

    [spark] Analyze size through scan splits instead of listing files (#4119)
---
 .../org/apache/spark/sql/PaimonStatsUtils.scala    | 18 +-----
 .../org/apache/spark/sql/PaimonStatsUtils.scala    | 17 +----
 .../org/apache/spark/sql/PaimonStatsUtils.scala    | 75 ----------------------
 .../commands/PaimonAnalyzeTableColumnCommand.scala | 14 ++--
 .../org/apache/spark/sql/PaimonStatsUtils.scala    | 18 +-----
 5 files changed, 12 insertions(+), 130 deletions(-)

diff --git a/paimon-spark/paimon-spark-3.1/src/main/scala/org/apache/spark/sql/PaimonStatsUtils.scala b/paimon-spark/paimon-spark-3.1/src/main/scala/org/apache/spark/sql/PaimonStatsUtils.scala
index 11a2406a1..f31195f66 100644
--- a/paimon-spark/paimon-spark-3.1/src/main/scala/org/apache/spark/sql/PaimonStatsUtils.scala
+++ b/paimon-spark/paimon-spark-3.1/src/main/scala/org/apache/spark/sql/PaimonStatsUtils.scala
@@ -18,29 +18,13 @@
 
 package org.apache.spark.sql
 
-import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LogicalPlan}
-import org.apache.spark.sql.connector.catalog.Identifier
 import org.apache.spark.sql.execution.command.CommandUtils
-import org.apache.spark.sql.internal.SessionState
-import org.apache.spark.sql.types.{BinaryType, BooleanType, DataType, DateType, DecimalType, DoubleType, FloatType, IntegralType, StringType, TimestampType}
-
-import java.net.URI
+import org.apache.spark.sql.types._
 
 object PaimonStatsUtils {
 
-  def calculateTotalSize(
-      sessionState: SessionState,
-      catalogName: String,
-      identifier: Identifier,
-      locationUri: Option[URI]): Long = {
-    CommandUtils.calculateSingleLocationSize(
-      sessionState,
-      new TableIdentifier(identifier.name(), Some(identifier.namespace().head)),
-      locationUri)
-  }
-
   def computeColumnStats(
       sparkSession: SparkSession,
       relation: LogicalPlan,
diff --git a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/spark/sql/PaimonStatsUtils.scala b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/spark/sql/PaimonStatsUtils.scala
index 7fcff6f32..f0b6f64af 100644
--- a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/spark/sql/PaimonStatsUtils.scala
+++ b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/spark/sql/PaimonStatsUtils.scala
@@ -18,27 +18,12 @@
 
 package org.apache.spark.sql
 
-import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LogicalPlan}
-import org.apache.spark.sql.connector.catalog.Identifier
 import org.apache.spark.sql.execution.command.CommandUtils
-import org.apache.spark.sql.internal.SessionState
-import org.apache.spark.sql.types.{BinaryType, BooleanType, DataType, DateType, DecimalType, DoubleType, FloatType, IntegralType, StringType, TimestampNTZType, TimestampType}
-
-import java.net.URI
+import org.apache.spark.sql.types._
 
 object PaimonStatsUtils {
-  def calculateTotalSize(
-      sessionState: SessionState,
-      catalogName: String,
-      identifier: Identifier,
-      locationUri: Option[URI]): Long = {
-    CommandUtils.calculateSingleLocationSize(
-      sessionState,
-      new TableIdentifier(identifier.name(), Some(identifier.namespace().head)),
-      locationUri)
-  }
 
   def computeColumnStats(
       sparkSession: SparkSession,
diff --git a/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/spark/sql/PaimonStatsUtils.scala b/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/spark/sql/PaimonStatsUtils.scala
deleted file mode 100644
index 8f8c45d49..000000000
--- a/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/spark/sql/PaimonStatsUtils.scala
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql
-
-import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LogicalPlan}
-import org.apache.spark.sql.connector.catalog.Identifier
-import org.apache.spark.sql.execution.command.CommandUtils
-import org.apache.spark.sql.internal.SessionState
-import org.apache.spark.sql.types.{BinaryType, BooleanType, DataType, DatetimeType, DecimalType, DoubleType, FloatType, IntegralType, StringType}
-
-import java.net.URI
-
-/**
- * Some classes or methods defined in the spark project are marked as private under
- * [[org.apache.spark.sql]] package, Hence, use this class to adapt then so that we can use them
- * indirectly.
- */
-object PaimonStatsUtils {
-
-  def calculateTotalSize(
-      sessionState: SessionState,
-      catalogName: String,
-      identifier: Identifier,
-      locationUri: Option[URI]): Long = {
-    CommandUtils.calculateSingleLocationSize(
-      sessionState,
-      new TableIdentifier(identifier.name(), Some(identifier.namespace().head)),
-      locationUri)
-  }
-
-  def computeColumnStats(
-      sparkSession: SparkSession,
-      relation: LogicalPlan,
-      columns: Seq[Attribute]): (Long, Map[Attribute, ColumnStat]) = {
-    CommandUtils.computeColumnStats(sparkSession, relation, columns)
-  }
-
-  /** [[IntegralType]] is private in spark, therefore we need add it here. */
-  def analyzeSupportsType(dataType: DataType): Boolean = dataType match {
-    case _: IntegralType => true
-    case _: DecimalType => true
-    case DoubleType | FloatType => true
-    case BooleanType => true
-    case _: DatetimeType => true
-    case BinaryType | StringType => true
-    case _ => false
-  }
-
-  def hasMinMax(dataType: DataType): Boolean = dataType match {
-    case _: IntegralType => true
-    case _: DecimalType => true
-    case DoubleType | FloatType => true
-    case BooleanType => true
-    case _: DatetimeType => true
-    case _ => false
-  }
-}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonAnalyzeTableColumnCommand.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonAnalyzeTableColumnCommand.scala
index 3c51c3c5f..b13e5add0 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonAnalyzeTableColumnCommand.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonAnalyzeTableColumnCommand.scala
@@ -24,6 +24,7 @@ import org.apache.paimon.spark.leafnode.PaimonLeafRunnableCommand
 import org.apache.paimon.stats.{ColStats, Statistics}
 import org.apache.paimon.table.FileStoreTable
 import org.apache.paimon.table.sink.BatchWriteBuilder
+import org.apache.paimon.table.source.DataSplit
 
 import org.apache.parquet.Preconditions
 import org.apache.spark.sql.{PaimonStatsUtils, Row, SparkSession}
@@ -61,11 +62,14 @@ case class PaimonAnalyzeTableColumnCommand(
     }
 
     // compute stats
-    val totalSize = PaimonStatsUtils.calculateTotalSize(
-      sparkSession.sessionState,
-      catalog.name(),
-      identifier,
-      Some(table.location().toUri))
+    val totalSize = table
+      .newScan()
+      .plan()
+      .splits()
+      .asScala
+      .flatMap { case split: DataSplit => split.dataFiles().asScala }
+      .map(_.fileSize())
+      .sum
     val (mergedRecordCount, colStats) =
       PaimonStatsUtils.computeColumnStats(sparkSession, relation, attributes)
 
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/PaimonStatsUtils.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/PaimonStatsUtils.scala
index 7db0a270e..8f24700c2 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/PaimonStatsUtils.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/PaimonStatsUtils.scala
@@ -18,15 +18,10 @@
 
 package org.apache.spark.sql
 
-import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LogicalPlan}
-import org.apache.spark.sql.connector.catalog.Identifier
 import org.apache.spark.sql.execution.command.CommandUtils
-import org.apache.spark.sql.internal.SessionState
-import org.apache.spark.sql.types.{BinaryType, BooleanType, DataType, DatetimeType, DecimalType, DoubleType, FloatType, IntegralType, StringType}
-
-import java.net.URI
+import org.apache.spark.sql.types._
 
 /**
  * Some classes or methods defined in the spark project are marked as private under
@@ -35,17 +30,6 @@ import java.net.URI
  */
 object PaimonStatsUtils {
 
-  def calculateTotalSize(
-      sessionState: SessionState,
-      catalogName: String,
-      identifier: Identifier,
-      locationUri: Option[URI]): Long = {
-    CommandUtils.calculateSingleLocationSize(
-      sessionState,
-      new TableIdentifier(identifier.name(), Some(identifier.namespace().head), Some(catalogName)),
-      locationUri)
-  }
-
   def computeColumnStats(
       sparkSession: SparkSession,
       relation: LogicalPlan,

Reply via email to