This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 7de190fe11b [feat](binlog) Support add/build/drop inverted index binlog #44418 (#44620)
7de190fe11b is described below
commit 7de190fe11bd8a3a8d464e1783a369067432aee0
Author: walter <[email protected]>
AuthorDate: Tue Nov 26 23:17:49 2024 +0800
[feat](binlog) Support add/build/drop inverted index binlog #44418 (#44620)
cherry pick from #44418
---
.../org/apache/doris/alter/IndexChangeJob.java | 4 ++++
.../org/apache/doris/binlog/BinlogManager.java | 24 ++++++++++++++++++++++
.../java/org/apache/doris/persist/EditLog.java | 12 +++++++++--
.../persist/TableAddOrDropInvertedIndicesInfo.java | 4 ++++
gensrc/thrift/FrontendService.thrift | 6 +++---
5 files changed, 45 insertions(+), 5 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/IndexChangeJob.java b/fe/fe-core/src/main/java/org/apache/doris/alter/IndexChangeJob.java
index c5204630dfa..6b38daad8d6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/IndexChangeJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/IndexChangeJob.java
@@ -469,6 +469,10 @@ public class IndexChangeJob implements Writable {
LOG.info("cancel index job {}, err: {}", jobId, errMsg);
}
+ public String toJson() {
+ return GsonUtils.GSON.toJson(this);
+ }
+
public static IndexChangeJob read(DataInput in) throws IOException {
if (Env.getCurrentEnvJournalVersion() < FeMetaVersion.VERSION_122) {
IndexChangeJob job = new IndexChangeJob();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java
index 7107c6b19ee..51cb6db33c6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java
@@ -18,6 +18,7 @@
package org.apache.doris.binlog;
import org.apache.doris.alter.AlterJobV2;
+import org.apache.doris.alter.IndexChangeJob;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.Config;
@@ -35,6 +36,7 @@ import org.apache.doris.persist.ModifyTablePropertyOperationLog;
import org.apache.doris.persist.ReplacePartitionOperationLog;
import org.apache.doris.persist.ReplaceTableOperationLog;
import org.apache.doris.persist.TableAddOrDropColumnsInfo;
+import org.apache.doris.persist.TableAddOrDropInvertedIndicesInfo;
import org.apache.doris.persist.TableInfo;
import org.apache.doris.persist.TableRenameColumnInfo;
import org.apache.doris.persist.TruncateTableInfo;
@@ -377,6 +379,28 @@ public class BinlogManager {
addBarrierLog(log, commitSeq);
}
+ public void
addModifyTableAddOrDropInvertedIndices(TableAddOrDropInvertedIndicesInfo info,
long commitSeq) {
+ long dbId = info.getDbId();
+ List<Long> tableIds = Lists.newArrayList();
+ tableIds.add(info.getTableId());
+ long timestamp = -1;
+ TBinlogType type =
TBinlogType.MODIFY_TABLE_ADD_OR_DROP_INVERTED_INDICES;
+ String data = info.toJson();
+
+ addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false,
info);
+ }
+
+ public void addIndexChangeJob(IndexChangeJob indexChangeJob, long
commitSeq) {
+ long dbId = indexChangeJob.getDbId();
+ List<Long> tableIds = Lists.newArrayList();
+ tableIds.add(indexChangeJob.getTableId());
+ long timestamp = -1;
+ TBinlogType type = TBinlogType.INDEX_CHANGE_JOB;
+ String data = indexChangeJob.toJson();
+
+ addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false,
indexChangeJob);
+ }
+
// get binlog by dbId, return first binlog.version > version
public Pair<TStatus, TBinlog> getBinlog(long dbId, long tableId, long
prevCommitSeq) {
TStatus status = new TStatus(TStatusCode.OK);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
index 572a07eda2a..d4bea4c3dc7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
@@ -977,11 +977,13 @@ public class EditLog {
final TableAddOrDropInvertedIndicesInfo info =
(TableAddOrDropInvertedIndicesInfo)
journal.getData();
env.getSchemaChangeHandler().replayModifyTableAddOrDropInvertedIndices(info);
+
env.getBinlogManager().addModifyTableAddOrDropInvertedIndices(info, logId);
break;
}
case OperationType.OP_INVERTED_INDEX_JOB: {
IndexChangeJob indexChangeJob = (IndexChangeJob)
journal.getData();
env.getSchemaChangeHandler().replayIndexChangeJob(indexChangeJob);
+ env.getBinlogManager().addIndexChangeJob(indexChangeJob,
logId);
break;
}
case OperationType.OP_CLEAN_LABEL: {
@@ -1997,11 +1999,17 @@ public class EditLog {
}
public void
logModifyTableAddOrDropInvertedIndices(TableAddOrDropInvertedIndicesInfo info) {
- logEdit(OperationType.OP_MODIFY_TABLE_ADD_OR_DROP_INVERTED_INDICES,
info);
+ long logId =
logEdit(OperationType.OP_MODIFY_TABLE_ADD_OR_DROP_INVERTED_INDICES, info);
+ LOG.info("walter log modify table add or drop inverted indices, infos:
{}, json: {}",
+ info, info.toJson(), new RuntimeException("test"));
+
Env.getCurrentEnv().getBinlogManager().addModifyTableAddOrDropInvertedIndices(info,
logId);
}
public void logIndexChangeJob(IndexChangeJob indexChangeJob) {
- logEdit(OperationType.OP_INVERTED_INDEX_JOB, indexChangeJob);
+ long logId = logEdit(OperationType.OP_INVERTED_INDEX_JOB,
indexChangeJob);
+ LOG.info("walter log inverted index job, infos: {}, json: {}",
+ indexChangeJob, indexChangeJob.toJson(), new
RuntimeException("test"));
+
Env.getCurrentEnv().getBinlogManager().addIndexChangeJob(indexChangeJob, logId);
}
public void logCleanLabel(CleanLabelOperationLog log) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/TableAddOrDropInvertedIndicesInfo.java b/fe/fe-core/src/main/java/org/apache/doris/persist/TableAddOrDropInvertedIndicesInfo.java
index efdc3ab6e9e..39a90046d24 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/TableAddOrDropInvertedIndicesInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/TableAddOrDropInvertedIndicesInfo.java
@@ -95,6 +95,10 @@ public class TableAddOrDropInvertedIndicesInfo implements Writable {
return jobId;
}
+ public String toJson() {
+ return GsonUtils.GSON.toJson(this);
+ }
+
@Override
public void write(DataOutput out) throws IOException {
Text.writeString(out, GsonUtils.GSON.toJson(this));
diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift
index 6c8ad296724..aab338d009c 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -1142,6 +1142,8 @@ enum TBinlogType {
MODIFY_COMMENT = 16,
MODIFY_VIEW_DEF = 17,
REPLACE_TABLE = 18,
+ MODIFY_TABLE_ADD_OR_DROP_INVERTED_INDICES = 19,
+ INDEX_CHANGE_JOB = 20,
// Keep some IDs for allocation so that when new binlog types are added in the
// future, the changes can be picked back to the old versions without breaking
@@ -1158,9 +1160,7 @@ enum TBinlogType {
// MODIFY_XXX = 17,
// MIN_UNKNOWN = 18,
// UNKNOWN_3 = 19,
- MIN_UNKNOWN = 19,
- UNKNOWN_4 = 20,
- UNKNOWN_5 = 21,
+ MIN_UNKNOWN = 21,
UNKNOWN_6 = 22,
UNKNOWN_7 = 23,
UNKNOWN_8 = 24,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]