This is an automated email from the ASF dual-hosted git repository. kxiao pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/doris.git
commit c06060eb647ada48245c583c04177a1cff3f5f80 Author: DeadlineFen <[email protected]> AuthorDate: Wed Jul 5 10:15:13 2023 +0800 [Fix](CCR) Binlog config is missed when create replica task (#21397) --- .../java/org/apache/doris/alter/RollupJobV2.java | 5 ++++- .../org/apache/doris/alter/SchemaChangeJobV2.java | 5 ++++- .../java/org/apache/doris/backup/RestoreJob.java | 12 ++++++++++- .../apache/doris/datasource/InternalCatalog.java | 23 +++++++++++++++------- .../org/apache/doris/master/ReportHandler.java | 6 +++++- .../org/apache/doris/task/CreateReplicaTask.java | 12 ++++++++++- .../java/org/apache/doris/task/AgentTaskTest.java | 2 +- 7 files changed, 52 insertions(+), 13 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java index cfb30f3492..908fffc6bd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java @@ -27,6 +27,7 @@ import org.apache.doris.analysis.SlotRef; import org.apache.doris.analysis.SqlParser; import org.apache.doris.analysis.SqlScanner; import org.apache.doris.analysis.TupleDescriptor; +import org.apache.doris.catalog.BinlogConfig; import org.apache.doris.catalog.Column; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.Env; @@ -243,6 +244,7 @@ public class RollupJobV2 extends AlterJobV2 implements GsonPostProcessable { tbl.readLock(); try { + BinlogConfig binlogConfig = new BinlogConfig(tbl.getBinlogConfig()); Preconditions.checkState(tbl.getState() == OlapTableState.ROLLUP); for (Map.Entry<Long, MaterializedIndex> entry : this.partitionIdToRollupIndex.entrySet()) { long partitionId = entry.getKey(); @@ -281,7 +283,8 @@ public class RollupJobV2 extends AlterJobV2 implements GsonPostProcessable { tbl.enableSingleReplicaCompaction(), tbl.skipWriteIndexOnLoad(), tbl.storeRowColumn(), - tbl.isDynamicSchema()); + tbl.isDynamicSchema(), + binlogConfig); createReplicaTask.setBaseTablet(tabletIdMap.get(rollupTabletId), baseSchemaHash); if (this.storageFormat != null) { createReplicaTask.setStorageFormat(this.storageFormat); diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java index 191629860e..54dac1039f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java @@ -22,6 +22,7 @@ import org.apache.doris.analysis.Expr; import org.apache.doris.analysis.SlotDescriptor; import org.apache.doris.analysis.SlotRef; import org.apache.doris.analysis.TupleDescriptor; +import org.apache.doris.catalog.BinlogConfig; import org.apache.doris.catalog.Column; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.Env; @@ -237,6 +238,7 @@ public class SchemaChangeJobV2 extends AlterJobV2 { try { Preconditions.checkState(tbl.getState() == OlapTableState.SCHEMA_CHANGE); + BinlogConfig binlogConfig = new BinlogConfig(tbl.getBinlogConfig()); for (long partitionId : partitionIndexMap.rowKeySet()) { Partition partition = tbl.getPartition(partitionId); if (partition == null) { @@ -278,7 +280,8 @@ public class SchemaChangeJobV2 extends AlterJobV2 { tbl.enableSingleReplicaCompaction(), tbl.skipWriteIndexOnLoad(), tbl.storeRowColumn(), - tbl.isDynamicSchema()); + tbl.isDynamicSchema(), + binlogConfig); createReplicaTask.setBaseTablet(partitionIndexTabletMap.get(partitionId, shadowIdxId) .get(shadowTabletId), originSchemaHash); diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java index 2067625540..a0237f03d8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java @@ -24,6 +24,7 @@ import org.apache.doris.backup.BackupJobInfo.BackupPartitionInfo; import org.apache.doris.backup.BackupJobInfo.BackupTabletInfo; import org.apache.doris.backup.RestoreFileMapping.IdChain; import org.apache.doris.backup.Status.ErrCode; +import org.apache.doris.catalog.BinlogConfig; import org.apache.doris.catalog.DataProperty; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.Env; @@ -999,6 +1000,14 @@ public class RestoreJob extends AbstractJob { private void createReplicas(Database db, AgentBatchTask batchTask, OlapTable localTbl, Partition restorePart) { Set<String> bfColumns = localTbl.getCopiedBfColumns(); double bfFpp = localTbl.getBfFpp(); + + BinlogConfig binlogConfig; + localTbl.readLock(); + try { + binlogConfig = new BinlogConfig(localTbl.getBinlogConfig()); + } finally { + localTbl.readUnlock(); + } for (MaterializedIndex restoredIdx : restorePart.getMaterializedIndices(IndexExtState.VISIBLE)) { MaterializedIndexMeta indexMeta = localTbl.getIndexMetaByIndexId(restoredIdx.getId()); for (Tablet restoreTablet : restoredIdx.getTablets()) { @@ -1024,7 +1033,8 @@ public class RestoreJob extends AbstractJob { localTbl.enableSingleReplicaCompaction(), localTbl.skipWriteIndexOnLoad(), localTbl.storeRowColumn(), - localTbl.isDynamicSchema()); + localTbl.isDynamicSchema(), + binlogConfig); task.setInRestoreMode(true); batchTask.addTask(task); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java index 1f454a1c69..4d0a054943 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java @@ -1342,6 +1342,7 @@ public class InternalCatalog implements CatalogIf<Database> { Map<Long, MaterializedIndexMeta> indexIdToMeta; Set<String> bfColumns; String partitionName = singlePartitionDesc.getPartitionName(); + BinlogConfig binlogConfig; // check OlapTable olapTable = db.getOlapTableOrDdlException(tableName); @@ -1464,6 +1465,9 @@ public class InternalCatalog implements CatalogIf<Database> { indexIdToMeta = olapTable.getCopiedIndexIdToMeta(); bfColumns = olapTable.getCopiedBfColumns(); + + // get BinlogConfig + binlogConfig = new BinlogConfig(olapTable.getBinlogConfig()); } catch (AnalysisException e) { throw new DdlException(e.getMessage()); } finally { @@ -1503,7 +1507,8 @@ public class InternalCatalog implements CatalogIf<Database> { singlePartitionDesc.getTabletType(), olapTable.getCompressionType(), olapTable.getDataSortInfo(), olapTable.getEnableUniqueKeyMergeOnWrite(), storagePolicy, idGeneratorBuffer, olapTable.disableAutoCompaction(), olapTable.enableSingleReplicaCompaction(), - olapTable.skipWriteIndexOnLoad(), olapTable.storeRowColumn(), olapTable.isDynamicSchema()); + olapTable.skipWriteIndexOnLoad(), olapTable.storeRowColumn(), olapTable.isDynamicSchema(), + binlogConfig); // check again olapTable = db.getOlapTableOrDdlException(tableName); @@ -1730,7 +1735,7 @@ public class InternalCatalog implements CatalogIf<Database> { DataSortInfo dataSortInfo, boolean enableUniqueKeyMergeOnWrite, String storagePolicy, IdGeneratorBuffer idGeneratorBuffer, boolean disableAutoCompaction, boolean enableSingleReplicaCompaction, boolean skipWriteIndexOnLoad, - boolean storeRowColumn, boolean isDynamicSchema) throws DdlException { + boolean storeRowColumn, boolean isDynamicSchema, BinlogConfig binlogConfig) throws DdlException { // create base index first. Preconditions.checkArgument(baseIndexId != -1); MaterializedIndex baseIndex = new MaterializedIndex(baseIndexId, IndexState.NORMAL); @@ -1793,7 +1798,7 @@ public class InternalCatalog implements CatalogIf<Database> { storageMedium, schema, bfColumns, bfFpp, countDownLatch, indexes, isInMemory, tabletType, dataSortInfo, compressionType, enableUniqueKeyMergeOnWrite, storagePolicy, disableAutoCompaction, enableSingleReplicaCompaction, skipWriteIndexOnLoad, - storeRowColumn, isDynamicSchema); + storeRowColumn, isDynamicSchema, binlogConfig); task.setStorageFormat(storageFormat); batchTask.addTask(task); @@ -2071,12 +2076,13 @@ public class InternalCatalog implements CatalogIf<Database> { Map<String, String> binlogConfigMap = PropertyAnalyzer.analyzeBinlogConfig(properties); if (binlogConfigMap != null) { BinlogConfig binlogConfig = new BinlogConfig(); - binlogConfig.mergeFromProperties(properties); + binlogConfig.mergeFromProperties(binlogConfigMap); olapTable.setBinlogConfig(binlogConfig); } } catch (AnalysisException e) { throw new DdlException(e.getMessage()); } + BinlogConfig binlogConfigForTask = new BinlogConfig(olapTable.getBinlogConfig()); if (partitionInfo.getType() == PartitionType.UNPARTITIONED) { // if this is an unpartitioned table, we should analyze data property and replication num here. @@ -2249,7 +2255,7 @@ public class InternalCatalog implements CatalogIf<Database> { olapTable.getDataSortInfo(), olapTable.getEnableUniqueKeyMergeOnWrite(), storagePolicy, idGeneratorBuffer, olapTable.disableAutoCompaction(), olapTable.enableSingleReplicaCompaction(), skipWriteIndexOnLoad, - storeRowColumn, isDynamicSchema); + storeRowColumn, isDynamicSchema, binlogConfigForTask); olapTable.addPartition(partition); } else if (partitionInfo.getType() == PartitionType.RANGE || partitionInfo.getType() == PartitionType.LIST) { @@ -2315,7 +2321,7 @@ public class InternalCatalog implements CatalogIf<Database> { olapTable.getDataSortInfo(), olapTable.getEnableUniqueKeyMergeOnWrite(), storagePolicy, idGeneratorBuffer, olapTable.disableAutoCompaction(), olapTable.enableSingleReplicaCompaction(), skipWriteIndexOnLoad, - storeRowColumn, isDynamicSchema); + storeRowColumn, isDynamicSchema, binlogConfigForTask); olapTable.addPartition(partition); } } else { @@ -2677,6 +2683,7 @@ public class InternalCatalog implements CatalogIf<Database> { Database db = (Database) getDbOrDdlException(dbTbl.getDb()); OlapTable olapTable = db.getOlapTableOrDdlException(dbTbl.getTbl()); + BinlogConfig binlogConfig; olapTable.readLock(); try { olapTable.checkNormalStateForAlter(); @@ -2701,6 +2708,8 @@ public class InternalCatalog implements CatalogIf<Database> { return; } copiedTbl = olapTable.selectiveCopy(origPartitions.keySet(), IndexExtState.VISIBLE, false); + + binlogConfig = new BinlogConfig(olapTable.getBinlogConfig()); } finally { olapTable.readUnlock(); } @@ -2732,7 +2741,7 @@ public class InternalCatalog implements CatalogIf<Database> { olapTable.getPartitionInfo().getDataProperty(oldPartitionId).getStoragePolicy(), idGeneratorBuffer, olapTable.disableAutoCompaction(), olapTable.enableSingleReplicaCompaction(), olapTable.skipWriteIndexOnLoad(), - olapTable.storeRowColumn(), olapTable.isDynamicSchema()); + olapTable.storeRowColumn(), olapTable.isDynamicSchema(), binlogConfig); newPartitions.add(newPartition); } } catch (DdlException e) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java index abccbf4e7b..d506eb2432 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java @@ -18,6 +18,7 @@ package org.apache.doris.master; +import org.apache.doris.catalog.BinlogConfig; import org.apache.doris.catalog.ColocateTableIndex; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.Env; @@ -744,6 +745,8 @@ public class ReportHandler extends Daemon { continue; } + BinlogConfig binlogConfig = new BinlogConfig(olapTable.getBinlogConfig()); + ReplicaState state = replica.getState(); if (state == ReplicaState.NORMAL || state == ReplicaState.SCHEMA_CHANGE) { // if state is PENDING / ROLLUP / CLONE @@ -782,7 +785,8 @@ public class ReportHandler extends Daemon { olapTable.disableAutoCompaction(), olapTable.enableSingleReplicaCompaction(), olapTable.skipWriteIndexOnLoad(), - olapTable.storeRowColumn(), olapTable.isDynamicSchema()); + olapTable.storeRowColumn(), olapTable.isDynamicSchema(), + binlogConfig); createReplicaTask.setIsRecoverTask(true); createReplicaBatchTask.addTask(createReplicaTask); diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/CreateReplicaTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/CreateReplicaTask.java index f416e91472..49bd33fb5c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/task/CreateReplicaTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/task/CreateReplicaTask.java @@ -19,6 +19,7 @@ package org.apache.doris.task; import org.apache.doris.alter.SchemaChangeHandler; import org.apache.doris.analysis.DataSortInfo; +import org.apache.doris.catalog.BinlogConfig; import org.apache.doris.catalog.Column; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.Index; @@ -106,6 +107,8 @@ public class CreateReplicaTask extends AgentTask { private boolean storeRowColumn; + private BinlogConfig binlogConfig; + public CreateReplicaTask(long backendId, long dbId, long tableId, long partitionId, long indexId, long tabletId, long replicaId, short shortKeyColumnCount, int schemaHash, long version, KeysType keysType, TStorageType storageType, @@ -121,7 +124,8 @@ public class CreateReplicaTask extends AgentTask { boolean enableSingleReplicaCompaction, boolean skipWriteIndexOnLoad, boolean storeRowColumn, - boolean isDynamicSchema) { + boolean isDynamicSchema, + BinlogConfig binlogConfig) { super(null, backendId, TTaskType.CREATE, dbId, tableId, partitionId, indexId, tabletId); this.replicaId = replicaId; @@ -159,6 +163,7 @@ public class CreateReplicaTask extends AgentTask { this.enableSingleReplicaCompaction = enableSingleReplicaCompaction; this.skipWriteIndexOnLoad = skipWriteIndexOnLoad; this.storeRowColumn = storeRowColumn; + this.binlogConfig = binlogConfig; } public void setIsRecoverTask(boolean isRecoverTask) { @@ -295,6 +300,11 @@ public class CreateReplicaTask extends AgentTask { createTabletReq.setTabletType(tabletType); createTabletReq.setCompressionType(compressionType); createTabletReq.setEnableUniqueKeyMergeOnWrite(enableUniqueKeyMergeOnWrite); + + if (binlogConfig != null) { + createTabletReq.setBinlogConfig(binlogConfig.toThrift()); + } + return createTabletReq; } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/task/AgentTaskTest.java b/fe/fe-core/src/test/java/org/apache/doris/task/AgentTaskTest.java index b9ce1c33a8..cf19d7d918 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/task/AgentTaskTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/task/AgentTaskTest.java @@ -107,7 +107,7 @@ public class AgentTaskTest { createReplicaTask = new CreateReplicaTask(backendId1, dbId, tableId, partitionId, indexId1, tabletId1, replicaId1, shortKeyNum, schemaHash1, version, KeysType.AGG_KEYS, storageType, TStorageMedium.SSD, columns, null, 0, latch, null, false, TTabletType.TABLET_TYPE_DISK, null, - TCompressionType.LZ4F, false, "", false, false, false, false, false); + TCompressionType.LZ4F, false, "", false, false, false, false, false, null); // drop dropTask = new DropReplicaTask(backendId1, tabletId1, replicaId1, schemaHash1, false); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
