This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch release-1.1
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/release-1.1 by this push:
new f22c5b9001 [core] Disable PARTITION_IDLE_TIME_TO_REPORT_STATISTIC by default (#5514)
f22c5b9001 is described below
commit f22c5b900105ed22d8a5ec16b3da26d71443d19e
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Apr 22 21:40:46 2025 +0800
[core] Disable PARTITION_IDLE_TIME_TO_REPORT_STATISTIC by default (#5514)
---
.../shortcodes/generated/core_configuration.html | 2 +-
.../main/java/org/apache/paimon/CoreOptions.java | 2 +-
.../paimon/spark/commands/PaimonSparkWriter.scala | 46 +++++++---------------
3 files changed, 16 insertions(+), 34 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index 8d950f5f05..bc444a2ae7 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -697,7 +697,7 @@ This config option does not affect the default filesystem metastore.</td>
</tr>
<tr>
<td><h5>partition.idle-time-to-report-statistic</h5></td>
- <td style="word-wrap: break-word;">1 h</td>
+ <td style="word-wrap: break-word;">0 ms</td>
<td>Duration</td>
<td>Set a time duration when a partition has no new data after this time duration, start to report the partition statistics to hms.</td>
</tr>
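
With the default now 0 ms, idle-partition statistics are no longer reported to the Hive metastore unless a user opts back in. A minimal Spark SQL sketch of opting in, using a hypothetical table name and assuming a Paimon catalog is configured (per the writer change further below, reporting also requires a partitioned table with 'metastore.partitioned-table' = 'true' and a catalog that exposes a partition handler):

    // Hypothetical table name; restores the previous one-hour behavior.
    spark.sql(
      "ALTER TABLE my_db.my_table SET TBLPROPERTIES (" +
        "'partition.idle-time-to-report-statistic' = '1 h')")
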
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index cd2ba6322b..0cb1736307 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -1631,7 +1631,7 @@ public class CoreOptions implements Serializable {
public static final ConfigOption<Duration> PARTITION_IDLE_TIME_TO_REPORT_STATISTIC =
key("partition.idle-time-to-report-statistic")
.durationType()
- .defaultValue(Duration.ofHours(1))
+ .defaultValue(Duration.ofMillis(0))
.withDescription(
"Set a time duration when a partition has no new
data after this time duration, "
+ "start to report the partition
statistics to hms.");
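
The Spark writer treats any non-positive duration as "disabled", so the new 0 ms default turns reporting off without needing a separate boolean flag. A standalone Scala sketch of that convention (not Paimon's API, just the check in isolation):

    import java.time.Duration

    // A zero or negative idle duration means "never report statistics".
    def reportingEnabled(idleTime: Duration): Boolean = idleTime.toMillis > 0

    assert(!reportingEnabled(Duration.ofMillis(0))) // new default: disabled
    assert(reportingEnabled(Duration.ofHours(1)))   // old default: enabled
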
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
index 2366ff9e02..a87668311e 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
@@ -27,8 +27,8 @@ import org.apache.paimon.deletionvectors.DeletionVector
import org.apache.paimon.deletionvectors.append.AppendDeletionFileMaintainer
import org.apache.paimon.index.{BucketAssigner, SimpleHashBucketAssigner}
import org.apache.paimon.io.{CompactIncrement, DataIncrement, IndexIncrement}
-import org.apache.paimon.manifest.{FileKind, IndexManifestEntry}
-import org.apache.paimon.spark.{SparkInternalRowWrapper, SparkRow, SparkTableWrite, SparkTypeUtils}
+import org.apache.paimon.manifest.FileKind
+import org.apache.paimon.spark.{SparkRow, SparkTableWrite, SparkTypeUtils}
import org.apache.paimon.spark.schema.SparkSystemColumns.{BUCKET_COL, ROW_KIND_COL}
import org.apache.paimon.spark.util.OptionUtils.paimonExtensionEnabled
import org.apache.paimon.spark.util.SparkRowUtils
@@ -40,10 +40,8 @@ import org.apache.paimon.utils.{InternalRowPartitionComputer, PartitionPathUtils
import org.apache.spark.{Partitioner, TaskContext}
import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{Column, DataFrame, Dataset, Row, SparkSession}
-import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
import org.apache.spark.sql.functions._
-import org.apache.spark.sql.types.StructType
import org.slf4j.LoggerFactory
import java.io.IOException
@@ -59,6 +57,15 @@ case class PaimonSparkWriter(table: FileStoreTable) {
private lazy val bucketMode = table.bucketMode
+ private lazy val disableReportStats = {
+ val options = table.coreOptions()
+ val config = options.toConfiguration
+ config.get(CoreOptions.PARTITION_IDLE_TIME_TO_REPORT_STATISTIC).toMillis <= 0 ||
+ table.partitionKeys.isEmpty ||
+ !options.partitionedTableInMetastore ||
+ table.catalogEnvironment.partitionHandler() == null
+ }
+
private lazy val log = LoggerFactory.getLogger(classOf[PaimonSparkWriter])
@transient private lazy val serializer = new CommitMessageSerializer
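
Hoisting the check into a lazy val evaluates all four conditions once per writer instance rather than on every reportToHms call. A condensed Scala sketch of the same pattern, with simplified, hypothetical names (not the actual class):

    // Condensed sketch: the guard is computed once and reused on every commit.
    case class Writer(
        idleTimeMillis: Long,
        partitionKeys: Seq[String],
        partitionedInMetastore: Boolean,
        hasPartitionHandler: Boolean) {

      private lazy val disableReportStats: Boolean =
        idleTimeMillis <= 0 || partitionKeys.isEmpty ||
          !partitionedInMetastore || !hasPartitionHandler

      def reportToHms(): Unit = {
        if (disableReportStats) return
        // ... gather partition statistics and send them to the metastore
      }
    }
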
@@ -326,37 +333,12 @@ case class PaimonSparkWriter(table: FileStoreTable) {
.map(deserializeCommitMessage(serializer, _))
}
- def buildCommitMessageFromIndexManifestEntry(
- indexManifestEntries: Seq[IndexManifestEntry]): Seq[CommitMessage] = {
- indexManifestEntries
- .groupBy(entry => (entry.partition(), entry.bucket()))
- .map {
- case ((partition, bucket), entries) =>
- val (added, removed) = entries.partition(_.kind() == FileKind.ADD)
- new CommitMessageImpl(
- partition,
- bucket,
- null,
- DataIncrement.emptyIncrement(),
- CompactIncrement.emptyIncrement(),
- new IndexIncrement(added.map(_.indexFile()).asJava, removed.map(_.indexFile()).asJava))
- }
- .toSeq
- }
-
private def reportToHms(messages: Seq[CommitMessage]): Unit = {
- val options = table.coreOptions()
- val config = options.toConfiguration
-
- if (
- config.get(CoreOptions.PARTITION_IDLE_TIME_TO_REPORT_STATISTIC).toMillis <= 0 ||
- table.partitionKeys.isEmpty ||
- !options.partitionedTableInMetastore ||
- table.catalogEnvironment.partitionHandler() == null
- ) {
+ if (disableReportStats) {
return
}
+ val options = table.coreOptions()
val partitionComputer = new InternalRowPartitionComputer(
options.partitionDefaultName,
table.schema.logicalPartitionType,