Copilot commented on code in PR #6612:
URL: https://github.com/apache/paimon/pull/6612#discussion_r2538736989


##########
paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RescaleProcedureTest.scala:
##########
@@ -0,0 +1,358 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.procedure
+
+import org.apache.paimon.Snapshot.CommitKind
+import org.apache.paimon.partition.PartitionPredicate
+import org.apache.paimon.spark.PaimonSparkTestBase
+import org.apache.paimon.table.FileStoreTable
+
+import org.apache.spark.sql.Row
+import org.assertj.core.api.Assertions
+
+import java.util
+import java.util.{Arrays, Collections, Map => JMap}
+
+import scala.collection.JavaConverters._
+
+/** Test rescale procedure. See [[RescaleProcedure]]. */

Review Comment:
   The comment references `RescaleProcedureTestBase` but this test class 
extends `PaimonSparkTestBase` directly, not a base class. The description in 
the PR also mentions `RescaleProcedureTestBase`, but no such base class exists 
in the changes. Consider either creating the referenced base class or updating 
the documentation to reflect the actual class structure.
   ```suggestion
   /** Tests for the rescale procedure. See [[RescaleProcedure]]. */
   ```



##########
paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RescaleProcedureTest.scala:
##########
@@ -0,0 +1,358 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.procedure
+
+import org.apache.paimon.Snapshot.CommitKind
+import org.apache.paimon.partition.PartitionPredicate
+import org.apache.paimon.spark.PaimonSparkTestBase
+import org.apache.paimon.table.FileStoreTable
+
+import org.apache.spark.sql.Row
+import org.assertj.core.api.Assertions
+
+import java.util
+import java.util.{Arrays, Collections, Map => JMap}
+
+import scala.collection.JavaConverters._
+
+/** Test rescale procedure. See [[RescaleProcedure]]. */
+class RescaleProcedureTest extends PaimonSparkTestBase {
+
+  test("Paimon Procedure: rescale basic functionality") {
+    withTable("T") {
+      spark.sql(s"""
+                   |CREATE TABLE T (id INT, value STRING)
+                   |TBLPROPERTIES ('primary-key'='id', 'bucket'='2')
+                   |""".stripMargin)
+
+      val table = loadTable("T")
+      spark.sql(s"INSERT INTO T VALUES (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd'), 
(5, 'e')")
+
+      val initialData = spark.sql("SELECT * FROM T ORDER BY id").collect()
+      val initialSnapshotId = lastSnapshotId(table)
+
+      // Rescale with explicit bucket_num
+      spark.sql("ALTER TABLE T SET TBLPROPERTIES ('bucket' = '4')")
+      checkAnswer(spark.sql("CALL sys.rescale(table => 'T', bucket_num => 
4)"), Row(true) :: Nil)
+
+      val reloadedTable = loadTable("T")
+      
Assertions.assertThat(lastSnapshotCommand(reloadedTable)).isEqualTo(CommitKind.OVERWRITE)
+      
Assertions.assertThat(lastSnapshotId(reloadedTable)).isGreaterThan(initialSnapshotId)
+      Assertions.assertThat(getBucketCount(reloadedTable)).isEqualTo(4)
+
+      val afterData = spark.sql("SELECT * FROM T ORDER BY id").collect()
+      val initialDataList = new util.ArrayList[Row]()
+      initialData.foreach(initialDataList.add)
+      
Assertions.assertThat(afterData).containsExactlyElementsOf(initialDataList)
+
+      // Rescale without bucket_num (use current bucket)
+      spark.sql("ALTER TABLE T SET TBLPROPERTIES ('bucket' = '3')")
+      checkAnswer(spark.sql("CALL sys.rescale(table => 'T')"), Row(true) :: 
Nil)
+      Assertions.assertThat(getBucketCount(loadTable("T"))).isEqualTo(3)
+
+      // Rescale with explicit bucket_num
+      spark.sql("ALTER TABLE T SET TBLPROPERTIES ('bucket' = '4')")
+      checkAnswer(spark.sql("CALL sys.rescale(table => 'T', bucket_num => 
4)"), Row(true) :: Nil)
+      Assertions.assertThat(getBucketCount(loadTable("T"))).isEqualTo(4)
+    }
+  }
+
+  test("Paimon Procedure: rescale partitioned tables") {
+    withTable("T") {
+      spark.sql(s"""
+                   |CREATE TABLE T (id INT, value STRING, pt STRING, dt 
STRING, hh INT)
+                   |TBLPROPERTIES ('primary-key'='id, pt, dt, hh', 
'bucket'='2')
+                   |PARTITIONED BY (pt, dt, hh)
+                   |""".stripMargin)
+
+      val table = loadTable("T")
+      spark.sql(
+        s"INSERT INTO T VALUES (1, 'a', 'p1', '2024-01-01', 0), (2, 'b', 'p1', 
'2024-01-01', 0)")
+      spark.sql(
+        s"INSERT INTO T VALUES (3, 'c', 'p2', '2024-01-01', 1), (4, 'd', 'p2', 
'2024-01-02', 0)")
+
+      val initialData = spark.sql("SELECT * FROM T ORDER BY id").collect()
+      val initialSnapshotId = lastSnapshotId(table)
+
+      // Rescale single partition field
+      spark.sql("ALTER TABLE T SET TBLPROPERTIES ('bucket' = '4')")
+      checkAnswer(
+        spark.sql("CALL sys.rescale(table => 'T', bucket_num => 4, partitions 
=> 'pt=\"p1\"')"),
+        Row(true) :: Nil)
+
+      val reloadedTable = loadTable("T")
+      
Assertions.assertThat(lastSnapshotCommand(reloadedTable)).isEqualTo(CommitKind.OVERWRITE)
+      
Assertions.assertThat(lastSnapshotId(reloadedTable)).isGreaterThan(initialSnapshotId)
+
+      val p1Predicate = PartitionPredicate.fromMap(
+        reloadedTable.schema().logicalPartitionType(),
+        Collections.singletonMap("pt", "p1"),
+        reloadedTable.coreOptions().partitionDefaultName())
+      val p1Splits = reloadedTable.newSnapshotReader
+        .withPartitionFilter(p1Predicate)
+        .read
+        .dataSplits
+        .asScala
+        .toList
+      p1Splits.foreach(split => 
Assertions.assertThat(split.bucket()).isLessThan(4))
+
+      // Rescale multiple partition fields
+      val snapshotBeforeTest2 = lastSnapshotId(reloadedTable)
+      checkAnswer(
+        spark.sql(
+          "CALL sys.rescale(table => 'T', bucket_num => 4, partitions => 
'dt=\"2024-01-01\",hh=0')"),
+        Row(true) :: Nil)
+
+      val reloadedTable2 = loadTable("T")
+      
Assertions.assertThat(lastSnapshotCommand(reloadedTable2)).isEqualTo(CommitKind.OVERWRITE)
+      
Assertions.assertThat(lastSnapshotId(reloadedTable2)).isGreaterThan(snapshotBeforeTest2)
+
+      // Rescale empty partition (should not create new snapshot)
+      val snapshotBeforeEmpty = lastSnapshotId(reloadedTable2)
+      checkAnswer(
+        spark.sql("CALL sys.rescale(table => 'T', bucket_num => 4, partitions 
=> 'pt=\"p3\"')"),
+        Row(true) :: Nil)
+      
Assertions.assertThat(lastSnapshotId(loadTable("T"))).isEqualTo(snapshotBeforeEmpty)
+
+      val afterData = spark.sql("SELECT * FROM T ORDER BY id").collect()
+      val initialDataList = new util.ArrayList[Row]()
+      initialData.foreach(initialDataList.add)
+      
Assertions.assertThat(afterData).containsExactlyElementsOf(initialDataList)

Review Comment:
   Consider using `Arrays.asList()` directly instead of creating a new 
`ArrayList` and adding elements one by one. This can be simplified to: 
`Assertions.assertThat(afterData).containsExactlyElementsOf(Arrays.asList(initialData:
 _*))` similar to line 301.
   ```suggestion
         
Assertions.assertThat(afterData).containsExactlyElementsOf(Arrays.asList(initialData:
 _*))
   ```



##########
paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RescaleProcedureTest.scala:
##########
@@ -0,0 +1,358 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.procedure
+
+import org.apache.paimon.Snapshot.CommitKind
+import org.apache.paimon.partition.PartitionPredicate
+import org.apache.paimon.spark.PaimonSparkTestBase
+import org.apache.paimon.table.FileStoreTable
+
+import org.apache.spark.sql.Row
+import org.assertj.core.api.Assertions
+
+import java.util
+import java.util.{Arrays, Collections, Map => JMap}
+
+import scala.collection.JavaConverters._
+
+/** Test rescale procedure. See [[RescaleProcedure]]. */
+class RescaleProcedureTest extends PaimonSparkTestBase {
+
+  test("Paimon Procedure: rescale basic functionality") {
+    withTable("T") {
+      spark.sql(s"""
+                   |CREATE TABLE T (id INT, value STRING)
+                   |TBLPROPERTIES ('primary-key'='id', 'bucket'='2')
+                   |""".stripMargin)
+
+      val table = loadTable("T")
+      spark.sql(s"INSERT INTO T VALUES (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd'), 
(5, 'e')")
+
+      val initialData = spark.sql("SELECT * FROM T ORDER BY id").collect()
+      val initialSnapshotId = lastSnapshotId(table)
+
+      // Rescale with explicit bucket_num
+      spark.sql("ALTER TABLE T SET TBLPROPERTIES ('bucket' = '4')")
+      checkAnswer(spark.sql("CALL sys.rescale(table => 'T', bucket_num => 
4)"), Row(true) :: Nil)
+
+      val reloadedTable = loadTable("T")
+      
Assertions.assertThat(lastSnapshotCommand(reloadedTable)).isEqualTo(CommitKind.OVERWRITE)
+      
Assertions.assertThat(lastSnapshotId(reloadedTable)).isGreaterThan(initialSnapshotId)
+      Assertions.assertThat(getBucketCount(reloadedTable)).isEqualTo(4)
+
+      val afterData = spark.sql("SELECT * FROM T ORDER BY id").collect()
+      val initialDataList = new util.ArrayList[Row]()
+      initialData.foreach(initialDataList.add)
+      
Assertions.assertThat(afterData).containsExactlyElementsOf(initialDataList)
+
+      // Rescale without bucket_num (use current bucket)
+      spark.sql("ALTER TABLE T SET TBLPROPERTIES ('bucket' = '3')")
+      checkAnswer(spark.sql("CALL sys.rescale(table => 'T')"), Row(true) :: 
Nil)
+      Assertions.assertThat(getBucketCount(loadTable("T"))).isEqualTo(3)
+
+      // Rescale with explicit bucket_num
+      spark.sql("ALTER TABLE T SET TBLPROPERTIES ('bucket' = '4')")
+      checkAnswer(spark.sql("CALL sys.rescale(table => 'T', bucket_num => 
4)"), Row(true) :: Nil)
+      Assertions.assertThat(getBucketCount(loadTable("T"))).isEqualTo(4)
+    }
+  }
+
+  test("Paimon Procedure: rescale partitioned tables") {
+    withTable("T") {
+      spark.sql(s"""
+                   |CREATE TABLE T (id INT, value STRING, pt STRING, dt 
STRING, hh INT)
+                   |TBLPROPERTIES ('primary-key'='id, pt, dt, hh', 
'bucket'='2')
+                   |PARTITIONED BY (pt, dt, hh)
+                   |""".stripMargin)
+
+      val table = loadTable("T")
+      spark.sql(
+        s"INSERT INTO T VALUES (1, 'a', 'p1', '2024-01-01', 0), (2, 'b', 'p1', 
'2024-01-01', 0)")
+      spark.sql(
+        s"INSERT INTO T VALUES (3, 'c', 'p2', '2024-01-01', 1), (4, 'd', 'p2', 
'2024-01-02', 0)")
+
+      val initialData = spark.sql("SELECT * FROM T ORDER BY id").collect()
+      val initialSnapshotId = lastSnapshotId(table)
+
+      // Rescale single partition field
+      spark.sql("ALTER TABLE T SET TBLPROPERTIES ('bucket' = '4')")
+      checkAnswer(
+        spark.sql("CALL sys.rescale(table => 'T', bucket_num => 4, partitions 
=> 'pt=\"p1\"')"),
+        Row(true) :: Nil)
+
+      val reloadedTable = loadTable("T")
+      
Assertions.assertThat(lastSnapshotCommand(reloadedTable)).isEqualTo(CommitKind.OVERWRITE)
+      
Assertions.assertThat(lastSnapshotId(reloadedTable)).isGreaterThan(initialSnapshotId)
+
+      val p1Predicate = PartitionPredicate.fromMap(
+        reloadedTable.schema().logicalPartitionType(),
+        Collections.singletonMap("pt", "p1"),
+        reloadedTable.coreOptions().partitionDefaultName())
+      val p1Splits = reloadedTable.newSnapshotReader
+        .withPartitionFilter(p1Predicate)
+        .read
+        .dataSplits
+        .asScala
+        .toList
+      p1Splits.foreach(split => 
Assertions.assertThat(split.bucket()).isLessThan(4))
+
+      // Rescale multiple partition fields
+      val snapshotBeforeTest2 = lastSnapshotId(reloadedTable)
+      checkAnswer(
+        spark.sql(
+          "CALL sys.rescale(table => 'T', bucket_num => 4, partitions => 
'dt=\"2024-01-01\",hh=0')"),
+        Row(true) :: Nil)
+
+      val reloadedTable2 = loadTable("T")
+      
Assertions.assertThat(lastSnapshotCommand(reloadedTable2)).isEqualTo(CommitKind.OVERWRITE)
+      
Assertions.assertThat(lastSnapshotId(reloadedTable2)).isGreaterThan(snapshotBeforeTest2)
+
+      // Rescale empty partition (should not create new snapshot)
+      val snapshotBeforeEmpty = lastSnapshotId(reloadedTable2)
+      checkAnswer(
+        spark.sql("CALL sys.rescale(table => 'T', bucket_num => 4, partitions 
=> 'pt=\"p3\"')"),
+        Row(true) :: Nil)
+      
Assertions.assertThat(lastSnapshotId(loadTable("T"))).isEqualTo(snapshotBeforeEmpty)
+
+      val afterData = spark.sql("SELECT * FROM T ORDER BY id").collect()
+      val initialDataList = new util.ArrayList[Row]()
+      initialData.foreach(initialDataList.add)
+      
Assertions.assertThat(afterData).containsExactlyElementsOf(initialDataList)
+    }
+  }
+
+  test("Paimon Procedure: rescale error cases") {
+    // Table with no snapshot
+    withTable("T1") {
+      spark.sql(s"""
+                   |CREATE TABLE T1 (id INT, value STRING)
+                   |TBLPROPERTIES ('primary-key'='id', 'bucket'='2')
+                   |""".stripMargin)
+      assert(intercept[IllegalArgumentException] {
+        spark.sql("CALL sys.rescale(table => 'T1', bucket_num => 4)")
+      }.getMessage.contains("has no snapshot"))
+    }
+
+    // Postpone bucket table requires bucket_num
+    withTable("T2") {
+      spark.sql(s"""
+                   |CREATE TABLE T2 (id INT, value STRING)
+                   |TBLPROPERTIES ('primary-key'='id', 'bucket'='-2')
+                   |""".stripMargin)
+      spark.sql(s"INSERT INTO T2 VALUES (1, 'a'), (2, 'b'), (3, 'c')")
+      assert(
+        intercept[IllegalArgumentException] {
+          spark.sql("CALL sys.rescale(table => 'T2')")
+        }.getMessage.contains(
+          "When rescaling postpone bucket tables, you must provide the 
resulting bucket number"))
+      checkAnswer(spark.sql("CALL sys.rescale(table => 'T2', bucket_num => 
4)"), Row(true) :: Nil)
+    }
+
+    // partitions and where cannot be used together
+    withTable("T3") {
+      spark.sql(s"""
+                   |CREATE TABLE T3 (id INT, value STRING, pt STRING)
+                   |TBLPROPERTIES ('primary-key'='id', 'bucket'='2')
+                   |PARTITIONED BY (pt)
+                   |""".stripMargin)
+      spark.sql(s"INSERT INTO T3 VALUES (1, 'a', 'p1'), (2, 'b', 'p2')")
+      assert(intercept[IllegalArgumentException] {
+        spark.sql(
+          "CALL sys.rescale(table => 'T3', bucket_num => 4, partitions => 
'pt=\"p1\"', where => 'pt = \"p1\"')")
+      }.getMessage.contains("partitions and where cannot be used together"))
+    }
+
+    // where clause with non-partition column should fail
+    withTable("T4") {
+      spark.sql(s"""
+                   |CREATE TABLE T4 (id INT, value STRING, pt STRING)
+                   |TBLPROPERTIES ('primary-key'='id', 'bucket'='2')
+                   |PARTITIONED BY (pt)
+                   |""".stripMargin)
+      spark.sql(s"INSERT INTO T4 VALUES (1, 'a', 'p1'), (2, 'b', 'p2')")
+      assert(intercept[IllegalArgumentException] {
+        spark.sql("CALL sys.rescale(table => 'T4', bucket_num => 4, where => 
'id = 1')")
+      }.getMessage.contains("Only partition predicate is supported"))
+    }
+  }
+
+  test("Paimon Procedure: rescale bucket count changes") {
+    // Increase bucket count
+    withTable("T1") {
+      spark.sql(s"""
+                   |CREATE TABLE T1 (id INT, value STRING)
+                   |TBLPROPERTIES ('primary-key'='id', 'bucket'='2')
+                   |""".stripMargin)
+      loadTable("T1")
+      spark.sql(s"INSERT INTO T1 VALUES (1, 'a'), (2, 'b'), (3, 'c'), (4, 
'd'), (5, 'e')")
+      spark.sql("ALTER TABLE T1 SET TBLPROPERTIES ('bucket' = '4')")
+      checkAnswer(spark.sql("CALL sys.rescale(table => 'T1', bucket_num => 
4)"), Row(true) :: Nil)
+      Assertions.assertThat(getBucketCount(loadTable("T1"))).isEqualTo(4)
+    }
+
+    // Decrease bucket count
+    withTable("T2") {
+      spark.sql(s"""
+                   |CREATE TABLE T2 (id INT, value STRING)
+                   |TBLPROPERTIES ('primary-key'='id', 'bucket'='4')
+                   |""".stripMargin)
+      loadTable("T2")

Review Comment:
   The call to `loadTable("T2")` on this line appears to be unused, as the 
table is loaded again on line 220 when calling `loadTable("T2")`. Consider 
removing this redundant call or assigning it to a variable if it's needed for 
initialization purposes.
   ```suggestion
   
   ```



##########
paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RescaleProcedure.java:
##########
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.procedure;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.partition.PartitionPredicate;
+import org.apache.paimon.spark.commands.PaimonSparkWriter;
+import org.apache.paimon.spark.util.ScanPlanHelper$;
+import org.apache.paimon.spark.utils.SparkProcedureUtils;
+import org.apache.paimon.table.BucketMode;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.snapshot.SnapshotReader;
+import org.apache.paimon.utils.StringUtils;
+
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.PaimonUtils;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+import static org.apache.spark.sql.types.DataTypes.IntegerType;
+import static org.apache.spark.sql.types.DataTypes.StringType;
+
+/**
+ * Rescale procedure. Usage:
+ *
+ * <pre><code>
+ *  CALL sys.rescale(table => 'databaseName.tableName', [bucket_num => 16], 
[partitions => 'dt=20250217,hh=08;dt=20250218,hh=08'], [where => 'dt>20250217'])
+ * </code></pre>
+ */
+public class RescaleProcedure extends BaseProcedure {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(RescaleProcedure.class);
+
+    private static final ProcedureParameter[] PARAMETERS =
+            new ProcedureParameter[] {
+                ProcedureParameter.required("table", StringType),
+                ProcedureParameter.optional("bucket_num", IntegerType),
+                ProcedureParameter.optional("partitions", StringType),
+                ProcedureParameter.optional("where", StringType),
+            };
+
+    private static final StructType OUTPUT_TYPE =
+            new StructType(
+                    new StructField[] {
+                        new StructField("result", DataTypes.BooleanType, true, 
Metadata.empty())
+                    });
+
+    protected RescaleProcedure(TableCatalog tableCatalog) {
+        super(tableCatalog);
+    }
+
+    @Override
+    public ProcedureParameter[] parameters() {
+        return PARAMETERS;
+    }
+
+    @Override
+    public StructType outputType() {
+        return OUTPUT_TYPE;
+    }
+
+    @Override
+    public InternalRow[] call(InternalRow args) {
+        Identifier tableIdent = toIdentifier(args.getString(0), 
PARAMETERS[0].name());
+        Integer bucketNum = args.isNullAt(1) ? null : args.getInt(1);
+        String partitions = blank(args, 2) ? null : args.getString(2);
+        String where = blank(args, 3) ? null : args.getString(3);
+
+        checkArgument(
+                partitions == null || where == null,
+                "partitions and where cannot be used together.");
+        String finalWhere = partitions != null ? 
SparkProcedureUtils.toWhere(partitions) : where;
+
+        return modifyPaimonTable(
+                tableIdent,
+                table -> {
+                    checkArgument(table instanceof FileStoreTable);
+                    FileStoreTable fileStoreTable = (FileStoreTable) table;
+
+                    Optional<Snapshot> optionalSnapshot = 
fileStoreTable.latestSnapshot();
+                    if (!optionalSnapshot.isPresent()) {
+                        throw new IllegalArgumentException(
+                                "Table "
+                                        + table.fullName()
+                                        + " has no snapshot. No need to 
rescale.");

Review Comment:
   The error message says "has no snapshot. No need to rescale." but this is 
formatted as two sentences. Consider making it more consistent with standard 
error message format, e.g., "has no snapshot, no need to rescale" or "has no 
snapshot and does not need rescaling".
   ```suggestion
                                           + " has no snapshot, no need to 
rescale.");
   ```



##########
paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RescaleProcedureTest.scala:
##########
@@ -0,0 +1,358 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.procedure
+
+import org.apache.paimon.Snapshot.CommitKind
+import org.apache.paimon.partition.PartitionPredicate
+import org.apache.paimon.spark.PaimonSparkTestBase
+import org.apache.paimon.table.FileStoreTable
+
+import org.apache.spark.sql.Row
+import org.assertj.core.api.Assertions
+
+import java.util
+import java.util.{Arrays, Collections, Map => JMap}
+
+import scala.collection.JavaConverters._
+
+/** Test rescale procedure. See [[RescaleProcedure]]. */
+class RescaleProcedureTest extends PaimonSparkTestBase {
+
+  test("Paimon Procedure: rescale basic functionality") {
+    withTable("T") {
+      spark.sql(s"""
+                   |CREATE TABLE T (id INT, value STRING)
+                   |TBLPROPERTIES ('primary-key'='id', 'bucket'='2')
+                   |""".stripMargin)
+
+      val table = loadTable("T")
+      spark.sql(s"INSERT INTO T VALUES (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd'), 
(5, 'e')")
+
+      val initialData = spark.sql("SELECT * FROM T ORDER BY id").collect()
+      val initialSnapshotId = lastSnapshotId(table)
+
+      // Rescale with explicit bucket_num
+      spark.sql("ALTER TABLE T SET TBLPROPERTIES ('bucket' = '4')")
+      checkAnswer(spark.sql("CALL sys.rescale(table => 'T', bucket_num => 
4)"), Row(true) :: Nil)
+
+      val reloadedTable = loadTable("T")
+      
Assertions.assertThat(lastSnapshotCommand(reloadedTable)).isEqualTo(CommitKind.OVERWRITE)
+      
Assertions.assertThat(lastSnapshotId(reloadedTable)).isGreaterThan(initialSnapshotId)
+      Assertions.assertThat(getBucketCount(reloadedTable)).isEqualTo(4)
+
+      val afterData = spark.sql("SELECT * FROM T ORDER BY id").collect()
+      val initialDataList = new util.ArrayList[Row]()
+      initialData.foreach(initialDataList.add)
+      
Assertions.assertThat(afterData).containsExactlyElementsOf(initialDataList)

Review Comment:
   Consider using `Arrays.asList()` directly instead of creating a new 
`ArrayList` and adding elements one by one. This can be simplified to: 
`Assertions.assertThat(afterData).containsExactlyElementsOf(Arrays.asList(initialData:
 _*))` similar to line 301.
   ```suggestion
         
Assertions.assertThat(afterData).containsExactlyElementsOf(Arrays.asList(initialData:
 _*))
   ```



##########
paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RescaleProcedureTest.scala:
##########
@@ -0,0 +1,358 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.procedure
+
+import org.apache.paimon.Snapshot.CommitKind
+import org.apache.paimon.partition.PartitionPredicate
+import org.apache.paimon.spark.PaimonSparkTestBase
+import org.apache.paimon.table.FileStoreTable
+
+import org.apache.spark.sql.Row
+import org.assertj.core.api.Assertions
+
+import java.util
+import java.util.{Arrays, Collections, Map => JMap}
+
+import scala.collection.JavaConverters._
+
+/** Test rescale procedure. See [[RescaleProcedure]]. */
+class RescaleProcedureTest extends PaimonSparkTestBase {
+
+  test("Paimon Procedure: rescale basic functionality") {
+    withTable("T") {
+      spark.sql(s"""
+                   |CREATE TABLE T (id INT, value STRING)
+                   |TBLPROPERTIES ('primary-key'='id', 'bucket'='2')
+                   |""".stripMargin)
+
+      val table = loadTable("T")
+      spark.sql(s"INSERT INTO T VALUES (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd'), 
(5, 'e')")
+
+      val initialData = spark.sql("SELECT * FROM T ORDER BY id").collect()
+      val initialSnapshotId = lastSnapshotId(table)
+
+      // Rescale with explicit bucket_num
+      spark.sql("ALTER TABLE T SET TBLPROPERTIES ('bucket' = '4')")
+      checkAnswer(spark.sql("CALL sys.rescale(table => 'T', bucket_num => 
4)"), Row(true) :: Nil)
+
+      val reloadedTable = loadTable("T")
+      
Assertions.assertThat(lastSnapshotCommand(reloadedTable)).isEqualTo(CommitKind.OVERWRITE)
+      
Assertions.assertThat(lastSnapshotId(reloadedTable)).isGreaterThan(initialSnapshotId)
+      Assertions.assertThat(getBucketCount(reloadedTable)).isEqualTo(4)
+
+      val afterData = spark.sql("SELECT * FROM T ORDER BY id").collect()
+      val initialDataList = new util.ArrayList[Row]()
+      initialData.foreach(initialDataList.add)
+      
Assertions.assertThat(afterData).containsExactlyElementsOf(initialDataList)
+
+      // Rescale without bucket_num (use current bucket)
+      spark.sql("ALTER TABLE T SET TBLPROPERTIES ('bucket' = '3')")
+      checkAnswer(spark.sql("CALL sys.rescale(table => 'T')"), Row(true) :: 
Nil)
+      Assertions.assertThat(getBucketCount(loadTable("T"))).isEqualTo(3)
+
+      // Rescale with explicit bucket_num
+      spark.sql("ALTER TABLE T SET TBLPROPERTIES ('bucket' = '4')")
+      checkAnswer(spark.sql("CALL sys.rescale(table => 'T', bucket_num => 
4)"), Row(true) :: Nil)
+      Assertions.assertThat(getBucketCount(loadTable("T"))).isEqualTo(4)
+    }
+  }
+
+  test("Paimon Procedure: rescale partitioned tables") {
+    withTable("T") {
+      spark.sql(s"""
+                   |CREATE TABLE T (id INT, value STRING, pt STRING, dt 
STRING, hh INT)
+                   |TBLPROPERTIES ('primary-key'='id, pt, dt, hh', 
'bucket'='2')
+                   |PARTITIONED BY (pt, dt, hh)
+                   |""".stripMargin)
+
+      val table = loadTable("T")
+      spark.sql(
+        s"INSERT INTO T VALUES (1, 'a', 'p1', '2024-01-01', 0), (2, 'b', 'p1', 
'2024-01-01', 0)")
+      spark.sql(
+        s"INSERT INTO T VALUES (3, 'c', 'p2', '2024-01-01', 1), (4, 'd', 'p2', 
'2024-01-02', 0)")
+
+      val initialData = spark.sql("SELECT * FROM T ORDER BY id").collect()
+      val initialSnapshotId = lastSnapshotId(table)
+
+      // Rescale single partition field
+      spark.sql("ALTER TABLE T SET TBLPROPERTIES ('bucket' = '4')")
+      checkAnswer(
+        spark.sql("CALL sys.rescale(table => 'T', bucket_num => 4, partitions 
=> 'pt=\"p1\"')"),
+        Row(true) :: Nil)
+
+      val reloadedTable = loadTable("T")
+      
Assertions.assertThat(lastSnapshotCommand(reloadedTable)).isEqualTo(CommitKind.OVERWRITE)
+      
Assertions.assertThat(lastSnapshotId(reloadedTable)).isGreaterThan(initialSnapshotId)
+
+      val p1Predicate = PartitionPredicate.fromMap(
+        reloadedTable.schema().logicalPartitionType(),
+        Collections.singletonMap("pt", "p1"),
+        reloadedTable.coreOptions().partitionDefaultName())
+      val p1Splits = reloadedTable.newSnapshotReader
+        .withPartitionFilter(p1Predicate)
+        .read
+        .dataSplits
+        .asScala
+        .toList
+      p1Splits.foreach(split => 
Assertions.assertThat(split.bucket()).isLessThan(4))
+
+      // Rescale multiple partition fields
+      val snapshotBeforeTest2 = lastSnapshotId(reloadedTable)
+      checkAnswer(
+        spark.sql(
+          "CALL sys.rescale(table => 'T', bucket_num => 4, partitions => 
'dt=\"2024-01-01\",hh=0')"),
+        Row(true) :: Nil)
+
+      val reloadedTable2 = loadTable("T")
+      
Assertions.assertThat(lastSnapshotCommand(reloadedTable2)).isEqualTo(CommitKind.OVERWRITE)
+      
Assertions.assertThat(lastSnapshotId(reloadedTable2)).isGreaterThan(snapshotBeforeTest2)
+
+      // Rescale empty partition (should not create new snapshot)
+      val snapshotBeforeEmpty = lastSnapshotId(reloadedTable2)
+      checkAnswer(
+        spark.sql("CALL sys.rescale(table => 'T', bucket_num => 4, partitions 
=> 'pt=\"p3\"')"),
+        Row(true) :: Nil)
+      
Assertions.assertThat(lastSnapshotId(loadTable("T"))).isEqualTo(snapshotBeforeEmpty)
+
+      val afterData = spark.sql("SELECT * FROM T ORDER BY id").collect()
+      val initialDataList = new util.ArrayList[Row]()
+      initialData.foreach(initialDataList.add)
+      
Assertions.assertThat(afterData).containsExactlyElementsOf(initialDataList)
+    }
+  }
+
+  test("Paimon Procedure: rescale error cases") {
+    // Table with no snapshot
+    withTable("T1") {
+      spark.sql(s"""
+                   |CREATE TABLE T1 (id INT, value STRING)
+                   |TBLPROPERTIES ('primary-key'='id', 'bucket'='2')
+                   |""".stripMargin)
+      assert(intercept[IllegalArgumentException] {
+        spark.sql("CALL sys.rescale(table => 'T1', bucket_num => 4)")
+      }.getMessage.contains("has no snapshot"))
+    }
+
+    // Postpone bucket table requires bucket_num
+    withTable("T2") {
+      spark.sql(s"""
+                   |CREATE TABLE T2 (id INT, value STRING)
+                   |TBLPROPERTIES ('primary-key'='id', 'bucket'='-2')
+                   |""".stripMargin)
+      spark.sql(s"INSERT INTO T2 VALUES (1, 'a'), (2, 'b'), (3, 'c')")
+      assert(
+        intercept[IllegalArgumentException] {
+          spark.sql("CALL sys.rescale(table => 'T2')")
+        }.getMessage.contains(
+          "When rescaling postpone bucket tables, you must provide the 
resulting bucket number"))
+      checkAnswer(spark.sql("CALL sys.rescale(table => 'T2', bucket_num => 
4)"), Row(true) :: Nil)
+    }
+
+    // partitions and where cannot be used together
+    withTable("T3") {
+      spark.sql(s"""
+                   |CREATE TABLE T3 (id INT, value STRING, pt STRING)
+                   |TBLPROPERTIES ('primary-key'='id', 'bucket'='2')
+                   |PARTITIONED BY (pt)
+                   |""".stripMargin)
+      spark.sql(s"INSERT INTO T3 VALUES (1, 'a', 'p1'), (2, 'b', 'p2')")
+      assert(intercept[IllegalArgumentException] {
+        spark.sql(
+          "CALL sys.rescale(table => 'T3', bucket_num => 4, partitions => 
'pt=\"p1\"', where => 'pt = \"p1\"')")
+      }.getMessage.contains("partitions and where cannot be used together"))
+    }
+
+    // where clause with non-partition column should fail
+    withTable("T4") {
+      spark.sql(s"""
+                   |CREATE TABLE T4 (id INT, value STRING, pt STRING)
+                   |TBLPROPERTIES ('primary-key'='id', 'bucket'='2')
+                   |PARTITIONED BY (pt)
+                   |""".stripMargin)
+      spark.sql(s"INSERT INTO T4 VALUES (1, 'a', 'p1'), (2, 'b', 'p2')")
+      assert(intercept[IllegalArgumentException] {
+        spark.sql("CALL sys.rescale(table => 'T4', bucket_num => 4, where => 
'id = 1')")
+      }.getMessage.contains("Only partition predicate is supported"))
+    }
+  }
+
+  test("Paimon Procedure: rescale bucket count changes") {
+    // Increase bucket count
+    withTable("T1") {
+      spark.sql(s"""
+                   |CREATE TABLE T1 (id INT, value STRING)
+                   |TBLPROPERTIES ('primary-key'='id', 'bucket'='2')
+                   |""".stripMargin)
+      loadTable("T1")
+      spark.sql(s"INSERT INTO T1 VALUES (1, 'a'), (2, 'b'), (3, 'c'), (4, 
'd'), (5, 'e')")
+      spark.sql("ALTER TABLE T1 SET TBLPROPERTIES ('bucket' = '4')")
+      checkAnswer(spark.sql("CALL sys.rescale(table => 'T1', bucket_num => 
4)"), Row(true) :: Nil)
+      Assertions.assertThat(getBucketCount(loadTable("T1"))).isEqualTo(4)
+    }
+
+    // Decrease bucket count
+    withTable("T2") {
+      spark.sql(s"""
+                   |CREATE TABLE T2 (id INT, value STRING)
+                   |TBLPROPERTIES ('primary-key'='id', 'bucket'='4')
+                   |""".stripMargin)
+      loadTable("T2")
+      spark.sql(
+        s"INSERT INTO T2 VALUES (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd'), (5, 
'e'), (6, 'f'), (7, 'g'), (8, 'h')")
+      spark.sql("ALTER TABLE T2 SET TBLPROPERTIES ('bucket' = '2')")
+      checkAnswer(spark.sql("CALL sys.rescale(table => 'T2', bucket_num => 
2)"), Row(true) :: Nil)
+      val reloadedTable2 = loadTable("T2")
+      Assertions.assertThat(getBucketCount(reloadedTable2)).isEqualTo(2)
+      reloadedTable2.newSnapshotReader.read.dataSplits.asScala.toList.foreach(
+        split => Assertions.assertThat(split.bucket()).isLessThan(2))
+    }
+  }
+
+  test("Paimon Procedure: rescale with where clause") {
+    withTable("T") {
+      spark.sql(s"""
+                   |CREATE TABLE T (id INT, value STRING, dt STRING, hh INT)
+                   |TBLPROPERTIES ('primary-key'='id, dt, hh', 'bucket'='2')
+                   |PARTITIONED BY (dt, hh)
+                   |""".stripMargin)
+
+      val table = loadTable("T")
+      spark.sql(s"INSERT INTO T VALUES (1, 'a', '2024-01-01', 0), (2, 'b', 
'2024-01-01', 0)")
+      spark.sql(s"INSERT INTO T VALUES (3, 'c', '2024-01-01', 1), (4, 'd', 
'2024-01-01', 1)")
+      spark.sql(s"INSERT INTO T VALUES (5, 'e', '2024-01-02', 0), (6, 'f', 
'2024-01-02', 1)")
+
+      val initialData = spark.sql("SELECT * FROM T ORDER BY id").collect()
+      val initialSnapshotId = lastSnapshotId(table)
+
+      // Test 1: Rescale with where clause using single partition column
+      spark.sql("ALTER TABLE T SET TBLPROPERTIES ('bucket' = '4')")
+      checkAnswer(
+        spark.sql(
+          "CALL sys.rescale(table => 'T', bucket_num => 4, where => 'dt = 
\"2024-01-01\"')"),
+        Row(true) :: Nil)
+
+      val reloadedTable = loadTable("T")
+      
Assertions.assertThat(lastSnapshotCommand(reloadedTable)).isEqualTo(CommitKind.OVERWRITE)
+      
Assertions.assertThat(lastSnapshotId(reloadedTable)).isGreaterThan(initialSnapshotId)
+
+      // Test 2: Rescale with where clause using multiple partition columns
+      val snapshotBeforeTest2 = lastSnapshotId(reloadedTable)
+      checkAnswer(
+        spark.sql(
+          "CALL sys.rescale(table => 'T', bucket_num => 4, where => 'dt = 
\"2024-01-01\" AND hh >= 1')"),
+        Row(true) :: Nil)
+
+      val reloadedTable2 = loadTable("T")
+      
Assertions.assertThat(lastSnapshotCommand(reloadedTable2)).isEqualTo(CommitKind.OVERWRITE)
+      
Assertions.assertThat(lastSnapshotId(reloadedTable2)).isGreaterThan(snapshotBeforeTest2)
+
+      // Verify data integrity
+      val afterData = spark.sql("SELECT * FROM T ORDER BY id").collect()
+      val initialDataList = new util.ArrayList[Row]()
+      initialData.foreach(initialDataList.add)
+      
Assertions.assertThat(afterData).containsExactlyElementsOf(initialDataList)

Review Comment:
   Consider using `Arrays.asList()` directly instead of creating a new 
`ArrayList` and adding elements one by one. This can be simplified to: 
`Assertions.assertThat(afterData).containsExactlyElementsOf(Arrays.asList(initialData:
 _*))` similar to line 301.
   ```suggestion
         
Assertions.assertThat(afterData).containsExactlyElementsOf(Arrays.asList(initialData:
 _*))
   ```



##########
paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RescaleProcedureTest.scala:
##########
@@ -0,0 +1,358 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.procedure
+
+import org.apache.paimon.Snapshot.CommitKind
+import org.apache.paimon.partition.PartitionPredicate
+import org.apache.paimon.spark.PaimonSparkTestBase
+import org.apache.paimon.table.FileStoreTable
+
+import org.apache.spark.sql.Row
+import org.assertj.core.api.Assertions
+
+import java.util
+import java.util.{Arrays, Collections, Map => JMap}
+
+import scala.collection.JavaConverters._
+
+/** Test rescale procedure. See [[RescaleProcedure]]. */
+class RescaleProcedureTest extends PaimonSparkTestBase {
+
+  test("Paimon Procedure: rescale basic functionality") {
+    withTable("T") {
+      spark.sql(s"""
+                   |CREATE TABLE T (id INT, value STRING)
+                   |TBLPROPERTIES ('primary-key'='id', 'bucket'='2')
+                   |""".stripMargin)
+
+      val table = loadTable("T")
+      spark.sql(s"INSERT INTO T VALUES (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd'), 
(5, 'e')")
+
+      val initialData = spark.sql("SELECT * FROM T ORDER BY id").collect()
+      val initialSnapshotId = lastSnapshotId(table)
+
+      // Rescale with explicit bucket_num
+      spark.sql("ALTER TABLE T SET TBLPROPERTIES ('bucket' = '4')")
+      checkAnswer(spark.sql("CALL sys.rescale(table => 'T', bucket_num => 
4)"), Row(true) :: Nil)
+
+      val reloadedTable = loadTable("T")
+      
Assertions.assertThat(lastSnapshotCommand(reloadedTable)).isEqualTo(CommitKind.OVERWRITE)
+      
Assertions.assertThat(lastSnapshotId(reloadedTable)).isGreaterThan(initialSnapshotId)
+      Assertions.assertThat(getBucketCount(reloadedTable)).isEqualTo(4)
+
+      val afterData = spark.sql("SELECT * FROM T ORDER BY id").collect()
+      val initialDataList = new util.ArrayList[Row]()
+      initialData.foreach(initialDataList.add)
+      
Assertions.assertThat(afterData).containsExactlyElementsOf(initialDataList)
+
+      // Rescale without bucket_num (use current bucket)
+      spark.sql("ALTER TABLE T SET TBLPROPERTIES ('bucket' = '3')")
+      checkAnswer(spark.sql("CALL sys.rescale(table => 'T')"), Row(true) :: 
Nil)
+      Assertions.assertThat(getBucketCount(loadTable("T"))).isEqualTo(3)
+
+      // Rescale with explicit bucket_num
+      spark.sql("ALTER TABLE T SET TBLPROPERTIES ('bucket' = '4')")
+      checkAnswer(spark.sql("CALL sys.rescale(table => 'T', bucket_num => 
4)"), Row(true) :: Nil)
+      Assertions.assertThat(getBucketCount(loadTable("T"))).isEqualTo(4)
+    }
+  }
+
+  test("Paimon Procedure: rescale partitioned tables") {
+    withTable("T") {
+      spark.sql(s"""
+                   |CREATE TABLE T (id INT, value STRING, pt STRING, dt 
STRING, hh INT)
+                   |TBLPROPERTIES ('primary-key'='id, pt, dt, hh', 
'bucket'='2')
+                   |PARTITIONED BY (pt, dt, hh)
+                   |""".stripMargin)
+
+      val table = loadTable("T")
+      spark.sql(
+        s"INSERT INTO T VALUES (1, 'a', 'p1', '2024-01-01', 0), (2, 'b', 'p1', 
'2024-01-01', 0)")
+      spark.sql(
+        s"INSERT INTO T VALUES (3, 'c', 'p2', '2024-01-01', 1), (4, 'd', 'p2', 
'2024-01-02', 0)")
+
+      val initialData = spark.sql("SELECT * FROM T ORDER BY id").collect()
+      val initialSnapshotId = lastSnapshotId(table)
+
+      // Rescale single partition field
+      spark.sql("ALTER TABLE T SET TBLPROPERTIES ('bucket' = '4')")
+      checkAnswer(
+        spark.sql("CALL sys.rescale(table => 'T', bucket_num => 4, partitions 
=> 'pt=\"p1\"')"),
+        Row(true) :: Nil)
+
+      val reloadedTable = loadTable("T")
+      
Assertions.assertThat(lastSnapshotCommand(reloadedTable)).isEqualTo(CommitKind.OVERWRITE)
+      
Assertions.assertThat(lastSnapshotId(reloadedTable)).isGreaterThan(initialSnapshotId)
+
+      val p1Predicate = PartitionPredicate.fromMap(
+        reloadedTable.schema().logicalPartitionType(),
+        Collections.singletonMap("pt", "p1"),
+        reloadedTable.coreOptions().partitionDefaultName())
+      val p1Splits = reloadedTable.newSnapshotReader
+        .withPartitionFilter(p1Predicate)
+        .read
+        .dataSplits
+        .asScala
+        .toList
+      p1Splits.foreach(split => 
Assertions.assertThat(split.bucket()).isLessThan(4))
+
+      // Rescale multiple partition fields
+      val snapshotBeforeTest2 = lastSnapshotId(reloadedTable)
+      checkAnswer(
+        spark.sql(
+          "CALL sys.rescale(table => 'T', bucket_num => 4, partitions => 
'dt=\"2024-01-01\",hh=0')"),
+        Row(true) :: Nil)
+
+      val reloadedTable2 = loadTable("T")
+      
Assertions.assertThat(lastSnapshotCommand(reloadedTable2)).isEqualTo(CommitKind.OVERWRITE)
+      
Assertions.assertThat(lastSnapshotId(reloadedTable2)).isGreaterThan(snapshotBeforeTest2)
+
+      // Rescale empty partition (should not create new snapshot)
+      val snapshotBeforeEmpty = lastSnapshotId(reloadedTable2)
+      checkAnswer(
+        spark.sql("CALL sys.rescale(table => 'T', bucket_num => 4, partitions 
=> 'pt=\"p3\"')"),
+        Row(true) :: Nil)
+      
Assertions.assertThat(lastSnapshotId(loadTable("T"))).isEqualTo(snapshotBeforeEmpty)
+
+      val afterData = spark.sql("SELECT * FROM T ORDER BY id").collect()
+      val initialDataList = new util.ArrayList[Row]()
+      initialData.foreach(initialDataList.add)
+      
Assertions.assertThat(afterData).containsExactlyElementsOf(initialDataList)
+    }
+  }
+
+  test("Paimon Procedure: rescale error cases") {
+    // Table with no snapshot
+    withTable("T1") {
+      spark.sql(s"""
+                   |CREATE TABLE T1 (id INT, value STRING)
+                   |TBLPROPERTIES ('primary-key'='id', 'bucket'='2')
+                   |""".stripMargin)
+      assert(intercept[IllegalArgumentException] {
+        spark.sql("CALL sys.rescale(table => 'T1', bucket_num => 4)")
+      }.getMessage.contains("has no snapshot"))
+    }
+
+    // Postpone bucket table requires bucket_num
+    withTable("T2") {
+      spark.sql(s"""
+                   |CREATE TABLE T2 (id INT, value STRING)
+                   |TBLPROPERTIES ('primary-key'='id', 'bucket'='-2')
+                   |""".stripMargin)
+      spark.sql(s"INSERT INTO T2 VALUES (1, 'a'), (2, 'b'), (3, 'c')")
+      assert(
+        intercept[IllegalArgumentException] {
+          spark.sql("CALL sys.rescale(table => 'T2')")
+        }.getMessage.contains(
+          "When rescaling postpone bucket tables, you must provide the 
resulting bucket number"))
+      checkAnswer(spark.sql("CALL sys.rescale(table => 'T2', bucket_num => 
4)"), Row(true) :: Nil)
+    }
+
+    // partitions and where cannot be used together
+    withTable("T3") {
+      spark.sql(s"""
+                   |CREATE TABLE T3 (id INT, value STRING, pt STRING)
+                   |TBLPROPERTIES ('primary-key'='id', 'bucket'='2')
+                   |PARTITIONED BY (pt)
+                   |""".stripMargin)
+      spark.sql(s"INSERT INTO T3 VALUES (1, 'a', 'p1'), (2, 'b', 'p2')")
+      assert(intercept[IllegalArgumentException] {
+        spark.sql(
+          "CALL sys.rescale(table => 'T3', bucket_num => 4, partitions => 
'pt=\"p1\"', where => 'pt = \"p1\"')")
+      }.getMessage.contains("partitions and where cannot be used together"))
+    }
+
+    // where clause with non-partition column should fail
+    withTable("T4") {
+      spark.sql(s"""
+                   |CREATE TABLE T4 (id INT, value STRING, pt STRING)
+                   |TBLPROPERTIES ('primary-key'='id', 'bucket'='2')
+                   |PARTITIONED BY (pt)
+                   |""".stripMargin)
+      spark.sql(s"INSERT INTO T4 VALUES (1, 'a', 'p1'), (2, 'b', 'p2')")
+      assert(intercept[IllegalArgumentException] {
+        spark.sql("CALL sys.rescale(table => 'T4', bucket_num => 4, where => 
'id = 1')")
+      }.getMessage.contains("Only partition predicate is supported"))
+    }
+  }
+
+  test("Paimon Procedure: rescale bucket count changes") {
+    // Increase bucket count
+    withTable("T1") {
+      spark.sql(s"""
+                   |CREATE TABLE T1 (id INT, value STRING)
+                   |TBLPROPERTIES ('primary-key'='id', 'bucket'='2')
+                   |""".stripMargin)
+      loadTable("T1")
+      spark.sql(s"INSERT INTO T1 VALUES (1, 'a'), (2, 'b'), (3, 'c'), (4, 
'd'), (5, 'e')")
+      spark.sql("ALTER TABLE T1 SET TBLPROPERTIES ('bucket' = '4')")
+      checkAnswer(spark.sql("CALL sys.rescale(table => 'T1', bucket_num => 
4)"), Row(true) :: Nil)
+      Assertions.assertThat(getBucketCount(loadTable("T1"))).isEqualTo(4)
+    }
+
+    // Decrease bucket count
+    withTable("T2") {
+      spark.sql(s"""
+                   |CREATE TABLE T2 (id INT, value STRING)
+                   |TBLPROPERTIES ('primary-key'='id', 'bucket'='4')
+                   |""".stripMargin)
+      loadTable("T2")
+      spark.sql(
+        s"INSERT INTO T2 VALUES (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd'), (5, 
'e'), (6, 'f'), (7, 'g'), (8, 'h')")
+      spark.sql("ALTER TABLE T2 SET TBLPROPERTIES ('bucket' = '2')")
+      checkAnswer(spark.sql("CALL sys.rescale(table => 'T2', bucket_num => 
2)"), Row(true) :: Nil)
+      val reloadedTable2 = loadTable("T2")
+      Assertions.assertThat(getBucketCount(reloadedTable2)).isEqualTo(2)
+      reloadedTable2.newSnapshotReader.read.dataSplits.asScala.toList.foreach(
+        split => Assertions.assertThat(split.bucket()).isLessThan(2))
+    }
+  }
+
+  test("Paimon Procedure: rescale with where clause") {
+    withTable("T") {
+      spark.sql(s"""
+                   |CREATE TABLE T (id INT, value STRING, dt STRING, hh INT)
+                   |TBLPROPERTIES ('primary-key'='id, dt, hh', 'bucket'='2')
+                   |PARTITIONED BY (dt, hh)
+                   |""".stripMargin)
+
+      val table = loadTable("T")
+      spark.sql(s"INSERT INTO T VALUES (1, 'a', '2024-01-01', 0), (2, 'b', 
'2024-01-01', 0)")
+      spark.sql(s"INSERT INTO T VALUES (3, 'c', '2024-01-01', 1), (4, 'd', 
'2024-01-01', 1)")
+      spark.sql(s"INSERT INTO T VALUES (5, 'e', '2024-01-02', 0), (6, 'f', 
'2024-01-02', 1)")
+
+      val initialData = spark.sql("SELECT * FROM T ORDER BY id").collect()
+      val initialSnapshotId = lastSnapshotId(table)
+
+      // Test 1: Rescale with where clause using single partition column
+      spark.sql("ALTER TABLE T SET TBLPROPERTIES ('bucket' = '4')")
+      checkAnswer(
+        spark.sql(
+          "CALL sys.rescale(table => 'T', bucket_num => 4, where => 'dt = 
\"2024-01-01\"')"),
+        Row(true) :: Nil)
+
+      val reloadedTable = loadTable("T")
+      
Assertions.assertThat(lastSnapshotCommand(reloadedTable)).isEqualTo(CommitKind.OVERWRITE)
+      
Assertions.assertThat(lastSnapshotId(reloadedTable)).isGreaterThan(initialSnapshotId)
+
+      // Test 2: Rescale with where clause using multiple partition columns
+      val snapshotBeforeTest2 = lastSnapshotId(reloadedTable)
+      checkAnswer(
+        spark.sql(
+          "CALL sys.rescale(table => 'T', bucket_num => 4, where => 'dt = 
\"2024-01-01\" AND hh >= 1')"),
+        Row(true) :: Nil)
+
+      val reloadedTable2 = loadTable("T")
+      
Assertions.assertThat(lastSnapshotCommand(reloadedTable2)).isEqualTo(CommitKind.OVERWRITE)
+      
Assertions.assertThat(lastSnapshotId(reloadedTable2)).isGreaterThan(snapshotBeforeTest2)
+
+      // Verify data integrity
+      val afterData = spark.sql("SELECT * FROM T ORDER BY id").collect()
+      val initialDataList = new util.ArrayList[Row]()
+      initialData.foreach(initialDataList.add)
+      
Assertions.assertThat(afterData).containsExactlyElementsOf(initialDataList)
+    }
+  }
+
+  test("Paimon Procedure: rescale with ALTER TABLE and write validation") {
+    withTable("T") {
+      spark.sql(s"""
+                   |CREATE TABLE T (f0 INT)
+                   |TBLPROPERTIES ('bucket'='2', 'bucket-key'='f0')
+                   |""".stripMargin)
+
+      val table = loadTable("T")
+
+      spark.sql(s"INSERT INTO T VALUES (1), (2), (3), (4), (5)")
+
+      val snapshot = lastSnapshotId(table)
+      Assertions.assertThat(snapshot).isGreaterThanOrEqualTo(0)
+
+      val initialBuckets = getBucketCount(table)
+      Assertions.assertThat(initialBuckets).isEqualTo(2)
+
+      val initialData = spark.sql("SELECT * FROM T ORDER BY f0").collect()
+      Assertions.assertThat(initialData.length).isEqualTo(5)
+
+      spark.sql("ALTER TABLE T SET TBLPROPERTIES ('bucket' = '4')")
+
+      val reloadedTable = loadTable("T")
+      val newBuckets = getBucketCount(reloadedTable)
+      Assertions.assertThat(newBuckets).isEqualTo(4)
+
+      val afterAlterData = spark.sql("SELECT * FROM T ORDER BY f0").collect()
+      val initialDataList = Arrays.asList(initialData: _*)
+      
Assertions.assertThat(afterAlterData).containsExactlyElementsOf(initialDataList)
+
+      val writeError = intercept[org.apache.spark.SparkException] {
+        spark.sql("INSERT INTO T VALUES (6)")
+      }
+      val errorMessage = writeError.getMessage
+      val cause = writeError.getCause
+      val causeMessage = if (cause != null && cause.getMessage != null) 
cause.getMessage else ""
+      val expectedMessage =
+        "Try to write table with a new bucket num 4, but the previous bucket 
num is 2"
+      val fullMessage = errorMessage + " " + causeMessage

Review Comment:
   The error message handling constructs a full message by concatenating 
`errorMessage`, a space, and `causeMessage`. This could result in malformed 
messages if either is null or if they don't naturally flow together. Consider 
using a more robust approach like checking for null/empty and using appropriate 
delimiters.
   ```suggestion
         val errorMessage = Option(writeError.getMessage).getOrElse("")
         val cause = writeError.getCause
         val causeMessage = Option(cause).flatMap(c => 
Option(c.getMessage)).getOrElse("")
         val expectedMessage =
           "Try to write table with a new bucket num 4, but the previous bucket 
num is 2"
         val fullMessage = (errorMessage, causeMessage) match {
           case (em, cm) if em.nonEmpty && cm.nonEmpty => em + ": " + cm
           case (em, _) if em.nonEmpty => em
           case (_, cm) if cm.nonEmpty => cm
           case _ => ""
         }
   ```



##########
paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RescaleProcedureTest.scala:
##########
@@ -0,0 +1,358 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.procedure
+
+import org.apache.paimon.Snapshot.CommitKind
+import org.apache.paimon.partition.PartitionPredicate
+import org.apache.paimon.spark.PaimonSparkTestBase
+import org.apache.paimon.table.FileStoreTable
+
+import org.apache.spark.sql.Row
+import org.assertj.core.api.Assertions
+
+import java.util
+import java.util.{Arrays, Collections, Map => JMap}
+
+import scala.collection.JavaConverters._
+
+/** Test rescale procedure. See [[RescaleProcedure]]. */
+class RescaleProcedureTest extends PaimonSparkTestBase {
+
+  test("Paimon Procedure: rescale basic functionality") {
+    withTable("T") {
+      spark.sql(s"""
+                   |CREATE TABLE T (id INT, value STRING)
+                   |TBLPROPERTIES ('primary-key'='id', 'bucket'='2')
+                   |""".stripMargin)
+
+      val table = loadTable("T")
+      spark.sql(s"INSERT INTO T VALUES (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd'), 
(5, 'e')")
+
+      val initialData = spark.sql("SELECT * FROM T ORDER BY id").collect()
+      val initialSnapshotId = lastSnapshotId(table)
+
+      // Rescale with explicit bucket_num
+      spark.sql("ALTER TABLE T SET TBLPROPERTIES ('bucket' = '4')")
+      checkAnswer(spark.sql("CALL sys.rescale(table => 'T', bucket_num => 
4)"), Row(true) :: Nil)
+
+      val reloadedTable = loadTable("T")
+      
Assertions.assertThat(lastSnapshotCommand(reloadedTable)).isEqualTo(CommitKind.OVERWRITE)
+      
Assertions.assertThat(lastSnapshotId(reloadedTable)).isGreaterThan(initialSnapshotId)
+      Assertions.assertThat(getBucketCount(reloadedTable)).isEqualTo(4)
+
+      val afterData = spark.sql("SELECT * FROM T ORDER BY id").collect()
+      val initialDataList = new util.ArrayList[Row]()
+      initialData.foreach(initialDataList.add)
+      
Assertions.assertThat(afterData).containsExactlyElementsOf(initialDataList)
+
+      // Rescale without bucket_num (use current bucket)
+      spark.sql("ALTER TABLE T SET TBLPROPERTIES ('bucket' = '3')")
+      checkAnswer(spark.sql("CALL sys.rescale(table => 'T')"), Row(true) :: 
Nil)
+      Assertions.assertThat(getBucketCount(loadTable("T"))).isEqualTo(3)
+
+      // Rescale with explicit bucket_num
+      spark.sql("ALTER TABLE T SET TBLPROPERTIES ('bucket' = '4')")
+      checkAnswer(spark.sql("CALL sys.rescale(table => 'T', bucket_num => 
4)"), Row(true) :: Nil)
+      Assertions.assertThat(getBucketCount(loadTable("T"))).isEqualTo(4)
+    }
+  }
+
+  test("Paimon Procedure: rescale partitioned tables") {
+    withTable("T") {
+      spark.sql(s"""
+                   |CREATE TABLE T (id INT, value STRING, pt STRING, dt 
STRING, hh INT)
+                   |TBLPROPERTIES ('primary-key'='id, pt, dt, hh', 
'bucket'='2')
+                   |PARTITIONED BY (pt, dt, hh)
+                   |""".stripMargin)
+
+      val table = loadTable("T")
+      spark.sql(
+        s"INSERT INTO T VALUES (1, 'a', 'p1', '2024-01-01', 0), (2, 'b', 'p1', 
'2024-01-01', 0)")
+      spark.sql(
+        s"INSERT INTO T VALUES (3, 'c', 'p2', '2024-01-01', 1), (4, 'd', 'p2', 
'2024-01-02', 0)")
+
+      val initialData = spark.sql("SELECT * FROM T ORDER BY id").collect()
+      val initialSnapshotId = lastSnapshotId(table)
+
+      // Rescale single partition field
+      spark.sql("ALTER TABLE T SET TBLPROPERTIES ('bucket' = '4')")
+      checkAnswer(
+        spark.sql("CALL sys.rescale(table => 'T', bucket_num => 4, partitions 
=> 'pt=\"p1\"')"),
+        Row(true) :: Nil)
+
+      val reloadedTable = loadTable("T")
+      
Assertions.assertThat(lastSnapshotCommand(reloadedTable)).isEqualTo(CommitKind.OVERWRITE)
+      
Assertions.assertThat(lastSnapshotId(reloadedTable)).isGreaterThan(initialSnapshotId)
+
+      val p1Predicate = PartitionPredicate.fromMap(
+        reloadedTable.schema().logicalPartitionType(),
+        Collections.singletonMap("pt", "p1"),
+        reloadedTable.coreOptions().partitionDefaultName())
+      val p1Splits = reloadedTable.newSnapshotReader
+        .withPartitionFilter(p1Predicate)
+        .read
+        .dataSplits
+        .asScala
+        .toList
+      p1Splits.foreach(split => 
Assertions.assertThat(split.bucket()).isLessThan(4))
+
+      // Rescale multiple partition fields
+      val snapshotBeforeTest2 = lastSnapshotId(reloadedTable)
+      checkAnswer(
+        spark.sql(
+          "CALL sys.rescale(table => 'T', bucket_num => 4, partitions => 
'dt=\"2024-01-01\",hh=0')"),
+        Row(true) :: Nil)
+
+      val reloadedTable2 = loadTable("T")
+      
Assertions.assertThat(lastSnapshotCommand(reloadedTable2)).isEqualTo(CommitKind.OVERWRITE)
+      
Assertions.assertThat(lastSnapshotId(reloadedTable2)).isGreaterThan(snapshotBeforeTest2)
+
+      // Rescale empty partition (should not create new snapshot)
+      val snapshotBeforeEmpty = lastSnapshotId(reloadedTable2)
+      checkAnswer(
+        spark.sql("CALL sys.rescale(table => 'T', bucket_num => 4, partitions 
=> 'pt=\"p3\"')"),
+        Row(true) :: Nil)
+      
Assertions.assertThat(lastSnapshotId(loadTable("T"))).isEqualTo(snapshotBeforeEmpty)
+
+      val afterData = spark.sql("SELECT * FROM T ORDER BY id").collect()
+      val initialDataList = new util.ArrayList[Row]()
+      initialData.foreach(initialDataList.add)
+      
Assertions.assertThat(afterData).containsExactlyElementsOf(initialDataList)
+    }
+  }
+
+  test("Paimon Procedure: rescale error cases") {
+    // Table with no snapshot
+    withTable("T1") {
+      spark.sql(s"""
+                   |CREATE TABLE T1 (id INT, value STRING)
+                   |TBLPROPERTIES ('primary-key'='id', 'bucket'='2')
+                   |""".stripMargin)
+      assert(intercept[IllegalArgumentException] {
+        spark.sql("CALL sys.rescale(table => 'T1', bucket_num => 4)")
+      }.getMessage.contains("has no snapshot"))
+    }
+
+    // Postpone bucket table requires bucket_num
+    withTable("T2") {
+      spark.sql(s"""
+                   |CREATE TABLE T2 (id INT, value STRING)
+                   |TBLPROPERTIES ('primary-key'='id', 'bucket'='-2')
+                   |""".stripMargin)
+      spark.sql(s"INSERT INTO T2 VALUES (1, 'a'), (2, 'b'), (3, 'c')")
+      assert(
+        intercept[IllegalArgumentException] {
+          spark.sql("CALL sys.rescale(table => 'T2')")
+        }.getMessage.contains(
+          "When rescaling postpone bucket tables, you must provide the 
resulting bucket number"))
+      checkAnswer(spark.sql("CALL sys.rescale(table => 'T2', bucket_num => 
4)"), Row(true) :: Nil)
+    }
+
+    // partitions and where cannot be used together
+    withTable("T3") {
+      spark.sql(s"""
+                   |CREATE TABLE T3 (id INT, value STRING, pt STRING)
+                   |TBLPROPERTIES ('primary-key'='id', 'bucket'='2')
+                   |PARTITIONED BY (pt)
+                   |""".stripMargin)
+      spark.sql(s"INSERT INTO T3 VALUES (1, 'a', 'p1'), (2, 'b', 'p2')")
+      assert(intercept[IllegalArgumentException] {
+        spark.sql(
+          "CALL sys.rescale(table => 'T3', bucket_num => 4, partitions => 
'pt=\"p1\"', where => 'pt = \"p1\"')")
+      }.getMessage.contains("partitions and where cannot be used together"))
+    }
+
+    // where clause with non-partition column should fail
+    withTable("T4") {
+      spark.sql(s"""
+                   |CREATE TABLE T4 (id INT, value STRING, pt STRING)
+                   |TBLPROPERTIES ('primary-key'='id', 'bucket'='2')
+                   |PARTITIONED BY (pt)
+                   |""".stripMargin)
+      spark.sql(s"INSERT INTO T4 VALUES (1, 'a', 'p1'), (2, 'b', 'p2')")
+      assert(intercept[IllegalArgumentException] {
+        spark.sql("CALL sys.rescale(table => 'T4', bucket_num => 4, where => 
'id = 1')")
+      }.getMessage.contains("Only partition predicate is supported"))
+    }
+  }
+
+  test("Paimon Procedure: rescale bucket count changes") {
+    // Increase bucket count
+    withTable("T1") {
+      spark.sql(s"""
+                   |CREATE TABLE T1 (id INT, value STRING)
+                   |TBLPROPERTIES ('primary-key'='id', 'bucket'='2')
+                   |""".stripMargin)
+      loadTable("T1")

Review Comment:
   The call to `loadTable("T1")` on this line appears to be unused, as the 
table is loaded again on line 206 when calling 
`getBucketCount(loadTable("T1"))`. Consider removing this redundant call or 
assigning it to a variable if it's needed for initialization purposes.
   ```suggestion
   
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to