This is an automated email from the ASF dual-hosted git repository.

amichai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/aries-rsa.git

commit 2460eba3c93feac01ef58df6ba201fc2e1f8451f
Author: Amichai Rothman <[email protected]>
AuthorDate: Fri Mar 13 11:43:52 2026 +0200

    Refactor and clarify TopologyManagerImport updates calculation
---
 .../rsa/topologymanager/importer/ImportDiff.java   | 72 ----------------------
 .../importer/TopologyManagerImport.java            | 66 +++++++++++++-------
 2 files changed, 43 insertions(+), 95 deletions(-)

diff --git 
a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/ImportDiff.java
 
b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/ImportDiff.java
deleted file mode 100644
index 468d2350..00000000
--- 
a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/ImportDiff.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.aries.rsa.topologymanager.importer;
-
-import java.util.Objects;
-import java.util.Set;
-import java.util.function.Predicate;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-import org.osgi.service.remoteserviceadmin.EndpointDescription;
-import org.osgi.service.remoteserviceadmin.ImportReference;
-import org.osgi.service.remoteserviceadmin.ImportRegistration;
-
-public class ImportDiff {
-    private Set<EndpointDescription> possible;
-    private Set<ImportRegistration> imported;
-
-    public ImportDiff(Set<EndpointDescription> possible, 
Set<ImportRegistration> imported) {
-        this.possible = possible;
-        this.imported = imported;
-    }
-
-    public Stream<ImportRegistration> getRemoved() {
-        return imported.stream()
-                .filter(this::toRemove);
-    }
-
-    public Stream<EndpointDescription> getAdded() {
-        Set<EndpointDescription> importedEndpoints = importedEndpoints();
-        return possible.stream()
-                .filter(not(importedEndpoints::contains));
-    }
-    
-    /**
-     * Checks if the import registration is not possible anymore or closed
-     * 
-     * @param ireg registration to check
-     * @return
-     */
-    private boolean toRemove(ImportRegistration ireg) {
-        ImportReference iref = ireg != null ? ireg.getImportReference() : null;
-        return iref == null || !possible.contains(iref.getImportedEndpoint()); 
-    }
-
-    private Set<EndpointDescription> importedEndpoints() {
-        return imported.stream()
-            
.map(ImportRegistration::getImportReference).filter(Objects::nonNull)
-            .map(ImportReference::getImportedEndpoint).filter(Objects::nonNull)
-            .collect(Collectors.toSet());
-    }
-
-    private static <T> Predicate<T> not(Predicate<T> t) {
-        return t.negate();
-    }
-}
diff --git 
a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/TopologyManagerImport.java
 
b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/TopologyManagerImport.java
index fe53007d..247d334c 100644
--- 
a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/TopologyManagerImport.java
+++ 
b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/TopologyManagerImport.java
@@ -18,13 +18,14 @@
  */
 package org.apache.aries.rsa.topologymanager.importer;
 
+import java.util.HashSet;
+import java.util.LinkedHashSet;
 import java.util.Set;
 import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-import java.util.stream.Stream;
 
 import org.osgi.framework.BundleContext;
 import org.osgi.service.remoteserviceadmin.EndpointDescription;
@@ -63,7 +64,7 @@ public class TopologyManagerImport implements 
EndpointEventListener, RemoteServi
     private final MultiMap<String, ImportRegistration> importedServices = new 
MultiMap<>();
 
     public TopologyManagerImport(BundleContext bc) {
-        this.rsaSet = new CopyOnWriteArraySet<>();
+        rsaSet = new CopyOnWriteArraySet<>();
         bctx = bc;
         execService = new ThreadPoolExecutor(5, 10, 50, TimeUnit.SECONDS, new 
LinkedBlockingQueue<>(), new NamedThreadFactory(getClass()));
     }
@@ -101,7 +102,7 @@ public class TopologyManagerImport implements 
EndpointEventListener, RemoteServi
         ImportReference ref = event.getImportReference();
         if (event.getType() == RemoteServiceAdminEvent.IMPORT_UNREGISTRATION 
&& ref != null) {
             importedServices.allValues().stream()
-                .filter(ir -> ref.equals(ir.getImportReference()))
+                .filter(reg -> ref.equals(reg.getImportReference()))
                 .forEach(this::unimportRegistration);
         }
     }
@@ -115,48 +116,67 @@ public class TopologyManagerImport implements 
EndpointEventListener, RemoteServi
 
     /**
      * Synchronizes the actual imports with the possible imports for the given 
filter,
-     * i.e. unimports previously imported endpoints that are no longer 
possible,
+     * i.e. un-imports previously imported endpoints that are no longer valid 
or possible,
      * and imports new possible endpoints that are not already imported.
-     * 
+     * <p>
      * TODO but optional: if the service is already imported and the endpoint 
is still
-     * in the list of possible imports check if a "better" endpoint is now in 
the list
+     * in the list of possible imports check if a "better" endpoint is now in 
the list.
      *
      * @param filter the filter whose endpoints are synchronized
      */
     private void synchronizeImports(final String filter) {
         try {
-            ImportDiff diff = new ImportDiff(importPossibilities.get(filter), 
importedServices.get(filter));
-            diff.getRemoved()
-                .forEach(this::unimportRegistration);
-            diff.getAdded()
-                .flatMap(this::importService)
-                .forEach(ir -> importedServices.put(filter, ir));
+            // we have a set of all current imports, and a set of all possible 
imports (with overlap)
+            Set<ImportRegistration> imported = importedServices.get(filter);
+            Set<EndpointDescription> possible = 
importPossibilities.get(filter);
+            // first we iterate over all current imports, and split them into 
two groups:
+            // - still valid (no null references) and possible (in possible 
set)
+            // - invalid (contain null references) or no longer possible (not 
in possible set)
+            // note that this part should be concurrency-safe (get every 
reference only once and don't modify anything)
+            Set<EndpointDescription> valid = new HashSet<>(); // imports that 
are still valid and possible
+            Set<ImportRegistration> invalid = new LinkedHashSet<>(); // 
imports that are no longer valid/possible
+            for (ImportRegistration reg : imported) {
+                ImportReference ref = reg.getImportReference();
+                EndpointDescription endpoint = ref == null ? null : 
ref.getImportedEndpoint();
+                // check if the currently imported endpoint is still valid and 
possible
+                if (endpoint != null && possible.contains(endpoint)) {
+                    valid.add(endpoint); // valid and possible
+                } else {
+                    invalid.add(reg); // invalid (reg or ref or endpoint is 
null) or no longer possible
+                }
+            }
+            // now that we figured out what needs to be done, apply the changes
+            invalid.forEach(this::unimportRegistration); // remove 
invalid/non-possible imports
+            possible.forEach(endpoint -> { // import all possible endpoints 
that are not already imported
+                if (!valid.contains(endpoint)) {
+                    importService(filter, endpoint);
+                }
+            });
         } catch (Exception e) {
-            LOG.error(e.getMessage(), e);
+            LOG.error("error synchronizing imports", e);
         }
         // Notify EndpointListeners? NO!
     }
 
     /**
-     * Tries to import the service with each rsa until one import is 
successful.
+     * Tries to import the service with each RSA until one import is 
successful.
      *
      * @param filter the filter that matched the endpoint
      * @param endpoint endpoint to import
-     * @return 
      */
-    private Stream<ImportRegistration> importService(EndpointDescription 
endpoint) {
+    private void importService(String filter, EndpointDescription endpoint) {
         for (RemoteServiceAdmin rsa : rsaSet) {
-            ImportRegistration ir = rsa.importService(endpoint);
-            if (ir != null) {
-                if (ir.getException() == null) {
-                    LOG.debug("Service import was successful {}", ir);
-                    return Stream.of(ir);
+            ImportRegistration reg = rsa.importService(endpoint);
+            if (reg != null) {
+                if (reg.getException() == null) {
+                    LOG.debug("Service import was successful {}", reg);
+                    importedServices.put(filter, reg);
+                    return;
                 } else {
-                    LOG.info("Error importing service {}", endpoint, 
ir.getException());
+                    LOG.info("Error importing service {}", endpoint, 
reg.getException());
                 }
             }
         }
-        return Stream.empty();
     }
     
     private void unimportRegistration(ImportRegistration reg) {

Reply via email to