This is an automated email from the ASF dual-hosted git repository.

laiyingchun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git


The following commit(s) were added to refs/heads/master by this push:
     new 0fc00015a fix(go-client): Changed the validation logic for new 
configuration during queryconfig operation (#2287)
0fc00015a is described below

commit 0fc00015a217da8a158b3cd1f1bafd9ab6631f7d
Author: nanorth <[email protected]>
AuthorDate: Thu Oct 16 23:41:23 2025 +0800

    fix(go-client): Changed the validation logic for new configuration during 
queryconfig operation (#2287)
    
    This PR removes the check that requires the new partition count to be equal 
to, double, or half of
    the old one. Instead, it only checks whether the new partition count is 
valid (greater than or equal
    to 4 and is a power of 2).
---
 go-client/pegasus/table_connector.go      |  7 ++--
 go-client/pegasus/table_connector_test.go | 56 ++++++++++++++++++++++++-------
 2 files changed, 48 insertions(+), 15 deletions(-)

diff --git a/go-client/pegasus/table_connector.go 
b/go-client/pegasus/table_connector.go
index 4a167b5ae..ca429d31e 100644
--- a/go-client/pegasus/table_connector.go
+++ b/go-client/pegasus/table_connector.go
@@ -283,15 +283,16 @@ func (p *pegasusTableConnector) updateConf(ctx 
context.Context) error {
        return nil
 }
 
-func isPartitionValid(oldCount int, respCount int) bool {
-       return oldCount == 0 || oldCount == respCount || oldCount*2 == 
respCount || oldCount == respCount*2
+func isPartitionValid(respCount int) bool {
+       // Check if respCount is greater than or equal to 4 and is a power of 2
+       return respCount >= 2 && (respCount&(respCount-1)) == 0
 }
 
 func (p *pegasusTableConnector) handleQueryConfigResp(resp 
*replication.QueryCfgResponse) error {
        if resp.Err.Errno != base.ERR_OK.String() {
                return errors.New(resp.Err.Errno)
        }
-       if resp.PartitionCount == 0 || len(resp.Partitions) != 
int(resp.PartitionCount) || !isPartitionValid(len(p.parts), 
int(resp.PartitionCount)) {
+       if resp.PartitionCount == 0 || len(resp.Partitions) != 
int(resp.PartitionCount) || !isPartitionValid(int(resp.PartitionCount)) {
                return fmt.Errorf("invalid table configuration: response [%v]", 
resp)
        }
 
diff --git a/go-client/pegasus/table_connector_test.go 
b/go-client/pegasus/table_connector_test.go
index 205a34a6a..4d7de11b9 100644
--- a/go-client/pegasus/table_connector_test.go
+++ b/go-client/pegasus/table_connector_test.go
@@ -355,18 +355,6 @@ func 
TestPegasusTableConnector_HandleInvalidQueryConfigResp(t *testing.T) {
                assert.NotNil(t, err)
                assert.Equal(t, partitionCount, len(p.parts))
        }
-
-       {
-               resp := replication.NewQueryCfgResponse()
-               resp.Err = &base.ErrorCode{Errno: "ERR_OK"}
-
-               resp.Partitions = make([]*replication.PartitionConfiguration, 2)
-               resp.PartitionCount = 2
-
-               err := p.handleQueryConfigResp(resp)
-               assert.NotNil(t, err)
-               assert.Equal(t, partitionCount, len(p.parts))
-       }
 }
 
 func TestPegasusTableConnector_QueryConfigRespWhileStartSplit(t *testing.T) {
@@ -447,6 +435,50 @@ func 
TestPegasusTableConnector_QueryConfigRespWhileCancelSplit(t *testing.T) {
        ptb.Close()
 }
 
+func TestPegasusTableConnector_QueryConfigRespWhileTablePartitionResize(t 
*testing.T) {
+       // In certain scenarios, when dealing with partitions that consume 
excessive disk space,
+       // we may split a single partition into multiple ones (more than double 
the original number).
+       // To achieve this, we typically use an offline approach involving 
multiple splits,
+       // followed by reconstruction on another cluster. Meanwhile, we utilize 
metaproxy to ensure
+       // seamless traffic switching. As a result, significant changes in the 
partition count
+       // are possible (e.g., 8x or 1/8x the original count). Therefore, when 
updating the configuration,
+       // we do not require the new partition count to be related to the 
previous one.
+       // It only needs to be a valid number.
+
+       // Ensure loopForAutoUpdate will be closed.
+       defer leaktest.Check(t)()
+
+       client := NewClient(testingCfg)
+       defer client.Close()
+
+       tb, err := client.OpenTable(context.Background(), "temp")
+       assert.Nil(t, err)
+       defer tb.Close()
+       ptb, _ := tb.(*pegasusTableConnector)
+
+       partitionCount := len(ptb.parts)
+       resp := replication.NewQueryCfgResponse()
+       resp.Err = &base.ErrorCode{Errno: "ERR_OK"}
+       resp.AppID = ptb.appID
+       resp.PartitionCount = int32(partitionCount * 8)
+       resp.Partitions = make([]*replication.PartitionConfiguration, 
partitionCount*8)
+       for i := 0; i < partitionCount*8; i++ {
+               if i < partitionCount {
+                       resp.Partitions[i] = ptb.parts[i].pconf
+               } else {
+                       conf := replication.NewPartitionConfiguration()
+                       conf.Ballot = -1
+                       conf.Pid = &base.Gpid{ptb.appID, int32(i)}
+                       resp.Partitions[i] = conf
+               }
+       }
+
+       err = ptb.handleQueryConfigResp(resp)
+       assert.Nil(t, err)
+       assert.Equal(t, partitionCount*8, len(ptb.parts))
+       ptb.Close()
+}
+
 func TestPegasusTableConnector_Close(t *testing.T) {
        // Ensure loopForAutoUpdate will be closed.
        defer leaktest.Check(t)()


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to