[CALCITE-1094] Replace ByteArrayOutputStream to avoid synchronized writes. Pull in the HBaseZeroCopyByteString class. We can cap the number of byte[] instances that we allocate by giving each thread its own reusable buffer. This also avoids the synchronization that a typical object pool would require.
Project: http://git-wip-us.apache.org/repos/asf/calcite/repo Commit: http://git-wip-us.apache.org/repos/asf/calcite/commit/1d3a26df Tree: http://git-wip-us.apache.org/repos/asf/calcite/tree/1d3a26df Diff: http://git-wip-us.apache.org/repos/asf/calcite/diff/1d3a26df Branch: refs/heads/master Commit: 1d3a26dfac17fea458402a637449007dc095bced Parents: 0de38aa Author: Josh Elser <[email protected]> Authored: Wed Mar 2 17:43:41 2016 -0500 Committer: Josh Elser <[email protected]> Committed: Wed Mar 2 23:26:02 2016 -0500 ---------------------------------------------------------------------- .../avatica/server/AvaticaJsonHandler.java | 16 +- .../avatica/server/AvaticaProtobufHandler.java | 20 ++- .../protobuf/HBaseZeroCopyByteString.java | 77 ++++++++++ .../java/com/google/protobuf/package-info.java | 26 ++++ .../apache/calcite/avatica/AvaticaUtils.java | 47 ++++-- .../avatica/remote/ProtobufTranslation.java | 10 +- .../avatica/remote/ProtobufTranslationImpl.java | 146 +++++++++++++++--- .../avatica/remote/RequestTranslator.java | 3 +- .../apache/calcite/avatica/remote/Service.java | 8 +- .../calcite/avatica/remote/TypedValue.java | 3 +- .../avatica/util/UnsynchronizedBuffer.java | 152 +++++++++++++++++++ .../remote/ProtobufSerializationTest.java | 75 +++++++++ .../avatica/util/UnsynchronizedBufferTest.java | 41 +++++ 13 files changed, 578 insertions(+), 46 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/calcite/blob/1d3a26df/avatica-server/src/main/java/org/apache/calcite/avatica/server/AvaticaJsonHandler.java ---------------------------------------------------------------------- diff --git a/avatica-server/src/main/java/org/apache/calcite/avatica/server/AvaticaJsonHandler.java b/avatica-server/src/main/java/org/apache/calcite/avatica/server/AvaticaJsonHandler.java index 703a2c3..34a9333 100644 --- a/avatica-server/src/main/java/org/apache/calcite/avatica/server/AvaticaJsonHandler.java +++ 
b/avatica-server/src/main/java/org/apache/calcite/avatica/server/AvaticaJsonHandler.java @@ -25,6 +25,7 @@ import org.apache.calcite.avatica.remote.Handler.HandlerResponse; import org.apache.calcite.avatica.remote.JsonHandler; import org.apache.calcite.avatica.remote.Service; import org.apache.calcite.avatica.remote.Service.RpcMetadataResponse; +import org.apache.calcite.avatica.util.UnsynchronizedBuffer; import org.eclipse.jetty.server.Request; import org.eclipse.jetty.server.handler.AbstractHandler; @@ -54,6 +55,8 @@ public class AvaticaJsonHandler extends AbstractHandler implements MetricsAwareA final MetricsSystem metrics; final Timer requestTimer; + final ThreadLocal<UnsynchronizedBuffer> threadLocalBuffer; + public AvaticaJsonHandler(Service service) { this(service, NoopMetricsSystem.getInstance()); } @@ -67,6 +70,12 @@ public class AvaticaJsonHandler extends AbstractHandler implements MetricsAwareA // Metrics this.requestTimer = this.metrics.getTimer( concat(AvaticaJsonHandler.class, MetricsAwareAvaticaHandler.REQUEST_TIMER_NAME)); + + this.threadLocalBuffer = new ThreadLocal<UnsynchronizedBuffer>() { + @Override public UnsynchronizedBuffer initialValue() { + return new UnsynchronizedBuffer(); + } + }; } public void handle(String target, Request baseRequest, @@ -80,8 +89,13 @@ public class AvaticaJsonHandler extends AbstractHandler implements MetricsAwareA // The latter allows very large requests without hitting HTTP 413. 
String rawRequest = request.getHeader("request"); if (rawRequest == null) { + // Avoid a new buffer creation for every HTTP request + final UnsynchronizedBuffer buffer = threadLocalBuffer.get(); try (ServletInputStream inputStream = request.getInputStream()) { - rawRequest = AvaticaUtils.readFully(inputStream); + rawRequest = AvaticaUtils.readFully(inputStream, buffer); + } finally { + // Reset the offset into the buffer after we're done + buffer.reset(); } } final String jsonRequest = http://git-wip-us.apache.org/repos/asf/calcite/blob/1d3a26df/avatica-server/src/main/java/org/apache/calcite/avatica/server/AvaticaProtobufHandler.java ---------------------------------------------------------------------- diff --git a/avatica-server/src/main/java/org/apache/calcite/avatica/server/AvaticaProtobufHandler.java b/avatica-server/src/main/java/org/apache/calcite/avatica/server/AvaticaProtobufHandler.java index aeebad7..27e73de 100644 --- a/avatica-server/src/main/java/org/apache/calcite/avatica/server/AvaticaProtobufHandler.java +++ b/avatica-server/src/main/java/org/apache/calcite/avatica/server/AvaticaProtobufHandler.java @@ -28,6 +28,7 @@ import org.apache.calcite.avatica.remote.ProtobufTranslation; import org.apache.calcite.avatica.remote.ProtobufTranslationImpl; import org.apache.calcite.avatica.remote.Service; import org.apache.calcite.avatica.remote.Service.RpcMetadataResponse; +import org.apache.calcite.avatica.util.UnsynchronizedBuffer; import org.eclipse.jetty.server.Request; import org.eclipse.jetty.server.handler.AbstractHandler; @@ -55,19 +56,28 @@ public class AvaticaProtobufHandler extends AbstractHandler implements MetricsAw private final MetricsSystem metrics; private final Timer requestTimer; + final ThreadLocal<UnsynchronizedBuffer> threadLocalBuffer; + public AvaticaProtobufHandler(Service service) { this(service, NoopMetricsSystem.getInstance()); } public AvaticaProtobufHandler(Service service, MetricsSystem metrics) { - this.protobufTranslation = new 
ProtobufTranslationImpl(); this.service = Objects.requireNonNull(service); this.metrics = Objects.requireNonNull(metrics); - this.pbHandler = new ProtobufHandler(service, protobufTranslation, metrics); this.requestTimer = this.metrics.getTimer( MetricsHelper.concat(AvaticaProtobufHandler.class, MetricsAwareAvaticaHandler.REQUEST_TIMER_NAME)); + + this.protobufTranslation = new ProtobufTranslationImpl(); + this.pbHandler = new ProtobufHandler(service, protobufTranslation, metrics); + + this.threadLocalBuffer = new ThreadLocal<UnsynchronizedBuffer>() { + @Override public UnsynchronizedBuffer initialValue() { + return new UnsynchronizedBuffer(); + } + }; } public void handle(String target, Request baseRequest, @@ -78,8 +88,12 @@ public class AvaticaProtobufHandler extends AbstractHandler implements MetricsAw response.setStatus(HttpServletResponse.SC_OK); if (request.getMethod().equals("POST")) { byte[] requestBytes; + // Avoid a new buffer creation for every HTTP request + final UnsynchronizedBuffer buffer = threadLocalBuffer.get(); try (ServletInputStream inputStream = request.getInputStream()) { - requestBytes = AvaticaUtils.readFullyToBytes(inputStream); + requestBytes = AvaticaUtils.readFullyToBytes(inputStream, buffer); + } finally { + buffer.reset(); } HandlerResponse<byte[]> handlerResponse = pbHandler.apply(requestBytes); http://git-wip-us.apache.org/repos/asf/calcite/blob/1d3a26df/avatica/src/main/java/com/google/protobuf/HBaseZeroCopyByteString.java ---------------------------------------------------------------------- diff --git a/avatica/src/main/java/com/google/protobuf/HBaseZeroCopyByteString.java b/avatica/src/main/java/com/google/protobuf/HBaseZeroCopyByteString.java new file mode 100644 index 0000000..62c4dd2 --- /dev/null +++ b/avatica/src/main/java/com/google/protobuf/HBaseZeroCopyByteString.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.protobuf; + +/** + * Helper class to extract byte arrays from {@link ByteString} without copy. + * + * Without this protobufs would force us to copy every single byte array out + * of the objects de-serialized from the wire (which already do one copy, on + * top of the copies the JVM does to go from kernel buffer to C buffer and + * from C buffer to JVM buffer). + * + * Graciously copied from Apache HBase. + */ +public final class HBaseZeroCopyByteString extends LiteralByteString { + // Gotten from AsyncHBase code base with permission. + /** Private constructor so this class cannot be instantiated. */ + private HBaseZeroCopyByteString() { + super(null); + throw new UnsupportedOperationException("Should never be here."); + } + + /** + * Wraps a byte array in a {@link ByteString} without copying it. + * + * @param array The byte array to wrap + * @return a ByteString wrapping the <code>array</code> + */ + public static ByteString wrap(final byte[] array) { + return new LiteralByteString(array); + } + + /** + * Wraps a subset of a byte array in a {@link ByteString} without copying it. 
+ * + * @param array The byte array to wrap + * @param offset the start of data in the array + * @param length The number of bytes of data at <code>offset</code> + * @return a ByteString wrapping the <code>array</code> + */ + public static ByteString wrap(final byte[] array, int offset, int length) { + return new BoundedByteString(array, offset, length); + } + + + /** + * Extracts the byte array from the given {@link ByteString} without copy. + * @param buf A buffer from which to extract the array. This buffer must be + * actually an instance of a {@code LiteralByteString}. + * + * @param buf <code>ByteString</code> to access + * @return The underlying byte array of the ByteString + */ + public static byte[] zeroCopyGetBytes(final ByteString buf) { + if (buf instanceof LiteralByteString) { + return ((LiteralByteString) buf).bytes; + } + throw new UnsupportedOperationException("Need a LiteralByteString, got a " + + buf.getClass().getName()); + } +} + +// End HBaseZeroCopyByteString.java http://git-wip-us.apache.org/repos/asf/calcite/blob/1d3a26df/avatica/src/main/java/com/google/protobuf/package-info.java ---------------------------------------------------------------------- diff --git a/avatica/src/main/java/com/google/protobuf/package-info.java b/avatica/src/main/java/com/google/protobuf/package-info.java new file mode 100644 index 0000000..92f110e --- /dev/null +++ b/avatica/src/main/java/com/google/protobuf/package-info.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Avatica-custom classes to access protected classes in Google Protobuf. + */ +@PackageMarker +package com.google.protobuf; + +import org.apache.calcite.avatica.util.PackageMarker; + +// End package-info.java http://git-wip-us.apache.org/repos/asf/calcite/blob/1d3a26df/avatica/src/main/java/org/apache/calcite/avatica/AvaticaUtils.java ---------------------------------------------------------------------- diff --git a/avatica/src/main/java/org/apache/calcite/avatica/AvaticaUtils.java b/avatica/src/main/java/org/apache/calcite/avatica/AvaticaUtils.java index 9382f87..a999f19 100644 --- a/avatica/src/main/java/org/apache/calcite/avatica/AvaticaUtils.java +++ b/avatica/src/main/java/org/apache/calcite/avatica/AvaticaUtils.java @@ -16,13 +16,15 @@ */ package org.apache.calcite.avatica; -import java.io.ByteArrayOutputStream; +import org.apache.calcite.avatica.util.UnsynchronizedBuffer; + import java.io.IOException; import java.io.InputStream; import java.lang.invoke.MethodHandle; import java.lang.invoke.MethodHandles; import java.lang.invoke.MethodType; import java.lang.reflect.Field; +import java.nio.charset.StandardCharsets; import java.sql.SQLException; import java.sql.Statement; import java.util.AbstractList; @@ -45,6 +47,12 @@ public class AvaticaUtils { private static final Set<String> UNIQUE_STRINGS = new HashSet<>(); + private static final ThreadLocal<byte[]> PER_THREAD_BUFFER = new ThreadLocal<byte[]>() { + @Override protected byte[] initialValue() { + return new byte[4096]; + } + }; + private AvaticaUtils() {} static { @@ -200,25 
+208,46 @@ public class AvaticaUtils { /** Reads the contents of an input stream and returns as a string. */ public static String readFully(InputStream inputStream) throws IOException { - return _readFully(inputStream).toString(); + return readFully(inputStream, new UnsynchronizedBuffer(1024)); } + /** Reads the contents of an input stream and returns as a string. */ + public static String readFully(InputStream inputStream, UnsynchronizedBuffer buffer) + throws IOException { + // Variant that lets us use a pooled Buffer + final byte[] bytes = _readFully(inputStream, buffer); + return new String(bytes, 0, bytes.length, StandardCharsets.UTF_8); + } + + /** Reads the contents of an input stream and returns as a string. */ public static byte[] readFullyToBytes(InputStream inputStream) throws IOException { - return _readFully(inputStream).toByteArray(); + return readFullyToBytes(inputStream, new UnsynchronizedBuffer(1024)); } - /** Reads the contents of an input stream and returns a ByteArrayOutputStrema. */ - static ByteArrayOutputStream _readFully(InputStream inputStream) throws IOException { - final byte[] bytes = new byte[4096]; - final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + /** Reads the contents of an input stream and returns as a string. */ + public static byte[] readFullyToBytes(InputStream inputStream, UnsynchronizedBuffer buffer) + throws IOException { + // Variant that lets us use a pooled Buffer + return _readFully(inputStream, buffer); + } + + /** + * Reads the contents of an input stream and returns a byte array. + * + * @param inputStream the input to read from. + * @return A byte array whose length is equal to the number of bytes contained. 
+ */ + static byte[] _readFully(InputStream inputStream, UnsynchronizedBuffer buffer) + throws IOException { + final byte[] bytes = PER_THREAD_BUFFER.get(); for (;;) { int count = inputStream.read(bytes, 0, bytes.length); if (count < 0) { break; } - baos.write(bytes, 0, count); + buffer.write(bytes, 0, count); } - return baos; + return buffer.toArray(); } /** Invokes {@code Statement#setLargeMaxRows}, falling back on http://git-wip-us.apache.org/repos/asf/calcite/blob/1d3a26df/avatica/src/main/java/org/apache/calcite/avatica/remote/ProtobufTranslation.java ---------------------------------------------------------------------- diff --git a/avatica/src/main/java/org/apache/calcite/avatica/remote/ProtobufTranslation.java b/avatica/src/main/java/org/apache/calcite/avatica/remote/ProtobufTranslation.java index acb82db..7142d59 100644 --- a/avatica/src/main/java/org/apache/calcite/avatica/remote/ProtobufTranslation.java +++ b/avatica/src/main/java/org/apache/calcite/avatica/remote/ProtobufTranslation.java @@ -19,8 +19,6 @@ package org.apache.calcite.avatica.remote; import org.apache.calcite.avatica.remote.Service.Request; import org.apache.calcite.avatica.remote.Service.Response; -import com.google.protobuf.InvalidProtocolBufferException; - import java.io.IOException; /** @@ -49,18 +47,18 @@ public interface ProtobufTranslation { * * @param bytes Serialized protocol buffer request from client * @return A Request object for the given bytes - * @throws InvalidProtocolBufferException If the protocol buffer cannot be deserialized + * @throws IOException If the protocol buffer cannot be deserialized */ - Request parseRequest(byte[] bytes) throws InvalidProtocolBufferException; + Request parseRequest(byte[] bytes) throws IOException; /** * Parses a serialized protocol buffer response into a {@link Response}. 
* * @param bytes Serialized protocol buffer request from server * @return The Response object for the given bytes - * @throws InvalidProtocolBufferException If the protocol buffer cannot be deserialized + * @throws IOException If the protocol buffer cannot be deserialized */ - Response parseResponse(byte[] bytes) throws InvalidProtocolBufferException; + Response parseResponse(byte[] bytes) throws IOException; } // End ProtobufTranslation.java http://git-wip-us.apache.org/repos/asf/calcite/blob/1d3a26df/avatica/src/main/java/org/apache/calcite/avatica/remote/ProtobufTranslationImpl.java ---------------------------------------------------------------------- diff --git a/avatica/src/main/java/org/apache/calcite/avatica/remote/ProtobufTranslationImpl.java b/avatica/src/main/java/org/apache/calcite/avatica/remote/ProtobufTranslationImpl.java index 646d706..80d2b22 100644 --- a/avatica/src/main/java/org/apache/calcite/avatica/remote/ProtobufTranslationImpl.java +++ b/avatica/src/main/java/org/apache/calcite/avatica/remote/ProtobufTranslationImpl.java @@ -54,17 +54,22 @@ import org.apache.calcite.avatica.proto.Responses.SyncResultsResponse; import org.apache.calcite.avatica.remote.Service.Request; import org.apache.calcite.avatica.remote.Service.Response; import org.apache.calcite.avatica.remote.Service.RpcMetadataResponse; +import org.apache.calcite.avatica.util.UnsynchronizedBuffer; import com.google.protobuf.ByteString; -import com.google.protobuf.InvalidProtocolBufferException; +import com.google.protobuf.CodedInputStream; +import com.google.protobuf.HBaseZeroCopyByteString; import com.google.protobuf.Message; -import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.OutputStream; +import java.util.ArrayList; import java.util.Collections; -import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import static java.nio.charset.StandardCharsets.UTF_8; /** * Implementation of {@link 
ProtobufTranslationImpl} that translates @@ -75,9 +80,10 @@ public class ProtobufTranslationImpl implements ProtobufTranslation { // Extremely ugly mapping of PB class name into a means to convert it to the POJO private static final Map<String, RequestTranslator> REQUEST_PARSERS; private static final Map<String, ResponseTranslator> RESPONSE_PARSERS; + private static final Map<Class<?>, ByteString> MESSAGE_CLASSES; static { - HashMap<String, RequestTranslator> reqParsers = new HashMap<>(); + Map<String, RequestTranslator> reqParsers = new ConcurrentHashMap<>(); reqParsers.put(CatalogsRequest.class.getName(), new RequestTranslator(CatalogsRequest.parser(), new Service.CatalogsRequest())); reqParsers.put(OpenConnectionRequest.class.getName(), @@ -123,7 +129,7 @@ public class ProtobufTranslationImpl implements ProtobufTranslation { REQUEST_PARSERS = Collections.unmodifiableMap(reqParsers); - HashMap<String, ResponseTranslator> respParsers = new HashMap<>(); + Map<String, ResponseTranslator> respParsers = new ConcurrentHashMap<>(); respParsers.put(OpenConnectionResponse.class.getName(), new ResponseTranslator(OpenConnectionResponse.parser(), new Service.OpenConnectionResponse())); @@ -162,8 +168,65 @@ public class ProtobufTranslationImpl implements ProtobufTranslation { new ResponseTranslator(RollbackResponse.parser(), new Service.RollbackResponse())); RESPONSE_PARSERS = Collections.unmodifiableMap(respParsers); + + Map<Class<?>, ByteString> messageClassNames = new ConcurrentHashMap<>(); + for (Class<?> msgClz : getAllMessageClasses()) { + messageClassNames.put(msgClz, wrapClassName(msgClz)); + } + MESSAGE_CLASSES = Collections.unmodifiableMap(messageClassNames); + } + + private static List<Class<?>> getAllMessageClasses() { + List<Class<?>> messageClasses = new ArrayList<>(); + messageClasses.add(CatalogsRequest.class); + messageClasses.add(CloseConnectionRequest.class); + messageClasses.add(CloseStatementRequest.class); + messageClasses.add(ColumnsRequest.class); + 
messageClasses.add(CommitRequest.class); + messageClasses.add(ConnectionSyncRequest.class); + messageClasses.add(CreateStatementRequest.class); + messageClasses.add(DatabasePropertyRequest.class); + messageClasses.add(ExecuteRequest.class); + messageClasses.add(FetchRequest.class); + messageClasses.add(OpenConnectionRequest.class); + messageClasses.add(PrepareAndExecuteRequest.class); + messageClasses.add(PrepareRequest.class); + messageClasses.add(RollbackRequest.class); + messageClasses.add(SchemasRequest.class); + messageClasses.add(SyncResultsRequest.class); + messageClasses.add(TableTypesRequest.class); + messageClasses.add(TablesRequest.class); + messageClasses.add(TypeInfoRequest.class); + messageClasses.add(CloseConnectionResponse.class); + messageClasses.add(CloseStatementResponse.class); + messageClasses.add(CommitResponse.class); + messageClasses.add(ConnectionSyncResponse.class); + messageClasses.add(CreateStatementResponse.class); + messageClasses.add(DatabasePropertyResponse.class); + messageClasses.add(ErrorResponse.class); + messageClasses.add(ExecuteResponse.class); + messageClasses.add(FetchResponse.class); + messageClasses.add(OpenConnectionResponse.class); + messageClasses.add(PrepareResponse.class); + messageClasses.add(ResultSetResponse.class); + messageClasses.add(RollbackResponse.class); + messageClasses.add(RpcMetadata.class); + messageClasses.add(SyncResultsResponse.class); + + return messageClasses; + } + + private static ByteString wrapClassName(Class<?> clz) { + return HBaseZeroCopyByteString.wrap(clz.getName().getBytes(UTF_8)); } + private final ThreadLocal<UnsynchronizedBuffer> threadLocalBuffer = + new ThreadLocal<UnsynchronizedBuffer>() { + @Override protected UnsynchronizedBuffer initialValue() { + return new UnsynchronizedBuffer(); + } + }; + /** * Fetches the concrete message's Parser implementation. 
* @@ -207,42 +270,79 @@ public class ProtobufTranslationImpl implements ProtobufTranslation { } @Override public byte[] serializeResponse(Response response) throws IOException { - ByteArrayOutputStream out = new ByteArrayOutputStream(); - Message responseMsg = response.serialize(); - serializeMessage(out, responseMsg); - return out.toByteArray(); + // Avoid BAOS for its synchronized write methods, we don't need that concurrency control + UnsynchronizedBuffer out = threadLocalBuffer.get(); + try { + Message responseMsg = response.serialize(); + serializeMessage(out, responseMsg); + return out.toArray(); + } finally { + out.reset(); + } } @Override public byte[] serializeRequest(Request request) throws IOException { - ByteArrayOutputStream out = new ByteArrayOutputStream(); - Message requestMsg = request.serialize(); - serializeMessage(out, requestMsg); - return out.toByteArray(); + // Avoid BAOS for its synchronized write methods, we don't need that concurrency control + UnsynchronizedBuffer out = threadLocalBuffer.get(); + try { + Message requestMsg = request.serialize(); + serializeMessage(out, requestMsg); + return out.toArray(); + } finally { + out.reset(); + } } void serializeMessage(OutputStream out, Message msg) throws IOException { - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - msg.writeTo(baos); + // Serialize the protobuf message + UnsynchronizedBuffer buffer = threadLocalBuffer.get(); + ByteString serializedMsg; + try { + msg.writeTo(buffer); + // Make a bytestring from it + serializedMsg = HBaseZeroCopyByteString.wrap(buffer.toArray()); + } finally { + buffer.reset(); + } - // TODO Using ByteString is copying the bytes of the message which sucks. Could try to - // lift the ZeroCopy implementation from HBase. - WireMessage wireMsg = WireMessage.newBuilder().setName(msg.getClass().getName()). 
- setWrappedMessage(ByteString.copyFrom(baos.toByteArray())).build(); + // Wrap the serialized message in a WireMessage + WireMessage wireMsg = WireMessage.newBuilder().setNameBytes(getClassNameBytes(msg.getClass())) + .setWrappedMessage(serializedMsg).build(); + // Write the WireMessage to the provided OutputStream wireMsg.writeTo(out); } - @Override public Request parseRequest(byte[] bytes) throws InvalidProtocolBufferException { - WireMessage wireMsg = WireMessage.parseFrom(bytes); + ByteString getClassNameBytes(Class<?> clz) { + ByteString byteString = MESSAGE_CLASSES.get(clz); + if (null == byteString) { + throw new IllegalArgumentException("Missing ByteString for " + clz.getName()); + } + return byteString; + } + + @Override public Request parseRequest(byte[] bytes) throws IOException { + ByteString byteString = HBaseZeroCopyByteString.wrap(bytes); + CodedInputStream inputStream = byteString.newCodedInput(); + // Enable aliasing to avoid an extra copy to get at the serialized Request inside of the + // WireMessage. + inputStream.enableAliasing(true); + WireMessage wireMsg = WireMessage.parseFrom(inputStream); String serializedMessageClassName = wireMsg.getName(); RequestTranslator translator = getParserForRequest(serializedMessageClassName); + // The ByteString should be logical offsets into the original byte array return translator.transform(wireMsg.getWrappedMessage()); } - @Override public Response parseResponse(byte[] bytes) throws InvalidProtocolBufferException { - WireMessage wireMsg = WireMessage.parseFrom(bytes); + @Override public Response parseResponse(byte[] bytes) throws IOException { + ByteString byteString = HBaseZeroCopyByteString.wrap(bytes); + CodedInputStream inputStream = byteString.newCodedInput(); + // Enable aliasing to avoid an extra copy to get at the serialized Response inside of the + // WireMessage. 
+ inputStream.enableAliasing(true); + WireMessage wireMsg = WireMessage.parseFrom(inputStream); String serializedMessageClassName = wireMsg.getName(); ResponseTranslator translator = getParserForResponse(serializedMessageClassName); http://git-wip-us.apache.org/repos/asf/calcite/blob/1d3a26df/avatica/src/main/java/org/apache/calcite/avatica/remote/RequestTranslator.java ---------------------------------------------------------------------- diff --git a/avatica/src/main/java/org/apache/calcite/avatica/remote/RequestTranslator.java b/avatica/src/main/java/org/apache/calcite/avatica/remote/RequestTranslator.java index 0dadc78..417c6ed 100644 --- a/avatica/src/main/java/org/apache/calcite/avatica/remote/RequestTranslator.java +++ b/avatica/src/main/java/org/apache/calcite/avatica/remote/RequestTranslator.java @@ -36,7 +36,8 @@ public class RequestTranslator { public Service.Request transform(ByteString serializedMessage) throws InvalidProtocolBufferException { - Message msg = parser.parseFrom(serializedMessage); + // This should already be an aliased CodedInputStream from the WireMessage parsing. 
+ Message msg = parser.parseFrom(serializedMessage.newCodedInput()); return impl.deserialize(msg); } } http://git-wip-us.apache.org/repos/asf/calcite/blob/1d3a26df/avatica/src/main/java/org/apache/calcite/avatica/remote/Service.java ---------------------------------------------------------------------- diff --git a/avatica/src/main/java/org/apache/calcite/avatica/remote/Service.java b/avatica/src/main/java/org/apache/calcite/avatica/remote/Service.java index aee5b29..5790848 100644 --- a/avatica/src/main/java/org/apache/calcite/avatica/remote/Service.java +++ b/avatica/src/main/java/org/apache/calcite/avatica/remote/Service.java @@ -31,8 +31,9 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonSubTypes; import com.fasterxml.jackson.annotation.JsonTypeInfo; - +import com.google.protobuf.ByteString; import com.google.protobuf.Descriptors.FieldDescriptor; +import com.google.protobuf.HBaseZeroCopyByteString; import com.google.protobuf.Message; import java.io.PrintWriter; @@ -2549,13 +2550,16 @@ public interface Service { private static final FieldDescriptor SERVER_ADDRESS_DESCRIPTOR = Responses.RpcMetadata .getDescriptor().findFieldByNumber(Responses.RpcMetadata.SERVER_ADDRESS_FIELD_NUMBER); public final String serverAddress; + private final ByteString serverAddressAsBytes; public RpcMetadataResponse() { this.serverAddress = null; + this.serverAddressAsBytes = null; } public RpcMetadataResponse(@JsonProperty("serverAddress") String serverAddress) { this.serverAddress = serverAddress; + this.serverAddressAsBytes = HBaseZeroCopyByteString.wrap(serverAddress.getBytes()); } @Override RpcMetadataResponse deserialize(Message genericMsg) { @@ -2566,7 +2570,7 @@ public interface Service { } @Override Responses.RpcMetadata serialize() { - return Responses.RpcMetadata.newBuilder().setServerAddress(serverAddress).build(); + return 
Responses.RpcMetadata.newBuilder().setServerAddressBytes(serverAddressAsBytes).build(); } static RpcMetadataResponse fromProto(Responses.RpcMetadata msg) { http://git-wip-us.apache.org/repos/asf/calcite/blob/1d3a26df/avatica/src/main/java/org/apache/calcite/avatica/remote/TypedValue.java ---------------------------------------------------------------------- diff --git a/avatica/src/main/java/org/apache/calcite/avatica/remote/TypedValue.java b/avatica/src/main/java/org/apache/calcite/avatica/remote/TypedValue.java index 5c80816..d96293b 100644 --- a/avatica/src/main/java/org/apache/calcite/avatica/remote/TypedValue.java +++ b/avatica/src/main/java/org/apache/calcite/avatica/remote/TypedValue.java @@ -23,6 +23,7 @@ import org.apache.calcite.avatica.util.DateTimeUtils; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.protobuf.HBaseZeroCopyByteString; import java.math.BigDecimal; import java.math.BigInteger; @@ -345,7 +346,7 @@ public class TypedValue { break; case BYTE_STRING: case STRING: - builder.setStringValue((String) value); + builder.setStringValueBytes(HBaseZeroCopyByteString.wrap(((String) value).getBytes())); break; case PRIMITIVE_CHAR: case CHARACTER: http://git-wip-us.apache.org/repos/asf/calcite/blob/1d3a26df/avatica/src/main/java/org/apache/calcite/avatica/util/UnsynchronizedBuffer.java ---------------------------------------------------------------------- diff --git a/avatica/src/main/java/org/apache/calcite/avatica/util/UnsynchronizedBuffer.java b/avatica/src/main/java/org/apache/calcite/avatica/util/UnsynchronizedBuffer.java new file mode 100644 index 0000000..8daee60 --- /dev/null +++ b/avatica/src/main/java/org/apache/calcite/avatica/util/UnsynchronizedBuffer.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.avatica.util; + +import java.io.IOException; +import java.io.OutputStream; + +/** + * A utility class for reading and writing bytes to byte buffers without synchronization. A + * reduced variant taken from Apache Accumulo. This class is <b>not</b> thread-safe by design. + * It is up to the caller to guarantee mutual exclusion as necessary. + */ +public class UnsynchronizedBuffer extends OutputStream { + // Anything larger than 64K, reap the backing buffer + private static final int LARGE_BUFFER_SIZE = 1024 * 64; + + final int initialCapacity; + int offset = 0; + byte[] data; + + /** + * Creates a new writer. + */ + public UnsynchronizedBuffer() { + this(4096); + } + + /** + * Creates a new writer. + * + * @param initialCapacity initial byte capacity + */ + public UnsynchronizedBuffer(int initialCapacity) { + this.initialCapacity = initialCapacity; + data = new byte[initialCapacity]; + } + + private void reserve(int l) { + if (offset + l > data.length) { + int newSize = UnsynchronizedBuffer.nextArraySize(offset + l); + + byte[] newData = new byte[newSize]; + System.arraycopy(data, 0, newData, 0, offset); + data = newData; + } + + } + + /** + * Adds bytes to this writer's buffer. 
+ * + * @param bytes byte array + * @param off offset into array to start copying bytes + * @param length number of bytes to add + * @throws IndexOutOfBoundsException if off or length are invalid + */ + public void write(byte[] bytes, int off, int length) { + reserve(length); + System.arraycopy(bytes, off, data, offset, length); + offset += length; + } + + @Override public void write(int b) throws IOException { + reserve(1); + data[offset] = (byte) b; + offset++; + } + + /** + * Gets (a copy of) the contents of this writer's buffer. + * + * @return byte buffer contents + */ + public byte[] toArray() { + byte[] ret = new byte[offset]; + System.arraycopy(data, 0, ret, 0, offset); + return ret; + } + + /** + * Resets the internal pointer into the buffer. + */ + public void reset() { + offset = 0; + if (data.length >= LARGE_BUFFER_SIZE) { + data = new byte[this.initialCapacity]; + } + } + + /** + * @return The current offset into the backing array. + */ + public int getOffset() { + return offset; + } + + /** + * @return The current length of the backing array. + */ + public long getSize() { + return data.length; + } + + /** + * Determines what next array size should be by rounding up to next power of two. + * + * @param i current array size + * @return next array size + * @throws IllegalArgumentException if i is negative + */ + public static int nextArraySize(int i) { + if (i < 0) { + throw new IllegalArgumentException(); + } + + if (i > (1 << 30)) { + return Integer.MAX_VALUE; // this is the next power of 2 minus one... 
a special case + } + + if (i == 0) { + return 1; + } + + // round up to next power of two + int ret = i; + ret--; + ret |= ret >> 1; + ret |= ret >> 2; + ret |= ret >> 4; + ret |= ret >> 8; + ret |= ret >> 16; + ret++; + + return ret; + } +} + +// End UnsynchronizedBuffer.java http://git-wip-us.apache.org/repos/asf/calcite/blob/1d3a26df/avatica/src/test/java/org/apache/calcite/avatica/remote/ProtobufSerializationTest.java ---------------------------------------------------------------------- diff --git a/avatica/src/test/java/org/apache/calcite/avatica/remote/ProtobufSerializationTest.java b/avatica/src/test/java/org/apache/calcite/avatica/remote/ProtobufSerializationTest.java new file mode 100644 index 0000000..cd8a329 --- /dev/null +++ b/avatica/src/test/java/org/apache/calcite/avatica/remote/ProtobufSerializationTest.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.calcite.avatica.remote; + +import org.apache.calcite.avatica.ColumnMetaData.Rep; +import org.apache.calcite.avatica.Meta.Signature; +import org.apache.calcite.avatica.Meta.StatementHandle; +import org.apache.calcite.avatica.proto.Common.WireMessage; +import org.apache.calcite.avatica.proto.Requests; +import org.apache.calcite.avatica.remote.Service.Request; + +import com.google.protobuf.HBaseZeroCopyByteString; + +import org.junit.Assert; +import org.junit.Test; + +import java.io.ByteArrayOutputStream; +import java.util.Arrays; +import java.util.List; + +/** + * Protobuf serialization tests. + */ +public class ProtobufSerializationTest { + + private Signature getSignature() { + return null; + } + + private List<TypedValue> getTypedValues() { + List<TypedValue> paramValues = + Arrays.asList(TypedValue.create(Rep.BOOLEAN.name(), Boolean.TRUE), + TypedValue.create(Rep.STRING.name(), "string")); + return paramValues; + } + + @Test public void testExecuteSerialization() throws Exception { + Service.ExecuteRequest executeRequest = new Service.ExecuteRequest( + new StatementHandle("connection", 12345, getSignature()), getTypedValues(), 0); + + Requests.ExecuteRequest pbExecuteRequest = executeRequest.serialize(); + ByteArrayOutputStream baos = new ByteArrayOutputStream(1024); + pbExecuteRequest.writeTo(baos); + + byte[] serialized = baos.toByteArray(); + baos.reset(); + WireMessage wireMsg = WireMessage.newBuilder().setName(Requests.ExecuteRequest.class.getName()) + .setWrappedMessage(HBaseZeroCopyByteString.wrap(serialized)).build(); + wireMsg.writeTo(baos); + serialized = baos.toByteArray(); + + ProtobufTranslation translator = new ProtobufTranslationImpl(); + + Request newRequest = translator.parseRequest(serialized); + + Assert.assertEquals(executeRequest, newRequest); + } + +} + +// End ProtobufSerializationTest.java 
http://git-wip-us.apache.org/repos/asf/calcite/blob/1d3a26df/avatica/src/test/java/org/apache/calcite/avatica/util/UnsynchronizedBufferTest.java ---------------------------------------------------------------------- diff --git a/avatica/src/test/java/org/apache/calcite/avatica/util/UnsynchronizedBufferTest.java b/avatica/src/test/java/org/apache/calcite/avatica/util/UnsynchronizedBufferTest.java new file mode 100644 index 0000000..a448d3e --- /dev/null +++ b/avatica/src/test/java/org/apache/calcite/avatica/util/UnsynchronizedBufferTest.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.avatica.util; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +/** + * Tests for the UnsynchronizedBuffer. + */ +public class UnsynchronizedBufferTest { + + @Test public void testArrayResizing() { + int size = 64; + int expected = 128; + for (int i = 0; i < 10; i++) { + // We keep being one byte short to contain this message + int next = UnsynchronizedBuffer.nextArraySize(size + 1); + assertEquals(expected, next); + size = next; + expected *= 2; + } + } +} + +// End UnsynchronizedBufferTest.java
