This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new a5461af5bd  Use ConfigurableTByteBuffer instead of TByteBuffer in ThriftCommonsSerDeUtils
a5461af5bd is described below

commit a5461af5bd2532f8f6f85bbddaf286ac7a88dd51
Author: Liao Lanyu <1435078...@qq.com>
AuthorDate: Thu Mar 2 08:55:05 2023 +0800

     Use ConfigurableTByteBuffer instead of TByteBuffer in ThriftCommonsSerDeUtils
---
 .../commons/utils/ThriftCommonsSerDeUtils.java     |  13 ++-
 .../apache/iotdb/rpc/ConfigurableTByteBuffer.java  | 100 +++++++++++++++++++++
 2 files changed, 110 insertions(+), 3 deletions(-)

diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/utils/ThriftCommonsSerDeUtils.java b/node-commons/src/main/java/org/apache/iotdb/commons/utils/ThriftCommonsSerDeUtils.java
index d615bea6ad..a82fb7c6cb 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/utils/ThriftCommonsSerDeUtils.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/utils/ThriftCommonsSerDeUtils.java
@@ -29,10 +29,10 @@ import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
 import org.apache.iotdb.commons.exception.runtime.ThriftSerDeException;
 import org.apache.iotdb.confignode.rpc.thrift.TCreateCQReq;
 import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList;
+import org.apache.iotdb.rpc.ConfigurableTByteBuffer;
 
 import org.apache.thrift.TException;
 import org.apache.thrift.protocol.TBinaryProtocol;
-import org.apache.thrift.transport.TByteBuffer;
 import org.apache.thrift.transport.TIOStreamTransport;
 import org.apache.thrift.transport.TTransport;
 import org.apache.thrift.transport.TTransportException;
@@ -40,6 +40,8 @@ import org.apache.thrift.transport.TTransportException;
 import java.io.DataOutputStream;
 import java.nio.ByteBuffer;
 
+import static org.apache.iotdb.rpc.TConfigurationConst.defaultTConfiguration;
+
 /** Utils for serialize and deserialize all the data struct defined by thrift-commons */
 public class ThriftCommonsSerDeUtils {
 
@@ -55,13 +57,13 @@ public class ThriftCommonsSerDeUtils {
 
   private static TBinaryProtocol generateWriteProtocol(ByteBuffer buffer)
       throws TTransportException {
-    TTransport transport = new TByteBuffer(buffer);
+    TTransport transport = generateTByteBuffer(buffer);
     return new TBinaryProtocol(transport);
   }
 
   private static TBinaryProtocol generateReadProtocol(ByteBuffer buffer)
       throws TTransportException {
-    TTransport transport = new TByteBuffer(buffer);
+    TTransport transport = generateTByteBuffer(buffer);
     return new TBinaryProtocol(transport);
   }
 
@@ -278,4 +280,9 @@ public class ThriftCommonsSerDeUtils {
     }
     return schemaNode;
   }
+
+  private static ConfigurableTByteBuffer generateTByteBuffer(ByteBuffer buffer)
+      throws TTransportException {
+    return new ConfigurableTByteBuffer(buffer, defaultTConfiguration);
+  }
 }
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/ConfigurableTByteBuffer.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/ConfigurableTByteBuffer.java
new file mode 100644
index 0000000000..b1aa66f150
--- /dev/null
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/ConfigurableTByteBuffer.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.rpc;
+
+import org.apache.thrift.TConfiguration;
+import org.apache.thrift.transport.TEndpointTransport;
+import org.apache.thrift.transport.TTransportException;
+
+import java.nio.BufferOverflowException;
+import java.nio.BufferUnderflowException;
+import java.nio.ByteBuffer;
+
+/**
+ * Source code of this class is a copy of {@link org.apache.thrift.transport.TByteBuffer}. However,
+ * TByteBuffer is a final class and has only one construction method which uses the default
+ * TConfiguration. In some cases, the capacity of our ByteBuffer could be so large that it reaches
+ * the MaxMessageSize of thrift. We need to customize the TConfiguration and that is why we use this
+ * class.
+ */
+public class ConfigurableTByteBuffer extends TEndpointTransport {
+  private final ByteBuffer byteBuffer;
+
+  public ConfigurableTByteBuffer(ByteBuffer byteBuffer) throws TTransportException {
+    this(byteBuffer, new TConfiguration());
+  }
+
+  public ConfigurableTByteBuffer(ByteBuffer byteBuffer, TConfiguration configuration)
+      throws TTransportException {
+    super(configuration);
+    this.byteBuffer = byteBuffer;
+    this.updateKnownMessageSize(byteBuffer.capacity());
+  }
+
+  public boolean isOpen() {
+    return true;
+  }
+
+  public void open() {}
+
+  public void close() {}
+
+  public int read(byte[] buf, int off, int len) throws TTransportException {
+    this.checkReadBytesAvailable((long) len);
+    int n = Math.min(this.byteBuffer.remaining(), len);
+    if (n > 0) {
+      try {
+        this.byteBuffer.get(buf, off, n);
+      } catch (BufferUnderflowException e) {
+        throw new TTransportException("Unexpected end of input buffer", e);
+      }
+    }
+
+    return n;
+  }
+
+  public void write(byte[] buf, int off, int len) throws TTransportException {
+    try {
+      this.byteBuffer.put(buf, off, len);
+    } catch (BufferOverflowException e) {
+      throw new TTransportException("Not enough room in output buffer", e);
+    }
+  }
+
+  public ByteBuffer getByteBuffer() {
+    return this.byteBuffer;
+  }
+
+  public ConfigurableTByteBuffer clear() {
+    this.byteBuffer.clear();
+    return this;
+  }
+
+  public ConfigurableTByteBuffer flip() {
+    this.byteBuffer.flip();
+    return this;
+  }
+
+  public byte[] toByteArray() {
+    byte[] data = new byte[this.byteBuffer.remaining()];
+    this.byteBuffer.slice().get(data);
+    return data;
+  }
+}

Reply via email to