This is an automated email from the ASF dual-hosted git repository.
yunhong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new c1f7a11df Change the ListRebalanceProcessProcedure result from
String[] to row[] (#2341)
c1f7a11df is described below
commit c1f7a11df03cacc094420d87c8999b98df40ff36
Author: Pei Yu <[email protected]>
AuthorDate: Fri Jan 16 11:07:35 2026 +0800
Change the ListRebalanceProcessProcedure result from String[] to row[]
(#2341)
Signed-off-by: Pei Yu <[email protected]>
---
.../fluss/cluster/rebalance/RebalanceProgress.java | 10 +++
.../rebalance/RebalanceProgressJsonSerializer.java | 95 ++++++++++++++++++++++
.../RebalanceProgressJsonSerializerTest.java | 76 +++++++++++++++++
.../procedure/ListRebalanceProcessProcedure.java | 60 +++++---------
.../flink/procedure/FlinkProcedureITCase.java | 19 ++---
5 files changed, 210 insertions(+), 50 deletions(-)
diff --git
a/fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/RebalanceProgress.java
b/fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/RebalanceProgress.java
index ae277d14a..e203159e1 100644
---
a/fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/RebalanceProgress.java
+++
b/fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/RebalanceProgress.java
@@ -19,6 +19,7 @@ package org.apache.fluss.cluster.rebalance;
import org.apache.fluss.metadata.TableBucket;
+import java.text.NumberFormat;
import java.util.Map;
import static org.apache.fluss.utils.Preconditions.checkNotNull;
@@ -72,4 +73,13 @@ public class RebalanceProgress {
public Map<TableBucket, RebalanceResultForBucket> progressForBucketMap() {
return progressForBucketMap;
}
+
+ public String formatAsPercentage() {
+ if (progress < 0) {
+ return "NONE";
+ }
+ NumberFormat pctFormat = NumberFormat.getPercentInstance();
+ pctFormat.setMaximumFractionDigits(2);
+ return pctFormat.format(progress);
+ }
}
diff --git
a/fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/RebalanceProgressJsonSerializer.java
b/fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/RebalanceProgressJsonSerializer.java
new file mode 100644
index 000000000..448b42f4d
--- /dev/null
+++
b/fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/RebalanceProgressJsonSerializer.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.cluster.rebalance;
+
+import org.apache.fluss.metadata.TableBucket;
+import
org.apache.fluss.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
+import org.apache.fluss.utils.json.JsonSerializer;
+
+import java.io.IOException;
+import java.util.Map;
+
+/** Json serializer for {@link RebalanceProgress}. */
+public class RebalanceProgressJsonSerializer implements
JsonSerializer<RebalanceProgress> {
+
+ public static final RebalanceProgressJsonSerializer INSTANCE =
+ new RebalanceProgressJsonSerializer();
+
+ private static final String REBALANCE_ID = "rebalance_id";
+ private static final String REBALANCE_STATUS = "rebalance_status";
+ private static final String PROGRESS = "progress";
+ private static final String PROGRESS_FOR_BUCKETS = "progress_for_buckets";
+
+ private static final String TABLE_ID = "table_id";
+ private static final String PARTITION_ID = "partition_id";
+ private static final String BUCKET_ID = "bucket_id";
+ private static final String ORIGINAL_LEADER = "original_leader";
+ private static final String NEW_LEADER = "new_leader";
+ private static final String ORIGIN_REPLICAS = "origin_replicas";
+ private static final String NEW_REPLICAS = "new_replicas";
+
+ @Override
+ public void serialize(RebalanceProgress rebalanceProgress, JsonGenerator
generator)
+ throws IOException {
+ generator.writeStartObject();
+
+ generator.writeStringField(REBALANCE_ID,
rebalanceProgress.rebalanceId());
+ generator.writeNumberField(REBALANCE_STATUS,
rebalanceProgress.status().getCode());
+ generator.writeStringField(PROGRESS,
rebalanceProgress.formatAsPercentage());
+
+ Map<TableBucket, RebalanceResultForBucket> resultForBucketMap =
+ rebalanceProgress.progressForBucketMap();
+
+ // RebalanceProgress.progressForBucketMap
+ generator.writeArrayFieldStart(PROGRESS_FOR_BUCKETS);
+ for (RebalanceResultForBucket rebalanceResultForBucket :
resultForBucketMap.values()) {
+ TableBucket tableBucket = rebalanceResultForBucket.tableBucket();
+ RebalancePlanForBucket plan = rebalanceResultForBucket.plan();
+
+ // RebalanceResultForBucket.plan
+ generator.writeStartObject();
+ generator.writeNumberField(TABLE_ID, tableBucket.getTableId());
+ generator.writeNumberField(BUCKET_ID, tableBucket.getBucket());
+ Long partitionId = tableBucket.getPartitionId();
+ if (null != partitionId) {
+ generator.writeNumberField(PARTITION_ID, partitionId);
+ }
+ generator.writeNumberField(ORIGINAL_LEADER,
plan.getOriginalLeader());
+ generator.writeNumberField(NEW_LEADER, plan.getNewLeader());
+ generator.writeArrayFieldStart(ORIGIN_REPLICAS);
+ for (Integer replica : plan.getOriginReplicas()) {
+ generator.writeNumber(replica);
+ }
+ generator.writeEndArray();
+ generator.writeArrayFieldStart(NEW_REPLICAS);
+ for (Integer replica : plan.getNewReplicas()) {
+ generator.writeNumber(replica);
+ }
+ generator.writeEndArray();
+
+ // RebalanceResultForBucket.rebalanceStatus
+ generator.writeNumberField(
+ REBALANCE_STATUS,
rebalanceResultForBucket.status().getCode());
+
+ generator.writeEndObject();
+ }
+ generator.writeEndArray();
+
+ generator.writeEndObject();
+ }
+}
diff --git
a/fluss-common/src/test/java/org/apache/fluss/cluster/rebalance/RebalanceProgressJsonSerializerTest.java
b/fluss-common/src/test/java/org/apache/fluss/cluster/rebalance/RebalanceProgressJsonSerializerTest.java
new file mode 100644
index 000000000..4cf48303c
--- /dev/null
+++
b/fluss-common/src/test/java/org/apache/fluss/cluster/rebalance/RebalanceProgressJsonSerializerTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.cluster.rebalance;
+
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.utils.json.JsonSerdeUtils;
+
+import org.junit.jupiter.api.Test;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link RebalanceProgressJsonSerializer}. */
+public class RebalanceProgressJsonSerializerTest {
+
+ @Test
+ public void testSerializer() {
+ String serialize =
+ new String(
+ JsonSerdeUtils.writeValueAsBytes(
+ createProgressObj(),
RebalanceProgressJsonSerializer.INSTANCE),
+ StandardCharsets.UTF_8);
+ assertThat(serialize).isEqualTo(createProgressJson());
+ }
+
+ private RebalanceProgress createProgressObj() {
+ Map<TableBucket, RebalanceResultForBucket> progressForBucketMap = new
HashMap<>();
+ progressForBucketMap.put(
+ new TableBucket(0L, 0),
+ RebalanceResultForBucket.of(
+ new RebalancePlanForBucket(
+ new TableBucket(0L, 0),
+ 0,
+ 3,
+ Arrays.asList(0, 1, 2),
+ Arrays.asList(3, 4, 5)),
+ RebalanceStatus.COMPLETED));
+ progressForBucketMap.put(
+ new TableBucket(1L, 0L, 0),
+ RebalanceResultForBucket.of(
+ new RebalancePlanForBucket(
+ new TableBucket(1L, 0L, 0),
+ 0,
+ 3,
+ Arrays.asList(0, 1, 2),
+ Arrays.asList(3, 4, 5)),
+ RebalanceStatus.COMPLETED));
+ return new RebalanceProgress(
+ "rebalance-task-21jd", RebalanceStatus.COMPLETED, 1d,
progressForBucketMap);
+ }
+
+ private String createProgressJson() {
+ return
"{\"rebalance_id\":\"rebalance-task-21jd\",\"rebalance_status\":3,\"progress\":\"100%\",\"progress_for_buckets\":"
+ +
"[{\"table_id\":1,\"bucket_id\":0,\"partition_id\":0,\"original_leader\":0,\"new_leader\":3,\"origin_replicas\":[0,1,2],\"new_replicas\":[3,4,5],\"rebalance_status\":3},"
+ +
"{\"table_id\":0,\"bucket_id\":0,\"original_leader\":0,\"new_leader\":3,\"origin_replicas\":[0,1,2],\"new_replicas\":[3,4,5],\"rebalance_status\":3}]}";
+ }
+}
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/ListRebalanceProcessProcedure.java
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/ListRebalanceProcessProcedure.java
index bc0bf99f7..9ebb227b7 100644
---
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/ListRebalanceProcessProcedure.java
+++
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/ListRebalanceProcessProcedure.java
@@ -19,21 +19,18 @@ package org.apache.fluss.flink.procedure;
import org.apache.fluss.client.admin.Admin;
import org.apache.fluss.cluster.rebalance.RebalanceProgress;
-import org.apache.fluss.cluster.rebalance.RebalanceResultForBucket;
-import org.apache.fluss.cluster.rebalance.RebalanceStatus;
-import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.cluster.rebalance.RebalanceProgressJsonSerializer;
+import org.apache.fluss.utils.json.JsonSerdeUtils;
import org.apache.flink.table.annotation.ArgumentHint;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.ProcedureHint;
import org.apache.flink.table.procedure.ProcedureContext;
+import org.apache.flink.types.Row;
import javax.annotation.Nullable;
-import java.text.NumberFormat;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
+import java.nio.charset.StandardCharsets;
import java.util.Optional;
/**
@@ -60,41 +57,26 @@ public class ListRebalanceProcessProcedure extends
ProcedureBase {
name = "rebalanceId",
type = @DataTypeHint("STRING"),
isOptional = true)
- })
- public String[] call(ProcedureContext context, @Nullable String
rebalanceId) throws Exception {
+ },
+ output =
+ @DataTypeHint(
+ "ROW<rebalance_id STRING, rebalance_status STRING,
rebalance_progress STRING, rebalance_plan STRING>"))
+ public Row[] call(ProcedureContext context, @Nullable String rebalanceId)
throws Exception {
Optional<RebalanceProgress> progressOpt =
admin.listRebalanceProgress(rebalanceId).get();
if (!progressOpt.isPresent()) {
- return new String[0];
+ return new Row[0];
}
-
- return progressToString(progressOpt.get());
- }
-
- private static String[] progressToString(RebalanceProgress progress) {
- RebalanceStatus status = progress.status();
- double rebalanceProgress = progress.progress();
- Map<TableBucket, RebalanceResultForBucket> bucketMap =
progress.progressForBucketMap();
-
- // TODO format the result into a row type, and the detail progress for
bucket show in json
- // format. Trace by: https://github.com/apache/fluss/issues/2325
- List<String> result = new ArrayList<>();
- result.add("Rebalance id: " + progress.rebalanceId());
- result.add("Reblance total status: " + status);
- result.add("Rebalance progress: " +
formatAsPercentage(rebalanceProgress));
- result.add("Rebalance detail progress for bucket:");
- for (RebalanceResultForBucket resultForBucket : bucketMap.values()) {
- result.add(resultForBucket.toString());
- }
- return result.toArray(new String[0]);
- }
-
- public static String formatAsPercentage(double value) {
- if (value < 0) {
- return "NONE";
- }
- NumberFormat pctFormat = NumberFormat.getPercentInstance();
- pctFormat.setMaximumFractionDigits(2);
- return pctFormat.format(value);
+ RebalanceProgress progress = progressOpt.get();
+ return new Row[] {
+ Row.of(
+ progress.rebalanceId(),
+ progress.status(),
+ progress.formatAsPercentage(),
+ new String(
+ JsonSerdeUtils.writeValueAsBytes(
+ progress,
RebalanceProgressJsonSerializer.INSTANCE),
+ StandardCharsets.UTF_8))
+ };
}
}
diff --git
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/procedure/FlinkProcedureITCase.java
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/procedure/FlinkProcedureITCase.java
index cc15d5ccd..c1231c6e9 100644
---
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/procedure/FlinkProcedureITCase.java
+++
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/procedure/FlinkProcedureITCase.java
@@ -21,6 +21,7 @@ import org.apache.fluss.client.Connection;
import org.apache.fluss.client.ConnectionFactory;
import org.apache.fluss.client.admin.Admin;
import org.apache.fluss.cluster.rebalance.RebalanceProgress;
+import org.apache.fluss.cluster.rebalance.RebalanceStatus;
import org.apache.fluss.cluster.rebalance.ServerTag;
import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.config.Configuration;
@@ -767,17 +768,13 @@ public abstract class FlinkProcedureITCase {
"Call
%s.sys.list_rebalance('%s')",
CATALOG_NAME,
progress.rebalanceId()))
.collect()) {
- List<String> listProgressResult =
- CollectionUtil.iteratorToList(rows).stream()
- .map(Row::toString)
- .collect(Collectors.toList());
-
assertThat(listProgressResult.get(0)).startsWith("+I[Rebalance id:");
- assertThat(listProgressResult.get(1))
- .isEqualTo("+I[Reblance total status:
COMPLETED]");
- assertThat(listProgressResult.get(2))
- .isEqualTo("+I[Rebalance progress: 100%]");
- assertThat(listProgressResult.get(3))
- .isEqualTo("+I[Rebalance detail progress for
bucket:]");
+ List<Row> listProgressResult =
CollectionUtil.iteratorToList(rows);
+ Row row = listProgressResult.get(0);
+ assertThat(row.getArity()).isEqualTo(4);
+
assertThat(row.getField(0)).isEqualTo(progress.rebalanceId());
+
assertThat(row.getField(1)).isEqualTo(RebalanceStatus.COMPLETED);
+ assertThat((String) row.getField(2)).endsWith("%");
+ assertThat((String)
row.getField(3)).startsWith("{\"rebalance_id\":");
}
});
}