This is an automated email from the ASF dual-hosted git repository.
andor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zookeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 18c78cd10 ZOOKEEPER-4794: Reduce the ZKDatabase#committedLog memory
usage.
18c78cd10 is described below
commit 18c78cd10bc02d764a46ac1659b263cf69f2671d
Author: Yan Zhao <[email protected]>
AuthorDate: Sat Feb 10 00:32:53 2024 +0800
ZOOKEEPER-4794: Reduce the ZKDatabase#committedLog memory usage.
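Reduce the memory used by ZKDatabase#committedLog: entries are now
PureRequestProposal objects that keep only the Request and build the
QuorumPacket on demand, and Request no longer caches its serialized bytes.
Fix CI.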
Reviewers: eolivelli, hangc0276, anmolnar
Author: horizonzy
Closes #2115 from horizonzy/reduce-committed-log-memory-usage
---
.../java/org/apache/zookeeper/server/Request.java | 19 ++------
.../zookeeper/server/TxnLogProposalIterator.java | 7 ++-
.../org/apache/zookeeper/server/ZKDatabase.java | 13 ++----
.../org/apache/zookeeper/server/quorum/Leader.java | 54 ++++++++++++++++++++--
.../zookeeper/server/quorum/LearnerHandler.java | 4 +-
.../server/quorum/flexible/QuorumOracleMaj.java | 10 ++--
.../server/quorum/LeaderWithObserverTest.java | 5 +-
.../server/quorum/LearnerHandlerTest.java | 14 +++---
.../zookeeper/test/GetProposalFromTxnTest.java | 2 +-
.../zookeeper/test/LocalSessionRequestTest.java | 4 +-
10 files changed, 82 insertions(+), 50 deletions(-)
diff --git
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java
index 10111c8a6..27fa4e2df 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java
@@ -19,7 +19,6 @@
package org.apache.zookeeper.server;
import static java.nio.charset.StandardCharsets.UTF_8;
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
@@ -169,24 +168,16 @@ public class Request {
&& this.type != OpCode.createSession;
}
- private transient byte[] serializeData;
-
- @SuppressFBWarnings(value = "EI_EXPOSE_REP")
public byte[] getSerializeData() {
if (this.hdr == null) {
return null;
}
-
- if (this.serializeData == null) {
- try {
- this.serializeData = Util.marshallTxnEntry(this.hdr, this.txn,
this.txnDigest);
- } catch (IOException e) {
- LOG.error("This really should be impossible.", e);
- this.serializeData = new byte[32];
- }
+ try {
+ return Util.marshallTxnEntry(this.hdr, this.txn, this.txnDigest);
+ } catch (IOException e) {
+ LOG.error("This really should be impossible.", e);
+ return new byte[32];
}
-
- return this.serializeData;
}
/**
diff --git
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/TxnLogProposalIterator.java
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/TxnLogProposalIterator.java
index 847e3b2fa..2d6ecb631 100644
---
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/TxnLogProposalIterator.java
+++
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/TxnLogProposalIterator.java
@@ -58,20 +58,19 @@ public class TxnLogProposalIterator implements
Iterator<Proposal> {
@Override
public Proposal next() {
- Proposal p = new Proposal();
+ Proposal p;
try {
byte[] serializedData = Util.marshallTxnEntry(itr.getHeader(),
itr.getTxn(), itr.getDigest());
QuorumPacket pp = new QuorumPacket(Leader.PROPOSAL,
itr.getHeader().getZxid(), serializedData, null);
- p.packet = pp;
- p.request = null;
-
+ p = new Proposal(pp);
// This is the only place that can throw IO exception
hasNext = itr.next();
} catch (IOException e) {
LOG.error("Unable to read txnlog from disk", e);
hasNext = false;
+ p = new Proposal();
}
return p;
diff --git
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZKDatabase.java
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZKDatabase.java
index eaad05cd2..d98c97f2c 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZKDatabase.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZKDatabase.java
@@ -54,9 +54,8 @@ import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
import org.apache.zookeeper.server.persistence.FileTxnSnapLog.PlayBackListener;
import org.apache.zookeeper.server.persistence.SnapStream;
import org.apache.zookeeper.server.persistence.TxnLog.TxnIterator;
-import org.apache.zookeeper.server.quorum.Leader;
import org.apache.zookeeper.server.quorum.Leader.Proposal;
-import org.apache.zookeeper.server.quorum.QuorumPacket;
+import org.apache.zookeeper.server.quorum.Leader.PureRequestProposal;
import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
import org.apache.zookeeper.server.util.SerializeUtils;
import org.apache.zookeeper.txn.TxnDigest;
@@ -323,19 +322,15 @@ public class ZKDatabase {
wl.lock();
if (committedLog.size() > commitLogCount) {
committedLog.remove();
- minCommittedLog = committedLog.peek().packet.getZxid();
+ minCommittedLog = committedLog.peek().getZxid();
}
if (committedLog.isEmpty()) {
minCommittedLog = request.zxid;
maxCommittedLog = request.zxid;
}
- byte[] data = request.getSerializeData();
- QuorumPacket pp = new QuorumPacket(Leader.PROPOSAL, request.zxid,
data, null);
- Proposal p = new Proposal();
- p.packet = pp;
- p.request = request;
+ PureRequestProposal p = new PureRequestProposal(request);
committedLog.add(p);
- maxCommittedLog = p.packet.getZxid();
+ maxCommittedLog = p.getZxid();
} finally {
wl.unlock();
}
diff --git
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java
index 3b9c827c3..0b57bb182 100644
---
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java
+++
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java
@@ -88,14 +88,60 @@ public class Leader extends LearnerMaster {
public static class Proposal extends SyncedLearnerTracker {
- public QuorumPacket packet;
- public Request request;
+ private QuorumPacket packet;
+ protected Request request;
+
+ public Proposal() {
+ }
+
+ public Proposal(QuorumPacket packet) {
+ this.packet = packet;
+ }
+
+ public Proposal(Request request, QuorumPacket packet) {
+ this.request = request;
+ this.packet = packet;
+ }
+
+ public QuorumPacket getQuorumPacket() {
+ return packet;
+ }
+
+ public Request getRequest() {
+ return request;
+ }
+
+ public long getZxid() {
+ return packet.getZxid();
+ }
@Override
public String toString() {
return packet.getType() + ", " + packet.getZxid() + ", " + request;
}
+ }
+ public static class PureRequestProposal extends Proposal {
+
+ public PureRequestProposal(Request request) {
+ this.request = request;
+ }
+
+ @Override
+ public QuorumPacket getQuorumPacket() {
+ byte[] data = request.getSerializeData();
+ return new QuorumPacket(Leader.PROPOSAL, request.zxid, data, null);
+ }
+
+ @Override
+ public long getZxid() {
+ return request.zxid;
+ }
+
+ @Override
+ public String toString() {
+ return request.toString();
+ }
}
// log ack latency if zxid is a multiple of ackLoggingFrequency. If <=0,
disable logging.
@@ -1258,9 +1304,7 @@ public class Leader extends LearnerMaster {
proposalStats.setLastBufferSize(data.length);
QuorumPacket pp = new QuorumPacket(Leader.PROPOSAL, request.zxid,
data, null);
- Proposal p = new Proposal();
- p.packet = pp;
- p.request = request;
+ Proposal p = new Proposal(request, pp);
synchronized (this) {
p.addQuorumVerifier(self.getQuorumVerifier());
diff --git
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandler.java
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandler.java
index e9d5cd4e5..049336a16 100644
---
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandler.java
+++
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandler.java
@@ -957,7 +957,7 @@ public class LearnerHandler extends ZooKeeperThread {
while (itr.hasNext()) {
Proposal propose = itr.next();
- long packetZxid = propose.packet.getZxid();
+ long packetZxid = propose.getZxid();
// abort if we hit the limit
if ((maxZxid != null) && (packetZxid > maxZxid)) {
break;
@@ -1020,7 +1020,7 @@ public class LearnerHandler extends ZooKeeperThread {
// Since this is already a committed proposal, we need to follow
// it by a commit packet
- queuePacket(propose.packet);
+ queuePacket(propose.getQuorumPacket());
queueOpPacket(Leader.COMMIT, packetZxid);
queuedZxid = packetZxid;
diff --git
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/flexible/QuorumOracleMaj.java
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/flexible/QuorumOracleMaj.java
index 01f3a8240..b3e7fa249 100644
---
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/flexible/QuorumOracleMaj.java
+++
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/flexible/QuorumOracleMaj.java
@@ -123,18 +123,18 @@ public class QuorumOracleMaj extends QuorumMaj {
LOG.debug("Start Revalidation outstandingProposals");
try {
while (outstandingProposal.size() >= 1) {
- outstandingProposal.sort((o1, o2) -> (int)
(o1.packet.getZxid() - o2.packet.getZxid()));
+ outstandingProposal.sort((o1, o2) -> (int) (o1.getZxid() -
o2.getZxid()));
Leader.Proposal p;
int i = 0;
while (i < outstandingProposal.size()) {
p = outstandingProposal.get(i);
- if (p.request.zxid > lastCommitted) {
- LOG.debug("Re-validate outstanding proposal: 0x{}
size:{} lastCommitted:{}", Long.toHexString(p.request.zxid),
outstandingProposal.size(), Long.toHexString(lastCommitted));
- if (!self.tryToCommit(p, p.request.zxid, null)) {
+ if (p.getZxid() > lastCommitted) {
+ LOG.debug("Re-validate outstanding proposal: 0x{}
size:{} lastCommitted:{}", Long.toHexString(p.getZxid()),
outstandingProposal.size(), Long.toHexString(lastCommitted));
+ if (!self.tryToCommit(p, p.getZxid(), null)) {
break;
} else {
- lastCommitted = p.request.zxid;
+ lastCommitted = p.getZxid();
outstandingProposal.remove(p);
}
}
diff --git
a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LeaderWithObserverTest.java
b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LeaderWithObserverTest.java
index f9de45183..518dcef0f 100644
---
a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LeaderWithObserverTest.java
+++
b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LeaderWithObserverTest.java
@@ -25,6 +25,7 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import java.io.File;
+import java.lang.reflect.Field;
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.Set;
@@ -143,7 +144,9 @@ public class LeaderWithObserverTest {
long zxid = leader.zk.getZxid();
// things needed for waitForNewLeaderAck to run (usually in
leader.lead(), but we're not running leader here)
- leader.newLeaderProposal.packet = new QuorumPacket(0, zxid, null,
null);
+ Field field = Leader.Proposal.class.getDeclaredField("packet");
+ field.setAccessible(true);
+ field.set(leader.newLeaderProposal, new QuorumPacket(0, zxid, null,
null));
leader.newLeaderProposal.addQuorumVerifier(peer.getQuorumVerifier());
Set<Long> ackSet =
leader.newLeaderProposal.qvAcksetPairs.get(0).getAckset();
diff --git
a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LearnerHandlerTest.java
b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LearnerHandlerTest.java
index bbf36367e..43202716d 100644
---
a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LearnerHandlerTest.java
+++
b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LearnerHandlerTest.java
@@ -86,14 +86,14 @@ public class LearnerHandlerTest extends ZKTestCase {
public long getmaxCommittedLog() {
if (!committedLog.isEmpty()) {
- return committedLog.getLast().packet.getZxid();
+ return committedLog.getLast().getZxid();
}
return 0;
}
public long getminCommittedLog() {
if (!committedLog.isEmpty()) {
- return committedLog.getFirst().packet.getZxid();
+ return committedLog.getFirst().getZxid();
}
return 0;
}
@@ -107,7 +107,7 @@ public class LearnerHandlerTest extends ZKTestCase {
}
public Iterator<Proposal> getProposalsFromTxnLog(long peerZxid, long
limit) {
- if (peerZxid >= txnLog.peekFirst().packet.getZxid()) {
+ if (peerZxid >= txnLog.peekFirst().getZxid()) {
return txnLog.iterator();
} else {
return Collections.emptyIterator();
@@ -150,10 +150,10 @@ public class LearnerHandlerTest extends ZKTestCase {
}
Proposal createProposal(long zxid) {
- Proposal p = new Proposal();
- p.packet = new QuorumPacket();
- p.packet.setZxid(zxid);
- p.packet.setType(Leader.PROPOSAL);
+ QuorumPacket packet = new QuorumPacket();
+ packet.setZxid(zxid);
+ packet.setType(Leader.PROPOSAL);
+ Proposal p = new Proposal(packet);
return p;
}
diff --git
a/zookeeper-server/src/test/java/org/apache/zookeeper/test/GetProposalFromTxnTest.java
b/zookeeper-server/src/test/java/org/apache/zookeeper/test/GetProposalFromTxnTest.java
index 010d69b33..a85e76d01 100644
---
a/zookeeper-server/src/test/java/org/apache/zookeeper/test/GetProposalFromTxnTest.java
+++
b/zookeeper-server/src/test/java/org/apache/zookeeper/test/GetProposalFromTxnTest.java
@@ -107,7 +107,7 @@ public class GetProposalFromTxnTest extends ZKTestCase {
while (itr.hasNext()) {
Proposal proposal = itr.next();
TxnLogEntry logEntry = SerializeUtils.deserializeTxn(
- proposal.packet.getData());
+ proposal.getQuorumPacket().getData());
TxnHeader hdr = logEntry.getHeader();
Record rec = logEntry.getTxn();
if (hdr.getType() == OpCode.create) {
diff --git
a/zookeeper-server/src/test/java/org/apache/zookeeper/test/LocalSessionRequestTest.java
b/zookeeper-server/src/test/java/org/apache/zookeeper/test/LocalSessionRequestTest.java
index 1c1c72e1a..16a470c31 100644
---
a/zookeeper-server/src/test/java/org/apache/zookeeper/test/LocalSessionRequestTest.java
+++
b/zookeeper-server/src/test/java/org/apache/zookeeper/test/LocalSessionRequestTest.java
@@ -82,8 +82,8 @@ public class LocalSessionRequestTest extends ZKTestCase {
QuorumPeer peer = qb.getPeerList().get(peerId);
ZKDatabase db = peer.getActiveServer().getZKDatabase();
for (Proposal p : db.getCommittedLog()) {
- assertFalse(p.request.sessionId == sessionId,
- "Should not see " + Request.op2String(p.request.type)
+ assertFalse(p.getRequest().sessionId == sessionId,
+ "Should not see " + Request.op2String(p.getRequest().type)
+ " request from local session 0x" + session + "
on the " + peerType);
}
}