This is an automated email from the ASF dual-hosted git repository. mmuzaf pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push: new dd5d9f3 IGNITE-12366: Cancel file transmission on a node-receiver (#7045) dd5d9f3 is described below commit dd5d9f3ab0d853ab80487c01c702824851a4ab4f Author: Maxim Muzafarov <mmu...@apache.org> AuthorDate: Tue Nov 12 14:02:16 2019 +0300 IGNITE-12366: Cancel file transmission on a node-receiver (#7045) --- .../managers/communication/GridIoManager.java | 6 +++ .../TransmissionCancelledException.java | 44 ++++++++++++++++++++++ .../communication/TransmissionHandler.java | 10 +++++ .../managers/communication/TransmissionMeta.java | 2 +- .../GridIoManagerFileTransmissionSelfTest.java | 35 +++++++++++++++++ 5 files changed, 96 insertions(+), 1 deletion(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java index 2ec9acc..799625d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java @@ -3289,6 +3289,12 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa catch (IgniteCheckedException e) { closeChannelQuiet(); + if (X.hasCause(e, TransmissionCancelledException.class)) { + throw new TransmissionCancelledException("File transmission has been cancelled on the remote node " + + "[rmtId=" + rmtId + ", file=" + file.getName() + ", sesKey=" + sesKey + ", retries=" + retries + + ", cause='" + e.getCause(TransmissionCancelledException.class).getMessage() + "']"); + } + throw new IgniteCheckedException("Exception while sending file [rmtId=" + rmtId + ", file=" + file.getName() + ", sesKey=" + sesKey + ", retries=" + retries + ']', e); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/TransmissionCancelledException.java 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/TransmissionCancelledException.java new file mode 100644 index 0000000..3971421 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/TransmissionCancelledException.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.managers.communication; + +import org.apache.ignite.IgniteException; + +/** + * Exception is used to cancel a file transmission operation on the receiver. This exception may be thrown + * at any time during session handling by a {@link TransmissionHandler} to gracefully interrupt the transmission + * session on a node-sender. + */ +public class TransmissionCancelledException extends IgniteException { + /** Class serialization version number. */ + private static final long serialVersionUID = 0L; + + /** + * Default no-op constructor. + */ + public TransmissionCancelledException() { + // No-op. + } + + /** + * @param msg Cancellation cause. 
+ */ + public TransmissionCancelledException(String msg) { + super(msg); + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/TransmissionHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/TransmissionHandler.java index a55f1e6..2cc4d1c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/TransmissionHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/TransmissionHandler.java @@ -41,11 +41,15 @@ import java.util.function.Consumer; */ public interface TransmissionHandler { /** + * @param nodeId Remote node id on which the error occurred. * @param err The err of fail handling process. */ public void onException(UUID nodeId, Throwable err); /** + * Absolute path of a file to receive remote transmission data into. The {@link TransmissionCancelledException} + * can be thrown if it is necessary to gracefully interrupt the current transmission session on the node-sender. + * * @param nodeId Remote node id from which request has been received. * @param fileMeta File meta info. * @return Absolute pathname denoting a file. @@ -56,6 +60,9 @@ public interface TransmissionHandler { * <em>Chunk handler</em> represents by itself the way of input data stream processing. * It accepts within each chunk a {@link ByteBuffer} with data from input for further processing. * Activated when the {@link TransmissionPolicy#CHUNK} policy sent. + * <p> + * The {@link TransmissionCancelledException} can be thrown to gracefully interrupt the local transmission and + * the node-sender's transmission session. * * @param nodeId Remote node id from which request has been received. * @param initMeta Initial handler meta info. @@ -67,6 +74,9 @@ public interface TransmissionHandler { * <em>File handler</em> represents by itself the way of input data stream processing. 
All the data will * be processed under the hood using zero-copy transferring algorithm and only start file processing and * the end of processing will be provided. Activated when the {@link TransmissionPolicy#FILE} policy sent. + * <p> + * The {@link TransmissionCancelledException} can be thrown to gracefully interrupt the local transmission and + * the node-sender's transmission session. * * @param nodeId Remote node id from which request has been received. * @param initMeta Initial handler meta info. diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/TransmissionMeta.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/TransmissionMeta.java index 986bf55..5768d74 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/TransmissionMeta.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/TransmissionMeta.java @@ -30,7 +30,7 @@ import org.apache.ignite.internal.util.typedef.internal.S; * Class represents a file meta information to send to the remote node. Used to initiate a new file transfer * process or to continue the previous unfinished from the last transmitted point. */ -class TransmissionMeta implements Externalizable { +public class TransmissionMeta implements Externalizable { /** Serial version uid. 
*/ private static final long serialVersionUID = 0L; diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridIoManagerFileTransmissionSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridIoManagerFileTransmissionSelfTest.java index b4254d8..7b91e47 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridIoManagerFileTransmissionSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridIoManagerFileTransmissionSelfTest.java @@ -806,6 +806,41 @@ public class GridIoManagerFileTransmissionSelfTest extends GridCommonAbstractTes } /** + * @throws Exception If fails. + */ + @Test(expected = TransmissionCancelledException.class) + public void testChunkHandlerCancelTransmission() throws Exception { + snd = startGrid(0); + rcv = startGrid(1); + + snd.cluster().active(true); + + File fileToSend = createFileRandomData("testFile", 1024 * 1024); + + rcv.context().io().addTransmissionHandler(topic, new TransmissionHandlerAdapter() { + /** {@inheritDoc} */ + @Override public Consumer<ByteBuffer> chunkHandler(UUID nodeId, TransmissionMeta initMeta) { + return new Consumer<ByteBuffer>() { + @Override public void accept(ByteBuffer buffer) { + throw new TransmissionCancelledException("Operation cancelled by the user"); + } + }; + } + }); + + try (GridIoManager.TransmissionSender sender = snd.context() + .io() + .openTransmissionSender(rcv.localNode().id(), topic)) { + sender.send(fileToSend, TransmissionPolicy.CHUNK); + } + catch (TransmissionCancelledException e) { + log.warning("Transmission cancelled", e); + + throw e; + } + } + + /** * @param ig Ignite instance to check. */ private static void ensureResourcesFree(IgniteEx ig) {