Repository: hadoop Updated Branches: refs/heads/HDFS-7240 1fc744c6c -> c70775aff
HDFS-11004. Ozone : move Chunk IO and container protocol calls to hdfs-client. Contributed by Chen Liang. Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/c70775af Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/c70775af Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/c70775af Branch: refs/heads/HDFS-7240 Commit: c70775aff6113a3bbaa237923fad3c21a73a7793 Parents: 1fc744c Author: Anu Engineer <aengin...@apache.org> Authored: Thu Oct 13 16:34:29 2016 -0700 Committer: Anu Engineer <aengin...@apache.org> Committed: Thu Oct 13 16:34:29 2016 -0700 ---------------------------------------------------------------------- .../org/apache/hadoop/scm/ScmConfigKeys.java | 3 + .../hadoop/scm/storage/ChunkInputStream.java | 191 ++++++++++++++++ .../hadoop/scm/storage/ChunkOutputStream.java | 222 +++++++++++++++++++ .../scm/storage/ContainerProtocolCalls.java | 190 ++++++++++++++++ .../apache/hadoop/scm/storage/package-info.java | 23 ++ .../ozone/web/storage/ChunkInputStream.java | 193 ---------------- .../ozone/web/storage/ChunkOutputStream.java | 219 ------------------ .../web/storage/ContainerProtocolCalls.java | 198 ----------------- .../web/storage/DistributedStorageHandler.java | 22 +- 9 files changed, 641 insertions(+), 620 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/c70775af/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java index a1b2393..44414ea 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java +++ 
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java @@ -29,4 +29,7 @@ public final class ScmConfigKeys { public static final String DFS_CONTAINER_IPC_PORT = "dfs.container.ipc"; public static final int DFS_CONTAINER_IPC_PORT_DEFAULT = 50011; + + // TODO : this is copied from OzoneConsts, may need to move to a better place + public static final int CHUNK_SIZE = 1 * 1024 * 1024; // 1 MB } http://git-wip-us.apache.org/repos/asf/hadoop/blob/c70775af/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/storage/ChunkInputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/storage/ChunkInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/storage/ChunkInputStream.java new file mode 100644 index 0000000..1206ecd --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/storage/ChunkInputStream.java @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.scm.storage; + +import static org.apache.hadoop.scm.storage.ContainerProtocolCalls.*; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.util.List; + +import com.google.protobuf.ByteString; + +import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ReadChunkResponseProto; +import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ChunkInfo; +import org.apache.hadoop.scm.XceiverClient; +import org.apache.hadoop.scm.XceiverClientManager; + +/** + * An {@link InputStream} used by the REST service in combination with the + * SCMClient to read the value of a key from a sequence + * of container chunks. All bytes of the key value are stored in container + * chunks. Each chunk may contain multiple underlying {@link ByteBuffer} + * instances. This class encapsulates all state management for iterating + * through the sequence of chunks and the sequence of buffers within each chunk. + */ +public class ChunkInputStream extends InputStream { + + private static final int EOF = -1; + + private final String key; + private final String traceID; + private XceiverClientManager xceiverClientManager; + private XceiverClient xceiverClient; + private List<ChunkInfo> chunks; + private int chunkOffset; + private List<ByteBuffer> buffers; + private int bufferOffset; + + /** + * Creates a new ChunkInputStream. 
+ * + * @param key chunk key + * @param xceiverClientManager client manager that controls client + * @param xceiverClient client to perform container calls + * @param chunks list of chunks to read + * @param traceID container protocol call traceID + */ + public ChunkInputStream(String key, XceiverClientManager xceiverClientManager, + XceiverClient xceiverClient, List<ChunkInfo> chunks, String traceID) { + this.key = key; + this.traceID = traceID; + this.xceiverClientManager = xceiverClientManager; + this.xceiverClient = xceiverClient; + this.chunks = chunks; + this.chunkOffset = 0; + this.buffers = null; + this.bufferOffset = 0; + } + + @Override + public synchronized int read() + throws IOException { + checkOpen(); + int available = prepareRead(1); + return available == EOF ? EOF : buffers.get(bufferOffset).get(); + } + + @Override + public synchronized int read(byte[] b, int off, int len) throws IOException { + // According to the JavaDocs for InputStream, it is recommended that + // subclasses provide an override of bulk read if possible for performance + // reasons. In addition to performance, we need to do it for correctness + // reasons. The Ozone REST service uses PipedInputStream and + // PipedOutputStream to relay HTTP response data between a Jersey thread and + // a Netty thread. It turns out that PipedInputStream/PipedOutputStream + // have a subtle dependency (bug?) on the wrapped stream providing separate + // implementations of single-byte read and bulk read. Without this, get key + // responses might close the connection before writing all of the bytes + // advertised in the Content-Length. 
+ if (b == null) { + throw new NullPointerException(); + } + if (off < 0 || len < 0 || len > b.length - off) { + throw new IndexOutOfBoundsException(); + } + if (len == 0) { + return 0; + } + checkOpen(); + int available = prepareRead(len); + if (available == EOF) { + return EOF; + } + buffers.get(bufferOffset).get(b, off, available); + return available; + } + + @Override + public synchronized void close() { + if (xceiverClientManager != null && xceiverClient != null) { + xceiverClientManager.releaseClient(xceiverClient); + xceiverClientManager = null; + xceiverClient = null; + } + } + + /** + * Checks if the stream is open. If not, throws an exception. + * + * @throws IOException if stream is closed + */ + private synchronized void checkOpen() throws IOException { + if (xceiverClient == null) { + throw new IOException("ChunkInputStream has been closed."); + } + } + + /** + * Prepares to read by advancing through chunks and buffers as needed until it + * finds data to return or encounters EOF. + * + * @param len desired length of data to read + * @return length of data available to read, possibly less than desired length + */ + private synchronized int prepareRead(int len) throws IOException { + for (;;) { + if (chunks == null || chunks.isEmpty()) { + // This must be an empty key. + return EOF; + } else if (buffers == null) { + // The first read triggers fetching the first chunk. + readChunkFromContainer(0); + } else if (!buffers.isEmpty() && + buffers.get(bufferOffset).hasRemaining()) { + // Data is available from the current buffer. + ByteBuffer bb = buffers.get(bufferOffset); + return len > bb.remaining() ? bb.remaining() : len; + } else if (!buffers.isEmpty() && + !buffers.get(bufferOffset).hasRemaining() && + bufferOffset < buffers.size() - 1) { + // There are additional buffers available. + ++bufferOffset; + } else if (chunkOffset < chunks.size() - 1) { + // There are additional chunks available. 
+ readChunkFromContainer(chunkOffset + 1); + } else { + // All available input has been consumed. + return EOF; + } + } + } + + /** + * Attempts to read the chunk at the specified offset in the chunk list. If + * successful, then the data of the read chunk is saved so that its bytes can + * be returned from subsequent read calls. + * + * @param readChunkOffset offset in the chunk list of which chunk to read + * @throws IOException if there is an I/O error while performing the call + */ + private synchronized void readChunkFromContainer(int readChunkOffset) + throws IOException { + final ReadChunkResponseProto readChunkResponse; + try { + readChunkResponse = readChunk(xceiverClient, chunks.get(readChunkOffset), + key, traceID); + } catch (IOException e) { + throw new IOException("Unexpected OzoneException", e); + } + chunkOffset = readChunkOffset; + ByteString byteString = readChunkResponse.getData(); + buffers = byteString.asReadOnlyByteBufferList(); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/c70775af/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/storage/ChunkOutputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/storage/ChunkOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/storage/ChunkOutputStream.java new file mode 100644 index 0000000..0126e58 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/storage/ChunkOutputStream.java @@ -0,0 +1,222 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.scm.storage; + +import static org.apache.hadoop.scm.storage.ContainerProtocolCalls.putKey; +import static org.apache.hadoop.scm.storage.ContainerProtocolCalls.writeChunk; + +import java.io.IOException; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.util.UUID; + +import com.google.protobuf.ByteString; + +import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ChunkInfo; +import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.KeyData; +import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.KeyValue; +import org.apache.hadoop.scm.ScmConfigKeys; +import org.apache.hadoop.scm.XceiverClient; +import org.apache.hadoop.scm.XceiverClientManager; + +/** + * An {@link OutputStream} used by the REST service in combination with the + * SCMClient to write the value of a key to a sequence + * of container chunks. Writes are buffered locally and periodically written to + * the container as a new chunk. In order to preserve the semantics that + * replacement of a pre-existing key is atomic, each instance of the stream has + * an internal unique identifier. This unique identifier and a monotonically + * increasing chunk index form a composite key that is used as the chunk name. 
+ * After all data is written, a putKey call creates or updates the corresponding + * container key, and this call includes the full list of chunks that make up + * the key data. The list of chunks is updated all at once. Therefore, a + * concurrent reader never can see an intermediate state in which different + * chunks of data from different versions of the key data are interleaved. + * This class encapsulates all state management for buffering and writing + * through to the container. + */ +public class ChunkOutputStream extends OutputStream { + + private final String containerKey; + private final String key; + private final String traceID; + private final KeyData.Builder containerKeyData; + private XceiverClientManager xceiverClientManager; + private XceiverClient xceiverClient; + private ByteBuffer buffer; + private final String streamId; + private int chunkIndex; + + /** + * Creates a new ChunkOutputStream. + * + * @param containerKey container key + * @param key chunk key + * @param xceiverClientManager client manager that controls client + * @param xceiverClient client to perform container calls + * @param traceID container protocol call traceID + */ + public ChunkOutputStream(String containerKey, String key, + XceiverClientManager xceiverClientManager, XceiverClient xceiverClient, + String traceID) { + this.containerKey = containerKey; + this.key = key; + this.traceID = traceID; + KeyValue keyValue = KeyValue.newBuilder() + .setKey("TYPE").setValue("KEY").build(); + this.containerKeyData = KeyData.newBuilder() + .setContainerName(xceiverClient.getPipeline().getContainerName()) + .setName(containerKey) + .addMetadata(keyValue); + this.xceiverClientManager = xceiverClientManager; + this.xceiverClient = xceiverClient; + this.buffer = ByteBuffer.allocate(ScmConfigKeys.CHUNK_SIZE); + this.streamId = UUID.randomUUID().toString(); + this.chunkIndex = 0; + } + + @Override + public synchronized void write(int b) throws IOException { + checkOpen(); + int 
rollbackPosition = buffer.position(); + int rollbackLimit = buffer.limit(); + buffer.put((byte)b); + if (buffer.position() == ScmConfigKeys.CHUNK_SIZE) { + flushBufferToChunk(rollbackPosition, rollbackLimit); + } + } + + @Override + public void write(byte[] b, int off, int len) throws IOException { + if (b == null) { + throw new NullPointerException(); + } + if ((off < 0) || (off > b.length) || (len < 0) || + ((off + len) > b.length) || ((off + len) < 0)) { + throw new IndexOutOfBoundsException(); + } + if (len == 0) { + return; + } + checkOpen(); + while (len > 0) { + int writeLen = Math.min( + ScmConfigKeys.CHUNK_SIZE - buffer.position(), len); + int rollbackPosition = buffer.position(); + int rollbackLimit = buffer.limit(); + buffer.put(b, off, writeLen); + if (buffer.position() == ScmConfigKeys.CHUNK_SIZE) { + flushBufferToChunk(rollbackPosition, rollbackLimit); + } + off += writeLen; + len -= writeLen; + } + } + + @Override + public synchronized void flush() throws IOException { + checkOpen(); + if (buffer.position() > 0) { + int rollbackPosition = buffer.position(); + int rollbackLimit = buffer.limit(); + flushBufferToChunk(rollbackPosition, rollbackLimit); + } + } + + @Override + public synchronized void close() throws IOException { + if (xceiverClientManager != null && xceiverClient != null && + buffer != null) { + try { + if (buffer.position() > 0) { + writeChunkToContainer(); + } + putKey(xceiverClient, containerKeyData.build(), traceID); + } catch (IOException e) { + throw new IOException("Unexpected Storage Container Exception", e); + } finally { + xceiverClientManager.releaseClient(xceiverClient); + xceiverClientManager = null; + xceiverClient = null; + buffer = null; + } + } + + } + + /** + * Checks if the stream is open. If not, throws an exception. 
+ * + * @throws IOException if stream is closed + */ + private synchronized void checkOpen() throws IOException { + if (xceiverClient == null) { + throw new IOException("ChunkOutputStream has been closed."); + } + } + + /** + * Attempts to flush buffered writes by writing a new chunk to the container. + * If successful, then clears the buffer to prepare to receive writes for a + * new chunk. + * + * @param rollbackPosition position to restore in buffer if write fails + * @param rollbackLimit limit to restore in buffer if write fails + * @throws IOException if there is an I/O error while performing the call + */ + private synchronized void flushBufferToChunk(int rollbackPosition, + int rollbackLimit) throws IOException { + boolean success = false; + try { + writeChunkToContainer(); + success = true; + } finally { + if (success) { + buffer.clear(); + } else { + buffer.position(rollbackPosition); + buffer.limit(rollbackLimit); + } + } + } + + /** + * Writes buffered data as a new chunk to the container and saves chunk + * information to be used later in putKey call. 
+ * + * @throws IOException if there is an I/O error while performing the call + */ + private synchronized void writeChunkToContainer() throws IOException { + buffer.flip(); + ByteString data = ByteString.copyFrom(buffer); + ChunkInfo chunk = ChunkInfo + .newBuilder() + .setChunkName( + key + "_stream_" + streamId + "_chunk_" + ++chunkIndex) + .setOffset(0) + .setLen(data.size()) + .build(); + try { + writeChunk(xceiverClient, chunk, key, data, traceID); + } catch (IOException e) { + throw new IOException("Unexpected Storage Container Exception", e); + } + containerKeyData.addChunks(chunk); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/c70775af/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/storage/ContainerProtocolCalls.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/storage/ContainerProtocolCalls.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/storage/ContainerProtocolCalls.java new file mode 100644 index 0000000..166b741 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/storage/ContainerProtocolCalls.java @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.scm.storage; + +import static java.net.HttpURLConnection.HTTP_BAD_REQUEST; +import static java.net.HttpURLConnection.HTTP_INTERNAL_ERROR; + +import java.io.IOException; + +import com.google.protobuf.ByteString; + +import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ChunkInfo; +import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandRequestProto; +import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandResponseProto; +import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.GetKeyRequestProto; +import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.GetKeyResponseProto; +import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.KeyData; +import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.PutKeyRequestProto; +import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ReadChunkRequestProto; +import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ReadChunkResponseProto; +import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.Type; +import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.WriteChunkRequestProto; +import org.apache.hadoop.scm.XceiverClient; + +/** + * Implementation of all container protocol calls performed by + * container clients. + */ +public final class ContainerProtocolCalls { + + /** + * Calls the container protocol to get a container key. 
+ * + * @param xceiverClient client to perform call + * @param containerKeyData key data to identify container + * @param traceID container protocol call traceID + * @return container protocol get key response + * @throws IOException if there is an I/O error while performing the call + */ + public static GetKeyResponseProto getKey(XceiverClient xceiverClient, + KeyData containerKeyData, String traceID) throws IOException { + GetKeyRequestProto.Builder readKeyRequest = GetKeyRequestProto + .newBuilder() + .setPipeline(xceiverClient.getPipeline().getProtobufMessage()) + .setKeyData(containerKeyData); + ContainerCommandRequestProto request = ContainerCommandRequestProto + .newBuilder() + .setCmdType(Type.GetKey) + .setTraceID(traceID) + .setGetKey(readKeyRequest) + .build(); + ContainerCommandResponseProto response = xceiverClient.sendCommand(request); + validateContainerResponse(response, traceID); + return response.getGetKey(); + } + + /** + * Calls the container protocol to put a container key. + * + * @param xceiverClient client to perform call + * @param containerKeyData key data to identify container + * @param traceID container protocol call traceID + * @throws IOException if there is an I/O error while performing the call + */ + public static void putKey(XceiverClient xceiverClient, + KeyData containerKeyData, String traceID) throws IOException { + PutKeyRequestProto.Builder createKeyRequest = PutKeyRequestProto + .newBuilder() + .setPipeline(xceiverClient.getPipeline().getProtobufMessage()) + .setKeyData(containerKeyData); + ContainerCommandRequestProto request = ContainerCommandRequestProto + .newBuilder() + .setCmdType(Type.PutKey) + .setTraceID(traceID) + .setPutKey(createKeyRequest) + .build(); + ContainerCommandResponseProto response = xceiverClient.sendCommand(request); + validateContainerResponse(response, traceID); + } + + /** + * Calls the container protocol to read a chunk. 
+ * + * @param xceiverClient client to perform call + * @param chunk information about chunk to read + * @param key the key name + * @param traceID container protocol call traceID + * @return container protocol read chunk response + * @throws IOException if there is an I/O error while performing the call + */ + public static ReadChunkResponseProto readChunk(XceiverClient xceiverClient, + ChunkInfo chunk, String key, String traceID) + throws IOException { + ReadChunkRequestProto.Builder readChunkRequest = ReadChunkRequestProto + .newBuilder() + .setPipeline(xceiverClient.getPipeline().getProtobufMessage()) + .setKeyName(key) + .setChunkData(chunk); + ContainerCommandRequestProto request = ContainerCommandRequestProto + .newBuilder() + .setCmdType(Type.ReadChunk) + .setTraceID(traceID) + .setReadChunk(readChunkRequest) + .build(); + ContainerCommandResponseProto response = xceiverClient.sendCommand(request); + validateContainerResponse(response, traceID); + return response.getReadChunk(); + } + + /** + * Calls the container protocol to write a chunk. 
+ * + * @param xceiverClient client to perform call + * @param chunk information about chunk to write + * @param key the key name + * @param data the data of the chunk to write + * @param traceID container protocol call traceID + * @throws IOException if there is an I/O error while performing the call + */ + public static void writeChunk(XceiverClient xceiverClient, ChunkInfo chunk, + String key, ByteString data, String traceID) + throws IOException { + WriteChunkRequestProto.Builder writeChunkRequest = WriteChunkRequestProto + .newBuilder() + .setPipeline(xceiverClient.getPipeline().getProtobufMessage()) + .setKeyName(key) + .setChunkData(chunk) + .setData(data); + ContainerCommandRequestProto request = ContainerCommandRequestProto + .newBuilder() + .setCmdType(Type.WriteChunk) + .setTraceID(traceID) + .setWriteChunk(writeChunkRequest) + .build(); + ContainerCommandResponseProto response = xceiverClient.sendCommand(request); + validateContainerResponse(response, traceID); + } + + /** + * Validates a response from a container protocol call. Any non-successful + * return code is mapped to a corresponding exception and thrown. 
+ * + * @param response container protocol call response + * @param traceID container protocol call traceID + * @throws IOException if the container protocol call failed + */ + private static void validateContainerResponse( + ContainerCommandResponseProto response, String traceID + ) throws IOException { + // TODO : throw the right type of exception + switch (response.getResult()) { + case SUCCESS: + break; + case MALFORMED_REQUEST: + throw new IOException(HTTP_BAD_REQUEST + + ":Bad container request: " + traceID); + case UNSUPPORTED_REQUEST: + throw new IOException(HTTP_INTERNAL_ERROR + + "Unsupported container request: " + traceID); + case CONTAINER_INTERNAL_ERROR: + throw new IOException(HTTP_INTERNAL_ERROR + + "Container internal error:" + traceID); + default: + throw new IOException(HTTP_INTERNAL_ERROR + + "Unrecognized container response:" + traceID); + } + } + + /** + * There is no need to instantiate this class. + */ + private ContainerProtocolCalls() { + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/c70775af/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/storage/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/storage/package-info.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/storage/package-info.java new file mode 100644 index 0000000..aa89af0 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/storage/package-info.java @@ -0,0 +1,23 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.scm.storage; + +/** + * This package contains StorageContainerManager classes. + */ http://git-wip-us.apache.org/repos/asf/hadoop/blob/c70775af/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/ChunkInputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/ChunkInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/ChunkInputStream.java deleted file mode 100644 index f639b4a..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/ChunkInputStream.java +++ /dev/null @@ -1,193 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.ozone.web.storage; - -import static org.apache.hadoop.ozone.web.storage.ContainerProtocolCalls.*; - -import java.io.IOException; -import java.io.InputStream; -import java.nio.ByteBuffer; -import java.util.List; - -import com.google.protobuf.ByteString; - -import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ReadChunkResponseProto; -import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ChunkInfo; -import org.apache.hadoop.scm.XceiverClient; -import org.apache.hadoop.scm.XceiverClientManager; -import org.apache.hadoop.ozone.web.exceptions.OzoneException; -import org.apache.hadoop.ozone.web.handlers.UserArgs; - -/** - * An {@link InputStream} used by the REST service in combination with the - * {@link DistributedStorageHandler} to read the value of a key from a sequence - * of container chunks. All bytes of the key value are stored in container - * chunks. Each chunk may contain multiple underlying {@link ByteBuffer} - * instances. This class encapsulates all state management for iterating - * through the sequence of chunks and the sequence of buffers within each chunk. - */ -class ChunkInputStream extends InputStream { - - private static final int EOF = -1; - - private final String key; - private final UserArgs args; - private XceiverClientManager xceiverClientManager; - private XceiverClient xceiverClient; - private List<ChunkInfo> chunks; - private int chunkOffset; - private List<ByteBuffer> buffers; - private int bufferOffset; - - /** - * Creates a new ChunkInputStream. 
- * - * @param key chunk key - * @param xceiverClientManager client manager that controls client - * @param xceiverClient client to perform container calls - * @param chunks list of chunks to read - * @param args container protocol call args - */ - public ChunkInputStream(String key, XceiverClientManager xceiverClientManager, - XceiverClient xceiverClient, List<ChunkInfo> chunks, UserArgs args) { - this.key = key; - this.args = args; - this.xceiverClientManager = xceiverClientManager; - this.xceiverClient = xceiverClient; - this.chunks = chunks; - this.chunkOffset = 0; - this.buffers = null; - this.bufferOffset = 0; - } - - @Override - public synchronized int read() - throws IOException { - checkOpen(); - int available = prepareRead(1); - return available == EOF ? EOF : buffers.get(bufferOffset).get(); - } - - @Override - public synchronized int read(byte[] b, int off, int len) throws IOException { - // According to the JavaDocs for InputStream, it is recommended that - // subclasses provide an override of bulk read if possible for performance - // reasons. In addition to performance, we need to do it for correctness - // reasons. The Ozone REST service uses PipedInputStream and - // PipedOutputStream to relay HTTP response data between a Jersey thread and - // a Netty thread. It turns out that PipedInputStream/PipedOutputStream - // have a subtle dependency (bug?) on the wrapped stream providing separate - // implementations of single-byte read and bulk read. Without this, get key - // responses might close the connection before writing all of the bytes - // advertised in the Content-Length. 
- if (b == null) { - throw new NullPointerException(); - } - if (off < 0 || len < 0 || len > b.length - off) { - throw new IndexOutOfBoundsException(); - } - if (len == 0) { - return 0; - } - checkOpen(); - int available = prepareRead(len); - if (available == EOF) { - return EOF; - } - buffers.get(bufferOffset).get(b, off, available); - return available; - } - - @Override - public synchronized void close() { - if (xceiverClientManager != null && xceiverClient != null) { - xceiverClientManager.releaseClient(xceiverClient); - xceiverClientManager = null; - xceiverClient = null; - } - } - - /** - * Checks if the stream is open. If not, throws an exception. - * - * @throws IOException if stream is closed - */ - private synchronized void checkOpen() throws IOException { - if (xceiverClient == null) { - throw new IOException("ChunkInputStream has been closed."); - } - } - - /** - * Prepares to read by advancing through chunks and buffers as needed until it - * finds data to return or encounters EOF. - * - * @param len desired length of data to read - * @return length of data available to read, possibly less than desired length - */ - private synchronized int prepareRead(int len) throws IOException { - for (;;) { - if (chunks == null || chunks.isEmpty()) { - // This must be an empty key. - return EOF; - } else if (buffers == null) { - // The first read triggers fetching the first chunk. - readChunkFromContainer(0); - } else if (!buffers.isEmpty() && - buffers.get(bufferOffset).hasRemaining()) { - // Data is available from the current buffer. - ByteBuffer bb = buffers.get(bufferOffset); - return len > bb.remaining() ? bb.remaining() : len; - } else if (!buffers.isEmpty() && - !buffers.get(bufferOffset).hasRemaining() && - bufferOffset < buffers.size() - 1) { - // There are additional buffers available. - ++bufferOffset; - } else if (chunkOffset < chunks.size() - 1) { - // There are additional chunks available. 
- readChunkFromContainer(chunkOffset + 1); - } else { - // All available input has been consumed. - return EOF; - } - } - } - - /** - * Attempts to read the chunk at the specified offset in the chunk list. If - * successful, then the data of the read chunk is saved so that its bytes can - * be returned from subsequent read calls. - * - * @param readChunkOffset offset in the chunk list of which chunk to read - * @throws IOException if there is an I/O error while performing the call - */ - private synchronized void readChunkFromContainer(int readChunkOffset) - throws IOException { - final ReadChunkResponseProto readChunkResponse; - try { - readChunkResponse = readChunk(xceiverClient, chunks.get(readChunkOffset), - key, args); - } catch (OzoneException e) { - throw new IOException("Unexpected OzoneException", e); - } - chunkOffset = readChunkOffset; - ByteString byteString = readChunkResponse.getData(); - buffers = byteString.asReadOnlyByteBufferList(); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/c70775af/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/ChunkOutputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/ChunkOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/ChunkOutputStream.java deleted file mode 100644 index 1796a69..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/ChunkOutputStream.java +++ /dev/null @@ -1,219 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.ozone.web.storage; - -import static org.apache.hadoop.ozone.OzoneConsts.CHUNK_SIZE; -import static org.apache.hadoop.ozone.web.storage.ContainerProtocolCalls.*; -import static org.apache.hadoop.ozone.web.storage.OzoneContainerTranslation.*; - -import java.io.IOException; -import java.io.OutputStream; -import java.nio.ByteBuffer; -import java.util.UUID; - -import com.google.protobuf.ByteString; - -import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ChunkInfo; -import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.KeyData; -import org.apache.hadoop.scm.XceiverClient; -import org.apache.hadoop.scm.XceiverClientManager; -import org.apache.hadoop.ozone.web.exceptions.OzoneException; -import org.apache.hadoop.ozone.web.handlers.UserArgs; -import org.apache.hadoop.ozone.web.response.KeyInfo; - -/** - * An {@link OutputStream} used by the REST service in combination with the - * {@link DistributedStorageHandler} to write the value of a key to a sequence - * of container chunks. Writes are buffered locally and periodically written to - * the container as a new chunk. In order to preserve the semantics that - * replacement of a pre-existing key is atomic, each instance of the stream has - * an internal unique identifier. 
This unique identifier and a monotonically - * increasing chunk index form a composite key that is used as the chunk name. - * After all data is written, a putKey call creates or updates the corresponding - * container key, and this call includes the full list of chunks that make up - * the key data. The list of chunks is updated all at once. Therefore, a - * concurrent reader never can see an intermediate state in which different - * chunks of data from different versions of the key data are interleaved. - * This class encapsulates all state management for buffering and writing - * through to the container. - */ -class ChunkOutputStream extends OutputStream { - - private final String containerKey; - private final KeyInfo key; - private final UserArgs args; - private final KeyData.Builder containerKeyData; - private XceiverClientManager xceiverClientManager; - private XceiverClient xceiverClient; - private ByteBuffer buffer; - private final String streamId; - private int chunkIndex; - - /** - * Creates a new ChunkOutputStream. 
- * - * @param containerKey container key - * @param key chunk key - * @param xceiverClientManager client manager that controls client - * @param xceiverClient client to perform container calls - * @param args container protocol call args - */ - public ChunkOutputStream(String containerKey, KeyInfo key, - XceiverClientManager xceiverClientManager, XceiverClient xceiverClient, - UserArgs args) { - this.containerKey = containerKey; - this.key = key; - this.args = args; - this.containerKeyData = fromKeyToContainerKeyDataBuilder( - xceiverClient.getPipeline().getContainerName(), containerKey, key); - this.xceiverClientManager = xceiverClientManager; - this.xceiverClient = xceiverClient; - this.buffer = ByteBuffer.allocate(CHUNK_SIZE); - this.streamId = UUID.randomUUID().toString(); - this.chunkIndex = 0; - } - - @Override - public synchronized void write(int b) throws IOException { - checkOpen(); - int rollbackPosition = buffer.position(); - int rollbackLimit = buffer.limit(); - buffer.put((byte)b); - if (buffer.position() == CHUNK_SIZE) { - flushBufferToChunk(rollbackPosition, rollbackLimit); - } - } - - @Override - public void write(byte[] b, int off, int len) throws IOException { - if (b == null) { - throw new NullPointerException(); - } - if ((off < 0) || (off > b.length) || (len < 0) || - ((off + len) > b.length) || ((off + len) < 0)) { - throw new IndexOutOfBoundsException(); - } - if (len == 0) { - return; - } - checkOpen(); - while (len > 0) { - int writeLen = Math.min(CHUNK_SIZE - buffer.position(), len); - int rollbackPosition = buffer.position(); - int rollbackLimit = buffer.limit(); - buffer.put(b, off, writeLen); - if (buffer.position() == CHUNK_SIZE) { - flushBufferToChunk(rollbackPosition, rollbackLimit); - } - off += writeLen; - len -= writeLen; - } - } - - @Override - public synchronized void flush() throws IOException { - checkOpen(); - if (buffer.position() > 0) { - int rollbackPosition = buffer.position(); - int rollbackLimit = buffer.limit(); - 
flushBufferToChunk(rollbackPosition, rollbackLimit); - } - } - - @Override - public synchronized void close() throws IOException { - if (xceiverClientManager != null && xceiverClient != null && - buffer != null) { - try { - if (buffer.position() > 0) { - writeChunkToContainer(); - } - putKey(xceiverClient, containerKeyData.build(), args); - } catch (OzoneException e) { - throw new IOException("Unexpected OzoneException", e); - } finally { - xceiverClientManager.releaseClient(xceiverClient); - xceiverClientManager = null; - xceiverClient = null; - buffer = null; - } - } - - } - - /** - * Checks if the stream is open. If not, throws an exception. - * - * @throws IOException if stream is closed - */ - private synchronized void checkOpen() throws IOException { - if (xceiverClient == null) { - throw new IOException("ChunkOutputStream has been closed."); - } - } - - /** - * Attempts to flush buffered writes by writing a new chunk to the container. - * If successful, then clears the buffer to prepare to receive writes for a - * new chunk. - * - * @param rollbackPosition position to restore in buffer if write fails - * @param rollbackLimit limit to restore in buffer if write fails - * @throws IOException if there is an I/O error while performing the call - */ - private synchronized void flushBufferToChunk(int rollbackPosition, - int rollbackLimit) throws IOException { - boolean success = false; - try { - writeChunkToContainer(); - success = true; - } finally { - if (success) { - buffer.clear(); - } else { - buffer.position(rollbackPosition); - buffer.limit(rollbackLimit); - } - } - } - - /** - * Writes buffered data as a new chunk to the container and saves chunk - * information to be used later in putKey call. 
- * - * @throws IOException if there is an I/O error while performing the call - */ - private synchronized void writeChunkToContainer() throws IOException { - buffer.flip(); - ByteString data = ByteString.copyFrom(buffer); - ChunkInfo chunk = ChunkInfo - .newBuilder() - .setChunkName( - key.getKeyName() + "_stream_" + streamId + "_chunk_" + ++chunkIndex) - .setOffset(0) - .setLen(data.size()) - .build(); - try { - writeChunk(xceiverClient, chunk, key.getKeyName(), data, args); - } catch (OzoneException e) { - throw new IOException("Unexpected OzoneException", e); - } - containerKeyData.addChunks(chunk); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/c70775af/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/ContainerProtocolCalls.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/ContainerProtocolCalls.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/ContainerProtocolCalls.java deleted file mode 100644 index c683a74..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/ContainerProtocolCalls.java +++ /dev/null @@ -1,198 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.ozone.web.storage; - -import static java.net.HttpURLConnection.HTTP_BAD_REQUEST; -import static java.net.HttpURLConnection.HTTP_INTERNAL_ERROR; - -import java.io.IOException; - -import com.google.protobuf.ByteString; - -import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ChunkInfo; -import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandRequestProto; -import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandResponseProto; -import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.GetKeyRequestProto; -import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.GetKeyResponseProto; -import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.KeyData; -import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.PutKeyRequestProto; -import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ReadChunkRequestProto; -import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ReadChunkResponseProto; -import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.Type; -import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.WriteChunkRequestProto; -import org.apache.hadoop.scm.XceiverClient; -import org.apache.hadoop.ozone.web.exceptions.ErrorTable; -import org.apache.hadoop.ozone.web.exceptions.OzoneException; -import org.apache.hadoop.ozone.web.handlers.UserArgs; - -/** - * Implementation of all container protocol calls performed by - * {@link DistributedStorageHandler}. - */ -final class ContainerProtocolCalls { - - /** - * Calls the container protocol to get a container key. 
- * - * @param xceiverClient client to perform call - * @param containerKeyData key data to identify container - * @param args container protocol call args - * @returns container protocol get key response - * @throws IOException if there is an I/O error while performing the call - * @throws OzoneException if the container protocol call failed - */ - public static GetKeyResponseProto getKey(XceiverClient xceiverClient, - KeyData containerKeyData, UserArgs args) throws IOException, - OzoneException { - GetKeyRequestProto.Builder readKeyRequest = GetKeyRequestProto - .newBuilder() - .setPipeline(xceiverClient.getPipeline().getProtobufMessage()) - .setKeyData(containerKeyData); - ContainerCommandRequestProto request = ContainerCommandRequestProto - .newBuilder() - .setCmdType(Type.GetKey) - .setTraceID(args.getRequestID()) - .setGetKey(readKeyRequest) - .build(); - ContainerCommandResponseProto response = xceiverClient.sendCommand(request); - validateContainerResponse(response, args); - return response.getGetKey(); - } - - /** - * Calls the container protocol to put a container key. 
- * - * @param xceiverClient client to perform call - * @param containerKeyData key data to identify container - * @param args container protocol call args - * @throws IOException if there is an I/O error while performing the call - * @throws OzoneException if the container protocol call failed - */ - public static void putKey(XceiverClient xceiverClient, - KeyData containerKeyData, UserArgs args) throws IOException, - OzoneException { - PutKeyRequestProto.Builder createKeyRequest = PutKeyRequestProto - .newBuilder() - .setPipeline(xceiverClient.getPipeline().getProtobufMessage()) - .setKeyData(containerKeyData); - ContainerCommandRequestProto request = ContainerCommandRequestProto - .newBuilder() - .setCmdType(Type.PutKey) - .setTraceID(args.getRequestID()) - .setPutKey(createKeyRequest) - .build(); - ContainerCommandResponseProto response = xceiverClient.sendCommand(request); - validateContainerResponse(response, args); - } - - /** - * Calls the container protocol to read a chunk. - * - * @param xceiverClient client to perform call - * @param chunk information about chunk to read - * @param key the key name - * @param args container protocol call args - * @returns container protocol read chunk response - * @throws IOException if there is an I/O error while performing the call - * @throws OzoneException if the container protocol call failed - */ - public static ReadChunkResponseProto readChunk(XceiverClient xceiverClient, - ChunkInfo chunk, String key, UserArgs args) - throws IOException, OzoneException { - ReadChunkRequestProto.Builder readChunkRequest = ReadChunkRequestProto - .newBuilder() - .setPipeline(xceiverClient.getPipeline().getProtobufMessage()) - .setKeyName(key) - .setChunkData(chunk); - ContainerCommandRequestProto request = ContainerCommandRequestProto - .newBuilder() - .setCmdType(Type.ReadChunk) - .setTraceID(args.getRequestID()) - .setReadChunk(readChunkRequest) - .build(); - ContainerCommandResponseProto response = 
xceiverClient.sendCommand(request); - validateContainerResponse(response, args); - return response.getReadChunk(); - } - - /** - * Calls the container protocol to write a chunk. - * - * @param xceiverClient client to perform call - * @param chunk information about chunk to write - * @param key the key name - * @param data the data of the chunk to write - * @param args container protocol call args - * @throws IOException if there is an I/O error while performing the call - * @throws OzoneException if the container protocol call failed - */ - public static void writeChunk(XceiverClient xceiverClient, ChunkInfo chunk, - String key, ByteString data, UserArgs args) - throws IOException, OzoneException { - WriteChunkRequestProto.Builder writeChunkRequest = WriteChunkRequestProto - .newBuilder() - .setPipeline(xceiverClient.getPipeline().getProtobufMessage()) - .setKeyName(key) - .setChunkData(chunk) - .setData(data); - ContainerCommandRequestProto request = ContainerCommandRequestProto - .newBuilder() - .setCmdType(Type.WriteChunk) - .setTraceID(args.getRequestID()) - .setWriteChunk(writeChunkRequest) - .build(); - ContainerCommandResponseProto response = xceiverClient.sendCommand(request); - validateContainerResponse(response, args); - } - - /** - * Validates a response from a container protocol call. Any non-successful - * return code is mapped to a corresponding exception and thrown. 
- * - * @param response container protocol call response - * @param args container protocol call args - * @throws OzoneException if the container protocol call failed - */ - private static void validateContainerResponse( - ContainerCommandResponseProto response, UserArgs args) - throws OzoneException { - switch (response.getResult()) { - case SUCCESS: - break; - case MALFORMED_REQUEST: - throw ErrorTable.newError(new OzoneException(HTTP_BAD_REQUEST, - "badRequest", "Bad container request."), args); - case UNSUPPORTED_REQUEST: - throw ErrorTable.newError(new OzoneException(HTTP_INTERNAL_ERROR, - "internalServerError", "Unsupported container request."), args); - case CONTAINER_INTERNAL_ERROR: - throw ErrorTable.newError(new OzoneException(HTTP_INTERNAL_ERROR, - "internalServerError", "Container internal error."), args); - default: - throw ErrorTable.newError(new OzoneException(HTTP_INTERNAL_ERROR, - "internalServerError", "Unrecognized container response."), args); - } - } - - /** - * There is no need to instantiate this class. 
- */ - private ContainerProtocolCalls() { - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/c70775af/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java index 143d058..e8e5830 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java @@ -18,7 +18,7 @@ package org.apache.hadoop.ozone.web.storage; -import static org.apache.hadoop.ozone.web.storage.ContainerProtocolCalls.*; +import static org.apache.hadoop.scm.storage.ContainerProtocolCalls.*; import static org.apache.hadoop.ozone.web.storage.OzoneContainerTranslation.*; import java.io.IOException; @@ -57,6 +57,8 @@ import org.apache.hadoop.ozone.web.response.ListKeys; import org.apache.hadoop.ozone.web.response.ListVolumes; import org.apache.hadoop.ozone.web.response.VolumeInfo; import org.apache.hadoop.ozone.web.response.VolumeOwner; +import org.apache.hadoop.scm.storage.ChunkInputStream; +import org.apache.hadoop.scm.storage.ChunkOutputStream; import org.apache.hadoop.util.StringUtils; /** @@ -95,7 +97,7 @@ public final class DistributedStorageHandler implements StorageHandler { volume.setCreatedBy(args.getAdminName()); KeyData containerKeyData = fromVolumeToContainerKeyData( xceiverClient.getPipeline().getContainerName(), containerKey, volume); - putKey(xceiverClient, containerKeyData, args); + putKey(xceiverClient, containerKeyData, args.getRequestID()); } finally { xceiverClientManager.releaseClient(xceiverClient); } @@ -140,7 +142,7 @@ public final class 
DistributedStorageHandler implements StorageHandler { KeyData containerKeyData = containerKeyDataForRead( xceiverClient.getPipeline().getContainerName(), containerKey); GetKeyResponseProto response = getKey(xceiverClient, containerKeyData, - args); + args.getRequestID()); return fromContainerKeyValueListToVolume( response.getKeyData().getMetadataList()); } finally { @@ -163,7 +165,7 @@ public final class DistributedStorageHandler implements StorageHandler { bucket.setStorageType(args.getStorageType()); KeyData containerKeyData = fromBucketToContainerKeyData( xceiverClient.getPipeline().getContainerName(), containerKey, bucket); - putKey(xceiverClient, containerKeyData, args); + putKey(xceiverClient, containerKeyData, args.getRequestID()); } finally { xceiverClientManager.releaseClient(xceiverClient); } @@ -218,7 +220,7 @@ public final class DistributedStorageHandler implements StorageHandler { KeyData containerKeyData = containerKeyDataForRead( xceiverClient.getPipeline().getContainerName(), containerKey); GetKeyResponseProto response = getKey(xceiverClient, containerKeyData, - args); + args.getRequestID()); return fromContainerKeyValueListToBucket( response.getKeyData().getMetadataList()); } finally { @@ -235,8 +237,8 @@ public final class DistributedStorageHandler implements StorageHandler { key.setKeyName(args.getKeyName()); key.setCreatedOn(dateToString(new Date())); XceiverClient xceiverClient = acquireXceiverClient(containerKey); - return new ChunkOutputStream(containerKey, key, xceiverClientManager, - xceiverClient, args); + return new ChunkOutputStream(containerKey, key.getKeyName(), + xceiverClientManager, xceiverClient, args.getRequestID()); } @Override @@ -256,7 +258,7 @@ public final class DistributedStorageHandler implements StorageHandler { KeyData containerKeyData = containerKeyDataForRead( xceiverClient.getPipeline().getContainerName(), containerKey); GetKeyResponseProto response = getKey(xceiverClient, containerKeyData, - args); + 
args.getRequestID()); long length = 0; List<ChunkInfo> chunks = response.getKeyData().getChunksList(); for (ChunkInfo chunk : chunks) { @@ -264,8 +266,8 @@ public final class DistributedStorageHandler implements StorageHandler { } success = true; return new LengthInputStream(new ChunkInputStream( - containerKey, xceiverClientManager, xceiverClient, chunks, args), - length); + containerKey, xceiverClientManager, xceiverClient, + chunks, args.getRequestID()), length); } finally { if (!success) { xceiverClientManager.releaseClient(xceiverClient); --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org