This is an automated email from the ASF dual-hosted git repository.
w41ter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 3352d84a8e4 [feat](binlog) Add replace table binlog (#44263)
3352d84a8e4 is described below
commit 3352d84a8e4180cbc88881cca5e3fa73313a8d91
Author: walter <[email protected]>
AuthorDate: Thu Nov 21 17:44:12 2024 +0800
[feat](binlog) Add replace table binlog (#44263)
close #43436
Related PR: https://github.com/selectdb/ccr-syncer/pull/245
---
.../main/java/org/apache/doris/alter/Alter.java | 2 +-
.../org/apache/doris/binlog/BinlogManager.java | 18 ++++++++
.../java/org/apache/doris/binlog/DBBinlog.java | 53 ++++++++++++++++------
.../java/org/apache/doris/persist/BarrierLog.java | 4 ++
.../java/org/apache/doris/persist/EditLog.java | 11 +++--
.../doris/persist/ReplaceTableOperationLog.java | 28 +++++++++++-
.../persist/ReplaceTableOperationLogTest.java | 4 +-
gensrc/thrift/FrontendService.thrift | 4 +-
8 files changed, 101 insertions(+), 23 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java
b/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java
index 15c8df9195d..ebb194ed6a6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java
@@ -603,7 +603,7 @@ public class Alter {
replaceTableInternal(db, origTable, olapNewTbl, swapTable,
false, isForce);
// write edit log
ReplaceTableOperationLog log = new
ReplaceTableOperationLog(db.getId(),
- origTable.getId(), olapNewTbl.getId(), swapTable,
isForce);
+ origTable.getId(), oldTblName, olapNewTbl.getId(),
newTblName, swapTable, isForce);
Env.getCurrentEnv().getEditLog().logReplaceTable(log);
LOG.info("finish replacing table {} with table {}, is swap:
{}", oldTblName, newTblName, swapTable);
} finally {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java
b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java
index 6d483a41314..1f785713666 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java
@@ -33,6 +33,7 @@ import org.apache.doris.persist.DropPartitionInfo;
import org.apache.doris.persist.ModifyCommentOperationLog;
import org.apache.doris.persist.ModifyTablePropertyOperationLog;
import org.apache.doris.persist.ReplacePartitionOperationLog;
+import org.apache.doris.persist.ReplaceTableOperationLog;
import org.apache.doris.persist.TableAddOrDropColumnsInfo;
import org.apache.doris.persist.TableInfo;
import org.apache.doris.persist.TableRenameColumnInfo;
@@ -45,6 +46,7 @@ import org.apache.doris.thrift.TStatusCode;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.thrift.TException;
@@ -367,6 +369,22 @@ public class BinlogManager {
addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false,
alterViewInfo);
}
+ public void addReplaceTable(ReplaceTableOperationLog info, long commitSeq)
{
+ if (StringUtils.isEmpty(info.getOrigTblName()) ||
StringUtils.isEmpty(info.getNewTblName())) {
+ LOG.warn("skip replace table binlog, because origTblName or
newTblName is empty. info: {}", info);
+ return;
+ }
+
+ long dbId = info.getDbId();
+ List<Long> tableIds = Lists.newArrayList();
+ tableIds.add(info.getOrigTblId());
+ long timestamp = -1;
+ TBinlogType type = TBinlogType.REPLACE_TABLE;
+ String data = info.toJson();
+
+ addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false,
info);
+ }
+
// get binlog by dbId, return first binlog.version > version
public Pair<TStatus, TBinlog> getBinlog(long dbId, long tableId, long
prevCommitSeq) {
TStatus status = new TStatus(TStatusCode.OK);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java
b/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java
index e2eef7966be..c96e994be91 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java
@@ -22,7 +22,9 @@ import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.Pair;
import org.apache.doris.common.proc.BaseProcResult;
+import org.apache.doris.persist.BarrierLog;
import org.apache.doris.persist.DropPartitionInfo;
+import org.apache.doris.persist.ReplaceTableOperationLog;
import org.apache.doris.thrift.TBinlog;
import org.apache.doris.thrift.TBinlogType;
import org.apache.doris.thrift.TStatus;
@@ -626,19 +628,29 @@ public class DBBinlog {
// A method to record the dropped tables, indexes, and partitions.
private void recordDroppedResources(TBinlog binlog, Object raw) {
+ recordDroppedResources(binlog.getType(), binlog.getCommitSeq(),
binlog.getData(), raw);
+ }
+
+ private void recordDroppedResources(TBinlogType binlogType, long
commitSeq, String data, Object raw) {
if (raw == null) {
- switch (binlog.getType()) {
+ switch (binlogType) {
case DROP_PARTITION:
- raw = DropPartitionInfo.fromJson(binlog.data);
+ raw = DropPartitionInfo.fromJson(data);
break;
case DROP_TABLE:
- raw = DropTableRecord.fromJson(binlog.data);
+ raw = DropTableRecord.fromJson(data);
break;
case ALTER_JOB:
- raw = AlterJobRecord.fromJson(binlog.data);
+ raw = AlterJobRecord.fromJson(data);
break;
case TRUNCATE_TABLE:
- raw = TruncateTableRecord.fromJson(binlog.data);
+ raw = TruncateTableRecord.fromJson(data);
+ break;
+ case REPLACE_TABLE:
+ raw = ReplaceTableOperationLog.fromJson(data);
+ break;
+ case BARRIER:
+ raw = BarrierLog.fromJson(data);
break;
default:
break;
@@ -648,29 +660,44 @@ public class DBBinlog {
}
}
- if (binlog.getType() == TBinlogType.DROP_PARTITION && raw instanceof
DropPartitionInfo) {
+ recordDroppedResources(binlogType, commitSeq, raw);
+ }
+
+ private void recordDroppedResources(TBinlogType binlogType, long
commitSeq, Object raw) {
+ if (binlogType == TBinlogType.DROP_PARTITION && raw instanceof
DropPartitionInfo) {
long partitionId = ((DropPartitionInfo) raw).getPartitionId();
if (partitionId > 0) {
- droppedPartitions.add(Pair.of(partitionId,
binlog.getCommitSeq()));
+ droppedPartitions.add(Pair.of(partitionId, commitSeq));
}
- } else if (binlog.getType() == TBinlogType.DROP_TABLE && raw
instanceof DropTableRecord) {
+ } else if (binlogType == TBinlogType.DROP_TABLE && raw instanceof
DropTableRecord) {
long tableId = ((DropTableRecord) raw).getTableId();
if (tableId > 0) {
- droppedTables.add(Pair.of(tableId, binlog.getCommitSeq()));
+ droppedTables.add(Pair.of(tableId, commitSeq));
}
- } else if (binlog.getType() == TBinlogType.ALTER_JOB && raw instanceof
AlterJobRecord) {
+ } else if (binlogType == TBinlogType.ALTER_JOB && raw instanceof
AlterJobRecord) {
AlterJobRecord alterJobRecord = (AlterJobRecord) raw;
if (alterJobRecord.isJobFinished() &&
alterJobRecord.isSchemaChangeJob()) {
for (Long indexId : alterJobRecord.getOriginIndexIdList()) {
if (indexId != null && indexId > 0) {
- droppedIndexes.add(Pair.of(indexId,
binlog.getCommitSeq()));
+ droppedIndexes.add(Pair.of(indexId, commitSeq));
}
}
}
- } else if (binlog.getType() == TBinlogType.TRUNCATE_TABLE && raw
instanceof TruncateTableRecord) {
+ } else if (binlogType == TBinlogType.TRUNCATE_TABLE && raw instanceof
TruncateTableRecord) {
TruncateTableRecord truncateTableRecord = (TruncateTableRecord)
raw;
for (long partitionId : truncateTableRecord.getOldPartitionIds()) {
- droppedPartitions.add(Pair.of(partitionId,
binlog.getCommitSeq()));
+ droppedPartitions.add(Pair.of(partitionId, commitSeq));
+ }
+ } else if (binlogType == TBinlogType.REPLACE_TABLE && raw instanceof
ReplaceTableOperationLog) {
+ ReplaceTableOperationLog record = (ReplaceTableOperationLog) raw;
+ if (!record.isSwapTable()) {
+ droppedTables.add(Pair.of(record.getOrigTblId(), commitSeq));
+ }
+ } else if (binlogType == TBinlogType.BARRIER && raw instanceof
BarrierLog) {
+ BarrierLog log = (BarrierLog) raw;
+ // keep compatible with doris 2.0/2.1
+ if (log.hasBinlog()) {
+ recordDroppedResources(log.getBinlogType(), commitSeq,
log.getBinlog(), null);
}
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/BarrierLog.java
b/fe/fe-core/src/main/java/org/apache/doris/persist/BarrierLog.java
index 4a9ce13e03b..86d56fb4a64 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/BarrierLog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/BarrierLog.java
@@ -109,6 +109,10 @@ public class BarrierLog implements Writable {
return GsonUtils.GSON.toJson(this);
}
+ public static BarrierLog fromJson(String json) {
+ return GsonUtils.GSON.fromJson(json, BarrierLog.class);
+ }
+
@Override
public String toString() {
return toJson();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
index df0cdb092a8..5ae6f62ebb2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
@@ -306,7 +306,7 @@ public class EditLog {
case OperationType.OP_RENAME_TABLE: {
TableInfo info = (TableInfo) journal.getData();
env.replayRenameTable(info);
-
Env.getCurrentEnv().getBinlogManager().addTableRename(info, logId);
+ env.getBinlogManager().addTableRename(info, logId);
break;
}
case OperationType.OP_MODIFY_VIEW_DEF: {
@@ -318,7 +318,7 @@ public class EditLog {
case OperationType.OP_RENAME_PARTITION: {
TableInfo info = (TableInfo) journal.getData();
env.replayRenamePartition(info);
-
Env.getCurrentEnv().getBinlogManager().addTableRename(info, logId);
+ env.getBinlogManager().addTableRename(info, logId);
break;
}
case OperationType.OP_RENAME_COLUMN: {
@@ -366,7 +366,7 @@ public class EditLog {
case OperationType.OP_RENAME_ROLLUP: {
TableInfo info = (TableInfo) journal.getData();
env.replayRenameRollup(info);
-
Env.getCurrentEnv().getBinlogManager().addTableRename(info, logId);
+ env.getBinlogManager().addTableRename(info, logId);
break;
}
case OperationType.OP_LOAD_START:
@@ -898,6 +898,7 @@ public class EditLog {
case OperationType.OP_REPLACE_TABLE: {
ReplaceTableOperationLog log = (ReplaceTableOperationLog)
journal.getData();
env.getAlterInstance().replayReplaceTable(log);
+ env.getBinlogManager().addReplaceTable(log, logId);
break;
}
case OperationType.OP_CREATE_SQL_BLOCK_RULE: {
@@ -1950,7 +1951,9 @@ public class EditLog {
}
public void logReplaceTable(ReplaceTableOperationLog log) {
- logEdit(OperationType.OP_REPLACE_TABLE, log);
+ long logId = logEdit(OperationType.OP_REPLACE_TABLE, log);
+ LOG.info("add replace table binlog, logId: {}, infos: {}", logId, log);
+ Env.getCurrentEnv().getBinlogManager().addReplaceTable(log, logId);
}
public void logBatchRemoveTransactions(BatchRemoveTransactionsOperationV2
op) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/persist/ReplaceTableOperationLog.java
b/fe/fe-core/src/main/java/org/apache/doris/persist/ReplaceTableOperationLog.java
index 7a685f3741f..6a2b09336e1 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/persist/ReplaceTableOperationLog.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/persist/ReplaceTableOperationLog.java
@@ -32,17 +32,25 @@ public class ReplaceTableOperationLog implements Writable {
private long dbId;
@SerializedName(value = "origTblId")
private long origTblId;
+ @SerializedName(value = "origTblName")
+ private String origTblName;
@SerializedName(value = "newTblName")
private long newTblId;
+ @SerializedName(value = "actualNewTblName")
+ private String newTblName;
@SerializedName(value = "swapTable")
private boolean swapTable;
@SerializedName(value = "isForce")
private boolean isForce = true; // older version it was force. so keep
same.
- public ReplaceTableOperationLog(long dbId, long origTblId, long newTblId,
boolean swapTable, boolean isForce) {
+ public ReplaceTableOperationLog(long dbId, long origTblId,
+ String origTblName, long newTblId, String newTblName,
+ boolean swapTable, boolean isForce) {
this.dbId = dbId;
this.origTblId = origTblId;
+ this.origTblName = origTblName;
this.newTblId = newTblId;
+ this.newTblName = newTblName;
this.swapTable = swapTable;
this.isForce = isForce;
}
@@ -55,10 +63,18 @@ public class ReplaceTableOperationLog implements Writable {
return origTblId;
}
+ public String getOrigTblName() {
+ return origTblName;
+ }
+
public long getNewTblId() {
return newTblId;
}
+ public String getNewTblName() {
+ return newTblName;
+ }
+
public boolean isSwapTable() {
return swapTable;
}
@@ -67,13 +83,21 @@ public class ReplaceTableOperationLog implements Writable {
return isForce;
}
+ public String toJson() {
+ return GsonUtils.GSON.toJson(this);
+ }
+
+ public static ReplaceTableOperationLog fromJson(String json) {
+ return GsonUtils.GSON.fromJson(json, ReplaceTableOperationLog.class);
+ }
+
@Override
public void write(DataOutput out) throws IOException {
String json = GsonUtils.GSON.toJson(this);
Text.writeString(out, json);
}
- public static ReplaceTableOperationLog read(DataInput in) throws
IOException {
+ public static ReplaceTableOperationLog read(DataInput in) throws
IOException {
String json = Text.readString(in);
return GsonUtils.GSON.fromJson(json, ReplaceTableOperationLog.class);
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/persist/ReplaceTableOperationLogTest.java
b/fe/fe-core/src/test/java/org/apache/doris/persist/ReplaceTableOperationLogTest.java
index e05d16141ce..ed56e4c7941 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/persist/ReplaceTableOperationLogTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/persist/ReplaceTableOperationLogTest.java
@@ -34,7 +34,7 @@ public class ReplaceTableOperationLogTest {
file.createNewFile();
DataOutputStream dos = new DataOutputStream(new
FileOutputStream(file));
- ReplaceTableOperationLog log = new ReplaceTableOperationLog(1, 2, 3,
true, true);
+ ReplaceTableOperationLog log = new ReplaceTableOperationLog(1, 2,
"old", 3, "new", true, true);
log.write(dos);
dos.flush();
@@ -48,6 +48,8 @@ public class ReplaceTableOperationLogTest {
Assert.assertTrue(readLog.getNewTblId() == log.getNewTblId());
Assert.assertTrue(readLog.getOrigTblId() == log.getOrigTblId());
Assert.assertTrue(readLog.isSwapTable() == log.isSwapTable());
+
Assert.assertTrue(readLog.getOrigTblName().equals(log.getOrigTblName()));
+ Assert.assertTrue(readLog.getNewTblName().equals(log.getNewTblName()));
// 3. delete files
dis.close();
diff --git a/gensrc/thrift/FrontendService.thrift
b/gensrc/thrift/FrontendService.thrift
index ec2a685098c..47b88552862 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -1191,6 +1191,7 @@ enum TBinlogType {
RENAME_COLUMN = 15,
MODIFY_COMMENT = 16,
MODIFY_VIEW_DEF = 17,
+ REPLACE_TABLE = 18,
// Keep some IDs for allocation so that when new binlog types are added in
the
// future, the changes can be picked back to the old versions without
breaking
@@ -1207,8 +1208,7 @@ enum TBinlogType {
// MODIFY_XXX = 17,
// MIN_UNKNOWN = 18,
// UNKNOWN_3 = 19,
- MIN_UNKNOWN = 18,
- UNKNOWN_3 = 19,
+ MIN_UNKNOWN = 19,
UNKNOWN_4 = 20,
UNKNOWN_5 = 21,
UNKNOWN_6 = 22,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]