codelipenghui commented on code in PR #19620:
URL: https://github.com/apache/pulsar/pull/19620#discussion_r1118562948
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateCompactionStrategy.java:
##########
@@ -50,14 +50,19 @@ public void checkBrokers(boolean check) {
public boolean shouldKeepLeft(ServiceUnitStateData from,
ServiceUnitStateData to) {
if (to == null) {
return false;
- } else if (to.force()) {
- return false;
}
+ // Skip the compaction case where from = null and to.versionId > 1
+ if (from != null && from.versionId() + 1 != to.versionId()) {
+ return true;
+ }
Review Comment:
Do we need to filter out the invalid data when reading from the topic?
Otherwise, we might get a data view that is inconsistent with the compacted data.
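To make the concern concrete, here is a minimal sketch of a reader-side view that applies the same version check as the compaction strategy, so a consumer of the raw topic drops the same messages compaction would drop. `ReaderSideFilter`, `StateData`, and `View` are hypothetical names used for illustration only and are not part of the PR. `shouldKeepLeft` mirrors only the lines visible in the hunk above; whatever the real method does after that check is not shown here and is assumed to accept the new value.

```java
import java.util.HashMap;
import java.util.Map;

public class ReaderSideFilter {

    // Hypothetical stand-in for ServiceUnitStateData, reduced to the fields used here.
    record StateData(String state, String broker, long versionId) {}

    // Mirrors the check shown in the diff. Anything the real method does after
    // this point is not visible in the hunk; this sketch assumes "accept".
    static boolean shouldKeepLeft(StateData from, StateData to) {
        if (to == null) {
            return false; // tombstone: accept the new (null) value
        }
        if (from != null && from.versionId() + 1 != to.versionId()) {
            return true;  // version gap or stale update: keep the old value
        }
        return false;
    }

    // Hypothetical in-memory view built by a reader of the raw topic.
    static final class View {
        private final Map<String, StateData> latest = new HashMap<>();

        // Returns true if the message was applied, false if it was filtered out.
        boolean onMessage(String serviceUnit, StateData incoming) {
            StateData current = latest.get(serviceUnit);
            if (shouldKeepLeft(current, incoming)) {
                return false;
            }
            if (incoming == null) {
                latest.remove(serviceUnit);
            } else {
                latest.put(serviceUnit, incoming);
            }
            return true;
        }

        StateData get(String serviceUnit) {
            return latest.get(serviceUnit);
        }
    }

    public static void main(String[] args) {
        View view = new View();
        view.onMessage("bundle-1", new StateData("Assigned", "broker-a", 1));
        // versionId 5 does not follow versionId 1, so the reader skips it,
        // just as compaction would.
        view.onMessage("bundle-1", new StateData("Assigned", "broker-b", 5));
        view.onMessage("bundle-1", new StateData("Released", "broker-a", 2));
        System.out.println(view.get("bundle-1"));
    }
}
```

With the filter in place, a reader of the raw topic and a reader of the compacted topic would settle on the same value per key, assuming both apply the rule in the same order.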
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateData.java:
##########
@@ -37,19 +36,23 @@ public record ServiceUnitStateData(
}
}
- public ServiceUnitStateData(ServiceUnitState state, String broker, String sourceBroker) {
- this(state, broker, sourceBroker, false, System.currentTimeMillis());
+ public ServiceUnitStateData(ServiceUnitState state, String broker, String sourceBroker, long versionId) {
+ this(state, broker, sourceBroker, false, System.currentTimeMillis(), versionId);
}
- public ServiceUnitStateData(ServiceUnitState state, String broker) {
- this(state, broker, null, false, System.currentTimeMillis());
+ public ServiceUnitStateData(ServiceUnitState state, String broker, long versionId) {
+ this(state, broker, null, false, System.currentTimeMillis(), versionId);
}
- public ServiceUnitStateData(ServiceUnitState state, String broker, boolean force) {
- this(state, broker, null, force, System.currentTimeMillis());
+ public ServiceUnitStateData(ServiceUnitState state, String broker, boolean force, long versionId) {
+ this(state, broker, null, force, System.currentTimeMillis(), versionId);
}
public static ServiceUnitState state(ServiceUnitStateData data) {
return data == null ? ServiceUnitState.Init : data.state();
}
+
+ public static long versionId(ServiceUnitStateData data) {
+ return data == null ? 0 : data.versionId();
Review Comment:
+1