This is an automated email from the ASF dual-hosted git repository. captainzmc pushed a commit to branch HDDS-4454 in repository https://gitbox.apache.org/repos/asf/ozone.git
commit 3daf3d7a8fbe08a6ffc095e53a874d4295380cf8 Author: Sadanand Shenoy <[email protected]> AuthorDate: Thu Oct 21 12:57:30 2021 +0530 HDDS-5674.[Ozone-Streaming] Handle client retries on exception (#2701) --- .../hdds/scm/storage/BlockDataStreamOutput.java | 62 ++++++++++++--- .../hadoop/hdds/scm/storage/StreamBuffer.java | 46 +++++++++++ .../hdds/scm/storage/StreamCommitWatcher.java | 93 ++++++++++++++++++---- .../client/io/BlockDataStreamOutputEntry.java | 33 +++++++- .../client/io/BlockDataStreamOutputEntryPool.java | 14 +++- .../ozone/client/io/KeyDataStreamOutput.java | 12 ++- .../client/rpc/TestBlockDataStreamOutput.java | 30 +++++++ .../apache/hadoop/ozone/container/TestHelper.java | 20 +++++ 8 files changed, 279 insertions(+), 31 deletions(-) diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java index 2ae0ba7525..aada48e2f5 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java @@ -92,6 +92,11 @@ public class BlockDataStreamOutput implements ByteBufferStreamOutput { private int chunkIndex; private final AtomicLong chunkOffset = new AtomicLong(); + + // Similar to 'BufferPool' but this list maintains only references + // to the ByteBuffers. + private List<StreamBuffer> bufferList; + // The IOException will be set by response handling thread in case there is an // exception received in the response. If the exception is set, the next // request will fail upfront. @@ -133,7 +138,8 @@ public class BlockDataStreamOutput implements ByteBufferStreamOutput { XceiverClientFactory xceiverClientManager, Pipeline pipeline, OzoneClientConfig config, - Token<? extends TokenIdentifier> token + Token<? extends TokenIdentifier> token, + List<StreamBuffer> bufferList ) throws IOException { this.xceiverClientFactory = xceiverClientManager; this.config = config; @@ -148,7 +154,7 @@ public class BlockDataStreamOutput implements ByteBufferStreamOutput { // Alternatively, stream setup can be delayed till the first chunk write. this.out = setupStream(pipeline); this.token = token; - + this.bufferList = bufferList; flushPeriod = (int) (config.getStreamBufferFlushSize() / config .getStreamBufferSize()); @@ -159,7 +165,7 @@ public class BlockDataStreamOutput implements ByteBufferStreamOutput { // A single thread executor handle the responses of async requests responseExecutor = Executors.newSingleThreadExecutor(); - commitWatcher = new StreamCommitWatcher(xceiverClient); + commitWatcher = new StreamCommitWatcher(xceiverClient, bufferList); totalDataFlushedLength = 0; writtenDataLength = 0; failedServers = new ArrayList<>(0); @@ -251,8 +257,11 @@ public class BlockDataStreamOutput implements ByteBufferStreamOutput { if (len == 0) { return; } - writeChunkToContainer( - (ByteBuffer) b.asReadOnlyBuffer().position(off).limit(off + len)); + + final StreamBuffer buf = new StreamBuffer(b, off, len); + bufferList.add(buf); + + writeChunkToContainer(buf.duplicate()); writtenDataLength += len; } @@ -261,6 +270,10 @@ public class BlockDataStreamOutput implements ByteBufferStreamOutput { totalDataFlushedLength = writtenDataLength; } + @VisibleForTesting + public long getTotalDataFlushedLength() { + return totalDataFlushedLength; + } /** * Will be called on the retryPath in case closedContainerException/ * TimeoutException. @@ -268,8 +281,27 @@ public class BlockDataStreamOutput implements ByteBufferStreamOutput { * @throws IOException if error occurred */ - // TODO: We need add new retry policy without depend on bufferPool. public void writeOnRetry(long len) throws IOException { + if (len == 0) { + return; + } + if (LOG.isDebugEnabled()) { + LOG.debug("Retrying write length {} for blockID {}", len, blockID); + } + int count = 0; + while (len > 0) { + final StreamBuffer buf = bufferList.get(count); + final long writeLen = Math.min(buf.length(), len); + final ByteBuffer duplicated = buf.duplicate(); + if (writeLen != buf.length()) { + duplicated.limit(Math.toIntExact(len)); + } + writeChunkToContainer(duplicated); + len -= writeLen; + count++; + writtenDataLength += writeLen; + } + } @@ -314,6 +346,14 @@ public class BlockDataStreamOutput implements ByteBufferStreamOutput { boolean force) throws IOException { checkOpen(); long flushPos = totalDataFlushedLength; + final List<StreamBuffer> byteBufferList; + if (!force) { + Preconditions.checkNotNull(bufferList); + byteBufferList = bufferList; + Preconditions.checkNotNull(byteBufferList); + } else { + byteBufferList = null; + } flush(); if (close) { dataStreamCloseReply = out.closeAsync(); @@ -344,12 +384,12 @@ public class BlockDataStreamOutput implements ByteBufferStreamOutput { if (LOG.isDebugEnabled()) { LOG.debug( "Adding index " + asyncReply.getLogIndex() + " commitMap size " - + commitWatcher.getCommitInfoSetSize() + " flushLength " + + commitWatcher.getCommitInfoMapSize() + " flushLength " + flushPos + " blockID " + blockID); } // for standalone protocol, logIndex will always be 0. - commitWatcher.updateCommitInfoSet( - asyncReply.getLogIndex()); + commitWatcher + .updateCommitInfoMap(asyncReply.getLogIndex(), byteBufferList); } return e; }, responseExecutor).exceptionally(e -> { @@ -589,4 +629,8 @@ public class BlockDataStreamOutput implements ByteBufferStreamOutput { setIoException(ex); throw getIoException(); } + + public long getTotalAckDataLength() { + return commitWatcher.getTotalAckDataLength(); + } } diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamBuffer.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamBuffer.java new file mode 100644 index 0000000000..f36019e2ae --- /dev/null +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamBuffer.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.scm.storage; + +import java.nio.ByteBuffer; + +/** + * Used for streaming write. + */ +public class StreamBuffer { + private final ByteBuffer buffer; + + public StreamBuffer(ByteBuffer buffer) { + this.buffer = buffer.asReadOnlyBuffer(); + } + + public StreamBuffer(ByteBuffer buffer, int offset, int length) { + this((ByteBuffer) buffer.asReadOnlyBuffer().position(offset) + .limit(offset + length)); + } + + public ByteBuffer duplicate() { + return buffer.duplicate(); + } + + public int length() { + return buffer.limit() - buffer.position(); + } + +} \ No newline at end of file diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamCommitWatcher.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamCommitWatcher.java index c187ffe902..3a59d07571 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamCommitWatcher.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamCommitWatcher.java @@ -24,6 +24,7 @@ */ package org.apache.hadoop.hdds.scm.storage; +import com.google.common.base.Preconditions; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto; import org.apache.hadoop.hdds.scm.XceiverClientReply; import org.apache.hadoop.hdds.scm.XceiverClientSpi; @@ -31,13 +32,16 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; -import java.util.Set; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ConcurrentSkipListSet; +import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeoutException; +import java.util.stream.Collectors; /** * This class executes watchForCommit on ratis pipeline and releases @@ -48,7 +52,12 @@ public class StreamCommitWatcher { private static final Logger LOG = LoggerFactory.getLogger(StreamCommitWatcher.class); - private Set<Long> commitIndexSet; + private Map<Long, List<StreamBuffer>> commitIndexMap; + private List<StreamBuffer> bufferList; + + // total data which has been successfully flushed and acknowledged + // by all servers + private long totalAckDataLength; // future Map to hold up all putBlock futures private ConcurrentHashMap<Long, @@ -57,18 +66,22 @@ public class StreamCommitWatcher { private XceiverClientSpi xceiverClient; - public StreamCommitWatcher(XceiverClientSpi xceiverClient) { + public StreamCommitWatcher(XceiverClientSpi xceiverClient, + List<StreamBuffer> bufferList) { this.xceiverClient = xceiverClient; - commitIndexSet = new ConcurrentSkipListSet(); + commitIndexMap = new ConcurrentSkipListMap<>(); futureMap = new ConcurrentHashMap<>(); + this.bufferList = bufferList; + totalAckDataLength = 0; } - public void updateCommitInfoSet(long index) { - commitIndexSet.add(index); + public void updateCommitInfoMap(long index, List<StreamBuffer> buffers) { + commitIndexMap.computeIfAbsent(index, k -> new LinkedList<>()) + .addAll(buffers); } - int getCommitInfoSetSize() { - return commitIndexSet.size(); + int getCommitInfoMapSize() { + return commitIndexMap.size(); } /** @@ -78,12 +91,12 @@ public class StreamCommitWatcher { * @throws IOException in case watchForCommit fails */ public XceiverClientReply streamWatchOnFirstIndex() throws IOException { - if (!commitIndexSet.isEmpty()) { + if (!commitIndexMap.isEmpty()) { // wait for the first commit index in the commitIndex2flushedDataMap // to get committed to all or majority of nodes in case timeout // happens. long index = - commitIndexSet.stream().mapToLong(v -> v).min() + commitIndexMap.keySet().stream().mapToLong(v -> v).min() .getAsLong(); if (LOG.isDebugEnabled()) { LOG.debug("waiting for first index {} to catch up", index); @@ -102,12 +115,12 @@ public class StreamCommitWatcher { */ public XceiverClientReply streamWatchOnLastIndex() throws IOException { - if (!commitIndexSet.isEmpty()) { + if (!commitIndexMap.isEmpty()) { // wait for the commit index in the commitIndex2flushedDataMap // to get committed to all or majority of nodes in case timeout // happens. long index = - commitIndexSet.stream().mapToLong(v -> v).max() + commitIndexMap.keySet().stream().mapToLong(v -> v).max() .getAsLong(); if (LOG.isDebugEnabled()) { LOG.debug("waiting for last flush Index {} to catch up", index); @@ -127,9 +140,16 @@ public class StreamCommitWatcher { */ public XceiverClientReply streamWatchForCommit(long commitIndex) throws IOException { + final long index; try { XceiverClientReply reply = xceiverClient.watchForCommit(commitIndex); + if (reply == null) { + index = 0; + } else { + index = reply.getLogIndex(); + } + adjustBuffers(index); return reply; } catch (InterruptedException e) { // Re-interrupt the thread while catching InterruptedException @@ -140,11 +160,52 @@ public class StreamCommitWatcher { } } + void releaseBuffersOnException() { + adjustBuffers(xceiverClient.getReplicatedMinCommitIndex()); + } + + private void adjustBuffers(long commitIndex) { + List<Long> keyList = commitIndexMap.keySet().stream() + .filter(p -> p <= commitIndex).collect(Collectors.toList()); + if (!keyList.isEmpty()) { + releaseBuffers(keyList); + } + } + + private long releaseBuffers(List<Long> indexes) { + Preconditions.checkArgument(!commitIndexMap.isEmpty()); + for (long index : indexes) { + Preconditions.checkState(commitIndexMap.containsKey(index)); + final List<StreamBuffer> buffers = commitIndexMap.remove(index); + final long length = + buffers.stream().mapToLong(StreamBuffer::length).sum(); + totalAckDataLength += length; + // clear the future object from the future Map + final CompletableFuture<ContainerCommandResponseProto> remove = + futureMap.remove(totalAckDataLength); + if (remove == null) { + LOG.error("Couldn't find required future for " + totalAckDataLength); + for (Long key : futureMap.keySet()) { + LOG.error("Existing acknowledged data: " + key); + } + } + for (StreamBuffer byteBuffer : buffers) { + bufferList.remove(byteBuffer); + } + } + return totalAckDataLength; + } + + public long getTotalAckDataLength() { + return totalAckDataLength; + } + private IOException getIOExceptionForWatchForCommit(long commitIndex, Exception e) { LOG.warn("watchForCommit failed for index {}", commitIndex, e); IOException ioException = new IOException( "Unexpected Storage Container Exception: " + e.toString(), e); + releaseBuffersOnException(); return ioException; } @@ -155,12 +216,12 @@ public class StreamCommitWatcher { } public void cleanup() { - if (commitIndexSet != null) { - commitIndexSet.clear(); + if (commitIndexMap != null) { + commitIndexMap.clear(); } if (futureMap != null) { futureMap.clear(); } - commitIndexSet = null; + commitIndexMap = null; } } diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntry.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntry.java index f0c3a43e89..2cd5630549 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntry.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntry.java @@ -25,6 +25,7 @@ import org.apache.hadoop.hdds.scm.XceiverClientFactory; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.scm.storage.BlockDataStreamOutput; import org.apache.hadoop.hdds.scm.storage.ByteBufferStreamOutput; +import org.apache.hadoop.hdds.scm.storage.StreamBuffer; import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier; import org.apache.hadoop.security.token.Token; @@ -32,6 +33,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.Collection; import java.util.Collections; +import java.util.List; /** * Helper class used inside {@link BlockDataStreamOutput}. @@ -50,6 +52,7 @@ public final class BlockDataStreamOutputEntry // the current position of this stream 0 <= currentPosition < length private long currentPosition; private final Token<OzoneBlockTokenIdentifier> token; + private List<StreamBuffer> bufferList; @SuppressWarnings({"parameternumber", "squid:S00107"}) private BlockDataStreamOutputEntry( @@ -58,7 +61,8 @@ public final class BlockDataStreamOutputEntry Pipeline pipeline, long length, Token<OzoneBlockTokenIdentifier> token, - OzoneClientConfig config + OzoneClientConfig config, + List<StreamBuffer> bufferList ) { this.config = config; this.byteBufferStreamOutput = null; @@ -69,6 +73,7 @@ public final class BlockDataStreamOutputEntry this.token = token; this.length = length; this.currentPosition = 0; + this.bufferList = bufferList; } long getLength() { @@ -92,8 +97,8 @@ public final class BlockDataStreamOutputEntry private void checkStream() throws IOException { if (this.byteBufferStreamOutput == null) { this.byteBufferStreamOutput = - new BlockDataStreamOutput(blockID, xceiverClientManager, - pipeline, config, token); + new BlockDataStreamOutput(blockID, xceiverClientManager, pipeline, + config, token, bufferList); } } @@ -151,6 +156,20 @@ public final class BlockDataStreamOutputEntry } } + long getTotalAckDataLength() { + if (byteBufferStreamOutput != null) { + BlockDataStreamOutput out = + (BlockDataStreamOutput) this.byteBufferStreamOutput; + blockID = out.getBlockID(); + return out.getTotalAckDataLength(); + } else { + // For a pre allocated block for which no write has been initiated, + // the OutputStream will be null here. + // In such cases, the default blockCommitSequenceId will be 0 + return 0; + } + } + void cleanup(boolean invalidateClient) throws IOException { checkStream(); BlockDataStreamOutput out = @@ -180,6 +199,7 @@ public final class BlockDataStreamOutputEntry private long length; private Token<OzoneBlockTokenIdentifier> token; private OzoneClientConfig config; + private List<StreamBuffer> bufferList; public Builder setBlockID(BlockID bID) { this.blockID = bID; @@ -219,13 +239,18 @@ public final class BlockDataStreamOutputEntry return this; } + public Builder setBufferList(List<StreamBuffer> bList) { + this.bufferList = bList; + return this; + } + public BlockDataStreamOutputEntry build() { return new BlockDataStreamOutputEntry(blockID, key, xceiverClientManager, pipeline, length, - token, config); + token, config, bufferList); } } diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntryPool.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntryPool.java index 4bc55de262..e49b0b79ad 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntryPool.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntryPool.java @@ -26,6 +26,7 @@ import org.apache.hadoop.hdds.scm.OzoneClientConfig; import org.apache.hadoop.hdds.scm.XceiverClientFactory; import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList; import org.apache.hadoop.hdds.scm.pipeline.PipelineID; +import org.apache.hadoop.hdds.scm.storage.StreamBuffer; import org.apache.hadoop.ozone.om.helpers.OmKeyArgs; import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo; @@ -59,6 +60,7 @@ public class BlockDataStreamOutputEntryPool { private OmMultipartCommitUploadPartInfo commitUploadPartInfo; private final long openID; private final ExcludeList excludeList; + private List<StreamBuffer> bufferList; @SuppressWarnings({"parameternumber", "squid:S00107"}) public BlockDataStreamOutputEntryPool( @@ -83,6 +85,7 @@ public class BlockDataStreamOutputEntryPool { this.requestID = requestId; this.openID = openID; this.excludeList = new ExcludeList(); + this.bufferList = new ArrayList<>(); } /** @@ -142,7 +145,8 @@ public class BlockDataStreamOutputEntryPool { .setPipeline(subKeyInfo.getPipeline()) .setConfig(config) .setLength(subKeyInfo.getLength()) - .setToken(subKeyInfo.getToken()); + .setToken(subKeyInfo.getToken()) + .setBufferList(bufferList); streamEntries.add(builder.build()); } @@ -301,4 +305,12 @@ public class BlockDataStreamOutputEntryPool { boolean isEmpty() { return streamEntries.isEmpty(); } + + long computeBufferData() { + long totalDataLen =0; + for (StreamBuffer b : bufferList){ + totalDataLen += b.length(); + } + return totalDataLen; + } } diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java index 9bba89d0a8..2540e42e24 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java @@ -278,11 +278,14 @@ public class KeyDataStreamOutput implements ByteBufferStreamOutput { } Pipeline pipeline = streamEntry.getPipeline(); PipelineID pipelineId = pipeline.getId(); - + long totalSuccessfulFlushedData = streamEntry.getTotalAckDataLength(); + //set the correct length for the current stream + streamEntry.setCurrentPosition(totalSuccessfulFlushedData); long containerId = streamEntry.getBlockID().getContainerID(); Collection<DatanodeDetails> failedServers = streamEntry.getFailedServers(); Preconditions.checkNotNull(failedServers); ExcludeList excludeList = blockDataStreamOutputEntryPool.getExcludeList(); + long bufferedDataLen = blockDataStreamOutputEntryPool.computeBufferData(); if (!failedServers.isEmpty()) { excludeList.addDatanodes(failedServers); } @@ -316,6 +319,13 @@ public class KeyDataStreamOutput implements ByteBufferStreamOutput { blockDataStreamOutputEntryPool .discardPreallocatedBlocks(-1, pipelineId); } + if (bufferedDataLen > 0) { + // If the data is still cached in the underlying stream, we need to + // allocate new block and write this data in the datanode. + handleRetry(exception, bufferedDataLen); + // reset the retryCount after handling the exception + retryCount = 0; + } } private void markStreamClosed() { diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java index d3b2d22577..05a101951b 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java @@ -21,15 +21,19 @@ import org.apache.hadoop.conf.StorageUnit; import org.apache.hadoop.hdds.client.ReplicationType; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.scm.OzoneClientConfig; +import org.apache.hadoop.hdds.scm.storage.BlockDataStreamOutput; +import org.apache.hadoop.hdds.scm.storage.ByteBufferStreamOutput; import org.apache.hadoop.ozone.MiniOzoneCluster; import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.client.ObjectStore; import org.apache.hadoop.ozone.client.OzoneClient; import org.apache.hadoop.ozone.client.OzoneClientFactory; +import org.apache.hadoop.ozone.client.io.KeyDataStreamOutput; import org.apache.hadoop.ozone.client.io.OzoneDataStreamOutput; import org.apache.hadoop.ozone.container.ContainerTestHelper; import org.apache.hadoop.ozone.container.TestHelper; import org.junit.AfterClass; +import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; @@ -127,21 +131,25 @@ public class TestBlockDataStreamOutput { @Test public void testHalfChunkWrite() throws Exception { testWrite(chunkSize / 2); + testWriteWithFailure(chunkSize/2); } @Test public void testSingleChunkWrite() throws Exception { testWrite(chunkSize); + testWriteWithFailure(chunkSize); } @Test public void testMultiChunkWrite() throws Exception { testWrite(chunkSize + 50); + testWriteWithFailure(chunkSize + 50); } @Test public void testMultiBlockWrite() throws Exception { testWrite(blockSize + 50); + testWriteWithFailure(blockSize + 50); } private void testWrite(int dataLength) throws Exception { @@ -156,6 +164,28 @@ public class TestBlockDataStreamOutput { key.close(); validateData(keyName, data); } + + private void testWriteWithFailure(int dataLength) throws Exception { + String keyName = getKeyName(); + OzoneDataStreamOutput key = createKey( + keyName, ReplicationType.RATIS, 0); + byte[] data = + ContainerTestHelper.getFixedLengthString(keyString, dataLength) + .getBytes(UTF_8); + ByteBuffer b = ByteBuffer.wrap(data); + key.write(b); + KeyDataStreamOutput keyDataStreamOutput = + (KeyDataStreamOutput) key.getByteBufStreamOutput(); + ByteBufferStreamOutput stream = + keyDataStreamOutput.getStreamEntries().get(0).getByteBufStreamOutput(); + Assert.assertTrue(stream instanceof BlockDataStreamOutput); + TestHelper.waitForContainerClose(key, cluster); + key.write(b); + key.close(); + String dataString = new String(data, UTF_8); + validateData(keyName, dataString.concat(dataString).getBytes(UTF_8)); + } + private OzoneDataStreamOutput createKey(String keyName, ReplicationType type, long size) throws Exception { return TestHelper.createStreamKey( diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestHelper.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestHelper.java index cf3a51241e..14cd1b66f4 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestHelper.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestHelper.java @@ -46,7 +46,9 @@ import org.apache.hadoop.ozone.HddsDatanodeService; import org.apache.hadoop.ozone.MiniOzoneCluster; import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.client.ObjectStore; +import org.apache.hadoop.ozone.client.io.BlockDataStreamOutputEntry; import org.apache.hadoop.ozone.client.io.BlockOutputStreamEntry; +import org.apache.hadoop.ozone.client.io.KeyDataStreamOutput; import org.apache.hadoop.ozone.client.io.KeyOutputStream; import org.apache.hadoop.ozone.client.io.OzoneDataStreamOutput; import org.apache.hadoop.ozone.client.io.OzoneInputStream; @@ -205,6 +207,24 @@ public final class TestHelper { waitForContainerClose(cluster, containerIdList.toArray(new Long[0])); } + + public static void waitForContainerClose(OzoneDataStreamOutput outputStream, + MiniOzoneCluster cluster) throws Exception { + KeyDataStreamOutput keyOutputStream = + (KeyDataStreamOutput) outputStream.getByteBufStreamOutput(); + List<BlockDataStreamOutputEntry> streamEntryList = + keyOutputStream.getStreamEntries(); + List<Long> containerIdList = new ArrayList<>(); + for (BlockDataStreamOutputEntry entry : streamEntryList) { + long id = entry.getBlockID().getContainerID(); + if (!containerIdList.contains(id)) { + containerIdList.add(id); + } + } + Assert.assertTrue(!containerIdList.isEmpty()); + waitForContainerClose(cluster, containerIdList.toArray(new Long[0])); + } + public static void waitForPipelineClose(OzoneOutputStream outputStream, MiniOzoneCluster cluster, boolean waitForContainerCreation) throws Exception { --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
