This is an automated email from the ASF dual-hosted git repository.
apkhmv pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new de0dab59f3 IGNITE-18870 Add REST API: Find units by node consistent id
(#1789)
de0dab59f3 is described below
commit de0dab59f3532910a18971197c0f69798b43461a
Author: Mikhail <[email protected]>
AuthorDate: Fri Mar 17 16:52:56 2023 +0300
IGNITE-18870 Add REST API: Find units by node consistent id (#1789)
---
.../apache/ignite/deployment/IgniteDeployment.java | 8 ++
.../internal/deployunit/DeploymentManagerImpl.java | 134 ++++++---------------
.../ignite/internal/deployunit/UnitMeta.java | 4 -
.../ignite/internal/deployunit/key/UnitKey.java | 76 ++++++++++++
.../deployunit/{ => key}/UnitMetaSerializer.java | 8 +-
.../deployunit/metastore/AccumulateException.java | 34 ++++++
.../internal/deployunit/metastore/Accumulator.java | 42 +++++++
.../deployunit/metastore/EntrySubscriber.java | 69 +++++++++++
.../metastore/SortedListAccumulator.java | 58 +++++++++
.../metastore/UnitStatusAccumulator.java | 62 ++++++++++
.../deployunit/metastore/UnitsAccumulator.java | 60 +++++++++
.../ignite/deployment/UnitMetaSerializerTest.java | 6 +-
modules/rest-api/openapi/openapi.yaml | 29 +++++
.../rest/api/deployment/DeploymentCodeApi.java | 27 ++++-
.../deployment/DeploymentManagementController.java | 7 ++
.../internal/deployment/ItDeploymentUnitTest.java | 30 +++++
16 files changed, 540 insertions(+), 114 deletions(-)
diff --git
a/modules/api/src/main/java/org/apache/ignite/deployment/IgniteDeployment.java
b/modules/api/src/main/java/org/apache/ignite/deployment/IgniteDeployment.java
index 5d8e17eab2..d74ccf8db3 100644
---
a/modules/api/src/main/java/org/apache/ignite/deployment/IgniteDeployment.java
+++
b/modules/api/src/main/java/org/apache/ignite/deployment/IgniteDeployment.java
@@ -93,4 +93,12 @@ public interface IgniteDeployment {
* Future will be failed if unit with specified identifier not exist.
*/
CompletableFuture<UnitStatus> statusAsync(String id);
+
+ /**
+ * Returns the list of units deployed on the node with the provided consistent id.
+ *
+ * @param consistentId Node consistent id.
+ * @return List of units deployed on the node with the provided consistent id.
+ */
+ CompletableFuture<List<UnitStatus>> findUnitByConsistentIdAsync(String
consistentId);
}
diff --git
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentManagerImpl.java
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentManagerImpl.java
index 683f4f6878..1d51272eff 100644
---
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentManagerImpl.java
+++
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentManagerImpl.java
@@ -22,6 +22,9 @@ import static
java.nio.file.StandardCopyOption.REPLACE_EXISTING;
import static java.nio.file.StandardOpenOption.CREATE;
import static java.nio.file.StandardOpenOption.SYNC;
import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING;
+import static org.apache.ignite.internal.deployunit.key.UnitKey.allUnits;
+import static org.apache.ignite.internal.deployunit.key.UnitKey.key;
+import static org.apache.ignite.internal.deployunit.key.UnitKey.withId;
import static org.apache.ignite.internal.metastorage.dsl.Conditions.exists;
import static org.apache.ignite.internal.metastorage.dsl.Conditions.notExists;
import static org.apache.ignite.internal.metastorage.dsl.Conditions.revision;
@@ -30,26 +33,20 @@ import static
org.apache.ignite.internal.metastorage.dsl.Operations.put;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
-import java.util.ArrayList;
import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Flow.Subscriber;
-import java.util.concurrent.Flow.Subscription;
-import java.util.stream.Collectors;
import org.apache.ignite.deployment.DeploymentUnit;
import org.apache.ignite.deployment.IgniteDeployment;
import org.apache.ignite.deployment.UnitStatus;
-import org.apache.ignite.deployment.UnitStatus.UnitStatusBuilder;
import org.apache.ignite.deployment.version.Version;
import
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
import
org.apache.ignite.internal.deployunit.configuration.DeploymentConfiguration;
import
org.apache.ignite.internal.deployunit.exception.DeploymentUnitAlreadyExistsException;
import
org.apache.ignite.internal.deployunit.exception.DeploymentUnitNotFoundException;
import
org.apache.ignite.internal.deployunit.exception.DeploymentUnitReadException;
+import org.apache.ignite.internal.deployunit.key.UnitMetaSerializer;
import org.apache.ignite.internal.deployunit.message.DeployUnitMessageTypes;
import org.apache.ignite.internal.deployunit.message.DeployUnitRequest;
import org.apache.ignite.internal.deployunit.message.DeployUnitRequestBuilder;
@@ -61,11 +58,14 @@ import
org.apache.ignite.internal.deployunit.message.UndeployUnitRequest;
import org.apache.ignite.internal.deployunit.message.UndeployUnitRequestImpl;
import org.apache.ignite.internal.deployunit.message.UndeployUnitResponse;
import org.apache.ignite.internal.deployunit.message.UndeployUnitResponseImpl;
+import org.apache.ignite.internal.deployunit.metastore.EntrySubscriber;
+import org.apache.ignite.internal.deployunit.metastore.SortedListAccumulator;
+import org.apache.ignite.internal.deployunit.metastore.UnitStatusAccumulator;
+import org.apache.ignite.internal.deployunit.metastore.UnitsAccumulator;
import org.apache.ignite.internal.future.InFlightFutures;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.manager.IgniteComponent;
-import org.apache.ignite.internal.metastorage.Entry;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
import org.apache.ignite.internal.metastorage.dsl.Operation;
import org.apache.ignite.internal.metastorage.dsl.Operations;
@@ -74,7 +74,6 @@ import org.apache.ignite.lang.ByteArray;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.ClusterService;
-//TODO: rework metastorage keys IGNITE-18870
/**
* Deployment manager implementation.
*/
@@ -84,10 +83,6 @@ public class DeploymentManagerImpl implements
IgniteDeployment, IgniteComponent
private static final String TMP_SUFFIX = ".tmp";
- private static final String DEPLOY_UNIT_PREFIX = "deploy-unit.";
-
- private static final String UNITS_PREFIX = DEPLOY_UNIT_PREFIX + "units.";
-
/**
* Meta storage.
*/
@@ -145,8 +140,7 @@ public class DeploymentManagerImpl implements
IgniteDeployment, IgniteComponent
Objects.requireNonNull(version);
Objects.requireNonNull(deploymentUnit);
- ByteArray key = new ByteArray(UNITS_PREFIX + id + ":" +
version.render());
-
+ ByteArray key = key(id, version.render());
UnitMeta meta = new UnitMeta(id, version, deploymentUnit.name(),
Collections.emptyList());
Operation put = put(key, UnitMetaSerializer.serialize(meta));
@@ -214,7 +208,7 @@ public class DeploymentManagerImpl implements
IgniteDeployment, IgniteComponent
checkId(id);
Objects.requireNonNull(version);
- ByteArray key = new ByteArray(UNITS_PREFIX + id + ":" + version);
+ ByteArray key = key(id, version.render());
return metaStorage.invoke(exists(key), Operations.remove(key),
Operations.noop())
.thenCompose(success -> {
@@ -246,31 +240,8 @@ public class DeploymentManagerImpl implements
IgniteDeployment, IgniteComponent
@Override
public CompletableFuture<List<UnitStatus>> unitsAsync() {
CompletableFuture<List<UnitStatus>> result = new CompletableFuture<>();
- Map<String, UnitStatusBuilder> map = new HashMap<>();
- metaStorage.prefix(new ByteArray(UNITS_PREFIX))
- .subscribe(new Subscriber<>() {
- @Override
- public void onSubscribe(Subscription subscription) {
- subscription.request(Long.MAX_VALUE);
- }
-
- @Override
- public void onNext(Entry item) {
- UnitMeta meta =
UnitMetaSerializer.deserialize(item.value());
- map.computeIfAbsent(meta.id(), UnitStatus::builder)
- .append(meta.version(),
meta.consistentIdLocation());
- }
-
- @Override
- public void onError(Throwable throwable) {
- result.completeExceptionally(throwable);
- }
-
- @Override
- public void onComplete() {
-
result.complete(map.values().stream().map(UnitStatusBuilder::build).collect(Collectors.toList()));
- }
- });
+ metaStorage.prefix(allUnits())
+ .subscribe(new EntrySubscriber<>(result, new
UnitsAccumulator()));
return result;
}
@@ -278,32 +249,13 @@ public class DeploymentManagerImpl implements
IgniteDeployment, IgniteComponent
public CompletableFuture<List<Version>> versionsAsync(String id) {
checkId(id);
CompletableFuture<List<Version>> result = new CompletableFuture<>();
- metaStorage.prefix(new ByteArray(UNITS_PREFIX + id))
- .subscribe(new Subscriber<>() {
- private final List<Version> list = new ArrayList<>();
-
- @Override
- public void onSubscribe(Subscription subscription) {
- subscription.request(Long.MAX_VALUE);
- }
-
- @Override
- public void onNext(Entry item) {
- UnitMeta deserialize =
UnitMetaSerializer.deserialize(item.value());
- list.add(deserialize.version());
- }
-
- @Override
- public void onError(Throwable throwable) {
- result.completeExceptionally(throwable);
- }
-
- @Override
- public void onComplete() {
- Collections.sort(list);
- result.complete(list);
- }
- });
+ metaStorage.prefix(withId(id))
+ .subscribe(
+ new EntrySubscriber<>(
+ result,
+ new SortedListAccumulator<>(e ->
UnitMetaSerializer.deserialize(e.value()).version())
+ )
+ );
return result;
}
@@ -311,39 +263,23 @@ public class DeploymentManagerImpl implements
IgniteDeployment, IgniteComponent
public CompletableFuture<UnitStatus> statusAsync(String id) {
checkId(id);
CompletableFuture<UnitStatus> result = new CompletableFuture<>();
- metaStorage.prefix(new ByteArray(UNITS_PREFIX + id))
- .subscribe(new Subscriber<>() {
- private UnitStatusBuilder builder;
-
- @Override
- public void onSubscribe(Subscription subscription) {
- subscription.request(Long.MAX_VALUE);
- }
-
- @Override
- public void onNext(Entry item) {
- if (builder == null) {
- builder = UnitStatus.builder(id);
- }
- UnitMeta deserialize =
UnitMetaSerializer.deserialize(item.value());
- builder.append(deserialize.version(),
deserialize.consistentIdLocation());
- }
+ metaStorage.prefix(withId(id))
+ .subscribe(new EntrySubscriber<>(result, new
UnitStatusAccumulator(id)));
+ return result;
+ }
- @Override
- public void onError(Throwable throwable) {
- result.completeExceptionally(throwable);
- }
+ @Override
+ public CompletableFuture<List<UnitStatus>>
findUnitByConsistentIdAsync(String consistentId) {
+ Objects.requireNonNull(consistentId);
- @Override
- public void onComplete() {
- if (builder != null) {
- result.complete(builder.build());
- } else {
- result.completeExceptionally(
- new DeploymentUnitNotFoundException("Unit
with " + id + " doesn't exist."));
- }
- }
- });
+ CompletableFuture<List<UnitStatus>> result = new CompletableFuture<>();
+ metaStorage.prefix(allUnits())
+ .subscribe(
+ new EntrySubscriber<>(
+ result,
+ new UnitsAccumulator(meta ->
meta.consistentIdLocation().contains(consistentId))
+ )
+ );
return result;
}
@@ -409,7 +345,7 @@ public class DeploymentManagerImpl implements
IgniteDeployment, IgniteComponent
return CompletableFuture.failedFuture(e);
}
- ByteArray key = new ByteArray(UNITS_PREFIX + id + ":" + version);
+ ByteArray key = key(id, version);
return metaStorage.get(key)
.thenCompose(e -> {
UnitMeta prev = UnitMetaSerializer.deserialize(e.value());
diff --git
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/UnitMeta.java
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/UnitMeta.java
index 5730f8cc7a..e8b119e593 100644
---
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/UnitMeta.java
+++
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/UnitMeta.java
@@ -60,10 +60,6 @@ public class UnitMeta {
this.consistentIdLocation.addAll(consistentIdLocation);
}
- public UnitMeta(String id, String name, List<String> consistentIdLocation)
{
- this(id, Version.LATEST, name, consistentIdLocation);
- }
-
/**
* Returns identifier of deployment unit.
*
diff --git
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/key/UnitKey.java
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/key/UnitKey.java
new file mode 100644
index 0000000000..fe61f27d17
--- /dev/null
+++
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/key/UnitKey.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.deployunit.key;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+import java.util.Base64.Encoder;
+import org.apache.ignite.lang.ByteArray;
+
+/**
+ * Helper for deployment units metastore keys generation.
+ */
+public final class UnitKey {
+ private static final String DEPLOY_UNIT_PREFIX = "deploy-unit.";
+
+ private static final String UNITS_PREFIX = DEPLOY_UNIT_PREFIX + "units.";
+
+ private UnitKey() {
+
+ }
+
+ /**
+ * Key to find all deployment units.
+ *
+ * @return Key in {@link ByteArray} format.
+ */
+ public static ByteArray allUnits() {
+ return key(null, null);
+ }
+
+ /**
+ * Key to find all deployment units with required id.
+ *
+ * @param id Required unit id.
+ * @return Key in {@link ByteArray} format.
+ */
+ public static ByteArray withId(String id) {
+ return key(id, null);
+ }
+
+ /**
+ * Key for unit with required id and version. Only one unit should exist
with this key.
+ * Version may be non-{@code null} only when {@code id} is not
{@code null}.
+ *
+ * @param id Required unit id.
+ * @param version Required unit version.
+ * @return Key in {@link ByteArray} format.
+ */
+ public static ByteArray key(String id, String version) {
+ StringBuilder sb = new StringBuilder(UNITS_PREFIX);
+ Encoder encoder = Base64.getEncoder();
+ if (id != null) {
+
sb.append(encoder.encodeToString(id.getBytes(StandardCharsets.UTF_8)));
+ if (version != null) {
+ sb.append(":");
+
sb.append(encoder.encodeToString(version.getBytes(StandardCharsets.UTF_8)));
+ }
+ }
+ return new ByteArray(sb.toString());
+ }
+}
diff --git
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/UnitMetaSerializer.java
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/key/UnitMetaSerializer.java
similarity index 94%
rename from
modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/UnitMetaSerializer.java
rename to
modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/key/UnitMetaSerializer.java
index 1101342177..18f7e9b2f4 100644
---
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/UnitMetaSerializer.java
+++
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/key/UnitMetaSerializer.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.deployunit;
+package org.apache.ignite.internal.deployunit.key;
import static java.nio.charset.StandardCharsets.UTF_8;
@@ -25,11 +25,12 @@ import java.util.Base64.Decoder;
import java.util.Base64.Encoder;
import java.util.List;
import org.apache.ignite.deployment.version.Version;
+import org.apache.ignite.internal.deployunit.UnitMeta;
/**
* Serializer for {@link UnitMeta}.
*/
-public class UnitMetaSerializer {
+public final class UnitMetaSerializer {
private static final String SEPARATOR = ";";
/**
@@ -72,15 +73,12 @@ public class UnitMetaSerializer {
*/
public static UnitMeta deserialize(byte[] bytes) {
String s = new String(bytes, UTF_8);
-
String[] split = s.split(SEPARATOR);
Decoder decoder = Base64.getDecoder();
String id = new String(decoder.decode(split[0]), UTF_8);
-
String version = new String(decoder.decode(split[1]), UTF_8);
-
String unitName = new String(decoder.decode(split[2]), UTF_8);
List<String> ids = new ArrayList<>();
diff --git
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/AccumulateException.java
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/AccumulateException.java
new file mode 100644
index 0000000000..96619bd601
--- /dev/null
+++
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/AccumulateException.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.deployunit.metastore;
+
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+
+/**
+ * Thrown when the accumulation process finishes unsuccessfully, see {@link
Accumulator#get()}.
+ */
+public class AccumulateException extends IgniteInternalCheckedException {
+ /**
+ * Constructor.
+ *
+ * @param cause Cause exception.
+ */
+ public AccumulateException(Throwable cause) {
+ super(cause);
+ }
+}
diff --git
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/Accumulator.java
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/Accumulator.java
new file mode 100644
index 0000000000..c98624b03c
--- /dev/null
+++
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/Accumulator.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.deployunit.metastore;
+
+import org.apache.ignite.internal.metastorage.Entry;
+
+/**
+ * Values accumulator. Implementations need NOT be thread-safe.
+ *
+ * @param <R> Result value type.
+ */
+public interface Accumulator<R> {
+ /**
+ * Accumulate provided value.
+ *
+ * @param item Item from metastore.
+ */
+ void accumulate(Entry item);
+
+ /**
+ * Returns all accumulated values transformed to required type.
+ *
+ * @return All accumulated values transformed to required type.
+ * @throws AccumulateException in case when accumulation or transformation
failed.
+ */
+ R get() throws AccumulateException;
+}
diff --git
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/EntrySubscriber.java
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/EntrySubscriber.java
new file mode 100644
index 0000000000..779e3a2877
--- /dev/null
+++
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/EntrySubscriber.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.deployunit.metastore;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Flow.Subscriber;
+import java.util.concurrent.Flow.Subscription;
+import org.apache.ignite.internal.metastorage.Entry;
+
+/**
+ * Implementation of {@link Subscriber} based on {@link Entry}.
+ *
+ * @param <R> Result value type.
+ */
+public class EntrySubscriber<R> implements Subscriber<Entry> {
+ private final CompletableFuture<R> result;
+
+ private final Accumulator<R> accumulator;
+
+ /**
+ * Constructor.
+ *
+ * @param result Result future.
+ * @param accumulator Values accumulator.
+ */
+ public EntrySubscriber(CompletableFuture<R> result, Accumulator<R>
accumulator) {
+ this.result = result;
+ this.accumulator = accumulator;
+ }
+
+ @Override
+ public void onSubscribe(Subscription subscription) {
+ subscription.request(Long.MAX_VALUE);
+ }
+
+ @Override
+ public void onNext(Entry item) {
+ accumulator.accumulate(item);
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ result.completeExceptionally(throwable);
+ }
+
+ @Override
+ public void onComplete() {
+ try {
+ result.complete(accumulator.get());
+ } catch (AccumulateException e) {
+ result.completeExceptionally(e.getCause());
+ }
+ }
+}
diff --git
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/SortedListAccumulator.java
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/SortedListAccumulator.java
new file mode 100644
index 0000000000..f31e90a8f9
--- /dev/null
+++
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/SortedListAccumulator.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.deployunit.metastore;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.function.Function;
+import org.apache.ignite.internal.metastorage.Entry;
+
+/**
+ * Plain list accumulator. The resulting list will be sorted.
+ *
+ * @param <T> Result value type.
+ */
+public class SortedListAccumulator<T extends Comparable<T>> implements
Accumulator<List<T>> {
+ private final Function<Entry, T> mapper;
+
+ private final List<T> result = new ArrayList<>();
+
+ /**
+ * Constructor.
+ *
+ * @param mapper Value mapper.
+ */
+ public SortedListAccumulator(Function<Entry, T> mapper) {
+ this.mapper = mapper;
+ }
+
+ @Override
+ public void accumulate(Entry value) {
+ T apply = mapper.apply(value);
+ if (apply != null) {
+ result.add(apply);
+ }
+ }
+
+ @Override
+ public List<T> get() throws AccumulateException {
+ Collections.sort(result);
+ return result;
+ }
+}
diff --git
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/UnitStatusAccumulator.java
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/UnitStatusAccumulator.java
new file mode 100644
index 0000000000..db4b2d4686
--- /dev/null
+++
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/UnitStatusAccumulator.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.deployunit.metastore;
+
+import org.apache.ignite.deployment.UnitStatus;
+import org.apache.ignite.deployment.UnitStatus.UnitStatusBuilder;
+import org.apache.ignite.internal.deployunit.UnitMeta;
+import
org.apache.ignite.internal.deployunit.exception.DeploymentUnitNotFoundException;
+import org.apache.ignite.internal.deployunit.key.UnitMetaSerializer;
+import org.apache.ignite.internal.metastorage.Entry;
+
+/**
+ * Unit status accumulator.
+ */
+public class UnitStatusAccumulator implements Accumulator<UnitStatus> {
+ private final String id;
+
+ private UnitStatusBuilder builder;
+
+ /**
+ * Constructor.
+ *
+ * @param id Identifier of required unit.
+ */
+ public UnitStatusAccumulator(String id) {
+ this.id = id;
+ }
+
+ @Override
+ public void accumulate(Entry item) {
+ if (builder == null) {
+ builder = UnitStatus.builder(id);
+ }
+ UnitMeta deserialize = UnitMetaSerializer.deserialize(item.value());
+ builder.append(deserialize.version(),
deserialize.consistentIdLocation());
+ }
+
+ @Override
+ public UnitStatus get() throws AccumulateException {
+ if (builder != null) {
+ return builder.build();
+ } else {
+ throw new AccumulateException(
+ new DeploymentUnitNotFoundException("Unit with " + id + "
doesn't exist."));
+ }
+ }
+}
diff --git
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/UnitsAccumulator.java
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/UnitsAccumulator.java
new file mode 100644
index 0000000000..e911c88981
--- /dev/null
+++
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/UnitsAccumulator.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.deployunit.metastore;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+import org.apache.ignite.deployment.UnitStatus;
+import org.apache.ignite.deployment.UnitStatus.UnitStatusBuilder;
+import org.apache.ignite.internal.deployunit.UnitMeta;
+import org.apache.ignite.internal.deployunit.key.UnitMetaSerializer;
+import org.apache.ignite.internal.metastorage.Entry;
+
+/**
+ * Units accumulator with filtering mechanism.
+ */
+public class UnitsAccumulator implements Accumulator<List<UnitStatus>> {
+ private final Map<String, UnitStatusBuilder> map = new HashMap<>();
+
+ private final Predicate<UnitMeta> filter;
+
+ public UnitsAccumulator() {
+ this(t -> true);
+ }
+
+ public UnitsAccumulator(Predicate<UnitMeta> filter) {
+ this.filter = filter;
+ }
+
+ @Override
+ public void accumulate(Entry item) {
+ UnitMeta meta = UnitMetaSerializer.deserialize(item.value());
+ if (filter.test(meta)) {
+ map.computeIfAbsent(meta.id(), UnitStatus::builder)
+ .append(meta.version(), meta.consistentIdLocation());
+ }
+ }
+
+ @Override
+ public List<UnitStatus> get() throws AccumulateException {
+ return
map.values().stream().map(UnitStatusBuilder::build).collect(Collectors.toList());
+ }
+}
diff --git
a/modules/code-deployment/src/test/java/org/apache/ignite/deployment/UnitMetaSerializerTest.java
b/modules/code-deployment/src/test/java/org/apache/ignite/deployment/UnitMetaSerializerTest.java
index 9f4b841110..695b187cdb 100644
---
a/modules/code-deployment/src/test/java/org/apache/ignite/deployment/UnitMetaSerializerTest.java
+++
b/modules/code-deployment/src/test/java/org/apache/ignite/deployment/UnitMetaSerializerTest.java
@@ -17,15 +17,15 @@
package org.apache.ignite.deployment;
-import static
org.apache.ignite.internal.deployunit.UnitMetaSerializer.deserialize;
-import static
org.apache.ignite.internal.deployunit.UnitMetaSerializer.serialize;
+import static
org.apache.ignite.internal.deployunit.key.UnitMetaSerializer.deserialize;
+import static
org.apache.ignite.internal.deployunit.key.UnitMetaSerializer.serialize;
import static org.hamcrest.Matchers.is;
import java.util.Arrays;
import java.util.Collections;
import org.apache.ignite.deployment.version.Version;
import org.apache.ignite.internal.deployunit.UnitMeta;
-import org.apache.ignite.internal.deployunit.UnitMetaSerializer;
+import org.apache.ignite.internal.deployunit.key.UnitMetaSerializer;
import org.hamcrest.MatcherAssert;
import org.junit.jupiter.api.Test;
diff --git a/modules/rest-api/openapi/openapi.yaml
b/modules/rest-api/openapi/openapi.yaml
index 10d9f4e941..aad540dea8 100644
--- a/modules/rest-api/openapi/openapi.yaml
+++ b/modules/rest-api/openapi/openapi.yaml
@@ -386,6 +386,35 @@ paths:
application/problem+json:
schema:
$ref: '#/components/schemas/Problem'
+ /management/v1/deployment/units/consistentId/{consistentId}:
+ get:
+ tags:
+ - deployment
+ description: Status of units which deployed on node.
+ operationId: byConsistentId
+ parameters:
+ - name: consistentId
+ in: path
+ required: true
+ schema:
+ required:
+ - "true"
+ type: string
+ responses:
+ "200":
+ description: All statutes returned successfully.
+ content:
+ application/json:
+ schema:
+ type: array
+ items:
+ $ref: '#/components/schemas/UnitStatus'
+ "500":
+ description: Internal error.
+ content:
+ application/problem+json:
+ schema:
+ $ref: '#/components/schemas/Problem'
/management/v1/deployment/units/{unitId}:
delete:
tags:
diff --git
a/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/deployment/DeploymentCodeApi.java
b/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/deployment/DeploymentCodeApi.java
index 67bdd5d3bb..4c18eec4b4 100644
---
a/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/deployment/DeploymentCodeApi.java
+++
b/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/deployment/DeploymentCodeApi.java
@@ -128,7 +128,7 @@ public interface DeploymentCodeApi {
*/
@Operation(operationId = "units", description = "All units statutes.")
@ApiResponse(responseCode = "200",
- description = "All statutes returned successful.",
+ description = "All statutes returned successfully.",
content = @Content(mediaType = APPLICATION_JSON, array =
@ArraySchema(schema = @Schema(implementation = UnitStatusDto.class)))
)
@ApiResponse(responseCode = "500",
@@ -148,7 +148,7 @@ public interface DeploymentCodeApi {
*/
@Operation(operationId = "versions", description = "All versions of unit
with provided unit identifier.")
@ApiResponse(responseCode = "200",
- description = "Versions returned successful.",
+ description = "Versions returned successfully.",
content = @Content(mediaType = APPLICATION_JSON, array =
@ArraySchema(schema = @Schema(implementation = String.class)))
)
@ApiResponse(responseCode = "404",
@@ -171,7 +171,7 @@ public interface DeploymentCodeApi {
*/
@Operation(operationId = "status", description = "Status of unit with
provided identifier.")
@ApiResponse(responseCode = "200",
- description = "Status returned successful.",
+ description = "Status returned successfully.",
content = @Content(mediaType = APPLICATION_JSON, schema =
@Schema(implementation = UnitStatusDto.class))
)
@ApiResponse(responseCode = "404",
@@ -190,4 +190,25 @@ public interface DeploymentCodeApi {
@Get("units/{unitId}/status")
CompletableFuture<UnitStatusDto> status(
@PathVariable("unitId") @Schema(name = "unitId", required = true)
String unitId);
+
+ /**
+ * Finds the statuses of deployment units by node consistent id.
+ *
+ * @param consistentId Node consistent id.
+ * @return Future with the collection of unit statuses.
+ */
+ @Operation(operationId = "byConsistentId", description = "Status of units
which deployed on node.")
+ @ApiResponse(responseCode = "200",
+ description = "All statutes returned successfully.",
+ content = @Content(mediaType = APPLICATION_JSON, array =
@ArraySchema(schema = @Schema(implementation = UnitStatusDto.class)))
+ )
+ @ApiResponse(responseCode = "500",
+ description = "Internal error.",
+ content = @Content(mediaType = PROBLEM_JSON, schema =
@Schema(implementation = Problem.class))
+ )
+ @Consumes(APPLICATION_JSON)
+ @Produces({
+ APPLICATION_JSON,
+ PROBLEM_JSON
+ })
+ @Get("units/consistentId/{consistentId}")
+ CompletableFuture<Collection<UnitStatusDto>> findByConsistentId(
+ @PathVariable("consistentId") @Schema(name = "consistentId",
required = true) String consistentId);
}
diff --git
a/modules/rest/src/main/java/org/apache/ignite/internal/rest/deployment/DeploymentManagementController.java
b/modules/rest/src/main/java/org/apache/ignite/internal/rest/deployment/DeploymentManagementController.java
index cf8fa908ff..19c382dc74 100644
---
a/modules/rest/src/main/java/org/apache/ignite/internal/rest/deployment/DeploymentManagementController.java
+++
b/modules/rest/src/main/java/org/apache/ignite/internal/rest/deployment/DeploymentManagementController.java
@@ -81,6 +81,13 @@ public class DeploymentManagementController implements
DeploymentCodeApi {
return
deployment.statusAsync(unitId).thenApply(UnitStatusDto::fromUnitStatus);
}
+    /**
+     * Looks up unit statuses by node consistent id and converts each of them to its REST DTO.
+     *
+     * @param consistentId Node consistent id.
+     * @return Future with the converted unit statuses.
+     */
+    @Override
+    public CompletableFuture<Collection<UnitStatusDto>> findByConsistentId(String consistentId) {
+        return deployment.findUnitByConsistentIdAsync(consistentId).thenApply(statuses ->
+                statuses.stream()
+                        .map(status -> UnitStatusDto.fromUnitStatus(status))
+                        .collect(Collectors.toList())
+        );
+    }
+
private static DeploymentUnit toDeploymentUnit(CompletedFileUpload
unitContent) throws IOException {
String fileName = unitContent.getFilename();
InputStream is = unitContent.getInputStream();
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/deployment/ItDeploymentUnitTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/deployment/ItDeploymentUnitTest.java
index 492f0b8bc5..8aaf283ac6 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/deployment/ItDeploymentUnitTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/deployment/ItDeploymentUnitTest.java
@@ -131,6 +131,36 @@ public class ItDeploymentUnitTest extends
ClusterPerTestIntegrationTest {
assertThat(newVersions, willBe(List.of(unit1.version)));
}
+    /**
+     * Checks that a deployed unit is found by the consistent id of each node listed among its locations,
+     * and that an unknown consistent id yields an empty result.
+     */
+    @Test
+    public void testFindByConsistentId() throws InterruptedException {
+        String id = "test";
+        String version = "1.1.0";
+        Unit unit = deployAndVerify(id, Version.parseVersion(version), 1);
+
+        IgniteImpl cmg = cluster.node(0);
+        waitUnitReplica(cmg, unit);
+
+        IgniteImpl node = node(1);
+
+        // Lookup by the consistent id of the node the unit was deployed through.
+        CompletableFuture<List<UnitStatus>> nodes = node.deployment().findUnitByConsistentIdAsync(node.name());
+        assertThat(nodes, willBe(Collections.singletonList(
+                        UnitStatus.builder(id)
+                                .append(unit.version, List.of(node.name(), cmg.name()))
+                                .build())
+                )
+        );
+
+        // Lookup by the other location: node 0 is expected among the unit's locations as well,
+        // so querying by its consistent id must return the same status.
+        nodes = node.deployment().findUnitByConsistentIdAsync(cmg.name());
+        assertThat(nodes, willBe(Collections.singletonList(
+                        UnitStatus.builder(id)
+                                .append(unit.version, List.of(node.name(), cmg.name()))
+                                .build())
+                )
+        );
+
+        // A consistent id that matches no location yields no units.
+        nodes = node.deployment().findUnitByConsistentIdAsync("not-existed-node");
+        assertThat(nodes, willBe(Collections.emptyList()));
+    }
+
private Unit deployAndVerify(String id, Version version, int nodeIndex) {
IgniteImpl entryNode = node(nodeIndex);