http://git-wip-us.apache.org/repos/asf/hadoop/blob/43d38114/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupInputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupInputStream.java new file mode 100644 index 0000000..8e4ce92 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupInputStream.java @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.client.io; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos; +import org.apache.hadoop.ozone.ksm.helpers.KsmKeyInfo; +import org.apache.hadoop.ozone.ksm.helpers.KsmKeyLocationInfo; +import org.apache.hadoop.scm.XceiverClientManager; +import org.apache.hadoop.scm.XceiverClientSpi; +import org.apache.hadoop.scm.container.common.helpers.Pipeline; +import org.apache.hadoop.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB; +import org.apache.hadoop.scm.storage.ChunkInputStream; +import org.apache.hadoop.scm.storage.ContainerProtocolCalls; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.List; + +/** + * Maintaining a list of ChunkInputStream. Read based on offset. + */ +public class ChunkGroupInputStream extends InputStream { + + private static final Logger LOG = + LoggerFactory.getLogger(ChunkGroupInputStream.class); + + private static final int EOF = -1; + + private final ArrayList<ChunkInputStreamEntry> streamEntries; + private int currentStreamIndex; + + public ChunkGroupInputStream() { + streamEntries = new ArrayList<>(); + currentStreamIndex = 0; + } + + @VisibleForTesting + public synchronized int getCurrentStreamIndex() { + return currentStreamIndex; + } + + @VisibleForTesting + public long getRemainingOfIndex(int index) { + return streamEntries.get(index).getRemaining(); + } + + /** + * Append another stream to the end of the list. + * + * @param stream the stream instance. + * @param length the max number of bytes that should be written to this + * stream. + */ + public synchronized void addStream(InputStream stream, long length) { + streamEntries.add(new ChunkInputStreamEntry(stream, length)); + } + + + @Override + public synchronized int read() throws IOException { + if (streamEntries.size() <= currentStreamIndex) { + throw new IndexOutOfBoundsException(); + } + ChunkInputStreamEntry entry = streamEntries.get(currentStreamIndex); + int data = entry.read(); + return data; + } + + @Override + public synchronized int read(byte[] b, int off, int len) throws IOException { + if (b == null) { + throw new NullPointerException(); + } + if (off < 0 || len < 0 || len > b.length - off) { + throw new IndexOutOfBoundsException(); + } + if (len == 0) { + return 0; + } + int totalReadLen = 0; + while (len > 0) { + if (streamEntries.size() <= currentStreamIndex) { + return totalReadLen == 0 ? EOF : totalReadLen; + } + ChunkInputStreamEntry current = streamEntries.get(currentStreamIndex); + int readLen = Math.min(len, (int)current.getRemaining()); + int actualLen = current.read(b, off, readLen); + // this means the underlying stream has nothing at all, return + if (actualLen == EOF) { + return totalReadLen > 0? totalReadLen : EOF; + } + totalReadLen += actualLen; + // this means there is no more data to read beyond this point, return + if (actualLen != readLen) { + return totalReadLen; + } + off += readLen; + len -= readLen; + if (current.getRemaining() <= 0) { + currentStreamIndex += 1; + } + } + return totalReadLen; + } + + private static class ChunkInputStreamEntry extends InputStream { + + private final InputStream inputStream; + private final long length; + private long currentPosition; + + + ChunkInputStreamEntry(InputStream chunkInputStream, long length) { + this.inputStream = chunkInputStream; + this.length = length; + this.currentPosition = 0; + } + + synchronized long getRemaining() { + return length - currentPosition; + } + + @Override + public synchronized int read(byte[] b, int off, int len) + throws IOException { + int readLen = inputStream.read(b, off, len); + currentPosition += readLen; + return readLen; + } + + @Override + public synchronized int read() throws IOException { + int data = inputStream.read(); + currentPosition += 1; + return data; + } + + @Override + public synchronized void close() throws IOException { + inputStream.close(); + } + } + + public static LengthInputStream getFromKsmKeyInfo(KsmKeyInfo keyInfo, + XceiverClientManager xceiverClientManager, + StorageContainerLocationProtocolClientSideTranslatorPB + storageContainerLocationClient, String requestId) + throws IOException { + int index = 0; + long length = 0; + String containerKey; + ChunkGroupInputStream groupInputStream = new ChunkGroupInputStream(); + for (KsmKeyLocationInfo ksmKeyLocationInfo : keyInfo.getKeyLocationList()) { + // check index as sanity check + Preconditions.checkArgument(index++ == ksmKeyLocationInfo.getIndex()); + String containerName = ksmKeyLocationInfo.getContainerName(); + Pipeline pipeline = + storageContainerLocationClient.getContainer(containerName); + XceiverClientSpi xceiverClient = + xceiverClientManager.acquireClient(pipeline); + boolean success = false; + containerKey = ksmKeyLocationInfo.getBlockID(); + try { + LOG.debug("get key accessing {} {}", + xceiverClient.getPipeline().getContainerName(), containerKey); + ContainerProtos.KeyData containerKeyData = OzoneContainerTranslation + .containerKeyDataForRead( + xceiverClient.getPipeline().getContainerName(), containerKey); + ContainerProtos.GetKeyResponseProto response = ContainerProtocolCalls + .getKey(xceiverClient, containerKeyData, requestId); + List<ContainerProtos.ChunkInfo> chunks = + response.getKeyData().getChunksList(); + for (ContainerProtos.ChunkInfo chunk : chunks) { + length += chunk.getLen(); + } + success = true; + ChunkInputStream inputStream = new ChunkInputStream( + containerKey, xceiverClientManager, xceiverClient, + chunks, requestId); + groupInputStream.addStream(inputStream, + ksmKeyLocationInfo.getLength()); + } finally { + if (!success) { + xceiverClientManager.releaseClient(xceiverClient); + } + } + } + return new LengthInputStream(groupInputStream, length); + } +}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/43d38114/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java new file mode 100644 index 0000000..2cc12f4 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java @@ -0,0 +1,327 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.client.io; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.Result; +import org.apache.hadoop.ozone.ksm.helpers.KsmKeyInfo; +import org.apache.hadoop.ozone.ksm.helpers.KsmKeyLocationInfo; +import org.apache.hadoop.scm.XceiverClientManager; +import org.apache.hadoop.scm.XceiverClientSpi; +import org.apache.hadoop.scm.container.common.helpers.Pipeline; +import org.apache.hadoop.scm.container.common.helpers + .StorageContainerException; +import org.apache.hadoop.scm.protocolPB + .StorageContainerLocationProtocolClientSideTranslatorPB; +import org.apache.hadoop.scm.storage.ChunkOutputStream; +import org.apache.hadoop.scm.storage.ContainerProtocolCalls; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.Set; + +/** + * Maintaining a list of ChunkInputStream. Write based on offset. + * + * Note that this may write to multiple containers in one write call. In case + * that first container succeeded but later ones failed, the succeeded writes + * are not rolled back. + * + * TODO : currently not support multi-thread access. + */ +public class ChunkGroupOutputStream extends OutputStream { + + private static final Logger LOG = + LoggerFactory.getLogger(ChunkGroupOutputStream.class); + + // array list's get(index) is O(1) + private final ArrayList<ChunkOutputStreamEntry> streamEntries; + private int currentStreamIndex; + private long totalSize; + private long byteOffset; + + //This has to be removed once HDFS-11888 is resolved. + //local cache which will have list of created container names. + private static Set<String> containersCreated = new HashSet<>(); + + public ChunkGroupOutputStream() { + this.streamEntries = new ArrayList<>(); + this.currentStreamIndex = 0; + this.totalSize = 0; + this.byteOffset = 0; + } + + @VisibleForTesting + public long getByteOffset() { + return byteOffset; + } + + /** + * Append another stream to the end of the list. Note that the streams are not + * actually created to this point, only enough meta data about the stream is + * stored. When something is to be actually written to the stream, the stream + * will be created (if not already). + * + * @param containerKey the key to store in the container + * @param key the ozone key + * @param xceiverClientManager xceiver manager instance + * @param xceiverClient xceiver manager instance + * @param requestID the request id + * @param chunkSize the chunk size for this key chunks + * @param length the total length of this key + */ + public synchronized void addStream(String containerKey, String key, + XceiverClientManager xceiverClientManager, XceiverClientSpi xceiverClient, + String requestID, int chunkSize, long length) { + streamEntries.add(new ChunkOutputStreamEntry(containerKey, key, + xceiverClientManager, xceiverClient, requestID, chunkSize, length)); + totalSize += length; + } + + @VisibleForTesting + public synchronized void addStream(OutputStream outputStream, long length) { + streamEntries.add(new ChunkOutputStreamEntry(outputStream, length)); + totalSize += length; + } + + @Override + public synchronized void write(int b) throws IOException { + if (streamEntries.size() <= currentStreamIndex) { + throw new IndexOutOfBoundsException(); + } + ChunkOutputStreamEntry entry = streamEntries.get(currentStreamIndex); + entry.write(b); + if (entry.getRemaining() <= 0) { + currentStreamIndex += 1; + } + byteOffset += 1; + } + + /** + * Try to write the bytes sequence b[off:off+len) to streams. + * + * NOTE: Throws exception if the data could not fit into the remaining space. + * In which case nothing will be written. + * TODO:May need to revisit this behaviour. + * + * @param b byte data + * @param off starting offset + * @param len length to write + * @throws IOException + */ + @Override + public synchronized void write(byte[] b, int off, int len) + throws IOException { + if (b == null) { + throw new NullPointerException(); + } + if ((off < 0) || (off > b.length) || (len < 0) || + ((off + len) > b.length) || ((off + len) < 0)) { + throw new IndexOutOfBoundsException(); + } + if (len == 0) { + return; + } + if (streamEntries.size() <= currentStreamIndex) { + throw new IOException("Write out of stream range! stream index:" + + currentStreamIndex); + } + if (totalSize - byteOffset < len) { + throw new IOException("Can not write " + len + " bytes with only " + + (totalSize - byteOffset) + " byte space"); + } + while (len > 0) { + // in theory, this condition should never violate due the check above + // still do a sanity check. + Preconditions.checkArgument(currentStreamIndex < streamEntries.size()); + ChunkOutputStreamEntry current = streamEntries.get(currentStreamIndex); + int writeLen = Math.min(len, (int)current.getRemaining()); + current.write(b, off, writeLen); + if (current.getRemaining() <= 0) { + currentStreamIndex += 1; + } + len -= writeLen; + off += writeLen; + byteOffset += writeLen; + } + } + + @Override + public synchronized void flush() throws IOException { + for (int i = 0; i <= currentStreamIndex; i++) { + streamEntries.get(i).flush(); + } + } + + @Override + public synchronized void close() throws IOException { + for (ChunkOutputStreamEntry entry : streamEntries) { + entry.close(); + } + } + + private static class ChunkOutputStreamEntry extends OutputStream { + private OutputStream outputStream; + private final String containerKey; + private final String key; + private final XceiverClientManager xceiverClientManager; + private final XceiverClientSpi xceiverClient; + private final String requestId; + private final int chunkSize; + // total number of bytes that should be written to this stream + private final long length; + // the current position of this stream 0 <= currentPosition < length + private long currentPosition; + + ChunkOutputStreamEntry(String containerKey, String key, + XceiverClientManager xceiverClientManager, + XceiverClientSpi xceiverClient, String requestId, int chunkSize, + long length) { + this.outputStream = null; + this.containerKey = containerKey; + this.key = key; + this.xceiverClientManager = xceiverClientManager; + this.xceiverClient = xceiverClient; + this.requestId = requestId; + this.chunkSize = chunkSize; + + this.length = length; + this.currentPosition = 0; + } + + /** + * For testing purpose, taking a some random created stream instance. + * @param outputStream a existing writable output stream + * @param length the length of data to write to the stream + */ + ChunkOutputStreamEntry(OutputStream outputStream, long length) { + this.outputStream = outputStream; + this.containerKey = null; + this.key = null; + this.xceiverClientManager = null; + this.xceiverClient = null; + this.requestId = null; + this.chunkSize = -1; + + this.length = length; + this.currentPosition = 0; + } + + long getLength() { + return length; + } + + long getRemaining() { + return length - currentPosition; + } + + private synchronized void checkStream() { + if (this.outputStream == null) { + this.outputStream = new ChunkOutputStream(containerKey, + key, xceiverClientManager, xceiverClient, + requestId, chunkSize); + } + } + + @Override + public void write(int b) throws IOException { + checkStream(); + outputStream.write(b); + this.currentPosition += 1; + } + + @Override + public void write(byte[] b, int off, int len) throws IOException { + checkStream(); + outputStream.write(b, off, len); + this.currentPosition += len; + } + + @Override + public void flush() throws IOException { + if (this.outputStream != null) { + this.outputStream.flush(); + } + } + + @Override + public void close() throws IOException { + if (this.outputStream != null) { + this.outputStream.close(); + } + } + } + + public static ChunkGroupOutputStream getFromKsmKeyInfo( + KsmKeyInfo keyInfo, XceiverClientManager xceiverClientManager, + StorageContainerLocationProtocolClientSideTranslatorPB + storageContainerLocationClient, + int chunkSize, String requestId) throws IOException { + // TODO: the following createContainer and key writes may fail, in which + // case we should revert the above allocateKey to KSM. + // check index as sanity check + int index = 0; + String containerKey; + ChunkGroupOutputStream groupOutputStream = new ChunkGroupOutputStream(); + for (KsmKeyLocationInfo subKeyInfo : keyInfo.getKeyLocationList()) { + containerKey = subKeyInfo.getBlockID(); + + Preconditions.checkArgument(index++ == subKeyInfo.getIndex()); + String containerName = subKeyInfo.getContainerName(); + Pipeline pipeline = + storageContainerLocationClient.getContainer(containerName); + XceiverClientSpi xceiverClient = + xceiverClientManager.acquireClient(pipeline); + // create container if needed + // TODO : should be subKeyInfo.getShouldCreateContainer(), but for now + //The following change has to reverted once HDFS-11888 is fixed. + if(!containersCreated.contains(containerName)) { + synchronized (containerName.intern()) { + //checking again, there is a chance that some other thread has + // created it. + if (!containersCreated.contains(containerName)) { + LOG.debug("Need to create container {}.", containerName); + try { + ContainerProtocolCalls.createContainer(xceiverClient, requestId); + } catch (StorageContainerException ex) { + if (ex.getResult().equals(Result.CONTAINER_EXISTS)) { + //container already exist. + LOG.debug("Container {} already exists.", containerName); + } else { + LOG.error("Container creation failed for {}.", + containerName, ex); + throw ex; + } + } + containersCreated.add(containerName); + } + } + } + + groupOutputStream.addStream(containerKey, keyInfo.getKeyName(), + xceiverClientManager, xceiverClient, requestId, chunkSize, + subKeyInfo.getLength()); + } + return groupOutputStream; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/43d38114/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/LengthInputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/LengthInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/LengthInputStream.java new file mode 100644 index 0000000..baf1887 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/LengthInputStream.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.client.io; + +import java.io.FilterInputStream; +import java.io.InputStream; + +/** + * An input stream with length. + */ +public class LengthInputStream extends FilterInputStream { + + private final long length; + + /** + * Create an stream. + * @param in the underlying input stream. + * @param length the length of the stream. + */ + public LengthInputStream(InputStream in, long length) { + super(in); + this.length = length; + } + + /** @return the length. */ + public long getLength() { + return length; + } + + public InputStream getWrappedStream() { + return in; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/43d38114/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneContainerTranslation.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneContainerTranslation.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneContainerTranslation.java new file mode 100644 index 0000000..ca6f7aa --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneContainerTranslation.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.client.io; + + +import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.KeyData; + +/** + * This class contains methods that define the translation between the Ozone + * domain model and the storage container domain model. + */ +final class OzoneContainerTranslation { + + /** + * Creates key data intended for reading a container key. + * + * @param containerName container name + * @param containerKey container key + * @return KeyData intended for reading the container key + */ + public static KeyData containerKeyDataForRead(String containerName, + String containerKey) { + return KeyData + .newBuilder() + .setContainerName(containerName) + .setName(containerKey) + .build(); + } + + /** + * There is no need to instantiate this class. + */ + private OzoneContainerTranslation() { + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/43d38114/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneInputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneInputStream.java new file mode 100644 index 0000000..9551cdb --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneInputStream.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.ozone.client.io; + +import org.apache.hadoop.scm.storage.ChunkInputStream; + +import java.io.IOException; +import java.io.InputStream; + +/** + * OzoneInputStream is used to read data from Ozone. + * It uses SCM's {@link ChunkInputStream} for reading the data. + */ +public class OzoneInputStream extends InputStream { + + private final ChunkGroupInputStream inputStream; + + /** + * Constructs OzoneInputStream with ChunkInputStream. + * + * @param inputStream + */ + public OzoneInputStream(ChunkGroupInputStream inputStream) { + this.inputStream = inputStream; + } + + @Override + public int read() throws IOException { + return inputStream.read(); + } + + @Override + public synchronized void close() throws IOException { + inputStream.close(); + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/43d38114/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneOutputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneOutputStream.java new file mode 100644 index 0000000..5e2ad94 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneOutputStream.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.ozone.client.io; + +import java.io.IOException; +import java.io.OutputStream; + +/** + * OzoneOutputStream is used to write data into Ozone. + * It uses SCM's {@link ChunkGroupOutputStream} for writing the data. + */ +public class OzoneOutputStream extends OutputStream { + + private final ChunkGroupOutputStream outputStream; + + /** + * Constructs OzoneOutputStream with ChunkGroupOutputStream. + * + * @param outputStream + */ + public OzoneOutputStream(ChunkGroupOutputStream outputStream) { + this.outputStream = outputStream; + } + + @Override + public void write(int b) throws IOException { + outputStream.write(b); + } + + @Override + public void write(byte[] b, int off, int len) throws IOException { + outputStream.write(b, off, len); + } + + @Override + public synchronized void flush() throws IOException { + outputStream.flush(); + } + + @Override + public synchronized void close() throws IOException { + //commitKey can be done here, if needed. + outputStream.close(); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/43d38114/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/package-info.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/package-info.java new file mode 100644 index 0000000..493ece8 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/package-info.java @@ -0,0 +1,23 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.client.io; + +/** + * This package contains Ozone I/O classes. + */ http://git-wip-us.apache.org/repos/asf/hadoop/blob/43d38114/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/package-info.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/package-info.java new file mode 100644 index 0000000..7e2591a --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/package-info.java @@ -0,0 +1,23 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.client; + +/** + * This package contains Ozone Client classes. + */ \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/43d38114/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/rest/OzoneRestClient.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/rest/OzoneRestClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/rest/OzoneRestClient.java new file mode 100644 index 0000000..4955002 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/rest/OzoneRestClient.java @@ -0,0 +1,510 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.ozone.client.rest; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.StorageType; +import org.apache.hadoop.ozone.OzoneAcl; +import org.apache.hadoop.ozone.OzoneConfiguration; +import org.apache.hadoop.ozone.OzoneConsts; +import org.apache.hadoop.ozone.OzoneConsts.Versioning; +import org.apache.hadoop.ozone.client.OzoneBucket; +import org.apache.hadoop.ozone.client.OzoneClient; +import org.apache.hadoop.ozone.client.OzoneClientUtils; +import org.apache.hadoop.ozone.client.OzoneKey; +import org.apache.hadoop.ozone.client.OzoneVolume; +import org.apache.hadoop.ozone.client.io.OzoneInputStream; +import org.apache.hadoop.ozone.client.io.OzoneOutputStream; +import org.apache.hadoop.ozone.ksm.KSMConfigKeys; +import org.apache.hadoop.ozone.client.rest.headers.Header; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.util.Time; +import org.apache.http.HttpEntity; +import org.apache.http.HttpResponse; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.client.methods.HttpRequestBase; +import org.apache.http.client.utils.URIBuilder; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.util.EntityUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.ws.rs.core.HttpHeaders; +import java.io.Closeable; +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Date; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Locale; +import java.util.Set; + +import static java.net.HttpURLConnection.HTTP_CREATED; +import static java.net.HttpURLConnection.HTTP_OK; + +/** + * Ozone REST Client Implementation, it connects Ozone Handler to execute + * client calls. This uses REST protocol for the communication with server. + */ +public class OzoneRestClient implements OzoneClient, Closeable { + + private static final Logger LOG = + LoggerFactory.getLogger(OzoneRestClient.class); + + private static final String SCHEMA = "http://"; + private static final int DEFAULT_OZONE_PORT = 50070; + + private final URI uri; + private final UserGroupInformation ugi; + private final OzoneAcl.OzoneACLRights userRights; + private final OzoneAcl.OzoneACLRights groupRights; + + + /** + * Creates OzoneRpcClient instance with new OzoneConfiguration. + * + * @throws IOException + */ + public OzoneRestClient() throws IOException, URISyntaxException { + this(new OzoneConfiguration()); + } + + /** + * Creates OzoneRpcClient instance with the given configuration. + * + * @param conf + * + * @throws IOException + */ + public OzoneRestClient(Configuration conf) + throws IOException { + Preconditions.checkNotNull(conf); + this.ugi = UserGroupInformation.getCurrentUser(); + this.userRights = conf.getEnum(KSMConfigKeys.OZONE_KSM_USER_RIGHTS, + KSMConfigKeys.OZONE_KSM_USER_RIGHTS_DEFAULT); + this.groupRights = conf.getEnum(KSMConfigKeys.OZONE_KSM_GROUP_RIGHTS, + KSMConfigKeys.OZONE_KSM_GROUP_RIGHTS_DEFAULT); + + //TODO: get uri from property ozone.reset.servers + URIBuilder ozoneURI = null; + try { + ozoneURI = new URIBuilder(SCHEMA + "localhost"); + if (ozoneURI.getPort() == 0) { + ozoneURI.setPort(DEFAULT_OZONE_PORT); + } + uri = ozoneURI.build(); + } catch (URISyntaxException e) { + throw new IOException(e); + } + } + + @Override + public void createVolume(String volumeName) + throws IOException { + createVolume(volumeName, ugi.getUserName()); + } + + @Override + public void createVolume(String volumeName, String owner) + throws IOException { + + createVolume(volumeName, owner, OzoneConsts.MAX_QUOTA_IN_BYTES, + (OzoneAcl[])null); + } + + @Override + public void createVolume(String volumeName, String owner, + OzoneAcl... acls) + throws IOException { + createVolume(volumeName, owner, OzoneConsts.MAX_QUOTA_IN_BYTES, acls); + } + + @Override + public void createVolume(String volumeName, String owner, + long quota) + throws IOException { + createVolume(volumeName, owner, quota, (OzoneAcl[])null); + } + + @Override + public void createVolume(String volumeName, String owner, + long quota, OzoneAcl... acls) + throws IOException { + Preconditions.checkNotNull(volumeName); + Preconditions.checkNotNull(owner); + Preconditions.checkNotNull(quota); + Preconditions.checkState(quota >= 0); + + Set<OzoneAcl> aclSet = new HashSet<>(); + + if(acls != null) { + aclSet.addAll(Arrays.asList(acls)); + } + + LOG.info("Creating Volume: {}, with {} as owner and " + + "quota set to {} bytes.", volumeName, owner, quota); + HttpPost httpPost = null; + HttpEntity entity = null; + try (CloseableHttpClient httpClient = OzoneClientUtils.newHttpClient()) { + URIBuilder builder = new URIBuilder(uri); + builder.setPath("/" + volumeName); + String quotaString = quota + Header.OZONE_QUOTA_BYTES; + builder.setParameter(Header.OZONE_QUOTA_QUERY_TAG, quotaString); + httpPost = getHttpPost(owner, builder.build().toString()); + for (OzoneAcl acl : aclSet) { + httpPost.addHeader( + Header.OZONE_ACLS, Header.OZONE_ACL_ADD + " " + acl.toString()); + } + + HttpResponse response = httpClient.execute(httpPost); + entity = response.getEntity(); + int errorCode = response.getStatusLine().getStatusCode(); + if ((errorCode == HTTP_OK) || (errorCode == HTTP_CREATED)) { + return; + } + if (entity != null) { + throw new IOException(EntityUtils.toString(entity)); + } else { + throw new IOException("Unexpected null in http payload"); + } + } catch (URISyntaxException | IllegalArgumentException ex) { + throw new IOException(ex.getMessage()); + } finally { + EntityUtils.consume(entity); + OzoneClientUtils.releaseConnection(httpPost); + } + } + + @Override + public void setVolumeOwner(String volumeName, String owner) + throws IOException { + Preconditions.checkNotNull(volumeName); + Preconditions.checkNotNull(owner); + throw new UnsupportedOperationException("Not yet implemented."); + } + + @Override + public void setVolumeQuota(String volumeName, long quota) + throws IOException { + Preconditions.checkNotNull(volumeName); + Preconditions.checkNotNull(quota); + Preconditions.checkState(quota >= 0); + throw new UnsupportedOperationException("Not yet implemented."); + } + + @Override + public OzoneVolume getVolumeDetails(String volumeName) + throws IOException { + Preconditions.checkNotNull(volumeName); + throw new UnsupportedOperationException("Not yet implemented."); + } + + @Override + public boolean checkVolumeAccess(String volumeName, OzoneAcl acl) + throws IOException { + Preconditions.checkNotNull(volumeName); + throw new UnsupportedOperationException("Not yet implemented."); + } + + @Override + public void deleteVolume(String volumeName) + throws IOException { + Preconditions.checkNotNull(volumeName); + throw new UnsupportedOperationException("Not yet implemented."); + } + + @Override + public Iterator<OzoneVolume> listVolumes(String volumePrefix) + throws IOException { + throw new UnsupportedOperationException("Not yet implemented."); + } + + @Override + public Iterator<OzoneVolume> listVolumes(String volumePrefix, + String user) + throws IOException { + throw new UnsupportedOperationException("Not yet implemented."); + } + + @Override + public void createBucket(String volumeName, String bucketName) + throws IOException { + createBucket(volumeName, bucketName, Versioning.NOT_DEFINED, + StorageType.DEFAULT, (OzoneAcl[])null); + } + + @Override + public void createBucket(String volumeName, String bucketName, + Versioning versioning) + throws IOException { + createBucket(volumeName, bucketName, versioning, + StorageType.DEFAULT, (OzoneAcl[])null); + } + + @Override + public void createBucket(String volumeName, String bucketName, + StorageType storageType) + throws IOException { + createBucket(volumeName, bucketName, Versioning.NOT_DEFINED, + storageType, (OzoneAcl[])null); + } + + @Override + public void createBucket(String volumeName, String bucketName, + OzoneAcl... acls) + throws IOException { + createBucket(volumeName, bucketName, Versioning.NOT_DEFINED, + StorageType.DEFAULT, acls); + } + + @Override + public void createBucket(String volumeName, String bucketName, + Versioning versioning, StorageType storageType, + OzoneAcl... acls) + throws IOException { + Preconditions.checkNotNull(volumeName); + Preconditions.checkNotNull(bucketName); + Preconditions.checkNotNull(versioning); + Preconditions.checkNotNull(storageType); + + String owner = ugi.getUserName(); + final List<OzoneAcl> listOfAcls = new ArrayList<>(); + + //User ACL + OzoneAcl userAcl = + new OzoneAcl(OzoneAcl.OzoneACLType.USER, + owner, userRights); + listOfAcls.add(userAcl); + + //Group ACLs of the User + List<String> userGroups = Arrays.asList(UserGroupInformation + .createRemoteUser(owner).getGroupNames()); + userGroups.stream().forEach((group) -> listOfAcls.add( + new OzoneAcl(OzoneAcl.OzoneACLType.GROUP, group, groupRights))); + + //ACLs passed as argument + if(acls != null) { + Arrays.stream(acls).forEach((acl) -> listOfAcls.add(acl)); + } + + LOG.info("Creating Bucket: {}/{}, with Versioning {} and " + + "Storage Type set to {}", volumeName, bucketName, versioning, + storageType); + throw new UnsupportedOperationException("Not yet implemented."); + } + + /** + * Converts OzoneConts.Versioning enum to boolean. + * + * @param version + * @return corresponding boolean value + */ + private boolean getBucketVersioningProtobuf( + Versioning version) { + if(version != null) { + switch(version) { + case ENABLED: + return true; + case NOT_DEFINED: + case DISABLED: + default: + return false; + } + } + return false; + } + + @Override + public void addBucketAcls(String volumeName, String bucketName, + List<OzoneAcl> addAcls) + throws IOException { + Preconditions.checkNotNull(volumeName); + Preconditions.checkNotNull(bucketName); + Preconditions.checkNotNull(addAcls); + throw new UnsupportedOperationException("Not yet implemented."); + } + + @Override + public void removeBucketAcls(String volumeName, String bucketName, + List<OzoneAcl> removeAcls) + throws IOException { + Preconditions.checkNotNull(volumeName); + Preconditions.checkNotNull(bucketName); + Preconditions.checkNotNull(removeAcls); + throw new UnsupportedOperationException("Not yet implemented."); + } + + @Override + public void setBucketVersioning(String volumeName, String bucketName, + Versioning versioning) + throws IOException { + Preconditions.checkNotNull(volumeName); + Preconditions.checkNotNull(bucketName); + Preconditions.checkNotNull(versioning); + throw new UnsupportedOperationException("Not yet implemented."); + } + + @Override + public void setBucketStorageType(String volumeName, String bucketName, + StorageType storageType) + throws IOException { + Preconditions.checkNotNull(volumeName); + Preconditions.checkNotNull(bucketName); + Preconditions.checkNotNull(storageType); + throw new UnsupportedOperationException("Not yet implemented."); + } + + @Override + public void deleteBucket(String volumeName, String bucketName) + throws IOException { + Preconditions.checkNotNull(volumeName); + Preconditions.checkNotNull(bucketName); + throw new UnsupportedOperationException("Not yet implemented."); + } + + @Override + public void checkBucketAccess(String volumeName, String bucketName) + throws IOException { + throw new UnsupportedOperationException("Not yet implemented."); + } + + @Override + public OzoneBucket getBucketDetails(String volumeName, + String bucketName) + throws IOException { + Preconditions.checkNotNull(volumeName); + Preconditions.checkNotNull(bucketName); + throw new UnsupportedOperationException("Not yet implemented."); + } + + @Override + public Iterator<OzoneBucket> listBuckets(String volumeName, + String bucketPrefix) + throws IOException { + throw new UnsupportedOperationException("Not yet implemented."); + } + + @Override + public OzoneOutputStream createKey(String volumeName, String bucketName, + String keyName, long size) + throws IOException { + throw new UnsupportedOperationException("Not yet implemented."); + } + + @Override + public OzoneInputStream getKey(String volumeName, String bucketName, + String keyName) + throws IOException { + Preconditions.checkNotNull(volumeName); + Preconditions.checkNotNull(bucketName); + Preconditions.checkNotNull(keyName); + throw new UnsupportedOperationException("Not yet implemented."); + } + + @Override + public void deleteKey(String volumeName, String bucketName, + String keyName) + throws IOException { + Preconditions.checkNotNull(volumeName); + Preconditions.checkNotNull(bucketName); + Preconditions.checkNotNull(keyName); + throw new UnsupportedOperationException("Not yet implemented."); + } + + @Override + public List<OzoneKey> listKeys(String volumeName, String bucketName, + String keyPrefix) + throws IOException { + throw new UnsupportedOperationException("Not yet implemented."); + } + + @Override + public OzoneKey getKeyDetails(String volumeName, String bucketName, + String keyName) + throws IOException { + Preconditions.checkNotNull(volumeName); + Preconditions.checkNotNull(bucketName); + Preconditions.checkNotNull(keyName); + throw new UnsupportedOperationException("Not yet implemented."); + } + + /** + * Converts Versioning to boolean. + * + * @param version + * @return corresponding boolean value + */ + private boolean getBucketVersioningFlag( + Versioning version) { + if(version != null) { + switch(version) { + case ENABLED: + return true; + case DISABLED: + case NOT_DEFINED: + default: + return false; + } + } + return false; + } + + /** + * Returns a standard HttpPost Object to use for ozone post requests. + * + * @param user - If the use is being made on behalf of user, that user + * @param uriString - UriString + * @return HttpPost + */ + public HttpPost getHttpPost(String user, String uriString) { + HttpPost httpPost = new HttpPost(uriString); + addOzoneHeaders(httpPost); + if (user != null) { + httpPost.addHeader(Header.OZONE_USER, user); + } + return httpPost; + } + + /** + * Add Ozone Headers. + * + * @param httpRequest - Http Request + */ + private void addOzoneHeaders(HttpRequestBase httpRequest) { + SimpleDateFormat format = + new SimpleDateFormat("EEE, dd MMM yyyy HH:mm:ss ZZZ", Locale.US); + + httpRequest.addHeader(Header.OZONE_VERSION_HEADER, + Header.OZONE_V1_VERSION_HEADER); + httpRequest.addHeader(HttpHeaders.DATE, + format.format(new Date(Time.monotonicNow()))); + httpRequest.addHeader(HttpHeaders.AUTHORIZATION, + Header.OZONE_SIMPLE_AUTHENTICATION_SCHEME + " " + + ugi.getUserName()); + } + + @Override + public void close() throws IOException { + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/43d38114/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/rest/headers/Header.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/rest/headers/Header.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/rest/headers/Header.java new file mode 100644 index 0000000..5221a0e --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/rest/headers/Header.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.client.rest.headers; + +import org.apache.hadoop.classification.InterfaceAudience; + +/** + * OZONE specific HTTP headers. + */ +@InterfaceAudience.Private +public final class Header { + public static final String OZONE_QUOTA_BYTES = "BYTES"; + public static final String OZONE_QUOTA_MB = "MB"; + public static final String OZONE_QUOTA_GB = "GB"; + public static final String OZONE_QUOTA_TB = "TB"; + public static final String OZONE_QUOTA_REMOVE = "remove"; + public static final String OZONE_QUOTA_UNDEFINED = "undefined"; + public static final String OZONE_EMPTY_STRING=""; + public static final String OZONE_DEFAULT_LIST_SIZE = "1000"; + + public static final String OZONE_USER = "x-ozone-user"; + public static final String OZONE_SIMPLE_AUTHENTICATION_SCHEME = "OZONE"; + public static final String OZONE_VERSION_HEADER = "x-ozone-version"; + public static final String OZONE_V1_VERSION_HEADER ="v1"; + + public static final String OZONE_LIST_QUERY_SERVICE = "service"; + public static final String OZONE_LIST_QUERY_VOLUME = "volume"; + public static final String OZONE_LIST_QUERY_BUCKET = "bucket"; + public static final String OZONE_LIST_QUERY_KEY = "key"; + + public static final String OZONE_REQUEST_ID = "x-ozone-request-id"; + public static final String OZONE_SERVER_NAME = "x-ozone-server-name"; + + public static final String OZONE_STORAGE_TYPE = "x-ozone-storage-type"; + + public static final String OZONE_BUCKET_VERSIONING = + "x-ozone-bucket-versioning"; + + public static final String OZONE_ACLS = "x-ozone-acls"; + public static final String OZONE_ACL_ADD = "ADD"; + public static final String OZONE_ACL_REMOVE = "REMOVE"; + + public static final String OZONE_LIST_QUERY_TAG ="info"; + public static final String OZONE_QUOTA_QUERY_TAG ="quota"; + public static final String CONTENT_MD5 = "Content-MD5"; + public static final String OZONE_LIST_QUERY_PREFIX="prefix"; + public static final String OZONE_LIST_QUERY_MAXKEYS="max-keys"; + public static final String OZONE_LIST_QUERY_PREVKEY="prev-key"; + public static final String OZONE_LIST_QUERY_ROOTSCAN="root-scan"; + + private Header() { + // Never constructed. + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/43d38114/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/rest/headers/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/rest/headers/package-info.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/rest/headers/package-info.java new file mode 100644 index 0000000..54157f0 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/rest/headers/package-info.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * Ozone HTTP header definitions. + */ +@InterfaceAudience.Private +package org.apache.hadoop.ozone.client.rest.headers; + +import org.apache.hadoop.classification.InterfaceAudience; http://git-wip-us.apache.org/repos/asf/hadoop/blob/43d38114/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/rest/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/rest/package-info.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/rest/package-info.java new file mode 100644 index 0000000..ebcc104 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/rest/package-info.java @@ -0,0 +1,23 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.client.rest; + +/** + * This package contains Ozone rest client library classes. + */ http://git-wip-us.apache.org/repos/asf/hadoop/blob/43d38114/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/rpc/OzoneRpcClient.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/rpc/OzoneRpcClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/rpc/OzoneRpcClient.java new file mode 100644 index 0000000..daa9639 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/rpc/OzoneRpcClient.java @@ -0,0 +1,578 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.ozone.client.rpc; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.StorageType; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.ipc.Client; +import org.apache.hadoop.ipc.ProtobufRpcEngine; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.ozone.client.io.LengthInputStream; +import org.apache.hadoop.ozone.ksm.helpers.KsmBucketArgs; +import org.apache.hadoop.ozone.ksm.helpers.KsmBucketInfo; +import org.apache.hadoop.ozone.ksm.helpers.KsmKeyArgs; +import org.apache.hadoop.ozone.ksm.helpers.KsmKeyInfo; +import org.apache.hadoop.ozone.ksm.helpers.KsmVolumeArgs; +import org.apache.hadoop.ozone.ksm.protocolPB + .KeySpaceManagerProtocolClientSideTranslatorPB; +import org.apache.hadoop.ozone.ksm.protocolPB + .KeySpaceManagerProtocolPB; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.ozone.OzoneAcl; +import org.apache.hadoop.ozone.OzoneConfiguration; +import org.apache.hadoop.ozone.OzoneConsts; +import org.apache.hadoop.ozone.client.OzoneBucket; +import org.apache.hadoop.ozone.client.OzoneClient; +import org.apache.hadoop.ozone.client.OzoneClientUtils; +import org.apache.hadoop.ozone.client.OzoneKey; +import org.apache.hadoop.ozone.client.OzoneVolume; +import org.apache.hadoop.ozone.client.io.ChunkGroupInputStream; +import org.apache.hadoop.ozone.client.io.OzoneInputStream; +import org.apache.hadoop.ozone.client.io.OzoneOutputStream; +import org.apache.hadoop.ozone.ksm.KSMConfigKeys; +import org.apache.hadoop.ozone.OzoneConsts.Versioning; +import org.apache.hadoop.ozone.protocolPB.KSMPBHelper; +import org.apache.hadoop.ozone.client.io.ChunkGroupOutputStream; +import org.apache.hadoop.scm.ScmConfigKeys; +import org.apache.hadoop.scm.XceiverClientManager; +import org.apache.hadoop.scm.protocolPB + .StorageContainerLocationProtocolClientSideTranslatorPB; +import org.apache.hadoop.scm.protocolPB + .StorageContainerLocationProtocolPB; +import org.apache.hadoop.security.UserGroupInformation; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.UUID; +import java.util.stream.Collectors; + +/** + * Ozone RPC Client Implementation, it connects to KSM, SCM and DataNode + * to execute client calls. This uses RPC protocol for communication + * with the servers. + */ +public class OzoneRpcClient implements OzoneClient, Closeable { + + private static final Logger LOG = + LoggerFactory.getLogger(OzoneRpcClient.class); + + private final StorageContainerLocationProtocolClientSideTranslatorPB + storageContainerLocationClient; + private final KeySpaceManagerProtocolClientSideTranslatorPB + keySpaceManagerClient; + private final XceiverClientManager xceiverClientManager; + private final int chunkSize; + + + private final UserGroupInformation ugi; + private final OzoneAcl.OzoneACLRights userRights; + private final OzoneAcl.OzoneACLRights groupRights; + + /** + * Creates OzoneRpcClient instance with new OzoneConfiguration. + * + * @throws IOException + */ + public OzoneRpcClient() throws IOException { + this(new OzoneConfiguration()); + } + + /** + * Creates OzoneRpcClient instance with the given configuration. + * + * @param conf + * + * @throws IOException + */ + public OzoneRpcClient(Configuration conf) throws IOException { + Preconditions.checkNotNull(conf); + this.ugi = UserGroupInformation.getCurrentUser(); + this.userRights = conf.getEnum(KSMConfigKeys.OZONE_KSM_USER_RIGHTS, + KSMConfigKeys.OZONE_KSM_USER_RIGHTS_DEFAULT); + this.groupRights = conf.getEnum(KSMConfigKeys.OZONE_KSM_GROUP_RIGHTS, + KSMConfigKeys.OZONE_KSM_GROUP_RIGHTS_DEFAULT); + + long scmVersion = + RPC.getProtocolVersion(StorageContainerLocationProtocolPB.class); + InetSocketAddress scmAddress = + OzoneClientUtils.getScmAddressForClients(conf); + RPC.setProtocolEngine(conf, StorageContainerLocationProtocolPB.class, + ProtobufRpcEngine.class); + this.storageContainerLocationClient = + new StorageContainerLocationProtocolClientSideTranslatorPB( + RPC.getProxy(StorageContainerLocationProtocolPB.class, scmVersion, + scmAddress, UserGroupInformation.getCurrentUser(), conf, + NetUtils.getDefaultSocketFactory(conf), + Client.getRpcTimeout(conf))); + + long ksmVersion = + RPC.getProtocolVersion(KeySpaceManagerProtocolPB.class); + InetSocketAddress ksmAddress = OzoneClientUtils.getKsmAddress(conf); + RPC.setProtocolEngine(conf, KeySpaceManagerProtocolPB.class, + ProtobufRpcEngine.class); + this.keySpaceManagerClient = + new KeySpaceManagerProtocolClientSideTranslatorPB( + RPC.getProxy(KeySpaceManagerProtocolPB.class, ksmVersion, + ksmAddress, UserGroupInformation.getCurrentUser(), conf, + NetUtils.getDefaultSocketFactory(conf), + Client.getRpcTimeout(conf))); + + this.xceiverClientManager = new XceiverClientManager(conf); + + int configuredChunkSize = conf.getInt( + ScmConfigKeys.OZONE_SCM_CHUNK_SIZE_KEY, + ScmConfigKeys.OZONE_SCM_CHUNK_SIZE_DEFAULT); + if(configuredChunkSize > ScmConfigKeys.OZONE_SCM_CHUNK_MAX_SIZE) { + LOG.warn("The chunk size ({}) is not allowed to be more than" + + " the maximum size ({})," + + " resetting to the maximum size.", + configuredChunkSize, ScmConfigKeys.OZONE_SCM_CHUNK_MAX_SIZE); + chunkSize = ScmConfigKeys.OZONE_SCM_CHUNK_MAX_SIZE; + } else { + chunkSize = configuredChunkSize; + } + } + + @Override + public void createVolume(String volumeName) + throws IOException { + createVolume(volumeName, ugi.getUserName()); + } + + @Override + public void createVolume(String volumeName, String owner) + throws IOException { + + createVolume(volumeName, owner, OzoneConsts.MAX_QUOTA_IN_BYTES, + (OzoneAcl[])null); + } + + @Override + public void createVolume(String volumeName, String owner, + OzoneAcl... acls) + throws IOException { + createVolume(volumeName, owner, OzoneConsts.MAX_QUOTA_IN_BYTES, acls); + } + + @Override + public void createVolume(String volumeName, String owner, + long quota) + throws IOException { + createVolume(volumeName, owner, quota, (OzoneAcl[])null); + } + + @Override + public void createVolume(String volumeName, String owner, + long quota, OzoneAcl... acls) + throws IOException { + Preconditions.checkNotNull(volumeName); + Preconditions.checkNotNull(owner); + Preconditions.checkNotNull(quota); + Preconditions.checkState(quota >= 0); + OzoneAcl userAcl = + new OzoneAcl(OzoneAcl.OzoneACLType.USER, + owner, userRights); + KsmVolumeArgs.Builder builder = KsmVolumeArgs.newBuilder(); + builder.setAdminName(ugi.getUserName()) + .setOwnerName(owner) + .setVolume(volumeName) + .setQuotaInBytes(quota) + .addOzoneAcls(KSMPBHelper.convertOzoneAcl(userAcl)); + + List<OzoneAcl> listOfAcls = new ArrayList<>(); + + //Group ACLs of the User + List<String> userGroups = Arrays.asList(UserGroupInformation + .createRemoteUser(owner).getGroupNames()); + userGroups.stream().forEach((group) -> listOfAcls.add( + new OzoneAcl(OzoneAcl.OzoneACLType.GROUP, group, groupRights))); + + //ACLs passed as argument + if(acls != null) { + listOfAcls.addAll(Arrays.asList(acls)); + } + + //Remove duplicates and set + for (OzoneAcl ozoneAcl : + listOfAcls.stream().distinct().collect(Collectors.toList())) { + builder.addOzoneAcls(KSMPBHelper.convertOzoneAcl(ozoneAcl)); + } + + LOG.info("Creating Volume: {}, with {} as owner and quota set to {} bytes.", + volumeName, owner, quota); + keySpaceManagerClient.createVolume(builder.build()); + } + + @Override + public void setVolumeOwner(String volumeName, String owner) + throws IOException { + Preconditions.checkNotNull(volumeName); + Preconditions.checkNotNull(owner); + keySpaceManagerClient.setOwner(volumeName, owner); + } + + @Override + public void setVolumeQuota(String volumeName, long quota) + throws IOException { + Preconditions.checkNotNull(volumeName); + Preconditions.checkNotNull(quota); + Preconditions.checkState(quota >= 0); + keySpaceManagerClient.setQuota(volumeName, quota); + } + + @Override + public OzoneVolume getVolumeDetails(String volumeName) + throws IOException { + Preconditions.checkNotNull(volumeName); + KsmVolumeArgs volumeArgs = + keySpaceManagerClient.getVolumeInfo(volumeName); + return new OzoneVolume(volumeArgs); + } + + @Override + public boolean checkVolumeAccess(String volumeName, OzoneAcl acl) + throws IOException { + Preconditions.checkNotNull(volumeName); + return keySpaceManagerClient.checkVolumeAccess(volumeName, + KSMPBHelper.convertOzoneAcl(acl)); + } + + @Override + public void deleteVolume(String volumeName) + throws IOException { + Preconditions.checkNotNull(volumeName); + keySpaceManagerClient.deleteVolume(volumeName); + } + + @Override + public Iterator<OzoneVolume> listVolumes(String volumePrefix) + throws IOException { + throw new UnsupportedOperationException("Not yet implemented."); + } + + @Override + public Iterator<OzoneVolume> listVolumes(String volumePrefix, + String user) + throws IOException { + throw new UnsupportedOperationException("Not yet implemented."); + } + + @Override + public void createBucket(String volumeName, String bucketName) + throws IOException { + createBucket(volumeName, bucketName, Versioning.NOT_DEFINED, + StorageType.DEFAULT, (OzoneAcl[])null); + } + + @Override + public void createBucket(String volumeName, String bucketName, + Versioning versioning) + throws IOException { + createBucket(volumeName, bucketName, versioning, + StorageType.DEFAULT, (OzoneAcl[])null); + } + + @Override + public void createBucket(String volumeName, String bucketName, + StorageType storageType) + throws IOException { + createBucket(volumeName, bucketName, Versioning.NOT_DEFINED, + storageType, (OzoneAcl[])null); + } + + @Override + public void createBucket(String volumeName, String bucketName, + OzoneAcl... acls) + throws IOException { + createBucket(volumeName, bucketName, Versioning.NOT_DEFINED, + StorageType.DEFAULT, acls); + } + + @Override + public void createBucket(String volumeName, String bucketName, + Versioning versioning, StorageType storageType, + OzoneAcl... acls) + throws IOException { + Preconditions.checkNotNull(volumeName); + Preconditions.checkNotNull(bucketName); + Preconditions.checkNotNull(versioning); + Preconditions.checkNotNull(storageType); + + KsmBucketInfo.Builder builder = KsmBucketInfo.newBuilder(); + builder.setVolumeName(volumeName) + .setBucketName(bucketName) + .setStorageType(storageType) + .setIsVersionEnabled(getBucketVersioningProtobuf( + versioning)); + + String owner = ugi.getUserName(); + final List<OzoneAcl> listOfAcls = new ArrayList<>(); + + //User ACL + OzoneAcl userAcl = + new OzoneAcl(OzoneAcl.OzoneACLType.USER, + owner, userRights); + listOfAcls.add(userAcl); + + //Group ACLs of the User + List<String> userGroups = Arrays.asList(UserGroupInformation + .createRemoteUser(owner).getGroupNames()); + userGroups.stream().forEach((group) -> listOfAcls.add( + new OzoneAcl(OzoneAcl.OzoneACLType.GROUP, group, groupRights))); + + //ACLs passed as argument + if(acls != null) { + Arrays.stream(acls).forEach((acl) -> listOfAcls.add(acl)); + } + + //Remove duplicates and set + builder.setAcls(listOfAcls.stream().distinct() + .collect(Collectors.toList())); + LOG.info("Creating Bucket: {}/{}, with Versioning {} and " + + "Storage Type set to {}", volumeName, bucketName, versioning, + storageType); + keySpaceManagerClient.createBucket(builder.build()); + } + + /** + * Converts OzoneConts.Versioning enum to boolean. + * + * @param version + * @return corresponding boolean value + */ + private boolean getBucketVersioningProtobuf( + Versioning version) { + if(version != null) { + switch(version) { + case ENABLED: + return true; + case NOT_DEFINED: + case DISABLED: + default: + return false; + } + } + return false; + } + + @Override + public void addBucketAcls(String volumeName, String bucketName, + List<OzoneAcl> addAcls) + throws IOException { + Preconditions.checkNotNull(volumeName); + Preconditions.checkNotNull(bucketName); + Preconditions.checkNotNull(addAcls); + KsmBucketArgs.Builder builder = KsmBucketArgs.newBuilder(); + builder.setVolumeName(volumeName) + .setBucketName(bucketName) + .setAddAcls(addAcls); + keySpaceManagerClient.setBucketProperty(builder.build()); + } + + @Override + public void removeBucketAcls(String volumeName, String bucketName, + List<OzoneAcl> removeAcls) + throws IOException { + Preconditions.checkNotNull(volumeName); + Preconditions.checkNotNull(bucketName); + Preconditions.checkNotNull(removeAcls); + KsmBucketArgs.Builder builder = KsmBucketArgs.newBuilder(); + builder.setVolumeName(volumeName) + .setBucketName(bucketName) + .setRemoveAcls(removeAcls); + keySpaceManagerClient.setBucketProperty(builder.build()); + } + + @Override + public void setBucketVersioning(String volumeName, String bucketName, + Versioning versioning) + throws IOException { + Preconditions.checkNotNull(volumeName); + Preconditions.checkNotNull(bucketName); + Preconditions.checkNotNull(versioning); + KsmBucketArgs.Builder builder = KsmBucketArgs.newBuilder(); + builder.setVolumeName(volumeName) + .setBucketName(bucketName) + .setIsVersionEnabled(getBucketVersioningFlag( + versioning)); + keySpaceManagerClient.setBucketProperty(builder.build()); + } + + @Override + public void setBucketStorageType(String volumeName, String bucketName, + StorageType storageType) + throws IOException { + Preconditions.checkNotNull(volumeName); + Preconditions.checkNotNull(bucketName); + Preconditions.checkNotNull(storageType); + KsmBucketArgs.Builder builder = KsmBucketArgs.newBuilder(); + builder.setVolumeName(volumeName) + .setBucketName(bucketName) + .setStorageType(storageType); + keySpaceManagerClient.setBucketProperty(builder.build()); + } + + @Override + public void deleteBucket(String volumeName, String bucketName) + throws IOException { + Preconditions.checkNotNull(volumeName); + Preconditions.checkNotNull(bucketName); + keySpaceManagerClient.deleteBucket(volumeName, bucketName); + } + + @Override + public void checkBucketAccess(String volumeName, String bucketName) + throws IOException { + throw new UnsupportedOperationException("Not yet implemented."); + } + + @Override + public OzoneBucket getBucketDetails(String volumeName, + String bucketName) + throws IOException { + Preconditions.checkNotNull(volumeName); + Preconditions.checkNotNull(bucketName); + KsmBucketInfo bucketInfo = + keySpaceManagerClient.getBucketInfo(volumeName, bucketName); + return new OzoneBucket(bucketInfo); + } + + @Override + public Iterator<OzoneBucket> listBuckets(String volumeName, + String bucketPrefix) + throws IOException { + throw new UnsupportedOperationException("Not yet implemented."); + } + + @Override + public OzoneOutputStream createKey(String volumeName, String bucketName, + String keyName, long size) + throws IOException { + String requestId = UUID.randomUUID().toString(); + KsmKeyArgs keyArgs = new KsmKeyArgs.Builder() + .setVolumeName(volumeName) + .setBucketName(bucketName) + .setKeyName(keyName) + .setDataSize(size) + .build(); + + KsmKeyInfo keyInfo = keySpaceManagerClient.allocateKey(keyArgs); + ChunkGroupOutputStream groupOutputStream = + ChunkGroupOutputStream.getFromKsmKeyInfo(keyInfo, xceiverClientManager, + storageContainerLocationClient, chunkSize, requestId); + return new OzoneOutputStream(groupOutputStream); + } + + @Override + public OzoneInputStream getKey(String volumeName, String bucketName, + String keyName) + throws IOException { + Preconditions.checkNotNull(volumeName); + Preconditions.checkNotNull(bucketName); + Preconditions.checkNotNull(keyName); + String requestId = UUID.randomUUID().toString(); + KsmKeyArgs keyArgs = new KsmKeyArgs.Builder() + .setVolumeName(volumeName) + .setBucketName(bucketName) + .setKeyName(keyName) + .build(); + KsmKeyInfo keyInfo = keySpaceManagerClient.lookupKey(keyArgs); + LengthInputStream lengthInputStream = + ChunkGroupInputStream.getFromKsmKeyInfo( + keyInfo, xceiverClientManager, storageContainerLocationClient, + requestId); + return new OzoneInputStream( + (ChunkGroupInputStream)lengthInputStream.getWrappedStream()); + } + + @Override + public void deleteKey(String volumeName, String bucketName, + String keyName) + throws IOException { + Preconditions.checkNotNull(volumeName); + Preconditions.checkNotNull(bucketName); + Preconditions.checkNotNull(keyName); + KsmKeyArgs keyArgs = new KsmKeyArgs.Builder() + .setVolumeName(volumeName) + .setBucketName(bucketName) + .setKeyName(keyName) + .build(); + keySpaceManagerClient.deleteKey(keyArgs); + } + + @Override + public List<OzoneKey> listKeys(String volumeName, String bucketName, + String keyPrefix) + throws IOException { + throw new UnsupportedOperationException("Not yet implemented."); + } + + @Override + public OzoneKey getKeyDetails(String volumeName, String bucketName, + String keyName) + throws IOException { + Preconditions.checkNotNull(volumeName); + Preconditions.checkNotNull(bucketName); + Preconditions.checkNotNull(keyName); + KsmKeyArgs keyArgs = new KsmKeyArgs.Builder() + .setVolumeName(volumeName) + .setBucketName(bucketName) + .setKeyName(keyName) + .build(); + KsmKeyInfo keyInfo = + keySpaceManagerClient.lookupKey(keyArgs); + return new OzoneKey(keyInfo); + } + + /** + * Converts Versioning to boolean. + * + * @param version + * @return corresponding boolean value + */ + private boolean getBucketVersioningFlag( + Versioning version) { + if(version != null) { + switch(version) { + case ENABLED: + return true; + case DISABLED: + case NOT_DEFINED: + default: + return false; + } + } + return false; + } + + @Override + public void close() throws IOException { + IOUtils.cleanupWithLogger(LOG, storageContainerLocationClient); + IOUtils.cleanupWithLogger(LOG, keySpaceManagerClient); + IOUtils.cleanupWithLogger(LOG, xceiverClientManager); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/43d38114/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/rpc/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/rpc/package-info.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/rpc/package-info.java new file mode 100644 index 0000000..0fcc3fc --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/rpc/package-info.java @@ -0,0 +1,23 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.client.rpc; + +/** + * This package contains Ozone rpc client library classes. + */ http://git-wip-us.apache.org/repos/asf/hadoop/blob/43d38114/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/ksm/KSMConfigKeys.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/ksm/KSMConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/ksm/KSMConfigKeys.java new file mode 100644 index 0000000..e69300c --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/ksm/KSMConfigKeys.java @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.ozone.ksm; + +import org.apache.hadoop.ozone.OzoneAcl; +/** + * KSM Constants. + */ +public final class KSMConfigKeys { + /** + * Never constructed. + */ + private KSMConfigKeys() { + } + + + public static final String OZONE_KSM_HANDLER_COUNT_KEY = + "ozone.ksm.handler.count.key"; + public static final int OZONE_KSM_HANDLER_COUNT_DEFAULT = 20; + + public static final String OZONE_KSM_ADDRESS_KEY = + "ozone.ksm.address"; + public static final String OZONE_KSM_BIND_HOST_DEFAULT = + "0.0.0.0"; + public static final int OZONE_KSM_PORT_DEFAULT = 9862; + + public static final String OZONE_KSM_HTTP_ENABLED_KEY = + "ozone.ksm.http.enabled"; + public static final String OZONE_KSM_HTTP_BIND_HOST_KEY = + "ozone.ksm.http-bind-host"; + public static final String OZONE_KSM_HTTPS_BIND_HOST_KEY = + "ozone.ksm.https-bind-host"; + public static final String OZONE_KSM_HTTP_ADDRESS_KEY = + "ozone.ksm.http-address"; + public static final String OZONE_KSM_HTTPS_ADDRESS_KEY = + "ozone.ksm.https-address"; + public static final String OZONE_KSM_KEYTAB_FILE = + "ozone.ksm.keytab.file"; + public static final String OZONE_KSM_HTTP_BIND_HOST_DEFAULT = "0.0.0.0"; + public static final int OZONE_KSM_HTTP_BIND_PORT_DEFAULT = 9874; + public static final int OZONE_KSM_HTTPS_BIND_PORT_DEFAULT = 9875; + + // LevelDB cache file uses an off-heap cache in LevelDB of 128 MB. + public static final String OZONE_KSM_DB_CACHE_SIZE_MB = + "ozone.ksm.leveldb.cache.size.mb"; + public static final int OZONE_KSM_DB_CACHE_SIZE_DEFAULT = 128; + + public static final String OZONE_KSM_USER_MAX_VOLUME = + "ozone.ksm.user.max.volume"; + public static final int OZONE_KSM_USER_MAX_VOLUME_DEFAULT = 1024; + + // KSM Default user/group permissions + public static final String OZONE_KSM_USER_RIGHTS = + "ozone.ksm.user.rights"; + public static final OzoneAcl.OzoneACLRights OZONE_KSM_USER_RIGHTS_DEFAULT = + OzoneAcl.OzoneACLRights.READ_WRITE; + + public static final String OZONE_KSM_GROUP_RIGHTS = + "ozone.ksm.group.rights"; + public static final OzoneAcl.OzoneACLRights OZONE_KSM_GROUP_RIGHTS_DEFAULT = + OzoneAcl.OzoneACLRights.READ_WRITE; +} --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org