This is an automated email from the ASF dual-hosted git repository.

mpochatkin pushed a commit to branch IGNITE-26455
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit 3fce6b73728b272a3dd42e70f7989ea995798ff8
Author: Pochatkin Mikhail <m.a.pochat...@gmail.com>
AuthorDate: Fri Sep 19 11:27:16 2025 +0300

    IGNITE-26455 Add support for deploy units with folders structure
---
 .../ignite/internal/deployment/DeployFile.java     |   9 +-
 .../ignite/internal/deployment/DeployFiles.java    | 121 ++++++++++++++++++---
 .../internal/deployment/ItDeploymentUnitTest.java  |  31 ++++++
 .../deployunit/DeployDeploymentUnitProcessor.java  |  95 ++++++++++++++++
 .../ignite/internal/deployunit/DeploymentUnit.java |  52 +++++----
 .../internal/deployunit/DeploymentUnitImpl.java    |  70 ++++++++++++
 .../deployunit/DeploymentUnitProcessor.java        |  71 ++++++++++++
 .../internal/deployunit/FileDeployerService.java   |  25 ++---
 .../ignite/internal/deployunit/UnitContent.java    |  23 +---
 .../internal/deployunit/ZipDeploymentUnit.java     |  84 ++++++++++++++
 .../ignite/deployment/FileDeployerServiceTest.java |  31 +++++-
 .../internal/compute/ItComputeTestStandalone.java  |   3 +-
 .../testframework/WorkDirectoryExtension.java      |  15 ++-
 .../rest/api/deployment/DeploymentCodeApi.java     |  10 +-
 .../deployment/CompletedFileUploadSubscriber.java  |  51 ++++-----
 .../deployment/DeploymentManagementController.java |  34 +++---
 .../rest/deployment/InputStreamCollector.java      |  72 ++++++++++++
 .../rest/deployment/InputStreamCollectorImpl.java  |  60 ++++++++++
 .../rest/deployment/ZipInputStreamCollector.java   | 102 +++++++++++++++++
 19 files changed, 830 insertions(+), 129 deletions(-)

diff --git 
a/modules/code-deployment/src/integrationTest/java/org/apache/ignite/internal/deployment/DeployFile.java
 
b/modules/code-deployment/src/integrationTest/java/org/apache/ignite/internal/deployment/DeployFile.java
index 071aa615a46..c76b9a84031 100644
--- 
a/modules/code-deployment/src/integrationTest/java/org/apache/ignite/internal/deployment/DeployFile.java
+++ 
b/modules/code-deployment/src/integrationTest/java/org/apache/ignite/internal/deployment/DeployFile.java
@@ -29,10 +29,13 @@ class DeployFile {
 
     private final int replicaTimeout;
 
-    DeployFile(Path file, long expectedSize, int replicaTimeout) throws 
IOException {
+    private final boolean zip;
+
+    DeployFile(Path file, boolean zip, long expectedSize, int replicaTimeout) 
throws IOException {
         this.file = file;
         this.expectedSize = expectedSize;
         this.replicaTimeout = replicaTimeout;
+        this.zip = zip;
         ensureExists();
     }
 
@@ -46,6 +49,10 @@ class DeployFile {
         return file;
     }
 
+    boolean zip() {
+        return zip;
+    }
+
     long expectedSize() {
         return expectedSize;
     }
diff --git 
a/modules/code-deployment/src/integrationTest/java/org/apache/ignite/internal/deployment/DeployFiles.java
 
b/modules/code-deployment/src/integrationTest/java/org/apache/ignite/internal/deployment/DeployFiles.java
index de5a1ffb265..68d368bdf82 100644
--- 
a/modules/code-deployment/src/integrationTest/java/org/apache/ignite/internal/deployment/DeployFiles.java
+++ 
b/modules/code-deployment/src/integrationTest/java/org/apache/ignite/internal/deployment/DeployFiles.java
@@ -18,26 +18,34 @@
 package org.apache.ignite.internal.deployment;
 
 import static org.apache.ignite.internal.deployunit.DeploymentStatus.DEPLOYED;
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.fillDummyFile;
+import static 
org.apache.ignite.internal.testframework.WorkDirectoryExtension.zipDirectory;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
+import static org.apache.ignite.internal.util.IgniteUtils.deleteIfExists;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
 
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
+import java.util.Map.Entry;
 import java.util.concurrent.CompletableFuture;
-import java.util.stream.Collectors;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipInputStream;
 import org.apache.ignite.deployment.version.Version;
 import org.apache.ignite.internal.app.IgniteImpl;
 import org.apache.ignite.internal.deployunit.DeploymentUnit;
+import org.apache.ignite.internal.deployunit.DeploymentUnitImpl;
 import org.apache.ignite.internal.deployunit.NodesToDeploy;
 import org.apache.ignite.internal.deployunit.UnitStatuses;
 import org.apache.ignite.internal.deployunit.UnitStatuses.UnitStatusesBuilder;
+import org.apache.ignite.internal.deployunit.ZipDeploymentUnit;
 
 class DeployFiles {
     private static final int BASE_REPLICA_TIMEOUT = 30;
@@ -56,6 +64,10 @@ class DeployFiles {
 
     private DeployFile bigFile;
 
+    private DeployFile flatZipFile;
+
+    private DeployFile treeZipFile;
+
     // TODO https://issues.apache.org/jira/browse/IGNITE-20204
     DeployFiles(Path workDir) {
         this.workDir = workDir;
@@ -63,7 +75,30 @@ class DeployFiles {
 
     private static DeployFile create(Path path, long size, int replicaTimeout) 
{
         try {
-            return new DeployFile(path, size, replicaTimeout);
+            return new DeployFile(path, false, size, replicaTimeout);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private DeployFile createZip(Path path, Map<String, Long> content, int 
replicaTimeout) {
+        try {
+            Path zipTempFolder = workDir.resolve("tempDir");
+            Files.createDirectories(zipTempFolder);
+            long size = 0;
+            for (Entry<String, Long> e : content.entrySet()) {
+                String zipEntryPath = e.getKey();
+                Long entrySize = e.getValue();
+                Path entry = zipTempFolder.resolve(zipEntryPath);
+                if (entrySize > 0) {
+                    Files.createDirectories(entry.getParent());
+                    fillDummyFile(entry, entrySize);
+                }
+                size += entrySize;
+            }
+            zipDirectory(zipTempFolder, path);
+            deleteIfExists(zipTempFolder);
+            return new DeployFile(path, true, size, replicaTimeout);
         } catch (IOException e) {
             throw new RuntimeException(e);
         }
@@ -90,6 +125,39 @@ class DeployFiles {
         return bigFile;
     }
 
+    DeployFile flatZipFile() {
+        if (flatZipFile == null) {
+            flatZipFile = createZip(
+                    workDir.resolve("flat.zip"),
+                    Map.of(
+                            "a1", SMALL_IN_BYTES,
+                            "a2", SMALL_IN_BYTES,
+                            "a3", SMALL_IN_BYTES,
+                            "a4", SMALL_IN_BYTES
+                    ),
+                    BASE_REPLICA_TIMEOUT
+            );
+        }
+        return flatZipFile;
+    }
+
+    DeployFile treeZipFile() {
+        if (treeZipFile == null) {
+            treeZipFile = createZip(
+                    workDir.resolve("tree.zip"),
+                    Map.of(
+                            "a1/a2", SMALL_IN_BYTES,
+                            "b1", SMALL_IN_BYTES,
+                            "c1/c2/c3/c4", MEDIUM_IN_BYTES,
+                            "d1/d2", SMALL_IN_BYTES,
+                            "d1/a2", SMALL_IN_BYTES
+                    ),
+                    BASE_REPLICA_TIMEOUT * 3
+            );
+        }
+        return treeZipFile;
+    }
+
     private Unit deployAndVerify(String id, Version version, DeployFile file, 
IgniteImpl entryNode) {
         return deployAndVerify(id, version, false, file, entryNode);
     }
@@ -110,14 +178,8 @@ class DeployFiles {
             NodesToDeploy nodesToDeploy,
             IgniteImpl entryNode
     ) {
-        List<Path> paths = files.stream()
-                .map(DeployFile::file)
-                .collect(Collectors.toList());
-
-        CompletableFuture<Boolean> deploy;
-
-        DeploymentUnit deploymentUnit = fromPaths(paths);
-        deploy = entryNode.deployment()
+        DeploymentUnit deploymentUnit = fromFiles(files);
+        CompletableFuture<Boolean> deploy = entryNode.deployment()
                 .deployAsync(id, version, force, deploymentUnit, nodesToDeploy)
                 .whenComplete((res, err) -> {
                     try {
@@ -134,8 +196,19 @@ class DeployFiles {
         Path nodeUnitDirectory = unit.getNodeUnitDirectory(entryNode);
 
         for (DeployFile file : files) {
-            Path filePath = 
nodeUnitDirectory.resolve(file.file().getFileName());
-            assertTrue(Files.exists(filePath));
+            if (file.zip()) {
+                try (ZipInputStream zis = new 
ZipInputStream(Files.newInputStream(file.file()))) {
+                    ZipEntry ze;
+                    while ((ze = zis.getNextEntry()) != null) {
+                        
assertTrue(Files.exists(nodeUnitDirectory.resolve(ze.getName())));
+                    }
+                } catch (IOException e) {
+                    fail(e);
+                }
+            } else {
+                Path filePath = 
nodeUnitDirectory.resolve(file.file().getFileName());
+                assertTrue(Files.exists(filePath));
+            }
         }
 
         return unit;
@@ -153,6 +226,14 @@ class DeployFiles {
         return deployAndVerify(id, version, bigFile(), entryNode);
     }
 
+    public Unit deployAndVerifyFlatZip(String id, Version version, IgniteImpl 
entryNode) {
+        return deployAndVerify(id, version, flatZipFile(), entryNode);
+    }
+
+    public Unit deployAndVerifyTreeZip(String id, Version version, IgniteImpl 
entryNode) {
+        return deployAndVerify(id, version, treeZipFile(), entryNode);
+    }
+
     public static UnitStatuses buildStatus(String id, Unit... units) {
         UnitStatusesBuilder builder = UnitStatuses.builder(id);
         for (Unit unit : units) {
@@ -169,16 +250,20 @@ class DeployFiles {
         Files.copy(file.file(), unitFile);
     }
 
-    private static DeploymentUnit fromPaths(List<Path> paths) {
-        Objects.requireNonNull(paths);
+    private static DeploymentUnit fromFiles(List<DeployFile> files) {
         Map<String, InputStream> map = new HashMap<>();
+        List<ZipInputStream> zips = new ArrayList<>();
         try {
-            for (Path path : paths) {
-                map.put(path.getFileName().toString(), 
Files.newInputStream(path));
+            for (DeployFile file : files) {
+                if (file.zip()) {
+                    zips.add(new 
ZipInputStream(Files.newInputStream(file.file())));
+                } else {
+                    map.put(file.file().getFileName().toString(), 
Files.newInputStream(file.file()));
+                }
             }
         } catch (IOException e) {
             throw new RuntimeException(e);
         }
-        return new DeploymentUnit(map);
+        return new ZipDeploymentUnit(new DeploymentUnitImpl(map), zips);
     }
 }
diff --git 
a/modules/code-deployment/src/integrationTest/java/org/apache/ignite/internal/deployment/ItDeploymentUnitTest.java
 
b/modules/code-deployment/src/integrationTest/java/org/apache/ignite/internal/deployment/ItDeploymentUnitTest.java
index fc5554e8baa..520da9f3dfa 100644
--- 
a/modules/code-deployment/src/integrationTest/java/org/apache/ignite/internal/deployment/ItDeploymentUnitTest.java
+++ 
b/modules/code-deployment/src/integrationTest/java/org/apache/ignite/internal/deployment/ItDeploymentUnitTest.java
@@ -281,4 +281,35 @@ public class ItDeploymentUnitTest extends 
ClusterPerTestIntegrationTest {
 
         smallUnit.waitUnitClean(igniteImpl(2));
     }
+
+    @Test
+    public void testZipDeploy() {
+        String id = "test";
+        Unit unit = files.deployAndVerifyFlatZip(id, 
Version.parseVersion("1.1.0"), igniteImpl(1));
+
+        Unit unit2 = files.deployAndVerifyTreeZip(id, 
Version.parseVersion("1.1.1"), igniteImpl(1));
+
+        UnitStatuses status = buildStatus(id, unit, unit2);
+
+        await().timeout(2, SECONDS)
+                .pollDelay(500, MILLISECONDS)
+                .until(() -> 
igniteImpl(2).deployment().clusterStatusesAsync(), willBe(List.of(status)));
+    }
+
+    @Test
+    public void testZipAndFileDeploy() {
+        String id = "test";
+
+        Unit unit = files.deployAndVerify(
+                id, Version.parseVersion("1.1.0"), false, 
List.of(files.smallFile(), files.flatZipFile()),
+                new NodesToDeploy(List.of(igniteImpl(1).name())),
+                igniteImpl(0)
+        );
+
+        UnitStatuses status = buildStatus(id, unit);
+
+        await().timeout(2, SECONDS)
+                .pollDelay(500, MILLISECONDS)
+                .until(() -> 
igniteImpl(2).deployment().clusterStatusesAsync(), willBe(List.of(status)));
+    }
 }
diff --git 
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeployDeploymentUnitProcessor.java
 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeployDeploymentUnitProcessor.java
new file mode 100644
index 00000000000..50e45d4a3f8
--- /dev/null
+++ 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeployDeploymentUnitProcessor.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.deployunit;
+
+import static java.nio.file.StandardCopyOption.ATOMIC_MOVE;
+import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Map.Entry;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipInputStream;
+
+/**
+ * Implementation of {@link DeploymentUnitProcessor} that deploys deployment 
unit content to the file system.
+ *
+ * <p>This processor extracts and deploys files from deployment units to a 
specified target directory.
+ * It handles both regular deployment units (containing individual files as 
input streams) and ZIP-based
+ * deployment units (containing compressed archives that need extraction).
+ *
+ * <p>Each file is written to the target directory as follows:
+ * <ul>
+ *     <li>Parent directories of the target path are created if missing</li>
+ *     <li>The content is copied to a temporary file with a {@code .tmp} suffix</li>
+ *     <li>The temporary file is atomically moved to its final destination</li>
+ * </ul>
+ *
+ * <p>Each file appears at its final path only once fully written. This is per
+ * file: the processor does not roll back earlier files if a later one fails.
+ *
+ * <p>Type parameters:
+ * <ul>
+ *     <li>{@code Path} - the argument type representing the target deployment 
directory</li>
+ *     <li>{@code Void} - the return type (no meaningful return value)</li>
+ * </ul>
+ */
+public class DeployDeploymentUnitProcessor implements 
DeploymentUnitProcessor<Path, Void> {
+    /** Suffix used for temporary files during the deployment process. */
+    private static final String TMP_SUFFIX = ".tmp";
+
+    /** {@inheritDoc} */
+    @Override
+    public Void processContent(DeploymentUnitImpl unit, Path unitFolder) 
throws IOException {
+        for (Entry<String, InputStream> e : unit.content().entrySet()) {
+            doDeploy(unitFolder, e.getKey(), e.getValue());
+        }
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public Void processContentWithUnzip(ZipDeploymentUnit unit, Path 
unitFolder) throws IOException {
+        for (ZipInputStream zis : unit.zipContent()) {
+            ZipEntry ze;
+            while ((ze = zis.getNextEntry()) != null) {
+                Path entryPath = unitFolder.resolve(ze.getName());
+
+                if (ze.isDirectory()) {
+                    Files.createDirectories(entryPath);
+                } else {
+                    Path unitFileFolder = entryPath.getParent();
+                    Files.createDirectories(unitFileFolder);
+                    doDeploy(unitFileFolder, 
entryPath.getFileName().toString(), zis);
+                }
+            }
+        }
+        return null;
+    }
+
+    private static void doDeploy(Path unitFolder, String entryName, 
InputStream is) throws IOException {
+        Path unitPath = unitFolder.resolve(entryName);
+        Files.createDirectories(unitPath.getParent());
+        Path unitPathTmp = unitFolder.resolve(entryName + TMP_SUFFIX);
+        Files.copy(is, unitPathTmp, REPLACE_EXISTING);
+        Files.move(unitPathTmp, unitPath, ATOMIC_MOVE, REPLACE_EXISTING);
+    }
+
+}
diff --git 
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentUnit.java
 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentUnit.java
index bb36c8dccc0..0d701a994e2 100644
--- 
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentUnit.java
+++ 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentUnit.java
@@ -17,32 +17,40 @@
 
 package org.apache.ignite.internal.deployunit;
 
-import static org.apache.ignite.internal.util.IgniteUtils.closeAll;
-
-import java.io.InputStream;
-import java.util.Map;
+import java.io.IOException;
 
 /**
- * Deployment unit interface.
+ * Interface representing a deployment unit in the Apache Ignite code 
deployment system.
+ * 
+ * <p>A deployment unit is a container for code and resources that can be 
deployed to and managed 
+ * within an Ignite cluster. This interface provides a contract for processing 
deployment unit 
+ * content through a processor pattern, allowing for flexible handling of 
different types of 
+ * deployment units such as regular file-based units and ZIP-compressed units.
  */
-public class DeploymentUnit implements AutoCloseable {
-    private final Map<String, InputStream> content;
-
-    public DeploymentUnit(Map<String, InputStream> content) {
-        this.content = content;
-    }
-
+public interface DeploymentUnit extends AutoCloseable {
     /**
-     * Deployment unit content - a map from file name to input stream.
+     * Processes the deployment unit content using the provided processor.
+     * 
+     * <p>This method delegates the processing of the deployment unit to the 
specified processor,
+     * following a strategy pattern. The processor determines how the unit 
content should be
+     * handled based on its implementation. Different processors can perform 
different operations
+     * on the same deployment unit.
+     * 
+     * <p>The method supports generic type parameters to allow processors to 
work with different
+     * argument types and return different result types, providing flexibility 
for various
+     * processing scenarios.
+     * 
+     * <p>For ZIP-based deployment units, the processor may need to handle 
both regular content
+     * and compressed content that requires extraction.
      *
-     * @return Deployment unit content.
+     *
+     * @param <T> the type of argument passed to the processor
+     * @param <R> the type of result returned by the processor
+     * @param processor the processor that will handle the deployment unit 
content
+     * @param unitFolder the argument to be passed to the processor during 
processing
+     * @return the result of the processing operation as determined by the 
processor
+     * @throws IOException if an I/O error occurs during processing, such as 
issues reading
+     *                     deployment unit content or writing processed 
results.
      */
-    public Map<String, InputStream> content() {
-        return content;
-    }
-
-    @Override
-    public void close() throws Exception {
-        closeAll(content.values());
-    }
+    <T, R> R process(DeploymentUnitProcessor<T, R> processor, T unitFolder) 
throws IOException;
 }
diff --git 
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentUnitImpl.java
 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentUnitImpl.java
new file mode 100644
index 00000000000..1b4ec14e3ee
--- /dev/null
+++ 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentUnitImpl.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.deployunit;
+
+import static org.apache.ignite.internal.util.IgniteUtils.closeAll;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Map;
+
+/**
+ * Standard implementation of {@link DeploymentUnit} that handles regular 
(non-compressed) deployment content.
+ *
+ * <p>This class represents a deployment unit containing a collection of 
files, where each file is
+ * represented as a mapping from file name to its corresponding {@link 
InputStream}. This implementation is designed for straightforward
+ * deployment scenarios where the content does not require compression or 
special extraction handling.
+ *
+ * <p>The {@code DeploymentUnitImpl} is the primary implementation used for 
most deployment operations
+ * in the Apache Ignite code deployment system. It provides a simple and 
efficient way to package and deploy code artifacts, configuration
+ * files, resources, and other deployment assets to Ignite cluster nodes.
+ */
+public class DeploymentUnitImpl implements DeploymentUnit {
+
+    /**
+     * The deployment unit content represented as a mapping from file names to 
their input streams. Each entry represents a file within the
+     * deployment unit.
+     */
+    private final Map<String, InputStream> content;
+
+    /**
+     * Constructor.
+     */
+    public DeploymentUnitImpl(Map<String, InputStream> content) {
+        this.content = content;
+    }
+
+    /**
+     * Returns the deployment unit content as a map of file names to input 
streams.
+     */
+    public Map<String, InputStream> content() {
+        return content;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void close() throws Exception {
+        closeAll(content.values());
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public <T, R> R process(DeploymentUnitProcessor<T, R> processor, T arg) 
throws IOException {
+        return processor.processContent(this, arg);
+    }
+}
diff --git 
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentUnitProcessor.java
 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentUnitProcessor.java
new file mode 100644
index 00000000000..eb9351bd92e
--- /dev/null
+++ 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentUnitProcessor.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.deployunit;
+
+import java.io.IOException;
+
+/**
+ * Processor interface for handling deployment unit content operations.
+ *
+ * <p>This interface defines a contract for processing deployment units, 
providing methods to handle
+ * both regular deployment units and ZIP-based deployment units. 
Implementations of this interface
+ * can perform various operations on deployment unit content such as 
deployment, validation, 
+ * transformation, or extraction.
+ *
+ * <p>The processor uses a generic approach with two type parameters:
+ * <ul>
+ *     <li>{@code T} - the type of argument passed to processing methods</li>
+ *     <li>{@code R} - the type of result returned by processing methods</li>
+ * </ul>
+ *
+ * <p>This design allows for flexible implementations that can process 
deployment units with
+ * different argument types and return different result types based on the 
specific use case.
+ *
+ * @param <T> the type of argument passed to the processing methods
+ * @param <R> the type of result returned by the processing methods
+ */
+public interface DeploymentUnitProcessor<T, R> {
+    /**
+     * Processes the content of a regular deployment unit.
+     *
+     * <p>This method handles deployment units that contain a collection of 
files represented
+     * as input streams. The implementation should process each file in the 
deployment unit
+     * according to the specific processor's logic.
+     *
+     * @param unit the deployment unit containing the content to be processed
+     * @param arg the argument to be used during processing
+     * @return the result of the processing operation
+     * @throws IOException if an I/O error occurs during processing
+     */
+    R processContent(DeploymentUnitImpl unit, T arg) throws IOException;
+
+    /**
+     * Processes the content of a ZIP-based deployment unit with automatic 
extraction.
+     *
+     * <p>This method handles deployment units that contain ZIP archives. The 
implementation
+     * should process the ZIP content by extracting and handling each entry 
according to the
+     * specific processor's logic. This method is typically used when the 
deployment unit
+     * contains compressed content that needs to be extracted during 
processing.
+     *
+     * @param unit the ZIP deployment unit containing the compressed content 
to be processed
+     * @param arg the argument to be used during processing
+     * @return the result of the processing operation
+     * @throws IOException if an I/O error occurs during processing or 
extraction
+     */
+    R processContentWithUnzip(ZipDeploymentUnit unit, T arg) throws 
IOException;
+}
diff --git 
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/FileDeployerService.java
 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/FileDeployerService.java
index 73d13708c7d..605cc82eb98 100644
--- 
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/FileDeployerService.java
+++ 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/FileDeployerService.java
@@ -17,11 +17,7 @@
 
 package org.apache.ignite.internal.deployunit;
 
-import static java.nio.file.StandardCopyOption.ATOMIC_MOVE;
-import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
-
 import java.io.IOException;
-import java.io.InputStream;
 import java.nio.file.FileVisitResult;
 import java.nio.file.Files;
 import java.nio.file.Path;
@@ -29,7 +25,6 @@ import java.nio.file.SimpleFileVisitor;
 import java.nio.file.attribute.BasicFileAttributes;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -46,8 +41,6 @@ import org.apache.ignite.internal.util.IgniteUtils;
 public class FileDeployerService {
     private static final IgniteLogger LOG = 
Loggers.forClass(FileDeployerService.class);
 
-    private static final String TMP_SUFFIX = ".tmp";
-
     private static final int DEPLOYMENT_EXECUTOR_SIZE = 4;
 
     /**
@@ -57,6 +50,8 @@ public class FileDeployerService {
 
     private final ExecutorService executor;
 
+    private final DeploymentUnitProcessor<Path, Void> deployProcessor = new 
DeployDeploymentUnitProcessor();
+
     /** Constructor. */
     public FileDeployerService(String nodeName) {
         executor = Executors.newFixedThreadPool(
@@ -81,16 +76,10 @@ public class FileDeployerService {
         return CompletableFuture.supplyAsync(() -> {
             try {
                 Path unitFolder = unitPath(id, version);
-
                 Files.createDirectories(unitFolder);
 
-                for (Entry<String, InputStream> entry : 
deploymentUnit.content().entrySet()) {
-                    String fileName = entry.getKey();
-                    Path unitPath = unitFolder.resolve(fileName);
-                    Path unitPathTmp = unitFolder.resolve(fileName + 
TMP_SUFFIX);
-                    Files.copy(entry.getValue(), unitPathTmp, 
REPLACE_EXISTING);
-                    Files.move(unitPathTmp, unitPath, ATOMIC_MOVE, 
REPLACE_EXISTING);
-                }
+                deploymentUnit.process(deployProcessor, unitFolder);
+
                 return true;
             } catch (IOException e) {
                 LOG.error("Failed to deploy unit " + id + ":" + version, e);
@@ -129,10 +118,12 @@ public class FileDeployerService {
         return CompletableFuture.supplyAsync(() -> {
             Map<String, byte[]> result = new HashMap<>();
             try {
-                Files.walkFileTree(unitPath(id, version), new 
SimpleFileVisitor<>() {
+                Path unitFolder = unitPath(id, version);
+                Files.walkFileTree(unitFolder, new SimpleFileVisitor<>() {
                     @Override
                     public FileVisitResult visitFile(Path file, 
BasicFileAttributes attrs) throws IOException {
-                        result.put(file.getFileName().toString(), 
Files.readAllBytes(file));
+                        Path unitStructure = unitFolder.relativize(file);
+                        result.put(unitStructure.toString(), 
Files.readAllBytes(file));
                         return FileVisitResult.CONTINUE;
                     }
                 });
diff --git 
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/UnitContent.java
 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/UnitContent.java
index 6ea4cf60b02..dcaa29a5246 100644
--- 
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/UnitContent.java
+++ 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/UnitContent.java
@@ -18,15 +18,12 @@
 package org.apache.ignite.internal.deployunit;
 
 import java.io.ByteArrayInputStream;
-import java.io.IOException;
 import java.io.InputStream;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.stream.Collectors;
-import 
org.apache.ignite.internal.deployunit.exception.DeploymentUnitReadException;
 
 /**
  * Unit content representation.
@@ -78,24 +75,6 @@ public class UnitContent implements Iterable<Entry<String, 
byte[]>> {
         return files.entrySet().iterator();
     }
 
-    /**
-     * Read unit content from unit {@link DeploymentUnit}.
-     *
-     * @param deploymentUnit Deployment unit instance.
-     * @return Unit content from provided deployment unit.
-     */
-    public static UnitContent readContent(DeploymentUnit deploymentUnit) {
-        Map<String, byte[]> map = deploymentUnit.content().entrySet().stream()
-                .collect(Collectors.toMap(Entry::getKey, entry -> {
-                    try {
-                        return entry.getValue().readAllBytes();
-                    } catch (IOException e) {
-                        throw new DeploymentUnitReadException(e);
-                    }
-                }));
-        return new UnitContent(map);
-    }
-
     /**
      * Convert unit content to {@link DeploymentUnit}.
      *
@@ -107,6 +86,6 @@ public class UnitContent implements Iterable<Entry<String, 
byte[]>> {
         content.iterator().forEachRemaining(it -> {
             files.put(it.getKey(), new ByteArrayInputStream(it.getValue()));
         });
-        return new DeploymentUnit(files);
+        return new DeploymentUnitImpl(files);
     }
 }
diff --git 
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/ZipDeploymentUnit.java
 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/ZipDeploymentUnit.java
new file mode 100644
index 00000000000..0758379d89a
--- /dev/null
+++ 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/ZipDeploymentUnit.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.deployunit;
+
+import static org.apache.ignite.internal.util.IgniteUtils.closeAll;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.zip.ZipInputStream;
+
+/**
+ * A specialized implementation of {@link DeploymentUnit} that handles 
ZIP-compressed deployment content.
+ * 
+ * <p>This class represents a deployment unit that contains both regular 
(uncompressed) content and 
+ * ZIP-compressed archives that require extraction during processing. It 
extends the basic deployment 
+ * unit functionality to handle mixed content types, providing automatic 
extraction capabilities for 
+ * ZIP archives while maintaining support for regular file content.
+ */
+public class ZipDeploymentUnit implements DeploymentUnit {
+    /** The deployment unit containing regular (non-ZIP) content. */
+    private final DeploymentUnit notZippedContent;
+
+    /** Collection of ZIP input streams that require extraction during processing. */
+    private final Collection<ZipInputStream> zipContent;
+
+    /**
+     * Constructor.
+     *
+     * @param notZippedContent deployment unit holding the regular (non-ZIP) content.
+     * @param zipContent ZIP input streams that have to be extracted during processing.
+     */
+    public ZipDeploymentUnit(DeploymentUnit notZippedContent, Collection<ZipInputStream> zipContent) {
+        this.notZippedContent = notZippedContent;
+        this.zipContent = zipContent;
+    }
+
+    /**
+     * Processes the deployment unit content using a two-phase approach for mixed content types.
+     *
+     * <p>The regular content is processed first by delegating to the wrapped unit (its result is discarded),
+     * then the processor is asked to handle the ZIP content of this unit.
+     *
+     * @param <T> the type of argument passed to the processor.
+     * @param <R> the type of result returned by the processor.
+     * @param processor the processor that will handle both regular and ZIP content.
+     * @param arg the argument to be passed to the processor during both processing phases.
+     * @return the result of the ZIP content processing phase.
+     * @throws IOException if an I/O error occurs during either processing phase.
+     */
+    @Override
+    public <T, R> R process(DeploymentUnitProcessor<T, R> processor, T arg) throws IOException {
+        notZippedContent.process(processor, arg);
+        return processor.processContentWithUnzip(this, arg);
+    }
+
+    /**
+     * Returns the collection of ZIP input streams that require extraction during processing.
+     */
+    public Collection<ZipInputStream> zipContent() {
+        return zipContent;
+    }
+
+    /**
+     * Closes this deployment unit and releases all associated resources.
+     *
+     * <p>Both the regular content and the ZIP streams are always closed. If both steps fail, the failure of the
+     * regular content is thrown and the failure of the ZIP streams is attached to it as suppressed.
+     *
+     * @throws Exception if closing the regular content or any of the ZIP streams fails.
+     */
+    @Override
+    public void close() throws Exception {
+        Exception failure = null;
+
+        try {
+            notZippedContent.close();
+        } catch (Exception e) {
+            failure = e;
+        }
+
+        try {
+            // Must run even when closing the regular content failed, otherwise the ZIP streams would leak.
+            closeAll(zipContent);
+        } catch (Exception e) {
+            if (failure == null) {
+                failure = e;
+            } else {
+                failure.addSuppressed(e);
+            }
+        }
+
+        if (failure != null) {
+            throw failure;
+        }
+    }
+}
diff --git 
a/modules/code-deployment/src/test/java/org/apache/ignite/deployment/FileDeployerServiceTest.java
 
b/modules/code-deployment/src/test/java/org/apache/ignite/deployment/FileDeployerServiceTest.java
index 0cb57643735..33e06ace784 100644
--- 
a/modules/code-deployment/src/test/java/org/apache/ignite/deployment/FileDeployerServiceTest.java
+++ 
b/modules/code-deployment/src/test/java/org/apache/ignite/deployment/FileDeployerServiceTest.java
@@ -29,12 +29,15 @@ import java.io.InputStream;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.concurrent.CompletableFuture;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import org.apache.ignite.internal.deployunit.DeploymentUnit;
+import org.apache.ignite.internal.deployunit.DeploymentUnitImpl;
 import org.apache.ignite.internal.deployunit.FileDeployerService;
 import org.apache.ignite.internal.deployunit.UnitContent;
+import 
org.apache.ignite.internal.deployunit.exception.DeploymentUnitReadException;
 import org.apache.ignite.internal.testframework.IgniteTestUtils;
 import org.apache.ignite.internal.testframework.WorkDirectory;
 import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
@@ -70,18 +73,18 @@ public class FileDeployerServiceTest {
 
     @Test
     public void test() throws Exception {
+        // Deploy a unit built by content() and wait until the deployment reports success.
-        try (DeploymentUnit unit = content()) {
+        try (DeploymentUnitImpl unit = content()) {
             CompletableFuture<Boolean> deployed = service.deploy("id", 
parseVersion("1.0.0"), unit);
             assertThat(deployed, willBe(true));
         }
 
+        // Build a fresh unit (the previous one has been closed) and check that the content
+        // read back from the service equals the content of that unit.
-        try (DeploymentUnit unit = content()) {
+        try (DeploymentUnitImpl unit = content()) {
             CompletableFuture<UnitContent> unitContent = 
service.getUnitContent("id", parseVersion("1.0.0"));
-            assertThat(unitContent, 
willBe(equalTo(UnitContent.readContent(unit))));
+            assertThat(unitContent, willBe(equalTo(readContent(unit))));
         }
     }
 
-    private DeploymentUnit content() {
+    private DeploymentUnitImpl content() {
         Map<String, InputStream> map = Stream.of(file1, file2, file3)
                 .collect(Collectors.toMap(it -> it.getFileName().toString(), 
it -> {
                     try {
@@ -95,6 +98,24 @@ public class FileDeployerServiceTest {
                     }
                 }));
 
-        return new DeploymentUnit(map);
+        return new DeploymentUnitImpl(map);
+    }
+
+    /**
+     * Drains every stream of the given deployment unit into memory and wraps the result into a {@link UnitContent}.
+     *
+     * @param deploymentUnit Deployment unit whose streams are consumed.
+     * @return Unit content keyed by the same names as the entries of the deployment unit.
+     */
+    private static UnitContent readContent(DeploymentUnitImpl deploymentUnit) {
+        Map<String, byte[]> bytesByName = deploymentUnit.content().entrySet().stream()
+                .collect(Collectors.toMap(Entry::getKey, entry -> readFully(entry.getValue())));
+
+        return new UnitContent(bytesByName);
+    }
+
+    /** Reads the whole stream, rethrowing {@link IOException} as {@link DeploymentUnitReadException}. */
+    private static byte[] readFully(InputStream stream) {
+        try {
+            return stream.readAllBytes();
+        } catch (IOException e) {
+            throw new DeploymentUnitReadException(e);
+        }
     }
 }
diff --git 
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestStandalone.java
 
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestStandalone.java
index 6c7964a8ff2..af74af30a03 100644
--- 
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestStandalone.java
+++ 
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestStandalone.java
@@ -48,6 +48,7 @@ import org.apache.ignite.compute.JobTarget;
 import org.apache.ignite.compute.TaskDescriptor;
 import org.apache.ignite.deployment.DeploymentUnit;
 import org.apache.ignite.deployment.version.Version;
+import org.apache.ignite.internal.deployunit.DeploymentUnitImpl;
 import org.apache.ignite.internal.deployunit.IgniteDeployment;
 import org.apache.ignite.internal.deployunit.NodesToDeploy;
 import org.apache.ignite.internal.testframework.IgniteTestUtils;
@@ -229,7 +230,7 @@ class ItComputeTestStandalone extends ItComputeBaseTest {
             CompletableFuture<Boolean> deployed = deployment.deployAsync(
                     unitId,
                     unitVersion,
-                    new 
org.apache.ignite.internal.deployunit.DeploymentUnit(Map.of(jarName, 
jarStream)),
+                    new DeploymentUnitImpl(Map.of(jarName, jarStream)),
                     new NodesToDeploy(MAJORITY)
             );
 
diff --git 
a/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/WorkDirectoryExtension.java
 
b/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/WorkDirectoryExtension.java
index 76327903740..21845b4be36 100644
--- 
a/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/WorkDirectoryExtension.java
+++ 
b/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/WorkDirectoryExtension.java
@@ -307,7 +307,20 @@ public class WorkDirectoryExtension
         return PATTERN.matcher(property).matches();
     }
 
-    private static void zipDirectory(Path source, Path target) {
+    /**
+     * Creates a ZIP archive from the contents of a source directory.
+     *
+     * <p>This method recursively walks through the source directory and 
compresses all files (excluding directories)
+     * into a ZIP archive at the target location. The directory structure is 
preserved within the ZIP file using
+     * relative paths from the source directory.
+     *
+     * <p>The parent directories of the target ZIP file are created if they 
don't exist. If the target file already
+     * exists, it will be overwritten.
+     *
+     * @param source the path to the source directory to be zipped; must be an 
existing directory
+     * @param target the path where the ZIP archive will be created; parent 
directories will be created if necessary
+     */
+    public static void zipDirectory(Path source, Path target) {
         try {
             Files.createDirectories(target.getParent());
 
diff --git 
a/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/deployment/DeploymentCodeApi.java
 
b/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/deployment/DeploymentCodeApi.java
index a903ba6b912..2baf0d11b1f 100644
--- 
a/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/deployment/DeploymentCodeApi.java
+++ 
b/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/deployment/DeploymentCodeApi.java
@@ -30,6 +30,8 @@ import io.micronaut.http.annotation.Post;
 import io.micronaut.http.annotation.QueryValue;
 import io.micronaut.http.multipart.CompletedFileUpload;
 import io.swagger.v3.oas.annotations.Operation;
+import io.swagger.v3.oas.annotations.Parameter;
+import io.swagger.v3.oas.annotations.enums.ParameterIn;
 import io.swagger.v3.oas.annotations.media.ArraySchema;
 import io.swagger.v3.oas.annotations.media.Content;
 import io.swagger.v3.oas.annotations.media.Schema;
@@ -77,7 +79,13 @@ public interface DeploymentCodeApi {
             Optional<InitialDeployMode> deployMode,
             @QueryValue
             @Schema(name = "initialNodes", requiredMode = REQUIRED, 
description = "List of node identifiers to deploy to.")
-            Optional<List<String>> initialNodes
+            Optional<List<String>> initialNodes,
+            @Parameter(
+                    name = "X-Unzip-Units",
+                    in = ParameterIn.HEADER,
+                    description = "Unzip all uploaded archives with saving 
directory structure."
+            )
+            boolean unzip
     );
 
     /**
diff --git 
a/modules/rest/src/main/java/org/apache/ignite/internal/rest/deployment/CompletedFileUploadSubscriber.java
 
b/modules/rest/src/main/java/org/apache/ignite/internal/rest/deployment/CompletedFileUploadSubscriber.java
index e9c67064715..1d3dd4be065 100644
--- 
a/modules/rest/src/main/java/org/apache/ignite/internal/rest/deployment/CompletedFileUploadSubscriber.java
+++ 
b/modules/rest/src/main/java/org/apache/ignite/internal/rest/deployment/CompletedFileUploadSubscriber.java
@@ -19,9 +19,6 @@ package org.apache.ignite.internal.rest.deployment;
 
 import io.micronaut.http.multipart.CompletedFileUpload;
 import java.io.IOException;
-import java.io.InputStream;
-import java.util.HashMap;
-import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.internal.deployunit.DeploymentUnit;
 import org.apache.ignite.internal.logger.IgniteLogger;
@@ -33,14 +30,19 @@ import org.reactivestreams.Subscription;
  * Implementation of {@link Subscriber} based on {@link CompletedFileUpload} 
which will collect uploaded files to the
  * {@link DeploymentUnit}.
  */
-class CompletedFileUploadSubscriber implements 
Subscriber<CompletedFileUpload>, AutoCloseable {
+class CompletedFileUploadSubscriber implements Subscriber<CompletedFileUpload> 
{
     private static final IgniteLogger LOG = 
Loggers.forClass(CompletedFileUploadSubscriber.class);
 
     private final CompletableFuture<DeploymentUnit> result = new 
CompletableFuture<>();
 
-    private final Map<String, InputStream> content = new HashMap<>();
+    private final InputStreamCollector collector;
 
-    private IOException ex;
+    private Throwable ex;
+
+    /**
+     * Creates a subscriber that accumulates uploaded files in an {@link InputStreamCollector}.
+     *
+     * @param unzip if {@code true}, the plain collector is wrapped into a {@link ZipInputStreamCollector};
+     *         otherwise the plain {@link InputStreamCollectorImpl} is used directly.
+     */
+    public CompletedFileUploadSubscriber(boolean unzip) {
+        InputStreamCollector plainCollector = new InputStreamCollectorImpl();
+
+        if (unzip) {
+            this.collector = new ZipInputStreamCollector(plainCollector);
+        } else {
+            this.collector = plainCollector;
+        }
+    }
 
     @Override
     public void onSubscribe(Subscription subscription) {
@@ -50,20 +52,22 @@ class CompletedFileUploadSubscriber implements 
Subscriber<CompletedFileUpload>,
     @Override
     public void onNext(CompletedFileUpload item) {
         try {
-            content.put(item.getFilename(), item.getInputStream());
+            // Hand the stream of the uploaded file over to the collector under the uploaded file name.
+            collector.addInputStream(item.getFilename(), 
item.getInputStream());
         } catch (IOException e) {
             LOG.error("Failed to read file: " + item.getFilename(), e);
-            if (ex != null) {
-                ex.addSuppressed(e);
-            } else {
-                ex = e;
-            }
+            // Remember the failure instead of throwing it from the callback.
+            suppressException(e);
         }
     }
 
     @Override
     public void onError(Throwable throwable) {
-        result.completeExceptionally(throwable);
+        // Record the upstream failure before cleaning up, so that a secondary failure raised while
+        // closing the collected streams is attached as suppressed instead of masking the real cause.
+        suppressException(throwable);
+
+        try {
+            // The collected streams will not be turned into a deployment unit, release them here.
+            collector.clear();
+        } catch (Exception e) {
+            suppressException(e);
+        }
+
+        result.completeExceptionally(ex);
     }
 
     @Override
@@ -71,22 +75,19 @@ class CompletedFileUploadSubscriber implements 
Subscriber<CompletedFileUpload>,
         if (ex != null) {
             result.completeExceptionally(ex);
         } else {
-            result.complete(new DeploymentUnit(content));
+            result.complete(collector.toDeploymentUnit());
         }
     }
 
-    public CompletableFuture<DeploymentUnit> result() {
-        return result;
+    /**
+     * Accumulates a failure: the first recorded throwable becomes the primary one, every subsequent
+     * throwable is attached to it as suppressed.
+     *
+     * @param t failure to record.
+     */
+    private void suppressException(Throwable t) {
+        if (ex != null) {
+            ex.addSuppressed(t);
+
+            return;
+        }
+
+        ex = t;
     }
 
-    @Override
-    public void close() throws Exception {
-        result.thenAccept(it -> {
-            try {
-                it.close();
-            } catch (Exception e) {
-                throw new RuntimeException(e);
-            }
-        });
+    /**
+     * Returns the future of the collected deployment unit. It is completed with the unit built by the
+     * collector in {@code onComplete} when no failure was recorded, and exceptionally otherwise.
+     */
+    public CompletableFuture<DeploymentUnit> result() {
+        return result;
     }
 }
diff --git 
a/modules/rest/src/main/java/org/apache/ignite/internal/rest/deployment/DeploymentManagementController.java
 
b/modules/rest/src/main/java/org/apache/ignite/internal/rest/deployment/DeploymentManagementController.java
index 58996ee418b..48837ee9756 100644
--- 
a/modules/rest/src/main/java/org/apache/ignite/internal/rest/deployment/DeploymentManagementController.java
+++ 
b/modules/rest/src/main/java/org/apache/ignite/internal/rest/deployment/DeploymentManagementController.java
@@ -17,6 +17,8 @@
 
 package org.apache.ignite.internal.rest.deployment;
 
+import static org.apache.ignite.deployment.version.Version.parseVersion;
+
 import io.micronaut.http.annotation.Controller;
 import io.micronaut.http.multipart.CompletedFileUpload;
 import java.util.ArrayList;
@@ -63,30 +65,30 @@ public class DeploymentManagementController implements 
DeploymentCodeApi, Resour
             String unitVersion,
             Publisher<CompletedFileUpload> unitContent,
             Optional<InitialDeployMode> deployMode,
-            Optional<List<String>> initialNodes
+            Optional<List<String>> initialNodes,
+            boolean unzip
     ) {
-
-        CompletedFileUploadSubscriber subscriber = new 
CompletedFileUploadSubscriber();
+        CompletedFileUploadSubscriber subscriber = new 
CompletedFileUploadSubscriber(unzip);
         unitContent.subscribe(subscriber);
 
         NodesToDeploy nodesToDeploy = initialNodes.map(NodesToDeploy::new)
                 .orElseGet(() -> new 
NodesToDeploy(fromInitialDeployMode(deployMode)));
 
-        return subscriber.result().thenCompose(content -> {
-            return deployment.deployAsync(unitId, 
Version.parseVersion(unitVersion), content, nodesToDeploy);
-        }).whenComplete((res, throwable) -> {
-            try {
-                subscriber.close();
-            } catch (Exception e) {
-                LOG.error("Failed to close subscriber", e);
-            }
-        });
-
+        return subscriber.result().thenCompose(deploymentUnit ->
+            deployment.deployAsync(unitId, parseVersion(unitVersion), 
deploymentUnit, nodesToDeploy)
+                    .whenComplete((unitStatus, throwable) -> {
+                        try {
+                            deploymentUnit.close();
+                        } catch (Exception e) {
+                            LOG.error("Failed to close subscriber", e);
+                        }
+                    })
+        );
     }
 
     @Override
     public CompletableFuture<Boolean> undeploy(String unitId, String 
unitVersion) {
+        // The textual version is parsed first, then the call is delegated to {@code deployment}.
-        return deployment.undeployAsync(unitId, 
Version.parseVersion(unitVersion));
+        return deployment.undeployAsync(unitId, parseVersion(unitVersion));
     }
 
     @Override
@@ -107,7 +109,7 @@ public class DeploymentManagementController implements 
DeploymentCodeApi, Resour
 
     private CompletableFuture<List<UnitStatuses>> clusterStatuses(String 
unitId, Optional<String> version) {
         if (version.isPresent()) {
-            Version parsedVersion = Version.parseVersion(version.get());
+            Version parsedVersion = parseVersion(version.get());
             return deployment.clusterStatusAsync(unitId, parsedVersion)
                     .thenApply(deploymentStatus -> {
                         if (deploymentStatus != null) {
@@ -140,7 +142,7 @@ public class DeploymentManagementController implements 
DeploymentCodeApi, Resour
 
     private CompletableFuture<List<UnitStatuses>> nodeStatuses(String unitId, 
Optional<String> version) {
         if (version.isPresent()) {
-            Version parsedVersion = Version.parseVersion(version.get());
+            Version parsedVersion = parseVersion(version.get());
             return deployment.nodeStatusAsync(unitId, parsedVersion)
                     .thenApply(deploymentStatus -> {
                         if (deploymentStatus != null) {
diff --git 
a/modules/rest/src/main/java/org/apache/ignite/internal/rest/deployment/InputStreamCollector.java
 
b/modules/rest/src/main/java/org/apache/ignite/internal/rest/deployment/InputStreamCollector.java
new file mode 100644
index 00000000000..f7d77a39b86
--- /dev/null
+++ 
b/modules/rest/src/main/java/org/apache/ignite/internal/rest/deployment/InputStreamCollector.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.rest.deployment;
+
+import java.io.InputStream;
+import org.apache.ignite.internal.deployunit.DeploymentUnit;
+
+/**
+ * Interface for collecting input streams and converting them into deployment 
units.
+ * 
+ * <p>This interface provides a contract for accumulating input streams from 
various sources
+ * (such as file uploads, network transfers, or other data sources) and 
organizing them into
+ * a structured deployment unit that can be processed by the Apache Ignite 
deployment system.
+ * 
+ * <p>Implementations of this interface are typically used in REST endpoints, 
file upload
+ * handlers, and other scenarios where deployment content is received in 
stream format and
+ * needs to be converted into deployment units for processing.
+ */
+public interface InputStreamCollector {
+    /**
+     * Adds an input stream with the specified filename to the collection.
+     *
+     * <p>This method accepts an input stream representing a file or resource that should be
+     * included in the deployment unit. The filename parameter provides the logical name or
+     * path for the content within the deployment unit structure.
+     *
+     * <p>Once added, the input stream becomes managed by this collector and should not be
+     * closed directly by the caller. The collector is responsible for proper cleanup of
+     * all managed streams.
+     *
+     * @param filename the logical name or path for the content within the deployment unit;
+     *                must not be {@code null} or empty.
+     * @param is the input stream containing the content to be added; must not be {@code null}.
+     */
+    void addInputStream(String filename, InputStream is);
+
+    /**
+     * Clears all collected input streams and releases associated resources.
+     *
+     * <p>This method closes all input streams that have been added to this collector and
+     * clears the internal collection. After calling this method, the collector returns to
+     * its initial empty state and can be reused for collecting new streams.
+     *
+     * @throws Exception if releasing the collected streams fails.
+     */
+    void clear() throws Exception;
+
+    /**
+     * Converts the collected input streams into a deployment unit.
+     *
+     * <p>This method creates a {@link DeploymentUnit} instance containing all the input streams
+     * that have been added to this collector. The specific type of deployment unit returned
+     * depends on the implementation and the characteristics of the collected content.
+     *
+     * @return a deployment unit containing all collected input streams; never {@code null}.
+     */
+    DeploymentUnit toDeploymentUnit();
+}
diff --git 
a/modules/rest/src/main/java/org/apache/ignite/internal/rest/deployment/InputStreamCollectorImpl.java
 
b/modules/rest/src/main/java/org/apache/ignite/internal/rest/deployment/InputStreamCollectorImpl.java
new file mode 100644
index 00000000000..e9520f6fc57
--- /dev/null
+++ 
b/modules/rest/src/main/java/org/apache/ignite/internal/rest/deployment/InputStreamCollectorImpl.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.rest.deployment;
+
+import static org.apache.ignite.internal.util.IgniteUtils.closeAll;
+
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.ignite.internal.deployunit.DeploymentUnit;
+import org.apache.ignite.internal.deployunit.DeploymentUnitImpl;
+
+/**
+ * Standard implementation of {@link InputStreamCollector} for collecting 
regular file content.
+ *
+ * <p>This implementation provides a straightforward approach to collecting 
input streams and
+ * converting them into a standard {@link DeploymentUnitImpl}. It maintains an 
internal map
+ * of filename-to-stream associations and creates deployment units containing 
regular
+ * (non-compressed) file content.
+ */
+public class InputStreamCollectorImpl implements InputStreamCollector {
+    /**
+     * Internal storage for collected input streams mapped by their filenames.
+     * The map maintains the association between logical file paths and their content streams.
+     */
+    private final Map<String, InputStream> content = new HashMap<>();
+
+    /** {@inheritDoc} */
+    @Override
+    public void addInputStream(String filename, InputStream is) {
+        content.put(filename, is);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public DeploymentUnit toDeploymentUnit() {
+        return new DeploymentUnitImpl(content);
+    }
+
+    /**
+     * Closes every collected stream and empties the collector.
+     *
+     * @throws Exception if closing any of the collected streams fails; the collector is emptied regardless.
+     */
+    @Override
+    public void clear() throws Exception {
+        try {
+            closeAll(content.values());
+        } finally {
+            // Forget the streams even if closing failed: the interface contract promises an empty, reusable
+            // collector, and keeping closed streams around would let them end up in a deployment unit.
+            content.clear();
+        }
+    }
+}
diff --git 
a/modules/rest/src/main/java/org/apache/ignite/internal/rest/deployment/ZipInputStreamCollector.java
 
b/modules/rest/src/main/java/org/apache/ignite/internal/rest/deployment/ZipInputStreamCollector.java
new file mode 100644
index 00000000000..b4c41f290c9
--- /dev/null
+++ 
b/modules/rest/src/main/java/org/apache/ignite/internal/rest/deployment/ZipInputStreamCollector.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.rest.deployment;
+
+import static org.apache.ignite.internal.util.IgniteUtils.closeAll;
+
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.zip.ZipInputStream;
+import org.apache.ignite.internal.deployunit.DeploymentUnit;
+import org.apache.ignite.internal.deployunit.ZipDeploymentUnit;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Advanced implementation of {@link InputStreamCollector} that automatically 
detects and handles ZIP content.
+ * 
+ * <p>This decorator implementation wraps another {@link InputStreamCollector} 
and provides intelligent
+ * handling of mixed content types. It automatically detects ZIP archives 
among the added input streams
+ * and separates them from regular file content, creating a {@link 
ZipDeploymentUnit} that can handle
+ * both types of content appropriately.
+ */
+public class ZipInputStreamCollector implements InputStreamCollector {
+    /**
+     * The delegate collector that handles regular (non-ZIP) content.
+     * All streams that are not identified as ZIP archives are forwarded to 
this collector.
+     */
+    private final InputStreamCollector delegate;
+
+    /**
+     * Collection of detected ZIP input streams that require special 
processing.
+     * These streams will be handled by the ZipDeploymentUnit for automatic 
extraction.
+     */
+    private final List<ZipInputStream> zipContent = new ArrayList<>();
+
+    /**
+     * Constructor.
+     */
+    public ZipInputStreamCollector(InputStreamCollector delegate) {
+        this.delegate = delegate;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void addInputStream(String filename, InputStream is) {
+        InputStream result = is.markSupported() ? is : new 
BufferedInputStream(is);
+
+        ZipInputStream zis = tryZip(result);
+        if (zis != null) {
+            zipContent.add(zis);
+        } else {
+            delegate.addInputStream(filename, result);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Nullable
+    private static ZipInputStream tryZip(InputStream is) {
+        try {
+            ZipInputStream zis = new ZipInputStream(is);
+            if (zis.getNextEntry() != null) {
+                is.reset();
+                return zis;
+            } else {
+                is.reset();
+                return null;
+            }
+        } catch (IOException e) {
+            return null;
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void clear() throws Exception {
+        delegate.clear();
+        closeAll(zipContent);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public DeploymentUnit toDeploymentUnit() {
+        return new ZipDeploymentUnit(delegate.toDeploymentUnit(), zipContent);
+    }
+}

Reply via email to