bbende commented on code in PR #9017:
URL: https://github.com/apache/nifi/pull/9017#discussion_r1664622395


##########
nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/nar/StandardNarManager.java:
##########
@@ -0,0 +1,504 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.nifi.nar;
+
+import org.apache.nifi.bundle.Bundle;
+import org.apache.nifi.bundle.BundleCoordinate;
+import org.apache.nifi.cluster.coordination.ClusterCoordinator;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.controller.FlowController;
+import org.apache.nifi.controller.service.ControllerServiceProvider;
+import org.apache.nifi.web.ResourceNotFoundException;
+import org.apache.nifi.web.api.dto.NarSummaryDTO;
+import org.apache.nifi.web.api.entity.NarSummariesEntity;
+import org.apache.nifi.web.api.entity.NarSummaryEntity;
+import org.apache.nifi.web.client.api.WebClientService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.DisposableBean;
+import org.springframework.beans.factory.InitializingBean;
+
+import javax.net.ssl.SSLContext;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HexFormat;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.stream.Collectors;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+
+public class StandardNarManager implements NarManager, InitializingBean, 
DisposableBean {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(StandardNarManager.class);
+
+    private static final Duration MAX_WAIT_TIME_FOR_CLUSTER_COORDINATOR = 
Duration.ofSeconds(60);
+    private static final Duration MAX_WAIT_TIME_FOR_NARS = 
Duration.ofMinutes(5);
+
+    private final ClusterCoordinator clusterCoordinator;
+    private final ExtensionManager extensionManager;
+    private final ControllerServiceProvider controllerServiceProvider;
+    private final NarPersistenceProvider persistenceProvider;
+    private final NarComponentManager narComponentManager;
+    private final NarLoader narLoader;
+    private final WebClientService webClientService;
+    private final SSLContext sslContext;
+
+    private final Map<String, NarNode> narNodesById = new 
ConcurrentHashMap<>();
+    private final Map<String, Future<?>> installFuturesById = new 
ConcurrentHashMap<>();
+    private final ExecutorService installExecutorService;
+    private final ExecutorService deleteExecutorService;
+
+    public StandardNarManager(final FlowController flowController,
+                              final ClusterCoordinator clusterCoordinator,
+                              final NarComponentManager narComponentManager,
+                              final NarLoader narLoader,
+                              final WebClientService webClientService,
+                              final SSLContext sslContext) {
+        this.clusterCoordinator = clusterCoordinator;
+        this.extensionManager = flowController.getExtensionManager();
+        this.controllerServiceProvider = 
flowController.getControllerServiceProvider();
+        this.persistenceProvider = flowController.getNarPersistenceProvider();
+        this.narComponentManager = narComponentManager;
+        this.narLoader = narLoader;
+        this.webClientService = webClientService;
+        this.sslContext = sslContext;
+        this.installExecutorService = Executors.newSingleThreadExecutor();
+        this.deleteExecutorService = Executors.newSingleThreadExecutor();
+    }
+
+    // This serves two purposes...
+    // 1. Any previously stored NARs need to have their extensions loaded and 
made available for use during start up since they won't be in any of the 
standard NAR directories
+    // 2. NarLoader keeps track of NARs that were missing dependencies to 
consider them on future loads, so this restores state that may have been lost 
on a restart
+    @Override
+    public void afterPropertiesSet() throws IOException {
+        final Collection<NarPersistenceInfo> narInfos = loadExistingNars();
+        restoreState(narInfos);
+    }
+
+    private Collection<NarPersistenceInfo> loadExistingNars() throws 
IOException {
+        final Collection<NarPersistenceInfo> narInfos = 
persistenceProvider.getAllNarInfo();
+        LOGGER.info("Initializing NAR Manager, loading {} previously stored 
NARs", narInfos.size());
+
+        final Collection<File> narFiles = narInfos.stream()
+                .map(NarPersistenceInfo::getNarFile)
+                .collect(Collectors.toSet());
+        narLoader.load(narFiles);
+        return narInfos;
+    }
+
+    private void restoreState(final Collection<NarPersistenceInfo> narInfos) {
+        for (final NarPersistenceInfo narInfo : narInfos) {
+            try {
+                final NarNode narNode = restoreNarNode(narInfo);
+                narNodesById.put(narNode.getIdentifier(), narNode);
+                LOGGER.debug("Restored NAR [{}] with state [{}] and identifier 
[{}]",
+                        narNode.getManifest().getCoordinate(), 
narNode.getState(), narNode.getIdentifier());
+            } catch (final Exception e) {
+                LOGGER.warn("Failed to restore NAR for [{}]", 
narInfo.getNarFile().getAbsolutePath(), e);
+            }
+        }
+    }
+
+    private NarNode restoreNarNode(final NarPersistenceInfo narInfo) throws 
IOException {
+        final File narFile = narInfo.getNarFile();
+        final NarManifest manifest = NarManifest.fromNarFile(narFile);
+        final BundleCoordinate coordinate = manifest.getCoordinate();
+        final String identifier = createIdentifier(coordinate);
+        final NarState state = determineNarState(manifest);
+        final String narDigest = computeNarDigest(narFile);
+
+        return NarNode.builder()
+                .identifier(identifier)
+                .narFile(narFile)
+                .narFileHexDigest(narDigest)
+                .manifest(manifest)
+                
.source(NarSource.valueOf(narInfo.getNarProperties().getSourceType()))
+                .sourceIdentifier(narInfo.getNarProperties().getSourceId())
+                .state(state)
+                .build();
+    }
+
+    @Override
+    public void destroy() {
+        shutdownExecutor(installExecutorService, "Forcing shutdown of NAR 
Manager Upload Executor Service");
+        shutdownExecutor(deleteExecutorService, "Forcing shutdown of NAR 
Manager Delete Executor Service");
+    }
+
+    private void shutdownExecutor(final ExecutorService executorService, final 
String interruptedMessage) {
+        executorService.shutdown();
+        try {
+            if (!executorService.awaitTermination(5000, MILLISECONDS)) {
+                executorService.shutdownNow();
+            }
+        } catch (InterruptedException ignore) {
+            LOGGER.info(interruptedMessage);
+            executorService.shutdownNow();
+        }
+    }
+
+    @Override
+    public NarNode installNar(final NarInstallRequest installRequest) throws 
IOException {
+        return installNar(installRequest, true);
+    }
+
+    private NarNode installNar(final NarInstallRequest installRequest, final 
boolean async) throws IOException {
+        final InputStream inputStream = installRequest.getInputStream();
+        final File tempNarFile = 
persistenceProvider.createTempFile(inputStream);
+        try {
+            return installNar(installRequest, tempNarFile, async);
+        } finally {
+            if (tempNarFile.exists() && !tempNarFile.delete()) {
+                LOGGER.warn("Failed to delete temp NAR file at [{}], file must 
be cleaned up manually", tempNarFile.getAbsolutePath());
+            }
+        }
+    }
+
+    // The outer install method is not synchronized since copying the stream 
to the temp file make take a long time, so we
+    // synchronize here after already having the temp file to ensure only one 
request is checked and submitted for installing
+    private synchronized NarNode installNar(final NarInstallRequest 
installRequest, final File tempNarFile, final boolean async) throws IOException 
{
+        final NarManifest manifest = getNarManifest(tempNarFile);
+        final BundleCoordinate coordinate = manifest.getCoordinate();
+
+        final Bundle existingBundle = extensionManager.getBundle(coordinate);
+        if (existingBundle != null && !persistenceProvider.exists(coordinate)) 
{
+            throw new IllegalStateException("A NAR is already registered with 
the same group, id, and version, " +
+                    "and can not be replaced because it is not part of the NAR 
Manager");
+        }
+
+        final Set<Bundle> bundlesWithMatchingDependency = 
extensionManager.getDependentBundles(coordinate);
+        if (!bundlesWithMatchingDependency.isEmpty()) {
+            throw new IllegalStateException("Unable to replace NAR because it 
is a dependency of other NARs");
+        }
+
+        final NarPersistenceContext persistenceContext = 
NarPersistenceContext.builder()
+                .manifest(manifest)
+                .source(installRequest.getSource())
+                .sourceIdentifier(installRequest.getSourceIdentifier())
+                .clusterCoordinator(clusterCoordinator != null && 
clusterCoordinator.isActiveClusterCoordinator())
+                .build();
+
+        final NarPersistenceInfo narPersistenceInfo = 
persistenceProvider.saveNar(persistenceContext, tempNarFile);
+
+        final File narFile = narPersistenceInfo.getNarFile();
+        final String identifier = createIdentifier(coordinate);
+        final String narDigest = computeNarDigest(narFile);
+
+        final NarNode narNode = NarNode.builder()
+                .identifier(identifier)
+                .narFile(narFile)
+                .narFileHexDigest(narDigest)
+                .manifest(manifest)
+                .source(installRequest.getSource())
+                .sourceIdentifier(installRequest.getSourceIdentifier())
+                .state(NarState.WAITING_TO_INSTALL)
+                .build();
+        narNodesById.put(identifier, narNode);
+
+        final NarInstallTask installTask = createInstallTask(narNode);
+        if (async) {
+            LOGGER.info("Submitting install task for NAR with id [{}] and 
coordinate [{}]", identifier, coordinate);
+            final Future<?> installTaskFuture = 
installExecutorService.submit(installTask);
+            installFuturesById.put(identifier, installTaskFuture);
+        } else {
+            LOGGER.info("Synchronously installing NAR with id [{}] and 
coordinate [{}]", identifier, coordinate);
+            installTask.run();
+        }
+        return narNode;
+    }
+
+    @Override
+    public void completeInstall(final String identifier) {
+        LOGGER.info("Completed install for NAR [{}]", identifier);
+        installFuturesById.remove(identifier);
+    }
+
+    @Override
+    public synchronized void updateState(final BundleCoordinate coordinate, 
final NarState narState) {
+        final NarNode narNode = narNodesById.values().stream()
+                .filter(n -> 
n.getManifest().getCoordinate().equals(coordinate))
+                .findFirst()
+                .orElseThrow(() -> new NarNotFoundException(coordinate));
+        narNode.setState(narState);
+    }
+
+    @Override
+    public Collection<NarNode> getNars() {
+        return new ArrayList<>(narNodesById.values());
+    }
+
+    @Override
+    public Optional<NarNode> getNar(final String identifier) {
+        final NarNode narNode = narNodesById.get(identifier);
+        return Optional.ofNullable(narNode);
+    }
+
+    @Override
+    public synchronized void verifyDeleteNar(final String identifier, final 
boolean forceDelete) {
+        final NarNode narNode = getNarNodeOrThrowNotFound(identifier);
+
+        // Always allow deletion of a NAR that is not fully installed
+        if (narNode.getState() != NarState.INSTALLED) {
+            return;
+        }
+
+        final BundleCoordinate coordinate = 
narNode.getManifest().getCoordinate();
+        final Set<Bundle> bundlesWithMatchingDependency = 
extensionManager.getDependentBundles(coordinate);
+        if (!bundlesWithMatchingDependency.isEmpty()) {
+            throw new IllegalStateException("Unable to delete NAR because it 
is a dependency of other NARs");
+        }
+
+        if (!forceDelete && narComponentManager.componentsExist(coordinate)) {
+            throw new IllegalStateException("Unable to delete NAR because 
components are instantiated from this NAR");
+        }
+    }
+
+    @Override
+    public synchronized NarNode deleteNar(final String identifier) throws 
IOException {
+        final NarNode narNode = getNarNodeOrThrowNotFound(identifier);
+        final BundleCoordinate coordinate = 
narNode.getManifest().getCoordinate();
+        LOGGER.info("Deleting NAR with id [{}] and coordinate [{}]", 
identifier, coordinate);
+
+        final Future<?> installTask = installFuturesById.remove(identifier);
+        if (installTask != null) {
+            installTask.cancel(true);
+        }
+
+        final Bundle existingBundle = extensionManager.getBundle(coordinate);
+        if (existingBundle != null) {
+            narLoader.unload(existingBundle);
+        }
+
+        deleteExecutorService.submit(() -> {
+            LOGGER.info("Unloading components for deleting NAR with id [{}] 
and coordinate [{}]", identifier, coordinate);
+            final StandardStoppedComponents stoppedComponents = new 
StandardStoppedComponents(controllerServiceProvider);
+            narComponentManager.unloadComponents(coordinate, 
stoppedComponents);
+            LOGGER.info("Completed unloading components for deleting NAR with 
id [{}] and coordinate [{}]", identifier, coordinate);
+        });
+
+        persistenceProvider.deleteNar(coordinate);
+        narNodesById.remove(identifier);
+
+        return narNode;
+    }
+
+    @Override
+    public synchronized InputStream readNar(final String identifier) {
+        final NarNode narNode = getNarNodeOrThrowNotFound(identifier);
+        final BundleCoordinate coordinate = 
narNode.getManifest().getCoordinate();
+        try {
+            return persistenceProvider.readNar(coordinate);
+        } catch (final FileNotFoundException e) {
+            throw new NarNotFoundException(coordinate);
+        }
+    }
+
+    @Override
+    public synchronized void syncWithClusterCoordinator() {
+        if (clusterCoordinator == null) {
+            LOGGER.info("Cluster coordinator is null, will not sync NARs");
+            return;
+        }
+
+        // This sync method is called from the method that loads the flow from 
a connection response, which means there must already be a cluster coordinator 
to
+        // have gotten a response from, but during testing there were cases 
where calling clusterCoordinator.getElectedActiveCoordinatorNode() was still 
null, so
+        // the helper method here will keep checking for the identifier up to 
a certain threshold to avoid slight timing issues
+        final NodeIdentifier coordinatorNodeId = 
getElectedActiveCoordinatorNode();
+        if (coordinatorNodeId == null) {
+            LOGGER.warn("Unable to obtain the node identifier for the cluster 
coordinator, will not sync NARs");
+            return;
+        }
+
+        LOGGER.info("Determined cluster coordinator is at {}", 
coordinatorNodeId);
+        if (clusterCoordinator.isActiveClusterCoordinator()) {
+            LOGGER.info("Current node is the cluster coordinator, will not 
sync NARs");
+            return;
+        }
+
+        LOGGER.info("Synchronizing NARs with cluster coordinator");
+        final String coordinatorAddress = coordinatorNodeId.getApiAddress();
+        final int coordinatorPort = coordinatorNodeId.getApiPort();
+        final NarRestApiClient narRestApiClient = new 
NarRestApiClient(webClientService, coordinatorAddress, coordinatorPort, 
sslContext != null);
+
+        final int localNarCountBeforeSync = narNodesById.size();
+        try {
+            // This node may try to retrieve the summaries from the 
coordinator while the coordinator still hasn't finished initializing its flow 
controller and
+            // the response will be a 409, so the helper method here will 
catch any retryable exceptions and retry the request up to a configured 
threshold
+            final NarSummariesEntity narSummaries = 
getNarSummariesFromCoordinator(narRestApiClient);
+            if (narSummaries == null) {
+                LOGGER.error("Unable to retrieve listing of NARs from cluster 
coordinator within the maximum amount of time, will not sync NARs");
+                return;
+            }
+
+            LOGGER.info("Cluster coordinator returned {} NAR summaries", 
narSummaries.getNarSummaries().size());
+
+            for (final NarSummaryEntity narSummaryEntity : 
narSummaries.getNarSummaries()) {
+                final NarSummaryDTO narSummaryDTO = 
narSummaryEntity.getNarSummary();
+                final String coordinatorNarId = narSummaryDTO.getIdentifier();
+                final String coordinatorNarDigest = narSummaryDTO.getDigest();
+                final NarNode matchingNar = narNodesById.get(coordinatorNarId);
+                if (matchingNar == null) {
+                    LOGGER.info("Coordinator has NAR [{}] which does not exist 
locally, will download", coordinatorNarId);
+                    downloadNar(narRestApiClient, narSummaryDTO);
+                } else if 
(!coordinatorNarDigest.equals(matchingNar.getNarFileHexDigest())) {
+                    LOGGER.info("Coordinator has NAR [{}] which exists locally 
with a different digest, will download", coordinatorNarId);
+                    downloadNar(narRestApiClient, narSummaryDTO);
+                } else {
+                    LOGGER.info("Coordinator has NAR [{}] which exists locally 
with a matching digest, will not download", coordinatorNarId);
+                }
+            }
+        } catch (final Exception e) {
+            // if the current node has existing NARs and the sync fails then 
we throw an exception to fail start up, otherwise we don't know if the digests 
of the local NARs
+            // match with the coordinator which could result in joining the 
cluster and running slightly different code on one node
+            // if the current node has no existing NARs then we can let the 
node proceed and attempt to join the cluster because maybe the cluster 
coordinator had no NARs anyway,
+            // and if it did then flow synchronization will fail after this 
because the local flow will have ghosted components that are not ghosted in the 
cluster
+            if (localNarCountBeforeSync > 0) {
+                throw new RuntimeException("Failed to sync NARs from cluster 
coordinator", e);
+            } else {
+                LOGGER.error("Failed to sync NARs from cluster coordinator, no 
NARs exist locally, will proceed", e);
+            }
+        }
+    }
+
+    private void downloadNar(final NarRestApiClient narRestApiClient, final 
NarSummaryDTO narSummary) throws IOException {
+        try (final InputStream coordinatorNarInputStream = 
narRestApiClient.downloadNar(narSummary.getIdentifier())) {
+            final NarInstallRequest installRequest = 
NarInstallRequest.builder()
+                    .source(NarSource.valueOf(narSummary.getSourceType()))
+                    .sourceIdentifier(narSummary.getSourceIdentifier())
+                    .inputStream(coordinatorNarInputStream)
+                    .build();
+            installNar(installRequest, false);
+        }
+    }
+
+    private NarSummariesEntity getNarSummariesFromCoordinator(final 
NarRestApiClient narRestApiClient) {
+        final Instant waitUntilInstant = 
Instant.ofEpochMilli(System.currentTimeMillis() + 
MAX_WAIT_TIME_FOR_NARS.toMillis());
+        while (System.currentTimeMillis() < waitUntilInstant.toEpochMilli()) {
+            final NarSummariesEntity narSummaries = 
listNarSummaries(narRestApiClient);
+            if (narSummaries != null) {
+                return narSummaries;
+            }
+            LOGGER.info("Unable to retrieve NAR summaries from cluster 
coordinator, will retry until [{}]", waitUntilInstant);
+            sleep(Duration.ofSeconds(5));
+        }
+        return null;
+    }
+
+    private NarSummariesEntity listNarSummaries(final NarRestApiClient 
narRestApiClient) {
+        try {
+            return narRestApiClient.listNarSummaries();
+        } catch (final NarRestApiRetryableException e) {
+            LOGGER.warn("[{}], will retry", e.getMessage());
+            if (LOGGER.isDebugEnabled()) {
+                LOGGER.debug("", e);
+            }

Review Comment:
   I did this because the listing normally needs to be retried a couple of 
times while the coordinator may still be getting ready to serve requests, so 
printing a large stack trace on every attempt makes it look like a major 
problem when it most likely isn't. I'll probably remove the separate 
debug-level stack trace logging, since it is likely not needed.



##########
nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/nar/StandardNarManager.java:
##########
@@ -0,0 +1,504 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.nifi.nar;
+
+import org.apache.nifi.bundle.Bundle;
+import org.apache.nifi.bundle.BundleCoordinate;
+import org.apache.nifi.cluster.coordination.ClusterCoordinator;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.controller.FlowController;
+import org.apache.nifi.controller.service.ControllerServiceProvider;
+import org.apache.nifi.web.ResourceNotFoundException;
+import org.apache.nifi.web.api.dto.NarSummaryDTO;
+import org.apache.nifi.web.api.entity.NarSummariesEntity;
+import org.apache.nifi.web.api.entity.NarSummaryEntity;
+import org.apache.nifi.web.client.api.WebClientService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.DisposableBean;
+import org.springframework.beans.factory.InitializingBean;
+
+import javax.net.ssl.SSLContext;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HexFormat;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.stream.Collectors;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+
+public class StandardNarManager implements NarManager, InitializingBean, 
DisposableBean {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(StandardNarManager.class);
+
+    private static final Duration MAX_WAIT_TIME_FOR_CLUSTER_COORDINATOR = 
Duration.ofSeconds(60);
+    private static final Duration MAX_WAIT_TIME_FOR_NARS = 
Duration.ofMinutes(5);
+
+    private final ClusterCoordinator clusterCoordinator;
+    private final ExtensionManager extensionManager;
+    private final ControllerServiceProvider controllerServiceProvider;
+    private final NarPersistenceProvider persistenceProvider;
+    private final NarComponentManager narComponentManager;
+    private final NarLoader narLoader;
+    private final WebClientService webClientService;
+    private final SSLContext sslContext;
+
+    private final Map<String, NarNode> narNodesById = new 
ConcurrentHashMap<>();
+    private final Map<String, Future<?>> installFuturesById = new 
ConcurrentHashMap<>();
+    private final ExecutorService installExecutorService;
+    private final ExecutorService deleteExecutorService;
+
+    public StandardNarManager(final FlowController flowController,
+                              final ClusterCoordinator clusterCoordinator,
+                              final NarComponentManager narComponentManager,
+                              final NarLoader narLoader,
+                              final WebClientService webClientService,
+                              final SSLContext sslContext) {
+        this.clusterCoordinator = clusterCoordinator;
+        this.extensionManager = flowController.getExtensionManager();
+        this.controllerServiceProvider = 
flowController.getControllerServiceProvider();
+        this.persistenceProvider = flowController.getNarPersistenceProvider();
+        this.narComponentManager = narComponentManager;
+        this.narLoader = narLoader;
+        this.webClientService = webClientService;
+        this.sslContext = sslContext;
+        this.installExecutorService = Executors.newSingleThreadExecutor();
+        this.deleteExecutorService = Executors.newSingleThreadExecutor();
+    }
+
+    // This serves two purposes...
+    // 1. Any previously stored NARs need to have their extensions loaded and 
made available for use during start up since they won't be in any of the 
standard NAR directories
+    // 2. NarLoader keeps track of NARs that were missing dependencies to 
consider them on future loads, so this restores state that may have been lost 
on a restart
+    @Override
+    public void afterPropertiesSet() throws IOException {
+        final Collection<NarPersistenceInfo> narInfos = loadExistingNars();
+        restoreState(narInfos);
+    }
+
+    private Collection<NarPersistenceInfo> loadExistingNars() throws 
IOException {
+        final Collection<NarPersistenceInfo> narInfos = 
persistenceProvider.getAllNarInfo();
+        LOGGER.info("Initializing NAR Manager, loading {} previously stored 
NARs", narInfos.size());
+
+        final Collection<File> narFiles = narInfos.stream()
+                .map(NarPersistenceInfo::getNarFile)
+                .collect(Collectors.toSet());
+        narLoader.load(narFiles);
+        return narInfos;
+    }
+
+    private void restoreState(final Collection<NarPersistenceInfo> narInfos) {
+        for (final NarPersistenceInfo narInfo : narInfos) {
+            try {
+                final NarNode narNode = restoreNarNode(narInfo);
+                narNodesById.put(narNode.getIdentifier(), narNode);
+                LOGGER.debug("Restored NAR [{}] with state [{}] and identifier 
[{}]",
+                        narNode.getManifest().getCoordinate(), 
narNode.getState(), narNode.getIdentifier());
+            } catch (final Exception e) {
+                LOGGER.warn("Failed to restore NAR for [{}]", 
narInfo.getNarFile().getAbsolutePath(), e);
+            }
+        }
+    }
+
+    private NarNode restoreNarNode(final NarPersistenceInfo narInfo) throws 
IOException {
+        final File narFile = narInfo.getNarFile();
+        final NarManifest manifest = NarManifest.fromNarFile(narFile);
+        final BundleCoordinate coordinate = manifest.getCoordinate();
+        final String identifier = createIdentifier(coordinate);
+        final NarState state = determineNarState(manifest);
+        final String narDigest = computeNarDigest(narFile);
+
+        return NarNode.builder()
+                .identifier(identifier)
+                .narFile(narFile)
+                .narFileHexDigest(narDigest)
+                .manifest(manifest)
+                
.source(NarSource.valueOf(narInfo.getNarProperties().getSourceType()))
+                .sourceIdentifier(narInfo.getNarProperties().getSourceId())
+                .state(state)
+                .build();
+    }
+
+    @Override
+    public void destroy() {
+        shutdownExecutor(installExecutorService, "Forcing shutdown of NAR 
Manager Upload Executor Service");
+        shutdownExecutor(deleteExecutorService, "Forcing shutdown of NAR 
Manager Delete Executor Service");
+    }
+
+    private void shutdownExecutor(final ExecutorService executorService, final 
String interruptedMessage) {
+        executorService.shutdown();
+        try {
+            if (!executorService.awaitTermination(5000, MILLISECONDS)) {
+                executorService.shutdownNow();
+            }
+        } catch (InterruptedException ignore) {
+            LOGGER.info(interruptedMessage);
+            executorService.shutdownNow();
+        }
+    }
+
+    @Override
+    public NarNode installNar(final NarInstallRequest installRequest) throws 
IOException {
+        return installNar(installRequest, true);
+    }
+
+    private NarNode installNar(final NarInstallRequest installRequest, final 
boolean async) throws IOException {
+        final InputStream inputStream = installRequest.getInputStream();
+        final File tempNarFile = 
persistenceProvider.createTempFile(inputStream);
+        try {
+            return installNar(installRequest, tempNarFile, async);
+        } finally {
+            if (tempNarFile.exists() && !tempNarFile.delete()) {
+                LOGGER.warn("Failed to delete temp NAR file at [{}], file must 
be cleaned up manually", tempNarFile.getAbsolutePath());
+            }
+        }
+    }
+
+    // The outer install method is not synchronized since copying the stream 
to the temp file make take a long time, so we
+    // synchronize here after already having the temp file to ensure only one 
request is checked and submitted for installing
+    private synchronized NarNode installNar(final NarInstallRequest 
installRequest, final File tempNarFile, final boolean async) throws IOException 
{
+        final NarManifest manifest = getNarManifest(tempNarFile);
+        final BundleCoordinate coordinate = manifest.getCoordinate();
+
+        final Bundle existingBundle = extensionManager.getBundle(coordinate);
+        if (existingBundle != null && !persistenceProvider.exists(coordinate)) 
{
+            throw new IllegalStateException("A NAR is already registered with 
the same group, id, and version, " +
+                    "and can not be replaced because it is not part of the NAR 
Manager");
+        }
+
+        final Set<Bundle> bundlesWithMatchingDependency = 
extensionManager.getDependentBundles(coordinate);
+        if (!bundlesWithMatchingDependency.isEmpty()) {
+            throw new IllegalStateException("Unable to replace NAR because it 
is a dependency of other NARs");
+        }
+
+        final NarPersistenceContext persistenceContext = 
NarPersistenceContext.builder()
+                .manifest(manifest)
+                .source(installRequest.getSource())
+                .sourceIdentifier(installRequest.getSourceIdentifier())
+                .clusterCoordinator(clusterCoordinator != null && 
clusterCoordinator.isActiveClusterCoordinator())
+                .build();
+
+        final NarPersistenceInfo narPersistenceInfo = 
persistenceProvider.saveNar(persistenceContext, tempNarFile);
+
+        final File narFile = narPersistenceInfo.getNarFile();
+        final String identifier = createIdentifier(coordinate);
+        final String narDigest = computeNarDigest(narFile);
+
+        final NarNode narNode = NarNode.builder()
+                .identifier(identifier)
+                .narFile(narFile)
+                .narFileHexDigest(narDigest)
+                .manifest(manifest)
+                .source(installRequest.getSource())
+                .sourceIdentifier(installRequest.getSourceIdentifier())
+                .state(NarState.WAITING_TO_INSTALL)
+                .build();
+        narNodesById.put(identifier, narNode);
+
+        final NarInstallTask installTask = createInstallTask(narNode);
+        if (async) {
+            LOGGER.info("Submitting install task for NAR with id [{}] and 
coordinate [{}]", identifier, coordinate);
+            final Future<?> installTaskFuture = 
installExecutorService.submit(installTask);
+            installFuturesById.put(identifier, installTaskFuture);
+        } else {
+            LOGGER.info("Synchronously installing NAR with id [{}] and 
coordinate [{}]", identifier, coordinate);
+            installTask.run();
+        }
+        return narNode;
+    }
+
+    @Override
+    public void completeInstall(final String identifier) {
+        LOGGER.info("Completed install for NAR [{}]", identifier);
+        installFuturesById.remove(identifier);
+    }
+
+    @Override
+    public synchronized void updateState(final BundleCoordinate coordinate, 
final NarState narState) {
+        final NarNode narNode = narNodesById.values().stream()
+                .filter(n -> 
n.getManifest().getCoordinate().equals(coordinate))
+                .findFirst()
+                .orElseThrow(() -> new NarNotFoundException(coordinate));
+        narNode.setState(narState);
+    }
+
+    @Override
+    public Collection<NarNode> getNars() {
+        return new ArrayList<>(narNodesById.values());
+    }
+
+    @Override
+    public Optional<NarNode> getNar(final String identifier) {
+        final NarNode narNode = narNodesById.get(identifier);
+        return Optional.ofNullable(narNode);
+    }
+
+    @Override
+    public synchronized void verifyDeleteNar(final String identifier, final 
boolean forceDelete) {
+        final NarNode narNode = getNarNodeOrThrowNotFound(identifier);
+
+        // Always allow deletion of a NAR that is not fully installed
+        if (narNode.getState() != NarState.INSTALLED) {
+            return;
+        }
+
+        final BundleCoordinate coordinate = 
narNode.getManifest().getCoordinate();
+        final Set<Bundle> bundlesWithMatchingDependency = 
extensionManager.getDependentBundles(coordinate);
+        if (!bundlesWithMatchingDependency.isEmpty()) {
+            throw new IllegalStateException("Unable to delete NAR because it 
is a dependency of other NARs");
+        }
+
+        if (!forceDelete && narComponentManager.componentsExist(coordinate)) {
+            throw new IllegalStateException("Unable to delete NAR because 
components are instantiated from this NAR");
+        }
+    }
+
+    @Override
+    public synchronized NarNode deleteNar(final String identifier) throws 
IOException {
+        final NarNode narNode = getNarNodeOrThrowNotFound(identifier);
+        final BundleCoordinate coordinate = 
narNode.getManifest().getCoordinate();
+        LOGGER.info("Deleting NAR with id [{}] and coordinate [{}]", 
identifier, coordinate);
+
+        final Future<?> installTask = installFuturesById.remove(identifier);
+        if (installTask != null) {
+            installTask.cancel(true);
+        }
+
+        final Bundle existingBundle = extensionManager.getBundle(coordinate);
+        if (existingBundle != null) {
+            narLoader.unload(existingBundle);
+        }
+
+        deleteExecutorService.submit(() -> {
+            LOGGER.info("Unloading components for deleting NAR with id [{}] 
and coordinate [{}]", identifier, coordinate);
+            final StandardStoppedComponents stoppedComponents = new 
StandardStoppedComponents(controllerServiceProvider);
+            narComponentManager.unloadComponents(coordinate, 
stoppedComponents);
+            LOGGER.info("Completed unloading components for deleting NAR with 
id [{}] and coordinate [{}]", identifier, coordinate);
+        });
+
+        persistenceProvider.deleteNar(coordinate);
+        narNodesById.remove(identifier);
+
+        return narNode;
+    }
+
+    @Override
+    public synchronized InputStream readNar(final String identifier) {
+        final NarNode narNode = getNarNodeOrThrowNotFound(identifier);
+        final BundleCoordinate coordinate = 
narNode.getManifest().getCoordinate();
+        try {
+            return persistenceProvider.readNar(coordinate);
+        } catch (final FileNotFoundException e) {
+            throw new NarNotFoundException(coordinate);
+        }
+    }
+
+    @Override
+    public synchronized void syncWithClusterCoordinator() {
+        if (clusterCoordinator == null) {
+            LOGGER.info("Cluster coordinator is null, will not sync NARs");
+            return;
+        }
+
+        // This sync method is called from the method that loads the flow from 
a connection response, which means there must already be a cluster coordinator 
to
+        // have gotten a response from, but during testing there were cases 
where calling clusterCoordinator.getElectedActiveCoordinatorNode() was still 
null, so
+        // the helper method here will keep checking for the identifier up to 
a certain threshold to avoid slight timing issues
+        final NodeIdentifier coordinatorNodeId = 
getElectedActiveCoordinatorNode();
+        if (coordinatorNodeId == null) {
+            LOGGER.warn("Unable to obtain the node identifier for the cluster 
coordinator, will not sync NARs");
+            return;
+        }
+
+        LOGGER.info("Determined cluster coordinator is at {}", 
coordinatorNodeId);
+        if (clusterCoordinator.isActiveClusterCoordinator()) {
+            LOGGER.info("Current node is the cluster coordinator, will not 
sync NARs");
+            return;
+        }
+
+        LOGGER.info("Synchronizing NARs with cluster coordinator");
+        final String coordinatorAddress = coordinatorNodeId.getApiAddress();
+        final int coordinatorPort = coordinatorNodeId.getApiPort();
+        final NarRestApiClient narRestApiClient = new 
NarRestApiClient(webClientService, coordinatorAddress, coordinatorPort, 
sslContext != null);
+
+        final int localNarCountBeforeSync = narNodesById.size();
+        try {
+            // This node may try to retrieve the summaries from the 
coordinator while the coordinator still hasn't finished initializing its flow 
controller and
+            // the response will be a 409, so the helper method here will 
catch any retryable exceptions and retry the request up to a configured 
threshold
+            final NarSummariesEntity narSummaries = 
getNarSummariesFromCoordinator(narRestApiClient);
+            if (narSummaries == null) {
+                LOGGER.error("Unable to retrieve listing of NARs from cluster 
coordinator within the maximum amount of time, will not sync NARs");
+                return;
+            }
+
+            LOGGER.info("Cluster coordinator returned {} NAR summaries", 
narSummaries.getNarSummaries().size());
+
+            for (final NarSummaryEntity narSummaryEntity : 
narSummaries.getNarSummaries()) {
+                final NarSummaryDTO narSummaryDTO = 
narSummaryEntity.getNarSummary();
+                final String coordinatorNarId = narSummaryDTO.getIdentifier();
+                final String coordinatorNarDigest = narSummaryDTO.getDigest();
+                final NarNode matchingNar = narNodesById.get(coordinatorNarId);
+                if (matchingNar == null) {
+                    LOGGER.info("Coordinator has NAR [{}] which does not exist 
locally, will download", coordinatorNarId);
+                    downloadNar(narRestApiClient, narSummaryDTO);
+                } else if 
(!coordinatorNarDigest.equals(matchingNar.getNarFileHexDigest())) {
+                    LOGGER.info("Coordinator has NAR [{}] which exists locally 
with a different digest, will download", coordinatorNarId);
+                    downloadNar(narRestApiClient, narSummaryDTO);
+                } else {
+                    LOGGER.info("Coordinator has NAR [{}] which exists locally 
with a matching digest, will not download", coordinatorNarId);
+                }
+            }
+        } catch (final Exception e) {
+            // if the current node has existing NARs and the sync fails then 
we throw an exception to fail start up, otherwise we don't know if the digests 
of the local NARs
+            // match with the coordinator which could result in joining the 
cluster and running slightly different code on one node
+            // if the current node has no existing NARs then we can let the 
node proceed and attempt to join the cluster because maybe the cluster 
coordinator had no NARs anyway,
+            // and if it did then flow synchronization will fail after this 
because the local flow will have ghosted components that are not ghosted in the 
cluster
+            if (localNarCountBeforeSync > 0) {
+                throw new RuntimeException("Failed to sync NARs from cluster 
coordinator", e);
+            } else {
+                LOGGER.error("Failed to sync NARs from cluster coordinator, no 
NARs exist locally, will proceed", e);
+            }
+        }
+    }
+
+    private void downloadNar(final NarRestApiClient narRestApiClient, final 
NarSummaryDTO narSummary) throws IOException {
+        try (final InputStream coordinatorNarInputStream = 
narRestApiClient.downloadNar(narSummary.getIdentifier())) {
+            final NarInstallRequest installRequest = 
NarInstallRequest.builder()
+                    .source(NarSource.valueOf(narSummary.getSourceType()))
+                    .sourceIdentifier(narSummary.getSourceIdentifier())
+                    .inputStream(coordinatorNarInputStream)
+                    .build();
+            installNar(installRequest, false);
+        }
+    }
+
+    private NarSummariesEntity getNarSummariesFromCoordinator(final 
NarRestApiClient narRestApiClient) {
+        final Instant waitUntilInstant = 
Instant.ofEpochMilli(System.currentTimeMillis() + 
MAX_WAIT_TIME_FOR_NARS.toMillis());
+        while (System.currentTimeMillis() < waitUntilInstant.toEpochMilli()) {
+            final NarSummariesEntity narSummaries = 
listNarSummaries(narRestApiClient);
+            if (narSummaries != null) {
+                return narSummaries;
+            }
+            LOGGER.info("Unable to retrieve NAR summaries from cluster 
coordinator, will retry until [{}]", waitUntilInstant);
+            sleep(Duration.ofSeconds(5));
+        }
+        return null;
+    }
+
+    private NarSummariesEntity listNarSummaries(final NarRestApiClient 
narRestApiClient) {
+        try {
+            return narRestApiClient.listNarSummaries();
+        } catch (final NarRestApiRetryableException e) {
+            LOGGER.warn("[{}], will retry", e.getMessage());
+            if (LOGGER.isDebugEnabled()) {
+                LOGGER.debug("", e);
+            }

Review Comment:
   I did this because the listing normally needs to be retried a few times while 
the coordinator may still be getting ready to serve requests, so printing a full 
stack trace on every attempt makes it look like a major problem when it most 
likely isn't. I'll probably remove this, since it is likely not needed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to