This is an automated email from the ASF dual-hosted git repository.
weichiu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 7f47535705 HDDS-11239. Fix KeyOutputStream's exception handling when
calling hsync concurrently (#7047)
7f47535705 is described below
commit 7f47535705ca57b479a644538f1aeb52419df223
Author: Duong Nguyen <[email protected]>
AuthorDate: Wed Aug 21 13:32:45 2024 -0700
HDDS-11239. Fix KeyOutputStream's exception handling when calling hsync
concurrently (#7047)
---
.../org/apache/hadoop/hdds/scm/ErrorInjector.java | 31 +++++
.../hadoop/hdds/scm/XceiverClientCreator.java | 8 +-
.../hadoop/hdds/scm/XceiverClientManager.java | 1 +
.../apache/hadoop/hdds/scm/XceiverClientRatis.java | 16 ++-
.../hadoop/hdds/scm/storage/BlockOutputStream.java | 82 ++++++-------
.../server/ratis/ContainerStateMachine.java | 8 +-
.../ozone/client/io/BlockOutputStreamEntry.java | 59 +++++++++-
.../client/io/BlockOutputStreamEntryPool.java | 17 +--
.../client/io/ECBlockOutputStreamEntryPool.java | 2 +-
.../hadoop/ozone/client/io/ECKeyOutputStream.java | 2 +-
.../hadoop/ozone/client/io/KeyOutputStream.java | 96 ++++++++++++---
.../ozone/client/io/TestKeyOutputStream.java | 4 +-
.../java/org/apache/hadoop/fs/ozone/TestHSync.java | 131 +++++++++++++++++++++
13 files changed, 382 insertions(+), 75 deletions(-)
diff --git
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/ErrorInjector.java
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/ErrorInjector.java
new file mode 100644
index 0000000000..35806967a5
--- /dev/null
+++
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/ErrorInjector.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm;
+
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.protocol.RaftClientReply;
+
+/**
+ * Client-side error injector that allows simulating errors received from the
server side.
+ */
+@FunctionalInterface
+public interface ErrorInjector {
+ RaftClientReply getResponse(ContainerProtos.ContainerCommandRequestProto
request, ClientId id, Pipeline pipeline);
+}
diff --git
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientCreator.java
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientCreator.java
index cd46bc49a1..75ae01c100 100644
---
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientCreator.java
+++
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientCreator.java
@@ -31,6 +31,12 @@ import java.io.IOException;
* Factory for XceiverClientSpi implementations. Client instances are not
cached.
*/
public class XceiverClientCreator implements XceiverClientFactory {
+ private static ErrorInjector errorInjector;
+
+ public static void enableErrorInjection(ErrorInjector injector) {
+ errorInjector = injector;
+ }
+
private final ConfigurationSource conf;
private final boolean topologyAwareRead;
private final ClientTrustManager trustManager;
@@ -60,7 +66,7 @@ public class XceiverClientCreator implements
XceiverClientFactory {
XceiverClientSpi client;
switch (pipeline.getType()) {
case RATIS:
- client = XceiverClientRatis.newXceiverClientRatis(pipeline, conf,
trustManager);
+ client = XceiverClientRatis.newXceiverClientRatis(pipeline, conf,
trustManager, errorInjector);
break;
case STAND_ALONE:
client = new XceiverClientGrpc(pipeline, conf, trustManager);
diff --git
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java
index 285a47ec57..07b7044172 100644
---
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java
+++
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java
@@ -61,6 +61,7 @@ import org.slf4j.LoggerFactory;
public class XceiverClientManager extends XceiverClientCreator {
private static final Logger LOG =
LoggerFactory.getLogger(XceiverClientManager.class);
+
private final Cache<String, XceiverClientSpi> clientCache;
private final CacheMetrics cacheMetrics;
diff --git
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
index eb0ed0a885..b0ef85cfbf 100644
---
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
+++
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
@@ -79,12 +79,12 @@ public final class XceiverClientRatis extends
XceiverClientSpi {
public static XceiverClientRatis newXceiverClientRatis(
org.apache.hadoop.hdds.scm.pipeline.Pipeline pipeline,
ConfigurationSource ozoneConf) {
- return newXceiverClientRatis(pipeline, ozoneConf, null);
+ return newXceiverClientRatis(pipeline, ozoneConf, null, null);
}
public static XceiverClientRatis newXceiverClientRatis(
org.apache.hadoop.hdds.scm.pipeline.Pipeline pipeline,
- ConfigurationSource ozoneConf, ClientTrustManager trustManager) {
+ ConfigurationSource ozoneConf, ClientTrustManager trustManager,
ErrorInjector errorInjector) {
final String rpcType = ozoneConf
.get(ScmConfigKeys.HDDS_CONTAINER_RATIS_RPC_TYPE_KEY,
ScmConfigKeys.HDDS_CONTAINER_RATIS_RPC_TYPE_DEFAULT);
@@ -93,7 +93,7 @@ public final class XceiverClientRatis extends
XceiverClientSpi {
SecurityConfig(ozoneConf), trustManager);
return new XceiverClientRatis(pipeline,
SupportedRpcType.valueOfIgnoreCase(rpcType),
- retryPolicy, tlsConfig, ozoneConf);
+ retryPolicy, tlsConfig, ozoneConf, errorInjector);
}
private final Pipeline pipeline;
@@ -110,13 +110,14 @@ public final class XceiverClientRatis extends
XceiverClientSpi {
= XceiverClientManager.getXceiverClientMetrics();
private final RaftProtos.ReplicationLevel watchType;
private final int majority;
+ private final ErrorInjector errorInjector;
/**
* Constructs a client.
*/
private XceiverClientRatis(Pipeline pipeline, RpcType rpcType,
RetryPolicy retryPolicy, GrpcTlsConfig tlsConfig,
- ConfigurationSource configuration) {
+ ConfigurationSource configuration, ErrorInjector errorInjector) {
super();
this.pipeline = pipeline;
this.majority = (pipeline.getReplicationConfig().getRequiredNodes() / 2) +
1;
@@ -142,6 +143,7 @@ public final class XceiverClientRatis extends
XceiverClientSpi {
LOG.trace("new XceiverClientRatis for pipeline " + pipeline.getId(),
new Throwable("TRACE"));
}
+ this.errorInjector = errorInjector;
}
private long updateCommitInfosMap(RaftClientReply reply,
RaftProtos.ReplicationLevel level) {
@@ -248,6 +250,12 @@ public final class XceiverClientRatis extends
XceiverClientSpi {
private CompletableFuture<RaftClientReply> sendRequestAsync(
ContainerCommandRequestProto request) {
+ if (errorInjector != null) {
+ RaftClientReply response = errorInjector.getResponse(request,
getClient().getId(), pipeline);
+ if (response != null) {
+ return CompletableFuture.completedFuture(response);
+ }
+ }
return TracingUtil.executeInNewSpan(
"XceiverClientRatis." + request.getCmdType().name(),
() -> {
diff --git
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
index 43ac69818f..e88b097c49 100644
---
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
+++
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
@@ -102,7 +102,7 @@ public class BlockOutputStream extends OutputStream {
= new AtomicReference<>();
private final BlockData.Builder containerBlockData;
- private XceiverClientFactory xceiverClientFactory;
+ private volatile XceiverClientFactory xceiverClientFactory;
private XceiverClientSpi xceiverClient;
private OzoneClientConfig config;
private StreamBufferArgs streamBufferArgs;
@@ -216,7 +216,8 @@ public class BlockOutputStream extends OutputStream {
this.token.encodeToUrlString();
//number of buffers used before doing a flush
- refreshCurrentBuffer();
+ currentBuffer = null;
+ currentBufferRemaining = 0;
flushPeriod = (int) (streamBufferArgs.getStreamBufferFlushSize() /
streamBufferArgs
.getStreamBufferSize());
@@ -254,12 +255,6 @@ public class BlockOutputStream extends OutputStream {
return true;
}
- synchronized void refreshCurrentBuffer() {
- currentBuffer = bufferPool.getCurrentBuffer();
- currentBufferRemaining =
- currentBuffer != null ? currentBuffer.remaining() : 0;
- }
-
public BlockID getBlockID() {
return blockID.get();
}
@@ -418,42 +413,44 @@ public class BlockOutputStream extends OutputStream {
* @param len length of data to write
* @throws IOException if error occurred
*/
-
- // In this case, the data is already cached in the currentBuffer.
public synchronized void writeOnRetry(long len) throws IOException {
if (len == 0) {
return;
}
+
+ // In this case, the data from the failing (previous) block is already cached
in the allocated buffers in
+ // the BufferPool. For each pending buffer in the BufferPool, we
sequentially flush it and wait synchronously.
+
+ List<ChunkBuffer> allocatedBuffers = bufferPool.getAllocatedBuffers();
if (LOG.isDebugEnabled()) {
- LOG.debug("Retrying write length {} for blockID {}", len, blockID);
+ LOG.debug("{}: Retrying write length {} on target blockID {}, {}
buffers", this, len, blockID,
+ allocatedBuffers.size());
}
Preconditions.checkArgument(len <=
streamBufferArgs.getStreamBufferMaxSize());
int count = 0;
- List<ChunkBuffer> allocatedBuffers = bufferPool.getAllocatedBuffers();
while (len > 0) {
ChunkBuffer buffer = allocatedBuffers.get(count);
long writeLen = Math.min(buffer.position(), len);
- if (!buffer.hasRemaining()) {
- writeChunk(buffer);
- }
len -= writeLen;
count++;
writtenDataLength += writeLen;
- // we should not call isBufferFull/shouldFlush here.
- // The buffer might already be full as whole data is already cached in
- // the buffer. We should just validate
- // if we wrote data of size streamBufferMaxSize/streamBufferFlushSize to
- // call for handling full buffer/flush buffer condition.
- if (writtenDataLength % streamBufferArgs.getStreamBufferFlushSize() ==
0) {
- // reset the position to zero as now we will be reading the
- // next buffer in the list
- updateWriteChunkLength();
- updatePutBlockLength();
- CompletableFuture<PutBlockResult> putBlockResultFuture =
executePutBlock(false, false);
- recordWatchForCommitAsync(putBlockResultFuture);
+ updateWriteChunkLength();
+ updatePutBlockLength();
+ LOG.debug("Write chunk on retry buffer = {}", buffer);
+ CompletableFuture<PutBlockResult> putBlockFuture;
+ if (allowPutBlockPiggybacking) {
+ putBlockFuture = writeChunkAndPutBlock(buffer, false);
+ } else {
+ writeChunk(buffer);
+ putBlockFuture = executePutBlock(false, false);
}
- if (writtenDataLength == streamBufferArgs.getStreamBufferMaxSize()) {
- handleFullBuffer();
+ CompletableFuture<Void> watchForCommitAsync =
watchForCommitAsync(putBlockFuture);
+ try {
+ watchForCommitAsync.get();
+ } catch (InterruptedException e) {
+ handleInterruptedException(e, true);
+ } catch (ExecutionException e) {
+ handleExecutionException(e);
}
}
}
@@ -479,14 +476,6 @@ public class BlockOutputStream extends OutputStream {
void releaseBuffersOnException() {
}
- // It may happen that once the exception is encountered , we still might
- // have successfully flushed up to a certain index. Make sure the buffers
- // only contain data which have not been sufficiently replicated
- private void adjustBuffersOnException() {
- releaseBuffersOnException();
- refreshCurrentBuffer();
- }
-
/**
* Watch for a specific commit index.
*/
@@ -633,6 +622,9 @@ public class BlockOutputStream extends OutputStream {
protected void handleFlush(boolean close) throws IOException {
try {
handleFlushInternal(close);
+ if (close) {
+ waitForAllPendingFlushes();
+ }
} catch (ExecutionException e) {
handleExecutionException(e);
} catch (InterruptedException ex) {
@@ -675,6 +667,17 @@ public class BlockOutputStream extends OutputStream {
}
}
+ public void waitForAllPendingFlushes() throws IOException {
+ // When closing, must wait for all flush futures to complete.
+ try {
+ allPendingFlushFutures.get();
+ } catch (InterruptedException e) {
+ handleInterruptedException(e, true);
+ } catch (ExecutionException e) {
+ handleExecutionException(e);
+ }
+ }
+
private synchronized CompletableFuture<Void>
handleFlushInternalSynchronized(boolean close) throws IOException {
CompletableFuture<PutBlockResult> putBlockResultFuture = null;
// flush the last chunk data residing on the currentBuffer
@@ -740,6 +743,7 @@ public class BlockOutputStream extends OutputStream {
// Preconditions.checkArgument(buffer.position() == 0);
// bufferPool.checkBufferPoolEmpty();
} else {
+ waitForAllPendingFlushes();
cleanup(false);
}
}
@@ -783,7 +787,7 @@ public class BlockOutputStream extends OutputStream {
void cleanup() {
}
- public void cleanup(boolean invalidateClient) {
+ public synchronized void cleanup(boolean invalidateClient) {
if (xceiverClientFactory != null) {
xceiverClientFactory.releaseClient(xceiverClient, invalidateClient);
}
@@ -811,7 +815,6 @@ public class BlockOutputStream extends OutputStream {
if (isClosed()) {
throw new IOException("BlockOutputStream has been closed.");
} else if (getIoException() != null) {
- adjustBuffersOnException();
throw getIoException();
}
}
@@ -1148,7 +1151,6 @@ public class BlockOutputStream extends OutputStream {
*/
private void handleExecutionException(Exception ex) throws IOException {
setIoException(ex);
- adjustBuffersOnException();
throw getIoException();
}
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
index 28b9e151ff..b3398de07a 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
@@ -581,7 +581,10 @@ public class ContainerStateMachine extends
BaseStateMachine {
writeChunkFuture.thenApply(r -> {
if (r.getResult() != ContainerProtos.Result.SUCCESS
&& r.getResult() != ContainerProtos.Result.CONTAINER_NOT_OPEN
- && r.getResult() != ContainerProtos.Result.CLOSED_CONTAINER_IO) {
+ && r.getResult() != ContainerProtos.Result.CLOSED_CONTAINER_IO
+ // After concurrent flushes are allowed on the same key, chunk file
inconsistencies can happen and
+ // that should not crash the pipeline.
+ && r.getResult() != ContainerProtos.Result.CHUNK_FILE_INCONSISTENCY)
{
StorageContainerException sce =
new StorageContainerException(r.getMessage(), r.getResult());
LOG.error(gid + ": writeChunk writeStateMachineData failed: blockId" +
@@ -1061,7 +1064,8 @@ public class ContainerStateMachine extends
BaseStateMachine {
// unhealthy
if (r.getResult() != ContainerProtos.Result.SUCCESS
&& r.getResult() != ContainerProtos.Result.CONTAINER_NOT_OPEN
- && r.getResult() != ContainerProtos.Result.CLOSED_CONTAINER_IO) {
+ && r.getResult() != ContainerProtos.Result.CLOSED_CONTAINER_IO
+ && r.getResult() !=
ContainerProtos.Result.CHUNK_FILE_INCONSISTENCY) {
StorageContainerException sce =
new StorageContainerException(r.getMessage(), r.getResult());
LOG.error(
diff --git
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java
index 5e6ecceefa..2ae3e47553 100644
---
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java
+++
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java
@@ -22,6 +22,8 @@ import java.io.OutputStream;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Condition;
import java.util.function.Supplier;
import org.apache.hadoop.fs.Syncable;
@@ -41,6 +43,8 @@ import org.apache.hadoop.security.token.Token;
import com.google.common.annotations.VisibleForTesting;
import org.apache.ratis.util.JavaUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* A BlockOutputStreamEntry manages the data writes into the DataNodes.
@@ -51,9 +55,9 @@ import org.apache.ratis.util.JavaUtils;
* but there can be other implementations that are using a different way.
*/
public class BlockOutputStreamEntry extends OutputStream {
-
+ public static final Logger LOG =
LoggerFactory.getLogger(BlockOutputStreamEntry.class);
private final OzoneClientConfig config;
- private OutputStream outputStream;
+ private BlockOutputStream outputStream;
private BlockID blockID;
private final String key;
private final XceiverClientFactory xceiverClientManager;
@@ -69,6 +73,18 @@ public class BlockOutputStreamEntry extends OutputStream {
private final StreamBufferArgs streamBufferArgs;
private final Supplier<ExecutorService> executorServiceSupplier;
+ /**
+ * An indicator that this BlockOutputStream is created to hand off writes
from another faulty BlockOutputStream.
+ * Once this flag is on, this BlockOutputStream can only handle writeOnRetry.
+ */
+ private volatile boolean isHandlingRetry;
+
+ /**
+ * Tracks how many calls (write, flush) are currently being handled by this
block.
+
+
BlockOutputStreamEntry(Builder b) {
this.config = b.config;
this.outputStream = null;
@@ -83,6 +99,7 @@ public class BlockOutputStreamEntry extends OutputStream {
this.clientMetrics = b.clientMetrics;
this.streamBufferArgs = b.streamBufferArgs;
this.executorServiceSupplier = b.executorServiceSupplier;
+ this.isHandlingRetry = b.forRetry;
}
@Override
@@ -102,6 +119,37 @@ public class BlockOutputStreamEntry extends OutputStream {
}
}
+ /** Register when a call (write or flush) is received on this block. */
+ void registerCallReceived() {
+ inflightCalls.incrementAndGet();
+ }
+
+ /**
+ * Register when a call (write or flush) is finished on this block.
+ * @return true if all the calls are done.
+ */
+ boolean registerCallFinished() {
+ return inflightCalls.decrementAndGet() == 0;
+ }
+
+ void waitForRetryHandling(Condition retryHandlingCond) throws
InterruptedException {
+ while (isHandlingRetry) {
+ LOG.info("{} : Block to wait for retry handling.", this);
+ retryHandlingCond.await();
+ LOG.info("{} : Done waiting for retry handling.", this);
+ }
+ }
+
+ void finishRetryHandling(Condition retryHandlingCond) {
+ LOG.info("{}: Exiting retry handling mode", this);
+ isHandlingRetry = false;
+ retryHandlingCond.signalAll();
+ }
+
+ void waitForAllPendingFlushes() throws IOException {
+ outputStream.waitForAllPendingFlushes();
+ }
+
/**
* Creates the outputStreams that are necessary to start the write.
* Implementors can override this to instantiate multiple streams instead.
@@ -144,6 +192,7 @@ public class BlockOutputStreamEntry extends OutputStream {
BlockOutputStream out = (BlockOutputStream) getOutputStream();
out.writeOnRetry(len);
incCurrentPosition(len);
+ LOG.info("{}: Finish retrying with len {}, currentPosition {}", this, len,
currentPosition);
}
@Override
@@ -368,6 +417,7 @@ public class BlockOutputStreamEntry extends OutputStream {
private ContainerClientMetrics clientMetrics;
private StreamBufferArgs streamBufferArgs;
private Supplier<ExecutorService> executorServiceSupplier;
+ private boolean forRetry;
public Pipeline getPipeline() {
return pipeline;
@@ -433,6 +483,11 @@ public class BlockOutputStreamEntry extends OutputStream {
return this;
}
+ public Builder setForRetry(boolean forRetry) {
+ this.forRetry = forRetry;
+ return this;
+ }
+
public BlockOutputStreamEntry build() {
return new BlockOutputStreamEntry(this);
}
diff --git
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java
index 99899c6874..3705a13637 100644
---
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java
+++
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java
@@ -141,7 +141,7 @@ public class BlockOutputStreamEntryPool implements
KeyMetadataAware {
// only the blocks allocated in this open session (block createVersion
// equals to open session version)
for (OmKeyLocationInfo subKeyInfo : version.getLocationList(openVersion)) {
- addKeyLocationInfo(subKeyInfo);
+ addKeyLocationInfo(subKeyInfo, false);
}
}
@@ -154,7 +154,7 @@ public class BlockOutputStreamEntryPool implements
KeyMetadataAware {
* key to be written.
* @return a BlockOutputStreamEntry instance that handles how data is
written.
*/
- BlockOutputStreamEntry createStreamEntry(OmKeyLocationInfo subKeyInfo) {
+ BlockOutputStreamEntry createStreamEntry(OmKeyLocationInfo subKeyInfo,
boolean forRetry) {
return
new BlockOutputStreamEntry.Builder()
.setBlockID(subKeyInfo.getBlockID())
@@ -168,12 +168,13 @@ public class BlockOutputStreamEntryPool implements
KeyMetadataAware {
.setClientMetrics(clientMetrics)
.setStreamBufferArgs(streamBufferArgs)
.setExecutorServiceSupplier(executorServiceSupplier)
+ .setForRetry(forRetry)
.build();
}
- private synchronized void addKeyLocationInfo(OmKeyLocationInfo subKeyInfo) {
+ private synchronized void addKeyLocationInfo(OmKeyLocationInfo subKeyInfo,
boolean forRetry) {
Preconditions.checkNotNull(subKeyInfo.getPipeline());
- streamEntries.add(createStreamEntry(subKeyInfo));
+ streamEntries.add(createStreamEntry(subKeyInfo, forRetry));
}
/**
@@ -295,13 +296,13 @@ public class BlockOutputStreamEntryPool implements
KeyMetadataAware {
*
* @throws IOException
*/
- private void allocateNewBlock() throws IOException {
+ private void allocateNewBlock(boolean forRetry) throws IOException {
if (!excludeList.isEmpty()) {
LOG.debug("Allocating block with {}", excludeList);
}
OmKeyLocationInfo subKeyInfo =
omClient.allocateBlock(keyArgs, openID, excludeList);
- addKeyLocationInfo(subKeyInfo);
+ addKeyLocationInfo(subKeyInfo, forRetry);
}
/**
@@ -379,7 +380,7 @@ public class BlockOutputStreamEntryPool implements
KeyMetadataAware {
* @return the new current open stream to write to
* @throws IOException if the block allocation failed.
*/
- synchronized BlockOutputStreamEntry allocateBlockIfNeeded() throws
IOException {
+ synchronized BlockOutputStreamEntry allocateBlockIfNeeded(boolean forRetry)
throws IOException {
BlockOutputStreamEntry streamEntry = getCurrentStreamEntry();
if (streamEntry != null && streamEntry.isClosed()) {
// a stream entry gets closed either by :
@@ -391,7 +392,7 @@ public class BlockOutputStreamEntryPool implements
KeyMetadataAware {
Preconditions.checkNotNull(omClient);
// allocate a new block, if a exception happens, log an error and
// throw exception to the caller directly, and the write fails.
- allocateNewBlock();
+ allocateNewBlock(forRetry);
}
// in theory, this condition should never violate due the check above
// still do a sanity check.
diff --git
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntryPool.java
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntryPool.java
index 6eb9aed0d3..f891724270 100644
---
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntryPool.java
+++
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntryPool.java
@@ -37,7 +37,7 @@ public class ECBlockOutputStreamEntryPool extends
BlockOutputStreamEntryPool {
}
@Override
- ECBlockOutputStreamEntry createStreamEntry(OmKeyLocationInfo subKeyInfo) {
+ ECBlockOutputStreamEntry createStreamEntry(OmKeyLocationInfo subKeyInfo,
boolean forRetry) {
final ECBlockOutputStreamEntry.Builder b = new
ECBlockOutputStreamEntry.Builder();
b.setBlockID(subKeyInfo.getBlockID())
.setKey(getKeyName())
diff --git
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
index 0de61f8485..ea3a3592a5 100644
---
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
+++
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
@@ -315,7 +315,7 @@ public final class ECKeyOutputStream extends KeyOutputStream
private void writeDataCells(ECChunkBuffers stripe) throws IOException {
final ECBlockOutputStreamEntryPool blockOutputStreamEntryPool =
getBlockOutputStreamEntryPool();
- blockOutputStreamEntryPool.allocateBlockIfNeeded();
+ blockOutputStreamEntryPool.allocateBlockIfNeeded(false);
ByteBuffer[] dataCells = stripe.getDataBuffers();
for (int i = 0; i < numDataBlks; i++) {
if (dataCells[i].limit() > 0) {
diff --git
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
index 549607c59a..4f9e5db49a 100644
---
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
+++
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
@@ -25,6 +25,9 @@ import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
@@ -58,6 +61,7 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.ratis.protocol.exceptions.AlreadyClosedException;
import org.apache.ratis.protocol.exceptions.RaftRetryFailureException;
+import org.apache.ratis.util.function.CheckedRunnable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -109,14 +113,28 @@ public class KeyOutputStream extends OutputStream
private boolean atomicKeyCreation;
private ContainerClientMetrics clientMetrics;
private OzoneManagerVersion ozoneManagerVersion;
+ private final Lock writeLock = new ReentrantLock();
+ private final Condition retryHandlingCondition = writeLock.newCondition();
private final int maxConcurrentWritePerKey;
private final KeyOutputStreamSemaphore keyOutputStreamSemaphore;
+ @VisibleForTesting
KeyOutputStreamSemaphore getRequestSemaphore() {
return keyOutputStreamSemaphore;
}
+ /** Required to spy the object in testing. */
+ @VisibleForTesting
+ @SuppressWarnings("unused")
+ KeyOutputStream() {
+ maxConcurrentWritePerKey = 0;
+ keyOutputStreamSemaphore = null;
+ blockOutputStreamEntryPool = null;
+ retryPolicyMap = null;
+ replication = null;
+ }
+
public KeyOutputStream(ReplicationConfig replicationConfig,
BlockOutputStreamEntryPool blockOutputStreamEntryPool) {
this.replication = replicationConfig;
closed = false;
@@ -187,7 +205,7 @@ public class KeyOutputStream extends OutputStream
* @param version the set of blocks that are pre-allocated.
* @param openVersion the version corresponding to the pre-allocation.
*/
- public synchronized void addPreallocateBlocks(OmKeyLocationInfoGroup
version, long openVersion) {
+ public void addPreallocateBlocks(OmKeyLocationInfoGroup version, long
openVersion) {
blockOutputStreamEntryPool.addPreallocateBlocks(version, openVersion);
}
@@ -227,22 +245,31 @@ public class KeyOutputStream extends OutputStream
return;
}
- synchronized (this) {
+ doInWriteLock(() -> {
handleWrite(b, off, len, false);
writeOffset += len;
- }
+ });
} finally {
getRequestSemaphore().release();
}
}
+ private <E extends Throwable> void doInWriteLock(CheckedRunnable<E> block)
throws E {
+ writeLock.lock();
+ try {
+ block.run();
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
@VisibleForTesting
void handleWrite(byte[] b, int off, long len, boolean retry)
throws IOException {
while (len > 0) {
try {
BlockOutputStreamEntry current =
- blockOutputStreamEntryPool.allocateBlockIfNeeded();
+ blockOutputStreamEntryPool.allocateBlockIfNeeded(retry);
// length(len) will be in int range if the call is happening through
// write API of blockOutputStream. Length can be in long range if it
// comes via Exception path.
@@ -272,12 +299,18 @@ public class KeyOutputStream extends OutputStream
boolean retry, long len, byte[] b, int writeLen, int off, long
currentPos)
throws IOException {
try {
+ current.registerCallReceived();
if (retry) {
current.writeOnRetry(len);
} else {
+ waitForRetryHandling(current);
current.write(b, off, writeLen);
offset += writeLen;
}
+ current.registerCallFinished();
+ } catch (InterruptedException e) {
+ current.registerCallFinished();
+ throw new InterruptedIOException();
} catch (IOException ioe) {
// for the current iteration, totalDataWritten - currentPos gives the
// amount of data already written to the buffer
@@ -295,11 +328,24 @@ public class KeyOutputStream extends OutputStream
offset += writeLen;
}
LOG.debug("writeLen {}, total len {}", writeLen, len);
- handleException(current, ioe);
+ handleException(current, ioe, retry);
}
return writeLen;
}
+ private void handleException(BlockOutputStreamEntry entry, IOException
exception, boolean fromRetry)
+ throws IOException {
+ doInWriteLock(() -> {
+ handleExceptionInternal(entry, exception);
+ BlockOutputStreamEntry current =
blockOutputStreamEntryPool.getCurrentStreamEntry();
+ if (!fromRetry && entry.registerCallFinished()) {
+ // When the faulty block finishes handling all its pending call, the
current block can exit retry
+ // handling mode and unblock normal calls.
+ current.finishRetryHandling(retryHandlingCondition);
+ }
+ });
+ }
+
/**
* It performs following actions :
* a. Updates the committed length at datanode for the current stream in
@@ -310,8 +356,15 @@ public class KeyOutputStream extends OutputStream
* @param exception actual exception that occurred
* @throws IOException Throws IOException if Write fails
*/
- private synchronized void handleException(BlockOutputStreamEntry streamEntry,
- IOException exception) throws IOException {
+ private void handleExceptionInternal(BlockOutputStreamEntry streamEntry,
IOException exception) throws IOException {
+ try {
+ // Wait for all pending flushes in the faulty stream. It's possible that
a prior write is still pending
+ // successful completion. Errors are ignored here and will be handled by the
individual flush call. We just want
+ // to ensure all the pending flushes are complete before handling the
exception.
+ streamEntry.waitForAllPendingFlushes();
+ } catch (IOException ignored) {
+ }
+
Throwable t = HddsClientUtils.checkForException(exception);
Preconditions.checkNotNull(t);
boolean retryFailure = checkForRetryFailure(t);
@@ -338,8 +391,6 @@ public class KeyOutputStream extends OutputStream
}
Preconditions.checkArgument(
bufferedDataLen <= streamBufferArgs.getStreamBufferMaxSize());
- Preconditions.checkArgument(
- offset - blockOutputStreamEntryPool.getKeyLength() == bufferedDataLen);
long containerId = streamEntry.getBlockID().getContainerID();
Collection<DatanodeDetails> failedServers = streamEntry.getFailedServers();
Preconditions.checkNotNull(failedServers);
@@ -498,12 +549,12 @@ public class KeyOutputStream extends OutputStream
final long hsyncPos = writeOffset;
handleFlushOrClose(StreamAction.HSYNC);
- synchronized (this) {
+ doInWriteLock(() -> {
Preconditions.checkState(offset >= hsyncPos,
"offset = %s < hsyncPos = %s", offset, hsyncPos);
MetricUtil.captureLatencyNs(clientMetrics::addHsyncLatency,
() -> blockOutputStreamEntryPool.hsyncKey(hsyncPos));
- }
+ });
} finally {
getRequestSemaphore().release();
}
@@ -532,14 +583,23 @@ public class KeyOutputStream extends OutputStream
BlockOutputStreamEntry entry =
blockOutputStreamEntryPool.getCurrentStreamEntry();
if (entry != null) {
+ // If the current block is to handle retries, wait until all the
retries are done.
+ waitForRetryHandling(entry);
+ entry.registerCallReceived();
try {
handleStreamAction(entry, op);
+ entry.registerCallFinished();
} catch (IOException ioe) {
- handleException(entry, ioe);
+ handleException(entry, ioe, false);
continue;
+ } catch (Exception e) {
+ entry.registerCallFinished();
+ throw e;
}
}
return;
+ } catch (InterruptedException e) {
+ throw new InterruptedIOException();
} catch (Exception e) {
markStreamClosed();
throw e;
@@ -548,6 +608,10 @@ public class KeyOutputStream extends OutputStream
}
}
+ private void waitForRetryHandling(BlockOutputStreamEntry currentEntry)
throws InterruptedException {
+ doInWriteLock(() ->
currentEntry.waitForRetryHandling(retryHandlingCondition));
+ }
+
private void handleStreamAction(BlockOutputStreamEntry entry,
StreamAction op) throws IOException {
Collection<DatanodeDetails> failedServers = entry.getFailedServers();
@@ -583,7 +647,11 @@ public class KeyOutputStream extends OutputStream
* @throws IOException
*/
@Override
- public synchronized void close() throws IOException {
+ public void close() throws IOException {
+ doInWriteLock(this::closeInternal);
+ }
+
+ private void closeInternal() throws IOException {
if (closed) {
return;
}
@@ -781,7 +849,7 @@ public class KeyOutputStream extends OutputStream
* the last state of the volatile {@link #closed} field.
* @throws IOException if the connection is closed.
*/
- private synchronized void checkNotClosed() throws IOException {
+ private void checkNotClosed() throws IOException {
if (closed) {
throw new IOException(
": " + FSExceptionMessages.STREAM_IS_CLOSED + " Key: "
diff --git
a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/io/TestKeyOutputStream.java
b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/io/TestKeyOutputStream.java
index 6b6abceff3..d90a335321 100644
---
a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/io/TestKeyOutputStream.java
+++
b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/io/TestKeyOutputStream.java
@@ -34,7 +34,7 @@ import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -54,7 +54,7 @@ public class TestKeyOutputStream {
void testConcurrentWriteLimitOne() throws Exception {
// Verify the semaphore is working to limit the number of concurrent
writes allowed.
KeyOutputStreamSemaphore sema1 = new KeyOutputStreamSemaphore(1);
- KeyOutputStream keyOutputStream = mock(KeyOutputStream.class);
+ KeyOutputStream keyOutputStream = spy(KeyOutputStream.class);
when(keyOutputStream.getRequestSemaphore()).thenReturn(sema1);
final AtomicInteger countWrite = new AtomicInteger(0);
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java
index f27ebaa972..49b515d53c 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java
@@ -45,6 +45,9 @@ import org.apache.hadoop.crypto.Encryptor;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.hdds.conf.StorageUnit;
+import org.apache.hadoop.hdds.scm.ErrorInjector;
+import org.apache.hadoop.hdds.scm.XceiverClientManager;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.storage.BlockOutputStream;
import org.apache.hadoop.hdds.scm.storage.BufferPool;
import org.apache.hadoop.hdds.utils.IOUtils;
@@ -95,6 +98,11 @@ import
org.apache.hadoop.ozone.om.service.OpenKeyCleanupService;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Time;
import org.apache.ozone.test.GenericTestUtils;
+import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.protocol.RaftPeerId;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.MethodOrderer.OrderAnnotation;
@@ -518,6 +526,7 @@ public class TestHSync {
@Test
public void testUncommittedBlocks() throws Exception {
+ waitForEmptyDeletedTable();
// Set the fs.defaultFS
final String rootPath = String.format("%s://%s/",
OZONE_OFS_URI_SCHEME, CONF.get(OZONE_OM_ADDRESS_KEY));
@@ -698,6 +707,99 @@ public class TestHSync {
}, 500, 3000);
}
+
+ public static Stream<Arguments> concurrentExceptionHandling() {
+ return Stream.of(
+ Arguments.of(4, 1),
+ Arguments.of(4, 4),
+ Arguments.of(8, 4)
+ );
+ }
+
+ @ParameterizedTest
+ @MethodSource("concurrentExceptionHandling")
+ public void testConcurrentExceptionHandling(int syncerThreads, int errors)
throws Exception {
+ final String rootPath = String.format("%s://%s/", OZONE_OFS_URI_SCHEME,
CONF.get(OZONE_OM_ADDRESS_KEY));
+ CONF.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, rootPath);
+
+ ErrorInjectorImpl errorInjector = new ErrorInjectorImpl();
+ XceiverClientManager.enableErrorInjection(errorInjector);
+
+ final String dir = OZONE_ROOT + bucket.getVolumeName() +
OZONE_URI_DELIMITER + bucket.getName();
+
+ try (FileSystem fs = FileSystem.get(CONF)) {
+ final Path file = new Path(dir, "exceptionhandling");
+ byte[] data = new byte[8];
+ ThreadLocalRandom.current().nextBytes(data);
+ int writes;
+ try (FSDataOutputStream out = fs.create(file, true)) {
+ writes = runConcurrentWriteHSyncWithException(file, out, data,
syncerThreads, errors, errorInjector);
+ }
+ validateWrittenFile(file, fs, data, writes);
+ fs.delete(file, false);
+ }
+ }
+
+ private int runConcurrentWriteHSyncWithException(Path file,
+ final FSDataOutputStream out, byte[] data, int syncThreadsCount, int
errors,
+ ErrorInjectorImpl errorInjector) throws Exception {
+
+ AtomicReference<Exception> writerException = new AtomicReference<>();
+ AtomicReference<Exception> syncerException = new AtomicReference<>();
+
+ LOG.info("runConcurrentWriteHSyncWithException {} with size {}", file,
data.length);
+ AtomicInteger writes = new AtomicInteger();
+ final long start = Time.monotonicNow();
+
+ Runnable syncer = () -> {
+ while ((Time.monotonicNow() - start < 10000)) {
+ try {
+ out.write(data);
+ writes.incrementAndGet();
+ out.hsync();
+ } catch (Exception e) {
+ LOG.error("Error calling hsync", e);
+ syncerException.compareAndSet(null, e);
+ throw new RuntimeException(e);
+ }
+ }
+ };
+
+ Thread[] syncThreads = new Thread[syncThreadsCount];
+ for (int i = 0; i < syncThreadsCount; i++) {
+ syncThreads[i] = new Thread(syncer);
+ syncThreads[i].setName("Syncer-" + i);
+ syncThreads[i].start();
+ }
+
+ // Inject error at 3rd second.
+ Runnable startErrorInjector = () -> {
+ while ((Time.monotonicNow() - start <= 3000)) {
+ try {
+ Thread.sleep(10);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ errorInjector.start(errors);
+ LOG.info("Enabled error injection in XceiverClientRatis");
+ };
+
+ new Thread(startErrorInjector).start();
+
+ for (Thread sync : syncThreads) {
+ sync.join();
+ }
+
+ if (syncerException.get() != null) {
+ throw syncerException.get();
+ }
+ if (writerException.get() != null) {
+ throw writerException.get();
+ }
+ return writes.get();
+ }
+
private int runConcurrentWriteHSync(Path file,
final FSDataOutputStream out, byte[] data, int syncThreadsCount) throws
Exception {
@@ -1320,4 +1422,33 @@ public class TestHSync {
}
return keys;
}
+
+ private static class ErrorInjectorImpl implements ErrorInjector {
+ private final AtomicInteger remaining = new AtomicInteger();
+ void start(int count) {
+ remaining.set(count);
+ }
+ @Override
+ public RaftClientReply
getResponse(ContainerProtos.ContainerCommandRequestProto request, ClientId
clientId,
+ Pipeline pipeline) {
+ int errorNum = remaining.decrementAndGet();
+ if (errorNum >= 0) {
+ ContainerProtos.ContainerCommandResponseProto proto =
ContainerProtos.ContainerCommandResponseProto.newBuilder()
+ .setResult(ContainerProtos.Result.CLOSED_CONTAINER_IO)
+ .setMessage("Simulated error #" + errorNum)
+ .setCmdType(request.getCmdType())
+ .build();
+ RaftClientReply reply = RaftClientReply.newBuilder()
+ .setSuccess(true)
+ .setMessage(Message.valueOf(proto.toByteString()))
+ .setClientId(clientId)
+
.setServerId(RaftPeerId.getRaftPeerId(pipeline.getLeaderId().toString()))
+ .setGroupId(RaftGroupId.randomId())
+ .build();
+ return reply;
+ }
+
+ return null;
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]