This is an automated email from the ASF dual-hosted git repository.
laiyingchun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
The following commit(s) were added to refs/heads/master by this push:
new 0fc00015a fix(go-client): Changed the validation logic for new
configuration during queryconfig operation (#2287)
0fc00015a is described below
commit 0fc00015a217da8a158b3cd1f1bafd9ab6631f7d
Author: nanorth <[email protected]>
AuthorDate: Thu Oct 16 23:41:23 2025 +0800
fix(go-client): Changed the validation logic for new configuration during
queryconfig operation (#2287)
This PR removes the check that requires the new partition count to be equal
to, double, or half of
the old one. Instead, it only checks whether the new partition count is
valid (greater than or equal
to 4 and is a power of 2).
---
go-client/pegasus/table_connector.go | 7 ++--
go-client/pegasus/table_connector_test.go | 56 ++++++++++++++++++++++++-------
2 files changed, 48 insertions(+), 15 deletions(-)
diff --git a/go-client/pegasus/table_connector.go
b/go-client/pegasus/table_connector.go
index 4a167b5ae..ca429d31e 100644
--- a/go-client/pegasus/table_connector.go
+++ b/go-client/pegasus/table_connector.go
@@ -283,15 +283,16 @@ func (p *pegasusTableConnector) updateConf(ctx
context.Context) error {
return nil
}
-func isPartitionValid(oldCount int, respCount int) bool {
- return oldCount == 0 || oldCount == respCount || oldCount*2 ==
respCount || oldCount == respCount*2
+func isPartitionValid(respCount int) bool {
+ // Check if respCount is greater than or equal to 4 and is a power of 2
+ return respCount >= 2 && (respCount&(respCount-1)) == 0
}
func (p *pegasusTableConnector) handleQueryConfigResp(resp
*replication.QueryCfgResponse) error {
if resp.Err.Errno != base.ERR_OK.String() {
return errors.New(resp.Err.Errno)
}
- if resp.PartitionCount == 0 || len(resp.Partitions) !=
int(resp.PartitionCount) || !isPartitionValid(len(p.parts),
int(resp.PartitionCount)) {
+ if resp.PartitionCount == 0 || len(resp.Partitions) !=
int(resp.PartitionCount) || !isPartitionValid(int(resp.PartitionCount)) {
return fmt.Errorf("invalid table configuration: response [%v]",
resp)
}
diff --git a/go-client/pegasus/table_connector_test.go
b/go-client/pegasus/table_connector_test.go
index 205a34a6a..4d7de11b9 100644
--- a/go-client/pegasus/table_connector_test.go
+++ b/go-client/pegasus/table_connector_test.go
@@ -355,18 +355,6 @@ func
TestPegasusTableConnector_HandleInvalidQueryConfigResp(t *testing.T) {
assert.NotNil(t, err)
assert.Equal(t, partitionCount, len(p.parts))
}
-
- {
- resp := replication.NewQueryCfgResponse()
- resp.Err = &base.ErrorCode{Errno: "ERR_OK"}
-
- resp.Partitions = make([]*replication.PartitionConfiguration, 2)
- resp.PartitionCount = 2
-
- err := p.handleQueryConfigResp(resp)
- assert.NotNil(t, err)
- assert.Equal(t, partitionCount, len(p.parts))
- }
}
func TestPegasusTableConnector_QueryConfigRespWhileStartSplit(t *testing.T) {
@@ -447,6 +435,50 @@ func
TestPegasusTableConnector_QueryConfigRespWhileCancelSplit(t *testing.T) {
ptb.Close()
}
+func TestPegasusTableConnector_QueryConfigRespWhileTablePartitionResize(t
*testing.T) {
+ // In certain scenarios, when dealing with partitions that consume
excessive disk space,
+ // we may split a single partition into multiple ones (more than double
the original number).
+ // To achieve this, we typically use an offline approach involving
multiple splits,
+ // followed by reconstruction on another cluster. Meanwhile, we utilize
metaproxy to ensure
+ // seamless traffic switching. As a result, significant changes in the
partition count
+ // are possible (e.g., 8x or 1/8x the original count). Therefore, when
updating the configuration,
+ // we do not require the new partition count to be related to the
previous one.
+ // It only needs to be a valid number.
+
+ // Ensure loopForAutoUpdate will be closed.
+ defer leaktest.Check(t)()
+
+ client := NewClient(testingCfg)
+ defer client.Close()
+
+ tb, err := client.OpenTable(context.Background(), "temp")
+ assert.Nil(t, err)
+ defer tb.Close()
+ ptb, _ := tb.(*pegasusTableConnector)
+
+ partitionCount := len(ptb.parts)
+ resp := replication.NewQueryCfgResponse()
+ resp.Err = &base.ErrorCode{Errno: "ERR_OK"}
+ resp.AppID = ptb.appID
+ resp.PartitionCount = int32(partitionCount * 8)
+ resp.Partitions = make([]*replication.PartitionConfiguration,
partitionCount*8)
+ for i := 0; i < partitionCount*8; i++ {
+ if i < partitionCount {
+ resp.Partitions[i] = ptb.parts[i].pconf
+ } else {
+ conf := replication.NewPartitionConfiguration()
+ conf.Ballot = -1
+ conf.Pid = &base.Gpid{ptb.appID, int32(i)}
+ resp.Partitions[i] = conf
+ }
+ }
+
+ err = ptb.handleQueryConfigResp(resp)
+ assert.Nil(t, err)
+ assert.Equal(t, partitionCount*8, len(ptb.parts))
+ ptb.Close()
+}
+
func TestPegasusTableConnector_Close(t *testing.T) {
// Ensure loopForAutoUpdate will be closed.
defer leaktest.Check(t)()
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]