This is an automated email from the ASF dual-hosted git repository. mpochatkin pushed a commit to branch IGNITE-26455 in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit 3fce6b73728b272a3dd42e70f7989ea995798ff8 Author: Pochatkin Mikhail <m.a.pochat...@gmail.com> AuthorDate: Fri Sep 19 11:27:16 2025 +0300 IGNITE-26455 Add support for deploy units with folders structure --- .../ignite/internal/deployment/DeployFile.java | 9 +- .../ignite/internal/deployment/DeployFiles.java | 121 ++++++++++++++++++--- .../internal/deployment/ItDeploymentUnitTest.java | 31 ++++++ .../deployunit/DeployDeploymentUnitProcessor.java | 95 ++++++++++++++++ .../ignite/internal/deployunit/DeploymentUnit.java | 52 +++++---- .../internal/deployunit/DeploymentUnitImpl.java | 70 ++++++++++++ .../deployunit/DeploymentUnitProcessor.java | 71 ++++++++++++ .../internal/deployunit/FileDeployerService.java | 25 ++--- .../ignite/internal/deployunit/UnitContent.java | 23 +--- .../internal/deployunit/ZipDeploymentUnit.java | 84 ++++++++++++++ .../ignite/deployment/FileDeployerServiceTest.java | 31 +++++- .../internal/compute/ItComputeTestStandalone.java | 3 +- .../testframework/WorkDirectoryExtension.java | 15 ++- .../rest/api/deployment/DeploymentCodeApi.java | 10 +- .../deployment/CompletedFileUploadSubscriber.java | 51 ++++----- .../deployment/DeploymentManagementController.java | 34 +++--- .../rest/deployment/InputStreamCollector.java | 72 ++++++++++++ .../rest/deployment/InputStreamCollectorImpl.java | 60 ++++++++++ .../rest/deployment/ZipInputStreamCollector.java | 102 +++++++++++++++++ 19 files changed, 830 insertions(+), 129 deletions(-) diff --git a/modules/code-deployment/src/integrationTest/java/org/apache/ignite/internal/deployment/DeployFile.java b/modules/code-deployment/src/integrationTest/java/org/apache/ignite/internal/deployment/DeployFile.java index 071aa615a46..c76b9a84031 100644 --- a/modules/code-deployment/src/integrationTest/java/org/apache/ignite/internal/deployment/DeployFile.java +++ b/modules/code-deployment/src/integrationTest/java/org/apache/ignite/internal/deployment/DeployFile.java @@ -29,10 +29,13 @@ class DeployFile { private final int replicaTimeout; - DeployFile(Path file, long expectedSize, int replicaTimeout) throws IOException { + private final boolean zip; + + DeployFile(Path file, boolean zip, long expectedSize, int replicaTimeout) throws IOException { this.file = file; this.expectedSize = expectedSize; this.replicaTimeout = replicaTimeout; + this.zip = zip; ensureExists(); } @@ -46,6 +49,10 @@ class DeployFile { return file; } + boolean zip() { + return zip; + } + long expectedSize() { return expectedSize; } diff --git a/modules/code-deployment/src/integrationTest/java/org/apache/ignite/internal/deployment/DeployFiles.java b/modules/code-deployment/src/integrationTest/java/org/apache/ignite/internal/deployment/DeployFiles.java index de5a1ffb265..68d368bdf82 100644 --- a/modules/code-deployment/src/integrationTest/java/org/apache/ignite/internal/deployment/DeployFiles.java +++ b/modules/code-deployment/src/integrationTest/java/org/apache/ignite/internal/deployment/DeployFiles.java @@ -18,26 +18,34 @@ package org.apache.ignite.internal.deployment; import static org.apache.ignite.internal.deployunit.DeploymentStatus.DEPLOYED; +import static org.apache.ignite.internal.testframework.IgniteTestUtils.fillDummyFile; +import static org.apache.ignite.internal.testframework.WorkDirectoryExtension.zipDirectory; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe; +import static org.apache.ignite.internal.util.IgniteUtils.deleteIfExists; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; import java.io.IOException; import java.io.InputStream; import java.nio.file.Files; import java.nio.file.Path; +import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Objects; +import java.util.Map.Entry; import java.util.concurrent.CompletableFuture; -import java.util.stream.Collectors; +import java.util.zip.ZipEntry; +import java.util.zip.ZipInputStream; import org.apache.ignite.deployment.version.Version; import org.apache.ignite.internal.app.IgniteImpl; import org.apache.ignite.internal.deployunit.DeploymentUnit; +import org.apache.ignite.internal.deployunit.DeploymentUnitImpl; import org.apache.ignite.internal.deployunit.NodesToDeploy; import org.apache.ignite.internal.deployunit.UnitStatuses; import org.apache.ignite.internal.deployunit.UnitStatuses.UnitStatusesBuilder; +import org.apache.ignite.internal.deployunit.ZipDeploymentUnit; class DeployFiles { private static final int BASE_REPLICA_TIMEOUT = 30; @@ -56,6 +64,10 @@ class DeployFiles { private DeployFile bigFile; + private DeployFile flatZipFile; + + private DeployFile treeZipFile; + // TODO https://issues.apache.org/jira/browse/IGNITE-20204 DeployFiles(Path workDir) { this.workDir = workDir; @@ -63,7 +75,30 @@ class DeployFiles { private static DeployFile create(Path path, long size, int replicaTimeout) { try { - return new DeployFile(path, size, replicaTimeout); + return new DeployFile(path, false, size, replicaTimeout); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + private DeployFile createZip(Path path, Map<String, Long> content, int replicaTimeout) { + try { + Path zipTempFolder = workDir.resolve("tempDir"); + Files.createDirectories(zipTempFolder); + long size = 0; + for (Entry<String, Long> e : content.entrySet()) { + String zipEntryPath = e.getKey(); + Long entrySize = e.getValue(); + Path entry = zipTempFolder.resolve(zipEntryPath); + if (entrySize > 0) { + Files.createDirectories(entry.getParent()); + fillDummyFile(entry, entrySize); + } + size += entrySize; + } + zipDirectory(zipTempFolder, path); + deleteIfExists(zipTempFolder); + return new DeployFile(path, true, size, replicaTimeout); } catch (IOException e) { throw new RuntimeException(e); } @@ -90,6 +125,39 @@ class DeployFiles { return bigFile; } + DeployFile flatZipFile() { + if (flatZipFile == null) { + flatZipFile = createZip( + workDir.resolve("flat.zip"), + Map.of( + "a1", SMALL_IN_BYTES, + "a2", SMALL_IN_BYTES, + "a3", SMALL_IN_BYTES, + "a4", SMALL_IN_BYTES + ), + BASE_REPLICA_TIMEOUT + ); + } + return flatZipFile; + } + + DeployFile treeZipFile() { + if (treeZipFile == null) { + treeZipFile = createZip( + workDir.resolve("tree.zip"), + Map.of( + "a1/a2", SMALL_IN_BYTES, + "b1", SMALL_IN_BYTES, + "c1/c2/c3/c4", MEDIUM_IN_BYTES, + "d1/d2", SMALL_IN_BYTES, + "d1/a2", SMALL_IN_BYTES + ), + BASE_REPLICA_TIMEOUT * 3 + ); + } + return treeZipFile; + } + private Unit deployAndVerify(String id, Version version, DeployFile file, IgniteImpl entryNode) { return deployAndVerify(id, version, false, file, entryNode); } @@ -110,14 +178,8 @@ class DeployFiles { NodesToDeploy nodesToDeploy, IgniteImpl entryNode ) { - List<Path> paths = files.stream() - .map(DeployFile::file) - .collect(Collectors.toList()); - - CompletableFuture<Boolean> deploy; - - DeploymentUnit deploymentUnit = fromPaths(paths); - deploy = entryNode.deployment() + DeploymentUnit deploymentUnit = fromFiles(files); + CompletableFuture<Boolean> deploy = entryNode.deployment() .deployAsync(id, version, force, deploymentUnit, nodesToDeploy) .whenComplete((res, err) -> { try { @@ -134,8 +196,19 @@ class DeployFiles { Path nodeUnitDirectory = unit.getNodeUnitDirectory(entryNode); for (DeployFile file : files) { - Path filePath = nodeUnitDirectory.resolve(file.file().getFileName()); - assertTrue(Files.exists(filePath)); + if (file.zip()) { + try (ZipInputStream zis = new ZipInputStream(Files.newInputStream(file.file()))) { + ZipEntry ze; + while ((ze = zis.getNextEntry()) != null) { + assertTrue(Files.exists(nodeUnitDirectory.resolve(ze.getName()))); + } + } catch (IOException e) { + fail(e); + } + } else { + Path filePath = nodeUnitDirectory.resolve(file.file().getFileName()); + assertTrue(Files.exists(filePath)); + } } return unit; @@ -153,6 +226,14 @@ class DeployFiles { return deployAndVerify(id, version, bigFile(), entryNode); } + public Unit deployAndVerifyFlatZip(String id, Version version, IgniteImpl entryNode) { + return deployAndVerify(id, version, flatZipFile(), entryNode); + } + + public Unit deployAndVerifyTreeZip(String id, Version version, IgniteImpl entryNode) { + return deployAndVerify(id, version, treeZipFile(), entryNode); + } + public static UnitStatuses buildStatus(String id, Unit... units) { UnitStatusesBuilder builder = UnitStatuses.builder(id); for (Unit unit : units) { @@ -169,16 +250,20 @@ class DeployFiles { Files.copy(file.file(), unitFile); } - private static DeploymentUnit fromPaths(List<Path> paths) { - Objects.requireNonNull(paths); + private static DeploymentUnit fromFiles(List<DeployFile> files) { Map<String, InputStream> map = new HashMap<>(); + List<ZipInputStream> zips = new ArrayList<>(); try { - for (Path path : paths) { - map.put(path.getFileName().toString(), Files.newInputStream(path)); + for (DeployFile file : files) { + if (file.zip()) { + zips.add(new ZipInputStream(Files.newInputStream(file.file()))); + } else { + map.put(file.file().getFileName().toString(), Files.newInputStream(file.file())); + } } } catch (IOException e) { throw new RuntimeException(e); } - return new DeploymentUnit(map); + return new ZipDeploymentUnit(new DeploymentUnitImpl(map), zips); } } diff --git a/modules/code-deployment/src/integrationTest/java/org/apache/ignite/internal/deployment/ItDeploymentUnitTest.java b/modules/code-deployment/src/integrationTest/java/org/apache/ignite/internal/deployment/ItDeploymentUnitTest.java index fc5554e8baa..520da9f3dfa 100644 --- a/modules/code-deployment/src/integrationTest/java/org/apache/ignite/internal/deployment/ItDeploymentUnitTest.java +++ b/modules/code-deployment/src/integrationTest/java/org/apache/ignite/internal/deployment/ItDeploymentUnitTest.java @@ -281,4 +281,35 @@ public class ItDeploymentUnitTest extends ClusterPerTestIntegrationTest { smallUnit.waitUnitClean(igniteImpl(2)); } + + @Test + public void testZipDeploy() { + String id = "test"; + Unit unit = files.deployAndVerifyFlatZip(id, Version.parseVersion("1.1.0"), igniteImpl(1)); + + Unit unit2 = files.deployAndVerifyTreeZip(id, Version.parseVersion("1.1.1"), igniteImpl(1)); + + UnitStatuses status = buildStatus(id, unit, unit2); + + await().timeout(2, SECONDS) + .pollDelay(500, MILLISECONDS) + .until(() -> igniteImpl(2).deployment().clusterStatusesAsync(), willBe(List.of(status))); + } + + @Test + public void testZipAndFileDeploy() { + String id = "test"; + + Unit unit = files.deployAndVerify( + id, Version.parseVersion("1.1.0"), false, List.of(files.smallFile(), files.flatZipFile()), + new NodesToDeploy(List.of(igniteImpl(1).name())), + igniteImpl(0) + ); + + UnitStatuses status = buildStatus(id, unit); + + await().timeout(2, SECONDS) + .pollDelay(500, MILLISECONDS) + .until(() -> igniteImpl(2).deployment().clusterStatusesAsync(), willBe(List.of(status))); + } } diff --git a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeployDeploymentUnitProcessor.java b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeployDeploymentUnitProcessor.java new file mode 100644 index 00000000000..50e45d4a3f8 --- /dev/null +++ b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeployDeploymentUnitProcessor.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.deployunit; + +import static java.nio.file.StandardCopyOption.ATOMIC_MOVE; +import static java.nio.file.StandardCopyOption.REPLACE_EXISTING; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Map.Entry; +import java.util.zip.ZipEntry; +import java.util.zip.ZipInputStream; + +/** + * Implementation of {@link DeploymentUnitProcessor} that deploys deployment unit content to the file system. + * + * <p>This processor extracts and deploys files from deployment units to a specified target directory. + * It handles both regular deployment units (containing individual files as input streams) and ZIP-based + * deployment units (containing compressed archives that need extraction). + * + * <p>The deployment process ensures atomic file operations by: + * <ul> + * <li>First copying files to temporary locations with a {@code .tmp} suffix</li> + * <li>Then atomically moving them to their final destinations</li> + * <li>Creating necessary parent directories as needed</li> + * </ul> + * + * <p>This approach prevents partial deployments and ensures that files are either fully deployed + * or not deployed at all, maintaining consistency during the deployment process. + * + * <p>Type parameters: + * <ul> + * <li>{@code Path} - the argument type representing the target deployment directory</li> + * <li>{@code Void} - the return type (no meaningful return value)</li> + * </ul> + */ +public class DeployDeploymentUnitProcessor implements DeploymentUnitProcessor<Path, Void> { + /** Suffix used for temporary files during the deployment process. */ + private static final String TMP_SUFFIX = ".tmp"; + + /** {@inheritDoc} */ + @Override + public Void processContent(DeploymentUnitImpl unit, Path unitFolder) throws IOException { + for (Entry<String, InputStream> e : unit.content().entrySet()) { + doDeploy(unitFolder, e.getKey(), e.getValue()); + } + return null; + } + + /** {@inheritDoc} */ + @Override + public Void processContentWithUnzip(ZipDeploymentUnit unit, Path unitFolder) throws IOException { + for (ZipInputStream zis : unit.zipContent()) { + ZipEntry ze; + while ((ze = zis.getNextEntry()) != null) { + Path entryPath = unitFolder.resolve(ze.getName()); + + if (ze.isDirectory()) { + Files.createDirectories(entryPath); + } else { + Path unitFileFolder = entryPath.getParent(); + Files.createDirectories(unitFileFolder); + doDeploy(unitFileFolder, entryPath.getFileName().toString(), zis); + } + } + } + return null; + } + + private static void doDeploy(Path unitFolder, String entryName, InputStream is) throws IOException { + Path unitPath = unitFolder.resolve(entryName); + Files.createDirectories(unitPath.getParent()); + Path unitPathTmp = unitFolder.resolve(entryName + TMP_SUFFIX); + Files.copy(is, unitPathTmp, REPLACE_EXISTING); + Files.move(unitPathTmp, unitPath, ATOMIC_MOVE, REPLACE_EXISTING); + } + +} diff --git a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentUnit.java b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentUnit.java index bb36c8dccc0..0d701a994e2 100644 --- a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentUnit.java +++ b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentUnit.java @@ -17,32 +17,40 @@ package org.apache.ignite.internal.deployunit; -import static org.apache.ignite.internal.util.IgniteUtils.closeAll; - -import java.io.InputStream; -import java.util.Map; +import java.io.IOException; /** - * Deployment unit interface. + * Interface representing a deployment unit in the Apache Ignite code deployment system. + * + * <p>A deployment unit is a container for code and resources that can be deployed to and managed + * within an Ignite cluster. This interface provides a contract for processing deployment unit + * content through a processor pattern, allowing for flexible handling of different types of + * deployment units such as regular file-based units and ZIP-compressed units. */ -public class DeploymentUnit implements AutoCloseable { - private final Map<String, InputStream> content; - - public DeploymentUnit(Map<String, InputStream> content) { - this.content = content; - } - +public interface DeploymentUnit extends AutoCloseable { /** - * Deployment unit content - a map from file name to input stream. + * Processes the deployment unit content using the provided processor. + * + * <p>This method delegates the processing of the deployment unit to the specified processor, + * following a strategy pattern. The processor determines how the unit content should be + * handled based on its implementation. Different processors can perform different operations + * on the same deployment unit. + * + * <p>The method supports generic type parameters to allow processors to work with different + * argument types and return different result types, providing flexibility for various + * processing scenarios. + * + * <p>For ZIP-based deployment units, the processor may need to handle both regular content + * and compressed content that requires extraction. * - * @return Deployment unit content. + * + * @param <T> the type of argument passed to the processor + * @param <R> the type of result returned by the processor + * @param processor the processor that will handle the deployment unit content + * @param unitFolder the argument to be passed to the processor during processing + * @return the result of the processing operation as determined by the processor + * @throws IOException if an I/O error occurs during processing, such as issues reading + * deployment unit content or writing processed results. */ - public Map<String, InputStream> content() { - return content; - } - - @Override - public void close() throws Exception { - closeAll(content.values()); - } + <T, R> R process(DeploymentUnitProcessor<T, R> processor, T unitFolder) throws IOException; } diff --git a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentUnitImpl.java b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentUnitImpl.java new file mode 100644 index 00000000000..1b4ec14e3ee --- /dev/null +++ b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentUnitImpl.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.deployunit; + +import static org.apache.ignite.internal.util.IgniteUtils.closeAll; + +import java.io.IOException; +import java.io.InputStream; +import java.util.Map; + +/** + * Standard implementation of {@link DeploymentUnit} that handles regular (non-compressed) deployment content. + * + * <p>This class represents a deployment unit containing a collection of files, where each file is + * represented as a mapping from file name to its corresponding {@link InputStream}. This implementation is designed for straightforward + * deployment scenarios where the content does not require compression or special extraction handling. + * + * <p>The {@code DeploymentUnitImpl} is the primary implementation used for most deployment operations + * in the Apache Ignite code deployment system. It provides a simple and efficient way to package and deploy code artifacts, configuration + * files, resources, and other deployment assets to Ignite cluster nodes. + */ +public class DeploymentUnitImpl implements DeploymentUnit { + + /** + * The deployment unit content represented as a mapping from file names to their input streams. Each entry represents a file within the + * deployment unit. + */ + private final Map<String, InputStream> content; + + /** + * Constructor. + */ + public DeploymentUnitImpl(Map<String, InputStream> content) { + this.content = content; + } + + /** + * Returns the deployment unit content as a map of file names to input streams. + */ + public Map<String, InputStream> content() { + return content; + } + + /** {@inheritDoc} */ + @Override + public void close() throws Exception { + closeAll(content.values()); + } + + /** {@inheritDoc} */ + @Override + public <T, R> R process(DeploymentUnitProcessor<T, R> processor, T arg) throws IOException { + return processor.processContent(this, arg); + } +} diff --git a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentUnitProcessor.java b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentUnitProcessor.java new file mode 100644 index 00000000000..eb9351bd92e --- /dev/null +++ b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentUnitProcessor.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.deployunit; + +import java.io.IOException; + +/** + * Processor interface for handling deployment unit content operations. + * + * <p>This interface defines a contract for processing deployment units, providing methods to handle + * both regular deployment units and ZIP-based deployment units. Implementations of this interface + * can perform various operations on deployment unit content such as deployment, validation, + * transformation, or extraction. + * + * <p>The processor uses a generic approach with two type parameters: + * <ul> + * <li>{@code T} - the type of argument passed to processing methods</li> + * <li>{@code R} - the type of result returned by processing methods</li> + * </ul> + * + * <p>This design allows for flexible implementations that can process deployment units with + * different argument types and return different result types based on the specific use case. + * + * @param <T> the type of argument passed to the processing methods + * @param <R> the type of result returned by the processing methods + */ +public interface DeploymentUnitProcessor<T, R> { + /** + * Processes the content of a regular deployment unit. + * + * <p>This method handles deployment units that contain a collection of files represented + * as input streams. The implementation should process each file in the deployment unit + * according to the specific processor's logic. + * + * @param unit the deployment unit containing the content to be processed + * @param arg the argument to be used during processing + * @return the result of the processing operation + * @throws IOException if an I/O error occurs during processing + */ + R processContent(DeploymentUnitImpl unit, T arg) throws IOException; + + /** + * Processes the content of a ZIP-based deployment unit with automatic extraction. + * + * <p>This method handles deployment units that contain ZIP archives. The implementation + * should process the ZIP content by extracting and handling each entry according to the + * specific processor's logic. This method is typically used when the deployment unit + * contains compressed content that needs to be extracted during processing. + * + * @param unit the ZIP deployment unit containing the compressed content to be processed + * @param arg the argument to be used during processing + * @return the result of the processing operation + * @throws IOException if an I/O error occurs during processing or extraction + */ + R processContentWithUnzip(ZipDeploymentUnit unit, T arg) throws IOException; +} diff --git a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/FileDeployerService.java b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/FileDeployerService.java index 73d13708c7d..605cc82eb98 100644 --- a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/FileDeployerService.java +++ b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/FileDeployerService.java @@ -17,11 +17,7 @@ package org.apache.ignite.internal.deployunit; -import static java.nio.file.StandardCopyOption.ATOMIC_MOVE; -import static java.nio.file.StandardCopyOption.REPLACE_EXISTING; - import java.io.IOException; -import java.io.InputStream; import java.nio.file.FileVisitResult; import java.nio.file.Files; import java.nio.file.Path; @@ -29,7 +25,6 @@ import java.nio.file.SimpleFileVisitor; import java.nio.file.attribute.BasicFileAttributes; import java.util.HashMap; import java.util.Map; -import java.util.Map.Entry; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -46,8 +41,6 @@ import org.apache.ignite.internal.util.IgniteUtils; public class FileDeployerService { private static final IgniteLogger LOG = Loggers.forClass(FileDeployerService.class); - private static final String TMP_SUFFIX = ".tmp"; - private static final int DEPLOYMENT_EXECUTOR_SIZE = 4; /** @@ -57,6 +50,8 @@ public class FileDeployerService { private final ExecutorService executor; + private final DeploymentUnitProcessor<Path, Void> deployProcessor = new DeployDeploymentUnitProcessor(); + /** Constructor. */ public FileDeployerService(String nodeName) { executor = Executors.newFixedThreadPool( @@ -81,16 +76,10 @@ public class FileDeployerService { return CompletableFuture.supplyAsync(() -> { try { Path unitFolder = unitPath(id, version); - Files.createDirectories(unitFolder); - for (Entry<String, InputStream> entry : deploymentUnit.content().entrySet()) { - String fileName = entry.getKey(); - Path unitPath = unitFolder.resolve(fileName); - Path unitPathTmp = unitFolder.resolve(fileName + TMP_SUFFIX); - Files.copy(entry.getValue(), unitPathTmp, REPLACE_EXISTING); - Files.move(unitPathTmp, unitPath, ATOMIC_MOVE, REPLACE_EXISTING); - } + deploymentUnit.process(deployProcessor, unitFolder); + return true; } catch (IOException e) { LOG.error("Failed to deploy unit " + id + ":" + version, e); @@ -129,10 +118,12 @@ public class FileDeployerService { return CompletableFuture.supplyAsync(() -> { Map<String, byte[]> result = new HashMap<>(); try { - Files.walkFileTree(unitPath(id, version), new SimpleFileVisitor<>() { + Path unitFolder = unitPath(id, version); + Files.walkFileTree(unitFolder, new SimpleFileVisitor<>() { @Override public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException { - result.put(file.getFileName().toString(), Files.readAllBytes(file)); + Path unitStructure = unitFolder.relativize(file); + result.put(unitStructure.toString(), Files.readAllBytes(file)); return FileVisitResult.CONTINUE; } }); diff --git a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/UnitContent.java b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/UnitContent.java index 6ea4cf60b02..dcaa29a5246 100644 --- a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/UnitContent.java +++ b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/UnitContent.java @@ -18,15 +18,12 @@ package org.apache.ignite.internal.deployunit; import java.io.ByteArrayInputStream; -import java.io.IOException; import java.io.InputStream; import java.util.Arrays; import java.util.HashMap; import java.util.Iterator; import java.util.Map; import java.util.Map.Entry; -import java.util.stream.Collectors; -import org.apache.ignite.internal.deployunit.exception.DeploymentUnitReadException; /** * Unit content representation. @@ -78,24 +75,6 @@ public class UnitContent implements Iterable<Entry<String, byte[]>> { return files.entrySet().iterator(); } - /** - * Read unit content from unit {@link DeploymentUnit}. - * - * @param deploymentUnit Deployment unit instance. - * @return Unit content from provided deployment unit. - */ - public static UnitContent readContent(DeploymentUnit deploymentUnit) { - Map<String, byte[]> map = deploymentUnit.content().entrySet().stream() - .collect(Collectors.toMap(Entry::getKey, entry -> { - try { - return entry.getValue().readAllBytes(); - } catch (IOException e) { - throw new DeploymentUnitReadException(e); - } - })); - return new UnitContent(map); - } - /** * Convert unit content to {@link DeploymentUnit}. * @@ -107,6 +86,6 @@ public class UnitContent implements Iterable<Entry<String, byte[]>> { content.iterator().forEachRemaining(it -> { files.put(it.getKey(), new ByteArrayInputStream(it.getValue())); }); - return new DeploymentUnit(files); + return new DeploymentUnitImpl(files); } } diff --git a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/ZipDeploymentUnit.java b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/ZipDeploymentUnit.java new file mode 100644 index 00000000000..0758379d89a --- /dev/null +++ b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/ZipDeploymentUnit.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.deployunit; + +import static org.apache.ignite.internal.util.IgniteUtils.closeAll; + +import java.io.IOException; +import java.util.Collection; +import java.util.zip.ZipInputStream; + +/** + * A specialized implementation of {@link DeploymentUnit} that handles ZIP-compressed deployment content. + * + * <p>This class represents a deployment unit that contains both regular (uncompressed) content and + * ZIP-compressed archives that require extraction during processing. It extends the basic deployment + * unit functionality to handle mixed content types, providing automatic extraction capabilities for + * ZIP archives while maintaining support for regular file content. + */ +public class ZipDeploymentUnit implements DeploymentUnit { + /** The deployment unit containing regular (non-ZIP) content. */ + private final DeploymentUnit notZippedContent; + + /** Collection of ZIP input streams that require extraction during processing. */ + private final Collection<ZipInputStream> zipContent; + + /** + * Constructor. + */ + public ZipDeploymentUnit(DeploymentUnit notZippedContent, Collection<ZipInputStream> zipContent) { + this.notZippedContent = notZippedContent; + this.zipContent = zipContent; + } + + /** + * Processes the deployment unit content using a two-phase approach for mixed content types. + * + * <p>This method implements the {@link DeploymentUnit} processing contract with specialized + * handling for ZIP content. + * + * @param <T> the type of argument passed to the processor. + * @param <R> the type of result returned by the processor. + * @param processor the processor that will handle both regular and ZIP content; + * must implement both {@code processContent} and {@code processContentWithUnzip} methods. + * @param arg the argument to be passed to the processor during both processing phases. + * @return the result of the ZIP content processing phase. + * @throws IOException if an I/O error occurs during either processing phase. + */ + @Override + public <T, R> R process(DeploymentUnitProcessor<T, R> processor, T arg) throws IOException { + notZippedContent.process(processor, arg); + return processor.processContentWithUnzip(this, arg); + } + + /** + * Returns the collection of ZIP input streams that require extraction during processing. + */ + public Collection<ZipInputStream> zipContent() { + return zipContent; + } + + /** + * Closes this deployment unit and releases all associated resources. + */ + @Override + public void close() throws Exception { + notZippedContent.close(); + closeAll(zipContent); + } +} diff --git a/modules/code-deployment/src/test/java/org/apache/ignite/deployment/FileDeployerServiceTest.java b/modules/code-deployment/src/test/java/org/apache/ignite/deployment/FileDeployerServiceTest.java index 0cb57643735..33e06ace784 100644 --- a/modules/code-deployment/src/test/java/org/apache/ignite/deployment/FileDeployerServiceTest.java +++ b/modules/code-deployment/src/test/java/org/apache/ignite/deployment/FileDeployerServiceTest.java @@ -29,12 +29,15 @@ import java.io.InputStream; import java.nio.file.Files; import java.nio.file.Path; import java.util.Map; +import java.util.Map.Entry; import java.util.concurrent.CompletableFuture; import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.ignite.internal.deployunit.DeploymentUnit; +import org.apache.ignite.internal.deployunit.DeploymentUnitImpl; import org.apache.ignite.internal.deployunit.FileDeployerService; import org.apache.ignite.internal.deployunit.UnitContent; +import org.apache.ignite.internal.deployunit.exception.DeploymentUnitReadException; import org.apache.ignite.internal.testframework.IgniteTestUtils; import org.apache.ignite.internal.testframework.WorkDirectory; import org.apache.ignite.internal.testframework.WorkDirectoryExtension; @@ -70,18 +73,18 @@ public class FileDeployerServiceTest { @Test public void test() throws Exception { - try (DeploymentUnit unit = content()) { + try (DeploymentUnitImpl unit = content()) { CompletableFuture<Boolean> deployed = service.deploy("id", parseVersion("1.0.0"), unit); assertThat(deployed, willBe(true)); } - try (DeploymentUnit unit = content()) { + try (DeploymentUnitImpl unit = content()) { CompletableFuture<UnitContent> unitContent = service.getUnitContent("id", parseVersion("1.0.0")); - assertThat(unitContent, willBe(equalTo(UnitContent.readContent(unit)))); + assertThat(unitContent, willBe(equalTo(readContent(unit)))); } } - private DeploymentUnit content() { + private DeploymentUnitImpl content() { Map<String, InputStream> map = Stream.of(file1, file2, file3) .collect(Collectors.toMap(it -> it.getFileName().toString(), it -> { try { @@ -95,6 +98,24 @@ public class FileDeployerServiceTest { } })); - return new DeploymentUnit(map); + return new DeploymentUnitImpl(map); + } + + /** + * Read unit content from unit {@link DeploymentUnit}. + * + * @param deploymentUnit Deployment unit instance. + * @return Unit content from provided deployment unit. + */ + private static UnitContent readContent(DeploymentUnitImpl deploymentUnit) { + Map<String, byte[]> map = deploymentUnit.content().entrySet().stream() + .collect(Collectors.toMap(Entry::getKey, entry -> { + try { + return entry.getValue().readAllBytes(); + } catch (IOException e) { + throw new DeploymentUnitReadException(e); + } + })); + return new UnitContent(map); } } diff --git a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestStandalone.java b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestStandalone.java index 6c7964a8ff2..af74af30a03 100644 --- a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestStandalone.java +++ b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestStandalone.java @@ -48,6 +48,7 @@ import org.apache.ignite.compute.JobTarget; import org.apache.ignite.compute.TaskDescriptor; import org.apache.ignite.deployment.DeploymentUnit; import org.apache.ignite.deployment.version.Version; +import org.apache.ignite.internal.deployunit.DeploymentUnitImpl; import org.apache.ignite.internal.deployunit.IgniteDeployment; import org.apache.ignite.internal.deployunit.NodesToDeploy; import org.apache.ignite.internal.testframework.IgniteTestUtils; @@ -229,7 +230,7 @@ class ItComputeTestStandalone extends ItComputeBaseTest { CompletableFuture<Boolean> deployed = deployment.deployAsync( unitId, unitVersion, - new org.apache.ignite.internal.deployunit.DeploymentUnit(Map.of(jarName, jarStream)), + new DeploymentUnitImpl(Map.of(jarName, jarStream)), new NodesToDeploy(MAJORITY) ); diff --git a/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/WorkDirectoryExtension.java b/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/WorkDirectoryExtension.java index 76327903740..21845b4be36 100644 --- a/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/WorkDirectoryExtension.java +++ b/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/WorkDirectoryExtension.java @@ -307,7 +307,20 @@ public class WorkDirectoryExtension return PATTERN.matcher(property).matches(); } - private static void zipDirectory(Path source, Path target) { + /** + * Creates a ZIP archive from the contents of a source directory. + * + * <p>This method recursively walks through the source directory and compresses all files (excluding directories) + * into a ZIP archive at the target location. The directory structure is preserved within the ZIP file using + * relative paths from the source directory. + * + * <p>The parent directories of the target ZIP file are created if they don't exist. If the target file already + * exists, it will be overwritten. + * + * @param source the path to the source directory to be zipped; must be an existing directory + * @param target the path where the ZIP archive will be created; parent directories will be created if necessary + */ + public static void zipDirectory(Path source, Path target) { try { Files.createDirectories(target.getParent()); diff --git a/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/deployment/DeploymentCodeApi.java b/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/deployment/DeploymentCodeApi.java index a903ba6b912..2baf0d11b1f 100644 --- a/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/deployment/DeploymentCodeApi.java +++ b/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/deployment/DeploymentCodeApi.java @@ -30,6 +30,8 @@ import io.micronaut.http.annotation.Post; import io.micronaut.http.annotation.QueryValue; import io.micronaut.http.multipart.CompletedFileUpload; import io.swagger.v3.oas.annotations.Operation; +import io.swagger.v3.oas.annotations.Parameter; +import io.swagger.v3.oas.annotations.enums.ParameterIn; import io.swagger.v3.oas.annotations.media.ArraySchema; import io.swagger.v3.oas.annotations.media.Content; import io.swagger.v3.oas.annotations.media.Schema; @@ -77,7 +79,13 @@ public interface DeploymentCodeApi { Optional<InitialDeployMode> deployMode, @QueryValue @Schema(name = "initialNodes", requiredMode = REQUIRED, description = "List of node identifiers to deploy to.") - Optional<List<String>> initialNodes + Optional<List<String>> initialNodes, + @Parameter( + name = "X-Unzip-Units", + in = ParameterIn.HEADER, + description = "Unzip all uploaded archives with saving directory structure." + ) + boolean unzip ); /** diff --git a/modules/rest/src/main/java/org/apache/ignite/internal/rest/deployment/CompletedFileUploadSubscriber.java b/modules/rest/src/main/java/org/apache/ignite/internal/rest/deployment/CompletedFileUploadSubscriber.java index e9c67064715..1d3dd4be065 100644 --- a/modules/rest/src/main/java/org/apache/ignite/internal/rest/deployment/CompletedFileUploadSubscriber.java +++ b/modules/rest/src/main/java/org/apache/ignite/internal/rest/deployment/CompletedFileUploadSubscriber.java @@ -19,9 +19,6 @@ package org.apache.ignite.internal.rest.deployment; import io.micronaut.http.multipart.CompletedFileUpload; import java.io.IOException; -import java.io.InputStream; -import java.util.HashMap; -import java.util.Map; import java.util.concurrent.CompletableFuture; import org.apache.ignite.internal.deployunit.DeploymentUnit; import org.apache.ignite.internal.logger.IgniteLogger; @@ -33,14 +30,19 @@ import org.reactivestreams.Subscription; * Implementation of {@link Subscriber} based on {@link CompletedFileUpload} which will collect uploaded files to the * {@link DeploymentUnit}. */ -class CompletedFileUploadSubscriber implements Subscriber<CompletedFileUpload>, AutoCloseable { +class CompletedFileUploadSubscriber implements Subscriber<CompletedFileUpload> { private static final IgniteLogger LOG = Loggers.forClass(CompletedFileUploadSubscriber.class); private final CompletableFuture<DeploymentUnit> result = new CompletableFuture<>(); - private final Map<String, InputStream> content = new HashMap<>(); + private final InputStreamCollector collector; - private IOException ex; + private Throwable ex; + + public CompletedFileUploadSubscriber(boolean unzip) { + InputStreamCollector isCollector = new InputStreamCollectorImpl(); + this.collector = unzip ? new ZipInputStreamCollector(isCollector) : isCollector; + } @Override public void onSubscribe(Subscription subscription) { @@ -50,20 +52,22 @@ class CompletedFileUploadSubscriber implements Subscriber<CompletedFileUpload>, @Override public void onNext(CompletedFileUpload item) { try { - content.put(item.getFilename(), item.getInputStream()); + collector.addInputStream(item.getFilename(), item.getInputStream()); } catch (IOException e) { LOG.error("Failed to read file: " + item.getFilename(), e); - if (ex != null) { - ex.addSuppressed(e); - } else { - ex = e; - } + suppressException(e); } } @Override public void onError(Throwable throwable) { - result.completeExceptionally(throwable); + try { + collector.clear(); + } catch (Exception e) { + suppressException(e); + } + suppressException(throwable); + result.completeExceptionally(ex); } @Override @@ -71,22 +75,19 @@ class CompletedFileUploadSubscriber implements Subscriber<CompletedFileUpload>, if (ex != null) { result.completeExceptionally(ex); } else { - result.complete(new DeploymentUnit(content)); + result.complete(collector.toDeploymentUnit()); } } - public CompletableFuture<DeploymentUnit> result() { - return result; + private void suppressException(Throwable t) { + if (ex == null) { + ex = t; + } else { + ex.addSuppressed(t); + } } - @Override - public void close() throws Exception { - result.thenAccept(it -> { - try { - it.close(); - } catch (Exception e) { - throw new RuntimeException(e); - } - }); + public CompletableFuture<DeploymentUnit> result() { + return result; } } diff --git a/modules/rest/src/main/java/org/apache/ignite/internal/rest/deployment/DeploymentManagementController.java b/modules/rest/src/main/java/org/apache/ignite/internal/rest/deployment/DeploymentManagementController.java index 58996ee418b..48837ee9756 100644 --- a/modules/rest/src/main/java/org/apache/ignite/internal/rest/deployment/DeploymentManagementController.java +++ b/modules/rest/src/main/java/org/apache/ignite/internal/rest/deployment/DeploymentManagementController.java @@ -17,6 +17,8 @@ package org.apache.ignite.internal.rest.deployment; +import static org.apache.ignite.deployment.version.Version.parseVersion; + import io.micronaut.http.annotation.Controller; import io.micronaut.http.multipart.CompletedFileUpload; import java.util.ArrayList; @@ -63,30 +65,30 @@ public class DeploymentManagementController implements DeploymentCodeApi, Resour String unitVersion, Publisher<CompletedFileUpload> unitContent, Optional<InitialDeployMode> deployMode, - Optional<List<String>> initialNodes + Optional<List<String>> initialNodes, + boolean unzip ) { - - CompletedFileUploadSubscriber subscriber = new CompletedFileUploadSubscriber(); + CompletedFileUploadSubscriber subscriber = new CompletedFileUploadSubscriber(unzip); unitContent.subscribe(subscriber); NodesToDeploy nodesToDeploy = initialNodes.map(NodesToDeploy::new) .orElseGet(() -> new NodesToDeploy(fromInitialDeployMode(deployMode))); - return subscriber.result().thenCompose(content -> { - return deployment.deployAsync(unitId, Version.parseVersion(unitVersion), content, nodesToDeploy); - }).whenComplete((res, throwable) -> { - try { - subscriber.close(); - } catch (Exception e) { - LOG.error("Failed to close subscriber", e); - } - }); - + return subscriber.result().thenCompose(deploymentUnit -> + deployment.deployAsync(unitId, parseVersion(unitVersion), deploymentUnit, nodesToDeploy) + .whenComplete((unitStatus, throwable) -> { + try { + deploymentUnit.close(); + } catch (Exception e) { + LOG.error("Failed to close subscriber", e); + } + }) + ); } @Override public CompletableFuture<Boolean> undeploy(String unitId, String unitVersion) { - return deployment.undeployAsync(unitId, Version.parseVersion(unitVersion)); + return deployment.undeployAsync(unitId, parseVersion(unitVersion)); } @Override @@ -107,7 +109,7 @@ public class DeploymentManagementController implements DeploymentCodeApi, Resour private CompletableFuture<List<UnitStatuses>> clusterStatuses(String unitId, Optional<String> version) { if (version.isPresent()) { - Version parsedVersion = Version.parseVersion(version.get()); + Version parsedVersion = parseVersion(version.get()); return deployment.clusterStatusAsync(unitId, parsedVersion) .thenApply(deploymentStatus -> { if (deploymentStatus != null) { @@ -140,7 +142,7 @@ public class DeploymentManagementController implements DeploymentCodeApi, Resour private CompletableFuture<List<UnitStatuses>> nodeStatuses(String unitId, Optional<String> version) { if (version.isPresent()) { - Version parsedVersion = Version.parseVersion(version.get()); + Version parsedVersion = parseVersion(version.get()); return deployment.nodeStatusAsync(unitId, parsedVersion) .thenApply(deploymentStatus -> { if (deploymentStatus != null) { diff --git a/modules/rest/src/main/java/org/apache/ignite/internal/rest/deployment/InputStreamCollector.java b/modules/rest/src/main/java/org/apache/ignite/internal/rest/deployment/InputStreamCollector.java new file mode 100644 index 00000000000..f7d77a39b86 --- /dev/null +++ b/modules/rest/src/main/java/org/apache/ignite/internal/rest/deployment/InputStreamCollector.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.rest.deployment; + +import java.io.InputStream; +import org.apache.ignite.internal.deployunit.DeploymentUnit; + +/** + * Interface for collecting input streams and converting them into deployment units. + * + * <p>This interface provides a contract for accumulating input streams from various sources + * (such as file uploads, network transfers, or other data sources) and organizing them into + * a structured deployment unit that can be processed by the Apache Ignite deployment system. + * + * <p>Implementations of this interface are typically used in REST endpoints, file upload + * handlers, and other scenarios where deployment content is received in stream format and + * needs to be converted into deployment units for processing. + */ +public interface InputStreamCollector { + /** + * Adds an input stream with the specified filename to the collection. + * + * <p>This method accepts an input stream representing a file or resource that should be + * included in the deployment unit. The filename parameter provides the logical name or + * path for the content within the deployment unit structure. + * + * <p>Once added, the input stream becomes managed by this collector and should not be + * closed directly by the caller. The collector is responsible for proper cleanup of + * all managed streams. + * + * + * @param filename the logical name or path for the content within the deployment unit; + * must not be {@code null} or empty. + * @param is the input stream containing the content to be added; must not be {@code null}. + */ + void addInputStream(String filename, InputStream is); + + /** + * Clears all collected input streams and releases associated resources. + * + * <p>This method closes all input streams that have been added to this collector and + * clears the internal collection. After calling this method, the collector returns to + * its initial empty state and can be reused for collecting new streams. + */ + void clear() throws Exception; + + /** + * Converts the collected input streams into a deployment unit. + * + * <p>This method creates a {@link DeploymentUnit} instance containing all the input streams + * that have been added to this collector. The specific type of deployment unit returned + * depends on the implementation and the characteristics of the collected content. + * + * @return a deployment unit containing all collected input streams; never {@code null}. + */ + DeploymentUnit toDeploymentUnit(); +} diff --git a/modules/rest/src/main/java/org/apache/ignite/internal/rest/deployment/InputStreamCollectorImpl.java b/modules/rest/src/main/java/org/apache/ignite/internal/rest/deployment/InputStreamCollectorImpl.java new file mode 100644 index 00000000000..e9520f6fc57 --- /dev/null +++ b/modules/rest/src/main/java/org/apache/ignite/internal/rest/deployment/InputStreamCollectorImpl.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.rest.deployment; + +import static org.apache.ignite.internal.util.IgniteUtils.closeAll; + +import java.io.InputStream; +import java.util.HashMap; +import java.util.Map; +import org.apache.ignite.internal.deployunit.DeploymentUnit; +import org.apache.ignite.internal.deployunit.DeploymentUnitImpl; + +/** + * Standard implementation of {@link InputStreamCollector} for collecting regular file content. + * + * <p>This implementation provides a straightforward approach to collecting input streams and + * converting them into a standard {@link DeploymentUnitImpl}. It maintains an internal map + * of filename-to-stream associations and creates deployment units containing regular + * (non-compressed) file content. + */ +public class InputStreamCollectorImpl implements InputStreamCollector { + /** + * Internal storage for collected input streams mapped by their filenames. + * The map maintains the association between logical file paths and their content streams. + */ + private final Map<String, InputStream> content = new HashMap<>(); + + /** {@inheritDoc} */ + @Override + public void addInputStream(String filename, InputStream is) { + content.put(filename, is); + } + + /** {@inheritDoc} */ + @Override + public DeploymentUnit toDeploymentUnit() { + return new DeploymentUnitImpl(content); + } + + /** {@inheritDoc} */ + @Override + public void clear() throws Exception { + closeAll(content.values()); + } +} diff --git a/modules/rest/src/main/java/org/apache/ignite/internal/rest/deployment/ZipInputStreamCollector.java b/modules/rest/src/main/java/org/apache/ignite/internal/rest/deployment/ZipInputStreamCollector.java new file mode 100644 index 00000000000..b4c41f290c9 --- /dev/null +++ b/modules/rest/src/main/java/org/apache/ignite/internal/rest/deployment/ZipInputStreamCollector.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.rest.deployment; + +import static org.apache.ignite.internal.util.IgniteUtils.closeAll; + +import java.io.BufferedInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.List; +import java.util.zip.ZipInputStream; +import org.apache.ignite.internal.deployunit.DeploymentUnit; +import org.apache.ignite.internal.deployunit.ZipDeploymentUnit; +import org.jetbrains.annotations.Nullable; + +/** + * Advanced implementation of {@link InputStreamCollector} that automatically detects and handles ZIP content. + * + * <p>This decorator implementation wraps another {@link InputStreamCollector} and provides intelligent + * handling of mixed content types. It automatically detects ZIP archives among the added input streams + * and separates them from regular file content, creating a {@link ZipDeploymentUnit} that can handle + * both types of content appropriately. + */ +public class ZipInputStreamCollector implements InputStreamCollector { + /** + * The delegate collector that handles regular (non-ZIP) content. + * All streams that are not identified as ZIP archives are forwarded to this collector. + */ + private final InputStreamCollector delegate; + + /** + * Collection of detected ZIP input streams that require special processing. + * These streams will be handled by the ZipDeploymentUnit for automatic extraction. + */ + private final List<ZipInputStream> zipContent = new ArrayList<>(); + + /** + * Constructor. + */ + public ZipInputStreamCollector(InputStreamCollector delegate) { + this.delegate = delegate; + } + + /** {@inheritDoc} */ + @Override + public void addInputStream(String filename, InputStream is) { + InputStream result = is.markSupported() ? is : new BufferedInputStream(is); + + ZipInputStream zis = tryZip(result); + if (zis != null) { + zipContent.add(zis); + } else { + delegate.addInputStream(filename, result); + } + } + + /** {@inheritDoc} */ + @Nullable + private static ZipInputStream tryZip(InputStream is) { + try { + ZipInputStream zis = new ZipInputStream(is); + if (zis.getNextEntry() != null) { + is.reset(); + return zis; + } else { + is.reset(); + return null; + } + } catch (IOException e) { + return null; + } + } + + /** {@inheritDoc} */ + @Override + public void clear() throws Exception { + delegate.clear(); + closeAll(zipContent); + } + + /** {@inheritDoc} */ + @Override + public DeploymentUnit toDeploymentUnit() { + return new ZipDeploymentUnit(delegate.toDeploymentUnit(), zipContent); + } +}