GEODE-429: Remove HdfsStore Junit and Dunits
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/74c3156a Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/74c3156a Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/74c3156a Branch: refs/heads/feature/GEODE-409 Commit: 74c3156aaa0d29ccc4ec0b4c9a53659d2c9eb003 Parents: 1b4fd2f Author: Ashvin Agrawal <[email protected]> Authored: Mon Oct 19 14:58:00 2015 -0700 Committer: Ashvin Agrawal <[email protected]> Committed: Wed Oct 21 08:55:23 2015 -0700 ---------------------------------------------------------------------- .../ColocatedRegionWithHDFSDUnitTest.java | 189 ---- .../hdfs/internal/HDFSEntriesSetJUnitTest.java | 228 ---- .../internal/HdfsStoreMutatorJUnitTest.java | 191 ---- .../hdfs/internal/RegionWithHDFSTestBase.java | 715 ------------ .../internal/hoplog/BaseHoplogTestCase.java | 389 ------- .../hoplog/CardinalityEstimatorJUnitTest.java | 188 ---- .../hoplog/HDFSCacheLoaderJUnitTest.java | 106 -- .../hoplog/HDFSCompactionManagerJUnitTest.java | 449 -------- .../hoplog/HDFSRegionDirectorJUnitTest.java | 97 -- .../internal/hoplog/HDFSStatsJUnitTest.java | 250 ----- .../HDFSUnsortedHoplogOrganizerJUnitTest.java | 297 ----- .../HdfsSortedOplogOrganizerJUnitTest.java | 1045 ------------------ .../hoplog/HfileSortedOplogJUnitTest.java | 540 --------- .../hoplog/SortedOplogListIterJUnitTest.java | 178 --- .../hoplog/TieredCompactionJUnitTest.java | 904 --------------- .../hoplog/mapreduce/GFKeyJUnitTest.java | 50 - .../mapreduce/HDFSSplitIteratorJUnitTest.java | 265 ----- .../hoplog/mapreduce/HoplogUtilJUnitTest.java | 305 ----- 18 files changed, 6386 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/74c3156a/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/ColocatedRegionWithHDFSDUnitTest.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/ColocatedRegionWithHDFSDUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/ColocatedRegionWithHDFSDUnitTest.java deleted file mode 100644 index 44206dc..0000000 --- a/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/ColocatedRegionWithHDFSDUnitTest.java +++ /dev/null @@ -1,189 +0,0 @@ -/*========================================================================= - * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved. - * This product is protected by U.S. and international copyright - * and intellectual property laws. Pivotal products are covered by - * one or more patents listed at http://www.pivotal.io/patents. - *========================================================================= - */ -package com.gemstone.gemfire.cache.hdfs.internal; - -import java.io.File; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Map; - -import com.gemstone.gemfire.cache.AttributesFactory; -import com.gemstone.gemfire.cache.DataPolicy; -import com.gemstone.gemfire.cache.EvictionAction; -import com.gemstone.gemfire.cache.EvictionAttributes; -import com.gemstone.gemfire.cache.PartitionAttributesFactory; -import com.gemstone.gemfire.cache.Region; -import com.gemstone.gemfire.cache.hdfs.HDFSStoreFactory; -import com.gemstone.gemfire.internal.cache.LocalRegion; - -import dunit.AsyncInvocation; -import dunit.SerializableCallable; -import dunit.VM; - -/** - * A class for testing the basic HDFS functionality - * - * @author Hemant Bhanawat - */ -@SuppressWarnings({"serial", "rawtypes", "unchecked", "deprecation"}) -public class ColocatedRegionWithHDFSDUnitTest extends RegionWithHDFSTestBase { - - public ColocatedRegionWithHDFSDUnitTest(String name) { - super(name); - } - - @Override - protected SerializableCallable getCreateRegionCallable( - final int totalnumOfBuckets, final int batchSizeMB, - final int maximumEntries, final String folderPath, - final String uniqueName, final int batchInterval, - final boolean queuePersistent, final boolean writeonly, - final long timeForRollover, final long maxFileSize) { - SerializableCallable createRegion = new SerializableCallable() { - public Object call() throws Exception { - HDFSStoreFactory hsf = getCache().createHDFSStoreFactory(); - hsf.setBatchSize(batchSizeMB); - hsf.setBufferPersistent(queuePersistent); - hsf.setMaxMemory(3); - hsf.setBatchInterval(batchInterval); - hsf.setHomeDir(tmpDir + "/" + folderPath); - homeDir = new File(tmpDir + "/" + folderPath).getCanonicalPath(); - hsf.setHomeDir(homeDir); - hsf.create(uniqueName); - - AttributesFactory af = new AttributesFactory(); - af.setDataPolicy(DataPolicy.PARTITION); - PartitionAttributesFactory paf = new PartitionAttributesFactory(); - paf.setTotalNumBuckets(totalnumOfBuckets); - paf.setRedundantCopies(1); - - af.setHDFSStoreName(uniqueName); - af.setPartitionAttributes(paf.create()); - af.setEvictionAttributes(EvictionAttributes.createLRUEntryAttributes( - maximumEntries, EvictionAction.LOCAL_DESTROY)); - - af.setHDFSWriteOnly(writeonly); - Region r1 = createRootRegion(uniqueName + "-r1", af.create()); - - paf.setColocatedWith(uniqueName + "-r1"); - af.setPartitionAttributes(paf.create()); - af.setEvictionAttributes(EvictionAttributes.createLRUEntryAttributes( - maximumEntries, EvictionAction.LOCAL_DESTROY)); - Region r2 = createRootRegion(uniqueName + "-r2", af.create()); - - ((LocalRegion) r1).setIsTest(); - ((LocalRegion) r2).setIsTest(); - - return 0; - } - }; - return createRegion; - } - - @Override - protected void doPuts(String uniqueName, int start, int end) { - Region r1 = getRootRegion(uniqueName + "-r1"); - Region r2 = getRootRegion(uniqueName + "-r2"); - - for (int i = start; i < end; i++) { - r1.put("K" + i, "V" + i); - r2.put("K" + i, "V" + i); - } - } - - protected AsyncInvocation doAsyncPuts(VM vm, final String regionName, - final int start, final int end, final String suffix) throws Exception { - return vm.invokeAsync(new SerializableCallable() { - public Object call() throws Exception { - Region r1 = getRootRegion(regionName + "-r1"); - Region r2 = getRootRegion(regionName + "-r2"); - - getCache().getLogger().info("Putting entries "); - for (int i = start; i < end; i++) { - r1.put("K" + i, "V" + i + suffix); - r2.put("K" + i, "V" + i + suffix); - } - return null; - } - - }); - } - - protected void doPutAll(final String uniqueName, Map map) { - Region r1 = getRootRegion(uniqueName + "-r1"); - Region r2 = getRootRegion(uniqueName + "-r2"); - r1.putAll(map); - r2.putAll(map); - } - - @Override - protected void doDestroys(String uniqueName, int start, int end) { - Region r1 = getRootRegion(uniqueName + "-r1"); - Region r2 = getRootRegion(uniqueName + "-r2"); - - for (int i = start; i < end; i++) { - r1.destroy("K" + i); - r2.destroy("K" + i); - } - } - - @Override - protected void checkWithGet(String uniqueName, int start, int end, - boolean expectValue) { - Region r1 = getRootRegion(uniqueName + "-r1"); - Region r2 = getRootRegion(uniqueName + "-r2"); - for (int i = start; i < end; i++) { - String expected = expectValue ? "V" + i : null; - assertEquals("Mismatch on key " + i, expected, r1.get("K" + i)); - assertEquals("Mismatch on key " + i, expected, r2.get("K" + i)); - } - } - - protected void checkWithGetAll(String uniqueName, ArrayList arrayl) { - Region r1 = getRootRegion(uniqueName + "-r1"); - Region r2 = getRootRegion(uniqueName + "-r2"); - Map map1 = r1.getAll(arrayl); - Map map2 = r2.getAll(arrayl); - for (Object e : map1.keySet()) { - String v = e.toString().replaceFirst("K", "V"); - assertTrue("Reading entries failed for key " + e + " where value = " - + map1.get(e), v.equals(map1.get(e))); - assertTrue("Reading entries failed for key " + e + " where value = " - + map2.get(e), v.equals(map2.get(e))); - } - } - - @Override - protected void verifyHDFSData(VM vm, String uniqueName) throws Exception { - HashMap<String, HashMap<String, String>> filesToEntriesMap = createFilesAndEntriesMap( - vm, uniqueName, uniqueName + "-r1"); - HashMap<String, String> entriesMap = new HashMap<String, String>(); - for (Map.Entry<String, HashMap<String, String>> e : filesToEntriesMap - .entrySet()) { - entriesMap.putAll(e.getValue()); - } - - verifyInEntriesMap(entriesMap, 1, 50, "vm0"); - verifyInEntriesMap(entriesMap, 40, 100, "vm1"); - verifyInEntriesMap(entriesMap, 40, 100, "vm2"); - verifyInEntriesMap(entriesMap, 90, 150, "vm3"); - - filesToEntriesMap = createFilesAndEntriesMap(vm, uniqueName, uniqueName - + "-r2"); - entriesMap = new HashMap<String, String>(); - for (Map.Entry<String, HashMap<String, String>> e : filesToEntriesMap - .entrySet()) { - entriesMap.putAll(e.getValue()); - } - - verifyInEntriesMap(entriesMap, 1, 50, "vm0"); - verifyInEntriesMap(entriesMap, 40, 100, "vm1"); - verifyInEntriesMap(entriesMap, 40, 100, "vm2"); - verifyInEntriesMap(entriesMap, 90, 150, "vm3"); - } -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/74c3156a/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSEntriesSetJUnitTest.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSEntriesSetJUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSEntriesSetJUnitTest.java deleted file mode 100644 index 3085a66..0000000 --- a/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSEntriesSetJUnitTest.java +++ /dev/null @@ -1,228 +0,0 @@ -/*========================================================================= - * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved. - * This product is protected by U.S. and international copyright - * and intellectual property laws. Pivotal products are covered by - * one or more patents listed at http://www.pivotal.io/patents. - *========================================================================= - */ -package com.gemstone.gemfire.cache.hdfs.internal; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Set; - -import junit.framework.TestCase; - -import org.apache.hadoop.fs.Path; -import org.junit.experimental.categories.Category; - -import com.gemstone.gemfire.cache.CacheFactory; -import com.gemstone.gemfire.cache.Operation; -import com.gemstone.gemfire.cache.PartitionAttributesFactory; -import com.gemstone.gemfire.cache.Region; -import com.gemstone.gemfire.cache.RegionFactory; -import com.gemstone.gemfire.cache.RegionShortcut; -import com.gemstone.gemfire.cache.asyncqueue.internal.ParallelAsyncEventQueueImpl; -import com.gemstone.gemfire.cache.hdfs.HDFSStoreFactory; -import com.gemstone.gemfire.cache.hdfs.internal.SortedListForAsyncQueueJUnitTest.KeyValue; -import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSRegionDirector; -import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSRegionDirector.HdfsRegionManager; -import com.gemstone.gemfire.cache.hdfs.internal.hoplog.AbstractHoplogOrganizer; -import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HoplogConfig; -import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HoplogOrganizer; -import com.gemstone.gemfire.distributed.DistributedMember; -import com.gemstone.gemfire.internal.cache.BucketRegion; -import com.gemstone.gemfire.internal.cache.CachedDeserializable; -import com.gemstone.gemfire.internal.cache.EntryEventImpl; -import com.gemstone.gemfire.internal.cache.EnumListenerEvent; -import com.gemstone.gemfire.internal.cache.EventID; -import com.gemstone.gemfire.internal.cache.GemFireCacheImpl; -import com.gemstone.gemfire.internal.cache.LocalRegion.IteratorType; -import com.gemstone.gemfire.internal.cache.PartitionedRegion; -import com.gemstone.gemfire.internal.cache.wan.GatewaySenderAttributes; -import com.gemstone.gemfire.test.junit.categories.HoplogTest; -import com.gemstone.gemfire.test.junit.categories.IntegrationTest; - -@SuppressWarnings("rawtypes") -@Category({IntegrationTest.class, HoplogTest.class}) -public class HDFSEntriesSetJUnitTest extends TestCase { - private GemFireCacheImpl cache; - private HDFSStoreImpl store; - private PartitionedRegion region; - private BucketRegion bucket; - private HDFSParallelGatewaySenderQueue queue; - - private HDFSBucketRegionQueue brq; - private HoplogOrganizer hdfs; - - public void setUp() throws Exception { - System.setProperty(HoplogConfig.ALLOW_LOCAL_HDFS_PROP, "true"); - cache = (GemFireCacheImpl) new CacheFactory() - .set("mcast-port", "0") - .set("log-level", "info") - .create(); - - HDFSStoreFactory hsf = this.cache.createHDFSStoreFactory(); - hsf.setHomeDir("hoplogs"); - store = (HDFSStoreImpl) hsf.create("test"); - - PartitionAttributesFactory paf = new PartitionAttributesFactory(); - paf.setTotalNumBuckets(1); - - RegionFactory rf = cache.createRegionFactory(RegionShortcut.PARTITION); -// rf.setHDFSStoreName("test"); - region = (PartitionedRegion) rf.setPartitionAttributes(paf.create()).create("test"); - - // prime the region so buckets get created - region.put("test", "test"); - GatewaySenderAttributes g = new GatewaySenderAttributes(); - g.isHDFSQueue = true; - g.id = "HDFSEntriesSetJUnitTest_Queue"; - ParallelAsyncEventQueueImpl gatewaySender = new ParallelAsyncEventQueueImpl(cache, g); - Set<Region> set = new HashSet<Region>(); - set.add(region); - - queue = new HDFSParallelGatewaySenderQueue(gatewaySender, set, 0, 1); - brq = (HDFSBucketRegionQueue)((PartitionedRegion) queue.getRegion()).getDataStore().getLocalBucketById(0); - bucket = region.getDataStore().getLocalBucketById(0); - - HdfsRegionManager mgr = HDFSRegionDirector.getInstance().manageRegion(region, "test", null); - hdfs = mgr.<SortedHoplogPersistedEvent>create(0); - AbstractHoplogOrganizer.JUNIT_TEST_RUN = true; - } - - public void tearDown() throws Exception { - store.getFileSystem().delete(new Path("hoplogs"), true); - hdfs.close(); - - cache.close(); - } - - public void testEmptyIterator() throws Exception { - checkIteration(Collections.<String>emptyList(), new KeyValue[] { }, new KeyValue[] { }); - } - - public void testQueueOnlyIterator() throws Exception { - KeyValue[] qvals = new KeyValue[] { - new KeyValue("K0", "0"), - new KeyValue("K1", "1"), - new KeyValue("K2", "2"), - new KeyValue("K3", "3"), - new KeyValue("K4", "4") - }; - checkIteration(getExpected(), qvals, new KeyValue[] { }); - } - - public void testHdfsOnlyIterator() throws Exception { - KeyValue[] hvals = new KeyValue[] { - new KeyValue("K0", "0"), - new KeyValue("K1", "1"), - new KeyValue("K2", "2"), - new KeyValue("K3", "3"), - new KeyValue("K4", "4") - }; - checkIteration(getExpected(), new KeyValue[] { }, hvals); - } - - public void testUnevenIterator() throws Exception { - KeyValue[] qvals = new KeyValue[] { - new KeyValue("K0", "0"), - new KeyValue("K2", "2"), - }; - - KeyValue[] hvals = new KeyValue[] { - new KeyValue("K1", "1"), - new KeyValue("K3", "3"), - new KeyValue("K4", "4") - }; - - checkIteration(getExpected(), qvals, hvals); - } - - public void testEitherOrIterator() throws Exception { - KeyValue[] qvals = new KeyValue[] { - new KeyValue("K0", "0"), - new KeyValue("K2", "2"), - new KeyValue("K4", "4") - }; - - KeyValue[] hvals = new KeyValue[] { - new KeyValue("K1", "1"), - new KeyValue("K3", "3") - }; - - checkIteration(getExpected(), qvals, hvals); - } - - public void testDuplicateIterator() throws Exception { - KeyValue[] qvals = new KeyValue[] { - new KeyValue("K0", "0"), - new KeyValue("K1", "1"), - new KeyValue("K2", "2"), - new KeyValue("K3", "3"), - new KeyValue("K4", "4"), - new KeyValue("K4", "4") - }; - - KeyValue[] hvals = new KeyValue[] { - new KeyValue("K0", "0"), - new KeyValue("K1", "1"), - new KeyValue("K2", "2"), - new KeyValue("K3", "3"), - new KeyValue("K4", "4"), - new KeyValue("K4", "4") - }; - - checkIteration(getExpected(), qvals, hvals); - } - - private List<String> getExpected() { - List<String> expected = new ArrayList<String>(); - expected.add("0"); - expected.add("1"); - expected.add("2"); - expected.add("3"); - expected.add("4"); - return expected; - } - - private void checkIteration(List<String> expected, KeyValue[] qvals, KeyValue[] hvals) - throws Exception { - int seq = 0; - List<PersistedEventImpl> evts = new ArrayList<PersistedEventImpl>(); - for (KeyValue kv : hvals) { - evts.add(new SortedHDFSQueuePersistedEvent(getNewEvent(kv.key, kv.value, seq++))); - } - hdfs.flush(evts.iterator(), evts.size()); - - for (KeyValue kv : qvals) { - queue.put(getNewEvent(kv.key, kv.value, seq++)); - } - - List<String> actual = new ArrayList<String>(); - Iterator vals = new HDFSEntriesSet(bucket, brq, hdfs, IteratorType.VALUES, null).iterator(); - while (vals.hasNext()) { - Object val = vals.next(); - if(val instanceof CachedDeserializable) { - val = ((CachedDeserializable) val).getDeserializedForReading(); - } - actual.add((String) val); - } - - assertEquals(expected, actual); - } - - private HDFSGatewayEventImpl getNewEvent(Object key, Object value, long seq) throws Exception { - EntryEventImpl evt = EntryEventImpl.create(region, Operation.CREATE, - key, value, null, false, (DistributedMember) cache.getMyId()); - - evt.setEventId(new EventID(cache.getDistributedSystem())); - HDFSGatewayEventImpl event = new HDFSGatewayEventImpl(EnumListenerEvent.AFTER_CREATE, evt, null, true, 0); - event.setShadowKey(seq); - - return event; - } -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/74c3156a/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/HdfsStoreMutatorJUnitTest.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/HdfsStoreMutatorJUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/HdfsStoreMutatorJUnitTest.java deleted file mode 100644 index b8cbb0d..0000000 --- a/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/HdfsStoreMutatorJUnitTest.java +++ /dev/null @@ -1,191 +0,0 @@ -/*========================================================================= - * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved. - * This product is protected by U.S. and international copyright - * and intellectual property laws. Pivotal products are covered by - * one or more patents listed at http://www.pivotal.io/patents. - *========================================================================= - */ -package com.gemstone.gemfire.cache.hdfs.internal; - -import org.junit.experimental.categories.Category; - -import com.gemstone.gemfire.cache.hdfs.HDFSStoreMutator; -import com.gemstone.gemfire.cache.hdfs.internal.hoplog.BaseHoplogTestCase; -import com.gemstone.gemfire.test.junit.categories.HoplogTest; -import com.gemstone.gemfire.test.junit.categories.IntegrationTest; - -@Category({IntegrationTest.class, HoplogTest.class}) -public class HdfsStoreMutatorJUnitTest extends BaseHoplogTestCase { - public void testMutatorInitialState() { - HDFSStoreMutator mutator = hdfsStore.createHdfsStoreMutator(); - assertEquals(-1, mutator.getWriteOnlyFileRolloverInterval()); - assertEquals(-1, mutator.getWriteOnlyFileRolloverSize()); - - assertEquals(-1, mutator.getInputFileCountMax()); - assertEquals(-1, mutator.getInputFileSizeMax()); - assertEquals(-1, mutator.getInputFileCountMin()); - assertEquals(-1, mutator.getMinorCompactionThreads()); - assertNull(mutator.getMinorCompaction()); - - assertEquals(-1, mutator.getMajorCompactionInterval()); - assertEquals(-1, mutator.getMajorCompactionThreads()); - assertNull(mutator.getMajorCompaction()); - - assertEquals(-1, mutator.getPurgeInterval()); - - assertEquals(-1, mutator.getBatchSize()); - assertEquals(-1, mutator.getBatchInterval()); - } - - public void testMutatorSetInvalidValue() { - HDFSStoreMutator mutator = hdfsStore.createHdfsStoreMutator(); - - try { - mutator.setWriteOnlyFileRolloverInterval(-3); - fail(); - } catch (IllegalArgumentException e) { - // expected - } - try { - mutator.setWriteOnlyFileRolloverSize(-5); - fail(); - } catch (IllegalArgumentException e) { - // expected - } - - try { - mutator.setInputFileCountMin(-1); - fail(); - } catch (IllegalArgumentException e) { - // expected - } - try { - mutator.setInputFileCountMax(-1); - fail(); - } catch (IllegalArgumentException e) { - // expected - } - try { - mutator.setInputFileSizeMax(-1); - fail(); - } catch (IllegalArgumentException e) { - // expected - } - try { - mutator.setMinorCompactionThreads(-9); - fail(); - } catch (IllegalArgumentException e) { - // expected - } - try { - mutator.setMajorCompactionInterval(-6); - fail(); - } catch (IllegalArgumentException e) { - // expected - } - try { - mutator.setMajorCompactionThreads(-1); - fail(); - } catch (IllegalArgumentException e) { - // expected - } - try { - mutator.setPurgeInterval(-4); - fail(); - } catch (IllegalArgumentException e) { - // expected - } -/* try { - qMutator.setBatchSizeMB(-985); - fail(); - } catch (IllegalArgumentException e) { - // expected - } - try { - qMutator.setBatchTimeInterval(-695); - fail(); - } catch (IllegalArgumentException e) { - // expected - } -*/ - try { - mutator.setInputFileCountMin(10); - mutator.setInputFileCountMax(5); - hdfsStore.alter(mutator); - fail(); - } catch (IllegalArgumentException e) { - // expected - } - } - - public void testMutatorReturnsUpdatedValues() { - HDFSStoreMutator mutator = hdfsStore.createHdfsStoreMutator(); - - mutator.setWriteOnlyFileRolloverInterval(121); - mutator.setWriteOnlyFileRolloverSize(234); - - mutator.setInputFileCountMax(87); - mutator.setInputFileSizeMax(45); - mutator.setInputFileCountMin(34); - mutator.setMinorCompactionThreads(843); - mutator.setMinorCompaction(false); - - mutator.setMajorCompactionInterval(26); - mutator.setMajorCompactionThreads(92); - mutator.setMajorCompaction(false); - - mutator.setPurgeInterval(328); - - mutator.setBatchSize(985); - mutator.setBatchInterval(695); - - assertEquals(121, mutator.getWriteOnlyFileRolloverInterval()); - assertEquals(234, mutator.getWriteOnlyFileRolloverSize()); - - assertEquals(87, mutator.getInputFileCountMax()); - assertEquals(45, mutator.getInputFileSizeMax()); - assertEquals(34, mutator.getInputFileCountMin()); - assertEquals(843, mutator.getMinorCompactionThreads()); - assertFalse(mutator.getMinorCompaction()); - - assertEquals(26, mutator.getMajorCompactionInterval()); - assertEquals(92, mutator.getMajorCompactionThreads()); - assertFalse(mutator.getMajorCompaction()); - - assertEquals(328, mutator.getPurgeInterval()); - - assertEquals(985, mutator.getBatchSize()); - assertEquals(695, mutator.getBatchInterval()); - - // repeat the cycle once more - mutator.setWriteOnlyFileRolloverInterval(14); - mutator.setWriteOnlyFileRolloverSize(56); - - mutator.setInputFileCountMax(93); - mutator.setInputFileSizeMax(85); - mutator.setInputFileCountMin(64); - mutator.setMinorCompactionThreads(59); - mutator.setMinorCompaction(true); - - mutator.setMajorCompactionInterval(26); - mutator.setMajorCompactionThreads(92); - mutator.setMajorCompaction(false); - - mutator.setPurgeInterval(328); - - assertEquals(14, mutator.getWriteOnlyFileRolloverInterval()); - assertEquals(56, mutator.getWriteOnlyFileRolloverSize()); - - assertEquals(93, mutator.getInputFileCountMax()); - assertEquals(85, mutator.getInputFileSizeMax()); - assertEquals(64, mutator.getInputFileCountMin()); - assertEquals(59, mutator.getMinorCompactionThreads()); - assertTrue(mutator.getMinorCompaction()); - - assertEquals(26, mutator.getMajorCompactionInterval()); - assertEquals(92, mutator.getMajorCompactionThreads()); - assertFalse(mutator.getMajorCompaction()); - - assertEquals(328, mutator.getPurgeInterval()); - } -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/74c3156a/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/RegionWithHDFSTestBase.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/RegionWithHDFSTestBase.java b/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/RegionWithHDFSTestBase.java deleted file mode 100644 index 3330574..0000000 --- a/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/RegionWithHDFSTestBase.java +++ /dev/null @@ -1,715 +0,0 @@ -/*========================================================================= - * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved. - * This product is protected by U.S. and international copyright - * and intellectual property laws. Pivotal products are covered by - * one or more patents listed at http://www.pivotal.io/patents. - *========================================================================= - */ -package com.gemstone.gemfire.cache.hdfs.internal; - -import java.io.File; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Map; - -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.LocatedFileStatus; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.RemoteIterator; - -import com.gemstone.gemfire.cache.Region; -import com.gemstone.gemfire.cache.asyncqueue.internal.AsyncEventQueueStats; -import com.gemstone.gemfire.cache.hdfs.HDFSIOException; -import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSRegionDirector; -import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HoplogSetReader.HoplogIterator; -import com.gemstone.gemfire.cache.hdfs.internal.hoplog.SequenceFileHoplog; -import com.gemstone.gemfire.cache30.CacheTestCase; -import com.gemstone.gemfire.internal.FileUtil; -import com.gemstone.gemfire.internal.cache.GemFireCacheImpl; -import com.gemstone.gemfire.internal.cache.PartitionedRegion; -import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedOplogStatistics; -import com.gemstone.gemfire.internal.cache.tier.sockets.CacheServerHelper; - -import dunit.AsyncInvocation; -import dunit.Host; -import dunit.SerializableCallable; -import dunit.SerializableRunnable; -import dunit.VM; - -@SuppressWarnings({"serial", "rawtypes", "unchecked"}) -public abstract class RegionWithHDFSTestBase extends CacheTestCase { - - protected String tmpDir; - - public static String homeDir = null; - - protected abstract void checkWithGetAll(String uniqueName, ArrayList arrayl); - - protected abstract void checkWithGet(String uniqueName, int start, - int end, boolean expectValue); - - protected abstract void doDestroys(final String uniqueName, int start, int end); - - protected abstract void doPutAll(final String uniqueName, Map map); - - protected abstract void doPuts(final String uniqueName, int start, int end); - - protected abstract SerializableCallable getCreateRegionCallable(final int totalnumOfBuckets, final int batchSizeMB, - final int maximumEntries, final String folderPath, final String uniqueName, final int batchInterval, final boolean queuePersistent, - final boolean writeonly, final long timeForRollover, final long maxFileSize); - - protected abstract void verifyHDFSData(VM vm, String uniqueName) throws Exception ; - - protected abstract AsyncInvocation doAsyncPuts(VM vm, final String regionName, - final int start, final int end, final String suffix) throws Exception; - - public RegionWithHDFSTestBase(String name) { - super(name); - } - - @Override - public void tearDown2() throws Exception { - super.tearDown2(); - for (int h = 0; h < Host.getHostCount(); h++) { - Host host = Host.getHost(h); - SerializableCallable cleanUp = cleanUpStoresAndDisconnect(); - for (int v = 0; v < host.getVMCount(); v++) { - VM vm = host.getVM(v); - // This store will be deleted by the first VM itself. Invocations from - // subsequent VMs will be no-op. - vm.invoke(cleanUp); - } - } - } - - public SerializableCallable cleanUpStoresAndDisconnect() throws Exception { - SerializableCallable cleanUp = new SerializableCallable("cleanUpStoresAndDisconnect") { - public Object call() throws Exception { - disconnectFromDS(); - File file; - if (homeDir != null) { - file = new File(homeDir); - FileUtil.delete(file); - homeDir = null; - } - file = new File(tmpDir); - FileUtil.delete(file); - return 0; - } - }; - return cleanUp; - } - - @Override - public void setUp() throws Exception { - super.setUp(); - tmpDir = /*System.getProperty("java.io.tmpdir") + "/" +*/ "RegionWithHDFSBasicDUnitTest_" + System.nanoTime(); - } - - int createServerRegion(VM vm, final int totalnumOfBuckets, - final int batchSize, final int maximumEntries, final String folderPath, - final String uniqueName, final int batchInterval) { - return createServerRegion(vm, totalnumOfBuckets, - batchSize, maximumEntries, folderPath, - uniqueName, batchInterval, false, false); - } - - protected int createServerRegion(VM vm, final int totalnumOfBuckets, - final int batchSizeMB, final int maximumEntries, final String folderPath, - final String uniqueName, final int batchInterval, final boolean writeonly, - final boolean queuePersistent) { - return createServerRegion(vm, totalnumOfBuckets, - batchSizeMB, maximumEntries, folderPath, - uniqueName, batchInterval, writeonly, queuePersistent, -1, -1); - } - protected int createServerRegion(VM vm, final int totalnumOfBuckets, - final int batchSizeMB, final int maximumEntries, final String folderPath, - final String uniqueName, final int batchInterval, final boolean writeonly, - final boolean queuePersistent, final long timeForRollover, final long maxFileSize) { - SerializableCallable createRegion = getCreateRegionCallable( - totalnumOfBuckets, batchSizeMB, maximumEntries, folderPath, uniqueName, - batchInterval, queuePersistent, writeonly, timeForRollover, maxFileSize); - - return (Integer) vm.invoke(createRegion); - } - protected AsyncInvocation createServerRegionAsync(VM vm, final int totalnumOfBuckets, - final int batchSizeMB, final int maximumEntries, final String folderPath, - final String uniqueName, final int batchInterval, final boolean writeonly, - final boolean queuePersistent) { - SerializableCallable createRegion = getCreateRegionCallable( - totalnumOfBuckets, batchSizeMB, maximumEntries, folderPath, uniqueName, - batchInterval, queuePersistent, writeonly, -1, -1); - - return vm.invokeAsync(createRegion); - } - protected AsyncInvocation createServerRegionAsync(VM vm, final int totalnumOfBuckets, - final int batchSizeMB, final int maximumEntries, final String folderPath, - final String uniqueName, final int batchInterval, final boolean writeonly, - final boolean queuePersistent, final long timeForRollover, final long maxFileSize) { - SerializableCallable createRegion = getCreateRegionCallable( - totalnumOfBuckets, batchSizeMB, maximumEntries, folderPath, uniqueName, - batchInterval, queuePersistent, writeonly, timeForRollover, maxFileSize); - - return vm.invokeAsync(createRegion); - } - - /** - * Does puts, gets, destroy and getAll. Since there are many updates - * most of the time the data is not found in memory and queue and - * is fetched from HDFS - * @throws Throwable - */ - public void testGetFromHDFS() throws Throwable { - Host host = Host.getHost(0); - VM vm0 = host.getVM(0); - VM vm1 = host.getVM(1); - final String uniqueName = getName(); - final String homeDir = "../../testGetFromHDFS"; - - createServerRegion(vm0, 7, 1, 50, homeDir, uniqueName, 50, false, true); - createServerRegion(vm1, 7, 1, 50, homeDir, uniqueName, 50, false, true); - - // Do some puts - vm0.invoke(new SerializableCallable() { - public Object call() throws Exception { - doPuts(uniqueName, 0, 40); - return null; - } - }); - - // Do some puts and destroys - // some order manipulation has been done because of an issue: - // " a higher version update on a key can be batched and - // sent to HDFS before a lower version update on the same key - // is batched and sent to HDFS. This will cause the latest - // update on a key in an older file. Hence, a fetch from HDFS - // will return an older update from a newer file." - - vm1.invoke(new SerializableCallable() { - public Object call() throws Exception { - doPuts(uniqueName, 40, 50); - doDestroys(uniqueName, 40, 50); - doPuts(uniqueName, 50, 100); - doPuts(uniqueName, 30, 40); - return null; - } - }); - - // do some more puts and destroy - // some order manipulation has been done because of an issue: - // " a higher version update on a key can be batched and - // sent to HDFS before a lower version update on the same key - // is batched and sent to HDFS. This will cause the latest - // update on a key in an older file. Hence, a fetch from HDFS - // will return an older update from a newer file." - vm1.invoke(new SerializableCallable() { - public Object call() throws Exception { - doPuts(uniqueName, 80, 90); - doDestroys(uniqueName, 80, 90); - doPuts(uniqueName, 110, 200); - doPuts(uniqueName, 90, 110); - return null; - } - - }); - - // get and getall the values and compare them. - SerializableCallable checkData = new SerializableCallable() { - public Object call() throws Exception { - checkWithGet(uniqueName, 0, 40, true); - checkWithGet(uniqueName, 40, 50, false); - checkWithGet(uniqueName, 50, 80, true); - checkWithGet(uniqueName, 80, 90, false); - checkWithGet(uniqueName, 90, 200, true); - checkWithGet(uniqueName, 200, 201, false); - - ArrayList arrayl = new ArrayList(); - for (int i =0; i< 200; i++) { - String k = "K" + i; - if ( !((40 <= i && i < 50) || (80 <= i && i < 90))) - arrayl.add(k); - } - checkWithGetAll(uniqueName, arrayl); - - return null; - } - }; - vm1.invoke(checkData); - - //Restart the members and verify that we can still get the data - closeCache(vm0); - closeCache(vm1); - AsyncInvocation async0 = createServerRegionAsync(vm0, 7, 1, 50, homeDir, uniqueName, 50, false, true); - AsyncInvocation async1 = createServerRegionAsync(vm1, 7, 1, 50, homeDir, uniqueName, 50, false, true); - - async0.getResult(); - async1.getResult(); - - - // get and getall the values and compare them. - vm1.invoke(checkData); - - //TODO:HDFS we are just reading the files here. Need to verify - // once the folder structure is finalized. - dumpFiles(vm1, uniqueName); - - } - - /** - * puts a few entries (keys with multiple updates ). Gets them immediately. - * High probability that it gets it from async queue. - */ - public void testGetForAsyncQueue() { - Host host = Host.getHost(0); - VM vm0 = host.getVM(0); - VM vm1 = host.getVM(1); - - final String uniqueName = getName(); - final String homeDir = "../../testGetForAsyncQueue"; - - createServerRegion(vm0, 2, 5, 1, homeDir, uniqueName, 10000); - createServerRegion(vm1, 2, 5, 1, homeDir, uniqueName, 10000); - - vm0.invoke(new SerializableCallable() { - public Object call() throws Exception { - doPuts(uniqueName, 0, 4); - return null; - } - }); - vm1.invoke(new SerializableCallable() { - public Object call() throws Exception { - doPuts(uniqueName, 0, 2); - doDestroys(uniqueName, 2, 3); - doPuts(uniqueName, 3, 7); - - checkWithGet(uniqueName, 0, 2, true); - checkWithGet(uniqueName, 2, 3, false); - checkWithGet(uniqueName, 3, 7, true); - return null; - } - }); - } - - /** - * puts a few entries (keys with multiple updates ). Calls getAll immediately. - * High probability that it gets it from async queue. - */ - public void testGetAllForAsyncQueue() { - - Host host = Host.getHost(0); - VM vm0 = host.getVM(0); - VM vm1 = host.getVM(1); - - final String uniqueName = getName(); - createServerRegion(vm0, 2, 5, 2, uniqueName, uniqueName, 10000); - createServerRegion(vm1, 2, 5, 2, uniqueName, uniqueName, 10000); - - vm0.invoke(new SerializableCallable() { - public Object call() throws Exception { - doPuts(uniqueName, 0, 4); - return null; - } - }); - vm1.invoke(new SerializableCallable() { - public Object call() throws Exception { - doPuts(uniqueName, 1, 5); - - ArrayList arrayl = new ArrayList(); - for (int i =0; i< 5; i++) { - String k = "K" + i; - arrayl.add(k); - } - checkWithGetAll(uniqueName, arrayl); - return null; - } - }); - } - - /** - * puts a few entries (keys with multiple updates ). Calls getAll immediately. - * High probability that it gets it from async queue. - */ - public void testPutAllForAsyncQueue() { - Host host = Host.getHost(0); - VM vm0 = host.getVM(0); - VM vm1 = host.getVM(1); - - final String uniqueName = getName(); - final String homeDir = "../../testPutAllForAsyncQueue"; - createServerRegion(vm0, 2, 5, 2, homeDir, uniqueName, 10000); - createServerRegion(vm1, 2, 5, 2, homeDir, uniqueName, 10000); - - vm0.invoke(new SerializableCallable() { - public Object call() throws Exception { - HashMap putAllmap = new HashMap(); - for (int i =0; i< 4; i++) - putAllmap.put("K" + i, "V"+ i ); - doPutAll(uniqueName, putAllmap); - return null; - } - }); - vm1.invoke(new SerializableCallable() { - public Object call() throws Exception { - HashMap putAllmap = new HashMap(); - for (int i =1; i< 5; i++) - putAllmap.put("K" + i, "V"+ i ); - doPutAll(uniqueName, putAllmap); - checkWithGet(uniqueName, 0, 5, true); - return null; - } - }); - } - - /** - * Does putAll and get. Since there are many updates - * most of the time the data is not found in memory and queue and - * is fetched from HDFS - */ - public void _testPutAllAndGetFromHDFS() { - Host host = Host.getHost(0); - VM vm0 = host.getVM(0); - VM vm1 = host.getVM(1); - - final String uniqueName = getName(); - final String homeDir = "../../testPutAllAndGetFromHDFS"; - createServerRegion(vm0, 7, 1, 500, homeDir, uniqueName, 500); - createServerRegion(vm1, 7, 1, 500, homeDir, uniqueName, 500); - - // Do some puts - vm0.invoke(new SerializableCallable() { - public Object call() throws Exception { - - HashMap putAllmap = new HashMap(); - - for (int i =0; i< 500; i++) - putAllmap.put("K" + i, "V"+ i ); - doPutAll(uniqueName, putAllmap); - return null; - } - }); - - // Do putAll and some destroys - vm1.invoke(new SerializableCallable() { - public Object call() throws Exception { - HashMap putAllmap = new HashMap(); - for (int i = 500; i< 1000; i++) - putAllmap.put("K" + i, "V"+ i ); - doPutAll(uniqueName, putAllmap); - return null; - } - }); - - // do some more puts - // some order manipulation has been done because of an issue: - // " a higher version update on a key can be batched and - // sent to HDFS before a lower version update on the same key - // is batched and sent to HDFS. This will cause the latest - // update on a key in an older file. Hence, a fetch from HDFS - // will return an older update from a newer file." - vm1.invoke(new SerializableCallable() { - public Object call() throws Exception { - HashMap putAllmap = new HashMap(); - for (int i =1100; i< 2000; i++) - putAllmap.put("K" + i, "V"+ i ); - doPutAll(uniqueName, putAllmap); - putAllmap = new HashMap(); - for (int i = 900; i< 1100; i++) - putAllmap.put("K" + i, "V"+ i ); - doPutAll(uniqueName, putAllmap); - return null; - } - - }); - - // get and getall the values and compare them. - vm1.invoke(new SerializableCallable() { - public Object call() throws Exception { - checkWithGet(uniqueName, 0, 2000, true); - checkWithGet(uniqueName, 2000, 2001, false); - - ArrayList arrayl = new ArrayList(); - for (int i =0; i< 2000; i++) { - String k = "K" + i; - arrayl.add(k); - } - checkWithGetAll(uniqueName, arrayl); - return null; - } - }); - - } - - public void _testWObasicClose() throws Throwable{ - Host host = Host.getHost(0); - VM vm0 = host.getVM(0); - VM vm1 = host.getVM(1); - VM vm2 = host.getVM(2); - VM vm3 = host.getVM(3); - - String homeDir = "../../testWObasicClose"; - final String uniqueName = getName(); - - createServerRegion(vm0, 11, 1, 500, homeDir, uniqueName, 500, true, false); - createServerRegion(vm1, 11, 1, 500, homeDir, uniqueName, 500, true, false); - createServerRegion(vm2, 11, 1, 500, homeDir, uniqueName, 500, true, false); - createServerRegion(vm3, 11, 1, 500, homeDir, uniqueName, 500, true, false); - - AsyncInvocation a1 = doAsyncPuts(vm0, uniqueName, 1, 50, "vm0"); - AsyncInvocation a2 = doAsyncPuts(vm1, uniqueName, 40, 100, "vm1"); - AsyncInvocation a3 = doAsyncPuts(vm2, uniqueName, 40, 100, "vm2"); - AsyncInvocation a4 = doAsyncPuts(vm3, uniqueName, 90, 150, "vm3"); - - a1.join(); - a2.join(); - a3.join(); - a4.join(); - - Thread.sleep(5000); - cacheClose (vm0, false); - cacheClose (vm1, false); - cacheClose (vm2, false); - cacheClose (vm3, false); - - AsyncInvocation async1 = createServerRegionAsync(vm0, 11, 1, 500, homeDir, uniqueName, 500, true, false); - AsyncInvocation async2 = createServerRegionAsync(vm1, 11, 1, 500, homeDir, uniqueName, 500, true, false); - AsyncInvocation async3 = createServerRegionAsync(vm2, 11, 1, 500, homeDir, uniqueName, 500, true, false); - AsyncInvocation async4 = createServerRegionAsync(vm3, 11, 1, 500, homeDir, uniqueName, 500, true, false); - async1.getResult(); - async2.getResult(); - async3.getResult(); - async4.getResult(); - - verifyHDFSData(vm0, uniqueName); - - cacheClose (vm0, false); - cacheClose (vm1, false); - cacheClose (vm2, false); - cacheClose (vm3, false); - } - - - protected void cacheClose(VM vm, final boolean sleep){ - vm.invoke( new SerializableCallable() { - public Object call() throws Exception { - if (sleep) - Thread.sleep(2000); - getCache().getLogger().info("Cache close in progress "); - getCache().close(); - getCache().getLogger().info("Cache closed"); - return null; - } - }); - - } - - protected void verifyInEntriesMap (HashMap<String, String> entriesMap, int start, int end, String suffix) { - for (int i =start; i< end; i++) { - String k = "K" + i; - String v = "V"+ i + suffix; - Object s = entriesMap.get(v); - assertTrue( "The expected key " + k+ " didn't match the received value " + s + ". value: " + v, k.equals(s)); - } - } - - /** - * Reads all the sequence files and returns the list of key value pairs persisted. - * Returns the key value pair as <value, key> tuple as there can be multiple values - * for a key - * @throws Exception - */ - protected HashMap<String, HashMap<String, String>> createFilesAndEntriesMap(VM vm0, final String uniqueName, final String regionName) throws Exception { - HashMap<String, HashMap<String, String>> entriesToFileMap = (HashMap<String, HashMap<String, String>>) - vm0.invoke( new SerializableCallable() { - public Object call() throws Exception { - HashMap<String, HashMap<String, String>> entriesToFileMap = new HashMap<String, HashMap<String, String>>(); - HDFSStoreImpl hdfsStore = (HDFSStoreImpl) ((GemFireCacheImpl)getCache()).findHDFSStore(uniqueName); - FileSystem fs = hdfsStore.getFileSystem(); - System.err.println("dumping file names in HDFS directory: " + hdfsStore.getHomeDir()); - try { - Path basePath = new Path(hdfsStore.getHomeDir()); - Path regionPath = new Path(basePath, regionName); - RemoteIterator<LocatedFileStatus> files = fs.listFiles(regionPath, true); - - while(files.hasNext()) { - HashMap<String, String> entriesMap = new HashMap<String, String>(); - LocatedFileStatus next = files.next(); - /* MergeGemXDHDFSToGFE - Disabled as I am not pulling in DunitEnv */ - // System.err.println(DUnitEnv.get().getPid() + " - " + next.getPath()); - System.err.println(" - " + next.getPath()); - readSequenceFile(fs, next.getPath(), entriesMap); - entriesToFileMap.put(next.getPath().getName(), entriesMap); - } - } catch (FileNotFoundException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } catch (IOException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } - - return entriesToFileMap; - } - @SuppressWarnings("deprecation") - public void readSequenceFile(FileSystem inputFS, Path sequenceFileName, - HashMap<String, String> entriesMap) throws IOException { - SequenceFileHoplog hoplog = new SequenceFileHoplog(inputFS, sequenceFileName, null); - HoplogIterator<byte[], byte[]> iter = hoplog.getReader().scan(); - try { - while (iter.hasNext()) { - iter.next(); - PersistedEventImpl te = UnsortedHoplogPersistedEvent.fromBytes(iter.getValue()); - String stringkey = ((String)CacheServerHelper.deserialize(iter.getKey())); - String value = (String) te.getDeserializedValue(); - entriesMap.put(value, stringkey); - if (getCache().getLoggerI18n().fineEnabled()) - getCache().getLoggerI18n().fine("Key: " + stringkey + " value: " + value + " path " + sequenceFileName.getName()); - } - } catch (Exception e) { - assertTrue(e.toString(), false); - } - iter.close(); - hoplog.close(); - } - }); - return entriesToFileMap; - } - protected SerializableCallable validateEmpty(VM vm0, final int numEntries, final String uniqueName) { - SerializableCallable validateEmpty = new SerializableCallable("validateEmpty") { - public Object call() throws Exception { - Region r = getRootRegion(uniqueName); - - assertTrue(r.isEmpty()); - - //validate region is empty on peer as well - assertFalse(r.entrySet().iterator().hasNext()); - //Make sure the region is empty - for (int i =0; i< numEntries; i++) { - assertEquals("failure on key K" + i , null, r.get("K" + i)); - } - - return null; - } - }; - - vm0.invoke(validateEmpty); - return validateEmpty; - } - - protected void closeCache(VM vm0) { - //Restart and validate still empty. - SerializableRunnable closeCache = new SerializableRunnable("close cache") { - @Override - public void run() { - getCache().close(); - disconnectFromDS(); - } - }; - - vm0.invoke(closeCache); - } - - protected void verifyDataInHDFS(VM vm0, final String uniqueName, final boolean shouldHaveData, - final boolean wait, final boolean waitForQueueToDrain, final int numEntries) { - vm0.invoke(new SerializableCallable("check for data in hdfs") { - @Override - public Object call() throws Exception { - - HDFSRegionDirector director = HDFSRegionDirector.getInstance(); - final SortedOplogStatistics stats = director.getHdfsRegionStats("/" + uniqueName); - waitForCriterion(new WaitCriterion() { - @Override - public boolean done() { - return stats.getActiveFileCount() > 0 == shouldHaveData; - } - - @Override - public String description() { - return "Waiting for active file count to be greater than 0: " + stats.getActiveFileCount() + " stats=" + System.identityHashCode(stats); - } - }, 30000, 100, true); - - if(waitForQueueToDrain) { - PartitionedRegion region = (PartitionedRegion) getCache().getRegion(uniqueName); - final AsyncEventQueueStats queueStats = region.getHDFSEventQueueStats(); - waitForCriterion(new WaitCriterion() { - @Override - public boolean done() { - return queueStats.getEventQueueSize() <= 0; - } - - @Override - public String description() { - return "Waiting for queue stats to reach 0: " + queueStats.getEventQueueSize(); - } - }, 30000, 100, true); - } - return null; - } - }); - } - - protected void doPuts(VM vm0, final String uniqueName, final int numEntries) { - // Do some puts - vm0.invoke(new SerializableCallable("do puts") { - public Object call() throws Exception { - Region r = getRootRegion(uniqueName); - for (int i =0; i< numEntries; i++) - r.put("K" + i, "V"+ i ); - return null; - } - }); - } - - protected void validate(VM vm1, final String uniqueName, final int numEntries) { - SerializableCallable validate = new SerializableCallable("validate") { - public Object call() throws Exception { - Region r = getRootRegion(uniqueName); - - for (int i =0; i< numEntries; i++) { - assertEquals("failure on key K" + i , "V"+ i, r.get("K" + i)); - } - - return null; - } - }; - vm1.invoke(validate); - } - - protected void dumpFiles(VM vm0, final String uniqueName) { - vm0.invoke(new SerializableRunnable() { - - @Override - public void run() { - HDFSStoreImpl hdfsStore = (HDFSStoreImpl) ((GemFireCacheImpl)getCache()).findHDFSStore(uniqueName); - FileSystem fs; - try { - fs = hdfsStore.getFileSystem(); - } catch (IOException e1) { - throw new HDFSIOException(e1.getMessage(), e1); - } - System.err.println("dumping file names in HDFS directory: " + hdfsStore.getHomeDir()); - try { - RemoteIterator<LocatedFileStatus> files = fs.listFiles(new Path(hdfsStore.getHomeDir()), true); - - while(files.hasNext()) { - LocatedFileStatus next = files.next(); - /* MergeGemXDHDFSToGFE - Disabled as I am not pulling in DunitEnv */ - // System.err.println(DUnitEnv.get().getPid() + " - " + next.getPath()); - System.err.println(" - " + next.getPath()); - } - } catch (FileNotFoundException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } catch (IOException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } - - } - - }); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/74c3156a/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/BaseHoplogTestCase.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/BaseHoplogTestCase.java b/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/BaseHoplogTestCase.java deleted file mode 100644 index 07d9f77..0000000 --- a/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/BaseHoplogTestCase.java +++ /dev/null @@ -1,389 +0,0 @@ -/*========================================================================= - * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved. - * This product is protected by U.S. and international copyright - * and intellectual property laws. Pivotal products are covered by - * one or more patents listed at http://www.pivotal.io/patents. - *========================================================================= - */ -package com.gemstone.gemfire.cache.hdfs.internal.hoplog; - -import java.io.BufferedWriter; -import java.io.File; -import java.io.FileWriter; -import java.io.IOException; -import java.net.URI; -import java.util.HashSet; -import java.util.Random; -import java.util.Set; -import java.util.TreeMap; - -import junit.framework.TestCase; - -import org.apache.commons.io.FileUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.LocalFileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.PathFilter; - -import com.gemstone.gemfire.cache.Cache; -import com.gemstone.gemfire.cache.CacheFactory; -import com.gemstone.gemfire.cache.Operation; -import com.gemstone.gemfire.cache.Region; -import com.gemstone.gemfire.cache.RegionFactory; -import com.gemstone.gemfire.cache.RegionShortcut; -import com.gemstone.gemfire.cache.SerializedCacheValue; -import com.gemstone.gemfire.cache.TransactionId; -import com.gemstone.gemfire.cache.hdfs.HDFSStore; -import com.gemstone.gemfire.cache.hdfs.HDFSStoreFactory; -import com.gemstone.gemfire.cache.hdfs.HDFSStoreMutator; -import com.gemstone.gemfire.cache.hdfs.internal.HDFSStoreFactoryImpl; -import com.gemstone.gemfire.cache.hdfs.internal.HDFSStoreImpl; -import com.gemstone.gemfire.cache.hdfs.internal.SortedHDFSQueuePersistedEvent; -import com.gemstone.gemfire.cache.hdfs.internal.HDFSStoreImpl.FileSystemFactory; -import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSRegionDirector.HdfsRegionManager; -import com.gemstone.gemfire.cache.hdfs.internal.hoplog.Hoplog.HoplogWriter; -import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HoplogOrganizer.Compactor; -import com.gemstone.gemfire.distributed.DistributedMember; -import com.gemstone.gemfire.internal.cache.LocalRegion; -import com.gemstone.gemfire.internal.cache.persistence.soplog.HFileStoreStatistics; -import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedOplogStatistics; -import com.gemstone.gemfire.internal.cache.versions.DiskVersionTag; -import com.gemstone.gemfire.internal.util.BlobHelper; -import org.apache.hadoop.hbase.io.hfile.BlockCache; - -import dunit.DistributedTestCase; -import dunit.DistributedTestCase.ExpectedException; - -public abstract class BaseHoplogTestCase extends TestCase { - public static final String HDFS_STORE_NAME = "hdfs"; - public static final Random rand = new Random(System.currentTimeMillis()); - protected Path testDataDir; - protected Cache cache; - - protected HDFSRegionDirector director; - protected HdfsRegionManager regionManager; - protected HDFSStoreFactory hsf; - protected HDFSStoreImpl hdfsStore; - protected RegionFactory<Object, Object> regionfactory; - protected Region<Object, Object> region; - protected SortedOplogStatistics stats; - protected HFileStoreStatistics storeStats; - protected BlockCache blockCache; - - Set<ExpectedException> exceptions = new HashSet<ExpectedException>(); - @Override - protected void setUp() throws Exception { - super.setUp(); - System.setProperty(HoplogConfig.ALLOW_LOCAL_HDFS_PROP, "true"); - - //This is logged by HDFS when it is stopped. - exceptions.add(DistributedTestCase.addExpectedException("sleep interrupted")); - exceptions.add(DistributedTestCase.addExpectedException("java.io.InterruptedIOException")); - - testDataDir = new Path("test-case"); - - cache = createCache(); - - configureHdfsStoreFactory(); - hdfsStore = (HDFSStoreImpl) hsf.create(HDFS_STORE_NAME); - - regionfactory = cache.createRegionFactory(RegionShortcut.PARTITION); -// regionfactory.setHDFSStoreName(HDFS_STORE_NAME); - region = regionfactory.create(getName()); - - // disable compaction by default and clear existing queues - HDFSCompactionManager compactionManager = HDFSCompactionManager.getInstance(hdfsStore); - compactionManager.reset(); - - director = HDFSRegionDirector.getInstance(); - director.setCache(cache); - regionManager = ((LocalRegion)region).getHdfsRegionManager(); - stats = director.getHdfsRegionStats("/" + getName()); - storeStats = hdfsStore.getStats(); - blockCache = hdfsStore.getBlockCache(); - AbstractHoplogOrganizer.JUNIT_TEST_RUN = true; - } - - protected void configureHdfsStoreFactory() throws Exception { - hsf = this.cache.createHDFSStoreFactory(); - hsf.setHomeDir(testDataDir.toString()); - hsf.setMinorCompaction(false); - hsf.setMajorCompaction(false); - } - - protected Cache createCache() { - CacheFactory cf = new CacheFactory().set("mcast-port", "0") - .set("log-level", "info") - ; - cache = cf.create(); - return cache; - } - - @Override - protected void tearDown() throws Exception { - if (region != null) { - region.destroyRegion(); - } - - if (hdfsStore != null) { - hdfsStore.getFileSystem().delete(testDataDir, true); - hdfsStore.destroy(); - } - - if (cache != null) { - cache.close(); - } - super.tearDown(); - for (ExpectedException ex: exceptions) { - ex.remove(); - } - } - - /** - * creates a hoplog file with numKeys records. Keys follow key-X pattern and values follow value-X - * pattern where X=0 to X is = numKeys -1 - * - * @return the sorted map of inserted KVs - */ - protected TreeMap<String, String> createHoplog(int numKeys, Hoplog oplog) throws IOException { - int offset = (numKeys > 10 ? 100000 : 0); - - HoplogWriter writer = oplog.createWriter(numKeys); - TreeMap<String, String> map = new TreeMap<String, String>(); - for (int i = offset; i < (numKeys + offset); i++) { - String key = ("key-" + i); - String value = ("value-" + System.nanoTime()); - writer.append(key.getBytes(), value.getBytes()); - map.put(key, value); - } - writer.close(); - return map; - } - - protected FileStatus[] getBucketHoplogs(String regionAndBucket, final String type) - throws IOException { - return getBucketHoplogs(hdfsStore.getFileSystem(), regionAndBucket, type); - } - - protected FileStatus[] getBucketHoplogs(FileSystem fs, String regionAndBucket, final String type) - throws IOException { - FileStatus[] hoplogs = fs.listStatus( - new Path(testDataDir, regionAndBucket), new PathFilter() { - @Override - public boolean accept(Path file) { - return file.getName().endsWith(type); - } - }); - return hoplogs; - } - - protected String getRandomHoplogName() { - String hoplogName = "hoplog-" + System.nanoTime() + "-" + rand.nextInt(10000) + ".hop"; - return hoplogName; - } - -// public static MiniDFSCluster initMiniCluster(int port, int numDN) throws Exception { -// HashMap<String, String> map = new HashMap<String, String>(); -// map.put(DFSConfigKeys.DFS_REPLICATION_KEY, "1"); -// return initMiniCluster(port, numDN, map); -// } -// -// public static MiniDFSCluster initMiniCluster(int port, int numDN, HashMap<String, String> map) throws Exception { -// System.setProperty("test.build.data", "hdfs-test-cluster"); -// Configuration hconf = new HdfsConfiguration(); -// for (Entry<String, String> entry : map.entrySet()) { -// hconf.set(entry.getKey(), entry.getValue()); -// } -// -// hconf.set("dfs.namenode.fs-limits.min-block-size", "1024"); -// -// Builder builder = new MiniDFSCluster.Builder(hconf); -// builder.numDataNodes(numDN); -// builder.nameNodePort(port); -// MiniDFSCluster cluster = builder.build(); -// return cluster; -// } - - public static void setConfigFile(HDFSStoreFactory factory, File configFile, String config) - throws Exception { - BufferedWriter bw = new BufferedWriter(new FileWriter(configFile)); - bw.write(config); - bw.close(); - factory.setHDFSClientConfigFile(configFile.getName()); - } - - public static void alterMajorCompaction(HDFSStoreImpl store, boolean enable) { - HDFSStoreMutator mutator = store.createHdfsStoreMutator(); - mutator.setMajorCompaction(enable); - store.alter(mutator); - } - - public static void alterMinorCompaction(HDFSStoreImpl store, boolean enable) { - HDFSStoreMutator mutator = store.createHdfsStoreMutator(); - mutator.setMinorCompaction(enable); - store.alter(mutator); - } - - public void deleteMiniClusterDir() throws Exception { - File clusterDir = new File("hdfs-test-cluster"); - if (clusterDir.exists()) { - FileUtils.deleteDirectory(clusterDir); - } - } - - public static class TestEvent extends SortedHDFSQueuePersistedEvent { - Object key; - - public TestEvent(String k, String v) throws Exception { - this(k, v, Operation.PUT_IF_ABSENT); - } - - public TestEvent(String k, String v, Operation op) throws Exception { - super(v, op, (byte) 0x02, false, new DiskVersionTag(), BlobHelper.serializeToBlob(k), 0); - this.key = k; - } - - public Object getKey() { - return key; - - } - - public Object getNewValue() { - return valueObject; - } - - public Operation getOperation() { - return op; - } - - public Region<Object, Object> getRegion() { - return null; - } - - public Object getCallbackArgument() { - return null; - } - - public boolean isCallbackArgumentAvailable() { - return false; - } - - public boolean isOriginRemote() { - return false; - } - - public DistributedMember getDistributedMember() { - return null; - } - - public boolean isExpiration() { - return false; - } - - public boolean isDistributed() { - return false; - } - - public Object getOldValue() { - return null; - } - - public SerializedCacheValue<Object> getSerializedOldValue() { - return null; - } - - public SerializedCacheValue<Object> getSerializedNewValue() { - return null; - } - - public boolean isLocalLoad() { - return false; - } - - public boolean isNetLoad() { - return false; - } - - public boolean isLoad() { - return false; - } - - public boolean isNetSearch() { - return false; - } - - public TransactionId getTransactionId() { - return null; - } - - public boolean isBridgeEvent() { - return false; - } - - public boolean hasClientOrigin() { - return false; - } - - public boolean isOldValueAvailable() { - return false; - } - } - - public abstract class AbstractCompactor implements Compactor { - @Override - public HDFSStore getHdfsStore() { - return hdfsStore; - } - - public void suspend() { - } - - public void resume() { - } - - public boolean isBusy(boolean isMajor) { - return false; - } - } - - public HDFSStoreFactoryImpl getCloseableLocalHdfsStoreFactory() { - final FileSystemFactory fsFactory = new FileSystemFactory() { - // by default local FS instance is not disabled by close. Hence this - // customization - class CustomFileSystem extends LocalFileSystem { - boolean isClosed = false; - - public void close() throws IOException { - isClosed = true; - super.close(); - } - - public FileStatus getFileStatus(Path f) throws IOException { - if (isClosed) { - throw new IOException(); - } - return super.getFileStatus(f); - } - } - - public FileSystem create(URI namenode, Configuration conf, boolean forceNew) throws IOException { - CustomFileSystem fs = new CustomFileSystem(); - fs.initialize(namenode, conf); - return fs; - } - }; - - HDFSStoreFactoryImpl storeFactory = new HDFSStoreFactoryImpl(cache) { - public HDFSStore create(String name) { - return new HDFSStoreImpl(name, this.configHolder) { - public FileSystemFactory getFileSystemFactory() { - return fsFactory; - } - }; - } - }; - return storeFactory; - } -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/74c3156a/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/CardinalityEstimatorJUnitTest.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/CardinalityEstimatorJUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/CardinalityEstimatorJUnitTest.java deleted file mode 100644 index db050b3..0000000 --- a/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/CardinalityEstimatorJUnitTest.java +++ /dev/null @@ -1,188 +0,0 @@ -/*========================================================================= - * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved. - * This product is protected by U.S. and international copyright - * and intellectual property laws. Pivotal products are covered by - * one or more patents listed at http://www.pivotal.io/patents. - *========================================================================= - */ -package com.gemstone.gemfire.cache.hdfs.internal.hoplog; - -import java.util.ArrayList; -import java.util.List; - -import org.junit.experimental.categories.Category; - -import com.gemstone.gemfire.cache.Operation; -import com.gemstone.gemfire.test.junit.categories.HoplogTest; -import com.gemstone.gemfire.test.junit.categories.IntegrationTest -; - - -@Category({IntegrationTest.class, HoplogTest.class}) -public class CardinalityEstimatorJUnitTest extends BaseHoplogTestCase { - - public void testSingleHoplogCardinality() throws Exception { - int count = 10; - int bucketId = (int) System.nanoTime(); - HoplogOrganizer organizer = new HdfsSortedOplogOrganizer(regionManager, bucketId); - - // flush and create hoplog - ArrayList<TestEvent> items = new ArrayList<TestEvent>(); - for (int i = 0; i < count; i++) { - items.add(new TestEvent(("key-" + i), ("value-" + System.nanoTime()))); - } - // assert that size is 0 before flush begins - assertEquals(0, organizer.sizeEstimate()); - organizer.flush(items.iterator(), count); - - assertEquals(count, organizer.sizeEstimate()); - assertEquals(0, stats.getActiveReaderCount()); - - organizer.close(); - organizer = new HdfsSortedOplogOrganizer(regionManager, bucketId); - assertEquals(count, organizer.sizeEstimate()); - assertEquals(1, stats.getActiveReaderCount()); - } - - public void testSingleHoplogCardinalityWithDuplicates() throws Exception { - int bucketId = (int) System.nanoTime(); - HoplogOrganizer organizer = new HdfsSortedOplogOrganizer(regionManager, bucketId); - - List<TestEvent> items = new ArrayList<TestEvent>(); - items.add(new TestEvent("key-0", "value-0")); - items.add(new TestEvent("key-0", "value-0")); - items.add(new TestEvent("key-1", "value-1")); - items.add(new TestEvent("key-2", "value-2")); - items.add(new TestEvent("key-3", "value-3")); - items.add(new TestEvent("key-3", "value-3")); - items.add(new TestEvent("key-4", "value-4")); - - organizer.flush(items.iterator(), 7); - assertEquals(5, organizer.sizeEstimate()); - } - - public void testMultipleHoplogCardinality() throws Exception { - int bucketId = (int) System.nanoTime(); - HoplogOrganizer organizer = new HdfsSortedOplogOrganizer(regionManager, bucketId); - - List<TestEvent> items = new ArrayList<TestEvent>(); - items.add(new TestEvent("key-0", "value-0")); - items.add(new TestEvent("key-1", "value-1")); - items.add(new TestEvent("key-2", "value-2")); - items.add(new TestEvent("key-3", "value-3")); - items.add(new TestEvent("key-4", "value-4")); - - organizer.flush(items.iterator(), 5); - assertEquals(5, organizer.sizeEstimate()); - - items.clear(); - items.add(new TestEvent("key-1", "value-0")); - items.add(new TestEvent("key-5", "value-5")); - items.add(new TestEvent("key-6", "value-6")); - items.add(new TestEvent("key-7", "value-7")); - items.add(new TestEvent("key-8", "value-8")); - items.add(new TestEvent("key-9", "value-9")); - - organizer.flush(items.iterator(), 6); - assertEquals(10, organizer.sizeEstimate()); - } - - public void testCardinalityAfterRestart() throws Exception { - int bucketId = (int) System.nanoTime(); - HoplogOrganizer organizer = new HdfsSortedOplogOrganizer(regionManager, bucketId); - - List<TestEvent> items = new ArrayList<TestEvent>(); - items.add(new TestEvent("key-0", "value-0")); - items.add(new TestEvent("key-1", "value-1")); - items.add(new TestEvent("key-2", "value-2")); - items.add(new TestEvent("key-3", "value-3")); - items.add(new TestEvent("key-4", "value-4")); - - assertEquals(0, organizer.sizeEstimate()); - organizer.flush(items.iterator(), 5); - assertEquals(5, organizer.sizeEstimate()); - - // restart - organizer.close(); - organizer = new HdfsSortedOplogOrganizer(regionManager, bucketId); - assertEquals(5, organizer.sizeEstimate()); - - items.clear(); - items.add(new TestEvent("key-1", "value-0")); - items.add(new TestEvent("key-5", "value-5")); - items.add(new TestEvent("key-6", "value-6")); - items.add(new TestEvent("key-7", "value-7")); - items.add(new TestEvent("key-8", "value-8")); - items.add(new TestEvent("key-9", "value-9")); - - organizer.flush(items.iterator(), 6); - assertEquals(10, organizer.sizeEstimate()); - - // restart - make sure that HLL from the youngest file is read - organizer.close(); - organizer = new HdfsSortedOplogOrganizer(regionManager, bucketId); - assertEquals(10, organizer.sizeEstimate()); - - items.clear(); - items.add(new TestEvent("key-1", "value-1")); - items.add(new TestEvent("key-5", "value-5")); - items.add(new TestEvent("key-10", "value-10")); - items.add(new TestEvent("key-11", "value-11")); - items.add(new TestEvent("key-12", "value-12")); - items.add(new TestEvent("key-13", "value-13")); - items.add(new TestEvent("key-14", "value-14")); - - organizer.flush(items.iterator(), 7); - assertEquals(15, organizer.sizeEstimate()); - } - - public void testCardinalityAfterMajorCompaction() throws Exception { - doCardinalityAfterCompactionWork(true); - } - - public void testCardinalityAfterMinorCompaction() throws Exception { - doCardinalityAfterCompactionWork(false); - } - - private void doCardinalityAfterCompactionWork(boolean isMajor) throws Exception { - int bucketId = (int) System.nanoTime(); - HoplogOrganizer organizer = new HdfsSortedOplogOrganizer(regionManager, bucketId); - - List<TestEvent> items = new ArrayList<TestEvent>(); - items.add(new TestEvent("key-0", "value-0")); - items.add(new TestEvent("key-1", "value-1")); - items.add(new TestEvent("key-2", "value-2")); - items.add(new TestEvent("key-3", "value-3")); - items.add(new TestEvent("key-4", "value-4")); - - organizer.flush(items.iterator(), 5); - assertEquals(5, organizer.sizeEstimate()); - - items.clear(); - items.add(new TestEvent("key-0", "value-0")); - items.add(new TestEvent("key-1", "value-5", Operation.DESTROY)); - items.add(new TestEvent("key-2", "value-6", Operation.INVALIDATE)); - items.add(new TestEvent("key-5", "value-5")); - - organizer.flush(items.iterator(), 4); - assertEquals(6, organizer.sizeEstimate()); - - items.clear(); - items.add(new TestEvent("key-3", "value-5", Operation.DESTROY)); - items.add(new TestEvent("key-4", "value-6", Operation.INVALIDATE)); - items.add(new TestEvent("key-5", "value-0")); - items.add(new TestEvent("key-6", "value-5")); - - organizer.flush(items.iterator(), 4); - - items.add(new TestEvent("key-5", "value-0")); - items.add(new TestEvent("key-6", "value-5")); - - items.clear(); - organizer.flush(items.iterator(), items.size()); - assertEquals(7, organizer.sizeEstimate()); - - organizer.getCompactor().compact(isMajor, false); - assertEquals(3, organizer.sizeEstimate()); - } -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/74c3156a/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSCacheLoaderJUnitTest.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSCacheLoaderJUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSCacheLoaderJUnitTest.java deleted file mode 100644 index 67dcddf..0000000 --- a/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSCacheLoaderJUnitTest.java +++ /dev/null @@ -1,106 +0,0 @@ -/*========================================================================= - * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved. - * This product is protected by U.S. and international copyright - * and intellectual property laws. Pivotal products are covered by - * one or more patents listed at http://www.pivotal.io/patents. - *========================================================================= - */ -package com.gemstone.gemfire.cache.hdfs.internal.hoplog; - -import java.util.List; - -import org.junit.experimental.categories.Category; - -import com.gemstone.gemfire.cache.AttributesMutator; -import com.gemstone.gemfire.cache.CacheLoader; -import com.gemstone.gemfire.cache.CacheLoaderException; -import com.gemstone.gemfire.cache.LoaderHelper; -import com.gemstone.gemfire.cache.asyncqueue.AsyncEventListener; -import com.gemstone.gemfire.cache.asyncqueue.internal.AsyncEventQueueFactoryImpl; -import com.gemstone.gemfire.cache.asyncqueue.internal.AsyncEventQueueImpl; -import com.gemstone.gemfire.cache.asyncqueue.internal.AsyncEventQueueStats; -import com.gemstone.gemfire.test.junit.categories.HoplogTest; -import com.gemstone.gemfire.test.junit.categories.IntegrationTest -; - -/** - * Tests that entries loaded from a cache loader are inserted in the HDFS queue - * - * @author hemantb - */ -@Category({IntegrationTest.class, HoplogTest.class}) -public class HDFSCacheLoaderJUnitTest extends BaseHoplogTestCase { - - private static int totalEventsReceived = 0; - protected void configureHdfsStoreFactory() throws Exception { - hsf = this.cache.createHDFSStoreFactory(); - hsf.setHomeDir(testDataDir.toString()); - hsf.setBatchInterval(100000000); - hsf.setBatchSize(10000); - } - - /** - * Tests that entries loaded from a cache loader are inserted in the HDFS queue - * but are not inserted in async queues. - * @throws Exception - */ - public void testCacheLoaderForAsyncQAndHDFS() throws Exception { - - final AsyncEventQueueStats hdfsQueuestatistics = ((AsyncEventQueueImpl)cache. - getAsyncEventQueues().toArray()[0]).getStatistics(); - - AttributesMutator am = this.region.getAttributesMutator(); - am.setCacheLoader(new CacheLoader() { - private int i = 0; - public Object load(LoaderHelper helper) - throws CacheLoaderException { - return new Integer(i++); - } - - public void close() { } - }); - - - - String asyncQueueName = "myQueue"; - new AsyncEventQueueFactoryImpl(cache).setBatchTimeInterval(1). - create(asyncQueueName, new AsyncEventListener() { - - @Override - public void close() { - // TODO Auto-generated method stub - - } - - @Override - public boolean processEvents(List events) { - totalEventsReceived += events.size(); - return true; - } - }); - am.addAsyncEventQueueId(asyncQueueName); - - region.put(1, new Integer(100)); - region.destroy(1); - region.get(1); - region.destroy(1); - - assertTrue("HDFS queue should have received four events. But it received " + - hdfsQueuestatistics.getEventQueueSize(), 4 == hdfsQueuestatistics.getEventQueueSize()); - assertTrue("HDFS queue should have received four events. But it received " + - hdfsQueuestatistics.getEventsReceived(), 4 == hdfsQueuestatistics.getEventsReceived()); - - region.get(1); - Thread.sleep(2000); - - assertTrue("Async queue should have received only 5 events. But it received " + - totalEventsReceived, totalEventsReceived == 5); - assertTrue("HDFS queue should have received 5 events. But it received " + - hdfsQueuestatistics.getEventQueueSize(), 5 == hdfsQueuestatistics.getEventQueueSize()); - assertTrue("HDFS queue should have received 5 events. But it received " + - hdfsQueuestatistics.getEventsReceived(), 5 == hdfsQueuestatistics.getEventsReceived()); - - - } - -}
