This is an automated email from the ASF dual-hosted git repository.
tison pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zookeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 31c7a7e4c ZOOKEEPER-4723. Cleanup more in-place bytebuffer
manipulations (#2036)
31c7a7e4c is described below
commit 31c7a7e4c2b3b71549a06f442f8a4c5e9f03a6df
Author: tison <[email protected]>
AuthorDate: Tue Mar 5 16:37:54 2024 +0800
ZOOKEEPER-4723. Cleanup more in-place bytebuffer manipulations (#2036)
Signed-off-by: tison <[email protected]>
---
.../main/java/org/apache/jute/RecordReader.java | 83 ----------------------
.../main/java/org/apache/jute/RecordWriter.java | 81 ---------------------
.../zookeeper/server/ByteBufferOutputStream.java | 57 ---------------
.../java/org/apache/zookeeper/server/DataTree.java | 27 ++++---
.../org/apache/zookeeper/server/NIOServerCnxn.java | 2 +-
.../apache/zookeeper/server/NettyServerCnxn.java | 2 +-
.../zookeeper/server/PrepRequestProcessor.java | 11 +--
.../org/apache/zookeeper/server/ServerCnxn.java | 32 ++++-----
.../zookeeper/server/SimpleRequestRecord.java | 3 +-
.../apache/zookeeper/server/quorum/Zab1_0Test.java | 17 ++---
10 files changed, 38 insertions(+), 277 deletions(-)
diff --git a/zookeeper-jute/src/main/java/org/apache/jute/RecordReader.java
b/zookeeper-jute/src/main/java/org/apache/jute/RecordReader.java
deleted file mode 100644
index 2c91d2e0a..000000000
--- a/zookeeper-jute/src/main/java/org/apache/jute/RecordReader.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.jute;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.util.HashMap;
-
-/**
- * Front-end interface to deserializers. Also acts as a factory
- * for deserializers.
- */
-public class RecordReader {
-
- private static HashMap<String, Method> archiveFactory;
-
- private InputArchive archive;
-
- static {
- archiveFactory = new HashMap<>();
-
- try {
- archiveFactory.put(
- "binary",
- BinaryInputArchive.class.getDeclaredMethod("getArchive",
InputStream.class));
- } catch (SecurityException | NoSuchMethodException ex) {
- ex.printStackTrace();
- }
- }
-
- private static InputArchive createArchive(InputStream in, String format) {
- Method factory = archiveFactory.get(format);
-
- if (factory != null) {
- Object[] params = {in};
- try {
- return (InputArchive) factory.invoke(null, params);
- } catch (IllegalArgumentException | InvocationTargetException |
IllegalAccessException ex) {
- ex.printStackTrace();
- }
- }
-
- return null;
- }
-
- /**
- * Creates a new instance of RecordReader.
- *
- * @param in Stream from which to deserialize a record
- * @param format Deserialization format ("binary", "xml", or "csv")
- */
- public RecordReader(InputStream in, String format) {
- archive = createArchive(in, format);
- }
-
- /**
- * Deserialize a record.
- *
- * @param r Record to be deserialized
- */
- public void read(Record r) throws IOException {
- r.deserialize(archive, "");
- }
-
-}
diff --git a/zookeeper-jute/src/main/java/org/apache/jute/RecordWriter.java
b/zookeeper-jute/src/main/java/org/apache/jute/RecordWriter.java
deleted file mode 100644
index a6d74a978..000000000
--- a/zookeeper-jute/src/main/java/org/apache/jute/RecordWriter.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.jute;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.util.HashMap;
-
-/**
- * Front-end for serializers. Also serves as a factory for serializers.
- */
-public class RecordWriter {
-
- private OutputArchive archive;
-
- static HashMap<String, Method> constructFactory() {
- HashMap<String, Method> factory = new HashMap<>();
-
- try {
- factory.put(
- "binary",
- BinaryOutputArchive.class.getDeclaredMethod("getArchive",
OutputStream.class));
- } catch (SecurityException | NoSuchMethodException ex) {
- ex.printStackTrace();
- }
-
- return factory;
- }
-
- private static HashMap<String, Method> archiveFactory = constructFactory();
-
- private static OutputArchive createArchive(OutputStream out, String
format) {
- Method factory = archiveFactory.get(format);
- if (factory != null) {
- Object[] params = {out};
- try {
- return (OutputArchive) factory.invoke(null, params);
- } catch (IllegalArgumentException | InvocationTargetException |
IllegalAccessException ex) {
- ex.printStackTrace();
- }
- }
- return null;
- }
-
- /**
- * Creates a new instance of RecordWriter.
- *
- * @param out Output stream where the records will be serialized
- * @param format Serialization format ("binary", "xml", or "csv")
- */
- public RecordWriter(OutputStream out, String format) {
- archive = createArchive(out, format);
- }
-
- /**
- * Serialize a record.
- *
- * @param r record to be serialized
- */
- public void write(Record r) throws IOException {
- r.serialize(archive, "");
- }
-}
diff --git
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ByteBufferOutputStream.java
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ByteBufferOutputStream.java
deleted file mode 100644
index 35a528cdb..000000000
---
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ByteBufferOutputStream.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zookeeper.server;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.nio.ByteBuffer;
-import javax.annotation.Nonnull;
-import org.apache.jute.BinaryOutputArchive;
-import org.apache.jute.Record;
-
-public class ByteBufferOutputStream extends OutputStream {
-
- private final ByteBuffer bb;
-
- public ByteBufferOutputStream(ByteBuffer bb) {
- this.bb = bb;
- }
-
- @Override
- public void write(int b) throws IOException {
- bb.put((byte) b);
- }
-
- @Override
- public void write(@Nonnull byte[] b) throws IOException {
- bb.put(b);
- }
-
- @Override
- public void write(@Nonnull byte[] b, int off, int len) throws IOException {
- bb.put(b, off, len);
- }
-
- public static void record2ByteBuffer(Record record, ByteBuffer bb) throws
IOException {
- BinaryOutputArchive oa;
- oa = BinaryOutputArchive.getArchive(new ByteBufferOutputStream(bb));
- record.serialize(oa, "request");
- }
-
-}
diff --git
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java
index 6dbe94332..4a3de64c7 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java
@@ -21,7 +21,6 @@ package org.apache.zookeeper.server;
import java.io.EOFException;
import java.io.IOException;
import java.io.PrintWriter;
-import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -34,6 +33,7 @@ import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Supplier;
import org.apache.jute.InputArchive;
import org.apache.jute.OutputArchive;
import org.apache.jute.Record;
@@ -969,46 +969,43 @@ public class DataTree {
boolean post_failed = false;
for (Txn subtxn : txns) {
- ByteBuffer bb = ByteBuffer.wrap(subtxn.getData());
- Record record;
+ final Supplier<Record> supplier;
switch (subtxn.getType()) {
case OpCode.create:
case OpCode.create2:
- record = new CreateTxn();
+ supplier = CreateTxn::new;
break;
case OpCode.createTTL:
- record = new CreateTTLTxn();
+ supplier = CreateTTLTxn::new;
break;
case OpCode.createContainer:
- record = new CreateContainerTxn();
+ supplier = CreateContainerTxn::new;
break;
case OpCode.delete:
case OpCode.deleteContainer:
- record = new DeleteTxn();
+ supplier = DeleteTxn::new;
break;
case OpCode.setData:
- record = new SetDataTxn();
+ supplier = SetDataTxn::new;
break;
case OpCode.error:
- record = new ErrorTxn();
+ supplier = ErrorTxn::new;
post_failed = true;
break;
case OpCode.check:
- record = new CheckVersionTxn();
+ supplier = CheckVersionTxn::new;
break;
default:
throw new IOException("Invalid type of op: " +
subtxn.getType());
}
- assert record != null;
-
- ByteBufferInputStream.byteBuffer2Record(bb, record);
-
+ final Record record;
if (failed && subtxn.getType() != OpCode.error) {
int ec = post_failed ?
Code.RUNTIMEINCONSISTENCY.intValue() : Code.OK.intValue();
-
subtxn.setType(OpCode.error);
record = new ErrorTxn(ec);
+ } else {
+ record =
RequestRecord.fromBytes(subtxn.getData()).readRecord(supplier);
}
assert !failed || (subtxn.getType() == OpCode.error);
diff --git
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxn.java
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxn.java
index 61cbe71ba..cfd04fcf6 100644
---
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxn.java
+++
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxn.java
@@ -687,7 +687,7 @@ public class NIOServerCnxn extends ServerCnxn {
public int sendResponse(ReplyHeader h, Record r, String tag, String
cacheKey, Stat stat, int opCode) {
int responseSize = 0;
try {
- ByteBuffer[] bb = serialize(h, r, tag, cacheKey, stat, opCode);
+ ByteBuffer[] bb = serialize(h, r, cacheKey, stat, opCode);
responseSize = bb[0].getInt();
bb[0].rewind();
sendBuffer(bb);
diff --git
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java
index 45d548409..8a7de2e92 100644
---
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java
+++
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java
@@ -190,7 +190,7 @@ public class NettyServerCnxn extends ServerCnxn {
if (closingChannel || !channel.isOpen()) {
return 0;
}
- ByteBuffer[] bb = serialize(h, r, tag, cacheKey, stat, opCode);
+ ByteBuffer[] bb = serialize(h, r, cacheKey, stat, opCode);
int responseSize = bb[0].getInt();
bb[0].rewind();
sendBuffer(bb);
diff --git
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/PrepRequestProcessor.java
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/PrepRequestProcessor.java
index 274862ee3..075f8d7c8 100644
---
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/PrepRequestProcessor.java
+++
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/PrepRequestProcessor.java
@@ -18,10 +18,8 @@
package org.apache.zookeeper.server;
-import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.StringReader;
-import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -32,7 +30,6 @@ import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
-import org.apache.jute.BinaryOutputArchive;
import org.apache.jute.Record;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.DeleteContainerRequest;
@@ -868,12 +865,8 @@ public class PrepRequestProcessor extends
ZooKeeperCriticalThread implements Req
// TODO: I don't want to have to serialize it here and then
// immediately deserialize in next processor. But I'm
// not sure how else to get the txn stored into our
list.
- try (ByteArrayOutputStream baos = new
ByteArrayOutputStream()) {
- BinaryOutputArchive boa =
BinaryOutputArchive.getArchive(baos);
- txn.serialize(boa, "request");
- ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
- txns.add(new Txn(type, bb.array()));
- }
+ byte[] bb = RequestRecord.fromRecord(txn).readBytes();
+ txns.add(new Txn(type, bb));
}
request.setTxn(new MultiTxn(txns));
diff --git
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerCnxn.java
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerCnxn.java
index 331f38d7f..652823e1a 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerCnxn.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerCnxn.java
@@ -18,7 +18,6 @@
package org.apache.zookeeper.server;
-import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
@@ -35,7 +34,6 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
-import org.apache.jute.BinaryOutputArchive;
import org.apache.jute.Record;
import org.apache.zookeeper.Quotas;
import org.apache.zookeeper.WatchedEvent;
@@ -173,29 +171,27 @@ public abstract class ServerCnxn implements Stats,
Watcher {
* @param r reply payload, can be null
* @param tag Jute serialization tag, can be null
* @param cacheKey Key for caching the serialized payload. A null value
prevents caching.
- * @param stat Stat information for the the reply payload, used for cache
invalidation.
+ * @param stat Stat information for the reply payload, used for cache
invalidation.
* A value of 0 prevents caching.
* @param opCode The op code appertains to the corresponding request of
the response,
* used to decide which cache (e.g. read response cache,
* list of children response cache, ...) object to look up
to when applicable.
*/
- public abstract int sendResponse(ReplyHeader h, Record r, String tag,
- String cacheKey, Stat stat, int opCode)
throws IOException;
+ public abstract int sendResponse(
+ ReplyHeader h,
+ Record r,
+ String tag,
+ String cacheKey,
+ Stat stat,
+ int opCode
+ ) throws IOException;
public int sendResponse(ReplyHeader h, Record r, String tag) throws
IOException {
return sendResponse(h, r, tag, null, null, -1);
}
- protected byte[] serializeRecord(Record record) throws IOException {
- ByteArrayOutputStream baos = new
ByteArrayOutputStream(ZooKeeperServer.intBufferStartingSizeBytes);
- BinaryOutputArchive bos = BinaryOutputArchive.getArchive(baos);
- bos.writeRecord(record, null);
- return baos.toByteArray();
- }
-
- protected ByteBuffer[] serialize(ReplyHeader h, Record r, String tag,
- String cacheKey, Stat stat, int opCode)
throws IOException {
- byte[] header = serializeRecord(h);
+ protected ByteBuffer[] serialize(ReplyHeader h, Record r, String cacheKey,
Stat stat, int opCode) throws IOException {
+ byte[] header = RequestRecord.fromRecord(h).readBytes();
byte[] data = null;
if (r != null) {
ResponseCache cache = null;
@@ -221,18 +217,18 @@ public abstract class ServerCnxn implements Stats,
Watcher {
// Use cache to get serialized data.
//
// NB: Tag is ignored both during cache lookup and
serialization,
- // since is is not used in read responses, which are being
cached.
+ // since it is not used in read responses, which are being
cached.
data = cache.get(cacheKey, stat);
if (data == null) {
// Cache miss, serialize the response and put it in cache.
- data = serializeRecord(r);
+ data = RequestRecord.fromRecord(r).readBytes();
cache.put(cacheKey, data, stat);
cacheMiss.add(1);
} else {
cacheHit.add(1);
}
} else {
- data = serializeRecord(r);
+ data = RequestRecord.fromRecord(r).readBytes();
}
}
int dataLength = data == null ? 0 : data.length;
diff --git
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/SimpleRequestRecord.java
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/SimpleRequestRecord.java
index a1c78ddad..a97cd19ae 100644
---
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/SimpleRequestRecord.java
+++
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/SimpleRequestRecord.java
@@ -50,7 +50,8 @@ public class SimpleRequestRecord implements RequestRecord {
return bytes;
}
- try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
+ try (ByteArrayOutputStream baos = new ByteArrayOutputStream(
+ ZooKeeperServer.intBufferStartingSizeBytes)) {
BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
record.serialize(boa, "request");
bytes = baos.toByteArray();
diff --git
a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/Zab1_0Test.java
b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/Zab1_0Test.java
index 2a4ebaa50..76a678f50 100644
---
a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/Zab1_0Test.java
+++
b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/Zab1_0Test.java
@@ -53,9 +53,9 @@ import org.apache.zookeeper.ZKTestCase;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.server.ByteBufferInputStream;
-import org.apache.zookeeper.server.ByteBufferOutputStream;
import org.apache.zookeeper.server.DataTree;
import org.apache.zookeeper.server.Request;
+import org.apache.zookeeper.server.RequestRecord;
import org.apache.zookeeper.server.ZKDatabase;
import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
@@ -517,8 +517,7 @@ public class Zab1_0Test extends ZKTestCase {
/* we test a normal run. everything should work out well. */
LearnerInfo li = new LearnerInfo(1, 0x10000, 0);
- byte[] liBytes = new byte[20];
- ByteBufferOutputStream.record2ByteBuffer(li,
ByteBuffer.wrap(liBytes));
+ byte[] liBytes = RequestRecord.fromRecord(li).readBytes();
QuorumPacket qp = new QuorumPacket(Leader.FOLLOWERINFO, 1,
liBytes, null);
oa.writeRecord(qp, null);
@@ -830,8 +829,7 @@ public class Zab1_0Test extends ZKTestCase {
/* we test a normal run. everything should work out well. */
LearnerInfo li = new LearnerInfo(1, 0x10000, 0);
- byte[] liBytes = new byte[20];
- ByteBufferOutputStream.record2ByteBuffer(li,
ByteBuffer.wrap(liBytes));
+ byte[] liBytes = RequestRecord.fromRecord(li).readBytes();
QuorumPacket qp = new QuorumPacket(Leader.FOLLOWERINFO, 0,
liBytes, null);
oa.writeRecord(qp, null);
@@ -871,8 +869,7 @@ public class Zab1_0Test extends ZKTestCase {
assertEquals(0, l.self.getCurrentEpoch());
LearnerInfo li = new LearnerInfo(1, 0x10000, 0);
- byte[] liBytes = new byte[20];
- ByteBufferOutputStream.record2ByteBuffer(li,
ByteBuffer.wrap(liBytes));
+ byte[] liBytes = RequestRecord.fromRecord(li).readBytes();
QuorumPacket qp = new QuorumPacket(Leader.FOLLOWERINFO, 0,
liBytes, null);
oa.writeRecord(qp, null);
@@ -1074,8 +1071,7 @@ public class Zab1_0Test extends ZKTestCase {
public void converseWithLeader(InputArchive ia, OutputArchive oa,
Leader l) throws IOException {
/* we test a normal run. everything should work out well. */
LearnerInfo li = new LearnerInfo(1, 0x10000, 0);
- byte[] liBytes = new byte[20];
- ByteBufferOutputStream.record2ByteBuffer(li,
ByteBuffer.wrap(liBytes));
+ byte[] liBytes = RequestRecord.fromRecord(li).readBytes();
/* we are going to say we last acked epoch 20 */
QuorumPacket qp = new QuorumPacket(Leader.FOLLOWERINFO,
ZxidUtils.makeZxid(20, 0), liBytes, null);
oa.writeRecord(qp, null);
@@ -1112,8 +1108,7 @@ public class Zab1_0Test extends ZKTestCase {
public void converseWithLeader(InputArchive ia, OutputArchive oa,
Leader l) throws IOException, InterruptedException {
/* we test a normal run. everything should work out well. */
LearnerInfo li = new LearnerInfo(1, 0x10000, 0);
- byte[] liBytes = new byte[20];
- ByteBufferOutputStream.record2ByteBuffer(li,
ByteBuffer.wrap(liBytes));
+ byte[] liBytes = RequestRecord.fromRecord(li).readBytes();
QuorumPacket qp = new QuorumPacket(Leader.FOLLOWERINFO, 0,
liBytes, null);
oa.writeRecord(qp, null);
readPacketSkippingPing(ia, qp);