This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 90254d856d4 branch-2.1: [refactor](binlog) put recording dropped
resource into a seperate method #43938 (#43966)
90254d856d4 is described below
commit 90254d856d4bfb0a2864514c93719e898b456c0b
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Sat Nov 16 20:55:58 2024 +0800
branch-2.1: [refactor](binlog) put recording dropped resource into a
seperate method #43938 (#43966)
Cherry-picked from #43938
Co-authored-by: walter <[email protected]>
---
.../java/org/apache/doris/binlog/DBBinlog.java | 110 +++++++++++----------
1 file changed, 57 insertions(+), 53 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java
b/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java
index 86cf8085a42..e2eef7966be 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java
@@ -121,34 +121,7 @@ public class DBBinlog {
allBinlogs.add(binlog);
binlogSize += BinlogUtils.getApproximateMemoryUsage(binlog);
-
- if (binlog.getType() == TBinlogType.DROP_PARTITION) {
- DropPartitionInfo info = DropPartitionInfo.fromJson(binlog.data);
- if (info != null && info.getPartitionId() > 0) {
- droppedPartitions.add(Pair.of(info.getPartitionId(),
binlog.getCommitSeq()));
- }
- } else if (binlog.getType() == TBinlogType.DROP_TABLE) {
- DropTableRecord record = DropTableRecord.fromJson(binlog.data);
- if (record != null && record.getTableId() > 0) {
- droppedTables.add(Pair.of(record.getTableId(),
binlog.getCommitSeq()));
- }
- } else if (binlog.getType() == TBinlogType.ALTER_JOB) {
- AlterJobRecord record = AlterJobRecord.fromJson(binlog.data);
- if (record != null && record.isSchemaChangeJob() &&
record.isJobFinished()) {
- for (Long indexId : record.getOriginIndexIdList()) {
- if (indexId != null && indexId > 0) {
- droppedIndexes.add(Pair.of(indexId,
binlog.getCommitSeq()));
- }
- }
- }
- } else if (binlog.getType() == TBinlogType.TRUNCATE_TABLE) {
- TruncateTableRecord record =
TruncateTableRecord.fromJson(binlog.data);
- if (record != null) {
- for (long partitionId : record.getOldPartitionIds()) {
- droppedPartitions.add(Pair.of(partitionId,
binlog.getCommitSeq()));
- }
- }
- }
+ recordDroppedResources(binlog);
if (tableIds == null) {
return;
@@ -202,31 +175,7 @@ public class DBBinlog {
return;
}
- if (binlog.getType() == TBinlogType.DROP_PARTITION && raw
instanceof DropPartitionInfo) {
- long partitionId = ((DropPartitionInfo) raw).getPartitionId();
- if (partitionId > 0) {
- droppedPartitions.add(Pair.of(partitionId,
binlog.getCommitSeq()));
- }
- } else if (binlog.getType() == TBinlogType.DROP_TABLE && raw
instanceof DropTableRecord) {
- long tableId = ((DropTableRecord) raw).getTableId();
- if (tableId > 0) {
- droppedTables.add(Pair.of(tableId, binlog.getCommitSeq()));
- }
- } else if (binlog.getType() == TBinlogType.ALTER_JOB && raw
instanceof AlterJobRecord) {
- AlterJobRecord alterJobRecord = (AlterJobRecord) raw;
- if (alterJobRecord.isJobFinished() &&
alterJobRecord.isSchemaChangeJob()) {
- for (Long indexId : alterJobRecord.getOriginIndexIdList())
{
- if (indexId != null && indexId > 0) {
- droppedIndexes.add(Pair.of(indexId,
binlog.getCommitSeq()));
- }
- }
- }
- } else if (binlog.getType() == TBinlogType.TRUNCATE_TABLE && raw
instanceof TruncateTableRecord) {
- TruncateTableRecord truncateTableRecord =
(TruncateTableRecord) raw;
- for (long partitionId :
truncateTableRecord.getOldPartitionIds()) {
- droppedPartitions.add(Pair.of(partitionId,
binlog.getCommitSeq()));
- }
- }
+ recordDroppedResources(binlog, raw);
switch (binlog.getType()) {
case CREATE_TABLE:
@@ -670,4 +619,59 @@ public class DBBinlog {
lock.readLock().unlock();
}
}
+
+ private void recordDroppedResources(TBinlog binlog) {
+ recordDroppedResources(binlog, null);
+ }
+
+ // A method to record the dropped tables, indexes, and partitions.
+ private void recordDroppedResources(TBinlog binlog, Object raw) {
+ if (raw == null) {
+ switch (binlog.getType()) {
+ case DROP_PARTITION:
+ raw = DropPartitionInfo.fromJson(binlog.data);
+ break;
+ case DROP_TABLE:
+ raw = DropTableRecord.fromJson(binlog.data);
+ break;
+ case ALTER_JOB:
+ raw = AlterJobRecord.fromJson(binlog.data);
+ break;
+ case TRUNCATE_TABLE:
+ raw = TruncateTableRecord.fromJson(binlog.data);
+ break;
+ default:
+ break;
+ }
+ if (raw == null) {
+ return;
+ }
+ }
+
+ if (binlog.getType() == TBinlogType.DROP_PARTITION && raw instanceof
DropPartitionInfo) {
+ long partitionId = ((DropPartitionInfo) raw).getPartitionId();
+ if (partitionId > 0) {
+ droppedPartitions.add(Pair.of(partitionId,
binlog.getCommitSeq()));
+ }
+ } else if (binlog.getType() == TBinlogType.DROP_TABLE && raw
instanceof DropTableRecord) {
+ long tableId = ((DropTableRecord) raw).getTableId();
+ if (tableId > 0) {
+ droppedTables.add(Pair.of(tableId, binlog.getCommitSeq()));
+ }
+ } else if (binlog.getType() == TBinlogType.ALTER_JOB && raw instanceof
AlterJobRecord) {
+ AlterJobRecord alterJobRecord = (AlterJobRecord) raw;
+ if (alterJobRecord.isJobFinished() &&
alterJobRecord.isSchemaChangeJob()) {
+ for (Long indexId : alterJobRecord.getOriginIndexIdList()) {
+ if (indexId != null && indexId > 0) {
+ droppedIndexes.add(Pair.of(indexId,
binlog.getCommitSeq()));
+ }
+ }
+ }
+ } else if (binlog.getType() == TBinlogType.TRUNCATE_TABLE && raw
instanceof TruncateTableRecord) {
+ TruncateTableRecord truncateTableRecord = (TruncateTableRecord)
raw;
+ for (long partitionId : truncateTableRecord.getOldPartitionIds()) {
+ droppedPartitions.add(Pair.of(partitionId,
binlog.getCommitSeq()));
+ }
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]