IGNITE-1298: Now output stream flush waits for all data blocks to be processed before updating file length in meta cache.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/02f24653 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/02f24653 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/02f24653 Branch: refs/heads/ignite-1093 Commit: 02f246535dd31346ff94485810e3bb306bb67cbb Parents: 16c095a Author: iveselovskiy <[email protected]> Authored: Fri Aug 28 16:03:45 2015 +0300 Committer: vozerov-gridgain <[email protected]> Committed: Fri Aug 28 16:03:45 2015 +0300 ---------------------------------------------------------------------- .../processors/igfs/IgfsDataManager.java | 169 +++++++++---------- .../processors/igfs/IgfsOutputStreamImpl.java | 2 + .../igfs/IgfsBackupFailoverSelfTest.java | 137 +++++++++++++-- 3 files changed, 203 insertions(+), 105 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/02f24653/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java index aa6427d..602924d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java @@ -32,7 +32,6 @@ import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.transactions.*; import org.apache.ignite.internal.processors.datastreamer.*; import org.apache.ignite.internal.processors.task.*; -import org.apache.ignite.internal.util.*; import org.apache.ignite.internal.util.future.*; import org.apache.ignite.internal.util.lang.*; import org.apache.ignite.internal.util.typedef.*; @@ 
-323,58 +322,6 @@ public class IgfsDataManager extends IgfsManager { } /** - * Get list of local data blocks of the given file. - * - * @param fileInfo File info. - * @return List of local data block indices. - * @throws IgniteCheckedException If failed. - */ - public List<Long> listLocalDataBlocks(IgfsFileInfo fileInfo) - throws IgniteCheckedException { - assert fileInfo != null; - - int prevGrpIdx = 0; // Block index within affinity group. - - boolean prevPrimaryFlag = false; // Whether previous block was primary. - - List<Long> res = new ArrayList<>(); - - for (long i = 0; i < fileInfo.blocksCount(); i++) { - // Determine group index. - int grpIdx = (int)(i % grpSize); - - if (prevGrpIdx < grpIdx) { - // Reuse existing affinity result. - if (prevPrimaryFlag) - res.add(i); - } - else { - // Re-calculate affinity result. - IgfsBlockKey key = new IgfsBlockKey(fileInfo.id(), fileInfo.affinityKey(), - fileInfo.evictExclude(), i); - - Collection<ClusterNode> affNodes = dataCache.affinity().mapKeyToPrimaryAndBackups(key); - - assert affNodes != null && !affNodes.isEmpty(); - - ClusterNode primaryNode = affNodes.iterator().next(); - - if (primaryNode.id().equals(igfsCtx.kernalContext().localNodeId())) { - res.add(i); - - prevPrimaryFlag = true; - } - else - prevPrimaryFlag = false; - } - - prevGrpIdx = grpIdx; - } - - return res; - } - - /** * Get data block for specified file ID and block index. * * @param fileInfo File info. @@ -1764,6 +1711,19 @@ public class IgfsDataManager extends IgfsManager { } /** + * Allows output stream to await for all current acks. + * + * @param fileId File ID. + * @throws IgniteInterruptedCheckedException In case of interrupt. 
+ */ + void awaitAllAcksReceived(IgniteUuid fileId) throws IgniteInterruptedCheckedException { + WriteCompletionFuture fut = pendingWrites.get(fileId); + + if (fut != null) + fut.awaitAllAcksReceived(); + } + + /** + * Future that is completed when all participating */ private class WriteCompletionFuture extends GridFutureAdapter<Boolean> { @@ -1771,10 +1731,16 @@ private static final long serialVersionUID = 0L; /** File id to remove future from map. */ - private IgniteUuid fileId; + private final IgniteUuid fileId; /** Pending acks. */ - private ConcurrentMap<UUID, Set<Long>> pendingAcks = new ConcurrentHashMap8<>(); + private final ConcurrentMap<Long, UUID> ackMap = new ConcurrentHashMap8<>(); + + /** Lock for map-related conditions. */ + private final Lock lock = new ReentrantLock(); + + /** Condition to wait for empty map. */ + private final Condition allAcksRcvCond = lock.newCondition(); /** Flag indicating future is waiting for last ack. */ private volatile boolean awaitingLast; @@ -1788,6 +1754,23 @@ public class IgfsDataManager extends IgfsManager { this.fileId = fileId; } + /** + * Await all pending data blocks to be acked. + * + * @throws IgniteInterruptedCheckedException In case of interrupt. 
+ */ + public void awaitAllAcksReceived() throws IgniteInterruptedCheckedException { + lock.lock(); + + try { + while (!ackMap.isEmpty()) + U.await(allAcksRcvCond); + } + finally { + lock.unlock(); + } + } + /** {@inheritDoc} */ @Override public boolean onDone(@Nullable Boolean res, @Nullable Throwable err) { if (!isDone()) { @@ -1808,26 +1791,41 @@ public class IgfsDataManager extends IgfsManager { */ private void onWriteRequest(UUID nodeId, long batchId) { if (!isDone()) { - Set<Long> reqIds = pendingAcks.get(nodeId); + UUID pushedOut = ackMap.putIfAbsent(batchId, nodeId); - if (reqIds == null) - reqIds = F.addIfAbsent(pendingAcks, nodeId, new GridConcurrentHashSet<Long>()); - - reqIds.add(batchId); + assert pushedOut == null; } } /** + * Answers if there are some batches for the specified node we're currently waiting acks for. + * + * @param nodeId The node Id. + * @return If there are acks awaited from this node. + */ + private boolean hasPendingAcks(UUID nodeId) { + assert nodeId != null; + + for (Map.Entry<Long, UUID> e : ackMap.entrySet()) + if (nodeId.equals(e.getValue())) + return true; + + return false; + } + + /** * Error occurred on node with given ID. * * @param nodeId Node ID. * @param e Caught exception. */ private void onError(UUID nodeId, IgniteCheckedException e) { - Set<Long> reqIds = pendingAcks.get(nodeId); - // If waiting for ack from this node. 
- if (reqIds != null && !reqIds.isEmpty()) { + if (hasPendingAcks(nodeId)) { + ackMap.clear(); + + signalNoAcks(); + if (e.hasCause(IgfsOutOfSpaceException.class)) onDone(new IgniteCheckedException("Failed to write data (not enough space on node): " + nodeId, e)); else @@ -1844,18 +1842,31 @@ */ private void onWriteAck(UUID nodeId, long batchId) { if (!isDone()) { - Set<Long> reqIds = pendingAcks.get(nodeId); + boolean rmv = ackMap.remove(batchId, nodeId); - assert reqIds != null : "Received acknowledgement message for not registered node [nodeId=" + + assert rmv : "Received acknowledgement message for not registered batch [nodeId=" + nodeId + ", batchId=" + batchId + ']'; - boolean rmv = reqIds.remove(batchId); + if (ackMap.isEmpty()) { + signalNoAcks(); - assert rmv : "Received acknowledgement message for not registered batch [nodeId=" + - nodeId + ", batchId=" + batchId + ']'; + if (awaitingLast) + onDone(true); + } + } + } + + /** + * Signal that currently there are no more pending acks. + */ + private void signalNoAcks() { + lock.lock(); - if (awaitingLast && checkCompleted()) - onDone(true); + try { + allAcksRcvCond.signalAll(); + } + finally { + lock.unlock(); } } @@ -1868,24 +1879,8 @@ if (log.isDebugEnabled()) log.debug("Marked write completion future as awaiting last ack: " + fileId); - if (checkCompleted()) + if (ackMap.isEmpty()) onDone(true); } - - /** - * @return True if received all request acknowledgements after {@link #markWaitingLastAck()} was called. - */ - private boolean checkCompleted() { - for (Map.Entry<UUID, Set<Long>> entry : pendingAcks.entrySet()) { - Set<Long> reqIds = entry.getValue(); - - // If still waiting for some acks. - if (!reqIds.isEmpty()) - return false; - } - - // Got match for each entry in sent map. 
- return true; - } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/02f24653/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java index 298733a..01359b5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java @@ -283,6 +283,8 @@ class IgfsOutputStreamImpl extends IgfsOutputStreamAdapter { } if (space > 0) { + data.awaitAllAcksReceived(fileInfo.id()); + IgfsFileInfo fileInfo0 = meta.updateInfo(fileInfo.id(), new ReserveSpaceClosure(space, streamRange)); http://git-wip-us.apache.org/repos/asf/ignite/blob/02f24653/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsBackupFailoverSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsBackupFailoverSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsBackupFailoverSelfTest.java index 0162121..09cecaa 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsBackupFailoverSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsBackupFailoverSelfTest.java @@ -22,6 +22,7 @@ import org.apache.ignite.cache.*; import org.apache.ignite.configuration.*; import org.apache.ignite.igfs.*; import org.apache.ignite.igfs.secondary.*; +import org.apache.ignite.internal.util.lang.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; import 
org.apache.ignite.testframework.*; @@ -108,21 +109,6 @@ public class IgfsBackupFailoverSelfTest extends IgfsCommonAbstractTest { } /** - * Creates IPC configuration. - * - * @param port The port to use. - * @return The endpoint configuration. - */ - protected IgfsIpcEndpointConfiguration createIgfsRestConfig(int port) { - IgfsIpcEndpointConfiguration cfg = new IgfsIpcEndpointConfiguration(); - - cfg.setType(IgfsIpcEndpointType.TCP); - cfg.setPort(port); - - return cfg; - } - - /** * Start grid with IGFS. * * @param gridName Grid name. @@ -296,8 +282,7 @@ public class IgfsBackupFailoverSelfTest extends IgfsCommonAbstractTest { final AtomicBoolean stop = new AtomicBoolean(); GridTestUtils.runMultiThreadedAsync(new Callable() { - @Override - public Object call() throws Exception { + @Override public Object call() throws Exception { Thread.sleep(1_000); // Some delay to ensure read is in progress. // Now stop all the nodes but the 1st: @@ -390,7 +375,7 @@ public class IgfsBackupFailoverSelfTest extends IgfsCommonAbstractTest { final int f = f0; - int att = doWithRetries(2, new Callable<Void>() { + int att = doWithRetries(1, new Callable<Void>() { @Override public Void call() throws Exception { IgfsOutputStream ios = os; @@ -411,6 +396,8 @@ public class IgfsBackupFailoverSelfTest extends IgfsCommonAbstractTest { } }); + assert att == 1; + X.println("write #2 completed: " + f0 + " in " + att + " attempts."); } @@ -432,6 +419,120 @@ public class IgfsBackupFailoverSelfTest extends IgfsCommonAbstractTest { } /** + * + * @throws Exception + */ + public void testWriteFailoverWhileStoppingMultipleNodes() throws Exception { + final IgfsImpl igfs0 = nodeDatas[0].igfsImpl; + + clear(igfs0); + + IgfsAbstractSelfTest.create(igfs0, paths(DIR, SUBDIR), null); + + final IgfsOutputStream[] outStreams = new IgfsOutputStream[files]; + + // Create files: + for (int f = 0; f < files; f++) { + final byte[] data = createChunk(fileSize, f); + + IgfsOutputStream os = null; + + try { + os = 
igfs0.create(filePath(f), 256, true, null, 0, -1, null); + + assert os != null; + + writeFileChunks(os, data); + } + finally { + if (os != null) + os.flush(); + } + + outStreams[f] = os; + + X.println("write #1 completed: " + f); + } + + final AtomicBoolean stop = new AtomicBoolean(); + + GridTestUtils.runMultiThreadedAsync(new Callable() { + @Override public Object call() throws Exception { + Thread.sleep(10_000); // Some delay to ensure read is in progress. + + // Now stop all the nodes but the 1st: + for (int n = 1; n < numIgfsNodes; n++) { + stopGrid(n); + + X.println("#### grid " + n + " stopped."); + } + + //Thread.sleep(10_000); + + stop.set(true); + + return null; + } + }, 1, "igfs-node-stopper"); + + // Write #2: + for (int f0 = 0; f0 < files; f0++) { + final IgfsOutputStream os = outStreams[f0]; + + assert os != null; + + final int f = f0; + + int att = doWithRetries(1, new Callable<Void>() { + @Override public Void call() throws Exception { + IgfsOutputStream ios = os; + + try { + writeChunks0(igfs0, ios, f); + } + catch (IOException ioe) { + log().warning("Attempt to append the data to existing stream failed: ", ioe); + + ios = igfs0.append(filePath(f), false); + + assert ios != null; + + writeChunks0(igfs0, ios, f); + } + + return null; + } + }); + + assert att == 1; + + X.println("write #2 completed: " + f0 + " in " + att + " attempts."); + } + + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return stop.get(); + } + }, 25_000); + + // Check files: + for (int f = 0; f < files; f++) { + IgfsPath path = filePath(f); + + byte[] data = createChunk(fileSize, f); + + // Check through 1st node: + checkExist(igfs0, path); + + assertEquals("File length mismatch.", data.length * 2, igfs0.size(path)); + + checkFileContent(igfs0, path, data, data); + + X.println("Read test completed: " + f); + } + } + + /** * Writes data to the file of the specified index and closes the output stream. * * @param igfs0 IGFS.
