This is an automated email from the ASF dual-hosted git repository. amichai pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/aries-rsa.git
commit 2460eba3c93feac01ef58df6ba201fc2e1f8451f Author: Amichai Rothman <[email protected]> AuthorDate: Fri Mar 13 11:43:52 2026 +0200 Refactor and clarify TopologyManagerImport updates calculation --- .../rsa/topologymanager/importer/ImportDiff.java | 72 ---------------------- .../importer/TopologyManagerImport.java | 66 +++++++++++++------- 2 files changed, 43 insertions(+), 95 deletions(-) diff --git a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/ImportDiff.java b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/ImportDiff.java deleted file mode 100644 index 468d2350..00000000 --- a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/ImportDiff.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.aries.rsa.topologymanager.importer; - -import java.util.Objects; -import java.util.Set; -import java.util.function.Predicate; -import java.util.stream.Collectors; -import java.util.stream.Stream; - -import org.osgi.service.remoteserviceadmin.EndpointDescription; -import org.osgi.service.remoteserviceadmin.ImportReference; -import org.osgi.service.remoteserviceadmin.ImportRegistration; - -public class ImportDiff { - private Set<EndpointDescription> possible; - private Set<ImportRegistration> imported; - - public ImportDiff(Set<EndpointDescription> possible, Set<ImportRegistration> imported) { - this.possible = possible; - this.imported = imported; - } - - public Stream<ImportRegistration> getRemoved() { - return imported.stream() - .filter(this::toRemove); - } - - public Stream<EndpointDescription> getAdded() { - Set<EndpointDescription> importedEndpoints = importedEndpoints(); - return possible.stream() - .filter(not(importedEndpoints::contains)); - } - - /** - * Checks if the import registration is not possible anymore or closed - * - * @param ireg registration to check - * @return - */ - private boolean toRemove(ImportRegistration ireg) { - ImportReference iref = ireg != null ? ireg.getImportReference() : null; - return iref == null || !possible.contains(iref.getImportedEndpoint()); - } - - private Set<EndpointDescription> importedEndpoints() { - return imported.stream() - .map(ImportRegistration::getImportReference).filter(Objects::nonNull) - .map(ImportReference::getImportedEndpoint).filter(Objects::nonNull) - .collect(Collectors.toSet()); - } - - private static <T> Predicate<T> not(Predicate<T> t) { - return t.negate(); - } -} diff --git a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/TopologyManagerImport.java b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/TopologyManagerImport.java index fe53007d..247d334c 100644 --- a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/TopologyManagerImport.java +++ b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/TopologyManagerImport.java @@ -18,13 +18,14 @@ */ package org.apache.aries.rsa.topologymanager.importer; +import java.util.HashSet; +import java.util.LinkedHashSet; import java.util.Set; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import java.util.stream.Stream; import org.osgi.framework.BundleContext; import org.osgi.service.remoteserviceadmin.EndpointDescription; @@ -63,7 +64,7 @@ public class TopologyManagerImport implements EndpointEventListener, RemoteServi private final MultiMap<String, ImportRegistration> importedServices = new MultiMap<>(); public TopologyManagerImport(BundleContext bc) { - this.rsaSet = new CopyOnWriteArraySet<>(); + rsaSet = new CopyOnWriteArraySet<>(); bctx = bc; execService = new ThreadPoolExecutor(5, 10, 50, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), new NamedThreadFactory(getClass())); } @@ -101,7 +102,7 @@ public class TopologyManagerImport implements EndpointEventListener, RemoteServi ImportReference ref = event.getImportReference(); if (event.getType() == RemoteServiceAdminEvent.IMPORT_UNREGISTRATION && ref != null) { importedServices.allValues().stream() - .filter(ir -> ref.equals(ir.getImportReference())) + .filter(reg -> ref.equals(reg.getImportReference())) .forEach(this::unimportRegistration); } } @@ -115,48 +116,67 @@ public class TopologyManagerImport implements EndpointEventListener, RemoteServi /** * Synchronizes the actual imports with the possible imports for the given filter, - * i.e. unimports previously imported endpoints that are no longer possible, + * i.e. un-imports previously imported endpoints that are no longer valid or possible, * and imports new possible endpoints that are not already imported. - * + * <p> * TODO but optional: if the service is already imported and the endpoint is still - * in the list of possible imports check if a "better" endpoint is now in the list + * in the list of possible imports check if a "better" endpoint is now in the list. * * @param filter the filter whose endpoints are synchronized */ private void synchronizeImports(final String filter) { try { - ImportDiff diff = new ImportDiff(importPossibilities.get(filter), importedServices.get(filter)); - diff.getRemoved() - .forEach(this::unimportRegistration); - diff.getAdded() - .flatMap(this::importService) - .forEach(ir -> importedServices.put(filter, ir)); + // we have a set of all current imports, and a set of all possible imports (with overlap) + Set<ImportRegistration> imported = importedServices.get(filter); + Set<EndpointDescription> possible = importPossibilities.get(filter); + // first we iterate over all current imports, and split them into two groups: + // - still valid (no null references) and possible (in possible set) + // - invalid (contain null references) or no longer possible (not in possible set) + // note that this part should be concurrency-safe (get every reference only once and don't modify anything) + Set<EndpointDescription> valid = new HashSet<>(); // imports that are still valid and possible + Set<ImportRegistration> invalid = new LinkedHashSet<>(); // imports that are no longer valid/possible + for (ImportRegistration reg : imported) { + ImportReference ref = reg.getImportReference(); + EndpointDescription endpoint = ref == null ? null : ref.getImportedEndpoint(); + // check if the currently imported endpoint is still valid and possible + if (endpoint != null && possible.contains(endpoint)) { + valid.add(endpoint); // valid and possible + } else { + invalid.add(reg); // invalid (reg or ref or endpoint is null) or no longer possible + } + } + // now that we figured out what needs to be done, apply the changes + invalid.forEach(this::unimportRegistration); // remove invalid/non-possible imports + possible.forEach(endpoint -> { // import all possible endpoints that are not already imported + if (!valid.contains(endpoint)) { + importService(filter, endpoint); + } + }); } catch (Exception e) { - LOG.error(e.getMessage(), e); + LOG.error("error synchronizing imports", e); } // Notify EndpointListeners? NO! } /** - * Tries to import the service with each rsa until one import is successful. + * Tries to import the service with each RSA until one import is successful. * * @param filter the filter that matched the endpoint * @param endpoint endpoint to import - * @return */ - private Stream<ImportRegistration> importService(EndpointDescription endpoint) { + private void importService(String filter, EndpointDescription endpoint) { for (RemoteServiceAdmin rsa : rsaSet) { - ImportRegistration ir = rsa.importService(endpoint); - if (ir != null) { - if (ir.getException() == null) { - LOG.debug("Service import was successful {}", ir); - return Stream.of(ir); + ImportRegistration reg = rsa.importService(endpoint); + if (reg != null) { + if (reg.getException() == null) { + LOG.debug("Service import was successful {}", reg); + importedServices.put(filter, reg); + return; } else { - LOG.info("Error importing service {}", endpoint, ir.getException()); + LOG.info("Error importing service {}", endpoint, reg.getException()); } } } - return Stream.empty(); } private void unimportRegistration(ImportRegistration reg) {
