This is an automated email from the ASF dual-hosted git repository. mpochatkin pushed a commit to branch IGNITE-26418 in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit 7e44c62f3f8de5e93c1b02025f4a398cf05b642c Author: Pochatkin Mikhail <[email protected]> AuthorDate: Tue Sep 30 14:36:17 2025 +0300 IGNITE-26418 Fix potential netty buffers leak --- .../java/org/apache/ignite/lang/ErrorGroups.java | 3 + .../ignite/internal/deployment/DeployFiles.java | 3 +- .../internal/deployment/ItDeploymentUnitTest.java | 4 +- .../internal/deployunit/CachedDeploymentUnit.java | 51 +++++++++ .../internal/deployunit/DeployerProcessor.java | 121 +++++++++++++++++---- .../internal/deployunit/DeploymentManagerImpl.java | 8 +- .../ignite/internal/deployunit/DeploymentUnit.java | 8 +- .../deployunit/DeploymentUnitProcessor.java | 9 +- .../internal/deployunit/FileDeployerService.java | 51 ++++++--- .../internal/deployunit/FilesDeploymentUnit.java | 29 ++--- ...ploymentUnit.java => StreamDeploymentUnit.java} | 16 ++- .../ignite/internal/deployunit/UnitContent.java | 2 +- .../internal/deployunit/ZipDeploymentUnit.java | 6 +- .../DeploymentConfigurationSchema.java | 3 + .../DeploymentUnitWriteException.java} | 24 ++-- .../TempStorage.java} | 26 ++--- .../deployunit/tempstorage/TempStorageImpl.java | 81 ++++++++++++++ .../TempStorageProvider.java} | 19 +--- .../tempstorage/TempStorageProviderImpl.java | 56 ++++++++++ .../ignite/deployment/FileDeployerServiceTest.java | 15 ++- .../internal/compute/ItComputeTestStandalone.java | 3 +- .../compute/loader/JobContextManagerTest.java | 8 +- .../rest/deployment/CodeDeploymentRestFactory.java | 13 ++- .../deployment/CompletedFileUploadSubscriber.java | 11 +- .../deployment/DeploymentManagementController.java | 18 ++- .../rest/deployment/InputStreamCollector.java | 4 +- .../rest/deployment/InputStreamCollectorImpl.java | 48 ++++++-- .../rest/deployment/ZipInputStreamCollector.java | 41 +++++-- .../DeploymentUnitWriteExceptionHandler.java | 42 +++++++ .../org/apache/ignite/internal/app/IgniteImpl.java | 5 +- 30 files changed, 567 insertions(+), 161 deletions(-) diff --git a/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java b/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java index 35fdd683a56..15da8164b84 100755 --- a/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java +++ b/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java @@ -574,6 +574,9 @@ public class ErrorGroups { /** Deployment unit zip deploy error. */ public static final int UNIT_ZIP_ERR = CODE_DEPLOYMENT_ERR_GROUP.registerErrorCode((short) 5); + + /** Deployment unit write to fs error. */ + public static final int UNIT_WRITE_ERR = CODE_DEPLOYMENT_ERR_GROUP.registerErrorCode((short) 6); } /** diff --git a/modules/code-deployment/src/integrationTest/java/org/apache/ignite/internal/deployment/DeployFiles.java b/modules/code-deployment/src/integrationTest/java/org/apache/ignite/internal/deployment/DeployFiles.java index d356a870d47..e22c39224dd 100644 --- a/modules/code-deployment/src/integrationTest/java/org/apache/ignite/internal/deployment/DeployFiles.java +++ b/modules/code-deployment/src/integrationTest/java/org/apache/ignite/internal/deployment/DeployFiles.java @@ -37,6 +37,7 @@ import org.apache.ignite.internal.app.IgniteImpl; import org.apache.ignite.internal.deployunit.DeploymentUnit; import org.apache.ignite.internal.deployunit.FilesDeploymentUnit; import org.apache.ignite.internal.deployunit.NodesToDeploy; +import org.apache.ignite.internal.deployunit.StreamDeploymentUnit; import org.apache.ignite.internal.deployunit.UnitStatuses; import org.apache.ignite.internal.deployunit.UnitStatuses.UnitStatusesBuilder; import org.apache.ignite.internal.deployunit.ZipDeploymentUnit; @@ -240,7 +241,7 @@ class DeployFiles { } return new ZipDeploymentUnit(zis); } else { - return new FilesDeploymentUnit(map); + return new StreamDeploymentUnit(map); } } } diff --git a/modules/code-deployment/src/integrationTest/java/org/apache/ignite/internal/deployment/ItDeploymentUnitTest.java b/modules/code-deployment/src/integrationTest/java/org/apache/ignite/internal/deployment/ItDeploymentUnitTest.java index 1a33fb7d1c2..83181e57fee 100644 --- a/modules/code-deployment/src/integrationTest/java/org/apache/ignite/internal/deployment/ItDeploymentUnitTest.java +++ b/modules/code-deployment/src/integrationTest/java/org/apache/ignite/internal/deployment/ItDeploymentUnitTest.java @@ -41,6 +41,7 @@ import org.apache.ignite.internal.deployunit.NodesToDeploy; import org.apache.ignite.internal.deployunit.UnitStatuses; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.RepeatedTest; import org.junit.jupiter.api.Test; /** @@ -282,7 +283,8 @@ public class ItDeploymentUnitTest extends ClusterPerTestIntegrationTest { smallUnit.waitUnitClean(igniteImpl(2)); } - @Test +// @Test + @RepeatedTest(10) public void testZipDeploy() { String id = "test"; Unit unit = files.deployAndVerifyFlatZip(id, Version.parseVersion("1.1.0"), igniteImpl(1)); diff --git a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/CachedDeploymentUnit.java b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/CachedDeploymentUnit.java new file mode 100644 index 00000000000..e07d2a7529a --- /dev/null +++ b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/CachedDeploymentUnit.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.deployunit; + +import java.util.concurrent.CompletableFuture; +import org.apache.ignite.internal.logger.IgniteLogger; +import org.apache.ignite.internal.logger.Loggers; + +/** + * Implementation of {@link DeploymentUnit} that handles unit from provided future. + */ +public class CachedDeploymentUnit implements DeploymentUnit { + private static final IgniteLogger LOG = Loggers.forClass(CachedDeploymentUnit.class); + + private final CompletableFuture<DeploymentUnit> future; + + public CachedDeploymentUnit(CompletableFuture<DeploymentUnit> future) { + this.future = future; + } + + @Override + public <T, R> CompletableFuture<R> process(DeploymentUnitProcessor<T, R> processor, T arg) { + return future.thenCompose(unit -> unit.process(processor, arg)); + } + + @Override + public void close() throws Exception { + future.whenComplete((unit, throwable) -> { + try { + unit.close(); + } catch (Exception e) { + LOG.warn("Failed to close deployment unit: {}", e, unit); + } + }); + } +} diff --git a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeployerProcessor.java b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeployerProcessor.java index 258a4e2922a..28fea214bb5 100644 --- a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeployerProcessor.java +++ b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeployerProcessor.java @@ -19,14 +19,29 @@ package org.apache.ignite.internal.deployunit; import static java.nio.file.StandardCopyOption.ATOMIC_MOVE; import static java.nio.file.StandardCopyOption.REPLACE_EXISTING; +import static java.util.concurrent.CompletableFuture.allOf; +import static java.util.concurrent.CompletableFuture.failedFuture; +import static org.apache.ignite.internal.util.CompletableFutures.falseCompletedFuture; +import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture; import java.io.IOException; import java.io.InputStream; import java.nio.file.Files; import java.nio.file.Path; import java.util.Map.Entry; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.function.BiConsumer; +import java.util.function.BiFunction; import java.util.zip.ZipEntry; import java.util.zip.ZipInputStream; +import org.apache.ignite.internal.deployunit.DeployerProcessor.DeployArg; +import org.apache.ignite.internal.deployunit.tempstorage.TempStorage; +import org.apache.ignite.internal.lang.RunnableX; +import org.apache.ignite.internal.logger.IgniteLogger; +import org.apache.ignite.internal.logger.Loggers; +import org.apache.ignite.internal.util.CompletableFutures; +import org.jetbrains.annotations.Nullable; /** * Implementation of {@link DeploymentUnitProcessor} that deploys deployment unit content to the file system. @@ -50,37 +65,103 @@ import java.util.zip.ZipInputStream; * <li>{@code Path} - the argument type representing the target deployment directory</li> * </ul> */ -public class DeployerProcessor implements DeploymentUnitProcessor<Path> { - /** Suffix used for temporary files during the deployment process. */ - private static final String TMP_SUFFIX = ".tmp"; +class DeployerProcessor implements DeploymentUnitProcessor<DeployArg, Boolean> { + private static final IgniteLogger LOG = Loggers.forClass(DeployerProcessor.class); + + private final Executor executor; + + public DeployerProcessor(Executor executor) { + this.executor = executor; + } @Override - public void processContent(FilesDeploymentUnit unit, Path unitFolder) throws IOException { - for (Entry<String, InputStream> e : unit.content().entrySet()) { - doDeploy(unitFolder, e.getKey(), e.getValue()); - } + public CompletableFuture<Boolean> processFilesContent(FilesDeploymentUnit unit, DeployArg deployArg) { + return wrap(() -> { + for (Entry<String, Path> e : unit.content().entrySet()) { + Files.move(e.getValue(), deployArg.unitFolder, ATOMIC_MOVE, REPLACE_EXISTING); + } + }); } @Override - public void processContentWithUnzip(ZipDeploymentUnit unit, Path unitFolder) throws IOException { + public CompletableFuture<Boolean> processStreamContent(StreamDeploymentUnit unit, DeployArg deployArg) { + CompletableFuture<?>[] array = unit.content() + .entrySet() + .stream() + .map(e -> + deployArg.tempStorage.store(e.getKey(), e.getValue()) + .thenCompose(path -> doDeploy(deployArg.unitFolder, e.getKey(), path)) + ).toArray(CompletableFuture[]::new); + return allOf(array).handle(((unused, throwable) -> throwable == null)); + } + + @Override + public CompletableFuture<Boolean> processContentWithUnzip(ZipDeploymentUnit unit, DeployArg deployArg) { ZipInputStream zis = unit.zis(); - ZipEntry ze; - while ((ze = zis.getNextEntry()) != null) { - if (ze.isDirectory()) { + try { + ZipEntry nextEntry = zis.getNextEntry(); + return processZipFile(nextEntry, zis, deployArg) + .handle((unused, throwable) -> throwable == null); + } catch (IOException e) { + return falseCompletedFuture(); + } + } + + private CompletableFuture<Void> processZipFile(@Nullable ZipEntry ze, ZipInputStream zis, DeployArg deployArg) { + if (ze == null) { + return nullCompletedFuture(); + } + String entryName = ze.getName(); + if (ze.isDirectory()) { + try { // To support empty dirs. - Path entryPath = unitFolder.resolve(ze.getName()); + Path entryPath = deployArg.unitFolder.resolve(entryName); Files.createDirectories(entryPath); - } else { - doDeploy(unitFolder, ze.getName(), zis); + return processZipFile(zis.getNextEntry(), zis, deployArg); + } catch (IOException e) { + return failedFuture(e); } + } else { + return deployArg.tempStorage.store(entryName, zis) + .thenCompose(path -> doDeploy(deployArg.unitFolder, entryName, path)) + .thenCompose(unused -> { + try { + return processZipFile(zis.getNextEntry(), zis, deployArg); + } catch (IOException e) { + return failedFuture(e); + } + }); } } - private static void doDeploy(Path unitFolder, String entryName, InputStream is) throws IOException { - Path unitPath = unitFolder.resolve(entryName); - Files.createDirectories(unitPath.getParent()); - Path unitPathTmp = unitFolder.resolve(entryName + TMP_SUFFIX); - Files.copy(is, unitPathTmp, REPLACE_EXISTING); - Files.move(unitPathTmp, unitPath, ATOMIC_MOVE, REPLACE_EXISTING); + private CompletableFuture<Boolean> wrap(RunnableX runnableX) { + return CompletableFuture.supplyAsync(() -> { + try { + runnableX.run(); + return true; + } catch (Throwable t) { + LOG.error("Failed to process deploy action.", t); + return false; + } + }, executor); + } + + private CompletableFuture<Boolean> doDeploy(Path unitFolder, String entryName, Path deployment) { + return wrap(() -> { + Path unitPath = unitFolder.resolve(entryName); + Files.createDirectories(unitPath.getParent()); + Files.move(deployment, unitPath, ATOMIC_MOVE, REPLACE_EXISTING); + }); + } + + static class DeployArg { + private final Path unitFolder; + + private final TempStorage tempStorage; + + DeployArg(Path unitFolder, TempStorage tempStorage) { + this.unitFolder = unitFolder; + this.tempStorage = tempStorage; + } } } diff --git a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentManagerImpl.java b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentManagerImpl.java index 51ecc3f8370..bd7ac3c8620 100644 --- a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentManagerImpl.java +++ b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentManagerImpl.java @@ -56,6 +56,7 @@ import org.apache.ignite.internal.deployunit.metastore.NodeEventCallback; import org.apache.ignite.internal.deployunit.metastore.NodeStatusWatchListener; import org.apache.ignite.internal.deployunit.metastore.status.UnitClusterStatus; import org.apache.ignite.internal.deployunit.metastore.status.UnitNodeStatus; +import org.apache.ignite.internal.deployunit.tempstorage.TempStorageProvider; import org.apache.ignite.internal.logger.IgniteLogger; import org.apache.ignite.internal.logger.Loggers; import org.apache.ignite.internal.manager.ComponentContext; @@ -412,7 +413,8 @@ public class DeploymentManagerImpl implements IgniteDeployment { @Override public CompletableFuture<Void> startAsync(ComponentContext componentContext) { Path deploymentUnitFolder = workDir.resolve(configuration.location().value()); - deployer.initUnitsFolder(deploymentUnitFolder); + Path deploymentUnitTmp = workDir.resolve(configuration.tempLocation().value()); + deployer.initUnitsFolder(deploymentUnitFolder, deploymentUnitTmp); deploymentUnitStore.registerNodeStatusListener(nodeStatusWatchListener); deploymentUnitStore.registerClusterStatusListener(clusterStatusWatchListener); messaging.subscribe(); @@ -472,4 +474,8 @@ public class DeploymentManagerImpl implements IgniteDeployment { public DeploymentUnitAccessor deploymentUnitAccessor() { return deploymentUnitAccessor; } + + public TempStorageProvider tempStorageProvider() { + return deployer.tempStorageProvider(); + } } diff --git a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentUnit.java b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentUnit.java index 0d86117097f..9c10e46658e 100644 --- a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentUnit.java +++ b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentUnit.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.deployunit; import java.io.IOException; +import java.util.concurrent.CompletableFuture; /** * Interface representing a deployment unit in the Apache Ignite code deployment system. @@ -45,10 +46,9 @@ public interface DeploymentUnit extends AutoCloseable { * * * @param <T> the type of argument passed to the processor + * @param <R> the type of result * @param processor the processor that will handle the deployment unit content - * @param unitFolder the argument to be passed to the processor during processing - * @throws IOException if an I/O error occurs during processing, such as issues reading - * deployment unit content or writing processed results. + * @param arg the argument to be passed to the processor during processing */ - <T> void process(DeploymentUnitProcessor<T> processor, T unitFolder) throws IOException; + <T, R> CompletableFuture<R> process(DeploymentUnitProcessor<T, R> processor, T arg); } diff --git a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentUnitProcessor.java b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentUnitProcessor.java index 8b768d3536c..c5ba1862abc 100644 --- a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentUnitProcessor.java +++ b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentUnitProcessor.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.deployunit; import java.io.IOException; +import java.util.concurrent.CompletableFuture; /** * Processor interface for handling deployment unit content operations. @@ -29,7 +30,7 @@ import java.io.IOException; * * @param <T> the type of argument passed to the processing methods */ -public interface DeploymentUnitProcessor<T> { +public interface DeploymentUnitProcessor<T, R> { /** * Processes the content of a regular deployment unit. * @@ -41,7 +42,9 @@ public interface DeploymentUnitProcessor<T> { * @param arg the argument to be used during processing * @throws IOException if an I/O error occurs during processing */ - void processContent(FilesDeploymentUnit unit, T arg) throws IOException; + CompletableFuture<R> processFilesContent(FilesDeploymentUnit unit, T arg); + + CompletableFuture<R> processStreamContent(StreamDeploymentUnit unit, T arg); /** * Processes the content of a ZIP-based deployment unit with automatic extraction. @@ -55,5 +58,5 @@ public interface DeploymentUnitProcessor<T> { * @param arg the argument to be used during processing * @throws IOException if an I/O error occurs during processing or extraction */ - void processContentWithUnzip(ZipDeploymentUnit unit, T arg) throws IOException; + CompletableFuture<R> processContentWithUnzip(ZipDeploymentUnit unit, T arg); } diff --git a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/FileDeployerService.java b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/FileDeployerService.java index 863b14715d7..e313300e14b 100644 --- a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/FileDeployerService.java +++ b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/FileDeployerService.java @@ -17,6 +17,8 @@ package org.apache.ignite.internal.deployunit; +import static org.apache.ignite.internal.util.CompletableFutures.falseCompletedFuture; + import java.io.IOException; import java.nio.file.FileVisitResult; import java.nio.file.Files; @@ -28,8 +30,13 @@ import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.function.BiConsumer; import org.apache.ignite.deployment.version.Version; +import org.apache.ignite.internal.deployunit.DeployerProcessor.DeployArg; import org.apache.ignite.internal.deployunit.exception.DeploymentUnitNotFoundException; +import org.apache.ignite.internal.deployunit.tempstorage.TempStorage; +import org.apache.ignite.internal.deployunit.tempstorage.TempStorageProvider; +import org.apache.ignite.internal.deployunit.tempstorage.TempStorageProviderImpl; import org.apache.ignite.internal.logger.IgniteLogger; import org.apache.ignite.internal.logger.Loggers; import org.apache.ignite.internal.thread.IgniteThreadFactory; @@ -43,6 +50,8 @@ public class FileDeployerService { private static final int DEPLOYMENT_EXECUTOR_SIZE = 4; + private final TempStorageProviderImpl tempStorageProvider; + /** * Folder for units. */ @@ -50,18 +59,22 @@ public class FileDeployerService { private final ExecutorService executor; - private final DeploymentUnitProcessor<Path> deployProcessor = new DeployerProcessor(); + private final DeploymentUnitProcessor<DeployArg, Boolean> deployProcessor; /** Constructor. */ public FileDeployerService(String nodeName) { - executor = Executors.newFixedThreadPool( + this.executor = Executors.newFixedThreadPool( DEPLOYMENT_EXECUTOR_SIZE, IgniteThreadFactory.create(nodeName, "deployment", LOG) ); + + tempStorageProvider = new TempStorageProviderImpl(executor); + deployProcessor = new DeployerProcessor(executor); } - public void initUnitsFolder(Path unitsFolder) { + public void initUnitsFolder(Path unitsFolder, Path tempFolder) { this.unitsFolder = unitsFolder; + tempStorageProvider.init(tempFolder); } /** @@ -73,19 +86,17 @@ public class FileDeployerService { * @return Future with deploy result. */ public CompletableFuture<Boolean> deploy(String id, Version version, DeploymentUnit deploymentUnit) { - return CompletableFuture.supplyAsync(() -> { - try { - Path unitFolder = unitPath(id, version); - Files.createDirectories(unitFolder); - - deploymentUnit.process(deployProcessor, unitFolder); - - return true; - } catch (IOException e) { - LOG.error("Failed to deploy unit " + id + ":" + version, e); - return false; - } - }, executor); + try { + Path unitFolder = unitPath(id, version); + Files.createDirectories(unitFolder); + TempStorage storage = tempStorageProvider.tempStorage(id, version); + return deploymentUnit.process(deployProcessor, new DeployArg(unitFolder, storage)) + .whenComplete((unused, t) -> storage.close()); + + } catch (IOException e) { + LOG.error("Failed to deploy unit " + id + ":" + version, e); + return falseCompletedFuture(); + } } /** @@ -101,7 +112,7 @@ public class FileDeployerService { IgniteUtils.deleteIfExistsThrowable(unitPath(id, version)); return true; } catch (IOException e) { - LOG.debug("Failed to undeploy unit " + id + ":" + version, e); + LOG.error("Failed to undeploy unit " + id + ":" + version, e); return false; } }, executor); @@ -128,7 +139,7 @@ public class FileDeployerService { } }); } catch (IOException e) { - LOG.debug("Failed to get content for unit " + id + ":" + version, e); + LOG.error("Failed to get content for unit " + id + ":" + version, e); } return new UnitContent(result); }, executor); @@ -168,4 +179,8 @@ public class FileDeployerService { public void stop() { executor.shutdown(); } + + public TempStorageProvider tempStorageProvider() { + return tempStorageProvider; + } } diff --git a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/FilesDeploymentUnit.java b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/FilesDeploymentUnit.java index 3d01bd2f07d..f52cad7d649 100644 --- a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/FilesDeploymentUnit.java +++ b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/FilesDeploymentUnit.java @@ -17,48 +17,37 @@ package org.apache.ignite.internal.deployunit; -import static org.apache.ignite.internal.util.IgniteUtils.closeAll; - -import java.io.IOException; -import java.io.InputStream; +import java.nio.file.Path; import java.util.Map; +import java.util.concurrent.CompletableFuture; /** - * Standard implementation of {@link DeploymentUnit} that handles regular (non-compressed) deployment content. - * - * <p>This class represents a deployment unit containing a collection of files, where each file is - * represented as a mapping from file name to its corresponding {@link InputStream}. This implementation is designed for straightforward - * deployment scenarios where the content does not require compression or special extraction handling. - * + * Implementation of {@link DeploymentUnit} that handles regular deployment content in local FS path. */ public class FilesDeploymentUnit implements DeploymentUnit { - /** - * The deployment unit content represented as a mapping from file names to their input streams. Each entry represents a file within the - * deployment unit. - */ - private final Map<String, InputStream> content; + private final Map<String, Path> content; /** * Constructor. */ - public FilesDeploymentUnit(Map<String, InputStream> content) { + public FilesDeploymentUnit(Map<String, Path> content) { this.content = content; } /** * Returns the deployment unit content as a map of file names to input streams. */ - public Map<String, InputStream> content() { + public Map<String, Path> content() { return content; } @Override public void close() throws Exception { - closeAll(content.values()); + } @Override - public <T> void process(DeploymentUnitProcessor<T> processor, T arg) throws IOException { - processor.processContent(this, arg); + public <T, R> CompletableFuture<R> process(DeploymentUnitProcessor<T, R> processor, T arg) { + return processor.processFilesContent(this, arg); } } diff --git a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/FilesDeploymentUnit.java b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/StreamDeploymentUnit.java similarity index 78% copy from modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/FilesDeploymentUnit.java copy to modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/StreamDeploymentUnit.java index 3d01bd2f07d..fbf8cd1e54a 100644 --- a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/FilesDeploymentUnit.java +++ b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/StreamDeploymentUnit.java @@ -19,29 +19,27 @@ package org.apache.ignite.internal.deployunit; import static org.apache.ignite.internal.util.IgniteUtils.closeAll; -import java.io.IOException; import java.io.InputStream; import java.util.Map; +import java.util.concurrent.CompletableFuture; /** - * Standard implementation of {@link DeploymentUnit} that handles regular (non-compressed) deployment content. + * Implementation of {@link DeploymentUnit} that handles regular deployment content in stream format. * * <p>This class represents a deployment unit containing a collection of files, where each file is * represented as a mapping from file name to its corresponding {@link InputStream}. This implementation is designed for straightforward * deployment scenarios where the content does not require compression or special extraction handling. - * */ -public class FilesDeploymentUnit implements DeploymentUnit { +public class StreamDeploymentUnit implements DeploymentUnit { /** - * The deployment unit content represented as a mapping from file names to their input streams. Each entry represents a file within the - * deployment unit. + * The deployment unit content represented as a mapping from file names to their input streams. */ private final Map<String, InputStream> content; /** * Constructor. */ - public FilesDeploymentUnit(Map<String, InputStream> content) { + public StreamDeploymentUnit(Map<String, InputStream> content) { this.content = content; } @@ -58,7 +56,7 @@ public class FilesDeploymentUnit implements DeploymentUnit { } @Override - public <T> void process(DeploymentUnitProcessor<T> processor, T arg) throws IOException { - processor.processContent(this, arg); + public <T, R> CompletableFuture<R> process(DeploymentUnitProcessor<T, R> processor, T arg) { + return processor.processStreamContent(this, arg); } } diff --git a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/UnitContent.java b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/UnitContent.java index 7cd6e7a3b5e..109526f673b 100644 --- a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/UnitContent.java +++ b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/UnitContent.java @@ -86,6 +86,6 @@ public class UnitContent implements Iterable<Entry<String, byte[]>> { content.iterator().forEachRemaining(it -> { files.put(it.getKey(), new ByteArrayInputStream(it.getValue())); }); - return new FilesDeploymentUnit(files); + return new StreamDeploymentUnit(files); } } diff --git a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/ZipDeploymentUnit.java b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/ZipDeploymentUnit.java index 78f5874e25b..5d3db1fa139 100644 --- a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/ZipDeploymentUnit.java +++ b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/ZipDeploymentUnit.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.deployunit; import java.io.IOException; +import java.util.concurrent.CompletableFuture; import java.util.zip.ZipInputStream; /** @@ -45,11 +46,10 @@ public class ZipDeploymentUnit implements DeploymentUnit { * @param processor the processor that will handle both regular and ZIP content; * must implement both {@code processContent} and {@code processContentWithUnzip} methods. * @param arg the argument to be passed to the processor during both processing phases. - * @throws IOException if an I/O error occurs during either processing phase. */ @Override - public <T> void process(DeploymentUnitProcessor<T> processor, T arg) throws IOException { - processor.processContentWithUnzip(this, arg); + public <T, R> CompletableFuture<R> process(DeploymentUnitProcessor<T, R> processor, T arg) { + return processor.processContentWithUnzip(this, arg); } /** diff --git a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/configuration/DeploymentConfigurationSchema.java b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/configuration/DeploymentConfigurationSchema.java index 929a9612671..fe5b5d445b0 100644 --- a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/configuration/DeploymentConfigurationSchema.java +++ b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/configuration/DeploymentConfigurationSchema.java @@ -32,4 +32,7 @@ public class DeploymentConfigurationSchema { @Value(hasDefault = true) @PublicName(legacyNames = "deploymentLocation") public final String location = "deployment"; + + @Value(hasDefault = true) + public final String tempLocation = "deployment.tmp"; } diff --git a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/configuration/DeploymentConfigurationSchema.java b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/exception/DeploymentUnitWriteException.java similarity index 58% copy from modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/configuration/DeploymentConfigurationSchema.java copy to modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/exception/DeploymentUnitWriteException.java index 929a9612671..fca90f4a91b 100644 --- a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/configuration/DeploymentConfigurationSchema.java +++ b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/exception/DeploymentUnitWriteException.java @@ -15,21 +15,23 @@ * limitations under the License. */ -package org.apache.ignite.internal.deployunit.configuration; -import org.apache.ignite.configuration.annotation.Config; -import org.apache.ignite.configuration.annotation.PublicName; -import org.apache.ignite.configuration.annotation.Value; +package org.apache.ignite.internal.deployunit.exception; + +import static org.apache.ignite.lang.ErrorGroups.CodeDeployment.UNIT_WRITE_ERR; + +import org.apache.ignite.lang.IgniteException; /** - * Configuration schema for Compute functionality. + * Throws when deployment unit content failed to write. */ -@Config -public class DeploymentConfigurationSchema { +public class DeploymentUnitWriteException extends IgniteException { + private static final long serialVersionUID = 232238720847777174L; + /** - * Relative path to folder in node working directory where will store all deployment units content. + * Constructor. */ - @Value(hasDefault = true) - @PublicName(legacyNames = "deploymentLocation") - public final String location = "deployment"; + public DeploymentUnitWriteException(String message, Exception cause) { + super(UNIT_WRITE_ERR, message, cause); + } } diff --git a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/configuration/DeploymentConfigurationSchema.java b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/tempstorage/TempStorage.java similarity index 57% copy from modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/configuration/DeploymentConfigurationSchema.java copy to modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/tempstorage/TempStorage.java index 929a9612671..6c7f40e6fc9 100644 --- a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/configuration/DeploymentConfigurationSchema.java +++ b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/tempstorage/TempStorage.java @@ -15,21 +15,17 @@ * limitations under the License. */ -package org.apache.ignite.internal.deployunit.configuration; +package org.apache.ignite.internal.deployunit.tempstorage; -import org.apache.ignite.configuration.annotation.Config; -import org.apache.ignite.configuration.annotation.PublicName; -import org.apache.ignite.configuration.annotation.Value; +import java.io.InputStream; +import java.nio.file.Path; +import java.util.concurrent.CompletableFuture; -/** - * Configuration schema for Compute functionality. - */ -@Config -public class DeploymentConfigurationSchema { - /** - * Relative path to folder in node working directory where will store all deployment units content. - */ - @Value(hasDefault = true) - @PublicName(legacyNames = "deploymentLocation") - public final String location = "deployment"; +public interface TempStorage { + + CompletableFuture<Path> store(String fileName, InputStream is); + + void rollback(); + + void close(); } diff --git a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/tempstorage/TempStorageImpl.java b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/tempstorage/TempStorageImpl.java new file mode 100644 index 00000000000..2f712d9adf4 --- /dev/null +++ b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/tempstorage/TempStorageImpl.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.deployunit.tempstorage; + +import static java.util.concurrent.CompletableFuture.supplyAsync; + +import java.io.InputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardCopyOption; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import org.apache.ignite.internal.deployunit.exception.DeploymentUnitWriteException; +import org.apache.ignite.internal.logger.IgniteLogger; +import org.apache.ignite.internal.logger.Loggers; +import org.apache.ignite.internal.util.IgniteUtils; + +public class TempStorageImpl implements TempStorage { + private static final IgniteLogger LOG = Loggers.forClass(TempStorageImpl.class); + + private final Path storageDir; + + private final Map<String, CompletableFuture<Path>> map = new HashMap<>(); + + private final Executor executor; + + public TempStorageImpl(Path storageDir, Executor executor) { + this.storageDir = storageDir; + this.executor = executor; + } + + @Override + public CompletableFuture<Path> store(String fileName, InputStream is) { + CompletableFuture<Path> result = supplyAsync(() -> { + try { + Path path = Path.of(fileName); + Path parent = path.getParent(); + if (parent != null) { + Files.createDirectories(storageDir.resolve(parent)); + } + Path resolve = storageDir.resolve(path); + Files.copy(is, resolve, StandardCopyOption.REPLACE_EXISTING); + return resolve; + } catch (Exception e) { + LOG.error("Failed to process unit storage action.", e); + throw new DeploymentUnitWriteException("Failed to write unit to storage.", e); + } + }, + executor); + + map.put(fileName, result); + return result; + } + + @Override + public void rollback() { + map.values().forEach(f -> f.cancel(true)); + } + + @Override + public void close() { + IgniteUtils.deleteIfExists(storageDir); + } +} diff --git a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/configuration/DeploymentConfigurationSchema.java b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/tempstorage/TempStorageProvider.java similarity index 57% copy from modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/configuration/DeploymentConfigurationSchema.java copy to modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/tempstorage/TempStorageProvider.java index 929a9612671..9adfc59ee4b 100644 --- a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/configuration/DeploymentConfigurationSchema.java +++ b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/tempstorage/TempStorageProvider.java @@ -15,21 +15,10 @@ * limitations under the License. */ -package org.apache.ignite.internal.deployunit.configuration; +package org.apache.ignite.internal.deployunit.tempstorage; -import org.apache.ignite.configuration.annotation.Config; -import org.apache.ignite.configuration.annotation.PublicName; -import org.apache.ignite.configuration.annotation.Value; +import org.apache.ignite.deployment.version.Version; -/** - * Configuration schema for Compute functionality. - */ -@Config -public class DeploymentConfigurationSchema { - /** - * Relative path to folder in node working directory where will store all deployment units content. - */ - @Value(hasDefault = true) - @PublicName(legacyNames = "deploymentLocation") - public final String location = "deployment"; +public interface TempStorageProvider { + TempStorage tempStorage(String id, Version version); } diff --git a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/tempstorage/TempStorageProviderImpl.java b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/tempstorage/TempStorageProviderImpl.java new file mode 100644 index 00000000000..4b10ceeb297 --- /dev/null +++ b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/tempstorage/TempStorageProviderImpl.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.deployunit.tempstorage; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.concurrent.Executor; +import org.apache.ignite.deployment.version.Version; +import org.apache.ignite.internal.deployunit.CachedDeploymentUnit; +import org.apache.ignite.internal.deployunit.exception.DeploymentUnitWriteException; +import org.apache.ignite.internal.logger.IgniteLogger; +import org.apache.ignite.internal.logger.Loggers; + +public class TempStorageProviderImpl implements TempStorageProvider { + private static final IgniteLogger LOG = Loggers.forClass(TempStorageProviderImpl.class); + + private final Executor executor; + + private Path storageDir; + + public TempStorageProviderImpl(Executor executor) { + this.executor = executor; + } + + public void init(Path storageDir) { + this.storageDir = storageDir; + } + + @Override + public TempStorage tempStorage(String id, Version version) { + try { + Path storageDir = this.storageDir.resolve(id).resolve(version.render()); + Files.createDirectories(storageDir); + return new TempStorageImpl(storageDir, executor); + } catch (IOException ex) { + LOG.error("Failed to create temp storage {} with id {} and version {}", ex, storageDir, id, version); + throw new DeploymentUnitWriteException("Failed to create deployemnt unit temp storage.", ex); + } + } +} diff --git a/modules/code-deployment/src/test/java/org/apache/ignite/deployment/FileDeployerServiceTest.java b/modules/code-deployment/src/test/java/org/apache/ignite/deployment/FileDeployerServiceTest.java index fb225ef8cb5..f3d059aaa61 100644 --- a/modules/code-deployment/src/test/java/org/apache/ignite/deployment/FileDeployerServiceTest.java +++ b/modules/code-deployment/src/test/java/org/apache/ignite/deployment/FileDeployerServiceTest.java @@ -36,6 +36,7 @@ import java.util.stream.Stream; import org.apache.ignite.internal.deployunit.DeploymentUnit; import org.apache.ignite.internal.deployunit.FileDeployerService; import org.apache.ignite.internal.deployunit.FilesDeploymentUnit; +import org.apache.ignite.internal.deployunit.StreamDeploymentUnit; import org.apache.ignite.internal.deployunit.UnitContent; import org.apache.ignite.internal.deployunit.exception.DeploymentUnitReadException; import org.apache.ignite.internal.testframework.IgniteTestUtils; @@ -61,7 +62,9 @@ public class FileDeployerServiceTest { @BeforeEach public void setup() throws IOException { - service.initUnitsFolder(workDir); + Path deployment = workDir.resolve("deployment"); + Path tempDeployment = workDir.resolve("tempDeployment"); + service.initUnitsFolder(deployment, tempDeployment); file1 = workDir.resolve("file1"); file2 = workDir.resolve("file2"); @@ -73,18 +76,18 @@ public class FileDeployerServiceTest { @Test public void test() throws Exception { - try (FilesDeploymentUnit unit = content()) { + try (StreamDeploymentUnit unit = content()) { CompletableFuture<Boolean> deployed = service.deploy("id", parseVersion("1.0.0"), unit); assertThat(deployed, willBe(true)); } - try (FilesDeploymentUnit unit = content()) { + try (StreamDeploymentUnit unit = content()) { CompletableFuture<UnitContent> unitContent = service.getUnitContent("id", parseVersion("1.0.0")); assertThat(unitContent, willBe(equalTo(readContent(unit)))); } } - private FilesDeploymentUnit content() { + private StreamDeploymentUnit content() { Map<String, InputStream> map = Stream.of(file1, file2, file3) .collect(Collectors.toMap(it -> it.getFileName().toString(), it -> { try { @@ -98,7 +101,7 @@ public class FileDeployerServiceTest { } })); - return new FilesDeploymentUnit(map); + return new StreamDeploymentUnit(map); } /** @@ -107,7 +110,7 @@ public class FileDeployerServiceTest { * @param deploymentUnit Deployment unit instance. * @return Unit content from provided deployment unit. */ - private static UnitContent readContent(FilesDeploymentUnit deploymentUnit) { + private static UnitContent readContent(StreamDeploymentUnit deploymentUnit) { Map<String, byte[]> map = deploymentUnit.content().entrySet().stream() .collect(Collectors.toMap(Entry::getKey, entry -> { try { diff --git a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestStandalone.java b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestStandalone.java index dc14070463b..48cd4f67953 100644 --- a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestStandalone.java +++ b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestStandalone.java @@ -53,6 +53,7 @@ import org.apache.ignite.deployment.version.Version; import org.apache.ignite.internal.deployunit.FilesDeploymentUnit; import org.apache.ignite.internal.deployunit.IgniteDeployment; import org.apache.ignite.internal.deployunit.NodesToDeploy; +import org.apache.ignite.internal.deployunit.StreamDeploymentUnit; import org.apache.ignite.internal.testframework.IgniteTestUtils; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -257,7 +258,7 @@ class ItComputeTestStandalone extends ItComputeBaseTest { CompletableFuture<Boolean> deployed = deployment.deployAsync( unitId, unitVersion, - new FilesDeploymentUnit(Map.of(jarName, jarStream)), + new StreamDeploymentUnit(Map.of(jarName, jarStream)), new NodesToDeploy(MAJORITY) ); diff --git a/modules/compute/src/test/java/org/apache/ignite/internal/compute/loader/JobContextManagerTest.java b/modules/compute/src/test/java/org/apache/ignite/internal/compute/loader/JobContextManagerTest.java index e1586575f0d..5d11721d2f9 100644 --- a/modules/compute/src/test/java/org/apache/ignite/internal/compute/loader/JobContextManagerTest.java +++ b/modules/compute/src/test/java/org/apache/ignite/internal/compute/loader/JobContextManagerTest.java @@ -51,6 +51,8 @@ import org.apache.ignite.internal.deployunit.exception.DeploymentUnitNotFoundExc import org.apache.ignite.internal.deployunit.exception.DeploymentUnitUnavailableException; import org.apache.ignite.internal.lang.IgniteInternalException; import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest; +import org.apache.ignite.internal.testframework.WorkDirectory; +import org.apache.ignite.internal.testframework.WorkDirectoryExtension; import org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -60,10 +62,14 @@ import org.mockito.Spy; import org.mockito.junit.jupiter.MockitoExtension; @ExtendWith(MockitoExtension.class) +@ExtendWith(WorkDirectoryExtension.class) class JobContextManagerTest extends BaseIgniteAbstractTest { private final Path unitsDir = getPath(JobClassLoaderFactory.class.getClassLoader().getResource("units")); + @WorkDirectory + private Path workDir; + @Spy private IgniteDeployment deployment = new DummyIgniteDeployment(unitsDir); @@ -75,7 +81,7 @@ class JobContextManagerTest extends BaseIgniteAbstractTest { @BeforeEach void setUp() { FileDeployerService deployerService = new FileDeployerService("test"); - deployerService.initUnitsFolder(unitsDir); + deployerService.initUnitsFolder(unitsDir, workDir.resolve("tempDeployment")); classLoaderManager = new JobContextManager( deployment, diff --git a/modules/rest/src/main/java/org/apache/ignite/internal/rest/deployment/CodeDeploymentRestFactory.java b/modules/rest/src/main/java/org/apache/ignite/internal/rest/deployment/CodeDeploymentRestFactory.java index 932ba9452fd..87ada81a026 100644 --- a/modules/rest/src/main/java/org/apache/ignite/internal/rest/deployment/CodeDeploymentRestFactory.java +++ b/modules/rest/src/main/java/org/apache/ignite/internal/rest/deployment/CodeDeploymentRestFactory.java @@ -21,6 +21,7 @@ import io.micronaut.context.annotation.Bean; import io.micronaut.context.annotation.Factory; import jakarta.inject.Singleton; import org.apache.ignite.internal.deployunit.IgniteDeployment; +import org.apache.ignite.internal.deployunit.tempstorage.TempStorageProvider; import org.apache.ignite.internal.rest.RestFactory; /** @@ -30,8 +31,11 @@ import org.apache.ignite.internal.rest.RestFactory; public class CodeDeploymentRestFactory implements RestFactory { private IgniteDeployment igniteDeployment; - public CodeDeploymentRestFactory(IgniteDeployment igniteDeployment) { + private TempStorageProvider tempStorageProvider; + + public CodeDeploymentRestFactory(IgniteDeployment igniteDeployment, TempStorageProvider tempStorageProvider) { this.igniteDeployment = igniteDeployment; + this.tempStorageProvider = tempStorageProvider; } @Bean @@ -40,8 +44,15 @@ public class CodeDeploymentRestFactory implements RestFactory { return igniteDeployment; } + @Bean + @Singleton + public TempStorageProvider storageProvider() { + return tempStorageProvider; + } + @Override public void cleanResources() { igniteDeployment = null; + tempStorageProvider = null; } } diff --git a/modules/rest/src/main/java/org/apache/ignite/internal/rest/deployment/CompletedFileUploadSubscriber.java b/modules/rest/src/main/java/org/apache/ignite/internal/rest/deployment/CompletedFileUploadSubscriber.java index 39aa2ef1c72..daa82a8eb01 100644 --- a/modules/rest/src/main/java/org/apache/ignite/internal/rest/deployment/CompletedFileUploadSubscriber.java +++ b/modules/rest/src/main/java/org/apache/ignite/internal/rest/deployment/CompletedFileUploadSubscriber.java @@ -21,6 +21,7 @@ import io.micronaut.http.multipart.CompletedFileUpload; import java.io.IOException; import java.util.concurrent.CompletableFuture; import org.apache.ignite.internal.deployunit.DeploymentUnit; +import org.apache.ignite.internal.deployunit.tempstorage.TempStorage; import org.apache.ignite.internal.logger.IgniteLogger; import org.apache.ignite.internal.logger.Loggers; import org.reactivestreams.Subscriber; @@ -39,8 +40,10 @@ class CompletedFileUploadSubscriber implements Subscriber<CompletedFileUpload> { private Throwable ex; - public CompletedFileUploadSubscriber(boolean unzip) { - this.collector = unzip ? new ZipInputStreamCollector() : new InputStreamCollectorImpl(); + public CompletedFileUploadSubscriber(TempStorage tempStorage, boolean unzip) { + this.collector = unzip + ? new ZipInputStreamCollector(tempStorage) + : new InputStreamCollectorImpl(tempStorage); } @Override @@ -61,7 +64,7 @@ class CompletedFileUploadSubscriber implements Subscriber<CompletedFileUpload> { @Override public void onError(Throwable throwable) { try { - collector.close(); + collector.rollback(); } catch (Exception e) { suppressException(e); } @@ -80,7 +83,7 @@ class CompletedFileUploadSubscriber implements Subscriber<CompletedFileUpload> { } catch (Exception e) { suppressException(e); try { - collector.close(); + collector.rollback(); } catch (Exception e2) { suppressException(e2); } diff --git a/modules/rest/src/main/java/org/apache/ignite/internal/rest/deployment/DeploymentManagementController.java b/modules/rest/src/main/java/org/apache/ignite/internal/rest/deployment/DeploymentManagementController.java index 762a6d89304..75b35cb9137 100644 --- a/modules/rest/src/main/java/org/apache/ignite/internal/rest/deployment/DeploymentManagementController.java +++ b/modules/rest/src/main/java/org/apache/ignite/internal/rest/deployment/DeploymentManagementController.java @@ -34,6 +34,9 @@ import org.apache.ignite.deployment.version.Version; import org.apache.ignite.internal.deployunit.IgniteDeployment; import org.apache.ignite.internal.deployunit.NodesToDeploy; import org.apache.ignite.internal.deployunit.UnitStatuses; +import org.apache.ignite.internal.deployunit.tempstorage.TempStorage; +import org.apache.ignite.internal.deployunit.tempstorage.TempStorageProvider; +import org.apache.ignite.internal.deployunit.tempstorage.TempStorageProviderImpl; import org.apache.ignite.internal.logger.IgniteLogger; import org.apache.ignite.internal.logger.Loggers; import org.apache.ignite.internal.rest.ResourceHolder; @@ -42,6 +45,7 @@ import org.apache.ignite.internal.rest.api.deployment.DeploymentStatus; import org.apache.ignite.internal.rest.api.deployment.InitialDeployMode; import org.apache.ignite.internal.rest.api.deployment.UnitStatus; import org.apache.ignite.internal.rest.api.deployment.UnitVersionStatus; +import org.apache.ignite.internal.util.IgniteUtils; import org.jetbrains.annotations.Nullable; import org.reactivestreams.Publisher; @@ -55,8 +59,11 @@ public class DeploymentManagementController implements DeploymentCodeApi, Resour private IgniteDeployment deployment; - public DeploymentManagementController(IgniteDeployment deployment) { + private TempStorageProvider tempStorageProvider; + + public DeploymentManagementController(IgniteDeployment deployment, TempStorageProvider tempStorageProvider) { this.deployment = deployment; + this.tempStorageProvider = tempStorageProvider; } @Override @@ -84,15 +91,19 @@ public class DeploymentManagementController implements DeploymentCodeApi, Resour Optional<List<String>> initialNodes, boolean zip ) { - CompletedFileUploadSubscriber subscriber = new CompletedFileUploadSubscriber(zip); + Version version = parseVersion(unitVersion); + + TempStorage tempStorage = tempStorageProvider.tempStorage(unitId, version); + CompletedFileUploadSubscriber subscriber = new CompletedFileUploadSubscriber(tempStorage, zip); unitContent.subscribe(subscriber); NodesToDeploy nodesToDeploy = initialNodes.map(NodesToDeploy::new) .orElseGet(() -> new NodesToDeploy(fromInitialDeployMode(deployMode))); return subscriber.result().thenCompose(deploymentUnit -> - deployment.deployAsync(unitId, parseVersion(unitVersion), deploymentUnit, nodesToDeploy) + deployment.deployAsync(unitId, version, deploymentUnit, nodesToDeploy) .whenComplete((unitStatus, throwable) -> { + tempStorage.close(); try { deploymentUnit.close(); } catch (Exception e) { @@ -232,5 +243,6 @@ public class DeploymentManagementController implements DeploymentCodeApi, Resour @Override public void cleanResources() { deployment = null; + tempStorageProvider = null; } } diff --git a/modules/rest/src/main/java/org/apache/ignite/internal/rest/deployment/InputStreamCollector.java b/modules/rest/src/main/java/org/apache/ignite/internal/rest/deployment/InputStreamCollector.java index 5f382f1902d..827f7786292 100644 --- a/modules/rest/src/main/java/org/apache/ignite/internal/rest/deployment/InputStreamCollector.java +++ b/modules/rest/src/main/java/org/apache/ignite/internal/rest/deployment/InputStreamCollector.java @@ -31,7 +31,7 @@ import org.apache.ignite.internal.deployunit.DeploymentUnit; * handlers, and other scenarios where deployment content is received in stream format and * needs to be converted into deployment units for processing. */ -public interface InputStreamCollector extends AutoCloseable { +public interface InputStreamCollector { /** * Adds an input stream with the specified filename to the collection. * @@ -60,4 +60,6 @@ public interface InputStreamCollector extends AutoCloseable { * @return a deployment unit containing all collected input streams; never {@code null}. */ DeploymentUnit toDeploymentUnit(); + + void rollback() throws Exception; } diff --git a/modules/rest/src/main/java/org/apache/ignite/internal/rest/deployment/InputStreamCollectorImpl.java b/modules/rest/src/main/java/org/apache/ignite/internal/rest/deployment/InputStreamCollectorImpl.java index a6d441b012a..94e9f6a73a9 100644 --- a/modules/rest/src/main/java/org/apache/ignite/internal/rest/deployment/InputStreamCollectorImpl.java +++ b/modules/rest/src/main/java/org/apache/ignite/internal/rest/deployment/InputStreamCollectorImpl.java @@ -17,41 +17,65 @@ package org.apache.ignite.internal.rest.deployment; +import static java.util.concurrent.CompletableFuture.allOf; import static org.apache.ignite.internal.util.IgniteUtils.closeAll; +import java.io.IOException; import java.io.InputStream; +import java.nio.file.Path; import java.util.HashMap; import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.BiConsumer; import org.apache.ignite.internal.deployunit.DeploymentUnit; import org.apache.ignite.internal.deployunit.FilesDeploymentUnit; +import org.apache.ignite.internal.deployunit.CachedDeploymentUnit; +import org.apache.ignite.internal.deployunit.tempstorage.TempStorage; /** * Standard implementation of {@link InputStreamCollector} for collecting regular file content. * * <p>This implementation provides a straightforward approach to collecting input streams and - * converting them into a standard {@link FilesDeploymentUnit}. It maintains an internal map - * of filename-to-stream associations and creates deployment units containing regular - * (non-compressed) file content. + * converting them into a standard {@link FilesDeploymentUnit}. It maintains an internal map of filename-to-stream associations and creates + * deployment units containing regular (non-compressed) file content. */ public class InputStreamCollectorImpl implements InputStreamCollector { - /** - * Internal storage for collected input streams mapped by their filenames. - * The map maintains the association between logical file paths and their content streams. - */ - private final Map<String, InputStream> content = new HashMap<>(); + private final Map<String, CompletableFuture<Path>> collect = new HashMap<>(); + + private final TempStorage tempStorage; + + public InputStreamCollectorImpl(TempStorage tempStorage) { + this.tempStorage = tempStorage; + } @Override public void addInputStream(String filename, InputStream is) { - content.put(filename, is); + collect.put(filename, tempStorage.store(filename, is).whenComplete((path, throwable) -> { + try { + is.close(); + } catch (IOException e) { + + } + })); } @Override public DeploymentUnit toDeploymentUnit() { - return new FilesDeploymentUnit(content); + Map<String, Path> map = new ConcurrentHashMap<>(); + for (Entry<String, CompletableFuture<Path>> e : collect.entrySet()) { + e.getValue().thenAccept(path -> map.put(e.getKey(), path)); + } + + return new CachedDeploymentUnit( + allOf(collect.values().toArray(new CompletableFuture<?>[0])) + .thenApply(unused -> new FilesDeploymentUnit(map)) + ); } @Override - public void close() throws Exception { - closeAll(content.values()); + public void rollback() throws Exception { + tempStorage.rollback(); } } diff --git a/modules/rest/src/main/java/org/apache/ignite/internal/rest/deployment/ZipInputStreamCollector.java b/modules/rest/src/main/java/org/apache/ignite/internal/rest/deployment/ZipInputStreamCollector.java index 058269cc148..894409ce571 100644 --- a/modules/rest/src/main/java/org/apache/ignite/internal/rest/deployment/ZipInputStreamCollector.java +++ b/modules/rest/src/main/java/org/apache/ignite/internal/rest/deployment/ZipInputStreamCollector.java @@ -20,11 +20,17 @@ package org.apache.ignite.internal.rest.deployment; import java.io.BufferedInputStream; import java.io.IOException; import java.io.InputStream; +import java.nio.file.Files; +import java.nio.file.Path; import java.util.Objects; +import java.util.concurrent.CompletableFuture; +import java.util.function.BiConsumer; import java.util.zip.ZipInputStream; import org.apache.ignite.internal.deployunit.DeploymentUnit; +import org.apache.ignite.internal.deployunit.CachedDeploymentUnit; import org.apache.ignite.internal.deployunit.ZipDeploymentUnit; import org.apache.ignite.internal.deployunit.exception.DeploymentUnitZipException; +import org.apache.ignite.internal.deployunit.tempstorage.TempStorage; import org.apache.ignite.internal.logger.IgniteLogger; import org.apache.ignite.internal.logger.Loggers; import org.apache.ignite.lang.IgniteException; @@ -38,13 +44,19 @@ public class ZipInputStreamCollector implements InputStreamCollector { private static final IgniteLogger LOG = Loggers.forClass(ZipInputStreamCollector.class); private static final byte[] ZIP_MAGIC_HEADER = {0x50, 0x4b, 0x03, 0x04}; - private ZipInputStream zis; + private final TempStorage tempStorage; private IgniteException igniteException; + private CompletableFuture<Path> future; + + public ZipInputStreamCollector(TempStorage tempStorage) { + this.tempStorage = tempStorage; + } + @Override public void addInputStream(String filename, InputStream is) { - if (zis != null || igniteException != null) { + if (future != null || igniteException != null) { // We don't need the stream anymore, so we close it to avoid resource leak. safeClose(is); if (igniteException == null) { @@ -56,7 +68,15 @@ public class ZipInputStreamCollector implements InputStreamCollector { InputStream result = is.markSupported() ? is : new BufferedInputStream(is); if (isZip(result)) { - zis = new ZipInputStream(result); + ZipInputStream zis = new ZipInputStream(result); + future = tempStorage.store(filename, zis) + .whenComplete((path, throwable) -> { + try { + zis.close(); + } catch (IOException e) { + LOG.error("Error with closing"); + } + }); } else { safeClose(result); igniteException = new DeploymentUnitZipException("Only zip file is supported."); @@ -82,10 +102,8 @@ public class ZipInputStreamCollector implements InputStreamCollector { } @Override - public void close() throws Exception { - if (zis != null) { - zis.close(); - } + public void rollback() throws Exception { + tempStorage.rollback(); } @Override @@ -93,6 +111,13 @@ public class ZipInputStreamCollector implements InputStreamCollector { if (igniteException != null) { throw igniteException; } - return new ZipDeploymentUnit(zis); + + return new CachedDeploymentUnit(future.thenApply(zip -> { + try { + return new ZipDeploymentUnit(new ZipInputStream(Files.newInputStream(zip))); + } catch (IOException e) { + throw new RuntimeException(e); + } + })); } } diff --git a/modules/rest/src/main/java/org/apache/ignite/internal/rest/deployment/exception/handler/DeploymentUnitWriteExceptionHandler.java b/modules/rest/src/main/java/org/apache/ignite/internal/rest/deployment/exception/handler/DeploymentUnitWriteExceptionHandler.java new file mode 100644 index 00000000000..1f47fe0b350 --- /dev/null +++ b/modules/rest/src/main/java/org/apache/ignite/internal/rest/deployment/exception/handler/DeploymentUnitWriteExceptionHandler.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.rest.deployment.exception.handler; + +import io.micronaut.context.annotation.Requires; +import io.micronaut.http.HttpRequest; +import io.micronaut.http.HttpResponse; +import io.micronaut.http.server.exceptions.ExceptionHandler; +import jakarta.inject.Singleton; +import org.apache.ignite.internal.deployunit.exception.DeploymentUnitWriteException; +import org.apache.ignite.internal.rest.api.Problem; +import org.apache.ignite.internal.rest.constants.HttpCode; +import org.apache.ignite.internal.rest.problem.HttpProblemResponse; + +@Singleton +@Requires(classes = {DeploymentUnitWriteException.class, ExceptionHandler.class}) +public class DeploymentUnitWriteExceptionHandler + implements ExceptionHandler<DeploymentUnitWriteException, HttpResponse<? extends Problem>> { + + @Override + public HttpResponse<? extends Problem> handle(HttpRequest request, DeploymentUnitWriteException exception) { + return HttpProblemResponse.from( + Problem.fromHttpCode(HttpCode.INTERNAL_SERVER_ERROR) + .detail(exception.getMessage()).build() + ); + } +} diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java index cf511826f16..3669928a4f9 100644 --- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java +++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java @@ -435,7 +435,7 @@ public class IgniteImpl implements Ignite { /** Metric messaging. */ private final MetricMessaging metricMessaging; - private final IgniteDeployment deploymentManager; + private final DeploymentManagerImpl deploymentManager; private final DistributionZoneManager distributionZoneManager; @@ -1408,7 +1408,8 @@ public class IgniteImpl implements Ignite { new JdbcPortProviderImpl(nodeCfgMgr.configurationRegistry())); Supplier<RestFactory> metricRestFactory = () -> new MetricRestFactory(metricManager, metricMessaging); Supplier<RestFactory> authProviderFactory = () -> new AuthenticationProviderFactory(authenticationManager); - Supplier<RestFactory> deploymentCodeRestFactory = () -> new CodeDeploymentRestFactory(deploymentManager); + Supplier<RestFactory> deploymentCodeRestFactory = + () -> new CodeDeploymentRestFactory(deploymentManager, deploymentManager.tempStorageProvider()); Supplier<RestFactory> restManagerFactory = () -> new RestManagerFactory(restManager); Supplier<RestFactory> computeRestFactory = () -> new ComputeRestFactory(compute); Supplier<RestFactory> disasterRecoveryFactory = () -> new DisasterRecoveryFactory(disasterRecoveryManager);
