http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cfcf46df/modules/core/src/test/java/org/apache/ignite/internal/processors/fs/GridGgfsSizeSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/fs/GridGgfsSizeSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/fs/GridGgfsSizeSelfTest.java new file mode 100644 index 0000000..4f59961 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/fs/GridGgfsSizeSelfTest.java @@ -0,0 +1,874 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.fs; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.events.*; +import org.apache.ignite.fs.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.transactions.*; +import org.gridgain.grid.kernal.processors.cache.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.gridgain.testframework.*; +import org.jdk8.backport.*; + +import java.io.*; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +import static org.apache.ignite.cache.GridCacheAtomicityMode.*; +import static org.apache.ignite.cache.GridCacheDistributionMode.*; +import static org.apache.ignite.cache.GridCacheMode.*; +import static org.apache.ignite.cache.GridCachePreloadMode.*; +import static org.apache.ignite.transactions.IgniteTxConcurrency.*; +import static org.apache.ignite.transactions.IgniteTxIsolation.*; +import static org.apache.ignite.events.IgniteEventType.*; + +/** + * {@link org.apache.ignite.internal.processors.fs.GridGgfsAttributes} test case. + */ +public class GridGgfsSizeSelfTest extends GridGgfsCommonAbstractTest { + /** How many grids to start. */ + private static final int GRID_CNT = 3; + + /** How many files to save. */ + private static final int FILES_CNT = 10; + + /** Maximum amount of bytes that could be written to particular file. */ + private static final int MAX_FILE_SIZE = 1024 * 10; + + /** Block size. */ + private static final int BLOCK_SIZE = 384; + + /** Cache name. */ + private static final String DATA_CACHE_NAME = "dataCache"; + + /** Cache name. */ + private static final String META_CACHE_NAME = "metaCache"; + + /** GGFS name. */ + private static final String GGFS_NAME = "ggfs"; + + /** IP finder. 
*/ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** GGFS management port */ + private static int mgmtPort; + + /** Data cache mode. */ + private GridCacheMode cacheMode; + + /** Whether near cache is enabled (applicable for PARTITIONED cache only). */ + private boolean nearEnabled; + + /** GGFS maximum space. */ + private long ggfsMaxData; + + /** Trash purge timeout. */ + private long trashPurgeTimeout; + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + cacheMode = null; + nearEnabled = false; + ggfsMaxData = 0; + trashPurgeTimeout = 0; + + mgmtPort = 11400; + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + G.stopAll(true); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + IgniteFsConfiguration ggfsCfg = new IgniteFsConfiguration(); + + ggfsCfg.setDataCacheName(DATA_CACHE_NAME); + ggfsCfg.setMetaCacheName(META_CACHE_NAME); + ggfsCfg.setName(GGFS_NAME); + ggfsCfg.setBlockSize(BLOCK_SIZE); + ggfsCfg.setFragmentizerEnabled(false); + ggfsCfg.setMaxSpaceSize(ggfsMaxData); + ggfsCfg.setTrashPurgeTimeout(trashPurgeTimeout); + ggfsCfg.setManagementPort(++mgmtPort); + + CacheConfiguration dataCfg = defaultCacheConfiguration(); + + dataCfg.setName(DATA_CACHE_NAME); + dataCfg.setCacheMode(cacheMode); + + if (cacheMode == PARTITIONED) { + dataCfg.setDistributionMode(nearEnabled ? NEAR_PARTITIONED : PARTITIONED_ONLY); + dataCfg.setBackups(0); + } + + dataCfg.setWriteSynchronizationMode(GridCacheWriteSynchronizationMode.FULL_SYNC); + dataCfg.setPreloadMode(SYNC); + dataCfg.setAffinityMapper(new IgniteFsGroupDataBlocksKeyMapper(128)); + dataCfg.setQueryIndexEnabled(false); + dataCfg.setAtomicityMode(TRANSACTIONAL); + + CacheConfiguration metaCfg = defaultCacheConfiguration(); + + metaCfg.setName(META_CACHE_NAME); + metaCfg.setCacheMode(REPLICATED); + + metaCfg.setWriteSynchronizationMode(GridCacheWriteSynchronizationMode.FULL_SYNC); + metaCfg.setPreloadMode(SYNC); + metaCfg.setQueryIndexEnabled(false); + metaCfg.setAtomicityMode(TRANSACTIONAL); + + TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); + + discoSpi.setIpFinder(IP_FINDER); + + cfg.setDiscoverySpi(discoSpi); + cfg.setCacheConfiguration(metaCfg, dataCfg); + cfg.setGgfsConfiguration(ggfsCfg); + + return cfg; + } + + /** + * Perform initial startup. + * + * @throws Exception If failed. + */ + private void startUp() throws Exception { + startGrids(GRID_CNT); + } + + /** + * Ensure that PARTITIONED cache is correctly initialized. + * + * @throws Exception If failed. + */ + public void testPartitioned() throws Exception { + cacheMode = PARTITIONED; + nearEnabled = true; + + check(); + } + + /** + * Ensure that co-located cache is correctly initialized. + * + * @throws Exception If failed. + */ + public void testColocated() throws Exception { + cacheMode = PARTITIONED; + nearEnabled = false; + + check(); + } + + /** + * Ensure that REPLICATED cache is correctly initialized. + * + * @throws Exception If failed. + */ + public void testReplicated() throws Exception { + cacheMode = REPLICATED; + + check(); + } + + /** + * Ensure that exception is thrown in case PARTITIONED cache is oversized. + * + * @throws Exception If failed. 
+ */ + public void testPartitionedOversize() throws Exception { + cacheMode = PARTITIONED; + nearEnabled = true; + + checkOversize(); + } + + /** + * Ensure that exception is thrown in case co-located cache is oversized. + * + * @throws Exception If failed. + */ + public void testColocatedOversize() throws Exception { + cacheMode = PARTITIONED; + nearEnabled = false; + + checkOversize(); + } + + /** + * Ensure that exception is thrown in case REPLICATED cache is oversized. + * + * @throws Exception If failed. + */ + public void testReplicatedOversize() throws Exception { + cacheMode = REPLICATED; + + checkOversize(); + } + + /** + * Ensure that exception is not thrown in case PARTITIONED cache is oversized, but data is deleted concurrently. + * + * @throws Exception If failed. + */ + public void testPartitionedOversizeDelay() throws Exception { + cacheMode = PARTITIONED; + nearEnabled = true; + + checkOversizeDelay(); + } + + /** + * Ensure that exception is not thrown in case co-located cache is oversized, but data is deleted concurrently. + * + * @throws Exception If failed. + */ + public void testColocatedOversizeDelay() throws Exception { + cacheMode = PARTITIONED; + nearEnabled = false; + + checkOversizeDelay(); + } + + /** + * Ensure that exception is not thrown in case REPLICATED cache is oversized, but data is deleted concurrently. + * + * @throws Exception If failed. + */ + public void testReplicatedOversizeDelay() throws Exception { + cacheMode = REPLICATED; + + checkOversizeDelay(); + } + + /** + * Ensure that GGFS size is correctly updated in case of preloading for PARTITIONED cache. + * + * @throws Exception If failed. + */ + public void testPartitionedPreload() throws Exception { + cacheMode = PARTITIONED; + nearEnabled = true; + + checkPreload(); + } + + /** + * Ensure that GGFS size is correctly updated in case of preloading for co-located cache. + * + * @throws Exception If failed. + */ + public void testColocatedPreload() throws Exception { + cacheMode = PARTITIONED; + nearEnabled = false; + + checkPreload(); + } + + /** + * Ensure that GGFS cache size is calculated correctly. + * + * @throws Exception If failed. + */ + private void check() throws Exception { + startUp(); + + // Ensure that cache was marked as GGFS data cache. + for (int i = 0; i < GRID_CNT; i++) { + GridEx g = grid(i); + + GridCacheProjectionEx cache = (GridCacheProjectionEx)g.cachex(DATA_CACHE_NAME).cache(); + + assert cache.isGgfsDataCache(); + } + + // Perform writes. + Collection<GgfsFile> files = write(); + + // Check sizes. + Map<UUID, Integer> expSizes = new HashMap<>(GRID_CNT, 1.0f); + + for (GgfsFile file : files) { + for (GgfsBlock block : file.blocks()) { + Collection<UUID> ids = primaryOrBackups(block.key()); + + for (UUID id : ids) { + if (expSizes.get(id) == null) + expSizes.put(id, block.length()); + else + expSizes.put(id, expSizes.get(id) + block.length()); + } + } + } + + for (int i = 0; i < GRID_CNT; i++) { + UUID id = grid(i).localNode().id(); + + GridCacheAdapter<GridGgfsBlockKey, byte[]> cache = cache(id); + + int expSize = expSizes.get(id) != null ? expSizes.get(id) : 0; + + assert expSize == cache.ggfsDataSpaceUsed(); + } + + // Perform reads which could potentially be non-local.
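+ // With a PARTITIONED data cache each block is held only by its primary node (and backups), so a read + // issued from any other node has to fetch the block remotely; the re-checks below verify that such + // reads do not distort the ggfsDataSpaceUsed() accounting on any node.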
+ byte[] buf = new byte[BLOCK_SIZE]; + + for (GgfsFile file : files) { + for (int i = 0; i < GRID_CNT; i++) { + int total = 0; + + IgniteFsInputStream is = ggfs(i).open(file.path()); + + while (true) { + int read = is.read(buf); + + if (read == -1) + break; + else + total += read; + } + + assert total == file.length() : "Not enough bytes read: [expected=" + file.length() + ", actual=" + + total + ']'; + + is.close(); + } + } + + // Check sizes after read. + if (cacheMode == PARTITIONED) { + // No changes since the previous check for co-located cache. + for (int i = 0; i < GRID_CNT; i++) { + UUID id = grid(i).localNode().id(); + + GridCacheAdapter<GridGgfsBlockKey, byte[]> cache = cache(id); + + int expSize = expSizes.get(id) != null ? expSizes.get(id) : 0; + + assert expSize == cache.ggfsDataSpaceUsed(); + } + } + else { + // All data must exist on each cache. + int totalSize = 0; + + for (GgfsFile file : files) + totalSize += file.length(); + + for (int i = 0; i < GRID_CNT; i++) { + UUID id = grid(i).localNode().id(); + + GridCacheAdapter<GridGgfsBlockKey, byte[]> cache = cache(id); + + assertEquals(totalSize, cache.ggfsDataSpaceUsed()); + } + } + + // Delete data and ensure that all counters are 0 now. + for (GgfsFile file : files) { + ggfs(0).delete(file.path(), false); + + // Wait for the actual delete to occur. + for (GgfsBlock block : file.blocks()) { + for (int i = 0; i < GRID_CNT; i++) { + while (cache(grid(i).localNode().id()).peek(block.key()) != null) + U.sleep(100); + } + } + } + + for (int i = 0; i < GRID_CNT; i++) { + GridCacheAdapter<GridGgfsBlockKey, byte[]> cache = cache(grid(i).localNode().id()); + + assert 0 == cache.ggfsDataSpaceUsed() : "Size counter is not 0: " + cache.ggfsDataSpaceUsed(); + } + } + + /** + * Ensure that an exception is thrown in case of GGFS oversize. + * + * @throws Exception If failed. + */ + private void checkOversize() throws Exception { + ggfsMaxData = BLOCK_SIZE; + + startUp(); + + final IgniteFsPath path = new IgniteFsPath("/file"); + + // This write is expected to be successful. + IgniteFsOutputStream os = ggfs(0).create(path, false); + os.write(chunk(BLOCK_SIZE - 1)); + os.close(); + + // This write must be successful as well. + os = ggfs(0).append(path, false); + os.write(chunk(1)); + os.close(); + + // This write must fail with an exception. + GridTestUtils.assertThrows(log(), new Callable<Object>() { + @Override public Object call() throws Exception { + IgniteFsOutputStream osErr = ggfs(0).append(path, false); + + try { + osErr.write(chunk(BLOCK_SIZE)); + osErr.close(); + + return null; + } + catch (IOException e) { + Throwable e0 = e; + + while (e0.getCause() != null) + e0 = e0.getCause(); + + throw (Exception)e0; + } + finally { + U.closeQuiet(osErr); + } + } + }, IgniteFsOutOfSpaceException.class, "Failed to write data block (GGFS maximum data size exceeded) [used=" + + ggfsMaxData + ", allowed=" + ggfsMaxData + ']'); + } + + /** + * Ensure that the exception is either not thrown, or thrown with some delay, when there is something in the trash directory. + * + * @throws Exception If failed. + */ + private void checkOversizeDelay() throws Exception { + final CountDownLatch latch = new CountDownLatch(1); + + ggfsMaxData = 256; + trashPurgeTimeout = 2000; + + startUp(); + + GridGgfsImpl ggfs = ggfs(0); + + final IgniteFsPath path = new IgniteFsPath("/file"); + final IgniteFsPath otherPath = new IgniteFsPath("/fileOther"); + + // Fill cache with data up to its limit.
+ IgniteFsOutputStream os = ggfs.create(path, false); + os.write(chunk((int)ggfsMaxData)); + os.close(); + + final GridCache<IgniteUuid, GridGgfsFileInfo> metaCache = ggfs.context().kernalContext().cache().cache( + ggfs.configuration().getMetaCacheName()); + + // Start a transaction in a separate thread which will lock file ID. + final IgniteUuid id = ggfs.context().meta().fileId(path); + final GridGgfsFileInfo info = ggfs.context().meta().info(id); + + final AtomicReference<Throwable> err = new AtomicReference<>(); + + try { + new Thread(new Runnable() { + @Override public void run() { + try { + + try (IgniteTx tx = metaCache.txStart(PESSIMISTIC, REPEATABLE_READ)) { + metaCache.get(id); + + latch.await(); + + U.sleep(1000); // Sleep here so that data manager could "see" oversize. + + tx.commit(); + } + } + catch (Throwable e) { + err.set(e); + } + } + }).start(); + + // Now add file ID to trash listing so that delete worker could "see" it. + + try (IgniteTx tx = metaCache.txStart(PESSIMISTIC, REPEATABLE_READ)) { + Map<String, GridGgfsListingEntry> listing = Collections.singletonMap(path.name(), + new GridGgfsListingEntry(info)); + + // Clear root listing. + metaCache.put(ROOT_ID, new GridGgfsFileInfo(ROOT_ID)); + + // Add file to trash listing. + GridGgfsFileInfo trashInfo = metaCache.get(TRASH_ID); + + if (trashInfo == null) + metaCache.put(TRASH_ID, new GridGgfsFileInfo(listing, new GridGgfsFileInfo(TRASH_ID))); + else + metaCache.put(TRASH_ID, new GridGgfsFileInfo(listing, trashInfo)); + + tx.commit(); + } + + assert metaCache.get(TRASH_ID) != null; + + // Now the file is locked and is located in trash, try adding some more data. + os = ggfs.create(otherPath, false); + os.write(new byte[1]); + + latch.countDown(); + + os.close(); + + assert err.get() == null; + } + finally { + latch.countDown(); // Safety. + } + } + + /** + * Ensure that GGFS size is correctly updated in case of preloading. + * + * @throws Exception If failed. + */ + private void checkPreload() throws Exception { + assert cacheMode == PARTITIONED; + + startUp(); + + // Perform writes. + Collection<GgfsFile> files = write(); + + // Check sizes. + Map<UUID, Integer> expSizes = new HashMap<>(GRID_CNT, 1.0f); + + for (GgfsFile file : files) { + for (GgfsBlock block : file.blocks()) { + Collection<UUID> ids = primaryOrBackups(block.key()); + + for (UUID id : ids) { + if (expSizes.get(id) == null) + expSizes.put(id, block.length()); + else + expSizes.put(id, expSizes.get(id) + block.length()); + } + } + } + + info("Size map before node start: " + expSizes); + + for (int i = 0; i < GRID_CNT; i++) { + UUID id = grid(i).localNode().id(); + + GridCacheAdapter<GridGgfsBlockKey, byte[]> cache = cache(id); + + int expSize = expSizes.get(id) != null ? expSizes.get(id) : 0; + + assertEquals(expSize, cache.ggfsDataSpaceUsed()); + } + + // Start a node. + final CountDownLatch latch = new CountDownLatch(GRID_CNT - 1); + + for (int i = 0; i < GRID_CNT - 1; i++) { + grid(0).events().localListen(new IgnitePredicate<IgniteEvent>() { + @Override public boolean apply(IgniteEvent evt) { + latch.countDown(); + + return true; + } + }, EVT_CACHE_PRELOAD_STOPPED); + } + + Ignite g = startGrid(GRID_CNT); + + info("Started grid: " + g.cluster().localNode().id()); + + U.awaitQuiet(latch); + + // Wait partitions are evicted. + awaitPartitionMapExchange(); + + // Check sizes again. 
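+ // The new node changed the affinity topology, so some blocks now map to different primary/backup + // owners; the expected per-node sizes are rebuilt from scratch before re-validating the counters.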
+ expSizes.clear(); + + for (GgfsFile file : files) { + for (GgfsBlock block : file.blocks()) { + Collection<UUID> ids = primaryOrBackups(block.key()); + + assert !ids.isEmpty(); + + for (UUID id : ids) { + if (expSizes.get(id) == null) + expSizes.put(id, block.length()); + else + expSizes.put(id, expSizes.get(id) + block.length()); + } + } + } + + info("Size map after node start: " + expSizes); + + for (int i = 0; i < GRID_CNT - 1; i++) { + UUID id = grid(i).localNode().id(); + + GridCacheAdapter<GridGgfsBlockKey, byte[]> cache = cache(id); + + int expSize = expSizes.get(id) != null ? expSizes.get(id) : 0; + + assertEquals("For node: " + id, expSize, cache.ggfsDataSpaceUsed()); + } + } + + /** + * Create data chunk of the given length. + * + * @param len Length. + * @return Data chunk. + */ + private byte[] chunk(int len) { + byte[] chunk = new byte[len]; + + for (int i = 0; i < len; i++) + chunk[i] = (byte)i; + + return chunk; + } + + /** + * Create block key. + * + * @param path Path. + * @param blockId Block ID. + * @return Block key. + * @throws Exception If failed. + */ + private GridGgfsBlockKey blockKey(IgniteFsPath path, long blockId) throws Exception { + GridGgfsEx ggfs0 = (GridGgfsEx)grid(0).fileSystem(GGFS_NAME); + + IgniteUuid fileId = ggfs0.context().meta().fileId(path); + + return new GridGgfsBlockKey(fileId, null, true, blockId); + } + + /** + * Determine primary node for the given block key. + * + * @param key Block key. + * @return Node ID. + */ + private UUID primary(GridGgfsBlockKey key) { + GridEx grid = grid(0); + + for (ClusterNode node : grid.nodes()) { + if (grid.cachex(DATA_CACHE_NAME).affinity().isPrimary(node, key)) + return node.id(); + } + + return null; + } + + /** + * Determine primary and backup node IDs for the given block key. + * + * @param key Block key. + * @return Collection of node IDs. + */ + private Collection<UUID> primaryOrBackups(GridGgfsBlockKey key) { + GridEx grid = grid(0); + + Collection<UUID> ids = new HashSet<>(); + + for (ClusterNode node : grid.nodes()) { + if (grid.cachex(DATA_CACHE_NAME).affinity().isPrimaryOrBackup(node, key)) + ids.add(node.id()); + } + + return ids; + } + + /** + * Get GGfs of a node with the given index. + * + * @param idx Node index. + * @return GGFS. + * @throws Exception If failed. + */ + private GridGgfsImpl ggfs(int idx) throws Exception { + return (GridGgfsImpl)grid(idx).fileSystem(GGFS_NAME); + } + + /** + * Get GGfs of the given node. + * + * @param ignite Node; + * @return GGFS. + * @throws Exception If failed. + */ + private GridGgfsImpl ggfs(Ignite ignite) throws Exception { + return (GridGgfsImpl) ignite.fileSystem(GGFS_NAME); + } + + /** + * Get data cache for the given node ID. + * + * @param nodeId Node ID. + * @return Data cache. + */ + private GridCacheAdapter<GridGgfsBlockKey, byte[]> cache(UUID nodeId) { + return (GridCacheAdapter<GridGgfsBlockKey, byte[]>)((GridEx)G.ignite(nodeId)).cachex(DATA_CACHE_NAME) + .<GridGgfsBlockKey, byte[]>cache(); + } + + /** + * Perform write of the files. + * + * @return Collection of written file descriptors. + * @throws Exception If failed. + */ + private Collection<GgfsFile> write() throws Exception { + Collection<GgfsFile> res = new HashSet<>(FILES_CNT, 1.0f); + + ThreadLocalRandom8 rand = ThreadLocalRandom8.current(); + + for (int i = 0; i < FILES_CNT; i++) { + // Create empty file locally. 
+ IgniteFsPath path = new IgniteFsPath("/file-" + i); + + ggfs(0).create(path, false).close(); + + GridGgfsMetaManager meta = ggfs(0).context().meta(); + + IgniteUuid fileId = meta.fileId(path); + + // Calculate file blocks. + int fileSize = rand.nextInt(MAX_FILE_SIZE); + + int fullBlocks = fileSize / BLOCK_SIZE; + int remainderSize = fileSize % BLOCK_SIZE; + + // Capacity: all full blocks plus one optional remainder block. + Collection<GgfsBlock> blocks = new ArrayList<>(fullBlocks + (remainderSize > 0 ? 1 : 0)); + + for (int j = 0; j < fullBlocks; j++) + blocks.add(new GgfsBlock(new GridGgfsBlockKey(fileId, null, true, j), BLOCK_SIZE)); + + if (remainderSize > 0) + blocks.add(new GgfsBlock(new GridGgfsBlockKey(fileId, null, true, fullBlocks), remainderSize)); + + GgfsFile file = new GgfsFile(path, fileSize, blocks); + + // Actual write. + for (GgfsBlock block : blocks) { + IgniteFsOutputStream os = ggfs(0).append(path, false); + + os.write(chunk(block.length())); + + os.close(); + } + + // Add written file to the result set. + res.add(file); + } + + return res; + } + + /** A file written to the file system. */ + private static class GgfsFile { + /** Path to the file. */ + private final IgniteFsPath path; + + /** File length. */ + private final int len; + + /** Blocks with their corresponding locations. */ + private final Collection<GgfsBlock> blocks; + + /** + * Constructor. + * + * @param path Path. + * @param len Length. + * @param blocks Blocks. + */ + private GgfsFile(IgniteFsPath path, int len, Collection<GgfsBlock> blocks) { + this.path = path; + this.len = len; + this.blocks = blocks; + } + + /** @return Path. */ + IgniteFsPath path() { + return path; + } + + /** @return Length. */ + int length() { + return len; + } + + /** @return Blocks. */ + Collection<GgfsBlock> blocks() { + return blocks; + } + } + + /** Block written to the file system. */ + private static class GgfsBlock { + /** Block key. */ + private final GridGgfsBlockKey key; + + /** Block length. */ + private final int len; + + /** + * Constructor. + * + * @param key Block key. + * @param len Block length. + */ + private GgfsBlock(GridGgfsBlockKey key, int len) { + this.key = key; + this.len = len; + } + + /** @return Block key. */ + private GridGgfsBlockKey key() { + return key; + } + + /** @return Block length. */ + private int length() { + return len; + } + } +}
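The expected-size bookkeeping in check() and checkPreload() above repeats the same get-null-check-put sequence three times when accumulating per-node usage. A minimal standalone sketch of that accumulation step, using Map.merge from Java 8 (the node IDs and block lengths below are hypothetical sample data, not part of the patch):

import java.util.*;

/** Sketch: accumulate expected per-node space usage, mirroring GridGgfsSizeSelfTest.check(). */
public class ExpectedSizeSketch {
    public static void main(String[] args) {
        UUID node1 = UUID.randomUUID();
        UUID node2 = UUID.randomUUID();

        // Block lengths paired with the nodes (primary or backup) that own each block.
        int[] blockLens = {384, 384, 128};
        List<List<UUID>> owners = Arrays.asList(
            Arrays.asList(node1),
            Arrays.asList(node1, node2),
            Arrays.asList(node2));

        Map<UUID, Integer> expSizes = new HashMap<>();

        // Every owner accounts the full block length; merge() collapses the
        // get()/null-check/put() idiom used in the test into a single call.
        for (int i = 0; i < blockLens.length; i++)
            for (UUID id : owners.get(i))
                expSizes.merge(id, blockLens[i], Integer::sum);

        System.out.println(expSizes); // node1 -> 768, node2 -> 512
    }
}

The test keeps the explicit null check instead, which is consistent with the org.jdk8.backport imports above: the code was evidently written to run on a pre-Java-8 toolchain.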
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cfcf46df/modules/core/src/test/java/org/apache/ignite/internal/processors/fs/GridGgfsStreamsSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/fs/GridGgfsStreamsSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/fs/GridGgfsStreamsSelfTest.java new file mode 100644 index 0000000..869eb8c --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/fs/GridGgfsStreamsSelfTest.java @@ -0,0 +1,472 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.fs; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.fs.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.gridgain.testframework.*; +import org.jetbrains.annotations.*; + +import java.io.*; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +import static org.apache.ignite.cache.GridCacheAtomicityMode.*; +import static org.apache.ignite.cache.GridCacheMode.*; +import static org.gridgain.testframework.GridTestUtils.*; + +/** + * Tests for GGFS streams content. + */ +public class GridGgfsStreamsSelfTest extends GridGgfsCommonAbstractTest { + /** Test IP finder. */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** Meta-information cache name. */ + private static final String META_CACHE_NAME = "replicated"; + + /** Data cache name. */ + public static final String DATA_CACHE_NAME = "data"; + + /** Group size. */ + public static final int CFG_GRP_SIZE = 128; + + /** Pre-configured block size. */ + private static final int CFG_BLOCK_SIZE = 64000; + + /** Number of threads to test parallel readings. */ + private static final int WRITING_THREADS_CNT = 5; + + /** Number of threads to test parallel readings. */ + private static final int READING_THREADS_CNT = 5; + + /** Test nodes count. */ + private static final int NODES_CNT = 4; + + /** Number of retries for async ops. */ + public static final int ASSERT_RETRIES = 100; + + /** Delay between checks for async ops. */ + public static final int ASSERT_RETRY_INTERVAL = 100; + + /** File system to test. 
*/ + private IgniteFs fs; + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + startGrids(NODES_CNT); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + if (NODES_CNT <= 0) + return; + + // Initialize FS. + fs = grid(0).fileSystem("ggfs"); + + // Cleanup FS. + fs.format(); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + cfg.setCacheConfiguration(cacheConfiguration(META_CACHE_NAME), cacheConfiguration(DATA_CACHE_NAME)); + + TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); + + discoSpi.setIpFinder(IP_FINDER); + + cfg.setDiscoverySpi(discoSpi); + + IgniteFsConfiguration ggfsCfg = new IgniteFsConfiguration(); + + ggfsCfg.setMetaCacheName(META_CACHE_NAME); + ggfsCfg.setDataCacheName(DATA_CACHE_NAME); + ggfsCfg.setName("ggfs"); + ggfsCfg.setBlockSize(CFG_BLOCK_SIZE); + ggfsCfg.setFragmentizerEnabled(true); + + cfg.setGgfsConfiguration(ggfsCfg); + + return cfg; + } + + /** {@inheritDoc} */ + protected CacheConfiguration cacheConfiguration(String cacheName) { + CacheConfiguration cacheCfg = defaultCacheConfiguration(); + + cacheCfg.setName(cacheName); + + if (META_CACHE_NAME.equals(cacheName)) + cacheCfg.setCacheMode(REPLICATED); + else { + cacheCfg.setCacheMode(PARTITIONED); + cacheCfg.setDistributionMode(GridCacheDistributionMode.PARTITIONED_ONLY); + + cacheCfg.setBackups(0); + cacheCfg.setAffinityMapper(new IgniteFsGroupDataBlocksKeyMapper(CFG_GRP_SIZE)); + } + + cacheCfg.setWriteSynchronizationMode(GridCacheWriteSynchronizationMode.FULL_SYNC); + cacheCfg.setAtomicityMode(TRANSACTIONAL); + cacheCfg.setQueryIndexEnabled(false); + + return cacheCfg; + } + + /** + * Test GGFS construction. + * + * @throws IgniteCheckedException In case of exception. + */ + public void testConfiguration() throws IgniteCheckedException { + GridCache metaCache = getFieldValue(fs, "meta", "metaCache"); + GridCache dataCache = getFieldValue(fs, "data", "dataCache"); + + assertNotNull(metaCache); + assertEquals(META_CACHE_NAME, metaCache.name()); + assertEquals(REPLICATED, metaCache.configuration().getCacheMode()); + + assertNotNull(dataCache); + assertEquals(DATA_CACHE_NAME, dataCache.name()); + assertEquals(PARTITIONED, dataCache.configuration().getCacheMode()); + } + + /** + * Test file creation. + * + * @throws Exception In case of exception. + */ + public void testCreateFile() throws Exception { + IgniteFsPath root = new IgniteFsPath("/"); + IgniteFsPath path = new IgniteFsPath("/asdf"); + + long max = 100L * CFG_BLOCK_SIZE / WRITING_THREADS_CNT; + + for (long size = 0; size <= max; size = size * 15 / 10 + 1) { + assertEquals(Collections.<IgniteFsPath>emptyList(), fs.listPaths(root)); + + testCreateFile(path, size, new Random().nextInt()); + } + } + + /** @throws Exception If failed. */ + public void testCreateFileColocated() throws Exception { + IgniteFsPath path = new IgniteFsPath("/colocated"); + + UUID uuid = UUID.randomUUID(); + + IgniteUuid affKey; + + long idx = 0; + + while (true) { + affKey = new IgniteUuid(uuid, idx); + + if (grid(0).mapKeyToNode(DATA_CACHE_NAME, affKey).id().equals(grid(0).localNode().id())) + break; + + idx++; + } + + try (IgniteFsOutputStream out = fs.create(path, 1024, true, affKey, 0, 1024, null)) { + // Write 15MB (15 chunks of 1MB below), which should be enough to test distribution.
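+ // Every block of the stream shares the affinity key chosen above, so the whole file should land + // on the local node; the affinity(..) assertions below expect exactly one owning node.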
+ for (int i = 0; i < 15; i++) + out.write(new byte[1024 * 1024]); + } + + IgniteFsFile info = fs.info(path); + + Collection<IgniteFsBlockLocation> affNodes = fs.affinity(path, 0, info.length()); + + assertEquals(1, affNodes.size()); + Collection<UUID> nodeIds = F.first(affNodes).nodeIds(); + + assertEquals(1, nodeIds.size()); + assertEquals(grid(0).localNode().id(), F.first(nodeIds)); + } + + /** @throws Exception If failed. */ + public void testCreateFileFragmented() throws Exception { + GridGgfsEx impl = (GridGgfsEx)grid(0).fileSystem("ggfs"); + + GridGgfsFragmentizerManager fragmentizer = impl.context().fragmentizer(); + + GridTestUtils.setFieldValue(fragmentizer, "fragmentizerEnabled", false); + + IgniteFsPath path = new IgniteFsPath("/file"); + + try { + IgniteFs fs0 = grid(0).fileSystem("ggfs"); + IgniteFs fs1 = grid(1).fileSystem("ggfs"); + IgniteFs fs2 = grid(2).fileSystem("ggfs"); + + try (IgniteFsOutputStream out = fs0.create(path, 128, false, 1, CFG_GRP_SIZE, + F.asMap(IgniteFs.PROP_PREFER_LOCAL_WRITES, "true"))) { + // 1.5 blocks + byte[] data = new byte[CFG_BLOCK_SIZE * 3 / 2]; + + Arrays.fill(data, (byte)1); + + out.write(data); + } + + try (IgniteFsOutputStream out = fs1.append(path, false)) { + // 1.5 blocks. + byte[] data = new byte[CFG_BLOCK_SIZE * 3 / 2]; + + Arrays.fill(data, (byte)2); + + out.write(data); + } + + // After this we should have first two block colocated with grid 0 and last block colocated with grid 1. + IgniteFsFileImpl fileImpl = (IgniteFsFileImpl)fs.info(path); + + GridCache<Object, Object> metaCache = grid(0).cachex(META_CACHE_NAME); + + GridGgfsFileInfo fileInfo = (GridGgfsFileInfo)metaCache.get(fileImpl.fileId()); + + GridGgfsFileMap map = fileInfo.fileMap(); + + List<GridGgfsFileAffinityRange> ranges = map.ranges(); + + assertEquals(2, ranges.size()); + + assertTrue(ranges.get(0).startOffset() == 0); + assertTrue(ranges.get(0).endOffset() == 2 * CFG_BLOCK_SIZE - 1); + + assertTrue(ranges.get(1).startOffset() == 2 * CFG_BLOCK_SIZE); + assertTrue(ranges.get(1).endOffset() == 3 * CFG_BLOCK_SIZE - 1); + + // Validate data read after colocated writes. + try (IgniteFsInputStream in = fs2.open(path)) { + // Validate first part of file. + for (int i = 0; i < CFG_BLOCK_SIZE * 3 / 2; i++) + assertEquals((byte)1, in.read()); + + // Validate second part of file. + for (int i = 0; i < CFG_BLOCK_SIZE * 3 / 2; i++) + assertEquals((byte)2, in.read()); + + assertEquals(-1, in.read()); + } + } + finally { + GridTestUtils.setFieldValue(fragmentizer, "fragmentizerEnabled", true); + + boolean hasData = false; + + for (int i = 0; i < NODES_CNT; i++) + hasData |= !grid(i).cachex(DATA_CACHE_NAME).isEmpty(); + + assertTrue(hasData); + + fs.delete(path, true); + } + + GridTestUtils.retryAssert(log, ASSERT_RETRIES, ASSERT_RETRY_INTERVAL, new CAX() { + @Override public void applyx() { + for (int i = 0; i < NODES_CNT; i++) + assertTrue(grid(i).cachex(DATA_CACHE_NAME).isEmpty()); + } + }); + } + + /** + * Test file creation. + * + * @param path Path to file to store. + * @param size Size of file to store. + * @param salt Salt for file content generation. + * @throws Exception In case of any exception. 
+ */ + private void testCreateFile(final IgniteFsPath path, final long size, final int salt) throws Exception { + info("Create file [path=" + path + ", size=" + size + ", salt=" + salt + ']'); + + final AtomicInteger cnt = new AtomicInteger(0); + final Collection<IgniteFsPath> cleanUp = new ConcurrentLinkedQueue<>(); + + long time = runMultiThreaded(new Callable<Object>() { + @Override public Object call() throws Exception { + int id = cnt.incrementAndGet(); + + IgniteFsPath f = new IgniteFsPath(path.parent(), "asdf" + (id > 1 ? "-" + id : "")); + + try (IgniteFsOutputStream out = fs.create(f, 0, true, null, 0, 1024, null)) { + assertNotNull(out); + + cleanUp.add(f); // Add all created into cleanup list. + + U.copy(new GridGgfsTestInputStream(size, salt), out); + } + + return null; + } + }, WRITING_THREADS_CNT, "perform-multi-thread-writing"); + + if (time > 0) { + double rate = size * 1000. / time / 1024 / 1024; + + info(String.format("Write file [path=%s, size=%d kB, rate=%2.1f MB/s]", path, + WRITING_THREADS_CNT * size / 1024, WRITING_THREADS_CNT * rate)); + } + + info("Read and validate saved file: " + path); + + final InputStream expIn = new GridGgfsTestInputStream(size, salt); + final IgniteFsInputStream actIn = fs.open(path, CFG_BLOCK_SIZE * READING_THREADS_CNT * 11 / 10); + + // Validate continuous reading of whole file. + assertEqualStreams(expIn, actIn, size, null); + + // Validate random seek and reading. + final Random rnd = new Random(); + + runMultiThreaded(new Callable<Object>() { + @Override public Object call() throws Exception { + long skip = Math.abs(rnd.nextLong() % (size + 1)); + long range = Math.min(size - skip, rnd.nextInt(CFG_BLOCK_SIZE * 400)); + + assertEqualStreams(new GridGgfsTestInputStream(size, salt), actIn, range, skip); + + return null; + } + }, READING_THREADS_CNT, "validate-multi-thread-reading"); + + expIn.close(); + actIn.close(); + + info("Get stored file info: " + path); + + IgniteFsFile desc = fs.info(path); + + info("Validate stored file info: " + desc); + + assertNotNull(desc); + + if (log.isDebugEnabled()) + log.debug("File descriptor: " + desc); + + Collection<IgniteFsBlockLocation> aff = fs.affinity(path, 0, desc.length()); + + assertFalse("Affinity: " + aff, desc.length() != 0 && aff.isEmpty()); + + int blockSize = desc.blockSize(); + + assertEquals("File size", size, desc.length()); + assertEquals("Binary block size", CFG_BLOCK_SIZE, blockSize); + //assertEquals("Permission", "rwxr-xr-x", desc.getPermission().toString()); + //assertEquals("Permission sticky bit marks this is file", false, desc.getPermission().getStickyBit()); + assertEquals("Type", true, desc.isFile()); + assertEquals("Type", false, desc.isDirectory()); + + info("Cleanup files: " + cleanUp); + + for (IgniteFsPath f : cleanUp) { + fs.delete(f, true); + assertNull(fs.info(f)); + } + } + + /** + * Validate streams generate the same output. + * + * @param expIn Expected input stream. + * @param actIn Actual input stream. + * @param expSize Expected size of the streams. + * @param seek Seek to use async position-based reading or {@code null} to use simple continuous reading. + * @throws IOException In case of any IO exception. 
*/ + private void assertEqualStreams(InputStream expIn, IgniteFsInputStream actIn, + @Nullable Long expSize, @Nullable Long seek) throws IOException { + if (seek != null) + expIn.skip(seek); + + int bufSize = 2345; + byte buf1[] = new byte[bufSize]; + byte buf2[] = new byte[bufSize]; + long pos = 0; + + long start = System.currentTimeMillis(); + + while (true) { + int read = (int)Math.min(bufSize, expSize - pos); + + int i1; + + if (seek == null) + i1 = actIn.read(buf1, 0, read); + else if (seek % 2 == 0) + i1 = actIn.read(pos + seek, buf1, 0, read); + else { + i1 = read; + + actIn.readFully(pos + seek, buf1, 0, read); + } + + // Read at least 0 bytes, but don't read more than 'i1' or 'read'. + int i2 = expIn.read(buf2, 0, Math.max(0, Math.min(i1, read))); + + if (i1 != i2) { + fail("Expects the same data [read=" + read + ", pos=" + pos + ", seek=" + seek + + ", i1=" + i1 + ", i2=" + i2 + ']'); + } + + if (i1 == -1) + break; // EOF + + // i1 == bufSize => compare buffers. + // i1 < bufSize => Compare part of buffers, rest of buffers are equal from previous iteration. + assertTrue("Expects the same data [read=" + read + ", pos=" + pos + ", seek=" + seek + + ", i1=" + i1 + ", i2=" + i2 + ']', Arrays.equals(buf1, buf2)); + + if (read == 0) + break; // Nothing more to read. + + pos += i1; + } + + if (expSize != null) + assertEquals(expSize.longValue(), pos); + + long time = System.currentTimeMillis() - start; + + if (time != 0 && log.isInfoEnabled()) { + log.info(String.format("Streams were compared in continuous reading " + + "[size=%7d, rate=%3.1f MB/sec]", expSize, expSize * 1000. / time / 1024 / 1024)); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cfcf46df/modules/core/src/test/java/org/apache/ignite/internal/processors/fs/GridGgfsTaskSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/fs/GridGgfsTaskSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/fs/GridGgfsTaskSelfTest.java new file mode 100644 index 0000000..a446309 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/fs/GridGgfsTaskSelfTest.java @@ -0,0 +1,316 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.ignite.internal.processors.fs; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.compute.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.fs.*; +import org.apache.ignite.fs.mapreduce.*; +import org.apache.ignite.fs.mapreduce.records.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.resources.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; + +import java.io.*; +import java.util.*; + +import static org.apache.ignite.cache.GridCacheAtomicityMode.*; +import static org.apache.ignite.cache.GridCacheDistributionMode.*; +import static org.apache.ignite.cache.GridCacheMode.*; +import static org.apache.ignite.cache.GridCacheWriteSynchronizationMode.*; +import static org.apache.ignite.fs.IgniteFsMode.*; + +/** + * Tests for {@link org.apache.ignite.fs.mapreduce.IgniteFsTask}. + */ +public class GridGgfsTaskSelfTest extends GridGgfsCommonAbstractTest { + /** Predefined words dictionary. */ + private static final String[] DICTIONARY = new String[] {"word0", "word1", "word2", "word3", "word4", "word5", + "word6", "word7"}; + + /** File path. */ + private static final IgniteFsPath FILE = new IgniteFsPath("/file"); + + /** Shared IP finder. */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** Block size: 64 Kb. */ + private static final int BLOCK_SIZE = 64 * 1024; + + /** Total words in file. */ + private static final int TOTAL_WORDS = 2 * 1024 * 1024; + + /** Node count */ + private static final int NODE_CNT = 4; + + /** Repeat count. */ + private static final int REPEAT_CNT = 10; + + /** GGFS. */ + private static IgniteFs ggfs; + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + for (int i = 0; i < NODE_CNT; i++) { + Ignite g = G.start(config(i)); + + if (i + 1 == NODE_CNT) + ggfs = g.fileSystem("ggfs"); + } + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(false); + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + ggfs.format(); + } + + /** + * Create grid configuration. + * + * @param idx Node index. 
+ * @return Grid configuration. + */ + private IgniteConfiguration config(int idx) { + IgniteFsConfiguration ggfsCfg = new IgniteFsConfiguration(); + + ggfsCfg.setDataCacheName("dataCache"); + ggfsCfg.setMetaCacheName("metaCache"); + ggfsCfg.setName("ggfs"); + ggfsCfg.setBlockSize(BLOCK_SIZE); + ggfsCfg.setDefaultMode(PRIMARY); + ggfsCfg.setFragmentizerEnabled(false); + + CacheConfiguration dataCacheCfg = new CacheConfiguration(); + + dataCacheCfg.setName("dataCache"); + dataCacheCfg.setCacheMode(PARTITIONED); + dataCacheCfg.setAtomicityMode(TRANSACTIONAL); + dataCacheCfg.setDistributionMode(PARTITIONED_ONLY); + dataCacheCfg.setWriteSynchronizationMode(FULL_SYNC); + dataCacheCfg.setAffinityMapper(new IgniteFsGroupDataBlocksKeyMapper(1)); + dataCacheCfg.setBackups(0); + dataCacheCfg.setQueryIndexEnabled(false); + + CacheConfiguration metaCacheCfg = new CacheConfiguration(); + + metaCacheCfg.setName("metaCache"); + metaCacheCfg.setCacheMode(REPLICATED); + metaCacheCfg.setAtomicityMode(TRANSACTIONAL); + metaCacheCfg.setWriteSynchronizationMode(FULL_SYNC); + metaCacheCfg.setQueryIndexEnabled(false); + + IgniteConfiguration cfg = new IgniteConfiguration(); + + TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); + + discoSpi.setIpFinder(IP_FINDER); + + cfg.setDiscoverySpi(discoSpi); + cfg.setCacheConfiguration(dataCacheCfg, metaCacheCfg); + cfg.setGgfsConfiguration(ggfsCfg); + + cfg.setGridName("node-" + idx); + + return cfg; + } + + /** + * Test task. + * + * @throws Exception If failed. + */ + public void testTask() throws Exception { + U.sleep(3000); // TODO: Sleep in order to wait for fragmentizing to finish. + + for (int i = 0; i < REPEAT_CNT; i++) { + String arg = DICTIONARY[new Random(System.currentTimeMillis()).nextInt(DICTIONARY.length)]; + + generateFile(TOTAL_WORDS); + Long genLen = ggfs.info(FILE).length(); + + IgniteBiTuple<Long, Integer> taskRes = ggfs.execute(new Task(), + new IgniteFsStringDelimiterRecordResolver(" "), Collections.singleton(FILE), arg); + + assert F.eq(genLen, taskRes.getKey()); + assert F.eq(TOTAL_WORDS, taskRes.getValue()); + } + } + + /** + * Test task in async mode. + * + * @throws Exception If failed. + */ + public void testTaskAsync() throws Exception { + U.sleep(3000); + + assertFalse(ggfs.isAsync()); + + IgniteFs ggfsAsync = ggfs.enableAsync(); + + assertTrue(ggfsAsync.isAsync()); + + for (int i = 0; i < REPEAT_CNT; i++) { + String arg = DICTIONARY[new Random(System.currentTimeMillis()).nextInt(DICTIONARY.length)]; + + generateFile(TOTAL_WORDS); + Long genLen = ggfs.info(FILE).length(); + + assertNull(ggfsAsync.execute( + new Task(), new IgniteFsStringDelimiterRecordResolver(" "), Collections.singleton(FILE), arg)); + + IgniteFuture<IgniteBiTuple<Long, Integer>> fut = ggfsAsync.future(); + + assertNotNull(fut); + + IgniteBiTuple<Long, Integer> taskRes = fut.get(); + + assert F.eq(genLen, taskRes.getKey()); + assert F.eq(TOTAL_WORDS, taskRes.getValue()); + } + + ggfsAsync.format(); + + IgniteFuture<?> fut = ggfsAsync.future(); + + assertNotNull(fut); + + fut.get(); + } + + // TODO: Remove. + @Override protected long getTestTimeout() { + return Long.MAX_VALUE; + } + + /** + * Generate file with random data and provided argument. + * + * @param wordCnt Word count. + * @throws Exception If failed.
+ */ + private void generateFile(int wordCnt) + throws Exception { + Random rnd = new Random(System.currentTimeMillis()); + + try (OutputStreamWriter writer = new OutputStreamWriter(ggfs.create(FILE, true))) { + int cnt = 0; + + while (cnt < wordCnt) { + String word = DICTIONARY[rnd.nextInt(DICTIONARY.length)]; + + writer.write(word + " "); + + cnt++; + } + } + } + + /** + * Task. + */ + private static class Task extends IgniteFsTask<String, IgniteBiTuple<Long, Integer>> { + /** {@inheritDoc} */ + @Override public IgniteFsJob createJob(IgniteFsPath path, IgniteFsFileRange range, + IgniteFsTaskArgs<String> args) throws IgniteCheckedException { + return new Job(); + } + + /** {@inheritDoc} */ + @Override public IgniteBiTuple<Long, Integer> reduce(List<ComputeJobResult> ress) throws IgniteCheckedException { + long totalLen = 0; + int argCnt = 0; + + for (ComputeJobResult res : ress) { + IgniteBiTuple<Long, Integer> res0 = (IgniteBiTuple<Long, Integer>)res.getData(); + + if (res0 != null) { + totalLen += res0.getKey(); + argCnt += res0.getValue(); + } + } + + return F.t(totalLen, argCnt); + } + } + + /** + * Job. + */ + private static class Job implements IgniteFsJob, Serializable { + @IgniteInstanceResource + private Ignite ignite; + + @IgniteTaskSessionResource + private ComputeTaskSession ses; + + @IgniteJobContextResource + private ComputeJobContext ctx; + + /** {@inheritDoc} */ + @Override public Object execute(IgniteFs ggfs, IgniteFsFileRange range, IgniteFsInputStream in) + throws IgniteCheckedException, IOException { + assert ignite != null; + assert ses != null; + assert ctx != null; + + in.seek(range.start()); + + byte[] buf = new byte[(int)range.length()]; + + int totalRead = 0; + + while (totalRead < buf.length) { + int b = in.read(); + + assert b != -1; + + buf[totalRead++] = (byte)b; + } + + String str = new String(buf); + + String[] chunks = str.split(" "); + + int ctr = 0; + + for (String chunk : chunks) { + if (!chunk.isEmpty()) + ctr++; + } + + return F.t(range.length(), ctr); + } + + /** {@inheritDoc} */ + @Override public void cancel() { + // No-op. + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cfcf46df/modules/core/src/test/java/org/apache/ignite/internal/processors/fs/GridGgfsTestInputStream.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/fs/GridGgfsTestInputStream.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/fs/GridGgfsTestInputStream.java new file mode 100644 index 0000000..cc4ec3f --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/fs/GridGgfsTestInputStream.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.fs; + +import java.io.*; + +/** + * Test input stream with predictable data output and zero memory usage. + */ +class GridGgfsTestInputStream extends InputStream { + /** This stream length. */ + private long size; + + /** Salt for input data generation. */ + private long salt; + + /** Current stream position. */ + private long pos; + + /** + * Constructs test input stream. + * + * @param size This stream length. + * @param salt Salt for input data generation. + */ + GridGgfsTestInputStream(long size, long salt) { + this.size = size; + this.salt = salt; + } + + /** {@inheritDoc} */ + @Override public synchronized int read() throws IOException { + if (pos >= size) + return -1; + + long next = salt ^ (salt * pos++); + + next ^= next >>> 32; + next ^= next >>> 16; + next ^= next >>> 8; + + return (int)(0xFF & next); + } + + /** {@inheritDoc} */ + @Override public synchronized long skip(long n) throws IOException { + pos += Math.min(n, size - pos); + + return size - pos; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cfcf46df/modules/core/src/test/java/org/apache/ignite/internal/processors/fs/package.html ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/fs/package.html b/modules/core/src/test/java/org/apache/ignite/internal/processors/fs/package.html new file mode 100644 index 0000000..1f85ff2 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/fs/package.html @@ -0,0 +1,23 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + --> +<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd"> +<html> +<body> + <!-- Package description. --> + Contains internal tests or test related classes and interfaces. +</body> +</html> http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cfcf46df/modules/core/src/test/java/org/apache/ignite/internal/processors/fs/split/GridGgfsAbstractRecordResolverSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/fs/split/GridGgfsAbstractRecordResolverSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/fs/split/GridGgfsAbstractRecordResolverSelfTest.java new file mode 100644 index 0000000..410275e --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/fs/split/GridGgfsAbstractRecordResolverSelfTest.java @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.fs.split; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.fs.*; +import org.apache.ignite.fs.mapreduce.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.internal.util.typedef.*; +import org.gridgain.testframework.junits.common.*; + +import java.util.*; + +import static org.apache.ignite.cache.GridCacheAtomicityMode.*; +import static org.apache.ignite.cache.GridCacheDistributionMode.*; +import static org.apache.ignite.cache.GridCacheMode.*; +import static org.apache.ignite.cache.GridCacheWriteSynchronizationMode.*; +import static org.apache.ignite.fs.IgniteFsMode.*; + +/** + * Base class for all split resolvers + */ +public class GridGgfsAbstractRecordResolverSelfTest extends GridCommonAbstractTest { + /** File path. */ + protected static final IgniteFsPath FILE = new IgniteFsPath("/file"); + + /** Shared IP finder. */ + private final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** GGFS. 
*/ + protected static IgniteFs ggfs; + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + IgniteFsConfiguration ggfsCfg = new IgniteFsConfiguration(); + + ggfsCfg.setDataCacheName("dataCache"); + ggfsCfg.setMetaCacheName("metaCache"); + ggfsCfg.setName("ggfs"); + ggfsCfg.setBlockSize(512); + ggfsCfg.setDefaultMode(PRIMARY); + + CacheConfiguration dataCacheCfg = new CacheConfiguration(); + + dataCacheCfg.setName("dataCache"); + dataCacheCfg.setCacheMode(PARTITIONED); + dataCacheCfg.setAtomicityMode(TRANSACTIONAL); + dataCacheCfg.setDistributionMode(NEAR_PARTITIONED); + dataCacheCfg.setWriteSynchronizationMode(FULL_SYNC); + dataCacheCfg.setAffinityMapper(new IgniteFsGroupDataBlocksKeyMapper(128)); + dataCacheCfg.setBackups(0); + dataCacheCfg.setQueryIndexEnabled(false); + + CacheConfiguration metaCacheCfg = new CacheConfiguration(); + + metaCacheCfg.setName("metaCache"); + metaCacheCfg.setCacheMode(REPLICATED); + metaCacheCfg.setAtomicityMode(TRANSACTIONAL); + metaCacheCfg.setWriteSynchronizationMode(FULL_SYNC); + metaCacheCfg.setQueryIndexEnabled(false); + + IgniteConfiguration cfg = new IgniteConfiguration(); + + cfg.setGridName("grid"); + + TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); + + discoSpi.setIpFinder(ipFinder); + + cfg.setDiscoverySpi(discoSpi); + cfg.setCacheConfiguration(dataCacheCfg, metaCacheCfg); + cfg.setGgfsConfiguration(ggfsCfg); + + Ignite g = G.start(cfg); + + ggfs = g.fileSystem("ggfs"); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(false); + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + ggfs.format(); + } + + /** + * Convenient method for wrapping some bytes into byte array. + * + * @param data Data bytes. + * @return Byte array. + */ + protected static byte[] wrap(int... data) { + byte[] res = new byte[data.length]; + + for (int i = 0; i < data.length; i++) + res[i] = (byte)data[i]; + + return res; + } + + /** + * Create byte array consisting of the given chunks. + * + * @param chunks Array of chunks where the first value is the byte array and the second value is amount of repeats. + * @return Byte array. + */ + protected static byte[] array(Map.Entry<byte[], Integer>... chunks) { + int totalSize = 0; + + for (Map.Entry<byte[], Integer> chunk : chunks) + totalSize += chunk.getKey().length * chunk.getValue(); + + byte[] res = new byte[totalSize]; + + int pos = 0; + + for (Map.Entry<byte[], Integer> chunk : chunks) { + for (int i = 0; i < chunk.getValue(); i++) { + System.arraycopy(chunk.getKey(), 0, res, pos, chunk.getKey().length); + + pos += chunk.getKey().length; + } + } + + return res; + } + + /** + * Open file for read and return input stream. + * + * @return Input stream. + * @throws Exception In case of exception. + */ + protected IgniteFsInputStream read() throws Exception { + return ggfs.open(FILE); + } + + /** + * Write data to the file. + * + * @param chunks Data chunks. + * @throws Exception In case of exception. + */ + protected void write(byte[]... chunks) throws Exception { + IgniteFsOutputStream os = ggfs.create(FILE, true); + + if (chunks != null) { + for (byte[] chunk : chunks) + os.write(chunk); + } + + os.close(); + } + + /** + * Create split. + * + * @param start Start position. + * @param len Length. + * @return Split. 
+
+    /**
+     * Open the file for reading and return the input stream.
+     *
+     * @return Input stream.
+     * @throws Exception In case of exception.
+     */
+    protected IgniteFsInputStream read() throws Exception {
+        return ggfs.open(FILE);
+    }
+
+    /**
+     * Write data to the file.
+     *
+     * @param chunks Data chunks.
+     * @throws Exception In case of exception.
+     */
+    protected void write(byte[]... chunks) throws Exception {
+        IgniteFsOutputStream os = ggfs.create(FILE, true);
+
+        if (chunks != null) {
+            for (byte[] chunk : chunks)
+                os.write(chunk);
+        }
+
+        os.close();
+    }
+
+    /**
+     * Create a split.
+     *
+     * @param start Start position.
+     * @param len Length.
+     * @return Split.
+     */
+    protected IgniteFsFileRange split(long start, long len) {
+        return new IgniteFsFileRange(FILE, start, len);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cfcf46df/modules/core/src/test/java/org/apache/ignite/internal/processors/fs/split/GridGgfsByteDelimiterRecordResolverSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/fs/split/GridGgfsByteDelimiterRecordResolverSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/fs/split/GridGgfsByteDelimiterRecordResolverSelfTest.java
new file mode 100644
index 0000000..ce3b53a
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/fs/split/GridGgfsByteDelimiterRecordResolverSelfTest.java
@@ -0,0 +1,335 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.fs.split;
+
+import org.apache.ignite.fs.*;
+import org.apache.ignite.fs.mapreduce.*;
+import org.apache.ignite.fs.mapreduce.records.*;
+import org.apache.ignite.internal.util.typedef.*;
+
+/**
+ * Byte delimiter record resolver self test.
+ */
+public class GridGgfsByteDelimiterRecordResolverSelfTest extends GridGgfsAbstractRecordResolverSelfTest {
+    /**
+     * Test split resolution when there are no delimiters in the file.
+     *
+     * @throws Exception If failed.
+     */
+    public void testNoDelimiters() throws Exception {
+        byte[] delim = wrap(2);
+        byte[] data = array(F.t(wrap(1), 8));
+
+        assertSplit(0, 4, 0, 8, data, delim);
+        assertSplit(0, 8, 0, 8, data, delim);
+
+        assertSplitNull(2, 2, data, delim);
+        assertSplitNull(2, 6, data, delim);
+    }
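To decode the coordinate quadruples used throughout this class: assertSplit(0, 4, 0, 8, data, delim) asks the resolver to adjust the suggested range [0, 4) and expects the record-aligned range [0, 8), while assertSplitNull(2, 2, data, delim) expects null because no record starts inside the suggested range. Roughly, a suggested split grows to cover every record whose start falls inside it. The same check, written directly against the resolver API these tests exercise (a sketch using only calls that appear in this patch):

    IgniteFsByteDelimiterRecordResolver rslvr = new IgniteFsByteDelimiterRecordResolver(delim);

    try (IgniteFsInputStream is = ggfs.open(FILE)) {
        // With no delimiter present, the suggested range [0, 4) grows to the whole 8-byte record.
        IgniteFsFileRange adjusted = rslvr.resolveRecords(ggfs, is, new IgniteFsFileRange(FILE, 0, 4));

        assert adjusted.start() == 0 && adjusted.length() == 8;
    }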
+
+    /**
+     * Test split resolution when there is one delimiter at the head.
+     *
+     * @throws Exception If failed.
+     */
+    public void testHeadDelimiter() throws Exception {
+        byte[] delim = array(F.t(wrap(2), 8));
+        byte[] data = array(F.t(delim, 1), F.t(wrap(1), 8));
+
+        assertSplit(0, 4, 0, 8, data, delim);
+        assertSplit(0, 8, 0, 8, data, delim);
+        assertSplit(0, 12, 0, 16, data, delim);
+        assertSplit(0, 16, 0, 16, data, delim);
+
+        assertSplitNull(2, 2, data, delim);
+        assertSplitNull(2, 6, data, delim);
+        assertSplit(2, 10, 8, 8, data, delim);
+        assertSplit(2, 14, 8, 8, data, delim);
+
+        assertSplit(8, 4, 8, 8, data, delim);
+        assertSplit(8, 8, 8, 8, data, delim);
+
+        assertSplitNull(10, 2, data, delim);
+        assertSplitNull(10, 6, data, delim);
+    }
+
+    /**
+     * Test split when there is one delimiter at the end.
+     *
+     * @throws Exception If failed.
+     */
+    public void testEndDelimiter() throws Exception {
+        byte[] delim = array(F.t(wrap(2), 8));
+        byte[] data = array(F.t(wrap(1), 8), F.t(delim, 1));
+
+        assertSplit(0, 4, 0, 16, data, delim);
+        assertSplit(0, 8, 0, 16, data, delim);
+        assertSplit(0, 12, 0, 16, data, delim);
+        assertSplit(0, 16, 0, 16, data, delim);
+
+        assertSplitNull(2, 2, data, delim);
+        assertSplitNull(2, 6, data, delim);
+        assertSplitNull(2, 10, data, delim);
+        assertSplitNull(2, 14, data, delim);
+
+        assertSplitNull(8, 4, data, delim);
+        assertSplitNull(8, 8, data, delim);
+
+        assertSplitNull(10, 2, data, delim);
+        assertSplitNull(10, 6, data, delim);
+    }
+
+    /**
+     * Test split when there is one delimiter in the middle.
+     *
+     * @throws Exception If failed.
+     */
+    public void testMiddleDelimiter() throws Exception {
+        byte[] delim = array(F.t(wrap(2), 8));
+        byte[] data = array(F.t(wrap(1), 8), F.t(delim, 1), F.t(wrap(1), 8));
+
+        assertSplit(0, 4, 0, 16, data, delim);
+        assertSplit(0, 8, 0, 16, data, delim);
+        assertSplit(0, 12, 0, 16, data, delim);
+        assertSplit(0, 16, 0, 16, data, delim);
+        assertSplit(0, 20, 0, 24, data, delim);
+        assertSplit(0, 24, 0, 24, data, delim);
+
+        assertSplitNull(2, 2, data, delim);
+        assertSplitNull(2, 6, data, delim);
+        assertSplitNull(2, 10, data, delim);
+        assertSplitNull(2, 14, data, delim);
+        assertSplit(2, 18, 16, 8, data, delim);
+        assertSplit(2, 22, 16, 8, data, delim);
+
+        assertSplitNull(8, 4, data, delim);
+        assertSplitNull(8, 8, data, delim);
+        assertSplit(8, 12, 16, 8, data, delim);
+        assertSplit(8, 16, 16, 8, data, delim);
+
+        assertSplitNull(10, 2, data, delim);
+        assertSplitNull(10, 6, data, delim);
+        assertSplit(10, 10, 16, 8, data, delim);
+        assertSplit(10, 14, 16, 8, data, delim);
+
+        assertSplit(16, 4, 16, 8, data, delim);
+        assertSplit(16, 8, 16, 8, data, delim);
+
+        assertSplitNull(18, 2, data, delim);
+        assertSplitNull(18, 6, data, delim);
+    }
+
+    /**
+     * Test split when there are two head delimiters.
+     *
+     * @throws Exception If failed.
+     */
+    public void testTwoHeadDelimiters() throws Exception {
+        byte[] delim = array(F.t(wrap(2), 8));
+        byte[] data = array(F.t(delim, 2), F.t(wrap(1), 8));
+
+        assertSplit(0, 4, 0, 8, data, delim);
+        assertSplit(0, 8, 0, 8, data, delim);
+        assertSplit(0, 12, 0, 16, data, delim);
+        assertSplit(0, 16, 0, 16, data, delim);
+        assertSplit(0, 20, 0, 24, data, delim);
+        assertSplit(0, 24, 0, 24, data, delim);
+
+        assertSplitNull(2, 2, data, delim);
+        assertSplitNull(2, 6, data, delim);
+        assertSplit(2, 10, 8, 8, data, delim);
+        assertSplit(2, 14, 8, 8, data, delim);
+        assertSplit(2, 18, 8, 16, data, delim);
+        assertSplit(2, 22, 8, 16, data, delim);
+
+        assertSplit(8, 4, 8, 8, data, delim);
+        assertSplit(8, 8, 8, 8, data, delim);
+        assertSplit(8, 12, 8, 16, data, delim);
+        assertSplit(8, 16, 8, 16, data, delim);
+
+        assertSplitNull(10, 2, data, delim);
+        assertSplitNull(10, 6, data, delim);
+        assertSplit(10, 10, 16, 8, data, delim);
+        assertSplit(10, 14, 16, 8, data, delim);
+
+        assertSplit(16, 4, 16, 8, data, delim);
+        assertSplit(16, 8, 16, 8, data, delim);
+
+        assertSplitNull(18, 2, data, delim);
+        assertSplitNull(18, 6, data, delim);
+    }
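The assertions above encode the following layout; each delimiter occurrence terminates a record, so two consecutive head delimiters produce two delimiter-only records before the data (annotation, not part of the patch):

    // data = {delim, delim, 1 x 8}, 24 bytes total:
    //   [0, 8)   first delimiter  -> record #1 = [0, 8)
    //   [8, 16)  second delimiter -> record #2 = [8, 16)
    //   [16, 24) data bytes       -> record #3 = [16, 24)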
+
+    /**
+     * Test split when there are two tail delimiters.
+     *
+     * @throws Exception If failed.
+     */
+    public void testTwoTailDelimiters() throws Exception {
+        byte[] delim = array(F.t(wrap(2), 8));
+        byte[] data = array(F.t(wrap(1), 8), F.t(delim, 2));
+
+        assertSplit(0, 4, 0, 16, data, delim);
+        assertSplit(0, 8, 0, 16, data, delim);
+        assertSplit(0, 12, 0, 16, data, delim);
+        assertSplit(0, 16, 0, 16, data, delim);
+        assertSplit(0, 20, 0, 24, data, delim);
+        assertSplit(0, 24, 0, 24, data, delim);
+
+        assertSplitNull(2, 2, data, delim);
+        assertSplitNull(2, 6, data, delim);
+        assertSplitNull(2, 10, data, delim);
+        assertSplitNull(2, 14, data, delim);
+        assertSplit(2, 18, 16, 8, data, delim);
+        assertSplit(2, 22, 16, 8, data, delim);
+
+        assertSplitNull(8, 4, data, delim);
+        assertSplitNull(8, 8, data, delim);
+        assertSplit(8, 12, 16, 8, data, delim);
+        assertSplit(8, 16, 16, 8, data, delim);
+
+        assertSplitNull(10, 2, data, delim);
+        assertSplitNull(10, 6, data, delim);
+        assertSplit(10, 10, 16, 8, data, delim);
+        assertSplit(10, 14, 16, 8, data, delim);
+
+        assertSplit(16, 4, 16, 8, data, delim);
+        assertSplit(16, 8, 16, 8, data, delim);
+
+        assertSplitNull(18, 2, data, delim);
+        assertSplitNull(18, 6, data, delim);
+    }
+
+    /**
+     * Test split when there is one head delimiter, one tail delimiter and some data between them.
+     *
+     * @throws Exception If failed.
+     */
+    public void testHeadAndTailDelimiters() throws Exception {
+        byte[] delim = array(F.t(wrap(2), 8));
+        byte[] data = array(F.t(delim, 1), F.t(wrap(1), 8), F.t(delim, 1));
+
+        assertSplit(0, 4, 0, 8, data, delim);
+        assertSplit(0, 8, 0, 8, data, delim);
+        assertSplit(0, 12, 0, 24, data, delim);
+        assertSplit(0, 16, 0, 24, data, delim);
+        assertSplit(0, 20, 0, 24, data, delim);
+        assertSplit(0, 24, 0, 24, data, delim);
+
+        assertSplitNull(2, 2, data, delim);
+        assertSplitNull(2, 6, data, delim);
+        assertSplit(2, 10, 8, 16, data, delim);
+        assertSplit(2, 14, 8, 16, data, delim);
+        assertSplit(2, 18, 8, 16, data, delim);
+        assertSplit(2, 22, 8, 16, data, delim);
+
+        assertSplit(8, 4, 8, 16, data, delim);
+        assertSplit(8, 8, 8, 16, data, delim);
+        assertSplit(8, 12, 8, 16, data, delim);
+        assertSplit(8, 16, 8, 16, data, delim);
+
+        assertSplitNull(10, 2, data, delim);
+        assertSplitNull(10, 6, data, delim);
+        assertSplitNull(10, 10, data, delim);
+        assertSplitNull(10, 14, data, delim);
+
+        assertSplitNull(16, 4, data, delim);
+        assertSplitNull(16, 8, data, delim);
+
+        assertSplitNull(18, 2, data, delim);
+        assertSplitNull(18, 6, data, delim);
+    }
+
+    /**
+     * Test the special case when the delimiter starts with the same byte as the preceding data byte.
+     *
+     * @throws Exception If failed.
+     */
+    public void testDelimiterStartsWithTheSameBytesAsLastPreviousDataByte() throws Exception {
+        byte[] delim = array(F.t(wrap(1, 1, 2), 1));
+        byte[] data = array(F.t(wrap(1), 1), F.t(delim, 1), F.t(wrap(1), 1));
+
+        assertSplit(0, 1, 0, 4, data, delim);
+        assertSplit(0, 2, 0, 4, data, delim);
+        assertSplit(0, 4, 0, 4, data, delim);
+        assertSplit(0, 5, 0, 5, data, delim);
+
+        assertSplit(1, 4, 4, 1, data, delim);
+    }
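The last test pins down a tricky partial-match case: the file is {1, 1, 1, 2, 1}, with the delimiter {1, 1, 2} occupying offsets [1, 4). A matcher that starts consuming the delimiter at offset 0 sees {1, 1}, then fails on the third byte, and must re-scan rather than skip ahead; the expected splits [0, 4) and [4, 5) confirm the delimiter is still found at offset 1 (worked breakdown, not part of the patch):

    // offsets:  0    1  2  3    4
    // bytes:    1    1  1  2    1
    //           data [delimiter] data
    // records:  [0, 4) (terminated by the delimiter), then the trailing [4, 5).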
+
+    /**
+     * Check split resolution.
+     *
+     * @param suggestedStart Suggested start.
+     * @param suggestedLen Suggested length.
+     * @param expStart Expected start.
+     * @param expLen Expected length.
+     * @param data File data.
+     * @param delims Delimiters.
+     * @throws Exception If failed.
+     */
+    public void assertSplit(long suggestedStart, long suggestedLen, long expStart, long expLen, byte[] data,
+        byte[]... delims) throws Exception {
+        write(data);
+
+        IgniteFsByteDelimiterRecordResolver rslvr = resolver(delims);
+
+        IgniteFsFileRange split;
+
+        try (IgniteFsInputStream is = read()) {
+            split = rslvr.resolveRecords(ggfs, is, split(suggestedStart, suggestedLen));
+        }
+
+        assert split != null : "Split is null.";
+        assert split.start() == expStart : "Incorrect start [expected=" + expStart + ", actual=" + split.start() + ']';
+        assert split.length() == expLen : "Incorrect length [expected=" + expLen + ", actual=" + split.length() + ']';
+    }
+
+    /**
+     * Check that split resolution resulted in {@code null}.
+     *
+     * @param suggestedStart Suggested start.
+     * @param suggestedLen Suggested length.
+     * @param data File data.
+     * @param delims Delimiters.
+     * @throws Exception If failed.
+     */
+    public void assertSplitNull(long suggestedStart, long suggestedLen, byte[] data, byte[]... delims)
+        throws Exception {
+        write(data);
+
+        IgniteFsByteDelimiterRecordResolver rslvr = resolver(delims);
+
+        IgniteFsFileRange split;
+
+        try (IgniteFsInputStream is = read()) {
+            split = rslvr.resolveRecords(ggfs, is, split(suggestedStart, suggestedLen));
+        }
+
+        assert split == null : "Split is not null.";
+    }
+
+    /**
+     * Create resolver.
+     *
+     * @param delims Delimiters.
+     * @return Resolver.
+     */
+    private IgniteFsByteDelimiterRecordResolver resolver(byte[]... delims) {
+        return new IgniteFsByteDelimiterRecordResolver(delims);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cfcf46df/modules/core/src/test/java/org/apache/ignite/internal/processors/fs/split/GridGgfsFixedLengthRecordResolverSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/fs/split/GridGgfsFixedLengthRecordResolverSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/fs/split/GridGgfsFixedLengthRecordResolverSelfTest.java
new file mode 100644
index 0000000..961b17c
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/fs/split/GridGgfsFixedLengthRecordResolverSelfTest.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.fs.split;
+
+import org.apache.ignite.fs.*;
+import org.apache.ignite.fs.mapreduce.*;
+import org.apache.ignite.fs.mapreduce.records.*;
+import org.apache.ignite.internal.util.typedef.*;
+
+/**
+ * Fixed length record resolver self test.
+ */
+public class GridGgfsFixedLengthRecordResolverSelfTest extends GridGgfsAbstractRecordResolverSelfTest {
+    /**
+     * Test split resolver.
+     *
+     * @throws Exception If failed.
+     */
+    public void testResolver() throws Exception {
+        byte[] data = array(F.t(wrap(1), 24));
+
+        assertSplit(0, 4, 0, 8, data, 8);
+        assertSplit(0, 8, 0, 8, data, 8);
+        assertSplit(0, 12, 0, 16, data, 8);
+        assertSplit(0, 16, 0, 16, data, 8);
+        assertSplit(0, 20, 0, 24, data, 8);
+        assertSplit(0, 24, 0, 24, data, 8);
+        assertSplit(0, 28, 0, 24, data, 8);
+        assertSplit(0, 32, 0, 24, data, 8);
+
+        assertSplitNull(2, 2, data, 8);
+        assertSplitNull(2, 6, data, 8);
+        assertSplit(2, 10, 8, 8, data, 8);
+        assertSplit(2, 14, 8, 8, data, 8);
+        assertSplit(2, 18, 8, 16, data, 8);
+        assertSplit(2, 22, 8, 16, data, 8);
+        assertSplit(2, 26, 8, 16, data, 8);
+        assertSplit(2, 30, 8, 16, data, 8);
+
+        assertSplit(8, 4, 8, 8, data, 8);
+        assertSplit(8, 8, 8, 8, data, 8);
+        assertSplit(8, 12, 8, 16, data, 8);
+        assertSplit(8, 16, 8, 16, data, 8);
+        assertSplit(8, 20, 8, 16, data, 8);
+        assertSplit(8, 24, 8, 16, data, 8);
+
+        assertSplitNull(10, 2, data, 8);
+        assertSplitNull(10, 6, data, 8);
+        assertSplit(10, 10, 16, 8, data, 8);
+        assertSplit(10, 14, 16, 8, data, 8);
+        assertSplit(10, 18, 16, 8, data, 8);
+        assertSplit(10, 22, 16, 8, data, 8);
+
+        assertSplit(16, 4, 16, 8, data, 8);
+        assertSplit(16, 8, 16, 8, data, 8);
+        assertSplit(16, 12, 16, 8, data, 8);
+        assertSplit(16, 16, 16, 8, data, 8);
+
+        assertSplitNull(18, 2, data, 8);
+        assertSplitNull(18, 6, data, 8);
+        assertSplitNull(18, 10, data, 8);
+        assertSplitNull(18, 14, data, 8);
+
+        assertSplitNull(24, 4, data, 8);
+        assertSplitNull(24, 8, data, 8);
+
+        assertSplitNull(26, 2, data, 8);
+        assertSplitNull(26, 6, data, 8);
+    }
+
+    /**
+     * Check split resolution.
+     *
+     * @param suggestedStart Suggested start.
+     * @param suggestedLen Suggested length.
+     * @param expStart Expected start.
+     * @param expLen Expected length.
+     * @param data File data.
+     * @param len Record length.
+     * @throws Exception If failed.
+     */
+    public void assertSplit(long suggestedStart, long suggestedLen, long expStart, long expLen, byte[] data, int len)
+        throws Exception {
+        write(data);
+
+        IgniteFsFixedLengthRecordResolver rslvr = resolver(len);
+
+        IgniteFsFileRange split;
+
+        try (IgniteFsInputStream is = read()) {
+            split = rslvr.resolveRecords(ggfs, is, split(suggestedStart, suggestedLen));
+        }
+
+        assert split != null : "Split is null.";
+        assert split.start() == expStart : "Incorrect start [expected=" + expStart + ", actual=" + split.start() + ']';
+        assert split.length() == expLen : "Incorrect length [expected=" + expLen + ", actual=" + split.length() + ']';
+    }
+
+    /**
+     * Check that split resolution resulted in {@code null}.
+     *
+     * @param suggestedStart Suggested start.
+     * @param suggestedLen Suggested length.
+     * @param data File data.
+     * @param len Record length.
+     * @throws Exception If failed.
+     */
+    public void assertSplitNull(long suggestedStart, long suggestedLen, byte[] data, int len)
+        throws Exception {
+        write(data);
+
+        IgniteFsFixedLengthRecordResolver rslvr = resolver(len);
+
+        IgniteFsFileRange split;
+
+        try (IgniteFsInputStream is = read()) {
+            split = rslvr.resolveRecords(ggfs, is, split(suggestedStart, suggestedLen));
+        }
+
+        assert split == null : "Split is not null.";
+    }
+
+    /**
+     * Create resolver.
+     *
+     * @param len Record length.
+     * @return Resolver.
+     */
+    private IgniteFsFixedLengthRecordResolver resolver(int len) {
+        return new IgniteFsFixedLengthRecordResolver(len);
+    }
+}
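For contrast with the byte-delimiter variant, the fixed-length resolver simply snaps a suggested range outward to record boundaries at multiples of len, capped at the file length. A minimal usage sketch mirroring assertSplit(2, 10, 8, 8, data, 8) above (using only calls that appear in this patch):

    IgniteFsFixedLengthRecordResolver rslvr = new IgniteFsFixedLengthRecordResolver(8);

    try (IgniteFsInputStream is = ggfs.open(FILE)) {
        // Suggested range [2, 12): the only record starting inside it is [8, 16).
        IgniteFsFileRange adjusted = rslvr.resolveRecords(ggfs, is, new IgniteFsFileRange(FILE, 2, 10));

        assert adjusted.start() == 8 && adjusted.length() == 8;
    }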
