This is an automated email from the ASF dual-hosted git repository.
xikai pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-horaedb-meta.git
The following commit(s) were added to refs/heads/main by this push:
new d880dd2 refactor: make create table idempotent (#286)
d880dd2 is described below
commit d880dd24df1d37599fb9e565733d0442540cb6cf
Author: CooooolFrog <[email protected]>
AuthorDate: Tue Jan 23 15:35:47 2024 +0800
refactor: make create table idempotent (#286)
## Rationale
Store the intermediate data for table creation through procedures to
support idempotent table creation.
## Detailed Changes
* Store the intermediate data for table creation through the procedure.
* Support retrying from the upper layer to complete failed table
creation procedures.
## Test Plan
Pass all unit tests and integration tests.
---
server/cluster/cluster.go | 2 +-
server/cluster/metadata/cluster_metadata.go | 33 +++++++-
server/cluster/metadata/topology_manager.go | 95 +++++++++++++++++++++-
server/coordinator/factory.go | 32 +++++---
server/coordinator/factory_test.go | 2 +-
server/coordinator/persist_shard_picker.go | 80 ++++++++++++++++++
server/coordinator/persist_shard_picker_test.go | 87 ++++++++++++++++++++
.../procedure/ddl/createtable/create_table.go | 77 ++++++++++++++----
server/coordinator/procedure/error.go | 1 +
.../scheduler/manager/scheduler_manager_test.go | 2 +-
.../scheduler/rebalanced/scheduler_test.go | 10 ++-
.../coordinator/scheduler/reopen/scheduler_test.go | 4 +-
.../coordinator/scheduler/static/scheduler_test.go | 10 ++-
server/etcdutil/util.go | 29 ++++++-
server/etcdutil/util_test.go | 39 +++++++++
server/storage/key_path.go | 14 ++++
server/storage/meta.go | 7 ++
server/storage/storage_impl.go | 67 +++++++++++++++
server/storage/types.go | 27 ++++++
19 files changed, 576 insertions(+), 42 deletions(-)
diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go
index 6af9b2a..9513617 100644
--- a/server/cluster/cluster.go
+++ b/server/cluster/cluster.go
@@ -58,7 +58,7 @@ func NewCluster(logger *zap.Logger, metadata
*metadata.ClusterMetadata, client *
dispatch := eventdispatch.NewDispatchImpl()
procedureIDRootPath := strings.Join([]string{rootPath, metadata.Name(),
defaultProcedurePrefixKey}, "/")
- procedureFactory := coordinator.NewFactory(logger,
id.NewAllocatorImpl(logger, client, procedureIDRootPath, defaultAllocStep),
dispatch, procedureStorage)
+ procedureFactory := coordinator.NewFactory(logger,
id.NewAllocatorImpl(logger, client, procedureIDRootPath, defaultAllocStep),
dispatch, procedureStorage, metadata)
schedulerManager := manager.NewManager(logger, procedureManager,
procedureFactory, metadata, client, rootPath, metadata.GetTopologyType(),
metadata.GetProcedureExecutingBatchSize())
diff --git a/server/cluster/metadata/cluster_metadata.go
b/server/cluster/metadata/cluster_metadata.go
index ab9b293..0ae03b6 100644
--- a/server/cluster/metadata/cluster_metadata.go
+++ b/server/cluster/metadata/cluster_metadata.go
@@ -115,7 +115,8 @@ func (c *ClusterMetadata) Load(ctx context.Context) error {
return errors.WithMessage(err, "load table manager")
}
- if err := c.topologyManager.Load(ctx); err != nil {
+ schemas := c.tableManager.GetSchemas()
+ if err := c.topologyManager.Load(ctx, schemas); err != nil {
return errors.WithMessage(err, "load topology manager")
}
@@ -279,6 +280,11 @@ func (c *ClusterMetadata) GetTable(schemaName, tableName
string) (storage.Table,
return c.tableManager.GetTable(schemaName, tableName)
}
+// GetTableShard gets the ID of the shard where the table actually exists.
+func (c *ClusterMetadata) GetTableShard(ctx context.Context, table
storage.Table) (storage.ShardID, bool) {
+ return c.topologyManager.GetTableShardID(ctx, table)
+}
+
func (c *ClusterMetadata) CreateTableMetadata(ctx context.Context, request
CreateTableMetadataRequest) (CreateTableMetadataResult, error) {
c.logger.Info("create table start", zap.String("cluster", c.Name()),
zap.String("schemaName", request.SchemaName), zap.String("tableName",
request.TableName))
@@ -392,6 +398,31 @@ func (c *ClusterMetadata) CreateTable(ctx context.Context,
request CreateTableRe
return ret, nil
}
+func (c *ClusterMetadata) GetTableAssignedShard(ctx context.Context,
schemaName string, tableName string) (storage.ShardID, bool, error) {
+ schema, exists := c.tableManager.GetSchema(schemaName)
+ if !exists {
+ return 0, false, errors.WithMessagef(ErrSchemaNotFound, "schema
%s not found", schemaName)
+ }
+ shardIDs, exists := c.topologyManager.GetTableAssignedShard(ctx,
schema.ID, tableName)
+ return shardIDs, exists, nil
+}
+
+func (c *ClusterMetadata) AssignTableToShard(ctx context.Context, schemaName
string, tableName string, shardID storage.ShardID) error {
+ schema, exists := c.tableManager.GetSchema(schemaName)
+ if !exists {
+ return errors.WithMessagef(ErrSchemaNotFound, "schema %s not
found", schemaName)
+ }
+ return c.topologyManager.AssignTableToShard(ctx, schema.ID, tableName,
shardID)
+}
+
+func (c *ClusterMetadata) DeleteTableAssignedShard(ctx context.Context,
schemaName string, tableName string) error {
+ schema, exists := c.tableManager.GetSchema(schemaName)
+ if !exists {
+ return errors.WithMessagef(ErrSchemaNotFound, "schema %s not
found", schemaName)
+ }
+ return c.topologyManager.DeleteTableAssignedShard(ctx, schema.ID,
tableName)
+}
+
func (c *ClusterMetadata) GetShards() []storage.ShardID {
return c.topologyManager.GetShards()
}
diff --git a/server/cluster/metadata/topology_manager.go
b/server/cluster/metadata/topology_manager.go
index 2426bb0..f744a5d 100644
--- a/server/cluster/metadata/topology_manager.go
+++ b/server/cluster/metadata/topology_manager.go
@@ -33,7 +33,7 @@ import (
// TopologyManager manages the cluster topology, including the mapping
relationship between shards, nodes, and tables.
type TopologyManager interface {
// Load load cluster topology from storage.
- Load(ctx context.Context) error
+ Load(ctx context.Context, schemas []storage.Schema) error
// GetVersion get cluster view version.
GetVersion() uint64
// GetClusterState get cluster view state.
@@ -44,6 +44,14 @@ type TopologyManager interface {
AddTable(ctx context.Context, shardID storage.ShardID, latestVersion
uint64, tables []storage.Table) error
// RemoveTable remove table on target shards from cluster topology.
RemoveTable(ctx context.Context, shardID storage.ShardID, latestVersion
uint64, tableIDs []storage.TableID) error
+ // GetTableShardID get the shardID of the shard where the table is
located.
+ GetTableShardID(ctx context.Context, table storage.Table)
(storage.ShardID, bool)
+ // AssignTableToShard persists the table-shard mapping; it is used to
store assign results and make the table creation idempotent.
+ AssignTableToShard(ctx context.Context, schemaID storage.SchemaID,
tableName string, shardID storage.ShardID) error
+ // GetTableAssignedShard get table assign result.
+ GetTableAssignedShard(ctx context.Context, schemaID storage.SchemaID,
tableName string) (storage.ShardID, bool)
+ // DeleteTableAssignedShard delete table assign result.
+ DeleteTableAssignedShard(ctx context.Context, schemaID
storage.SchemaID, tableName string) error
// GetShards get all shards in cluster topology.
GetShards() []storage.ShardID
// GetShardNodesByID get shardNodes with shardID.
@@ -133,6 +141,8 @@ type TopologyManagerImpl struct {
// ShardView in memory.
shardTablesMapping map[storage.ShardID]*storage.ShardView // ShardID ->
shardTopology
tableShardMapping map[storage.TableID][]storage.ShardID // tableID ->
ShardID
+ // Table assign result in memory.
+ tableAssignMapping map[storage.SchemaID]map[string]storage.ShardID //
schemaID -> tableName -> shardID
nodes map[string]storage.Node // NodeName in memory.
}
@@ -150,11 +160,12 @@ func NewTopologyManagerImpl(logger *zap.Logger, storage
storage.Storage, cluster
nodeShardsMapping: nil,
shardTablesMapping: nil,
tableShardMapping: nil,
+ tableAssignMapping: nil,
nodes: nil,
}
}
-func (m *TopologyManagerImpl) Load(ctx context.Context) error {
+func (m *TopologyManagerImpl) Load(ctx context.Context, schemas
[]storage.Schema) error {
m.lock.Lock()
defer m.lock.Unlock()
@@ -169,6 +180,11 @@ func (m *TopologyManagerImpl) Load(ctx context.Context)
error {
if err := m.loadNodes(ctx); err != nil {
return errors.WithMessage(err, "load nodes")
}
+
+ if err := m.loadAssignTable(ctx, schemas); err != nil {
+ return errors.WithMessage(err, "load assign table")
+ }
+
return nil
}
@@ -294,6 +310,64 @@ func (m *TopologyManagerImpl) RemoveTable(ctx
context.Context, shardID storage.S
return nil
}
+func (m *TopologyManagerImpl) GetTableShardID(_ context.Context, table
storage.Table) (storage.ShardID, bool) {
+ m.lock.RLock()
+ defer m.lock.RUnlock()
+
+ shardIDs, exists := m.tableShardMapping[table.ID]
+ if exists {
+ return shardIDs[0], true
+ }
+
+ return 0, false
+}
+
+func (m *TopologyManagerImpl) AssignTableToShard(ctx context.Context, schemaID
storage.SchemaID, tableName string, shardID storage.ShardID) error {
+ m.lock.Lock()
+ defer m.lock.Unlock()
+
+ if err := m.storage.AssignTableToShard(ctx,
storage.AssignTableToShardRequest{
+ ClusterID: m.clusterID,
+ SchemaID: schemaID,
+ TableName: tableName,
+ ShardID: shardID,
+ }); err != nil {
+ return errors.WithMessage(err, "storage assign table")
+ }
+
+ // Update cache in memory.
+ if _, exists := m.tableAssignMapping[schemaID]; !exists {
+ m.tableAssignMapping[schemaID] =
make(map[string]storage.ShardID, 0)
+ }
+
+ m.tableAssignMapping[schemaID][tableName] = shardID
+
+ return nil
+}
+
+func (m *TopologyManagerImpl) GetTableAssignedShard(_ context.Context,
schemaID storage.SchemaID, tableName string) (storage.ShardID, bool) {
+ assignResult, exists := m.tableAssignMapping[schemaID][tableName]
+ return assignResult, exists
+}
+
+func (m *TopologyManagerImpl) DeleteTableAssignedShard(ctx context.Context,
schemaID storage.SchemaID, tableName string) error {
+ m.lock.Lock()
+ defer m.lock.Unlock()
+
+ if err := m.storage.DeleteTableAssignedShard(ctx,
storage.DeleteTableAssignedRequest{
+ ClusterID: m.clusterID,
+ SchemaID: schemaID,
+ TableName: tableName,
+ }); err != nil {
+ return errors.WithMessage(err, "storage delete assign table")
+ }
+
+ // Update cache in memory.
+ delete(m.tableAssignMapping[schemaID], tableName)
+
+ return nil
+}
+
func (m *TopologyManagerImpl) GetShards() []storage.ShardID {
m.lock.RLock()
defer m.lock.RUnlock()
@@ -584,6 +658,23 @@ func (m *TopologyManagerImpl) loadShardViews(ctx
context.Context) error {
return nil
}
+func (m *TopologyManagerImpl) loadAssignTable(ctx context.Context, schemas
[]storage.Schema) error {
+ m.tableAssignMapping =
make(map[storage.SchemaID]map[string]storage.ShardID, len(schemas))
+ for _, schema := range schemas {
+ m.tableAssignMapping[schema.ID] =
make(map[string]storage.ShardID, 0)
+
+ listAssignTableResult, err :=
m.storage.ListTableAssignedShard(ctx, storage.ListAssignTableRequest{ClusterID:
m.clusterID, SchemaID: schema.ID})
+ if err != nil {
+ return errors.WithMessage(err, "storage list assign
table")
+ }
+ for _, assignTable := range listAssignTableResult.TableAssigns {
+ m.tableAssignMapping[schema.ID][assignTable.TableName]
= assignTable.ShardID
+ }
+ }
+
+ return nil
+}
+
func (m *TopologyManagerImpl) loadNodes(ctx context.Context) error {
nodesResult, err := m.storage.ListNodes(ctx,
storage.ListNodesRequest{ClusterID: m.clusterID})
if err != nil {
diff --git a/server/coordinator/factory.go b/server/coordinator/factory.go
index a830fe3..9e12c7f 100644
--- a/server/coordinator/factory.go
+++ b/server/coordinator/factory.go
@@ -43,7 +43,7 @@ type Factory struct {
idAllocator id.Allocator
dispatch eventdispatch.Dispatch
storage procedure.Storage
- shardPicker ShardPicker
+ shardPicker *PersistShardPicker
}
type CreateTableRequest struct {
@@ -101,13 +101,13 @@ type BatchRequest struct {
BatchType procedure.Kind
}
-func NewFactory(logger *zap.Logger, allocator id.Allocator, dispatch
eventdispatch.Dispatch, storage procedure.Storage) *Factory {
+func NewFactory(logger *zap.Logger, allocator id.Allocator, dispatch
eventdispatch.Dispatch, storage procedure.Storage, clusterMetadata
*metadata.ClusterMetadata) *Factory {
return &Factory{
idAllocator: allocator,
dispatch: dispatch,
storage: storage,
logger: logger,
- shardPicker: NewLeastTableShardPicker(),
+ shardPicker: NewPersistShardPicker(clusterMetadata,
NewLeastTableShardPicker()),
}
}
@@ -129,14 +129,24 @@ func (f *Factory) makeCreateTableProcedure(ctx
context.Context, request CreateTa
}
snapshot := request.ClusterMetadata.GetClusterSnapshot()
- shards, err := f.shardPicker.PickShards(ctx, snapshot, 1)
+ var targetShardID storage.ShardID
+ shardID, exists, err :=
request.ClusterMetadata.GetTableAssignedShard(ctx,
request.SourceReq.SchemaName, request.SourceReq.Name)
if err != nil {
- f.logger.Error("pick table shard", zap.Error(err))
- return nil, errors.WithMessage(err, "pick table shard")
+ return nil, err
}
- if len(shards) != 1 {
- f.logger.Error("pick table shards length not equal 1",
zap.Int("shards", len(shards)))
- return nil, errors.WithMessagef(procedure.ErrPickShard, "pick
table shard, shards length:%d", len(shards))
+ if exists {
+ targetShardID = shardID
+ } else {
+ shards, err := f.shardPicker.PickShards(ctx, snapshot,
request.SourceReq.GetSchemaName(), []string{request.SourceReq.GetName()})
+ if err != nil {
+ f.logger.Error("pick table shard", zap.Error(err))
+ return nil, errors.WithMessage(err, "pick table shard")
+ }
+ if len(shards) != 1 {
+ f.logger.Error("pick table shards length not equal 1",
zap.Int("shards", len(shards)))
+ return nil, errors.WithMessagef(procedure.ErrPickShard,
"pick table shard, shards length:%d", len(shards))
+ }
+ targetShardID = shards[request.SourceReq.GetName()].ID
}
return createtable.NewProcedure(createtable.ProcedureParams{
@@ -144,7 +154,7 @@ func (f *Factory) makeCreateTableProcedure(ctx
context.Context, request CreateTa
ClusterMetadata: request.ClusterMetadata,
ClusterSnapshot: snapshot,
ID: id,
- ShardID: shards[0].ID,
+ ShardID: targetShardID,
SourceReq: request.SourceReq,
OnSucceeded: request.OnSucceeded,
OnFailed: request.OnFailed,
@@ -164,7 +174,7 @@ func (f *Factory) makeCreatePartitionTableProcedure(ctx
context.Context, request
nodeNames[shardNode.NodeName] = 1
}
- subTableShards, err := f.shardPicker.PickShards(ctx, snapshot,
len(request.SourceReq.PartitionTableInfo.SubTableNames))
+ subTableShards, err := f.shardPicker.PickShards(ctx, snapshot,
request.SourceReq.GetSchemaName(),
request.SourceReq.PartitionTableInfo.SubTableNames)
if err != nil {
return nil, errors.WithMessage(err, "pick sub table shards")
}
diff --git a/server/coordinator/factory_test.go
b/server/coordinator/factory_test.go
index 336255a..04c83c6 100644
--- a/server/coordinator/factory_test.go
+++ b/server/coordinator/factory_test.go
@@ -39,7 +39,7 @@ func setupFactory(t *testing.T) (*coordinator.Factory,
*metadata.ClusterMetadata
dispatch := test.MockDispatch{}
allocator := test.MockIDAllocator{}
storage := test.NewTestStorage(t)
- f := coordinator.NewFactory(zap.NewNop(), allocator, dispatch, storage)
+ f := coordinator.NewFactory(zap.NewNop(), allocator, dispatch, storage,
c.GetMetadata())
return f, c.GetMetadata()
}
diff --git a/server/coordinator/persist_shard_picker.go
b/server/coordinator/persist_shard_picker.go
new file mode 100644
index 0000000..b368422
--- /dev/null
+++ b/server/coordinator/persist_shard_picker.go
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package coordinator
+
+import (
+ "context"
+
+ "github.com/apache/incubator-horaedb-meta/server/cluster/metadata"
+ "github.com/apache/incubator-horaedb-meta/server/storage"
+)
+
+type PersistShardPicker struct {
+ cluster *metadata.ClusterMetadata
+ internal ShardPicker
+}
+
+func NewPersistShardPicker(cluster *metadata.ClusterMetadata, internal
ShardPicker) *PersistShardPicker {
+ return &PersistShardPicker{cluster: cluster, internal: internal}
+}
+
+func (p *PersistShardPicker) PickShards(ctx context.Context, snapshot
metadata.Snapshot, schemaName string, tableNames []string)
(map[string]storage.ShardNode, error) {
+ result := map[string]storage.ShardNode{}
+
+ shardNodeMap := make(map[storage.ShardID]storage.ShardNode,
len(tableNames))
+ for _, shardNode := range snapshot.Topology.ClusterView.ShardNodes {
+ shardNodeMap[shardNode.ID] = shardNode
+ }
+
+ var missingTables []string
+ // If table assign has been created, just reuse it.
+ for i := 0; i < len(tableNames); i++ {
+ shardID, exists, err := p.cluster.GetTableAssignedShard(ctx,
schemaName, tableNames[i])
+ if err != nil {
+ return map[string]storage.ShardNode{}, err
+ }
+ if exists {
+ result[tableNames[i]] = shardNodeMap[shardID]
+ } else {
+ missingTables = append(missingTables, tableNames[i])
+ }
+ }
+
+ // All tables have been assigned to shards.
+ if len(missingTables) == 0 {
+ return result, nil
+ }
+
+ // Some tables have no assignment yet, try to pick shards and save table
assigns.
+ shardNodes, err := p.internal.PickShards(ctx, snapshot,
len(missingTables))
+ if err != nil {
+ return map[string]storage.ShardNode{}, err
+ }
+
+ for i, shardNode := range shardNodes {
+ result[missingTables[i]] = shardNode
+ err = p.cluster.AssignTableToShard(ctx, schemaName,
missingTables[i], shardNode.ID)
+ if err != nil {
+ return map[string]storage.ShardNode{}, err
+ }
+ }
+
+ return result, nil
+}
diff --git a/server/coordinator/persist_shard_picker_test.go
b/server/coordinator/persist_shard_picker_test.go
new file mode 100644
index 0000000..67e3aa7
--- /dev/null
+++ b/server/coordinator/persist_shard_picker_test.go
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package coordinator_test
+
+import (
+ "context"
+ "testing"
+
+ "github.com/apache/incubator-horaedb-meta/server/cluster/metadata"
+ "github.com/apache/incubator-horaedb-meta/server/coordinator"
+
"github.com/apache/incubator-horaedb-meta/server/coordinator/procedure/test"
+ "github.com/apache/incubator-horaedb-meta/server/storage"
+ "github.com/stretchr/testify/require"
+)
+
+func TestPersistShardPicker(t *testing.T) {
+ re := require.New(t)
+ ctx := context.Background()
+
+ c := test.InitStableCluster(ctx, t)
+
+ persistShardPicker :=
coordinator.NewPersistShardPicker(c.GetMetadata(),
coordinator.NewLeastTableShardPicker())
+ pickResult, err := persistShardPicker.PickShards(ctx,
c.GetMetadata().GetClusterSnapshot(), test.TestSchemaName,
[]string{test.TestTableName0})
+ re.NoError(err)
+ re.Equal(len(pickResult), 1)
+
+ createResult, err := c.GetMetadata().CreateTable(ctx,
metadata.CreateTableRequest{
+ ShardID: pickResult[test.TestTableName0].ID,
+ LatestVersion: 0,
+ SchemaName: test.TestSchemaName,
+ TableName: test.TestTableName0,
+ PartitionInfo: storage.PartitionInfo{Info: nil},
+ })
+ re.NoError(err)
+ re.Equal(test.TestTableName0, createResult.Table.Name)
+
+ // Try to pick shard for same table after the table is created.
+ newPickResult, err := persistShardPicker.PickShards(ctx,
c.GetMetadata().GetClusterSnapshot(), test.TestSchemaName,
[]string{test.TestTableName0})
+ re.NoError(err)
+ re.Equal(len(newPickResult), 1)
+ re.Equal(newPickResult[test.TestTableName0],
pickResult[test.TestTableName0])
+
+ // Try to pick shard for another table.
+ pickResult, err = persistShardPicker.PickShards(ctx,
c.GetMetadata().GetClusterSnapshot(), test.TestSchemaName,
[]string{test.TestTableName1})
+ re.NoError(err)
+ re.Equal(len(pickResult), 1)
+
+ err = c.GetMetadata().DropTable(ctx, metadata.DropTableRequest{
+ SchemaName: test.TestSchemaName,
+ TableName: test.TestTableName0,
+ ShardID: pickResult[test.TestTableName0].ID,
+ LatestVersion: 0,
+ })
+ re.NoError(err)
+
+ // Try to pick shard for table1 after drop table0.
+ newPickResult, err = persistShardPicker.PickShards(ctx,
c.GetMetadata().GetClusterSnapshot(), test.TestSchemaName,
[]string{test.TestTableName1})
+ re.NoError(err)
+ re.Equal(len(pickResult), 1)
+ re.Equal(newPickResult[test.TestTableName1],
pickResult[test.TestTableName1])
+
+ err = c.GetMetadata().DeleteTableAssignedShard(ctx,
test.TestSchemaName, test.TestTableName1)
+ re.NoError(err)
+
+ // Try to pick another shard for table1 after dropping table1's assign result.
+ newPickResult, err = persistShardPicker.PickShards(ctx,
c.GetMetadata().GetClusterSnapshot(), test.TestSchemaName,
[]string{test.TestTableName1})
+ re.NoError(err)
+ re.Equal(len(pickResult), 1)
+ re.NotEqual(newPickResult[test.TestTableName1],
pickResult[test.TestTableName1])
+}
diff --git a/server/coordinator/procedure/ddl/createtable/create_table.go
b/server/coordinator/procedure/ddl/createtable/create_table.go
index 49138a4..292f090 100644
--- a/server/coordinator/procedure/ddl/createtable/create_table.go
+++ b/server/coordinator/procedure/ddl/createtable/create_table.go
@@ -37,30 +37,34 @@ import (
)
const (
- eventCreateMetadata = "EventCreateMetadata"
- eventCreateOnShard = "EventCreateOnShard"
- eventFinish = "EventFinish"
-
- stateBegin = "StateBegin"
- stateCreateMetadata = "StateCreateMetadata"
- stateCreateOnShard = "StateCreateOnShard"
- stateFinish = "StateFinish"
+ eventCheckTableExists = "EventCheckTableExists"
+ eventCreateMetadata = "EventCreateMetadata"
+ eventCreateOnShard = "EventCreateOnShard"
+ eventFinish = "EventFinish"
+
+ stateBegin = "StateBegin"
+ stateCheckTableExists = "StateCheckTableExists"
+ stateCreateMetadata = "StateCreateMetadata"
+ stateCreateOnShard = "StateCreateOnShard"
+ stateFinish = "StateFinish"
)
var (
createTableEvents = fsm.Events{
- {Name: eventCreateMetadata, Src: []string{stateBegin}, Dst:
stateCreateMetadata},
+ {Name: eventCheckTableExists, Src: []string{stateBegin}, Dst:
stateCheckTableExists},
+ {Name: eventCreateMetadata, Src:
[]string{stateCheckTableExists}, Dst: stateCreateMetadata},
{Name: eventCreateOnShard, Src: []string{stateCreateMetadata},
Dst: stateCreateOnShard},
{Name: eventFinish, Src: []string{stateCreateOnShard}, Dst:
stateFinish},
}
createTableCallbacks = fsm.Callbacks{
- eventCreateMetadata: createMetadataCallback,
- eventCreateOnShard: createOnShard,
- eventFinish: createFinish,
+ eventCheckTableExists: checkTableExists,
+ eventCreateMetadata: createMetadata,
+ eventCreateOnShard: createOnShard,
+ eventFinish: createFinish,
}
)
-func createMetadataCallback(event *fsm.Event) {
+func checkTableExists(event *fsm.Event) {
req, err := procedure.GetRequestFromEvent[*callbackRequest](event)
if err != nil {
procedure.CancelEventWithLog(event, err, "get request from
event")
@@ -68,6 +72,42 @@ func createMetadataCallback(event *fsm.Event) {
}
params := req.p.params
+ // Check whether the table metadata already exists.
+ table, exists, err :=
params.ClusterMetadata.GetTable(params.SourceReq.GetSchemaName(),
params.SourceReq.GetName())
+ if err != nil {
+ procedure.CancelEventWithLog(event, err, "get table metadata")
+ return
+ }
+ if !exists {
+ return
+ }
+
+ // Check whether the table shard mapping already exists.
+ _, exists = params.ClusterMetadata.GetTableShard(req.ctx, table)
+ if exists {
+ procedure.CancelEventWithLog(event,
metadata.ErrTableAlreadyExists, "table shard already exists")
+ return
+ }
+}
+
+func createMetadata(event *fsm.Event) {
+ req, err := procedure.GetRequestFromEvent[*callbackRequest](event)
+ if err != nil {
+ procedure.CancelEventWithLog(event, err, "get request from
event")
+ return
+ }
+ params := req.p.params
+
+ _, exists, err :=
params.ClusterMetadata.GetTable(params.SourceReq.GetSchemaName(),
params.SourceReq.GetName())
+ if err != nil {
+ procedure.CancelEventWithLog(event, err, "get table metadata")
+ return
+ }
+ if exists {
+ log.Info("table metadata already exists",
zap.String("schemaName", params.SourceReq.GetSchemaName()),
zap.String("tableName", params.SourceReq.GetName()))
+ return
+ }
+
createTableMetadataRequest := metadata.CreateTableMetadataRequest{
SchemaName: params.SourceReq.GetSchemaName(),
TableName: params.SourceReq.GetName(),
@@ -139,6 +179,11 @@ func createFinish(event *fsm.Event) {
procedure.CancelEventWithLog(event, err, "get request from
event")
return
}
+ params := req.p.params
+
+ if err :=
req.p.params.ClusterMetadata.DeleteTableAssignedShard(req.ctx,
params.SourceReq.GetSchemaName(), params.SourceReq.GetName()); err != nil {
+ log.Warn("delete assign table failed", zap.String("schemaName",
params.SourceReq.GetSchemaName()), zap.String("tableName",
params.SourceReq.GetName()))
+ }
assert.Assert(req.createTableResult != nil)
if err := req.p.params.OnSucceeded(*req.createTableResult); err != nil {
@@ -229,7 +274,6 @@ func (p *Procedure) Kind() procedure.Kind {
func (p *Procedure) Start(ctx context.Context) error {
p.updateState(procedure.StateRunning)
- // Try to load persist data.
req := &callbackRequest{
ctx: ctx,
p: p,
@@ -239,6 +283,11 @@ func (p *Procedure) Start(ctx context.Context) error {
for {
switch p.fsm.Current() {
case stateBegin:
+ if err := p.fsm.Event(eventCheckTableExists, req); err
!= nil {
+ _ = p.params.OnFailed(err)
+ return err
+ }
+ case stateCheckTableExists:
if err := p.fsm.Event(eventCreateMetadata, req); err !=
nil {
_ = p.params.OnFailed(err)
return err
diff --git a/server/coordinator/procedure/error.go
b/server/coordinator/procedure/error.go
index 165cb2c..c59dc8f 100644
--- a/server/coordinator/procedure/error.go
+++ b/server/coordinator/procedure/error.go
@@ -23,6 +23,7 @@ import "github.com/apache/incubator-horaedb-meta/pkg/coderr"
var (
ErrShardLeaderNotFound = coderr.NewCodeError(coderr.Internal,
"shard leader not found")
+ ErrShardNotMatch = coderr.NewCodeError(coderr.Internal,
"target shard not match to persis data")
ErrProcedureNotFound = coderr.NewCodeError(coderr.Internal,
"procedure not found")
ErrClusterConfigChanged = coderr.NewCodeError(coderr.Internal,
"cluster config changed")
ErrTableNotExists = coderr.NewCodeError(coderr.Internal,
"table not exists")
diff --git a/server/coordinator/scheduler/manager/scheduler_manager_test.go
b/server/coordinator/scheduler/manager/scheduler_manager_test.go
index 15d2bd6..7edbf5e 100644
--- a/server/coordinator/scheduler/manager/scheduler_manager_test.go
+++ b/server/coordinator/scheduler/manager/scheduler_manager_test.go
@@ -44,7 +44,7 @@ func TestSchedulerManager(t *testing.T) {
dispatch := test.MockDispatch{}
allocator := test.MockIDAllocator{}
s := test.NewTestStorage(t)
- f := coordinator.NewFactory(zap.NewNop(), allocator, dispatch, s)
+ f := coordinator.NewFactory(zap.NewNop(), allocator, dispatch, s,
c.GetMetadata())
_, client, _ := etcdutil.PrepareEtcdServerAndClient(t)
// Create scheduler manager with enableScheduler equal to false.
diff --git a/server/coordinator/scheduler/rebalanced/scheduler_test.go
b/server/coordinator/scheduler/rebalanced/scheduler_test.go
index e451b53..8c93157 100644
--- a/server/coordinator/scheduler/rebalanced/scheduler_test.go
+++ b/server/coordinator/scheduler/rebalanced/scheduler_test.go
@@ -35,23 +35,25 @@ func TestRebalancedScheduler(t *testing.T) {
re := require.New(t)
ctx := context.Background()
- procedureFactory := coordinator.NewFactory(zap.NewNop(),
test.MockIDAllocator{}, test.MockDispatch{}, test.NewTestStorage(t))
-
- s := rebalanced.NewShardScheduler(zap.NewNop(), procedureFactory,
nodepicker.NewConsistentUniformHashNodePicker(zap.NewNop()), 1)
-
// EmptyCluster would be scheduled an empty procedure.
emptyCluster := test.InitEmptyCluster(ctx, t)
+ procedureFactory := coordinator.NewFactory(zap.NewNop(),
test.MockIDAllocator{}, test.MockDispatch{}, test.NewTestStorage(t),
emptyCluster.GetMetadata())
+ s := rebalanced.NewShardScheduler(zap.NewNop(), procedureFactory,
nodepicker.NewConsistentUniformHashNodePicker(zap.NewNop()), 1)
result, err := s.Schedule(ctx,
emptyCluster.GetMetadata().GetClusterSnapshot())
re.NoError(err)
re.Empty(result)
// PrepareCluster would be scheduled an empty procedure.
prepareCluster := test.InitPrepareCluster(ctx, t)
+ procedureFactory = coordinator.NewFactory(zap.NewNop(),
test.MockIDAllocator{}, test.MockDispatch{}, test.NewTestStorage(t),
prepareCluster.GetMetadata())
+ s = rebalanced.NewShardScheduler(zap.NewNop(), procedureFactory,
nodepicker.NewConsistentUniformHashNodePicker(zap.NewNop()), 1)
_, err = s.Schedule(ctx,
prepareCluster.GetMetadata().GetClusterSnapshot())
re.NoError(err)
// StableCluster with all shards assigned would be scheduled a load
balance procedure.
stableCluster := test.InitStableCluster(ctx, t)
+ procedureFactory = coordinator.NewFactory(zap.NewNop(),
test.MockIDAllocator{}, test.MockDispatch{}, test.NewTestStorage(t),
stableCluster.GetMetadata())
+ s = rebalanced.NewShardScheduler(zap.NewNop(), procedureFactory,
nodepicker.NewConsistentUniformHashNodePicker(zap.NewNop()), 1)
_, err = s.Schedule(ctx,
stableCluster.GetMetadata().GetClusterSnapshot())
re.NoError(err)
}
diff --git a/server/coordinator/scheduler/reopen/scheduler_test.go
b/server/coordinator/scheduler/reopen/scheduler_test.go
index 8060e2d..2b23051 100644
--- a/server/coordinator/scheduler/reopen/scheduler_test.go
+++ b/server/coordinator/scheduler/reopen/scheduler_test.go
@@ -35,12 +35,12 @@ import (
func TestReopenShardScheduler(t *testing.T) {
re := require.New(t)
ctx := context.Background()
+ emptyCluster := test.InitEmptyCluster(ctx, t)
- procedureFactory := coordinator.NewFactory(zap.NewNop(),
test.MockIDAllocator{}, test.MockDispatch{}, test.NewTestStorage(t))
+ procedureFactory := coordinator.NewFactory(zap.NewNop(),
test.MockIDAllocator{}, test.MockDispatch{}, test.NewTestStorage(t),
emptyCluster.GetMetadata())
s := reopen.NewShardScheduler(procedureFactory, 1)
- emptyCluster := test.InitEmptyCluster(ctx, t)
// ReopenShardScheduler should not schedule when cluster is not stable.
result, err := s.Schedule(ctx,
emptyCluster.GetMetadata().GetClusterSnapshot())
re.NoError(err)
diff --git a/server/coordinator/scheduler/static/scheduler_test.go
b/server/coordinator/scheduler/static/scheduler_test.go
index 929a2f0..d1c9b83 100644
--- a/server/coordinator/scheduler/static/scheduler_test.go
+++ b/server/coordinator/scheduler/static/scheduler_test.go
@@ -35,24 +35,26 @@ func TestStaticTopologyScheduler(t *testing.T) {
re := require.New(t)
ctx := context.Background()
- procedureFactory := coordinator.NewFactory(zap.NewNop(),
test.MockIDAllocator{}, test.MockDispatch{}, test.NewTestStorage(t))
-
- s := static.NewShardScheduler(procedureFactory,
nodepicker.NewConsistentUniformHashNodePicker(zap.NewNop()), 1)
-
// EmptyCluster would be scheduled an empty procedure.
emptyCluster := test.InitEmptyCluster(ctx, t)
+ procedureFactory := coordinator.NewFactory(zap.NewNop(),
test.MockIDAllocator{}, test.MockDispatch{}, test.NewTestStorage(t),
emptyCluster.GetMetadata())
+ s := static.NewShardScheduler(procedureFactory,
nodepicker.NewConsistentUniformHashNodePicker(zap.NewNop()), 1)
result, err := s.Schedule(ctx,
emptyCluster.GetMetadata().GetClusterSnapshot())
re.NoError(err)
re.Empty(result)
// PrepareCluster would be scheduled a transfer leader procedure.
prepareCluster := test.InitPrepareCluster(ctx, t)
+ procedureFactory = coordinator.NewFactory(zap.NewNop(),
test.MockIDAllocator{}, test.MockDispatch{}, test.NewTestStorage(t),
prepareCluster.GetMetadata())
+ s = static.NewShardScheduler(procedureFactory,
nodepicker.NewConsistentUniformHashNodePicker(zap.NewNop()), 1)
result, err = s.Schedule(ctx,
prepareCluster.GetMetadata().GetClusterSnapshot())
re.NoError(err)
re.NotEmpty(result)
// StableCluster with all shards assigned would be scheduled a transfer
leader procedure by hash rule.
stableCluster := test.InitStableCluster(ctx, t)
+ procedureFactory = coordinator.NewFactory(zap.NewNop(),
test.MockIDAllocator{}, test.MockDispatch{}, test.NewTestStorage(t),
stableCluster.GetMetadata())
+ s = static.NewShardScheduler(procedureFactory,
nodepicker.NewConsistentUniformHashNodePicker(zap.NewNop()), 1)
result, err = s.Schedule(ctx,
stableCluster.GetMetadata().GetClusterSnapshot())
re.NoError(err)
re.NotEmpty(result)
diff --git a/server/etcdutil/util.go b/server/etcdutil/util.go
index 5d29d80..62ef077 100644
--- a/server/etcdutil/util.go
+++ b/server/etcdutil/util.go
@@ -21,6 +21,7 @@ package etcdutil
import (
"context"
+ "path"
"github.com/apache/incubator-horaedb-meta/pkg/log"
clientv3 "go.etcd.io/etcd/client/v3"
@@ -118,7 +119,7 @@ func Scan(ctx context.Context, client *clientv3.Client,
startKey, endKey string,
}
}
- // Check whether the keys is exhausted.
+ // Check whether the keys are exhausted.
if len(resp.Kvs) < batchSize {
return nil
}
@@ -126,3 +127,29 @@ func Scan(ctx context.Context, client *clientv3.Client,
startKey, endKey string,
lastKeyInPrevBatch = string(resp.Kvs[len(resp.Kvs)-1].Key)
}
}
+
+// ScanWithPrefix fetches, with a single Get request, every key-value pair whose
+// key falls in the prefix range of prefix, and calls do on each pair in the
+// order they appear in the response.
+//
+// A failed Get is reported as ErrEtcdKVGet with the underlying error attached
+// as its cause. The first error returned by do stops the iteration and is
+// returned unchanged.
+func ScanWithPrefix(ctx context.Context, client *clientv3.Client, prefix string, do func(key string, val []byte) error) error {
+	end := clientv3.GetPrefixRangeEnd(prefix)
+	getResp, getErr := client.Get(ctx, prefix, clientv3.WithRange(end))
+	if getErr != nil {
+		return ErrEtcdKVGet.WithCause(getErr)
+	}
+
+	// Ranging over an empty result is a no-op, so no separate emptiness check
+	// is required before the loop.
+	for i := range getResp.Kvs {
+		kv := getResp.Kvs[i]
+		if err := do(string(kv.Key), kv.Value); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+// GetLastPathSegment returns the last '/'-separated segment of completePath.
+// The path is normalized with path.Clean before the final segment is taken.
+func GetLastPathSegment(completePath string) string {
+ return path.Base(path.Clean(completePath))
+}
diff --git a/server/etcdutil/util_test.go b/server/etcdutil/util_test.go
index fa2532d..6b816a4 100644
--- a/server/etcdutil/util_test.go
+++ b/server/etcdutil/util_test.go
@@ -105,3 +105,42 @@ func TestScanFailed(t *testing.T) {
err := Scan(ctx, client, startKey, endKey, 10, do)
r.Equal(fakeErr, err)
}
+
+// TestScanWithPrefix checks that ScanWithPrefix visits exactly the keys stored
+// under the requested prefix and skips a key stored under a different prefix.
+func TestScanWithPrefix(t *testing.T) {
+	r := require.New(t)
+
+	_, client, closeSrv := PrepareEtcdServerAndClient(t)
+	defer closeSrv()
+	ctx := context.Background()
+
+	// Two keys share the scanned prefix; the third one must not be visited.
+	keys := []string{"/prefix/0", "/prefix/1", "/diff/0"}
+	wantKeys := []string{"/prefix/0", "/prefix/1"}
+
+	// Put the keys, letting each value equal its key for simplicity.
+	for _, key := range keys {
+		_, err := client.Put(ctx, key, key)
+		r.NoError(err)
+	}
+
+	var scanResult []string
+	do := func(key string, _ []byte) error {
+		scanResult = append(scanResult, key)
+		return nil
+	}
+	err := ScanWithPrefix(ctx, client, "/prefix", do)
+	r.NoError(err)
+	// Compare the visited keys themselves rather than only their count, so a
+	// scan that returned the wrong two keys would fail. The expected value
+	// goes first, following the testify argument order.
+	r.ElementsMatch(wantKeys, scanResult)
+}
+
+// TestGetLastPathSegment checks that the final segment of a multi-level path
+// is returned.
+func TestGetLastPathSegment(t *testing.T) {
+	r := require.New(t)
+
+	const completePath = "/prefix/a/b/c"
+	r.Equal("c", GetLastPathSegment(completePath))
+}
diff --git a/server/storage/key_path.go b/server/storage/key_path.go
index 6036e59..4627bc0 100644
--- a/server/storage/key_path.go
+++ b/server/storage/key_path.go
@@ -36,6 +36,7 @@ const (
shardView = "shard_view"
latestVersion = "latest_version"
info = "info"
+ tableAssign = "table_assign"
)
// makeSchemaKey returns the key path to the schema meta info.
@@ -128,6 +129,19 @@ func makeNameToIDKey(rootPath string, clusterID uint32,
schemaID uint32, tableNa
return path.Join(rootPath, version, cluster, fmtID(uint64(clusterID)),
schema, fmtID(uint64(schemaID)), tableNameToID, tableName)
}
+// makeTableAssignKey returns the key path under which the shard assigned to
+// tableName is stored for the given cluster and schema.
+func makeTableAssignKey(rootPath string, clusterID uint32, schemaID uint32,
tableName string) string {
+ // Example (rootPath omitted; the IDs are rendered by fmtID, i.e. zero-padded
+ // to 20 digits, and are shortened here for readability):
+ // v1/cluster/1/schema/1/table_assign/tableName1 -> shardID1
+ // v1/cluster/1/schema/1/table_assign/tableName2 -> shardID2
+ return path.Join(rootPath, version, cluster, fmtID(uint64(clusterID)),
schema, fmtID(uint64(schemaID)), tableAssign, tableName)
+}
+
+// makeTableAssignPrefixKey returns the key path shared by all table assign
+// keys of the given cluster and schema, i.e. the path built by
+// makeTableAssignKey without the trailing table name.
+func makeTableAssignPrefixKey(rootPath string, clusterID uint32, schemaID
uint32) string {
+ return path.Join(rootPath, version, cluster, fmtID(uint64(clusterID)),
schema, fmtID(uint64(schemaID)), tableAssign)
+}
+
func fmtID(id uint64) string {
return fmt.Sprintf("%020d", id)
}
diff --git a/server/storage/meta.go b/server/storage/meta.go
index 842995d..b56da68 100644
--- a/server/storage/meta.go
+++ b/server/storage/meta.go
@@ -57,6 +57,13 @@ type Storage interface {
// DeleteTable delete table by table name in specified cluster and
schema.
DeleteTable(ctx context.Context, req DeleteTableRequest) error
+ // AssignTableToShard saves the shard assignment result of a table.
+ AssignTableToShard(ctx context.Context, req AssignTableToShardRequest)
error
+ // DeleteTableAssignedShard deletes the saved shard assignment result of a table.
+ DeleteTableAssignedShard(ctx context.Context, req
DeleteTableAssignedRequest) error
+ // ListTableAssignedShard lists the saved table-to-shard assignment results of a schema.
+ ListTableAssignedShard(ctx context.Context, req ListAssignTableRequest)
(ListTableAssignedShardResult, error)
+
// CreateShardViews create shard views in specified cluster.
CreateShardViews(ctx context.Context, req CreateShardViewsRequest) error
// ListShardViews list all shard views in specified cluster.
diff --git a/server/storage/storage_impl.go b/server/storage/storage_impl.go
index 8aaafdd..e6b130e 100644
--- a/server/storage/storage_impl.go
+++ b/server/storage/storage_impl.go
@@ -418,6 +418,73 @@ func (s *metaStorageImpl) DeleteTable(ctx context.Context,
req DeleteTableReques
return nil
}
+// AssignTableToShard persists the shard assigned to a table. The write is a
+// create-only etcd transaction: the decimal shard ID is put under the table's
+// assign key only when that key does not exist yet.
+//
+// It returns the transaction error (with context attached) when the commit
+// fails, and ErrCreateSchemaAgain when an assignment is already stored for
+// the table.
+func (s *metaStorageImpl) AssignTableToShard(ctx context.Context, req AssignTableToShardRequest) error {
+	key := makeTableAssignKey(s.rootPath, uint32(req.ClusterID), uint32(req.SchemaID), req.TableName)
+
+	// Save the assignment only if the key is missing; otherwise an assignment
+	// already exists and an error is returned.
+	keyMissing := clientv3util.KeyMissing(key)
+	// Encode the ID as an unsigned decimal so it round-trips with the
+	// strconv.ParseUint decoding used when the assignments are listed.
+	opCreateAssignTable := clientv3.OpPut(key, strconv.FormatUint(uint64(req.ShardID), 10))
+
+	resp, err := s.client.Txn(ctx).
+		If(keyMissing).
+		Then(opCreateAssignTable).
+		Commit()
+	if err != nil {
+		// The schemaID verb must be fed the schema ID, not the shard ID.
+		return errors.WithMessagef(err, "create assign table, clusterID:%d, schemaID:%d, key:%s", req.ClusterID, req.SchemaID, key)
+	}
+	if !resp.Succeeded {
+		// NOTE(review): ErrCreateSchemaAgain is a schema-named sentinel; it is
+		// kept so callers matching on it keep working. Confirm whether a
+		// dedicated error is wanted here.
+		return ErrCreateSchemaAgain.WithCausef("assign table may already exist, clusterID:%d, schemaID:%d, key:%s, resp:%v", req.ClusterID, req.SchemaID, key, resp)
+	}
+
+	return nil
+}
+
+// DeleteTableAssignedShard removes the stored shard assignment of a table.
+// The delete runs in an etcd transaction guarded by a key-exists check, so
+// ErrDeleteTableAgain is returned when no assignment is stored for the table.
+// A failed commit is returned with the request details attached.
+func (s *metaStorageImpl) DeleteTableAssignedShard(ctx context.Context, req DeleteTableAssignedRequest) error {
+	assignKey := makeTableAssignKey(s.rootPath, uint32(req.ClusterID), uint32(req.SchemaID), req.TableName)
+
+	txnResp, txnErr := s.client.Txn(ctx).
+		If(clientv3util.KeyExists(assignKey)).
+		Then(clientv3.OpDelete(assignKey)).
+		Commit()
+	switch {
+	case txnErr != nil:
+		return errors.WithMessagef(txnErr, "delete assign table, clusterID:%d, schemaID:%d, tableName:%s", req.ClusterID, req.SchemaID, req.TableName)
+	case !txnResp.Succeeded:
+		// The guard failed: the key was not there to delete.
+		return ErrDeleteTableAgain.WithCausef("assign table may have been deleted, clusterID:%d, schemaID:%d, tableName:%s", req.ClusterID, req.SchemaID, req.TableName)
+	}
+
+	return nil
+}
+
+// ListTableAssignedShard returns every table-to-shard assignment stored for
+// the cluster and schema named in req. Each stored key contributes one
+// TableAssign: the table name is the last path segment of the key and the
+// shard ID is the decimal number held in the value.
+//
+// It returns an empty result and a wrapped error when the scan fails or when a
+// stored value is not a valid unsigned 32-bit decimal.
+func (s *metaStorageImpl) ListTableAssignedShard(ctx context.Context, req ListAssignTableRequest) (ListTableAssignedShardResult, error) {
+	prefix := makeTableAssignPrefixKey(s.rootPath, uint32(req.ClusterID), uint32(req.SchemaID))
+
+	var tableAssigns []TableAssign
+	do := func(assignKey string, value []byte) error {
+		shardID, err := strconv.ParseUint(string(value), 10, 32)
+		if err != nil {
+			// Name the offending entry so a corrupted value can be located.
+			return errors.WithMessagef(err, "parse assigned shard id, key:%s, value:%s", assignKey, string(value))
+		}
+		tableAssigns = append(tableAssigns, TableAssign{
+			TableName: etcdutil.GetLastPathSegment(assignKey),
+			ShardID:   ShardID(shardID),
+		})
+		return nil
+	}
+
+	// ScanWithPrefix takes no limit argument, so no range limit is reported in
+	// the error message.
+	if err := etcdutil.ScanWithPrefix(ctx, s.client, prefix, do); err != nil {
+		return ListTableAssignedShardResult{}, errors.WithMessagef(err, "scan tables, clusterID:%d, schemaID:%d, prefix key:%s", req.ClusterID, req.SchemaID, prefix)
+	}
+
+	return ListTableAssignedShardResult{TableAssigns: tableAssigns}, nil
+}
+
func (s *metaStorageImpl) createNShardViews(ctx context.Context, clusterID
ClusterID, shardViews []ShardView, ifConds []clientv3.Cmp, opCreates
[]clientv3.Op) error {
for _, shardView := range shardViews {
shardViewPB := convertShardViewToPB(shardView)
diff --git a/server/storage/types.go b/server/storage/types.go
index baff5e2..5711283 100644
--- a/server/storage/types.go
+++ b/server/storage/types.go
@@ -141,6 +141,28 @@ type DeleteTableRequest struct {
TableName string
}
+// AssignTableToShardRequest is the argument of Storage.AssignTableToShard.
+type AssignTableToShardRequest struct {
+ // ClusterID and SchemaID locate the schema the table belongs to.
+ ClusterID ClusterID
+ SchemaID SchemaID
+ // TableName is the table being assigned.
+ TableName string
+ // ShardID is the shard recorded for the table.
+ ShardID ShardID
+}
+
+// DeleteTableAssignedRequest is the argument of Storage.DeleteTableAssignedShard.
+type DeleteTableAssignedRequest struct {
+ // ClusterID and SchemaID locate the schema the table belongs to.
+ ClusterID ClusterID
+ SchemaID SchemaID
+ // TableName is the table whose stored assignment is deleted.
+ TableName string
+}
+
+// ListAssignTableRequest is the argument of Storage.ListTableAssignedShard.
+type ListAssignTableRequest struct {
+ // ClusterID and SchemaID select the schema whose assignments are listed.
+ ClusterID ClusterID
+ SchemaID SchemaID
+}
+
+// ListTableAssignedShardResult is the result of Storage.ListTableAssignedShard.
+type ListTableAssignedShardResult struct {
+ // TableAssigns holds one entry per stored table assignment.
+ TableAssigns []TableAssign
+}
+
type CreateShardViewsRequest struct {
ClusterID ClusterID
ShardViews []ShardView
@@ -232,6 +254,11 @@ func (t Table) IsPartitioned() bool {
return t.PartitionInfo.Info != nil
}
+// TableAssign pairs a table name with the shard recorded for that table.
+type TableAssign struct {
+ TableName string
+ ShardID ShardID
+}
+
type ShardView struct {
ShardID ShardID
Version uint64
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]