This is an automated email from the ASF dual-hosted git repository.
w41ter pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new fc942c435f7 [feat](binlog) Support drop rollup binlog #44677 (#44743)
fc942c435f7 is described below
commit fc942c435f7c3b13347a9404bf3f1e41a4206f49
Author: walter <[email protected]>
AuthorDate: Fri Nov 29 11:30:37 2024 +0800
[feat](binlog) Support drop rollup binlog #44677 (#44743)
cherry pick from #44677
---
.../doris/alter/MaterializedViewHandler.java | 14 +++++------
.../org/apache/doris/binlog/BinlogManager.java | 29 +++++++++++++++-------
.../java/org/apache/doris/binlog/DBBinlog.java | 9 +++++++
.../apache/doris/datasource/InternalCatalog.java | 4 +--
.../org/apache/doris/persist/BatchDropInfo.java | 12 +++++++--
.../java/org/apache/doris/persist/DropInfo.java | 20 +++++++++++++--
.../java/org/apache/doris/persist/EditLog.java | 27 ++++++++++++++------
.../doris/persist/DropAndRecoverInfoTest.java | 10 ++++----
gensrc/thrift/FrontendService.thrift | 4 +--
9 files changed, 92 insertions(+), 37 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java
b/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java
index 1ed38748e18..b50bec9cb02 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java
@@ -915,14 +915,12 @@ public class MaterializedViewHandler extends AlterHandler
{
}
// drop data in memory
- Set<Long> indexIdSet = new HashSet<>();
- Set<String> rollupNameSet = new HashSet<>();
+ Map<Long, String> rollupNameMap = new HashMap<>();
for (AlterClause alterClause : dropRollupClauses) {
DropRollupClause dropRollupClause = (DropRollupClause)
alterClause;
String rollupIndexName = dropRollupClause.getRollupName();
long rollupIndexId = dropMaterializedView(rollupIndexName,
olapTable);
- indexIdSet.add(rollupIndexId);
- rollupNameSet.add(rollupIndexName);
+ rollupNameMap.put(rollupIndexId, rollupIndexName);
}
// batch log drop rollup operation
@@ -930,9 +928,9 @@ public class MaterializedViewHandler extends AlterHandler {
long dbId = db.getId();
long tableId = olapTable.getId();
String tableName = olapTable.getName();
- editLog.logBatchDropRollup(new BatchDropInfo(dbId, tableId,
tableName, indexIdSet));
+ editLog.logBatchDropRollup(new BatchDropInfo(dbId, tableId,
tableName, rollupNameMap));
LOG.info("finished drop rollup index[{}] in table[{}]",
- String.join("", rollupNameSet), olapTable.getName());
+ String.join("", rollupNameMap.values()),
olapTable.getName());
} finally {
olapTable.writeUnlock();
}
@@ -950,8 +948,8 @@ public class MaterializedViewHandler extends AlterHandler {
long mvIndexId = dropMaterializedView(mvName, olapTable);
// Step3: log drop mv operation
EditLog editLog = Env.getCurrentEnv().getEditLog();
- editLog.logDropRollup(
- new DropInfo(db.getId(), olapTable.getId(),
olapTable.getName(), mvIndexId, false, false, 0));
+ editLog.logDropRollup(new DropInfo(db.getId(), olapTable.getId(),
olapTable.getName(),
+ mvIndexId, mvName, false, false, 0));
LOG.info("finished drop materialized view [{}] in table [{}]",
mvName, olapTable.getName());
} catch (MetaNotFoundException e) {
if (dropMaterializedViewStmt.isIfExists()) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java
b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java
index 8886c4b4104..5606ceeffa2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java
@@ -30,6 +30,7 @@ import org.apache.doris.persist.AlterViewInfo;
import org.apache.doris.persist.BarrierLog;
import org.apache.doris.persist.BatchModifyPartitionsInfo;
import org.apache.doris.persist.BinlogGcInfo;
+import org.apache.doris.persist.DropInfo;
import org.apache.doris.persist.DropPartitionInfo;
import org.apache.doris.persist.ModifyCommentOperationLog;
import org.apache.doris.persist.ModifyTablePropertyOperationLog;
@@ -399,24 +400,34 @@ public class BinlogManager {
public void
addModifyTableAddOrDropInvertedIndices(TableAddOrDropInvertedIndicesInfo info,
long commitSeq) {
long dbId = info.getDbId();
- List<Long> tableIds = Lists.newArrayList();
- tableIds.add(info.getTableId());
- long timestamp = -1;
+ long tableId = info.getTableId();
TBinlogType type =
TBinlogType.MODIFY_TABLE_ADD_OR_DROP_INVERTED_INDICES;
String data = info.toJson();
-
- addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false,
info);
+ BarrierLog log = new BarrierLog(dbId, tableId, type, data);
+ addBarrierLog(log, commitSeq);
}
public void addIndexChangeJob(IndexChangeJob indexChangeJob, long
commitSeq) {
long dbId = indexChangeJob.getDbId();
- List<Long> tableIds = Lists.newArrayList();
- tableIds.add(indexChangeJob.getTableId());
- long timestamp = -1;
+ long tableId = indexChangeJob.getTableId();
TBinlogType type = TBinlogType.INDEX_CHANGE_JOB;
String data = indexChangeJob.toJson();
+ BarrierLog log = new BarrierLog(dbId, tableId, type, data);
+ addBarrierLog(log, commitSeq);
+ }
+
+ public void addDropRollup(DropInfo info, long commitSeq) {
+ if (StringUtils.isEmpty(info.getIndexName())) {
+ LOG.warn("skip drop rollup binlog, because indexName is empty.
info: {}", info);
+ return;
+ }
- addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false,
indexChangeJob);
+ long dbId = info.getDbId();
+ long tableId = info.getTableId();
+ TBinlogType type = TBinlogType.DROP_ROLLUP;
+ String data = info.toJson();
+ BarrierLog log = new BarrierLog(dbId, tableId, type, data);
+ addBarrierLog(log, commitSeq);
}
// get binlog by dbId, return first binlog.version > version
diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java
b/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java
index c96e994be91..b78ed389a0f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java
@@ -23,6 +23,7 @@ import org.apache.doris.catalog.Env;
import org.apache.doris.common.Pair;
import org.apache.doris.common.proc.BaseProcResult;
import org.apache.doris.persist.BarrierLog;
+import org.apache.doris.persist.DropInfo;
import org.apache.doris.persist.DropPartitionInfo;
import org.apache.doris.persist.ReplaceTableOperationLog;
import org.apache.doris.thrift.TBinlog;
@@ -649,6 +650,9 @@ public class DBBinlog {
case REPLACE_TABLE:
raw = ReplaceTableOperationLog.fromJson(data);
break;
+ case DROP_ROLLUP:
+ raw = DropInfo.fromJson(data);
+ break;
case BARRIER:
raw = BarrierLog.fromJson(data);
break;
@@ -693,6 +697,11 @@ public class DBBinlog {
if (!record.isSwapTable()) {
droppedTables.add(Pair.of(record.getOrigTblId(), commitSeq));
}
+ } else if (binlogType == TBinlogType.DROP_ROLLUP && raw instanceof
DropInfo) {
+ long indexId = ((DropInfo) raw).getIndexId();
+ if (indexId > 0) {
+ droppedIndexes.add(Pair.of(indexId, commitSeq));
+ }
} else if (binlogType == TBinlogType.BARRIER && raw instanceof
BarrierLog) {
BarrierLog log = (BarrierLog) raw;
// keep compatible with doris 2.0/2.1
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
index 11ada7df69f..aab70f86733 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
@@ -972,7 +972,7 @@ public class InternalCatalog implements CatalogIf<Database>
{
Env.getCurrentEnv().getQueryStats().clear(Env.getCurrentEnv().getCurrentCatalog().getId(),
db.getId(), table.getId());
- DropInfo info = new DropInfo(db.getId(), table.getId(), tableName,
-1L, isView, forceDrop, recycleTime);
+ DropInfo info = new DropInfo(db.getId(), table.getId(), tableName,
isView, forceDrop, recycleTime);
Env.getCurrentEnv().getEditLog().logDropTable(info);
Env.getCurrentEnv().getMtmvService().dropTable(table);
}
@@ -2989,7 +2989,7 @@ public class InternalCatalog implements
CatalogIf<Database> {
try {
dropTable(db, tableId, true, false, 0L);
if (hadLogEditCreateTable) {
- DropInfo info = new DropInfo(db.getId(), tableId,
olapTable.getName(), -1L, false, true, 0L);
+ DropInfo info = new DropInfo(db.getId(), tableId,
olapTable.getName(), false, true, 0L);
Env.getCurrentEnv().getEditLog().logDropTable(info);
}
} catch (Exception ex) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/persist/BatchDropInfo.java
b/fe/fe-core/src/main/java/org/apache/doris/persist/BatchDropInfo.java
index fdfc44e27bb..260ad316d3c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/BatchDropInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/BatchDropInfo.java
@@ -26,6 +26,7 @@ import com.google.gson.annotations.SerializedName;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import java.util.Map;
import java.util.Objects;
import java.util.Set;
@@ -43,12 +44,15 @@ public class BatchDropInfo implements Writable {
private String tableName; // not used in equals and hashCode
@SerializedName(value = "indexIdSet")
private Set<Long> indexIdSet;
+ @SerializedName(value = "indexNameMap")
+ private Map<Long, String> indexNameMap; // not used in equals and hashCode
- public BatchDropInfo(long dbId, long tableId, String tableName, Set<Long>
indexIdSet) {
+ public BatchDropInfo(long dbId, long tableId, String tableName, Map<Long,
String> indexNameMap) {
this.dbId = dbId;
this.tableId = tableId;
this.tableName = tableName;
- this.indexIdSet = indexIdSet;
+ this.indexIdSet = indexNameMap.keySet();
+ this.indexNameMap = indexNameMap;
}
@Override
@@ -82,6 +86,10 @@ public class BatchDropInfo implements Writable {
return indexIdSet;
}
+ public Map<Long, String> getIndexNameMap() {
+ return indexNameMap;
+ }
+
public long getDbId() {
return dbId;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/DropInfo.java
b/fe/fe-core/src/main/java/org/apache/doris/persist/DropInfo.java
index 461f3ddd67d..69994caf23d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/DropInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/DropInfo.java
@@ -38,6 +38,8 @@ public class DropInfo implements Writable {
private String tableName; // not used in equals and hashCode
@SerializedName(value = "indexId")
private long indexId;
+ @SerializedName(value = "indexName")
+ private String indexName; // not used in equals and hashCode
@SerializedName(value = "isView")
private boolean isView = false;
@SerializedName(value = "forceDrop")
@@ -48,12 +50,18 @@ public class DropInfo implements Writable {
public DropInfo() {
}
- public DropInfo(long dbId, long tableId, String tableName, long indexId,
boolean isView, boolean forceDrop,
- long recycleTime) {
+ public DropInfo(long dbId, long tableId, String tableName, boolean isView,
boolean forceDrop,
+ long recycleTime) {
+ this(dbId, tableId, tableName, -1, "", isView, forceDrop, recycleTime);
+ }
+
+ public DropInfo(long dbId, long tableId, String tableName, long indexId,
String indexName, boolean isView,
+ boolean forceDrop, long recycleTime) {
this.dbId = dbId;
this.tableId = tableId;
this.tableName = tableName;
this.indexId = indexId;
+ this.indexName = indexName;
this.isView = isView;
this.forceDrop = forceDrop;
this.recycleTime = recycleTime;
@@ -75,6 +83,10 @@ public class DropInfo implements Writable {
return this.indexId;
}
+ public String getIndexName() {
+ return this.indexName;
+ }
+
public boolean isView() {
return this.isView;
}
@@ -133,4 +145,8 @@ public class DropInfo implements Writable {
public String toJson() {
return GsonUtils.GSON.toJson(this);
}
+
+ public static DropInfo fromJson(String json) {
+ return GsonUtils.GSON.fromJson(json, DropInfo.class);
+ }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
index 261331ac863..3ac992e6edf 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
@@ -101,6 +101,7 @@ import org.apache.logging.log4j.Logger;
import java.io.IOException;
import java.util.List;
+import java.util.Map;
/**
* EditLog maintains a log of the memory modifications.
@@ -337,15 +338,18 @@ public class EditLog {
case OperationType.OP_DROP_ROLLUP: {
DropInfo info = (DropInfo) journal.getData();
env.getMaterializedViewHandler().replayDropRollup(info,
env);
+ env.getBinlogManager().addDropRollup(info, logId);
break;
}
case OperationType.OP_BATCH_DROP_ROLLUP: {
BatchDropInfo batchDropInfo = (BatchDropInfo)
journal.getData();
- for (long indexId : batchDropInfo.getIndexIdSet()) {
- env.getMaterializedViewHandler().replayDropRollup(
- new DropInfo(batchDropInfo.getDbId(),
batchDropInfo.getTableId(),
- batchDropInfo.getTableName(), indexId,
false, false, 0),
- env);
+ for (Map.Entry<Long, String> entry :
batchDropInfo.getIndexNameMap().entrySet()) {
+ long indexId = entry.getKey();
+ String indexName = entry.getValue();
+ DropInfo info = new DropInfo(batchDropInfo.getDbId(),
batchDropInfo.getTableId(),
+ batchDropInfo.getTableName(), indexId,
indexName, false, false, 0);
+
env.getMaterializedViewHandler().replayDropRollup(info, env);
+ env.getBinlogManager().addDropRollup(info, logId);
}
break;
}
@@ -1418,11 +1422,20 @@ public class EditLog {
}
public void logDropRollup(DropInfo info) {
- logEdit(OperationType.OP_DROP_ROLLUP, info);
+ long logId = logEdit(OperationType.OP_DROP_ROLLUP, info);
+ Env.getCurrentEnv().getBinlogManager().addDropRollup(info, logId);
}
public void logBatchDropRollup(BatchDropInfo batchDropInfo) {
- logEdit(OperationType.OP_BATCH_DROP_ROLLUP, batchDropInfo);
+ long logId = logEdit(OperationType.OP_BATCH_DROP_ROLLUP,
batchDropInfo);
+ for (Map.Entry<Long, String> entry :
batchDropInfo.getIndexNameMap().entrySet()) {
+ DropInfo info = new DropInfo(batchDropInfo.getDbId(),
+ batchDropInfo.getTableId(),
+ batchDropInfo.getTableName(),
+ entry.getKey(), entry.getValue(),
+ false, true, 0);
+ Env.getCurrentEnv().getBinlogManager().addDropRollup(info, logId);
+ }
}
public void logFinishConsistencyCheck(ConsistencyCheckInfo info) {
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/persist/DropAndRecoverInfoTest.java
b/fe/fe-core/src/test/java/org/apache/doris/persist/DropAndRecoverInfoTest.java
index 88aa22ded22..8c74fba2753 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/persist/DropAndRecoverInfoTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/persist/DropAndRecoverInfoTest.java
@@ -44,7 +44,7 @@ public class DropAndRecoverInfoTest {
DropInfo info1 = new DropInfo();
info1.write(dos);
- DropInfo info2 = new DropInfo(1, 2, "t2", -1, false, true, 0);
+ DropInfo info2 = new DropInfo(1, 2, "t2", -1, "", false, true, 0);
info2.write(dos);
dos.flush();
@@ -65,10 +65,10 @@ public class DropAndRecoverInfoTest {
Assert.assertEquals(rInfo2, rInfo2);
Assert.assertNotEquals(rInfo2, this);
- Assert.assertNotEquals(info2, new DropInfo(0, 2, "t2", -1L, false,
true, 0));
- Assert.assertNotEquals(info2, new DropInfo(1, 0, "t0", -1L, false,
true, 0));
- Assert.assertNotEquals(info2, new DropInfo(1, 2, "t2", -1L, false,
false, 0));
- Assert.assertEquals(info2, new DropInfo(1, 2, "t2", -1L, false, true,
0));
+ Assert.assertNotEquals(info2, new DropInfo(0, 2, "t2", -1L, "", false,
true, 0));
+ Assert.assertNotEquals(info2, new DropInfo(1, 0, "t0", -1L, "", false,
true, 0));
+ Assert.assertNotEquals(info2, new DropInfo(1, 2, "t2", -1L, "", false,
false, 0));
+ Assert.assertEquals(info2, new DropInfo(1, 2, "t2", -1L, "", false,
true, 0));
// 3. delete files
dis.close();
diff --git a/gensrc/thrift/FrontendService.thrift
b/gensrc/thrift/FrontendService.thrift
index 95f41158e94..f8edb5d544b 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -1146,6 +1146,7 @@ enum TBinlogType {
INDEX_CHANGE_JOB = 20,
RENAME_ROLLUP = 21,
RENAME_PARTITION = 22,
+ DROP_ROLLUP = 23,
// Keep some IDs for allocation so that when new binlog types are added in
the
// future, the changes can be picked back to the old versions without
breaking
@@ -1162,8 +1163,7 @@ enum TBinlogType {
// MODIFY_XXX = 17,
// MIN_UNKNOWN = 18,
// UNKNOWN_3 = 19,
- MIN_UNKNOWN = 23,
- UNKNOWN_8 = 24,
+ MIN_UNKNOWN = 24,
UNKNOWN_9 = 25,
UNKNOWN_10 = 26,
UNKNOWN_11 = 27,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]