This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.1
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/release-1.1 by this push:
     new f22c5b9001 [core] Disable PARTITION_IDLE_TIME_TO_REPORT_STATISTIC by default (#5514)
f22c5b9001 is described below

commit f22c5b900105ed22d8a5ec16b3da26d71443d19e
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Apr 22 21:40:46 2025 +0800

    [core] Disable PARTITION_IDLE_TIME_TO_REPORT_STATISTIC by default (#5514)
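
    Note: with this change, partition statistics are no longer reported to
    the Hive metastore unless the option is set to a positive duration. A
    minimal sketch of restoring the previous behavior for a single table,
    assuming a hypothetical FileStoreTable handle named table:

        import scala.collection.JavaConverters._
        import org.apache.paimon.CoreOptions

        // Re-enable idle-partition statistic reporting with the old 1 h value.
        val tableWithStats = table.copy(
          Map(CoreOptions.PARTITION_IDLE_TIME_TO_REPORT_STATISTIC.key() -> "1 h").asJava)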
---
 .../shortcodes/generated/core_configuration.html   |  2 +-
 .../main/java/org/apache/paimon/CoreOptions.java   |  2 +-
 .../paimon/spark/commands/PaimonSparkWriter.scala  | 46 +++++++---------------
 3 files changed, 16 insertions(+), 34 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index 8d950f5f05..bc444a2ae7 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -697,7 +697,7 @@ This config option does not affect the default filesystem metastore.</td>
         </tr>
         <tr>
             <td><h5>partition.idle-time-to-report-statistic</h5></td>
-            <td style="word-wrap: break-word;">1 h</td>
+            <td style="word-wrap: break-word;">0 ms</td>
             <td>Duration</td>
             <td>Set a time duration when a partition has no new data after this time duration, start to report the partition statistics to hms.</td>
         </tr>
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index cd2ba6322b..0cb1736307 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -1631,7 +1631,7 @@ public class CoreOptions implements Serializable {
     public static final ConfigOption<Duration> PARTITION_IDLE_TIME_TO_REPORT_STATISTIC =
             key("partition.idle-time-to-report-statistic")
                     .durationType()
-                    .defaultValue(Duration.ofHours(1))
+                    .defaultValue(Duration.ofMillis(0))
                     .withDescription(
                             "Set a time duration when a partition has no new 
data after this time duration, "
                                     + "start to report the partition 
statistics to hms.");
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
index 2366ff9e02..a87668311e 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
@@ -27,8 +27,8 @@ import org.apache.paimon.deletionvectors.DeletionVector
 import org.apache.paimon.deletionvectors.append.AppendDeletionFileMaintainer
 import org.apache.paimon.index.{BucketAssigner, SimpleHashBucketAssigner}
 import org.apache.paimon.io.{CompactIncrement, DataIncrement, IndexIncrement}
-import org.apache.paimon.manifest.{FileKind, IndexManifestEntry}
-import org.apache.paimon.spark.{SparkInternalRowWrapper, SparkRow, SparkTableWrite, SparkTypeUtils}
+import org.apache.paimon.manifest.FileKind
+import org.apache.paimon.spark.{SparkRow, SparkTableWrite, SparkTypeUtils}
 import org.apache.paimon.spark.schema.SparkSystemColumns.{BUCKET_COL, ROW_KIND_COL}
 import org.apache.paimon.spark.util.OptionUtils.paimonExtensionEnabled
 import org.apache.paimon.spark.util.SparkRowUtils
@@ -40,10 +40,8 @@ import org.apache.paimon.utils.{InternalRowPartitionComputer, PartitionPathUtils
 
 import org.apache.spark.{Partitioner, TaskContext}
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{Column, DataFrame, Dataset, Row, SparkSession}
-import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
 import org.apache.spark.sql.functions._
-import org.apache.spark.sql.types.StructType
 import org.slf4j.LoggerFactory
 
 import java.io.IOException
@@ -59,6 +57,15 @@ case class PaimonSparkWriter(table: FileStoreTable) {
 
   private lazy val bucketMode = table.bucketMode
 
+  private lazy val disableReportStats = {
+    val options = table.coreOptions()
+    val config = options.toConfiguration
+    config.get(CoreOptions.PARTITION_IDLE_TIME_TO_REPORT_STATISTIC).toMillis <= 0 ||
+    table.partitionKeys.isEmpty ||
+    !options.partitionedTableInMetastore ||
+    table.catalogEnvironment.partitionHandler() == null
+  }
+
   private lazy val log = LoggerFactory.getLogger(classOf[PaimonSparkWriter])
 
   @transient private lazy val serializer = new CommitMessageSerializer
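
Note: hoisting the four-part condition into the disableReportStats lazy val
evaluates it once per writer and memoizes the result, instead of recomputing
it on every reportToHms call. A standalone Scala illustration of the pattern
(not Paimon code):

    // A lazy val body runs at most once, on first access.
    class Writer(expensiveCheck: () => Boolean) {
      private lazy val disabled = expensiveCheck()
      def report(): Unit = if (!disabled) println("reporting")
    }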
@@ -326,37 +333,12 @@ case class PaimonSparkWriter(table: FileStoreTable) {
       .map(deserializeCommitMessage(serializer, _))
   }
 
-  def buildCommitMessageFromIndexManifestEntry(
-      indexManifestEntries: Seq[IndexManifestEntry]): Seq[CommitMessage] = {
-    indexManifestEntries
-      .groupBy(entry => (entry.partition(), entry.bucket()))
-      .map {
-        case ((partition, bucket), entries) =>
-          val (added, removed) = entries.partition(_.kind() == FileKind.ADD)
-          new CommitMessageImpl(
-            partition,
-            bucket,
-            null,
-            DataIncrement.emptyIncrement(),
-            CompactIncrement.emptyIncrement(),
-            new IndexIncrement(added.map(_.indexFile()).asJava, removed.map(_.indexFile()).asJava))
-      }
-      .toSeq
-  }
-
   private def reportToHms(messages: Seq[CommitMessage]): Unit = {
-    val options = table.coreOptions()
-    val config = options.toConfiguration
-
-    if (
-      config.get(CoreOptions.PARTITION_IDLE_TIME_TO_REPORT_STATISTIC).toMillis <= 0 ||
-      table.partitionKeys.isEmpty ||
-      !options.partitionedTableInMetastore ||
-      table.catalogEnvironment.partitionHandler() == null
-    ) {
+    if (disableReportStats) {
       return
     }
 
+    val options = table.coreOptions()
     val partitionComputer = new InternalRowPartitionComputer(
       options.partitionDefaultName,
       table.schema.logicalPartitionType,
