bbende commented on code in PR #9017: URL: https://github.com/apache/nifi/pull/9017#discussion_r1664622395
########## nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/nar/StandardNarManager.java: ########## @@ -0,0 +1,504 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.nar; + +import org.apache.nifi.bundle.Bundle; +import org.apache.nifi.bundle.BundleCoordinate; +import org.apache.nifi.cluster.coordination.ClusterCoordinator; +import org.apache.nifi.cluster.protocol.NodeIdentifier; +import org.apache.nifi.controller.FlowController; +import org.apache.nifi.controller.service.ControllerServiceProvider; +import org.apache.nifi.web.ResourceNotFoundException; +import org.apache.nifi.web.api.dto.NarSummaryDTO; +import org.apache.nifi.web.api.entity.NarSummariesEntity; +import org.apache.nifi.web.api.entity.NarSummaryEntity; +import org.apache.nifi.web.client.api.WebClientService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.DisposableBean; +import org.springframework.beans.factory.InitializingBean; + +import javax.net.ssl.SSLContext; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import 
java.time.Duration; +import java.time.Instant; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HexFormat; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.stream.Collectors; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; + +public class StandardNarManager implements NarManager, InitializingBean, DisposableBean { + + private static final Logger LOGGER = LoggerFactory.getLogger(StandardNarManager.class); + + private static final Duration MAX_WAIT_TIME_FOR_CLUSTER_COORDINATOR = Duration.ofSeconds(60); + private static final Duration MAX_WAIT_TIME_FOR_NARS = Duration.ofMinutes(5); + + private final ClusterCoordinator clusterCoordinator; + private final ExtensionManager extensionManager; + private final ControllerServiceProvider controllerServiceProvider; + private final NarPersistenceProvider persistenceProvider; + private final NarComponentManager narComponentManager; + private final NarLoader narLoader; + private final WebClientService webClientService; + private final SSLContext sslContext; + + private final Map<String, NarNode> narNodesById = new ConcurrentHashMap<>(); + private final Map<String, Future<?>> installFuturesById = new ConcurrentHashMap<>(); + private final ExecutorService installExecutorService; + private final ExecutorService deleteExecutorService; + + public StandardNarManager(final FlowController flowController, + final ClusterCoordinator clusterCoordinator, + final NarComponentManager narComponentManager, + final NarLoader narLoader, + final WebClientService webClientService, + final SSLContext sslContext) { + this.clusterCoordinator = clusterCoordinator; + this.extensionManager = flowController.getExtensionManager(); + this.controllerServiceProvider = 
flowController.getControllerServiceProvider(); + this.persistenceProvider = flowController.getNarPersistenceProvider(); + this.narComponentManager = narComponentManager; + this.narLoader = narLoader; + this.webClientService = webClientService; + this.sslContext = sslContext; + this.installExecutorService = Executors.newSingleThreadExecutor(); + this.deleteExecutorService = Executors.newSingleThreadExecutor(); + } + + // This serves two purposes... + // 1. Any previously stored NARs need to have their extensions loaded and made available for use during start up since they won't be in any of the standard NAR directories + // 2. NarLoader keeps track of NARs that were missing dependencies to consider them on future loads, so this restores state that may have been lost on a restart + @Override + public void afterPropertiesSet() throws IOException { + final Collection<NarPersistenceInfo> narInfos = loadExistingNars(); + restoreState(narInfos); + } + + private Collection<NarPersistenceInfo> loadExistingNars() throws IOException { + final Collection<NarPersistenceInfo> narInfos = persistenceProvider.getAllNarInfo(); + LOGGER.info("Initializing NAR Manager, loading {} previously stored NARs", narInfos.size()); + + final Collection<File> narFiles = narInfos.stream() + .map(NarPersistenceInfo::getNarFile) + .collect(Collectors.toSet()); + narLoader.load(narFiles); + return narInfos; + } + + private void restoreState(final Collection<NarPersistenceInfo> narInfos) { + for (final NarPersistenceInfo narInfo : narInfos) { + try { + final NarNode narNode = restoreNarNode(narInfo); + narNodesById.put(narNode.getIdentifier(), narNode); + LOGGER.debug("Restored NAR [{}] with state [{}] and identifier [{}]", + narNode.getManifest().getCoordinate(), narNode.getState(), narNode.getIdentifier()); + } catch (final Exception e) { + LOGGER.warn("Failed to restore NAR for [{}]", narInfo.getNarFile().getAbsolutePath(), e); + } + } + } + + private NarNode restoreNarNode(final 
NarPersistenceInfo narInfo) throws IOException { + final File narFile = narInfo.getNarFile(); + final NarManifest manifest = NarManifest.fromNarFile(narFile); + final BundleCoordinate coordinate = manifest.getCoordinate(); + final String identifier = createIdentifier(coordinate); + final NarState state = determineNarState(manifest); + final String narDigest = computeNarDigest(narFile); + + return NarNode.builder() + .identifier(identifier) + .narFile(narFile) + .narFileHexDigest(narDigest) + .manifest(manifest) + .source(NarSource.valueOf(narInfo.getNarProperties().getSourceType())) + .sourceIdentifier(narInfo.getNarProperties().getSourceId()) + .state(state) + .build(); + } + + @Override + public void destroy() { + shutdownExecutor(installExecutorService, "Forcing shutdown of NAR Manager Upload Executor Service"); + shutdownExecutor(deleteExecutorService, "Forcing shutdown of NAR Manager Delete Executor Service"); + } + + private void shutdownExecutor(final ExecutorService executorService, final String interruptedMessage) { + executorService.shutdown(); + try { + if (!executorService.awaitTermination(5000, MILLISECONDS)) { + executorService.shutdownNow(); + } + } catch (InterruptedException ignore) { + LOGGER.info(interruptedMessage); + executorService.shutdownNow(); + } + } + + @Override + public NarNode installNar(final NarInstallRequest installRequest) throws IOException { + return installNar(installRequest, true); + } + + private NarNode installNar(final NarInstallRequest installRequest, final boolean async) throws IOException { + final InputStream inputStream = installRequest.getInputStream(); + final File tempNarFile = persistenceProvider.createTempFile(inputStream); + try { + return installNar(installRequest, tempNarFile, async); + } finally { + if (tempNarFile.exists() && !tempNarFile.delete()) { + LOGGER.warn("Failed to delete temp NAR file at [{}], file must be cleaned up manually", tempNarFile.getAbsolutePath()); + } + } + } + + // The outer install 
method is not synchronized since copying the stream to the temp file make take a long time, so we + // synchronize here after already having the temp file to ensure only one request is checked and submitted for installing + private synchronized NarNode installNar(final NarInstallRequest installRequest, final File tempNarFile, final boolean async) throws IOException { + final NarManifest manifest = getNarManifest(tempNarFile); + final BundleCoordinate coordinate = manifest.getCoordinate(); + + final Bundle existingBundle = extensionManager.getBundle(coordinate); + if (existingBundle != null && !persistenceProvider.exists(coordinate)) { + throw new IllegalStateException("A NAR is already registered with the same group, id, and version, " + + "and can not be replaced because it is not part of the NAR Manager"); + } + + final Set<Bundle> bundlesWithMatchingDependency = extensionManager.getDependentBundles(coordinate); + if (!bundlesWithMatchingDependency.isEmpty()) { + throw new IllegalStateException("Unable to replace NAR because it is a dependency of other NARs"); + } + + final NarPersistenceContext persistenceContext = NarPersistenceContext.builder() + .manifest(manifest) + .source(installRequest.getSource()) + .sourceIdentifier(installRequest.getSourceIdentifier()) + .clusterCoordinator(clusterCoordinator != null && clusterCoordinator.isActiveClusterCoordinator()) + .build(); + + final NarPersistenceInfo narPersistenceInfo = persistenceProvider.saveNar(persistenceContext, tempNarFile); + + final File narFile = narPersistenceInfo.getNarFile(); + final String identifier = createIdentifier(coordinate); + final String narDigest = computeNarDigest(narFile); + + final NarNode narNode = NarNode.builder() + .identifier(identifier) + .narFile(narFile) + .narFileHexDigest(narDigest) + .manifest(manifest) + .source(installRequest.getSource()) + .sourceIdentifier(installRequest.getSourceIdentifier()) + .state(NarState.WAITING_TO_INSTALL) + .build(); + 
narNodesById.put(identifier, narNode); + + final NarInstallTask installTask = createInstallTask(narNode); + if (async) { + LOGGER.info("Submitting install task for NAR with id [{}] and coordinate [{}]", identifier, coordinate); + final Future<?> installTaskFuture = installExecutorService.submit(installTask); + installFuturesById.put(identifier, installTaskFuture); + } else { + LOGGER.info("Synchronously installing NAR with id [{}] and coordinate [{}]", identifier, coordinate); + installTask.run(); + } + return narNode; + } + + @Override + public void completeInstall(final String identifier) { + LOGGER.info("Completed install for NAR [{}]", identifier); + installFuturesById.remove(identifier); + } + + @Override + public synchronized void updateState(final BundleCoordinate coordinate, final NarState narState) { + final NarNode narNode = narNodesById.values().stream() + .filter(n -> n.getManifest().getCoordinate().equals(coordinate)) + .findFirst() + .orElseThrow(() -> new NarNotFoundException(coordinate)); + narNode.setState(narState); + } + + @Override + public Collection<NarNode> getNars() { + return new ArrayList<>(narNodesById.values()); + } + + @Override + public Optional<NarNode> getNar(final String identifier) { + final NarNode narNode = narNodesById.get(identifier); + return Optional.ofNullable(narNode); + } + + @Override + public synchronized void verifyDeleteNar(final String identifier, final boolean forceDelete) { + final NarNode narNode = getNarNodeOrThrowNotFound(identifier); + + // Always allow deletion of a NAR that is not fully installed + if (narNode.getState() != NarState.INSTALLED) { + return; + } + + final BundleCoordinate coordinate = narNode.getManifest().getCoordinate(); + final Set<Bundle> bundlesWithMatchingDependency = extensionManager.getDependentBundles(coordinate); + if (!bundlesWithMatchingDependency.isEmpty()) { + throw new IllegalStateException("Unable to delete NAR because it is a dependency of other NARs"); + } + + if (!forceDelete 
&& narComponentManager.componentsExist(coordinate)) { + throw new IllegalStateException("Unable to delete NAR because components are instantiated from this NAR"); + } + } + + @Override + public synchronized NarNode deleteNar(final String identifier) throws IOException { + final NarNode narNode = getNarNodeOrThrowNotFound(identifier); + final BundleCoordinate coordinate = narNode.getManifest().getCoordinate(); + LOGGER.info("Deleting NAR with id [{}] and coordinate [{}]", identifier, coordinate); + + final Future<?> installTask = installFuturesById.remove(identifier); + if (installTask != null) { + installTask.cancel(true); + } + + final Bundle existingBundle = extensionManager.getBundle(coordinate); + if (existingBundle != null) { + narLoader.unload(existingBundle); + } + + deleteExecutorService.submit(() -> { + LOGGER.info("Unloading components for deleting NAR with id [{}] and coordinate [{}]", identifier, coordinate); + final StandardStoppedComponents stoppedComponents = new StandardStoppedComponents(controllerServiceProvider); + narComponentManager.unloadComponents(coordinate, stoppedComponents); + LOGGER.info("Completed unloading components for deleting NAR with id [{}] and coordinate [{}]", identifier, coordinate); + }); + + persistenceProvider.deleteNar(coordinate); + narNodesById.remove(identifier); + + return narNode; + } + + @Override + public synchronized InputStream readNar(final String identifier) { + final NarNode narNode = getNarNodeOrThrowNotFound(identifier); + final BundleCoordinate coordinate = narNode.getManifest().getCoordinate(); + try { + return persistenceProvider.readNar(coordinate); + } catch (final FileNotFoundException e) { + throw new NarNotFoundException(coordinate); + } + } + + @Override + public synchronized void syncWithClusterCoordinator() { + if (clusterCoordinator == null) { + LOGGER.info("Cluster coordinator is null, will not sync NARs"); + return; + } + + // This sync method is called from the method that loads the flow from a 
connection response, which means there must already be a cluster coordinator to + // have gotten a response from, but during testing there were cases where calling clusterCoordinator.getElectedActiveCoordinatorNode() was still null, so + // the helper method here will keep checking for the identifier up to a certain threshold to avoid slight timing issues + final NodeIdentifier coordinatorNodeId = getElectedActiveCoordinatorNode(); + if (coordinatorNodeId == null) { + LOGGER.warn("Unable to obtain the node identifier for the cluster coordinator, will not sync NARs"); + return; + } + + LOGGER.info("Determined cluster coordinator is at {}", coordinatorNodeId); + if (clusterCoordinator.isActiveClusterCoordinator()) { + LOGGER.info("Current node is the cluster coordinator, will not sync NARs"); + return; + } + + LOGGER.info("Synchronizing NARs with cluster coordinator"); + final String coordinatorAddress = coordinatorNodeId.getApiAddress(); + final int coordinatorPort = coordinatorNodeId.getApiPort(); + final NarRestApiClient narRestApiClient = new NarRestApiClient(webClientService, coordinatorAddress, coordinatorPort, sslContext != null); + + final int localNarCountBeforeSync = narNodesById.size(); + try { + // This node may try to retrieve the summaries from the coordinator while the coordinator still hasn't finished initializing its flow controller and + // the response will be a 409, so the helper method here will catch any retryable exceptions and retry the request up to a configured threshold + final NarSummariesEntity narSummaries = getNarSummariesFromCoordinator(narRestApiClient); + if (narSummaries == null) { + LOGGER.error("Unable to retrieve listing of NARs from cluster coordinator within the maximum amount of time, will not sync NARs"); + return; + } + + LOGGER.info("Cluster coordinator returned {} NAR summaries", narSummaries.getNarSummaries().size()); + + for (final NarSummaryEntity narSummaryEntity : narSummaries.getNarSummaries()) { + final 
NarSummaryDTO narSummaryDTO = narSummaryEntity.getNarSummary(); + final String coordinatorNarId = narSummaryDTO.getIdentifier(); + final String coordinatorNarDigest = narSummaryDTO.getDigest(); + final NarNode matchingNar = narNodesById.get(coordinatorNarId); + if (matchingNar == null) { + LOGGER.info("Coordinator has NAR [{}] which does not exist locally, will download", coordinatorNarId); + downloadNar(narRestApiClient, narSummaryDTO); + } else if (!coordinatorNarDigest.equals(matchingNar.getNarFileHexDigest())) { + LOGGER.info("Coordinator has NAR [{}] which exists locally with a different digest, will download", coordinatorNarId); + downloadNar(narRestApiClient, narSummaryDTO); + } else { + LOGGER.info("Coordinator has NAR [{}] which exists locally with a matching digest, will not download", coordinatorNarId); + } + } + } catch (final Exception e) { + // if the current node has existing NARs and the sync fails then we throw an exception to fail start up, otherwise we don't know if the digests of the local NARs + // match with the coordinator which could result in joining the cluster and running slightly different code on one node + // if the current node has no existing NARs then we can let the node proceed and attempt to join the cluster because maybe the cluster coordinator had no NARs anyway, + // and if it did then flow synchronization will fail after this because the local flow will have ghosted components that are not ghosted in the cluster + if (localNarCountBeforeSync > 0) { + throw new RuntimeException("Failed to sync NARs from cluster coordinator", e); + } else { + LOGGER.error("Failed to sync NARs from cluster coordinator, no NARs exist locally, will proceed", e); + } + } + } + + private void downloadNar(final NarRestApiClient narRestApiClient, final NarSummaryDTO narSummary) throws IOException { + try (final InputStream coordinatorNarInputStream = narRestApiClient.downloadNar(narSummary.getIdentifier())) { + final NarInstallRequest installRequest = 
NarInstallRequest.builder() + .source(NarSource.valueOf(narSummary.getSourceType())) + .sourceIdentifier(narSummary.getSourceIdentifier()) + .inputStream(coordinatorNarInputStream) + .build(); + installNar(installRequest, false); + } + } + + private NarSummariesEntity getNarSummariesFromCoordinator(final NarRestApiClient narRestApiClient) { + final Instant waitUntilInstant = Instant.ofEpochMilli(System.currentTimeMillis() + MAX_WAIT_TIME_FOR_NARS.toMillis()); + while (System.currentTimeMillis() < waitUntilInstant.toEpochMilli()) { + final NarSummariesEntity narSummaries = listNarSummaries(narRestApiClient); + if (narSummaries != null) { + return narSummaries; + } + LOGGER.info("Unable to retrieve NAR summaries from cluster coordinator, will retry until [{}]", waitUntilInstant); + sleep(Duration.ofSeconds(5)); + } + return null; + } + + private NarSummariesEntity listNarSummaries(final NarRestApiClient narRestApiClient) { + try { + return narRestApiClient.listNarSummaries(); + } catch (final NarRestApiRetryableException e) { + LOGGER.warn("[{}], will retry", e.getMessage()); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("", e); + } Review Comment: I did this because it is normal for the listing to be retried a couple of times while waiting for the coordinator to be ready to serve requests, so printing a huge stack trace on each attempt makes it look like a major problem when it most likely isn't. I'll probably remove this since it is likely not needed. ########## nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/nar/StandardNarManager.java: ########## @@ -0,0 +1,504 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.nar; + +import org.apache.nifi.bundle.Bundle; +import org.apache.nifi.bundle.BundleCoordinate; +import org.apache.nifi.cluster.coordination.ClusterCoordinator; +import org.apache.nifi.cluster.protocol.NodeIdentifier; +import org.apache.nifi.controller.FlowController; +import org.apache.nifi.controller.service.ControllerServiceProvider; +import org.apache.nifi.web.ResourceNotFoundException; +import org.apache.nifi.web.api.dto.NarSummaryDTO; +import org.apache.nifi.web.api.entity.NarSummariesEntity; +import org.apache.nifi.web.api.entity.NarSummaryEntity; +import org.apache.nifi.web.client.api.WebClientService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.DisposableBean; +import org.springframework.beans.factory.InitializingBean; + +import javax.net.ssl.SSLContext; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.time.Instant; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HexFormat; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import 
java.util.concurrent.Future; +import java.util.stream.Collectors; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; + +public class StandardNarManager implements NarManager, InitializingBean, DisposableBean { + + private static final Logger LOGGER = LoggerFactory.getLogger(StandardNarManager.class); + + private static final Duration MAX_WAIT_TIME_FOR_CLUSTER_COORDINATOR = Duration.ofSeconds(60); + private static final Duration MAX_WAIT_TIME_FOR_NARS = Duration.ofMinutes(5); + + private final ClusterCoordinator clusterCoordinator; + private final ExtensionManager extensionManager; + private final ControllerServiceProvider controllerServiceProvider; + private final NarPersistenceProvider persistenceProvider; + private final NarComponentManager narComponentManager; + private final NarLoader narLoader; + private final WebClientService webClientService; + private final SSLContext sslContext; + + private final Map<String, NarNode> narNodesById = new ConcurrentHashMap<>(); + private final Map<String, Future<?>> installFuturesById = new ConcurrentHashMap<>(); + private final ExecutorService installExecutorService; + private final ExecutorService deleteExecutorService; + + public StandardNarManager(final FlowController flowController, + final ClusterCoordinator clusterCoordinator, + final NarComponentManager narComponentManager, + final NarLoader narLoader, + final WebClientService webClientService, + final SSLContext sslContext) { + this.clusterCoordinator = clusterCoordinator; + this.extensionManager = flowController.getExtensionManager(); + this.controllerServiceProvider = flowController.getControllerServiceProvider(); + this.persistenceProvider = flowController.getNarPersistenceProvider(); + this.narComponentManager = narComponentManager; + this.narLoader = narLoader; + this.webClientService = webClientService; + this.sslContext = sslContext; + this.installExecutorService = Executors.newSingleThreadExecutor(); + this.deleteExecutorService = 
Executors.newSingleThreadExecutor(); + } + + // This serves two purposes... + // 1. Any previously stored NARs need to have their extensions loaded and made available for use during start up since they won't be in any of the standard NAR directories + // 2. NarLoader keeps track of NARs that were missing dependencies to consider them on future loads, so this restores state that may have been lost on a restart + @Override + public void afterPropertiesSet() throws IOException { + final Collection<NarPersistenceInfo> narInfos = loadExistingNars(); + restoreState(narInfos); + } + + private Collection<NarPersistenceInfo> loadExistingNars() throws IOException { + final Collection<NarPersistenceInfo> narInfos = persistenceProvider.getAllNarInfo(); + LOGGER.info("Initializing NAR Manager, loading {} previously stored NARs", narInfos.size()); + + final Collection<File> narFiles = narInfos.stream() + .map(NarPersistenceInfo::getNarFile) + .collect(Collectors.toSet()); + narLoader.load(narFiles); + return narInfos; + } + + private void restoreState(final Collection<NarPersistenceInfo> narInfos) { + for (final NarPersistenceInfo narInfo : narInfos) { + try { + final NarNode narNode = restoreNarNode(narInfo); + narNodesById.put(narNode.getIdentifier(), narNode); + LOGGER.debug("Restored NAR [{}] with state [{}] and identifier [{}]", + narNode.getManifest().getCoordinate(), narNode.getState(), narNode.getIdentifier()); + } catch (final Exception e) { + LOGGER.warn("Failed to restore NAR for [{}]", narInfo.getNarFile().getAbsolutePath(), e); + } + } + } + + private NarNode restoreNarNode(final NarPersistenceInfo narInfo) throws IOException { + final File narFile = narInfo.getNarFile(); + final NarManifest manifest = NarManifest.fromNarFile(narFile); + final BundleCoordinate coordinate = manifest.getCoordinate(); + final String identifier = createIdentifier(coordinate); + final NarState state = determineNarState(manifest); + final String narDigest = computeNarDigest(narFile); + + 
return NarNode.builder() + .identifier(identifier) + .narFile(narFile) + .narFileHexDigest(narDigest) + .manifest(manifest) + .source(NarSource.valueOf(narInfo.getNarProperties().getSourceType())) + .sourceIdentifier(narInfo.getNarProperties().getSourceId()) + .state(state) + .build(); + } + + @Override + public void destroy() { + shutdownExecutor(installExecutorService, "Forcing shutdown of NAR Manager Upload Executor Service"); + shutdownExecutor(deleteExecutorService, "Forcing shutdown of NAR Manager Delete Executor Service"); + } + + private void shutdownExecutor(final ExecutorService executorService, final String interruptedMessage) { + executorService.shutdown(); + try { + if (!executorService.awaitTermination(5000, MILLISECONDS)) { + executorService.shutdownNow(); + } + } catch (InterruptedException ignore) { + LOGGER.info(interruptedMessage); + executorService.shutdownNow(); + } + } + + @Override + public NarNode installNar(final NarInstallRequest installRequest) throws IOException { + return installNar(installRequest, true); + } + + private NarNode installNar(final NarInstallRequest installRequest, final boolean async) throws IOException { + final InputStream inputStream = installRequest.getInputStream(); + final File tempNarFile = persistenceProvider.createTempFile(inputStream); + try { + return installNar(installRequest, tempNarFile, async); + } finally { + if (tempNarFile.exists() && !tempNarFile.delete()) { + LOGGER.warn("Failed to delete temp NAR file at [{}], file must be cleaned up manually", tempNarFile.getAbsolutePath()); + } + } + } + + // The outer install method is not synchronized since copying the stream to the temp file make take a long time, so we + // synchronize here after already having the temp file to ensure only one request is checked and submitted for installing + private synchronized NarNode installNar(final NarInstallRequest installRequest, final File tempNarFile, final boolean async) throws IOException { + final NarManifest 
manifest = getNarManifest(tempNarFile); + final BundleCoordinate coordinate = manifest.getCoordinate(); + + final Bundle existingBundle = extensionManager.getBundle(coordinate); + if (existingBundle != null && !persistenceProvider.exists(coordinate)) { + throw new IllegalStateException("A NAR is already registered with the same group, id, and version, " + + "and can not be replaced because it is not part of the NAR Manager"); + } + + final Set<Bundle> bundlesWithMatchingDependency = extensionManager.getDependentBundles(coordinate); + if (!bundlesWithMatchingDependency.isEmpty()) { + throw new IllegalStateException("Unable to replace NAR because it is a dependency of other NARs"); + } + + final NarPersistenceContext persistenceContext = NarPersistenceContext.builder() + .manifest(manifest) + .source(installRequest.getSource()) + .sourceIdentifier(installRequest.getSourceIdentifier()) + .clusterCoordinator(clusterCoordinator != null && clusterCoordinator.isActiveClusterCoordinator()) + .build(); + + final NarPersistenceInfo narPersistenceInfo = persistenceProvider.saveNar(persistenceContext, tempNarFile); + + final File narFile = narPersistenceInfo.getNarFile(); + final String identifier = createIdentifier(coordinate); + final String narDigest = computeNarDigest(narFile); + + final NarNode narNode = NarNode.builder() + .identifier(identifier) + .narFile(narFile) + .narFileHexDigest(narDigest) + .manifest(manifest) + .source(installRequest.getSource()) + .sourceIdentifier(installRequest.getSourceIdentifier()) + .state(NarState.WAITING_TO_INSTALL) + .build(); + narNodesById.put(identifier, narNode); + + final NarInstallTask installTask = createInstallTask(narNode); + if (async) { + LOGGER.info("Submitting install task for NAR with id [{}] and coordinate [{}]", identifier, coordinate); + final Future<?> installTaskFuture = installExecutorService.submit(installTask); + installFuturesById.put(identifier, installTaskFuture); + } else { + LOGGER.info("Synchronously 
installing NAR with id [{}] and coordinate [{}]", identifier, coordinate); + installTask.run(); + } + return narNode; + } + + @Override + public void completeInstall(final String identifier) { + LOGGER.info("Completed install for NAR [{}]", identifier); + installFuturesById.remove(identifier); + } + + @Override + public synchronized void updateState(final BundleCoordinate coordinate, final NarState narState) { + final NarNode narNode = narNodesById.values().stream() + .filter(n -> n.getManifest().getCoordinate().equals(coordinate)) + .findFirst() + .orElseThrow(() -> new NarNotFoundException(coordinate)); + narNode.setState(narState); + } + + @Override + public Collection<NarNode> getNars() { + return new ArrayList<>(narNodesById.values()); + } + + @Override + public Optional<NarNode> getNar(final String identifier) { + final NarNode narNode = narNodesById.get(identifier); + return Optional.ofNullable(narNode); + } + + @Override + public synchronized void verifyDeleteNar(final String identifier, final boolean forceDelete) { + final NarNode narNode = getNarNodeOrThrowNotFound(identifier); + + // Always allow deletion of a NAR that is not fully installed + if (narNode.getState() != NarState.INSTALLED) { + return; + } + + final BundleCoordinate coordinate = narNode.getManifest().getCoordinate(); + final Set<Bundle> bundlesWithMatchingDependency = extensionManager.getDependentBundles(coordinate); + if (!bundlesWithMatchingDependency.isEmpty()) { + throw new IllegalStateException("Unable to delete NAR because it is a dependency of other NARs"); + } + + if (!forceDelete && narComponentManager.componentsExist(coordinate)) { + throw new IllegalStateException("Unable to delete NAR because components are instantiated from this NAR"); + } + } + + @Override + public synchronized NarNode deleteNar(final String identifier) throws IOException { + final NarNode narNode = getNarNodeOrThrowNotFound(identifier); + final BundleCoordinate coordinate = 
narNode.getManifest().getCoordinate(); + LOGGER.info("Deleting NAR with id [{}] and coordinate [{}]", identifier, coordinate); + + final Future<?> installTask = installFuturesById.remove(identifier); + if (installTask != null) { + installTask.cancel(true); + } + + final Bundle existingBundle = extensionManager.getBundle(coordinate); + if (existingBundle != null) { + narLoader.unload(existingBundle); + } + + deleteExecutorService.submit(() -> { + LOGGER.info("Unloading components for deleting NAR with id [{}] and coordinate [{}]", identifier, coordinate); + final StandardStoppedComponents stoppedComponents = new StandardStoppedComponents(controllerServiceProvider); + narComponentManager.unloadComponents(coordinate, stoppedComponents); + LOGGER.info("Completed unloading components for deleting NAR with id [{}] and coordinate [{}]", identifier, coordinate); + }); + + persistenceProvider.deleteNar(coordinate); + narNodesById.remove(identifier); + + return narNode; + } + + @Override + public synchronized InputStream readNar(final String identifier) { + final NarNode narNode = getNarNodeOrThrowNotFound(identifier); + final BundleCoordinate coordinate = narNode.getManifest().getCoordinate(); + try { + return persistenceProvider.readNar(coordinate); + } catch (final FileNotFoundException e) { + throw new NarNotFoundException(coordinate); + } + } + + @Override + public synchronized void syncWithClusterCoordinator() { + if (clusterCoordinator == null) { + LOGGER.info("Cluster coordinator is null, will not sync NARs"); + return; + } + + // This sync method is called from the method that loads the flow from a connection response, which means there must already be a cluster coordinator to + // have gotten a response from, but during testing there were cases where calling clusterCoordinator.getElectedActiveCoordinatorNode() was still null, so + // the helper method here will keep checking for the identifier up to a certain threshold to avoid slight timing issues + final 
NodeIdentifier coordinatorNodeId = getElectedActiveCoordinatorNode(); + if (coordinatorNodeId == null) { + LOGGER.warn("Unable to obtain the node identifier for the cluster coordinator, will not sync NARs"); + return; + } + + LOGGER.info("Determined cluster coordinator is at {}", coordinatorNodeId); + if (clusterCoordinator.isActiveClusterCoordinator()) { + LOGGER.info("Current node is the cluster coordinator, will not sync NARs"); + return; + } + + LOGGER.info("Synchronizing NARs with cluster coordinator"); + final String coordinatorAddress = coordinatorNodeId.getApiAddress(); + final int coordinatorPort = coordinatorNodeId.getApiPort(); + final NarRestApiClient narRestApiClient = new NarRestApiClient(webClientService, coordinatorAddress, coordinatorPort, sslContext != null); + + final int localNarCountBeforeSync = narNodesById.size(); + try { + // This node may try to retrieve the summaries from the coordinator while the coordinator still hasn't finished initializing its flow controller and + // the response will be a 409, so the helper method here will catch any retryable exceptions and retry the request up to a configured threshold + final NarSummariesEntity narSummaries = getNarSummariesFromCoordinator(narRestApiClient); + if (narSummaries == null) { + LOGGER.error("Unable to retrieve listing of NARs from cluster coordinator within the maximum amount of time, will not sync NARs"); + return; + } + + LOGGER.info("Cluster coordinator returned {} NAR summaries", narSummaries.getNarSummaries().size()); + + for (final NarSummaryEntity narSummaryEntity : narSummaries.getNarSummaries()) { + final NarSummaryDTO narSummaryDTO = narSummaryEntity.getNarSummary(); + final String coordinatorNarId = narSummaryDTO.getIdentifier(); + final String coordinatorNarDigest = narSummaryDTO.getDigest(); + final NarNode matchingNar = narNodesById.get(coordinatorNarId); + if (matchingNar == null) { + LOGGER.info("Coordinator has NAR [{}] which does not exist locally, will download", 
coordinatorNarId); + downloadNar(narRestApiClient, narSummaryDTO); + } else if (!coordinatorNarDigest.equals(matchingNar.getNarFileHexDigest())) { + LOGGER.info("Coordinator has NAR [{}] which exists locally with a different digest, will download", coordinatorNarId); + downloadNar(narRestApiClient, narSummaryDTO); + } else { + LOGGER.info("Coordinator has NAR [{}] which exists locally with a matching digest, will not download", coordinatorNarId); + } + } + } catch (final Exception e) { + // if the current node has existing NARs and the sync fails then we throw an exception to fail start up, otherwise we don't know if the digests of the local NARs + // match with the coordinator which could result in joining the cluster and running slightly different code on one node + // if the current node has no existing NARs then we can let the node proceed and attempt to join the cluster because maybe the cluster coordinator had no NARs anyway, + // and if it did then flow synchronization will fail after this because the local flow will have ghosted components that are not ghosted in the cluster + if (localNarCountBeforeSync > 0) { + throw new RuntimeException("Failed to sync NARs from cluster coordinator", e); + } else { + LOGGER.error("Failed to sync NARs from cluster coordinator, no NARs exist locally, will proceed", e); + } + } + } + + private void downloadNar(final NarRestApiClient narRestApiClient, final NarSummaryDTO narSummary) throws IOException { + try (final InputStream coordinatorNarInputStream = narRestApiClient.downloadNar(narSummary.getIdentifier())) { + final NarInstallRequest installRequest = NarInstallRequest.builder() + .source(NarSource.valueOf(narSummary.getSourceType())) + .sourceIdentifier(narSummary.getSourceIdentifier()) + .inputStream(coordinatorNarInputStream) + .build(); + installNar(installRequest, false); + } + } + + private NarSummariesEntity getNarSummariesFromCoordinator(final NarRestApiClient narRestApiClient) { + final Instant waitUntilInstant 
= Instant.ofEpochMilli(System.currentTimeMillis() + MAX_WAIT_TIME_FOR_NARS.toMillis()); + while (System.currentTimeMillis() < waitUntilInstant.toEpochMilli()) { + final NarSummariesEntity narSummaries = listNarSummaries(narRestApiClient); + if (narSummaries != null) { + return narSummaries; + } + LOGGER.info("Unable to retrieve NAR summaries from cluster coordinator, will retry until [{}]", waitUntilInstant); + sleep(Duration.ofSeconds(5)); + } + return null; + } + + private NarSummariesEntity listNarSummaries(final NarRestApiClient narRestApiClient) { + try { + return narRestApiClient.listNarSummaries(); + } catch (final NarRestApiRetryableException e) { + LOGGER.warn("[{}], will retry", e.getMessage()); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("", e); + } Review Comment: The reason I did this was because it is normal that the listing will need to be retried a couple of times while possibly waiting for the coordinator to be ready to serve requests, so printing a huge stack trace each time makes it look like a major problem when it most likely isn't. I'll probably remove this since it is likely not needed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
