http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/141f8282/modules/core/src/test/java/org/apache/ignite/internal/processors/fs/IgfsSizeSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/fs/IgfsSizeSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/fs/IgfsSizeSelfTest.java deleted file mode 100644 index 708de42..0000000 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/fs/IgfsSizeSelfTest.java +++ /dev/null @@ -1,875 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.fs; - -import org.apache.ignite.*; -import org.apache.ignite.cache.*; -import org.apache.ignite.cluster.*; -import org.apache.ignite.configuration.*; -import org.apache.ignite.events.*; -import org.apache.ignite.igfs.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.internal.processors.cache.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.spi.discovery.tcp.*; -import org.apache.ignite.spi.discovery.tcp.ipfinder.*; -import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; -import org.apache.ignite.testframework.*; -import org.apache.ignite.transactions.*; -import org.jdk8.backport.*; - -import java.io.*; -import java.util.*; -import java.util.concurrent.*; -import java.util.concurrent.atomic.*; - -import static org.apache.ignite.cache.CacheAtomicityMode.*; -import static org.apache.ignite.cache.CacheDistributionMode.*; -import static org.apache.ignite.cache.CacheMode.*; -import static org.apache.ignite.cache.CachePreloadMode.*; -import static org.apache.ignite.events.EventType.*; -import static org.apache.ignite.internal.processors.fs.IgfsFileInfo.*; -import static org.apache.ignite.transactions.IgniteTxConcurrency.*; -import static org.apache.ignite.transactions.IgniteTxIsolation.*; - -/** - * {@link IgfsAttributes} test case. - */ -public class IgfsSizeSelfTest extends IgfsCommonAbstractTest { - /** How many grids to start. */ - private static final int GRID_CNT = 3; - - /** How many files to save. */ - private static final int FILES_CNT = 10; - - /** Maximum amount of bytes that could be written to particular file. */ - private static final int MAX_FILE_SIZE = 1024 * 10; - - /** Block size. */ - private static final int BLOCK_SIZE = 384; - - /** Cache name. */ - private static final String DATA_CACHE_NAME = "dataCache"; - - /** Cache name. */ - private static final String META_CACHE_NAME = "metaCache"; - - /** GGFS name. */ - private static final String GGFS_NAME = "ggfs"; - - /** IP finder. */ - private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); - - /** GGFS management port */ - private static int mgmtPort; - - /** Data cache mode. */ - private CacheMode cacheMode; - - /** Whether near cache is enabled (applicable for PARTITIONED cache only). */ - private boolean nearEnabled; - - /** GGFS maximum space. */ - private long ggfsMaxData; - - /** Trash purge timeout. */ - private long trashPurgeTimeout; - - /** {@inheritDoc} */ - @Override protected void beforeTest() throws Exception { - cacheMode = null; - nearEnabled = false; - ggfsMaxData = 0; - trashPurgeTimeout = 0; - - mgmtPort = 11400; - } - - /** {@inheritDoc} */ - @Override protected void afterTest() throws Exception { - G.stopAll(true); - } - - /** {@inheritDoc} */ - @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { - IgniteConfiguration cfg = super.getConfiguration(gridName); - - IgfsConfiguration ggfsCfg = new IgfsConfiguration(); - - ggfsCfg.setDataCacheName(DATA_CACHE_NAME); - ggfsCfg.setMetaCacheName(META_CACHE_NAME); - ggfsCfg.setName(GGFS_NAME); - ggfsCfg.setBlockSize(BLOCK_SIZE); - ggfsCfg.setFragmentizerEnabled(false); - ggfsCfg.setMaxSpaceSize(ggfsMaxData); - ggfsCfg.setTrashPurgeTimeout(trashPurgeTimeout); - ggfsCfg.setManagementPort(++mgmtPort); - - CacheConfiguration dataCfg = defaultCacheConfiguration(); - - dataCfg.setName(DATA_CACHE_NAME); - dataCfg.setCacheMode(cacheMode); - - if (cacheMode == PARTITIONED) { - dataCfg.setDistributionMode(nearEnabled ? NEAR_PARTITIONED : PARTITIONED_ONLY); - dataCfg.setBackups(0); - } - - dataCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); - dataCfg.setPreloadMode(SYNC); - dataCfg.setAffinityMapper(new IgfsGroupDataBlocksKeyMapper(128)); - dataCfg.setQueryIndexEnabled(false); - dataCfg.setAtomicityMode(TRANSACTIONAL); - - CacheConfiguration metaCfg = defaultCacheConfiguration(); - - metaCfg.setName(META_CACHE_NAME); - metaCfg.setCacheMode(REPLICATED); - - metaCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); - metaCfg.setPreloadMode(SYNC); - metaCfg.setQueryIndexEnabled(false); - metaCfg.setAtomicityMode(TRANSACTIONAL); - - TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); - - discoSpi.setIpFinder(IP_FINDER); - - cfg.setDiscoverySpi(discoSpi); - cfg.setCacheConfiguration(metaCfg, dataCfg); - cfg.setGgfsConfiguration(ggfsCfg); - - return cfg; - } - - /** - * Perform initial startup. - * - * @throws Exception If failed. - */ - private void startUp() throws Exception { - startGrids(GRID_CNT); - } - - /** - * Ensure that PARTITIONED cache is correctly initialized. - * - * @throws Exception If failed. - */ - public void testPartitioned() throws Exception { - cacheMode = PARTITIONED; - nearEnabled = true; - - check(); - } - - /** - * Ensure that co-located cache is correctly initialized. - * - * @throws Exception If failed. - */ - public void testColocated() throws Exception { - cacheMode = PARTITIONED; - nearEnabled = false; - - check(); - } - - /** - * Ensure that REPLICATED cache is correctly initialized. - * - * @throws Exception If failed. - */ - public void testReplicated() throws Exception { - cacheMode = REPLICATED; - - check(); - } - - /** - * Ensure that exception is thrown in case PARTITIONED cache is oversized. - * - * @throws Exception If failed. - */ - public void testPartitionedOversize() throws Exception { - cacheMode = PARTITIONED; - nearEnabled = true; - - checkOversize(); - } - - /** - * Ensure that exception is thrown in case co-located cache is oversized. - * - * @throws Exception If failed. - */ - public void testColocatedOversize() throws Exception { - cacheMode = PARTITIONED; - nearEnabled = false; - - check(); - } - - /** - * Ensure that exception is thrown in case REPLICATED cache is oversized. - * - * @throws Exception If failed. - */ - public void testReplicatedOversize() throws Exception { - cacheMode = REPLICATED; - - check(); - } - - /** - * Ensure that exception is not thrown in case PARTITIONED cache is oversized, but data is deleted concurrently. - * - * @throws Exception If failed. - */ - public void testPartitionedOversizeDelay() throws Exception { - cacheMode = PARTITIONED; - nearEnabled = true; - - checkOversizeDelay(); - } - - /** - * Ensure that exception is not thrown in case co-located cache is oversized, but data is deleted concurrently. - * - * @throws Exception If failed. - */ - public void testColocatedOversizeDelay() throws Exception { - cacheMode = PARTITIONED; - nearEnabled = false; - - checkOversizeDelay(); - } - - /** - * Ensure that exception is not thrown in case REPLICATED cache is oversized, but data is deleted concurrently. - * - * @throws Exception If failed. - */ - public void testReplicatedOversizeDelay() throws Exception { - cacheMode = REPLICATED; - - checkOversizeDelay(); - } - - /** - * Ensure that GGFS size is correctly updated in case of preloading for PARTITIONED cache. - * - * @throws Exception If failed. - */ - public void testPartitionedPreload() throws Exception { - cacheMode = PARTITIONED; - nearEnabled = true; - - checkPreload(); - } - - /** - * Ensure that GGFS size is correctly updated in case of preloading for co-located cache. - * - * @throws Exception If failed. - */ - public void testColocatedPreload() throws Exception { - cacheMode = PARTITIONED; - nearEnabled = false; - - checkPreload(); - } - - /** - * Ensure that GGFS cache size is calculated correctly. - * - * @throws Exception If failed. - */ - private void check() throws Exception { - startUp(); - - // Ensure that cache was marked as GGFS data cache. - for (int i = 0; i < GRID_CNT; i++) { - IgniteEx g = grid(i); - - GridCacheProjectionEx cache = (GridCacheProjectionEx)g.cachex(DATA_CACHE_NAME).cache(); - - assert cache.isGgfsDataCache(); - } - - // Perform writes. - Collection<IgfsFile> files = write(); - - // Check sizes. - Map<UUID, Integer> expSizes = new HashMap<>(GRID_CNT, 1.0f); - - for (IgfsFile file : files) { - for (IgfsBlock block : file.blocks()) { - Collection<UUID> ids = primaryOrBackups(block.key()); - - for (UUID id : ids) { - if (expSizes.get(id) == null) - expSizes.put(id, block.length()); - else - expSizes.put(id, expSizes.get(id) + block.length()); - } - } - } - - for (int i = 0; i < GRID_CNT; i++) { - UUID id = grid(i).localNode().id(); - - GridCacheAdapter<IgfsBlockKey, byte[]> cache = cache(id); - - int expSize = expSizes.get(id) != null ? expSizes.get(id) : 0; - - assert expSize == cache.ggfsDataSpaceUsed(); - } - - // Perform reads which could potentially be non-local. - byte[] buf = new byte[BLOCK_SIZE]; - - for (IgfsFile file : files) { - for (int i = 0; i < GRID_CNT; i++) { - int total = 0; - - IgfsInputStream is = ggfs(i).open(file.path()); - - while (true) { - int read = is.read(buf); - - if (read == -1) - break; - else - total += read; - } - - assert total == file.length() : "Not enough bytes read: [expected=" + file.length() + ", actual=" + - total + ']'; - - is.close(); - } - } - - // Check sizes after read. - if (cacheMode == PARTITIONED) { - // No changes since the previous check for co-located cache. - for (int i = 0; i < GRID_CNT; i++) { - UUID id = grid(i).localNode().id(); - - GridCacheAdapter<IgfsBlockKey, byte[]> cache = cache(id); - - int expSize = expSizes.get(id) != null ? expSizes.get(id) : 0; - - assert expSize == cache.ggfsDataSpaceUsed(); - } - } - else { - // All data must exist on each cache. - int totalSize = 0; - - for (IgfsFile file : files) - totalSize += file.length(); - - for (int i = 0; i < GRID_CNT; i++) { - UUID id = grid(i).localNode().id(); - - GridCacheAdapter<IgfsBlockKey, byte[]> cache = cache(id); - - assertEquals(totalSize, cache.ggfsDataSpaceUsed()); - } - } - - // Delete data and ensure that all counters are 0 now. - for (IgfsFile file : files) { - ggfs(0).delete(file.path(), false); - - // Await for actual delete to occur. - for (IgfsBlock block : file.blocks()) { - for (int i = 0; i < GRID_CNT; i++) { - while (cache(grid(i).localNode().id()).peek(block.key()) != null) - U.sleep(100); - } - } - } - - for (int i = 0; i < GRID_CNT; i++) { - GridCacheAdapter<IgfsBlockKey, byte[]> cache = cache(grid(i).localNode().id()); - - assert 0 == cache.ggfsDataSpaceUsed() : "Size counter is not 0: " + cache.ggfsDataSpaceUsed(); - } - } - - /** - * Ensure that an exception is thrown in case of GGFS oversize. - * - * @throws Exception If failed. - */ - private void checkOversize() throws Exception { - ggfsMaxData = BLOCK_SIZE; - - startUp(); - - final IgfsPath path = new IgfsPath("/file"); - - // This write is expected to be successful. - IgfsOutputStream os = ggfs(0).create(path, false); - os.write(chunk(BLOCK_SIZE - 1)); - os.close(); - - // This write must be successful as well. - os = ggfs(0).append(path, false); - os.write(chunk(1)); - os.close(); - - // This write must fail w/ exception. - GridTestUtils.assertThrows(log(), new Callable<Object>() { - @Override public Object call() throws Exception { - IgfsOutputStream osErr = ggfs(0).append(path, false); - - try { - osErr.write(chunk(BLOCK_SIZE)); - osErr.close(); - - return null; - } - catch (IOException e) { - Throwable e0 = e; - - while (e0.getCause() != null) - e0 = e0.getCause(); - - throw (Exception)e0; - } - finally { - U.closeQuiet(osErr); - } - } - }, IgfsOutOfSpaceException.class, "Failed to write data block (GGFS maximum data size exceeded) [used=" + - ggfsMaxData + ", allowed=" + ggfsMaxData + ']'); - } - - /** - * Ensure that exception is not thrown or thrown with some delay when there is something in trash directory. - * - * @throws Exception If failed. - */ - private void checkOversizeDelay() throws Exception { - final CountDownLatch latch = new CountDownLatch(1); - - ggfsMaxData = 256; - trashPurgeTimeout = 2000; - - startUp(); - - IgfsImpl ggfs = ggfs(0); - - final IgfsPath path = new IgfsPath("/file"); - final IgfsPath otherPath = new IgfsPath("/fileOther"); - - // Fill cache with data up to it's limit. - IgfsOutputStream os = ggfs.create(path, false); - os.write(chunk((int)ggfsMaxData)); - os.close(); - - final GridCache<IgniteUuid, IgfsFileInfo> metaCache = ggfs.context().kernalContext().cache().cache( - ggfs.configuration().getMetaCacheName()); - - // Start a transaction in a separate thread which will lock file ID. - final IgniteUuid id = ggfs.context().meta().fileId(path); - final IgfsFileInfo info = ggfs.context().meta().info(id); - - final AtomicReference<Throwable> err = new AtomicReference<>(); - - try { - new Thread(new Runnable() { - @Override public void run() { - try { - - try (IgniteTx tx = metaCache.txStart(PESSIMISTIC, REPEATABLE_READ)) { - metaCache.get(id); - - latch.await(); - - U.sleep(1000); // Sleep here so that data manager could "see" oversize. - - tx.commit(); - } - } - catch (Throwable e) { - err.set(e); - } - } - }).start(); - - // Now add file ID to trash listing so that delete worker could "see" it. - - try (IgniteTx tx = metaCache.txStart(PESSIMISTIC, REPEATABLE_READ)) { - Map<String, IgfsListingEntry> listing = Collections.singletonMap(path.name(), - new IgfsListingEntry(info)); - - // Clear root listing. - metaCache.put(ROOT_ID, new IgfsFileInfo(ROOT_ID)); - - // Add file to trash listing. - IgfsFileInfo trashInfo = metaCache.get(TRASH_ID); - - if (trashInfo == null) - metaCache.put(TRASH_ID, new IgfsFileInfo(listing, new IgfsFileInfo(TRASH_ID))); - else - metaCache.put(TRASH_ID, new IgfsFileInfo(listing, trashInfo)); - - tx.commit(); - } - - assert metaCache.get(TRASH_ID) != null; - - // Now the file is locked and is located in trash, try adding some more data. - os = ggfs.create(otherPath, false); - os.write(new byte[1]); - - latch.countDown(); - - os.close(); - - assert err.get() == null; - } - finally { - latch.countDown(); // Safety. - } - } - - /** - * Ensure that GGFS size is correctly updated in case of preloading. - * - * @throws Exception If failed. - */ - private void checkPreload() throws Exception { - assert cacheMode == PARTITIONED; - - startUp(); - - // Perform writes. - Collection<IgfsFile> files = write(); - - // Check sizes. - Map<UUID, Integer> expSizes = new HashMap<>(GRID_CNT, 1.0f); - - for (IgfsFile file : files) { - for (IgfsBlock block : file.blocks()) { - Collection<UUID> ids = primaryOrBackups(block.key()); - - for (UUID id : ids) { - if (expSizes.get(id) == null) - expSizes.put(id, block.length()); - else - expSizes.put(id, expSizes.get(id) + block.length()); - } - } - } - - info("Size map before node start: " + expSizes); - - for (int i = 0; i < GRID_CNT; i++) { - UUID id = grid(i).localNode().id(); - - GridCacheAdapter<IgfsBlockKey, byte[]> cache = cache(id); - - int expSize = expSizes.get(id) != null ? expSizes.get(id) : 0; - - assertEquals(expSize, cache.ggfsDataSpaceUsed()); - } - - // Start a node. - final CountDownLatch latch = new CountDownLatch(GRID_CNT - 1); - - for (int i = 0; i < GRID_CNT - 1; i++) { - grid(0).events().localListen(new IgnitePredicate<Event>() { - @Override public boolean apply(Event evt) { - latch.countDown(); - - return true; - } - }, EVT_CACHE_PRELOAD_STOPPED); - } - - Ignite g = startGrid(GRID_CNT); - - info("Started grid: " + g.cluster().localNode().id()); - - U.awaitQuiet(latch); - - // Wait partitions are evicted. - awaitPartitionMapExchange(); - - // Check sizes again. - expSizes.clear(); - - for (IgfsFile file : files) { - for (IgfsBlock block : file.blocks()) { - Collection<UUID> ids = primaryOrBackups(block.key()); - - assert !ids.isEmpty(); - - for (UUID id : ids) { - if (expSizes.get(id) == null) - expSizes.put(id, block.length()); - else - expSizes.put(id, expSizes.get(id) + block.length()); - } - } - } - - info("Size map after node start: " + expSizes); - - for (int i = 0; i < GRID_CNT - 1; i++) { - UUID id = grid(i).localNode().id(); - - GridCacheAdapter<IgfsBlockKey, byte[]> cache = cache(id); - - int expSize = expSizes.get(id) != null ? expSizes.get(id) : 0; - - assertEquals("For node: " + id, expSize, cache.ggfsDataSpaceUsed()); - } - } - - /** - * Create data chunk of the given length. - * - * @param len Length. - * @return Data chunk. - */ - private byte[] chunk(int len) { - byte[] chunk = new byte[len]; - - for (int i = 0; i < len; i++) - chunk[i] = (byte)i; - - return chunk; - } - - /** - * Create block key. - * - * @param path Path. - * @param blockId Block ID. - * @return Block key. - * @throws Exception If failed. - */ - private IgfsBlockKey blockKey(IgfsPath path, long blockId) throws Exception { - IgfsEx ggfs0 = (IgfsEx)grid(0).fileSystem(GGFS_NAME); - - IgniteUuid fileId = ggfs0.context().meta().fileId(path); - - return new IgfsBlockKey(fileId, null, true, blockId); - } - - /** - * Determine primary node for the given block key. - * - * @param key Block key. - * @return Node ID. - */ - private UUID primary(IgfsBlockKey key) { - IgniteEx grid = grid(0); - - for (ClusterNode node : grid.nodes()) { - if (grid.cachex(DATA_CACHE_NAME).affinity().isPrimary(node, key)) - return node.id(); - } - - return null; - } - - /** - * Determine primary and backup node IDs for the given block key. - * - * @param key Block key. - * @return Collection of node IDs. - */ - private Collection<UUID> primaryOrBackups(IgfsBlockKey key) { - IgniteEx grid = grid(0); - - Collection<UUID> ids = new HashSet<>(); - - for (ClusterNode node : grid.nodes()) { - if (grid.cachex(DATA_CACHE_NAME).affinity().isPrimaryOrBackup(node, key)) - ids.add(node.id()); - } - - return ids; - } - - /** - * Get GGfs of a node with the given index. - * - * @param idx Node index. - * @return GGFS. - * @throws Exception If failed. - */ - private IgfsImpl ggfs(int idx) throws Exception { - return (IgfsImpl)grid(idx).fileSystem(GGFS_NAME); - } - - /** - * Get GGfs of the given node. - * - * @param ignite Node; - * @return GGFS. - * @throws Exception If failed. - */ - private IgfsImpl ggfs(Ignite ignite) throws Exception { - return (IgfsImpl) ignite.fileSystem(GGFS_NAME); - } - - /** - * Get data cache for the given node ID. - * - * @param nodeId Node ID. - * @return Data cache. - */ - private GridCacheAdapter<IgfsBlockKey, byte[]> cache(UUID nodeId) { - return (GridCacheAdapter<IgfsBlockKey, byte[]>)((IgniteEx)G.ignite(nodeId)).cachex(DATA_CACHE_NAME) - .<IgfsBlockKey, byte[]>cache(); - } - - /** - * Perform write of the files. - * - * @return Collection of written file descriptors. - * @throws Exception If failed. - */ - private Collection<IgfsFile> write() throws Exception { - Collection<IgfsFile> res = new HashSet<>(FILES_CNT, 1.0f); - - ThreadLocalRandom8 rand = ThreadLocalRandom8.current(); - - for (int i = 0; i < FILES_CNT; i++) { - // Create empty file locally. - IgfsPath path = new IgfsPath("/file-" + i); - - ggfs(0).create(path, false).close(); - - IgfsMetaManager meta = ggfs(0).context().meta(); - - IgniteUuid fileId = meta.fileId(path); - - // Calculate file blocks. - int fileSize = rand.nextInt(MAX_FILE_SIZE); - - int fullBlocks = fileSize / BLOCK_SIZE; - int remainderSize = fileSize % BLOCK_SIZE; - - Collection<IgfsBlock> blocks = new ArrayList<>(fullBlocks + remainderSize > 0 ? 1 : 0); - - for (int j = 0; j < fullBlocks; j++) - blocks.add(new IgfsBlock(new IgfsBlockKey(fileId, null, true, j), BLOCK_SIZE)); - - if (remainderSize > 0) - blocks.add(new IgfsBlock(new IgfsBlockKey(fileId, null, true, fullBlocks), remainderSize)); - - IgfsFile file = new IgfsFile(path, fileSize, blocks); - - // Actual write. - for (IgfsBlock block : blocks) { - IgfsOutputStream os = ggfs(0).append(path, false); - - os.write(chunk(block.length())); - - os.close(); - } - - // Add written file to the result set. - res.add(file); - } - - return res; - } - - /** A file written to the file system. */ - private static class IgfsFile { - /** Path to the file, */ - private final IgfsPath path; - - /** File length. */ - private final int len; - - /** Blocks with their corresponding locations. */ - private final Collection<IgfsBlock> blocks; - - /** - * Constructor. - * - * @param path Path. - * @param len Length. - * @param blocks Blocks. - */ - private IgfsFile(IgfsPath path, int len, Collection<IgfsBlock> blocks) { - this.path = path; - this.len = len; - this.blocks = blocks; - } - - /** @return Path. */ - IgfsPath path() { - return path; - } - - /** @return Length. */ - int length() { - return len; - } - - /** @return Blocks. */ - Collection<IgfsBlock> blocks() { - return blocks; - } - } - - /** Block written to the file system. */ - private static class IgfsBlock { - /** Block key. */ - private final IgfsBlockKey key; - - /** Block length. */ - private final int len; - - /** - * Constructor. - * - * @param key Block key. - * @param len Block length. - */ - private IgfsBlock(IgfsBlockKey key, int len) { - this.key = key; - this.len = len; - } - - /** @return Block key. */ - private IgfsBlockKey key() { - return key; - } - - /** @return Block length. */ - private int length() { - return len; - } - } -}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/141f8282/modules/core/src/test/java/org/apache/ignite/internal/processors/fs/IgfsStreamsSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/fs/IgfsStreamsSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/fs/IgfsStreamsSelfTest.java deleted file mode 100644 index 3a2f8f3..0000000 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/fs/IgfsStreamsSelfTest.java +++ /dev/null @@ -1,472 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.fs; - -import org.apache.ignite.*; -import org.apache.ignite.cache.*; -import org.apache.ignite.configuration.*; -import org.apache.ignite.igfs.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.spi.discovery.tcp.*; -import org.apache.ignite.spi.discovery.tcp.ipfinder.*; -import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; -import org.apache.ignite.testframework.*; -import org.jetbrains.annotations.*; - -import java.io.*; -import java.util.*; -import java.util.concurrent.*; -import java.util.concurrent.atomic.*; - -import static org.apache.ignite.cache.CacheAtomicityMode.*; -import static org.apache.ignite.cache.CacheMode.*; -import static org.apache.ignite.testframework.GridTestUtils.*; - -/** - * Tests for GGFS streams content. - */ -public class IgfsStreamsSelfTest extends IgfsCommonAbstractTest { - /** Test IP finder. */ - private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); - - /** Meta-information cache name. */ - private static final String META_CACHE_NAME = "replicated"; - - /** Data cache name. */ - public static final String DATA_CACHE_NAME = "data"; - - /** Group size. */ - public static final int CFG_GRP_SIZE = 128; - - /** Pre-configured block size. */ - private static final int CFG_BLOCK_SIZE = 64000; - - /** Number of threads to test parallel readings. */ - private static final int WRITING_THREADS_CNT = 5; - - /** Number of threads to test parallel readings. */ - private static final int READING_THREADS_CNT = 5; - - /** Test nodes count. */ - private static final int NODES_CNT = 4; - - /** Number of retries for async ops. */ - public static final int ASSERT_RETRIES = 100; - - /** Delay between checks for async ops. */ - public static final int ASSERT_RETRY_INTERVAL = 100; - - /** File system to test. */ - private IgniteFs fs; - - /** {@inheritDoc} */ - @Override protected void beforeTestsStarted() throws Exception { - startGrids(NODES_CNT); - } - - /** {@inheritDoc} */ - @Override protected void afterTestsStopped() throws Exception { - stopAllGrids(); - } - - /** {@inheritDoc} */ - @Override protected void beforeTest() throws Exception { - if (NODES_CNT <= 0) - return; - - // Initialize FS. - fs = grid(0).fileSystem("ggfs"); - - // Cleanup FS. - fs.format(); - } - - /** {@inheritDoc} */ - @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { - IgniteConfiguration cfg = super.getConfiguration(gridName); - - cfg.setCacheConfiguration(cacheConfiguration(META_CACHE_NAME), cacheConfiguration(DATA_CACHE_NAME)); - - TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); - - discoSpi.setIpFinder(IP_FINDER); - - cfg.setDiscoverySpi(discoSpi); - - IgfsConfiguration ggfsCfg = new IgfsConfiguration(); - - ggfsCfg.setMetaCacheName(META_CACHE_NAME); - ggfsCfg.setDataCacheName(DATA_CACHE_NAME); - ggfsCfg.setName("ggfs"); - ggfsCfg.setBlockSize(CFG_BLOCK_SIZE); - ggfsCfg.setFragmentizerEnabled(true); - - cfg.setGgfsConfiguration(ggfsCfg); - - return cfg; - } - - /** {@inheritDoc} */ - protected CacheConfiguration cacheConfiguration(String cacheName) { - CacheConfiguration cacheCfg = defaultCacheConfiguration(); - - cacheCfg.setName(cacheName); - - if (META_CACHE_NAME.equals(cacheName)) - cacheCfg.setCacheMode(REPLICATED); - else { - cacheCfg.setCacheMode(PARTITIONED); - cacheCfg.setDistributionMode(CacheDistributionMode.PARTITIONED_ONLY); - - cacheCfg.setBackups(0); - cacheCfg.setAffinityMapper(new IgfsGroupDataBlocksKeyMapper(CFG_GRP_SIZE)); - } - - cacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); - cacheCfg.setAtomicityMode(TRANSACTIONAL); - cacheCfg.setQueryIndexEnabled(false); - - return cacheCfg; - } - - /** - * Test GGFS construction. - * - * @throws IgniteCheckedException In case of exception. - */ - public void testConfiguration() throws IgniteCheckedException { - GridCache metaCache = getFieldValue(fs, "meta", "metaCache"); - GridCache dataCache = getFieldValue(fs, "data", "dataCache"); - - assertNotNull(metaCache); - assertEquals(META_CACHE_NAME, metaCache.name()); - assertEquals(REPLICATED, metaCache.configuration().getCacheMode()); - - assertNotNull(dataCache); - assertEquals(DATA_CACHE_NAME, dataCache.name()); - assertEquals(PARTITIONED, dataCache.configuration().getCacheMode()); - } - - /** - * Test file creation. - * - * @throws Exception In case of exception. - */ - public void testCreateFile() throws Exception { - IgfsPath root = new IgfsPath("/"); - IgfsPath path = new IgfsPath("/asdf"); - - long max = 100L * CFG_BLOCK_SIZE / WRITING_THREADS_CNT; - - for (long size = 0; size <= max; size = size * 15 / 10 + 1) { - assertEquals(Collections.<IgfsPath>emptyList(), fs.listPaths(root)); - - testCreateFile(path, size, new Random().nextInt()); - } - } - - /** @throws Exception If failed. */ - public void testCreateFileColocated() throws Exception { - IgfsPath path = new IgfsPath("/colocated"); - - UUID uuid = UUID.randomUUID(); - - IgniteUuid affKey; - - long idx = 0; - - while (true) { - affKey = new IgniteUuid(uuid, idx); - - if (grid(0).mapKeyToNode(DATA_CACHE_NAME, affKey).id().equals(grid(0).localNode().id())) - break; - - idx++; - } - - try (IgfsOutputStream out = fs.create(path, 1024, true, affKey, 0, 1024, null)) { - // Write 5M, should be enough to test distribution. - for (int i = 0; i < 15; i++) - out.write(new byte[1024 * 1024]); - } - - IgfsFile info = fs.info(path); - - Collection<IgfsBlockLocation> affNodes = fs.affinity(path, 0, info.length()); - - assertEquals(1, affNodes.size()); - Collection<UUID> nodeIds = F.first(affNodes).nodeIds(); - - assertEquals(1, nodeIds.size()); - assertEquals(grid(0).localNode().id(), F.first(nodeIds)); - } - - /** @throws Exception If failed. */ - public void testCreateFileFragmented() throws Exception { - IgfsEx impl = (IgfsEx)grid(0).fileSystem("ggfs"); - - IgfsFragmentizerManager fragmentizer = impl.context().fragmentizer(); - - GridTestUtils.setFieldValue(fragmentizer, "fragmentizerEnabled", false); - - IgfsPath path = new IgfsPath("/file"); - - try { - IgniteFs fs0 = grid(0).fileSystem("ggfs"); - IgniteFs fs1 = grid(1).fileSystem("ggfs"); - IgniteFs fs2 = grid(2).fileSystem("ggfs"); - - try (IgfsOutputStream out = fs0.create(path, 128, false, 1, CFG_GRP_SIZE, - F.asMap(IgniteFs.PROP_PREFER_LOCAL_WRITES, "true"))) { - // 1.5 blocks - byte[] data = new byte[CFG_BLOCK_SIZE * 3 / 2]; - - Arrays.fill(data, (byte)1); - - out.write(data); - } - - try (IgfsOutputStream out = fs1.append(path, false)) { - // 1.5 blocks. - byte[] data = new byte[CFG_BLOCK_SIZE * 3 / 2]; - - Arrays.fill(data, (byte)2); - - out.write(data); - } - - // After this we should have first two block colocated with grid 0 and last block colocated with grid 1. - IgfsFileImpl fileImpl = (IgfsFileImpl)fs.info(path); - - GridCache<Object, Object> metaCache = grid(0).cachex(META_CACHE_NAME); - - IgfsFileInfo fileInfo = (IgfsFileInfo)metaCache.get(fileImpl.fileId()); - - IgfsFileMap map = fileInfo.fileMap(); - - List<IgfsFileAffinityRange> ranges = map.ranges(); - - assertEquals(2, ranges.size()); - - assertTrue(ranges.get(0).startOffset() == 0); - assertTrue(ranges.get(0).endOffset() == 2 * CFG_BLOCK_SIZE - 1); - - assertTrue(ranges.get(1).startOffset() == 2 * CFG_BLOCK_SIZE); - assertTrue(ranges.get(1).endOffset() == 3 * CFG_BLOCK_SIZE - 1); - - // Validate data read after colocated writes. - try (IgfsInputStream in = fs2.open(path)) { - // Validate first part of file. - for (int i = 0; i < CFG_BLOCK_SIZE * 3 / 2; i++) - assertEquals((byte)1, in.read()); - - // Validate second part of file. - for (int i = 0; i < CFG_BLOCK_SIZE * 3 / 2; i++) - assertEquals((byte)2, in.read()); - - assertEquals(-1, in.read()); - } - } - finally { - GridTestUtils.setFieldValue(fragmentizer, "fragmentizerEnabled", true); - - boolean hasData = false; - - for (int i = 0; i < NODES_CNT; i++) - hasData |= !grid(i).cachex(DATA_CACHE_NAME).isEmpty(); - - assertTrue(hasData); - - fs.delete(path, true); - } - - GridTestUtils.retryAssert(log, ASSERT_RETRIES, ASSERT_RETRY_INTERVAL, new CAX() { - @Override public void applyx() { - for (int i = 0; i < NODES_CNT; i++) - assertTrue(grid(i).cachex(DATA_CACHE_NAME).isEmpty()); - } - }); - } - - /** - * Test file creation. - * - * @param path Path to file to store. - * @param size Size of file to store. - * @param salt Salt for file content generation. - * @throws Exception In case of any exception. - */ - private void testCreateFile(final IgfsPath path, final long size, final int salt) throws Exception { - info("Create file [path=" + path + ", size=" + size + ", salt=" + salt + ']'); - - final AtomicInteger cnt = new AtomicInteger(0); - final Collection<IgfsPath> cleanUp = new ConcurrentLinkedQueue<>(); - - long time = runMultiThreaded(new Callable<Object>() { - @Override public Object call() throws Exception { - int id = cnt.incrementAndGet(); - - IgfsPath f = new IgfsPath(path.parent(), "asdf" + (id > 1 ? "-" + id : "")); - - try (IgfsOutputStream out = fs.create(f, 0, true, null, 0, 1024, null)) { - assertNotNull(out); - - cleanUp.add(f); // Add all created into cleanup list. - - U.copy(new IgfsTestInputStream(size, salt), out); - } - - return null; - } - }, WRITING_THREADS_CNT, "perform-multi-thread-writing"); - - if (time > 0) { - double rate = size * 1000. / time / 1024 / 1024; - - info(String.format("Write file [path=%s, size=%d kB, rate=%2.1f MB/s]", path, - WRITING_THREADS_CNT * size / 1024, WRITING_THREADS_CNT * rate)); - } - - info("Read and validate saved file: " + path); - - final InputStream expIn = new IgfsTestInputStream(size, salt); - final IgfsInputStream actIn = fs.open(path, CFG_BLOCK_SIZE * READING_THREADS_CNT * 11 / 10); - - // Validate continuous reading of whole file. - assertEqualStreams(expIn, actIn, size, null); - - // Validate random seek and reading. - final Random rnd = new Random(); - - runMultiThreaded(new Callable<Object>() { - @Override public Object call() throws Exception { - long skip = Math.abs(rnd.nextLong() % (size + 1)); - long range = Math.min(size - skip, rnd.nextInt(CFG_BLOCK_SIZE * 400)); - - assertEqualStreams(new IgfsTestInputStream(size, salt), actIn, range, skip); - - return null; - } - }, READING_THREADS_CNT, "validate-multi-thread-reading"); - - expIn.close(); - actIn.close(); - - info("Get stored file info: " + path); - - IgfsFile desc = fs.info(path); - - info("Validate stored file info: " + desc); - - assertNotNull(desc); - - if (log.isDebugEnabled()) - log.debug("File descriptor: " + desc); - - Collection<IgfsBlockLocation> aff = fs.affinity(path, 0, desc.length()); - - assertFalse("Affinity: " + aff, desc.length() != 0 && aff.isEmpty()); - - int blockSize = desc.blockSize(); - - assertEquals("File size", size, desc.length()); - assertEquals("Binary block size", CFG_BLOCK_SIZE, blockSize); - //assertEquals("Permission", "rwxr-xr-x", desc.getPermission().toString()); - //assertEquals("Permission sticky bit marks this is file", false, desc.getPermission().getStickyBit()); - assertEquals("Type", true, desc.isFile()); - assertEquals("Type", false, desc.isDirectory()); - - info("Cleanup files: " + cleanUp); - - for (IgfsPath f : cleanUp) { - fs.delete(f, true); - assertNull(fs.info(f)); - } - } - - /** - * Validate streams generate the same output. - * - * @param expIn Expected input stream. - * @param actIn Actual input stream. - * @param expSize Expected size of the streams. - * @param seek Seek to use async position-based reading or {@code null} to use simple continuous reading. - * @throws IOException In case of any IO exception. - */ - private void assertEqualStreams(InputStream expIn, IgfsInputStream actIn, - @Nullable Long expSize, @Nullable Long seek) throws IOException { - if (seek != null) - expIn.skip(seek); - - int bufSize = 2345; - byte buf1[] = new byte[bufSize]; - byte buf2[] = new byte[bufSize]; - long pos = 0; - - long start = System.currentTimeMillis(); - - while (true) { - int read = (int)Math.min(bufSize, expSize - pos); - - int i1; - - if (seek == null) - i1 = actIn.read(buf1, 0, read); - else if (seek % 2 == 0) - i1 = actIn.read(pos + seek, buf1, 0, read); - else { - i1 = read; - - actIn.readFully(pos + seek, buf1, 0, read); - } - - // Read at least 0 byte, but don't read more then 'i1' or 'read'. - int i2 = expIn.read(buf2, 0, Math.max(0, Math.min(i1, read))); - - if (i1 != i2) { - fail("Expects the same data [read=" + read + ", pos=" + pos + ", seek=" + seek + - ", i1=" + i1 + ", i2=" + i2 + ']'); - } - - if (i1 == -1) - break; // EOF - - // i1 == bufSize => compare buffers. - // i1 < bufSize => Compare part of buffers, rest of buffers are equal from previous iteration. - assertTrue("Expects the same data [read=" + read + ", pos=" + pos + ", seek=" + seek + - ", i1=" + i1 + ", i2=" + i2 + ']', Arrays.equals(buf1, buf2)); - - if (read == 0) - break; // Nothing more to read. - - pos += i1; - } - - if (expSize != null) - assertEquals(expSize.longValue(), pos); - - long time = System.currentTimeMillis() - start; - - if (time != 0 && log.isInfoEnabled()) { - log.info(String.format("Streams were compared in continuous reading " + - "[size=%7d, rate=%3.1f MB/sec]", expSize, expSize * 1000. / time / 1024 / 1024)); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/141f8282/modules/core/src/test/java/org/apache/ignite/internal/processors/fs/IgfsTaskSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/fs/IgfsTaskSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/fs/IgfsTaskSelfTest.java deleted file mode 100644 index 08a3266..0000000 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/fs/IgfsTaskSelfTest.java +++ /dev/null @@ -1,311 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.fs; - -import org.apache.ignite.*; -import org.apache.ignite.cache.*; -import org.apache.ignite.compute.*; -import org.apache.ignite.configuration.*; -import org.apache.ignite.igfs.*; -import org.apache.ignite.igfs.mapreduce.*; -import org.apache.ignite.igfs.mapreduce.records.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.resources.*; -import org.apache.ignite.spi.discovery.tcp.*; -import org.apache.ignite.spi.discovery.tcp.ipfinder.*; -import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; - -import java.io.*; -import java.util.*; - -import static org.apache.ignite.cache.CacheAtomicityMode.*; -import static org.apache.ignite.cache.CacheDistributionMode.*; -import static org.apache.ignite.cache.CacheMode.*; -import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*; -import static org.apache.ignite.igfs.IgfsMode.*; - -/** - * Tests for {@link org.apache.ignite.igfs.mapreduce.IgfsTask}. - */ -public class IgfsTaskSelfTest extends IgfsCommonAbstractTest { - /** Predefined words dictionary. */ - private static final String[] DICTIONARY = new String[] {"word0", "word1", "word2", "word3", "word4", "word5", - "word6", "word7"}; - - /** File path. */ - private static final IgfsPath FILE = new IgfsPath("/file"); - - /** Shared IP finder. */ - private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); - - /** Block size: 64 Kb. */ - private static final int BLOCK_SIZE = 64 * 1024; - - /** Total words in file. */ - private static final int TOTAL_WORDS = 2 * 1024 * 1024; - - /** Node count */ - private static final int NODE_CNT = 4; - - /** Repeat count. */ - private static final int REPEAT_CNT = 10; - - /** GGFS. */ - private static IgniteFs ggfs; - - /** {@inheritDoc} */ - @Override protected void beforeTestsStarted() throws Exception { - for (int i = 0; i < NODE_CNT; i++) { - Ignite g = G.start(config(i)); - - if (i + 1 == NODE_CNT) - ggfs = g.fileSystem("ggfs"); - } - } - - /** {@inheritDoc} */ - @Override protected void afterTestsStopped() throws Exception { - stopAllGrids(false); - } - - /** {@inheritDoc} */ - @Override protected void beforeTest() throws Exception { - ggfs.format(); - } - - /** - * Create grid configuration. - * - * @param idx Node index. - * @return Grid configuration - */ - private IgniteConfiguration config(int idx) { - IgfsConfiguration ggfsCfg = new IgfsConfiguration(); - - ggfsCfg.setDataCacheName("dataCache"); - ggfsCfg.setMetaCacheName("metaCache"); - ggfsCfg.setName("ggfs"); - ggfsCfg.setBlockSize(BLOCK_SIZE); - ggfsCfg.setDefaultMode(PRIMARY); - ggfsCfg.setFragmentizerEnabled(false); - - CacheConfiguration dataCacheCfg = new CacheConfiguration(); - - dataCacheCfg.setName("dataCache"); - dataCacheCfg.setCacheMode(PARTITIONED); - dataCacheCfg.setAtomicityMode(TRANSACTIONAL); - dataCacheCfg.setDistributionMode(PARTITIONED_ONLY); - dataCacheCfg.setWriteSynchronizationMode(FULL_SYNC); - dataCacheCfg.setAffinityMapper(new IgfsGroupDataBlocksKeyMapper(1)); - dataCacheCfg.setBackups(0); - dataCacheCfg.setQueryIndexEnabled(false); - - CacheConfiguration metaCacheCfg = new CacheConfiguration(); - - metaCacheCfg.setName("metaCache"); - metaCacheCfg.setCacheMode(REPLICATED); - metaCacheCfg.setAtomicityMode(TRANSACTIONAL); - dataCacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); - metaCacheCfg.setQueryIndexEnabled(false); - - IgniteConfiguration cfg = new IgniteConfiguration(); - - TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); - - discoSpi.setIpFinder(IP_FINDER); - - cfg.setDiscoverySpi(discoSpi); - cfg.setCacheConfiguration(dataCacheCfg, metaCacheCfg); - cfg.setGgfsConfiguration(ggfsCfg); - - cfg.setGridName("node-" + idx); - - return cfg; - } - - /** - * Test task. - * - * @throws Exception If failed. - */ - public void testTask() throws Exception { - U.sleep(3000); // TODO: Sleep in order to wait for fragmentizing to finish. - - for (int i = 0; i < REPEAT_CNT; i++) { - String arg = DICTIONARY[new Random(System.currentTimeMillis()).nextInt(DICTIONARY.length)]; - - generateFile(TOTAL_WORDS); - Long genLen = ggfs.info(FILE).length(); - - IgniteBiTuple<Long, Integer> taskRes = ggfs.execute(new Task(), - new IgfsStringDelimiterRecordResolver(" "), Collections.singleton(FILE), arg); - - assert F.eq(genLen, taskRes.getKey()); - assert F.eq(TOTAL_WORDS, taskRes.getValue()); - } - } - - /** - * Test task. - * - * @throws Exception If failed. - */ - public void testTaskAsync() throws Exception { - U.sleep(3000); - - assertFalse(ggfs.isAsync()); - - IgniteFs ggfsAsync = ggfs.withAsync(); - - assertTrue(ggfsAsync.isAsync()); - - for (int i = 0; i < REPEAT_CNT; i++) { - String arg = DICTIONARY[new Random(System.currentTimeMillis()).nextInt(DICTIONARY.length)]; - - generateFile(TOTAL_WORDS); - Long genLen = ggfs.info(FILE).length(); - - assertNull(ggfsAsync.execute( - new Task(), new IgfsStringDelimiterRecordResolver(" "), Collections.singleton(FILE), arg)); - - IgniteFuture<IgniteBiTuple<Long, Integer>> fut = ggfsAsync.future(); - - assertNotNull(fut); - - IgniteBiTuple<Long, Integer> taskRes = fut.get(); - - assert F.eq(genLen, taskRes.getKey()); - assert F.eq(TOTAL_WORDS, taskRes.getValue()); - } - - ggfsAsync.format(); - - IgniteFuture<?> fut = ggfsAsync.future(); - - assertNotNull(fut); - - fut.get(); - } - - /** - * Generate file with random data and provided argument. - * - * @param wordCnt Word count. - * @throws Exception If failed. - */ - private void generateFile(int wordCnt) - throws Exception { - Random rnd = new Random(System.currentTimeMillis()); - - try (OutputStreamWriter writer = new OutputStreamWriter(ggfs.create(FILE, true))) { - int cnt = 0; - - while (cnt < wordCnt) { - String word = DICTIONARY[rnd.nextInt(DICTIONARY.length)]; - - writer.write(word + " "); - - cnt++; - } - } - } - - /** - * Task. - */ - private static class Task extends IgfsTask<String, IgniteBiTuple<Long, Integer>> { - /** {@inheritDoc} */ - @Override public IgfsJob createJob(IgfsPath path, IgfsFileRange range, - IgfsTaskArgs<String> args) { - return new Job(); - } - - /** {@inheritDoc} */ - @Override public IgniteBiTuple<Long, Integer> reduce(List<ComputeJobResult> ress) { - long totalLen = 0; - int argCnt = 0; - - for (ComputeJobResult res : ress) { - IgniteBiTuple<Long, Integer> res0 = (IgniteBiTuple<Long, Integer>)res.getData(); - - if (res0 != null) { - totalLen += res0.getKey(); - argCnt += res0.getValue(); - } - } - - return F.t(totalLen, argCnt); - } - } - - /** - * Job. - */ - private static class Job implements IgfsJob, Serializable { - @IgniteInstanceResource - private Ignite ignite; - - @TaskSessionResource - private ComputeTaskSession ses; - - @JobContextResource - private ComputeJobContext ctx; - - /** {@inheritDoc} */ - @Override public Object execute(IgniteFs ggfs, IgfsFileRange range, IgfsInputStream in) - throws IOException { - assert ignite != null; - assert ses != null; - assert ctx != null; - - in.seek(range.start()); - - byte[] buf = new byte[(int)range.length()]; - - int totalRead = 0; - - while (totalRead < buf.length) { - int b = in.read(); - - assert b != -1; - - buf[totalRead++] = (byte)b; - } - - String str = new String(buf); - - String[] chunks = str.split(" "); - - int ctr = 0; - - for (String chunk : chunks) { - if (!chunk.isEmpty()) - ctr++; - } - - return F.t(range.length(), ctr); - } - - /** {@inheritDoc} */ - @Override public void cancel() { - // No-op. - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/141f8282/modules/core/src/test/java/org/apache/ignite/internal/processors/fs/IgfsTestInputStream.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/fs/IgfsTestInputStream.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/fs/IgfsTestInputStream.java deleted file mode 100644 index 4d807a3..0000000 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/fs/IgfsTestInputStream.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.fs; - -import java.io.*; - -/** - * Test input stream with predictable data output and zero memory usage. - */ -class IgfsTestInputStream extends InputStream { - /** This stream length. */ - private long size; - - /** Salt for input data generation. */ - private long salt; - - /** Current stream position. */ - private long pos; - - /** - * Constructs test input stream. - * - * @param size This stream length. - * @param salt Salt for input data generation. - */ - IgfsTestInputStream(long size, long salt) { - this.size = size; - this.salt = salt; - } - - /** {@inheritDoc} */ - @Override public synchronized int read() throws IOException { - if (pos >= size) - return -1; - - long next = salt ^ (salt * pos++); - - next ^= next >>> 32; - next ^= next >>> 16; - next ^= next >>> 8; - - return (int)(0xFF & next); - } - - /** {@inheritDoc} */ - @Override public synchronized long skip(long n) throws IOException { - pos += Math.min(n, size - pos); - - return size - pos; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/141f8282/modules/core/src/test/java/org/apache/ignite/internal/processors/fs/package.html ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/fs/package.html b/modules/core/src/test/java/org/apache/ignite/internal/processors/fs/package.html deleted file mode 100644 index 6556981..0000000 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/fs/package.html +++ /dev/null @@ -1,24 +0,0 @@ -<!-- - Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. ---> - -<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd"> -<html> -<body> - <!-- Package description. --> - Contains internal tests or test related classes and interfaces. -</body> -</html> http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/141f8282/modules/core/src/test/java/org/apache/ignite/internal/processors/fs/split/IgfsAbstractRecordResolverSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/fs/split/IgfsAbstractRecordResolverSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/fs/split/IgfsAbstractRecordResolverSelfTest.java deleted file mode 100644 index 379ba22..0000000 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/fs/split/IgfsAbstractRecordResolverSelfTest.java +++ /dev/null @@ -1,186 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.fs.split; - -import org.apache.ignite.*; -import org.apache.ignite.configuration.*; -import org.apache.ignite.igfs.*; -import org.apache.ignite.igfs.mapreduce.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.spi.discovery.tcp.*; -import org.apache.ignite.spi.discovery.tcp.ipfinder.*; -import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; -import org.apache.ignite.testframework.junits.common.*; - -import java.util.*; - -import static org.apache.ignite.cache.CacheAtomicityMode.*; -import static org.apache.ignite.cache.CacheDistributionMode.*; -import static org.apache.ignite.cache.CacheMode.*; -import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*; -import static org.apache.ignite.igfs.IgfsMode.*; - -/** - * Base class for all split resolvers - */ -public class IgfsAbstractRecordResolverSelfTest extends GridCommonAbstractTest { - /** File path. */ - protected static final IgfsPath FILE = new IgfsPath("/file"); - - /** Shared IP finder. */ - private final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); - - /** GGFS. */ - protected static IgniteFs ggfs; - - /** {@inheritDoc} */ - @Override protected void beforeTestsStarted() throws Exception { - IgfsConfiguration ggfsCfg = new IgfsConfiguration(); - - ggfsCfg.setDataCacheName("dataCache"); - ggfsCfg.setMetaCacheName("metaCache"); - ggfsCfg.setName("ggfs"); - ggfsCfg.setBlockSize(512); - ggfsCfg.setDefaultMode(PRIMARY); - - CacheConfiguration dataCacheCfg = new CacheConfiguration(); - - dataCacheCfg.setName("dataCache"); - dataCacheCfg.setCacheMode(PARTITIONED); - dataCacheCfg.setAtomicityMode(TRANSACTIONAL); - dataCacheCfg.setDistributionMode(NEAR_PARTITIONED); - dataCacheCfg.setWriteSynchronizationMode(FULL_SYNC); - dataCacheCfg.setAffinityMapper(new IgfsGroupDataBlocksKeyMapper(128)); - dataCacheCfg.setBackups(0); - dataCacheCfg.setQueryIndexEnabled(false); - - CacheConfiguration metaCacheCfg = new CacheConfiguration(); - - metaCacheCfg.setName("metaCache"); - metaCacheCfg.setCacheMode(REPLICATED); - metaCacheCfg.setAtomicityMode(TRANSACTIONAL); - metaCacheCfg.setWriteSynchronizationMode(FULL_SYNC); - metaCacheCfg.setQueryIndexEnabled(false); - - IgniteConfiguration cfg = new IgniteConfiguration(); - - cfg.setGridName("grid"); - - TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); - - discoSpi.setIpFinder(ipFinder); - - cfg.setDiscoverySpi(discoSpi); - cfg.setCacheConfiguration(dataCacheCfg, metaCacheCfg); - cfg.setGgfsConfiguration(ggfsCfg); - - Ignite g = G.start(cfg); - - ggfs = g.fileSystem("ggfs"); - } - - /** {@inheritDoc} */ - @Override protected void afterTestsStopped() throws Exception { - stopAllGrids(false); - } - - /** {@inheritDoc} */ - @Override protected void beforeTest() throws Exception { - ggfs.format(); - } - - /** - * Convenient method for wrapping some bytes into byte array. - * - * @param data Data bytes. - * @return Byte array. - */ - protected static byte[] wrap(int... data) { - byte[] res = new byte[data.length]; - - for (int i = 0; i < data.length; i++) - res[i] = (byte)data[i]; - - return res; - } - - /** - * Create byte array consisting of the given chunks. - * - * @param chunks Array of chunks where the first value is the byte array and the second value is amount of repeats. - * @return Byte array. - */ - protected static byte[] array(Map.Entry<byte[], Integer>... chunks) { - int totalSize = 0; - - for (Map.Entry<byte[], Integer> chunk : chunks) - totalSize += chunk.getKey().length * chunk.getValue(); - - byte[] res = new byte[totalSize]; - - int pos = 0; - - for (Map.Entry<byte[], Integer> chunk : chunks) { - for (int i = 0; i < chunk.getValue(); i++) { - System.arraycopy(chunk.getKey(), 0, res, pos, chunk.getKey().length); - - pos += chunk.getKey().length; - } - } - - return res; - } - - /** - * Open file for read and return input stream. - * - * @return Input stream. - * @throws Exception In case of exception. - */ - protected IgfsInputStream read() throws Exception { - return ggfs.open(FILE); - } - - /** - * Write data to the file. - * - * @param chunks Data chunks. - * @throws Exception In case of exception. - */ - protected void write(byte[]... chunks) throws Exception { - IgfsOutputStream os = ggfs.create(FILE, true); - - if (chunks != null) { - for (byte[] chunk : chunks) - os.write(chunk); - } - - os.close(); - } - - /** - * Create split. - * - * @param start Start position. - * @param len Length. - * @return Split. - */ - protected IgfsFileRange split(long start, long len) { - return new IgfsFileRange(FILE, start, len); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/141f8282/modules/core/src/test/java/org/apache/ignite/internal/processors/fs/split/IgfsByteDelimiterRecordResolverSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/fs/split/IgfsByteDelimiterRecordResolverSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/fs/split/IgfsByteDelimiterRecordResolverSelfTest.java deleted file mode 100644 index 7e61bf9..0000000 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/fs/split/IgfsByteDelimiterRecordResolverSelfTest.java +++ /dev/null @@ -1,335 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.fs.split; - -import org.apache.ignite.igfs.*; -import org.apache.ignite.igfs.mapreduce.*; -import org.apache.ignite.igfs.mapreduce.records.*; -import org.apache.ignite.internal.util.typedef.*; - -/** - * Byte delimiter split resolver self test. - */ -public class IgfsByteDelimiterRecordResolverSelfTest extends IgfsAbstractRecordResolverSelfTest { - /** - * Test split resolution when there are no delimiters in the file. - * - * @throws Exception If failed. - */ - public void testNoDelimiters() throws Exception { - byte[] delim = wrap(2); - byte[] data = array(F.t(wrap(1), 8)); - - assertSplit(0, 4, 0, 8, data, delim); - assertSplit(0, 8, 0, 8, data, delim); - - assertSplitNull(2, 2, data, delim); - assertSplitNull(2, 6, data, delim); - } - - /** - * Test split resolution when there is one delimiter at the head. - * - * @throws Exception If failed. - */ - public void testHeadDelimiter() throws Exception { - byte[] delim = array(F.t(wrap(2), 8)); - byte[] data = array(F.t(delim, 1), F.t(wrap(1), 8)); - - assertSplit(0, 4, 0, 8, data, delim); - assertSplit(0, 8, 0, 8, data, delim); - assertSplit(0, 12, 0, 16, data, delim); - assertSplit(0, 16, 0, 16, data, delim); - - assertSplitNull(2, 2, data, delim); - assertSplitNull(2, 6, data, delim); - assertSplit(2, 10, 8, 8, data, delim); - assertSplit(2, 14, 8, 8, data, delim); - - assertSplit(8, 4, 8, 8, data, delim); - assertSplit(8, 8, 8, 8, data, delim); - - assertSplitNull(10, 2, data, delim); - assertSplitNull(10, 6, data, delim); - } - - /** - * Test split when there is one delimiter at the end. - * - * @throws Exception If failed. - */ - public void testEndDelimiter() throws Exception { - byte[] delim = array(F.t(wrap(2), 8)); - byte[] data = array(F.t(wrap(1), 8), F.t(delim, 1)); - - assertSplit(0, 4, 0, 16, data, delim); - assertSplit(0, 8, 0, 16, data, delim); - assertSplit(0, 12, 0, 16, data, delim); - assertSplit(0, 16, 0, 16, data, delim); - - assertSplitNull(2, 2, data, delim); - assertSplitNull(2, 6, data, delim); - assertSplitNull(2, 10, data, delim); - assertSplitNull(2, 14, data, delim); - - assertSplitNull(8, 4, data, delim); - assertSplitNull(8, 8, data, delim); - - assertSplitNull(10, 2, data, delim); - assertSplitNull(10, 6, data, delim); - } - - /** - * Test split when there is one delimiter in the middle. - * - * @throws Exception If failed. - */ - public void testMiddleDelimiter() throws Exception { - byte[] delim = array(F.t(wrap(2), 8)); - byte[] data = array(F.t(wrap(1), 8), F.t(delim, 1), F.t(wrap(1), 8)); - - assertSplit(0, 4, 0, 16, data, delim); - assertSplit(0, 8, 0, 16, data, delim); - assertSplit(0, 12, 0, 16, data, delim); - assertSplit(0, 16, 0, 16, data, delim); - assertSplit(0, 20, 0, 24, data, delim); - assertSplit(0, 24, 0, 24, data, delim); - - assertSplitNull(2, 2, data, delim); - assertSplitNull(2, 6, data, delim); - assertSplitNull(2, 10, data, delim); - assertSplitNull(2, 14, data, delim); - assertSplit(2, 18, 16, 8, data, delim); - assertSplit(2, 22, 16, 8, data, delim); - - assertSplitNull(8, 4, data, delim); - assertSplitNull(8, 8, data, delim); - assertSplit(8, 12, 16, 8, data, delim); - assertSplit(8, 16, 16, 8, data, delim); - - assertSplitNull(10, 2, data, delim); - assertSplitNull(10, 6, data, delim); - assertSplit(10, 10, 16, 8, data, delim); - assertSplit(10, 14, 16, 8, data, delim); - - assertSplit(16, 4, 16, 8, data, delim); - assertSplit(16, 8, 16, 8, data, delim); - - assertSplitNull(18, 2, data, delim); - assertSplitNull(18, 6, data, delim); - } - - /** - * Test split when there are two head delimiters. - * - * @throws Exception If failed. - */ - public void testTwoHeadDelimiters() throws Exception { - byte[] delim = array(F.t(wrap(2), 8)); - byte[] data = array(F.t(delim, 2), F.t(wrap(1), 8)); - - assertSplit(0, 4, 0, 8, data, delim); - assertSplit(0, 8, 0, 8, data, delim); - assertSplit(0, 12, 0, 16, data, delim); - assertSplit(0, 16, 0, 16, data, delim); - assertSplit(0, 20, 0, 24, data, delim); - assertSplit(0, 24, 0, 24, data, delim); - - assertSplitNull(2, 2, data, delim); - assertSplitNull(2, 6, data, delim); - assertSplit(2, 10, 8, 8, data, delim); - assertSplit(2, 14, 8, 8, data, delim); - assertSplit(2, 18, 8, 16, data, delim); - assertSplit(2, 22, 8, 16, data, delim); - - assertSplit(8, 4, 8, 8, data, delim); - assertSplit(8, 8, 8, 8, data, delim); - assertSplit(8, 12, 8, 16, data, delim); - assertSplit(8, 16, 8, 16, data, delim); - - assertSplitNull(10, 2, data, delim); - assertSplitNull(10, 6, data, delim); - assertSplit(10, 10, 16, 8, data, delim); - assertSplit(10, 14, 16, 8, data, delim); - - assertSplit(16, 4, 16, 8, data, delim); - assertSplit(16, 8, 16, 8, data, delim); - - assertSplitNull(18, 2, data, delim); - assertSplitNull(18, 6, data, delim); - } - - /** - * Test split when there are two tail delimiters. - * - * @throws Exception If failed. - */ - public void testTwoTailDelimiters() throws Exception { - byte[] delim = array(F.t(wrap(2), 8)); - byte[] data = array(F.t(wrap(1), 8), F.t(delim, 2)); - - assertSplit(0, 4, 0, 16, data, delim); - assertSplit(0, 8, 0, 16, data, delim); - assertSplit(0, 12, 0, 16, data, delim); - assertSplit(0, 16, 0, 16, data, delim); - assertSplit(0, 20, 0, 24, data, delim); - assertSplit(0, 24, 0, 24, data, delim); - - assertSplitNull(2, 2, data, delim); - assertSplitNull(2, 6, data, delim); - assertSplitNull(2, 10, data, delim); - assertSplitNull(2, 14, data, delim); - assertSplit(2, 18, 16, 8, data, delim); - assertSplit(2, 22, 16, 8, data, delim); - - assertSplitNull(8, 4, data, delim); - assertSplitNull(8, 8, data, delim); - assertSplit(8, 12, 16, 8, data, delim); - assertSplit(8, 16, 16, 8, data, delim); - - assertSplitNull(10, 2, data, delim); - assertSplitNull(10, 6, data, delim); - assertSplit(10, 10, 16, 8, data, delim); - assertSplit(10, 14, 16, 8, data, delim); - - assertSplit(16, 4, 16, 8, data, delim); - assertSplit(16, 8, 16, 8, data, delim); - - assertSplitNull(18, 2, data, delim); - assertSplitNull(18, 6, data, delim); - } - - /** - * Test split when there is one head delimiter, one tail delimiter and some data between them. - * - * @throws Exception If failed. - */ - public void testHeadAndTailDelimiters() throws Exception { - byte[] delim = array(F.t(wrap(2), 8)); - byte[] data = array(F.t(delim, 1), F.t(wrap(1), 8), F.t(delim, 1)); - - assertSplit(0, 4, 0, 8, data, delim); - assertSplit(0, 8, 0, 8, data, delim); - assertSplit(0, 12, 0, 24, data, delim); - assertSplit(0, 16, 0, 24, data, delim); - assertSplit(0, 20, 0, 24, data, delim); - assertSplit(0, 24, 0, 24, data, delim); - - assertSplitNull(2, 2, data, delim); - assertSplitNull(2, 6, data, delim); - assertSplit(2, 10, 8, 16, data, delim); - assertSplit(2, 14, 8, 16, data, delim); - assertSplit(2, 18, 8, 16, data, delim); - assertSplit(2, 22, 8, 16, data, delim); - - assertSplit(8, 4, 8, 16, data, delim); - assertSplit(8, 8, 8, 16, data, delim); - assertSplit(8, 12, 8, 16, data, delim); - assertSplit(8, 16, 8, 16, data, delim); - - assertSplitNull(10, 2, data, delim); - assertSplitNull(10, 6, data, delim); - assertSplitNull(10, 10, data, delim); - assertSplitNull(10, 14, data, delim); - - assertSplitNull(16, 4, data, delim); - assertSplitNull(16, 8, data, delim); - - assertSplitNull(18, 2, data, delim); - assertSplitNull(18, 6, data, delim); - } - - /** - * Test special case when delimiter starts with the same bytes as the last previos data byte. - * - * @throws Exception If failed. - */ - public void testDelimiterStartsWithTheSameBytesAsLastPreviousDataByte() throws Exception { - byte[] delim = array(F.t(wrap(1, 1, 2), 1)); - byte[] data = array(F.t(wrap(1), 1), F.t(delim, 1), F.t(wrap(1), 1)); - - assertSplit(0, 1, 0, 4, data, delim); - assertSplit(0, 2, 0, 4, data, delim); - assertSplit(0, 4, 0, 4, data, delim); - assertSplit(0, 5, 0, 5, data, delim); - - assertSplit(1, 4, 4, 1, data, delim); - } - - /** - * Check split resolution. - * - * @param suggestedStart Suggested start. - * @param suggestedLen Suggested length. - * @param expStart Expected start. - * @param expLen Expected length. - * @param data File data. - * @param delims Delimiters. - * @throws Exception If failed. - */ - public void assertSplit(long suggestedStart, long suggestedLen, long expStart, long expLen, byte[] data, - byte[]... delims) throws Exception { - write(data); - - IgfsByteDelimiterRecordResolver rslvr = resolver(delims); - - IgfsFileRange split; - - try (IgfsInputStream is = read()) { - split = rslvr.resolveRecords(ggfs, is, split(suggestedStart, suggestedLen)); - } - - assert split != null : "Split is null."; - assert split.start() == expStart : "Incorrect start [expected=" + expStart + ", actual=" + split.start() + ']'; - assert split.length() == expLen : "Incorrect length [expected=" + expLen + ", actual=" + split.length() + ']'; - } - - /** - * Check the split resolution resulted in {@code null}. - * - * @param suggestedStart Suggested start. - * @param suggestedLen Suggested length. - * @param data File data. - * @param delims Delimiters. - * @throws Exception If failed. - */ - public void assertSplitNull(long suggestedStart, long suggestedLen, byte[] data, byte[]... delims) - throws Exception { - write(data); - - IgfsByteDelimiterRecordResolver rslvr = resolver(delims); - - IgfsFileRange split; - - try (IgfsInputStream is = read()) { - split = rslvr.resolveRecords(ggfs, is, split(suggestedStart, suggestedLen)); - } - - assert split == null : "Split is not null."; - } - - /** - * Create resolver. - * - * @param delims Delimiters. - * @return Resolver. - */ - private IgfsByteDelimiterRecordResolver resolver(byte[]... delims) { - return new IgfsByteDelimiterRecordResolver(delims); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/141f8282/modules/core/src/test/java/org/apache/ignite/internal/processors/fs/split/IgfsFixedLengthRecordResolverSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/fs/split/IgfsFixedLengthRecordResolverSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/fs/split/IgfsFixedLengthRecordResolverSelfTest.java deleted file mode 100644 index cd6b267..0000000 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/fs/split/IgfsFixedLengthRecordResolverSelfTest.java +++ /dev/null @@ -1,147 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.fs.split; - -import org.apache.ignite.igfs.*; -import org.apache.ignite.igfs.mapreduce.*; -import org.apache.ignite.igfs.mapreduce.records.*; -import org.apache.ignite.internal.util.typedef.*; - -/** - * Fixed length split resolver self test. - */ -public class IgfsFixedLengthRecordResolverSelfTest extends IgfsAbstractRecordResolverSelfTest { - /** - * Test split resolver. - * - * @throws Exception If failed. - */ - public void testResolver() throws Exception { - byte[] data = array(F.t(wrap(1), 24)); - - assertSplit(0, 4, 0, 8, data, 8); - assertSplit(0, 8, 0, 8, data, 8); - assertSplit(0, 12, 0, 16, data, 8); - assertSplit(0, 16, 0, 16, data, 8); - assertSplit(0, 20, 0, 24, data, 8); - assertSplit(0, 24, 0, 24, data, 8); - assertSplit(0, 28, 0, 24, data, 8); - assertSplit(0, 32, 0, 24, data, 8); - - assertSplitNull(2, 2, data, 8); - assertSplitNull(2, 6, data, 8); - assertSplit(2, 10, 8, 8, data, 8); - assertSplit(2, 14, 8, 8, data, 8); - assertSplit(2, 18, 8, 16, data, 8); - assertSplit(2, 22, 8, 16, data, 8); - assertSplit(2, 26, 8, 16, data, 8); - assertSplit(2, 30, 8, 16, data, 8); - - assertSplit(8, 4, 8, 8, data, 8); - assertSplit(8, 8, 8, 8, data, 8); - assertSplit(8, 12, 8, 16, data, 8); - assertSplit(8, 16, 8, 16, data, 8); - assertSplit(8, 20, 8, 16, data, 8); - assertSplit(8, 24, 8, 16, data, 8); - - assertSplitNull(10, 2, data, 8); - assertSplitNull(10, 6, data, 8); - assertSplit(10, 10, 16, 8, data, 8); - assertSplit(10, 14, 16, 8, data, 8); - assertSplit(10, 18, 16, 8, data, 8); - assertSplit(10, 22, 16, 8, data, 8); - - assertSplit(16, 4, 16, 8, data, 8); - assertSplit(16, 8, 16, 8, data, 8); - assertSplit(16, 12, 16, 8, data, 8); - assertSplit(16, 16, 16, 8, data, 8); - - assertSplitNull(18, 2, data, 8); - assertSplitNull(18, 6, data, 8); - assertSplitNull(18, 10, data, 8); - assertSplitNull(18, 14, data, 8); - - assertSplitNull(24, 4, data, 8); - assertSplitNull(24, 8, data, 8); - - assertSplitNull(26, 2, data, 8); - assertSplitNull(26, 6, data, 8); - } - - /** - * Check split resolution. - * - * @param suggestedStart Suggested start. - * @param suggestedLen Suggested length. - * @param expStart Expected start. - * @param expLen Expected length. - * @param data File data. - * @param len Length. - * @throws Exception If failed. - */ - public void assertSplit(long suggestedStart, long suggestedLen, long expStart, long expLen, byte[] data, int len) - throws Exception { - write(data); - - IgfsFixedLengthRecordResolver rslvr = resolver(len); - - IgfsFileRange split; - - try (IgfsInputStream is = read()) { - split = rslvr.resolveRecords(ggfs, is, split(suggestedStart, suggestedLen)); - } - - assert split != null : "Split is null."; - assert split.start() == expStart : "Incorrect start [expected=" + expStart + ", actual=" + split.start() + ']'; - assert split.length() == expLen : "Incorrect length [expected=" + expLen + ", actual=" + split.length() + ']'; - } - - /** - * Check the split resolution resulted in {@code null}. - * - * @param suggestedStart Suggested start. - * @param suggestedLen Suggested length. - * @param data File data. - * @param len Length. - * @throws Exception If failed. - */ - public void assertSplitNull(long suggestedStart, long suggestedLen, byte[] data, int len) - throws Exception { - write(data); - - IgfsFixedLengthRecordResolver rslvr = resolver(len); - - IgfsFileRange split; - - try (IgfsInputStream is = read()) { - split = rslvr.resolveRecords(ggfs, is, split(suggestedStart, suggestedLen)); - } - - assert split == null : "Split is not null."; - } - - /** - * Create resolver. - * - * @param len Length. - * @return Resolver. - */ - private IgfsFixedLengthRecordResolver resolver(int len) { - return new IgfsFixedLengthRecordResolver(len); - } -}
