This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 93795442a4 [Fix](CCR) Binlog config is missed when create replica task (#21397)
93795442a4 is described below
commit 93795442a4ec824c3caad3ccb5e749b73b319b42
Author: DeadlineFen <[email protected]>
AuthorDate: Wed Jul 5 10:15:13 2023 +0800
[Fix](CCR) Binlog config is missed when create replica task (#21397)
---
.../java/org/apache/doris/alter/RollupJobV2.java | 5 ++++-
.../org/apache/doris/alter/SchemaChangeJobV2.java | 5 ++++-
.../java/org/apache/doris/backup/RestoreJob.java | 12 ++++++++++-
.../apache/doris/datasource/InternalCatalog.java | 23 +++++++++++++++-------
.../org/apache/doris/master/ReportHandler.java | 6 +++++-
.../org/apache/doris/task/CreateReplicaTask.java | 12 ++++++++++-
.../java/org/apache/doris/task/AgentTaskTest.java | 2 +-
7 files changed, 52 insertions(+), 13 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java
index cfb30f3492..908fffc6bd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java
@@ -27,6 +27,7 @@ import org.apache.doris.analysis.SlotRef;
import org.apache.doris.analysis.SqlParser;
import org.apache.doris.analysis.SqlScanner;
import org.apache.doris.analysis.TupleDescriptor;
+import org.apache.doris.catalog.BinlogConfig;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
@@ -243,6 +244,7 @@ public class RollupJobV2 extends AlterJobV2 implements
GsonPostProcessable {
tbl.readLock();
try {
+ BinlogConfig binlogConfig = new
BinlogConfig(tbl.getBinlogConfig());
Preconditions.checkState(tbl.getState() == OlapTableState.ROLLUP);
for (Map.Entry<Long, MaterializedIndex> entry :
this.partitionIdToRollupIndex.entrySet()) {
long partitionId = entry.getKey();
@@ -281,7 +283,8 @@ public class RollupJobV2 extends AlterJobV2 implements
GsonPostProcessable {
tbl.enableSingleReplicaCompaction(),
tbl.skipWriteIndexOnLoad(),
tbl.storeRowColumn(),
- tbl.isDynamicSchema());
+ tbl.isDynamicSchema(),
+ binlogConfig);
createReplicaTask.setBaseTablet(tabletIdMap.get(rollupTabletId),
baseSchemaHash);
if (this.storageFormat != null) {
createReplicaTask.setStorageFormat(this.storageFormat);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
index 191629860e..54dac1039f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
@@ -22,6 +22,7 @@ import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.SlotDescriptor;
import org.apache.doris.analysis.SlotRef;
import org.apache.doris.analysis.TupleDescriptor;
+import org.apache.doris.catalog.BinlogConfig;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
@@ -237,6 +238,7 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
try {
Preconditions.checkState(tbl.getState() ==
OlapTableState.SCHEMA_CHANGE);
+ BinlogConfig binlogConfig = new
BinlogConfig(tbl.getBinlogConfig());
for (long partitionId : partitionIndexMap.rowKeySet()) {
Partition partition = tbl.getPartition(partitionId);
if (partition == null) {
@@ -278,7 +280,8 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
tbl.enableSingleReplicaCompaction(),
tbl.skipWriteIndexOnLoad(),
tbl.storeRowColumn(),
- tbl.isDynamicSchema());
+ tbl.isDynamicSchema(),
+ binlogConfig);
createReplicaTask.setBaseTablet(partitionIndexTabletMap.get(partitionId,
shadowIdxId)
.get(shadowTabletId), originSchemaHash);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
index 2067625540..a0237f03d8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
@@ -24,6 +24,7 @@ import
org.apache.doris.backup.BackupJobInfo.BackupPartitionInfo;
import org.apache.doris.backup.BackupJobInfo.BackupTabletInfo;
import org.apache.doris.backup.RestoreFileMapping.IdChain;
import org.apache.doris.backup.Status.ErrCode;
+import org.apache.doris.catalog.BinlogConfig;
import org.apache.doris.catalog.DataProperty;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
@@ -999,6 +1000,14 @@ public class RestoreJob extends AbstractJob {
private void createReplicas(Database db, AgentBatchTask batchTask,
OlapTable localTbl, Partition restorePart) {
Set<String> bfColumns = localTbl.getCopiedBfColumns();
double bfFpp = localTbl.getBfFpp();
+
+ BinlogConfig binlogConfig;
+ localTbl.readLock();
+ try {
+ binlogConfig = new BinlogConfig(localTbl.getBinlogConfig());
+ } finally {
+ localTbl.readUnlock();
+ }
for (MaterializedIndex restoredIdx :
restorePart.getMaterializedIndices(IndexExtState.VISIBLE)) {
MaterializedIndexMeta indexMeta =
localTbl.getIndexMetaByIndexId(restoredIdx.getId());
for (Tablet restoreTablet : restoredIdx.getTablets()) {
@@ -1024,7 +1033,8 @@ public class RestoreJob extends AbstractJob {
localTbl.enableSingleReplicaCompaction(),
localTbl.skipWriteIndexOnLoad(),
localTbl.storeRowColumn(),
- localTbl.isDynamicSchema());
+ localTbl.isDynamicSchema(),
+ binlogConfig);
task.setInRestoreMode(true);
batchTask.addTask(task);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
index 221d36695a..6998338aaa 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
@@ -1344,6 +1344,7 @@ public class InternalCatalog implements
CatalogIf<Database> {
Map<Long, MaterializedIndexMeta> indexIdToMeta;
Set<String> bfColumns;
String partitionName = singlePartitionDesc.getPartitionName();
+ BinlogConfig binlogConfig;
// check
OlapTable olapTable = db.getOlapTableOrDdlException(tableName);
@@ -1466,6 +1467,9 @@ public class InternalCatalog implements
CatalogIf<Database> {
indexIdToMeta = olapTable.getCopiedIndexIdToMeta();
bfColumns = olapTable.getCopiedBfColumns();
+
+ // get BinlogConfig
+ binlogConfig = new BinlogConfig(olapTable.getBinlogConfig());
} catch (AnalysisException e) {
throw new DdlException(e.getMessage());
} finally {
@@ -1505,7 +1509,8 @@ public class InternalCatalog implements
CatalogIf<Database> {
singlePartitionDesc.getTabletType(),
olapTable.getCompressionType(), olapTable.getDataSortInfo(),
olapTable.getEnableUniqueKeyMergeOnWrite(), storagePolicy,
idGeneratorBuffer,
olapTable.disableAutoCompaction(),
olapTable.enableSingleReplicaCompaction(),
- olapTable.skipWriteIndexOnLoad(),
olapTable.storeRowColumn(), olapTable.isDynamicSchema());
+ olapTable.skipWriteIndexOnLoad(),
olapTable.storeRowColumn(), olapTable.isDynamicSchema(),
+ binlogConfig);
// check again
olapTable = db.getOlapTableOrDdlException(tableName);
@@ -1732,7 +1737,7 @@ public class InternalCatalog implements
CatalogIf<Database> {
DataSortInfo dataSortInfo, boolean enableUniqueKeyMergeOnWrite,
String storagePolicy,
IdGeneratorBuffer idGeneratorBuffer, boolean disableAutoCompaction,
boolean enableSingleReplicaCompaction, boolean
skipWriteIndexOnLoad,
- boolean storeRowColumn, boolean isDynamicSchema) throws
DdlException {
+ boolean storeRowColumn, boolean isDynamicSchema, BinlogConfig
binlogConfig) throws DdlException {
// create base index first.
Preconditions.checkArgument(baseIndexId != -1);
MaterializedIndex baseIndex = new MaterializedIndex(baseIndexId,
IndexState.NORMAL);
@@ -1795,7 +1800,7 @@ public class InternalCatalog implements
CatalogIf<Database> {
storageMedium, schema, bfColumns, bfFpp,
countDownLatch, indexes, isInMemory, tabletType,
dataSortInfo, compressionType,
enableUniqueKeyMergeOnWrite, storagePolicy,
disableAutoCompaction,
enableSingleReplicaCompaction, skipWriteIndexOnLoad,
- storeRowColumn, isDynamicSchema);
+ storeRowColumn, isDynamicSchema, binlogConfig);
task.setStorageFormat(storageFormat);
batchTask.addTask(task);
@@ -2086,12 +2091,13 @@ public class InternalCatalog implements
CatalogIf<Database> {
Map<String, String> binlogConfigMap =
PropertyAnalyzer.analyzeBinlogConfig(properties);
if (binlogConfigMap != null) {
BinlogConfig binlogConfig = new BinlogConfig();
- binlogConfig.mergeFromProperties(properties);
+ binlogConfig.mergeFromProperties(binlogConfigMap);
olapTable.setBinlogConfig(binlogConfig);
}
} catch (AnalysisException e) {
throw new DdlException(e.getMessage());
}
+ BinlogConfig binlogConfigForTask = new
BinlogConfig(olapTable.getBinlogConfig());
if (partitionInfo.getType() == PartitionType.UNPARTITIONED) {
// if this is an unpartitioned table, we should analyze data
property and replication num here.
@@ -2264,7 +2270,7 @@ public class InternalCatalog implements
CatalogIf<Database> {
olapTable.getDataSortInfo(),
olapTable.getEnableUniqueKeyMergeOnWrite(), storagePolicy,
idGeneratorBuffer, olapTable.disableAutoCompaction(),
olapTable.enableSingleReplicaCompaction(),
skipWriteIndexOnLoad,
- storeRowColumn, isDynamicSchema);
+ storeRowColumn, isDynamicSchema, binlogConfigForTask);
olapTable.addPartition(partition);
} else if (partitionInfo.getType() == PartitionType.RANGE
|| partitionInfo.getType() == PartitionType.LIST) {
@@ -2330,7 +2336,7 @@ public class InternalCatalog implements
CatalogIf<Database> {
olapTable.getDataSortInfo(),
olapTable.getEnableUniqueKeyMergeOnWrite(), storagePolicy,
idGeneratorBuffer,
olapTable.disableAutoCompaction(),
olapTable.enableSingleReplicaCompaction(),
skipWriteIndexOnLoad,
- storeRowColumn, isDynamicSchema);
+ storeRowColumn, isDynamicSchema,
binlogConfigForTask);
olapTable.addPartition(partition);
}
} else {
@@ -2692,6 +2698,7 @@ public class InternalCatalog implements
CatalogIf<Database> {
Database db = (Database) getDbOrDdlException(dbTbl.getDb());
OlapTable olapTable = db.getOlapTableOrDdlException(dbTbl.getTbl());
+ BinlogConfig binlogConfig;
olapTable.readLock();
try {
olapTable.checkNormalStateForAlter();
@@ -2716,6 +2723,8 @@ public class InternalCatalog implements
CatalogIf<Database> {
return;
}
copiedTbl = olapTable.selectiveCopy(origPartitions.keySet(),
IndexExtState.VISIBLE, false);
+
+ binlogConfig = new BinlogConfig(olapTable.getBinlogConfig());
} finally {
olapTable.readUnlock();
}
@@ -2747,7 +2756,7 @@ public class InternalCatalog implements
CatalogIf<Database> {
olapTable.getPartitionInfo().getDataProperty(oldPartitionId).getStoragePolicy(),
idGeneratorBuffer, olapTable.disableAutoCompaction(),
olapTable.enableSingleReplicaCompaction(),
olapTable.skipWriteIndexOnLoad(),
- olapTable.storeRowColumn(),
olapTable.isDynamicSchema());
+ olapTable.storeRowColumn(),
olapTable.isDynamicSchema(), binlogConfig);
newPartitions.add(newPartition);
}
} catch (DdlException e) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
index abccbf4e7b..d506eb2432 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
@@ -18,6 +18,7 @@
package org.apache.doris.master;
+import org.apache.doris.catalog.BinlogConfig;
import org.apache.doris.catalog.ColocateTableIndex;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
@@ -744,6 +745,8 @@ public class ReportHandler extends Daemon {
continue;
}
+ BinlogConfig binlogConfig = new
BinlogConfig(olapTable.getBinlogConfig());
+
ReplicaState state = replica.getState();
if (state == ReplicaState.NORMAL || state ==
ReplicaState.SCHEMA_CHANGE) {
// if state is PENDING / ROLLUP / CLONE
@@ -782,7 +785,8 @@ public class ReportHandler extends Daemon {
olapTable.disableAutoCompaction(),
olapTable.enableSingleReplicaCompaction(),
olapTable.skipWriteIndexOnLoad(),
- olapTable.storeRowColumn(),
olapTable.isDynamicSchema());
+ olapTable.storeRowColumn(),
olapTable.isDynamicSchema(),
+ binlogConfig);
createReplicaTask.setIsRecoverTask(true);
createReplicaBatchTask.addTask(createReplicaTask);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/CreateReplicaTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/CreateReplicaTask.java
index f416e91472..49bd33fb5c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/CreateReplicaTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/CreateReplicaTask.java
@@ -19,6 +19,7 @@ package org.apache.doris.task;
import org.apache.doris.alter.SchemaChangeHandler;
import org.apache.doris.analysis.DataSortInfo;
+import org.apache.doris.catalog.BinlogConfig;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.Index;
@@ -106,6 +107,8 @@ public class CreateReplicaTask extends AgentTask {
private boolean storeRowColumn;
+ private BinlogConfig binlogConfig;
+
public CreateReplicaTask(long backendId, long dbId, long tableId, long
partitionId, long indexId, long tabletId,
long replicaId, short shortKeyColumnCount, int
schemaHash, long version,
KeysType keysType, TStorageType storageType,
@@ -121,7 +124,8 @@ public class CreateReplicaTask extends AgentTask {
boolean enableSingleReplicaCompaction,
boolean skipWriteIndexOnLoad,
boolean storeRowColumn,
- boolean isDynamicSchema) {
+ boolean isDynamicSchema,
+ BinlogConfig binlogConfig) {
super(null, backendId, TTaskType.CREATE, dbId, tableId, partitionId,
indexId, tabletId);
this.replicaId = replicaId;
@@ -159,6 +163,7 @@ public class CreateReplicaTask extends AgentTask {
this.enableSingleReplicaCompaction = enableSingleReplicaCompaction;
this.skipWriteIndexOnLoad = skipWriteIndexOnLoad;
this.storeRowColumn = storeRowColumn;
+ this.binlogConfig = binlogConfig;
}
public void setIsRecoverTask(boolean isRecoverTask) {
@@ -295,6 +300,11 @@ public class CreateReplicaTask extends AgentTask {
createTabletReq.setTabletType(tabletType);
createTabletReq.setCompressionType(compressionType);
createTabletReq.setEnableUniqueKeyMergeOnWrite(enableUniqueKeyMergeOnWrite);
+
+ if (binlogConfig != null) {
+ createTabletReq.setBinlogConfig(binlogConfig.toThrift());
+ }
+
return createTabletReq;
}
}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/task/AgentTaskTest.java b/fe/fe-core/src/test/java/org/apache/doris/task/AgentTaskTest.java
index b9ce1c33a8..cf19d7d918 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/task/AgentTaskTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/task/AgentTaskTest.java
@@ -107,7 +107,7 @@ public class AgentTaskTest {
createReplicaTask = new CreateReplicaTask(backendId1, dbId, tableId,
partitionId,
indexId1, tabletId1, replicaId1, shortKeyNum, schemaHash1,
version, KeysType.AGG_KEYS, storageType,
TStorageMedium.SSD, columns, null, 0, latch, null, false,
TTabletType.TABLET_TYPE_DISK, null,
- TCompressionType.LZ4F, false, "", false, false, false, false,
false);
+ TCompressionType.LZ4F, false, "", false, false, false, false,
false, null);
// drop
dropTask = new DropReplicaTask(backendId1, tabletId1, replicaId1,
schemaHash1, false);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]