This is an automated email from the ASF dual-hosted git repository.

yunhong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new c1f7a11df Change the ListRebalanceProcessProcedure result from 
String[] to row[] (#2341)
c1f7a11df is described below

commit c1f7a11df03cacc094420d87c8999b98df40ff36
Author: Pei Yu <[email protected]>
AuthorDate: Fri Jan 16 11:07:35 2026 +0800

    Change the ListRebalanceProcessProcedure result from String[] to row[] 
(#2341)
    
    Signed-off-by: Pei Yu <[email protected]>
---
 .../fluss/cluster/rebalance/RebalanceProgress.java | 10 +++
 .../rebalance/RebalanceProgressJsonSerializer.java | 95 ++++++++++++++++++++++
 .../RebalanceProgressJsonSerializerTest.java       | 76 +++++++++++++++++
 .../procedure/ListRebalanceProcessProcedure.java   | 60 +++++---------
 .../flink/procedure/FlinkProcedureITCase.java      | 19 ++---
 5 files changed, 210 insertions(+), 50 deletions(-)

diff --git 
a/fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/RebalanceProgress.java
 
b/fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/RebalanceProgress.java
index ae277d14a..e203159e1 100644
--- 
a/fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/RebalanceProgress.java
+++ 
b/fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/RebalanceProgress.java
@@ -19,6 +19,7 @@ package org.apache.fluss.cluster.rebalance;
 
 import org.apache.fluss.metadata.TableBucket;
 
+import java.text.NumberFormat;
 import java.util.Map;
 
 import static org.apache.fluss.utils.Preconditions.checkNotNull;
@@ -72,4 +73,13 @@ public class RebalanceProgress {
     public Map<TableBucket, RebalanceResultForBucket> progressForBucketMap() {
         return progressForBucketMap;
     }
+
+    /**
+     * Formats the overall rebalance progress as a percentage string.
+     *
+     * <p>The formatter is pinned to {@link java.util.Locale#ROOT} so that the rendered text (for
+     * example {@code "100%"}) does not change with the JVM default locale. Locale-specific percent
+     * formats may insert a space before the sign or use non-ASCII digits, which would leak into
+     * the serialized JSON that embeds this value.
+     *
+     * @return {@code "NONE"} when the progress value is negative, otherwise the progress rendered
+     *     as a percentage with at most two fraction digits
+     */
+    public String formatAsPercentage() {
+        if (progress < 0) {
+            return "NONE";
+        }
+        NumberFormat pctFormat = NumberFormat.getPercentInstance(java.util.Locale.ROOT);
+        pctFormat.setMaximumFractionDigits(2);
+        return pctFormat.format(progress);
+    }
 }
diff --git 
a/fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/RebalanceProgressJsonSerializer.java
 
b/fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/RebalanceProgressJsonSerializer.java
new file mode 100644
index 000000000..448b42f4d
--- /dev/null
+++ 
b/fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/RebalanceProgressJsonSerializer.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.cluster.rebalance;
+
+import org.apache.fluss.metadata.TableBucket;
+import 
org.apache.fluss.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
+import org.apache.fluss.utils.json.JsonSerializer;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * JSON serializer for {@link RebalanceProgress}.
+ *
+ * <p>The emitted object contains the rebalance id, the numeric code of the overall status, the
+ * progress rendered as a percentage string, and an array with one entry per bucket. Each entry
+ * carries the bucket identity, the planned leader/replica change and the numeric code of the
+ * bucket's status.
+ */
+public class RebalanceProgressJsonSerializer implements JsonSerializer<RebalanceProgress> {
+
+    public static final RebalanceProgressJsonSerializer INSTANCE =
+            new RebalanceProgressJsonSerializer();
+
+    // Field names of the top-level object.
+    private static final String REBALANCE_ID = "rebalance_id";
+    private static final String REBALANCE_STATUS = "rebalance_status";
+    private static final String PROGRESS = "progress";
+    private static final String PROGRESS_FOR_BUCKETS = "progress_for_buckets";
+
+    // Field names of each entry in the per-bucket array.
+    private static final String TABLE_ID = "table_id";
+    private static final String PARTITION_ID = "partition_id";
+    private static final String BUCKET_ID = "bucket_id";
+    private static final String ORIGINAL_LEADER = "original_leader";
+    private static final String NEW_LEADER = "new_leader";
+    private static final String ORIGIN_REPLICAS = "origin_replicas";
+    private static final String NEW_REPLICAS = "new_replicas";
+
+    /**
+     * Writes the given progress as a single JSON object.
+     *
+     * @param rebalanceProgress the progress to serialize
+     * @param generator the generator receiving the JSON tokens
+     * @throws IOException if the generator fails to write
+     */
+    @Override
+    public void serialize(RebalanceProgress rebalanceProgress, JsonGenerator generator)
+            throws IOException {
+        generator.writeStartObject();
+
+        generator.writeStringField(REBALANCE_ID, rebalanceProgress.rebalanceId());
+        generator.writeNumberField(REBALANCE_STATUS, rebalanceProgress.status().getCode());
+        generator.writeStringField(PROGRESS, rebalanceProgress.formatAsPercentage());
+
+        Map<TableBucket, RebalanceResultForBucket> bucketResults =
+                rebalanceProgress.progressForBucketMap();
+
+        // One array element per bucket, in the iteration order of the map's values.
+        generator.writeArrayFieldStart(PROGRESS_FOR_BUCKETS);
+        for (RebalanceResultForBucket bucketResult : bucketResults.values()) {
+            writeBucketResult(bucketResult, generator);
+        }
+        generator.writeEndArray();
+
+        generator.writeEndObject();
+    }
+
+    /** Writes one element of the per-bucket array: bucket identity, plan and status code. */
+    private static void writeBucketResult(
+            RebalanceResultForBucket bucketResult, JsonGenerator generator) throws IOException {
+        TableBucket bucket = bucketResult.tableBucket();
+        RebalancePlanForBucket bucketPlan = bucketResult.plan();
+
+        generator.writeStartObject();
+
+        // Bucket identity; the partition id is omitted when the bucket has none.
+        generator.writeNumberField(TABLE_ID, bucket.getTableId());
+        generator.writeNumberField(BUCKET_ID, bucket.getBucket());
+        Long partitionId = bucket.getPartitionId();
+        if (partitionId != null) {
+            generator.writeNumberField(PARTITION_ID, partitionId);
+        }
+
+        // Planned leader and replica change.
+        generator.writeNumberField(ORIGINAL_LEADER, bucketPlan.getOriginalLeader());
+        generator.writeNumberField(NEW_LEADER, bucketPlan.getNewLeader());
+
+        generator.writeArrayFieldStart(ORIGIN_REPLICAS);
+        for (Integer originReplica : bucketPlan.getOriginReplicas()) {
+            generator.writeNumber(originReplica);
+        }
+        generator.writeEndArray();
+
+        generator.writeArrayFieldStart(NEW_REPLICAS);
+        for (Integer newReplica : bucketPlan.getNewReplicas()) {
+            generator.writeNumber(newReplica);
+        }
+        generator.writeEndArray();
+
+        // Status of this bucket, written as its numeric code.
+        generator.writeNumberField(REBALANCE_STATUS, bucketResult.status().getCode());
+
+        generator.writeEndObject();
+    }
+}
diff --git 
a/fluss-common/src/test/java/org/apache/fluss/cluster/rebalance/RebalanceProgressJsonSerializerTest.java
 
b/fluss-common/src/test/java/org/apache/fluss/cluster/rebalance/RebalanceProgressJsonSerializerTest.java
new file mode 100644
index 000000000..4cf48303c
--- /dev/null
+++ 
b/fluss-common/src/test/java/org/apache/fluss/cluster/rebalance/RebalanceProgressJsonSerializerTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.cluster.rebalance;
+
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.utils.json.JsonSerdeUtils;
+
+import org.junit.jupiter.api.Test;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link RebalanceProgressJsonSerializer}. */
+public class RebalanceProgressJsonSerializerTest {
+
+    @Test
+    public void testSerializer() {
+        String serialize =
+                new String(
+                        JsonSerdeUtils.writeValueAsBytes(
+                                createProgressObj(), 
RebalanceProgressJsonSerializer.INSTANCE),
+                        StandardCharsets.UTF_8);
+        assertThat(serialize).isEqualTo(createProgressJson());
+    }
+
+    private RebalanceProgress createProgressObj() {
+        Map<TableBucket, RebalanceResultForBucket> progressForBucketMap = new 
HashMap<>();
+        progressForBucketMap.put(
+                new TableBucket(0L, 0),
+                RebalanceResultForBucket.of(
+                        new RebalancePlanForBucket(
+                                new TableBucket(0L, 0),
+                                0,
+                                3,
+                                Arrays.asList(0, 1, 2),
+                                Arrays.asList(3, 4, 5)),
+                        RebalanceStatus.COMPLETED));
+        progressForBucketMap.put(
+                new TableBucket(1L, 0L, 0),
+                RebalanceResultForBucket.of(
+                        new RebalancePlanForBucket(
+                                new TableBucket(1L, 0L, 0),
+                                0,
+                                3,
+                                Arrays.asList(0, 1, 2),
+                                Arrays.asList(3, 4, 5)),
+                        RebalanceStatus.COMPLETED));
+        return new RebalanceProgress(
+                "rebalance-task-21jd", RebalanceStatus.COMPLETED, 1d, 
progressForBucketMap);
+    }
+
+    private String createProgressJson() {
+        return 
"{\"rebalance_id\":\"rebalance-task-21jd\",\"rebalance_status\":3,\"progress\":\"100%\",\"progress_for_buckets\":"
+                + 
"[{\"table_id\":1,\"bucket_id\":0,\"partition_id\":0,\"original_leader\":0,\"new_leader\":3,\"origin_replicas\":[0,1,2],\"new_replicas\":[3,4,5],\"rebalance_status\":3},"
+                + 
"{\"table_id\":0,\"bucket_id\":0,\"original_leader\":0,\"new_leader\":3,\"origin_replicas\":[0,1,2],\"new_replicas\":[3,4,5],\"rebalance_status\":3}]}";
+    }
+}
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/ListRebalanceProcessProcedure.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/ListRebalanceProcessProcedure.java
index bc0bf99f7..9ebb227b7 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/ListRebalanceProcessProcedure.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/ListRebalanceProcessProcedure.java
@@ -19,21 +19,18 @@ package org.apache.fluss.flink.procedure;
 
 import org.apache.fluss.client.admin.Admin;
 import org.apache.fluss.cluster.rebalance.RebalanceProgress;
-import org.apache.fluss.cluster.rebalance.RebalanceResultForBucket;
-import org.apache.fluss.cluster.rebalance.RebalanceStatus;
-import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.cluster.rebalance.RebalanceProgressJsonSerializer;
+import org.apache.fluss.utils.json.JsonSerdeUtils;
 
 import org.apache.flink.table.annotation.ArgumentHint;
 import org.apache.flink.table.annotation.DataTypeHint;
 import org.apache.flink.table.annotation.ProcedureHint;
 import org.apache.flink.table.procedure.ProcedureContext;
+import org.apache.flink.types.Row;
 
 import javax.annotation.Nullable;
 
-import java.text.NumberFormat;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
+import java.nio.charset.StandardCharsets;
 import java.util.Optional;
 
 /**
@@ -60,41 +57,26 @@ public class ListRebalanceProcessProcedure extends 
ProcedureBase {
                         name = "rebalanceId",
                         type = @DataTypeHint("STRING"),
                         isOptional = true)
-            })
-    public String[] call(ProcedureContext context, @Nullable String 
rebalanceId) throws Exception {
+            },
+            output =
+                    @DataTypeHint(
+                            "ROW<rebalance_id STRING, rebalance_status STRING, 
rebalance_progress STRING, rebalance_plan STRING>"))
+    public Row[] call(ProcedureContext context, @Nullable String rebalanceId) 
throws Exception {
         Optional<RebalanceProgress> progressOpt = 
admin.listRebalanceProgress(rebalanceId).get();
 
         if (!progressOpt.isPresent()) {
-            return new String[0];
+            return new Row[0];
         }
-
-        return progressToString(progressOpt.get());
-    }
-
-    private static String[] progressToString(RebalanceProgress progress) {
-        RebalanceStatus status = progress.status();
-        double rebalanceProgress = progress.progress();
-        Map<TableBucket, RebalanceResultForBucket> bucketMap = 
progress.progressForBucketMap();
-
-        // TODO format the result into a row type, and the detail progress for 
bucket show in json
-        // format. Trace by: https://github.com/apache/fluss/issues/2325
-        List<String> result = new ArrayList<>();
-        result.add("Rebalance id: " + progress.rebalanceId());
-        result.add("Reblance total status: " + status);
-        result.add("Rebalance progress: " + 
formatAsPercentage(rebalanceProgress));
-        result.add("Rebalance detail progress for bucket:");
-        for (RebalanceResultForBucket resultForBucket : bucketMap.values()) {
-            result.add(resultForBucket.toString());
-        }
-        return result.toArray(new String[0]);
-    }
-
-    public static String formatAsPercentage(double value) {
-        if (value < 0) {
-            return "NONE";
-        }
-        NumberFormat pctFormat = NumberFormat.getPercentInstance();
-        pctFormat.setMaximumFractionDigits(2);
-        return pctFormat.format(value);
+        RebalanceProgress progress = progressOpt.get();
+        return new Row[] {
+            Row.of(
+                    progress.rebalanceId(),
+                    progress.status(),
+                    progress.formatAsPercentage(),
+                    new String(
+                            JsonSerdeUtils.writeValueAsBytes(
+                                    progress, 
RebalanceProgressJsonSerializer.INSTANCE),
+                            StandardCharsets.UTF_8))
+        };
     }
 }
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/procedure/FlinkProcedureITCase.java
 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/procedure/FlinkProcedureITCase.java
index cc15d5ccd..c1231c6e9 100644
--- 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/procedure/FlinkProcedureITCase.java
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/procedure/FlinkProcedureITCase.java
@@ -21,6 +21,7 @@ import org.apache.fluss.client.Connection;
 import org.apache.fluss.client.ConnectionFactory;
 import org.apache.fluss.client.admin.Admin;
 import org.apache.fluss.cluster.rebalance.RebalanceProgress;
+import org.apache.fluss.cluster.rebalance.RebalanceStatus;
 import org.apache.fluss.cluster.rebalance.ServerTag;
 import org.apache.fluss.config.ConfigOptions;
 import org.apache.fluss.config.Configuration;
@@ -767,17 +768,13 @@ public abstract class FlinkProcedureITCase {
                                                     "Call 
%s.sys.list_rebalance('%s')",
                                                     CATALOG_NAME, 
progress.rebalanceId()))
                                     .collect()) {
-                        List<String> listProgressResult =
-                                CollectionUtil.iteratorToList(rows).stream()
-                                        .map(Row::toString)
-                                        .collect(Collectors.toList());
-                        
assertThat(listProgressResult.get(0)).startsWith("+I[Rebalance id:");
-                        assertThat(listProgressResult.get(1))
-                                .isEqualTo("+I[Reblance total status: 
COMPLETED]");
-                        assertThat(listProgressResult.get(2))
-                                .isEqualTo("+I[Rebalance progress: 100%]");
-                        assertThat(listProgressResult.get(3))
-                                .isEqualTo("+I[Rebalance detail progress for 
bucket:]");
+                        List<Row> listProgressResult = 
CollectionUtil.iteratorToList(rows);
+                        Row row = listProgressResult.get(0);
+                        assertThat(row.getArity()).isEqualTo(4);
+                        
assertThat(row.getField(0)).isEqualTo(progress.rebalanceId());
+                        
assertThat(row.getField(1)).isEqualTo(RebalanceStatus.COMPLETED);
+                        assertThat((String) row.getField(2)).endsWith("%");
+                        assertThat((String) 
row.getField(3)).startsWith("{\"rebalance_id\":");
                     }
                 });
     }

Reply via email to