This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new d7c70577a [spark] Analyze size through scan splits instead of listing files (#4119)
d7c70577a is described below
commit d7c70577ab2c7a7b25a3b4044f8c22e460ac57ac
Author: Zouxxyy <[email protected]>
AuthorDate: Tue Sep 3 17:00:51 2024 +0800
[spark] Analyze size through scan splits instead of listing files (#4119)
---
.../org/apache/spark/sql/PaimonStatsUtils.scala | 18 +-----
.../org/apache/spark/sql/PaimonStatsUtils.scala | 17 +----
.../org/apache/spark/sql/PaimonStatsUtils.scala | 75 ----------------------
.../commands/PaimonAnalyzeTableColumnCommand.scala | 14 ++--
.../org/apache/spark/sql/PaimonStatsUtils.scala | 18 +-----
5 files changed, 12 insertions(+), 130 deletions(-)
diff --git a/paimon-spark/paimon-spark-3.1/src/main/scala/org/apache/spark/sql/PaimonStatsUtils.scala b/paimon-spark/paimon-spark-3.1/src/main/scala/org/apache/spark/sql/PaimonStatsUtils.scala
index 11a2406a1..f31195f66 100644
--- a/paimon-spark/paimon-spark-3.1/src/main/scala/org/apache/spark/sql/PaimonStatsUtils.scala
+++ b/paimon-spark/paimon-spark-3.1/src/main/scala/org/apache/spark/sql/PaimonStatsUtils.scala
@@ -18,29 +18,13 @@
package org.apache.spark.sql
-import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LogicalPlan}
-import org.apache.spark.sql.connector.catalog.Identifier
import org.apache.spark.sql.execution.command.CommandUtils
-import org.apache.spark.sql.internal.SessionState
-import org.apache.spark.sql.types.{BinaryType, BooleanType, DataType,
DateType, DecimalType, DoubleType, FloatType, IntegralType, StringType,
TimestampType}
-
-import java.net.URI
+import org.apache.spark.sql.types._
object PaimonStatsUtils {
- def calculateTotalSize(
- sessionState: SessionState,
- catalogName: String,
- identifier: Identifier,
- locationUri: Option[URI]): Long = {
- CommandUtils.calculateSingleLocationSize(
- sessionState,
- new TableIdentifier(identifier.name(),
Some(identifier.namespace().head)),
- locationUri)
- }
-
def computeColumnStats(
sparkSession: SparkSession,
relation: LogicalPlan,
diff --git a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/spark/sql/PaimonStatsUtils.scala b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/spark/sql/PaimonStatsUtils.scala
index 7fcff6f32..f0b6f64af 100644
--- a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/spark/sql/PaimonStatsUtils.scala
+++ b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/spark/sql/PaimonStatsUtils.scala
@@ -18,27 +18,12 @@
package org.apache.spark.sql
-import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LogicalPlan}
-import org.apache.spark.sql.connector.catalog.Identifier
import org.apache.spark.sql.execution.command.CommandUtils
-import org.apache.spark.sql.internal.SessionState
-import org.apache.spark.sql.types.{BinaryType, BooleanType, DataType,
DateType, DecimalType, DoubleType, FloatType, IntegralType, StringType,
TimestampNTZType, TimestampType}
-
-import java.net.URI
+import org.apache.spark.sql.types._
object PaimonStatsUtils {
- def calculateTotalSize(
- sessionState: SessionState,
- catalogName: String,
- identifier: Identifier,
- locationUri: Option[URI]): Long = {
- CommandUtils.calculateSingleLocationSize(
- sessionState,
- new TableIdentifier(identifier.name(),
Some(identifier.namespace().head)),
- locationUri)
- }
def computeColumnStats(
sparkSession: SparkSession,
diff --git a/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/spark/sql/PaimonStatsUtils.scala b/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/spark/sql/PaimonStatsUtils.scala
deleted file mode 100644
index 8f8c45d49..000000000
--- a/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/spark/sql/PaimonStatsUtils.scala
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql
-
-import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LogicalPlan}
-import org.apache.spark.sql.connector.catalog.Identifier
-import org.apache.spark.sql.execution.command.CommandUtils
-import org.apache.spark.sql.internal.SessionState
-import org.apache.spark.sql.types.{BinaryType, BooleanType, DataType,
DatetimeType, DecimalType, DoubleType, FloatType, IntegralType, StringType}
-
-import java.net.URI
-
-/**
- * Some classes or methods defined in the spark project are marked as private
under
- * [[org.apache.spark.sql]] package, Hence, use this class to adapt then so
that we can use them
- * indirectly.
- */
-object PaimonStatsUtils {
-
- def calculateTotalSize(
- sessionState: SessionState,
- catalogName: String,
- identifier: Identifier,
- locationUri: Option[URI]): Long = {
- CommandUtils.calculateSingleLocationSize(
- sessionState,
- new TableIdentifier(identifier.name(),
Some(identifier.namespace().head)),
- locationUri)
- }
-
- def computeColumnStats(
- sparkSession: SparkSession,
- relation: LogicalPlan,
- columns: Seq[Attribute]): (Long, Map[Attribute, ColumnStat]) = {
- CommandUtils.computeColumnStats(sparkSession, relation, columns)
- }
-
- /** [[IntegralType]] is private in spark, therefore we need add it here. */
- def analyzeSupportsType(dataType: DataType): Boolean = dataType match {
- case _: IntegralType => true
- case _: DecimalType => true
- case DoubleType | FloatType => true
- case BooleanType => true
- case _: DatetimeType => true
- case BinaryType | StringType => true
- case _ => false
- }
-
- def hasMinMax(dataType: DataType): Boolean = dataType match {
- case _: IntegralType => true
- case _: DecimalType => true
- case DoubleType | FloatType => true
- case BooleanType => true
- case _: DatetimeType => true
- case _ => false
- }
-}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonAnalyzeTableColumnCommand.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonAnalyzeTableColumnCommand.scala
index 3c51c3c5f..b13e5add0 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonAnalyzeTableColumnCommand.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonAnalyzeTableColumnCommand.scala
@@ -24,6 +24,7 @@ import org.apache.paimon.spark.leafnode.PaimonLeafRunnableCommand
import org.apache.paimon.stats.{ColStats, Statistics}
import org.apache.paimon.table.FileStoreTable
import org.apache.paimon.table.sink.BatchWriteBuilder
+import org.apache.paimon.table.source.DataSplit
import org.apache.parquet.Preconditions
import org.apache.spark.sql.{PaimonStatsUtils, Row, SparkSession}
@@ -61,11 +62,14 @@ case class PaimonAnalyzeTableColumnCommand(
}
// compute stats
- val totalSize = PaimonStatsUtils.calculateTotalSize(
- sparkSession.sessionState,
- catalog.name(),
- identifier,
- Some(table.location().toUri))
+ val totalSize = table
+ .newScan()
+ .plan()
+ .splits()
+ .asScala
+ .flatMap { case split: DataSplit => split.dataFiles().asScala }
+ .map(_.fileSize())
+ .sum
val (mergedRecordCount, colStats) =
PaimonStatsUtils.computeColumnStats(sparkSession, relation, attributes)
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/PaimonStatsUtils.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/PaimonStatsUtils.scala
index 7db0a270e..8f24700c2 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/PaimonStatsUtils.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/PaimonStatsUtils.scala
@@ -18,15 +18,10 @@
package org.apache.spark.sql
-import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LogicalPlan}
-import org.apache.spark.sql.connector.catalog.Identifier
import org.apache.spark.sql.execution.command.CommandUtils
-import org.apache.spark.sql.internal.SessionState
-import org.apache.spark.sql.types.{BinaryType, BooleanType, DataType,
DatetimeType, DecimalType, DoubleType, FloatType, IntegralType, StringType}
-
-import java.net.URI
+import org.apache.spark.sql.types._
/**
* Some classes or methods defined in the spark project are marked as private
under
@@ -35,17 +30,6 @@ import java.net.URI
*/
object PaimonStatsUtils {
- def calculateTotalSize(
- sessionState: SessionState,
- catalogName: String,
- identifier: Identifier,
- locationUri: Option[URI]): Long = {
- CommandUtils.calculateSingleLocationSize(
- sessionState,
- new TableIdentifier(identifier.name(),
Some(identifier.namespace().head), Some(catalogName)),
- locationUri)
- }
-
def computeColumnStats(
sparkSession: SparkSession,
relation: LogicalPlan,