This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 69c26d3d29 [ISSUE #7228] Converge the use of some important variables
for some class
69c26d3d29 is described below
commit 69c26d3d29cde7b4484ecd112ab9224f9f42bf45
Author: guyinyou <[email protected]>
AuthorDate: Wed Aug 23 10:27:52 2023 +0800
[ISSUE #7228] Converge the use of some important variables for some class
---
.../org/apache/rocketmq/store/ConsumeQueue.java | 16 ++++++-------
.../org/apache/rocketmq/store/MappedFileQueue.java | 26 +++++++++++++---------
.../rocketmq/store/MultiPathMappedFileQueue.java | 4 +---
3 files changed, 24 insertions(+), 22 deletions(-)
diff --git a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
index a0b886eb0e..56bee2af3e 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
@@ -145,7 +145,7 @@ public class ConsumeQueue implements ConsumeQueueInterface,
FileQueueLifeCycle {
if (offset >= 0 && size > 0) {
mappedFileOffset = i + CQ_STORE_UNIT_SIZE;
- this.maxPhysicOffset = offset + size;
+ this.setMaxPhysicOffset(offset + size);
if (isExtAddr(tagsCode)) {
maxExtAddr = tagsCode;
}
@@ -409,7 +409,7 @@ public class ConsumeQueue implements ConsumeQueueInterface,
FileQueueLifeCycle {
int logicFileSize = this.mappedFileSize;
- this.maxPhysicOffset = phyOffset;
+ this.setMaxPhysicOffset(phyOffset);
long maxExtAddr = 1;
boolean shouldDeleteFile = false;
while (true) {
@@ -435,7 +435,7 @@ public class ConsumeQueue implements ConsumeQueueInterface,
FileQueueLifeCycle {
mappedFile.setWrotePosition(pos);
mappedFile.setCommittedPosition(pos);
mappedFile.setFlushedPosition(pos);
- this.maxPhysicOffset = offset + size;
+ this.setMaxPhysicOffset(offset + size);
// This maybe not take effect, when not every
consume queue has extend file.
if (isExtAddr(tagsCode)) {
maxExtAddr = tagsCode;
@@ -453,7 +453,7 @@ public class ConsumeQueue implements ConsumeQueueInterface,
FileQueueLifeCycle {
mappedFile.setWrotePosition(pos);
mappedFile.setCommittedPosition(pos);
mappedFile.setFlushedPosition(pos);
- this.maxPhysicOffset = offset + size;
+ this.setMaxPhysicOffset(offset + size);
if (isExtAddr(tagsCode)) {
maxExtAddr = tagsCode;
}
@@ -881,8 +881,8 @@ public class ConsumeQueue implements ConsumeQueueInterface,
FileQueueLifeCycle {
private boolean putMessagePositionInfo(final long offset, final int size,
final long tagsCode,
final long cqOffset) {
- if (offset + size <= this.maxPhysicOffset) {
- log.warn("Maybe try to build consume queue repeatedly
maxPhysicOffset={} phyOffset={}", maxPhysicOffset, offset);
+ if (offset + size <= this.getMaxPhysicOffset()) {
+ log.warn("Maybe try to build consume queue repeatedly
maxPhysicOffset={} phyOffset={}", this.getMaxPhysicOffset(), offset);
return true;
}
@@ -926,7 +926,7 @@ public class ConsumeQueue implements ConsumeQueueInterface,
FileQueueLifeCycle {
);
}
}
- this.maxPhysicOffset = offset + size;
+ this.setMaxPhysicOffset(offset + size);
return mappedFile.appendMessage(this.byteBufferIndex.array());
}
return false;
@@ -1130,7 +1130,7 @@ public class ConsumeQueue implements
ConsumeQueueInterface, FileQueueLifeCycle {
@Override
public void destroy() {
- this.maxPhysicOffset = -1;
+ this.setMaxPhysicOffset(-1);
this.minLogicOffset = 0;
this.mappedFileQueue.destroy();
if (isExtReadEnable()) {
diff --git a/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java
b/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java
index 0bc70642fe..32b90d14f7 100644
--- a/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java
+++ b/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java
@@ -285,7 +285,7 @@ public class MappedFileQueue implements Swappable {
if (this.mappedFiles.isEmpty())
return 0;
- long committed = this.flushedWhere;
+ long committed = this.getFlushedWhere();
if (committed != 0) {
MappedFile mappedFile = this.getLastMappedFile(0, false);
if (mappedFile != null) {
@@ -442,11 +442,11 @@ public class MappedFileQueue implements Swappable {
}
public long remainHowManyDataToCommit() {
- return getMaxWrotePosition() - committedWhere;
+ return getMaxWrotePosition() - getCommittedWhere();
}
public long remainHowManyDataToFlush() {
- return getMaxOffset() - flushedWhere;
+ return getMaxOffset() - this.getFlushedWhere();
}
public void deleteLastMappedFile() {
@@ -616,15 +616,15 @@ public class MappedFileQueue implements Swappable {
public boolean flush(final int flushLeastPages) {
boolean result = true;
- MappedFile mappedFile = this.findMappedFileByOffset(this.flushedWhere,
this.flushedWhere == 0);
+ MappedFile mappedFile =
this.findMappedFileByOffset(this.getFlushedWhere(), this.getFlushedWhere() ==
0);
if (mappedFile != null) {
long tmpTimeStamp = mappedFile.getStoreTimestamp();
int offset = mappedFile.flush(flushLeastPages);
long where = mappedFile.getFileFromOffset() + offset;
- result = where == this.flushedWhere;
- this.flushedWhere = where;
+ result = where == this.getFlushedWhere();
+ this.setFlushedWhere(where);
if (0 == flushLeastPages) {
- this.storeTimestamp = tmpTimeStamp;
+ this.setStoreTimestamp(tmpTimeStamp);
}
}
@@ -633,12 +633,12 @@ public class MappedFileQueue implements Swappable {
public synchronized boolean commit(final int commitLeastPages) {
boolean result = true;
- MappedFile mappedFile =
this.findMappedFileByOffset(this.committedWhere, this.committedWhere == 0);
+ MappedFile mappedFile =
this.findMappedFileByOffset(this.getCommittedWhere(), this.getCommittedWhere()
== 0);
if (mappedFile != null) {
int offset = mappedFile.commit(commitLeastPages);
long where = mappedFile.getFileFromOffset() + offset;
- result = where == this.committedWhere;
- this.committedWhere = where;
+ result = where == this.getCommittedWhere();
+ this.setCommittedWhere(where);
}
return result;
@@ -763,7 +763,7 @@ public class MappedFileQueue implements Swappable {
mf.destroy(1000 * 3);
}
this.mappedFiles.clear();
- this.flushedWhere = 0;
+ this.setFlushedWhere(0);
// delete parent directory
File file = new File(storePath);
@@ -848,6 +848,10 @@ public class MappedFileQueue implements Swappable {
return storeTimestamp;
}
+ public void setStoreTimestamp(long storeTimestamp) {
+ this.storeTimestamp = storeTimestamp;
+ }
+
public List<MappedFile> getMappedFiles() {
return mappedFiles;
}
diff --git
a/store/src/main/java/org/apache/rocketmq/store/MultiPathMappedFileQueue.java
b/store/src/main/java/org/apache/rocketmq/store/MultiPathMappedFileQueue.java
index 8f5af94380..8ff050dfe3 100644
---
a/store/src/main/java/org/apache/rocketmq/store/MultiPathMappedFileQueue.java
+++
b/store/src/main/java/org/apache/rocketmq/store/MultiPathMappedFileQueue.java
@@ -16,7 +16,6 @@
*/
package org.apache.rocketmq.store;
-
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
@@ -113,8 +112,7 @@ public class MultiPathMappedFileQueue extends
MappedFileQueue {
mf.destroy(1000 * 3);
}
this.mappedFiles.clear();
- this.flushedWhere = 0;
-
+ this.setFlushedWhere(0);
Set<String> storePathSet = getPaths();
storePathSet.addAll(getReadonlyPaths());