Repository: hadoop Updated Branches: refs/heads/HDFS-7240 6ce5ec676 -> 18f9fea7c
Revert "HDFS-12794. Ozone: Parallelize ChunkOutputSream Writes to container. Contributed by Shashikant Banerjee." This reverts commit 6ce5ec676164b84a9e2f8dc65b5f2199a141506d. Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/18f9fea7 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/18f9fea7 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/18f9fea7 Branch: refs/heads/HDFS-7240 Commit: 18f9fea7c42bbce0d6f6c3480ac1cd894261f358 Parents: 6ce5ec6 Author: Mukul Kumar Singh <msi...@apache.org> Authored: Wed Jan 17 01:09:48 2018 +0530 Committer: Mukul Kumar Singh <msi...@apache.org> Committed: Wed Jan 17 01:09:48 2018 +0530 ---------------------------------------------------------------------- .../apache/hadoop/ozone/OzoneConfigKeys.java | 3 - .../ozone/client/io/ChunkGroupOutputStream.java | 54 +--- .../hadoop/ozone/client/rpc/RpcClient.java | 10 - .../hadoop/scm/storage/ChunkOutputStream.java | 257 ++++++------------- .../scm/storage/ContainerProtocolCalls.java | 13 +- .../web/storage/DistributedStorageHandler.java | 10 - .../src/main/resources/ozone-default.xml | 9 - .../hadoop/ozone/ksm/TestKeySpaceManager.java | 2 +- 8 files changed, 85 insertions(+), 273 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/18f9fea7/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java index cb3f0f6..8059b5e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java @@ -107,9 +107,6 @@ public final class OzoneConfigKeys { "ozone.scm.block.size.in.mb"; public static final long OZONE_SCM_BLOCK_SIZE_DEFAULT = 256; - public static final String OZONE_OUTPUT_STREAM_BUFFER_SIZE_IN_MB = - "ozone.output.stream.buffer.size.in.mb"; - public static final long OZONE_OUTPUT_STREAM_BUFFER_SIZE_DEFAULT = 256; /** * Ozone administrator users delimited by comma. * If not set, only the user who launches an ozone service will be the http://git-wip-us.apache.org/repos/asf/hadoop/blob/18f9fea7/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java index a44a009..fe248e3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java @@ -19,10 +19,7 @@ package org.apache.hadoop.ozone.client.io; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; -import com.google.common.primitives.Ints; -import org.apache.commons.lang.StringUtils; import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.Result; -import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.ksm.helpers.KsmKeyLocationInfoGroup; import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationType; import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationFactor; @@ -49,9 +46,6 @@ import java.io.OutputStream; import java.util.ArrayList; import java.util.List; -import static org.apache.hadoop.ozone.OzoneConfigKeys - .OZONE_OUTPUT_STREAM_BUFFER_SIZE_DEFAULT; - /** * Maintaining a list of ChunkInputStream. Write based on offset. * @@ -78,7 +72,6 @@ public class ChunkGroupOutputStream extends OutputStream { private final XceiverClientManager xceiverClientManager; private final int chunkSize; private final String requestID; - private final long streamBufferSize; /** * A constructor for testing purpose only. @@ -93,7 +86,6 @@ public class ChunkGroupOutputStream extends OutputStream { xceiverClientManager = null; chunkSize = 0; requestID = null; - streamBufferSize = OZONE_OUTPUT_STREAM_BUFFER_SIZE_DEFAULT * OzoneConsts.MB; } /** @@ -113,26 +105,12 @@ public class ChunkGroupOutputStream extends OutputStream { return streamEntries; } - /** - * Chunkoutput stream, making this package visible since this can be - * created only via builder. - * @param handler - Open Key state. - * @param xceiverClientManager - Communication Manager. - * @param scmClient - SCM protocol Client. - * @param ksmClient - KSM Protocol client - * @param chunkSize - Chunk Size - I/O - * @param requestId - Seed for trace ID generation. - * @param factor - Replication factor - * @param type - Replication Type - RATIS/Standalone etc. - * @param maxBufferSize - Maximum stream buffer Size. - * @throws IOException - Throws this exception if there is an error. - */ - ChunkGroupOutputStream( + public ChunkGroupOutputStream( OpenKeySession handler, XceiverClientManager xceiverClientManager, StorageContainerLocationProtocolClientSideTranslatorPB scmClient, KeySpaceManagerProtocolClientSideTranslatorPB ksmClient, int chunkSize, String requestId, ReplicationFactor factor, - ReplicationType type, long maxBufferSize) throws IOException { + ReplicationType type) throws IOException { this.streamEntries = new ArrayList<>(); this.currentStreamIndex = 0; this.byteOffset = 0; @@ -152,7 +130,6 @@ public class ChunkGroupOutputStream extends OutputStream { this.requestID = requestId; LOG.debug("Expecting open key with one block, but got" + info.getKeyLocationVersions().size()); - this.streamBufferSize = maxBufferSize; } /** @@ -207,7 +184,7 @@ public class ChunkGroupOutputStream extends OutputStream { } streamEntries.add(new ChunkOutputStreamEntry(containerKey, keyArgs.getKeyName(), xceiverClientManager, xceiverClient, requestID, - chunkSize, subKeyInfo.getLength(), this.streamBufferSize)); + chunkSize, subKeyInfo.getLength())); } @@ -347,7 +324,6 @@ public class ChunkGroupOutputStream extends OutputStream { private String requestID; private ReplicationType type; private ReplicationFactor factor; - private long streamBufferSize; public Builder setHandler(OpenKeySession handler) { this.openHandler = handler; @@ -391,23 +367,9 @@ public class ChunkGroupOutputStream extends OutputStream { return this; } - public Builder setStreamBufferSize(long blockSize) { - this.streamBufferSize = blockSize; - return this; - } - public ChunkGroupOutputStream build() throws IOException { - Preconditions.checkNotNull(openHandler); - Preconditions.checkNotNull(xceiverManager); - Preconditions.checkNotNull(scmClient); - Preconditions.checkNotNull(ksmClient); - Preconditions.checkState(chunkSize > 0); - Preconditions.checkState(StringUtils.isNotEmpty(requestID)); - Preconditions - .checkState(streamBufferSize > 0 && streamBufferSize > chunkSize); - return new ChunkGroupOutputStream(openHandler, xceiverManager, scmClient, - ksmClient, chunkSize, requestID, factor, type, streamBufferSize); + ksmClient, chunkSize, requestID, factor, type); } } @@ -423,12 +385,11 @@ public class ChunkGroupOutputStream extends OutputStream { private final long length; // the current position of this stream 0 <= currentPosition < length private long currentPosition; - private long streamBufferSize; // Max block size. ChunkOutputStreamEntry(String containerKey, String key, XceiverClientManager xceiverClientManager, XceiverClientSpi xceiverClient, String requestId, int chunkSize, - long length, long streamBufferSize) { + long length) { this.outputStream = null; this.containerKey = containerKey; this.key = key; @@ -439,7 +400,6 @@ public class ChunkGroupOutputStream extends OutputStream { this.length = length; this.currentPosition = 0; - this.streamBufferSize = streamBufferSize; } /** @@ -458,8 +418,6 @@ public class ChunkGroupOutputStream extends OutputStream { this.length = length; this.currentPosition = 0; - this.streamBufferSize = - OZONE_OUTPUT_STREAM_BUFFER_SIZE_DEFAULT * OzoneConsts.MB; } long getLength() { @@ -474,7 +432,7 @@ public class ChunkGroupOutputStream extends OutputStream { if (this.outputStream == null) { this.outputStream = new ChunkOutputStream(containerKey, key, xceiverClientManager, xceiverClient, - requestId, chunkSize, Ints.checkedCast(streamBufferSize)); + requestId, chunkSize); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/18f9fea7/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java index 20f2b54..94038e2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java @@ -74,11 +74,6 @@ import java.util.List; import java.util.UUID; import java.util.stream.Collectors; -import static org.apache.hadoop.ozone.OzoneConfigKeys - .OZONE_OUTPUT_STREAM_BUFFER_SIZE_IN_MB; -import static org.apache.hadoop.ozone.OzoneConfigKeys - .OZONE_OUTPUT_STREAM_BUFFER_SIZE_DEFAULT; - /** * Ozone RPC Client Implementation, it connects to KSM, SCM and DataNode * to execute client calls. This uses RPC protocol for communication @@ -99,7 +94,6 @@ public class RpcClient implements ClientProtocol { private final UserGroupInformation ugi; private final OzoneAcl.OzoneACLRights userRights; private final OzoneAcl.OzoneACLRights groupRights; - private final long streamBufferSize; /** * Creates RpcClient instance with the given configuration. @@ -154,9 +148,6 @@ public class RpcClient implements ClientProtocol { } else { chunkSize = configuredChunkSize; } - // streamBufferSize by default is set equal to default scm block size. - streamBufferSize = conf.getLong(OZONE_OUTPUT_STREAM_BUFFER_SIZE_IN_MB, - OZONE_OUTPUT_STREAM_BUFFER_SIZE_DEFAULT) * OzoneConsts.MB; } @Override @@ -472,7 +463,6 @@ public class RpcClient implements ClientProtocol { .setRequestID(requestId) .setType(OzoneProtos.ReplicationType.valueOf(type.toString())) .setFactor(OzoneProtos.ReplicationFactor.valueOf(factor.getValue())) - .setStreamBufferSize(streamBufferSize) .build(); groupOutputStream.addPreallocateBlocks( openKey.getKeyInfo().getLatestVersionLocations(), http://git-wip-us.apache.org/repos/asf/hadoop/blob/18f9fea7/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/storage/ChunkOutputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/storage/ChunkOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/storage/ChunkOutputStream.java index 916a506..64c10da 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/storage/ChunkOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/storage/ChunkOutputStream.java @@ -18,32 +18,22 @@ package org.apache.hadoop.scm.storage; -import com.google.common.base.Preconditions; -import com.google.protobuf.ByteString; -import org.apache.commons.codec.digest.DigestUtils; -import org.apache.commons.lang3.tuple.ImmutablePair; -import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos; -import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ChunkInfo; -import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.KeyData; -import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.KeyValue; -import org.apache.hadoop.scm.XceiverClientManager; -import org.apache.hadoop.scm.XceiverClientSpi; -import org.apache.hadoop.util.Time; +import static org.apache.hadoop.scm.storage.ContainerProtocolCalls.putKey; +import static org.apache.hadoop.scm.storage.ContainerProtocolCalls.writeChunk; import java.io.IOException; import java.io.OutputStream; import java.nio.ByteBuffer; -import java.util.LinkedList; -import java.util.List; import java.util.UUID; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; -import static org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos - .Result.SUCCESS; -import static org.apache.hadoop.scm.storage.ContainerProtocolCalls.putKey; -import static org.apache.hadoop.scm.storage.ContainerProtocolCalls.writeChunk; +import com.google.protobuf.ByteString; +import org.apache.commons.codec.digest.DigestUtils; +import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ChunkInfo; +import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.KeyData; +import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.KeyValue; +import org.apache.hadoop.scm.XceiverClientManager; +import org.apache.hadoop.scm.XceiverClientSpi; /** * An {@link OutputStream} used by the REST service in combination with the @@ -67,12 +57,12 @@ public class ChunkOutputStream extends OutputStream { private final String key; private final String traceID; private final KeyData.Builder containerKeyData; - private final String streamId; private XceiverClientManager xceiverClientManager; private XceiverClientSpi xceiverClient; private ByteBuffer buffer; + private final String streamId; + private int chunkIndex; private int chunkSize; - private int streamBufferSize; /** * Creates a new ChunkOutputStream. @@ -83,18 +73,14 @@ public class ChunkOutputStream extends OutputStream { * @param xceiverClient client to perform container calls * @param traceID container protocol call args * @param chunkSize chunk size - * @param maxBufferSize -- Controls the maximum amount of memory that we need - * to allocate data buffering. */ public ChunkOutputStream(String containerKey, String key, XceiverClientManager xceiverClientManager, XceiverClientSpi xceiverClient, - String traceID, int chunkSize, int maxBufferSize) { + String traceID, int chunkSize) { this.containerKey = containerKey; this.key = key; this.traceID = traceID; this.chunkSize = chunkSize; - this.streamBufferSize = maxBufferSize; - KeyValue keyValue = KeyValue.newBuilder() .setKey("TYPE").setValue("KEY").build(); this.containerKeyData = KeyData.newBuilder() @@ -103,24 +89,22 @@ public class ChunkOutputStream extends OutputStream { .addMetadata(keyValue); this.xceiverClientManager = xceiverClientManager; this.xceiverClient = xceiverClient; - this.buffer = ByteBuffer.allocate(maxBufferSize); + this.buffer = ByteBuffer.allocate(chunkSize); this.streamId = UUID.randomUUID().toString(); + this.chunkIndex = 0; } - /** - * {@inheritDoc} - */ @Override public synchronized void write(int b) throws IOException { checkOpen(); - byte[] c = new byte[1]; - c[0] = (byte) b; - write(c, 0, 1); + int rollbackPosition = buffer.position(); + int rollbackLimit = buffer.limit(); + buffer.put((byte)b); + if (buffer.position() == chunkSize) { + flushBufferToChunk(rollbackPosition, rollbackLimit); + } } - /** - * {@inheritDoc} - */ @Override public void write(byte[] b, int off, int len) throws IOException { if (b == null) { @@ -134,90 +118,17 @@ public class ChunkOutputStream extends OutputStream { return; } checkOpen(); - int rollbackPosition = buffer.position(); - int rollbackLimit = buffer.limit(); - try { - List<ImmutablePair<CompletableFuture<ContainerProtos - .ContainerCommandResponseProto>, ChunkInfo>> - writeFutures = writeInParallel(b, off, len); - // This is a rendezvous point for this function call, all chunk I/O - // for this block must complete before we can declare this call as - // complete. - - // Wait until all the futures complete or throws an exception if any of - // the calls ended with an exception this call will throw. - // if futures is null, it means that we wrote the data to the buffer and - // returned. - if (writeFutures != null) { - CompletableFuture.allOf(writeFutures.toArray(new - CompletableFuture[writeFutures.size()])).join(); - - // Wrote this data, we will clear this buffer now. - buffer.clear(); - } - } catch (InterruptedException | ExecutionException e) { - buffer.position(rollbackPosition); - buffer.limit(rollbackLimit); - throw new IOException("Unexpected error in write. ", e); - } - } - - /** - * Write a given block into many small chunks in parallel. - * - * @param b - * @param off - * @param len - * @throws IOException - * @throws ExecutionException - * @throws InterruptedException - */ - public List<ImmutablePair<CompletableFuture<ContainerProtos - .ContainerCommandResponseProto>, ChunkInfo>> - writeInParallel(byte[] b, int off, int len) - throws IOException, ExecutionException, InterruptedException { - - Preconditions.checkArgument(len <= streamBufferSize, - "A chunk write cannot be " + "larger than max buffer size limit."); - long newBlockCount = len / chunkSize; - buffer.put(b, off, len); - List<ImmutablePair<CompletableFuture<ContainerProtos - .ContainerCommandResponseProto>, ChunkInfo>> - writeFutures = new LinkedList<>(); - - // We if must have at least a chunkSize of data ready to write, if so we - // will go ahead and start writing that data. - if (buffer.position() >= chunkSize) { - // Allocate new byte slices which will point to each chunk of data - // that we want to write. Divide the byte buffer into individual chunks - // each of length equals to chunkSize max where each chunk will be - // assigned a chunkId where, for each chunk the async write requests will - // be made and wait for all of them to return before the write call - // returns. - for (int chunkId = 0; chunkId < newBlockCount; chunkId++) { - // Please note : We are not flipping the slice when we write since - // the slices are pointing the buffer start and end as needed for - // the chunk write. Also please note, Duplicate does not create a - // copy of data, it only creates metadata that points to the data - // stream. - ByteBuffer chunk = buffer.duplicate(); - Preconditions.checkState((chunkId * chunkSize) < buffer.limit(), - "Chunk offset cannot be beyond the limits of the buffer."); - chunk.position(chunkId * chunkSize); - // Min handles the case where the last block might be lesser than - // chunk Size. - chunk.limit(chunk.position() + - Math.min(chunkSize, chunk.remaining() - (chunkId * chunkSize))); - - // Schedule all the writes, this is a non-block call which returns - // futures. We collect these futures and wait for all of them to - // complete in the next line. - writeFutures.add(writeChunkToContainer(chunk, 0, chunkSize)); + while (len > 0) { + int writeLen = Math.min(chunkSize - buffer.position(), len); + int rollbackPosition = buffer.position(); + int rollbackLimit = buffer.limit(); + buffer.put(b, off, writeLen); + if (buffer.position() == chunkSize) { + flushBufferToChunk(rollbackPosition, rollbackLimit); } - return writeFutures; + off += writeLen; + len -= writeLen; } - // Nothing to do , return null. - return null; } @Override @@ -226,19 +137,7 @@ public class ChunkOutputStream extends OutputStream { if (buffer.position() > 0) { int rollbackPosition = buffer.position(); int rollbackLimit = buffer.limit(); - ByteBuffer chunk = buffer.duplicate(); - try { - - ImmutablePair<CompletableFuture<ContainerProtos - .ContainerCommandResponseProto>, ChunkInfo> - result = writeChunkToContainer(chunk, 0, chunkSize); - updateChunkInfo(result); - buffer.clear(); - } catch (ExecutionException | InterruptedException e) { - buffer.position(rollbackPosition); - buffer.limit(rollbackLimit); - throw new IOException("Failure in flush", e); - } + flushBufferToChunk(rollbackPosition, rollbackLimit); } } @@ -248,20 +147,10 @@ public class ChunkOutputStream extends OutputStream { buffer != null) { try { if (buffer.position() > 0) { - // This flip is needed since this is the real buffer to which we - // are writing and position will have moved each time we did a put. - buffer.flip(); - - // Call get immediately to make this call Synchronous. - - ImmutablePair<CompletableFuture<ContainerProtos - .ContainerCommandResponseProto>, ChunkInfo> - result = writeChunkToContainer(buffer, 0, buffer.limit()); - updateChunkInfo(result); - buffer.clear(); + writeChunkToContainer(); } putKey(xceiverClient, containerKeyData.build(), traceID); - } catch (IOException | InterruptedException | ExecutionException e) { + } catch (IOException e) { throw new IOException( "Unexpected Storage Container Exception: " + e.toString(), e); } finally { @@ -274,24 +163,6 @@ public class ChunkOutputStream extends OutputStream { } - private void updateChunkInfo( - ImmutablePair< - CompletableFuture<ContainerProtos.ContainerCommandResponseProto>, - ChunkInfo - > result) throws InterruptedException, ExecutionException { - // Wait for this call to complete. - ContainerProtos.ContainerCommandResponseProto response = - result.getLeft().get(); - - // If the write call to the chunk is successful, we need to add that - // chunk information to the containerKeyData. - // TODO: Clean up the garbage in case of failure. - if(response.getResult() == SUCCESS) { - ChunkInfo chunk = result.getRight(); - containerKeyData.addChunks(chunk); - } - } - /** * Checks if the stream is open. If not, throws an exception. * @@ -304,35 +175,53 @@ public class ChunkOutputStream extends OutputStream { } /** + * Attempts to flush buffered writes by writing a new chunk to the container. + * If successful, then clears the buffer to prepare to receive writes for a + * new chunk. + * + * @param rollbackPosition position to restore in buffer if write fails + * @param rollbackLimit limit to restore in buffer if write fails + * @throws IOException if there is an I/O error while performing the call + */ + private synchronized void flushBufferToChunk(int rollbackPosition, + int rollbackLimit) throws IOException { + boolean success = false; + try { + writeChunkToContainer(); + success = true; + } finally { + if (success) { + buffer.clear(); + } else { + buffer.position(rollbackPosition); + buffer.limit(rollbackLimit); + } + } + } + + /** * Writes buffered data as a new chunk to the container and saves chunk * information to be used later in putKey call. * - * @param data -- Data to write. - * @param offset - offset to the data buffer - * @param len - Length in bytes - * @return Returns a Immutable pair -- A future object that will contian - * the result of the operation, and the chunkInfo that we wrote. - * - * @throws IOException - * @throws ExecutionException - * @throws InterruptedException + * @throws IOException if there is an I/O error while performing the call */ - private ImmutablePair< - CompletableFuture<ContainerProtos.ContainerCommandResponseProto>, - ChunkInfo> - writeChunkToContainer(ByteBuffer data, int offset, int len) - throws IOException, ExecutionException, InterruptedException { - - - ByteString dataString = ByteString.copyFrom(data); - ChunkInfo chunk = ChunkInfo.newBuilder().setChunkName( + private synchronized void writeChunkToContainer() throws IOException { + buffer.flip(); + ByteString data = ByteString.copyFrom(buffer); + ChunkInfo chunk = ChunkInfo + .newBuilder() + .setChunkName( DigestUtils.md5Hex(key) + "_stream_" - + streamId + "_chunk_" + Time.monotonicNowNanos()) + + streamId + "_chunk_" + ++chunkIndex) .setOffset(0) - .setLen(len) + .setLen(data.size()) .build(); - CompletableFuture<ContainerProtos.ContainerCommandResponseProto> response = - writeChunk(xceiverClient, chunk, key, dataString, traceID); - return new ImmutablePair(response, chunk); + try { + writeChunk(xceiverClient, chunk, key, data, traceID); + } catch (IOException e) { + throw new IOException( + "Unexpected Storage Container Exception: " + e.toString(), e); + } + containerKeyData.addChunks(chunk); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/18f9fea7/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/storage/ContainerProtocolCalls.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/storage/ContainerProtocolCalls.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/storage/ContainerProtocolCalls.java index 7d4c72d..1cde67c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/storage/ContainerProtocolCalls.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/storage/ContainerProtocolCalls.java @@ -53,9 +53,6 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.KeyValue; import org.apache.hadoop.scm.container.common.helpers.StorageContainerException; import java.io.IOException; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; - import org.apache.hadoop.scm.XceiverClientSpi; /** @@ -165,10 +162,9 @@ public final class ContainerProtocolCalls { * @param traceID container protocol call args * @throws IOException if there is an I/O error while performing the call */ - public static CompletableFuture<ContainerCommandResponseProto> writeChunk( - XceiverClientSpi xceiverClient, ChunkInfo chunk, String key, - ByteString data, String traceID) - throws IOException, ExecutionException, InterruptedException { + public static void writeChunk(XceiverClientSpi xceiverClient, ChunkInfo chunk, + String key, ByteString data, String traceID) + throws IOException { WriteChunkRequestProto.Builder writeChunkRequest = WriteChunkRequestProto .newBuilder() .setPipeline(xceiverClient.getPipeline().getProtobufMessage()) @@ -183,7 +179,8 @@ public final class ContainerProtocolCalls { .setDatanodeID(id) .setWriteChunk(writeChunkRequest) .build(); - return xceiverClient.sendCommandAsync(request); + ContainerCommandResponseProto response = xceiverClient.sendCommand(request); + validateContainerResponse(response); } /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/18f9fea7/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java index 137f8f9..1830c71 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java @@ -67,11 +67,6 @@ import java.io.IOException; import java.io.OutputStream; import java.util.List; -import static org.apache.hadoop.ozone.OzoneConfigKeys - .OZONE_OUTPUT_STREAM_BUFFER_SIZE_IN_MB; -import static org.apache.hadoop.ozone.OzoneConfigKeys - .OZONE_OUTPUT_STREAM_BUFFER_SIZE_DEFAULT; - /** * A {@link StorageHandler} implementation that distributes object storage * across the nodes of an HDFS cluster. @@ -91,7 +86,6 @@ public final class DistributedStorageHandler implements StorageHandler { private final boolean useRatis; private final OzoneProtos.ReplicationType type; private final OzoneProtos.ReplicationFactor factor; - private final long streamBufferSize; /** * Creates a new DistributedStorageHandler. @@ -133,9 +127,6 @@ public final class DistributedStorageHandler implements StorageHandler { chunkSize, ScmConfigKeys.OZONE_SCM_CHUNK_MAX_SIZE); chunkSize = ScmConfigKeys.OZONE_SCM_CHUNK_MAX_SIZE; } - // streamBufferSize by default is set to default scm block size. - streamBufferSize = conf.getLong(OZONE_OUTPUT_STREAM_BUFFER_SIZE_IN_MB, - OZONE_OUTPUT_STREAM_BUFFER_SIZE_DEFAULT) * OzoneConsts.MB; } @Override @@ -427,7 +418,6 @@ public final class DistributedStorageHandler implements StorageHandler { .setRequestID(args.getRequestID()) .setType(xceiverClientManager.getType()) .setFactor(xceiverClientManager.getFactor()) - .setStreamBufferSize(streamBufferSize) .build(); groupOutputStream.addPreallocateBlocks( openKey.getKeyInfo().getLatestVersionLocations(), http://git-wip-us.apache.org/repos/asf/hadoop/blob/18f9fea7/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml index 4df99f9..31c3901 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml @@ -691,15 +691,6 @@ </description> </property> <property> - <name>ozone.output.stream.buffer.size.in.mb</name> - <value>256</value> - <tag>OZONE</tag> - <description> - The maximum size of the buffer allocated for the ozone output stream for - write. Default size is equals to scm block size. - </description> - </property> - <property> <name>ozone.scm.chunk.size</name> <value>16777216</value> <tag>OZONE, SCM, CONTAINER, PERFORMANCE</tag> http://git-wip-us.apache.org/repos/asf/hadoop/blob/18f9fea7/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/ksm/TestKeySpaceManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/ksm/TestKeySpaceManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/ksm/TestKeySpaceManager.java index fc4bedc..c8427f3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/ksm/TestKeySpaceManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/ksm/TestKeySpaceManager.java @@ -1114,7 +1114,7 @@ public class TestKeySpaceManager { .getMetadataManager().getExpiredOpenKeys(); Assert.assertEquals(0, openKeys.size()); - //Thread.sleep(2000); + Thread.sleep(2000); openKeys = cluster.getKeySpaceManager().getMetadataManager() .getExpiredOpenKeys(); --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org