This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch make-sure-no-frame-size-problem in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit c88e1571f2ad681b7b33a2448ddf9f0c207b864f Author: Steve Yurong Su <[email protected]> AuthorDate: Fri Aug 23 20:52:09 2024 +0800 1st commit --- .../pipe/connector/client/IoTDBSyncClient.java | 65 +++++++++ .../payload/thrift/request/PipeRequestType.java | 3 + .../thrift/request/PipeTransferSliceReq.java | 159 +++++++++++++++++++++ 3 files changed, 227 insertions(+) diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClient.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClient.java index f15934afd5e..70a6f16a9d9 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClient.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClient.java @@ -22,16 +22,28 @@ package org.apache.iotdb.commons.pipe.connector.client; import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.commons.client.ThriftClient; import org.apache.iotdb.commons.client.property.ThriftClientProperty; +import org.apache.iotdb.commons.pipe.connector.payload.thrift.request.IoTDBConnectorRequestVersion; +import org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeTransferSliceReq; +import org.apache.iotdb.pipe.api.exception.PipeConnectionException; import org.apache.iotdb.rpc.DeepCopyRpcTransportFactory; +import org.apache.iotdb.rpc.RpcUtils; +import org.apache.iotdb.rpc.TSStatusCode; import org.apache.iotdb.rpc.TimeoutChangeableTransport; import org.apache.iotdb.service.rpc.thrift.IClientRPCService; +import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq; +import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp; +import org.apache.thrift.TException; import org.apache.thrift.transport.TTransport; import org.apache.thrift.transport.TTransportException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class IoTDBSyncClient extends IClientRPCService.Client implements ThriftClient, AutoCloseable { + private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBSyncClient.class); + private final String ipAddress; private final int port; private final TEndPoint endPoint; @@ -82,6 +94,59 @@ public class IoTDBSyncClient extends IClientRPCService.Client ((TimeoutChangeableTransport) (getInputProtocol().getTransport())).setTimeout(timeout); } + @Override + public TPipeTransferResp pipeTransfer(final TPipeTransferReq req) throws TException { + final int bodySizeLimit = (int) (RpcUtils.THRIFT_DEFAULT_BUF_CAPACITY * 0.8); + + if (req.getVersion() != IoTDBConnectorRequestVersion.VERSION_1.getVersion() + || req.body.limit() < bodySizeLimit) { + return super.pipeTransfer(req); + } + + try { + // Slice the buffer to avoid the buffer being too large + final int sliceCount = + req.body.limit() / bodySizeLimit + (req.body.limit() % bodySizeLimit == 0 ? 0 : 1); + for (int i = 0; i < sliceCount; ++i) { + final int startIndexInBody = i * bodySizeLimit; + final int endIndexInBody = Math.min((i + 1) * bodySizeLimit, req.body.limit()); + final TPipeTransferResp sliceResp = + super.pipeTransfer( + PipeTransferSliceReq.toTPipeTransferReq( + i, sliceCount, req.body.duplicate(), startIndexInBody, endIndexInBody)); + + if (i == sliceCount - 1) { + return sliceResp; + } + + if (sliceResp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + LOGGER.warn( + "Failed to transfer slice. Origin req: {}-{}, slice index: {}, slice count: {}. Reason: {}. Retry the whole transfer.", + req.getVersion(), + req.getType(), + i, + sliceCount, + sliceResp.getStatus()); + throw new PipeConnectionException( + String.format( + "Failed to transfer slice. Origin req: %s-%s, slice index: %d, slice count: %d. Reason: %s", + req.getVersion(), req.getType(), i, sliceCount, sliceResp.getStatus())); + } + } + + // Should not reach here + return super.pipeTransfer(req); + } catch (final Exception e) { + LOGGER.warn( + "Failed to transfer slice. Origin req: {}-{}. Retry the whole transfer.", + req.getVersion(), + req.getType(), + e); + // Fall back to the original behavior + return super.pipeTransfer(req); + } + } + @Override public void close() throws Exception { invalidate(); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/PipeRequestType.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/PipeRequestType.java index 003c8b9afb3..bccf6787f4a 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/PipeRequestType.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/PipeRequestType.java @@ -53,6 +53,9 @@ public enum PipeRequestType { // RPC Compression TRANSFER_COMPRESSED((short) 300), + + // Fallback Handling + FALLBACK_SLICE((short) 400), ; private final short type; diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/PipeTransferSliceReq.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/PipeTransferSliceReq.java new file mode 100644 index 00000000000..ee5662f6170 --- /dev/null +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/PipeTransferSliceReq.java @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.commons.pipe.connector.payload.thrift.request; + +import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq; + +import org.apache.tsfile.utils.Binary; +import org.apache.tsfile.utils.PublicBAOS; +import org.apache.tsfile.utils.ReadWriteIOUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.Objects; + +public class PipeTransferSliceReq extends TPipeTransferReq { + + private static final Logger LOGGER = LoggerFactory.getLogger(PipeTransferSliceReq.class); + + private transient int originBodySize; + + private transient int sliceSize; + private transient byte[] sliceBody; + + private transient int sliceIndex; + private transient int sliceCount; + + public int getOriginBodySize() { + return originBodySize; + } + + public int getSliceSize() { + return sliceSize; + } + + public byte[] getSliceBody() { + return sliceBody; + } + + public int getSliceIndex() { + return sliceIndex; + } + + public int getSliceCount() { + return sliceCount; + } + + /////////////////////////////// Thrift /////////////////////////////// + + public static PipeTransferSliceReq toTPipeTransferReq( + final int sliceIndex, + final int sliceCount, + final ByteBuffer duplicatedOriginBody, + final int startIndexInBody, + final int endIndexInBody) + throws IOException { + final PipeTransferSliceReq sliceReq = new PipeTransferSliceReq(); + + sliceReq.originBodySize = duplicatedOriginBody.limit(); + + sliceReq.sliceSize = endIndexInBody - startIndexInBody; + sliceReq.sliceBody = new byte[sliceReq.sliceSize]; + duplicatedOriginBody.position(startIndexInBody); + duplicatedOriginBody.get(sliceReq.sliceBody); + + sliceReq.sliceIndex = sliceIndex; + sliceReq.sliceCount = sliceCount; + + sliceReq.version = IoTDBConnectorRequestVersion.VERSION_1.getVersion(); + sliceReq.type = PipeRequestType.FALLBACK_SLICE.getType(); + try (final PublicBAOS byteArrayOutputStream = new PublicBAOS(); + final DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream)) { + ReadWriteIOUtils.write(sliceReq.originBodySize, outputStream); + + ReadWriteIOUtils.write(sliceReq.sliceSize, outputStream); + ReadWriteIOUtils.write(new Binary(sliceReq.sliceBody), outputStream); + + ReadWriteIOUtils.write(sliceReq.sliceIndex, outputStream); + ReadWriteIOUtils.write(sliceReq.sliceCount, outputStream); + + sliceReq.body = + ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size()); + } + + return sliceReq; + } + + public static PipeTransferSliceReq fromTPipeTransferReq(final TPipeTransferReq transferReq) { + final PipeTransferSliceReq sliceReq = new PipeTransferSliceReq(); + + sliceReq.originBodySize = ReadWriteIOUtils.readInt(transferReq.body); + + sliceReq.sliceSize = ReadWriteIOUtils.readInt(transferReq.body); + sliceReq.sliceBody = ReadWriteIOUtils.readBinary(transferReq.body).getValues(); + + sliceReq.sliceIndex = ReadWriteIOUtils.readInt(transferReq.body); + sliceReq.sliceCount = ReadWriteIOUtils.readInt(transferReq.body); + + sliceReq.version = transferReq.version; + sliceReq.type = transferReq.type; + sliceReq.body = transferReq.body; + + return sliceReq; + } + + /////////////////////////////// Object /////////////////////////////// + + @Override + public boolean equals(final Object obj) { + if (this == obj) { + return true; + } + if (obj == null || getClass() != obj.getClass()) { + return false; + } + final PipeTransferSliceReq that = (PipeTransferSliceReq) obj; + return Objects.equals(originBodySize, that.originBodySize) + && Objects.equals(sliceSize, that.sliceSize) + && Arrays.equals(sliceBody, that.sliceBody) + && Objects.equals(sliceIndex, that.sliceIndex) + && Objects.equals(sliceCount, that.sliceCount) + && Objects.equals(version, that.version) + && Objects.equals(type, that.type) + && Objects.equals(body, that.body); + } + + @Override + public int hashCode() { + return Objects.hash( + originBodySize, + sliceSize, + Arrays.hashCode(sliceBody), + sliceIndex, + sliceCount, + version, + type, + body); + } +}
