This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new d2405af00a8 [fix](catalog) Refresh remote OLAP partitions on replica relocation (#63894)
d2405af00a8 is described below
commit d2405af00a84c3be00e9f1287c30e018b66f3dcb
Author: HonestManXin <[email protected]>
AuthorDate: Wed Jun 3 10:35:55 2026 +0800
[fix](catalog) Refresh remote OLAP partitions on replica relocation (#63894)
Previously, whether a cached remote partition was refreshed depended only on
its visible version and visible version time. When tablet balancing relocates
replicas, neither value changes, so the updated partition metadata is never
fetched from the remote FE. A query can then target a BE node on which the
tablet no longer exists, and the query fails.
To fix this, a checksum is now computed over the partition's metadata
(partition-level fields plus its index, tablet, and replica details). It is
compared alongside the version information, so any metadata change, including
replica relocation, causes the partition to be refreshed.
---
.../java/org/apache/doris/catalog/Partition.java | 86 +++++++++++
.../java/org/apache/doris/common/util/Util.java | 15 ++
.../doris/datasource/doris/FeServiceClient.java | 57 ++++---
.../apache/doris/service/FrontendServiceImpl.java | 167 ++++++++++++---------
.../doris/catalog/MaterializedIndexTest.java | 134 +++++++++++++++++
gensrc/thrift/FrontendService.thrift | 3 +
6 files changed, 376 insertions(+), 86 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Partition.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Partition.java
index 4edc4d5888e..702c60340f6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Partition.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Partition.java
@@ -23,15 +23,20 @@ import
org.apache.doris.catalog.MaterializedIndex.IndexState;
import org.apache.doris.cloud.catalog.CloudPartition;
import org.apache.doris.common.Config;
import org.apache.doris.common.FeConstants;
+import org.apache.doris.common.util.Util;
import org.apache.doris.rpc.RpcException;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.gson.annotations.SerializedName;
+import org.apache.commons.codec.binary.Hex;
+import org.apache.commons.codec.digest.DigestUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import java.security.MessageDigest;
+import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -92,6 +97,8 @@ public class Partition extends MetaObject {
@SerializedName(value = "di", alternate = {"distributionInfo"})
private DistributionInfo distributionInfo;
+ private transient volatile String remoteMetaChecksum;
+
protected Partition() {
}
@@ -258,6 +265,85 @@ public class Partition extends MetaObject {
return indices;
}
+ public String getMetaChecksum() {
+ MessageDigest digest = DigestUtils.getSha256Digest();
+ // Include partition-level fields whose changes should invalidate the
cached
+ // remote partition payload, even when visibleVersion /
visibleVersionTime
+ // remain unchanged (e.g. ALTER TABLE ... RENAME PARTITION only
mutates name).
+ updateMetaChecksum(digest, (byte) 11, id);
+ updateMetaChecksumString(digest, (byte) 12, name);
+ updateMetaChecksum(digest, (byte) 13, state == null ? -1L :
state.ordinal());
+ updateMetaChecksum(digest, (byte) 14, visibleVersion);
+ updateMetaChecksum(digest, (byte) 15, visibleVersionTime);
+ updateMetaChecksum(digest, (byte) 16, nextVersion);
+ if (distributionInfo != null) {
+ DistributionInfoType distType = distributionInfo.getType();
+ updateMetaChecksum(digest, (byte) 17, distType == null ? -1L :
distType.ordinal());
+ updateMetaChecksum(digest, (byte) 18,
distributionInfo.getBucketNum());
+ updateMetaChecksum(digest, (byte) 19,
distributionInfo.getAutoBucket() ? 1L : 0L);
+ } else {
+ updateMetaChecksum(digest, (byte) 17, -1L);
+ }
+ List<MaterializedIndex> indexes =
getMaterializedIndices(IndexExtState.VISIBLE);
+ indexes.sort(Comparator.comparingLong(MaterializedIndex::getId));
+ for (MaterializedIndex index : indexes) {
+ updateMetaChecksum(digest, (byte) 1, index.getId());
+ List<Tablet> tablets = Lists.newArrayList(index.getTablets());
+ tablets.sort(Comparator.comparingLong(Tablet::getId));
+ for (Tablet tablet : tablets) {
+ updateMetaChecksum(digest, (byte) 2, tablet.getId());
+ List<Replica> replicas =
Lists.newArrayList(tablet.getReplicas());
+ replicas.sort(Comparator.comparingLong(Replica::getId)
+
.thenComparingLong(Replica::getBackendIdWithoutException));
+ for (Replica replica : replicas) {
+ updateMetaChecksum(digest, (byte) 3, replica.getId());
+ updateMetaChecksum(digest, (byte) 4,
replica.getBackendIdWithoutException());
+ // Include all replica fields that affect
getQueryableReplicas() filtering,
+ // so a stale remote cache is invalidated whenever any of
them changes
+ // (e.g. replica becomes bad, lastFailedVersion is set,
version/state changes).
+ updateMetaChecksum(digest, (byte) 5, replica.getVersion());
+ updateMetaChecksum(digest, (byte) 6,
replica.getLastFailedVersion());
+ updateMetaChecksum(digest, (byte) 7,
replica.getPathHash());
+ Replica.ReplicaState state = replica.getState();
+ updateMetaChecksum(digest, (byte) 8, state == null ? -1L :
state.ordinal());
+ updateMetaChecksum(digest, (byte) 9, replica.isBad() ? 1L
: 0L);
+ updateMetaChecksum(digest, (byte) 10, replica.isUserDrop()
? 1L : 0L);
+ }
+ }
+ }
+ return Hex.encodeHexString(digest.digest());
+ }
+
+ public String getRemoteMetaChecksum() {
+ return remoteMetaChecksum;
+ }
+
+ public void setRemoteMetaChecksum(String checksum) {
+ if (checksum != null) {
+ this.remoteMetaChecksum = checksum;
+ }
+ }
+
+ private void updateMetaChecksum(MessageDigest digest, byte tag, long
value) {
+ Util.updateMessageDigest(digest, tag);
+ Util.updateMessageDigest(digest, value);
+ }
+
+ private void updateMetaChecksumString(MessageDigest digest, byte tag,
String value) {
+ Util.updateMessageDigest(digest, tag);
+ if (value == null) {
+ Util.updateMessageDigest(digest, -1L);
+ return;
+ }
+ int len = value.length();
+ Util.updateMessageDigest(digest, (long) len);
+ for (int i = 0; i < len; i++) {
+ char c = value.charAt(i);
+ digest.update((byte) (c >>> 8));
+ digest.update((byte) c);
+ }
+ }
+
public long getAllDataSize(boolean singleReplica) {
return getDataSize(singleReplica) + getRemoteDataSize();
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/Util.java
b/fe/fe-core/src/main/java/org/apache/doris/common/util/Util.java
index e260418227a..25c0d81295b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/Util.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/Util.java
@@ -113,6 +113,21 @@ public class Util {
};
}
+ public static void updateMessageDigest(MessageDigest digest, byte value) {
+ digest.update(value);
+ }
+
+ public static void updateMessageDigest(MessageDigest digest, long value) {
+ digest.update((byte) (value >>> 56));
+ digest.update((byte) (value >>> 48));
+ digest.update((byte) (value >>> 40));
+ digest.update((byte) (value >>> 32));
+ digest.update((byte) (value >>> 24));
+ digest.update((byte) (value >>> 16));
+ digest.update((byte) (value >>> 8));
+ digest.update((byte) value);
+ }
+
// Get a string represent the schema signature, contains:
// list of columns and bloom filter column info.
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/doris/FeServiceClient.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/doris/FeServiceClient.java
index 10ef6435ff1..8fa391fb776 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/doris/FeServiceClient.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/doris/FeServiceClient.java
@@ -67,6 +67,7 @@ import java.io.DataInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeUnit;
@@ -266,18 +267,10 @@ public class FeServiceClient {
request.setPasswd(password);
request.setVersion(FeConstants.meta_version);
for (Partition partition : partitions) {
- TPartitionMeta meta = new TPartitionMeta();
- meta.setId(partition.getId());
- meta.setVisibleVersion(partition.getVisibleVersion());
- meta.setVisibleVersionTime(partition.getVisibleVersionTime());
- request.addToPartitions(meta);
+ request.addToPartitions(buildPartitionMeta(partition));
}
for (Partition partition : tempPartitions) {
- TPartitionMeta meta = new TPartitionMeta();
- meta.setId(partition.getId());
- meta.setVisibleVersion(partition.getVisibleVersion());
- meta.setVisibleVersionTime(partition.getVisibleVersionTime());
- request.addToTempPartitions(meta);
+ request.addToTempPartitions(buildPartitionMeta(partition));
}
String msg = String.format("failed to get table meta from remote
doris:%s", name);
return randomCallWithRetry(client -> {
@@ -291,13 +284,17 @@ public class FeServiceClient {
remoteOlapTable = RemoteOlapTable.fromOlapTable(olapTable);
}
List<Partition> updatedPartitions = new
ArrayList<>(result.getUpdatedPartitionsSize());
+ List<String> updatedPartitionChecksums =
result.isSetUpdatedPartitionChecksums()
+ ? result.getUpdatedPartitionChecksums() :
Collections.emptyList();
if (result.getUpdatedPartitionsSize() > 0) {
- for (ByteBuffer buffer : result.getUpdatedPartitions()) {
+ for (int i = 0; i < result.getUpdatedPartitionsSize(); i++) {
+ ByteBuffer buffer = result.getUpdatedPartitions().get(i);
try (ByteArrayInputStream in =
new ByteArrayInputStream(buffer.array(),
buffer.position(), buffer.remaining());
DataInputStream dataInputStream = new
DataInputStream(in)) {
String partitionStr = Text.readString(dataInputStream);
Partition partition =
GsonUtils.GSON.fromJson(partitionStr, Partition.class);
+ setRemoteMetaChecksum(partition,
updatedPartitionChecksums, i);
updatedPartitions.add(partition);
}
}
@@ -308,27 +305,51 @@ public class FeServiceClient {
}
remoteOlapTable.rebuildPartitions(partitions, updatedPartitions,
removedPartitions);
// rebuild temp partitions
+ List<Partition> updatedTempPartitions = new ArrayList<>();
if (result.isSetUpdatedTempPartitions() &&
result.getUpdatedTempPartitionsSize() > 0) {
- updatedPartitions = new
ArrayList<>(result.getUpdatedTempPartitionsSize());
- for (ByteBuffer buffer : result.getUpdatedTempPartitions()) {
+ List<String> updatedTempPartitionChecksums =
result.isSetUpdatedTempPartitionChecksums()
+ ? result.getUpdatedTempPartitionChecksums() :
Collections.emptyList();
+ for (int i = 0; i < result.getUpdatedTempPartitionsSize();
i++) {
+ ByteBuffer buffer =
result.getUpdatedTempPartitions().get(i);
try (ByteArrayInputStream in =
new ByteArrayInputStream(buffer.array(),
buffer.position(), buffer.remaining());
DataInputStream dataInputStream = new
DataInputStream(in)) {
String partitionStr = Text.readString(dataInputStream);
Partition partition =
GsonUtils.GSON.fromJson(partitionStr, Partition.class);
- updatedPartitions.add(partition);
+ setRemoteMetaChecksum(partition,
updatedTempPartitionChecksums, i);
+ updatedTempPartitions.add(partition);
}
}
}
- removedPartitions = result.getRemovedTempPartitions();
- if (removedPartitions == null) {
- removedPartitions = new ArrayList<>();
+ List<Long> removedTempPartitions =
result.getRemovedTempPartitions();
+ if (removedTempPartitions == null) {
+ removedTempPartitions = new ArrayList<>();
}
- remoteOlapTable.rebuildTempPartitions(tempPartitions,
updatedPartitions, removedPartitions);
+ remoteOlapTable.rebuildTempPartitions(tempPartitions,
updatedTempPartitions, removedTempPartitions);
return remoteOlapTable;
}, msg, timeoutMs);
}
+ private TPartitionMeta buildPartitionMeta(Partition partition) {
+ TPartitionMeta meta = new TPartitionMeta();
+ meta.setId(partition.getId());
+ meta.setVisibleVersion(partition.getVisibleVersion());
+ meta.setVisibleVersionTime(partition.getVisibleVersionTime());
+ String remoteMetaChecksum = partition.getRemoteMetaChecksum();
+ if (remoteMetaChecksum == null) {
+ remoteMetaChecksum = partition.getMetaChecksum();
+ partition.setRemoteMetaChecksum(remoteMetaChecksum);
+ }
+ meta.setMetaChecksum(remoteMetaChecksum);
+ return meta;
+ }
+
+ private void setRemoteMetaChecksum(Partition partition, List<String>
checksums, int index) {
+ if (index < checksums.size()) {
+ partition.setRemoteMetaChecksum(checksums.get(index));
+ }
+ }
+
public TBeginRemoteTxnResult beginRemoteTxn(TBeginRemoteTxnRequest
request) throws Exception {
request.setUser(user);
request.setPasswd(password);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index 411b7d4c20b..dcc75368500 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -5451,82 +5451,40 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
MetaContext metaContext = new MetaContext();
metaContext.setMetaVersion(FeConstants.meta_version);
metaContext.setThreadLocalInfo();
- table.readLock();
try (ByteArrayOutputStream bOutputStream = new
ByteArrayOutputStream(8192)) {
- OlapTable copyTable = table.copyTableMeta();
- try (DataOutputStream out = new
DataOutputStream(bOutputStream)) {
- copyTable.write(out);
- out.flush();
- result.setTableMeta(bOutputStream.toByteArray());
- }
- Set<Long> updatedPartitionIds =
Sets.newHashSet(table.getPartitionIds());
- List<TPartitionMeta> partitionMetas =
request.getPartitionsSize() == 0 ? Lists.newArrayList()
- : request.getPartitions();
- for (TPartitionMeta partitionMeta : partitionMetas) {
- if (request.getTableId() != table.getId()) {
- result.addToRemovedPartitions(partitionMeta.getId());
- continue;
- }
- Partition partition =
table.getPartition(partitionMeta.getId());
- if (partition == null) {
- result.addToRemovedPartitions(partitionMeta.getId());
- continue;
- }
- if (partition.getVisibleVersion() ==
partitionMeta.getVisibleVersion()
- && partition.getVisibleVersionTime() ==
partitionMeta.getVisibleVersionTime()) {
- updatedPartitionIds.remove(partitionMeta.getId());
- }
- }
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("receive getOlapTableMeta db: {} table:{}
update partitions: {} removed partition:{}",
- request.getDb(), request.getTable(),
updatedPartitionIds.size(),
- result.getRemovedPartitionsSize());
- }
- for (Long partitionId : updatedPartitionIds) {
- bOutputStream.reset();
- Partition partition = table.getPartition(partitionId);
+ Set<Long> updatedPartitionIds;
+ Set<Long> updatedTempPartitionIds;
+ Map<Long, String> partitionChecksums = Maps.newHashMap();
+ Map<Long, String> tempPartitionChecksums = Maps.newHashMap();
+ table.readLock();
+ try {
+ OlapTable copyTable = table.copyTableMeta();
try (DataOutputStream out = new
DataOutputStream(bOutputStream)) {
- Text.writeString(out,
GsonUtils.GSON.toJson(partition));
+ copyTable.write(out);
out.flush();
-
result.addToUpdatedPartitions(ByteBuffer.wrap(bOutputStream.toByteArray()));
+ result.setTableMeta(bOutputStream.toByteArray());
}
- }
- // temp partitions
- updatedPartitionIds =
Sets.newHashSet(table.getTempPartitions().getPartitionIds());
- partitionMetas = request.getTempPartitionsSize() == 0 ?
Lists.newArrayList()
- : request.getTempPartitions();
- for (TPartitionMeta partitionMeta : partitionMetas) {
- if (request.getTableId() != table.getId()) {
-
result.addToRemovedTempPartitions(partitionMeta.getId());
- continue;
- }
- Partition tempPartition =
table.getTempPartitions().getPartition(partitionMeta.getId());
- if (tempPartition == null) {
-
result.addToRemovedTempPartitions(partitionMeta.getId());
- continue;
- }
- if (tempPartition.getVisibleVersion() ==
partitionMeta.getVisibleVersion()
- && tempPartition.getVisibleVersionTime() ==
partitionMeta.getVisibleVersionTime()) {
- updatedPartitionIds.remove(partitionMeta.getId());
- }
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug("update temp partitions: {}, removed temp
partition:{}",
- updatedPartitionIds.size(),
result.getRemovedPartitionsSize());
- }
- for (Long partitionId : updatedPartitionIds) {
- bOutputStream.reset();
- Partition partition =
table.getTempPartitions().getPartition(partitionId);
- try (DataOutputStream out = new
DataOutputStream(bOutputStream)) {
- Text.writeString(out,
GsonUtils.GSON.toJson(partition));
- out.flush();
-
result.addToUpdatedTempPartitions(ByteBuffer.wrap(bOutputStream.toByteArray()));
+ updatedPartitionIds =
Sets.newHashSet(table.getPartitionIds());
+ collectPartitionChanges(table, request.getTableId(),
request.getPartitions(), false,
+ updatedPartitionIds, partitionChecksums, result);
+ updatedTempPartitionIds =
Sets.newHashSet(table.getTempPartitions().getPartitionIds());
+ collectPartitionChanges(table, request.getTableId(),
request.getTempPartitions(), true,
+ updatedTempPartitionIds, tempPartitionChecksums,
result);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("receive getOlapTableMeta db: {} table:{}
update partitions: {} "
+ + "removed partition:{} update temp
partitions: {} removed temp partition:{}",
+ request.getDb(), request.getTable(),
updatedPartitionIds.size(),
+ result.getRemovedPartitionsSize(),
updatedTempPartitionIds.size(),
+ result.getRemovedTempPartitionsSize());
}
+ addUpdatedPartitions(table, updatedPartitionIds, false,
partitionChecksums, bOutputStream, result);
+ addUpdatedPartitions(table, updatedTempPartitionIds, true,
tempPartitionChecksums,
+ bOutputStream, result);
+ } finally {
+ table.readUnlock();
}
return result;
} finally {
- table.readUnlock();
MetaContext.remove();
}
} catch (AuthenticationException e) {
@@ -5549,6 +5507,79 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
}
}
+ private void collectPartitionChanges(OlapTable table, long requestTableId,
+ List<TPartitionMeta> partitionMetas, boolean tempPartition,
Set<Long> updatedPartitionIds,
+ Map<Long, String> partitionChecksums, TGetOlapTableMetaResult
result) {
+ if (partitionMetas == null) {
+ return;
+ }
+ for (TPartitionMeta partitionMeta : partitionMetas) {
+ long partitionId = partitionMeta.getId();
+ if (requestTableId != table.getId()) {
+ addRemovedPartition(result, partitionId, tempPartition);
+ continue;
+ }
+ Partition partition = getPartition(table, partitionId,
tempPartition);
+ if (partition == null) {
+ addRemovedPartition(result, partitionId, tempPartition);
+ continue;
+ }
+ if (isPartitionVersionMatched(partition, partitionMeta)) {
+ if (!partitionMeta.isSetMetaChecksum()) {
+ updatedPartitionIds.remove(partitionId);
+ continue;
+ }
+ String metaChecksum = getPartitionMetaChecksum(partition,
partitionChecksums);
+ if (metaChecksum.equals(partitionMeta.getMetaChecksum())) {
+ updatedPartitionIds.remove(partitionId);
+ }
+ }
+ }
+ }
+
+ private void addUpdatedPartitions(OlapTable table, Set<Long>
updatedPartitionIds, boolean tempPartition,
+ Map<Long, String> partitionChecksums, ByteArrayOutputStream
bOutputStream,
+ TGetOlapTableMetaResult result) throws IOException {
+ for (Long partitionId : updatedPartitionIds) {
+ Partition partition = getPartition(table, partitionId,
tempPartition);
+ Preconditions.checkState(partition != null);
+ String metaChecksum = getPartitionMetaChecksum(partition,
partitionChecksums);
+ bOutputStream.reset();
+ try (DataOutputStream out = new DataOutputStream(bOutputStream)) {
+ Text.writeString(out, GsonUtils.GSON.toJson(partition));
+ out.flush();
+ if (tempPartition) {
+
result.addToUpdatedTempPartitions(ByteBuffer.wrap(bOutputStream.toByteArray()));
+ result.addToUpdatedTempPartitionChecksums(metaChecksum);
+ } else {
+
result.addToUpdatedPartitions(ByteBuffer.wrap(bOutputStream.toByteArray()));
+ result.addToUpdatedPartitionChecksums(metaChecksum);
+ }
+ }
+ }
+ }
+
+ private String getPartitionMetaChecksum(Partition partition, Map<Long,
String> partitionChecksums) {
+ return partitionChecksums.computeIfAbsent(partition.getId(), key ->
partition.getMetaChecksum());
+ }
+
+ private Partition getPartition(OlapTable table, long partitionId, boolean
tempPartition) {
+ return tempPartition ?
table.getTempPartitions().getPartition(partitionId) :
table.getPartition(partitionId);
+ }
+
+ private void addRemovedPartition(TGetOlapTableMetaResult result, long
partitionId, boolean tempPartition) {
+ if (tempPartition) {
+ result.addToRemovedTempPartitions(partitionId);
+ } else {
+ result.addToRemovedPartitions(partitionId);
+ }
+ }
+
+ private boolean isPartitionVersionMatched(Partition partition,
TPartitionMeta partitionMeta) {
+ return partition.getVisibleVersion() ==
partitionMeta.getVisibleVersion()
+ && partition.getVisibleVersionTime() ==
partitionMeta.getVisibleVersionTime();
+ }
+
@Override
public TStatus syncCloudTabletStats(TSyncCloudTabletStatsRequest request)
throws TException {
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/catalog/MaterializedIndexTest.java
b/fe/fe-core/src/test/java/org/apache/doris/catalog/MaterializedIndexTest.java
index 3ff3f2519a3..6044e35b5ec 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/catalog/MaterializedIndexTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/catalog/MaterializedIndexTest.java
@@ -93,6 +93,140 @@ public class MaterializedIndexTest {
Assert.assertThrows(UnsupportedOperationException.class, () ->
snapshot.add(new LocalTablet(3L)));
}
+ @Test
+ public void testPartitionMetaChecksum() {
+ MaterializedIndex firstIndex = new MaterializedIndex(1L,
IndexState.NORMAL);
+ LocalTablet firstTablet = new LocalTablet(10L);
+ firstTablet.addReplica(new LocalReplica(100L, 1000L,
Replica.ReplicaState.NORMAL, 1L, 1), true);
+ firstTablet.addReplica(new LocalReplica(101L, 1001L,
Replica.ReplicaState.NORMAL, 1L, 1), true);
+ firstIndex.addTablet(firstTablet, null, true);
+ LocalTablet secondTablet = new LocalTablet(11L);
+ secondTablet.addReplica(new LocalReplica(110L, 1010L,
Replica.ReplicaState.NORMAL, 1L, 1), true);
+ firstIndex.addTablet(secondTablet, null, true);
+ Partition firstPartition = new Partition(1L, "p1", firstIndex, null);
+ firstPartition.createRollupIndex(createIndex(2L, 20L, 200L, 2000L));
+ firstPartition.createRollupIndex(createIndex(3L, 30L, 300L, 3000L));
+ // Pin visibleVersionTime so partitions constructed in different
millis stay comparable.
+ long pinnedVisibleVersionTime = firstPartition.getVisibleVersionTime();
+ Partition deserializedPartition =
GsonUtils.GSON.fromJson(GsonUtils.GSON.toJson(firstPartition),
+ Partition.class);
+ Assert.assertEquals(firstPartition.getMetaChecksum(),
deserializedPartition.getMetaChecksum());
+
+ MaterializedIndex reorderedIndex = new MaterializedIndex(1L,
IndexState.NORMAL);
+ LocalTablet reorderedSecondTablet = new LocalTablet(11L);
+ reorderedSecondTablet.addReplica(new LocalReplica(110L, 1010L,
Replica.ReplicaState.NORMAL, 1L, 1), true);
+ reorderedIndex.addTablet(reorderedSecondTablet, null, true);
+ LocalTablet reorderedTablet = new LocalTablet(10L);
+ reorderedTablet.addReplica(new LocalReplica(101L, 1001L,
Replica.ReplicaState.NORMAL, 1L, 1), true);
+ reorderedTablet.addReplica(new LocalReplica(100L, 1000L,
Replica.ReplicaState.NORMAL, 1L, 1), true);
+ reorderedIndex.addTablet(reorderedTablet, null, true);
+ Partition reorderedPartition = new Partition(1L, "p1", reorderedIndex,
null);
+
reorderedPartition.setVisibleVersionAndTime(reorderedPartition.getVisibleVersion(),
pinnedVisibleVersionTime);
+ reorderedPartition.createRollupIndex(createIndex(3L, 30L, 300L,
3000L));
+ reorderedPartition.createRollupIndex(createIndex(2L, 20L, 200L,
2000L));
+ Assert.assertEquals(firstPartition.getMetaChecksum(),
reorderedPartition.getMetaChecksum());
+
+ MaterializedIndex movedIndex = new MaterializedIndex(1L,
IndexState.NORMAL);
+ LocalTablet movedTablet = new LocalTablet(10L);
+ movedTablet.addReplica(new LocalReplica(100L, 1000L,
Replica.ReplicaState.NORMAL, 1L, 1), true);
+ movedTablet.addReplica(new LocalReplica(102L, 1002L,
Replica.ReplicaState.NORMAL, 1L, 1), true);
+ movedIndex.addTablet(movedTablet, null, true);
+ movedIndex.addTablet(secondTablet, null, true);
+ Partition movedPartition = new Partition(1L, "p1", movedIndex, null);
+
movedPartition.setVisibleVersionAndTime(movedPartition.getVisibleVersion(),
pinnedVisibleVersionTime);
+ movedPartition.createRollupIndex(createIndex(2L, 20L, 200L, 2000L));
+ movedPartition.createRollupIndex(createIndex(3L, 30L, 300L, 3000L));
+ Assert.assertNotEquals(firstPartition.getMetaChecksum(),
movedPartition.getMetaChecksum());
+
+ firstPartition.setRemoteMetaChecksum(firstPartition.getMetaChecksum());
+ Assert.assertEquals(firstPartition.getMetaChecksum(),
firstPartition.getRemoteMetaChecksum());
+ }
+
+ @Test
+ public void testPartitionMetaChecksumChangesOnReplicaQueryFields() {
+ // Build a partition with one tablet/replica.
+ MaterializedIndex baseIndex = new MaterializedIndex(1L,
IndexState.NORMAL);
+ LocalTablet tablet = new LocalTablet(10L);
+ LocalReplica replica = new LocalReplica(100L, 1000L,
Replica.ReplicaState.NORMAL, 1L, 1);
+ tablet.addReplica(replica, true);
+ baseIndex.addTablet(tablet, null, true);
+ Partition partition = new Partition(1L, "p1", baseIndex, null);
+ String original = partition.getMetaChecksum();
+
+ // 1) lastFailedVersion change must invalidate the checksum.
+ replica.updateLastFailedVersion(5L);
+ Assert.assertNotEquals(original, partition.getMetaChecksum());
+ replica.updateLastFailedVersion(-1L);
+ Assert.assertEquals(original, partition.getMetaChecksum());
+
+ // 2) state change must invalidate the checksum.
+ replica.setState(Replica.ReplicaState.DECOMMISSION);
+ Assert.assertNotEquals(original, partition.getMetaChecksum());
+ replica.setState(Replica.ReplicaState.NORMAL);
+ Assert.assertEquals(original, partition.getMetaChecksum());
+
+ // 3) bad flag change must invalidate the checksum.
+ Assert.assertTrue(replica.setBad(true));
+ Assert.assertNotEquals(original, partition.getMetaChecksum());
+ Assert.assertTrue(replica.setBad(false));
+ Assert.assertEquals(original, partition.getMetaChecksum());
+
+ // 4) pathHash change must invalidate the checksum.
+ replica.setPathHash(99L);
+ Assert.assertNotEquals(original, partition.getMetaChecksum());
+ replica.setPathHash(-1L);
+ Assert.assertEquals(original, partition.getMetaChecksum());
+
+ // 5) version change must invalidate the checksum.
Replica.updateVersion()
+ // refuses to roll back, so this is asserted last with a one-way
change.
+ replica.updateVersion(7L);
+ Assert.assertNotEquals(original, partition.getMetaChecksum());
+ }
+
+ @Test
+ public void testPartitionMetaChecksumChangesOnPartitionTopLevelFields() {
+ MaterializedIndex baseIndex = new MaterializedIndex(1L,
IndexState.NORMAL);
+ LocalTablet tablet = new LocalTablet(10L);
+ tablet.addReplica(new LocalReplica(100L, 1000L,
Replica.ReplicaState.NORMAL, 1L, 1), true);
+ baseIndex.addTablet(tablet, null, true);
+ RandomDistributionInfo distributionInfo = new
RandomDistributionInfo(3);
+ Partition partition = new Partition(1L, "p1", baseIndex,
distributionInfo);
+ String original = partition.getMetaChecksum();
+
+ // RENAME PARTITION only mutates the partition name, with no visible
version change;
+ // the checksum must still change so the remote cache can detect the
rename.
+ partition.setName("p1_renamed");
+ Assert.assertNotEquals(original, partition.getMetaChecksum());
+ partition.setName("p1");
+ Assert.assertEquals(original, partition.getMetaChecksum());
+
+ // PartitionState changes (e.g. RESTORE) must invalidate the checksum.
+ partition.setState(Partition.PartitionState.RESTORE);
+ Assert.assertNotEquals(original, partition.getMetaChecksum());
+ partition.setState(Partition.PartitionState.NORMAL);
+ Assert.assertEquals(original, partition.getMetaChecksum());
+
+ // DistributionInfo bucket-num change must invalidate the checksum.
+ int oldBucketNum = distributionInfo.getBucketNum();
+ distributionInfo.setBucketNum(oldBucketNum + 2);
+ Assert.assertNotEquals(original, partition.getMetaChecksum());
+ distributionInfo.setBucketNum(oldBucketNum);
+ Assert.assertEquals(original, partition.getMetaChecksum());
+
+ // nextVersion changes must invalidate the checksum (asserted last;
+ // setNextVersion() can't be reverted to its original value safely).
+ partition.setNextVersion(partition.getNextVersion() + 1);
+ Assert.assertNotEquals(original, partition.getMetaChecksum());
+ }
+
+ private MaterializedIndex createIndex(long indexId, long tabletId, long
replicaId, long backendId) {
+ MaterializedIndex index = new MaterializedIndex(indexId,
IndexState.NORMAL);
+ LocalTablet tablet = new LocalTablet(tabletId);
+ tablet.addReplica(new LocalReplica(replicaId, backendId,
Replica.ReplicaState.NORMAL, 1L, 1), true);
+ index.addTablet(tablet, null, true);
+ return index;
+ }
+
@Test
public void testConcurrentGetTabletsNeverThrows() throws
InterruptedException {
// A reader repeatedly snapshots and iterates getTablets() while a
writer keeps
diff --git a/gensrc/thrift/FrontendService.thrift
b/gensrc/thrift/FrontendService.thrift
index c47749de06b..cc74ffbe065 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -1734,6 +1734,7 @@ struct TPartitionMeta {
1: optional i64 id
2: optional i64 visible_version
3: optional i64 visible_version_time
+ 4: optional string meta_checksum
}
struct TGetOlapTableMetaRequest {
@@ -1754,6 +1755,8 @@ struct TGetOlapTableMetaResult {
4: optional list<i64> removed_partitions
5: optional list<binary> updated_temp_partitions
6: optional list<i64> removed_temp_partitions
+ 7: optional list<string> updated_partition_checksums
+ 8: optional list<string> updated_temp_partition_checksums
}
// Remote transaction request and Result definitions for cross-cluster export.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]