http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestBufferManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestBufferManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestBufferManager.java deleted file mode 100644 index 30f87e24..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestBufferManager.java +++ /dev/null @@ -1,456 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.cblock; - -import com.google.common.primitives.Longs; -import static java.util.concurrent.TimeUnit.SECONDS; -import org.apache.commons.lang.RandomStringUtils; -import org.apache.hadoop.cblock.jscsiHelper.CBlockTargetMetrics; -import org.apache.hadoop.cblock.jscsiHelper.ContainerCacheFlusher; -import org.apache.hadoop.cblock.jscsiHelper.cache.impl.CBlockLocalCache; -import org.apache.hadoop.io.IOUtils; -import org.apache.hadoop.ozone.MiniOzoneClassicCluster; -import org.apache.hadoop.ozone.MiniOzoneCluster; -import org.apache.hadoop.conf.OzoneConfiguration; -import org.apache.hadoop.scm.XceiverClientManager; -import org.apache.hadoop.scm.XceiverClientSpi; -import org.apache.hadoop.scm.container.common.helpers.Pipeline; -import org.apache.hadoop.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB; -import org.apache.hadoop.scm.storage.ContainerProtocolCalls; -import org.apache.hadoop.test.GenericTestUtils; -import org.junit.AfterClass; -import org.junit.Assert; -import org.junit.BeforeClass; -import org.junit.Test; - -import java.io.IOException; -import java.nio.charset.StandardCharsets; -import java.util.*; -import java.util.concurrent.TimeoutException; - - -import static org.apache.hadoop.cblock.CBlockConfigKeys. - DFS_CBLOCK_DISK_CACHE_PATH_KEY; -import static org.apache.hadoop.cblock.CBlockConfigKeys. - DFS_CBLOCK_TRACE_IO; -import static org.apache.hadoop.cblock.CBlockConfigKeys. - DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO; -import static org.apache.hadoop.cblock.CBlockConfigKeys. - DFS_CBLOCK_BLOCK_BUFFER_FLUSH_INTERVAL; - -/** - * Tests for Local Cache Buffer Manager. - */ -public class TestBufferManager { - private final static long GB = 1024 * 1024 * 1024; - private final static int KB = 1024; - private static MiniOzoneCluster cluster; - private static OzoneConfiguration config; - private static StorageContainerLocationProtocolClientSideTranslatorPB - storageContainerLocationClient; - private static XceiverClientManager xceiverClientManager; - - @BeforeClass - public static void init() throws IOException { - config = new OzoneConfiguration(); - String path = GenericTestUtils.getTempPath( - TestBufferManager.class.getSimpleName()); - config.set(DFS_CBLOCK_DISK_CACHE_PATH_KEY, path); - config.setBoolean(DFS_CBLOCK_TRACE_IO, true); - config.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, true); - cluster = new MiniOzoneClassicCluster.Builder(config) - .numDataNodes(1).setHandlerType("distributed").build(); - storageContainerLocationClient = cluster - .createStorageContainerLocationClient(); - xceiverClientManager = new XceiverClientManager(config); - } - - @AfterClass - public static void shutdown() throws InterruptedException { - if (cluster != null) { - cluster.shutdown(); - } - IOUtils.cleanupWithLogger(null, storageContainerLocationClient, cluster); - } - - /** - * createContainerAndGetPipeline creates a set of containers and returns the - * Pipelines that define those containers. - * - * @param count - Number of containers to create. - * @return - List of Pipelines. - * @throws IOException - */ - private List<Pipeline> createContainerAndGetPipeline(int count) - throws IOException { - List<Pipeline> containerPipelines = new LinkedList<>(); - for (int x = 0; x < count; x++) { - String traceID = "trace" + RandomStringUtils.randomNumeric(4); - String containerName = "container" + RandomStringUtils.randomNumeric(10); - Pipeline pipeline = - storageContainerLocationClient.allocateContainer( - xceiverClientManager.getType(), - xceiverClientManager.getFactor(), containerName, "CBLOCK"); - XceiverClientSpi client = xceiverClientManager.acquireClient(pipeline); - ContainerProtocolCalls.createContainer(client, traceID); - // This step is needed since we set private data on pipelines, when we - // read the list from CBlockServer. So we mimic that action here. - pipeline.setData(Longs.toByteArray(x)); - containerPipelines.add(pipeline); - xceiverClientManager.releaseClient(client); - } - return containerPipelines; - } - - /** - * This test writes some block to the cache and then shuts down the cache. - * The cache is then restarted to check that the - * correct number of blocks are read from Dirty Log - * - * @throws IOException - */ - @Test - public void testEmptyBlockBufferHandling() throws IOException, - InterruptedException, TimeoutException { - // Create a new config so that this tests write metafile to new location - OzoneConfiguration flushTestConfig = new OzoneConfiguration(); - String path = GenericTestUtils - .getTempPath(TestBufferManager.class.getSimpleName() - + RandomStringUtils.randomNumeric(4)); - flushTestConfig.set(DFS_CBLOCK_DISK_CACHE_PATH_KEY, path); - flushTestConfig.setBoolean(DFS_CBLOCK_TRACE_IO, true); - flushTestConfig.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, true); - - String volumeName = "volume" + RandomStringUtils.randomNumeric(4); - String userName = "user" + RandomStringUtils.randomNumeric(4); - String data = RandomStringUtils.random(4 * KB); - List<Pipeline> pipelines = createContainerAndGetPipeline(10); - CBlockTargetMetrics metrics = CBlockTargetMetrics.create(); - ContainerCacheFlusher flusher = new ContainerCacheFlusher(flushTestConfig, - xceiverClientManager, metrics); - CBlockLocalCache cache = CBlockLocalCache.newBuilder() - .setConfiguration(flushTestConfig) - .setVolumeName(volumeName) - .setUserName(userName) - .setPipelines(pipelines) - .setClientManager(xceiverClientManager) - .setBlockSize(4 * KB) - .setVolumeSize(50 * GB) - .setFlusher(flusher) - .setCBlockTargetMetrics(metrics) - .build(); - cache.start(); - // Write data to the cache - cache.put(1, data.getBytes(StandardCharsets.UTF_8)); - Assert.assertEquals(0, metrics.getNumDirectBlockWrites()); - Assert.assertEquals(1, metrics.getNumWriteOps()); - cache.put(2, data.getBytes(StandardCharsets.UTF_8)); - Assert.assertEquals(0, metrics.getNumDirectBlockWrites()); - Assert.assertEquals(2, metrics.getNumWriteOps()); - - // Store the previous block buffer position - Assert.assertEquals(2, metrics.getNumBlockBufferUpdates()); - // Simulate a shutdown by closing the cache - cache.close(); - Thread.sleep(1000); - Assert.assertEquals(1, metrics.getNumBlockBufferFlushTriggered()); - Assert.assertEquals(1, metrics.getNumBlockBufferFlushCompleted()); - Assert.assertEquals(2 * (Long.SIZE/ Byte.SIZE), - metrics.getNumBytesDirtyLogWritten()); - Assert.assertEquals(0, metrics.getNumFailedBlockBufferFlushes()); - Assert.assertEquals(0, metrics.getNumInterruptedBufferWaits()); - - // Restart cache and check that right number of entries are read - CBlockTargetMetrics newMetrics = CBlockTargetMetrics.create(); - ContainerCacheFlusher newFlusher = - new ContainerCacheFlusher(flushTestConfig, - xceiverClientManager, newMetrics); - CBlockLocalCache newCache = CBlockLocalCache.newBuilder() - .setConfiguration(flushTestConfig) - .setVolumeName(volumeName) - .setUserName(userName) - .setPipelines(pipelines) - .setClientManager(xceiverClientManager) - .setBlockSize(4 * KB) - .setVolumeSize(50 * GB) - .setFlusher(newFlusher) - .setCBlockTargetMetrics(newMetrics) - .build(); - newCache.start(); - Thread fllushListenerThread = new Thread(newFlusher); - fllushListenerThread.setDaemon(true); - fllushListenerThread.start(); - - Thread.sleep(5000); - Assert.assertEquals(metrics.getNumBlockBufferUpdates(), - newMetrics.getNumDirtyLogBlockRead()); - Assert.assertEquals(newMetrics.getNumDirtyLogBlockRead() - * (Long.SIZE/ Byte.SIZE), newMetrics.getNumBytesDirtyLogReads()); - // Now shutdown again, nothing should be flushed - newFlusher.shutdown(); - Assert.assertEquals(0, newMetrics.getNumBlockBufferUpdates()); - Assert.assertEquals(0, newMetrics.getNumBytesDirtyLogWritten()); - } - - @Test - public void testPeriodicFlush() throws IOException, - InterruptedException, TimeoutException{ - // Create a new config so that this tests write metafile to new location - OzoneConfiguration flushTestConfig = new OzoneConfiguration(); - String path = GenericTestUtils - .getTempPath(TestBufferManager.class.getSimpleName() - + RandomStringUtils.randomNumeric(4)); - flushTestConfig.set(DFS_CBLOCK_DISK_CACHE_PATH_KEY, path); - flushTestConfig.setBoolean(DFS_CBLOCK_TRACE_IO, true); - flushTestConfig.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, true); - flushTestConfig - .setTimeDuration(DFS_CBLOCK_BLOCK_BUFFER_FLUSH_INTERVAL, 5, SECONDS); - - String volumeName = "volume" + RandomStringUtils.randomNumeric(4); - String userName = "user" + RandomStringUtils.randomNumeric(4); - CBlockTargetMetrics metrics = CBlockTargetMetrics.create(); - ContainerCacheFlusher flusher = new ContainerCacheFlusher(flushTestConfig, - xceiverClientManager, metrics); - CBlockLocalCache cache = CBlockLocalCache.newBuilder() - .setConfiguration(flushTestConfig) - .setVolumeName(volumeName) - .setUserName(userName) - .setPipelines(createContainerAndGetPipeline(10)) - .setClientManager(xceiverClientManager) - .setBlockSize(4 * KB) - .setVolumeSize(50 * GB) - .setFlusher(flusher) - .setCBlockTargetMetrics(metrics) - .build(); - cache.start(); - Thread.sleep(8000); - // Ticks will be at 5s, 10s and so on, so this count should be 1 - Assert.assertEquals(1, metrics.getNumBlockBufferFlushTriggered()); - // Nothing pushed to cache, so nothing should be written - Assert.assertEquals(0, metrics.getNumBytesDirtyLogWritten()); - Assert.assertEquals(0, metrics.getNumBlockBufferFlushCompleted()); - cache.close(); - // After close, another trigger should happen but still no data written - Assert.assertEquals(2, metrics.getNumBlockBufferFlushTriggered()); - Assert.assertEquals(0, metrics.getNumBytesDirtyLogWritten()); - Assert.assertEquals(0, metrics.getNumBlockBufferFlushCompleted()); - Assert.assertEquals(0, metrics.getNumFailedBlockBufferFlushes()); - } - - @Test - public void testSingleBufferFlush() throws IOException, - InterruptedException, TimeoutException { - // Create a new config so that this tests write metafile to new location - OzoneConfiguration flushTestConfig = new OzoneConfiguration(); - String path = GenericTestUtils - .getTempPath(TestBufferManager.class.getSimpleName() - + RandomStringUtils.randomNumeric(4)); - flushTestConfig.set(DFS_CBLOCK_DISK_CACHE_PATH_KEY, path); - flushTestConfig.setBoolean(DFS_CBLOCK_TRACE_IO, true); - flushTestConfig.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, true); - - String volumeName = "volume" + RandomStringUtils.randomNumeric(4); - String userName = "user" + RandomStringUtils.randomNumeric(4); - String data = RandomStringUtils.random(4 * KB); - CBlockTargetMetrics metrics = CBlockTargetMetrics.create(); - ContainerCacheFlusher flusher = new ContainerCacheFlusher(flushTestConfig, - xceiverClientManager, metrics); - CBlockLocalCache cache = CBlockLocalCache.newBuilder() - .setConfiguration(flushTestConfig) - .setVolumeName(volumeName) - .setUserName(userName) - .setPipelines(createContainerAndGetPipeline(10)) - .setClientManager(xceiverClientManager) - .setBlockSize(4 * KB) - .setVolumeSize(50 * GB) - .setFlusher(flusher) - .setCBlockTargetMetrics(metrics) - .build(); - cache.start(); - - for (int i = 0; i < 511; i++) { - cache.put(i, data.getBytes(StandardCharsets.UTF_8)); - } - // After writing 511 block no flush should happen - Assert.assertEquals(0, metrics.getNumBlockBufferFlushTriggered()); - Assert.assertEquals(0, metrics.getNumBlockBufferFlushCompleted()); - - - // After one more block it should - cache.put(512, data.getBytes(StandardCharsets.UTF_8)); - Assert.assertEquals(1, metrics.getNumBlockBufferFlushTriggered()); - Thread.sleep(1000); - Assert.assertEquals(1, metrics.getNumBlockBufferFlushCompleted()); - cache.close(); - Assert.assertEquals(512 * (Long.SIZE / Byte.SIZE), - metrics.getNumBytesDirtyLogWritten()); - } - - @Test - public void testMultipleBuffersFlush() throws IOException, - InterruptedException, TimeoutException { - // Create a new config so that this tests write metafile to new location - OzoneConfiguration flushTestConfig = new OzoneConfiguration(); - String path = GenericTestUtils - .getTempPath(TestBufferManager.class.getSimpleName() - + RandomStringUtils.randomNumeric(4)); - flushTestConfig.set(DFS_CBLOCK_DISK_CACHE_PATH_KEY, path); - flushTestConfig.setBoolean(DFS_CBLOCK_TRACE_IO, true); - flushTestConfig.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, true); - flushTestConfig - .setTimeDuration(DFS_CBLOCK_BLOCK_BUFFER_FLUSH_INTERVAL, 120, SECONDS); - - String volumeName = "volume" + RandomStringUtils.randomNumeric(4); - String userName = "user" + RandomStringUtils.randomNumeric(4); - String data = RandomStringUtils.random(4 * KB); - CBlockTargetMetrics metrics = CBlockTargetMetrics.create(); - ContainerCacheFlusher flusher = new ContainerCacheFlusher(flushTestConfig, - xceiverClientManager, metrics); - CBlockLocalCache cache = CBlockLocalCache.newBuilder() - .setConfiguration(flushTestConfig) - .setVolumeName(volumeName) - .setUserName(userName) - .setPipelines(createContainerAndGetPipeline(10)) - .setClientManager(xceiverClientManager) - .setBlockSize(4 * KB) - .setVolumeSize(50 * GB) - .setFlusher(flusher) - .setCBlockTargetMetrics(metrics) - .build(); - cache.start(); - - for (int i = 0; i < 4; i++) { - for (int j = 0; j < 512; j++) { - cache.put(i * 512 + j, data.getBytes(StandardCharsets.UTF_8)); - } - // Flush should be triggered after every 512 block write - Assert.assertEquals(i + 1, metrics.getNumBlockBufferFlushTriggered()); - } - Assert.assertEquals(0, metrics.getNumIllegalDirtyLogFiles()); - Assert.assertEquals(0, metrics.getNumFailedDirtyLogFileDeletes()); - cache.close(); - Assert.assertEquals(4 * 512 * (Long.SIZE / Byte.SIZE), - metrics.getNumBytesDirtyLogWritten()); - Assert.assertEquals(5, metrics.getNumBlockBufferFlushTriggered()); - Assert.assertEquals(4, metrics.getNumBlockBufferFlushCompleted()); - } - - @Test - public void testSingleBlockFlush() throws IOException, - InterruptedException, TimeoutException{ - // Create a new config so that this tests write metafile to new location - OzoneConfiguration flushTestConfig = new OzoneConfiguration(); - String path = GenericTestUtils - .getTempPath(TestBufferManager.class.getSimpleName() - + RandomStringUtils.randomNumeric(4)); - flushTestConfig.set(DFS_CBLOCK_DISK_CACHE_PATH_KEY, path); - flushTestConfig.setBoolean(DFS_CBLOCK_TRACE_IO, true); - flushTestConfig.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, true); - flushTestConfig - .setTimeDuration(DFS_CBLOCK_BLOCK_BUFFER_FLUSH_INTERVAL, - 5, SECONDS); - - String volumeName = "volume" + RandomStringUtils.randomNumeric(4); - String userName = "user" + RandomStringUtils.randomNumeric(4); - String data = RandomStringUtils.random(4 * KB); - CBlockTargetMetrics metrics = CBlockTargetMetrics.create(); - ContainerCacheFlusher flusher = new ContainerCacheFlusher(flushTestConfig, - xceiverClientManager, metrics); - CBlockLocalCache cache = CBlockLocalCache.newBuilder() - .setConfiguration(flushTestConfig) - .setVolumeName(volumeName) - .setUserName(userName) - .setPipelines(createContainerAndGetPipeline(10)) - .setClientManager(xceiverClientManager) - .setBlockSize(4 * KB) - .setVolumeSize(50 * GB) - .setFlusher(flusher) - .setCBlockTargetMetrics(metrics) - .build(); - cache.start(); - cache.put(0, data.getBytes(StandardCharsets.UTF_8)); - Thread.sleep(8000); - // Ticks will be at 5s, 10s and so on, so this count should be 1 - Assert.assertEquals(1, metrics.getNumBlockBufferFlushTriggered()); - // 1 block written to cache, which should be flushed - Assert.assertEquals(8, metrics.getNumBytesDirtyLogWritten()); - Assert.assertEquals(1, metrics.getNumBlockBufferFlushCompleted()); - cache.close(); - // After close, another trigger should happen but no data should be written - Assert.assertEquals(2, metrics.getNumBlockBufferFlushTriggered()); - Assert.assertEquals(8, metrics.getNumBytesDirtyLogWritten()); - Assert.assertEquals(1, metrics.getNumBlockBufferFlushCompleted()); - Assert.assertEquals(0, metrics.getNumFailedBlockBufferFlushes()); - } - - @Test - public void testRepeatedBlockWrites() throws IOException, - InterruptedException, TimeoutException{ - // Create a new config so that this tests write metafile to new location - OzoneConfiguration flushTestConfig = new OzoneConfiguration(); - String path = GenericTestUtils - .getTempPath(TestBufferManager.class.getSimpleName() - + RandomStringUtils.randomNumeric(4)); - flushTestConfig.set(DFS_CBLOCK_DISK_CACHE_PATH_KEY, path); - flushTestConfig.setBoolean(DFS_CBLOCK_TRACE_IO, true); - flushTestConfig.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, true); - - String volumeName = "volume" + RandomStringUtils.randomNumeric(4); - String userName = "user" + RandomStringUtils.randomNumeric(4); - String data = RandomStringUtils.random(4 * KB); - CBlockTargetMetrics metrics = CBlockTargetMetrics.create(); - ContainerCacheFlusher flusher = new ContainerCacheFlusher(flushTestConfig, - xceiverClientManager, metrics); - CBlockLocalCache cache = CBlockLocalCache.newBuilder() - .setConfiguration(flushTestConfig) - .setVolumeName(volumeName) - .setUserName(userName) - .setPipelines(createContainerAndGetPipeline(10)) - .setClientManager(xceiverClientManager) - .setBlockSize(4 * KB) - .setVolumeSize(50 * GB) - .setFlusher(flusher) - .setCBlockTargetMetrics(metrics) - .build(); - Thread fllushListenerThread = new Thread(flusher); - fllushListenerThread.setDaemon(true); - fllushListenerThread.start(); - cache.start(); - for (int i = 0; i < 512; i++) { - cache.put(i, data.getBytes(StandardCharsets.UTF_8)); - } - Assert.assertEquals(512, metrics.getNumWriteOps()); - Assert.assertEquals(512, metrics.getNumBlockBufferUpdates()); - Assert.assertEquals(1, metrics.getNumBlockBufferFlushTriggered()); - Thread.sleep(5000); - Assert.assertEquals(1, metrics.getNumBlockBufferFlushCompleted()); - - - for (int i = 0; i < 512; i++) { - cache.put(i, data.getBytes(StandardCharsets.UTF_8)); - } - Assert.assertEquals(1024, metrics.getNumWriteOps()); - Assert.assertEquals(1024, metrics.getNumBlockBufferUpdates()); - Assert.assertEquals(2, metrics.getNumBlockBufferFlushTriggered()); - - Thread.sleep(5000); - Assert.assertEquals(0, metrics.getNumWriteIOExceptionRetryBlocks()); - Assert.assertEquals(0, metrics.getNumWriteGenericExceptionRetryBlocks()); - Assert.assertEquals(2, metrics.getNumBlockBufferFlushCompleted()); - } -} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestCBlockCLI.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestCBlockCLI.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestCBlockCLI.java deleted file mode 100644 index f8b05ed..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestCBlockCLI.java +++ /dev/null @@ -1,242 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.cblock; - -import org.apache.hadoop.cblock.cli.CBlockCli; -import org.apache.hadoop.cblock.meta.VolumeDescriptor; -import org.apache.hadoop.cblock.util.MockStorageClient; -import org.apache.hadoop.conf.OzoneConfiguration; -import org.apache.hadoop.scm.client.ScmClient; -import org.apache.hadoop.test.GenericTestUtils; -import org.junit.After; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Test; - -import java.io.ByteArrayOutputStream; -import java.io.File; -import java.io.IOException; -import java.io.PrintStream; -import java.util.List; - -import static org.apache.hadoop.cblock.CBlockConfigKeys - .DFS_CBLOCK_JSCSIRPC_ADDRESS_KEY; -import static org.apache.hadoop.cblock.CBlockConfigKeys - .DFS_CBLOCK_SERVICERPC_ADDRESS_KEY; -import static org.apache.hadoop.cblock.CBlockConfigKeys - .DFS_CBLOCK_SERVICE_LEVELDB_PATH_KEY; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -/** - * A testing class for cblock command line tool. - */ -public class TestCBlockCLI { - private static final long GB = 1 * 1024 * 1024 * 1024L; - private static final int KB = 1024; - private static CBlockCli cmd; - private static OzoneConfiguration conf; - private static CBlockManager cBlockManager; - private static ByteArrayOutputStream outContent; - private static PrintStream testPrintOut; - - @BeforeClass - public static void setup() throws IOException { - outContent = new ByteArrayOutputStream(); - ScmClient storageClient = new MockStorageClient(); - conf = new OzoneConfiguration(); - String path = GenericTestUtils - .getTempPath(TestCBlockCLI.class.getSimpleName()); - File filePath = new File(path); - if (!filePath.exists() && !filePath.mkdirs()) { - throw new IOException("Unable to create test DB dir"); - } - conf.set(DFS_CBLOCK_SERVICERPC_ADDRESS_KEY, "127.0.0.1:0"); - conf.set(DFS_CBLOCK_JSCSIRPC_ADDRESS_KEY, "127.0.0.1:0"); - conf.set(DFS_CBLOCK_SERVICE_LEVELDB_PATH_KEY, path.concat( - "/testCblockCli.dat")); - cBlockManager = new CBlockManager(conf, storageClient); - cBlockManager.start(); - testPrintOut = new PrintStream(outContent); - cmd = new CBlockCli(conf, testPrintOut); - } - - @AfterClass - public static void clean() { - if (cBlockManager != null) { - cBlockManager.stop(); - cBlockManager.join(); - cBlockManager.clean(); - } - } - - @After - public void reset() { - outContent.reset(); - } - - /** - * Test the help command. - * @throws Exception - */ - @Test - public void testCliHelp() throws Exception { - PrintStream initialStdOut = System.out; - System.setOut(testPrintOut); - String[] args = {"-h"}; - cmd.run(args); - String helpPrints = - "usage: cblock\n" + - " -c,--createVolume <user> <volume> <volumeSize in [GB/TB]> " + - "<blockSize> create a fresh new volume\n" + - " -d,--deleteVolume <user> <volume> " + - " delete a volume\n" + - " -h,--help " + - " help\n" + - " -i,--infoVolume <user> <volume> " + - " info a volume\n" + - " -l,--listVolume <user> " + - " list all volumes\n" + - " -s,--serverAddr <serverAddress>:<serverPort> " + - " specify server address:port\n"; - assertEquals(helpPrints, outContent.toString()); - outContent.reset(); - System.setOut(initialStdOut); - } - - /** - * Test volume listing command. - * @throws Exception - */ - @Test - public void testCliList() throws Exception { - String userName0 = "userTestCliList0"; - String userName1 = "userTestCliList1"; - String userTestNotExist = "userTestNotExist"; - String volumeName0 = "volumeTest0"; - String volumeName1 = "volumeTest1"; - String volumeSize0 = "30GB"; - String volumeSize1 = "40GB"; - String blockSize = Integer.toString(4); - String[] argsCreate0 = - {"-c", userName0, volumeName0, volumeSize0, blockSize}; - cmd.run(argsCreate0); - String[] argsCreate1 = - {"-c", userName0, volumeName1, volumeSize1, blockSize}; - cmd.run(argsCreate1); - String[] argsCreate2 = - {"-c", userName1, volumeName0, volumeSize0, blockSize}; - cmd.run(argsCreate2); - String[] argsList0 = {"-l"}; - cmd.run(argsList0); - String[] outExpected1 = { - "userTestCliList1:volumeTest0\t32212254720\t4096\n", - "userTestCliList0:volumeTest0\t32212254720\t4096\n", - "userTestCliList0:volumeTest1\t42949672960\t4096\n"}; - int length = 0; - for (String str : outExpected1) { - assertTrue(outContent.toString().contains(str)); - length += str.length(); - } - assertEquals(length, outContent.toString().length()); - outContent.reset(); - - String[] argsList1 = {"-l", userName1}; - cmd.run(argsList1); - String outExpected2 = "userTestCliList1:volumeTest0\t32212254720\t4096\n"; - assertEquals(outExpected2, outContent.toString()); - outContent.reset(); - - String[] argsList2 = {"-l", userTestNotExist}; - cmd.run(argsList2); - String outExpected3 = "\n"; - assertEquals(outExpected3, outContent.toString()); - } - - /** - * Test create volume command. - * @throws Exception - */ - @Test - public void testCliCreate() throws Exception { - String userName = "userTestCliCreate"; - String volumeName = "volumeTest"; - String volumeSize = "30GB"; - String blockSize = "4"; - String[] argsCreate = {"-c", userName, volumeName, volumeSize, blockSize}; - cmd.run(argsCreate); - List<VolumeDescriptor> allVolumes = cBlockManager.getAllVolumes(userName); - assertEquals(1, allVolumes.size()); - VolumeDescriptor volume = allVolumes.get(0); - assertEquals(userName, volume.getUserName()); - assertEquals(volumeName, volume.getVolumeName()); - long volumeSizeB = volume.getVolumeSize(); - assertEquals(30, (int)(volumeSizeB/ GB)); - assertEquals(4, volume.getBlockSize()/ KB); - } - - /** - * Test delete volume command. - * @throws Exception - */ - @Test - public void testCliDelete() throws Exception { - String userName = "userTestCliDelete"; - String volumeName = "volumeTest"; - String volumeSize = "30GB"; - String blockSize = "4"; - String[] argsCreate = {"-c", userName, volumeName, volumeSize, blockSize}; - cmd.run(argsCreate); - List<VolumeDescriptor> allVolumes = cBlockManager.getAllVolumes(userName); - assertEquals(1, allVolumes.size()); - VolumeDescriptor volume = allVolumes.get(0); - assertEquals(userName, volume.getUserName()); - assertEquals(volumeName, volume.getVolumeName()); - long volumeSizeB = volume.getVolumeSize(); - assertEquals(30, (int)(volumeSizeB/ GB)); - assertEquals(4, volume.getBlockSize()/ KB); - - String[] argsDelete = {"-d", userName, volumeName}; - cmd.run(argsDelete); - allVolumes = cBlockManager.getAllVolumes(userName); - assertEquals(0, allVolumes.size()); - } - - /** - * Test info volume command. - * @throws Exception - */ - @Test - public void testCliInfoVolume() throws Exception { - String userName0 = "userTestCliInfo"; - String volumeName0 = "volumeTest0"; - String volumeSize = "8000GB"; - String blockSize = "4"; - String[] argsCreate0 = { - "-c", userName0, volumeName0, volumeSize, blockSize}; - cmd.run(argsCreate0); - String[] argsInfo = {"-i", userName0, volumeName0}; - cmd.run(argsInfo); - // TODO : the usage field is not implemented yet, always 0 now. - String outExpected = " userName:userTestCliInfo " + - "volumeName:volumeTest0 " + - "volumeSize:8589934592000 " + - "blockSize:4096 (sizeInBlocks:2097152000) usageInBlocks:0\n"; - assertEquals(outExpected, outContent.toString()); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestCBlockReadWrite.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestCBlockReadWrite.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestCBlockReadWrite.java deleted file mode 100644 index cdd65c5..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestCBlockReadWrite.java +++ /dev/null @@ -1,377 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.cblock; - -import com.google.common.primitives.Longs; -import org.apache.commons.codec.digest.DigestUtils; -import org.apache.commons.lang.RandomStringUtils; -import org.apache.hadoop.cblock.jscsiHelper.CBlockTargetMetrics; -import org.apache.hadoop.cblock.jscsiHelper.ContainerCacheFlusher; -import org.apache.hadoop.cblock.jscsiHelper.cache.LogicalBlock; -import org.apache.hadoop.cblock.jscsiHelper.cache.impl.CBlockLocalCache; -import org.apache.hadoop.io.IOUtils; -import org.apache.hadoop.ozone.MiniOzoneClassicCluster; -import org.apache.hadoop.ozone.MiniOzoneCluster; -import org.apache.hadoop.conf.OzoneConfiguration; -import org.apache.hadoop.ozone.OzoneConsts; -import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.LifeCycleState; -import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationFactor; -import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationType; -import org.apache.hadoop.scm.XceiverClientManager; -import org.apache.hadoop.scm.XceiverClientSpi; -import org.apache.hadoop.scm.container.common.helpers.PipelineChannel; -import org.apache.hadoop.scm.container.common.helpers.Pipeline; -import org.apache.hadoop.scm.protocolPB - .StorageContainerLocationProtocolClientSideTranslatorPB; -import org.apache.hadoop.scm.storage.ContainerProtocolCalls; -import org.apache.hadoop.test.GenericTestUtils; -import org.junit.AfterClass; -import org.junit.Assert; -import org.junit.BeforeClass; -import org.junit.Test; - -import java.io.IOException; -import java.nio.charset.StandardCharsets; -import java.util.LinkedList; -import java.util.List; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -import static org.apache.hadoop.cblock.CBlockConfigKeys - .DFS_CBLOCK_DISK_CACHE_PATH_KEY; -import static org.apache.hadoop.cblock.CBlockConfigKeys - .DFS_CBLOCK_TRACE_IO; -import static org.apache.hadoop.cblock.CBlockConfigKeys - .DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO; -import static org.apache.hadoop.cblock.CBlockConfigKeys - .DFS_CBLOCK_BLOCK_BUFFER_FLUSH_INTERVAL; -import static org.apache.hadoop.cblock.CBlockConfigKeys - .DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE; - -/** - * Tests for Cblock read write functionality. - */ -public class TestCBlockReadWrite { - private final static long GB = 1024 * 1024 * 1024; - private final static int KB = 1024; - private static MiniOzoneCluster cluster; - private static OzoneConfiguration config; - private static StorageContainerLocationProtocolClientSideTranslatorPB - storageContainerLocationClient; - private static XceiverClientManager xceiverClientManager; - - @BeforeClass - public static void init() throws IOException { - config = new OzoneConfiguration(); - String path = GenericTestUtils - .getTempPath(TestCBlockReadWrite.class.getSimpleName()); - config.set(DFS_CBLOCK_DISK_CACHE_PATH_KEY, path); - config.setBoolean(DFS_CBLOCK_TRACE_IO, true); - config.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, true); - cluster = new MiniOzoneClassicCluster.Builder(config) - .numDataNodes(1) - .setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED).build(); - storageContainerLocationClient = cluster - .createStorageContainerLocationClient(); - xceiverClientManager = new XceiverClientManager(config); - } - - @AfterClass - public static void shutdown() throws InterruptedException { - if (cluster != null) { - cluster.shutdown(); - } - IOUtils.cleanupWithLogger(null, storageContainerLocationClient, cluster); - } - - /** - * getContainerPipelines creates a set of containers and returns the - * Pipelines that define those containers. - * - * @param count - Number of containers to create. - * @return - List of Pipelines. - * @throws IOException throws Exception - */ - private List<Pipeline> getContainerPipeline(int count) throws IOException { - List<Pipeline> containerPipelines = new LinkedList<>(); - for (int x = 0; x < count; x++) { - String traceID = "trace" + RandomStringUtils.randomNumeric(4); - String containerName = "container" + RandomStringUtils.randomNumeric(10); - Pipeline pipeline = - storageContainerLocationClient.allocateContainer( - xceiverClientManager.getType(), - xceiverClientManager.getFactor(), containerName, "CBLOCK"); - XceiverClientSpi client = xceiverClientManager.acquireClient(pipeline); - ContainerProtocolCalls.createContainer(client, traceID); - // This step is needed since we set private data on pipelines, when we - // read the list from CBlockServer. So we mimic that action here. - pipeline.setData(Longs.toByteArray(x)); - containerPipelines.add(pipeline); - } - return containerPipelines; - } - - /** - * This test creates a cache and performs a simple write / read. - * The operations are done by bypassing the cache. - * - * @throws IOException - */ - @Test - public void testDirectIO() throws IOException, - InterruptedException, TimeoutException { - OzoneConfiguration cConfig = new OzoneConfiguration(); - cConfig.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, false); - cConfig.setBoolean(DFS_CBLOCK_TRACE_IO, true); - final long blockID = 0; - String volumeName = "volume" + RandomStringUtils.randomNumeric(4); - String userName = "user" + RandomStringUtils.randomNumeric(4); - String data = RandomStringUtils.random(4 * KB); - String dataHash = DigestUtils.sha256Hex(data); - CBlockTargetMetrics metrics = CBlockTargetMetrics.create(); - ContainerCacheFlusher flusher = new ContainerCacheFlusher(cConfig, - xceiverClientManager, metrics); - CBlockLocalCache cache = CBlockLocalCache.newBuilder() - .setConfiguration(cConfig) - .setVolumeName(volumeName) - .setUserName(userName) - .setPipelines(getContainerPipeline(10)) - .setClientManager(xceiverClientManager) - .setBlockSize(4 * KB) - .setVolumeSize(50 * GB) - .setFlusher(flusher) - .setCBlockTargetMetrics(metrics) - .build(); - cache.start(); - Assert.assertFalse(cache.isShortCircuitIOEnabled()); - cache.put(blockID, data.getBytes(StandardCharsets.UTF_8)); - Assert.assertEquals(1, metrics.getNumDirectBlockWrites()); - Assert.assertEquals(1, metrics.getNumWriteOps()); - // Please note that this read is directly from remote container - LogicalBlock block = cache.get(blockID); - Assert.assertEquals(1, metrics.getNumReadOps()); - Assert.assertEquals(0, metrics.getNumReadCacheHits()); - Assert.assertEquals(1, metrics.getNumReadCacheMiss()); - Assert.assertEquals(0, metrics.getNumReadLostBlocks()); - Assert.assertEquals(0, metrics.getNumFailedDirectBlockWrites()); - - cache.put(blockID + 1, data.getBytes(StandardCharsets.UTF_8)); - Assert.assertEquals(2, metrics.getNumDirectBlockWrites()); - Assert.assertEquals(2, metrics.getNumWriteOps()); - Assert.assertEquals(0, metrics.getNumFailedDirectBlockWrites()); - // Please note that this read is directly from remote container - block = cache.get(blockID + 1); - Assert.assertEquals(2, metrics.getNumReadOps()); - Assert.assertEquals(0, metrics.getNumReadCacheHits()); - Assert.assertEquals(2, metrics.getNumReadCacheMiss()); - Assert.assertEquals(0, metrics.getNumReadLostBlocks()); - String readHash = DigestUtils.sha256Hex(block.getData().array()); - Assert.assertEquals("File content does not match.", dataHash, readHash); - GenericTestUtils.waitFor(() -> !cache.isDirtyCache(), 100, 20 * 1000); - cache.close(); - } - - /** - * This test writes some block to the cache and then shuts down the cache - * The cache is then restarted with "short.circuit.io" disable to check - * that the blocks are read correctly from the container. - * - * @throws IOException - */ - @Test - public void testContainerWrites() throws IOException, - InterruptedException, TimeoutException { - // Create a new config so that this tests write metafile to new location - OzoneConfiguration flushTestConfig = new OzoneConfiguration(); - String path = GenericTestUtils - .getTempPath(TestCBlockReadWrite.class.getSimpleName()); - flushTestConfig.set(DFS_CBLOCK_DISK_CACHE_PATH_KEY, path); - flushTestConfig.setBoolean(DFS_CBLOCK_TRACE_IO, true); - flushTestConfig.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, true); - flushTestConfig.setTimeDuration(DFS_CBLOCK_BLOCK_BUFFER_FLUSH_INTERVAL, 3, - TimeUnit.SECONDS); - XceiverClientManager xcm = new XceiverClientManager(flushTestConfig); - String volumeName = "volume" + RandomStringUtils.randomNumeric(4); - String userName = "user" + RandomStringUtils.randomNumeric(4); - - int numUniqueBlocks = 4; - String[] data = new String[numUniqueBlocks]; - String[] dataHash = new String[numUniqueBlocks]; - for (int i = 0; i < numUniqueBlocks; i++) { - data[i] = RandomStringUtils.random(4 * KB); - dataHash[i] = DigestUtils.sha256Hex(data[i]); - } - - CBlockTargetMetrics metrics = CBlockTargetMetrics.create(); - ContainerCacheFlusher flusher = new ContainerCacheFlusher(flushTestConfig, - xcm, metrics); - List<Pipeline> pipelines = getContainerPipeline(10); - CBlockLocalCache cache = CBlockLocalCache.newBuilder() - .setConfiguration(flushTestConfig) - .setVolumeName(volumeName) - .setUserName(userName) - .setPipelines(pipelines) - .setClientManager(xcm) - .setBlockSize(4 * KB) - .setVolumeSize(50 * GB) - .setFlusher(flusher) - .setCBlockTargetMetrics(metrics) - .build(); - cache.start(); - Thread flushListenerThread = new Thread(flusher); - flushListenerThread.setDaemon(true); - flushListenerThread.start(); - Assert.assertTrue(cache.isShortCircuitIOEnabled()); - // Write data to the cache - for (int i = 0; i < 512; i++) { - cache.put(i, data[i % numUniqueBlocks].getBytes(StandardCharsets.UTF_8)); - } - // Close the cache and flush the data to the containers - cache.close(); - Assert.assertEquals(0, metrics.getNumDirectBlockWrites()); - Assert.assertEquals(512, metrics.getNumWriteOps()); - Thread.sleep(3000); - flusher.shutdown(); - Assert.assertTrue(metrics.getNumBlockBufferFlushTriggered() > 1); - Assert.assertEquals(1, metrics.getNumBlockBufferFlushCompleted()); - Assert.assertEquals(0, metrics.getNumWriteIOExceptionRetryBlocks()); - Assert.assertEquals(0, metrics.getNumWriteGenericExceptionRetryBlocks()); - Assert.assertEquals(0, metrics.getNumFailedReleaseLevelDB()); - // Now disable DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO and restart cache - flushTestConfig.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, false); - CBlockTargetMetrics newMetrics = CBlockTargetMetrics.create(); - ContainerCacheFlusher newFlusher = - new ContainerCacheFlusher(flushTestConfig, xcm, newMetrics); - CBlockLocalCache newCache = CBlockLocalCache.newBuilder() - .setConfiguration(flushTestConfig) - .setVolumeName(volumeName) - .setUserName(userName) - .setPipelines(pipelines) - .setClientManager(xcm) - .setBlockSize(4 * KB) - .setVolumeSize(50 * GB) - .setFlusher(newFlusher) - .setCBlockTargetMetrics(newMetrics) - .build(); - newCache.start(); - Assert.assertFalse(newCache.isShortCircuitIOEnabled()); - // this read will be from the container, also match the hash - for (int i = 0; i < 512; i++) { - LogicalBlock block = newCache.get(i); - String readHash = DigestUtils.sha256Hex(block.getData().array()); - Assert.assertEquals("File content does not match, for index:" - + i, dataHash[i % numUniqueBlocks], readHash); - } - Assert.assertEquals(0, newMetrics.getNumReadLostBlocks()); - Assert.assertEquals(0, newMetrics.getNumFailedReadBlocks()); - newCache.close(); - newFlusher.shutdown(); - } - - @Test - public void testRetryLog() throws IOException, - InterruptedException, TimeoutException { - // Create a new config so that this tests write metafile to new location - OzoneConfiguration flushTestConfig = new OzoneConfiguration(); - String path = GenericTestUtils - .getTempPath(TestCBlockReadWrite.class.getSimpleName()); - flushTestConfig.set(DFS_CBLOCK_DISK_CACHE_PATH_KEY, path); - flushTestConfig.setBoolean(DFS_CBLOCK_TRACE_IO, true); - flushTestConfig.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, true); - flushTestConfig.setTimeDuration(DFS_CBLOCK_BLOCK_BUFFER_FLUSH_INTERVAL, - 3, - TimeUnit.SECONDS); - - int numblocks = 10; - flushTestConfig.setInt(DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE, numblocks); - - String volumeName = "volume" + RandomStringUtils.randomNumeric(4); - String userName = "user" + RandomStringUtils.randomNumeric(4); - String data = RandomStringUtils.random(4 * KB); - - List<Pipeline> fakeContainerPipelines = new LinkedList<>(); - PipelineChannel pipelineChannel = new PipelineChannel("fake", - LifeCycleState.OPEN, ReplicationType.STAND_ALONE, ReplicationFactor.ONE, - "fake"); - Pipeline fakePipeline = new Pipeline("fake", pipelineChannel); - fakePipeline.setData(Longs.toByteArray(1)); - fakeContainerPipelines.add(fakePipeline); - - CBlockTargetMetrics metrics = CBlockTargetMetrics.create(); - ContainerCacheFlusher flusher = new ContainerCacheFlusher(flushTestConfig, - xceiverClientManager, metrics); - CBlockLocalCache cache = CBlockLocalCache.newBuilder() - .setConfiguration(flushTestConfig) - .setVolumeName(volumeName) - .setUserName(userName) - .setPipelines(fakeContainerPipelines) - .setClientManager(xceiverClientManager) - .setBlockSize(4 * KB) - .setVolumeSize(50 * GB) - .setFlusher(flusher) - .setCBlockTargetMetrics(metrics) - .build(); - cache.start(); - Thread flushListenerThread = new Thread(flusher); - flushListenerThread.setDaemon(true); - flushListenerThread.start(); - - for (int i = 0; i < numblocks; i++) { - cache.put(i, data.getBytes(StandardCharsets.UTF_8)); - } - Assert.assertEquals(numblocks, metrics.getNumWriteOps()); - Thread.sleep(3000); - - // all the writes to the container will fail because of fake pipelines - Assert.assertEquals(numblocks, metrics.getNumDirtyLogBlockRead()); - Assert.assertTrue( - metrics.getNumWriteGenericExceptionRetryBlocks() >= numblocks); - Assert.assertEquals(0, metrics.getNumWriteIOExceptionRetryBlocks()); - Assert.assertEquals(0, metrics.getNumFailedRetryLogFileWrites()); - Assert.assertEquals(0, metrics.getNumFailedReleaseLevelDB()); - cache.close(); - flusher.shutdown(); - - // restart cache with correct pipelines, now blocks should be uploaded - // correctly - CBlockTargetMetrics newMetrics = CBlockTargetMetrics.create(); - ContainerCacheFlusher newFlusher = - new ContainerCacheFlusher(flushTestConfig, - xceiverClientManager, newMetrics); - CBlockLocalCache newCache = CBlockLocalCache.newBuilder() - .setConfiguration(flushTestConfig) - .setVolumeName(volumeName) - .setUserName(userName) - .setPipelines(getContainerPipeline(10)) - .setClientManager(xceiverClientManager) - .setBlockSize(4 * KB) - .setVolumeSize(50 * GB) - .setFlusher(newFlusher) - .setCBlockTargetMetrics(newMetrics) - .build(); - newCache.start(); - Thread newFlushListenerThread = new Thread(newFlusher); - newFlushListenerThread.setDaemon(true); - newFlushListenerThread.start(); - Thread.sleep(3000); - Assert.assertTrue(newMetrics.getNumRetryLogBlockRead() >= numblocks); - Assert.assertEquals(0, newMetrics.getNumWriteGenericExceptionRetryBlocks()); - Assert.assertEquals(0, newMetrics.getNumWriteIOExceptionRetryBlocks()); - Assert.assertEquals(0, newMetrics.getNumFailedReleaseLevelDB()); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestCBlockServer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestCBlockServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestCBlockServer.java deleted file mode 100644 index b5695fc..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestCBlockServer.java +++ /dev/null @@ -1,212 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.cblock; - -import org.apache.commons.lang.RandomStringUtils; -import org.apache.hadoop.cblock.meta.VolumeInfo; -import org.apache.hadoop.scm.client.ScmClient; -import org.apache.hadoop.cblock.util.MockStorageClient; -import org.apache.hadoop.conf.OzoneConfiguration; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -import java.util.HashSet; -import java.util.List; -import java.util.Set; - -import static org.apache.hadoop.cblock.CBlockConfigKeys - .DFS_CBLOCK_JSCSIRPC_ADDRESS_KEY; -import static org.apache.hadoop.cblock.CBlockConfigKeys - .DFS_CBLOCK_SERVICERPC_ADDRESS_KEY; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - -/** - * This class tests the basics of CBlock server. Mainly about the four - * operations on volumes: create, delete, info and list. - */ -public class TestCBlockServer { - private static CBlockManager cBlockManager; - private static OzoneConfiguration conf; - - @Before - public void setup() throws Exception { - ScmClient storageClient = new MockStorageClient(); - conf = new OzoneConfiguration(); - conf.set(DFS_CBLOCK_SERVICERPC_ADDRESS_KEY, "127.0.0.1:0"); - conf.set(DFS_CBLOCK_JSCSIRPC_ADDRESS_KEY, "127.0.0.1:0"); - cBlockManager = new CBlockManager(conf, storageClient); - cBlockManager.start(); - } - - @After - public void clean() { - cBlockManager.stop(); - cBlockManager.join(); - cBlockManager.clean(); - } - - /** - * Test create volume for different users. - * @throws Exception - */ - @Test - public void testCreateVolume() throws Exception { - String userName1 = "user" + RandomStringUtils.randomNumeric(5); - String userName2 = "user" + RandomStringUtils.randomNumeric(5); - String volumeName1 = "volume" + RandomStringUtils.randomNumeric(5); - String volumeName2 = "volume" + RandomStringUtils.randomNumeric(5); - long volumeSize = 1L*1024*1024; - int blockSize = 4096; - cBlockManager.createVolume(userName1, volumeName1, volumeSize, blockSize); - List<VolumeInfo> volumes = cBlockManager.listVolume(userName1); - assertEquals(1, volumes.size()); - VolumeInfo existingVolume = volumes.get(0); - assertEquals(userName1, existingVolume.getUserName()); - assertEquals(volumeName1, existingVolume.getVolumeName()); - assertEquals(volumeSize, existingVolume.getVolumeSize()); - assertEquals(blockSize, existingVolume.getBlockSize()); - - cBlockManager.createVolume(userName1, volumeName2, volumeSize, blockSize); - cBlockManager.createVolume(userName2, volumeName1, volumeSize, blockSize); - volumes = cBlockManager.listVolume(userName1); - assertEquals(2, volumes.size()); - volumes = cBlockManager.listVolume(userName2); - assertEquals(1, volumes.size()); - } - - /** - * Test delete volume. - * @throws Exception - */ - @Test - public void testDeleteVolume() throws Exception { - String userName = "user" + RandomStringUtils.randomNumeric(5); - String volumeName1 = "volume" + RandomStringUtils.randomNumeric(5); - String volumeName2 = "volume" + RandomStringUtils.randomNumeric(5); - long volumeSize = 1L*1024*1024; - int blockSize = 4096; - cBlockManager.createVolume(userName, volumeName1, volumeSize, blockSize); - cBlockManager.createVolume(userName, volumeName2, volumeSize, blockSize); - cBlockManager.deleteVolume(userName, volumeName1, true); - List<VolumeInfo> volumes = cBlockManager.listVolume(userName); - assertEquals(1, volumes.size()); - - VolumeInfo existingVolume = volumes.get(0); - assertEquals(userName, existingVolume.getUserName()); - assertEquals(volumeName2, existingVolume.getVolumeName()); - assertEquals(volumeSize, existingVolume.getVolumeSize()); - assertEquals(blockSize, existingVolume.getBlockSize()); - } - - /** - * Test info volume. - * - * TODO : usage field is not being tested (as it is not implemented yet) - * @throws Exception - */ - @Test - public void testInfoVolume() throws Exception { - String userName = "user" + RandomStringUtils.randomNumeric(5); - String volumeName = "volume" + RandomStringUtils.randomNumeric(5); - long volumeSize = 1L*1024*1024; - int blockSize = 4096; - cBlockManager.createVolume(userName, volumeName, volumeSize, blockSize); - VolumeInfo info = cBlockManager.infoVolume(userName, volumeName); - assertEquals(userName, info.getUserName()); - assertEquals(volumeName, info.getVolumeName()); - assertEquals(volumeSize, info.getVolumeSize()); - assertEquals(blockSize, info.getBlockSize()); - } - - /** - * Test listing a number of volumes. - * @throws Exception - */ - @Test - public void testListVolume() throws Exception { - String userName = "user" + RandomStringUtils.randomNumeric(5); - String volumeName ="volume" + RandomStringUtils.randomNumeric(5); - long volumeSize = 1L*1024*1024; - int blockSize = 4096; - int volumeNum = 100; - for (int i = 0; i<volumeNum; i++) { - cBlockManager.createVolume(userName, volumeName + i, - volumeSize, blockSize); - } - List<VolumeInfo> volumes = cBlockManager.listVolume(userName); - assertEquals(volumeNum, volumes.size()); - Set<String> volumeIds = new HashSet<>(); - for (int i = 0; i<volumeNum; i++) { - VolumeInfo volumeInfo = volumes.get(i); - assertEquals(userName, volumeInfo.getUserName()); - assertFalse(volumeIds.contains(volumeName + i)); - volumeIds.add(volumeName + i); - assertEquals(volumeSize, volumeInfo.getVolumeSize()); - assertEquals(blockSize, volumeInfo.getBlockSize()); - } - for (int i = 0; i<volumeNum; i++) { - assertTrue(volumeIds.contains(volumeName + i)); - } - } - - /** - * Test listing a number of volumes. - * @throws Exception - */ - @Test - public void testListVolumes() throws Exception { - String volumeName ="volume" + RandomStringUtils.randomNumeric(5); - long volumeSize = 1L*1024*1024; - int blockSize = 4096; - int volumeNum = 100; - int userCount = 10; - - assertTrue("We need at least one volume for each user", - userCount < volumeNum); - - for (int i = 0; i<volumeNum; i++) { - String userName = - "user-" + (i % userCount); - cBlockManager.createVolume(userName, volumeName + i, - volumeSize, blockSize); - } - List<VolumeInfo> allVolumes = cBlockManager.listVolumes(); - //check if we have the volumes from all the users. - - Set<String> volumeIds = new HashSet<>(); - Set<String> usernames = new HashSet<>(); - for (int i = 0; i < allVolumes.size(); i++) { - VolumeInfo volumeInfo = allVolumes.get(i); - assertFalse(volumeIds.contains(volumeName + i)); - usernames.add(volumeInfo.getUserName()); - volumeIds.add(volumeName + i); - assertEquals(volumeSize, volumeInfo.getVolumeSize()); - assertEquals(blockSize, volumeInfo.getBlockSize()); - } - - assertEquals(volumeNum, volumeIds.size()); - for (int i = 0; i<volumeNum; i++) { - assertTrue(volumeIds.contains(volumeName + i)); - } - - assertEquals(userCount, usernames.size()); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestCBlockServerPersistence.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestCBlockServerPersistence.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestCBlockServerPersistence.java deleted file mode 100644 index c6896a1..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestCBlockServerPersistence.java +++ /dev/null @@ -1,132 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.cblock; - -import org.apache.hadoop.cblock.meta.VolumeDescriptor; -import org.apache.hadoop.scm.client.ScmClient; -import org.apache.hadoop.test.GenericTestUtils; -import org.apache.hadoop.cblock.util.MockStorageClient; -import org.apache.hadoop.conf.OzoneConfiguration; -import org.junit.Test; - -import java.io.File; -import java.io.IOException; -import java.util.List; - -import static org.apache.hadoop.cblock.CBlockConfigKeys - .DFS_CBLOCK_JSCSIRPC_ADDRESS_KEY; -import static org.apache.hadoop.cblock.CBlockConfigKeys - .DFS_CBLOCK_SERVICERPC_ADDRESS_KEY; -import static org.apache.hadoop.cblock.CBlockConfigKeys - .DFS_CBLOCK_SERVICE_LEVELDB_PATH_KEY; -import static org.junit.Assert.assertEquals; - -/** - * Test the CBlock server state is maintained in persistent storage and can be - * recovered on CBlock server restart. - */ -public class TestCBlockServerPersistence { - - /** - * Test when cblock server fails with volume meta data, the meta data can be - * restored correctly. - * @throws Exception - */ - @Test - public void testWriteToPersistentStore() throws Exception { - OzoneConfiguration conf = new OzoneConfiguration(); - String userName = "testWriteToPersistentStore"; - String volumeName1 = "testVolume1"; - String volumeName2 = "testVolume2"; - long volumeSize1 = 30L*1024*1024*1024; - long volumeSize2 = 15L*1024*1024*1024; - int blockSize = 4096; - CBlockManager cBlockManager = null; - CBlockManager cBlockManager1 = null; - String path = GenericTestUtils - .getTempPath(TestCBlockServerPersistence.class.getSimpleName()); - File filePath = new File(path); - if(!filePath.exists() && !filePath.mkdirs()) { - throw new IOException("Unable to create test DB dir"); - } - conf.set(DFS_CBLOCK_SERVICE_LEVELDB_PATH_KEY, path.concat( - "/testCblockPersistence.dat")); - try { - ScmClient storageClient = new MockStorageClient(); - conf.set(DFS_CBLOCK_SERVICERPC_ADDRESS_KEY, "127.0.0.1:0"); - conf.set(DFS_CBLOCK_JSCSIRPC_ADDRESS_KEY, "127.0.0.1:0"); - cBlockManager = new CBlockManager(conf, storageClient); - cBlockManager.start(); - cBlockManager.createVolume(userName, volumeName1, volumeSize1, blockSize); - cBlockManager.createVolume(userName, volumeName2, volumeSize2, blockSize); - List<VolumeDescriptor> allVolumes = cBlockManager.getAllVolumes(); - // close the cblock server. Since meta data is written to disk on volume - // creation, closing server here is the same as a cblock server crash. - cBlockManager.close(); - cBlockManager.stop(); - cBlockManager.join(); - cBlockManager = null; - assertEquals(2, allVolumes.size()); - VolumeDescriptor volumeDescriptor1 = allVolumes.get(0); - VolumeDescriptor volumeDescriptor2 = allVolumes.get(1); - - // create a new cblock server instance. This is just the - // same as restarting cblock server. - ScmClient storageClient1 = new MockStorageClient(); - OzoneConfiguration conf1 = new OzoneConfiguration(); - conf1.set(DFS_CBLOCK_SERVICE_LEVELDB_PATH_KEY, path.concat( - "/testCblockPersistence.dat")); - conf1.set(DFS_CBLOCK_SERVICERPC_ADDRESS_KEY, "127.0.0.1:0"); - conf1.set(DFS_CBLOCK_JSCSIRPC_ADDRESS_KEY, "127.0.0.1:0"); - cBlockManager1 = new CBlockManager(conf1, storageClient1); - cBlockManager1.start(); - List<VolumeDescriptor> allVolumes1 = cBlockManager1.getAllVolumes(); - assertEquals(2, allVolumes1.size()); - VolumeDescriptor newvolumeDescriptor1 = allVolumes1.get(0); - VolumeDescriptor newvolumeDescriptor2 = allVolumes1.get(1); - - // It seems levelDB iterator gets keys in the same order as keys - // are inserted, in which case the else clause should never happen. - // But still kept the second clause if it is possible to get different - // key ordering from leveldb. And we do not rely on the ordering of keys - // here. - if (volumeDescriptor1.getVolumeName().equals( - newvolumeDescriptor1.getVolumeName())) { - assertEquals(volumeDescriptor1.toString(), - newvolumeDescriptor1.toString()); - assertEquals(volumeDescriptor2.toString(), - newvolumeDescriptor2.toString()); - } else { - assertEquals(volumeDescriptor1.toString(), - newvolumeDescriptor2.toString()); - assertEquals(volumeDescriptor2.toString(), - newvolumeDescriptor1.toString()); - } - } finally { - if (cBlockManager != null) { - cBlockManager.clean(); - } - if (cBlockManager1 != null) { - cBlockManager1.close(); - cBlockManager1.stop(); - cBlockManager1.join(); - cBlockManager1.clean(); - } - } - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestLocalBlockCache.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestLocalBlockCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestLocalBlockCache.java deleted file mode 100644 index 7749432..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestLocalBlockCache.java +++ /dev/null @@ -1,444 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.cblock; - -import com.google.common.primitives.Longs; -import org.apache.commons.codec.digest.DigestUtils; -import org.apache.commons.lang.RandomStringUtils; -import org.apache.hadoop.cblock.jscsiHelper.CBlockIStorageImpl; -import org.apache.hadoop.cblock.jscsiHelper.CBlockTargetMetrics; -import org.apache.hadoop.cblock.jscsiHelper.ContainerCacheFlusher; -import org.apache.hadoop.cblock.jscsiHelper.cache.LogicalBlock; -import org.apache.hadoop.cblock.jscsiHelper.cache.impl.CBlockLocalCache; -import org.apache.hadoop.io.IOUtils; -import org.apache.hadoop.ozone.MiniOzoneClassicCluster; -import org.apache.hadoop.ozone.MiniOzoneCluster; -import org.apache.hadoop.conf.OzoneConfiguration; -import org.apache.hadoop.ozone.OzoneConsts; -import org.apache.hadoop.scm.XceiverClientManager; -import org.apache.hadoop.scm.XceiverClientSpi; -import org.apache.hadoop.scm.container.common.helpers.Pipeline; -import org.apache.hadoop.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB; -import org.apache.hadoop.scm.storage.ContainerProtocolCalls; -import org.apache.hadoop.test.GenericTestUtils; -import org.apache.hadoop.util.Time; -import org.junit.AfterClass; -import org.junit.Assert; -import org.junit.BeforeClass; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.nio.charset.StandardCharsets; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Random; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -import static java.lang.Math.abs; -import static org.apache.hadoop.cblock.CBlockConfigKeys. - DFS_CBLOCK_DISK_CACHE_PATH_KEY; -import static org.apache.hadoop.cblock.CBlockConfigKeys. - DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO; -import static org.apache.hadoop.cblock.CBlockConfigKeys. - DFS_CBLOCK_TRACE_IO; - -/** - * Tests for local cache. - */ -public class TestLocalBlockCache { - private static final Logger LOG = - LoggerFactory.getLogger(TestLocalBlockCache.class); - private final static long GB = 1024 * 1024 * 1024; - private final static int KB = 1024; - private static MiniOzoneCluster cluster; - private static OzoneConfiguration config; - private static StorageContainerLocationProtocolClientSideTranslatorPB - storageContainerLocationClient; - private static XceiverClientManager xceiverClientManager; - - @BeforeClass - public static void init() throws IOException { - config = new OzoneConfiguration(); - String path = GenericTestUtils - .getTempPath(TestLocalBlockCache.class.getSimpleName()); - config.set(DFS_CBLOCK_DISK_CACHE_PATH_KEY, path); - config.setBoolean(DFS_CBLOCK_TRACE_IO, true); - config.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, true); - cluster = new MiniOzoneClassicCluster.Builder(config) - .numDataNodes(1) - .setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED).build(); - storageContainerLocationClient = cluster - .createStorageContainerLocationClient(); - xceiverClientManager = new XceiverClientManager(config); - } - - @AfterClass - public static void shutdown() throws InterruptedException { - if (cluster != null) { - cluster.shutdown(); - } - IOUtils.cleanupWithLogger(null, storageContainerLocationClient, cluster); - } - - /** - * getContainerPipelines creates a set of containers and returns the - * Pipelines that define those containers. - * - * @param count - Number of containers to create. - * @return - List of Pipelines. - * @throws IOException throws Exception - */ - private List<Pipeline> getContainerPipeline(int count) throws IOException { - List<Pipeline> containerPipelines = new LinkedList<>(); - for (int x = 0; x < count; x++) { - String traceID = "trace" + RandomStringUtils.randomNumeric(4); - String containerName = "container" + RandomStringUtils.randomNumeric(10); - Pipeline pipeline = - storageContainerLocationClient.allocateContainer( - xceiverClientManager.getType(), - xceiverClientManager.getFactor(), containerName, "CBLOCK"); - XceiverClientSpi client = xceiverClientManager.acquireClient(pipeline); - ContainerProtocolCalls.createContainer(client, traceID); - // This step is needed since we set private data on pipelines, when we - // read the list from CBlockServer. So we mimic that action here. - pipeline.setData(Longs.toByteArray(x)); - containerPipelines.add(pipeline); - xceiverClientManager.releaseClient(client); - } - return containerPipelines; - } - - /** - * This test creates a cache and performs a simple write / read. - * Due to the cache - we have Read-after-write consistency for cBlocks. - * - * @throws IOException throws Exception - */ - @Test - public void testCacheWriteRead() throws IOException, - InterruptedException, TimeoutException { - final long blockID = 0; - String volumeName = "volume" + RandomStringUtils.randomNumeric(4); - String userName = "user" + RandomStringUtils.randomNumeric(4); - String data = RandomStringUtils.random(4 * KB); - String dataHash = DigestUtils.sha256Hex(data); - CBlockTargetMetrics metrics = CBlockTargetMetrics.create(); - ContainerCacheFlusher flusher = new ContainerCacheFlusher(this.config, - xceiverClientManager, metrics); - CBlockLocalCache cache = CBlockLocalCache.newBuilder() - .setConfiguration(this.config) - .setVolumeName(volumeName) - .setUserName(userName) - .setPipelines(getContainerPipeline(10)) - .setClientManager(xceiverClientManager) - .setBlockSize(4 * KB) - .setVolumeSize(50 * GB) - .setFlusher(flusher) - .setCBlockTargetMetrics(metrics) - .build(); - cache.start(); - cache.put(blockID, data.getBytes(StandardCharsets.UTF_8)); - Assert.assertEquals(1, metrics.getNumWriteOps()); - // Please note that this read is from the local cache. - LogicalBlock block = cache.get(blockID); - Assert.assertEquals(1, metrics.getNumReadOps()); - Assert.assertEquals(1, metrics.getNumReadCacheHits()); - Assert.assertEquals(0, metrics.getNumReadCacheMiss()); - Assert.assertEquals(0, metrics.getNumReadLostBlocks()); - - cache.put(blockID + 1, data.getBytes(StandardCharsets.UTF_8)); - Assert.assertEquals(2, metrics.getNumWriteOps()); - // Please note that this read is from the local cache. - block = cache.get(blockID + 1); - Assert.assertEquals(2, metrics.getNumReadOps()); - Assert.assertEquals(2, metrics.getNumReadCacheHits()); - Assert.assertEquals(0, metrics.getNumReadCacheMiss()); - Assert.assertEquals(0, metrics.getNumReadLostBlocks()); - String readHash = DigestUtils.sha256Hex(block.getData().array()); - Assert.assertEquals("File content does not match.", dataHash, readHash); - GenericTestUtils.waitFor(() -> !cache.isDirtyCache(), 100, 20 * 1000); - cache.close(); - - } - - @Test - public void testCacheWriteToRemoteContainer() throws IOException, - InterruptedException, TimeoutException { - final long blockID = 0; - String volumeName = "volume" + RandomStringUtils.randomNumeric(4); - String userName = "user" + RandomStringUtils.randomNumeric(4); - String data = RandomStringUtils.random(4 * KB); - CBlockTargetMetrics metrics = CBlockTargetMetrics.create(); - ContainerCacheFlusher flusher = new ContainerCacheFlusher(this.config, - xceiverClientManager, metrics); - CBlockLocalCache cache = CBlockLocalCache.newBuilder() - .setConfiguration(this.config) - .setVolumeName(volumeName) - .setUserName(userName) - .setPipelines(getContainerPipeline(10)) - .setClientManager(xceiverClientManager) - .setBlockSize(4 * KB) - .setVolumeSize(50 * GB) - .setFlusher(flusher) - .setCBlockTargetMetrics(metrics) - .build(); - cache.start(); - cache.put(blockID, data.getBytes(StandardCharsets.UTF_8)); - GenericTestUtils.waitFor(() -> !cache.isDirtyCache(), 100, 20 * 1000); - cache.close(); - } - - @Test - public void testCacheWriteToRemote50KBlocks() throws IOException, - InterruptedException, TimeoutException { - final long totalBlocks = 50 * 1000; - String volumeName = "volume" + RandomStringUtils.randomNumeric(4); - String userName = "user" + RandomStringUtils.randomNumeric(4); - String data = RandomStringUtils.random(4 * KB); - CBlockTargetMetrics metrics = CBlockTargetMetrics.create(); - ContainerCacheFlusher flusher = new ContainerCacheFlusher(this.config, - xceiverClientManager, metrics); - CBlockLocalCache cache = CBlockLocalCache.newBuilder() - .setConfiguration(this.config) - .setVolumeName(volumeName) - .setUserName(userName) - .setPipelines(getContainerPipeline(10)) - .setClientManager(xceiverClientManager) - .setBlockSize(4 * 1024) - .setVolumeSize(50 * GB) - .setFlusher(flusher) - .setCBlockTargetMetrics(metrics) - .build(); - cache.start(); - long startTime = Time.monotonicNow(); - for (long blockid = 0; blockid < totalBlocks; blockid++) { - cache.put(blockid, data.getBytes(StandardCharsets.UTF_8)); - } - Assert.assertEquals(totalBlocks, metrics.getNumWriteOps()); - Assert.assertEquals(totalBlocks, metrics.getNumBlockBufferUpdates()); - LOG.info("Wrote 50K blocks, waiting for replication to finish."); - GenericTestUtils.waitFor(() -> !cache.isDirtyCache(), 100, 20 * 1000); - long endTime = Time.monotonicNow(); - LOG.info("Time taken for writing {} blocks is {} seconds", totalBlocks, - TimeUnit.MILLISECONDS.toSeconds(endTime - startTime)); - // TODO: Read this data back. - cache.close(); - } - - @Test - public void testCacheInvalidBlock() throws IOException { - final int blockID = 1024; - String volumeName = "volume" + RandomStringUtils.randomNumeric(4); - String userName = "user" + RandomStringUtils.randomNumeric(4); - CBlockTargetMetrics metrics = CBlockTargetMetrics.create(); - ContainerCacheFlusher flusher = new ContainerCacheFlusher(this.config, - xceiverClientManager, metrics); - CBlockLocalCache cache = CBlockLocalCache.newBuilder() - .setConfiguration(this.config) - .setVolumeName(volumeName) - .setUserName(userName) - .setPipelines(getContainerPipeline(10)) - .setClientManager(xceiverClientManager) - .setBlockSize(4 * KB) - .setVolumeSize(50 * GB) - .setFlusher(flusher) - .setCBlockTargetMetrics(metrics) - .build(); - cache.start(); - // Read a non-existent block ID. - LogicalBlock block = cache.get(blockID); - Assert.assertNotNull(block); - Assert.assertEquals(4 * 1024, block.getData().array().length); - Assert.assertEquals(1, metrics.getNumReadOps()); - Assert.assertEquals(1, metrics.getNumReadLostBlocks()); - Assert.assertEquals(1, metrics.getNumReadCacheMiss()); - cache.close(); - } - - @Test - public void testReadWriteCorrectness() throws IOException, - InterruptedException, TimeoutException { - Random r = new Random(); - final int maxBlock = 12500000; - final int blockCount = 10 * 1000; - Map<Long, String> blockShaMap = new HashMap<>(); - List<Pipeline> pipelines = getContainerPipeline(10); - String volumeName = "volume" + RandomStringUtils.randomNumeric(4); - String userName = "user" + RandomStringUtils.randomNumeric(4); - CBlockTargetMetrics metrics = CBlockTargetMetrics.create(); - ContainerCacheFlusher flusher = new ContainerCacheFlusher(this.config, - xceiverClientManager, metrics); - final CBlockLocalCache cache = CBlockLocalCache.newBuilder() - .setConfiguration(this.config) - .setVolumeName(volumeName) - .setUserName(userName) - .setPipelines(pipelines) - .setClientManager(xceiverClientManager) - .setBlockSize(4 * KB) - .setVolumeSize(50 * GB) - .setFlusher(flusher) - .setCBlockTargetMetrics(metrics) - .build(); - cache.start(); - for (int x = 0; x < blockCount; x++) { - String data = RandomStringUtils.random(4 * 1024); - String dataHash = DigestUtils.sha256Hex(data); - long blockId = abs(r.nextInt(maxBlock)); - blockShaMap.put(blockId, dataHash); - cache.put(blockId, data.getBytes(StandardCharsets.UTF_8)); - } - Assert.assertEquals(blockCount, metrics.getNumWriteOps()); - GenericTestUtils.waitFor(() -> !cache.isDirtyCache(), 100, 20 * 1000); - LOG.info("Finished with putting blocks ..starting reading blocks back. " + - "unique blocks : {}", blockShaMap.size()); - // Test reading from local cache. - for (Map.Entry<Long, String> entry : blockShaMap.entrySet()) { - LogicalBlock block = cache.get(entry.getKey()); - String blockSha = DigestUtils.sha256Hex(block.getData().array()); - Assert.assertEquals("Block data is not equal", entry.getValue(), - blockSha); - } - Assert.assertEquals(blockShaMap.size(), metrics.getNumReadOps()); - Assert.assertEquals(blockShaMap.size(), metrics.getNumReadCacheHits()); - Assert.assertEquals(0, metrics.getNumReadCacheMiss()); - Assert.assertEquals(0, metrics.getNumReadLostBlocks()); - - LOG.info("Finished with reading blocks, SUCCESS."); - // Close and discard local cache. - cache.close(); - LOG.info("Closing the and destroying local cache"); - CBlockTargetMetrics newMetrics = CBlockTargetMetrics.create(); - ContainerCacheFlusher newflusher = new ContainerCacheFlusher(this.config, - xceiverClientManager, newMetrics); - Assert.assertEquals(0, newMetrics.getNumReadCacheHits()); - CBlockLocalCache newCache = null; - try { - newCache = CBlockLocalCache.newBuilder() - .setConfiguration(this.config) - .setVolumeName(volumeName) - .setUserName(userName) - .setPipelines(pipelines) - .setClientManager(xceiverClientManager) - .setBlockSize(4 * KB) - .setVolumeSize(50 * GB) - .setFlusher(newflusher) - .setCBlockTargetMetrics(newMetrics) - .build(); - newCache.start(); - for (Map.Entry<Long, String> entry : blockShaMap.entrySet()) { - LogicalBlock block = newCache.get(entry.getKey()); - String blockSha = DigestUtils.sha256Hex(block.getData().array()); - Assert.assertEquals("Block data is not equal", entry.getValue(), - blockSha); - } - - Assert.assertEquals(blockShaMap.size(), newMetrics.getNumReadOps()); - Assert.assertEquals(blockShaMap.size(), newMetrics.getNumReadCacheHits()); - Assert.assertEquals(0, newMetrics.getNumReadCacheMiss()); - Assert.assertEquals(0, newMetrics.getNumReadLostBlocks()); - - LOG.info("Finished with reading blocks from remote cache, SUCCESS."); - } finally { - if (newCache != null) { - newCache.close(); - } - } - } - - @Test - public void testStorageImplReadWrite() throws IOException, - InterruptedException, TimeoutException { - String userName = "user" + RandomStringUtils.randomNumeric(5); - String volumeName = "volume" + RandomStringUtils.randomNumeric(5); - long volumeSize = 50L * (1024L * 1024L * 1024L); - int blockSize = 4096; - byte[] data = - RandomStringUtils.randomAlphanumeric(10 * (1024 * 1024)) - .getBytes(StandardCharsets.UTF_8); - String hash = DigestUtils.sha256Hex(data); - CBlockTargetMetrics metrics = CBlockTargetMetrics.create(); - ContainerCacheFlusher flusher = new ContainerCacheFlusher(this.config, - xceiverClientManager, metrics); - CBlockIStorageImpl ozoneStore = CBlockIStorageImpl.newBuilder() - .setUserName(userName) - .setVolumeName(volumeName) - .setVolumeSize(volumeSize) - .setBlockSize(blockSize) - .setContainerList(getContainerPipeline(10)) - .setClientManager(xceiverClientManager) - .setConf(this.config) - .setFlusher(flusher) - .setCBlockTargetMetrics(metrics) - .build(); - ozoneStore.write(data, 0); - - byte[] newData = new byte[10 * 1024 * 1024]; - ozoneStore.read(newData, 0); - String newHash = DigestUtils.sha256Hex(newData); - Assert.assertEquals("hashes don't match.", hash, newHash); - GenericTestUtils.waitFor(() -> !ozoneStore.getCache().isDirtyCache(), - 100, 20 * 1000); - ozoneStore.close(); - } - - //@Test - // Disabling this test for time being since the bug in JSCSI - // forces us always to have a local cache. - public void testStorageImplNoLocalCache() throws IOException, - InterruptedException, TimeoutException { - OzoneConfiguration oConfig = new OzoneConfiguration(); - oConfig.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, false); - oConfig.setBoolean(DFS_CBLOCK_TRACE_IO, true); - String userName = "user" + RandomStringUtils.randomNumeric(5); - String volumeName = "volume" + RandomStringUtils.randomNumeric(5); - long volumeSize = 50L * (1024L * 1024L * 1024L); - int blockSize = 4096; - byte[] data = - RandomStringUtils.randomAlphanumeric(10 * (1024 * 1024)) - .getBytes(StandardCharsets.UTF_8); - String hash = DigestUtils.sha256Hex(data); - CBlockTargetMetrics metrics = CBlockTargetMetrics.create(); - ContainerCacheFlusher flusher = new ContainerCacheFlusher(oConfig, - xceiverClientManager, metrics); - CBlockIStorageImpl ozoneStore = CBlockIStorageImpl.newBuilder() - .setUserName(userName) - .setVolumeName(volumeName) - .setVolumeSize(volumeSize) - .setBlockSize(blockSize) - .setContainerList(getContainerPipeline(10)) - .setClientManager(xceiverClientManager) - .setConf(oConfig) - .setFlusher(flusher) - .setCBlockTargetMetrics(metrics) - .build(); - ozoneStore.write(data, 0); - - byte[] newData = new byte[10 * 1024 * 1024]; - ozoneStore.read(newData, 0); - String newHash = DigestUtils.sha256Hex(newData); - Assert.assertEquals("hashes don't match.", hash, newHash); - GenericTestUtils.waitFor(() -> !ozoneStore.getCache().isDirtyCache(), - 100, 20 * 1000); - ozoneStore.close(); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/kubernetes/TestDynamicProvisioner.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/kubernetes/TestDynamicProvisioner.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/kubernetes/TestDynamicProvisioner.java deleted file mode 100644 index 02c23ff..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/kubernetes/TestDynamicProvisioner.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.cblock.kubernetes; - -import io.kubernetes.client.JSON; -import io.kubernetes.client.models.V1PersistentVolume; -import io.kubernetes.client.models.V1PersistentVolumeClaim; -import static org.apache.hadoop.cblock.CBlockConfigKeys - .DFS_CBLOCK_ISCSI_ADVERTISED_IP; -import org.apache.hadoop.conf.OzoneConfiguration; -import org.junit.Assert; -import org.junit.Test; - -import java.nio.file.Files; -import java.nio.file.Paths; - -/** - * Test the resource generation of Dynamic Provisioner. - */ -public class TestDynamicProvisioner { - - @Test - public void persitenceVolumeBuilder() throws Exception { - - OzoneConfiguration conf = new OzoneConfiguration(); - conf.setStrings(DFS_CBLOCK_ISCSI_ADVERTISED_IP, "1.2.3.4"); - - DynamicProvisioner provisioner = - new DynamicProvisioner(conf, null); - - String pvc = new String(Files.readAllBytes( - Paths.get(getClass().getResource( - "/dynamicprovisioner/input1-pvc.json").toURI()))); - - String pv = new String(Files.readAllBytes( - Paths.get(getClass().getResource( - "/dynamicprovisioner/expected1-pv.json").toURI()))); - - JSON json = new io.kubernetes.client.JSON(); - - V1PersistentVolumeClaim claim = - json.getGson().fromJson(pvc, V1PersistentVolumeClaim.class); - - String volumeName = provisioner.createVolumeName(claim); - - V1PersistentVolume volume = - provisioner.persitenceVolumeBuilder(claim, volumeName); - - //remove the data which should not been compared - V1PersistentVolume expectedVolume = - json.getGson().fromJson(pv, V1PersistentVolume.class); - - - Assert.assertEquals(expectedVolume, volume); - } - -} \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org