http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/74c3156a/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HdfsSortedOplogOrganizerJUnitTest.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HdfsSortedOplogOrganizerJUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HdfsSortedOplogOrganizerJUnitTest.java deleted file mode 100644 index e6a1229..0000000 --- a/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HdfsSortedOplogOrganizerJUnitTest.java +++ /dev/null @@ -1,1045 +0,0 @@ -/*========================================================================= - * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved. - * This product is protected by U.S. and international copyright - * and intellectual property laws. Pivotal products are covered by - * one or more patents listed at http://www.pivotal.io/patents. - *========================================================================= - */ -package com.gemstone.gemfire.cache.hdfs.internal.hoplog; - -import java.io.FileNotFoundException; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.Iterator; -import java.util.List; -import java.util.TreeSet; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.regex.Matcher; - -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.junit.experimental.categories.Category; - -import com.gemstone.gemfire.cache.Region; -import com.gemstone.gemfire.cache.hdfs.HDFSIOException; -import com.gemstone.gemfire.cache.hdfs.HDFSStore; -import com.gemstone.gemfire.cache.hdfs.HDFSStoreMutator; -import com.gemstone.gemfire.cache.hdfs.internal.PersistedEventImpl; -import com.gemstone.gemfire.cache.hdfs.internal.SortedHoplogPersistedEvent; -import com.gemstone.gemfire.cache.hdfs.internal.hoplog.AbstractHoplogOrganizer.HoplogComparator; -import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSRegionDirector.HdfsRegionManager; -import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HoplogOrganizer.Compactor; -import com.gemstone.gemfire.cache.hdfs.internal.hoplog.TieredCompactionJUnitTest.TestHoplog; -import com.gemstone.gemfire.cache.hdfs.internal.hoplog.mapreduce.HoplogUtil; -import com.gemstone.gemfire.internal.cache.persistence.soplog.TrackedReference; -import com.gemstone.gemfire.internal.util.BlobHelper; -import com.gemstone.gemfire.test.junit.categories.HoplogTest; -import com.gemstone.gemfire.test.junit.categories.IntegrationTest; - -import dunit.DistributedTestCase; -import dunit.DistributedTestCase.ExpectedException; -@Category({IntegrationTest.class, HoplogTest.class}) -public class HdfsSortedOplogOrganizerJUnitTest extends BaseHoplogTestCase { - /** - * Tests flush operation - */ - public void testFlush() throws Exception { - int count = 10; - int bucketId = (int) System.nanoTime(); - HdfsSortedOplogOrganizer organizer = new HdfsSortedOplogOrganizer(regionManager, bucketId); - - // flush and create hoplog - ArrayList<TestEvent> items = new ArrayList<TestEvent>(); - for (int i = 0; i < count; i++) { - items.add(new TestEvent(("key-" + i), ("value-" + System.nanoTime()))); - } - organizer.flush(items.iterator(), count); - - // check file existence in bucket directory - FileStatus[] hoplogs = getBucketHoplogs(getName() + "/" + bucketId, - HdfsSortedOplogOrganizer.FLUSH_HOPLOG_EXTENSION); - - // only one hoplog should exists - assertEquals(1, hoplogs.length); - - assertEquals(count, organizer.sizeEstimate()); - assertEquals(0, stats.getActiveReaderCount()); - } - - /** - * Tests reads from a set of hoplogs containing both valid and stale KVs - */ - public void testReopen() throws Exception { - int bucketId = (int) System.nanoTime(); - HdfsSortedOplogOrganizer organizer = new HdfsSortedOplogOrganizer(regionManager, bucketId); - - // flush and create hoplog - ArrayList<TestEvent> items = new ArrayList<TestEvent>(); - for (int i = 0; i < 100; i++) { - items.add(new TestEvent("" + i, ("1-1"))); - } - organizer.flush(items.iterator(), items.size()); - - Hoplog hoplog = organizer.getSortedOplogs().iterator().next().get(); - byte[] keyBytes1 = BlobHelper.serializeToBlob("1"); - hoplog.close(); - - for (int i = 0; i < 10; i++) { - Path path = new Path(testDataDir, getName() + "/" + bucketId + "/" + hoplog.getFileName()); - HFileSortedOplog oplog = new HFileSortedOplog(hdfsStore, path, blockCache, stats, storeStats); - oplog.getReader().read(keyBytes1); - oplog.close(false); - } - } - - /** - * Tests reads from a set of hoplogs containing both valid and stale KVs - */ - public void testRead() throws Exception { - doRead(regionManager); - } - -// public void testNewReaderWithNameNodeHA() throws Exception { -// deleteMiniClusterDir(); -// int nn1port = AvailablePortHelper.getRandomAvailableTCPPort(); -// int nn2port = AvailablePortHelper.getRandomAvailableTCPPort(); -// -// MiniDFSCluster cluster = initMiniHACluster(nn1port, nn2port); -// initClientHAConf(nn1port, nn2port); -// -// HDFSStoreImpl store1 = (HDFSStoreImpl) hsf.create("Store-1"); -// regionfactory.setHDFSStoreName(store1.getName()); -// Region<Object, Object> region1 = regionfactory.create("region-1"); -// HdfsRegionManager regionManager1 = ((LocalRegion)region1).getHdfsRegionManager(); -// -// HoplogOrganizer<SortedHoplogPersistedEvent> organizer = doRead(regionManager1); -// organizer.close(); -// -// dunit.DistributedTestCase.ExpectedException ex = DistributedTestCase.addExpectedException("java.io.EOFException"); -// NameNode nnode2 = cluster.getNameNode(1); -// assertTrue(nnode2.isStandbyState()); -// cluster.shutdownNameNode(0); -// cluster.transitionToActive(1); -// assertFalse(nnode2.isStandbyState()); -// -// organizer = new HdfsSortedOplogOrganizer(regionManager1, 0); -// byte[] keyBytes1 = BlobHelper.serializeToBlob("1"); -// byte[] keyBytes3 = BlobHelper.serializeToBlob("3"); -// byte[] keyBytes4 = BlobHelper.serializeToBlob("4"); -// assertEquals("2-1", organizer.read(keyBytes1).getValue()); -// assertEquals("3-3", organizer.read(keyBytes3).getValue()); -// assertEquals("1-4", organizer.read(keyBytes4).getValue()); -// ex.remove(); -// -// region1.destroyRegion(); -// store1.destroy(); -// cluster.shutdown(); -// FileUtils.deleteDirectory(new File("hdfs-test-cluster")); -// } - -// public void testActiveReaderWithNameNodeHA() throws Exception { -// deleteMiniClusterDir(); -// int nn1port = AvailablePortHelper.getRandomAvailableTCPPort(); -// int nn2port = AvailablePortHelper.getRandomAvailableTCPPort(); -// -// MiniDFSCluster cluster = initMiniHACluster(nn1port, nn2port); -// initClientHAConf(nn1port, nn2port); -// -// HDFSStoreImpl store1 = (HDFSStoreImpl) hsf.create("Store-1"); -// regionfactory.setHDFSStoreName(store1.getName()); -// Region<Object, Object> region1 = regionfactory.create("region-1"); -// HdfsRegionManager regionManager1 = ((LocalRegion)region1).getHdfsRegionManager(); -// -// HdfsSortedOplogOrganizer organizer = new HdfsSortedOplogOrganizer(regionManager1, 0); -// ArrayList<TestEvent> items = new ArrayList<TestEvent>(); -// for (int i = 100000; i < 101000; i++) { -// items.add(new TestEvent(("" + i), (i + " some string " + i))); -// } -// organizer.flush(items.iterator(), items.size()); -// organizer.getSortedOplogs().get(0).get().getReader(); -// -// dunit.DistributedTestCase.ExpectedException ex = DistributedTestCase.addExpectedException("java.io.EOFException"); -// NameNode nnode2 = cluster.getNameNode(1); -// assertTrue(nnode2.isStandbyState()); -// cluster.shutdownNameNode(0); -// cluster.transitionToActive(1); -// assertFalse(nnode2.isStandbyState()); -// -// for (int i = 100000; i < 100500; i++) { -// byte[] keyBytes1 = BlobHelper.serializeToBlob("" + i); -// assertEquals(i + " some string " + i, organizer.read(keyBytes1).getValue()); -// } -// ex.remove(); -// region1.destroyRegion(); -// store1.destroy(); -// cluster.shutdown(); -// FileUtils.deleteDirectory(new File("hdfs-test-cluster")); -// } - -// public void testFlushWithNameNodeHA() throws Exception { -// deleteMiniClusterDir(); -// int nn1port = AvailablePortHelper.getRandomAvailableTCPPort(); -// int nn2port = AvailablePortHelper.getRandomAvailableTCPPort(); -// -// MiniDFSCluster cluster = initMiniHACluster(nn1port, nn2port); -// -// initClientHAConf(nn1port, nn2port); -// HDFSStoreImpl store1 = (HDFSStoreImpl) hsf.create("Store-1"); -// -// regionfactory.setHDFSStoreName(store1.getName()); -// Region<Object, Object> region1 = regionfactory.create("region-1"); -// HdfsRegionManager regionManager1 = ((LocalRegion)region1).getHdfsRegionManager(); -// -// HoplogOrganizer<SortedHoplogPersistedEvent> organizer = new HdfsSortedOplogOrganizer(regionManager1, 0); -// ArrayList<TestEvent> items = new ArrayList<TestEvent>(); -// items.add(new TestEvent(("1"), ("1-1"))); -// organizer.flush(items.iterator(), items.size()); -// -// dunit.DistributedTestCase.ExpectedException ex = DistributedTestCase.addExpectedException("java.io.EOFException"); -// NameNode nnode2 = cluster.getNameNode(1); -// assertTrue(nnode2.isStandbyState()); -// cluster.shutdownNameNode(0); -// cluster.transitionToActive(1); -// assertFalse(nnode2.isStandbyState()); -// -// items.add(new TestEvent(("4"), ("1-4"))); -// organizer.flush(items.iterator(), items.size()); -// byte[] keyBytes1 = BlobHelper.serializeToBlob("1"); -// byte[] keyBytes4 = BlobHelper.serializeToBlob("4"); -// assertEquals("1-1", organizer.read(keyBytes1).getValue()); -// assertEquals("1-4", organizer.read(keyBytes4).getValue()); -// ex.remove(); -// -// region1.destroyRegion(); -// store1.destroy(); -// cluster.shutdown(); -// FileUtils.deleteDirectory(new File("hdfs-test-cluster")); -// } - - public HoplogOrganizer<SortedHoplogPersistedEvent> doRead(HdfsRegionManager rm) throws Exception { - HoplogOrganizer<SortedHoplogPersistedEvent> organizer = new HdfsSortedOplogOrganizer(rm, 0); - - // flush and create hoplog - ArrayList<TestEvent> items = new ArrayList<TestEvent>(); - items.add(new TestEvent(("1"), ("1-1"))); - items.add(new TestEvent(("4"), ("1-4"))); - organizer.flush(items.iterator(), items.size()); - - items.clear(); - items.add(new TestEvent(("1"), ("2-1"))); - items.add(new TestEvent(("3"), ("2-3"))); - organizer.flush(items.iterator(), items.size()); - - items.clear(); - items.add(new TestEvent(("3"), ("3-3"))); - items.add(new TestEvent(("5"), ("3-5"))); - organizer.flush(items.iterator(), items.size()); - - // check file existence in bucket directory - FileStatus[] hoplogs = getBucketHoplogs(rm.getStore().getFileSystem(), - rm.getRegionFolder() + "/" + 0, - HdfsSortedOplogOrganizer.FLUSH_HOPLOG_EXTENSION); - - // expect 3 files are 3 flushes - assertEquals(3, hoplogs.length); - byte[] keyBytes1 = BlobHelper.serializeToBlob("1"); - byte[] keyBytes3 = BlobHelper.serializeToBlob("3"); - byte[] keyBytes4 = BlobHelper.serializeToBlob("4"); - // expect key 1 from hoplog 2 - assertEquals("2-1", organizer.read(keyBytes1).getValue()); - // expect key 3 from hoplog 3 - assertEquals("3-3", organizer.read(keyBytes3).getValue()); - // expect key 4 from hoplog 1 - assertEquals("1-4", organizer.read(keyBytes4).getValue()); - return organizer; - } - - /** - * Tests bucket organizer initialization during startup. Existing hoplogs should identified and - * returned - */ - public void testHoplogIdentification() throws Exception { - // create one empty file and one directories in bucket directory - Path bucketPath = new Path(testDataDir, getName() + "/0"); - FileSystem fs = hdfsStore.getFileSystem(); - fs.createNewFile(new Path(bucketPath, "temp_file")); - fs.mkdirs(new Path(bucketPath, "temp_dir")); - - // create 2 hoplogs files each of type flush, minor and major hoplog - HdfsSortedOplogOrganizer organizer = new HdfsSortedOplogOrganizer(regionManager, 0); - String[] extensions = { HdfsSortedOplogOrganizer.FLUSH_HOPLOG_EXTENSION, - HdfsSortedOplogOrganizer.FLUSH_HOPLOG_EXTENSION, - HdfsSortedOplogOrganizer.MINOR_HOPLOG_EXTENSION, - HdfsSortedOplogOrganizer.MINOR_HOPLOG_EXTENSION, - HdfsSortedOplogOrganizer.MAJOR_HOPLOG_EXTENSION, - HdfsSortedOplogOrganizer.MAJOR_HOPLOG_EXTENSION}; - for (String string : extensions) { - Hoplog oplog = organizer.getTmpSortedOplog(null, string); - createHoplog(0, oplog); - organizer.makeLegitimate(oplog); - } - - // create a temp hoplog - Hoplog oplog = organizer.getTmpSortedOplog(null, HdfsSortedOplogOrganizer.MAJOR_HOPLOG_EXTENSION); - createHoplog(0, oplog); - - // bucket directory should have 6 hoplogs, 1 temp log, 1 misc file and 1 directory - FileStatus[] results = fs.listStatus(bucketPath); - assertEquals(9, results.length); - - // only two are hoplogs - List<Hoplog> list = organizer.identifyAndLoadSortedOplogs(true); - assertEquals(6, list.size()); - } - - public void testExpiryMarkerIdentification() throws Exception { - // epxired hoplogs from the list below should be deleted - String[] files = { - "0-1-1231" + AbstractHoplogOrganizer.FLUSH_HOPLOG_EXTENSION, - "0-2-1232" + AbstractHoplogOrganizer.MAJOR_HOPLOG_EXTENSION, - "0-3-1233" + AbstractHoplogOrganizer.MINOR_HOPLOG_EXTENSION, - "0-4-1234" + AbstractHoplogOrganizer.MINOR_HOPLOG_EXTENSION, - "0-5-1235" + AbstractHoplogOrganizer.MINOR_HOPLOG_EXTENSION }; - - Path bucketPath = new Path(testDataDir, getName() + "/0"); - FileSystem fs = hdfsStore.getFileSystem(); - for (String file : files) { - Hoplog oplog = new HFileSortedOplog(hdfsStore, new Path(bucketPath, file), - blockCache, stats, storeStats); - createHoplog(10, oplog); - } - - String marker1 = "0-4-1234" - + AbstractHoplogOrganizer.MINOR_HOPLOG_EXTENSION - + AbstractHoplogOrganizer.EXPIRED_HOPLOG_EXTENSION; - fs.createNewFile(new Path(bucketPath, marker1)); - String marker2 = "0-5-1235" - + AbstractHoplogOrganizer.MINOR_HOPLOG_EXTENSION - + AbstractHoplogOrganizer.EXPIRED_HOPLOG_EXTENSION; - fs.createNewFile(new Path(bucketPath, marker2)); - - FileStatus[] hoplogs = getBucketHoplogs(getName() + "/0", ""); - assertEquals(7, hoplogs.length); - - HdfsSortedOplogOrganizer organizer = new HdfsSortedOplogOrganizer( - regionManager, 0); - - FileStatus[] markers = organizer.getExpiryMarkers(); - // one hoplog and one exp marker will be deletion targets - assertEquals(2, markers.length); - for (FileStatus marker : markers) { - String name = marker.getPath().getName(); - assertTrue(name.equals(marker1) || name.equals(marker2)); - } - organizer.close(); - } - - public void testExpiredHoplogCleanup() throws Exception { - // epxired hoplogs from the list below should be deleted - String[] files = { - "0-1-0000" + AbstractHoplogOrganizer.FLUSH_HOPLOG_EXTENSION, - "0-1-1111" + AbstractHoplogOrganizer.FLUSH_HOPLOG_EXTENSION, - "0-1-1111" + AbstractHoplogOrganizer.FLUSH_HOPLOG_EXTENSION - + AbstractHoplogOrganizer.EXPIRED_HOPLOG_EXTENSION, - - "0-2-0000" + AbstractHoplogOrganizer.MAJOR_HOPLOG_EXTENSION, - "0-2-2222" + AbstractHoplogOrganizer.MAJOR_HOPLOG_EXTENSION, - - "0-3-0000" + AbstractHoplogOrganizer.MINOR_HOPLOG_EXTENSION, - "0-3-3333" + AbstractHoplogOrganizer.MINOR_HOPLOG_EXTENSION, - "0-3-3333" + AbstractHoplogOrganizer.MINOR_HOPLOG_EXTENSION - + AbstractHoplogOrganizer.EXPIRED_HOPLOG_EXTENSION, - - "0-4-4444" + AbstractHoplogOrganizer.MAJOR_HOPLOG_EXTENSION }; - - Path bucketPath = new Path(testDataDir, getName() + "/0"); - FileSystem fs = hdfsStore.getFileSystem(); - for (String file : files) { - if (file.endsWith(AbstractHoplogOrganizer.EXPIRED_HOPLOG_EXTENSION)) { - fs.createNewFile(new Path(bucketPath, file)); - continue; - } - Hoplog oplog = new HFileSortedOplog(hdfsStore, new Path(bucketPath, file), - blockCache, stats, storeStats); - createHoplog(10, oplog); - } - - FileStatus[] hoplogs = getBucketHoplogs(getName() + "/0", ""); - assertEquals(9, hoplogs.length); - - long target = System.currentTimeMillis(); - TimeUnit.SECONDS.sleep(1); - - // all but minor compacted files from below this will not be deleted as it - // is after target delete time - files = new String[] { - "0-4-4444" + AbstractHoplogOrganizer.MAJOR_HOPLOG_EXTENSION - + AbstractHoplogOrganizer.EXPIRED_HOPLOG_EXTENSION, - - "0-5-5555" + AbstractHoplogOrganizer.MINOR_HOPLOG_EXTENSION - + AbstractHoplogOrganizer.EXPIRED_HOPLOG_EXTENSION, - "0-5-5555" + AbstractHoplogOrganizer.MINOR_HOPLOG_EXTENSION, - - "0-6-6666" + AbstractHoplogOrganizer.MINOR_HOPLOG_EXTENSION - }; - for (String file : files) { - if (file.endsWith(AbstractHoplogOrganizer.EXPIRED_HOPLOG_EXTENSION)) { - fs.createNewFile(new Path(bucketPath, file)); - continue; - } - Hoplog oplog = new HFileSortedOplog(hdfsStore, new Path(bucketPath, file), - blockCache, stats, storeStats); - createHoplog(10, oplog); - } - - hoplogs = getBucketHoplogs(getName() + "/0", ""); - assertEquals(13, hoplogs.length); - int hopSize = 0; - for (FileStatus file : hoplogs) { - if(file.getLen() > hopSize) { - hopSize = (int) file.getLen(); - } - } - - final AtomicInteger behavior = new AtomicInteger(0); - HdfsSortedOplogOrganizer organizer = new HdfsSortedOplogOrganizer(regionManager, 0) { - @Override - protected FileStatus[] getExpiryMarkers() throws IOException { - if (behavior.get() == 1) { - ArrayList<FileStatus> markers = new ArrayList<FileStatus>(); - for (FileStatus marker : super.getExpiryMarkers()) { - markers.add(marker); - } - // inject a dummy old expiry marker for major compacted file - long age = 2 * HDFSStore.DEFAULT_MAJOR_COMPACTION_INTERVAL_MINS * 60 * 1000; - String markerName = "0-2-2222" + AbstractHoplogOrganizer.MAJOR_HOPLOG_EXTENSION + EXPIRED_HOPLOG_EXTENSION; - FileStatus marker = new FileStatus(0, false, 1, 1024, System.currentTimeMillis() - age, new Path(bucketPath, markerName)); - markers.add(marker); - return markers.toArray(new FileStatus[markers.size()]); - } - return super.getExpiryMarkers(); - } - }; - - List<FileStatus> list = organizer.getOptimizationTargets(target); - assertEquals(6, list.size()); - - behavior.set(1); - list = organizer.getOptimizationTargets(target); - assertEquals(8, list.size()); - - assertEquals(9 * hopSize, stats.getStoreUsageBytes()); - int count = organizer.deleteExpiredFiles(list); - assertEquals(8, count); - assertEquals(5 * hopSize, stats.getStoreUsageBytes()); - - List<FileStatus> tmp = new ArrayList<FileStatus>(Arrays.asList(hoplogs)); - for (Iterator<FileStatus> iter = tmp.iterator(); iter.hasNext();) { - hoplogs = getBucketHoplogs(getName() + "/0", ""); - FileStatus file = iter.next(); - for (FileStatus hoplog : hoplogs) { - if(hoplog.getPath().getName().startsWith("0-5-5555")) { - fail("this file should have been deleted" + hoplog.getPath().getName()); - } - - if (hoplog.getPath().getName().equals(file.getPath().getName())) { - iter.remove(); - break; - } - } - } - - assertEquals(7, tmp.size()); - organizer.close(); - } - - public void testAlterPurgeInterval() throws Exception { - // epxired hoplogs from the list below should be deleted - String[] files = { - "0-1-0000" + AbstractHoplogOrganizer.FLUSH_HOPLOG_EXTENSION, - "0-1-1111" + AbstractHoplogOrganizer.FLUSH_HOPLOG_EXTENSION, - "0-2-2222" + AbstractHoplogOrganizer.FLUSH_HOPLOG_EXTENSION, - "0-4-4444" + AbstractHoplogOrganizer.FLUSH_HOPLOG_EXTENSION }; - - Path bucketPath = new Path(testDataDir, getName() + "/0"); - hdfsStore.getFileSystem(); - for (String file : files) { - Hoplog oplog = new HFileSortedOplog(hdfsStore, new Path(bucketPath, file), - blockCache, stats, storeStats); - createHoplog(10, oplog); - } - - FileStatus[] hoplogs = getBucketHoplogs(getName() + "/0", ""); - int hopSize = 0; - for (FileStatus file : hoplogs) { - if(file.getLen() > hopSize) { - hopSize = (int) file.getLen(); - } - } - - final AtomicInteger behavior = new AtomicInteger(0); - HdfsSortedOplogOrganizer organizer = new HdfsSortedOplogOrganizer(regionManager, 0) { - @Override - protected FileStatus[] getExpiryMarkers() throws IOException { - if (behavior.get() == 1) { - ArrayList<FileStatus> markers = new ArrayList<FileStatus>(); - // inject dummy old expiry markers - long age = 120 * 1000; // 120 seconds old - String markerName = "0-2-2222" + AbstractHoplogOrganizer.FLUSH_HOPLOG_EXTENSION + EXPIRED_HOPLOG_EXTENSION; - FileStatus marker = new FileStatus(0, false, 1, 1024, System.currentTimeMillis() - age, new Path(bucketPath, markerName)); - markers.add(marker); - markerName = "0-4-4444" + AbstractHoplogOrganizer.FLUSH_HOPLOG_EXTENSION + EXPIRED_HOPLOG_EXTENSION; - marker = new FileStatus(0, false, 1, 1024, System.currentTimeMillis() - age, new Path(bucketPath, markerName)); - markers.add(marker); - return markers.toArray(new FileStatus[markers.size()]); - } - return super.getExpiryMarkers(); - } - }; - - behavior.set(1); - int count = organizer.initiateCleanup(); - assertEquals(0, count); - - HDFSStoreMutator mutator = hdfsStore.createHdfsStoreMutator(); - mutator.setPurgeInterval(1); - hdfsStore.alter(mutator); - count = organizer.initiateCleanup(); - assertEquals(4, count); - } - - public void testInUseExpiredHoplogCleanup() throws Exception { - Path bucketPath = new Path(testDataDir, getName() + "/0"); - FileSystem fs = hdfsStore.getFileSystem(); - - String[] files = new String[] { - "0-1-1231" + AbstractHoplogOrganizer.FLUSH_HOPLOG_EXTENSION, - "0-2-1232" + AbstractHoplogOrganizer.MAJOR_HOPLOG_EXTENSION, - "0-3-1233" + AbstractHoplogOrganizer.MINOR_HOPLOG_EXTENSION, - "0-4-1234" + AbstractHoplogOrganizer.MINOR_HOPLOG_EXTENSION, - "0-5-1235" + AbstractHoplogOrganizer.MINOR_HOPLOG_EXTENSION }; - - for (String file : files) { - Hoplog oplog = new HFileSortedOplog(hdfsStore, new Path(bucketPath, file), - blockCache, stats, storeStats); - createHoplog(10, oplog); - } - - final HdfsSortedOplogOrganizer organizer = new HdfsSortedOplogOrganizer( - regionManager, 0); - List<TrackedReference<Hoplog>> hopRefs = organizer.getSortedOplogs(); - assertEquals(files.length, hopRefs.size()); - - // this is expiry marker for one of the files that will be compacted below. - // While compaction is going on file deletion should not happen - files = new String[] { "0-5-1235" - + AbstractHoplogOrganizer.MINOR_HOPLOG_EXTENSION - + AbstractHoplogOrganizer.EXPIRED_HOPLOG_EXTENSION }; - - for (String file : files) { - fs.createNewFile(new Path(bucketPath, file)); - } - FileStatus[] hoplogs = getBucketHoplogs(getName() + "/0", ""); - assertEquals(hopRefs.size() + files.length, hoplogs.length); - - TimeUnit.MILLISECONDS.sleep(200); - long target = System.currentTimeMillis(); - List<FileStatus> list = organizer.getOptimizationTargets(target); - assertEquals(2, list.size()); - - for (TrackedReference<Hoplog> ref : hopRefs) { - ref.increment("test"); - } - - fs.delete(new Path(bucketPath, files[0]), false); - - TimeUnit.MILLISECONDS.sleep(50); - organizer.markSortedOplogForDeletion(hopRefs, false); - - list = organizer.getOptimizationTargets(target); - assertEquals(0, list.size()); - organizer.close(); - } - - /** - * Tests max sequence initialization when file already exists and server starts - */ - public void testSeqInitialization() throws Exception { - // create many hoplogs files - HdfsSortedOplogOrganizer organizer = new HdfsSortedOplogOrganizer(regionManager, 0); - String[] extensions = { HdfsSortedOplogOrganizer.FLUSH_HOPLOG_EXTENSION, - HdfsSortedOplogOrganizer.FLUSH_HOPLOG_EXTENSION, - HdfsSortedOplogOrganizer.MINOR_HOPLOG_EXTENSION, - HdfsSortedOplogOrganizer.MAJOR_HOPLOG_EXTENSION, - HdfsSortedOplogOrganizer.MAJOR_HOPLOG_EXTENSION}; - for (String string : extensions) { - Hoplog oplog = organizer.getTmpSortedOplog(null, string); - createHoplog(1, oplog); - organizer.makeLegitimate(oplog); - } - - // a organizer should start creating files starting at 6 as five files already existed - organizer = new HdfsSortedOplogOrganizer(regionManager, 0); - Hoplog oplog = organizer.getTmpSortedOplog(null, HdfsSortedOplogOrganizer.MAJOR_HOPLOG_EXTENSION); - createHoplog(1, oplog); - organizer.makeLegitimate(oplog); - assertEquals(6, HdfsSortedOplogOrganizer.getSequenceNumber(oplog)); - organizer.close(); - } - - /** - * Tests temp file creation and making file legitimate - */ - public void testMakeLegitimate() throws Exception { - HdfsSortedOplogOrganizer organizer = new HdfsSortedOplogOrganizer(regionManager, 0); - - // create empty tmp hoplog - Hoplog oplog = organizer.getTmpSortedOplog(null, HdfsSortedOplogOrganizer.FLUSH_HOPLOG_EXTENSION); - createHoplog(0, oplog); - - Path hoplogPath = new Path(testDataDir, getName() + "/0/" + oplog.getFileName()); - FileSystem fs = hdfsStore.getFileSystem(); - FileStatus hoplogStatus = fs.getFileStatus(hoplogPath); - assertNotNull(hoplogStatus); - - organizer.makeLegitimate(oplog); - - try { - hoplogStatus = fs.getFileStatus(hoplogPath); - assertNull(hoplogStatus); - } catch (FileNotFoundException e) { - // tmp file is renamed hence should not exist, exception expected - } - - assertTrue(oplog.getFileName().endsWith(HdfsSortedOplogOrganizer.FLUSH_HOPLOG_EXTENSION)); - hoplogPath = new Path(testDataDir, getName() + "/0/" + oplog.getFileName()); - hoplogStatus = fs.getFileStatus(hoplogPath); - assertNotNull(hoplogStatus); - } - - /** - * Tests hoplog file name comparator - */ - public void testHoplogFileComparator() throws IOException { - String name1 = "bucket1-10-3.hop"; - String name2 = "bucket1-1-20.hop"; - String name3 = "bucket1-30-201.hop"; - String name4 = "bucket1-100-201.hop"; - - TreeSet<TrackedReference<Hoplog>> list = new TreeSet<TrackedReference<Hoplog>>(new HoplogComparator()); - // insert soplog is the list out of expected order - hdfsStore.getFileSystem(); - list.add(new TrackedReference<Hoplog>(new HFileSortedOplog(hdfsStore, new Path(testDataDir, name2), blockCache, stats, storeStats))); - list.add(new TrackedReference<Hoplog>(new HFileSortedOplog(hdfsStore, new Path(testDataDir, name4), blockCache, stats, storeStats))); - list.add(new TrackedReference<Hoplog>(new HFileSortedOplog(hdfsStore, new Path(testDataDir, name1), blockCache, stats, storeStats))); - list.add(new TrackedReference<Hoplog>(new HFileSortedOplog(hdfsStore, new Path(testDataDir, name3), blockCache, stats, storeStats))); - - Iterator<TrackedReference<Hoplog>> iter = list.iterator(); - assertEquals(name4, iter.next().get().getFileName()); - assertEquals(name3, iter.next().get().getFileName()); - assertEquals(name2, iter.next().get().getFileName()); - assertEquals(name1, iter.next().get().getFileName()); - } - - /** - * Tests clear on a set of hoplogs. - */ - public void testClear() throws Exception { - int bucketId = (int) System.nanoTime(); - HdfsSortedOplogOrganizer organizer = new HdfsSortedOplogOrganizer(regionManager, bucketId); - - // flush and create hoplog - ArrayList<TestEvent> items = new ArrayList<TestEvent>(); - items.add(new TestEvent(("1"), ("1-1"))); - items.add(new TestEvent(("4"), ("1-4"))); - organizer.flush(items.iterator(), items.size()); - - items.clear(); - items.add(new TestEvent(("1"), ("2-1"))); - items.add(new TestEvent(("3"), ("2-3"))); - organizer.flush(items.iterator(), items.size()); - - items.clear(); - items.add(new TestEvent(("3"), ("3-3"))); - items.add(new TestEvent(("5"), ("3-5"))); - organizer.flush(items.iterator(), items.size()); - - // check file existence in bucket directory - FileStatus[] hoplogs = getBucketHoplogs(getName() + "/" + bucketId, HdfsSortedOplogOrganizer.FLUSH_HOPLOG_EXTENSION); - - // expect 3 files are 3 flushes - assertEquals(3, hoplogs.length); - - organizer.clear(); - - // check that all files are now expired - hoplogs = getBucketHoplogs(getName() + "/" + bucketId, HdfsSortedOplogOrganizer.FLUSH_HOPLOG_EXTENSION); - FileStatus[] exs = getBucketHoplogs(getName() + "/" + bucketId, HdfsSortedOplogOrganizer.EXPIRED_HOPLOG_EXTENSION); - FileStatus[] valids = HdfsSortedOplogOrganizer.filterValidHoplogs(hoplogs, exs); - assertEquals(Collections.EMPTY_LIST, Arrays.asList(valids)); - - assertEquals(0, stats.getActiveFileCount()); - assertEquals(0, stats.getInactiveFileCount()); - } - - public void testFixedIntervalMajorCompaction() throws Exception { - final AtomicInteger majorCReqCount = new AtomicInteger(0); - - final Compactor compactor = new AbstractCompactor() { - @Override - public boolean compact(boolean isMajor, boolean isForced) throws IOException { - majorCReqCount.incrementAndGet(); - return true; - } - }; - - HdfsSortedOplogOrganizer organizer = new HdfsSortedOplogOrganizer(regionManager, 0) { - @Override - public synchronized Compactor getCompactor() { - return compactor; - } - }; - - regionManager.addOrganizer(0, organizer); - - System.setProperty(HoplogConfig.JANITOR_INTERVAL_SECS, "1"); - HDFSRegionDirector.resetJanitor(); - - alterMajorCompaction(hdfsStore, true); - - // create hoplog in the past, 90 seconds before current time - organizer.hoplogCreated(getName(), 0, new TestHoplog(hdfsStore, 100, System.currentTimeMillis() - 90000)); - TimeUnit.MILLISECONDS.sleep(50); - organizer.hoplogCreated(getName(), 0, new TestHoplog(hdfsStore, 100, System.currentTimeMillis() - 90000)); - - List<TrackedReference<Hoplog>> hoplogs = organizer.getSortedOplogs(); - assertEquals(2, hoplogs.size()); - - for (int i = 0; i < 3; i++) { - TimeUnit.SECONDS.sleep(1); - assertEquals(0, majorCReqCount.get()); - } - HDFSStoreMutator mutator = hdfsStore.createHdfsStoreMutator(); - mutator.setMajorCompactionInterval(1); - hdfsStore.alter(mutator); - TimeUnit.SECONDS.sleep(5); - assertTrue(3 < majorCReqCount.get()); - } - - - public void testCorruptHfileBucketFail() throws Exception { - // create a corrupt file - FileSystem fs = hdfsStore.getFileSystem(); - for (int i = 0; i < 113; i++) { - FSDataOutputStream opStream = fs.create(new Path(testDataDir.getName() + "/region-1/" + i + "/1-1-1.hop")); - opStream.writeBytes("Some random corrupt file"); - opStream.close(); - } - - // create region with store -// regionfactory.setHDFSStoreName(HDFS_STORE_NAME); - Region<Object, Object> region1 = regionfactory.create("region-1"); - ExpectedException ex = DistributedTestCase.addExpectedException("CorruptHFileException"); - try { - region1.get("key"); - fail("get should have failed with corrupt file error"); - } catch (HDFSIOException e) { - // expected - } finally { - ex.remove(); - } - - region1.destroyRegion(); - } - - public void testMaxOpenReaders() throws Exception { - System.setProperty("hoplog.bucket.max.open.files", "5"); - HoplogOrganizer<? extends PersistedEventImpl> organizer = new HdfsSortedOplogOrganizer(regionManager, 0); - - ArrayList<TestEvent> items = new ArrayList<TestEvent>(); - for (int i = 0; i < 10; i++) { - items.clear(); - items.add(new TestEvent("" + i, "" + i)); - organizer.flush(items.iterator(), items.size()); - } - - HdfsSortedOplogOrganizer bucket = (HdfsSortedOplogOrganizer) organizer; - List<TrackedReference<Hoplog>> hoplogs = bucket.getSortedOplogs(); - int closedCount = 0 ; - for (TrackedReference<Hoplog> hoplog : hoplogs) { - HFileSortedOplog hfile = (HFileSortedOplog) hoplog.get(); - if (hfile.isClosed()) { - closedCount++; - } - } - assertEquals(10, closedCount); - assertEquals(10, stats.getActiveFileCount()); - assertEquals(0, stats.getActiveReaderCount()); - - byte[] keyBytes1 = BlobHelper.serializeToBlob("1"); - organizer.read(keyBytes1).getValue(); - - closedCount = 0 ; - for (TrackedReference<Hoplog> hoplog : hoplogs) { - HFileSortedOplog hfile = (HFileSortedOplog) hoplog.get(); - if (hfile.isClosed()) { - closedCount++; - } - } - assertEquals(5, closedCount); - assertEquals(10, stats.getActiveFileCount()); - assertEquals(0, stats.getInactiveFileCount()); - assertEquals(5, stats.getActiveReaderCount()); - - organizer.getCompactor().compact(false, false); - assertEquals(1, stats.getActiveFileCount()); - assertEquals(0, stats.getActiveReaderCount()); - assertEquals(0, stats.getInactiveFileCount()); - } - - public void testConcurrentReadInactiveClose() throws Exception { - final HoplogOrganizer<? extends PersistedEventImpl> organizer = regionManager.create(0); - alterMinorCompaction(hdfsStore, true); - - ArrayList<TestEvent> items = new ArrayList<TestEvent>(); - for (int i = 0; i < 4; i++) { - items.clear(); - items.add(new TestEvent("" + i, "" + i)); - organizer.flush(items.iterator(), items.size()); - } - - final byte[] keyBytes1 = BlobHelper.serializeToBlob("1"); - class ReadTask implements Runnable { - public void run() { - try { - organizer.read(keyBytes1); - } catch (IOException e) { - e.printStackTrace(); - } - } - } - ScheduledExecutorService[] readers = new ScheduledExecutorService[10]; - for (int i = 0; i < readers.length; i++) { - readers[i] = Executors.newSingleThreadScheduledExecutor(); - readers[i].scheduleWithFixedDelay(new ReadTask(), 0, 1, TimeUnit.MILLISECONDS); - } - - for (int i = 0; i < 100; i++) { - items.clear(); - items.add(new TestEvent("" + i, "" + i)); - organizer.flush(items.iterator(), items.size()); - } - - for (int i = 0; i < readers.length; i++) { - readers[i].shutdown(); - readers[i].awaitTermination(1, TimeUnit.SECONDS); - TimeUnit.MILLISECONDS.sleep(50); - } - - for (int i = 0; i < 20; i++) { - if (stats.getActiveFileCount() < 4) { - break; - } - organizer.getCompactor().compact(false, false); - } - - organizer.performMaintenance(); - TimeUnit.SECONDS.sleep(1); - - assertTrue("" + stats.getActiveFileCount(), stats.getActiveFileCount() <= 4); - assertEquals(stats.getActiveReaderCount(), stats.getActiveReaderCount()); - assertEquals(0, stats.getInactiveFileCount()); - } - - public void testEmptyBucketCleanup() throws Exception { - HdfsSortedOplogOrganizer o = new HdfsSortedOplogOrganizer(regionManager, 0); - long target = System.currentTimeMillis(); - o.getOptimizationTargets(target); - // making sure empty bucket is not causing IO errors. no assertion needed - // for this test case. - } - - public void testExpiredFilterAtStartup() throws Exception { - HdfsSortedOplogOrganizer bucket = new HdfsSortedOplogOrganizer(regionManager, 0); - - ArrayList<TestEvent> items = new ArrayList<TestEvent>(); - items.add(new TestEvent(("1"), ("1-1"))); - items.add(new TestEvent(("4"), ("1-4"))); - bucket.flush(items.iterator(), items.size()); - - items.clear(); - items.add(new TestEvent(("1"), ("2-1"))); - items.add(new TestEvent(("3"), ("2-3"))); - bucket.flush(items.iterator(), items.size()); - - FileStatus[] files = getBucketHoplogs(getName() + "/" + 0, - HdfsSortedOplogOrganizer.FLUSH_HOPLOG_EXTENSION); - assertEquals(2, files.length); - - files = getBucketHoplogs(getName() + "/" + 0, - HdfsSortedOplogOrganizer.EXPIRED_HOPLOG_EXTENSION); - assertEquals(0, files.length); - - HdfsSortedOplogOrganizer bucket2 = new HdfsSortedOplogOrganizer(regionManager, 0); - List<TrackedReference<Hoplog>> hoplogs = bucket2.getSortedOplogs(); - assertEquals(2, hoplogs.size()); - - bucket.clear(); - - files = getBucketHoplogs(getName() + "/" + 0, - HdfsSortedOplogOrganizer.FLUSH_HOPLOG_EXTENSION); - assertEquals(2, files.length); - - files = getBucketHoplogs(getName() + "/" + 0, - HdfsSortedOplogOrganizer.EXPIRED_HOPLOG_EXTENSION); - assertEquals(2, files.length); - - bucket2 = new HdfsSortedOplogOrganizer(regionManager, 0); - hoplogs = bucket2.getSortedOplogs(); - assertEquals(0, hoplogs.size()); - - items.clear(); - items.add(new TestEvent(("1"), ("2-1"))); - items.add(new TestEvent(("3"), ("2-3"))); - bucket.flush(items.iterator(), items.size()); - - bucket2 = new HdfsSortedOplogOrganizer(regionManager, 0); - hoplogs = bucket2.getSortedOplogs(); - assertEquals(1, hoplogs.size()); - bucket.close(); - bucket2.close(); - } - - public void testExpireFilterRetartAfterClear() throws Exception { - HdfsSortedOplogOrganizer bucket = new HdfsSortedOplogOrganizer(regionManager, 0); - - ArrayList<TestEvent> items = new ArrayList<TestEvent>(); - items.add(new TestEvent(("1"), ("1-1"))); - items.add(new TestEvent(("4"), ("1-4"))); - bucket.flush(items.iterator(), items.size()); - - items.clear(); - items.add(new TestEvent(("1"), ("2-1"))); - items.add(new TestEvent(("3"), ("2-3"))); - bucket.flush(items.iterator(), items.size()); - - FileStatus[] files = getBucketHoplogs(getName() + "/" + 0, - HdfsSortedOplogOrganizer.FLUSH_HOPLOG_EXTENSION); - assertEquals(2, files.length); - - files = getBucketHoplogs(getName() + "/" + 0, - HdfsSortedOplogOrganizer.EXPIRED_HOPLOG_EXTENSION); - assertEquals(0, files.length); - - HdfsSortedOplogOrganizer bucket2 = new HdfsSortedOplogOrganizer(regionManager, 0); - List<TrackedReference<Hoplog>> hoplogs = bucket2.getSortedOplogs(); - assertEquals(2, hoplogs.size()); - - bucket.clear(); - - files = getBucketHoplogs(getName() + "/" + 0, - HdfsSortedOplogOrganizer.FLUSH_HOPLOG_EXTENSION); - assertEquals(2, files.length); - - files = getBucketHoplogs(getName() + "/" + 0, - HdfsSortedOplogOrganizer.EXPIRED_HOPLOG_EXTENSION); - assertEquals(2, files.length); - - bucket2 = new HdfsSortedOplogOrganizer(regionManager, 0); - hoplogs = bucket2.getSortedOplogs(); - assertEquals(0, hoplogs.size()); - bucket.close(); - bucket2.close(); - } - - /** - * tests maintenance does not fail even if there are no hoplogs - */ - public void testNoFileJanitor() throws Exception { - HoplogOrganizer<? extends PersistedEventImpl> organizer; - organizer = regionManager.create(0); - organizer.performMaintenance(); - } - - public void testValidHoplogRegex() { - String[] valid = {"1-1-1.hop", "1-1-1.ihop", "1-1-1.chop"}; - String[] invalid = {"1-1-1.khop", "1-1-1.hop.tmphop", "1-1-1.hop.ehop", "1-1-.hop", "-1-1.hop"}; - - for (String string : valid) { - Matcher matcher = HdfsSortedOplogOrganizer.SORTED_HOPLOG_PATTERN.matcher(string); - assertTrue(matcher.matches()); - } - - for (String string : invalid) { - Matcher matcher = HdfsSortedOplogOrganizer.SORTED_HOPLOG_PATTERN.matcher(string); - assertFalse(matcher.matches()); - } - } - - public void testOneHoplogMajorCompaction() throws Exception { - HoplogOrganizer<? extends PersistedEventImpl> organizer = new HdfsSortedOplogOrganizer(regionManager, 0); - alterMajorCompaction(hdfsStore, true); - - ArrayList<TestEvent> items = new ArrayList<TestEvent>(); - items.add(new TestEvent(("1"), ("1-1"))); - organizer.flush(items.iterator(),items.size()); - - - FileStatus[] files = getBucketHoplogs(getName() + "/0", HdfsSortedOplogOrganizer.FLUSH_HOPLOG_EXTENSION); - assertEquals(1, files.length); - - //Minor compaction will not perform on 1 .hop file - organizer.getCompactor().compact(false, false); - files = getBucketHoplogs(getName() + "/0", HdfsSortedOplogOrganizer.MINOR_HOPLOG_EXTENSION); - assertEquals(0, files.length); - - //Major compaction will perform on 1 .hop file - organizer.getCompactor().compact(true, false); - files = getBucketHoplogs(getName() + "/0", HdfsSortedOplogOrganizer.MAJOR_HOPLOG_EXTENSION); - assertEquals(1, files.length); - String hoplogName =files[0].getPath().getName(); - files = getBucketHoplogs(getName() + "/0", HdfsSortedOplogOrganizer.MINOR_HOPLOG_EXTENSION); - assertEquals(0, files.length); - - organizer.getCompactor().compact(true, false); - files = getBucketHoplogs(getName() + "/0", HdfsSortedOplogOrganizer.MAJOR_HOPLOG_EXTENSION); - assertEquals(1, files.length); - assertEquals(hoplogName, files[0].getPath().getName()); - - //Minor compaction does not convert major compacted file - organizer.getCompactor().compact(false, false); - files = getBucketHoplogs(getName() + "/0", HdfsSortedOplogOrganizer.MINOR_HOPLOG_EXTENSION); - assertEquals(0, files.length); - - files = getBucketHoplogs(getName() + "/0", HdfsSortedOplogOrganizer.MAJOR_HOPLOG_EXTENSION); - assertEquals(1, files.length); - assertEquals(hoplogName, files[0].getPath().getName()); - - files = getBucketHoplogs(getName() + "/0", HdfsSortedOplogOrganizer.EXPIRED_HOPLOG_EXTENSION); - assertEquals(1, files.length); - assertNotSame(hoplogName + HdfsSortedOplogOrganizer.EXPIRED_HOPLOG_EXTENSION, files[0].getPath().getName() ); - } - - public void testExposeCleanupInterval() throws Exception { - FileSystem fs = hdfsStore.getFileSystem(); - Path cleanUpIntervalPath = new Path(hdfsStore.getHomeDir(), HoplogConfig.CLEAN_UP_INTERVAL_FILE_NAME); - assertTrue(fs.exists(cleanUpIntervalPath)); - long interval = HDFSStore.DEFAULT_OLD_FILE_CLEANUP_INTERVAL_MINS - *60 * 1000; - assertEquals(interval, HoplogUtil.readCleanUpIntervalMillis(fs,cleanUpIntervalPath)); - } - - @Override - protected void setUp() throws Exception { - System.setProperty(HoplogConfig.JANITOR_INTERVAL_SECS, "" + HoplogConfig.JANITOR_INTERVAL_SECS_DEFAULT); - super.setUp(); - } -} -
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/74c3156a/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HfileSortedOplogJUnitTest.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HfileSortedOplogJUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HfileSortedOplogJUnitTest.java deleted file mode 100644 index 7420437..0000000 --- a/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HfileSortedOplogJUnitTest.java +++ /dev/null @@ -1,540 +0,0 @@ -/*========================================================================= - * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved. - * This product is protected by U.S. and international copyright - * and intellectual property laws. Pivotal products are covered by - * one or more patents listed at http://www.pivotal.io/patents. - *========================================================================= - */ -package com.gemstone.gemfire.cache.hdfs.internal.hoplog; - -import java.io.FileNotFoundException; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.NoSuchElementException; -import java.util.TreeMap; - -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.Path; - -import com.gemstone.gemfire.cache.hdfs.internal.HDFSStoreFactoryImpl; -import com.gemstone.gemfire.cache.hdfs.internal.HDFSStoreImpl; -import com.gemstone.gemfire.cache.hdfs.internal.hoplog.Hoplog.HoplogReader; -import com.gemstone.gemfire.cache.hdfs.internal.hoplog.Hoplog.HoplogWriter; -import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HoplogSetReader.HoplogIterator; -import com.gemstone.gemfire.test.junit.categories.HoplogTest; -import com.gemstone.gemfire.test.junit.categories.IntegrationTest -; - -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.Bytes.ByteArrayComparator; -import org.junit.experimental.categories.Category; - -@Category({IntegrationTest.class, HoplogTest.class}) -public class HfileSortedOplogJUnitTest extends BaseHoplogTestCase { - ArrayList<Object> toBeCleaned = new ArrayList<>(); - - /** - * Tests hoplog creation using a writer. If this test fails, all the tests wills fail as hoplog - * creation is the first step - */ - public void testHoplogWriter() throws Exception { - String hoplogName = getRandomHoplogName(); - createHoplog(hoplogName, 1); - FileStatus hoplogStatus = hdfsStore.getFileSystem().getFileStatus(new Path(testDataDir, hoplogName)); - assertNotNull(hoplogStatus); - } - - /** - * Tests hoplog deletion. - */ - public void testDeletion() throws Exception { - String hoplogName = getRandomHoplogName(); - createHoplog(hoplogName, 1); - HFileSortedOplog testHoplog = new HFileSortedOplog(hdfsStore, new Path(testDataDir, hoplogName), blockCache, stats, storeStats); - - testHoplog.delete(); - - try { - FileStatus hoplogStatus = hdfsStore.getFileSystem().getFileStatus(new Path(testDataDir, hoplogName)); - // hoplog should not exists. fail if it does - assertNull("File deletion failed", hoplogStatus); - } catch (FileNotFoundException e) { - // exception expected after deletion - } - } - - /** - * Tests hoplog reader creation and key based gets - */ - public void testHoplogReader() throws Exception { - String hop1 = getRandomHoplogName(); - Map<String, String> map = createHoplog(hop1, 10); - - HFileSortedOplog testHoplog1 = new HFileSortedOplog(hdfsStore, new Path(testDataDir, hop1), blockCache, stats, storeStats); - HoplogReader reader = testHoplog1.getReader(); - // verify that each entry put in the hoplog is returned by reader - for (Entry<String, String> entry : map.entrySet()) { - byte[] value = reader.read(entry.getKey().getBytes()); - assertNotNull(value); - } - } - - /** - * Tests full iteration on a hoplog. Ensures all inserted keys are returned and no key is missing - */ - public void testIterator() throws IOException { - int count = 10; - ByteArrayComparator bac = new ByteArrayComparator(); - - String hoplogName = getRandomHoplogName(); - TreeMap<String, String> sortedMap = createHoplog(hoplogName, count); - - HFileSortedOplog testHoplog = new HFileSortedOplog(hdfsStore, new Path(testDataDir, hoplogName), blockCache, stats, storeStats); - HoplogReader reader = testHoplog.getReader(); - - Iterator<Entry<String, String>> mapIter = sortedMap.entrySet().iterator(); - HoplogIterator<byte[], byte[]> iter = reader.scan(); - for (; iter.hasNext();) { - byte[] key = iter.next(); - Entry<String, String> entry = mapIter.next(); - assertEquals(0, bac.compare(key, iter.getKey())); - assertEquals(0, bac.compare(key, entry.getKey().getBytes())); - assertEquals(0, bac.compare(iter.getValue(), entry.getValue().getBytes())); - count--; - } - assertEquals(0, count); - } - - /** - * Tests hoplog iterator. after returning first key, has next should return false and all - * subsequent next calls should return null - */ - public void testSingleKVIterator() throws Exception { - String hoplogName = getRandomHoplogName(); - TreeMap<String, String> map = createHoplog(hoplogName, 1); - HFileSortedOplog testHoplog = new HFileSortedOplog(hdfsStore, new Path(testDataDir, hoplogName), blockCache, stats, storeStats); - HoplogReader reader = testHoplog.getReader(); - - HoplogIterator<byte[], byte[]> iter = reader.scan(); - assertNull(iter.getKey()); - assertNull(iter.getValue()); - assertTrue(iter.hasNext()); - assertNull(iter.getKey()); - assertNull(iter.getValue()); - - Entry<String, String> entry = map.firstEntry(); - iter.next(); - assertNotNull(iter.getKey()); - assertEquals(entry.getKey(), new String(iter.getKey())); - assertNotNull(iter.getValue()); - assertEquals(entry.getValue(), new String(iter.getValue())); - - assertFalse(iter.hasNext()); - try { - iter.next(); - fail(); - } catch (NoSuchElementException e) { - } - } - - /** - * Tests iteration on a hoplog with no keys, using a scanner. Scanner should not return any value - * and hasNext should return false everytime - */ - public void testEmptyFileIterator() throws Exception { - String hoplogName = getRandomHoplogName(); - createHoplog(hoplogName, 0); - HFileSortedOplog testHoplog = new HFileSortedOplog(hdfsStore, new Path(testDataDir, hoplogName), blockCache, stats, storeStats); - HoplogReader reader = testHoplog.getReader(); - HoplogIterator<byte[], byte[]> iter = reader.scan(); - assertNull(iter.getKey()); - assertNull(iter.getValue()); - assertFalse(iter.hasNext()); - assertNull(iter.getKey()); - assertNull(iter.getValue()); - try { - iter.next(); - fail(); - } catch (NoSuchElementException e) { - } - } - - /** - * Tests from exclusive iterator - */ - public void testFromExclusiveIterator() throws Exception { - fromIterator(false); - } - - /** - * Tests from inclusive iterator - */ - public void testFromInclusiveIterator() throws Exception { - fromIterator(true); - } - - /** - * Tests from condition based iteration. creates hoplog with 10 KVs. Creates a scanner starting at - * a middle key and verifies the count of KVs iterated on - */ - public void fromIterator(boolean includeFrom) throws Exception { - int count = 10; - ByteArrayComparator bac = new ByteArrayComparator(); - - String hoplogName = getRandomHoplogName(); - // sorted map contains the keys inserted in the hoplog for testing - TreeMap<String, String> sortedMap = createHoplog(hoplogName, count); - - HFileSortedOplog testHoplog = new HFileSortedOplog(hdfsStore, new Path(testDataDir, hoplogName), blockCache, stats, storeStats); - HoplogReader reader = testHoplog.getReader(); - - int middleKey = 4; - // remove top keys from the sorted map as the hoplog scanner should not - // return those - Iterator<Entry<String, String>> mapIter = sortedMap.entrySet().iterator(); - for (int i = 0; i < middleKey; i++) { - mapIter.next(); - count--; - } - if (!includeFrom) { - mapIter.next(); - count--; - } - - // keys are like Key-X, for X=0 till X=9. Start iterator at fifth key, - // key-4. if excluding from key, start at sixth key, key-5. - HoplogIterator<byte[], byte[]> iter = reader.scan(("key-" + middleKey).getBytes(), includeFrom, - null, true); - - for (; iter.hasNext();) { - byte[] key = iter.next(); - Entry<String, String> entry = mapIter.next(); - // make sure the KV returned by iterator match the inserted KV - assertEquals(0, bac.compare(key, iter.getKey())); - assertEquals(0, bac.compare(key, entry.getKey().getBytes())); - assertEquals(0, bac.compare(iter.getValue(), entry.getValue().getBytes())); - count--; - } - assertEquals(0, count); - } - - /** - * Tests to exclusive iterator - */ - public void testToExclusiveIterator() throws Exception { - toIterator(false); - } - - /** - * Tests to inclusive iterator - */ - public void testToInclusiveIterator() throws Exception { - toIterator(true); - } - - /** - * Tests to condition based iteration. creates hoplog with 10 KVs. Creates a scanner ending at - * a middle key and verifies the count of KVs iterated on - */ - public void toIterator(boolean includeTo) throws Exception { - int count = 10; - ByteArrayComparator bac = new ByteArrayComparator(); - - String hoplogName = getRandomHoplogName(); - // sorted map contains the keys inserted in the hoplog for testing - TreeMap<String, String> sortedMap = createHoplog(hoplogName, count); - Iterator<Entry<String, String>> mapIter = sortedMap.entrySet().iterator(); - - HFileSortedOplog testHoplog = new HFileSortedOplog(hdfsStore, new Path(testDataDir, hoplogName), blockCache, stats, storeStats); - HoplogReader reader = testHoplog.getReader(); - - int middleKey = 4; - // keys are like Key-X, for X=0 till X=9. End iterator at fifth key, - // key-4. if excluding to key, end at fourth key, key-3. - HoplogIterator<byte[], byte[]> iter = reader.scan(null, true, ("key-" + middleKey).getBytes(), includeTo); - - for (; iter.hasNext();) { - byte[] key = iter.next(); - Entry<String, String> entry = mapIter.next(); - // make sure the KV returned by iterator match the inserted KV - assertEquals(0, bac.compare(key, iter.getKey())); - assertEquals(0, bac.compare(key, entry.getKey().getBytes())); - assertEquals(0, bac.compare(iter.getValue(), entry.getValue().getBytes())); - - count --; - } - - if (includeTo) { - count++; - } - - assertEquals(10, count + middleKey); - } - - /** - * Tests whether sortedoplog supports duplicate keys, required when conflation is disabled - */ - public void testFromToIterator() throws IOException { - ByteArrayComparator bac = new ByteArrayComparator(); - String hoplogName = getRandomHoplogName(); - HFileSortedOplog hoplog = new HFileSortedOplog(hdfsStore, new Path(testDataDir, hoplogName), blockCache, stats, storeStats); - - int count = 5; - HoplogWriter writer = hoplog.createWriter(5); - for (int i = 0; i < count; i++) { - String value = "value-" + (i * 2); - // even keys key-[0 2 4 6 8] - writer.append(("key-" + (i * 2)).getBytes(), value.getBytes()); - } - writer.close(); - - HoplogReader reader = hoplog.getReader(); - HoplogIterator<byte[], byte[]> iter = reader.scan("key-1".getBytes(), true, "key-7".getBytes(), true); - - for (int i = 2; i < 7; i += 2) { - assertTrue(iter.hasNext()); - iter.next(); - assertEquals(0, bac.compare(("key-" + i).getBytes(), iter.getKey())); - assertEquals(0, bac.compare(("value-" + i).getBytes(), iter.getValue())); - System.out.println(new String(iter.getKey())); - } - assertFalse(iter.hasNext()); - } - - /** - * Tests whether sortedoplog supports duplicate keys, required when conflation is disabled - */ - public void testDuplicateKeys() throws IOException { - String hoplogName = getRandomHoplogName(); - HFileSortedOplog hoplog = new HFileSortedOplog(hdfsStore, new Path(testDataDir, hoplogName), blockCache, stats, storeStats); - - // write duplicate keys - int count = 2; - HoplogWriter writer = hoplog.createWriter(2); - List<String> values = new ArrayList<String>(); - for(int i = 1; i <= count; i++) { - String value = "value" + i; - writer.append("key-1".getBytes(), value.getBytes()); - values.add(value); - } - writer.close(); - - HoplogReader reader = hoplog.getReader(); - HoplogIterator<byte[], byte[]> scanner = reader.scan(); - for (byte[] key = null; scanner.hasNext();) { - key = scanner.next(); - count--; - assertEquals(0, Bytes.compareTo(key, "key-1".getBytes())); - values.remove(new String(scanner.getValue())); - } - assertEquals(0, count); - assertEquals(0, values.size()); - } - - public void testOffsetBasedScan() throws Exception { - // Each record is 43 bytes. each block is 256 bytes. each block will have 6 - // records - - int blocksize = 1 << 8; - System.setProperty(HoplogConfig.HFILE_BLOCK_SIZE_CONF, - String.valueOf(blocksize)); - - int count = 50; - String hoplogName = getRandomHoplogName(); - createHoplog(hoplogName, count); - - HFileSortedOplog testHoplog = new HFileSortedOplog(hdfsStore, new Path( - testDataDir, hoplogName), blockCache, stats, storeStats); - - HoplogReader reader = testHoplog.getReader(); - - HoplogIterator<byte[], byte[]> scanner = reader.scan(blocksize * 1, blocksize * 2); - int range1Count = 0; - String range1EndKey = null; - for (byte[] key = null; scanner.hasNext();) { - key = scanner.next(); - range1Count++; - range1EndKey = new String(key); - } - int range1EndKeyNum = Integer.valueOf(range1EndKey.substring("Key-".length())); - - scanner = reader.scan(blocksize * 2, blocksize * 1); - int range2Count = 0; - String range2EndKey = null; - for (byte[] key = null; scanner.hasNext();) { - key = scanner.next(); - range2Count++; - range2EndKey = new String(key); - } - - assertEquals(range2EndKey, range1EndKey); - assertEquals(2, range1Count/range2Count); - - scanner = reader.scan(blocksize * 3, blocksize * 1); - String range3FirstKey = new String(scanner.next()); - - int range3FirstKeyNum = Integer.valueOf(range3FirstKey.substring("Key-" - .length())); - - // range 3 starts at the end of range 1. so the two keys must be consecutive - assertEquals(range1EndKeyNum + 1, range3FirstKeyNum); - - testHoplog.close(); - } - - public void testOffsetScanBeyondFileSize() throws Exception { - // Each record is 43 bytes. each block is 256 bytes. each block will have 6 - // records - - int blocksize = 1 << 8; - System.setProperty(HoplogConfig.HFILE_BLOCK_SIZE_CONF, - String.valueOf(blocksize)); - - int count = 20; - String hoplogName = getRandomHoplogName(); - createHoplog(hoplogName, count); - - HFileSortedOplog testHoplog = new HFileSortedOplog(hdfsStore, new Path( - testDataDir, hoplogName), blockCache, stats, storeStats); - - HoplogReader reader = testHoplog.getReader(); - - HoplogIterator<byte[], byte[]> scanner = reader.scan(blocksize * 5, blocksize * 2); - assertFalse(scanner.hasNext()); - - testHoplog.close(); - } - - public void testZeroValueOffsetScan() throws Exception { - // Each record is 43 bytes. each block is 256 bytes. each block will have 6 - // records - - int blocksize = 1 << 8; - System.setProperty(HoplogConfig.HFILE_BLOCK_SIZE_CONF, - String.valueOf(blocksize)); - - int count = 20; - String hoplogName = getRandomHoplogName(); - createHoplog(hoplogName, count); - - HFileSortedOplog testHoplog = new HFileSortedOplog(hdfsStore, new Path( - testDataDir, hoplogName), blockCache, stats, storeStats); - - HoplogReader reader = testHoplog.getReader(); - - HoplogIterator<byte[], byte[]> scanner = reader.scan(0, blocksize * 2); - assertTrue(scanner.hasNext()); - int keyNum = Integer.valueOf(new String(scanner.next()).substring("Key-" - .length())); - assertEquals(100000, keyNum); - - testHoplog.close(); - } - - /* - * Tests reader succeeds to read data even if FS client is recycled without - * this reader knowing - */ - public void testReaderDetectAndUseRecycledFs() throws Exception { - HDFSStoreFactoryImpl storeFactory = getCloseableLocalHdfsStoreFactory(); - HDFSStoreImpl store = (HDFSStoreImpl) storeFactory.create("Store-1"); - toBeCleaned.add(store); - - HFileSortedOplog hop = new HFileSortedOplog(store, new Path(getName() + "-1-1.hop"), blockCache, stats, storeStats); - toBeCleaned.add(hop); - TreeMap<String, String> map = createHoplog(10, hop); - - HoplogReader reader = hop.getReader(); - // verify that each entry put in the hoplog is returned by reader - for (Entry<String, String> entry : map.entrySet()) { - byte[] value = reader.read(entry.getKey().getBytes()); - assertNotNull(value); - } - - cache.getLogger().info("<ExpectedException action=add>java.io.IOException</ExpectedException>"); - try { - store.getFileSystem().close(); - store.checkAndClearFileSystem(); - - for (Entry<String, String> entry : map.entrySet()) { - reader = hop.getReader(); - byte[] value = reader.read(entry.getKey().getBytes()); - assertNotNull(value); - } - } finally { - cache.getLogger().info("<ExpectedException action=remove>java.io.IOException</ExpectedException>"); - } - } - - public void testNewScannerDetechAndUseRecycledFs() throws Exception { - HDFSStoreFactoryImpl storeFactory = getCloseableLocalHdfsStoreFactory(); - HDFSStoreImpl store = (HDFSStoreImpl) storeFactory.create("Store-1"); - toBeCleaned.add(store); - - HFileSortedOplog hop = new HFileSortedOplog(store, new Path(getName() + "-1-1.hop"), blockCache, stats, storeStats); - createHoplog(10, hop); - - HoplogIterator<byte[], byte[]> scanner = hop.getReader().scan(); - // verify that each entry put in the hoplog is returned by reader - int i = 0; - while (scanner.hasNext()) { - byte[] key = scanner.next(); - assertNotNull(key); - i++; - } - assertEquals(10, i); - // flush block cache - hop.close(true); - hop.delete(); - - hop = new HFileSortedOplog(store, new Path(getName()+"-1-1.hop"), blockCache, stats, storeStats); - createHoplog(10, hop); - toBeCleaned.add(hop); - hop.getReader(); - - cache.getLogger().info("<ExpectedException action=add>java.io.IOException</ExpectedException>"); - try { - store.getFileSystem().close(); - store.checkAndClearFileSystem(); - - scanner = hop.getReader().scan(); - // verify that each entry put in the hoplog is returned by reader - i = 0; - while (scanner.hasNext()) { - byte[] key = scanner.next(); - assertNotNull(key); - i++; - } - assertEquals(10, i); - } finally { - cache.getLogger().info("<ExpectedException action=remove>java.io.IOException</ExpectedException>"); - } - } - - @Override - protected void tearDown() throws Exception { - for (Object obj : toBeCleaned) { - try { - if (HDFSStoreImpl.class.isInstance(obj)) { - ((HDFSStoreImpl) obj).clearFolder(); - } else if (AbstractHoplog.class.isInstance(obj)) { - ((AbstractHoplog) obj).close(); - ((AbstractHoplog) obj).delete(); - } - } catch (Exception e) { - System.out.println(e); - } - } - super.tearDown(); - } - - private TreeMap<String, String> createHoplog(String hoplogName, int numKeys) throws IOException { - HFileSortedOplog hoplog = new HFileSortedOplog(hdfsStore, new Path(testDataDir, hoplogName), blockCache, stats, storeStats); - TreeMap<String, String> map = createHoplog(numKeys, hoplog); - return map; - } -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/74c3156a/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/SortedOplogListIterJUnitTest.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/SortedOplogListIterJUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/SortedOplogListIterJUnitTest.java deleted file mode 100644 index 13aa6a9..0000000 --- a/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/SortedOplogListIterJUnitTest.java +++ /dev/null @@ -1,178 +0,0 @@ -/*========================================================================= - * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved. - * This product is protected by U.S. and international copyright - * and intellectual property laws. Pivotal products are covered by - * one or more patents listed at http://www.pivotal.io/patents. - *========================================================================= - */ -package com.gemstone.gemfire.cache.hdfs.internal.hoplog; - -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.List; - -import org.junit.experimental.categories.Category; - -import com.gemstone.gemfire.cache.hdfs.internal.PersistedEventImpl; -import com.gemstone.gemfire.cache.hdfs.internal.SortedHoplogPersistedEvent; -import com.gemstone.gemfire.internal.cache.persistence.soplog.TrackedReference; -import com.gemstone.gemfire.internal.util.BlobHelper; -import com.gemstone.gemfire.test.junit.categories.HoplogTest; -import com.gemstone.gemfire.test.junit.categories.IntegrationTest -; - -@Category({IntegrationTest.class, HoplogTest.class}) -public class SortedOplogListIterJUnitTest extends BaseHoplogTestCase { - public void testOneIterOneKey() throws Exception { - HdfsSortedOplogOrganizer organizer = new HdfsSortedOplogOrganizer(regionManager, 0); - - ArrayList<TestEvent> items = new ArrayList<TestEvent>(); - items.add(new TestEvent(("0"), ("0"))); - organizer.flush(items.iterator(), items.size()); - - List<TrackedReference<Hoplog>> oplogs = organizer.getSortedOplogs(); - HoplogSetIterator iter = new HoplogSetIterator(oplogs); - assertTrue(iter.hasNext()); - int count = 0; - for (ByteBuffer keyBB = null; iter.hasNext();) { - keyBB = iter.next(); - byte[] key = HFileSortedOplog.byteBufferToArray(keyBB); - assertEquals(String.valueOf(count), BlobHelper.deserializeBlob(key)); - count++; - } - assertEquals(1, count); - organizer.close(); - } - - public void testOneIterDuplicateKey() throws Exception { - HdfsSortedOplogOrganizer organizer = new HdfsSortedOplogOrganizer(regionManager, 0); - - ArrayList<TestEvent> items = new ArrayList<TestEvent>(); - items.add(new TestEvent(("0"), ("V2"))); - items.add(new TestEvent(("0"), ("V1"))); - items.add(new TestEvent(("1"), ("V2"))); - items.add(new TestEvent(("1"), ("V1"))); - organizer.flush(items.iterator(), items.size()); - - List<TrackedReference<Hoplog>> oplogs = organizer.getSortedOplogs(); - HoplogSetIterator iter = new HoplogSetIterator(oplogs); - assertTrue(iter.hasNext()); - int count = 0; - for (ByteBuffer keyBB = null; iter.hasNext();) { - keyBB = iter.next(); - byte[] key = HFileSortedOplog.byteBufferToArray(keyBB); - byte[] value = HFileSortedOplog.byteBufferToArray(iter.getValue()); - assertEquals(String.valueOf(count), BlobHelper.deserializeBlob(key)); - assertEquals("V2", ((PersistedEventImpl) SortedHoplogPersistedEvent.fromBytes(value)).getValue()); - count++; - } - assertEquals(2, count); - organizer.close(); - } - - public void testTwoIterSameKey() throws Exception { - HdfsSortedOplogOrganizer organizer = new HdfsSortedOplogOrganizer(regionManager, 0); - - ArrayList<TestEvent> items = new ArrayList<TestEvent>(); - items.add(new TestEvent(("0"), ("V1"))); - organizer.flush(items.iterator(), items.size()); - items.clear(); - items.add(new TestEvent(("0"), ("V2"))); - organizer.flush(items.iterator(), items.size()); - - List<TrackedReference<Hoplog>> oplogs = organizer.getSortedOplogs(); - HoplogSetIterator iter = new HoplogSetIterator(oplogs); - assertTrue(iter.hasNext()); - int count = 0; - for (ByteBuffer keyBB = null; iter.hasNext();) { - keyBB = iter.next(); - byte[] key = HFileSortedOplog.byteBufferToArray(keyBB); - byte[] value = HFileSortedOplog.byteBufferToArray(iter.getValue()); - assertEquals(String.valueOf(count), BlobHelper.deserializeBlob(key)); - assertEquals("V2", ((PersistedEventImpl) SortedHoplogPersistedEvent.fromBytes(value)).getValue()); - count++; - } - assertEquals(1, count); - organizer.close(); - } - - public void testTwoIterDiffKey() throws Exception { - HdfsSortedOplogOrganizer organizer = new HdfsSortedOplogOrganizer(regionManager, 0); - - ArrayList<TestEvent> items = new ArrayList<TestEvent>(); - items.add(new TestEvent(("0"), ("V1"))); - organizer.flush(items.iterator(), items.size()); - items.clear(); - items.add(new TestEvent(("1"), ("V1"))); - organizer.flush(items.iterator(), items.size()); - - List<TrackedReference<Hoplog>> oplogs = organizer.getSortedOplogs(); - HoplogSetIterator iter = new HoplogSetIterator(oplogs); - assertTrue(iter.hasNext()); - int count = 0; - for (ByteBuffer keyBB = null; iter.hasNext();) { - keyBB = iter.next(); - byte[] key = HFileSortedOplog.byteBufferToArray(keyBB); - byte[] value = HFileSortedOplog.byteBufferToArray(iter.getValue()); - assertEquals(String.valueOf(count), BlobHelper.deserializeBlob(key)); - assertEquals("V1", ((PersistedEventImpl) SortedHoplogPersistedEvent.fromBytes(value)).getValue()); - count++; - } - assertEquals(2, count); - organizer.close(); - } - - public void testMergedIterator() throws Exception { - HdfsSortedOplogOrganizer organizer = new HdfsSortedOplogOrganizer(regionManager, 0); - - // #1 - ArrayList<TestEvent> items = new ArrayList<TestEvent>(); - items.add(new TestEvent(("1"), ("1"))); - items.add(new TestEvent(("2"), ("1"))); - items.add(new TestEvent(("3"), ("1"))); - items.add(new TestEvent(("4"), ("1"))); - organizer.flush(items.iterator(), items.size()); - - // #2 - items.clear(); - items.add(new TestEvent(("2"), ("1"))); - items.add(new TestEvent(("4"), ("1"))); - items.add(new TestEvent(("6"), ("1"))); - items.add(new TestEvent(("8"), ("1"))); - organizer.flush(items.iterator(), items.size()); - - // #3 - items.clear(); - items.add(new TestEvent(("1"), ("1"))); - items.add(new TestEvent(("3"), ("1"))); - items.add(new TestEvent(("5"), ("1"))); - items.add(new TestEvent(("7"), ("1"))); - items.add(new TestEvent(("9"), ("1"))); - organizer.flush(items.iterator(), items.size()); - - // #4 - items.clear(); - items.add(new TestEvent(("0"), ("1"))); - items.add(new TestEvent(("1"), ("1"))); - items.add(new TestEvent(("4"), ("1"))); - items.add(new TestEvent(("5"), ("1"))); - organizer.flush(items.iterator(), items.size()); - - List<TrackedReference<Hoplog>> oplogs = organizer.getSortedOplogs(); - HoplogSetIterator iter = new HoplogSetIterator(oplogs); - // the iteration pattern for this test should be 0-9: - // 0 1 4 5 oplog #4 - // 1 3 5 7 9 oplog #3 - // 2 4 6 8 oplog #2 - // 1 2 3 4 oplog #1 - int count = 0; - for (ByteBuffer keyBB = null; iter.hasNext();) { - keyBB = iter.next(); - byte[] key = HFileSortedOplog.byteBufferToArray(keyBB); - assertEquals(String.valueOf(count), BlobHelper.deserializeBlob(key)); - count++; - } - assertEquals(10, count); - organizer.close(); - } -}
