This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push: new a5461af5bd Use ConfigurableTByteBuffer instead of TByteBuffer in ThriftCommonsSerDeUtils a5461af5bd is described below commit a5461af5bd2532f8f6f85bbddaf286ac7a88dd51 Author: Liao Lanyu <1435078...@qq.com> AuthorDate: Thu Mar 2 08:55:05 2023 +0800 Use ConfigurableTByteBuffer instead of TByteBuffer in ThriftCommonsSerDeUtils --- .../commons/utils/ThriftCommonsSerDeUtils.java | 13 ++- .../apache/iotdb/rpc/ConfigurableTByteBuffer.java | 100 +++++++++++++++++++++ 2 files changed, 110 insertions(+), 3 deletions(-) diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/utils/ThriftCommonsSerDeUtils.java b/node-commons/src/main/java/org/apache/iotdb/commons/utils/ThriftCommonsSerDeUtils.java index d615bea6ad..a82fb7c6cb 100644 --- a/node-commons/src/main/java/org/apache/iotdb/commons/utils/ThriftCommonsSerDeUtils.java +++ b/node-commons/src/main/java/org/apache/iotdb/commons/utils/ThriftCommonsSerDeUtils.java @@ -29,10 +29,10 @@ import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot; import org.apache.iotdb.commons.exception.runtime.ThriftSerDeException; import org.apache.iotdb.confignode.rpc.thrift.TCreateCQReq; import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList; +import org.apache.iotdb.rpc.ConfigurableTByteBuffer; import org.apache.thrift.TException; import org.apache.thrift.protocol.TBinaryProtocol; -import org.apache.thrift.transport.TByteBuffer; import org.apache.thrift.transport.TIOStreamTransport; import org.apache.thrift.transport.TTransport; import org.apache.thrift.transport.TTransportException; @@ -40,6 +40,8 @@ import org.apache.thrift.transport.TTransportException; import java.io.DataOutputStream; import java.nio.ByteBuffer; +import static org.apache.iotdb.rpc.TConfigurationConst.defaultTConfiguration; + /** Utils for serialize and deserialize all the data struct defined by thrift-commons */ public class ThriftCommonsSerDeUtils { @@ -55,13 +57,13 @@ public 
class ThriftCommonsSerDeUtils { private static TBinaryProtocol generateWriteProtocol(ByteBuffer buffer) throws TTransportException { - TTransport transport = new TByteBuffer(buffer); + TTransport transport = generateTByteBuffer(buffer); return new TBinaryProtocol(transport); } private static TBinaryProtocol generateReadProtocol(ByteBuffer buffer) throws TTransportException { - TTransport transport = new TByteBuffer(buffer); + TTransport transport = generateTByteBuffer(buffer); return new TBinaryProtocol(transport); } @@ -278,4 +280,9 @@ public class ThriftCommonsSerDeUtils { } return schemaNode; } + + private static ConfigurableTByteBuffer generateTByteBuffer(ByteBuffer buffer) + throws TTransportException { + return new ConfigurableTByteBuffer(buffer, defaultTConfiguration); + } } diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/ConfigurableTByteBuffer.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/ConfigurableTByteBuffer.java new file mode 100644 index 0000000000..b1aa66f150 --- /dev/null +++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/ConfigurableTByteBuffer.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+package org.apache.iotdb.rpc;
+
+import org.apache.thrift.TConfiguration;
+import org.apache.thrift.transport.TEndpointTransport;
+import org.apache.thrift.transport.TTransportException;
+
+import java.nio.BufferOverflowException;
+import java.nio.BufferUnderflowException;
+import java.nio.ByteBuffer;
+
+/**
+ * Source code of this class is a copy of {@link org.apache.thrift.transport.TByteBuffer}. However,
+ * TByteBuffer is a final class and has only one constructor, which uses the default
+ * TConfiguration. In some cases, the capacity of our ByteBuffer could be so large that it reaches
+ * the MaxMessageSize of thrift. We need to customize the TConfiguration and that is why we use this
+ * class.
+ */
+public class ConfigurableTByteBuffer extends TEndpointTransport {
+  /** Buffer that backs every read and write performed through this transport. */
+  private final ByteBuffer byteBuffer;
+
+  /**
+   * Creates a transport over {@code byteBuffer} using a freshly constructed {@link
+   * TConfiguration}.
+   *
+   * @param byteBuffer the buffer to read from or write to
+   * @throws TTransportException if the delegated constructor fails
+   */
+  public ConfigurableTByteBuffer(ByteBuffer byteBuffer) throws TTransportException {
+    this(byteBuffer, new TConfiguration());
+  }
+
+  /**
+   * Creates a transport over {@code byteBuffer} using the supplied configuration.
+   *
+   * @param byteBuffer the buffer to read from or write to
+   * @param configuration the thrift configuration handed to the parent transport
+   * @throws TTransportException if the parent constructor or the known-message-size update fails
+   */
+  public ConfigurableTByteBuffer(ByteBuffer byteBuffer, TConfiguration configuration)
+      throws TTransportException {
+    super(configuration);
+    this.byteBuffer = byteBuffer;
+    // Report the buffer capacity to the parent transport as the known message size.
+    this.updateKnownMessageSize(byteBuffer.capacity());
+  }
+
+  /** Always reports {@code true}; this transport has no connection state of its own. */
+  @Override
+  public boolean isOpen() {
+    return true;
+  }
+
+  /** Does nothing. */
+  @Override
+  public void open() {}
+
+  /** Does nothing. */
+  @Override
+  public void close() {}
+
+  /**
+   * Copies up to {@code len} bytes from the backing buffer into {@code buf}.
+   *
+   * @param buf destination array
+   * @param off offset in {@code buf} at which the first byte is stored
+   * @param len maximum number of bytes to copy
+   * @return the number of bytes actually copied: the smaller of {@code len} and the bytes
+   *     remaining in the backing buffer (possibly {@code 0})
+   * @throws TTransportException if the parent's read-availability check rejects {@code len}, or
+   *     if the backing buffer underflows
+   */
+  @Override
+  public int read(byte[] buf, int off, int len) throws TTransportException {
+    this.checkReadBytesAvailable(len);
+    int n = Math.min(this.byteBuffer.remaining(), len);
+    if (n > 0) {
+      try {
+        this.byteBuffer.get(buf, off, n);
+      } catch (BufferUnderflowException e) {
+        throw new TTransportException("Unexpected end of input buffer", e);
+      }
+    }
+
+    return n;
+  }
+
+  /**
+   * Appends {@code len} bytes of {@code buf}, starting at {@code off}, to the backing buffer.
+   *
+   * @param buf source array
+   * @param off offset of the first byte to write
+   * @param len number of bytes to write
+   * @throws TTransportException if the backing buffer does not have enough room
+   */
+  @Override
+  public void write(byte[] buf, int off, int len) throws TTransportException {
+    try {
+      this.byteBuffer.put(buf, off, len);
+    } catch (BufferOverflowException e) {
+      throw new TTransportException("Not enough room in output buffer", e);
+    }
+  }
+
+  /** Returns the backing buffer itself (not a copy). */
+  public ByteBuffer getByteBuffer() {
+    return this.byteBuffer;
+  }
+
+  /**
+   * Calls {@link ByteBuffer#clear()} on the backing buffer.
+   *
+   * @return this transport, to allow call chaining
+   */
+  public ConfigurableTByteBuffer clear() {
+    this.byteBuffer.clear();
+    return this;
+  }
+
+  /**
+   * Calls {@link ByteBuffer#flip()} on the backing buffer.
+   *
+   * @return this transport, to allow call chaining
+   */
+  public ConfigurableTByteBuffer flip() {
+    this.byteBuffer.flip();
+    return this;
+  }
+
+  /**
+   * Copies the bytes remaining in the backing buffer into a new array. The copy is read through a
+   * slice, so the backing buffer's own position is left untouched.
+   *
+   * @return a new array holding the remaining bytes
+   */
+  public byte[] toByteArray() {
+    byte[] data = new byte[this.byteBuffer.remaining()];
+    this.byteBuffer.slice().get(data);
+    return data;
+  }
+}