http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/CBlockTargetMetrics.java ---------------------------------------------------------------------- diff --git a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/CBlockTargetMetrics.java b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/CBlockTargetMetrics.java new file mode 100644 index 0000000..e7df0cf --- /dev/null +++ b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/CBlockTargetMetrics.java @@ -0,0 +1,334 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.cblock.jscsiHelper; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.metrics2.MetricsSystem; +import org.apache.hadoop.metrics2.annotation.Metric; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.metrics2.lib.MutableCounterLong; +import org.apache.hadoop.metrics2.lib.MutableRate; + +/** + * This class is for maintaining the various Cblock Target statistics + * and publishing them through the metrics interfaces. + * This also registers the JMX MBean for RPC. 
+ * + * This class maintains stats like cache hit and miss ratio + * as well as the latency time of read and write ops. + */ +public class CBlockTargetMetrics { + // IOPS based Metrics + @Metric private MutableCounterLong numReadOps; + @Metric private MutableCounterLong numWriteOps; + @Metric private MutableCounterLong numReadCacheHits; + @Metric private MutableCounterLong numReadCacheMiss; + @Metric private MutableCounterLong numDirectBlockWrites; + + // Cblock internal Metrics + @Metric private MutableCounterLong numDirtyLogBlockRead; + @Metric private MutableCounterLong numBytesDirtyLogRead; + @Metric private MutableCounterLong numBytesDirtyLogWritten; + @Metric private MutableCounterLong numBlockBufferFlushCompleted; + @Metric private MutableCounterLong numBlockBufferFlushTriggered; + @Metric private MutableCounterLong numBlockBufferUpdates; + @Metric private MutableCounterLong numRetryLogBlockRead; + @Metric private MutableCounterLong numBytesRetryLogRead; + + // Failure Metrics + @Metric private MutableCounterLong numReadLostBlocks; + @Metric private MutableCounterLong numFailedReadBlocks; + @Metric private MutableCounterLong numWriteIOExceptionRetryBlocks; + @Metric private MutableCounterLong numWriteGenericExceptionRetryBlocks; + @Metric private MutableCounterLong numFailedDirectBlockWrites; + @Metric private MutableCounterLong numIllegalDirtyLogFiles; + @Metric private MutableCounterLong numFailedDirtyLogFileDeletes; + @Metric private MutableCounterLong numFailedBlockBufferFlushes; + @Metric private MutableCounterLong numInterruptedBufferWaits; + @Metric private MutableCounterLong numFailedRetryLogFileWrites; + @Metric private MutableCounterLong numWriteMaxRetryBlocks; + @Metric private MutableCounterLong numFailedReleaseLevelDB; + + // Latency based Metrics + @Metric private MutableRate dbReadLatency; + @Metric private MutableRate containerReadLatency; + @Metric private MutableRate dbWriteLatency; + @Metric private MutableRate containerWriteLatency; + 
@Metric private MutableRate blockBufferFlushLatency; + @Metric private MutableRate directBlockWriteLatency; + + public CBlockTargetMetrics() { + } + + public static CBlockTargetMetrics create() { + MetricsSystem ms = DefaultMetricsSystem.instance(); + return ms.register("CBlockTargetMetrics", + "CBlock Target Metrics", + new CBlockTargetMetrics()); + } + + public void incNumReadOps() { + numReadOps.incr(); + } + + public void incNumWriteOps() { + numWriteOps.incr(); + } + + public void incNumReadCacheHits() { + numReadCacheHits.incr(); + } + + public void incNumReadCacheMiss() { + numReadCacheMiss.incr(); + } + + public void incNumReadLostBlocks() { + numReadLostBlocks.incr(); + } + + public void incNumDirectBlockWrites() { + numDirectBlockWrites.incr(); + } + + public void incNumWriteIOExceptionRetryBlocks() { + numWriteIOExceptionRetryBlocks.incr(); + } + + public void incNumWriteGenericExceptionRetryBlocks() { + numWriteGenericExceptionRetryBlocks.incr(); + } + + public void incNumFailedDirectBlockWrites() { + numFailedDirectBlockWrites.incr(); + } + + public void incNumFailedReadBlocks() { + numFailedReadBlocks.incr(); + } + + public void incNumBlockBufferFlushCompleted() { + numBlockBufferFlushCompleted.incr(); + } + + public void incNumBlockBufferFlushTriggered() { + numBlockBufferFlushTriggered.incr(); + } + + public void incNumDirtyLogBlockRead() { + numDirtyLogBlockRead.incr(); + } + + public void incNumBytesDirtyLogRead(int bytes) { + numBytesDirtyLogRead.incr(bytes); + } + + public void incNumBlockBufferUpdates() { + numBlockBufferUpdates.incr(); + } + + public void incNumRetryLogBlockRead() { + numRetryLogBlockRead.incr(); + } + + public void incNumBytesRetryLogRead(int bytes) { + numBytesRetryLogRead.incr(bytes); + } + + public void incNumBytesDirtyLogWritten(int bytes) { + numBytesDirtyLogWritten.incr(bytes); + } + + public void incNumFailedBlockBufferFlushes() { + numFailedBlockBufferFlushes.incr(); + } + + public void incNumInterruptedBufferWaits() 
{ + numInterruptedBufferWaits.incr(); + } + + public void incNumIllegalDirtyLogFiles() { + numIllegalDirtyLogFiles.incr(); + } + + public void incNumFailedDirtyLogFileDeletes() { + numFailedDirtyLogFileDeletes.incr(); + } + + public void incNumFailedRetryLogFileWrites() { + numFailedRetryLogFileWrites.incr(); + } + + public void incNumWriteMaxRetryBlocks() { + numWriteMaxRetryBlocks.incr(); + } + + public void incNumFailedReleaseLevelDB() { + numFailedReleaseLevelDB.incr(); + } + + public void updateDBReadLatency(long latency) { + dbReadLatency.add(latency); + } + + public void updateContainerReadLatency(long latency) { + containerReadLatency.add(latency); + } + + public void updateDBWriteLatency(long latency) { + dbWriteLatency.add(latency); + } + + public void updateContainerWriteLatency(long latency) { + containerWriteLatency.add(latency); + } + + public void updateDirectBlockWriteLatency(long latency) { + directBlockWriteLatency.add(latency); + } + + public void updateBlockBufferFlushLatency(long latency) { + blockBufferFlushLatency.add(latency); + } + + @VisibleForTesting + public long getNumReadOps() { + return numReadOps.value(); + } + + @VisibleForTesting + public long getNumWriteOps() { + return numWriteOps.value(); + } + + @VisibleForTesting + public long getNumReadCacheHits() { + return numReadCacheHits.value(); + } + + @VisibleForTesting + public long getNumReadCacheMiss() { + return numReadCacheMiss.value(); + } + + @VisibleForTesting + public long getNumReadLostBlocks() { + return numReadLostBlocks.value(); + } + + @VisibleForTesting + public long getNumDirectBlockWrites() { + return numDirectBlockWrites.value(); + } + + @VisibleForTesting + public long getNumFailedDirectBlockWrites() { + return numFailedDirectBlockWrites.value(); + } + + @VisibleForTesting + public long getNumFailedReadBlocks() { + return numFailedReadBlocks.value(); + } + + @VisibleForTesting + public long getNumWriteIOExceptionRetryBlocks() { + return 
numWriteIOExceptionRetryBlocks.value(); + } + + @VisibleForTesting + public long getNumWriteGenericExceptionRetryBlocks() { + return numWriteGenericExceptionRetryBlocks.value(); + } + + @VisibleForTesting + public long getNumBlockBufferFlushCompleted() { + return numBlockBufferFlushCompleted.value(); + } + + @VisibleForTesting + public long getNumBlockBufferFlushTriggered() { + return numBlockBufferFlushTriggered.value(); + } + + @VisibleForTesting + public long getNumDirtyLogBlockRead() { + return numDirtyLogBlockRead.value(); + } + + @VisibleForTesting + public long getNumBytesDirtyLogReads() { + return numBytesDirtyLogRead.value(); + } + + @VisibleForTesting + public long getNumBlockBufferUpdates() { + return numBlockBufferUpdates.value(); + } + + @VisibleForTesting + public long getNumRetryLogBlockRead() { + return numRetryLogBlockRead.value(); + } + + @VisibleForTesting + public long getNumBytesRetryLogReads() { + return numBytesRetryLogRead.value(); + } + + @VisibleForTesting + public long getNumBytesDirtyLogWritten() { + return numBytesDirtyLogWritten.value(); + } + + @VisibleForTesting + public long getNumFailedBlockBufferFlushes() { + return numFailedBlockBufferFlushes.value(); + } + + @VisibleForTesting + public long getNumInterruptedBufferWaits() { + return numInterruptedBufferWaits.value(); + } + + @VisibleForTesting + public long getNumIllegalDirtyLogFiles() { + return numIllegalDirtyLogFiles.value(); + } + + @VisibleForTesting + public long getNumFailedDirtyLogFileDeletes() { + return numFailedDirtyLogFileDeletes.value(); + } + + @VisibleForTesting + public long getNumFailedRetryLogFileWrites() { + return numFailedRetryLogFileWrites.value(); + } + + @VisibleForTesting + public long getNumWriteMaxRetryBlocks() { + return numWriteMaxRetryBlocks.value(); + } + + @VisibleForTesting + public long getNumFailedReleaseLevelDB() { + return numFailedReleaseLevelDB.value(); + } +}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/CBlockTargetServer.java ---------------------------------------------------------------------- diff --git a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/CBlockTargetServer.java b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/CBlockTargetServer.java new file mode 100644 index 0000000..75e013e --- /dev/null +++ b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/CBlockTargetServer.java @@ -0,0 +1,128 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.cblock.jscsiHelper; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.cblock.proto.MountVolumeResponse; +import org.apache.hadoop.cblock.util.KeyUtil; +import org.apache.hadoop.hdsl.conf.OzoneConfiguration; +import org.apache.hadoop.scm.XceiverClientManager; +import org.jscsi.target.Configuration; +import org.jscsi.target.Target; +import org.jscsi.target.TargetServer; + +import java.io.IOException; +import java.util.HashMap; + +/** + * This class extends JSCSI target server, which is a ISCSI target that can be + * recognized by a remote machine with ISCSI installed. + */ +public final class CBlockTargetServer extends TargetServer { + private final OzoneConfiguration conf; + private final CBlockManagerHandler cBlockManagerHandler; + private final XceiverClientManager xceiverClientManager; + private final ContainerCacheFlusher containerCacheFlusher; + private final CBlockTargetMetrics metrics; + + public CBlockTargetServer(OzoneConfiguration ozoneConfig, + Configuration jscsiConf, + CBlockManagerHandler cBlockManagerHandler, + CBlockTargetMetrics metrics) + throws IOException { + super(jscsiConf); + this.cBlockManagerHandler = cBlockManagerHandler; + this.xceiverClientManager = new XceiverClientManager(ozoneConfig); + this.conf = ozoneConfig; + this.containerCacheFlusher = new ContainerCacheFlusher(this.conf, + xceiverClientManager, metrics); + this.metrics = metrics; + LOGGER.info("Starting flusher thread."); + Thread flushListenerThread = new Thread(containerCacheFlusher); + flushListenerThread.setDaemon(true); + flushListenerThread.start(); + } + + public static void main(String[] args) throws Exception { + } + + @Override + public boolean isValidTargetName(String checkTargetName) { + if (!KeyUtil.isValidVolumeKey(checkTargetName)) { + return false; + } + String userName = KeyUtil.getUserNameFromVolumeKey(checkTargetName); + String volumeName = KeyUtil.getVolumeFromVolumeKey(checkTargetName); + if 
(userName == null || volumeName == null) { + return false; + } + try { + MountVolumeResponse result = + cBlockManagerHandler.mountVolume(userName, volumeName); + if (!result.getIsValid()) { + LOGGER.error("Not a valid volume:" + checkTargetName); + return false; + } + String volumeKey = KeyUtil.getVolumeKey(result.getUserName(), + result.getVolumeName()); + if (!targets.containsKey(volumeKey)) { + LOGGER.info("Mounting Volume. username: {} volume:{}", + userName, volumeName); + CBlockIStorageImpl ozoneStore = CBlockIStorageImpl.newBuilder() + .setUserName(userName) + .setVolumeName(volumeName) + .setVolumeSize(result.getVolumeSize()) + .setBlockSize(result.getBlockSize()) + .setContainerList(result.getContainerList()) + .setClientManager(xceiverClientManager) + .setConf(this.conf) + .setFlusher(containerCacheFlusher) + .setCBlockTargetMetrics(metrics) + .build(); + Target target = new Target(volumeKey, volumeKey, ozoneStore); + targets.put(volumeKey, target); + } + } catch (IOException e) { + LOGGER.error("Can not connect to server when validating target!" + + e.getMessage()); + } + return targets.containsKey(checkTargetName); + } + + @Override + public String[] getTargetNames() { + try { + if (cBlockManagerHandler != null) { + return cBlockManagerHandler.listVolumes(). 
+ stream().map( + volumeInfo -> volumeInfo.getUserName() + ":" + volumeInfo + .getVolumeName()).toArray(String[]::new); + } else { + return new String[0]; + } + } catch (IOException e) { + LOGGER.error("Can't list existing volumes", e); + return new String[0]; + } + } + + @VisibleForTesting + public HashMap<String, Target> getTargets() { + return targets; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/ContainerCacheFlusher.java ---------------------------------------------------------------------- diff --git a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/ContainerCacheFlusher.java b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/ContainerCacheFlusher.java new file mode 100644 index 0000000..292662e --- /dev/null +++ b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/ContainerCacheFlusher.java @@ -0,0 +1,599 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.cblock.jscsiHelper; + +import com.google.common.base.Preconditions; +import com.google.common.primitives.Longs; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.hadoop.cblock.CBlockConfigKeys; +import org.apache.hadoop.cblock.jscsiHelper.cache.LogicalBlock; +import org.apache.hadoop.cblock.jscsiHelper.cache.impl.AsyncBlockWriter; +import org.apache.hadoop.cblock.jscsiHelper.cache.impl.DiskBlock; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ozone.OzoneConsts; +import org.apache.hadoop.scm.XceiverClientManager; +import org.apache.hadoop.scm.container.common.helpers.Pipeline; +import org.apache.hadoop.util.Time; +import org.apache.hadoop.utils.LevelDBStore; +import org.iq80.leveldb.Options; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.nio.ByteBuffer; +import java.nio.channels.ReadableByteChannel; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +import static org.apache.hadoop.cblock.CBlockConfigKeys + .DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE; +import static org.apache.hadoop.cblock.CBlockConfigKeys + .DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE_DEFAULT; +import static 
org.apache.hadoop.cblock.CBlockConfigKeys + .DFS_CBLOCK_CACHE_CORE_MIN_POOL_SIZE; +import static org.apache.hadoop.cblock.CBlockConfigKeys + .DFS_CBLOCK_CACHE_CORE_MIN_POOL_SIZE_DEFAULT; +import static org.apache.hadoop.cblock.CBlockConfigKeys + .DFS_CBLOCK_CACHE_KEEP_ALIVE; +import static org.apache.hadoop.cblock.CBlockConfigKeys + .DFS_CBLOCK_CACHE_KEEP_ALIVE_DEFAULT; +import static org.apache.hadoop.cblock.CBlockConfigKeys + .DFS_CBLOCK_CACHE_MAX_POOL_SIZE; +import static org.apache.hadoop.cblock.CBlockConfigKeys + .DFS_CBLOCK_CACHE_MAX_POOL_SIZE_DEFAULT; +import static org.apache.hadoop.cblock.CBlockConfigKeys + .DFS_CBLOCK_CACHE_QUEUE_SIZE_KB; +import static org.apache.hadoop.cblock.CBlockConfigKeys + .DFS_CBLOCK_CACHE_QUEUE_SIZE_KB_DEFAULT; +import static org.apache.hadoop.cblock.CBlockConfigKeys + .DFS_CBLOCK_CACHE_THREAD_PRIORITY; +import static org.apache.hadoop.cblock.CBlockConfigKeys + .DFS_CBLOCK_CACHE_THREAD_PRIORITY_DEFAULT; +import static org.apache.hadoop.cblock.CBlockConfigKeys + .DFS_CBLOCK_CACHE_LEVELDB_CACHE_SIZE_MB_KEY; +import static org.apache.hadoop.cblock.CBlockConfigKeys + .DFS_CBLOCK_CACHE_LEVELDB_CACHE_SIZE_MB_DEFAULT; + +/** + * Class that writes to remote containers. 
+ */ +public class ContainerCacheFlusher implements Runnable { + private static final Logger LOG = + LoggerFactory.getLogger(ContainerCacheFlusher.class); + private final LinkedBlockingQueue<Message> messageQueue; + private final ThreadPoolExecutor threadPoolExecutor; + private final ArrayBlockingQueue<Runnable> workQueue; + private final ConcurrentMap<String, RefCountedDB> dbMap; + private final ByteBuffer blockIDBuffer; + private final ConcurrentMap<String, Pipeline[]> pipelineMap; + private final AtomicLong remoteIO; + private final XceiverClientManager xceiverClientManager; + private final CBlockTargetMetrics metrics; + private AtomicBoolean shutdown; + private final long levelDBCacheSize; + private final int maxRetryCount; + private final String tracePrefix; + + private final ConcurrentMap<String, FinishCounter> finishCountMap; + + /** + * Constructs the writers to remote queue. + */ + public ContainerCacheFlusher(Configuration config, + XceiverClientManager xceiverClientManager, + CBlockTargetMetrics metrics) { + int queueSize = config.getInt(DFS_CBLOCK_CACHE_QUEUE_SIZE_KB, + DFS_CBLOCK_CACHE_QUEUE_SIZE_KB_DEFAULT) * 1024; + int corePoolSize = config.getInt(DFS_CBLOCK_CACHE_CORE_MIN_POOL_SIZE, + DFS_CBLOCK_CACHE_CORE_MIN_POOL_SIZE_DEFAULT); + int maxPoolSize = config.getInt(DFS_CBLOCK_CACHE_MAX_POOL_SIZE, + DFS_CBLOCK_CACHE_MAX_POOL_SIZE_DEFAULT); + long keepAlive = config.getTimeDuration(DFS_CBLOCK_CACHE_KEEP_ALIVE, + DFS_CBLOCK_CACHE_KEEP_ALIVE_DEFAULT, TimeUnit.SECONDS); + int threadPri = config.getInt(DFS_CBLOCK_CACHE_THREAD_PRIORITY, + DFS_CBLOCK_CACHE_THREAD_PRIORITY_DEFAULT); + int blockBufferSize = config.getInt(DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE, + DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE_DEFAULT) * (Long.SIZE / Byte.SIZE); + levelDBCacheSize = config.getInt(DFS_CBLOCK_CACHE_LEVELDB_CACHE_SIZE_MB_KEY, + DFS_CBLOCK_CACHE_LEVELDB_CACHE_SIZE_MB_DEFAULT) * OzoneConsts.MB; + + LOG.info("Cache: Core Pool Size: {}", corePoolSize); + LOG.info("Cache: Keep Alive: 
{}", keepAlive); + LOG.info("Cache: Max Pool Size: {}", maxPoolSize); + LOG.info("Cache: Thread Pri: {}", threadPri); + LOG.info("Cache: BlockBuffer Size: {}", blockBufferSize); + + shutdown = new AtomicBoolean(false); + messageQueue = new LinkedBlockingQueue<>(); + workQueue = new ArrayBlockingQueue<>(queueSize, true); + + ThreadFactory workerThreadFactory = new ThreadFactoryBuilder() + .setNameFormat("Cache Block Writer Thread #%d") + .setDaemon(true) + .setPriority(threadPri) + .build(); + threadPoolExecutor = new ThreadPoolExecutor(corePoolSize, maxPoolSize, + keepAlive, TimeUnit.SECONDS, workQueue, workerThreadFactory, + new ThreadPoolExecutor.AbortPolicy()); + threadPoolExecutor.prestartAllCoreThreads(); + + dbMap = new ConcurrentHashMap<>(); + pipelineMap = new ConcurrentHashMap<>(); + blockIDBuffer = ByteBuffer.allocateDirect(blockBufferSize); + this.xceiverClientManager = xceiverClientManager; + this.metrics = metrics; + this.remoteIO = new AtomicLong(); + + this.finishCountMap = new ConcurrentHashMap<>(); + this.maxRetryCount = + config.getInt(CBlockConfigKeys.DFS_CBLOCK_CACHE_MAX_RETRY_KEY, + CBlockConfigKeys.DFS_CBLOCK_CACHE_MAX_RETRY_DEFAULT); + this.tracePrefix = getTracePrefix(); + } + + private void checkExistingLog(String prefixFileName, File dbPath) { + if (!dbPath.exists()) { + LOG.debug("No existing dirty log found at {}", dbPath); + return; + } + LOG.debug("Need to check and requeue existing dirty log {}", dbPath); + HashMap<String, ArrayList<String>> allFiles = new HashMap<>(); + traverse(prefixFileName, dbPath, allFiles); + for (Map.Entry<String, ArrayList<String>> entry : allFiles.entrySet()) { + String parentPath = entry.getKey(); + for (String fileName : entry.getValue()) { + LOG.info("found {} {} with prefix {}", + parentPath, fileName, prefixFileName); + processDirtyBlocks(parentPath, fileName); + } + } + } + + private void traverse(String prefixFileName, File path, + HashMap<String, ArrayList<String>> files) { + if (path.isFile()) { + 
if (path.getName().startsWith(prefixFileName)) { + LOG.debug("found this {} with {}", path.getParent(), path.getName()); + if (!files.containsKey(path.getParent())) { + files.put(path.getParent(), new ArrayList<>()); + } + files.get(path.getParent()).add(path.getName()); + } + } else { + File[] listFiles = path.listFiles(); + if (listFiles != null) { + for (File subPath : listFiles) { + traverse(prefixFileName, subPath, files); + } + } + } + } + + /** + * Gets the CBlockTargetMetrics. + * + * @return CBlockTargetMetrics + */ + public CBlockTargetMetrics getTargetMetrics() { + return metrics; + } + + /** + * Gets the getXceiverClientManager. + * + * @return XceiverClientManager + */ + public XceiverClientManager getXceiverClientManager() { + return xceiverClientManager; + } + + /** + * Shutdown this instance. + */ + public void shutdown() { + this.shutdown.set(true); + threadPoolExecutor.shutdown(); + } + + public long incrementRemoteIO() { + return remoteIO.incrementAndGet(); + } + + /** + * Processes a block cache file and queues those blocks for the remote I/O. + * + * @param dbPath - Location where the DB can be found. + * @param fileName - Block Cache File Name + */ + public void processDirtyBlocks(String dbPath, String fileName) { + LOG.info("Adding {}/{} to queue. Queue Length: {}", dbPath, fileName, + messageQueue.size()); + this.messageQueue.add(new Message(dbPath, fileName)); + } + + public Logger getLOG() { + return LOG; + } + + /** + * Opens a DB if needed or returns a handle to an already open DB. + * + * @param dbPath -- dbPath + * @return the levelDB on the given path. 
+ * @throws IOException + */ + public synchronized LevelDBStore openDB(String dbPath) + throws IOException { + if (dbMap.containsKey(dbPath)) { + RefCountedDB refDB = dbMap.get(dbPath); + refDB.open(); + return refDB.db; + } else { + Options options = new Options(); + options.cacheSize(levelDBCacheSize); + options.createIfMissing(true); + LevelDBStore cacheDB = new LevelDBStore( + new File(getDBFileName(dbPath)), options); + RefCountedDB refDB = new RefCountedDB(dbPath, cacheDB); + dbMap.put(dbPath, refDB); + return cacheDB; + } + } + + /** + * Updates the container map. This data never changes so we will update this + * during restarts and it should not hurt us. + * + * Once a CBlockLocalCache cache is registered, requeue dirty/retry log files + * for the volume + * + * @param dbPath - DbPath + * @param containerList - Container List. + */ + public void register(String dbPath, Pipeline[] containerList) { + File dbFile = Paths.get(dbPath).toFile(); + pipelineMap.put(dbPath, containerList); + checkExistingLog(AsyncBlockWriter.DIRTY_LOG_PREFIX, dbFile); + checkExistingLog(AsyncBlockWriter.RETRY_LOG_PREFIX, dbFile); + } + + private String getDBFileName(String dbPath) { + return dbPath + ".db"; + } + + public LevelDBStore getCacheDB(String dbPath) throws IOException { + return openDB(dbPath); + } + + public void releaseCacheDB(String dbPath) { + try { + closeDB(dbPath); + } catch (Exception e) { + metrics.incNumFailedReleaseLevelDB(); + LOG.error("LevelDB close failed, dbPath:" + dbPath, e); + } + } + /** + * Close the DB if we don't have any outstanding references. 
+ * + * @param dbPath - dbPath + * @throws IOException + */ + public synchronized void closeDB(String dbPath) throws IOException { + if (dbMap.containsKey(dbPath)) { + RefCountedDB refDB = dbMap.get(dbPath); + int count = refDB.close(); + if (count == 0) { + dbMap.remove(dbPath); + } + } + } + + Pipeline getPipeline(String dbPath, long blockId) { + Pipeline[] containerList = pipelineMap.get(dbPath); + Preconditions.checkNotNull(containerList); + int containerIdx = (int) blockId % containerList.length; + long cBlockIndex = + Longs.fromByteArray(containerList[containerIdx].getData()); + if (cBlockIndex > 0) { + // This catches the case when we get a wrong container in the ordering + // of the containers. + Preconditions.checkState(containerIdx % cBlockIndex == 0, + "The container ID computed should match with the container index " + + "returned from cBlock Server."); + } + return containerList[containerIdx]; + } + + public void incFinishCount(String fileName) { + if (!finishCountMap.containsKey(fileName)) { + LOG.error("No record for such file:" + fileName); + return; + } + finishCountMap.get(fileName).incCount(); + if (finishCountMap.get(fileName).isFileDeleted()) { + finishCountMap.remove(fileName); + } + } + + /** + * When an object implementing interface <code>Runnable</code> is used + * to create a thread, starting the thread causes the object's + * <code>run</code> method to be called in that separately executing + * thread. + * <p> + * The general contract of the method <code>run</code> is that it may + * take any action whatsoever. 
+ * + * @see Thread#run() + */ + @Override + public void run() { + while (!this.shutdown.get()) { + try { + Message message = messageQueue.take(); + LOG.debug("Got message to process -- DB Path : {} , FileName; {}", + message.getDbPath(), message.getFileName()); + String fullPath = Paths.get(message.getDbPath(), + message.getFileName()).toString(); + String[] fileNameParts = message.getFileName().split("\\."); + Preconditions.checkState(fileNameParts.length > 1); + String fileType = fileNameParts[0]; + boolean isDirtyLogFile = + fileType.equalsIgnoreCase(AsyncBlockWriter.DIRTY_LOG_PREFIX); + ReadableByteChannel fileChannel = new FileInputStream(fullPath) + .getChannel(); + // TODO: We can batch and unique the IOs here. First getting the code + // to work, we will add those later. + int bytesRead = fileChannel.read(blockIDBuffer); + fileChannel.close(); + LOG.debug("Read blockID log of size: {} position {} remaining {}", + bytesRead, blockIDBuffer.position(), blockIDBuffer.remaining()); + // current position of in the buffer in bytes, divided by number of + // bytes per long (which is calculated by number of bits per long + // divided by number of bits per byte) gives the number of blocks + int blockCount = blockIDBuffer.position()/(Long.SIZE / Byte.SIZE); + if (isDirtyLogFile) { + getTargetMetrics().incNumBytesDirtyLogRead(bytesRead); + } else { + getTargetMetrics().incNumBytesRetryLogRead(bytesRead); + } + if (finishCountMap.containsKey(message.getFileName())) { + // In theory this should never happen. But if it happened, + // we need to know it... 
+ getTargetMetrics().incNumIllegalDirtyLogFiles(); + LOG.error("Adding DirtyLog file again {} current count {} new {}", + message.getFileName(), + finishCountMap.get(message.getFileName()).expectedCount, + blockCount); + } + finishCountMap.put(message.getFileName(), + new FinishCounter(blockCount, message.getDbPath(), + message.getFileName(), this)); + // should be flip instead of rewind, because we also need to make sure + // the end position is correct. + blockIDBuffer.flip(); + LOG.debug("Remaining blocks count {} and {}", blockIDBuffer.remaining(), + blockCount); + while (blockIDBuffer.remaining() >= (Long.SIZE / Byte.SIZE)) { + long blockID = blockIDBuffer.getLong(); + int retryCount = 0; + if (isDirtyLogFile) { + getTargetMetrics().incNumDirtyLogBlockRead(); + } else { + getTargetMetrics().incNumRetryLogBlockRead(); + Preconditions.checkState(fileNameParts.length == 4); + retryCount = Integer.parseInt(fileNameParts[3]); + } + LogicalBlock block = new DiskBlock(blockID, null, false); + BlockWriterTask blockWriterTask = new BlockWriterTask(block, this, + message.getDbPath(), retryCount, message.getFileName(), + maxRetryCount); + threadPoolExecutor.submit(blockWriterTask); + } + blockIDBuffer.clear(); + } catch (InterruptedException e) { + LOG.info("ContainerCacheFlusher is interrupted.", e); + } catch (FileNotFoundException e) { + LOG.error("Unable to find the dirty blocks file. This will cause " + + "data errors. Please stop using this volume.", e); + } catch (IOException e) { + LOG.error("Unable to read the dirty blocks file. This will cause " + + "data errors. Please stop using this volume.", e); + } catch (Exception e) { + LOG.error("Generic exception.", e); + } + } + LOG.info("Exiting flusher"); + } + + /** + * Tries to get the local host IP Address as trace prefix + * for creating trace IDs, otherwise uses a random UUID for it. 
+ */ + private static String getTracePrefix() { + String tmp; + try { + tmp = InetAddress.getLocalHost().getHostAddress(); + } catch (UnknownHostException ex) { + tmp = UUID.randomUUID().toString(); + LOG.error("Unable to read the host address. Using a GUID for " + + "hostname:{} ", tmp, ex); + } + return tmp; + } + + /** + * We create a trace ID to make it easy to debug issues. + * A trace ID is in IPAddress:UserName:VolumeName:blockID:second format. + * + * This will get written down on the data node if we get any failures, so + * with this trace ID we can correlate cBlock failures across machines. + * + * @param blockID - Block ID + * @return trace ID + */ + public String getTraceID(File dbPath, long blockID) { + String volumeName = dbPath.getName(); + String userName = dbPath.getParentFile().getName(); + // mapping to seconds to make the string smaller. + return tracePrefix + ":" + userName + ":" + volumeName + + ":" + blockID + ":" + Time.monotonicNow() / 1000; + } + + /** + * Keeps a Reference counted DB that we close only when the total Reference + * has gone to zero. + */ + private static class RefCountedDB { + private LevelDBStore db; + private AtomicInteger refcount; + private String dbPath; + + /** + * RefCountedDB DB ctor. + * + * @param dbPath - DB path. + * @param db - LevelDBStore db + */ + RefCountedDB(String dbPath, LevelDBStore db) { + this.db = db; + this.refcount = new AtomicInteger(1); + this.dbPath = dbPath; + } + + /** + * close the DB if possible. + */ + public int close() throws IOException { + int count = this.refcount.decrementAndGet(); + if (count == 0) { + LOG.info("Closing the LevelDB. {} ", this.dbPath); + db.close(); + } + return count; + } + + public void open() { + this.refcount.incrementAndGet(); + } + } + + /** + * The message held in processing queue. 
   */
  private static class Message {
    private String dbPath;
    private String fileName;

    /**
     * A message that holds the info about which path dirty blocks log and
     * which path contains db.
     *
     * @param dbPath - path of the volume's level DB.
     * @param fileName - name of the dirty log file to process.
     */
    Message(String dbPath, String fileName) {
      this.dbPath = dbPath;
      this.fileName = fileName;
    }

    public String getDbPath() {
      return dbPath;
    }

    public void setDbPath(String dbPath) {
      this.dbPath = dbPath;
    }

    public String getFileName() {
      return fileName;
    }

    public void setFileName(String fileName) {
      this.fileName = fileName;
    }
  }

  /**
   * Counts how many blocks of a dirty log file have been flushed; when the
   * expected count is reached the dirty log file is deleted.
   */
  private static class FinishCounter {
    // total number of blocks recorded in this dirty log
    private final long expectedCount;
    private final String dbPath;
    private final String dirtyLogPath;
    // number of blocks flushed so far
    private final AtomicLong currentCount;
    private AtomicBoolean fileDeleted;
    private final ContainerCacheFlusher flusher;

    FinishCounter(long expectedCount, String dbPath,
        String dirtyLogPath, ContainerCacheFlusher flusher) throws IOException {
      this.expectedCount = expectedCount;
      this.dbPath = dbPath;
      this.dirtyLogPath = dirtyLogPath;
      this.currentCount = new AtomicLong(0);
      this.fileDeleted = new AtomicBoolean(false);
      this.flusher = flusher;
    }

    public boolean isFileDeleted() {
      return fileDeleted.get();
    }

    // Called once per flushed block; deletes the dirty log after the last
    // block. Deletion is best-effort: failures are counted and logged, not
    // rethrown.
    public void incCount() {
      long count = this.currentCount.incrementAndGet();
      if (count >= expectedCount) {
        String filePath = String.format("%s/%s", dbPath, dirtyLogPath);
        LOG.debug(
            "Deleting {} with count {} {}", filePath, count, expectedCount);
        try {
          Path path = Paths.get(filePath);
          Files.delete(path);
          // the following part tries to remove the directory if it is empty
          // but not sufficient, because the .db directory still exists....
          // TODO how to handle the .db directory?
          /*Path parent = path.getParent();
          if (parent.toFile().listFiles().length == 0) {
            Files.delete(parent);
          }*/
          fileDeleted.set(true);
        } catch (Exception e) {
          flusher.getTargetMetrics().incNumFailedDirtyLogFileDeletes();
          LOG.error("Error deleting dirty log file:" + filePath, e);
        }
      }
    }
  }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/SCSITargetDaemon.java
----------------------------------------------------------------------
diff --git a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/SCSITargetDaemon.java b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/SCSITargetDaemon.java
new file mode 100644
index 0000000..f164f38
--- /dev/null
+++ b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/SCSITargetDaemon.java
@@ -0,0 +1,132 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.cblock.jscsiHelper;

import static org.apache.hadoop.cblock.CBlockConfigKeys
    .DFS_CBLOCK_ISCSI_ADVERTISED_IP;
import static org.apache.hadoop.cblock.CBlockConfigKeys
    .DFS_CBLOCK_ISCSI_ADVERTISED_PORT;
import static org.apache.hadoop.cblock.CBlockConfigKeys
    .DFS_CBLOCK_ISCSI_ADVERTISED_PORT_DEFAULT;

import org.apache.hadoop.cblock.CblockUtils;
import org.apache.hadoop.cblock.protocolPB.CBlockClientServerProtocolPB;
import org.apache.hadoop.cblock.protocolPB.CBlockServiceProtocolPB;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.hdsl.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.scm.client.ContainerOperationClient;
import org.apache.hadoop.security.UserGroupInformation;
import org.jscsi.target.Configuration;

import java.net.InetSocketAddress;

import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_CONTAINER_SIZE_GB_DEFAULT;
import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_CONTAINER_SIZE_GB_KEY;
import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_JSCSI_CBLOCK_SERVER_ADDRESS_DEFAULT;
import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_JSCSI_CBLOCK_SERVER_ADDRESS_KEY;
import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_JSCSI_PORT_DEFAULT;
import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_JSCSI_PORT_KEY;
import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_JSCSI_SERVER_ADDRESS_DEFAULT;
import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_JSCSI_SERVER_ADDRESS_KEY;
import static org.apache.hadoop.scm.ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY;
import static org.apache.hadoop.scm.ScmConfigKeys.OZONE_SCM_CLIENT_BIND_HOST_DEFAULT;
import static org.apache.hadoop.scm.ScmConfigKeys.OZONE_SCM_CLIENT_BIND_HOST_KEY;
import static org.apache.hadoop.scm.ScmConfigKeys.OZONE_SCM_CLIENT_PORT_DEFAULT;
import static org.apache.hadoop.scm.ScmConfigKeys.OZONE_SCM_CLIENT_PORT_KEY;
import static org.apache.hadoop.scm.ScmConfigKeys.OZONE_SCM_DATANODE_ADDRESS_KEY;
import static org.apache.hadoop.scm.ScmConfigKeys.OZONE_SCM_DATANODE_PORT_DEFAULT;
import static org.apache.hadoop.scm.ScmConfigKeys.OZONE_SCM_DATANODE_PORT_KEY;

/**
 * This class runs the target server process.
 */
public final class SCSITargetDaemon {
  /**
   * Entry point: wires configuration, the CBlock manager RPC client and the
   * metrics system together, then starts the jSCSI target server.
   *
   * @param args - command line arguments (unused).
   * @throws Exception if setup or the server itself fails.
   */
  public static void main(String[] args) throws Exception {
    CblockUtils.activateConfigs();
    OzoneConfiguration ozoneConf = new OzoneConfiguration();

    RPC.setProtocolEngine(ozoneConf, CBlockClientServerProtocolPB.class,
        ProtobufRpcEngine.class);
    long containerSizeGB = ozoneConf.getInt(DFS_CBLOCK_CONTAINER_SIZE_GB_KEY,
        DFS_CBLOCK_CONTAINER_SIZE_GB_DEFAULT);
    ContainerOperationClient.setContainerSizeB(
        containerSizeGB * OzoneConsts.GB);
    String jscsiServerAddress = ozoneConf.get(
        DFS_CBLOCK_JSCSI_SERVER_ADDRESS_KEY,
        DFS_CBLOCK_JSCSI_SERVER_ADDRESS_DEFAULT);
    String cbmIPAddress = ozoneConf.get(
        DFS_CBLOCK_JSCSI_CBLOCK_SERVER_ADDRESS_KEY,
        DFS_CBLOCK_JSCSI_CBLOCK_SERVER_ADDRESS_DEFAULT
    );
    int cbmPort = ozoneConf.getInt(
        DFS_CBLOCK_JSCSI_PORT_KEY,
        DFS_CBLOCK_JSCSI_PORT_DEFAULT
    );

    String scmAddress = ozoneConf.get(OZONE_SCM_CLIENT_BIND_HOST_KEY,
        OZONE_SCM_CLIENT_BIND_HOST_DEFAULT);
    int scmClientPort = ozoneConf.getInt(OZONE_SCM_CLIENT_PORT_KEY,
        OZONE_SCM_CLIENT_PORT_DEFAULT);
    int scmDatanodePort = ozoneConf.getInt(OZONE_SCM_DATANODE_PORT_KEY,
        OZONE_SCM_DATANODE_PORT_DEFAULT);

    // NOTE(review): "Dataode" below is a typo for "Datanode" (local
    // variable name only; behavior is unaffected).
    String scmClientAddress = scmAddress + ":" + scmClientPort;
    String scmDataodeAddress = scmAddress + ":" + scmDatanodePort;

    ozoneConf.set(OZONE_SCM_CLIENT_ADDRESS_KEY, scmClientAddress);
    ozoneConf.set(OZONE_SCM_DATANODE_ADDRESS_KEY, scmDataodeAddress);

    InetSocketAddress cbmAddress = new InetSocketAddress(
        cbmIPAddress, cbmPort);
    // NOTE(review): the proxy below is built for
    // CBlockClientServerProtocolPB, but the version is read from
    // CBlockServiceProtocolPB -- confirm this mismatch is intentional.
    long version = RPC.getProtocolVersion(
        CBlockServiceProtocolPB.class);
    CBlockClientProtocolClientSideTranslatorPB cbmClient =
        new CBlockClientProtocolClientSideTranslatorPB(
            RPC.getProxy(CBlockClientServerProtocolPB.class, version,
                cbmAddress, UserGroupInformation.getCurrentUser(), ozoneConf,
                NetUtils.getDefaultSocketFactory(ozoneConf), 5000)
        );
    CBlockManagerHandler cbmHandler = new CBlockManagerHandler(cbmClient);

    // The advertised IP/port is what iSCSI initiators connect to; it
    // defaults to the server's own bind address.
    String advertisedAddress = ozoneConf.
        getTrimmed(DFS_CBLOCK_ISCSI_ADVERTISED_IP, jscsiServerAddress);

    int advertisedPort = ozoneConf.
        getInt(DFS_CBLOCK_ISCSI_ADVERTISED_PORT,
            DFS_CBLOCK_ISCSI_ADVERTISED_PORT_DEFAULT);

    Configuration jscsiConfig =
        new Configuration(jscsiServerAddress,
            advertisedAddress,
            advertisedPort);
    DefaultMetricsSystem.initialize("CBlockMetrics");
    CBlockTargetMetrics metrics = CBlockTargetMetrics.create();
    CBlockTargetServer targetServer = new CBlockTargetServer(
        ozoneConf, jscsiConfig, cbmHandler, metrics);

    // Blocks for the lifetime of the target server.
    targetServer.call();
  }

  // Utility class: no instances.
  private SCSITargetDaemon() {

  }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/CacheModule.java
----------------------------------------------------------------------
diff --git a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/CacheModule.java b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/CacheModule.java
new file mode 100644
index 0000000..300b2ae
--- /dev/null
+++ b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/CacheModule.java
@@ -0,0 +1,52 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.
The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.cblock.jscsiHelper.cache;

import java.io.IOException;

/**
 * Defines the interface for cache implementations. The cache will be called
 * by cblock storage module when it performs IO operations.
 */
public interface CacheModule {
  /**
   * check if the key is cached, if yes, returned the cached object.
   * otherwise, load from data source. Then put it into cache.
   *
   * @param blockID - ID of the logical block to read.
   * @return the target block.
   * @throws IOException if the backing store cannot be read.
   */
  LogicalBlock get(long blockID) throws IOException;

  /**
   * put the value of the key into cache.
   *
   * @param blockID - ID of the logical block to write.
   * @param value - raw block data to cache.
   * @throws IOException if the write cannot be recorded.
   */
  void put(long blockID, byte[] value) throws IOException;

  // Forces buffered writes out toward the backing store.
  void flush() throws IOException;

  // Lifecycle: start any background machinery the cache needs.
  void start() throws IOException;

  // Lifecycle: stop background machinery.
  void stop() throws IOException;

  // Releases all resources held by the cache.
  void close() throws IOException;

  // True while the cache still holds writes not yet persisted to containers.
  boolean isDirtyCache();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/LogicalBlock.java
----------------------------------------------------------------------
diff --git a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/LogicalBlock.java b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/LogicalBlock.java
new file mode 100644
index 0000000..470826f
--- /dev/null
+++ b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/LogicalBlock.java
@@ -0,0 +1,50 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.cblock.jscsiHelper.cache;

import java.nio.ByteBuffer;

/**
 * Logical Block is the data structure that we write to the cache,
 * the key and data gets written to remote containers. Rest is used for
 * book keeping for the cache.
 */
public interface LogicalBlock {
  /**
   * Returns the data stream of this block.
   * @return - ByteBuffer
   */
  ByteBuffer getData();

  /**
   * Frees the byte buffer since we don't need it any more.
   */
  void clearData();

  /**
   * Returns the Block ID for this Block.
   * @return long - BlockID
   */
  long getBlockID();

  /**
   * Flag that tells us if this block has been persisted to container.
   * @return whether this block is now persistent
   */
  boolean isPersisted();
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/AsyncBlockWriter.java
----------------------------------------------------------------------
diff --git a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/AsyncBlockWriter.java b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/AsyncBlockWriter.java
new file mode 100644
index 0000000..992578f
--- /dev/null
+++ b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/AsyncBlockWriter.java
@@ -0,0 +1,221 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
+ */ +package org.apache.hadoop.cblock.jscsiHelper.cache.impl; + +import com.google.common.base.Preconditions; +import com.google.common.primitives.Longs; +import org.apache.commons.codec.digest.DigestUtils; +import org.apache.hadoop.cblock.jscsiHelper.cache.LogicalBlock; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.scm.XceiverClientManager; +import org.apache.hadoop.scm.XceiverClientSpi; +import org.apache.hadoop.scm.container.common.helpers.Pipeline; +import org.apache.hadoop.scm.storage.ContainerProtocolCalls; +import org.apache.hadoop.util.Time; +import org.apache.hadoop.utils.LevelDBStore; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +/** + * A Queue that is used to write blocks asynchronously to the container. + */ +public class AsyncBlockWriter { + private static final Logger LOG = + LoggerFactory.getLogger(AsyncBlockWriter.class); + + /** + * XceiverClientManager is used to get client connections to a set of + * machines. + */ + private final XceiverClientManager xceiverClientManager; + + /** + * This lock is used as a signal to re-queuing thread. The requeue thread + * wakes up as soon as it is signaled some blocks are in the retry queue. + * We try really aggressively since this new block will automatically move + * to the end of the queue. + * <p> + * In the event a container is unavailable for a long time, we can either + * fail all writes or remap and let the writes succeed. The easier + * semantics is to fail the volume until the container is recovered by SCM. + */ + private final Lock lock; + private final Condition notEmpty; + /** + * The cache this writer is operating against. 
+ */ + private final CBlockLocalCache parentCache; + private final BlockBufferManager blockBufferManager; + public final static String DIRTY_LOG_PREFIX = "DirtyLog"; + public static final String RETRY_LOG_PREFIX = "RetryLog"; + private AtomicLong localIoCount; + + /** + * Constructs an Async Block Writer. + * + * @param config - Config + * @param cache - Parent Cache for this writer + */ + public AsyncBlockWriter(Configuration config, CBlockLocalCache cache) { + + Preconditions.checkNotNull(cache, "Cache cannot be null."); + Preconditions.checkNotNull(cache.getCacheDB(), "DB cannot be null."); + localIoCount = new AtomicLong(); + lock = new ReentrantLock(); + notEmpty = lock.newCondition(); + parentCache = cache; + xceiverClientManager = cache.getClientManager(); + blockBufferManager = new BlockBufferManager(config, parentCache); + } + + public void start() throws IOException { + File logDir = new File(parentCache.getDbPath().toString()); + if (!logDir.exists() && !logDir.mkdirs()) { + LOG.error("Unable to create the log directory, Critical error cannot " + + "continue. Log Dir : {}", logDir); + throw new IllegalStateException("Cache Directory create failed, Cannot " + + "continue. Log Dir: {}" + logDir); + } + blockBufferManager.start(); + } + + /** + * Return the log to write to. + * + * @return Logger. + */ + public static Logger getLOG() { + return LOG; + } + + /** + * Get the CacheDB. + * + * @return LevelDB Handle + */ + LevelDBStore getCacheDB() { + return parentCache.getCacheDB(); + } + + /** + * Returns the client manager. + * + * @return XceiverClientManager + */ + XceiverClientManager getXceiverClientManager() { + return xceiverClientManager; + } + + /** + * Incs the localIoPacket Count that has gone into this device. + */ + public long incrementLocalIO() { + return localIoCount.incrementAndGet(); + } + + /** + * Return the local io counts to this device. 
+ * @return the count of io + */ + public long getLocalIOCount() { + return localIoCount.get(); + } + + /** + * Writes a block to LevelDB store and queues a work item for the system to + * sync the block to containers. + * + * @param block - Logical Block + */ + public void writeBlock(LogicalBlock block) throws IOException { + byte[] keybuf = Longs.toByteArray(block.getBlockID()); + String traceID = parentCache.getTraceID(block.getBlockID()); + if (parentCache.isShortCircuitIOEnabled()) { + long startTime = Time.monotonicNow(); + getCacheDB().put(keybuf, block.getData().array()); + incrementLocalIO(); + long endTime = Time.monotonicNow(); + parentCache.getTargetMetrics().updateDBWriteLatency( + endTime - startTime); + if (parentCache.isTraceEnabled()) { + String datahash = DigestUtils.sha256Hex(block.getData().array()); + parentCache.getTracer().info( + "Task=WriterTaskDBPut,BlockID={},Time={},SHA={}", + block.getBlockID(), endTime - startTime, datahash); + } + block.clearData(); + blockBufferManager.addToBlockBuffer(block.getBlockID()); + } else { + Pipeline pipeline = parentCache.getPipeline(block.getBlockID()); + String containerName = pipeline.getContainerName(); + XceiverClientSpi client = null; + try { + long startTime = Time.monotonicNow(); + client = parentCache.getClientManager() + .acquireClient(parentCache.getPipeline(block.getBlockID())); + ContainerProtocolCalls.writeSmallFile(client, containerName, + Long.toString(block.getBlockID()), block.getData().array(), + traceID); + long endTime = Time.monotonicNow(); + if (parentCache.isTraceEnabled()) { + String datahash = DigestUtils.sha256Hex(block.getData().array()); + parentCache.getTracer().info( + "Task=DirectWriterPut,BlockID={},Time={},SHA={}", + block.getBlockID(), endTime - startTime, datahash); + } + parentCache.getTargetMetrics(). 
+ updateDirectBlockWriteLatency(endTime - startTime); + parentCache.getTargetMetrics().incNumDirectBlockWrites(); + } catch (Exception ex) { + parentCache.getTargetMetrics().incNumFailedDirectBlockWrites(); + LOG.error("Direct I/O writing of block:{} traceID:{} to " + + "container {} failed", block.getBlockID(), traceID, + containerName, ex); + throw ex; + } finally { + if (client != null) { + parentCache.getClientManager().releaseClient(client); + } + block.clearData(); + } + } + } + + /** + * Shutdown by writing any pending I/O to dirtylog buffer. + */ + public void shutdown() { + blockBufferManager.shutdown(); + } + /** + * Returns tracer. + * + * @return Tracer + */ + Logger getTracer() { + return parentCache.getTracer(); + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/BlockBufferFlushTask.java ---------------------------------------------------------------------- diff --git a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/BlockBufferFlushTask.java b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/BlockBufferFlushTask.java new file mode 100644 index 0000000..c61a7a4 --- /dev/null +++ b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/BlockBufferFlushTask.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.cblock.jscsiHelper.cache.impl; + +import org.apache.hadoop.util.Time; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.FileOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.file.Paths; + +/** + * This task is responsible for flushing the BlockIDBuffer + * to Dirty Log File. This Dirty Log file is used later by + * ContainerCacheFlusher when the data is written to container + */ +public class BlockBufferFlushTask implements Runnable { + private static final Logger LOG = + LoggerFactory.getLogger(BlockBufferFlushTask.class); + private final CBlockLocalCache parentCache; + private final BlockBufferManager bufferManager; + private final ByteBuffer blockIDBuffer; + + BlockBufferFlushTask(ByteBuffer blockIDBuffer, CBlockLocalCache parentCache, + BlockBufferManager manager) { + this.parentCache = parentCache; + this.bufferManager = manager; + this.blockIDBuffer = blockIDBuffer; + } + + /** + * When an object implementing interface <code>Runnable</code> is used + * to create a thread, starting the thread causes the object's + * <code>run</code> method to be called in that separately executing + * thread. + * <p> + * The general contract of the method <code>run</code> is that it may + * take any action whatsoever. 
+ * + * @see Thread#run() + */ + @Override + public void run() { + try { + writeBlockBufferToFile(blockIDBuffer); + } catch (Exception e) { + parentCache.getTargetMetrics().incNumFailedBlockBufferFlushes(); + LOG.error("Unable to sync the Block map to disk with " + + (blockIDBuffer.position() / Long.SIZE) + "entries " + + "-- NOTE: This might cause a data loss or corruption", e); + } finally { + bufferManager.releaseBuffer(blockIDBuffer); + } + } + + /** + * Write Block Buffer to file. + * + * @param buffer - ByteBuffer + * @throws IOException + */ + private void writeBlockBufferToFile(ByteBuffer buffer) + throws IOException { + long startTime = Time.monotonicNow(); + boolean append = false; + + // If there is nothing written to blockId buffer, + // then skip flushing of blockId buffer + if (buffer.position() == 0) { + return; + } + + buffer.flip(); + String fileName = + String.format("%s.%s", AsyncBlockWriter.DIRTY_LOG_PREFIX, + Time.monotonicNow()); + String log = Paths.get(parentCache.getDbPath().toString(), fileName) + .toString(); + + FileChannel channel = new FileOutputStream(log, append).getChannel(); + int bytesWritten = channel.write(buffer); + channel.close(); + buffer.clear(); + parentCache.processDirtyMessage(fileName); + long endTime = Time.monotonicNow(); + if (parentCache.isTraceEnabled()) { + parentCache.getTracer().info( + "Task=DirtyBlockLogWrite,Time={} bytesWritten={}", + endTime - startTime, bytesWritten); + } + + parentCache.getTargetMetrics().incNumBlockBufferFlushCompleted(); + parentCache.getTargetMetrics().incNumBytesDirtyLogWritten(bytesWritten); + parentCache.getTargetMetrics(). 
+ updateBlockBufferFlushLatency(endTime - startTime); + LOG.debug("Block buffer writer bytesWritten:{} Time:{}", + bytesWritten, endTime - startTime); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/BlockBufferManager.java ---------------------------------------------------------------------- diff --git a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/BlockBufferManager.java b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/BlockBufferManager.java new file mode 100644 index 0000000..5d3209c --- /dev/null +++ b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/BlockBufferManager.java @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.cblock.jscsiHelper.cache.impl; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.hadoop.conf.Configuration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.ByteBuffer; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.ThreadFactory; + +import static org.apache.hadoop.cblock.CBlockConfigKeys. + DFS_CBLOCK_BLOCK_BUFFER_FLUSH_INTERVAL; +import static org.apache.hadoop.cblock.CBlockConfigKeys. + DFS_CBLOCK_BLOCK_BUFFER_FLUSH_INTERVAL_DEFAULT; +import static org.apache.hadoop.cblock.CBlockConfigKeys. + DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE; +import static org.apache.hadoop.cblock.CBlockConfigKeys. + DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE_DEFAULT; +import static org.apache.hadoop.cblock.CBlockConfigKeys. + DFS_CBLOCK_CACHE_KEEP_ALIVE; +import static org.apache.hadoop.cblock.CBlockConfigKeys. + DFS_CBLOCK_CACHE_KEEP_ALIVE_DEFAULT; +import static org.apache.hadoop.cblock.CBlockConfigKeys. + DFS_CBLOCK_CACHE_THREAD_PRIORITY; +import static org.apache.hadoop.cblock.CBlockConfigKeys. + DFS_CBLOCK_CACHE_THREAD_PRIORITY_DEFAULT; + +/** + * This class manages the block ID buffer. + * Block ID Buffer keeps a list of blocks which are in leveldb cache + * This buffer is used later when the blocks are flushed to container + * + * Two blockIDBuffers are maintained so that write are not blocked when + * DirtyLog is being written. Once a blockIDBuffer is full, it will be + * enqueued for DirtyLog write while the other buffer accepts new write. + * Once the DirtyLog write is done, the buffer is returned back to the pool. + * + * There are three triggers for blockIDBuffer flush + * 1) BlockIDBuffer is full, + * 2) Time period defined for blockIDBuffer flush has elapsed. 
+ * 3) Shutdown + */ +public class BlockBufferManager { + private static final Logger LOG = + LoggerFactory.getLogger(BlockBufferManager.class); + + private enum FlushReason { + BUFFER_FULL, + SHUTDOWN, + TIMER + }; + + private final int blockBufferSize; + private final CBlockLocalCache parentCache; + private final ScheduledThreadPoolExecutor scheduledExecutor; + private final ThreadPoolExecutor threadPoolExecutor; + private final long intervalSeconds; + private final ArrayBlockingQueue<ByteBuffer> acquireQueue; + private final ArrayBlockingQueue<Runnable> workQueue; + private ByteBuffer currentBuffer; + + BlockBufferManager(Configuration config, CBlockLocalCache parentCache) { + this.parentCache = parentCache; + this.scheduledExecutor = new ScheduledThreadPoolExecutor(1); + + this.intervalSeconds = + config.getTimeDuration(DFS_CBLOCK_BLOCK_BUFFER_FLUSH_INTERVAL, + DFS_CBLOCK_BLOCK_BUFFER_FLUSH_INTERVAL_DEFAULT, + TimeUnit.SECONDS); + + long keepAlive = config.getTimeDuration(DFS_CBLOCK_CACHE_KEEP_ALIVE, + DFS_CBLOCK_CACHE_KEEP_ALIVE_DEFAULT, + TimeUnit.SECONDS); + this.workQueue = new ArrayBlockingQueue<>(2, true); + int threadPri = config.getInt(DFS_CBLOCK_CACHE_THREAD_PRIORITY, + DFS_CBLOCK_CACHE_THREAD_PRIORITY_DEFAULT); + ThreadFactory workerThreadFactory = new ThreadFactoryBuilder() + .setNameFormat("Cache Block Buffer Manager Thread #%d") + .setDaemon(true) + .setPriority(threadPri) + .build(); + /* + * starting a thread pool with core pool size of 1 and maximum of 2 threads + * as there are maximum of 2 buffers which can be flushed at the same time. 
+ */ + this.threadPoolExecutor = new ThreadPoolExecutor(1, 2, + keepAlive, TimeUnit.SECONDS, workQueue, workerThreadFactory, + new ThreadPoolExecutor.AbortPolicy()); + + this.blockBufferSize = config.getInt(DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE, + DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE_DEFAULT) * (Long.SIZE / Byte.SIZE); + this.acquireQueue = new ArrayBlockingQueue<>(2, true); + + for (int i = 0; i < 2; i++) { + acquireQueue.add(ByteBuffer.allocate(blockBufferSize)); + } + // get the first buffer to be used + this.currentBuffer = acquireQueue.remove(); + + LOG.info("BufferManager: Buffer Size:{} FlushIntervalSeconds:{}", + blockBufferSize, intervalSeconds); + } + + // triggerBlockBufferFlush enqueues current ByteBuffer for flush and returns. + // This enqueue is asynchronous and hence triggerBlockBufferFlush will + // only block when there are no available buffers in acquireQueue + // Once the DirtyLog write is done, buffer is returned back to + // BlockBufferManager using releaseBuffer + private synchronized void triggerBlockBufferFlush(FlushReason reason) { + LOG.debug("Flush triggered because: " + reason.toString() + + " Num entries in buffer: " + + currentBuffer.position() / (Long.SIZE / Byte.SIZE) + + " Acquire Queue Size: " + acquireQueue.size()); + + parentCache.getTargetMetrics().incNumBlockBufferFlushTriggered(); + BlockBufferFlushTask flushTask = + new BlockBufferFlushTask(currentBuffer, parentCache, this); + threadPoolExecutor.submit(flushTask); + try { + currentBuffer = acquireQueue.take(); + } catch (InterruptedException ex) { + currentBuffer = null; + parentCache.getTargetMetrics().incNumInterruptedBufferWaits(); + LOG.error("wait on take operation on acquire queue interrupted", ex); + Thread.currentThread().interrupt(); + } + } + + public synchronized void addToBlockBuffer(long blockId) { + parentCache.getTargetMetrics().incNumBlockBufferUpdates(); + currentBuffer.putLong(blockId); + // if no space left, flush this buffer + if (currentBuffer.remaining() == 
0) { + triggerBlockBufferFlush(FlushReason.BUFFER_FULL); + } + } + + public void releaseBuffer(ByteBuffer buffer) { + if (buffer.position() != 0) { + LOG.error("requeuing a non empty buffer with:{}", + "elements enqueued in the acquire queue", + buffer.position() / (Long.SIZE / Byte.SIZE)); + buffer.reset(); + } + // There should always be space in the queue to add an element + acquireQueue.add(buffer); + } + + // Start a scheduled task to flush blockIDBuffer + public void start() { + Runnable scheduledTask = () -> triggerBlockBufferFlush(FlushReason.TIMER); + scheduledExecutor.scheduleWithFixedDelay(scheduledTask, intervalSeconds, + intervalSeconds, TimeUnit.SECONDS); + threadPoolExecutor.prestartAllCoreThreads(); + } + + public void shutdown() { + triggerBlockBufferFlush(FlushReason.SHUTDOWN); + scheduledExecutor.shutdown(); + threadPoolExecutor.shutdown(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/CBlockLocalCache.java ---------------------------------------------------------------------- diff --git a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/CBlockLocalCache.java b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/CBlockLocalCache.java new file mode 100644 index 0000000..1149164 --- /dev/null +++ b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/CBlockLocalCache.java @@ -0,0 +1,577 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.cblock.jscsiHelper.cache.impl; + +import com.google.common.base.Preconditions; +import com.google.common.primitives.Longs; +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.cblock.jscsiHelper.ContainerCacheFlusher; +import org.apache.hadoop.cblock.jscsiHelper.cache.CacheModule; +import org.apache.hadoop.cblock.jscsiHelper.cache.LogicalBlock; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.scm.XceiverClientManager; +import org.apache.hadoop.scm.container.common.helpers.Pipeline; +import org.apache.hadoop.cblock.jscsiHelper.CBlockTargetMetrics; +import org.apache.hadoop.utils.LevelDBStore; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.nio.file.FileStore; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.List; + +import static org.apache.hadoop.cblock.CBlockConfigKeys + .DFS_CBLOCK_DISK_CACHE_PATH_DEFAULT; +import static org.apache.hadoop.cblock.CBlockConfigKeys + .DFS_CBLOCK_DISK_CACHE_PATH_KEY; +import static org.apache.hadoop.cblock.CBlockConfigKeys + .DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO; +import static org.apache.hadoop.cblock.CBlockConfigKeys + .DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO_DEFAULT; +import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_TRACE_IO; +import static org.apache.hadoop.cblock.CBlockConfigKeys + .DFS_CBLOCK_TRACE_IO_DEFAULT; + +/** + * A local cache used by 
the CBlock ISCSI server. This class is enabled or + * disabled via config settings. + */ +public class CBlockLocalCache implements CacheModule { + private static final Logger LOG = + LoggerFactory.getLogger(CBlockLocalCache.class); + private static final Logger TRACER = + LoggerFactory.getLogger("TraceIO"); + + private final Configuration conf; + /** + * LevelDB cache file. + */ + private final LevelDBStore cacheDB; + + /** + * AsyncBlock writer updates the cacheDB and writes the blocks async to + * remote containers. + */ + private final AsyncBlockWriter blockWriter; + + /** + * Sync block reader tries to read from the cache and if we get a cache + * miss we will fetch the block from remote location. It will asynchronously + * update the cacheDB. + */ + private final SyncBlockReader blockReader; + private final String userName; + private final String volumeName; + + /** + * From a block ID we are able to get the pipeline by indexing this array. + */ + private final Pipeline[] containerList; + private final int blockSize; + private XceiverClientManager clientManager; + /** + * If this flag is enabled then cache traces all I/O, all reads and writes + * are visible in the log with sha of the block written. Makes the system + * slower use it only for debugging or creating trace simulations. + */ + private final boolean traceEnabled; + private final boolean enableShortCircuitIO; + private final long volumeSize; + private long currentCacheSize; + private File dbPath; + private final ContainerCacheFlusher flusher; + private CBlockTargetMetrics cblockTargetMetrics; + + /** + * Get Db Path. + * @return the file instance of the db. + */ + public File getDbPath() { + return dbPath; + } + + /** + * Constructor for CBlockLocalCache invoked via the builder. 
+ * + * @param conf - Configuration + * @param volumeName - volume Name + * @param userName - user name + * @param containerPipelines - Pipelines that make up this contianer + * @param blockSize - blockSize + * @param flusher - flusher to flush data to container + * @throws IOException + */ + CBlockLocalCache( + Configuration conf, String volumeName, + String userName, List<Pipeline> containerPipelines, int blockSize, + long volumeSize, ContainerCacheFlusher flusher) throws IOException { + this.conf = conf; + this.userName = userName; + this.volumeName = volumeName; + this.blockSize = blockSize; + this.flusher = flusher; + this.traceEnabled = conf.getBoolean(DFS_CBLOCK_TRACE_IO, + DFS_CBLOCK_TRACE_IO_DEFAULT); + this.enableShortCircuitIO = conf.getBoolean( + DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, + DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO_DEFAULT); + dbPath = Paths.get(conf.get(DFS_CBLOCK_DISK_CACHE_PATH_KEY, + DFS_CBLOCK_DISK_CACHE_PATH_DEFAULT), userName, volumeName).toFile(); + + if (!dbPath.exists() && !dbPath.mkdirs()) { + LOG.error("Unable to create the cache paths. Path: {}", dbPath); + throw new IllegalArgumentException("Unable to create paths. Path: " + + dbPath); + } + cacheDB = flusher.getCacheDB(dbPath.toString()); + this.containerList = containerPipelines.toArray(new + Pipeline[containerPipelines.size()]); + this.volumeSize = volumeSize; + + blockWriter = new AsyncBlockWriter(conf, this); + blockReader = new SyncBlockReader(conf, this); + if (this.traceEnabled) { + getTracer().info("Task=StartingCache"); + } + } + + private void setClientManager(XceiverClientManager manager) { + this.clientManager = manager; + } + + private void setCblockTargetMetrics(CBlockTargetMetrics targetMetrics) { + this.cblockTargetMetrics = targetMetrics; + } + + /** + * Returns new builder class that builds a CBlockLocalCache. 
+ * + * @return Builder + */ + public static Builder newBuilder() { + return new Builder(); + } + + public void processDirtyMessage(String fileName) { + flusher.processDirtyBlocks(dbPath.toString(), fileName); + } + + /** + * Get usable disk space. + * + * @param dbPathString - Path to db + * @return long bytes remaining. + */ + private static long getRemainingDiskSpace(String dbPathString) { + try { + URI fileUri = new URI("file:///"); + Path dbPath = Paths.get(fileUri).resolve(dbPathString); + FileStore disk = Files.getFileStore(dbPath); + return disk.getUsableSpace(); + } catch (URISyntaxException | IOException ex) { + LOG.error("Unable to get free space on for path :" + dbPathString); + } + return 0L; + } + + /** + * Returns the Max current CacheSize. + * + * @return - Cache Size + */ + public long getCurrentCacheSize() { + return currentCacheSize; + } + + /** + * Sets the Maximum Cache Size. + * + * @param currentCacheSize - Max current Cache Size. + */ + public void setCurrentCacheSize(long currentCacheSize) { + this.currentCacheSize = currentCacheSize; + } + + /** + * True if block tracing is enabled. + * + * @return - bool + */ + public boolean isTraceEnabled() { + return traceEnabled; + } + + /** + * Checks if Short Circuit I/O is enabled. + * + * @return - true if it is enabled. + */ + public boolean isShortCircuitIOEnabled() { + return enableShortCircuitIO; + } + + /** + * Returns the default block size of this device. + * + * @return - int + */ + public int getBlockSize() { + return blockSize; + } + + /** + * Gets the client manager. + * + * @return XceiverClientManager + */ + public XceiverClientManager getClientManager() { + return clientManager; + } + + /** + * check if the key is cached, if yes, returned the cached object. + * otherwise, load from data source. Then put it into cache. 
+ * + * @param blockID + * @return the block associated to the blockID + */ + @Override + public LogicalBlock get(long blockID) throws IOException { + cblockTargetMetrics.incNumReadOps(); + return blockReader.readBlock(blockID); + } + + /** + * put the value of the key into cache and remote container. + * + * @param blockID - BlockID + * @param data - byte[] + */ + @Override + public void put(long blockID, byte[] data) throws IOException { + cblockTargetMetrics.incNumWriteOps(); + LogicalBlock block = new DiskBlock(blockID, data, false); + blockWriter.writeBlock(block); + } + + @Override + public void flush() throws IOException { + + } + + @Override + public void start() throws IOException { + flusher.register(getDbPath().getPath(), containerList); + blockWriter.start(); + } + + @Override + public void stop() throws IOException { + } + + @Override + public void close() throws IOException { + blockReader.shutdown(); + blockWriter.shutdown(); + this.flusher.releaseCacheDB(dbPath.toString()); + if (this.traceEnabled) { + getTracer().info("Task=ShutdownCache"); + } + } + + /** + * Returns true if cache still has blocks pending to write. + * + * @return false if we have no pending blocks to write. + */ + @Override + public boolean isDirtyCache() { + return false; + } + + /** + * Returns the local cache DB. + * + * @return - DB + */ + LevelDBStore getCacheDB() { + return this.cacheDB; + } + + /** + * Returns the current userName. + * + * @return - UserName + */ + String getUserName() { + return this.userName; + } + + /** + * Returns the volume name. + * + * @return VolumeName. + */ + String getVolumeName() { + return this.volumeName; + } + + /** + * Returns the target metrics. + * + * @return CBlock Target Metrics. + */ + CBlockTargetMetrics getTargetMetrics() { + return this.cblockTargetMetrics; + } + + /** + * Returns the pipeline to use given a container. + * + * @param blockId - blockID + * @return - pipeline. 
+ */ + Pipeline getPipeline(long blockId) { + int containerIdx = (int) blockId % containerList.length; + long cBlockIndex = + Longs.fromByteArray(containerList[containerIdx].getData()); + if (cBlockIndex > 0) { + // This catches the case when we get a wrong container in the ordering + // of the containers. + Preconditions.checkState(containerIdx % cBlockIndex == 0, + "The container ID computed should match with the container index " + + "returned from cBlock Server."); + } + return containerList[containerIdx]; + } + + String getTraceID(long blockID) { + return flusher.getTraceID(dbPath, blockID); + } + + /** + * Returns tracer. + * + * @return - Logger + */ + Logger getTracer() { + return TRACER; + } + + /** + * Builder class for CBlocklocalCache. + */ + public static class Builder { + private Configuration configuration; + private String userName; + private String volumeName; + private List<Pipeline> pipelines; + private XceiverClientManager clientManager; + private int blockSize; + private long volumeSize; + private ContainerCacheFlusher flusher; + private CBlockTargetMetrics metrics; + + /** + * Ctor. + */ + Builder() { + } + + /** + * Computes a cache size based on the configuration and available disk + * space. + * + * @param configuration - Config + * @param volumeSize - Size of Volume + * @param blockSize - Size of the block + * @return - cache size in bytes. + */ + private static long computeCacheSize(Configuration configuration, + long volumeSize, int blockSize) { + long cacheSize = 0; + String dbPath = configuration.get(DFS_CBLOCK_DISK_CACHE_PATH_KEY, + DFS_CBLOCK_DISK_CACHE_PATH_DEFAULT); + if (StringUtils.isBlank(dbPath)) { + return cacheSize; + } + long spaceRemaining = getRemainingDiskSpace(dbPath); + double cacheRatio = 1.0; + + if (spaceRemaining < volumeSize) { + cacheRatio = (double)spaceRemaining / volumeSize; + } + + // if cache is going to be at least 10% of the volume size it is worth + // doing, otherwise skip creating the cache. 
+ if (cacheRatio >= 0.10) { + cacheSize = Double.doubleToLongBits(volumeSize * cacheRatio); + } + return cacheSize; + } + + /** + * Sets the Config to be used by this cache. + * + * @param conf - Config + * @return Builder + */ + public Builder setConfiguration(Configuration conf) { + this.configuration = conf; + return this; + } + + /** + * Sets the user name who is the owner of this volume. + * + * @param user - name of the owner, please note this is not the current + * user name. + * @return - Builder + */ + public Builder setUserName(String user) { + this.userName = user; + return this; + } + + /** + * Sets the VolumeName. + * + * @param volume - Name of the volume + * @return Builder + */ + public Builder setVolumeName(String volume) { + this.volumeName = volume; + return this; + } + + /** + * Sets the Pipelines that form this volume. + * + * @param pipelineList - list of pipelines + * @return Builder + */ + public Builder setPipelines(List<Pipeline> pipelineList) { + this.pipelines = pipelineList; + return this; + } + + /** + * Sets the Client Manager that manages the communication with containers. + * + * @param xceiverClientManager - clientManager. + * @return - Builder + */ + public Builder setClientManager(XceiverClientManager xceiverClientManager) { + this.clientManager = xceiverClientManager; + return this; + } + + /** + * Sets the block size -- Typical sizes are 4KB, 8KB etc. + * + * @param size - BlockSize. + * @return - Builder + */ + public Builder setBlockSize(int size) { + this.blockSize = size; + return this; + } + + /** + * Sets the volumeSize. + * + * @param size - VolumeSize + * @return - Builder + */ + public Builder setVolumeSize(long size) { + this.volumeSize = size; + return this; + } + + /** + * Set flusher. + * @param containerCacheFlusher - cache Flusher + * @return Builder. 
+ */ + public Builder setFlusher(ContainerCacheFlusher containerCacheFlusher) { + this.flusher = containerCacheFlusher; + return this; + } + + /** + * Sets the cblock Metrics. + * + * @param targetMetrics - CBlock Target Metrics + * @return - Builder + */ + public Builder setCBlockTargetMetrics(CBlockTargetMetrics targetMetrics) { + this.metrics = targetMetrics; + return this; + } + + /** + * Constructs a CBlockLocalCache. + * + * @return the CBlockLocalCache with the preset properties. + * @throws IOException + */ + public CBlockLocalCache build() throws IOException { + Preconditions.checkNotNull(this.configuration, "A valid configuration " + + "is needed"); + Preconditions.checkState(StringUtils.isNotBlank(userName), "A valid " + + "username is needed"); + Preconditions.checkState(StringUtils.isNotBlank(volumeName), " A valid" + + " volume name is needed"); + Preconditions.checkNotNull(this.pipelines, "Pipelines cannot be null"); + Preconditions.checkState(this.pipelines.size() > 0, "At least one " + + "pipeline location is needed for a volume"); + + for (int x = 0; x < pipelines.size(); x++) { + Preconditions.checkNotNull(pipelines.get(x).getData(), "cBlock " + + "relies on private data on the pipeline, null data found."); + } + + Preconditions.checkNotNull(clientManager, "Client Manager cannot be " + + "null"); + Preconditions.checkState(blockSize > 0, " Block size has to be a " + + "number greater than 0"); + + Preconditions.checkState(volumeSize > 0, "Volume Size cannot be less " + + "than 1"); + Preconditions.checkNotNull(this.flusher, "Flusher cannot be null."); + + CBlockLocalCache cache = new CBlockLocalCache(this.configuration, + this.volumeName, this.userName, this.pipelines, blockSize, + volumeSize, flusher); + cache.setCblockTargetMetrics(this.metrics); + cache.setClientManager(this.clientManager); + + // TODO : Support user configurable maximum size. 
+ long cacheSize = computeCacheSize(this.configuration, this.volumeSize, + this.blockSize); + cache.setCurrentCacheSize(cacheSize); + return cache; + } + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
