This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch rel/1.1
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/1.1 by this push:
new e3e498b97b Use ConfigurableTByteBuffer instead of TByteBuffer in
ThriftCommonsSerDeUtils
e3e498b97b is described below
commit e3e498b97bbbf42c7fa41728987e7d7692f71af8
Author: Liao Lanyu <[email protected]>
AuthorDate: Thu Mar 2 11:26:58 2023 +0800
Use ConfigurableTByteBuffer instead of TByteBuffer in
ThriftCommonsSerDeUtils
---
.../commons/utils/ThriftCommonsSerDeUtils.java | 13 ++-
.../apache/iotdb/rpc/ConfigurableTByteBuffer.java | 100 +++++++++++++++++++++
2 files changed, 110 insertions(+), 3 deletions(-)
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/utils/ThriftCommonsSerDeUtils.java b/node-commons/src/main/java/org/apache/iotdb/commons/utils/ThriftCommonsSerDeUtils.java
index d615bea6ad..a82fb7c6cb 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/utils/ThriftCommonsSerDeUtils.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/utils/ThriftCommonsSerDeUtils.java
@@ -29,10 +29,10 @@ import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
import org.apache.iotdb.commons.exception.runtime.ThriftSerDeException;
import org.apache.iotdb.confignode.rpc.thrift.TCreateCQReq;
import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList;
+import org.apache.iotdb.rpc.ConfigurableTByteBuffer;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
-import org.apache.thrift.transport.TByteBuffer;
import org.apache.thrift.transport.TIOStreamTransport;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
@@ -40,6 +40,8 @@ import org.apache.thrift.transport.TTransportException;
import java.io.DataOutputStream;
import java.nio.ByteBuffer;
+import static org.apache.iotdb.rpc.TConfigurationConst.defaultTConfiguration;
+
/** Utils for serialize and deserialize all the data struct defined by thrift-commons */
public class ThriftCommonsSerDeUtils {
@@ -55,13 +57,13 @@ public class ThriftCommonsSerDeUtils {
private static TBinaryProtocol generateWriteProtocol(ByteBuffer buffer)
throws TTransportException {
- TTransport transport = new TByteBuffer(buffer);
+ TTransport transport = generateTByteBuffer(buffer);
return new TBinaryProtocol(transport);
}
private static TBinaryProtocol generateReadProtocol(ByteBuffer buffer)
throws TTransportException {
- TTransport transport = new TByteBuffer(buffer);
+ TTransport transport = generateTByteBuffer(buffer);
return new TBinaryProtocol(transport);
}
@@ -278,4 +280,9 @@ public class ThriftCommonsSerDeUtils {
}
return schemaNode;
}
+
+ private static ConfigurableTByteBuffer generateTByteBuffer(ByteBuffer buffer)
+ throws TTransportException {
+ return new ConfigurableTByteBuffer(buffer, defaultTConfiguration);
+ }
}
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/ConfigurableTByteBuffer.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/ConfigurableTByteBuffer.java
new file mode 100644
index 0000000000..b1aa66f150
--- /dev/null
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/ConfigurableTByteBuffer.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.rpc;
+
+import org.apache.thrift.TConfiguration;
+import org.apache.thrift.transport.TEndpointTransport;
+import org.apache.thrift.transport.TTransportException;
+
+import java.nio.BufferOverflowException;
+import java.nio.BufferUnderflowException;
+import java.nio.ByteBuffer;
+
+/**
+ * Source code of this class is a copy of {@link org.apache.thrift.transport.TByteBuffer}. However,
+ * TByteBuffer is a final class and has only one construction method which uses the default
+ * TConfiguration. In some cases, the capacity of our ByteBuffer could be so large that it reaches
+ * the MaxMessageSize of thrift. We need to customize the TConfiguration and that is why we use this
+ * class.
+ */
+public class ConfigurableTByteBuffer extends TEndpointTransport {
+ private final ByteBuffer byteBuffer;
+
+ public ConfigurableTByteBuffer(ByteBuffer byteBuffer) throws
TTransportException {
+ this(byteBuffer, new TConfiguration());
+ }
+
+ public ConfigurableTByteBuffer(ByteBuffer byteBuffer, TConfiguration
configuration)
+ throws TTransportException {
+ super(configuration);
+ this.byteBuffer = byteBuffer;
+ this.updateKnownMessageSize(byteBuffer.capacity());
+ }
+
+ public boolean isOpen() {
+ return true;
+ }
+
+ public void open() {}
+
+ public void close() {}
+
+ public int read(byte[] buf, int off, int len) throws TTransportException {
+ this.checkReadBytesAvailable((long) len);
+ int n = Math.min(this.byteBuffer.remaining(), len);
+ if (n > 0) {
+ try {
+ this.byteBuffer.get(buf, off, n);
+ } catch (BufferUnderflowException e) {
+ throw new TTransportException("Unexpected end of input buffer", e);
+ }
+ }
+
+ return n;
+ }
+
+ public void write(byte[] buf, int off, int len) throws TTransportException {
+ try {
+ this.byteBuffer.put(buf, off, len);
+ } catch (BufferOverflowException e) {
+ throw new TTransportException("Not enough room in output buffer", e);
+ }
+ }
+
+ public ByteBuffer getByteBuffer() {
+ return this.byteBuffer;
+ }
+
+ public ConfigurableTByteBuffer clear() {
+ this.byteBuffer.clear();
+ return this;
+ }
+
+ public ConfigurableTByteBuffer flip() {
+ this.byteBuffer.flip();
+ return this;
+ }
+
+ public byte[] toByteArray() {
+ byte[] data = new byte[this.byteBuffer.remaining()];
+ this.byteBuffer.slice().get(data);
+ return data;
+ }
+}