This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 473fa51f012cea1f1a6079940a2fd30f13816a81
Author: Jack Drogon <[email protected]>
AuthorDate: Thu Jul 20 09:52:21 2023 +0800

    [Enhancement](binlog) Add partitionRange && indexIds in UpsertRecord && PartitionCommitInfo (#22005)
---
 .../java/org/apache/doris/binlog/BinlogManager.java  |  4 ++++
 .../java/org/apache/doris/binlog/UpsertRecord.java   | 16 ++++++++++++++--
 .../src/main/java/org/apache/doris/catalog/Env.java  |  4 ----
 .../doris/transaction/DatabaseTransactionMgr.java    | 20 +++++++++++++++++---
 .../doris/transaction/PartitionCommitInfo.java       |  9 ++++++++-
 5 files changed, 43 insertions(+), 10 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java
index b07072955f..11075c4fc4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java
@@ -361,6 +361,10 @@ public class BinlogManager {
                 // Step 2.1: read a binlog
                 TBinlog binlog = readTBinlogFromStream(dis);
 
+                if (!Config.enable_feature_binlog) {
+                    continue;
+                }
+
                 // Step 2.2: check if there is in next db Binlogs region
                 long dbId = binlog.getDbId();
                 if (dbId != currentDbId) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/UpsertRecord.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/UpsertRecord.java
index 32052f798a..f42c7031cf 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/binlog/UpsertRecord.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/UpsertRecord.java
@@ -29,12 +29,17 @@ import com.google.gson.annotations.SerializedName;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 public class UpsertRecord {
     public static class TableRecord {
         public static class PartitionRecord {
             @SerializedName(value = "partitionId")
             public long partitionId;
+
+            @SerializedName(value = "range")
+            private String range;
+
             @SerializedName(value = "version")
             public long version;
         }
@@ -42,13 +47,18 @@ public class UpsertRecord {
         @SerializedName(value = "partitionRecords")
         private List<PartitionRecord> partitionRecords;
 
-        public TableRecord() {
+        @SerializedName(value = "indexIds")
+        private Set<Long> indexIds;
+
+        public TableRecord(Set<Long> indexIds) {
             partitionRecords = Lists.newArrayList();
+            this.indexIds = indexIds;
         }
 
         public void addPartitionRecord(PartitionCommitInfo 
partitionCommitInfo) {
             PartitionRecord partitionRecord = new PartitionRecord();
             partitionRecord.partitionId = partitionCommitInfo.getPartitionId();
+            partitionRecord.range = partitionCommitInfo.getPartitionRange();
             partitionRecord.version = partitionCommitInfo.getVersion();
             partitionRecords.add(partitionRecord);
         }
@@ -83,8 +93,10 @@ public class UpsertRecord {
         dbId = state.getDbId();
         tableRecords = Maps.newHashMap();
 
+        Map<Long, Set<Long>> loadedTableIndexIds = state.getLoadedTblIndexes();
         for (TableCommitInfo info : state.getIdToTableCommitInfos().values()) {
-            TableRecord tableRecord = new TableRecord();
+            Set<Long> indexIds = loadedTableIndexIds.get(info.getTableId());
+            TableRecord tableRecord = new TableRecord(indexIds);
             tableRecords.put(info.getTableId(), tableRecord);
 
             for (PartitionCommitInfo partitionCommitInfo : 
info.getIdToPartitionCommitInfo().values()) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index 933b7714a6..52ecb88ea0 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -1918,10 +1918,6 @@ public class Env {
 
     // load binlogs
     public long loadBinlogs(DataInputStream dis, long checksum) throws 
IOException {
-        if (!Config.enable_feature_binlog) {
-            return checksum;
-        }
-
         binlogManager.read(dis, checksum);
         LOG.info("finished replay binlogMgr from image");
         return checksum;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
index eb69eab8b7..9114186162 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
@@ -24,6 +24,7 @@ import org.apache.doris.catalog.MaterializedIndex;
 import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.catalog.Partition;
 import org.apache.doris.catalog.PartitionInfo;
+import org.apache.doris.catalog.PartitionType;
 import org.apache.doris.catalog.Replica;
 import org.apache.doris.catalog.Table;
 import org.apache.doris.catalog.TableIf;
@@ -1027,8 +1028,15 @@ public class DatabaseTransactionMgr {
         transactionState.setErrorReplicas(errorReplicaIds);
         for (long tableId : tableToPartition.keySet()) {
             TableCommitInfo tableCommitInfo = new TableCommitInfo(tableId);
+            OlapTable table = (OlapTable) db.getTableNullable(tableId);
+            PartitionInfo tblPartitionInfo = table.getPartitionInfo();
             for (long partitionId : tableToPartition.get(tableId)) {
-                PartitionCommitInfo partitionCommitInfo = new 
PartitionCommitInfo(partitionId, -1, -1);
+                String partitionRange = "";
+                if (tblPartitionInfo.getType() == PartitionType.RANGE
+                        || tblPartitionInfo.getType() == PartitionType.LIST) {
+                    partitionRange = 
tblPartitionInfo.getItem(partitionId).getItems().toString();
+                }
+                PartitionCommitInfo partitionCommitInfo = new 
PartitionCommitInfo(partitionId, partitionRange, -1, -1);
                 tableCommitInfo.addPartitionCommitInfo(partitionCommitInfo);
             }
             transactionState.putIdToTableCommitInfo(tableId, tableCommitInfo);
@@ -1060,10 +1068,16 @@ public class DatabaseTransactionMgr {
         transactionState.setErrorReplicas(errorReplicaIds);
         for (long tableId : tableToPartition.keySet()) {
             TableCommitInfo tableCommitInfo = new TableCommitInfo(tableId);
+            OlapTable table = (OlapTable) db.getTableNullable(tableId);
+            PartitionInfo tblPartitionInfo = table.getPartitionInfo();
             for (long partitionId : tableToPartition.get(tableId)) {
-                OlapTable table = (OlapTable) db.getTableNullable(tableId);
                 Partition partition = table.getPartition(partitionId);
-                PartitionCommitInfo partitionCommitInfo = new 
PartitionCommitInfo(partitionId,
+                String partitionRange = "";
+                if (tblPartitionInfo.getType() == PartitionType.RANGE
+                        || tblPartitionInfo.getType() == PartitionType.LIST) {
+                    partitionRange = 
tblPartitionInfo.getItem(partitionId).getItems().toString();
+                }
+                PartitionCommitInfo partitionCommitInfo = new 
PartitionCommitInfo(partitionId, partitionRange,
                         partition.getNextVersion(),
                         System.currentTimeMillis() /* use as partition visible 
time */);
                 tableCommitInfo.addPartitionCommitInfo(partitionCommitInfo);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/PartitionCommitInfo.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/PartitionCommitInfo.java
index 8a206a172d..3f35c1d295 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/transaction/PartitionCommitInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/PartitionCommitInfo.java
@@ -31,6 +31,8 @@ public class PartitionCommitInfo implements Writable {
 
     @SerializedName(value = "partitionId")
     private long partitionId;
+    @SerializedName(value = "range")
+    private String range;
     @SerializedName(value = "version")
     private long version;
     @SerializedName(value = "versionTime")
@@ -40,9 +42,10 @@ public class PartitionCommitInfo implements Writable {
 
     }
 
-    public PartitionCommitInfo(long partitionId, long version, long 
visibleTime) {
+    public PartitionCommitInfo(long partitionId, String partitionRange, long 
version, long visibleTime) {
         super();
         this.partitionId = partitionId;
+        this.range = partitionRange;
         this.version = version;
         this.versionTime = visibleTime;
     }
@@ -62,6 +65,10 @@ public class PartitionCommitInfo implements Writable {
         return partitionId;
     }
 
+    public String getPartitionRange() {
+        return range;
+    }
+
     public long getVersion() {
         return version;
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to