This is an automated email from the ASF dual-hosted git repository.

apkhmv pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new de0dab59f3 IGNITE-18870 Add REST API: Find units by node consistent id 
(#1789)
de0dab59f3 is described below

commit de0dab59f3532910a18971197c0f69798b43461a
Author: Mikhail <[email protected]>
AuthorDate: Fri Mar 17 16:52:56 2023 +0300

    IGNITE-18870 Add REST API: Find units by node consistent id (#1789)
---
 .../apache/ignite/deployment/IgniteDeployment.java |   8 ++
 .../internal/deployunit/DeploymentManagerImpl.java | 134 ++++++---------------
 .../ignite/internal/deployunit/UnitMeta.java       |   4 -
 .../ignite/internal/deployunit/key/UnitKey.java    |  76 ++++++++++++
 .../deployunit/{ => key}/UnitMetaSerializer.java   |   8 +-
 .../deployunit/metastore/AccumulateException.java  |  34 ++++++
 .../internal/deployunit/metastore/Accumulator.java |  42 +++++++
 .../deployunit/metastore/EntrySubscriber.java      |  69 +++++++++++
 .../metastore/SortedListAccumulator.java           |  58 +++++++++
 .../metastore/UnitStatusAccumulator.java           |  62 ++++++++++
 .../deployunit/metastore/UnitsAccumulator.java     |  60 +++++++++
 .../ignite/deployment/UnitMetaSerializerTest.java  |   6 +-
 modules/rest-api/openapi/openapi.yaml              |  29 +++++
 .../rest/api/deployment/DeploymentCodeApi.java     |  27 ++++-
 .../deployment/DeploymentManagementController.java |   7 ++
 .../internal/deployment/ItDeploymentUnitTest.java  |  30 +++++
 16 files changed, 540 insertions(+), 114 deletions(-)

diff --git 
a/modules/api/src/main/java/org/apache/ignite/deployment/IgniteDeployment.java 
b/modules/api/src/main/java/org/apache/ignite/deployment/IgniteDeployment.java
index 5d8e17eab2..d74ccf8db3 100644
--- 
a/modules/api/src/main/java/org/apache/ignite/deployment/IgniteDeployment.java
+++ 
b/modules/api/src/main/java/org/apache/ignite/deployment/IgniteDeployment.java
@@ -93,4 +93,12 @@ public interface IgniteDeployment {
      *      Future will be failed if unit with specified identifier not exist.
      */
     CompletableFuture<UnitStatus> statusAsync(String id);
+
+    /**
+     * Returns the list of units deployed on the node with the provided consistent id.
+     *
+     * @param consistentId Node consistent id.
+     * @return List of units deployed on the node with the provided consistent id.
+     */
+    CompletableFuture<List<UnitStatus>> findUnitByConsistentIdAsync(String 
consistentId);
 }
diff --git 
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentManagerImpl.java
 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentManagerImpl.java
index 683f4f6878..1d51272eff 100644
--- 
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentManagerImpl.java
+++ 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentManagerImpl.java
@@ -22,6 +22,9 @@ import static 
java.nio.file.StandardCopyOption.REPLACE_EXISTING;
 import static java.nio.file.StandardOpenOption.CREATE;
 import static java.nio.file.StandardOpenOption.SYNC;
 import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING;
+import static org.apache.ignite.internal.deployunit.key.UnitKey.allUnits;
+import static org.apache.ignite.internal.deployunit.key.UnitKey.key;
+import static org.apache.ignite.internal.deployunit.key.UnitKey.withId;
 import static org.apache.ignite.internal.metastorage.dsl.Conditions.exists;
 import static org.apache.ignite.internal.metastorage.dsl.Conditions.notExists;
 import static org.apache.ignite.internal.metastorage.dsl.Conditions.revision;
@@ -30,26 +33,20 @@ import static 
org.apache.ignite.internal.metastorage.dsl.Operations.put;
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
-import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Flow.Subscriber;
-import java.util.concurrent.Flow.Subscription;
-import java.util.stream.Collectors;
 import org.apache.ignite.deployment.DeploymentUnit;
 import org.apache.ignite.deployment.IgniteDeployment;
 import org.apache.ignite.deployment.UnitStatus;
-import org.apache.ignite.deployment.UnitStatus.UnitStatusBuilder;
 import org.apache.ignite.deployment.version.Version;
 import 
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
 import 
org.apache.ignite.internal.deployunit.configuration.DeploymentConfiguration;
 import 
org.apache.ignite.internal.deployunit.exception.DeploymentUnitAlreadyExistsException;
 import 
org.apache.ignite.internal.deployunit.exception.DeploymentUnitNotFoundException;
 import 
org.apache.ignite.internal.deployunit.exception.DeploymentUnitReadException;
+import org.apache.ignite.internal.deployunit.key.UnitMetaSerializer;
 import org.apache.ignite.internal.deployunit.message.DeployUnitMessageTypes;
 import org.apache.ignite.internal.deployunit.message.DeployUnitRequest;
 import org.apache.ignite.internal.deployunit.message.DeployUnitRequestBuilder;
@@ -61,11 +58,14 @@ import 
org.apache.ignite.internal.deployunit.message.UndeployUnitRequest;
 import org.apache.ignite.internal.deployunit.message.UndeployUnitRequestImpl;
 import org.apache.ignite.internal.deployunit.message.UndeployUnitResponse;
 import org.apache.ignite.internal.deployunit.message.UndeployUnitResponseImpl;
+import org.apache.ignite.internal.deployunit.metastore.EntrySubscriber;
+import org.apache.ignite.internal.deployunit.metastore.SortedListAccumulator;
+import org.apache.ignite.internal.deployunit.metastore.UnitStatusAccumulator;
+import org.apache.ignite.internal.deployunit.metastore.UnitsAccumulator;
 import org.apache.ignite.internal.future.InFlightFutures;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.manager.IgniteComponent;
-import org.apache.ignite.internal.metastorage.Entry;
 import org.apache.ignite.internal.metastorage.MetaStorageManager;
 import org.apache.ignite.internal.metastorage.dsl.Operation;
 import org.apache.ignite.internal.metastorage.dsl.Operations;
@@ -74,7 +74,6 @@ import org.apache.ignite.lang.ByteArray;
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.network.ClusterService;
 
-//TODO: rework metastorage keys IGNITE-18870
 /**
  * Deployment manager implementation.
  */
@@ -84,10 +83,6 @@ public class DeploymentManagerImpl implements 
IgniteDeployment, IgniteComponent
 
     private static final String TMP_SUFFIX = ".tmp";
 
-    private static final String DEPLOY_UNIT_PREFIX = "deploy-unit.";
-
-    private static final String UNITS_PREFIX = DEPLOY_UNIT_PREFIX + "units.";
-
     /**
      * Meta storage.
      */
@@ -145,8 +140,7 @@ public class DeploymentManagerImpl implements 
IgniteDeployment, IgniteComponent
         Objects.requireNonNull(version);
         Objects.requireNonNull(deploymentUnit);
 
-        ByteArray key = new ByteArray(UNITS_PREFIX + id + ":" + 
version.render());
-
+        ByteArray key = key(id, version.render());
         UnitMeta meta = new UnitMeta(id, version, deploymentUnit.name(), 
Collections.emptyList());
 
         Operation put = put(key, UnitMetaSerializer.serialize(meta));
@@ -214,7 +208,7 @@ public class DeploymentManagerImpl implements 
IgniteDeployment, IgniteComponent
         checkId(id);
         Objects.requireNonNull(version);
 
-        ByteArray key = new ByteArray(UNITS_PREFIX + id + ":" + version);
+        ByteArray key = key(id, version.render());
 
         return metaStorage.invoke(exists(key), Operations.remove(key), 
Operations.noop())
                 .thenCompose(success -> {
@@ -246,31 +240,8 @@ public class DeploymentManagerImpl implements 
IgniteDeployment, IgniteComponent
     @Override
     public CompletableFuture<List<UnitStatus>> unitsAsync() {
         CompletableFuture<List<UnitStatus>> result = new CompletableFuture<>();
-        Map<String, UnitStatusBuilder> map = new HashMap<>();
-        metaStorage.prefix(new ByteArray(UNITS_PREFIX))
-                .subscribe(new Subscriber<>() {
-                    @Override
-                    public void onSubscribe(Subscription subscription) {
-                        subscription.request(Long.MAX_VALUE);
-                    }
-
-                    @Override
-                    public void onNext(Entry item) {
-                        UnitMeta meta = 
UnitMetaSerializer.deserialize(item.value());
-                        map.computeIfAbsent(meta.id(), UnitStatus::builder)
-                                .append(meta.version(), 
meta.consistentIdLocation());
-                    }
-
-                    @Override
-                    public void onError(Throwable throwable) {
-                        result.completeExceptionally(throwable);
-                    }
-
-                    @Override
-                    public void onComplete() {
-                        
result.complete(map.values().stream().map(UnitStatusBuilder::build).collect(Collectors.toList()));
-                    }
-                });
+        metaStorage.prefix(allUnits())
+                .subscribe(new EntrySubscriber<>(result, new 
UnitsAccumulator()));
         return result;
     }
 
@@ -278,32 +249,13 @@ public class DeploymentManagerImpl implements 
IgniteDeployment, IgniteComponent
     public CompletableFuture<List<Version>> versionsAsync(String id) {
         checkId(id);
         CompletableFuture<List<Version>> result = new CompletableFuture<>();
-        metaStorage.prefix(new ByteArray(UNITS_PREFIX + id))
-                .subscribe(new Subscriber<>() {
-                    private final List<Version> list = new ArrayList<>();
-
-                    @Override
-                    public void onSubscribe(Subscription subscription) {
-                        subscription.request(Long.MAX_VALUE);
-                    }
-
-                    @Override
-                    public void onNext(Entry item) {
-                        UnitMeta deserialize = 
UnitMetaSerializer.deserialize(item.value());
-                        list.add(deserialize.version());
-                    }
-
-                    @Override
-                    public void onError(Throwable throwable) {
-                        result.completeExceptionally(throwable);
-                    }
-
-                    @Override
-                    public void onComplete() {
-                        Collections.sort(list);
-                        result.complete(list);
-                    }
-                });
+        metaStorage.prefix(withId(id))
+                .subscribe(
+                        new EntrySubscriber<>(
+                                result,
+                                new SortedListAccumulator<>(e -> 
UnitMetaSerializer.deserialize(e.value()).version())
+                        )
+                );
         return result;
     }
 
@@ -311,39 +263,23 @@ public class DeploymentManagerImpl implements 
IgniteDeployment, IgniteComponent
     public CompletableFuture<UnitStatus> statusAsync(String id) {
         checkId(id);
         CompletableFuture<UnitStatus> result = new CompletableFuture<>();
-        metaStorage.prefix(new ByteArray(UNITS_PREFIX + id))
-                .subscribe(new Subscriber<>() {
-                    private UnitStatusBuilder builder;
-
-                    @Override
-                    public void onSubscribe(Subscription subscription) {
-                        subscription.request(Long.MAX_VALUE);
-                    }
-
-                    @Override
-                    public void onNext(Entry item) {
-                        if (builder == null) {
-                            builder = UnitStatus.builder(id);
-                        }
-                        UnitMeta deserialize = 
UnitMetaSerializer.deserialize(item.value());
-                        builder.append(deserialize.version(), 
deserialize.consistentIdLocation());
-                    }
+        metaStorage.prefix(withId(id))
+                .subscribe(new EntrySubscriber<>(result, new 
UnitStatusAccumulator(id)));
+        return result;
+    }
 
-                    @Override
-                    public void onError(Throwable throwable) {
-                        result.completeExceptionally(throwable);
-                    }
+    @Override
+    public CompletableFuture<List<UnitStatus>> 
findUnitByConsistentIdAsync(String consistentId) {
+        Objects.requireNonNull(consistentId);
 
-                    @Override
-                    public void onComplete() {
-                        if (builder != null) {
-                            result.complete(builder.build());
-                        } else {
-                            result.completeExceptionally(
-                                    new DeploymentUnitNotFoundException("Unit 
with " + id + " doesn't exist."));
-                        }
-                    }
-                });
+        CompletableFuture<List<UnitStatus>> result = new CompletableFuture<>();
+        metaStorage.prefix(allUnits())
+                .subscribe(
+                        new EntrySubscriber<>(
+                                result,
+                                new UnitsAccumulator(meta -> 
meta.consistentIdLocation().contains(consistentId))
+                        )
+                );
         return result;
     }
 
@@ -409,7 +345,7 @@ public class DeploymentManagerImpl implements 
IgniteDeployment, IgniteComponent
             return CompletableFuture.failedFuture(e);
         }
 
-        ByteArray key = new ByteArray(UNITS_PREFIX + id + ":" + version);
+        ByteArray key = key(id, version);
         return metaStorage.get(key)
                 .thenCompose(e -> {
                     UnitMeta prev = UnitMetaSerializer.deserialize(e.value());
diff --git 
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/UnitMeta.java
 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/UnitMeta.java
index 5730f8cc7a..e8b119e593 100644
--- 
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/UnitMeta.java
+++ 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/UnitMeta.java
@@ -60,10 +60,6 @@ public class UnitMeta {
         this.consistentIdLocation.addAll(consistentIdLocation);
     }
 
-    public UnitMeta(String id, String name, List<String> consistentIdLocation) 
{
-        this(id, Version.LATEST, name, consistentIdLocation);
-    }
-
     /**
      * Returns identifier of deployment unit.
      *
diff --git 
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/key/UnitKey.java
 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/key/UnitKey.java
new file mode 100644
index 0000000000..fe61f27d17
--- /dev/null
+++ 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/key/UnitKey.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.deployunit.key;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+import java.util.Base64.Encoder;
+import org.apache.ignite.lang.ByteArray;
+
+/**
+ * Helper for deployment units metastore keys generation.
+ */
+public final class UnitKey {
+    private static final String DEPLOY_UNIT_PREFIX = "deploy-unit.";
+
+    private static final String UNITS_PREFIX = DEPLOY_UNIT_PREFIX + "units.";
+
+    private UnitKey() {
+
+    }
+
+    /**
+     * Key to find all deployment units.
+     *
+     * @return Key in {@link ByteArray} format.
+     */
+    public static ByteArray allUnits() {
+        return key(null, null);
+    }
+
+    /**
+     * Key to find all deployment units with required id.
+     *
+     * @param id Required unit id.
+     * @return Key in {@link ByteArray} format.
+     */
+    public static ByteArray withId(String id) {
+        return key(id, null);
+    }
+
+    /**
+     * Key for unit with required id and version. Only one unit should exist 
with this key.
+     * The {@code version} is used only when {@code id} is not {@code null}; 
otherwise it is ignored.
+     *
+     * @param id Required unit id.
+     * @param version Required unit version.
+     * @return Key in {@link ByteArray} format.
+     */
+    public static ByteArray key(String id, String version) {
+        StringBuilder sb = new StringBuilder(UNITS_PREFIX);
+        Encoder encoder = Base64.getEncoder();
+        if (id != null) {
+            
sb.append(encoder.encodeToString(id.getBytes(StandardCharsets.UTF_8)));
+            if (version != null) {
+                sb.append(":");
+                
sb.append(encoder.encodeToString(version.getBytes(StandardCharsets.UTF_8)));
+            }
+        }
+        return new ByteArray(sb.toString());
+    }
+}
diff --git 
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/UnitMetaSerializer.java
 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/key/UnitMetaSerializer.java
similarity index 94%
rename from 
modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/UnitMetaSerializer.java
rename to 
modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/key/UnitMetaSerializer.java
index 1101342177..18f7e9b2f4 100644
--- 
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/UnitMetaSerializer.java
+++ 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/key/UnitMetaSerializer.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.deployunit;
+package org.apache.ignite.internal.deployunit.key;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 
@@ -25,11 +25,12 @@ import java.util.Base64.Decoder;
 import java.util.Base64.Encoder;
 import java.util.List;
 import org.apache.ignite.deployment.version.Version;
+import org.apache.ignite.internal.deployunit.UnitMeta;
 
 /**
  * Serializer for {@link UnitMeta}.
  */
-public class UnitMetaSerializer {
+public final class UnitMetaSerializer {
     private static final String SEPARATOR = ";";
 
     /**
@@ -72,15 +73,12 @@ public class UnitMetaSerializer {
      */
     public static UnitMeta deserialize(byte[] bytes) {
         String s = new String(bytes, UTF_8);
-
         String[] split = s.split(SEPARATOR);
 
         Decoder decoder = Base64.getDecoder();
 
         String id = new String(decoder.decode(split[0]), UTF_8);
-
         String version = new String(decoder.decode(split[1]), UTF_8);
-
         String unitName = new String(decoder.decode(split[2]), UTF_8);
 
         List<String> ids = new ArrayList<>();
diff --git 
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/AccumulateException.java
 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/AccumulateException.java
new file mode 100644
index 0000000000..96619bd601
--- /dev/null
+++ 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/AccumulateException.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.deployunit.metastore;
+
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+
+/**
+ * Thrown when the accumulation process fails to produce a result, see {@link 
Accumulator#get()}.
+ */
+public class AccumulateException extends IgniteInternalCheckedException {
+    /**
+     * Constructor.
+     *
+     * @param cause Cause exception.
+     */
+    public AccumulateException(Throwable cause) {
+        super(cause);
+    }
+}
diff --git 
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/Accumulator.java
 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/Accumulator.java
new file mode 100644
index 0000000000..c98624b03c
--- /dev/null
+++ 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/Accumulator.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.deployunit.metastore;
+
+import org.apache.ignite.internal.metastorage.Entry;
+
+/**
+ * Values accumulator. Implementations are not required to be thread-safe.
+ *
+ * @param <R> Result value type.
+ */
+public interface Accumulator<R> {
+    /**
+     * Accumulate provided value.
+     *
+     * @param item Item from metastore.
+     */
+    void accumulate(Entry item);
+
+    /**
+     * Returns all accumulated values transformed to required type.
+     *
+     * @return All accumulated values transformed to required type.
+     * @throws AccumulateException in case when accumulation or transformation 
failed.
+     */
+    R get() throws AccumulateException;
+}
diff --git 
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/EntrySubscriber.java
 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/EntrySubscriber.java
new file mode 100644
index 0000000000..779e3a2877
--- /dev/null
+++ 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/EntrySubscriber.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.deployunit.metastore;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Flow.Subscriber;
+import java.util.concurrent.Flow.Subscription;
+import org.apache.ignite.internal.metastorage.Entry;
+
+/**
+ * Implementation of {@link Subscriber} based on {@link Entry}.
+ *
+ * @param <R> Result value type.
+ */
+public class EntrySubscriber<R> implements Subscriber<Entry> {
+    private final CompletableFuture<R> result;
+
+    private final Accumulator<R> accumulator;
+
+    /**
+     * Constructor.
+     *
+     * @param result Result future.
+     * @param accumulator Values accumulator.
+     */
+    public EntrySubscriber(CompletableFuture<R> result, Accumulator<R> 
accumulator) {
+        this.result = result;
+        this.accumulator = accumulator;
+    }
+
+    @Override
+    public void onSubscribe(Subscription subscription) {
+        subscription.request(Long.MAX_VALUE);
+    }
+
+    @Override
+    public void onNext(Entry item) {
+        accumulator.accumulate(item);
+    }
+
+    @Override
+    public void onError(Throwable throwable) {
+        result.completeExceptionally(throwable);
+    }
+
+    @Override
+    public void onComplete() {
+        try {
+            result.complete(accumulator.get());
+        } catch (AccumulateException e) {
+            result.completeExceptionally(e.getCause());
+        }
+    }
+}
diff --git 
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/SortedListAccumulator.java
 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/SortedListAccumulator.java
new file mode 100644
index 0000000000..f31e90a8f9
--- /dev/null
+++ 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/SortedListAccumulator.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.deployunit.metastore;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.function.Function;
+import org.apache.ignite.internal.metastorage.Entry;
+
+/**
+ * Plain list accumulator. The resulting list will be sorted.
+ *
+ * @param <T> Element type of the resulting list.
+ */
+public class SortedListAccumulator<T extends Comparable<T>> implements 
Accumulator<List<T>> {
+    private final Function<Entry, T> mapper;
+
+    private final List<T> result = new ArrayList<>();
+
+    /**
+     * Constructor.
+     *
+     * @param mapper Value mapper.
+     */
+    public SortedListAccumulator(Function<Entry, T> mapper) {
+        this.mapper = mapper;
+    }
+
+    @Override
+    public void accumulate(Entry value) {
+        T apply = mapper.apply(value);
+        if (apply != null) {
+            result.add(apply);
+        }
+    }
+
+    @Override
+    public List<T> get() throws AccumulateException {
+        Collections.sort(result);
+        return result;
+    }
+}
diff --git 
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/UnitStatusAccumulator.java
 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/UnitStatusAccumulator.java
new file mode 100644
index 0000000000..db4b2d4686
--- /dev/null
+++ 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/UnitStatusAccumulator.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.deployunit.metastore;
+
+import org.apache.ignite.deployment.UnitStatus;
+import org.apache.ignite.deployment.UnitStatus.UnitStatusBuilder;
+import org.apache.ignite.internal.deployunit.UnitMeta;
+import 
org.apache.ignite.internal.deployunit.exception.DeploymentUnitNotFoundException;
+import org.apache.ignite.internal.deployunit.key.UnitMetaSerializer;
+import org.apache.ignite.internal.metastorage.Entry;
+
+/**
+ * Unit status accumulator.
+ */
+public class UnitStatusAccumulator implements Accumulator<UnitStatus> {
+    private final String id;
+
+    private UnitStatusBuilder builder;
+
+    /**
+     * Constructor.
+     *
+     * @param id Identifier of required unit.
+     */
+    public UnitStatusAccumulator(String id) {
+        this.id = id;
+    }
+
+    @Override
+    public void accumulate(Entry item) {
+        if (builder == null) {
+            builder = UnitStatus.builder(id);
+        }
+        UnitMeta deserialize = UnitMetaSerializer.deserialize(item.value());
+        builder.append(deserialize.version(), 
deserialize.consistentIdLocation());
+    }
+
+    @Override
+    public UnitStatus get() throws AccumulateException {
+        if (builder != null) {
+            return builder.build();
+        } else {
+            throw new AccumulateException(
+                    new DeploymentUnitNotFoundException("Unit with " + id + " 
doesn't exist."));
+        }
+    }
+}
diff --git 
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/UnitsAccumulator.java
 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/UnitsAccumulator.java
new file mode 100644
index 0000000000..e911c88981
--- /dev/null
+++ 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/UnitsAccumulator.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.deployunit.metastore;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+import org.apache.ignite.deployment.UnitStatus;
+import org.apache.ignite.deployment.UnitStatus.UnitStatusBuilder;
+import org.apache.ignite.internal.deployunit.UnitMeta;
+import org.apache.ignite.internal.deployunit.key.UnitMetaSerializer;
+import org.apache.ignite.internal.metastorage.Entry;
+
+/**
+ * Accumulator that groups metastore entries into one {@link UnitStatus} per deployment unit, with filtering.
+ *
+ * <p>{@code accumulate} deserializes the entry value into a {@link UnitMeta}. Metas rejected by the filter
+ * predicate are skipped; accepted ones have their version and consistent id locations appended to the status
+ * builder registered under their unit id (the builder is created on first use). The no-arg constructor installs
+ * a filter that accepts every meta.
+ *
+ * <p>{@code get} builds a status from every collected builder and returns them as a list, which is empty when no
+ * meta was accepted. Builders are kept in a {@link HashMap}, so the order of the returned list is unspecified.
+ */
+public class UnitsAccumulator implements Accumulator<List<UnitStatus>> {
+    private final Map<String, UnitStatusBuilder> map = new HashMap<>();
+
+    private final Predicate<UnitMeta> filter;
+
+    public UnitsAccumulator() {
+        this(t -> true);
+    }
+
+    public UnitsAccumulator(Predicate<UnitMeta> filter) {
+        this.filter = filter;
+    }
+
+    @Override
+    public void accumulate(Entry item) {
+        UnitMeta meta = UnitMetaSerializer.deserialize(item.value());
+        if (filter.test(meta)) {
+            map.computeIfAbsent(meta.id(), UnitStatus::builder)
+                    .append(meta.version(), meta.consistentIdLocation());
+        }
+    }
+
+    @Override
+    public List<UnitStatus> get() throws AccumulateException {
+        return 
map.values().stream().map(UnitStatusBuilder::build).collect(Collectors.toList());
+    }
+}
diff --git 
a/modules/code-deployment/src/test/java/org/apache/ignite/deployment/UnitMetaSerializerTest.java
 
b/modules/code-deployment/src/test/java/org/apache/ignite/deployment/UnitMetaSerializerTest.java
index 9f4b841110..695b187cdb 100644
--- 
a/modules/code-deployment/src/test/java/org/apache/ignite/deployment/UnitMetaSerializerTest.java
+++ 
b/modules/code-deployment/src/test/java/org/apache/ignite/deployment/UnitMetaSerializerTest.java
@@ -17,15 +17,15 @@
 
 package org.apache.ignite.deployment;
 
-import static 
org.apache.ignite.internal.deployunit.UnitMetaSerializer.deserialize;
-import static 
org.apache.ignite.internal.deployunit.UnitMetaSerializer.serialize;
+import static 
org.apache.ignite.internal.deployunit.key.UnitMetaSerializer.deserialize;
+import static 
org.apache.ignite.internal.deployunit.key.UnitMetaSerializer.serialize;
 import static org.hamcrest.Matchers.is;
 
 import java.util.Arrays;
 import java.util.Collections;
 import org.apache.ignite.deployment.version.Version;
 import org.apache.ignite.internal.deployunit.UnitMeta;
-import org.apache.ignite.internal.deployunit.UnitMetaSerializer;
+import org.apache.ignite.internal.deployunit.key.UnitMetaSerializer;
 import org.hamcrest.MatcherAssert;
 import org.junit.jupiter.api.Test;
 
diff --git a/modules/rest-api/openapi/openapi.yaml 
b/modules/rest-api/openapi/openapi.yaml
index 10d9f4e941..aad540dea8 100644
--- a/modules/rest-api/openapi/openapi.yaml
+++ b/modules/rest-api/openapi/openapi.yaml
@@ -386,6 +386,35 @@ paths:
             application/problem+json:
               schema:
                 $ref: '#/components/schemas/Problem'
+  /management/v1/deployment/units/consistentId/{consistentId}:
+    get:
+      tags:
+      - deployment
+      description: Status of units which deployed on node.
+      operationId: byConsistentId
+      parameters:
+      - name: consistentId
+        in: path
+        required: true
+        schema:
+          required:
+          - "true"
+          type: string
+      responses:
+        "200":
+          description: All statutes returned successfully.
+          content:
+            application/json:
+              schema:
+                type: array
+                items:
+                  $ref: '#/components/schemas/UnitStatus'
+        "500":
+          description: Internal error.
+          content:
+            application/problem+json:
+              schema:
+                $ref: '#/components/schemas/Problem'
   /management/v1/deployment/units/{unitId}:
     delete:
       tags:
diff --git 
a/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/deployment/DeploymentCodeApi.java
 
b/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/deployment/DeploymentCodeApi.java
index 67bdd5d3bb..4c18eec4b4 100644
--- 
a/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/deployment/DeploymentCodeApi.java
+++ 
b/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/deployment/DeploymentCodeApi.java
@@ -128,7 +128,7 @@ public interface DeploymentCodeApi {
      */
     @Operation(operationId = "units", description = "All units statutes.")
     @ApiResponse(responseCode = "200",
-            description = "All statutes returned successful.",
+            description = "All statutes returned successfully.",
             content = @Content(mediaType = APPLICATION_JSON, array = 
@ArraySchema(schema = @Schema(implementation = UnitStatusDto.class)))
     )
     @ApiResponse(responseCode = "500",
@@ -148,7 +148,7 @@ public interface DeploymentCodeApi {
      */
     @Operation(operationId = "versions", description = "All versions of unit 
with provided unit identifier.")
     @ApiResponse(responseCode = "200",
-            description = "Versions returned successful.",
+            description = "Versions returned successfully.",
             content = @Content(mediaType = APPLICATION_JSON, array = 
@ArraySchema(schema = @Schema(implementation = String.class)))
     )
     @ApiResponse(responseCode = "404",
@@ -171,7 +171,7 @@ public interface DeploymentCodeApi {
      */
     @Operation(operationId = "status", description = "Status of unit with 
provided identifier.")
     @ApiResponse(responseCode = "200",
-            description = "Status returned successful.",
+            description = "Status returned successfully.",
             content = @Content(mediaType = APPLICATION_JSON, schema = 
@Schema(implementation = UnitStatusDto.class))
     )
     @ApiResponse(responseCode = "404",
@@ -190,4 +190,25 @@ public interface DeploymentCodeApi {
     @Get("units/{unitId}/status")
     CompletableFuture<UnitStatusDto> status(
             @PathVariable("unitId") @Schema(name = "unitId", required = true) 
String unitId);
+
+    /**
+     * Finds statuses of the units deployed on the node with the given consistent id.
+     *
+     * <p>Served by {@code GET units/consistentId/{consistentId}}. Documented responses are 200 with a JSON array
+     * of unit statuses and 500 with a problem description on internal error.
+     *
+     * @param consistentId Consistent id of the node, taken from the request path.
+     * @return Future that completes with the statuses of the matching units.
+     */
+    @Operation(operationId = "byConsistentId", description = "Status of units 
which deployed on node.")
+    @ApiResponse(responseCode = "200",
+            description = "All statutes returned successfully.",
+            content = @Content(mediaType = APPLICATION_JSON, array = 
@ArraySchema(schema = @Schema(implementation = UnitStatusDto.class)))
+    )
+    @ApiResponse(responseCode = "500",
+            description = "Internal error.",
+            content = @Content(mediaType = PROBLEM_JSON, schema = 
@Schema(implementation = Problem.class))
+    )
+    @Consumes(APPLICATION_JSON)
+    @Produces({
+            APPLICATION_JSON,
+            PROBLEM_JSON
+    })
+    @Get("units/consistentId/{consistentId}")
+    CompletableFuture<Collection<UnitStatusDto>> findByConsistentId(
+            @PathVariable("consistentId") @Schema(name = "consistentId", 
required = true) String consistentId);
 }
diff --git 
a/modules/rest/src/main/java/org/apache/ignite/internal/rest/deployment/DeploymentManagementController.java
 
b/modules/rest/src/main/java/org/apache/ignite/internal/rest/deployment/DeploymentManagementController.java
index cf8fa908ff..19c382dc74 100644
--- 
a/modules/rest/src/main/java/org/apache/ignite/internal/rest/deployment/DeploymentManagementController.java
+++ 
b/modules/rest/src/main/java/org/apache/ignite/internal/rest/deployment/DeploymentManagementController.java
@@ -81,6 +81,13 @@ public class DeploymentManagementController implements 
DeploymentCodeApi {
         return 
deployment.statusAsync(unitId).thenApply(UnitStatusDto::fromUnitStatus);
     }
 
+    @Override // REST handler: returns the statuses of units found for the given node consistent id, as DTOs.
+    public CompletableFuture<Collection<UnitStatusDto>> 
findByConsistentId(String consistentId) {
+        return deployment.findUnitByConsistentIdAsync(consistentId) // asynchronous lookup in the deployment service
+                .thenApply(units -> 
units.stream().map(UnitStatusDto::fromUnitStatus) // convert every found status to its REST DTO
+                        .collect(Collectors.toList())); // a List is returned where the API declares a Collection
+    }
+
     private static DeploymentUnit toDeploymentUnit(CompletedFileUpload 
unitContent) throws IOException {
         String fileName = unitContent.getFilename();
         InputStream is = unitContent.getInputStream();
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/deployment/ItDeploymentUnitTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/deployment/ItDeploymentUnitTest.java
index 492f0b8bc5..8aaf283ac6 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/deployment/ItDeploymentUnitTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/deployment/ItDeploymentUnitTest.java
@@ -131,6 +131,36 @@ public class ItDeploymentUnitTest extends 
ClusterPerTestIntegrationTest {
         assertThat(newVersions, willBe(List.of(unit1.version)));
     }
 
+    /**
+     * Checks that unit statuses can be found by the consistent id of every node that holds the unit.
+     *
+     * <p>The unit is deployed through node 1 and the test waits for its replica on node 0 (the {@code cmg} node),
+     * so a lookup by either node name must return the same single status listing both nodes. A lookup by an
+     * unknown node name must return an empty list.
+     */
+    @Test
+    public void testFindByConsistentId() throws InterruptedException {
+        String id = "test";
+        String version = "1.1.0";
+        Unit unit = deployAndVerify(id, Version.parseVersion(version), 1);
+
+        IgniteImpl cmg = cluster.node(0);
+        waitUnitReplica(cmg, unit);
+
+        IgniteImpl node = node(1);
+        // Lookup by the name of the node the unit was deployed through.
+        CompletableFuture<List<UnitStatus>> nodes = 
node.deployment().findUnitByConsistentIdAsync(node.name());
+        assertThat(nodes, willBe(Collections.singletonList(
+                        UnitStatus.builder(id)
+                                .append(unit.version, List.of(node.name(), 
cmg.name()))
+                                .build())
+                )
+        );
+
+        // Lookup by the name of the node that received the replica. The original repeated the lookup by
+        // node.name() here, which re-checked the first case and left the replica holder untested.
+        nodes = node.deployment().findUnitByConsistentIdAsync(cmg.name());
+        assertThat(nodes, willBe(Collections.singletonList(
+                        UnitStatus.builder(id)
+                                .append(unit.version, List.of(node.name(), 
cmg.name()))
+                                .build())
+                )
+        );
+
+        // A node name that holds no units yields an empty result rather than an error.
+        nodes = 
node.deployment().findUnitByConsistentIdAsync("not-existed-node");
+        assertThat(nodes, willBe(Collections.emptyList()));
+    }
+
     private Unit deployAndVerify(String id, Version version, int nodeIndex) {
         IgniteImpl entryNode = node(nodeIndex);
 


Reply via email to