[ https://issues.apache.org/jira/browse/IGNITE-12349?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Pavel Pereslegin resolved IGNITE-12349.
---------------------------------------
Resolution: Cannot Reproduce
I think this has been fixed in IGNITE-13098.
> File transmission can cause the cluster to freeze.
> --------------------------------------------------
>
> Key: IGNITE-12349
> URL: https://issues.apache.org/jira/browse/IGNITE-12349
> Project: Ignite
> Issue Type: Bug
> Affects Versions: 2.8
> Reporter: Pavel Pereslegin
> Assignee: Maxim Muzafarov
> Priority: Critical
>
> When file transmission is initiated, a timeout object with a mutable endTime
> is added to the timeout processor "queue" (see
> TcpCommunicationSpi#openChannel).
> Since endTime is mutable, this object never actually times out. Moreover,
> once it becomes the first element of the "queue", the TimeoutProcessor stops
> working altogether: the timeout objects queued behind it are never processed.
> Reproducer
> {code:java}
> public class FileTransmissionTimeoutProcessorTest extends
> GridCommonAbstractTest {
> @After
> public void after() throws Exception {
> cleanPersistenceDir();
> stopAllGrids();
> }
> /** {@inheritDoc} */
> @Override protected IgniteConfiguration getConfiguration(String
> igniteInstanceName) throws Exception {
> return super.getConfiguration(igniteInstanceName)
> .setDataStorageConfiguration(new DataStorageConfiguration()
> .setDefaultDataRegionConfiguration(new
> DataRegionConfiguration()
> .setPersistenceEnabled(true)
> .setMaxSize(500L * 1024 * 1024)))
> .setCacheConfiguration(new CacheConfiguration<Integer,
> Integer>(DEFAULT_CACHE_NAME));
> }
> @Test
> public void testChannelTimeoutObject() throws Exception {
> IgniteEx snd = startGrid(0);
> IgniteEx rcv = startGrid(1);
> // Do some transfer between nodes.
> initiateFileTransfer(snd, rcv);
> GridFutureAdapter<Boolean> fut = new GridFutureAdapter<>();
> // Add new timeout object after file transmission timeout object.
> snd.context().timeout().addTimeoutObject(new
> GridTimeoutObjectAdapter(DFLT_CONN_TIMEOUT + 1_000) {
> @Override public void onTimeout() {
> fut.onDone(true);
> }
> });
> // The timeout processor will hang on the file transfer timeout
> object and will never complete the remaining tasks.
> boolean success = fut.get(DFLT_CONN_TIMEOUT + 30_000);
> assertTrue(success);
> }
> /** */
> private void initiateFileTransfer(IgniteEx snd, IgniteEx rcv) throws
> IOException, IgniteCheckedException, InterruptedException {
> snd.cluster().active(true);
> awaitPartitionMapExchange();
> try (IgniteDataStreamer<Integer, Integer> dataStreamer =
> snd.dataStreamer(DEFAULT_CACHE_NAME)) {
> dataStreamer.allowOverwrite(true);
> for (int i = 0; i < 10_000; i++)
> dataStreamer.addData(i, i + DEFAULT_CACHE_NAME.hashCode());
> }
> Map<String, Long> fileSizes = new HashMap<>();
> Map<String, Integer> fileCrcs = new HashMap<>();
> Map<String, Serializable> fileParams = new HashMap<>();
>
> assertTrue(snd.context().io().fileTransmissionSupported(rcv.localNode()));
> File tempStore = U.resolveWorkDirectory(U.defaultWorkDirectory(),
> "ctmp", true);
>
> rcv.context().io().addTransmissionHandler(GridTopic.TOPIC_CACHE.topic("test",
> 0), new TransmissionHandler() {
> @Override public void onException(UUID nodeId, Throwable err) {
> // No-op.
> }
> @Override public String filePath(UUID nodeId, TransmissionMeta
> fileMeta) {
> return new File(tempStore, fileMeta.name()).getAbsolutePath();
> }
> @Override public Consumer<ByteBuffer> chunkHandler(UUID nodeId,
> TransmissionMeta initMeta) {
> return null;
> }
> @Override public Consumer<File> fileHandler(UUID nodeId,
> TransmissionMeta initMeta) {
> return new Consumer<File>() {
> @Override public void accept(File file) {
> assertTrue(fileSizes.containsKey(file.getName()));
> // Save all params.
> fileParams.putAll(initMeta.params());
> }
> };
> }
> });
> IgniteInternalCache<Object, Object> defCache =
> snd.cachex(DEFAULT_CACHE_NAME);
> File cacheDirIg0 = ((FilePageStoreManager)(defCache).context()
> .shared()
> .pageStore()).cacheWorkDir(defCache.configuration());
> File[] cacheParts = cacheDirIg0.listFiles(new FilenameFilter() {
> @Override public boolean accept(File dir, String name) {
> return name.endsWith(FILE_SUFFIX);
> }
> });
> for (File file : cacheParts) {
> fileSizes.put(file.getName(), file.length());
> fileCrcs.put(file.getName(), FastCrc.calcCrc(file));
> }
> try (GridIoManager.TransmissionSender sender = snd.context()
> .io()
> .openTransmissionSender(rcv.localNode().id(),
> GridTopic.TOPIC_CACHE.topic("test", 0))) {
> // Iterate over cache partition cacheParts.
> for (File file : cacheParts) {
> Map<String, Serializable> params = new HashMap<>();
> params.put(file.getName(), file.hashCode());
> sender.send(file, params, TransmissionPolicy.FILE);
> }
> }
> }
> }
> {code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)