deardeng commented on code in PR #64167:
URL: https://github.com/apache/doris/pull/64167#discussion_r3496982871
##########
fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java:
##########
@@ -5877,6 +5940,237 @@ public void modifyTableColocate(Database db, OlapTable
table, String assignedGro
LOG.info("finished modify table's colocation property. table: {}, is
replay: {}", table.getName(), isReplay);
}
+ public void modifyTenantLevelMasterColocate(Database db, OlapTable table,
Map<Tag, String> assignedGroupNameMap,
+ boolean isReplay, Map<Long, TenantLevelColocateGroupInfo>
assignedGroupReplayMap)
+ throws DdlException {
+ if (Config.isCloudMode()) {
+ throw new DdlException("colocate_group not implemented in cloud
mode");
+ }
+ if (!assignedGroupNameMap.isEmpty() &&
Env.getCurrentColocateIndex().isColocateTable(table.getId())) {
+ throw new DdlException("colocate_with conflict with
colocate_group");
+ }
+ if (!assignedGroupNameMap.isEmpty() && !isReplay &&
table.isAutoBucket()) {
+ throw new DdlException("table " + table.getName() + " is auto
buckets");
+ }
+ Map<Tag, String> oldGroupMap =
tenantLevelColocateTableIndex.getMasterGroupNameMapByTable(table.getId());
+ //When the new name is the same as the old name, we return it to
prevent npe
+ if (assignedGroupNameMap.equals(oldGroupMap)) {
+ LOG.warn("modify table[{}] group name same as old group
name,skip.", table.getName());
+ return;
+ }
+ for (Map.Entry<Tag, String> entry : assignedGroupNameMap.entrySet()) {
+ String assignedGroup = entry.getValue();
+ Tag tag = entry.getKey();
+ if
(Env.getCurrentTenantLevelColocateIndex().hasSlaveGroup(table.getId(), tag)) {
+ throw new DdlException("colocate_slave conflict with
colocate_group");
+ }
+ TenantLevelColocateGroupSchema groupSchema =
tenantLevelColocateTableIndex.getGroupSchema(assignedGroup,
+ tag);
+ if (groupSchema == null) {
+ // user set a new colocate group,
+ // check if all partitions all this table has same buckets num
and same replication number
+ PartitionInfo partitionInfo = table.getPartitionInfo();
+ int bucketsNum =
table.getDefaultDistributionInfo().getBucketNum();
+ ReplicaAllocation replicaAlloc =
table.getDefaultReplicaAllocation();
+ int replicaNum = replicaAlloc.getReplicaNumByTag(tag);
+ if (replicaNum <= 0) {
+ throw new DdlException("No replica in " + table.getName()
+ "/" + tag.value);
+ }
+ for (Partition partition : table.getPartitions()) {
+ if (bucketsNum !=
partition.getDistributionInfo().getBucketNum()) {
+ throw new DdlException(
+ "Partitions in table " + table.getName() + "
have different buckets number");
+ }
+ if (replicaNum !=
partitionInfo.getReplicaAllocation(partition.getId()).getReplicaNumByTag(tag)) {
+ throw new DdlException("Partitions in table " +
table.getName()
+ + " have different replica allocation.");
+ }
+ }
+
TenantLevelColocateGroupSchema.checkDynamicPartition(table.getTableProperty().getProperties(),
+ table.getDefaultDistributionInfo());
+ } else {
+ // set to an already exist colocate group, check if this table
can be added to this group.
+ groupSchema.checkMasterColocateSchema(table,
table.getTableProperty().getProperties());
+ }
+ }
+ Map<Tag, Long> assignedColocateIdMap = new HashMap<>();
+ for (Entry<Long, TenantLevelColocateGroupInfo> entry :
assignedGroupReplayMap.entrySet()) {
+ Long groupId = entry.getKey();
+ TenantLevelColocateGroupInfo colocateGroupInfo = entry.getValue();
+ Tag tag = colocateGroupInfo.getTag();
+ assignedColocateIdMap.put(tag, groupId);
+ }
+ for (Entry<Tag, String> entry : oldGroupMap.entrySet()) {
+ Tag tag = entry.getKey();
+ if (assignedGroupNameMap.containsKey(tag)) {
+ continue;
+ }
+ tenantLevelColocateTableIndex.removeMasterTable(table.getId(),
tag);
+ }
+ Map<Long, TenantLevelColocateGroupInfo> result = new HashMap<>();
+ for (Map.Entry<Tag, String> entry : assignedGroupNameMap.entrySet()) {
+ String assignedGroup = entry.getValue();
+ Tag tag = entry.getKey();
+ TenantLevelColocateGroupSchema groupSchema =
tenantLevelColocateTableIndex.getGroupSchema(assignedGroup,
+ tag);
+ Long assignedGroupId = assignedColocateIdMap.get(tag);
+ List<List<Long>> backendsPerBucketSeq;
+ final boolean isGroupExist = groupSchema != null;
+ if (isGroupExist) {
+ backendsPerBucketSeq =
tenantLevelColocateTableIndex.getBackendsPerBucketSeqByGroup(
+ groupSchema.getGroupId());
+ TenantLevelColocateGroupInfo colocateGroupInfo =
TenantLevelColocateGroupInfo.create(
+ groupSchema, backendsPerBucketSeq);
+ result.put(groupSchema.getGroupId(), colocateGroupInfo);
+ } else if (isReplay) {
+ TenantLevelColocateGroupInfo colocateGroupInfo =
assignedGroupReplayMap.get(assignedGroupId);
+ backendsPerBucketSeq =
colocateGroupInfo.getBackendsPerBucketSeq();
+ result.put(assignedGroupId, colocateGroupInfo);
+ } else {
+ // assign to a newly created group, set backends sequence.
+ // we arbitrarily choose a tablet backends sequence from this
table,
+ // let the colocation balancer do the work.
+ backendsPerBucketSeq =
table.getArbitraryTabletBucketsSeq().get(tag);
+ }
+ String oldGroup = oldGroupMap.get(tag);
+ if (Objects.equals(oldGroup, assignedGroup)) {
+ continue;
+ }
+ // change group after getting backends sequence(if has), in case
'getArbitraryTabletBucketsSeq' failed
+ groupSchema =
tenantLevelColocateTableIndex.changeMasterGroup(table, oldGroup, assignedGroup,
tag,
+ assignedGroupId);
+ long groupId = groupSchema.getGroupId();
+ TenantLevelColocateGroupInfo colocateGroupInfo =
TenantLevelColocateGroupInfo.create(groupSchema,
+ backendsPerBucketSeq);
+ result.put(groupId, colocateGroupInfo);
+ if (!isGroupExist) {
+ Preconditions.checkNotNull(backendsPerBucketSeq);
+ tenantLevelColocateTableIndex.addBackendsPerBucketSeq(groupId,
backendsPerBucketSeq);
+ }
+ // set this group as unstable
+ tenantLevelColocateTableIndex.markMasterGroupUnstable(groupId,
"Colocation group modified by user",
+ false /* edit log is along with modify table log */);
+ }
+ if (!isReplay) {
+ TenantLevelColocateTableInfo info = new
TenantLevelColocateTableInfo(db.getId(), table.getId(), result);
+ editLog.logModifyTenantLevelTableColocate(info);
+ }
+ LOG.info("finished modify table's colocation property. table: {}, is
replay: {}", table.getName(), isReplay);
+ }
+
+ public void
replayModifyTenantLevelMasterColocate(TenantLevelColocateTableInfo info) throws
MetaNotFoundException {
+ long dbId = info.getDbId();
+ Preconditions.checkState(dbId != 0, "replay modify table colocate
failed, table id: " + info.getTableId());
+ long tableId = info.getTableId();
+ Database db = getInternalCatalog().getDbOrMetaException(dbId);
+ OlapTable olapTable = (OlapTable) db.getTableOrMetaException(tableId,
TableType.OLAP);
+ Map<Tag, String> assignedColocateNameMap = new HashMap<>();
+ for (Entry<Long, TenantLevelColocateGroupInfo> entry :
info.getGroupMap().entrySet()) {
+ TenantLevelColocateGroupInfo colocateGroupInfo = entry.getValue();
+ assignedColocateNameMap.put(colocateGroupInfo.getTag(),
colocateGroupInfo.getName());
+ }
+ olapTable.writeLock();
+ try {
+ modifyTenantLevelMasterColocate(db, olapTable,
assignedColocateNameMap, true,
+ info.getGroupMap());
+ } catch (DdlException e) {
+ // should not happen
+ LOG.warn("failed to replay modify table colocate", e);
+ } finally {
+ olapTable.writeUnlock();
+ }
+ }
+
+ public void modifyTenantLevelSlaveColocate(Database db, OlapTable table,
Map<Tag, String> assignedGroupNameMap,
Review Comment:
ditto
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]