This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new d4630195ce4 NIFI-15617 Added SEND Provenance Events to several
Processors (#10911)
d4630195ce4 is described below
commit d4630195ce43b7b1972e748e7c68495b266270d9
Author: Pierre Villard <[email protected]>
AuthorDate: Thu Feb 19 16:09:56 2026 +0100
NIFI-15617 Added SEND Provenance Events to several Processors (#10911)
Signed-off-by: David Handermann <[email protected]>
---
.../nifi/processors/aws/cloudwatch/PutCloudWatchMetric.java | 3 +++
.../org/apache/nifi/processors/aws/dynamodb/PutDynamoDB.java | 3 ++-
.../apache/nifi/processors/aws/dynamodb/PutDynamoDBRecord.java | 2 ++
.../processors/aws/kinesis/firehose/PutKinesisFirehose.java | 1 +
.../nifi/processors/aws/kinesis/stream/PutKinesisStream.java | 1 +
.../processors/azure/data/explorer/PutAzureDataExplorer.java | 1 +
.../azure/data/explorer/StandardKustoIngestService.java | 10 +++++++++-
.../nifi/services/azure/data/explorer/KustoIngestService.java | 7 +++++++
.../org/apache/nifi/processors/gcp/bigquery/PutBigQuery.java | 1 +
.../main/java/org/apache/nifi/processors/AbstractIoTDB.java | 7 +++++++
.../main/java/org/apache/nifi/processors/PutIoTDBRecord.java | 1 +
.../org/apache/nifi/redis/processor/PutRedisHashRecord.java | 4 ++++
.../apache/nifi/redis/service/RedisConnectionPoolService.java | 8 ++++++++
.../main/java/org/apache/nifi/redis/RedisConnectionPool.java | 8 ++++++++
.../java/org/apache/nifi/snmp/processors/SendTrapSNMP.java | 3 +++
.../nifi/processors/snowflake/PutSnowflakeInternalStage.java | 1 +
.../java/org/apache/nifi/processors/splunk/PutSplunkHTTP.java | 1 +
.../java/org/apache/nifi/processors/splunk/SplunkAPICall.java | 10 ++++++++++
18 files changed, 70 insertions(+), 2 deletions(-)
diff --git
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/cloudwatch/PutCloudWatchMetric.java
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/cloudwatch/PutCloudWatchMetric.java
index 6e4fa8fd27d..428d7012579 100644
---
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/cloudwatch/PutCloudWatchMetric.java
+++
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/cloudwatch/PutCloudWatchMetric.java
@@ -336,6 +336,9 @@ public class PutCloudWatchMetric extends
AbstractAwsSyncProcessor<CloudWatchClie
.build();
putMetricData(context, metricDataRequest);
+ final String namespace =
context.getProperty(NAMESPACE).evaluateAttributeExpressions(flowFile).getValue();
+ final String metricName =
context.getProperty(METRIC_NAME).evaluateAttributeExpressions(flowFile).getValue();
+ session.getProvenanceReporter().send(flowFile,
"cloudwatch://%s/%s".formatted(namespace, metricName));
session.transfer(flowFile, REL_SUCCESS);
getLogger().info("Successfully published cloudwatch metric for
{}", flowFile);
} catch (final Exception e) {
diff --git
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/PutDynamoDB.java
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/PutDynamoDB.java
index 47d0e899a32..389175eb04c 100644
---
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/PutDynamoDB.java
+++
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/PutDynamoDB.java
@@ -187,9 +187,10 @@ public class PutDynamoDB extends AbstractDynamoDBProcessor
{
}
}
- // Handle any remaining flowfiles
+ final String transitUri = "dynamodb://%s".formatted(table);
for (final FlowFile flowFile : keysToFlowFileMap.values()) {
getLogger().debug("Successful posted items to dynamodb : {}",
table);
+ session.getProvenanceReporter().send(flowFile, transitUri);
session.transfer(flowFile, REL_SUCCESS);
}
} catch (final AwsServiceException exception) {
diff --git
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/PutDynamoDBRecord.java
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/PutDynamoDBRecord.java
index 0864dabcda3..d12ab495dc8 100644
---
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/PutDynamoDBRecord.java
+++
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/PutDynamoDBRecord.java
@@ -215,6 +215,8 @@ public class PutDynamoDBRecord extends
AbstractDynamoDBProcessor {
final FlowFile outgoingFlowFile = session.putAllAttributes(flowFile,
attributes);
if (result.isSuccess()) {
+ final String table =
context.getProperty(TABLE).evaluateAttributeExpressions().getValue();
+ session.getProvenanceReporter().send(outgoingFlowFile,
"dynamodb://%s".formatted(table));
session.transfer(outgoingFlowFile, REL_SUCCESS);
} else {
handleError(context, session, result, outgoingFlowFile);
diff --git
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/firehose/PutKinesisFirehose.java
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/firehose/PutKinesisFirehose.java
index 2a7286ea1ac..9303fc77db9 100644
---
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/firehose/PutKinesisFirehose.java
+++
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/firehose/PutKinesisFirehose.java
@@ -170,6 +170,7 @@ public class PutKinesisFirehose extends
AbstractAwsSyncProcessor<FirehoseClient,
failedFlowFiles.add(flowFile);
} else {
flowFile = session.putAllAttributes(flowFile,
attributes);
+ session.getProvenanceReporter().send(flowFile,
"firehose://%s".formatted(streamName));
successfulFlowFiles.add(flowFile);
}
}
diff --git
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/PutKinesisStream.java
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/PutKinesisStream.java
index 56946064248..35900e0214d 100644
---
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/PutKinesisStream.java
+++
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/PutKinesisStream.java
@@ -195,6 +195,7 @@ public class PutKinesisStream extends
AbstractAwsSyncProcessor<KinesisClient, Ki
failedFlowFiles.add(flowFile);
} else {
flowFile = session.putAllAttributes(flowFile,
attributes);
+ session.getProvenanceReporter().send(flowFile,
"kinesis://%s".formatted(streamName));
successfulFlowFiles.add(flowFile);
}
}
diff --git
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/data/explorer/PutAzureDataExplorer.java
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/data/explorer/PutAzureDataExplorer.java
index be3f40be921..3347c5f0226 100644
---
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/data/explorer/PutAzureDataExplorer.java
+++
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/data/explorer/PutAzureDataExplorer.java
@@ -223,6 +223,7 @@ public class PutAzureDataExplorer extends AbstractProcessor
{
final KustoIngestionResult result = service.ingestData(request);
if (result == KustoIngestionResult.SUCCEEDED) {
getLogger().info("Ingest {} for {}", result.getStatus(),
flowFile);
+ session.getProvenanceReporter().send(flowFile,
"%s/%s/%s".formatted(service.getClusterUri(), databaseName, tableName));
transferRelationship = SUCCESS;
} else if (result == KustoIngestionResult.FAILED) {
getLogger().error("Ingest {} for {}", result.getStatus(),
flowFile);
diff --git
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/data/explorer/StandardKustoIngestService.java
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/data/explorer/StandardKustoIngestService.java
index c067bfaa648..d333a9f019b 100644
---
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/data/explorer/StandardKustoIngestService.java
+++
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/data/explorer/StandardKustoIngestService.java
@@ -116,6 +116,8 @@ public class StandardKustoIngestService extends
AbstractControllerService implem
private static final Map<String, String> NIFI_SINK = Map.of("processor",
StandardKustoIngestService.class.getSimpleName());
+ private volatile String clusterUri;
+
private volatile QueuedIngestClient queuedIngestClient;
private volatile ManagedStreamingIngestClient managedStreamingIngestClient;
@@ -132,7 +134,7 @@ public class StandardKustoIngestService extends
AbstractControllerService implem
final String applicationClientId =
context.getProperty(APPLICATION_CLIENT_ID).getValue();
final String applicationKey =
context.getProperty(APPLICATION_KEY).getValue();
final String applicationTenantId =
context.getProperty(APPLICATION_TENANT_ID).getValue();
- final String clusterUri = context.getProperty(CLUSTER_URI).getValue();
+ this.clusterUri = context.getProperty(CLUSTER_URI).getValue();
final KustoAuthenticationStrategy kustoAuthenticationStrategy =
KustoAuthenticationStrategy.valueOf(context.getProperty(AUTHENTICATION_STRATEGY).getValue());
this.queuedIngestClient = createKustoQueuedIngestClient(clusterUri,
applicationClientId, applicationKey, applicationTenantId,
kustoAuthenticationStrategy);
@@ -163,6 +165,12 @@ public class StandardKustoIngestService extends
AbstractControllerService implem
if (this.executionClient != null) {
this.executionClient = null;
}
+ this.clusterUri = null;
+ }
+
+ @Override
+ public String getClusterUri() {
+ return clusterUri;
}
protected QueuedIngestClient createKustoQueuedIngestClient(final String
clusterUrl,
diff --git
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-services-api/src/main/java/org/apache/nifi/services/azure/data/explorer/KustoIngestService.java
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-services-api/src/main/java/org/apache/nifi/services/azure/data/explorer/KustoIngestService.java
index f9fe6becdce..c2f99fa9662 100644
---
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-services-api/src/main/java/org/apache/nifi/services/azure/data/explorer/KustoIngestService.java
+++
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-services-api/src/main/java/org/apache/nifi/services/azure/data/explorer/KustoIngestService.java
@@ -46,4 +46,11 @@ public interface KustoIngestService extends
ControllerService {
* @return Table Readable status
*/
boolean isTableReadable(String databaseName, String table);
+
+ /**
+ * Returns the Azure Data Explorer cluster URI used for ingestion
+ *
+ * @return the cluster URI
+ */
+ String getClusterUri();
}
diff --git
a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQuery.java
b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQuery.java
index 4448c2cf6a5..fef26de4306 100644
---
a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQuery.java
+++
b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQuery.java
@@ -356,6 +356,7 @@ public class PutBigQuery extends AbstractBigQueryProcessor {
getLogger().info("Appended and committed all records
successfully.");
}
+ session.getProvenanceReporter().send(flowFile,
"bigquery://%s".formatted(parentTable));
session.transfer(flowFile, REL_SUCCESS);
}
}
diff --git
a/nifi-extension-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/main/java/org/apache/nifi/processors/AbstractIoTDB.java
b/nifi-extension-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/main/java/org/apache/nifi/processors/AbstractIoTDB.java
index 90ec7420e83..f9bca1e261d 100755
---
a/nifi-extension-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/main/java/org/apache/nifi/processors/AbstractIoTDB.java
+++
b/nifi-extension-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/main/java/org/apache/nifi/processors/AbstractIoTDB.java
@@ -145,6 +145,7 @@ public abstract class AbstractIoTDB extends
AbstractProcessor {
}
protected final AtomicReference<Session> session = new
AtomicReference<>(null);
+ private volatile String transitUri;
@Override
public Set<Relationship> getRelationships() {
@@ -159,6 +160,7 @@ public abstract class AbstractIoTDB extends
AbstractProcessor {
final String username = context.getProperty(USERNAME).getValue();
final String password = context.getProperty(PASSWORD).getValue();
+ transitUri = "iotdb://%s:%d".formatted(host, port);
session.set(new Session.Builder()
.host(host)
.port(port)
@@ -179,6 +181,11 @@ public abstract class AbstractIoTDB extends
AbstractProcessor {
}
session.set(null);
}
+ transitUri = null;
+ }
+
+ protected String getTransitUri() {
+ return transitUri;
}
@Override
diff --git
a/nifi-extension-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/main/java/org/apache/nifi/processors/PutIoTDBRecord.java
b/nifi-extension-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/main/java/org/apache/nifi/processors/PutIoTDBRecord.java
index 515ee60ca7b..c24da59e2db 100755
---
a/nifi-extension-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/main/java/org/apache/nifi/processors/PutIoTDBRecord.java
+++
b/nifi-extension-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/main/java/org/apache/nifi/processors/PutIoTDBRecord.java
@@ -193,6 +193,7 @@ public class PutIoTDBRecord extends AbstractIoTDB {
processSession.transfer(flowFile, REL_FAILURE);
return;
}
+ processSession.getProvenanceReporter().send(flowFile, getTransitUri());
processSession.transfer(flowFile, REL_SUCCESS);
}
diff --git
a/nifi-extension-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/processor/PutRedisHashRecord.java
b/nifi-extension-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/processor/PutRedisHashRecord.java
index 3f07d8eb192..fe975a45557 100644
---
a/nifi-extension-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/processor/PutRedisHashRecord.java
+++
b/nifi-extension-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/processor/PutRedisHashRecord.java
@@ -145,15 +145,18 @@ public class PutRedisHashRecord extends AbstractProcessor
{
}
private volatile RedisConnectionPool redisConnectionPool;
+ private volatile String transitUri;
@OnScheduled
public void onScheduled(final ProcessContext context) {
this.redisConnectionPool =
context.getProperty(REDIS_CONNECTION_POOL).asControllerService(RedisConnectionPool.class);
+ this.transitUri =
"redis://%s".formatted(redisConnectionPool.getConnectionString());
}
@OnStopped
public void onStopped() {
this.redisConnectionPool = null;
+ this.transitUri = null;
}
@Override
@@ -220,6 +223,7 @@ public class PutRedisHashRecord extends AbstractProcessor {
}
flowFile = session.putAttribute(flowFile, SUCCESS_RECORD_COUNT,
String.valueOf(count));
+ session.getProvenanceReporter().send(flowFile, transitUri);
session.transfer(flowFile, REL_SUCCESS);
}
diff --git
a/nifi-extension-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/service/RedisConnectionPoolService.java
b/nifi-extension-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/service/RedisConnectionPoolService.java
index 133ccc31518..fb038c587b1 100644
---
a/nifi-extension-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/service/RedisConnectionPoolService.java
+++
b/nifi-extension-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/service/RedisConnectionPoolService.java
@@ -43,6 +43,7 @@ public class RedisConnectionPoolService extends
AbstractControllerService implem
private volatile PropertyContext context;
private volatile RedisType redisType;
+ private volatile String connectionString;
private volatile JedisConnectionFactory connectionFactory;
private volatile SSLContext sslContext;
@@ -66,6 +67,7 @@ public class RedisConnectionPoolService extends
AbstractControllerService implem
final String redisMode =
context.getProperty(RedisUtils.REDIS_MODE).getValue();
this.redisType = RedisType.fromDisplayName(redisMode);
+ this.connectionString =
context.getProperty(RedisUtils.CONNECTION_STRING).evaluateAttributeExpressions().getValue();
}
@OnDisabled
@@ -74,6 +76,7 @@ public class RedisConnectionPoolService extends
AbstractControllerService implem
connectionFactory.destroy();
connectionFactory = null;
redisType = null;
+ connectionString = null;
context = null;
sslContext = null;
}
@@ -84,6 +87,11 @@ public class RedisConnectionPoolService extends
AbstractControllerService implem
return redisType;
}
+ @Override
+ public String getConnectionString() {
+ return connectionString;
+ }
+
@Override
public RedisConnection getConnection() {
if (connectionFactory == null) {
diff --git
a/nifi-extension-bundles/nifi-redis-bundle/nifi-redis-service-api/src/main/java/org/apache/nifi/redis/RedisConnectionPool.java
b/nifi-extension-bundles/nifi-redis-bundle/nifi-redis-service-api/src/main/java/org/apache/nifi/redis/RedisConnectionPool.java
index f0e3e163b56..8548e8b7f6f 100644
---
a/nifi-extension-bundles/nifi-redis-bundle/nifi-redis-service-api/src/main/java/org/apache/nifi/redis/RedisConnectionPool.java
+++
b/nifi-extension-bundles/nifi-redis-bundle/nifi-redis-service-api/src/main/java/org/apache/nifi/redis/RedisConnectionPool.java
@@ -41,4 +41,12 @@ public interface RedisConnectionPool extends
ControllerService {
*/
RedisType getRedisType();
+ /**
+ * Returns the connection string used to connect to Redis. The format
depends on the Redis mode:
+ * standalone (hostname:port), sentinel (comma-separated list of
sentinels), or clustered (comma-separated list of cluster masters).
+ *
+ * @return the Redis connection string
+ */
+ String getConnectionString();
+
}
diff --git
a/nifi-extension-bundles/nifi-snmp-bundle/nifi-snmp-processors/src/main/java/org/apache/nifi/snmp/processors/SendTrapSNMP.java
b/nifi-extension-bundles/nifi-snmp-bundle/nifi-snmp-processors/src/main/java/org/apache/nifi/snmp/processors/SendTrapSNMP.java
index 991a3965302..8e822b47bfc 100644
---
a/nifi-extension-bundles/nifi-snmp-bundle/nifi-snmp-processors/src/main/java/org/apache/nifi/snmp/processors/SendTrapSNMP.java
+++
b/nifi-extension-bundles/nifi-snmp-bundle/nifi-snmp-processors/src/main/java/org/apache/nifi/snmp/processors/SendTrapSNMP.java
@@ -157,6 +157,9 @@ public class SendTrapSNMP extends AbstractSNMPProcessor {
snmpHandler.sendTrap(attributes, v2TrapConfiguration, target);
}
flowFile = processSession.putAllAttributes(flowFile, attributes);
+ final String managerHost =
context.getProperty(SNMP_MANAGER_HOST).evaluateAttributeExpressions(flowFile).getValue();
+ final String managerPort =
context.getProperty(SNMP_MANAGER_PORT).evaluateAttributeExpressions(flowFile).getValue();
+ processSession.getProvenanceReporter().send(flowFile,
"snmp://%s:%s".formatted(managerHost, managerPort));
processSession.transfer(flowFile, REL_SUCCESS);
} catch (IOException e) {
getLogger().error("Failed to send request to the agent. Check if
the agent supports the used version.", e);
diff --git
a/nifi-extension-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/main/java/org/apache/nifi/processors/snowflake/PutSnowflakeInternalStage.java
b/nifi-extension-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/main/java/org/apache/nifi/processors/snowflake/PutSnowflakeInternalStage.java
index 1b140df720f..ddbf6fe2c59 100644
---
a/nifi-extension-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/main/java/org/apache/nifi/processors/snowflake/PutSnowflakeInternalStage.java
+++
b/nifi-extension-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/main/java/org/apache/nifi/processors/snowflake/PutSnowflakeInternalStage.java
@@ -164,6 +164,7 @@ public class PutSnowflakeInternalStage extends
AbstractProcessor {
}
flowFile = session.putAttribute(flowFile, ATTRIBUTE_STAGED_FILE_PATH,
stagedFileName);
+ session.getProvenanceReporter().send(flowFile,
"snowflake://stages/%s/%s".formatted(internalStageName, stagedFileName));
session.transfer(flowFile, REL_SUCCESS);
}
diff --git
a/nifi-extension-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/PutSplunkHTTP.java
b/nifi-extension-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/PutSplunkHTTP.java
index 2438ff97554..35173307269 100644
---
a/nifi-extension-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/PutSplunkHTTP.java
+++
b/nifi-extension-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/PutSplunkHTTP.java
@@ -174,6 +174,7 @@ public class PutSplunkHTTP extends SplunkAPICall {
if (successResponse.getCode() == 0) {
flowFile = enrichFlowFile(session, flowFile,
successResponse.getAckId());
+ session.getProvenanceReporter().send(flowFile,
getTransitBaseUri() + endpoint);
success = true;
} else {
flowFile = session.putAttribute(flowFile,
"splunk.response.code", String.valueOf(successResponse.getCode()));
diff --git
a/nifi-extension-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/SplunkAPICall.java
b/nifi-extension-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/SplunkAPICall.java
index bd9e550b87d..f1cc06e1ff0 100644
---
a/nifi-extension-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/SplunkAPICall.java
+++
b/nifi-extension-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/SplunkAPICall.java
@@ -145,6 +145,7 @@ abstract class SplunkAPICall extends AbstractProcessor {
private volatile ServiceArgs splunkServiceArguments;
private volatile Service splunkService;
private volatile String requestChannel;
+ private volatile String transitBaseUri;
protected static List<PropertyDescriptor> getCommonPropertyDescriptors() {
return PROPERTY_DESCRIPTORS;
@@ -160,6 +161,10 @@ abstract class SplunkAPICall extends AbstractProcessor {
splunkServiceArguments = getSplunkServiceArgs(context);
splunkService = getSplunkService(splunkServiceArguments);
requestChannel =
context.getProperty(SplunkAPICall.REQUEST_CHANNEL).evaluateAttributeExpressions().getValue();
+ final String scheme = context.getProperty(SCHEME).getValue();
+ final String hostname =
context.getProperty(HOSTNAME).evaluateAttributeExpressions().getValue();
+ final int port =
context.getProperty(PORT).evaluateAttributeExpressions().asInteger();
+ transitBaseUri = "%s://%s:%d".formatted(scheme, hostname, port);
}
private ServiceArgs getSplunkServiceArgs(final ProcessContext context) {
@@ -205,6 +210,7 @@ abstract class SplunkAPICall extends AbstractProcessor {
requestChannel = null;
splunkServiceArguments = null;
+ transitBaseUri = null;
}
@Override
@@ -214,6 +220,10 @@ abstract class SplunkAPICall extends AbstractProcessor {
config.renameProperty("request-channel", REQUEST_CHANNEL.getName());
}
+ protected String getTransitBaseUri() {
+ return transitBaseUri;
+ }
+
protected ResponseMessage call(final String endpoint, final RequestMessage
request) {
request.getHeader().put(REQUEST_CHANNEL_HEADER_NAME, requestChannel);