szetszwo commented on a change in pull request #2860: URL: https://github.com/apache/ozone/pull/2860#discussion_r758346764
########## File path: hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/SmallFileDataStreamOutput.java ########## @@ -0,0 +1,564 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.client.io; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import org.apache.hadoop.hdds.client.BlockID; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; +import org.apache.hadoop.hdds.ratis.ContainerCommandRequestMessage; +import org.apache.hadoop.hdds.scm.OzoneClientConfig; +import org.apache.hadoop.hdds.scm.XceiverClientFactory; +import org.apache.hadoop.hdds.scm.XceiverClientRatis; +import org.apache.hadoop.hdds.scm.client.HddsClientUtils; +import org.apache.hadoop.hdds.scm.container.ContainerID; +import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList; +import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; +import org.apache.hadoop.hdds.scm.pipeline.Pipeline; +import org.apache.hadoop.hdds.scm.storage.ByteBufferStreamOutput; +import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier; +import org.apache.hadoop.io.retry.RetryPolicy; +import org.apache.hadoop.ozone.common.Checksum; +import org.apache.hadoop.ozone.common.ChecksumData; +import org.apache.hadoop.ozone.om.helpers.OmKeyArgs; +import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; +import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo; +import org.apache.hadoop.ozone.om.helpers.OpenKeySession; +import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol; +import org.apache.hadoop.security.token.Token; +import org.apache.ratis.client.api.DataStreamOutput; +import org.apache.ratis.io.StandardWriteOption; +import org.apache.ratis.protocol.DataStreamReply; +import org.apache.ratis.protocol.RaftPeerId; +import org.apache.ratis.protocol.RoutingTable; +import org.apache.ratis.protocol.exceptions.AlreadyClosedException; +import org.apache.ratis.protocol.exceptions.RaftRetryFailureException; +import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.InterruptedIOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicReference; + +/** + * SmallFileDataStreamOutput is only used to write requests smaller than ChunkSize. + * <p> + * TODO: multi-thread access is currently not supported.
+ */ +public class SmallFileDataStreamOutput implements ByteBufferStreamOutput { + + public static final Logger LOG = + LoggerFactory.getLogger(SmallFileDataStreamOutput.class); + + private final AtomicReference<BlockID> blockID; + + private final XceiverClientFactory xceiverClientFactory; + private XceiverClientRatis xceiverClient; + private final OzoneClientConfig config; + + private final OzoneManagerProtocol omClient; + + private final OpenKeySession openKeySession; + private OmKeyLocationInfo keyLocationInfo; + private final OmKeyArgs keyArgs; + + private DataStreamOutput dataStreamOutput; + + private boolean unsafeByteBufferConversion; + + private final ByteBuffer currentBuffer; + private long versionID; + private final Token<OzoneBlockTokenIdentifier> token; + + // error handler + private final ExcludeList excludeList; + private final Map<Class<? extends Throwable>, RetryPolicy> retryPolicyMap; + private int retryCount; + + public SmallFileDataStreamOutput( + OpenKeySession handler, + XceiverClientFactory xceiverClientManager, + OzoneManagerProtocol omClient, + OzoneClientConfig config, + boolean unsafeByteBufferConversion + ) throws IOException { + this.xceiverClientFactory = xceiverClientManager; + this.omClient = omClient; + this.config = config; + this.openKeySession = handler; + + this.keyLocationInfo = handler.getKeyInfo().getLatestVersionLocations() + .getLocationList(handler.getOpenVersion()).get(0); + this.blockID = new AtomicReference<>(keyLocationInfo.getBlockID()); + this.versionID = keyLocationInfo.getCreateVersion(); + + this.unsafeByteBufferConversion = unsafeByteBufferConversion; + + OmKeyInfo info = handler.getKeyInfo(); + this.currentBuffer = ByteBuffer.allocate((int) info.getDataSize()); + + this.keyArgs = new OmKeyArgs.Builder().setVolumeName(info.getVolumeName()) + .setBucketName(info.getBucketName()).setKeyName(info.getKeyName()) + .setReplicationConfig(info.getReplicationConfig()) + .setDataSize(info.getDataSize()) + .setIsMultipartKey(false).build(); + + this.retryPolicyMap = HddsClientUtils.getRetryPolicyByException( + config.getMaxRetryCount(), config.getRetryInterval()); + this.retryCount = 0; + + this.excludeList = new ExcludeList(); + + this.token = null; + } + + + @VisibleForTesting + public BlockID getBlockID() { + return blockID.get(); + } + + @VisibleForTesting + public OmKeyLocationInfo getKeyLocationInfo() { + return keyLocationInfo; + } + + private OmKeyLocationInfo allocateNewBlock() throws IOException { + if (!excludeList.isEmpty()) { + LOG.info("Allocating block with {}", excludeList); + } + OmKeyLocationInfo omKeyLocationInfo = + omClient.allocateBlock(keyArgs, openKeySession.getId(), excludeList); + + this.keyLocationInfo = omKeyLocationInfo; + this.blockID.set(keyLocationInfo.getBlockID()); + this.versionID = keyLocationInfo.getCreateVersion(); + + return omKeyLocationInfo; + } + + @Override + public void write(ByteBuffer bb) throws IOException { + if (bb == null) { + throw new NullPointerException(); + } + + byte b = bb.array()[0]; + currentBuffer.put(b); + } + + @Override + public void write(ByteBuffer bb, int off, int len) throws IOException { + if (bb == null) { + throw new NullPointerException(); + } + + byte[] b = bb.array(); Review comment: We cannot call array() since the ByteBuffer may not have an array. Also, using an array means there must be buffer copying inside the code. This is the reason we use ByteBuffer instead of byte[].
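A position-based rewrite would sidestep both problems. The following is only a minimal sketch (not code from the PR), assuming off is an offset relative to the buffer's current position and that currentBuffer has enough remaining capacity:

    @Override
    public void write(ByteBuffer bb, int off, int len) throws IOException {
      if (bb == null) {
        throw new NullPointerException();
      }
      if (off < 0 || len < 0 || off + len > bb.remaining()) {
        throw new IndexOutOfBoundsException();
      }
      if (len == 0) {
        return;
      }
      // Duplicate the buffer so the caller's position/limit stay untouched;
      // put(ByteBuffer) transfers the bytes without needing a backing array,
      // so it also works for direct (off-heap) buffers.
      final ByteBuffer view = bb.duplicate();
      view.position(bb.position() + off);
      view.limit(bb.position() + off + len);
      currentBuffer.put(view);
    }

Note that the put into currentBuffer is still one copy; eliminating it entirely would mean queuing the duplicated buffers and streaming them directly, in the spirit of the DataChannel on the datanode side.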
########## File path: hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/SmallFileDataStreamOutput.java ########## @@ -0,0 +1,564 @@ + @Override + public void write(ByteBuffer bb, int off, int len) throws IOException { + if (bb == null) { + throw new NullPointerException(); + } + + byte[] b = bb.array(); + + if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length) + || ((off + len) < 0)) { + throw new IndexOutOfBoundsException(); + } + if (len == 0) { + return; + } + + if (len > 0) { + currentBuffer.put(b, off, len); + } + } + + @Override + public void flush() throws IOException { + + } + +
@Override + public void close() throws IOException { + while (true) { + try { + checkOpen(); + DataStreamOutput out = maybeInitStream(); + int i = handleWrite(out); + if (LOG.isDebugEnabled()) { + LOG.debug("write small file success, size: {}", i); + } + } catch (IOException ee) { + handleException(ee); + continue; + } finally { + if (dataStreamOutput != null) { + try { + dataStreamOutput.close(); + } catch (Exception e) { + LOG.warn("close DataStreamOutput error:", e); + throw new IOException("close DataStreamOutput error:", e); + } + } + } + return; + } + } + + private int handleWrite(DataStreamOutput out) throws IOException { + if (out != null) { + int size = currentBuffer.position(); + byte[] b = new byte[size]; + System.arraycopy(currentBuffer.array(), 0, b, 0, size); + + ContainerProtos.ContainerCommandRequestProto putSmallFileRequest = + getPutSmallFileRequest(b); + putSmallFileToContainer(putSmallFileRequest, out); + + keyArgs.setDataSize(size); + keyLocationInfo.setLength(size); + + Map<String, String> metadata = keyArgs.getMetadata(); + keyArgs.setMetadata(metadata); + + keyArgs.setLocationInfoList(Collections.singletonList(keyLocationInfo)); + omClient.commitKey(keyArgs, openKeySession.getId()); + return b.length; + } else if (currentBuffer != null && currentBuffer.position() == 0) { + keyArgs.setDataSize(0); + keyLocationInfo.setLength(0); + + Map<String, String> metadata = keyArgs.getMetadata(); + keyArgs.setMetadata(metadata); + + keyArgs.setLocationInfoList(Collections.emptyList()); + + omClient.commitKey(keyArgs, openKeySession.getId()); + return 0; + } + return 0; + } + + private void setExceptionAndThrow(IOException ioe) throws IOException { + throw ioe; + } + + /** + * It performs the following actions: + * a. Updates the committed length for the current stream at the datanode. + * b. Reads the data from the underlying buffer and writes it to the next stream. + * + * @param exception actual exception that occurred + * @throws IOException Throws IOException if Write fails + */ + private void handleException(IOException exception) throws IOException { + Throwable t = HddsClientUtils.checkForException(exception); + Preconditions.checkNotNull(t); + boolean retryFailure = checkForRetryFailure(t); + boolean containerExclusionException = false; + if (!retryFailure) { + containerExclusionException = checkIfContainerToExclude(t); + } + + long totalSuccessfulFlushedData = 0L; + long bufferedDataLen = currentBuffer.position(); + + if (containerExclusionException) { + LOG.debug( + "Encountered exception {}. The last committed block length is {}, " + + "uncommitted data length is {} retry count {}", exception, + totalSuccessfulFlushedData, bufferedDataLen, retryCount); + excludeList + .addConatinerId(ContainerID.valueOf(blockID.get().getContainerID())); + } else if (xceiverClient != null) { + LOG.warn( + "Encountered exception {} on the pipeline {}. " + + "The last committed block length is {}, " + + "uncommitted data length is {} retry count {}", exception, + xceiverClient.getPipeline(), totalSuccessfulFlushedData, + bufferedDataLen, retryCount); + excludeList.addPipeline(xceiverClient.getPipeline().getId()); + } + allocateNewBlock(); + + // just clean up the current stream. + cleanup(retryFailure); + + if (bufferedDataLen > 0) { + // If the data is still cached in the underlying stream, we need to + // allocate a new block and write this data to the datanode.
+ handleRetry(exception); + // reset the retryCount after handling the exception + // retryCount = 0; + } + } + + private void handleRetry(IOException exception) throws IOException { + RetryPolicy retryPolicy = retryPolicyMap + .get(HddsClientUtils.checkForException(exception).getClass()); + if (retryPolicy == null) { + retryPolicy = retryPolicyMap.get(Exception.class); + } + RetryPolicy.RetryAction action = null; + try { + action = retryPolicy.shouldRetry(exception, retryCount, 0, true); + } catch (Exception e) { + setExceptionAndThrow(new IOException(e)); + } + if (action.action == RetryPolicy.RetryAction.RetryDecision.FAIL) { + String msg = ""; + if (action.reason != null) { + msg = "Retry request failed. " + action.reason; + LOG.error(msg, exception); + } + setExceptionAndThrow(new IOException(msg, exception)); + } + + // Throw the exception if the thread is interrupted + if (Thread.currentThread().isInterrupted()) { + LOG.warn("Interrupted while trying for retry"); + setExceptionAndThrow(exception); + } + Preconditions.checkArgument( + action.action == RetryPolicy.RetryAction.RetryDecision.RETRY); + if (action.delayMillis > 0) { + try { + Thread.sleep(action.delayMillis); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + IOException ioe = (IOException) new InterruptedIOException( + "Interrupted: action=" + action + ", retry policy=" + retryPolicy) + .initCause(e); + setExceptionAndThrow(ioe); + } + } + retryCount++; + if (LOG.isTraceEnabled()) { + LOG.trace("Retrying Write request. Already tried {} time(s); " + + "retry policy is {} ", retryCount, retryPolicy); + } + //handleWrite(); + } + + /** + * Checks if the provided exception signifies retry failure in ratis client. + * In case of retry failure, ratis client throws RaftRetryFailureException + * and all succeeding operations are failed with AlreadyClosedException. 
+ */ + private boolean checkForRetryFailure(Throwable t) { + return t instanceof RaftRetryFailureException + || t instanceof AlreadyClosedException; + } + + // Every container-specific exception from the datanode will be seen as + // StorageContainerException + private boolean checkIfContainerToExclude(Throwable t) { + return t instanceof StorageContainerException; + } + + private void cleanup(boolean invalidateClient) throws IOException { + if (xceiverClientFactory != null) { + xceiverClientFactory.releaseClient(xceiverClient, invalidateClient); + } + if (dataStreamOutput != null) { + try { + dataStreamOutput.close(); + } catch (Exception e) { + throw new IOException("close dataStreamOutput error", e); + } + } + dataStreamOutput = null; + xceiverClient = null; + } + + private ContainerProtos.ContainerCommandRequestProto getPutSmallFileRequest( + byte[] b) throws IOException { + + ContainerProtos.BlockData containerBlockData = + ContainerProtos.BlockData.newBuilder() + .setBlockID(blockID.get().getDatanodeBlockIDProtobuf()) + .build(); + ContainerProtos.PutBlockRequestProto.Builder createBlockRequest = + ContainerProtos.PutBlockRequestProto.newBuilder() + .setBlockData(containerBlockData); + + int size = b.length; + + ByteString bytes = ByteString.copyFrom(b); + + Checksum checksum = + new Checksum(config.getChecksumType(), config.getBytesPerChecksum()); + final ChecksumData checksumData = checksum.computeChecksum(b); + + ContainerProtos.ChunkInfo chunk = + ContainerProtos.ChunkInfo.newBuilder() + .setChunkName(blockID.get().getLocalID() + "_chunk_0") + .setOffset(0) + .setLen(size) + .setChecksumData(checksumData.getProtoBufMessage()) + .build(); + + ContainerProtos.PutSmallFileRequestProto putSmallFileRequest = + ContainerProtos.PutSmallFileRequestProto.newBuilder() + .setChunkInfo(chunk) + .setBlock(createBlockRequest) + .setData(bytes) Review comment: Do not set the data inside the proto. We should send only the header in the proto and then send the raw data to the stream.
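For illustration, a header-only variant could look like the sketch below. This is an assumption about how the suggestion might be implemented, not code from the PR; the helper name putSmallFileViaStream is hypothetical, and delivering the header itself (for example as the stream's final/close message) is left out:

    // Hypothetical helper (not in the PR): keep the proto header-only and
    // push the raw payload to the Ratis stream instead of embedding it.
    private void putSmallFileViaStream(DataStreamOutput out,
        ContainerProtos.ChunkInfo chunk,
        ContainerProtos.PutBlockRequestProto.Builder createBlockRequest,
        ByteBuffer data) throws IOException {
      // Length and checksum still travel in ChunkInfo, but there is no
      // setData(...), so the payload is never copied into a ByteString.
      ContainerProtos.PutSmallFileRequestProto header =
          ContainerProtos.PutSmallFileRequestProto.newBuilder()
              .setChunkInfo(chunk)
              .setBlock(createBlockRequest)
              .build();
      try {
        // The raw bytes go straight to the stream, so the datanode-side
        // DataChannel receives them without a protobuf round trip.
        DataStreamReply reply = out.writeAsync(data).get();
        if (!reply.isSuccess()) {
          throw new IOException("streaming " + header.getChunkInfo()
              + " failed: " + reply);
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new InterruptedIOException("Interrupted while streaming data");
      } catch (ExecutionException e) {
        throw new IOException("Failed to stream data", e);
      }
    }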
########## File path: hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/SmallFileStreamDataChannel.java ########## @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.container.keyvalue.impl; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; +import org.apache.hadoop.hdds.ratis.ContainerCommandRequestMessage; +import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; +import org.apache.hadoop.ozone.container.common.helpers.BlockData; +import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo; +import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics; +import org.apache.hadoop.ozone.container.common.interfaces.Container; +import org.apache.hadoop.ozone.container.keyvalue.interfaces.BlockManager; +import org.apache.ratis.proto.RaftProtos; +import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; + + +/** + * This class is used to get the DataChannel for streaming. + */ +class SmallFileStreamDataChannel extends StreamDataChannelBase { + public static final Logger LOG = + LoggerFactory.getLogger(SmallFileStreamDataChannel.class); + + private final Container kvContainer; + private final BlockManager blockManager; + private BlockData blockData; + + private int len = 0; + private final List<ByteBuffer> bufferList = new ArrayList<>(); + + SmallFileStreamDataChannel(File file, Container container, + BlockManager blockManager, + ContainerMetrics metrics) + throws StorageContainerException { + super(file, container.getContainerData(), metrics); + this.blockManager = blockManager; + this.kvContainer = container; + } + + @Override + ContainerProtos.Type getType() { + return ContainerProtos.Type.PutSmallFile; + } + + @Override + public int write(ByteBuffer src) throws IOException { + int srcLen = src.capacity(); + len += srcLen; + bufferList.add(src); + return srcLen; + } + + private ByteString asByteString() { + ByteBuffer buffer = ByteBuffer.allocate(len); + for (ByteBuffer byteBuffer : bufferList) { + buffer.put(byteBuffer); + } + buffer.flip(); + return ByteString.copyFrom(buffer); + } Review comment: This method copies the buffer twice (once into the aggregate buffer and once inside ByteString.copyFrom), so it is not zero-copy.
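One way to avoid both copies, sketched here under the assumption that the buffers in bufferList are never modified after write(), is to wrap each buffer with the relocated protobuf's org.apache.ratis.thirdparty.com.google.protobuf.UnsafeByteOperations and concatenate the results:

    private ByteString asByteString() {
      ByteString bytes = ByteString.EMPTY;
      for (ByteBuffer byteBuffer : bufferList) {
        // unsafeWrap shares the buffer's memory instead of copying it, and
        // concat builds a rope of the pieces without flattening them.
        bytes = bytes.concat(UnsafeByteOperations.unsafeWrap(byteBuffer));
      }
      return bytes;
    }

The trade-off is aliasing: the resulting ByteString is only valid as long as the wrapped buffers are not reused, which is why the copy-free variant must own its input buffers.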
