This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 93795442a4 [Fix](CCR) Binlog config is missed when create replica task (#21397)
93795442a4 is described below

commit 93795442a4ec824c3caad3ccb5e749b73b319b42
Author: DeadlineFen <[email protected]>
AuthorDate: Wed Jul 5 10:15:13 2023 +0800

    [Fix](CCR) Binlog config is missed when create replica task (#21397)
---
 .../java/org/apache/doris/alter/RollupJobV2.java   |  5 ++++-
 .../org/apache/doris/alter/SchemaChangeJobV2.java  |  5 ++++-
 .../java/org/apache/doris/backup/RestoreJob.java   | 12 ++++++++++-
 .../apache/doris/datasource/InternalCatalog.java   | 23 +++++++++++++++-------
 .../org/apache/doris/master/ReportHandler.java     |  6 +++++-
 .../org/apache/doris/task/CreateReplicaTask.java   | 12 ++++++++++-
 .../java/org/apache/doris/task/AgentTaskTest.java  |  2 +-
 7 files changed, 52 insertions(+), 13 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java
index cfb30f3492..908fffc6bd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java
@@ -27,6 +27,7 @@ import org.apache.doris.analysis.SlotRef;
 import org.apache.doris.analysis.SqlParser;
 import org.apache.doris.analysis.SqlScanner;
 import org.apache.doris.analysis.TupleDescriptor;
+import org.apache.doris.catalog.BinlogConfig;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.Env;
@@ -243,6 +244,7 @@ public class RollupJobV2 extends AlterJobV2 implements 
GsonPostProcessable {
 
         tbl.readLock();
         try {
+            BinlogConfig binlogConfig = new 
BinlogConfig(tbl.getBinlogConfig());
             Preconditions.checkState(tbl.getState() == OlapTableState.ROLLUP);
             for (Map.Entry<Long, MaterializedIndex> entry : 
this.partitionIdToRollupIndex.entrySet()) {
                 long partitionId = entry.getKey();
@@ -281,7 +283,8 @@ public class RollupJobV2 extends AlterJobV2 implements 
GsonPostProcessable {
                                 tbl.enableSingleReplicaCompaction(),
                                 tbl.skipWriteIndexOnLoad(),
                                 tbl.storeRowColumn(),
-                                tbl.isDynamicSchema());
+                                tbl.isDynamicSchema(),
+                                binlogConfig);
                         
createReplicaTask.setBaseTablet(tabletIdMap.get(rollupTabletId), 
baseSchemaHash);
                         if (this.storageFormat != null) {
                             
createReplicaTask.setStorageFormat(this.storageFormat);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
index 191629860e..54dac1039f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
@@ -22,6 +22,7 @@ import org.apache.doris.analysis.Expr;
 import org.apache.doris.analysis.SlotDescriptor;
 import org.apache.doris.analysis.SlotRef;
 import org.apache.doris.analysis.TupleDescriptor;
+import org.apache.doris.catalog.BinlogConfig;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.Env;
@@ -237,6 +238,7 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
         try {
 
             Preconditions.checkState(tbl.getState() == 
OlapTableState.SCHEMA_CHANGE);
+            BinlogConfig binlogConfig = new 
BinlogConfig(tbl.getBinlogConfig());
             for (long partitionId : partitionIndexMap.rowKeySet()) {
                 Partition partition = tbl.getPartition(partitionId);
                 if (partition == null) {
@@ -278,7 +280,8 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
                                     tbl.enableSingleReplicaCompaction(),
                                     tbl.skipWriteIndexOnLoad(),
                                     tbl.storeRowColumn(),
-                                    tbl.isDynamicSchema());
+                                    tbl.isDynamicSchema(),
+                                    binlogConfig);
 
                             
createReplicaTask.setBaseTablet(partitionIndexTabletMap.get(partitionId, 
shadowIdxId)
                                     .get(shadowTabletId), originSchemaHash);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
index 2067625540..a0237f03d8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
@@ -24,6 +24,7 @@ import 
org.apache.doris.backup.BackupJobInfo.BackupPartitionInfo;
 import org.apache.doris.backup.BackupJobInfo.BackupTabletInfo;
 import org.apache.doris.backup.RestoreFileMapping.IdChain;
 import org.apache.doris.backup.Status.ErrCode;
+import org.apache.doris.catalog.BinlogConfig;
 import org.apache.doris.catalog.DataProperty;
 import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.Env;
@@ -999,6 +1000,14 @@ public class RestoreJob extends AbstractJob {
     private void createReplicas(Database db, AgentBatchTask batchTask, 
OlapTable localTbl, Partition restorePart) {
         Set<String> bfColumns = localTbl.getCopiedBfColumns();
         double bfFpp = localTbl.getBfFpp();
+
+        BinlogConfig binlogConfig;
+        localTbl.readLock();
+        try {
+            binlogConfig = new BinlogConfig(localTbl.getBinlogConfig());
+        } finally {
+            localTbl.readUnlock();
+        }
         for (MaterializedIndex restoredIdx : 
restorePart.getMaterializedIndices(IndexExtState.VISIBLE)) {
             MaterializedIndexMeta indexMeta = 
localTbl.getIndexMetaByIndexId(restoredIdx.getId());
             for (Tablet restoreTablet : restoredIdx.getTablets()) {
@@ -1024,7 +1033,8 @@ public class RestoreJob extends AbstractJob {
                             localTbl.enableSingleReplicaCompaction(),
                             localTbl.skipWriteIndexOnLoad(),
                             localTbl.storeRowColumn(),
-                            localTbl.isDynamicSchema());
+                            localTbl.isDynamicSchema(),
+                            binlogConfig);
 
                     task.setInRestoreMode(true);
                     batchTask.addTask(task);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
index 221d36695a..6998338aaa 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
@@ -1344,6 +1344,7 @@ public class InternalCatalog implements 
CatalogIf<Database> {
         Map<Long, MaterializedIndexMeta> indexIdToMeta;
         Set<String> bfColumns;
         String partitionName = singlePartitionDesc.getPartitionName();
+        BinlogConfig binlogConfig;
 
         // check
         OlapTable olapTable = db.getOlapTableOrDdlException(tableName);
@@ -1466,6 +1467,9 @@ public class InternalCatalog implements 
CatalogIf<Database> {
 
             indexIdToMeta = olapTable.getCopiedIndexIdToMeta();
             bfColumns = olapTable.getCopiedBfColumns();
+
+            // get BinlogConfig
+            binlogConfig = new BinlogConfig(olapTable.getBinlogConfig());
         } catch (AnalysisException e) {
             throw new DdlException(e.getMessage());
         } finally {
@@ -1505,7 +1509,8 @@ public class InternalCatalog implements 
CatalogIf<Database> {
                     singlePartitionDesc.getTabletType(), 
olapTable.getCompressionType(), olapTable.getDataSortInfo(),
                     olapTable.getEnableUniqueKeyMergeOnWrite(), storagePolicy, 
idGeneratorBuffer,
                     olapTable.disableAutoCompaction(), 
olapTable.enableSingleReplicaCompaction(),
-                    olapTable.skipWriteIndexOnLoad(), 
olapTable.storeRowColumn(), olapTable.isDynamicSchema());
+                    olapTable.skipWriteIndexOnLoad(), 
olapTable.storeRowColumn(), olapTable.isDynamicSchema(),
+                    binlogConfig);
 
             // check again
             olapTable = db.getOlapTableOrDdlException(tableName);
@@ -1732,7 +1737,7 @@ public class InternalCatalog implements 
CatalogIf<Database> {
             DataSortInfo dataSortInfo, boolean enableUniqueKeyMergeOnWrite, 
String storagePolicy,
             IdGeneratorBuffer idGeneratorBuffer, boolean disableAutoCompaction,
             boolean enableSingleReplicaCompaction, boolean 
skipWriteIndexOnLoad,
-            boolean storeRowColumn, boolean isDynamicSchema) throws 
DdlException {
+            boolean storeRowColumn, boolean isDynamicSchema, BinlogConfig 
binlogConfig) throws DdlException {
         // create base index first.
         Preconditions.checkArgument(baseIndexId != -1);
         MaterializedIndex baseIndex = new MaterializedIndex(baseIndexId, 
IndexState.NORMAL);
@@ -1795,7 +1800,7 @@ public class InternalCatalog implements 
CatalogIf<Database> {
                             storageMedium, schema, bfColumns, bfFpp, 
countDownLatch, indexes, isInMemory, tabletType,
                             dataSortInfo, compressionType, 
enableUniqueKeyMergeOnWrite, storagePolicy,
                             disableAutoCompaction, 
enableSingleReplicaCompaction, skipWriteIndexOnLoad,
-                            storeRowColumn, isDynamicSchema);
+                            storeRowColumn, isDynamicSchema, binlogConfig);
 
                     task.setStorageFormat(storageFormat);
                     batchTask.addTask(task);
@@ -2086,12 +2091,13 @@ public class InternalCatalog implements 
CatalogIf<Database> {
             Map<String, String> binlogConfigMap = 
PropertyAnalyzer.analyzeBinlogConfig(properties);
             if (binlogConfigMap != null) {
                 BinlogConfig binlogConfig = new BinlogConfig();
-                binlogConfig.mergeFromProperties(properties);
+                binlogConfig.mergeFromProperties(binlogConfigMap);
                 olapTable.setBinlogConfig(binlogConfig);
             }
         } catch (AnalysisException e) {
             throw new DdlException(e.getMessage());
         }
+        BinlogConfig binlogConfigForTask = new 
BinlogConfig(olapTable.getBinlogConfig());
 
         if (partitionInfo.getType() == PartitionType.UNPARTITIONED) {
             // if this is an unpartitioned table, we should analyze data 
property and replication num here.
@@ -2264,7 +2270,7 @@ public class InternalCatalog implements 
CatalogIf<Database> {
                         olapTable.getDataSortInfo(), 
olapTable.getEnableUniqueKeyMergeOnWrite(), storagePolicy,
                         idGeneratorBuffer, olapTable.disableAutoCompaction(),
                         olapTable.enableSingleReplicaCompaction(), 
skipWriteIndexOnLoad,
-                        storeRowColumn, isDynamicSchema);
+                        storeRowColumn, isDynamicSchema, binlogConfigForTask);
                 olapTable.addPartition(partition);
             } else if (partitionInfo.getType() == PartitionType.RANGE
                     || partitionInfo.getType() == PartitionType.LIST) {
@@ -2330,7 +2336,7 @@ public class InternalCatalog implements 
CatalogIf<Database> {
                             olapTable.getDataSortInfo(), 
olapTable.getEnableUniqueKeyMergeOnWrite(), storagePolicy,
                             idGeneratorBuffer, 
olapTable.disableAutoCompaction(),
                             olapTable.enableSingleReplicaCompaction(), 
skipWriteIndexOnLoad,
-                            storeRowColumn, isDynamicSchema);
+                            storeRowColumn, isDynamicSchema, 
binlogConfigForTask);
                     olapTable.addPartition(partition);
                 }
             } else {
@@ -2692,6 +2698,7 @@ public class InternalCatalog implements 
CatalogIf<Database> {
         Database db = (Database) getDbOrDdlException(dbTbl.getDb());
         OlapTable olapTable = db.getOlapTableOrDdlException(dbTbl.getTbl());
 
+        BinlogConfig binlogConfig;
         olapTable.readLock();
         try {
             olapTable.checkNormalStateForAlter();
@@ -2716,6 +2723,8 @@ public class InternalCatalog implements 
CatalogIf<Database> {
                 return;
             }
             copiedTbl = olapTable.selectiveCopy(origPartitions.keySet(), 
IndexExtState.VISIBLE, false);
+
+            binlogConfig = new BinlogConfig(olapTable.getBinlogConfig());
         } finally {
             olapTable.readUnlock();
         }
@@ -2747,7 +2756,7 @@ public class InternalCatalog implements 
CatalogIf<Database> {
                         
olapTable.getPartitionInfo().getDataProperty(oldPartitionId).getStoragePolicy(),
                         idGeneratorBuffer, olapTable.disableAutoCompaction(),
                         olapTable.enableSingleReplicaCompaction(), 
olapTable.skipWriteIndexOnLoad(),
-                        olapTable.storeRowColumn(), 
olapTable.isDynamicSchema());
+                        olapTable.storeRowColumn(), 
olapTable.isDynamicSchema(), binlogConfig);
                 newPartitions.add(newPartition);
             }
         } catch (DdlException e) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
index abccbf4e7b..d506eb2432 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
@@ -18,6 +18,7 @@
 package org.apache.doris.master;
 
 
+import org.apache.doris.catalog.BinlogConfig;
 import org.apache.doris.catalog.ColocateTableIndex;
 import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.Env;
@@ -744,6 +745,8 @@ public class ReportHandler extends Daemon {
                         continue;
                     }
 
+                    BinlogConfig binlogConfig = new 
BinlogConfig(olapTable.getBinlogConfig());
+
                     ReplicaState state = replica.getState();
                     if (state == ReplicaState.NORMAL || state == 
ReplicaState.SCHEMA_CHANGE) {
                         // if state is PENDING / ROLLUP / CLONE
@@ -782,7 +785,8 @@ public class ReportHandler extends Daemon {
                                             olapTable.disableAutoCompaction(),
                                             
olapTable.enableSingleReplicaCompaction(),
                                             olapTable.skipWriteIndexOnLoad(),
-                                            olapTable.storeRowColumn(), 
olapTable.isDynamicSchema());
+                                            olapTable.storeRowColumn(), 
olapTable.isDynamicSchema(),
+                                            binlogConfig);
 
                                     createReplicaTask.setIsRecoverTask(true);
                                     
createReplicaBatchTask.addTask(createReplicaTask);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/CreateReplicaTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/CreateReplicaTask.java
index f416e91472..49bd33fb5c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/CreateReplicaTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/CreateReplicaTask.java
@@ -19,6 +19,7 @@ package org.apache.doris.task;
 
 import org.apache.doris.alter.SchemaChangeHandler;
 import org.apache.doris.analysis.DataSortInfo;
+import org.apache.doris.catalog.BinlogConfig;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.Index;
@@ -106,6 +107,8 @@ public class CreateReplicaTask extends AgentTask {
 
     private boolean storeRowColumn;
 
+    private BinlogConfig binlogConfig;
+
     public CreateReplicaTask(long backendId, long dbId, long tableId, long 
partitionId, long indexId, long tabletId,
                              long replicaId, short shortKeyColumnCount, int 
schemaHash, long version,
                              KeysType keysType, TStorageType storageType,
@@ -121,7 +124,8 @@ public class CreateReplicaTask extends AgentTask {
                              boolean enableSingleReplicaCompaction,
                              boolean skipWriteIndexOnLoad,
                              boolean storeRowColumn,
-                             boolean isDynamicSchema) {
+                             boolean isDynamicSchema,
+                             BinlogConfig binlogConfig) {
         super(null, backendId, TTaskType.CREATE, dbId, tableId, partitionId, 
indexId, tabletId);
 
         this.replicaId = replicaId;
@@ -159,6 +163,7 @@ public class CreateReplicaTask extends AgentTask {
         this.enableSingleReplicaCompaction = enableSingleReplicaCompaction;
         this.skipWriteIndexOnLoad = skipWriteIndexOnLoad;
         this.storeRowColumn = storeRowColumn;
+        this.binlogConfig = binlogConfig;
     }
 
     public void setIsRecoverTask(boolean isRecoverTask) {
@@ -295,6 +300,11 @@ public class CreateReplicaTask extends AgentTask {
         createTabletReq.setTabletType(tabletType);
         createTabletReq.setCompressionType(compressionType);
         
createTabletReq.setEnableUniqueKeyMergeOnWrite(enableUniqueKeyMergeOnWrite);
+
+        if (binlogConfig != null) {
+            createTabletReq.setBinlogConfig(binlogConfig.toThrift());
+        }
+
         return createTabletReq;
     }
 }
diff --git a/fe/fe-core/src/test/java/org/apache/doris/task/AgentTaskTest.java b/fe/fe-core/src/test/java/org/apache/doris/task/AgentTaskTest.java
index b9ce1c33a8..cf19d7d918 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/task/AgentTaskTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/task/AgentTaskTest.java
@@ -107,7 +107,7 @@ public class AgentTaskTest {
         createReplicaTask = new CreateReplicaTask(backendId1, dbId, tableId, 
partitionId,
                 indexId1, tabletId1, replicaId1, shortKeyNum, schemaHash1, 
version, KeysType.AGG_KEYS, storageType,
                 TStorageMedium.SSD, columns, null, 0, latch, null, false, 
TTabletType.TABLET_TYPE_DISK, null,
-                TCompressionType.LZ4F, false, "", false, false, false, false, 
false);
+                TCompressionType.LZ4F, false, "", false, false, false, false, 
false, null);
 
         // drop
         dropTask = new DropReplicaTask(backendId1, tabletId1, replicaId1, 
schemaHash1, false);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to