http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupInputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupInputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupInputStream.java deleted file mode 100644 index 2b10578..0000000 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupInputStream.java +++ /dev/null @@ -1,318 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.ozone.client.io; - -import com.google.common.annotations.VisibleForTesting; -import org.apache.hadoop.fs.FSExceptionMessages; -import org.apache.hadoop.fs.Seekable; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; -import org.apache.hadoop.hdds.client.BlockID; -import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline; -import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; -import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo; -import org.apache.hadoop.hdds.scm.XceiverClientManager; -import org.apache.hadoop.hdds.scm.XceiverClientSpi; -import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB; -import org.apache.hadoop.hdds.scm.storage.ChunkInputStream; -import org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls; -import org.apache.ratis.util.Preconditions; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.EOFException; -import java.io.IOException; -import java.io.InputStream; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; - -/** - * Maintaining a list of ChunkInputStream. Read based on offset. - */ -public class ChunkGroupInputStream extends InputStream implements Seekable { - - private static final Logger LOG = - LoggerFactory.getLogger(ChunkGroupInputStream.class); - - private static final int EOF = -1; - - private final ArrayList<ChunkInputStreamEntry> streamEntries; - // streamOffset[i] stores the offset at which chunkInputStream i stores - // data in the key - private long[] streamOffset = null; - private int currentStreamIndex; - private long length = 0; - private boolean closed = false; - private String key; - - public ChunkGroupInputStream() { - streamEntries = new ArrayList<>(); - currentStreamIndex = 0; - } - - @VisibleForTesting - public synchronized int getCurrentStreamIndex() { - return currentStreamIndex; - } - - @VisibleForTesting - public long getRemainingOfIndex(int index) throws IOException { - return streamEntries.get(index).getRemaining(); - } - - /** - * Append another stream to the end of the list. - * - * @param stream the stream instance. - * @param streamLength the max number of bytes that should be written to this - * stream. - */ - public synchronized void addStream(ChunkInputStream stream, - long streamLength) { - streamEntries.add(new ChunkInputStreamEntry(stream, streamLength)); - } - - - @Override - public synchronized int read() throws IOException { - byte[] buf = new byte[1]; - if (read(buf, 0, 1) == EOF) { - return EOF; - } - return Byte.toUnsignedInt(buf[0]); - } - - @Override - public synchronized int read(byte[] b, int off, int len) throws IOException { - checkNotClosed(); - if (b == null) { - throw new NullPointerException(); - } - if (off < 0 || len < 0 || len > b.length - off) { - throw new IndexOutOfBoundsException(); - } - if (len == 0) { - return 0; - } - int totalReadLen = 0; - while (len > 0) { - if (streamEntries.size() <= currentStreamIndex) { - return totalReadLen == 0 ? EOF : totalReadLen; - } - ChunkInputStreamEntry current = streamEntries.get(currentStreamIndex); - int numBytesToRead = Math.min(len, (int)current.getRemaining()); - int numBytesRead = current.read(b, off, numBytesToRead); - if (numBytesRead != numBytesToRead) { - // This implies that there is either data loss or corruption in the - // chunk entries. Even EOF in the current stream would be covered in - // this case. - throw new IOException(String.format( - "Inconsistent read for blockID=%s length=%d numBytesRead=%d", - current.chunkInputStream.getBlockID(), current.length, - numBytesRead)); - } - totalReadLen += numBytesRead; - off += numBytesRead; - len -= numBytesRead; - if (current.getRemaining() <= 0) { - currentStreamIndex += 1; - } - } - return totalReadLen; - } - - @Override - public void seek(long pos) throws IOException { - checkNotClosed(); - if (pos < 0 || pos >= length) { - if (pos == 0) { - // It is possible for length and pos to be zero in which case - // seek should return instead of throwing exception - return; - } - throw new EOFException( - "EOF encountered at pos: " + pos + " for key: " + key); - } - Preconditions.assertTrue(currentStreamIndex >= 0); - if (currentStreamIndex >= streamEntries.size()) { - currentStreamIndex = Arrays.binarySearch(streamOffset, pos); - } else if (pos < streamOffset[currentStreamIndex]) { - currentStreamIndex = - Arrays.binarySearch(streamOffset, 0, currentStreamIndex, pos); - } else if (pos >= streamOffset[currentStreamIndex] + streamEntries - .get(currentStreamIndex).length) { - currentStreamIndex = Arrays - .binarySearch(streamOffset, currentStreamIndex + 1, - streamEntries.size(), pos); - } - if (currentStreamIndex < 0) { - // Binary search returns -insertionPoint - 1 if element is not present - // in the array. insertionPoint is the point at which element would be - // inserted in the sorted array. We need to adjust the currentStreamIndex - // accordingly so that currentStreamIndex = insertionPoint - 1 - currentStreamIndex = -currentStreamIndex - 2; - } - // seek to the proper offset in the ChunkInputStream - streamEntries.get(currentStreamIndex) - .seek(pos - streamOffset[currentStreamIndex]); - } - - @Override - public long getPos() throws IOException { - return length == 0 ? 0 : - streamOffset[currentStreamIndex] + streamEntries.get(currentStreamIndex) - .getPos(); - } - - @Override - public boolean seekToNewSource(long targetPos) throws IOException { - return false; - } - - @Override - public int available() throws IOException { - checkNotClosed(); - long remaining = length - getPos(); - return remaining <= Integer.MAX_VALUE ? (int) remaining : Integer.MAX_VALUE; - } - - @Override - public void close() throws IOException { - closed = true; - for (int i = 0; i < streamEntries.size(); i++) { - streamEntries.get(i).close(); - } - } - - /** - * Encapsulates ChunkInputStream. - */ - public static class ChunkInputStreamEntry extends InputStream - implements Seekable { - - private final ChunkInputStream chunkInputStream; - private final long length; - - public ChunkInputStreamEntry(ChunkInputStream chunkInputStream, - long length) { - this.chunkInputStream = chunkInputStream; - this.length = length; - } - - synchronized long getRemaining() throws IOException { - return length - getPos(); - } - - @Override - public synchronized int read(byte[] b, int off, int len) - throws IOException { - int readLen = chunkInputStream.read(b, off, len); - return readLen; - } - - @Override - public synchronized int read() throws IOException { - int data = chunkInputStream.read(); - return data; - } - - @Override - public synchronized void close() throws IOException { - chunkInputStream.close(); - } - - @Override - public void seek(long pos) throws IOException { - chunkInputStream.seek(pos); - } - - @Override - public long getPos() throws IOException { - return chunkInputStream.getPos(); - } - - @Override - public boolean seekToNewSource(long targetPos) throws IOException { - return false; - } - } - - public static LengthInputStream getFromOmKeyInfo( - OmKeyInfo keyInfo, - XceiverClientManager xceiverClientManager, - StorageContainerLocationProtocolClientSideTranslatorPB - storageContainerLocationClient, - String requestId) throws IOException { - long length = 0; - long containerKey; - ChunkGroupInputStream groupInputStream = new ChunkGroupInputStream(); - groupInputStream.key = keyInfo.getKeyName(); - List<OmKeyLocationInfo> keyLocationInfos = - keyInfo.getLatestVersionLocations().getBlocksLatestVersionOnly(); - groupInputStream.streamOffset = new long[keyLocationInfos.size()]; - for (int i = 0; i < keyLocationInfos.size(); i++) { - OmKeyLocationInfo omKeyLocationInfo = keyLocationInfos.get(i); - BlockID blockID = omKeyLocationInfo.getBlockID(); - long containerID = blockID.getContainerID(); - ContainerWithPipeline containerWithPipeline = - storageContainerLocationClient.getContainerWithPipeline(containerID); - XceiverClientSpi xceiverClient = xceiverClientManager - .acquireClient(containerWithPipeline.getPipeline(), containerID); - boolean success = false; - containerKey = omKeyLocationInfo.getLocalID(); - try { - LOG.debug("get key accessing {} {}", - containerID, containerKey); - groupInputStream.streamOffset[i] = length; - ContainerProtos.DatanodeBlockID datanodeBlockID = blockID - .getDatanodeBlockIDProtobuf(); - ContainerProtos.GetBlockResponseProto response = ContainerProtocolCalls - .getBlock(xceiverClient, datanodeBlockID, requestId); - List<ContainerProtos.ChunkInfo> chunks = - response.getBlockData().getChunksList(); - for (ContainerProtos.ChunkInfo chunk : chunks) { - length += chunk.getLen(); - } - success = true; - ChunkInputStream inputStream = new ChunkInputStream( - omKeyLocationInfo.getBlockID(), xceiverClientManager, xceiverClient, - chunks, requestId); - groupInputStream.addStream(inputStream, - omKeyLocationInfo.getLength()); - } finally { - if (!success) { - xceiverClientManager.releaseClient(xceiverClient); - } - } - } - groupInputStream.length = length; - return new LengthInputStream(groupInputStream, length); - } - - /** - * Verify that the input stream is open. Non blocking; this gives - * the last state of the volatile {@link #closed} field. - * @throws IOException if the connection is closed. - */ - private void checkNotClosed() throws IOException { - if (closed) { - throw new IOException( - ": " + FSExceptionMessages.STREAM_IS_CLOSED + " Key: " + key); - } - } -}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java deleted file mode 100644 index 3742a9a..0000000 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java +++ /dev/null @@ -1,733 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.ozone.client.io; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; -import org.apache.hadoop.fs.FSExceptionMessages; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result; -import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo; -import org.apache.hadoop.hdds.client.BlockID; -import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline; -import org.apache.hadoop.io.retry.RetryPolicy; -import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; -import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ObjectStageChangeRequestProto; -import org.apache.hadoop.ozone.om.helpers.OmKeyArgs; -import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; -import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo; -import org.apache.hadoop.ozone.om.helpers.OpenKeySession; -import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolClientSideTranslatorPB; -import org.apache.hadoop.hdds.scm.XceiverClientManager; -import org.apache.hadoop.hdds.scm.XceiverClientSpi; -import org.apache.hadoop.hdds.scm.container.common.helpers - .StorageContainerException; -import org.apache.hadoop.hdds.scm.protocolPB - .StorageContainerLocationProtocolClientSideTranslatorPB; -import org.apache.hadoop.hdds.scm.storage.ChunkOutputStream; -import org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.io.InterruptedIOException; -import java.io.OutputStream; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.List; -import java.util.Optional; -import java.util.ListIterator; - -/** - * Maintaining a list of ChunkInputStream. Write based on offset. - * - * Note that this may write to multiple containers in one write call. In case - * that first container succeeded but later ones failed, the succeeded writes - * are not rolled back. - * - * TODO : currently not support multi-thread access. - */ -public class ChunkGroupOutputStream extends OutputStream { - - public static final Logger LOG = - LoggerFactory.getLogger(ChunkGroupOutputStream.class); - - // array list's get(index) is O(1) - private final ArrayList<ChunkOutputStreamEntry> streamEntries; - private int currentStreamIndex; - private long byteOffset; - private final OzoneManagerProtocolClientSideTranslatorPB omClient; - private final - StorageContainerLocationProtocolClientSideTranslatorPB scmClient; - private final OmKeyArgs keyArgs; - private final long openID; - private final XceiverClientManager xceiverClientManager; - private final int chunkSize; - private final String requestID; - private boolean closed; - private final RetryPolicy retryPolicy; - /** - * A constructor for testing purpose only. - */ - @VisibleForTesting - public ChunkGroupOutputStream() { - streamEntries = new ArrayList<>(); - omClient = null; - scmClient = null; - keyArgs = null; - openID = -1; - xceiverClientManager = null; - chunkSize = 0; - requestID = null; - closed = false; - retryPolicy = null; - } - - /** - * For testing purpose only. Not building output stream from blocks, but - * taking from externally. - * - * @param outputStream - * @param length - */ - @VisibleForTesting - public void addStream(OutputStream outputStream, long length) { - streamEntries.add(new ChunkOutputStreamEntry(outputStream, length)); - } - - @VisibleForTesting - public List<ChunkOutputStreamEntry> getStreamEntries() { - return streamEntries; - } - - public List<OmKeyLocationInfo> getLocationInfoList() { - List<OmKeyLocationInfo> locationInfoList = new ArrayList<>(); - for (ChunkOutputStreamEntry streamEntry : streamEntries) { - OmKeyLocationInfo info = - new OmKeyLocationInfo.Builder().setBlockID(streamEntry.blockID) - .setShouldCreateContainer(false) - .setLength(streamEntry.currentPosition).setOffset(0).build(); - locationInfoList.add(info); - } - return locationInfoList; - } - - public ChunkGroupOutputStream( - OpenKeySession handler, XceiverClientManager xceiverClientManager, - StorageContainerLocationProtocolClientSideTranslatorPB scmClient, - OzoneManagerProtocolClientSideTranslatorPB omClient, - int chunkSize, String requestId, ReplicationFactor factor, - ReplicationType type, RetryPolicy retryPolicy) throws IOException { - this.streamEntries = new ArrayList<>(); - this.currentStreamIndex = 0; - this.byteOffset = 0; - this.omClient = omClient; - this.scmClient = scmClient; - OmKeyInfo info = handler.getKeyInfo(); - this.keyArgs = new OmKeyArgs.Builder() - .setVolumeName(info.getVolumeName()) - .setBucketName(info.getBucketName()) - .setKeyName(info.getKeyName()) - .setType(type) - .setFactor(factor) - .setDataSize(info.getDataSize()).build(); - this.openID = handler.getId(); - this.xceiverClientManager = xceiverClientManager; - this.chunkSize = chunkSize; - this.requestID = requestId; - this.retryPolicy = retryPolicy; - LOG.debug("Expecting open key with one block, but got" + - info.getKeyLocationVersions().size()); - } - - /** - * When a key is opened, it is possible that there are some blocks already - * allocated to it for this open session. In this case, to make use of these - * blocks, we need to add these blocks to stream entries. But, a key's version - * also includes blocks from previous versions, we need to avoid adding these - * old blocks to stream entries, because these old blocks should not be picked - * for write. To do this, the following method checks that, only those - * blocks created in this particular open version are added to stream entries. - * - * @param version the set of blocks that are pre-allocated. - * @param openVersion the version corresponding to the pre-allocation. - * @throws IOException - */ - public void addPreallocateBlocks(OmKeyLocationInfoGroup version, - long openVersion) throws IOException { - // server may return any number of blocks, (0 to any) - // only the blocks allocated in this open session (block createVersion - // equals to open session version) - for (OmKeyLocationInfo subKeyInfo : version.getLocationList()) { - if (subKeyInfo.getCreateVersion() == openVersion) { - checkKeyLocationInfo(subKeyInfo); - } - } - } - - private void checkKeyLocationInfo(OmKeyLocationInfo subKeyInfo) - throws IOException { - ContainerWithPipeline containerWithPipeline = scmClient - .getContainerWithPipeline(subKeyInfo.getContainerID()); - ContainerInfo container = containerWithPipeline.getContainerInfo(); - - XceiverClientSpi xceiverClient = - xceiverClientManager.acquireClient(containerWithPipeline.getPipeline(), - container.getContainerID()); - // create container if needed - if (subKeyInfo.getShouldCreateContainer()) { - try { - ContainerProtocolCalls.createContainer(xceiverClient, - container.getContainerID(), requestID); - scmClient.notifyObjectStageChange( - ObjectStageChangeRequestProto.Type.container, - subKeyInfo.getContainerID(), - ObjectStageChangeRequestProto.Op.create, - ObjectStageChangeRequestProto.Stage.complete); - } catch (StorageContainerException ex) { - if (ex.getResult().equals(Result.CONTAINER_EXISTS)) { - //container already exist, this should never happen - LOG.debug("Container {} already exists.", - container.getContainerID()); - } else { - LOG.error("Container creation failed for {}.", - container.getContainerID(), ex); - throw ex; - } - } - } - streamEntries.add(new ChunkOutputStreamEntry(subKeyInfo.getBlockID(), - keyArgs.getKeyName(), xceiverClientManager, xceiverClient, requestID, - chunkSize, subKeyInfo.getLength())); - } - - @VisibleForTesting - public long getByteOffset() { - return byteOffset; - } - - - @Override - public void write(int b) throws IOException { - byte[] buf = new byte[1]; - buf[0] = (byte) b; - write(buf, 0, 1); - } - - /** - * Try to write the bytes sequence b[off:off+len) to streams. - * - * NOTE: Throws exception if the data could not fit into the remaining space. - * In which case nothing will be written. - * TODO:May need to revisit this behaviour. - * - * @param b byte data - * @param off starting offset - * @param len length to write - * @throws IOException - */ - @Override - public void write(byte[] b, int off, int len) - throws IOException { - checkNotClosed(); - handleWrite(b, off, len); - } - - private void handleWrite(byte[] b, int off, int len) throws IOException { - if (b == null) { - throw new NullPointerException(); - } - if ((off < 0) || (off > b.length) || (len < 0) || - ((off + len) > b.length) || ((off + len) < 0)) { - throw new IndexOutOfBoundsException(); - } - if (len == 0) { - return; - } - int succeededAllocates = 0; - while (len > 0) { - if (streamEntries.size() <= currentStreamIndex) { - Preconditions.checkNotNull(omClient); - // allocate a new block, if a exception happens, log an error and - // throw exception to the caller directly, and the write fails. - try { - allocateNewBlock(currentStreamIndex); - succeededAllocates += 1; - } catch (IOException ioe) { - LOG.error("Try to allocate more blocks for write failed, already " + - "allocated " + succeededAllocates + " blocks for this write."); - throw ioe; - } - } - // in theory, this condition should never violate due the check above - // still do a sanity check. - Preconditions.checkArgument(currentStreamIndex < streamEntries.size()); - ChunkOutputStreamEntry current = streamEntries.get(currentStreamIndex); - int writeLen = Math.min(len, (int) current.getRemaining()); - try { - current.write(b, off, writeLen); - } catch (IOException ioe) { - if (checkIfContainerIsClosed(ioe)) { - handleCloseContainerException(current, currentStreamIndex); - continue; - } else { - throw ioe; - } - } - if (current.getRemaining() <= 0) { - // since the current block is already written close the stream. - handleFlushOrClose(true); - currentStreamIndex += 1; - } - len -= writeLen; - off += writeLen; - byteOffset += writeLen; - } - } - - private long getCommittedBlockLength(ChunkOutputStreamEntry streamEntry) - throws IOException { - long blockLength; - ContainerProtos.GetCommittedBlockLengthResponseProto responseProto; - RetryPolicy.RetryAction action; - int numRetries = 0; - while (true) { - try { - responseProto = ContainerProtocolCalls - .getCommittedBlockLength(streamEntry.xceiverClient, - streamEntry.blockID, requestID); - blockLength = responseProto.getBlockLength(); - return blockLength; - } catch (StorageContainerException sce) { - try { - action = retryPolicy.shouldRetry(sce, numRetries, 0, true); - } catch (Exception e) { - throw e instanceof IOException ? (IOException) e : new IOException(e); - } - if (action.action == RetryPolicy.RetryAction.RetryDecision.FAIL) { - if (action.reason != null) { - LOG.error( - "GetCommittedBlockLength request failed. " + action.reason, - sce); - } - throw sce; - } - - // Throw the exception if the thread is interrupted - if (Thread.currentThread().isInterrupted()) { - LOG.warn("Interrupted while trying for connection"); - throw sce; - } - Preconditions.checkArgument( - action.action == RetryPolicy.RetryAction.RetryDecision.RETRY); - try { - Thread.sleep(action.delayMillis); - } catch (InterruptedException e) { - throw (IOException) new InterruptedIOException( - "Interrupted: action=" + action + ", retry policy=" + retryPolicy) - .initCause(e); - } - numRetries++; - LOG.trace("Retrying GetCommittedBlockLength request. Already tried " - + numRetries + " time(s); retry policy is " + retryPolicy); - continue; - } - } - } - - /** - * Discards the subsequent pre allocated blocks and removes the streamEntries - * from the streamEntries list for the container which is closed. - * @param containerID id of the closed container - */ - private void discardPreallocatedBlocks(long containerID) { - // currentStreamIndex < streamEntries.size() signifies that, there are still - // pre allocated blocks available. - if (currentStreamIndex < streamEntries.size()) { - ListIterator<ChunkOutputStreamEntry> streamEntryIterator = - streamEntries.listIterator(currentStreamIndex); - while (streamEntryIterator.hasNext()) { - if (streamEntryIterator.next().blockID.getContainerID() - == containerID) { - streamEntryIterator.remove(); - } - } - } - } - - /** - * It might be possible that the blocks pre allocated might never get written - * while the stream gets closed normally. In such cases, it would be a good - * idea to trim down the locationInfoList by removing the unused blocks if any - * so as only the used block info gets updated on OzoneManager during close. - */ - private void removeEmptyBlocks() { - if (currentStreamIndex < streamEntries.size()) { - ListIterator<ChunkOutputStreamEntry> streamEntryIterator = - streamEntries.listIterator(currentStreamIndex); - while (streamEntryIterator.hasNext()) { - if (streamEntryIterator.next().currentPosition == 0) { - streamEntryIterator.remove(); - } - } - } - } - /** - * It performs following actions : - * a. Updates the committed length at datanode for the current stream in - * datanode. - * b. Reads the data from the underlying buffer and writes it the next stream. - * - * @param streamEntry StreamEntry - * @param streamIndex Index of the entry - * @throws IOException Throws IOexception if Write fails - */ - private void handleCloseContainerException(ChunkOutputStreamEntry streamEntry, - int streamIndex) throws IOException { - long committedLength = 0; - ByteBuffer buffer = streamEntry.getBuffer(); - if (buffer == null) { - // the buffer here will be null only when closeContainerException is - // hit while calling putKey during close on chunkOutputStream. - // Since closeContainer auto commit pending keys, no need to do - // anything here. - return; - } - - // In case where not a single chunk of data has been written to the Datanode - // yet. This block does not yet exist on the datanode but cached on the - // outputStream buffer. No need to call GetCommittedBlockLength here - // for this block associated with the stream here. - if (streamEntry.currentPosition >= chunkSize - || streamEntry.currentPosition != buffer.position()) { - committedLength = getCommittedBlockLength(streamEntry); - // update the length of the current stream - streamEntry.currentPosition = committedLength; - } - - if (buffer.position() > 0) { - // If the data is still cached in the underlying stream, we need to - // allocate new block and write this data in the datanode. The cached - // data in the buffer does not exceed chunkSize. - Preconditions.checkState(buffer.position() < chunkSize); - currentStreamIndex += 1; - // readjust the byteOffset value to the length actually been written. - byteOffset -= buffer.position(); - handleWrite(buffer.array(), 0, buffer.position()); - } - - // just clean up the current stream. Since the container is already closed, - // it will be auto committed. No need to call close again here. - streamEntry.cleanup(); - // This case will arise when while writing the first chunk itself fails. - // In such case, the current block associated with the stream has no data - // written. Remove it from the current stream list. - if (committedLength == 0) { - streamEntries.remove(streamIndex); - Preconditions.checkArgument(currentStreamIndex != 0); - currentStreamIndex -= 1; - } - // discard subsequent pre allocated blocks from the streamEntries list - // from the closed container - discardPreallocatedBlocks(streamEntry.blockID.getContainerID()); - } - - private boolean checkIfContainerIsClosed(IOException ioe) { - return Optional.of(ioe.getCause()) - .filter(e -> e instanceof StorageContainerException) - .map(e -> (StorageContainerException) e) - .filter(sce -> sce.getResult() == Result.CLOSED_CONTAINER_IO) - .isPresent(); - } - - private long getKeyLength() { - return streamEntries.parallelStream().mapToLong(e -> e.currentPosition) - .sum(); - } - - /** - * Contact OM to get a new block. Set the new block with the index (e.g. - * first block has index = 0, second has index = 1 etc.) - * - * The returned block is made to new ChunkOutputStreamEntry to write. - * - * @param index the index of the block. - * @throws IOException - */ - private void allocateNewBlock(int index) throws IOException { - OmKeyLocationInfo subKeyInfo = omClient.allocateBlock(keyArgs, openID); - checkKeyLocationInfo(subKeyInfo); - } - - @Override - public void flush() throws IOException { - checkNotClosed(); - handleFlushOrClose(false); - } - - /** - * Close or Flush the latest outputStream. - * @param close Flag which decides whether to call close or flush on the - * outputStream. - * @throws IOException In case, flush or close fails with exception. - */ - private void handleFlushOrClose(boolean close) throws IOException { - if (streamEntries.size() == 0) { - return; - } - int size = streamEntries.size(); - int streamIndex = - currentStreamIndex >= size ? size - 1 : currentStreamIndex; - ChunkOutputStreamEntry entry = streamEntries.get(streamIndex); - if (entry != null) { - try { - if (close) { - entry.close(); - } else { - entry.flush(); - } - } catch (IOException ioe) { - if (checkIfContainerIsClosed(ioe)) { - // This call will allocate a new streamEntry and write the Data. - // Close needs to be retried on the newly allocated streamEntry as - // as well. - handleCloseContainerException(entry, streamIndex); - handleFlushOrClose(close); - } else { - throw ioe; - } - } - } - } - - /** - * Commit the key to OM, this will add the blocks as the new key blocks. - * - * @throws IOException - */ - @Override - public void close() throws IOException { - if (closed) { - return; - } - closed = true; - handleFlushOrClose(true); - if (keyArgs != null) { - // in test, this could be null - removeEmptyBlocks(); - Preconditions.checkState(byteOffset == getKeyLength()); - keyArgs.setDataSize(byteOffset); - keyArgs.setLocationInfoList(getLocationInfoList()); - omClient.commitKey(keyArgs, openID); - } else { - LOG.warn("Closing ChunkGroupOutputStream, but key args is null"); - } - } - - /** - * Builder class of ChunkGroupOutputStream. - */ - public static class Builder { - private OpenKeySession openHandler; - private XceiverClientManager xceiverManager; - private StorageContainerLocationProtocolClientSideTranslatorPB scmClient; - private OzoneManagerProtocolClientSideTranslatorPB omClient; - private int chunkSize; - private String requestID; - private ReplicationType type; - private ReplicationFactor factor; - private RetryPolicy retryPolicy; - - public Builder setHandler(OpenKeySession handler) { - this.openHandler = handler; - return this; - } - - public Builder setXceiverClientManager(XceiverClientManager manager) { - this.xceiverManager = manager; - return this; - } - - public Builder setScmClient( - StorageContainerLocationProtocolClientSideTranslatorPB client) { - this.scmClient = client; - return this; - } - - public Builder setOmClient( - OzoneManagerProtocolClientSideTranslatorPB client) { - this.omClient = client; - return this; - } - - public Builder setChunkSize(int size) { - this.chunkSize = size; - return this; - } - - public Builder setRequestID(String id) { - this.requestID = id; - return this; - } - - public Builder setType(ReplicationType replicationType) { - this.type = replicationType; - return this; - } - - public Builder setFactor(ReplicationFactor replicationFactor) { - this.factor = replicationFactor; - return this; - } - - public ChunkGroupOutputStream build() throws IOException { - return new ChunkGroupOutputStream(openHandler, xceiverManager, scmClient, - omClient, chunkSize, requestID, factor, type, retryPolicy); - } - - public Builder setRetryPolicy(RetryPolicy rPolicy) { - this.retryPolicy = rPolicy; - return this; - } - - } - - private static class ChunkOutputStreamEntry extends OutputStream { - private OutputStream outputStream; - private final BlockID blockID; - private final String key; - private final XceiverClientManager xceiverClientManager; - private final XceiverClientSpi xceiverClient; - private final String requestId; - private final int chunkSize; - // total number of bytes that should be written to this stream - private final long length; - // the current position of this stream 0 <= currentPosition < length - private long currentPosition; - - ChunkOutputStreamEntry(BlockID blockID, String key, - XceiverClientManager xceiverClientManager, - XceiverClientSpi xceiverClient, String requestId, int chunkSize, - long length) { - this.outputStream = null; - this.blockID = blockID; - this.key = key; - this.xceiverClientManager = xceiverClientManager; - this.xceiverClient = xceiverClient; - this.requestId = requestId; - this.chunkSize = chunkSize; - - this.length = length; - this.currentPosition = 0; - } - - /** - * For testing purpose, taking a some random created stream instance. - * @param outputStream a existing writable output stream - * @param length the length of data to write to the stream - */ - ChunkOutputStreamEntry(OutputStream outputStream, long length) { - this.outputStream = outputStream; - this.blockID = null; - this.key = null; - this.xceiverClientManager = null; - this.xceiverClient = null; - this.requestId = null; - this.chunkSize = -1; - - this.length = length; - this.currentPosition = 0; - } - - long getLength() { - return length; - } - - long getRemaining() { - return length - currentPosition; - } - - private void checkStream() { - if (this.outputStream == null) { - this.outputStream = new ChunkOutputStream(blockID, - key, xceiverClientManager, xceiverClient, - requestId, chunkSize); - } - } - - @Override - public void write(int b) throws IOException { - checkStream(); - outputStream.write(b); - this.currentPosition += 1; - } - - @Override - public void write(byte[] b, int off, int len) throws IOException { - checkStream(); - outputStream.write(b, off, len); - this.currentPosition += len; - } - - @Override - public void flush() throws IOException { - if (this.outputStream != null) { - this.outputStream.flush(); - } - } - - @Override - public void close() throws IOException { - if (this.outputStream != null) { - this.outputStream.close(); - } - } - - ByteBuffer getBuffer() throws IOException { - if (this.outputStream instanceof ChunkOutputStream) { - ChunkOutputStream out = (ChunkOutputStream) this.outputStream; - return out.getBuffer(); - } - throw new IOException("Invalid Output Stream for Key: " + key); - } - - public void cleanup() { - checkStream(); - if (this.outputStream instanceof ChunkOutputStream) { - ChunkOutputStream out = (ChunkOutputStream) this.outputStream; - out.cleanup(); - } - } - - } - - /** - * Verify that the output stream is open. Non blocking; this gives - * the last state of the volatile {@link #closed} field. - * @throws IOException if the connection is closed. - */ - private void checkNotClosed() throws IOException { - if (closed) { - throw new IOException( - ": " + FSExceptionMessages.STREAM_IS_CLOSED + " Key: " + keyArgs - .getKeyName()); - } - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneInputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneInputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneInputStream.java deleted file mode 100644 index e1f65e6..0000000 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneInputStream.java +++ /dev/null @@ -1,65 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package org.apache.hadoop.ozone.client.io; - -import org.apache.hadoop.hdds.scm.storage.ChunkInputStream; - -import java.io.IOException; -import java.io.InputStream; - -/** - * OzoneInputStream is used to read data from Ozone. - * It uses SCM's {@link ChunkInputStream} for reading the data. - */ -public class OzoneInputStream extends InputStream { - - private final InputStream inputStream; - - /** - * Constructs OzoneInputStream with ChunkInputStream. - * - * @param inputStream - */ - public OzoneInputStream(InputStream inputStream) { - this.inputStream = inputStream; - } - - @Override - public int read() throws IOException { - return inputStream.read(); - } - - @Override - public int read(byte[] b, int off, int len) throws IOException { - return inputStream.read(b, off, len); - } - - @Override - public synchronized void close() throws IOException { - inputStream.close(); - } - - @Override - public int available() throws IOException { - return inputStream.available(); - } - - public InputStream getInputStream() { - return inputStream; - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneOutputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneOutputStream.java deleted file mode 100644 index 5369220..0000000 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneOutputStream.java +++ /dev/null @@ -1,64 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package org.apache.hadoop.ozone.client.io; - -import java.io.IOException; -import java.io.OutputStream; - -/** - * OzoneOutputStream is used to write data into Ozone. - * It uses SCM's {@link ChunkGroupOutputStream} for writing the data. - */ -public class OzoneOutputStream extends OutputStream { - - private final OutputStream outputStream; - - /** - * Constructs OzoneOutputStream with ChunkGroupOutputStream. - * - * @param outputStream - */ - public OzoneOutputStream(OutputStream outputStream) { - this.outputStream = outputStream; - } - - @Override - public void write(int b) throws IOException { - outputStream.write(b); - } - - @Override - public void write(byte[] b, int off, int len) throws IOException { - outputStream.write(b, off, len); - } - - @Override - public synchronized void flush() throws IOException { - outputStream.flush(); - } - - @Override - public synchronized void close() throws IOException { - //commitKey can be done here, if needed. - outputStream.close(); - } - - public OutputStream getOutputStream() { - return outputStream; - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/package-info.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/package-info.java deleted file mode 100644 index 493ece8..0000000 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/package-info.java +++ /dev/null @@ -1,23 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.ozone.client.io; - -/** - * This package contains Ozone I/O classes. - */ http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/package-info.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/package-info.java deleted file mode 100644 index 7e2591a..0000000 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/package-info.java +++ /dev/null @@ -1,23 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.ozone.client; - -/** - * This package contains Ozone Client classes. - */ \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java ---------------------------------------------------------------------- diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java deleted file mode 100644 index 008b69d..0000000 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java +++ /dev/null @@ -1,329 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.ozone.client.protocol; - -import org.apache.hadoop.fs.StorageType; -import org.apache.hadoop.ozone.OzoneAcl; -import org.apache.hadoop.ozone.client.*; -import org.apache.hadoop.hdds.client.OzoneQuota; -import org.apache.hadoop.hdds.client.ReplicationFactor; -import org.apache.hadoop.hdds.client.ReplicationType; -import org.apache.hadoop.ozone.client.io.OzoneInputStream; -import org.apache.hadoop.ozone.client.io.OzoneOutputStream; - -import java.io.IOException; -import java.util.List; - -/** - * An implementer of this interface is capable of connecting to Ozone Cluster - * and perform client operations. The protocol used for communication is - * determined by the implementation class specified by - * property <code>ozone.client.protocol</code>. The build-in implementation - * includes: {@link org.apache.hadoop.ozone.client.rpc.RpcClient} for RPC and - * {@link org.apache.hadoop.ozone.client.rest.RestClient} for REST. - */ -public interface ClientProtocol { - - /** - * Creates a new Volume. - * @param volumeName Name of the Volume - * @throws IOException - */ - void createVolume(String volumeName) - throws IOException; - - /** - * Creates a new Volume with properties set in VolumeArgs. - * @param volumeName Name of the Volume - * @param args Properties to be set for the Volume - * @throws IOException - */ - void createVolume(String volumeName, VolumeArgs args) - throws IOException; - - /** - * Sets the owner of volume. - * @param volumeName Name of the Volume - * @param owner to be set for the Volume - * @throws IOException - */ - void setVolumeOwner(String volumeName, String owner) throws IOException; - - /** - * Set Volume Quota. - * @param volumeName Name of the Volume - * @param quota Quota to be set for the Volume - * @throws IOException - */ - void setVolumeQuota(String volumeName, OzoneQuota quota) - throws IOException; - - /** - * Returns {@link OzoneVolume}. - * @param volumeName Name of the Volume - * @return {@link OzoneVolume} - * @throws IOException - * */ - OzoneVolume getVolumeDetails(String volumeName) - throws IOException; - - /** - * Checks if a Volume exists and the user with a role specified has access - * to the Volume. - * @param volumeName Name of the Volume - * @param acl requested acls which needs to be checked for access - * @return Boolean - True if the user with a role can access the volume. - * This is possible for owners of the volume and admin users - * @throws IOException - */ - boolean checkVolumeAccess(String volumeName, OzoneAcl acl) - throws IOException; - - /** - * Deletes an empty Volume. - * @param volumeName Name of the Volume - * @throws IOException - */ - void deleteVolume(String volumeName) throws IOException; - - /** - * Lists all volumes in the cluster that matches the volumePrefix, - * size of the returned list depends on maxListResult. If volume prefix - * is null, returns all the volumes. The caller has to make multiple calls - * to read all volumes. - * - * @param volumePrefix Volume prefix to match - * @param prevVolume Starting point of the list, this volume is excluded - * @param maxListResult Max number of volumes to return. - * @return {@code List<OzoneVolume>} - * @throws IOException - */ - List<OzoneVolume> listVolumes(String volumePrefix, String prevVolume, - int maxListResult) - throws IOException; - - /** - * Lists all volumes in the cluster that are owned by the specified - * user and matches the volumePrefix, size of the returned list depends on - * maxListResult. If the user is null, return volumes owned by current user. - * If volume prefix is null, returns all the volumes. The caller has to make - * multiple calls to read all volumes. - * - * @param user User Name - * @param volumePrefix Volume prefix to match - * @param prevVolume Starting point of the list, this volume is excluded - * @param maxListResult Max number of volumes to return. - * @return {@code List<OzoneVolume>} - * @throws IOException - */ - List<OzoneVolume> listVolumes(String user, String volumePrefix, - String prevVolume, int maxListResult) - throws IOException; - - /** - * Creates a new Bucket in the Volume. - * @param volumeName Name of the Volume - * @param bucketName Name of the Bucket - * @throws IOException - */ - void createBucket(String volumeName, String bucketName) - throws IOException; - - /** - * Creates a new Bucket in the Volume, with properties set in BucketArgs. - * @param volumeName Name of the Volume - * @param bucketName Name of the Bucket - * @param bucketArgs Bucket Arguments - * @throws IOException - */ - void createBucket(String volumeName, String bucketName, - BucketArgs bucketArgs) - throws IOException; - - /** - * Adds ACLs to the Bucket. - * @param volumeName Name of the Volume - * @param bucketName Name of the Bucket - * @param addAcls ACLs to be added - * @throws IOException - */ - void addBucketAcls(String volumeName, String bucketName, - List<OzoneAcl> addAcls) - throws IOException; - - /** - * Removes ACLs from a Bucket. - * @param volumeName Name of the Volume - * @param bucketName Name of the Bucket - * @param removeAcls ACLs to be removed - * @throws IOException - */ - void removeBucketAcls(String volumeName, String bucketName, - List<OzoneAcl> removeAcls) - throws IOException; - - - /** - * Enables or disables Bucket Versioning. - * @param volumeName Name of the Volume - * @param bucketName Name of the Bucket - * @param versioning True to enable Versioning, False to disable. - * @throws IOException - */ - void setBucketVersioning(String volumeName, String bucketName, - Boolean versioning) - throws IOException; - - /** - * Sets the Storage Class of a Bucket. - * @param volumeName Name of the Volume - * @param bucketName Name of the Bucket - * @param storageType StorageType to be set - * @throws IOException - */ - void setBucketStorageType(String volumeName, String bucketName, - StorageType storageType) - throws IOException; - - /** - * Deletes a bucket if it is empty. - * @param volumeName Name of the Volume - * @param bucketName Name of the Bucket - * @throws IOException - */ - void deleteBucket(String volumeName, String bucketName) - throws IOException; - - /** - * True if the bucket exists and user has read access - * to the bucket else throws Exception. - * @param volumeName Name of the Volume - * @param bucketName Name of the Bucket - * @throws IOException - */ - void checkBucketAccess(String volumeName, String bucketName) - throws IOException; - - /** - * Returns {@link OzoneBucket}. - * @param volumeName Name of the Volume - * @param bucketName Name of the Bucket - * @return {@link OzoneBucket} - * @throws IOException - */ - OzoneBucket getBucketDetails(String volumeName, String bucketName) - throws IOException; - - /** - * Returns the List of Buckets in the Volume that matches the bucketPrefix, - * size of the returned list depends on maxListResult. The caller has to make - * multiple calls to read all volumes. - * @param volumeName Name of the Volume - * @param bucketPrefix Bucket prefix to match - * @param prevBucket Starting point of the list, this bucket is excluded - * @param maxListResult Max number of buckets to return. - * @return {@code List<OzoneBucket>} - * @throws IOException - */ - List<OzoneBucket> listBuckets(String volumeName, String bucketPrefix, - String prevBucket, int maxListResult) - throws IOException; - - /** - * Writes a key in an existing bucket. - * @param volumeName Name of the Volume - * @param bucketName Name of the Bucket - * @param keyName Name of the Key - * @param size Size of the data - * @return {@link OzoneOutputStream} - * - */ - OzoneOutputStream createKey(String volumeName, String bucketName, - String keyName, long size, ReplicationType type, - ReplicationFactor factor) - throws IOException; - - /** - * Reads a key from an existing bucket. - * @param volumeName Name of the Volume - * @param bucketName Name of the Bucket - * @param keyName Name of the Key - * @return {@link OzoneInputStream} - * @throws IOException - */ - OzoneInputStream getKey(String volumeName, String bucketName, String keyName) - throws IOException; - - - /** - * Deletes an existing key. - * @param volumeName Name of the Volume - * @param bucketName Name of the Bucket - * @param keyName Name of the Key - * @throws IOException - */ - void deleteKey(String volumeName, String bucketName, String keyName) - throws IOException; - - /** - * Renames an existing key within a bucket. - * @param volumeName Name of the Volume - * @param bucketName Name of the Bucket - * @param fromKeyName Name of the Key to be renamed - * @param toKeyName New name to be used for the Key - * @throws IOException - */ - void renameKey(String volumeName, String bucketName, String fromKeyName, - String toKeyName) throws IOException; - - /** - * Returns list of Keys in {Volume/Bucket} that matches the keyPrefix, - * size of the returned list depends on maxListResult. The caller has - * to make multiple calls to read all keys. - * @param volumeName Name of the Volume - * @param bucketName Name of the Bucket - * @param keyPrefix Bucket prefix to match - * @param prevKey Starting point of the list, this key is excluded - * @param maxListResult Max number of buckets to return. - * @return {@code List<OzoneKey>} - * @throws IOException - */ - List<OzoneKey> listKeys(String volumeName, String bucketName, - String keyPrefix, String prevKey, int maxListResult) - throws IOException; - - - /** - * Get OzoneKey. - * @param volumeName Name of the Volume - * @param bucketName Name of the Bucket - * @param keyName Key name - * @return {@link OzoneKey} - * @throws IOException - */ - OzoneKeyDetails getKeyDetails(String volumeName, String bucketName, - String keyName) - throws IOException; - - /** - * Close and release the resources. - */ - void close() throws IOException; - -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/package-info.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/package-info.java deleted file mode 100644 index f4890a1..0000000 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/package-info.java +++ /dev/null @@ -1,23 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.ozone.client.protocol; - -/** - * This package contains Ozone client protocol library classes. - */ http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rest/DefaultRestServerSelector.java ---------------------------------------------------------------------- diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rest/DefaultRestServerSelector.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rest/DefaultRestServerSelector.java deleted file mode 100644 index abdc2fb..0000000 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rest/DefaultRestServerSelector.java +++ /dev/null @@ -1,36 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.ozone.client.rest; - -import org.apache.hadoop.ozone.om.helpers.ServiceInfo; - -import java.util.List; -import java.util.Random; - -/** - * Default selector randomly picks one of the REST Server from the list. - */ -public class DefaultRestServerSelector implements RestServerSelector { - - @Override - public ServiceInfo getRestServer(List<ServiceInfo> restServices) { - return restServices.get( - new Random().nextInt(restServices.size())); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rest/OzoneExceptionMapper.java ---------------------------------------------------------------------- diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rest/OzoneExceptionMapper.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rest/OzoneExceptionMapper.java deleted file mode 100644 index 6c479f7..0000000 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rest/OzoneExceptionMapper.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.ozone.client.rest; - - -import javax.ws.rs.core.Response; -import javax.ws.rs.ext.ExceptionMapper; - -import org.slf4j.MDC; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Class the represents various errors returned by the - * Object Layer. - */ -public class OzoneExceptionMapper implements ExceptionMapper<OzoneException> { - private static final Logger LOG = - LoggerFactory.getLogger(OzoneExceptionMapper.class); - - @Override - public Response toResponse(OzoneException exception) { - LOG.debug("Returning exception. ex: {}", exception.toJsonString()); - MDC.clear(); - return Response.status((int)exception.getHttpCode()) - .entity(exception.toJsonString()).build(); - } - -} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
