This is an automated email from the ASF dual-hosted git repository.

amichair pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/aries-rsa.git

commit 50fa0de23d47acc29d0e23bab3729f22be2919e4
Author: Amichai Rothman <[email protected]>
AuthorDate: Sun Mar 29 19:55:32 2026 +0300

    ARIES-2219 Fix imported services not reflecting updated service properties
---
 .../importer/TopologyManagerImport.java            | 49 ++++++++++++++++++----
 .../importer/TopologyManagerImportTest.java        | 41 +++++++++++++++---
 2 files changed, 76 insertions(+), 14 deletions(-)

diff --git 
a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/TopologyManagerImport.java
 
b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/TopologyManagerImport.java
index b57d494b..873f6415 100644
--- 
a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/TopologyManagerImport.java
+++ 
b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/TopologyManagerImport.java
@@ -19,12 +19,16 @@
 package org.apache.aries.rsa.topologymanager.importer;
 
 import java.util.HashSet;
+import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 import org.osgi.framework.BundleContext;
 import org.osgi.framework.ServiceRegistration;
@@ -132,27 +136,42 @@ public class TopologyManagerImport implements 
EndpointEventListener, RemoteServi
         try {
             // we have a set of all current imports, and a set of all possible 
imports (with overlap)
             Set<ImportRegistration> imported = importedServices.get(filter);
-            Set<EndpointDescription> possible = 
importPossibilities.get(filter);
-            // first we iterate over all current imports, and split them into 
two groups:
+            Set<EndpointDescription> possibleSet = 
importPossibilities.get(filter);
+            Map<EndpointDescription, EndpointDescription> possible = 
possibleSet.stream()
+                .collect(Collectors.toMap(e -> e, e -> e)); // convert to map 
for getting the value
+            // first we iterate over all current imports, and split them into 
three groups:
             // - still valid (no null references) and possible (in possible 
set)
+            // - still valid and possible but with changed properties
             // - invalid (contain null references) or no longer possible (not 
in possible set)
             // note that this part should be concurrency-safe (get every 
reference only once and don't modify anything)
             Set<EndpointDescription> valid = new HashSet<>(); // imports that 
are still valid and possible
             Set<ImportRegistration> invalid = new LinkedHashSet<>(); // 
imports that are no longer valid/possible
+            Map<ImportRegistration, EndpointDescription> updated = new 
LinkedHashMap<>(); // valid with changed props
             for (ImportRegistration reg : imported) {
                 ImportReference ref = reg.getImportReference();
                 EndpointDescription endpoint = ref == null ? null : 
ref.getImportedEndpoint();
                 // check if the currently imported endpoint is still valid and 
possible
-                if (endpoint != null && possible.contains(endpoint)) {
-                    valid.add(endpoint); // valid and possible
+                EndpointDescription pe = possible.get(endpoint); // get the 
new (maybe modified) possible endpoint
+                if (pe != null) {
+                    if (getChangedProps(endpoint.getProperties(), 
pe.getProperties()).isEmpty())
+                        valid.add(endpoint); // valid and possible
+                    else
+                        updated.put(reg, pe); // valid and possible and 
changed properties
                 } else {
                     invalid.add(reg); // invalid (reg or ref or endpoint is 
null) or no longer possible
                 }
             }
-            // now that we figured out what needs to be done, apply the changes
+            // now that we figured out what needs to be done, apply the 
changes to each group
             invalid.forEach(this::unimportRegistration); // remove 
invalid/non-possible imports
-            possible.forEach(endpoint -> { // import all possible endpoints 
that are not already imported
-                if (!valid.contains(endpoint)) {
+            updated.forEach((reg, e) -> { // update modified properties for 
existing imports
+                try {
+                    reg.update(e);
+                } catch (IllegalStateException ise) {
+                    LOG.warn("can't update closed endpoint {}", e, ise);
+                }
+            });
+            possible.keySet().forEach(endpoint -> { // import all possible 
endpoints that are not already imported
+                if (!valid.contains(endpoint) && 
!updated.containsValue(endpoint)) {
                     importService(filter, endpoint);
                 }
             });
@@ -188,7 +207,21 @@ public class TopologyManagerImport implements 
EndpointEventListener, RemoteServi
         importedServices.remove(reg);
         reg.close();
     }
-    
+
+    private static Set<String> getChangedProps(Map<String, Object> p1, 
Map<String, Object> p2) {
+        Set<String> changed = new LinkedHashSet<>();
+        for (Map.Entry<String, Object> entry : p1.entrySet()) {
+            Object v = p2.get(entry.getKey());
+            if (!Objects.deepEquals(entry.getValue(), v) || v == null && 
!p2.containsKey(entry.getKey()))
+                changed.add(entry.getKey());
+        }
+        for (String k : p2.keySet()) {
+            if (!p1.containsKey(k))
+                changed.add(k);
+        }
+        return changed;
+    }
+
     @Override
     public void endpointChanged(EndpointEvent event, String filter) {
         if (stopped) {
diff --git 
a/topology-manager/src/test/java/org/apache/aries/rsa/topologymanager/importer/TopologyManagerImportTest.java
 
b/topology-manager/src/test/java/org/apache/aries/rsa/topologymanager/importer/TopologyManagerImportTest.java
index 857729a9..af29da5c 100644
--- 
a/topology-manager/src/test/java/org/apache/aries/rsa/topologymanager/importer/TopologyManagerImportTest.java
+++ 
b/topology-manager/src/test/java/org/apache/aries/rsa/topologymanager/importer/TopologyManagerImportTest.java
@@ -22,6 +22,7 @@ import static org.easymock.EasyMock.*;
 import static org.easymock.EasyMock.expect;
 import static org.junit.Assert.assertEquals;
 
+import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -31,11 +32,13 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.osgi.framework.BundleContext;
+import org.osgi.framework.Constants;
 import org.osgi.framework.ServiceRegistration;
 import org.osgi.service.remoteserviceadmin.EndpointDescription;
 import org.osgi.service.remoteserviceadmin.EndpointEvent;
 import org.osgi.service.remoteserviceadmin.ImportReference;
 import org.osgi.service.remoteserviceadmin.ImportRegistration;
+import org.osgi.service.remoteserviceadmin.RemoteConstants;
 import org.osgi.service.remoteserviceadmin.RemoteServiceAdmin;
 import org.osgi.service.remoteserviceadmin.RemoteServiceAdminListener;
 
@@ -52,19 +55,26 @@ public class TopologyManagerImportTest {
         return bc;
     }
 
-    private ImportRegistration mockImportRegistration(IMocksControl c, 
EndpointDescription endpoint) {
+    private ImportRegistration mockImportRegistration(IMocksControl c, 
EndpointDescription endpoint, boolean expectUpdate) {
         final ImportRegistration ireg = c.createMock(ImportRegistration.class);
         expect(ireg.getException()).andReturn(null).anyTimes();
-        expect(ireg.update(anyObject())).andReturn(true).anyTimes();
+        if (expectUpdate) {
+            expect(ireg.update(anyObject())).andReturn(true);
+        }
         ImportReference iref = c.createMock(ImportReference.class);
         expect(ireg.getImportReference()).andReturn(iref).anyTimes();
         expect(iref.getImportedEndpoint()).andReturn(endpoint).anyTimes();
         return ireg;
     }
 
-    private EndpointDescription createEndpoint() {
-        EndpointDescription endpoint = c.createMock(EndpointDescription.class);
-        final ImportRegistration ir = mockImportRegistration(c, endpoint);
+    private EndpointDescription createEndpoint(boolean expectUpdate, String 
id) {
+        HashMap<String, Object> props = new HashMap<>();
+        props.put(RemoteConstants.ENDPOINT_ID, id);
+        props.put(Constants.OBJECTCLASS, new String[]{String.class.getName()});
+        props.put(RemoteConstants.SERVICE_IMPORTED_CONFIGS, "config1");
+        EndpointDescription endpoint = new EndpointDescription(props);
+
+        final ImportRegistration ir = mockImportRegistration(c, endpoint, 
expectUpdate);
         ir.close(); // must be closed
         expectLastCall().andAnswer(() -> {
             endpoints.get(endpoint).decrementAndGet();
@@ -78,6 +88,10 @@ public class TopologyManagerImportTest {
         return endpoint;
     }
 
+    private EndpointDescription createEndpoint() {
+        return createEndpoint(false, "id1");
+    }
+
     IMocksControl c;
     BundleContext bc;
     RemoteServiceAdmin rsa;
@@ -144,7 +158,7 @@ public class TopologyManagerImportTest {
         tm.endpointChanged(event, "myFilter");
         assertImports(endpoint, 1);
         tm.endpointChanged(event, "myFilter");
-        assertImports(endpoint, 1); // still one one import
+        assertImports(endpoint, 1); // still one import
     }
 
     @Test
@@ -158,4 +172,19 @@ public class TopologyManagerImportTest {
         tm.endpointChanged(new EndpointEvent(EndpointEvent.REMOVED, endpoint), 
"myFilter");
         assertImports(endpoint, 0);
     }
+
+    @Test
+    public void testModifyEndpoint() throws InterruptedException {
+        EndpointDescription endpoint = createEndpoint(true,  "id1");
+        start();
+
+        tm.add(rsa);
+        tm.endpointChanged(new EndpointEvent(EndpointEvent.ADDED, endpoint), 
"myFilter");
+        assertImports(endpoint, 1);
+        Map<String, Object> props = new HashMap<>(endpoint.getProperties());
+        props.put("newProp", "newValue");
+        endpoint = new EndpointDescription(props);
+        tm.endpointChanged(new EndpointEvent(EndpointEvent.MODIFIED, 
endpoint), "myFilter");
+        assertImports(endpoint, 1);
+    }
 }

Reply via email to