http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea85801c/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/CBlockLocalCache.java ---------------------------------------------------------------------- diff --git a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/CBlockLocalCache.java b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/CBlockLocalCache.java deleted file mode 100644 index ec5a4c9..0000000 --- a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/CBlockLocalCache.java +++ /dev/null @@ -1,577 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.cblock.jscsiHelper.cache.impl; - -import com.google.common.base.Preconditions; -import com.google.common.primitives.Longs; -import org.apache.commons.lang.StringUtils; -import org.apache.hadoop.cblock.jscsiHelper.ContainerCacheFlusher; -import org.apache.hadoop.cblock.jscsiHelper.cache.CacheModule; -import org.apache.hadoop.cblock.jscsiHelper.cache.LogicalBlock; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdds.scm.XceiverClientManager; -import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; -import org.apache.hadoop.cblock.jscsiHelper.CBlockTargetMetrics; -import org.apache.hadoop.utils.LevelDBStore; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; -import java.io.IOException; -import java.net.URI; -import java.net.URISyntaxException; -import java.nio.file.FileStore; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.util.List; - -import static org.apache.hadoop.cblock.CBlockConfigKeys - .DFS_CBLOCK_DISK_CACHE_PATH_DEFAULT; -import static org.apache.hadoop.cblock.CBlockConfigKeys - .DFS_CBLOCK_DISK_CACHE_PATH_KEY; -import static org.apache.hadoop.cblock.CBlockConfigKeys - .DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO; -import static org.apache.hadoop.cblock.CBlockConfigKeys - .DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO_DEFAULT; -import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_TRACE_IO; -import static org.apache.hadoop.cblock.CBlockConfigKeys - .DFS_CBLOCK_TRACE_IO_DEFAULT; - -/** - * A local cache used by the CBlock ISCSI server. This class is enabled or - * disabled via config settings. - */ -public class CBlockLocalCache implements CacheModule { - private static final Logger LOG = - LoggerFactory.getLogger(CBlockLocalCache.class); - private static final Logger TRACER = - LoggerFactory.getLogger("TraceIO"); - - private final Configuration conf; - /** - * LevelDB cache file. - */ - private final LevelDBStore cacheDB; - - /** - * AsyncBlock writer updates the cacheDB and writes the blocks async to - * remote containers. - */ - private final AsyncBlockWriter blockWriter; - - /** - * Sync block reader tries to read from the cache and if we get a cache - * miss we will fetch the block from remote location. It will asynchronously - * update the cacheDB. - */ - private final SyncBlockReader blockReader; - private final String userName; - private final String volumeName; - - /** - * From a block ID we are able to get the pipeline by indexing this array. - */ - private final Pipeline[] containerList; - private final int blockSize; - private XceiverClientManager clientManager; - /** - * If this flag is enabled then cache traces all I/O, all reads and writes - * are visible in the log with sha of the block written. Makes the system - * slower use it only for debugging or creating trace simulations. - */ - private final boolean traceEnabled; - private final boolean enableShortCircuitIO; - private final long volumeSize; - private long currentCacheSize; - private File dbPath; - private final ContainerCacheFlusher flusher; - private CBlockTargetMetrics cblockTargetMetrics; - - /** - * Get Db Path. - * @return the file instance of the db. - */ - public File getDbPath() { - return dbPath; - } - - /** - * Constructor for CBlockLocalCache invoked via the builder. - * - * @param conf - Configuration - * @param volumeName - volume Name - * @param userName - user name - * @param containerPipelines - Pipelines that make up this contianer - * @param blockSize - blockSize - * @param flusher - flusher to flush data to container - * @throws IOException - */ - CBlockLocalCache( - Configuration conf, String volumeName, - String userName, List<Pipeline> containerPipelines, int blockSize, - long volumeSize, ContainerCacheFlusher flusher) throws IOException { - this.conf = conf; - this.userName = userName; - this.volumeName = volumeName; - this.blockSize = blockSize; - this.flusher = flusher; - this.traceEnabled = conf.getBoolean(DFS_CBLOCK_TRACE_IO, - DFS_CBLOCK_TRACE_IO_DEFAULT); - this.enableShortCircuitIO = conf.getBoolean( - DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, - DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO_DEFAULT); - dbPath = Paths.get(conf.get(DFS_CBLOCK_DISK_CACHE_PATH_KEY, - DFS_CBLOCK_DISK_CACHE_PATH_DEFAULT), userName, volumeName).toFile(); - - if (!dbPath.exists() && !dbPath.mkdirs()) { - LOG.error("Unable to create the cache paths. Path: {}", dbPath); - throw new IllegalArgumentException("Unable to create paths. Path: " + - dbPath); - } - cacheDB = flusher.getCacheDB(dbPath.toString()); - this.containerList = containerPipelines.toArray(new - Pipeline[containerPipelines.size()]); - this.volumeSize = volumeSize; - - blockWriter = new AsyncBlockWriter(conf, this); - blockReader = new SyncBlockReader(conf, this); - if (this.traceEnabled) { - getTracer().info("Task=StartingCache"); - } - } - - private void setClientManager(XceiverClientManager manager) { - this.clientManager = manager; - } - - private void setCblockTargetMetrics(CBlockTargetMetrics targetMetrics) { - this.cblockTargetMetrics = targetMetrics; - } - - /** - * Returns new builder class that builds a CBlockLocalCache. - * - * @return Builder - */ - public static Builder newBuilder() { - return new Builder(); - } - - public void processDirtyMessage(String fileName) { - flusher.processDirtyBlocks(dbPath.toString(), fileName); - } - - /** - * Get usable disk space. - * - * @param dbPathString - Path to db - * @return long bytes remaining. - */ - private static long getRemainingDiskSpace(String dbPathString) { - try { - URI fileUri = new URI("file:///"); - Path dbPath = Paths.get(fileUri).resolve(dbPathString); - FileStore disk = Files.getFileStore(dbPath); - return disk.getUsableSpace(); - } catch (URISyntaxException | IOException ex) { - LOG.error("Unable to get free space on for path :" + dbPathString); - } - return 0L; - } - - /** - * Returns the Max current CacheSize. - * - * @return - Cache Size - */ - public long getCurrentCacheSize() { - return currentCacheSize; - } - - /** - * Sets the Maximum Cache Size. - * - * @param currentCacheSize - Max current Cache Size. - */ - public void setCurrentCacheSize(long currentCacheSize) { - this.currentCacheSize = currentCacheSize; - } - - /** - * True if block tracing is enabled. - * - * @return - bool - */ - public boolean isTraceEnabled() { - return traceEnabled; - } - - /** - * Checks if Short Circuit I/O is enabled. - * - * @return - true if it is enabled. - */ - public boolean isShortCircuitIOEnabled() { - return enableShortCircuitIO; - } - - /** - * Returns the default block size of this device. - * - * @return - int - */ - public int getBlockSize() { - return blockSize; - } - - /** - * Gets the client manager. - * - * @return XceiverClientManager - */ - public XceiverClientManager getClientManager() { - return clientManager; - } - - /** - * check if the key is cached, if yes, returned the cached object. - * otherwise, load from data source. Then put it into cache. - * - * @param blockID - * @return the block associated to the blockID - */ - @Override - public LogicalBlock get(long blockID) throws IOException { - cblockTargetMetrics.incNumReadOps(); - return blockReader.readBlock(blockID); - } - - /** - * put the value of the key into cache and remote container. - * - * @param blockID - BlockID - * @param data - byte[] - */ - @Override - public void put(long blockID, byte[] data) throws IOException { - cblockTargetMetrics.incNumWriteOps(); - LogicalBlock block = new DiskBlock(blockID, data, false); - blockWriter.writeBlock(block); - } - - @Override - public void flush() throws IOException { - - } - - @Override - public void start() throws IOException { - flusher.register(getDbPath().getPath(), containerList); - blockWriter.start(); - } - - @Override - public void stop() throws IOException { - } - - @Override - public void close() throws IOException { - blockReader.shutdown(); - blockWriter.shutdown(); - this.flusher.releaseCacheDB(dbPath.toString()); - if (this.traceEnabled) { - getTracer().info("Task=ShutdownCache"); - } - } - - /** - * Returns true if cache still has blocks pending to write. - * - * @return false if we have no pending blocks to write. - */ - @Override - public boolean isDirtyCache() { - return false; - } - - /** - * Returns the local cache DB. - * - * @return - DB - */ - LevelDBStore getCacheDB() { - return this.cacheDB; - } - - /** - * Returns the current userName. - * - * @return - UserName - */ - String getUserName() { - return this.userName; - } - - /** - * Returns the volume name. - * - * @return VolumeName. - */ - String getVolumeName() { - return this.volumeName; - } - - /** - * Returns the target metrics. - * - * @return CBlock Target Metrics. - */ - CBlockTargetMetrics getTargetMetrics() { - return this.cblockTargetMetrics; - } - - /** - * Returns the pipeline to use given a container. - * - * @param blockId - blockID - * @return - pipeline. - */ - Pipeline getPipeline(long blockId) { - int containerIdx = (int) blockId % containerList.length; - long cBlockIndex = - Longs.fromByteArray(containerList[containerIdx].getData()); - if (cBlockIndex > 0) { - // This catches the case when we get a wrong container in the ordering - // of the containers. - Preconditions.checkState(containerIdx % cBlockIndex == 0, - "The container ID computed should match with the container index " + - "returned from cBlock Server."); - } - return containerList[containerIdx]; - } - - String getTraceID(long blockID) { - return flusher.getTraceID(dbPath, blockID); - } - - /** - * Returns tracer. - * - * @return - Logger - */ - Logger getTracer() { - return TRACER; - } - - /** - * Builder class for CBlocklocalCache. - */ - public static class Builder { - private Configuration configuration; - private String userName; - private String volumeName; - private List<Pipeline> pipelines; - private XceiverClientManager clientManager; - private int blockSize; - private long volumeSize; - private ContainerCacheFlusher flusher; - private CBlockTargetMetrics metrics; - - /** - * Ctor. - */ - Builder() { - } - - /** - * Computes a cache size based on the configuration and available disk - * space. - * - * @param configuration - Config - * @param volumeSize - Size of Volume - * @param blockSize - Size of the block - * @return - cache size in bytes. - */ - private static long computeCacheSize(Configuration configuration, - long volumeSize, int blockSize) { - long cacheSize = 0; - String dbPath = configuration.get(DFS_CBLOCK_DISK_CACHE_PATH_KEY, - DFS_CBLOCK_DISK_CACHE_PATH_DEFAULT); - if (StringUtils.isBlank(dbPath)) { - return cacheSize; - } - long spaceRemaining = getRemainingDiskSpace(dbPath); - double cacheRatio = 1.0; - - if (spaceRemaining < volumeSize) { - cacheRatio = (double)spaceRemaining / volumeSize; - } - - // if cache is going to be at least 10% of the volume size it is worth - // doing, otherwise skip creating the cache. - if (cacheRatio >= 0.10) { - cacheSize = Double.doubleToLongBits(volumeSize * cacheRatio); - } - return cacheSize; - } - - /** - * Sets the Config to be used by this cache. - * - * @param conf - Config - * @return Builder - */ - public Builder setConfiguration(Configuration conf) { - this.configuration = conf; - return this; - } - - /** - * Sets the user name who is the owner of this volume. - * - * @param user - name of the owner, please note this is not the current - * user name. - * @return - Builder - */ - public Builder setUserName(String user) { - this.userName = user; - return this; - } - - /** - * Sets the VolumeName. - * - * @param volume - Name of the volume - * @return Builder - */ - public Builder setVolumeName(String volume) { - this.volumeName = volume; - return this; - } - - /** - * Sets the Pipelines that form this volume. - * - * @param pipelineList - list of pipelines - * @return Builder - */ - public Builder setPipelines(List<Pipeline> pipelineList) { - this.pipelines = pipelineList; - return this; - } - - /** - * Sets the Client Manager that manages the communication with containers. - * - * @param xceiverClientManager - clientManager. - * @return - Builder - */ - public Builder setClientManager(XceiverClientManager xceiverClientManager) { - this.clientManager = xceiverClientManager; - return this; - } - - /** - * Sets the block size -- Typical sizes are 4KB, 8KB etc. - * - * @param size - BlockSize. - * @return - Builder - */ - public Builder setBlockSize(int size) { - this.blockSize = size; - return this; - } - - /** - * Sets the volumeSize. - * - * @param size - VolumeSize - * @return - Builder - */ - public Builder setVolumeSize(long size) { - this.volumeSize = size; - return this; - } - - /** - * Set flusher. - * @param containerCacheFlusher - cache Flusher - * @return Builder. - */ - public Builder setFlusher(ContainerCacheFlusher containerCacheFlusher) { - this.flusher = containerCacheFlusher; - return this; - } - - /** - * Sets the cblock Metrics. - * - * @param targetMetrics - CBlock Target Metrics - * @return - Builder - */ - public Builder setCBlockTargetMetrics(CBlockTargetMetrics targetMetrics) { - this.metrics = targetMetrics; - return this; - } - - /** - * Constructs a CBlockLocalCache. - * - * @return the CBlockLocalCache with the preset properties. - * @throws IOException - */ - public CBlockLocalCache build() throws IOException { - Preconditions.checkNotNull(this.configuration, "A valid configuration " + - "is needed"); - Preconditions.checkState(StringUtils.isNotBlank(userName), "A valid " + - "username is needed"); - Preconditions.checkState(StringUtils.isNotBlank(volumeName), " A valid" + - " volume name is needed"); - Preconditions.checkNotNull(this.pipelines, "Pipelines cannot be null"); - Preconditions.checkState(this.pipelines.size() > 0, "At least one " + - "pipeline location is needed for a volume"); - - for (int x = 0; x < pipelines.size(); x++) { - Preconditions.checkNotNull(pipelines.get(x).getData(), "cBlock " + - "relies on private data on the pipeline, null data found."); - } - - Preconditions.checkNotNull(clientManager, "Client Manager cannot be " + - "null"); - Preconditions.checkState(blockSize > 0, " Block size has to be a " + - "number greater than 0"); - - Preconditions.checkState(volumeSize > 0, "Volume Size cannot be less " + - "than 1"); - Preconditions.checkNotNull(this.flusher, "Flusher cannot be null."); - - CBlockLocalCache cache = new CBlockLocalCache(this.configuration, - this.volumeName, this.userName, this.pipelines, blockSize, - volumeSize, flusher); - cache.setCblockTargetMetrics(this.metrics); - cache.setClientManager(this.clientManager); - - // TODO : Support user configurable maximum size. - long cacheSize = computeCacheSize(this.configuration, this.volumeSize, - this.blockSize); - cache.setCurrentCacheSize(cacheSize); - return cache; - } - } -}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea85801c/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/DiskBlock.java ---------------------------------------------------------------------- diff --git a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/DiskBlock.java b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/DiskBlock.java deleted file mode 100644 index 26c174f..0000000 --- a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/DiskBlock.java +++ /dev/null @@ -1,77 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.cblock.jscsiHelper.cache.impl; - -import org.apache.hadoop.cblock.jscsiHelper.cache.LogicalBlock; -import java.nio.ByteBuffer; - -/** - * Impl class for LogicalBlock. - */ -public class DiskBlock implements LogicalBlock { - private ByteBuffer data; - private long blockID; - private boolean persisted; - - /** - * Constructs a DiskBlock Class from the following params. - * @param blockID - 64-bit block ID - * @param data - Byte Array - * @param persisted - Flag which tells us if this is persisted to remote - */ - public DiskBlock(long blockID, byte[] data, boolean persisted) { - if (data !=null) { - this.data = ByteBuffer.wrap(data); - } - this.blockID = blockID; - this.persisted = persisted; - } - - @Override - public ByteBuffer getData() { - return data; - } - - /** - * Frees the byte buffer since we don't need it any more. - */ - @Override - public void clearData() { - data.clear(); - } - - @Override - public long getBlockID() { - return blockID; - } - - @Override - public boolean isPersisted() { - return persisted; - } - - /** - * Sets the value of persisted. - * @param value - True if this has been persisted to container, false - * otherwise. - */ - public void setPersisted(boolean value) { - persisted = value; - } - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea85801c/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/SyncBlockReader.java ---------------------------------------------------------------------- diff --git a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/SyncBlockReader.java b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/SyncBlockReader.java deleted file mode 100644 index 557b201..0000000 --- a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/SyncBlockReader.java +++ /dev/null @@ -1,263 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.cblock.jscsiHelper.cache.impl; - -import com.google.common.primitives.Longs; -import com.google.common.util.concurrent.ThreadFactoryBuilder; -import org.apache.commons.codec.digest.DigestUtils; -import org.apache.hadoop.cblock.jscsiHelper.cache.LogicalBlock; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdds.protocol.proto.ContainerProtos; -import org.apache.hadoop.hdds.scm.XceiverClientSpi; -import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; -import org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls; -import org.apache.hadoop.util.Time; -import org.apache.hadoop.util.concurrent.HadoopThreadPoolExecutor; -import org.apache.hadoop.utils.LevelDBStore; -import org.iq80.leveldb.DBException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; - -/** - * Reads blocks from the container via the local cache. - */ -public class SyncBlockReader { - private static final Logger LOG = - LoggerFactory.getLogger(SyncBlockReader.class); - - /** - * Update Queue - The reason why we have the queue is that we want to - * return the block as soon as we read it from the containers. This queue - * is work queue which will take the read block and update the cache. - * During testing we found levelDB is slow during writes, hence we wanted - * to return as block as soon as possible and update levelDB asynchronously. - */ - private final static int QUEUE_SIZE = 1024; - /** - * Config. - */ - private final Configuration conf; - /** - * The parent cache this reader is operating against. - */ - private final CBlockLocalCache parentCache; - private final BlockingQueue<Runnable> updateQueue; - - /** - * executor is used for running LevelDB updates. In future, we might do - * read-aheads and this pool is useful for that too. The reason why we - * don't share an executor for reads and writes is because the write task - * is couple of magnitude slower than read task. So we don't want the - * update DB to queue up behind the writes. - */ - private final ThreadPoolExecutor executor; - - /** - * Number of threads that pool starts with. - */ - private final int corePoolSize = 1; - /** - * Maximum number of threads our pool will ever create. - */ - private final int maxPoolSize = 10; - /** - * The idle time a thread hangs around waiting for work. if we don't find - * new work in 60 seconds the worker thread is killed. - */ - private final long keepAlive = 60L; - - /** - * Constructs a SyncBlock reader. - * - * @param conf - Configuration - * @param cache - Cache - */ - public SyncBlockReader(Configuration conf, CBlockLocalCache cache) { - this.conf = conf; - this.parentCache = cache; - updateQueue = new ArrayBlockingQueue<>(QUEUE_SIZE, true); - ThreadFactory workerThreadFactory = new ThreadFactoryBuilder() - .setNameFormat("SyncBlockReader Thread #%d").setDaemon(true).build(); - executor = new HadoopThreadPoolExecutor( - corePoolSize, maxPoolSize, keepAlive, TimeUnit.SECONDS, - updateQueue, workerThreadFactory, - new ThreadPoolExecutor.CallerRunsPolicy()); - } - - /** - * Returns the cache DB. - * - * @return LevelDB - */ - private LevelDBStore getCacheDB() { - return parentCache.getCacheDB(); - } - - /** - * Returns data from the local cache if found, else reads from the remote - * container. - * - * @param blockID - blockID - * @return LogicalBlock - */ - LogicalBlock readBlock(long blockID) throws IOException { - XceiverClientSpi client = null; - byte[] data = getblockFromDB(blockID); - if (data != null) { - parentCache.getTargetMetrics().incNumReadCacheHits(); - return new DiskBlock(blockID, data, false); - } - - parentCache.getTargetMetrics().incNumReadCacheMiss(); - try { - client = parentCache.getClientManager() - .acquireClient(parentCache.getPipeline(blockID)); - LogicalBlock block = getBlockFromContainer(blockID, client); - return block; - } catch (Exception ex) { - parentCache.getTargetMetrics().incNumFailedReadBlocks(); - LOG.error("read failed for BlockId: {}", blockID, ex); - throw ex; - } finally { - if (client != null) { - parentCache.getClientManager().releaseClient(client); - } - } - } - - /** - * Gets data from the DB if it exists. - * - * @param blockID - block id - * @return data - */ - private byte[] getblockFromDB(long blockID) { - try { - if(parentCache.isShortCircuitIOEnabled()) { - long startTime = Time.monotonicNow(); - byte[] data = getCacheDB().get(Longs.toByteArray(blockID)); - long endTime = Time.monotonicNow(); - - if (parentCache.isTraceEnabled()) { - parentCache.getTracer().info( - "Task=ReadTaskDBRead,BlockID={},SHA={},Time={}", - blockID, (data != null && data.length > 0) - ? DigestUtils.sha256Hex(data) : null, - endTime - startTime); - } - parentCache.getTargetMetrics().updateDBReadLatency( - endTime - startTime); - return data; - } - - - } catch (DBException dbe) { - LOG.error("Error while reading from cacheDB.", dbe); - throw dbe; - } - return null; - } - - - /** - * Returns a block from a Remote Container. if the key is not found on a - * remote container we just return a block initialzied with zeros. - * - * @param blockID - blockID - * @param client - client - * @return LogicalBlock - * @throws IOException - */ - private LogicalBlock getBlockFromContainer(long blockID, - XceiverClientSpi client) throws IOException { - String containerName = parentCache.getPipeline(blockID).getContainerName(); - try { - long startTime = Time.monotonicNow(); - ContainerProtos.GetSmallFileResponseProto response = - ContainerProtocolCalls.readSmallFile(client, containerName, - Long.toString(blockID), parentCache.getTraceID(blockID)); - long endTime = Time.monotonicNow(); - if (parentCache.isTraceEnabled()) { - parentCache.getTracer().info( - "Task=ReadTaskContainerRead,BlockID={},SHA={},Time={}", - blockID, response.getData().getData().toByteArray().length > 0 ? - DigestUtils.sha256Hex(response.getData() - .getData().toByteArray()) : null, endTime - startTime); - } - - parentCache.getTargetMetrics().updateContainerReadLatency( - endTime - startTime); - DiskBlock block = new DiskBlock(blockID, - response.getData().getData().toByteArray(), false); - - if(parentCache.isShortCircuitIOEnabled()) { - queueUpdateTask(block); - } - - return block; - } catch (IOException ex) { - if (ex instanceof StorageContainerException) { - parentCache.getTargetMetrics().incNumReadLostBlocks(); - StorageContainerException sce = (StorageContainerException) ex; - if (sce.getResult() == ContainerProtos.Result.NO_SUCH_KEY || - sce.getResult() == ContainerProtos.Result.IO_EXCEPTION) { - return new DiskBlock(blockID, new byte[parentCache.getBlockSize()], - false); - } - } - throw ex; - } - } - - /** - * Updates the cache with the block that we just read. - * - * @param block - */ - private void queueUpdateTask(final DiskBlock block) { - Runnable updateTask = () -> { - if(block.getData().array().length > 0) { - getCacheDB().put(Longs.toByteArray(block.getBlockID()), - block.getData().array()); - block.setPersisted(true); - } else { - LOG.error("Refusing to update the a null block in the local cache."); - } - }; - if (this.executor.isShutdown() || this.executor.isTerminated()) { - LOG.error("Thread executor is not running."); - } else { - this.executor.submit(updateTask); - } - } - - /** - * This is a read operation, we don't care if we updated the cache with the - * last block e read. - */ - void shutdown() { - this.executor.shutdownNow(); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea85801c/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/package-info.java b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/package-info.java deleted file mode 100644 index dfac110..0000000 --- a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/package-info.java +++ /dev/null @@ -1,18 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.cblock.jscsiHelper.cache.impl; http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea85801c/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/package-info.java b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/package-info.java deleted file mode 100644 index 47f76b8..0000000 --- a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/package-info.java +++ /dev/null @@ -1,18 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.cblock.jscsiHelper.cache; http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea85801c/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/package-info.java b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/package-info.java deleted file mode 100644 index 85f8d6f..0000000 --- a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/package-info.java +++ /dev/null @@ -1,18 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.cblock.jscsiHelper; http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea85801c/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/kubernetes/DynamicProvisioner.java ---------------------------------------------------------------------- diff --git a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/kubernetes/DynamicProvisioner.java b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/kubernetes/DynamicProvisioner.java deleted file mode 100644 index e21966b..0000000 --- a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/kubernetes/DynamicProvisioner.java +++ /dev/null @@ -1,331 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.cblock.kubernetes; - -import com.google.gson.reflect.TypeToken; -import com.squareup.okhttp.RequestBody; -import io.kubernetes.client.ApiClient; -import io.kubernetes.client.ApiException; -import io.kubernetes.client.Configuration; -import io.kubernetes.client.apis.CoreV1Api; -import io.kubernetes.client.models.V1ISCSIVolumeSource; -import io.kubernetes.client.models.V1ObjectMeta; -import io.kubernetes.client.models.V1ObjectReference; -import io.kubernetes.client.models.V1PersistentVolume; -import io.kubernetes.client.models.V1PersistentVolumeClaim; -import io.kubernetes.client.models.V1PersistentVolumeSpec; -import io.kubernetes.client.util.Config; -import io.kubernetes.client.util.Watch; -import okio.Buffer; - -import org.apache.hadoop.cblock.CblockUtils; -import org.apache.hadoop.cblock.exception.CBlockException; -import org.apache.hadoop.cblock.proto.MountVolumeResponse; -import org.apache.hadoop.cblock.storage.StorageManager; -import org.apache.hadoop.hdds.conf.OzoneConfiguration; -import org.apache.ratis.shaded.com.google.common.annotations.VisibleForTesting; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.net.SocketTimeoutException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.TimeUnit; -import static org.apache.hadoop.cblock.CBlockConfigKeys - .DFS_CBLOCK_ISCSI_ADVERTISED_IP; -import static org.apache.hadoop.cblock.CBlockConfigKeys - .DFS_CBLOCK_ISCSI_ADVERTISED_PORT; -import static org.apache.hadoop.cblock.CBlockConfigKeys - .DFS_CBLOCK_ISCSI_ADVERTISED_PORT_DEFAULT; -import static org.apache.hadoop.cblock.CBlockConfigKeys - .DFS_CBLOCK_JSCSI_SERVER_ADDRESS_DEFAULT; -import static org.apache.hadoop.cblock.CBlockConfigKeys - .DFS_CBLOCK_JSCSI_SERVER_ADDRESS_KEY; -import static org.apache.hadoop.cblock.CBlockConfigKeys - .DFS_CBLOCK_KUBERNETES_CBLOCK_USER; -import static org.apache.hadoop.cblock.CBlockConfigKeys - .DFS_CBLOCK_KUBERNETES_CBLOCK_USER_DEFAULT; -import static org.apache.hadoop.cblock.CBlockConfigKeys - .DFS_CBLOCK_KUBERNETES_CONFIG_FILE_KEY; - -/** - * Kubernetes Dynamic Persistent Volume provisioner. - * - * Listens on the kubernetes feed and creates the appropriate cblock AND - * kubernetes PersistentVolume according to the created PersistentVolumeClaims. - */ -public class DynamicProvisioner implements Runnable{ - - protected static final Logger LOGGER = - LoggerFactory.getLogger(DynamicProvisioner.class); - - private static final String STORAGE_CLASS = "cblock"; - - private static final String PROVISIONER_ID = "hadoop.apache.org/cblock"; - private static final String KUBERNETES_PROVISIONER_KEY = - "volume.beta.kubernetes.io/storage-provisioner"; - private static final String KUBERNETES_BIND_COMPLETED_KEY = - "pv.kubernetes.io/bind-completed"; - - private boolean running = true; - - private final StorageManager storageManager; - - private String kubernetesConfigFile; - - private String externalIp; - - private int externalPort; - - private String cblockUser; - - private CoreV1Api api; - - private ApiClient client; - - private Thread watcherThread; - - public DynamicProvisioner(OzoneConfiguration ozoneConf, - StorageManager storageManager) throws IOException { - this.storageManager = storageManager; - - kubernetesConfigFile = ozoneConf - .getTrimmed(DFS_CBLOCK_KUBERNETES_CONFIG_FILE_KEY); - - String jscsiServerAddress = ozoneConf - .get(DFS_CBLOCK_JSCSI_SERVER_ADDRESS_KEY, - DFS_CBLOCK_JSCSI_SERVER_ADDRESS_DEFAULT); - - externalIp = ozoneConf. - getTrimmed(DFS_CBLOCK_ISCSI_ADVERTISED_IP, jscsiServerAddress); - - externalPort = ozoneConf. - getInt(DFS_CBLOCK_ISCSI_ADVERTISED_PORT, - DFS_CBLOCK_ISCSI_ADVERTISED_PORT_DEFAULT); - - cblockUser = ozoneConf.getTrimmed(DFS_CBLOCK_KUBERNETES_CBLOCK_USER, - DFS_CBLOCK_KUBERNETES_CBLOCK_USER_DEFAULT); - - - } - - public void init() throws IOException { - if (kubernetesConfigFile != null) { - client = Config.fromConfig(kubernetesConfigFile); - } else { - client = Config.fromCluster(); - } - client.getHttpClient().setReadTimeout(60, TimeUnit.SECONDS); - Configuration.setDefaultApiClient(client); - api = new CoreV1Api(); - - watcherThread = new Thread(this); - watcherThread.setName("DynamicProvisioner"); - watcherThread.setDaemon(true); - } - - @Override - public void run() { - LOGGER.info("Starting kubernetes dynamic provisioner."); - while (running) { - String resourceVersion = null; - try { - - Watch<V1PersistentVolumeClaim> watch = Watch.createWatch(client, - api.listPersistentVolumeClaimForAllNamespacesCall(null, - null, - false, - null, - null, - null, - resourceVersion, - null, - true, - null, - null), - new TypeToken<Watch.Response<V1PersistentVolumeClaim>>() { - }.getType()); - - - //check the new pvc resources, and create cblock + pv if needed - for (Watch.Response<V1PersistentVolumeClaim> item : watch) { - V1PersistentVolumeClaim claim = item.object; - - if (isPvMissingForPvc(claim)) { - - LOGGER.info("Provisioning volumes for PVC {}/{}", - claim.getMetadata().getNamespace(), - claim.getMetadata().getName()); - - if (LOGGER.isDebugEnabled()) { - RequestBody request = - api.getApiClient().serialize(claim, "application/json"); - - final Buffer buffer = new Buffer(); - request.writeTo(buffer); - LOGGER.debug("New PVC is detected: " + buffer.readUtf8()); - } - - String volumeName = createVolumeName(claim); - - long size = CblockUtils.parseSize( - claim.getSpec().getResources().getRequests().get("storage")); - - createCBlock(volumeName, size); - createPersistentVolumeFromPVC(item.object, volumeName); - } - } - } catch (Exception ex) { - if (ex.getCause() != null && ex - .getCause() instanceof SocketTimeoutException) { - //This is normal. We are connection to the kubernetes server and the - //connection should be reopened time to time... - LOGGER.debug("Time exception occured", ex); - } else { - LOGGER.error("Error on provisioning persistent volumes.", ex); - try { - //we can try again in the main loop - Thread.sleep(1000); - } catch (InterruptedException e) { - LOGGER.error("Error on sleeping after an error.", e); - } - } - } - } - } - - private boolean isPvMissingForPvc(V1PersistentVolumeClaim claim) { - - Map<String, String> annotations = claim.getMetadata().getAnnotations(); - - return claim.getStatus().getPhase().equals("Pending") && STORAGE_CLASS - .equals(claim.getSpec().getStorageClassName()) && PROVISIONER_ID - .equals(annotations.get(KUBERNETES_PROVISIONER_KEY)) && !"yes" - .equals(annotations.get(KUBERNETES_BIND_COMPLETED_KEY)); - } - - @VisibleForTesting - protected String createVolumeName(V1PersistentVolumeClaim claim) { - return claim.getMetadata().getName() + "-" + claim.getMetadata() - .getUid(); - } - - public void stop() { - running = false; - try { - watcherThread.join(60000); - } catch (InterruptedException e) { - LOGGER.error("Kubernetes watcher thread can't stopped gracefully.", e); - } - } - - private void createCBlock(String volumeName, long size) - throws CBlockException { - - MountVolumeResponse mountVolumeResponse = - storageManager.isVolumeValid(cblockUser, volumeName); - if (!mountVolumeResponse.getIsValid()) { - storageManager - .createVolume(cblockUser, volumeName, size, 4 * 1024); - } - } - - private void createPersistentVolumeFromPVC(V1PersistentVolumeClaim claim, - String volumeName) throws ApiException, IOException { - - V1PersistentVolume v1PersistentVolume = - persitenceVolumeBuilder(claim, volumeName); - - if (LOGGER.isDebugEnabled()) { - RequestBody request = - api.getApiClient().serialize(v1PersistentVolume, "application/json"); - - final Buffer buffer = new Buffer(); - request.writeTo(buffer); - LOGGER.debug("Creating new PV: " + buffer.readUtf8()); - } - api.createPersistentVolume(v1PersistentVolume, null); - } - - protected V1PersistentVolume persitenceVolumeBuilder( - V1PersistentVolumeClaim claim, - String volumeName) { - - V1PersistentVolume v1PersistentVolume = new V1PersistentVolume(); - v1PersistentVolume.setKind("PersistentVolume"); - v1PersistentVolume.setApiVersion("v1"); - - V1ObjectMeta metadata = new V1ObjectMeta(); - metadata.setName(volumeName); - metadata.setNamespace(claim.getMetadata().getNamespace()); - metadata.setAnnotations(new HashMap<>()); - - metadata.getAnnotations() - .put("pv.kubernetes.io/provisioned-by", PROVISIONER_ID); - - metadata.getAnnotations() - .put("volume.beta.kubernetes.io/storage-class", STORAGE_CLASS); - - v1PersistentVolume.setMetadata(metadata); - - V1PersistentVolumeSpec spec = new V1PersistentVolumeSpec(); - - spec.setCapacity(new HashMap<>()); - spec.getCapacity().put("storage", - claim.getSpec().getResources().getRequests().get("storage")); - - spec.setAccessModes(new ArrayList<>()); - spec.getAccessModes().add("ReadWriteOnce"); - - V1ObjectReference claimRef = new V1ObjectReference(); - claimRef.setName(claim.getMetadata().getName()); - claimRef.setNamespace(claim.getMetadata().getNamespace()); - claimRef.setKind(claim.getKind()); - claimRef.setApiVersion(claim.getApiVersion()); - claimRef.setUid(claim.getMetadata().getUid()); - spec.setClaimRef(claimRef); - - spec.persistentVolumeReclaimPolicy("Delete"); - - V1ISCSIVolumeSource iscsi = new V1ISCSIVolumeSource(); - iscsi.setIqn(cblockUser + ":" + volumeName); - iscsi.setLun(0); - iscsi.setFsType("ext4"); - String portal = externalIp + ":" + externalPort; - iscsi.setTargetPortal(portal); - iscsi.setPortals(new ArrayList<>()); - iscsi.getPortals().add(portal); - - spec.iscsi(iscsi); - v1PersistentVolume.setSpec(spec); - return v1PersistentVolume; - } - - - @VisibleForTesting - protected CoreV1Api getApi() { - return api; - } - - public void start() { - watcherThread.start(); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea85801c/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/kubernetes/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/kubernetes/package-info.java b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/kubernetes/package-info.java deleted file mode 100644 index 3ec5aab..0000000 --- a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/kubernetes/package-info.java +++ /dev/null @@ -1,23 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * This package contains helper classes to run hadoop cluster in kubernetes - * environment. - */ -package org.apache.hadoop.cblock.kubernetes; http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea85801c/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/meta/ContainerDescriptor.java ---------------------------------------------------------------------- diff --git a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/meta/ContainerDescriptor.java b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/meta/ContainerDescriptor.java deleted file mode 100644 index 2c31224..0000000 --- a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/meta/ContainerDescriptor.java +++ /dev/null @@ -1,107 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.cblock.meta; - -import com.google.protobuf.InvalidProtocolBufferException; -import org.apache.hadoop.cblock.protocol.proto.CBlockClientServerProtocolProtos; -import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; - -/** - * - * The internal representation of a container maintained by CBlock server. - * Include enough information to exactly identify a container for read/write - * operation. - * - * NOTE that this class is work-in-progress. Depends on HDFS-7240 container - * implementation. Currently only to allow testing. - */ -public class ContainerDescriptor { - private final String containerID; - // the index of this container with in a volume - // on creation, there may be no way to know the index of the container - // as it is a volume specific information - private int containerIndex; - private Pipeline pipeline; - - public ContainerDescriptor(String containerID) { - this.containerID = containerID; - } - - public ContainerDescriptor(String containerID, int containerIndex) { - this.containerID = containerID; - this.containerIndex = containerIndex; - } - - public void setContainerIndex(int idx) { - this.containerIndex = idx; - } - - public String getContainerID() { - return containerID; - } - - public void setPipeline(Pipeline pipeline) { - this.pipeline = pipeline; - } - - public Pipeline getPipeline() { - return pipeline; - } - - public int getContainerIndex() { - return containerIndex; - } - - public long getUtilization() { - return 0; - } - - public CBlockClientServerProtocolProtos.ContainerIDProto toProtobuf() { - CBlockClientServerProtocolProtos.ContainerIDProto.Builder builder = - CBlockClientServerProtocolProtos.ContainerIDProto.newBuilder(); - builder.setContainerID(containerID); - builder.setIndex(containerIndex); - if (pipeline != null) { - builder.setPipeline(pipeline.getProtobufMessage()); - } - return builder.build(); - } - - public static ContainerDescriptor fromProtobuf(byte[] data) - throws InvalidProtocolBufferException { - CBlockClientServerProtocolProtos.ContainerIDProto id = - CBlockClientServerProtocolProtos.ContainerIDProto.parseFrom(data); - return new ContainerDescriptor(id.getContainerID(), - (int)id.getIndex()); - } - - @Override - public int hashCode() { - return containerID.hashCode()*37 + containerIndex; - } - - @Override - public boolean equals(Object o) { - if (o != null && o instanceof ContainerDescriptor) { - ContainerDescriptor other = (ContainerDescriptor)o; - return containerID.equals(other.containerID) && - containerIndex == other.containerIndex; - } - return false; - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea85801c/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/meta/VolumeDescriptor.java ---------------------------------------------------------------------- diff --git a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/meta/VolumeDescriptor.java b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/meta/VolumeDescriptor.java deleted file mode 100644 index 930741d..0000000 --- a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/meta/VolumeDescriptor.java +++ /dev/null @@ -1,269 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.cblock.meta; - -import com.google.protobuf.InvalidProtocolBufferException; -import org.apache.hadoop.cblock.protocol.proto.CBlockClientServerProtocolProtos; -import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - -/** - * The internal representation maintained by CBlock server as the info for - * a volume. Contains the list of containers belonging to this volume. - * - * Many methods of this class is made such that the volume information ( - * including container list) can be easily transformed into a Json string - * that can be stored/parsed from a persistent store for cblock server - * persistence. - * - * This class is still work-in-progress. - */ -public class VolumeDescriptor { - // The main data structure is the container location map - // other thing are mainly just information - - // since only one operation at a time is allowed, no - // need to consider concurrency control here - - // key is container id - - private static final Logger LOG = - LoggerFactory.getLogger(VolumeDescriptor.class); - - private ConcurrentHashMap<String, ContainerDescriptor> containerMap; - private String userName; - private int blockSize; - private long volumeSize; - private String volumeName; - // this is essentially the ordered keys of containerMap - // which is kind of redundant information. But since we - // are likely to access it frequently based on ordering. - // keeping this copy to avoid having to sort the key every - // time - private List<String> containerIdOrdered; - - /** - * This is not being called explicitly, but this is necessary as - * it will be called by the parse method implicitly when - * reconstructing the object from json string. The get*() methods - * and set*() methods are for the same purpose also. - */ - public VolumeDescriptor() { - this(null, null, 0, 0); - } - - public VolumeDescriptor(String userName, String volumeName, long volumeSize, - int blockSize) { - this.containerMap = new ConcurrentHashMap<>(); - this.userName = userName; - this.volumeName = volumeName; - this.blockSize = blockSize; - this.volumeSize = volumeSize; - this.containerIdOrdered = new LinkedList<>(); - } - - public String getUserName() { - return userName; - } - - public void setUserName(String userName) { - this.userName = userName; - } - - public String getVolumeName() { - return volumeName; - } - - public void setVolumeName(String volumeName) { - this.volumeName = volumeName; - } - - public long getVolumeSize() { - return volumeSize; - } - - public void setVolumeSize(long volumeSize) { - this.volumeSize = volumeSize; - } - - public int getBlockSize() { - return blockSize; - } - - public void setBlockSize(int blockSize) { - this.blockSize = blockSize; - } - - public void setContainerIDs(ArrayList<String> containerIDs) { - containerIdOrdered.addAll(containerIDs); - } - - public void addContainer(ContainerDescriptor containerDescriptor) { - containerMap.put(containerDescriptor.getContainerID(), - containerDescriptor); - } - - - public HashMap<String, Pipeline> getPipelines() { - HashMap<String, Pipeline> pipelines = new HashMap<>(); - for (Map.Entry<String, ContainerDescriptor> entry : - containerMap.entrySet()) { - pipelines.put(entry.getKey(), entry.getValue().getPipeline()); - } - return pipelines; - } - - public boolean isEmpty() { - VolumeInfo info = getInfo(); - return info.getUsage() == 0; - } - - public VolumeInfo getInfo() { - // TODO : need to actually go through all containers of this volume and - // ask for their utilization. - long utilization = 0; - for (Map.Entry<String, ContainerDescriptor> entry : - containerMap.entrySet()) { - utilization += entry.getValue().getUtilization(); - } - return new VolumeInfo(this.userName, this.volumeName, - this.volumeSize, this.blockSize, - utilization * blockSize); - } - - public String[] getContainerIDs() { - //ArrayList<Long> ids = new ArrayList(containerMap.keySet()); - //return ids.toArray(new Long[ids.size()]); - return containerIdOrdered.toArray(new String[containerIdOrdered.size()]); - } - - public List<String> getContainerIDsList() { - return new ArrayList<>(containerIdOrdered); - } - - public List<Pipeline> getContainerPipelines() { - Map<String, Pipeline> tmp = getPipelines(); - List<Pipeline> pipelineList = new LinkedList<>(); - for (String containerIDString : containerIdOrdered) { - pipelineList.add(tmp.get(containerIDString)); - } - return pipelineList; - } - - @Override - public String toString() { - String string = ""; - string += "Username:" + userName + "\n"; - string += "VolumeName:" + volumeName + "\n"; - string += "VolumeSize:" + volumeSize + "\n"; - string += "blockSize:" + blockSize + "\n"; - string += "containerIds:" + containerIdOrdered + "\n"; - string += "containerIdsWithObject:" + containerMap.keySet(); - return string; - } - - public CBlockClientServerProtocolProtos.MountVolumeResponseProto - toProtobuf() { - CBlockClientServerProtocolProtos.MountVolumeResponseProto.Builder volume = - CBlockClientServerProtocolProtos.MountVolumeResponseProto.newBuilder(); - volume.setIsValid(true); - volume.setVolumeName(volumeName); - volume.setUserName(userName); - volume.setVolumeSize(volumeSize); - volume.setBlockSize(blockSize); - for (String containerIDString : containerIdOrdered) { - ContainerDescriptor containerDescriptor = containerMap.get( - containerIDString); - volume.addAllContainerIDs(containerDescriptor.toProtobuf()); - } - return volume.build(); - } - - public static VolumeDescriptor fromProtobuf(byte[] data) - throws InvalidProtocolBufferException { - CBlockClientServerProtocolProtos.MountVolumeResponseProto volume = - CBlockClientServerProtocolProtos.MountVolumeResponseProto - .parseFrom(data); - String userName = volume.getUserName(); - String volumeName = volume.getVolumeName(); - long volumeSize = volume.getVolumeSize(); - int blockSize = volume.getBlockSize(); - VolumeDescriptor volumeDescriptor = new VolumeDescriptor(userName, - volumeName, volumeSize, blockSize); - List<CBlockClientServerProtocolProtos.ContainerIDProto> containers - = volume.getAllContainerIDsList(); - - String[] containerOrdering = new String[containers.size()]; - - for (CBlockClientServerProtocolProtos.ContainerIDProto containerProto : - containers) { - ContainerDescriptor containerDescriptor = new ContainerDescriptor( - containerProto.getContainerID(), - (int)containerProto.getIndex()); - if(containerProto.hasPipeline()) { - containerDescriptor.setPipeline( - Pipeline.getFromProtoBuf(containerProto.getPipeline())); - } - volumeDescriptor.addContainer(containerDescriptor); - containerOrdering[containerDescriptor.getContainerIndex()] = - containerDescriptor.getContainerID(); - } - volumeDescriptor.setContainerIDs( - new ArrayList<>(Arrays.asList(containerOrdering))); - return volumeDescriptor; - } - - @Override - public int hashCode() { - return userName.hashCode()*37 + volumeName.hashCode(); - } - - @Override - public boolean equals(Object o) { - if (o != null && o instanceof VolumeDescriptor) { - VolumeDescriptor other = (VolumeDescriptor)o; - if (!userName.equals(other.getUserName()) || - !volumeName.equals(other.getVolumeName()) || - volumeSize != other.getVolumeSize() || - blockSize != other.getBlockSize()) { - return false; - } - if (containerIdOrdered.size() != other.containerIdOrdered.size() || - containerMap.size() != other.containerMap.size()) { - return false; - } - for (int i = 0; i<containerIdOrdered.size(); i++) { - if (!containerIdOrdered.get(i).equals( - other.containerIdOrdered.get(i))) { - return false; - } - } - return containerMap.equals(other.containerMap); - } - return false; - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea85801c/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/meta/VolumeInfo.java ---------------------------------------------------------------------- diff --git a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/meta/VolumeInfo.java b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/meta/VolumeInfo.java deleted file mode 100644 index 7f50c41..0000000 --- a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/meta/VolumeInfo.java +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.cblock.meta; - -/** - * A wrapper class that represents the information about a volume. Used in - * communication between CBlock client and CBlock server only. - */ -public class VolumeInfo { - private final String userName; - private final String volumeName; - private final long volumeSize; - private final long blockSize; - private final long usage; - - public VolumeInfo(String userName, String volumeName, long volumeSize, - long blockSize, long usage) { - this.userName = userName; - this.volumeName = volumeName; - this.volumeSize = volumeSize; - this.blockSize = blockSize; - this.usage = usage; - } - - // When listing volume, the usage will not be set. - public VolumeInfo(String userName, String volumeName, long volumeSize, - long blockSize) { - this.userName = userName; - this.volumeName = volumeName; - this.volumeSize = volumeSize; - this.blockSize = blockSize; - this.usage = -1; - } - - public long getVolumeSize() { - return volumeSize; - } - - public long getBlockSize() { - return blockSize; - } - - public long getUsage() { - return usage; - } - - public String getUserName() { - return userName; - } - - public String getVolumeName() { - return volumeName; - } - - @Override - public String toString() { - return " userName:" + userName + - " volumeName:" + volumeName + - " volumeSize:" + volumeSize + - " blockSize:" + blockSize + - " (sizeInBlocks:" + volumeSize/blockSize + ")" + - " usageInBlocks:" + usage; - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea85801c/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/meta/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/meta/package-info.java b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/meta/package-info.java deleted file mode 100644 index a331d7a..0000000 --- a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/meta/package-info.java +++ /dev/null @@ -1,18 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.cblock.meta; http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea85801c/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/package-info.java b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/package-info.java deleted file mode 100644 index a7d5d8b..0000000 --- a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/package-info.java +++ /dev/null @@ -1,18 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.cblock; http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea85801c/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/proto/CBlockClientProtocol.java ---------------------------------------------------------------------- diff --git a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/proto/CBlockClientProtocol.java b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/proto/CBlockClientProtocol.java deleted file mode 100644 index fc40cef..0000000 --- a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/proto/CBlockClientProtocol.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.cblock.proto; - -import org.apache.hadoop.cblock.meta.VolumeInfo; - -import java.io.IOException; -import java.util.List; - -/** - * The protocol that CBlock client side uses to talk to server side. CBlock - * client is the point where a volume is mounted. All the actual volume IO - * operations will go through CBlock client after the volume is mounted. - * - * When users mount a volume on CBlock client, CBlock client side uses this - * protocol to send mount request to CBlock server. - */ -public interface CBlockClientProtocol { - MountVolumeResponse mountVolume(String userName, String volumeName) - throws IOException; - - List<VolumeInfo> listVolumes() throws IOException; -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea85801c/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/proto/CBlockServiceProtocol.java ---------------------------------------------------------------------- diff --git a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/proto/CBlockServiceProtocol.java b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/proto/CBlockServiceProtocol.java deleted file mode 100644 index bf00bc0..0000000 --- a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/proto/CBlockServiceProtocol.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.cblock.proto; - -import org.apache.hadoop.cblock.meta.VolumeInfo; -import org.apache.hadoop.classification.InterfaceAudience; - -import java.io.IOException; -import java.util.List; - -/** - * CBlock uses a separate command line tool to send volume management - * operations to CBlock server, including create/delete/info/list volumes. This - * is the protocol used by the command line tool to send these requests and get - * responses from CBlock server. - */ -@InterfaceAudience.Private -public interface CBlockServiceProtocol { - - void createVolume(String userName, String volumeName, - long volumeSize, int blockSize) throws IOException; - - void deleteVolume(String userName, String volumeName, - boolean force) throws IOException; - - VolumeInfo infoVolume(String userName, - String volumeName) throws IOException; - - List<VolumeInfo> listVolume(String userName) throws IOException; -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea85801c/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/proto/MountVolumeResponse.java ---------------------------------------------------------------------- diff --git a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/proto/MountVolumeResponse.java b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/proto/MountVolumeResponse.java deleted file mode 100644 index d33337f..0000000 --- a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/proto/MountVolumeResponse.java +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.cblock.proto; - -import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; - -import java.util.HashMap; -import java.util.List; - -/** - * The response message of mounting a volume. Including enough information - * for the client to communicate (perform IO) with the volume containers - * directly. - */ -public class MountVolumeResponse { - private final boolean isValid; - private final String userName; - private final String volumeName; - private final long volumeSize; - private final int blockSize; - private List<Pipeline> containerList; - private HashMap<String, Pipeline> pipelineMap; - - public MountVolumeResponse(boolean isValid, String userName, - String volumeName, long volumeSize, int blockSize, - List<Pipeline> containerList, - HashMap<String, Pipeline> pipelineMap) { - this.isValid = isValid; - this.userName = userName; - this.volumeName = volumeName; - this.volumeSize = volumeSize; - this.blockSize = blockSize; - this.containerList = containerList; - this.pipelineMap = pipelineMap; - } - - public boolean getIsValid() { - return isValid; - } - - public String getUserName() { - return userName; - } - - public String getVolumeName() { - return volumeName; - } - - public long getVolumeSize() { - return volumeSize; - } - - public int getBlockSize() { - return blockSize; - } - - public List<Pipeline> getContainerList() { - return containerList; - } - - public HashMap<String, Pipeline> getPipelineMap() { - return pipelineMap; - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea85801c/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/proto/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/proto/package-info.java b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/proto/package-info.java deleted file mode 100644 index 33438ec..0000000 --- a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/proto/package-info.java +++ /dev/null @@ -1,18 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.cblock.proto; http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea85801c/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/protocolPB/CBlockClientServerProtocolPB.java ---------------------------------------------------------------------- diff --git a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/protocolPB/CBlockClientServerProtocolPB.java b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/protocolPB/CBlockClientServerProtocolPB.java deleted file mode 100644 index 99f3110..0000000 --- a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/protocolPB/CBlockClientServerProtocolPB.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.cblock.protocolPB; - -import org.apache.hadoop.cblock.protocol.proto.CBlockClientServerProtocolProtos; -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.ipc.ProtocolInfo; - -/** - * This is the protocol CBlock client uses to talk to CBlock server. - * CBlock client is the mounting point of a volume. When a user mounts a - * volume, the cBlock client running on the local node will use this protocol - * to talk to CBlock server to mount the volume. - */ -@ProtocolInfo(protocolName = - "org.apache.hadoop.cblock.protocolPB.CBlockClientServerProtocol", - protocolVersion = 1) -@InterfaceAudience.Private -public interface CBlockClientServerProtocolPB extends - CBlockClientServerProtocolProtos - .CBlockClientServerProtocolService.BlockingInterface { -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea85801c/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/protocolPB/CBlockClientServerProtocolServerSideTranslatorPB.java ---------------------------------------------------------------------- diff --git a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/protocolPB/CBlockClientServerProtocolServerSideTranslatorPB.java b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/protocolPB/CBlockClientServerProtocolServerSideTranslatorPB.java deleted file mode 100644 index f937a73..0000000 --- a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/protocolPB/CBlockClientServerProtocolServerSideTranslatorPB.java +++ /dev/null @@ -1,116 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.cblock.protocolPB; - -import com.google.protobuf.RpcController; -import com.google.protobuf.ServiceException; -import org.apache.hadoop.cblock.meta.VolumeInfo; -import org.apache.hadoop.cblock.proto.CBlockClientProtocol; -import org.apache.hadoop.cblock.proto.MountVolumeResponse; -import org.apache.hadoop.cblock.protocol.proto.CBlockClientServerProtocolProtos; -import org.apache.hadoop.cblock.protocol.proto.CBlockServiceProtocolProtos; -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; - -import java.io.IOException; -import java.util.HashMap; -import java.util.List; -import java.util.stream.Collectors; - -/** - * The server side implementation of cblock client to server protocol. - */ -@InterfaceAudience.Private -public class CBlockClientServerProtocolServerSideTranslatorPB implements - CBlockClientServerProtocolPB { - - private final CBlockClientProtocol impl; - - public CBlockClientServerProtocolServerSideTranslatorPB( - CBlockClientProtocol impl) { - this.impl = impl; - } - - @Override - public CBlockClientServerProtocolProtos.MountVolumeResponseProto mountVolume( - RpcController controller, - CBlockClientServerProtocolProtos.MountVolumeRequestProto request) - throws ServiceException { - String userName = request.getUserName(); - String volumeName = request.getVolumeName(); - CBlockClientServerProtocolProtos.MountVolumeResponseProto.Builder - resp = - CBlockClientServerProtocolProtos - .MountVolumeResponseProto.newBuilder(); - try { - MountVolumeResponse result = impl.mountVolume(userName, volumeName); - boolean isValid = result.getIsValid(); - resp.setIsValid(isValid); - if (isValid) { - resp.setUserName(result.getUserName()); - resp.setVolumeName(result.getVolumeName()); - resp.setVolumeSize(result.getVolumeSize()); - resp.setBlockSize(result.getBlockSize()); - List<Pipeline> containers = result.getContainerList(); - HashMap<String, Pipeline> pipelineMap = result.getPipelineMap(); - - for (int i=0; i<containers.size(); i++) { - CBlockClientServerProtocolProtos.ContainerIDProto.Builder id = - CBlockClientServerProtocolProtos.ContainerIDProto.newBuilder(); - String containerName = containers.get(i).getContainerName(); - id.setContainerID(containerName); - id.setIndex(i); - if (pipelineMap.containsKey(containerName)) { - id.setPipeline(pipelineMap.get(containerName).getProtobufMessage()); - } - resp.addAllContainerIDs(id.build()); - } - } - } catch (IOException e) { - throw new ServiceException(e); - } - return resp.build(); - } - - @Override - public CBlockClientServerProtocolProtos.ListVolumesResponseProto listVolumes( - RpcController controller, - CBlockClientServerProtocolProtos.ListVolumesRequestProto request) - throws ServiceException { - try { - CBlockClientServerProtocolProtos.ListVolumesResponseProto.Builder resp = - CBlockClientServerProtocolProtos.ListVolumesResponseProto - .newBuilder(); - List<VolumeInfo> volumeInfos = impl.listVolumes(); - List<CBlockServiceProtocolProtos.VolumeInfoProto> convertedInfos = - volumeInfos.stream().map( - volumeInfo -> CBlockServiceProtocolProtos.VolumeInfoProto - .newBuilder().setUserName(volumeInfo.getUserName()) - .setBlockSize(volumeInfo.getBlockSize()) - .setVolumeName(volumeInfo.getVolumeName()) - .setVolumeSize(volumeInfo.getVolumeSize()) - .setUsage(volumeInfo.getUsage()).build()) - .collect(Collectors.toList()); - resp.addAllVolumeEntry(convertedInfos); - return resp.build(); - } catch (IOException e) { - throw new ServiceException(e); - } - } - -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea85801c/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/protocolPB/CBlockServiceProtocolPB.java ---------------------------------------------------------------------- diff --git a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/protocolPB/CBlockServiceProtocolPB.java b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/protocolPB/CBlockServiceProtocolPB.java deleted file mode 100644 index 282d6cd..0000000 --- a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/protocolPB/CBlockServiceProtocolPB.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.cblock.protocolPB; - -import org.apache.hadoop.cblock.protocol.proto.CBlockServiceProtocolProtos; -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.ipc.ProtocolInfo; - -/** - * Users use a independent command line tool to talk to CBlock server for - * volume operations (create/delete/info/list). This is the protocol used by - * the the command line tool to send these requests to CBlock server. - */ -@ProtocolInfo(protocolName = - "org.apache.hadoop.cblock.protocolPB.CBlockServiceProtocol", - protocolVersion = 1) -@InterfaceAudience.Private -public interface CBlockServiceProtocolPB extends - CBlockServiceProtocolProtos.CBlockServiceProtocolService.BlockingInterface { -} --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org