http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java deleted file mode 100644 index 09d4054..0000000 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java +++ /dev/null @@ -1,576 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.ozone.container.keyvalue; - -import java.io.File; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.nio.file.Files; -import java.nio.file.StandardCopyOption; -import java.util.Map; -import java.util.concurrent.locks.ReentrantReadWriteLock; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileAlreadyExistsException; -import org.apache.hadoop.fs.FileUtil; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos - .ContainerLifeCycleState; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos - .ContainerType; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos; -import org.apache.hadoop.hdds.scm.container.common.helpers - .StorageContainerException; -import org.apache.hadoop.io.nativeio.NativeIO; -import org.apache.hadoop.ozone.OzoneConfigKeys; -import org.apache.hadoop.ozone.OzoneConsts; -import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils; -import org.apache.hadoop.ozone.container.common.impl.ContainerDataYaml; -import org.apache.hadoop.ozone.container.common.interfaces.Container; -import org.apache.hadoop.ozone.container.common.interfaces.ContainerPacker; -import org.apache.hadoop.ozone.container.common.interfaces.VolumeChoosingPolicy; -import org.apache.hadoop.ozone.container.common.volume.HddsVolume; -import org.apache.hadoop.ozone.container.common.volume.VolumeSet; -import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils; -import org.apache.hadoop.ozone.container.keyvalue.helpers - .KeyValueContainerLocationUtil; -import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerUtil; -import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException; -import org.apache.hadoop.utils.MetadataStore; - -import com.google.common.base.Preconditions; -import org.apache.commons.io.FileUtils; -import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos - .Result.CONTAINER_ALREADY_EXISTS; -import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos - .Result.CONTAINER_FILES_CREATE_ERROR; -import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos - .Result.CONTAINER_INTERNAL_ERROR; -import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos - .Result.DISK_OUT_OF_SPACE; -import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos - .Result.ERROR_IN_COMPACT_DB; -import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos - .Result.INVALID_CONTAINER_STATE; -import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos - .Result.UNSUPPORTED_REQUEST; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Class to perform KeyValue Container operations. - */ -public class KeyValueContainer implements Container<KeyValueContainerData> { - - private static final Logger LOG = LoggerFactory.getLogger(Container.class); - - // Use a non-fair RW lock for better throughput, we may revisit this decision - // if this causes fairness issues. - private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); - - private final KeyValueContainerData containerData; - private Configuration config; - - public KeyValueContainer(KeyValueContainerData containerData, Configuration - ozoneConfig) { - Preconditions.checkNotNull(containerData, "KeyValueContainerData cannot " + - "be null"); - Preconditions.checkNotNull(ozoneConfig, "Ozone configuration cannot " + - "be null"); - this.config = ozoneConfig; - this.containerData = containerData; - } - - @Override - public void create(VolumeSet volumeSet, VolumeChoosingPolicy - volumeChoosingPolicy, String scmId) throws StorageContainerException { - Preconditions.checkNotNull(volumeChoosingPolicy, "VolumeChoosingPolicy " + - "cannot be null"); - Preconditions.checkNotNull(volumeSet, "VolumeSet cannot be null"); - Preconditions.checkNotNull(scmId, "scmId cannot be null"); - - File containerMetaDataPath = null; - //acquiring volumeset lock and container lock - volumeSet.acquireLock(); - long maxSize = containerData.getMaxSize(); - try { - HddsVolume containerVolume = volumeChoosingPolicy.chooseVolume(volumeSet - .getVolumesList(), maxSize); - String hddsVolumeDir = containerVolume.getHddsRootDir().toString(); - - long containerID = containerData.getContainerID(); - - containerMetaDataPath = KeyValueContainerLocationUtil - .getContainerMetaDataPath(hddsVolumeDir, scmId, containerID); - containerData.setMetadataPath(containerMetaDataPath.getPath()); - - File chunksPath = KeyValueContainerLocationUtil.getChunksLocationPath( - hddsVolumeDir, scmId, containerID); - - // Check if it is new Container. - ContainerUtils.verifyIsNewContainer(containerMetaDataPath); - - //Create Metadata path chunks path and metadata db - File dbFile = getContainerDBFile(); - KeyValueContainerUtil.createContainerMetaData(containerMetaDataPath, - chunksPath, dbFile, config); - - String impl = config.getTrimmed(OzoneConfigKeys.OZONE_METADATA_STORE_IMPL, - OzoneConfigKeys.OZONE_METADATA_STORE_IMPL_DEFAULT); - - //Set containerData for the KeyValueContainer. - containerData.setChunksPath(chunksPath.getPath()); - containerData.setContainerDBType(impl); - containerData.setDbFile(dbFile); - containerData.setVolume(containerVolume); - - // Create .container file - File containerFile = getContainerFile(); - createContainerFile(containerFile); - - } catch (StorageContainerException ex) { - if (containerMetaDataPath != null && containerMetaDataPath.getParentFile() - .exists()) { - FileUtil.fullyDelete(containerMetaDataPath.getParentFile()); - } - throw ex; - } catch (DiskOutOfSpaceException ex) { - throw new StorageContainerException("Container creation failed, due to " + - "disk out of space", ex, DISK_OUT_OF_SPACE); - } catch (FileAlreadyExistsException ex) { - throw new StorageContainerException("Container creation failed because " + - "ContainerFile already exists", ex, CONTAINER_ALREADY_EXISTS); - } catch (IOException ex) { - if (containerMetaDataPath != null && containerMetaDataPath.getParentFile() - .exists()) { - FileUtil.fullyDelete(containerMetaDataPath.getParentFile()); - } - throw new StorageContainerException("Container creation failed.", ex, - CONTAINER_INTERNAL_ERROR); - } finally { - volumeSet.releaseLock(); - } - } - - /** - * Set all of the path realted container data fields based on the name - * conventions. - * - * @param scmId - * @param containerVolume - * @param hddsVolumeDir - */ - public void populatePathFields(String scmId, - HddsVolume containerVolume, String hddsVolumeDir) { - - long containerId = containerData.getContainerID(); - - File containerMetaDataPath = KeyValueContainerLocationUtil - .getContainerMetaDataPath(hddsVolumeDir, scmId, containerId); - - File chunksPath = KeyValueContainerLocationUtil.getChunksLocationPath( - hddsVolumeDir, scmId, containerId); - File dbFile = KeyValueContainerLocationUtil.getContainerDBFile( - containerMetaDataPath, containerId); - - //Set containerData for the KeyValueContainer. - containerData.setMetadataPath(containerMetaDataPath.getPath()); - containerData.setChunksPath(chunksPath.getPath()); - containerData.setDbFile(dbFile); - containerData.setVolume(containerVolume); - } - - /** - * Writes to .container file. - * - * @param containerFile container file name - * @param isCreate True if creating a new file. False is updating an - * existing container file. - * @throws StorageContainerException - */ - private void writeToContainerFile(File containerFile, boolean isCreate) - throws StorageContainerException { - File tempContainerFile = null; - long containerId = containerData.getContainerID(); - try { - tempContainerFile = createTempFile(containerFile); - ContainerDataYaml.createContainerFile( - ContainerType.KeyValueContainer, containerData, tempContainerFile); - - // NativeIO.renameTo is an atomic function. But it might fail if the - // container file already exists. Hence, we handle the two cases - // separately. - if (isCreate) { - NativeIO.renameTo(tempContainerFile, containerFile); - } else { - Files.move(tempContainerFile.toPath(), containerFile.toPath(), - StandardCopyOption.REPLACE_EXISTING); - } - - } catch (IOException ex) { - throw new StorageContainerException("Error while creating/ updating " + - ".container file. ContainerID: " + containerId, ex, - CONTAINER_FILES_CREATE_ERROR); - } finally { - if (tempContainerFile != null && tempContainerFile.exists()) { - if (!tempContainerFile.delete()) { - LOG.warn("Unable to delete container temporary file: {}.", - tempContainerFile.getAbsolutePath()); - } - } - } - } - - private void createContainerFile(File containerFile) - throws StorageContainerException { - writeToContainerFile(containerFile, true); - } - - private void updateContainerFile(File containerFile) - throws StorageContainerException { - writeToContainerFile(containerFile, false); - } - - - @Override - public void delete(boolean forceDelete) - throws StorageContainerException { - long containerId = containerData.getContainerID(); - try { - KeyValueContainerUtil.removeContainer(containerData, config, forceDelete); - } catch (StorageContainerException ex) { - throw ex; - } catch (IOException ex) { - // TODO : An I/O error during delete can leave partial artifacts on the - // disk. We will need the cleaner thread to cleanup this information. - String errMsg = String.format("Failed to cleanup container. ID: %d", - containerId); - LOG.error(errMsg, ex); - throw new StorageContainerException(errMsg, ex, CONTAINER_INTERNAL_ERROR); - } - } - - @Override - public void close() throws StorageContainerException { - - //TODO: writing .container file and compaction can be done - // asynchronously, otherwise rpc call for this will take a lot of time to - // complete this action - try { - writeLock(); - - containerData.closeContainer(); - File containerFile = getContainerFile(); - // update the new container data to .container File - updateContainerFile(containerFile); - - } catch (StorageContainerException ex) { - // Failed to update .container file. Reset the state to CLOSING - containerData.setState(ContainerLifeCycleState.CLOSING); - throw ex; - } finally { - writeUnlock(); - } - - // It is ok if this operation takes a bit of time. - // Close container is not expected to be instantaneous. - try { - MetadataStore db = BlockUtils.getDB(containerData, config); - db.compactDB(); - } catch (StorageContainerException ex) { - throw ex; - } catch (IOException ex) { - LOG.error("Error in DB compaction while closing container", ex); - throw new StorageContainerException(ex, ERROR_IN_COMPACT_DB); - } - } - - @Override - public KeyValueContainerData getContainerData() { - return containerData; - } - - @Override - public ContainerLifeCycleState getContainerState() { - return containerData.getState(); - } - - @Override - public ContainerType getContainerType() { - return ContainerType.KeyValueContainer; - } - - @Override - public void update(Map<String, String> metadata, boolean forceUpdate) - throws StorageContainerException { - - // TODO: Now, when writing the updated data to .container file, we are - // holding lock and writing data to disk. We can have async implementation - // to flush the update container data to disk. - long containerId = containerData.getContainerID(); - if(!containerData.isValid()) { - LOG.debug("Invalid container data. ContainerID: {}", containerId); - throw new StorageContainerException("Invalid container data. " + - "ContainerID: " + containerId, INVALID_CONTAINER_STATE); - } - if (!forceUpdate && !containerData.isOpen()) { - throw new StorageContainerException( - "Updating a closed container without force option is not allowed. " + - "ContainerID: " + containerId, UNSUPPORTED_REQUEST); - } - - Map<String, String> oldMetadata = containerData.getMetadata(); - try { - writeLock(); - for (Map.Entry<String, String> entry : metadata.entrySet()) { - containerData.addMetadata(entry.getKey(), entry.getValue()); - } - - File containerFile = getContainerFile(); - // update the new container data to .container File - updateContainerFile(containerFile); - } catch (StorageContainerException ex) { - containerData.setMetadata(oldMetadata); - throw ex; - } finally { - writeUnlock(); - } - } - - @Override - public void updateDeleteTransactionId(long deleteTransactionId) { - containerData.updateDeleteTransactionId(deleteTransactionId); - } - - @Override - public KeyValueBlockIterator blockIterator() throws IOException{ - return new KeyValueBlockIterator(containerData.getContainerID(), new File( - containerData.getContainerPath())); - } - - @Override - public void importContainerData(InputStream input, - ContainerPacker<KeyValueContainerData> packer) throws IOException { - writeLock(); - try { - if (getContainerFile().exists()) { - String errorMessage = String.format( - "Can't import container (cid=%d) data to a specific location" - + " as the container descriptor (%s) has already been exist.", - getContainerData().getContainerID(), - getContainerFile().getAbsolutePath()); - throw new IOException(errorMessage); - } - //copy the values from the input stream to the final destination - // directory. - byte[] descriptorContent = packer.unpackContainerData(this, input); - - Preconditions.checkNotNull(descriptorContent, - "Container descriptor is missing from the container archive: " - + getContainerData().getContainerID()); - - //now, we have extracted the container descriptor from the previous - //datanode. We can load it and upload it with the current data - // (original metadata + current filepath fields) - KeyValueContainerData originalContainerData = - (KeyValueContainerData) ContainerDataYaml - .readContainer(descriptorContent); - - - containerData.setState(originalContainerData.getState()); - containerData - .setContainerDBType(originalContainerData.getContainerDBType()); - containerData.setBytesUsed(originalContainerData.getBytesUsed()); - - //rewriting the yaml file with new checksum calculation. - update(originalContainerData.getMetadata(), true); - - //fill in memory stat counter (keycount, byte usage) - KeyValueContainerUtil.parseKVContainerData(containerData, config); - - } catch (Exception ex) { - //delete all the temporary data in case of any exception. - try { - FileUtils.deleteDirectory(new File(containerData.getMetadataPath())); - FileUtils.deleteDirectory(new File(containerData.getChunksPath())); - FileUtils.deleteDirectory(getContainerFile()); - } catch (Exception deleteex) { - LOG.error( - "Can not cleanup destination directories after a container import" - + " error (cid" + - containerData.getContainerID() + ")", deleteex); - } - throw ex; - } finally { - writeUnlock(); - } - } - - @Override - public void exportContainerData(OutputStream destination, - ContainerPacker<KeyValueContainerData> packer) throws IOException { - if (getContainerData().getState() != ContainerLifeCycleState.CLOSED) { - throw new IllegalStateException( - "Only closed containers could be exported: ContainerId=" - + getContainerData().getContainerID()); - } - packer.pack(this, destination); - } - - /** - * Acquire read lock. - */ - public void readLock() { - this.lock.readLock().lock(); - - } - - /** - * Release read lock. - */ - public void readUnlock() { - this.lock.readLock().unlock(); - } - - /** - * Check if the current thread holds read lock. - */ - public boolean hasReadLock() { - return this.lock.readLock().tryLock(); - } - - /** - * Acquire write lock. - */ - public void writeLock() { - this.lock.writeLock().lock(); - } - - /** - * Release write lock. - */ - public void writeUnlock() { - this.lock.writeLock().unlock(); - - } - - /** - * Check if the current thread holds write lock. - */ - public boolean hasWriteLock() { - return this.lock.writeLock().isHeldByCurrentThread(); - } - - /** - * Acquire read lock, unless interrupted while waiting. - * @throws InterruptedException - */ - @Override - public void readLockInterruptibly() throws InterruptedException { - this.lock.readLock().lockInterruptibly(); - } - - /** - * Acquire write lock, unless interrupted while waiting. - * @throws InterruptedException - */ - @Override - public void writeLockInterruptibly() throws InterruptedException { - this.lock.writeLock().lockInterruptibly(); - - } - - /** - * Returns containerFile. - * @return .container File name - */ - @Override - public File getContainerFile() { - return new File(containerData.getMetadataPath(), containerData - .getContainerID() + OzoneConsts.CONTAINER_EXTENSION); - } - - /** - * Returns KeyValueContainerReport for the KeyValueContainer. - */ - @Override - public StorageContainerDatanodeProtocolProtos.ContainerInfo - getContainerReport() throws StorageContainerException{ - StorageContainerDatanodeProtocolProtos.ContainerInfo.Builder ciBuilder = - StorageContainerDatanodeProtocolProtos.ContainerInfo.newBuilder(); - ciBuilder.setContainerID(containerData.getContainerID()) - .setReadCount(containerData.getReadCount()) - .setWriteCount(containerData.getWriteCount()) - .setReadBytes(containerData.getReadBytes()) - .setWriteBytes(containerData.getWriteBytes()) - .setKeyCount(containerData.getKeyCount()) - .setUsed(containerData.getBytesUsed()) - .setState(getHddsState()) - .setDeleteTransactionId(containerData.getDeleteTransactionId()); - return ciBuilder.build(); - } - - /** - * Returns LifeCycle State of the container. - * @return LifeCycle State of the container in HddsProtos format - * @throws StorageContainerException - */ - private HddsProtos.LifeCycleState getHddsState() - throws StorageContainerException { - HddsProtos.LifeCycleState state; - switch (containerData.getState()) { - case OPEN: - state = HddsProtos.LifeCycleState.OPEN; - break; - case CLOSING: - state = HddsProtos.LifeCycleState.CLOSING; - break; - case CLOSED: - state = HddsProtos.LifeCycleState.CLOSED; - break; - default: - throw new StorageContainerException("Invalid Container state found: " + - containerData.getContainerID(), INVALID_CONTAINER_STATE); - } - return state; - } - - /** - * Returns container DB file. - * @return - */ - public File getContainerDBFile() { - return new File(containerData.getMetadataPath(), containerData - .getContainerID() + OzoneConsts.DN_CONTAINER_DB); - } - - /** - * Creates a temporary file. - * @param file - * @return - * @throws IOException - */ - private File createTempFile(File file) throws IOException{ - return File.createTempFile("tmp_" + System.currentTimeMillis() + "_", - file.getName(), file.getParentFile()); - } - -}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerData.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerData.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerData.java deleted file mode 100644 index 7ffdbf5..0000000 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerData.java +++ /dev/null @@ -1,297 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.ozone.container.keyvalue; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.Lists; -import java.util.Collections; - -import org.apache.hadoop.conf.StorageSize; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; -import org.apache.hadoop.hdds.scm.ScmConfigKeys; -import org.apache.hadoop.ozone.OzoneConsts; -import org.apache.hadoop.ozone.container.common.impl.ContainerData; -import org.yaml.snakeyaml.nodes.Tag; - - -import java.io.File; -import java.io.IOException; -import java.util.List; -import java.util.Map; -import java.util.concurrent.atomic.AtomicInteger; - -import static java.lang.Math.max; -import static org.apache.hadoop.ozone.OzoneConsts.CHUNKS_PATH; -import static org.apache.hadoop.ozone.OzoneConsts.CONTAINER_DB_TYPE; -import static org.apache.hadoop.ozone.OzoneConsts.METADATA_PATH; - -/** - * This class represents the KeyValueContainer metadata, which is the - * in-memory representation of container metadata and is represented on disk - * by the .container file. - */ -public class KeyValueContainerData extends ContainerData { - - // Yaml Tag used for KeyValueContainerData. - public static final Tag KEYVALUE_YAML_TAG = new Tag("KeyValueContainerData"); - - // Fields need to be stored in .container file. - private static final List<String> KV_YAML_FIELDS; - - // Path to Container metadata Level DB/RocksDB Store and .container file. - private String metadataPath; - - // Path to Physical file system where chunks are stored. - private String chunksPath; - - //Type of DB used to store key to chunks mapping - private String containerDBType; - - private File dbFile = null; - - /** - * Number of pending deletion blocks in KeyValueContainer. - */ - private final AtomicInteger numPendingDeletionBlocks; - - private long deleteTransactionId; - - static { - // Initialize YAML fields - KV_YAML_FIELDS = Lists.newArrayList(); - KV_YAML_FIELDS.addAll(YAML_FIELDS); - KV_YAML_FIELDS.add(METADATA_PATH); - KV_YAML_FIELDS.add(CHUNKS_PATH); - KV_YAML_FIELDS.add(CONTAINER_DB_TYPE); - } - - /** - * Constructs KeyValueContainerData object. - * @param id - ContainerId - * @param size - maximum size of the container in bytes - */ - public KeyValueContainerData(long id, long size) { - super(ContainerProtos.ContainerType.KeyValueContainer, id, size); - this.numPendingDeletionBlocks = new AtomicInteger(0); - this.deleteTransactionId = 0; - } - - /** - * Constructs KeyValueContainerData object. - * @param id - ContainerId - * @param layOutVersion - * @param size - maximum size of the container in bytes - */ - public KeyValueContainerData(long id, int layOutVersion, long size) { - super(ContainerProtos.ContainerType.KeyValueContainer, id, layOutVersion, - size); - this.numPendingDeletionBlocks = new AtomicInteger(0); - this.deleteTransactionId = 0; - } - - - /** - * Sets Container dbFile. This should be called only during creation of - * KeyValue container. - * @param containerDbFile - */ - public void setDbFile(File containerDbFile) { - dbFile = containerDbFile; - } - - /** - * Returns container DB file. - * @return dbFile - */ - public File getDbFile() { - return dbFile; - } - - /** - * Returns container metadata path. - * @return - Physical path where container file and checksum is stored. - */ - public String getMetadataPath() { - return metadataPath; - } - - /** - * Sets container metadata path. - * - * @param path - String. - */ - public void setMetadataPath(String path) { - this.metadataPath = path; - } - - /** - * Returns the path to base dir of the container. - * @return Path to base dir - */ - public String getContainerPath() { - if (metadataPath == null) { - return null; - } - return new File(metadataPath).getParent(); - } - - /** - * Get chunks path. - * @return - Path where chunks are stored - */ - public String getChunksPath() { - return chunksPath; - } - - /** - * Set chunks Path. - * @param chunkPath - File path. - */ - public void setChunksPath(String chunkPath) { - this.chunksPath = chunkPath; - } - - /** - * Returns the DBType used for the container. - * @return containerDBType - */ - public String getContainerDBType() { - return containerDBType; - } - - /** - * Sets the DBType used for the container. - * @param containerDBType - */ - public void setContainerDBType(String containerDBType) { - this.containerDBType = containerDBType; - } - - /** - * Increase the count of pending deletion blocks. - * - * @param numBlocks increment number - */ - public void incrPendingDeletionBlocks(int numBlocks) { - this.numPendingDeletionBlocks.addAndGet(numBlocks); - } - - /** - * Decrease the count of pending deletion blocks. - * - * @param numBlocks decrement number - */ - public void decrPendingDeletionBlocks(int numBlocks) { - this.numPendingDeletionBlocks.addAndGet(-1 * numBlocks); - } - - /** - * Get the number of pending deletion blocks. - */ - public int getNumPendingDeletionBlocks() { - return this.numPendingDeletionBlocks.get(); - } - - /** - * Sets deleteTransactionId to latest delete transactionId for the container. - * - * @param transactionId latest transactionId of the container. - */ - public void updateDeleteTransactionId(long transactionId) { - deleteTransactionId = max(transactionId, deleteTransactionId); - } - - /** - * Return the latest deleteTransactionId of the container. - */ - public long getDeleteTransactionId() { - return deleteTransactionId; - } - - /** - * Returns a ProtoBuf Message from ContainerData. - * - * @return Protocol Buffer Message - */ - public ContainerProtos.ContainerData getProtoBufMessage() { - ContainerProtos.ContainerData.Builder builder = ContainerProtos - .ContainerData.newBuilder(); - builder.setContainerID(this.getContainerID()); - builder.setContainerPath(this.getMetadataPath()); - builder.setState(this.getState()); - - for (Map.Entry<String, String> entry : getMetadata().entrySet()) { - ContainerProtos.KeyValue.Builder keyValBuilder = - ContainerProtos.KeyValue.newBuilder(); - builder.addMetadata(keyValBuilder.setKey(entry.getKey()) - .setValue(entry.getValue()).build()); - } - - if (this.getBytesUsed() >= 0) { - builder.setBytesUsed(this.getBytesUsed()); - } - - if(this.getContainerType() != null) { - builder.setContainerType(ContainerProtos.ContainerType.KeyValueContainer); - } - - return builder.build(); - } - - public static List<String> getYamlFields() { - return Collections.unmodifiableList(KV_YAML_FIELDS); - } - - /** - * Constructs a KeyValueContainerData object from ProtoBuf classes. - * - * @param protoData - ProtoBuf Message - * @throws IOException - */ - @VisibleForTesting - public static KeyValueContainerData getFromProtoBuf( - ContainerProtos.ContainerData protoData) throws IOException { - // TODO: Add containerMaxSize to ContainerProtos.ContainerData - StorageSize storageSize = StorageSize.parse( - ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT); - KeyValueContainerData data = new KeyValueContainerData( - protoData.getContainerID(), - (long)storageSize.getUnit().toBytes(storageSize.getValue())); - for (int x = 0; x < protoData.getMetadataCount(); x++) { - data.addMetadata(protoData.getMetadata(x).getKey(), - protoData.getMetadata(x).getValue()); - } - - if (protoData.hasContainerPath()) { - String metadataPath = protoData.getContainerPath()+ File.separator + - OzoneConsts.CONTAINER_META_PATH; - data.setMetadataPath(metadataPath); - } - - if (protoData.hasState()) { - data.setState(protoData.getState()); - } - - if (protoData.hasBytesUsed()) { - data.setBytesUsed(protoData.getBytesUsed()); - } - - return data; - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java deleted file mode 100644 index 5be6e28..0000000 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java +++ /dev/null @@ -1,850 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.ozone.container.keyvalue; - -import java.io.FileInputStream; -import java.io.IOException; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.ReentrantLock; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.conf.StorageUnit; -import org.apache.hadoop.hdds.client.BlockID; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos - .ContainerCommandRequestProto; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos - .ContainerCommandResponseProto; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos - .ContainerLifeCycleState; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos - .ContainerType; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos - .GetSmallFileRequestProto; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.KeyValue; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos - .PutSmallFileRequestProto; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type; -import org.apache.hadoop.hdds.scm.ScmConfigKeys; -import org.apache.hadoop.hdds.scm.container.common.helpers - .StorageContainerException; -import org.apache.hadoop.ozone.container.common.helpers.BlockData; -import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo; -import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics; -import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils; -import org.apache.hadoop.ozone.container.common.impl.ContainerSet; -import org.apache.hadoop.ozone.container.common.impl.OpenContainerBlockMap; -import org.apache.hadoop.ozone.container.common.interfaces.Container; -import org.apache.hadoop.ozone.container.common.interfaces.Handler; -import org.apache.hadoop.ozone.container.common.interfaces.VolumeChoosingPolicy; -import org.apache.hadoop.ozone.container.common.volume.HddsVolume; -import org.apache.hadoop.ozone.container.common.volume - .RoundRobinVolumeChoosingPolicy; -import org.apache.hadoop.ozone.container.common.volume.VolumeSet; -import org.apache.hadoop.ozone.container.keyvalue.helpers.ChunkUtils; -import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils; -import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerUtil; -import org.apache.hadoop.ozone.container.keyvalue.helpers.SmallFileUtils; -import org.apache.hadoop.ozone.container.keyvalue.impl.ChunkManagerImpl; -import org.apache.hadoop.ozone.container.keyvalue.impl.BlockManagerImpl; -import org.apache.hadoop.ozone.container.keyvalue.interfaces.ChunkManager; -import org.apache.hadoop.ozone.container.keyvalue.interfaces.BlockManager; -import org.apache.hadoop.ozone.container.keyvalue.statemachine.background - .BlockDeletingService; -import org.apache.hadoop.util.AutoCloseableLock; -import org.apache.hadoop.util.ReflectionUtils; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; -import com.google.protobuf.ByteString; -import static org.apache.hadoop.hdds.HddsConfigKeys - .HDDS_DATANODE_VOLUME_CHOOSING_POLICY; -import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos - .Result.BLOCK_NOT_COMMITTED; -import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos - .Result.CLOSED_CONTAINER_IO; -import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos - .Result.CONTAINER_INTERNAL_ERROR; -import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos - .Result.DELETE_ON_OPEN_CONTAINER; -import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos - .Result.GET_SMALL_FILE_ERROR; -import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos - .Result.INVALID_CONTAINER_STATE; -import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos - .Result.IO_EXCEPTION; -import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos - .Result.PUT_SMALL_FILE_ERROR; -import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos - .Stage; -import static org.apache.hadoop.ozone.OzoneConfigKeys - .OZONE_BLOCK_DELETING_SERVICE_INTERVAL; -import static org.apache.hadoop.ozone.OzoneConfigKeys - .OZONE_BLOCK_DELETING_SERVICE_INTERVAL_DEFAULT; -import static org.apache.hadoop.ozone.OzoneConfigKeys - .OZONE_BLOCK_DELETING_SERVICE_TIMEOUT; -import static org.apache.hadoop.ozone.OzoneConfigKeys - .OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Handler for KeyValue Container type. - */ -public class KeyValueHandler extends Handler { - - private static final Logger LOG = LoggerFactory.getLogger( - KeyValueHandler.class); - - private final ContainerType containerType; - private final BlockManager blockManager; - private final ChunkManager chunkManager; - private final BlockDeletingService blockDeletingService; - private final VolumeChoosingPolicy volumeChoosingPolicy; - private final long maxContainerSize; - private final AutoCloseableLock handlerLock; - private final OpenContainerBlockMap openContainerBlockMap; - - public KeyValueHandler(Configuration config, ContainerSet contSet, - VolumeSet volSet, ContainerMetrics metrics) { - super(config, contSet, volSet, metrics); - containerType = ContainerType.KeyValueContainer; - blockManager = new BlockManagerImpl(config); - chunkManager = new ChunkManagerImpl(); - long svcInterval = config - .getTimeDuration(OZONE_BLOCK_DELETING_SERVICE_INTERVAL, - OZONE_BLOCK_DELETING_SERVICE_INTERVAL_DEFAULT, - TimeUnit.MILLISECONDS); - long serviceTimeout = config - .getTimeDuration(OZONE_BLOCK_DELETING_SERVICE_TIMEOUT, - OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT, - TimeUnit.MILLISECONDS); - this.blockDeletingService = - new BlockDeletingService(containerSet, svcInterval, serviceTimeout, - TimeUnit.MILLISECONDS, config); - blockDeletingService.start(); - volumeChoosingPolicy = ReflectionUtils.newInstance(conf.getClass( - HDDS_DATANODE_VOLUME_CHOOSING_POLICY, RoundRobinVolumeChoosingPolicy - .class, VolumeChoosingPolicy.class), conf); - maxContainerSize = (long)config.getStorageSize( - ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE, - ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES); - // this handler lock is used for synchronizing createContainer Requests, - // so using a fair lock here. - handlerLock = new AutoCloseableLock(new ReentrantLock(true)); - openContainerBlockMap = new OpenContainerBlockMap(); - } - - @VisibleForTesting - public VolumeChoosingPolicy getVolumeChoosingPolicyForTesting() { - return volumeChoosingPolicy; - } - /** - * Returns OpenContainerBlockMap instance. - * - * @return OpenContainerBlockMap - */ - public OpenContainerBlockMap getOpenContainerBlockMap() { - return openContainerBlockMap; - } - - @Override - public ContainerCommandResponseProto handle( - ContainerCommandRequestProto request, Container container) { - - Type cmdType = request.getCmdType(); - KeyValueContainer kvContainer = (KeyValueContainer) container; - switch(cmdType) { - case CreateContainer: - return handleCreateContainer(request, kvContainer); - case ReadContainer: - return handleReadContainer(request, kvContainer); - case UpdateContainer: - return handleUpdateContainer(request, kvContainer); - case DeleteContainer: - return handleDeleteContainer(request, kvContainer); - case ListContainer: - return handleUnsupportedOp(request); - case CloseContainer: - return handleCloseContainer(request, kvContainer); - case PutBlock: - return handlePutBlock(request, kvContainer); - case GetBlock: - return handleGetBlock(request, kvContainer); - case DeleteBlock: - return handleDeleteBlock(request, kvContainer); - case ListBlock: - return handleUnsupportedOp(request); - case ReadChunk: - return handleReadChunk(request, kvContainer); - case DeleteChunk: - return handleDeleteChunk(request, kvContainer); - case WriteChunk: - return handleWriteChunk(request, kvContainer); - case ListChunk: - return handleUnsupportedOp(request); - case CompactChunk: - return handleUnsupportedOp(request); - case PutSmallFile: - return handlePutSmallFile(request, kvContainer); - case GetSmallFile: - return handleGetSmallFile(request, kvContainer); - case GetCommittedBlockLength: - return handleGetCommittedBlockLength(request, kvContainer); - default: - return null; - } - } - - @VisibleForTesting - public ChunkManager getChunkManager() { - return this.chunkManager; - } - - @VisibleForTesting - public BlockManager getBlockManager() { - return this.blockManager; - } - - /** - * Handles Create Container Request. If successful, adds the container to - * ContainerSet. - */ - ContainerCommandResponseProto handleCreateContainer( - ContainerCommandRequestProto request, KeyValueContainer kvContainer) { - if (!request.hasCreateContainer()) { - LOG.debug("Malformed Create Container request. trace ID: {}", - request.getTraceID()); - return ContainerUtils.malformedRequest(request); - } - // Create Container request should be passed a null container as the - // container would be created here. - Preconditions.checkArgument(kvContainer == null); - - long containerID = request.getContainerID(); - - KeyValueContainerData newContainerData = new KeyValueContainerData( - containerID, maxContainerSize); - // TODO: Add support to add metadataList to ContainerData. Add metadata - // to container during creation. - KeyValueContainer newContainer = new KeyValueContainer( - newContainerData, conf); - - try { - handlerLock.acquire(); - if (containerSet.getContainer(containerID) == null) { - newContainer.create(volumeSet, volumeChoosingPolicy, scmID); - containerSet.addContainer(newContainer); - } else { - throw new StorageContainerException("Container already exists with " + - "container Id " + containerID, ContainerProtos.Result - .CONTAINER_EXISTS); - } - } catch (StorageContainerException ex) { - return ContainerUtils.logAndReturnError(LOG, ex, request); - } finally { - handlerLock.release(); - } - - return ContainerUtils.getSuccessResponse(request); - } - - public void populateContainerPathFields(KeyValueContainer container, - long maxSize) throws IOException { - volumeSet.acquireLock(); - try { - HddsVolume containerVolume = volumeChoosingPolicy.chooseVolume(volumeSet - .getVolumesList(), maxSize); - String hddsVolumeDir = containerVolume.getHddsRootDir().toString(); - container.populatePathFields(scmID, containerVolume, hddsVolumeDir); - } finally { - volumeSet.releaseLock(); - } - } - - /** - * Handles Read Container Request. Returns the ContainerData as response. - */ - ContainerCommandResponseProto handleReadContainer( - ContainerCommandRequestProto request, KeyValueContainer kvContainer) { - if (!request.hasReadContainer()) { - LOG.debug("Malformed Read Container request. trace ID: {}", - request.getTraceID()); - return ContainerUtils.malformedRequest(request); - } - - KeyValueContainerData containerData = kvContainer.getContainerData(); - return KeyValueContainerUtil.getReadContainerResponse( - request, containerData); - } - - - /** - * Handles Update Container Request. If successful, the container metadata - * is updated. - */ - ContainerCommandResponseProto handleUpdateContainer( - ContainerCommandRequestProto request, KeyValueContainer kvContainer) { - - if (!request.hasUpdateContainer()) { - LOG.debug("Malformed Update Container request. trace ID: {}", - request.getTraceID()); - return ContainerUtils.malformedRequest(request); - } - - boolean forceUpdate = request.getUpdateContainer().getForceUpdate(); - List<KeyValue> keyValueList = - request.getUpdateContainer().getMetadataList(); - Map<String, String> metadata = new HashMap<>(); - for (KeyValue keyValue : keyValueList) { - metadata.put(keyValue.getKey(), keyValue.getValue()); - } - - try { - if (!metadata.isEmpty()) { - kvContainer.update(metadata, forceUpdate); - } - } catch (StorageContainerException ex) { - return ContainerUtils.logAndReturnError(LOG, ex, request); - } - return ContainerUtils.getSuccessResponse(request); - } - - /** - * Handles Delete Container Request. - * Open containers cannot be deleted. - * Holds writeLock on ContainerSet till the container is removed from - * containerMap. On disk deletion of container files will happen - * asynchronously without the lock. - */ - ContainerCommandResponseProto handleDeleteContainer( - ContainerCommandRequestProto request, KeyValueContainer kvContainer) { - - if (!request.hasDeleteContainer()) { - LOG.debug("Malformed Delete container request. trace ID: {}", - request.getTraceID()); - return ContainerUtils.malformedRequest(request); - } - - boolean forceDelete = request.getDeleteContainer().getForceDelete(); - kvContainer.writeLock(); - try { - // Check if container is open - if (kvContainer.getContainerData().isOpen()) { - kvContainer.writeUnlock(); - throw new StorageContainerException( - "Deletion of Open Container is not allowed.", - DELETE_ON_OPEN_CONTAINER); - } else if (!forceDelete && kvContainer.getContainerData().getKeyCount() - > 0) { - // If the container is not empty and cannot be deleted forcibly, - // then throw a SCE to stop deleting. - kvContainer.writeUnlock(); - throw new StorageContainerException( - "Container cannot be deleted because it is not empty.", - ContainerProtos.Result.ERROR_CONTAINER_NOT_EMPTY); - } else { - long containerId = kvContainer.getContainerData().getContainerID(); - containerSet.removeContainer(containerId); - openContainerBlockMap.removeContainer(containerId); - // Release the lock first. - // Avoid holding write locks for disk operations - kvContainer.writeUnlock(); - - kvContainer.delete(forceDelete); - } - } catch (StorageContainerException ex) { - return ContainerUtils.logAndReturnError(LOG, ex, request); - } finally { - if (kvContainer.hasWriteLock()) { - kvContainer.writeUnlock(); - } - } - return ContainerUtils.getSuccessResponse(request); - } - - /** - * Handles Close Container Request. An open container is closed. - */ - ContainerCommandResponseProto handleCloseContainer( - ContainerCommandRequestProto request, KeyValueContainer kvContainer) { - - if (!request.hasCloseContainer()) { - LOG.debug("Malformed Update Container request. trace ID: {}", - request.getTraceID()); - return ContainerUtils.malformedRequest(request); - } - - long containerID = kvContainer.getContainerData().getContainerID(); - ContainerLifeCycleState containerState = kvContainer.getContainerState(); - - try { - if (containerState == ContainerLifeCycleState.CLOSED) { - LOG.debug("Container {} is already closed.", containerID); - return ContainerUtils.getSuccessResponse(request); - } else if (containerState == ContainerLifeCycleState.INVALID) { - LOG.debug("Invalid container data. ContainerID: {}", containerID); - throw new StorageContainerException("Invalid container data. " + - "ContainerID: " + containerID, INVALID_CONTAINER_STATE); - } - - KeyValueContainerData kvData = kvContainer.getContainerData(); - - // remove the container from open block map once, all the blocks - // have been committed and the container is closed - kvData.setState(ContainerProtos.ContainerLifeCycleState.CLOSING); - commitPendingBlocks(kvContainer); - kvContainer.close(); - // make sure the the container open keys from BlockMap gets removed - openContainerBlockMap.removeContainer(kvData.getContainerID()); - } catch (StorageContainerException ex) { - return ContainerUtils.logAndReturnError(LOG, ex, request); - } catch (IOException ex) { - return ContainerUtils.logAndReturnError(LOG, - new StorageContainerException("Close Container failed", ex, - IO_EXCEPTION), request); - } - - return ContainerUtils.getSuccessResponse(request); - } - - /** - * Handle Put Block operation. Calls BlockManager to process the request. - */ - ContainerCommandResponseProto handlePutBlock( - ContainerCommandRequestProto request, KeyValueContainer kvContainer) { - - long blockLength; - if (!request.hasPutBlock()) { - LOG.debug("Malformed Put Key request. trace ID: {}", - request.getTraceID()); - return ContainerUtils.malformedRequest(request); - } - - try { - checkContainerOpen(kvContainer); - - BlockData blockData = BlockData.getFromProtoBuf( - request.getPutBlock().getBlockData()); - long numBytes = blockData.getProtoBufMessage().toByteArray().length; - blockLength = commitKey(blockData, kvContainer); - metrics.incContainerBytesStats(Type.PutBlock, numBytes); - } catch (StorageContainerException ex) { - return ContainerUtils.logAndReturnError(LOG, ex, request); - } catch (IOException ex) { - return ContainerUtils.logAndReturnError(LOG, - new StorageContainerException("Put Key failed", ex, IO_EXCEPTION), - request); - } - - return BlockUtils.putBlockResponseSuccess(request, blockLength); - } - - private void commitPendingBlocks(KeyValueContainer kvContainer) - throws IOException { - long containerId = kvContainer.getContainerData().getContainerID(); - List<BlockData> pendingBlocks = - this.openContainerBlockMap.getOpenBlocks(containerId); - for(BlockData blockData : pendingBlocks) { - commitKey(blockData, kvContainer); - } - } - - private long commitKey(BlockData blockData, KeyValueContainer kvContainer) - throws IOException { - Preconditions.checkNotNull(blockData); - long length = blockManager.putBlock(kvContainer, blockData); - //update the open key Map in containerManager - this.openContainerBlockMap.removeFromBlockMap(blockData.getBlockID()); - return length; - } - /** - * Handle Get Block operation. Calls BlockManager to process the request. - */ - ContainerCommandResponseProto handleGetBlock( - ContainerCommandRequestProto request, KeyValueContainer kvContainer) { - - if (!request.hasGetBlock()) { - LOG.debug("Malformed Get Key request. trace ID: {}", - request.getTraceID()); - return ContainerUtils.malformedRequest(request); - } - - BlockData responseData; - try { - BlockID blockID = BlockID.getFromProtobuf( - request.getGetBlock().getBlockID()); - responseData = blockManager.getBlock(kvContainer, blockID); - long numBytes = responseData.getProtoBufMessage().toByteArray().length; - metrics.incContainerBytesStats(Type.GetBlock, numBytes); - - } catch (StorageContainerException ex) { - return ContainerUtils.logAndReturnError(LOG, ex, request); - } catch (IOException ex) { - return ContainerUtils.logAndReturnError(LOG, - new StorageContainerException("Get Key failed", ex, IO_EXCEPTION), - request); - } - - return BlockUtils.getBlockDataResponse(request, responseData); - } - - /** - * Handles GetCommittedBlockLength operation. - * Calls BlockManager to process the request. - */ - ContainerCommandResponseProto handleGetCommittedBlockLength( - ContainerCommandRequestProto request, KeyValueContainer kvContainer) { - if (!request.hasGetCommittedBlockLength()) { - LOG.debug("Malformed Get Key request. trace ID: {}", - request.getTraceID()); - return ContainerUtils.malformedRequest(request); - } - - long blockLength; - try { - BlockID blockID = BlockID - .getFromProtobuf(request.getGetCommittedBlockLength().getBlockID()); - // Check if it really exists in the openContainerBlockMap - if (openContainerBlockMap.checkIfBlockExists(blockID)) { - String msg = "Block " + blockID + " is not committed yet."; - throw new StorageContainerException(msg, BLOCK_NOT_COMMITTED); - } - blockLength = blockManager.getCommittedBlockLength(kvContainer, blockID); - } catch (StorageContainerException ex) { - return ContainerUtils.logAndReturnError(LOG, ex, request); - } catch (IOException ex) { - return ContainerUtils.logAndReturnError(LOG, - new StorageContainerException("GetCommittedBlockLength failed", ex, - IO_EXCEPTION), request); - } - - return BlockUtils.getBlockLengthResponse(request, blockLength); - } - - /** - * Handle Delete Block operation. Calls BlockManager to process the request. - */ - ContainerCommandResponseProto handleDeleteBlock( - ContainerCommandRequestProto request, KeyValueContainer kvContainer) { - - if (!request.hasDeleteBlock()) { - LOG.debug("Malformed Delete Key request. trace ID: {}", - request.getTraceID()); - return ContainerUtils.malformedRequest(request); - } - - try { - checkContainerOpen(kvContainer); - - BlockID blockID = BlockID.getFromProtobuf( - request.getDeleteBlock().getBlockID()); - - blockManager.deleteBlock(kvContainer, blockID); - } catch (StorageContainerException ex) { - return ContainerUtils.logAndReturnError(LOG, ex, request); - } catch (IOException ex) { - return ContainerUtils.logAndReturnError(LOG, - new StorageContainerException("Delete Key failed", ex, IO_EXCEPTION), - request); - } - - return BlockUtils.getBlockResponseSuccess(request); - } - - /** - * Handle Read Chunk operation. Calls ChunkManager to process the request. - */ - ContainerCommandResponseProto handleReadChunk( - ContainerCommandRequestProto request, KeyValueContainer kvContainer) { - - if (!request.hasReadChunk()) { - LOG.debug("Malformed Read Chunk request. trace ID: {}", - request.getTraceID()); - return ContainerUtils.malformedRequest(request); - } - - ChunkInfo chunkInfo; - byte[] data; - try { - BlockID blockID = BlockID.getFromProtobuf( - request.getReadChunk().getBlockID()); - chunkInfo = ChunkInfo.getFromProtoBuf(request.getReadChunk() - .getChunkData()); - Preconditions.checkNotNull(chunkInfo); - - data = chunkManager.readChunk(kvContainer, blockID, chunkInfo); - metrics.incContainerBytesStats(Type.ReadChunk, data.length); - } catch (StorageContainerException ex) { - return ContainerUtils.logAndReturnError(LOG, ex, request); - } catch (IOException ex) { - return ContainerUtils.logAndReturnError(LOG, - new StorageContainerException("Read Chunk failed", ex, IO_EXCEPTION), - request); - } - - return ChunkUtils.getReadChunkResponse(request, data, chunkInfo); - } - - /** - * Handle Delete Chunk operation. Calls ChunkManager to process the request. - */ - ContainerCommandResponseProto handleDeleteChunk( - ContainerCommandRequestProto request, KeyValueContainer kvContainer) { - - if (!request.hasDeleteChunk()) { - LOG.debug("Malformed Delete Chunk request. trace ID: {}", - request.getTraceID()); - return ContainerUtils.malformedRequest(request); - } - - try { - checkContainerOpen(kvContainer); - - BlockID blockID = BlockID.getFromProtobuf( - request.getDeleteChunk().getBlockID()); - ContainerProtos.ChunkInfo chunkInfoProto = request.getDeleteChunk() - .getChunkData(); - ChunkInfo chunkInfo = ChunkInfo.getFromProtoBuf(chunkInfoProto); - Preconditions.checkNotNull(chunkInfo); - - chunkManager.deleteChunk(kvContainer, blockID, chunkInfo); - openContainerBlockMap.removeChunk(blockID, chunkInfoProto); - } catch (StorageContainerException ex) { - return ContainerUtils.logAndReturnError(LOG, ex, request); - } catch (IOException ex) { - return ContainerUtils.logAndReturnError(LOG, - new StorageContainerException("Delete Chunk failed", ex, - IO_EXCEPTION), request); - } - - return ChunkUtils.getChunkResponseSuccess(request); - } - - /** - * Handle Write Chunk operation. Calls ChunkManager to process the request. - */ - ContainerCommandResponseProto handleWriteChunk( - ContainerCommandRequestProto request, KeyValueContainer kvContainer) { - - if (!request.hasWriteChunk()) { - LOG.debug("Malformed Write Chunk request. trace ID: {}", - request.getTraceID()); - return ContainerUtils.malformedRequest(request); - } - - try { - checkContainerOpen(kvContainer); - - BlockID blockID = BlockID.getFromProtobuf( - request.getWriteChunk().getBlockID()); - ContainerProtos.ChunkInfo chunkInfoProto = - request.getWriteChunk().getChunkData(); - ChunkInfo chunkInfo = ChunkInfo.getFromProtoBuf(chunkInfoProto); - Preconditions.checkNotNull(chunkInfo); - - byte[] data = null; - if (request.getWriteChunk().getStage() == Stage.WRITE_DATA || - request.getWriteChunk().getStage() == Stage.COMBINED) { - data = request.getWriteChunk().getData().toByteArray(); - } - - chunkManager.writeChunk(kvContainer, blockID, chunkInfo, data, - request.getWriteChunk().getStage()); - - // We should increment stats after writeChunk - if (request.getWriteChunk().getStage() == Stage.WRITE_DATA || - request.getWriteChunk().getStage() == Stage.COMBINED) { - metrics.incContainerBytesStats(Type.WriteChunk, request.getWriteChunk() - .getChunkData().getLen()); - } - - if (request.getWriteChunk().getStage() == Stage.COMMIT_DATA - || request.getWriteChunk().getStage() == Stage.COMBINED) { - // the openContainerBlockMap should be updated only during - // COMMIT_STAGE of handling write chunk request. - openContainerBlockMap.addChunk(blockID, chunkInfoProto); - } - } catch (StorageContainerException ex) { - return ContainerUtils.logAndReturnError(LOG, ex, request); - } catch (IOException ex) { - return ContainerUtils.logAndReturnError(LOG, - new StorageContainerException("Write Chunk failed", ex, IO_EXCEPTION), - request); - } - - return ChunkUtils.getChunkResponseSuccess(request); - } - - /** - * Handle Put Small File operation. Writes the chunk and associated key - * using a single RPC. Calls BlockManager and ChunkManager to process the - * request. - */ - ContainerCommandResponseProto handlePutSmallFile( - ContainerCommandRequestProto request, KeyValueContainer kvContainer) { - - if (!request.hasPutSmallFile()) { - LOG.debug("Malformed Put Small File request. trace ID: {}", - request.getTraceID()); - return ContainerUtils.malformedRequest(request); - } - PutSmallFileRequestProto putSmallFileReq = - request.getPutSmallFile(); - - try { - checkContainerOpen(kvContainer); - - BlockID blockID = BlockID.getFromProtobuf(putSmallFileReq.getBlock() - .getBlockData().getBlockID()); - BlockData blockData = BlockData.getFromProtoBuf( - putSmallFileReq.getBlock().getBlockData()); - Preconditions.checkNotNull(blockData); - - ChunkInfo chunkInfo = ChunkInfo.getFromProtoBuf( - putSmallFileReq.getChunkInfo()); - Preconditions.checkNotNull(chunkInfo); - byte[] data = putSmallFileReq.getData().toByteArray(); - // chunks will be committed as a part of handling putSmallFile - // here. There is no need to maintain this info in openContainerBlockMap. - chunkManager.writeChunk( - kvContainer, blockID, chunkInfo, data, Stage.COMBINED); - - List<ContainerProtos.ChunkInfo> chunks = new LinkedList<>(); - chunks.add(chunkInfo.getProtoBufMessage()); - blockData.setChunks(chunks); - blockManager.putBlock(kvContainer, blockData); - metrics.incContainerBytesStats(Type.PutSmallFile, data.length); - - } catch (StorageContainerException ex) { - return ContainerUtils.logAndReturnError(LOG, ex, request); - } catch (IOException ex) { - return ContainerUtils.logAndReturnError(LOG, - new StorageContainerException("Read Chunk failed", ex, - PUT_SMALL_FILE_ERROR), request); - } - - return SmallFileUtils.getPutFileResponseSuccess(request); - } - - /** - * Handle Get Small File operation. Gets a data stream using a key. This - * helps in reducing the RPC overhead for small files. Calls BlockManager and - * ChunkManager to process the request. - */ - ContainerCommandResponseProto handleGetSmallFile( - ContainerCommandRequestProto request, KeyValueContainer kvContainer) { - - if (!request.hasGetSmallFile()) { - LOG.debug("Malformed Get Small File request. trace ID: {}", - request.getTraceID()); - return ContainerUtils.malformedRequest(request); - } - - GetSmallFileRequestProto getSmallFileReq = request.getGetSmallFile(); - - try { - BlockID blockID = BlockID.getFromProtobuf(getSmallFileReq.getBlock() - .getBlockID()); - BlockData responseData = blockManager.getBlock(kvContainer, blockID); - - ContainerProtos.ChunkInfo chunkInfo = null; - ByteString dataBuf = ByteString.EMPTY; - for (ContainerProtos.ChunkInfo chunk : responseData.getChunks()) { - byte[] data = chunkManager.readChunk(kvContainer, blockID, - ChunkInfo.getFromProtoBuf(chunk)); - ByteString current = ByteString.copyFrom(data); - dataBuf = dataBuf.concat(current); - chunkInfo = chunk; - } - metrics.incContainerBytesStats(Type.GetSmallFile, dataBuf.size()); - return SmallFileUtils.getGetSmallFileResponseSuccess(request, dataBuf - .toByteArray(), ChunkInfo.getFromProtoBuf(chunkInfo)); - } catch (StorageContainerException e) { - return ContainerUtils.logAndReturnError(LOG, e, request); - } catch (IOException ex) { - return ContainerUtils.logAndReturnError(LOG, - new StorageContainerException("Write Chunk failed", ex, - GET_SMALL_FILE_ERROR), request); - } - } - - /** - * Handle unsupported operation. - */ - ContainerCommandResponseProto handleUnsupportedOp( - ContainerCommandRequestProto request) { - // TODO : remove all unsupported operations or handle them. - return ContainerUtils.unsupportedRequest(request); - } - - /** - * Check if container is open. Throw exception otherwise. - * @param kvContainer - * @throws StorageContainerException - */ - private void checkContainerOpen(KeyValueContainer kvContainer) - throws StorageContainerException { - - ContainerLifeCycleState containerState = kvContainer.getContainerState(); - - if (containerState == ContainerLifeCycleState.OPEN) { - return; - } else { - String msg = "Requested operation not allowed as ContainerState is " + - containerState; - ContainerProtos.Result result = null; - switch (containerState) { - case CLOSING: - case CLOSED: - result = CLOSED_CONTAINER_IO; - break; - case INVALID: - result = INVALID_CONTAINER_STATE; - break; - default: - result = CONTAINER_INTERNAL_ERROR; - } - - throw new StorageContainerException(msg, result); - } - } - - public Container importContainer(long containerID, long maxSize, - FileInputStream rawContainerStream, - TarContainerPacker packer) - throws IOException { - - KeyValueContainerData containerData = - new KeyValueContainerData(containerID, - maxSize); - - KeyValueContainer container = new KeyValueContainer(containerData, - conf); - - populateContainerPathFields(container, maxSize); - container.importContainerData(rawContainerStream, packer); - return container; - - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/TarContainerPacker.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/TarContainerPacker.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/TarContainerPacker.java deleted file mode 100644 index 13689a7..0000000 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/TarContainerPacker.java +++ /dev/null @@ -1,249 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.ozone.container.keyvalue; - -import java.io.BufferedOutputStream; -import java.io.ByteArrayOutputStream; -import java.io.File; -import java.io.FileInputStream; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.util.stream.Collectors; - -import org.apache.hadoop.ozone.OzoneConsts; -import org.apache.hadoop.ozone.container.common.interfaces.Container; -import org.apache.hadoop.ozone.container.common.interfaces.ContainerPacker; - -import com.google.common.base.Preconditions; -import org.apache.commons.compress.archivers.ArchiveEntry; -import org.apache.commons.compress.archivers.ArchiveOutputStream; -import org.apache.commons.compress.archivers.tar.TarArchiveEntry; -import org.apache.commons.compress.archivers.tar.TarArchiveInputStream; -import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream; -import org.apache.commons.compress.compressors.CompressorException; -import org.apache.commons.compress.compressors.CompressorInputStream; -import org.apache.commons.compress.compressors.CompressorOutputStream; -import org.apache.commons.compress.compressors.CompressorStreamFactory; -import org.apache.commons.io.IOUtils; - -/** - * Compress/uncompress KeyValueContainer data to a tar.gz archive. - */ -public class TarContainerPacker - implements ContainerPacker<KeyValueContainerData> { - - private static final String CHUNKS_DIR_NAME = OzoneConsts.STORAGE_DIR_CHUNKS; - - private static final String DB_DIR_NAME = "db"; - - private static final String CONTAINER_FILE_NAME = "container.yaml"; - - - - /** - * Given an input stream (tar file) extract the data to the specified - * directories. - * - * @param container container which defines the destination structure. - * @param inputStream the input stream. - * @throws IOException - */ - @Override - public byte[] unpackContainerData(Container<KeyValueContainerData> container, - InputStream inputStream) - throws IOException { - byte[] descriptorFileContent = null; - try { - KeyValueContainerData containerData = container.getContainerData(); - CompressorInputStream compressorInputStream = - new CompressorStreamFactory() - .createCompressorInputStream(CompressorStreamFactory.GZIP, - inputStream); - - TarArchiveInputStream tarInput = - new TarArchiveInputStream(compressorInputStream); - - TarArchiveEntry entry = tarInput.getNextTarEntry(); - while (entry != null) { - String name = entry.getName(); - if (name.startsWith(DB_DIR_NAME + "/")) { - Path destinationPath = containerData.getDbFile().toPath() - .resolve(name.substring(DB_DIR_NAME.length() + 1)); - extractEntry(tarInput, entry.getSize(), destinationPath); - } else if (name.startsWith(CHUNKS_DIR_NAME + "/")) { - Path destinationPath = Paths.get(containerData.getChunksPath()) - .resolve(name.substring(CHUNKS_DIR_NAME.length() + 1)); - extractEntry(tarInput, entry.getSize(), destinationPath); - } else if (name.equals(CONTAINER_FILE_NAME)) { - //Don't do anything. Container file should be unpacked in a - //separated step by unpackContainerDescriptor call. - descriptorFileContent = readEntry(tarInput, entry); - } else { - throw new IllegalArgumentException( - "Unknown entry in the tar file: " + "" + name); - } - entry = tarInput.getNextTarEntry(); - } - return descriptorFileContent; - - } catch (CompressorException e) { - throw new IOException( - "Can't uncompress the given container: " + container - .getContainerData().getContainerID(), - e); - } - } - - private void extractEntry(TarArchiveInputStream tarInput, long size, - Path path) throws IOException { - Preconditions.checkNotNull(path, "Path element should not be null"); - Path parent = Preconditions.checkNotNull(path.getParent(), - "Path element should have a parent directory"); - Files.createDirectories(parent); - try (BufferedOutputStream bos = new BufferedOutputStream( - new FileOutputStream(path.toAbsolutePath().toString()))) { - int bufferSize = 1024; - byte[] buffer = new byte[bufferSize + 1]; - long remaining = size; - while (remaining > 0) { - int read = - tarInput.read(buffer, 0, (int) Math.min(remaining, bufferSize)); - if (read >= 0) { - remaining -= read; - bos.write(buffer, 0, read); - } else { - remaining = 0; - } - } - } - - } - - /** - * Given a containerData include all the required container data/metadata - * in a tar file. - * - * @param container Container to archive (data + metadata). - * @param destination Destination tar file/stream. - * @throws IOException - */ - @Override - public void pack(Container<KeyValueContainerData> container, - OutputStream destination) - throws IOException { - - KeyValueContainerData containerData = container.getContainerData(); - - try (CompressorOutputStream gzippedOut = new CompressorStreamFactory() - .createCompressorOutputStream(CompressorStreamFactory.GZIP, - destination)) { - - try (ArchiveOutputStream archiveOutputStream = new TarArchiveOutputStream( - gzippedOut)) { - - includePath(containerData.getDbFile().toString(), DB_DIR_NAME, - archiveOutputStream); - - includePath(containerData.getChunksPath(), CHUNKS_DIR_NAME, - archiveOutputStream); - - includeFile(container.getContainerFile(), - CONTAINER_FILE_NAME, - archiveOutputStream); - } - } catch (CompressorException e) { - throw new IOException( - "Can't compress the container: " + containerData.getContainerID(), - e); - } - - } - - @Override - public byte[] unpackContainerDescriptor(InputStream inputStream) - throws IOException { - try { - CompressorInputStream compressorInputStream = - new CompressorStreamFactory() - .createCompressorInputStream(CompressorStreamFactory.GZIP, - inputStream); - - TarArchiveInputStream tarInput = - new TarArchiveInputStream(compressorInputStream); - - TarArchiveEntry entry = tarInput.getNextTarEntry(); - while (entry != null) { - String name = entry.getName(); - if (name.equals(CONTAINER_FILE_NAME)) { - return readEntry(tarInput, entry); - } - entry = tarInput.getNextTarEntry(); - } - - } catch (CompressorException e) { - throw new IOException( - "Can't read the container descriptor from the container archive", - e); - } - throw new IOException( - "Container descriptor is missing from the container archive."); - } - - private byte[] readEntry(TarArchiveInputStream tarInput, - TarArchiveEntry entry) throws IOException { - ByteArrayOutputStream bos = new ByteArrayOutputStream(); - int bufferSize = 1024; - byte[] buffer = new byte[bufferSize + 1]; - long remaining = entry.getSize(); - while (remaining > 0) { - int read = - tarInput.read(buffer, 0, (int) Math.min(remaining, bufferSize)); - remaining -= read; - bos.write(buffer, 0, read); - } - return bos.toByteArray(); - } - - private void includePath(String containerPath, String subdir, - ArchiveOutputStream archiveOutputStream) throws IOException { - - for (Path path : Files.list(Paths.get(containerPath)) - .collect(Collectors.toList())) { - - includeFile(path.toFile(), subdir + "/" + path.getFileName(), - archiveOutputStream); - } - } - - private void includeFile(File file, String entryName, - ArchiveOutputStream archiveOutputStream) throws IOException { - ArchiveEntry archiveEntry = - archiveOutputStream.createArchiveEntry(file, entryName); - archiveOutputStream.putArchiveEntry(archiveEntry); - try (FileInputStream fis = new FileInputStream(file)) { - IOUtils.copy(fis, archiveOutputStream); - } - archiveOutputStream.closeArchiveEntry(); - } - -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/BlockUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/BlockUtils.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/BlockUtils.java deleted file mode 100644 index f5cc847..0000000 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/BlockUtils.java +++ /dev/null @@ -1,199 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.ozone.container.keyvalue.helpers; - -import com.google.common.base.Preconditions; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos - .ContainerCommandRequestProto; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos - .ContainerCommandResponseProto; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos - .GetBlockResponseProto; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos. - GetCommittedBlockLengthResponseProto; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos. - PutBlockResponseProto; -import org.apache.hadoop.hdds.scm.container.common.helpers - .StorageContainerException; -import org.apache.hadoop.ozone.container.common.helpers.BlockData; -import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils; -import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData; -import org.apache.hadoop.ozone.container.common.utils.ContainerCache; -import org.apache.hadoop.utils.MetadataStore; - -import java.io.IOException; - -import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos - .Result.NO_SUCH_BLOCK; -import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos - .Result.UNABLE_TO_READ_METADATA_DB; - -/** - * Utils functions to help block functions. - */ -public final class BlockUtils { - - /** Never constructed. **/ - private BlockUtils() { - - } - /** - * Get a DB handler for a given container. - * If the handler doesn't exist in cache yet, first create one and - * add into cache. This function is called with containerManager - * ReadLock held. - * - * @param containerData containerData. - * @param conf configuration. - * @return MetadataStore handle. - * @throws StorageContainerException - */ - public static MetadataStore getDB(KeyValueContainerData containerData, - Configuration conf) throws - StorageContainerException { - Preconditions.checkNotNull(containerData); - ContainerCache cache = ContainerCache.getInstance(conf); - Preconditions.checkNotNull(cache); - Preconditions.checkNotNull(containerData.getDbFile()); - try { - return cache.getDB(containerData.getContainerID(), containerData - .getContainerDBType(), containerData.getDbFile().getAbsolutePath()); - } catch (IOException ex) { - String message = String.format("Error opening DB. Container:%s " + - "ContainerPath:%s", containerData.getContainerID(), containerData - .getDbFile().getPath()); - throw new StorageContainerException(message, UNABLE_TO_READ_METADATA_DB); - } - } - /** - * Remove a DB handler from cache. - * - * @param container - Container data. - * @param conf - Configuration. - */ - public static void removeDB(KeyValueContainerData container, Configuration - conf) { - Preconditions.checkNotNull(container); - ContainerCache cache = ContainerCache.getInstance(conf); - Preconditions.checkNotNull(cache); - cache.removeDB(container.getContainerID()); - } - - /** - * Shutdown all DB Handles. - * - * @param cache - Cache for DB Handles. - */ - @SuppressWarnings("unchecked") - public static void shutdownCache(ContainerCache cache) { - cache.shutdownCache(); - } - - /** - * Parses the {@link BlockData} from a bytes array. - * - * @param bytes Block data in bytes. - * @return Block data. - * @throws IOException if the bytes array is malformed or invalid. - */ - public static BlockData getBlockData(byte[] bytes) throws IOException { - try { - ContainerProtos.BlockData blockData = ContainerProtos.BlockData.parseFrom( - bytes); - BlockData data = BlockData.getFromProtoBuf(blockData); - return data; - } catch (IOException e) { - throw new StorageContainerException("Failed to parse block data from " + - "the bytes array.", NO_SUCH_BLOCK); - } - } - - /** - * Returns putBlock response success. - * @param msg - Request. - * @return Response. - */ - public static ContainerCommandResponseProto putBlockResponseSuccess( - ContainerCommandRequestProto msg, long blockLength) { - GetCommittedBlockLengthResponseProto.Builder - committedBlockLengthResponseBuilder = - getCommittedBlockLengthResponseBuilder(blockLength, - msg.getPutBlock().getBlockData().getBlockID()); - PutBlockResponseProto.Builder putKeyResponse = - PutBlockResponseProto.newBuilder(); - putKeyResponse - .setCommittedBlockLength(committedBlockLengthResponseBuilder); - ContainerProtos.ContainerCommandResponseProto.Builder builder = - ContainerUtils.getSuccessResponseBuilder(msg); - builder.setPutBlock(putKeyResponse); - return builder.build(); - } - /** - * Returns successful blockResponse. - * @param msg - Request. - * @return Response. - */ - public static ContainerCommandResponseProto getBlockResponseSuccess( - ContainerCommandRequestProto msg) { - return ContainerUtils.getSuccessResponse(msg); - } - - - public static ContainerCommandResponseProto getBlockDataResponse( - ContainerCommandRequestProto msg, BlockData data) { - GetBlockResponseProto.Builder getBlock = ContainerProtos - .GetBlockResponseProto - .newBuilder(); - getBlock.setBlockData(data.getProtoBufMessage()); - ContainerProtos.ContainerCommandResponseProto.Builder builder = - ContainerUtils.getSuccessResponseBuilder(msg); - builder.setGetBlock(getBlock); - return builder.build(); - } - - /** - * Returns successful getCommittedBlockLength Response. - * @param msg - Request. - * @return Response. - */ - public static ContainerCommandResponseProto getBlockLengthResponse( - ContainerCommandRequestProto msg, long blockLength) { - GetCommittedBlockLengthResponseProto.Builder - committedBlockLengthResponseBuilder = - getCommittedBlockLengthResponseBuilder(blockLength, - msg.getGetCommittedBlockLength().getBlockID()); - ContainerProtos.ContainerCommandResponseProto.Builder builder = - ContainerUtils.getSuccessResponseBuilder(msg); - builder.setGetCommittedBlockLength(committedBlockLengthResponseBuilder); - return builder.build(); - } - - private static GetCommittedBlockLengthResponseProto.Builder - getCommittedBlockLengthResponseBuilder(long blockLength, - ContainerProtos.DatanodeBlockID blockID) { - ContainerProtos.GetCommittedBlockLengthResponseProto.Builder - getCommittedBlockLengthResponseBuilder = ContainerProtos. - GetCommittedBlockLengthResponseProto.newBuilder(); - getCommittedBlockLengthResponseBuilder.setBlockLength(blockLength); - getCommittedBlockLengthResponseBuilder.setBlockID(blockID); - return getCommittedBlockLengthResponseBuilder; - } -} \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
