This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
commit 167d1895ae0fdefa282b79b7e69f85c26704172f Author: Andrea Cosentino <[email protected]> AuthorDate: Thu Sep 22 10:13:54 2022 +0200 camel-azure-key-vault - Add dev console for secrets --- .../camel-azure/camel-azure-key-vault/pom.xml | 4 + .../key/vault/EventhubsReloadTriggerTask.java | 376 +++++++++++---------- 2 files changed, 201 insertions(+), 179 deletions(-) diff --git a/components/camel-azure/camel-azure-key-vault/pom.xml b/components/camel-azure/camel-azure-key-vault/pom.xml index 5f02311ad27..cb5d0aa85ad 100644 --- a/components/camel-azure/camel-azure-key-vault/pom.xml +++ b/components/camel-azure/camel-azure-key-vault/pom.xml @@ -40,6 +40,10 @@ <groupId>org.apache.camel</groupId> <artifactId>camel-support</artifactId> </dependency> + <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-console</artifactId> + </dependency> <!-- azure sdk --> <dependency> diff --git a/components/camel-azure/camel-azure-key-vault/src/main/java/org/apache/camel/component/azure/key/vault/EventhubsReloadTriggerTask.java b/components/camel-azure/camel-azure-key-vault/src/main/java/org/apache/camel/component/azure/key/vault/EventhubsReloadTriggerTask.java index 0c8949ba805..fee5f1170e7 100644 --- a/components/camel-azure/camel-azure-key-vault/src/main/java/org/apache/camel/component/azure/key/vault/EventhubsReloadTriggerTask.java +++ b/components/camel-azure/camel-azure-key-vault/src/main/java/org/apache/camel/component/azure/key/vault/EventhubsReloadTriggerTask.java @@ -52,187 +52,205 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * Period task which checks if Azure Key Vaults secrets has been updated and can trigger Camel to be reloaded. + * Period task which checks if Azure Key Vaults secrets has been updated and can + * trigger Camel to be reloaded. 
*/ @PeriodicTask("azure-secret-refresh") public class EventhubsReloadTriggerTask extends ServiceSupport implements CamelContextAware, Runnable { - private static final Logger LOG = LoggerFactory.getLogger(EventhubsReloadTriggerTask.class); - - private static final String BLOB_SERVICE_URI_SEGMENT = ".blob.core.windows.net"; - private static final String SECRET_VERSION_ADD = "Microsoft.KeyVault.SecretNewVersionCreated"; - - private CamelContext camelContext; - private boolean reloadEnabled = true; - private String secrets; - private EventProcessorClient eventProcessorClient; - private KeyVaultPropertiesFunction propertiesFunction; - private final Map<String, Instant> updates = new HashMap<>(); - - public EventhubsReloadTriggerTask() { - } - - @Override - public CamelContext getCamelContext() { - return camelContext; - } - - @Override - public void setCamelContext(CamelContext camelContext) { - this.camelContext = camelContext; - } - - public boolean isReloadEnabled() { - return reloadEnabled; - } - - /** - * Whether Camel should be reloaded on Azure secret updated - */ - public void setReloadEnabled(boolean reloadEnabled) { - this.reloadEnabled = reloadEnabled; - } - - /** - * A map of the updated secrets with the latest updated time. 
- */ - public Map<String, Instant> getUpdates() { - return Collections.unmodifiableMap(updates); - } - - @Override - protected void doStart() throws Exception { - super.doStart(); - - // auto-detect secrets in-use - PropertiesComponent pc = camelContext.getPropertiesComponent(); - PropertiesFunction pf = pc.getPropertiesFunction("azure"); - if (pf instanceof KeyVaultPropertiesFunction) { - propertiesFunction = (KeyVaultPropertiesFunction) pf; - LOG.debug("Auto-detecting secrets from properties-function: {}", pf.getName()); - } - // specific secrets - secrets = camelContext.getVaultConfiguration().azure().getSecrets(); - if (ObjectHelper.isEmpty(secrets) && propertiesFunction == null) { - throw new IllegalArgumentException("Secrets must be configured on Azure vault configuration"); - } - - String eventhubConnectionString = null; - String blobAccessKey = null; - String blobAccountName = null; - String blobContainerName = null; - AzureVaultConfiguration azureVaultConfiguration = getCamelContext().getVaultConfiguration().azure(); - if (ObjectHelper.isNotEmpty(azureVaultConfiguration)) { - eventhubConnectionString = azureVaultConfiguration.getEventhubConnectionString(); - blobAccessKey = azureVaultConfiguration.getBlobAccessKey(); - blobAccountName = azureVaultConfiguration.getBlobAccountName(); - blobContainerName = azureVaultConfiguration.getBlobContainerName(); - } - if (ObjectHelper.isNotEmpty(eventhubConnectionString) && ObjectHelper.isNotEmpty(blobAccessKey) - && ObjectHelper.isNotEmpty(blobAccountName) && ObjectHelper.isNotEmpty(blobContainerName)) { - BlobContainerAsyncClient c = new BlobContainerClientBuilder() - .endpoint(String.format(Locale.ROOT, "https://%s" + BLOB_SERVICE_URI_SEGMENT, blobAccountName)) - .containerName(blobContainerName) - .credential(new StorageSharedKeyCredential(blobAccountName, blobAccessKey)) - .buildAsyncClient(); - - EventProcessorClientBuilder eventProcessorClientBuilder = new EventProcessorClientBuilder() - .checkpointStore(new 
BlobCheckpointStore(c)) - .consumerGroup("$Default") - .connectionString(eventhubConnectionString) - .processEvent(this::onEventListener) - .processError(this::onErrorListener) - .transportType(AmqpTransportType.AMQP); - - eventProcessorClient = eventProcessorClientBuilder.buildEventProcessorClient(); - eventProcessorClient.start(); - } else { - throw new RuntimeCamelException( - "Using the Azure Key Vault Secret refresh task requires setting eventhubs connection String, Blob Account Name, Blob Access Key and Blob Container Name as application properties "); - } - } - - @Override - protected void doShutdown() throws Exception { - super.doShutdown(); - - if (eventProcessorClient != null) { - try { - eventProcessorClient.stop(); - } catch (Exception e) { - // ignore - } - eventProcessorClient = null; - } - - updates.clear(); - } - - @Override - public void run() { - } - - protected boolean matchSecret(String name) { - Set<String> set = new HashSet<>(); - if (secrets != null) { - Collections.addAll(set, secrets.split(",")); - } - if (propertiesFunction != null) { - set.addAll(propertiesFunction.getSecrets()); - } - - for (String part : set) { - boolean result = name.contains(part) || PatternHelper.matchPattern(name, part); - LOG.trace("Matching secret id: {}={} -> {}", name, part, result); - if (result) { - return true; - } - } - - return false; - } - - @Override - public String toString() { - return "Azure Secrets Refresh Task"; - } - - protected void onEventListener(final EventContext eventContext) { - boolean triggerReloading = false; - - ObjectMapper mapper = new ObjectMapper(); - JsonNode actualObj = null; - try { - actualObj = mapper.readTree(eventContext.getEventData().getBodyAsString()); - } catch (JsonMappingException e) { - e.printStackTrace(); - } catch (JsonProcessingException e) { - e.printStackTrace(); - } - for (int i = 0; i < actualObj.size(); i++) { - String secret = actualObj.get(i).get("subject").textValue(); - String eventType = 
actualObj.get(i).get("eventType").textValue(); - if (ObjectHelper.isNotEmpty(secret) && ObjectHelper.isNotEmpty(eventType)) { - if (eventType.equalsIgnoreCase(SECRET_VERSION_ADD)) { - if (matchSecret(secret)) { - if (isReloadEnabled()) { - LOG.info("Update for Azure secret: {} detected, triggering CamelContext reload", secret); - triggerReloading = true; - } - } - } - } - } - if (triggerReloading) { - ContextReloadStrategy reload = camelContext.hasService(ContextReloadStrategy.class); - if (reload != null) { - // trigger reload - reload.onReload(this); - } - } - } - - public void onErrorListener(final ErrorContext errorContext) { - //NOOP - } + private static final Logger LOG = LoggerFactory.getLogger(EventhubsReloadTriggerTask.class); + + private static final String BLOB_SERVICE_URI_SEGMENT = ".blob.core.windows.net"; + private static final String SECRET_VERSION_ADD = "Microsoft.KeyVault.SecretNewVersionCreated"; + + private CamelContext camelContext; + private boolean reloadEnabled = true; + private String secrets; + private EventProcessorClient eventProcessorClient; + private KeyVaultPropertiesFunction propertiesFunction; + private volatile Instant lastCheckTime; + private volatile Instant lastReloadTime; + private final Map<String, Instant> updates = new HashMap<>(); + + public EventhubsReloadTriggerTask() { + } + + @Override + public CamelContext getCamelContext() { + return camelContext; + } + + @Override + public void setCamelContext(CamelContext camelContext) { + this.camelContext = camelContext; + } + + public boolean isReloadEnabled() { + return reloadEnabled; + } + + /** + * Whether Camel should be reloaded on Azure secret updated + */ + public void setReloadEnabled(boolean reloadEnabled) { + this.reloadEnabled = reloadEnabled; + } + + /** + * A map of the updated secrets with the latest updated time. 
+ */ + public Map<String, Instant> getUpdates() { + return Collections.unmodifiableMap(updates); + } + + /** + * Last time this task checked Azure for updated secrets. + */ + public Instant getLastCheckTime() { + return lastCheckTime; + } + + /** + * Last time Azure secrets update triggered reload. + */ + public Instant getLastReloadTime() { + return lastReloadTime; + } + + @Override + protected void doStart() throws Exception { + super.doStart(); + + // auto-detect secrets in-use + PropertiesComponent pc = camelContext.getPropertiesComponent(); + PropertiesFunction pf = pc.getPropertiesFunction("azure"); + if (pf instanceof KeyVaultPropertiesFunction) { + propertiesFunction = (KeyVaultPropertiesFunction) pf; + LOG.debug("Auto-detecting secrets from properties-function: {}", pf.getName()); + } + // specific secrets + secrets = camelContext.getVaultConfiguration().azure().getSecrets(); + if (ObjectHelper.isEmpty(secrets) && propertiesFunction == null) { + throw new IllegalArgumentException("Secrets must be configured on Azure vault configuration"); + } + + String eventhubConnectionString = null; + String blobAccessKey = null; + String blobAccountName = null; + String blobContainerName = null; + AzureVaultConfiguration azureVaultConfiguration = getCamelContext().getVaultConfiguration().azure(); + if (ObjectHelper.isNotEmpty(azureVaultConfiguration)) { + eventhubConnectionString = azureVaultConfiguration.getEventhubConnectionString(); + blobAccessKey = azureVaultConfiguration.getBlobAccessKey(); + blobAccountName = azureVaultConfiguration.getBlobAccountName(); + blobContainerName = azureVaultConfiguration.getBlobContainerName(); + } + if (ObjectHelper.isNotEmpty(eventhubConnectionString) && ObjectHelper.isNotEmpty(blobAccessKey) + && ObjectHelper.isNotEmpty(blobAccountName) && ObjectHelper.isNotEmpty(blobContainerName)) { + BlobContainerAsyncClient c = new BlobContainerClientBuilder() + .endpoint(String.format(Locale.ROOT, "https://%s" + BLOB_SERVICE_URI_SEGMENT, 
blobAccountName)) + .containerName(blobContainerName) + .credential(new StorageSharedKeyCredential(blobAccountName, blobAccessKey)).buildAsyncClient(); + + EventProcessorClientBuilder eventProcessorClientBuilder = new EventProcessorClientBuilder() + .checkpointStore(new BlobCheckpointStore(c)).consumerGroup("$Default") + .connectionString(eventhubConnectionString).processEvent(this::onEventListener) + .processError(this::onErrorListener).transportType(AmqpTransportType.AMQP); + + eventProcessorClient = eventProcessorClientBuilder.buildEventProcessorClient(); + eventProcessorClient.start(); + } else { + throw new RuntimeCamelException( + "Using the Azure Key Vault Secret refresh task requires setting eventhubs connection String, Blob Account Name, Blob Access Key and Blob Container Name as application properties "); + } + } + + @Override + protected void doShutdown() throws Exception { + super.doShutdown(); + + if (eventProcessorClient != null) { + try { + eventProcessorClient.stop(); + } catch (Exception e) { + // ignore + } + eventProcessorClient = null; + } + + updates.clear(); + } + + @Override + public void run() { + lastCheckTime = Instant.now(); + } + + protected boolean matchSecret(String name) { + Set<String> set = new HashSet<>(); + if (secrets != null) { + Collections.addAll(set, secrets.split(",")); + } + if (propertiesFunction != null) { + set.addAll(propertiesFunction.getSecrets()); + } + + for (String part : set) { + boolean result = name.contains(part) || PatternHelper.matchPattern(name, part); + LOG.trace("Matching secret id: {}={} -> {}", name, part, result); + if (result) { + return true; + } + } + + return false; + } + + @Override + public String toString() { + return "Azure Secrets Refresh Task"; + } + + protected void onEventListener(final EventContext eventContext) { + boolean triggerReloading = false; + + ObjectMapper mapper = new ObjectMapper(); + JsonNode actualObj = null; + try { + actualObj = 
mapper.readTree(eventContext.getEventData().getBodyAsString()); + } catch (JsonMappingException e) { + e.printStackTrace(); + } catch (JsonProcessingException e) { + e.printStackTrace(); + } + for (int i = 0; i < actualObj.size(); i++) { + String secret = actualObj.get(i).get("subject").textValue(); + String eventType = actualObj.get(i).get("eventType").textValue(); + if (ObjectHelper.isNotEmpty(secret) && ObjectHelper.isNotEmpty(eventType)) { + if (eventType.equalsIgnoreCase(SECRET_VERSION_ADD)) { + if (matchSecret(secret)) { + if (ObjectHelper.isNotEmpty(eventContext.getEventData().getEnqueuedTime())) { + updates.put(secret, eventContext.getEventData().getEnqueuedTime()); + } + if (isReloadEnabled()) { + LOG.info("Update for Azure secret: {} detected, triggering CamelContext reload", secret); + triggerReloading = true; + } + } + } + } + } + if (triggerReloading) { + ContextReloadStrategy reload = camelContext.hasService(ContextReloadStrategy.class); + if (reload != null) { + // trigger reload + lastReloadTime = Instant.now(); + reload.onReload(this); + } + } + } + + public void onErrorListener(final ErrorContext errorContext) { + // NOOP + } }
