This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new a0c705715b NIFI-10429: Added the ability to Replay latest provenance
event for a given Processor.
a0c705715b is described below
commit a0c705715bf4f9f782c5cfce1521cfc6a4279765
Author: Mark Payne <[email protected]>
AuthorDate: Wed Aug 31 12:05:45 2022 -0400
NIFI-10429: Added the ability to Replay latest provenance event for a given
Processor.
This closes #6359
Signed-off-by: David Handermann <[email protected]>
---
.../nifi/provenance/NoOpProvenanceRepository.java | 6 ++
nifi-docs/src/main/asciidoc/user-guide.adoc | 11 +-
.../nifi/provenance/ProvenanceRepository.java | 10 +-
.../nifi/provenance/MockProvenanceRepository.java | 21 ++--
.../api/entity/NodeReplayLastEventSnapshotDTO.java | 65 ++++++++++++
.../api/entity/ReplayLastEventRequestEntity.java | 49 +++++++++
.../api/entity/ReplayLastEventResponseEntity.java | 70 ++++++++++++
.../web/api/entity/ReplayLastEventSnapshotDTO.java | 56 ++++++++++
.../http/StandardHttpResponseMapper.java | 2 +
.../endpoints/ReplayLastEventEndpointMerger.java | 97 +++++++++++++++++
.../org/apache/nifi/controller/FlowController.java | 65 ++++++------
.../org/apache/nifi/web/NiFiServiceFacade.java | 7 ++
.../apache/nifi/web/StandardNiFiServiceFacade.java | 5 +
.../apache/nifi/web/api/ApplicationResource.java | 9 ++
.../nifi/web/api/ProvenanceEventResource.java | 117 ++++++++++++++++++++-
.../nifi/web/controller/ControllerFacade.java | 34 ++++++
.../src/main/resources/nifi-web-api-context.xml | 1 +
.../src/main/webapp/js/nf/canvas/nf-actions.js | 90 ++++++++++++++++
.../main/webapp/js/nf/canvas/nf-context-menu.js | 19 ++++
.../provenance/PersistentProvenanceRepository.java | 6 ++
.../provenance/WriteAheadProvenanceRepository.java | 6 ++
.../apache/nifi/provenance/index/EventIndex.java | 10 ++
.../lucene/LatestEventsPerProcessorQuery.java | 20 +++-
.../provenance/index/lucene/LuceneEventIndex.java | 22 +++-
.../provenance/VolatileProvenanceRepository.java | 12 +++
.../repository/StatelessProvenanceRepository.java | 6 ++
.../apache/nifi/tests/system/NiFiClientUtil.java | 1 -
.../provenance/ClusteredReplayProvenanceIT.java | 28 +++++
.../system/provenance/ReplayProvenanceIT.java | 74 +++++++++++++
.../cli/impl/client/nifi/ProvenanceClient.java | 8 ++
.../client/nifi/impl/JerseyProvenanceClient.java | 22 ++++
31 files changed, 901 insertions(+), 48 deletions(-)
diff --git
a/minifi/minifi-nar-bundles/minifi-provenance-repository-bundle/minifi-provenance-repositories/src/main/java/org/apache/nifi/provenance/NoOpProvenanceRepository.java
b/minifi/minifi-nar-bundles/minifi-provenance-repository-bundle/minifi-provenance-repositories/src/main/java/org/apache/nifi/provenance/NoOpProvenanceRepository.java
index 8aed44745c..6444c42fbc 100644
---
a/minifi/minifi-nar-bundles/minifi-provenance-repository-bundle/minifi-provenance-repositories/src/main/java/org/apache/nifi/provenance/NoOpProvenanceRepository.java
+++
b/minifi/minifi-nar-bundles/minifi-provenance-repository-bundle/minifi-provenance-repositories/src/main/java/org/apache/nifi/provenance/NoOpProvenanceRepository.java
@@ -26,6 +26,7 @@ import org.apache.nifi.provenance.search.SearchableField;
import java.io.IOException;
import java.util.List;
+import java.util.Optional;
import java.util.Set;
import static java.util.Collections.EMPTY_SET;
@@ -102,6 +103,11 @@ public class NoOpProvenanceRepository implements
ProvenanceRepository {
return null;
}
+ @Override
+ public Optional<ProvenanceEventRecord> getLatestCachedEvent(final String
componentId) throws IOException {
+ return Optional.empty();
+ }
+
@Override
public QuerySubmission retrieveQuerySubmission(String queryIdentifier,
NiFiUser niFiUser) {
return null;
diff --git a/nifi-docs/src/main/asciidoc/user-guide.adoc
b/nifi-docs/src/main/asciidoc/user-guide.adoc
index 8dceaa60d0..55b50caa2f 100644
--- a/nifi-docs/src/main/asciidoc/user-guide.adoc
+++ b/nifi-docs/src/main/asciidoc/user-guide.adoc
@@ -304,6 +304,7 @@ NOTE: For Processors, Ports, Remote Process Groups,
Connections and Labels, it i
- *Run Once*: This option allows the user to run a selected Processor exactly
once. If the Processor is prevented from executing (e.g., there are no incoming
FlowFiles or the outgoing connection has back pressure applied) the Processor
won't get triggered. *Execution* settings apply (i.e., *Primary Node* and *All
Nodes* settings will result in running the Processor only once on the Primary
Node or one time on each of the nodes, respectively). Works only with *Timer
driven* and *CRON driv [...]
- *Enable* or *Disable*: This option allows the user to enable or disable a
Processor; the option will be either Enable or Disable, depending on the
current state of the Processor.
- *View data provenance*: This option displays the NiFi Data Provenance table,
with information about data provenance events for the FlowFiles routed through
that Processor (see <<data_provenance>>).
+- *Replay last event*: This option will replay the last Provenance event,
effectively requeuing the last FlowFile that was processed by the Processor
(see <<replay_flowfile>>).
- *View status history*: This option opens a graphical representation of the
Processor's statistical information over time.
- *View usage*: This option takes the user to the Processor's usage
documentation.
- *View connections->Upstream*: This option allows the user to see and "jump
to" upstream connections that are coming into the Processor. This is
particularly useful when processors connect into and out of other Process
Groups.
@@ -2834,14 +2835,22 @@ a result of the processing event, the user may select
the checkbox next to "Only
image:event-attributes.png["Event Attributes", width=700]
+[[replay_flowfile]]
=== Replaying a FlowFile
A DFM may need to inspect a FlowFile's content at some point in the dataflow
to ensure that it is being processed as expected. And if it
-is not being processed properly, the DFM may need to make adjustments to the
dataflow and replay the FlowFile again. The Content tab of the View Details
dialog window is where the DFM can do these things. The Content tab shows
information about the FlowFile's content, such as its location in the Content
Repository
+is not being processed properly, the DFM may need to make adjustments to the
dataflow and replay the FlowFile again.
+This can be achieved from the Content tab of the View Details dialog window.
The Content tab shows information
+about the FlowFile's content, such as its location in the Content Repository
and its size. In addition, it is here that the user may click the "Download"
button to download a copy of the FlowFile's content as it existed
at this point in the flow. The user may also click the "Submit" button to
replay the FlowFile at this point in the flow. Upon clicking "Submit",
the FlowFile is sent to the connection feeding the component that produced
this processing event.
+When a user is developing a dataflow, it can also be very beneficial to be able
to replay a FlowFile easily. For example, a user may configure
+a Processor, run a FlowFile through it, and find that the configuration needs
to be modified. The user can then update the configuration, and run the
+same FlowFile through again to verify the results. In order to ease this
process, the user can right-click on a Processor and choose the "Replay last
event"
+item. From here, the user can choose to either replay the last event from just
the Primary Node or from all nodes.
+
image:event-content.png["Event Content", width=700]
=== Viewing FlowFile Lineage
diff --git
a/nifi-framework-api/src/main/java/org/apache/nifi/provenance/ProvenanceRepository.java
b/nifi-framework-api/src/main/java/org/apache/nifi/provenance/ProvenanceRepository.java
index bbde0c70c3..8e159d864b 100644
---
a/nifi-framework-api/src/main/java/org/apache/nifi/provenance/ProvenanceRepository.java
+++
b/nifi-framework-api/src/main/java/org/apache/nifi/provenance/ProvenanceRepository.java
@@ -16,7 +16,6 @@
*/
package org.apache.nifi.provenance;
-
import org.apache.nifi.authorization.Authorizer;
import org.apache.nifi.authorization.user.NiFiUser;
import org.apache.nifi.events.EventReporter;
@@ -27,6 +26,7 @@ import org.apache.nifi.provenance.search.SearchableField;
import java.io.IOException;
import java.util.List;
+import java.util.Optional;
import java.util.Set;
public interface ProvenanceRepository extends ProvenanceEventRepository {
@@ -94,6 +94,14 @@ public interface ProvenanceRepository extends
ProvenanceEventRepository {
*/
QuerySubmission submitQuery(Query query, NiFiUser user);
+ /**
+ * Retrieves the most recent Provenance Event that is cached for the given
component
+ * @param componentId the ID of the component
+ * @return an Optional containing the event, or an empty optional if no
cached event is available for the given component
+ * @throws IOException if unable to read from the repository
+ */
+ Optional<ProvenanceEventRecord> getLatestCachedEvent(String componentId)
throws IOException;
+
/**
* @param queryIdentifier of the query
* @param user The user who is retrieving the query.
diff --git
a/nifi-mock/src/main/java/org/apache/nifi/provenance/MockProvenanceRepository.java
b/nifi-mock/src/main/java/org/apache/nifi/provenance/MockProvenanceRepository.java
index e1d8321f6b..c694d64972 100644
---
a/nifi-mock/src/main/java/org/apache/nifi/provenance/MockProvenanceRepository.java
+++
b/nifi-mock/src/main/java/org/apache/nifi/provenance/MockProvenanceRepository.java
@@ -16,13 +16,6 @@
*/
package org.apache.nifi.provenance;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
import org.apache.nifi.authorization.Authorizer;
import org.apache.nifi.authorization.user.NiFiUser;
import org.apache.nifi.events.EventReporter;
@@ -31,6 +24,15 @@ import org.apache.nifi.provenance.search.Query;
import org.apache.nifi.provenance.search.QuerySubmission;
import org.apache.nifi.provenance.search.SearchableField;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+
public class MockProvenanceRepository implements ProvenanceRepository {
private final List<ProvenanceEventRecord> records = new ArrayList<>();
@@ -90,6 +92,11 @@ public class MockProvenanceRepository implements
ProvenanceRepository {
throw new UnsupportedOperationException("MockProvenanceRepository does
not support querying");
}
+ @Override
+ public Optional<ProvenanceEventRecord> getLatestCachedEvent(final String
componentId) throws IOException {
+ return Optional.empty();
+ }
+
@Override
public QuerySubmission retrieveQuerySubmission(String queryIdentifier,
NiFiUser user) {
throw new UnsupportedOperationException("MockProvenanceRepository does
not support querying");
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/NodeReplayLastEventSnapshotDTO.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/NodeReplayLastEventSnapshotDTO.java
new file mode 100644
index 0000000000..0d36d6e1d0
--- /dev/null
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/NodeReplayLastEventSnapshotDTO.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.web.api.entity;
+
+import io.swagger.annotations.ApiModelProperty;
+
+import javax.xml.bind.annotation.XmlRootElement;
+
+@XmlRootElement(name = "nodeReplayLastEventSnapshot")
+public class NodeReplayLastEventSnapshotDTO {
+ private String nodeId;
+ private String address;
+ private Integer apiPort;
+ private ReplayLastEventSnapshotDTO snapshot;
+
+ @ApiModelProperty("The unique ID that identifies the node")
+ public String getNodeId() {
+ return nodeId;
+ }
+
+ public void setNodeId(String nodeId) {
+ this.nodeId = nodeId;
+ }
+
+ @ApiModelProperty("The API address of the node")
+ public String getAddress() {
+ return address;
+ }
+
+ public void setAddress(String address) {
+ this.address = address;
+ }
+
+ @ApiModelProperty("The API port used to communicate with the node")
+ public Integer getApiPort() {
+ return apiPort;
+ }
+
+ public void setApiPort(Integer apiPort) {
+ this.apiPort = apiPort;
+ }
+
+ @ApiModelProperty("The snapshot from the node")
+ public ReplayLastEventSnapshotDTO getSnapshot() {
+ return snapshot;
+ }
+
+ public void setSnapshot(final ReplayLastEventSnapshotDTO snapshot) {
+ this.snapshot = snapshot;
+ }
+}
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ReplayLastEventRequestEntity.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ReplayLastEventRequestEntity.java
new file mode 100644
index 0000000000..82effe0065
--- /dev/null
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ReplayLastEventRequestEntity.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.web.api.entity;
+
+import io.swagger.annotations.ApiModelProperty;
+
+import javax.xml.bind.annotation.XmlRootElement;
+
+@XmlRootElement(name = "replayLastEventRequestEntity")
+public class ReplayLastEventRequestEntity extends Entity {
+
+ private String componentId;
+ private String nodes;
+
+ @ApiModelProperty("The UUID of the component whose last event should be
replayed.")
+ public String getComponentId() {
+ return componentId;
+ }
+
+ public void setComponentId(final String componentId) {
+ this.componentId = componentId;
+ }
+
+ @ApiModelProperty(
+ value = "Which nodes are to replay their last provenance event.",
+ allowableValues = "ALL, PRIMARY"
+ )
+ public String getNodes() {
+ return nodes;
+ }
+
+ public void setNodes(String nodes) {
+ this.nodes = nodes;
+ }
+}
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ReplayLastEventResponseEntity.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ReplayLastEventResponseEntity.java
new file mode 100644
index 0000000000..e8f3ab39be
--- /dev/null
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ReplayLastEventResponseEntity.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.web.api.entity;
+
+import io.swagger.annotations.ApiModelProperty;
+
+import javax.xml.bind.annotation.XmlRootElement;
+import java.util.List;
+
+@XmlRootElement(name = "replayLastEventResponseEntity")
+public class ReplayLastEventResponseEntity extends Entity {
+
+ private String componentId;
+ private String nodes;
+ private ReplayLastEventSnapshotDTO aggregateSnapshot;
+ private List<NodeReplayLastEventSnapshotDTO> nodeSnapshots;
+
+ @ApiModelProperty("The UUID of the component whose last event should be
replayed.")
+ public String getComponentId() {
+ return componentId;
+ }
+
+ public void setComponentId(final String componentId) {
+ this.componentId = componentId;
+ }
+
+ @ApiModelProperty(
+ value = "Which nodes were requested to replay their last provenance
event.",
+ allowableValues = "ALL, PRIMARY"
+ )
+ public String getNodes() {
+ return nodes;
+ }
+
+ public void setNodes(String nodes) {
+ this.nodes = nodes;
+ }
+
+ @ApiModelProperty("The aggregate result of all nodes' responses")
+ public ReplayLastEventSnapshotDTO getAggregateSnapshot() {
+ return aggregateSnapshot;
+ }
+
+ public void setAggregateSnapshot(final ReplayLastEventSnapshotDTO
aggregateSnapshot) {
+ this.aggregateSnapshot = aggregateSnapshot;
+ }
+
+ @ApiModelProperty("The node-wise results")
+ public List<NodeReplayLastEventSnapshotDTO> getNodeSnapshots() {
+ return nodeSnapshots;
+ }
+
+ public void setNodeSnapshots(final List<NodeReplayLastEventSnapshotDTO>
nodeSnapshots) {
+ this.nodeSnapshots = nodeSnapshots;
+ }
+}
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ReplayLastEventSnapshotDTO.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ReplayLastEventSnapshotDTO.java
new file mode 100644
index 0000000000..47185593ea
--- /dev/null
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ReplayLastEventSnapshotDTO.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.web.api.entity;
+
+import io.swagger.annotations.ApiModelProperty;
+
+import javax.xml.bind.annotation.XmlRootElement;
+import java.util.Collection;
+
+@XmlRootElement(name = "replayLastEventSnapshot")
+public class ReplayLastEventSnapshotDTO {
+ private Collection<Long> eventsReplayed;
+ private String failureExplanation;
+ private Boolean eventAvailable;
+
+ @ApiModelProperty("The IDs of the events that were successfully replayed")
+ public Collection<Long> getEventsReplayed() {
+ return eventsReplayed;
+ }
+
+ public void setEventsReplayed(final Collection<Long> eventsReplayed) {
+ this.eventsReplayed = eventsReplayed;
+ }
+
+ @ApiModelProperty("If unable to replay an event, specifies why the event
could not be replayed")
+ public String getFailureExplanation() {
+ return failureExplanation;
+ }
+
+ public void setFailureExplanation(final String failureExplanation) {
+ this.failureExplanation = failureExplanation;
+ }
+
+ @ApiModelProperty("Whether or not an event was available. This may not be
populated if there was a failure.")
+ public Boolean getEventAvailable() {
+ return eventAvailable;
+ }
+
+ public void setEventAvailable(final Boolean eventAvailable) {
+ this.eventAvailable = eventAvailable;
+ }
+}
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMapper.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMapper.java
index 61e5cab30b..440dc60d5f 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMapper.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMapper.java
@@ -65,6 +65,7 @@ import
org.apache.nifi.cluster.coordination.http.endpoints.ProvenanceQueryEndpoi
import
org.apache.nifi.cluster.coordination.http.endpoints.RemoteProcessGroupEndpointMerger;
import
org.apache.nifi.cluster.coordination.http.endpoints.RemoteProcessGroupStatusEndpointMerger;
import
org.apache.nifi.cluster.coordination.http.endpoints.RemoteProcessGroupsEndpointMerger;
+import
org.apache.nifi.cluster.coordination.http.endpoints.ReplayLastEventEndpointMerger;
import
org.apache.nifi.cluster.coordination.http.endpoints.ReportingTaskEndpointMerger;
import
org.apache.nifi.cluster.coordination.http.endpoints.ReportingTaskTypesEndpointMerger;
import
org.apache.nifi.cluster.coordination.http.endpoints.ReportingTasksEndpointMerger;
@@ -173,6 +174,7 @@ public class StandardHttpResponseMapper implements
HttpResponseMapper {
endpointMergers.add(new ParameterContextUpdateEndpointMerger());
endpointMergers.add(new VerifyConfigEndpointMerger());
endpointMergers.add(new RuntimeManifestEndpointMerger());
+ endpointMergers.add(new ReplayLastEventEndpointMerger());
}
@Override
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ReplayLastEventEndpointMerger.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ReplayLastEventEndpointMerger.java
new file mode 100644
index 0000000000..7911713e2b
--- /dev/null
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ReplayLastEventEndpointMerger.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cluster.coordination.http.endpoints;
+
+import org.apache.nifi.cluster.coordination.http.EndpointResponseMerger;
+import org.apache.nifi.cluster.manager.NodeResponse;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.web.api.entity.NodeReplayLastEventSnapshotDTO;
+import org.apache.nifi.web.api.entity.ReplayLastEventResponseEntity;
+import org.apache.nifi.web.api.entity.ReplayLastEventSnapshotDTO;
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+public class ReplayLastEventEndpointMerger extends
AbstractSingleEntityEndpoint<ReplayLastEventResponseEntity> implements
EndpointResponseMerger {
+ public static final String REPLAY_URI =
"/nifi-api/provenance-events/latest/replays";
+
+ @Override
+ public boolean canHandle(final URI uri, final String method) {
+ return "POST".equals(method) && REPLAY_URI.equals(uri.getPath());
+ }
+
+ @Override
+ protected Class<ReplayLastEventResponseEntity> getEntityClass() {
+ return ReplayLastEventResponseEntity.class;
+ }
+
+ @Override
+ protected void mergeResponses(final ReplayLastEventResponseEntity
clientEntity, final Map<NodeIdentifier, ReplayLastEventResponseEntity>
entityMap, final Set<NodeResponse> successfulResponses,
+ final Set<NodeResponse>
problematicResponses) {
+
+ // Move all aggregate snapshots into the node snapshots.
+ final Set<Long> replayedEventIds = new HashSet<>();
+ final Set<String> failureExplanations = new HashSet<>();
+ boolean eventAvailable = false;
+ for (final Map.Entry<NodeIdentifier, ReplayLastEventResponseEntity>
entry : entityMap.entrySet()) {
+ final NodeIdentifier nodeId = entry.getKey();
+ final ReplayLastEventResponseEntity nodeEntity = entry.getValue();
+
+ final ReplayLastEventSnapshotDTO nodeSnapshot =
nodeEntity.getAggregateSnapshot();
+ final NodeReplayLastEventSnapshotDTO nodeResponseDto = new
NodeReplayLastEventSnapshotDTO();
+ nodeResponseDto.setAddress(nodeId.getApiAddress());
+ nodeResponseDto.setApiPort(nodeId.getApiPort());
+ nodeResponseDto.setNodeId(nodeId.getId());
+ nodeResponseDto.setSnapshot(nodeSnapshot);
+
+ if (clientEntity.getNodeSnapshots() == null) {
+ clientEntity.setNodeSnapshots(new ArrayList<>());
+ }
+ clientEntity.getNodeSnapshots().add(nodeResponseDto);
+
+ final Collection<Long> eventsReplayed =
nodeSnapshot.getEventsReplayed();
+ if (eventsReplayed != null) {
+ replayedEventIds.addAll(eventsReplayed);
+ }
+
+ final String failureExplanation =
nodeSnapshot.getFailureExplanation();
+ if (failureExplanation != null) {
+ failureExplanations.add(nodeId.getApiAddress() + ":" +
nodeId.getApiPort() + " - " + failureExplanation);
+ }
+
+ eventAvailable = eventAvailable ||
nodeSnapshot.getEventAvailable() == Boolean.TRUE;
+ }
+
+ // Update the aggregate snapshot
+
clientEntity.getAggregateSnapshot().setEventsReplayed(replayedEventIds);
+ clientEntity.getAggregateSnapshot().setEventAvailable(eventAvailable);
+
+ if (failureExplanations.isEmpty()) {
+ return;
+ }
+ if (failureExplanations.size() == 1) {
+ clientEntity.getAggregateSnapshot().setFailureExplanation("One
node failed to replay the latest event: " +
failureExplanations.iterator().next());
+ } else {
+
clientEntity.getAggregateSnapshot().setFailureExplanation(failureExplanations.size()
+ " nodes failed to replay the latest events. See logs for more details.");
+ }
+ }
+}
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index ce5fc64f83..6ac54d3c3a 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -2781,16 +2781,8 @@ public class FlowController implements
ReportingTaskProvider, Authorizable, Node
return "Cannot replay data from Provenance Event because the event
does not specify the Source FlowFile Queue";
}
- final Set<Connection> connections = flowManager.findAllConnections();
- FlowFileQueue queue = null;
- for (final Connection connection : connections) {
- if
(event.getSourceQueueIdentifier().equals(connection.getIdentifier())) {
- queue = connection.getFlowFileQueue();
- break;
- }
- }
-
- if (queue == null) {
+ final Connection connection =
flowManager.getConnection(event.getSourceQueueIdentifier());
+ if (connection == null) {
return "Cannot replay data from Provenance Event because the
Source FlowFile Queue with ID " + event.getSourceQueueIdentifier() + " no
longer exists";
}
@@ -2818,10 +2810,28 @@ public class FlowController implements
ReportingTaskProvider, Authorizable, Node
}
// Make sure event has the Content Claim info
- final Long contentSize = event.getPreviousFileSize();
- final String contentClaimId =
event.getPreviousContentClaimIdentifier();
- final String contentClaimSection =
event.getPreviousContentClaimSection();
- final String contentClaimContainer =
event.getPreviousContentClaimContainer();
+ boolean usePrevious = true;
+ Long contentSize = event.getPreviousFileSize();
+ String contentClaimId = event.getPreviousContentClaimIdentifier();
+ String contentClaimSection = event.getPreviousContentClaimSection();
+ String contentClaimContainer =
event.getPreviousContentClaimContainer();
+ Long contentClaimOffset = event.getPreviousContentClaimOffset();
+
+ final int previousClaimNulls = countNulls(contentSize, contentClaimId,
contentClaimSection, contentClaimContainer);
+ if (previousClaimNulls == 4) {
+ contentClaimId = event.getContentClaimIdentifier();
+ contentClaimSection = event.getContentClaimSection();
+ contentClaimContainer = event.getContentClaimContainer();
+
+ final int currentClaimNullCounts = countNulls(contentClaimId,
contentClaimSection, contentClaimContainer);
+
+ // If the current claim is also all null, we will stick with using
the previous. Otherwise, we'll denote that we're using the current claim
+ usePrevious = currentClaimNullCounts == 3;
+ if (!usePrevious) {
+ contentSize = event.getFileSize();
+ contentClaimOffset = event.getContentClaimOffset();
+ }
+ }
// All content fields must be null or no content fields can be null.
final int nullCount = countNulls(contentSize, contentClaimId,
contentClaimSection, contentClaimContainer);
@@ -2834,16 +2844,8 @@ public class FlowController implements
ReportingTaskProvider, Authorizable, Node
throw new IllegalArgumentException("Cannot replay data from
Provenance Event because the event does not specify the Source FlowFile Queue");
}
- final Set<Connection> connections = flowManager.findAllConnections();
- FlowFileQueue queue = null;
- for (final Connection connection : connections) {
- if
(event.getSourceQueueIdentifier().equals(connection.getIdentifier())) {
- queue = connection.getFlowFileQueue();
- break;
- }
- }
-
- if (queue == null) {
+ final Connection connection =
flowManager.getConnection(event.getSourceQueueIdentifier());
+ if (connection == null) {
throw new IllegalStateException("Cannot replay data from
Provenance Event because the Source FlowFile Queue with ID " +
event.getSourceQueueIdentifier() + " no longer exists");
}
@@ -2859,18 +2861,17 @@ public class FlowController implements
ReportingTaskProvider, Authorizable, Node
// being written to by the Content Repository. This is important
only because we are creating a FlowFile with this Resource
// Claim. If, for instance, we are simply creating the claim to
request its content, as in #getContentAvailability, etc.
// then this is not necessary.
- ResourceClaim resourceClaim =
resourceClaimManager.getResourceClaim(event.getPreviousContentClaimContainer(),
- event.getPreviousContentClaimSection(),
event.getPreviousContentClaimIdentifier());
+ ResourceClaim resourceClaim =
resourceClaimManager.getResourceClaim(contentClaimContainer,
contentClaimSection, contentClaimId);
if (resourceClaim == null) {
- resourceClaim =
resourceClaimManager.newResourceClaim(event.getPreviousContentClaimContainer(),
- event.getPreviousContentClaimSection(),
event.getPreviousContentClaimIdentifier(), false, false);
+ resourceClaim =
resourceClaimManager.newResourceClaim(contentClaimContainer,
+ contentClaimSection, contentClaimId, false, false);
}
// Increment Claimant Count, since we will now be referencing the
Content Claim
resourceClaimManager.incrementClaimantCount(resourceClaim);
- final long claimOffset = event.getPreviousContentClaimOffset() ==
null ? 0L : event.getPreviousContentClaimOffset();
+ final long claimOffset = contentClaimOffset == null ? 0L :
contentClaimOffset;
contentClaim = new StandardContentClaim(resourceClaim,
claimOffset);
- contentClaim.setLength(event.getPreviousFileSize() == null ? -1L :
event.getPreviousFileSize());
+ contentClaim.setLength(contentSize == null ? -1L : contentSize);
if (!contentRepository.isAccessible(contentClaim)) {
resourceClaimManager.decrementClaimantCount(resourceClaim);
@@ -2893,7 +2894,7 @@ public class FlowController implements
ReportingTaskProvider, Authorizable, Node
// FlowFileRecord's contentClaimOffset to 0.
final FlowFileRecord flowFileRecord = new
StandardFlowFileRecord.Builder()
// Copy relevant info from source FlowFile
- .addAttributes(event.getPreviousAttributes())
+ .addAttributes(usePrevious ? event.getPreviousAttributes() :
event.getAttributes())
.contentClaim(contentClaim)
.contentClaimOffset(0L) // use 0 because we used the content
claim offset in the Content Claim itself
.entryDate(System.currentTimeMillis())
@@ -2923,10 +2924,12 @@ public class FlowController implements
ReportingTaskProvider, Authorizable, Node
.setLineageStartDate(event.getLineageStartDate())
.setComponentType(event.getComponentType())
.setComponentId(event.getComponentId())
+ .setSourceQueueIdentifier(event.getSourceQueueIdentifier())
.build();
provenanceRepository.registerEvent(replayEvent);
// Update the FlowFile Repository to indicate that we have added the
FlowFile to the flow
+ final FlowFileQueue queue = connection.getFlowFileQueue();
final StandardRepositoryRecord record = new
StandardRepositoryRecord(queue);
record.setWorking(flowFileRecord, false);
record.setDestination(queue);
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
index aa75f20234..7cf07768c8 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
@@ -224,6 +224,13 @@ public interface NiFiServiceFacade {
*/
ProvenanceEventDTO submitReplay(Long eventId);
+ /**
+ * Submits a replay request for the latest event generated by the
component with the given ID
+ * @param componentId the ID of the component
+ * @return the event, or <code>null</code> if no event was available
+ */
+ ProvenanceEventDTO submitReplayLastEvent(String componentId);
+
/**
* Gets the content for the specified claim.
*
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
index aa39cb6d7a..b8135eee9b 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
@@ -3336,6 +3336,11 @@ public class StandardNiFiServiceFacade implements
NiFiServiceFacade {
return controllerFacade.submitReplay(eventId);
}
+ @Override
+ public ProvenanceEventDTO submitReplayLastEvent(final String componentId) {
+ return controllerFacade.submitReplayLastEvent(componentId);
+ }
+
// -----------------------------------------
// Read Operations
// -----------------------------------------
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java
index 8006caa6ac..a339800d36 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java
@@ -916,6 +916,15 @@ public abstract class ApplicationResource {
throw new NoClusterCoordinatorException();
}
+ protected Optional<NodeIdentifier> getPrimaryNodeId() {
+ final ClusterCoordinator coordinator = getClusterCoordinator();
+ if (coordinator == null) {
+ throw new NoClusterCoordinatorException();
+ }
+
+ return Optional.ofNullable(coordinator.getPrimaryNode());
+ }
+
protected ReplicationTarget getReplicationTarget() {
return clusterCoordinator.isActiveClusterCoordinator() ?
ReplicationTarget.CLUSTER_NODES : ReplicationTarget.CLUSTER_COORDINATOR;
}
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProvenanceEventResource.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProvenanceEventResource.java
index 6cc234362f..5294ad0a4f 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProvenanceEventResource.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProvenanceEventResource.java
@@ -22,6 +22,11 @@ import io.swagger.annotations.ApiParam;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
import io.swagger.annotations.Authorization;
+import org.apache.nifi.authorization.AccessDeniedException;
+import org.apache.nifi.authorization.Authorizer;
+import org.apache.nifi.authorization.RequestAction;
+import org.apache.nifi.authorization.resource.Authorizable;
+import org.apache.nifi.authorization.user.NiFiUserUtils;
import org.apache.nifi.cluster.coordination.ClusterCoordinator;
import org.apache.nifi.cluster.coordination.http.replication.RequestReplicator;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
@@ -31,8 +36,13 @@ import org.apache.nifi.web.DownloadableContent;
import org.apache.nifi.web.NiFiServiceFacade;
import org.apache.nifi.web.api.dto.provenance.ProvenanceEventDTO;
import org.apache.nifi.web.api.entity.ProvenanceEventEntity;
+import org.apache.nifi.web.api.entity.ReplayLastEventRequestEntity;
+import org.apache.nifi.web.api.entity.ReplayLastEventResponseEntity;
+import org.apache.nifi.web.api.entity.ReplayLastEventSnapshotDTO;
import org.apache.nifi.web.api.entity.SubmitReplayRequestEntity;
import org.apache.nifi.web.api.request.LongParameter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.Consumes;
@@ -52,7 +62,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
-
+import java.util.Collections;
/**
* RESTful endpoint for querying data provenance.
@@ -63,8 +73,10 @@ import java.net.URI;
description = "Endpoint for accessing data flow provenance."
)
public class ProvenanceEventResource extends ApplicationResource {
+ private static final Logger logger =
LoggerFactory.getLogger(ProvenanceEventResource.class);
private NiFiServiceFacade serviceFacade;
+ private Authorizer authorizer;
/**
* Gets the content for the input of the specified event.
@@ -310,6 +322,105 @@ public class ProvenanceEventResource extends
ApplicationResource {
return generateOkResponse(entity).build();
}
+
+ /**
+ * Triggers the latest Provenance Event for the specified component to be
replayed.
+ *
+ * @param httpServletRequest request
+ * @param requestEntity The replay request
+ * @return A replayLastEventResponseEntity
+ */
+ @POST
+ @Consumes(MediaType.APPLICATION_JSON)
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("latest/replays")
+ @ApiOperation(
+ value = "Replays content from a provenance event",
+ response = ReplayLastEventResponseEntity.class,
+ authorizations = {
+ @Authorization(value = "Read Component Provenance Data -
/provenance-data/{component-type}/{uuid}"),
+ @Authorization(value = "Read Component Data -
/data/{component-type}/{uuid}"),
+ @Authorization(value = "Write Component Data -
/data/{component-type}/{uuid}")
+ }
+ )
+ @ApiResponses(
+ value = {
+ @ApiResponse(code = 400, message = "NiFi was unable to complete
the request because it was invalid. The request should not be retried without
modification."),
+ @ApiResponse(code = 401, message = "Client could not be
authenticated."),
+ @ApiResponse(code = 403, message = "Client is not authorized to
make this request."),
+ @ApiResponse(code = 404, message = "The specified resource could
not be found."),
+ @ApiResponse(code = 409, message = "The request was valid but NiFi
was not in the appropriate state to process it. Retrying the same request later
may be successful.")
+ }
+ )
+ public Response submitReplayLatestEvent(
+ @Context final HttpServletRequest httpServletRequest,
+ @ApiParam(
+ value = "The replay request.",
+ required = true
+ ) final ReplayLastEventRequestEntity requestEntity) {
+
+ // ensure the component id is specified
+ if (requestEntity == null || requestEntity.getComponentId() == null) {
+ throw new IllegalArgumentException("The id of the component must
be specified.");
+ }
+ final String requestedNodes = requestEntity.getNodes();
+ if (requestedNodes == null) {
+ throw new IllegalArgumentException("The nodes must be specified.");
+ }
+ if (!"ALL".equalsIgnoreCase(requestedNodes) &&
!"PRIMARY".equalsIgnoreCase(requestedNodes)) {
+ throw new IllegalArgumentException("The nodes must be either ALL
or PRIMARY");
+ }
+
+ // replicate if cluster manager
+ if (isReplicateRequest()) {
+ // Replicate to either Primary Node or all nodes
+ if (requestedNodes.equalsIgnoreCase("PRIMARY")) {
+ final NodeIdentifier primaryNodeId =
getPrimaryNodeId().orElseThrow(() -> new IllegalStateException("There is
currently no Primary Node elected"));
+ return replicate(HttpMethod.POST, requestEntity,
primaryNodeId.getId());
+ } else {
+ return replicate(HttpMethod.POST, requestEntity);
+ }
+ }
+
+ return withWriteLock(
+ serviceFacade,
+ requestEntity,
+ lookup -> {
+ final Authorizable provenance = lookup.getProvenance();
+ provenance.authorize(authorizer, RequestAction.READ,
NiFiUserUtils.getNiFiUser());
+ },
+ () -> {}, // No verification step necessary - this can be done any
time
+ entity -> {
+ final ReplayLastEventSnapshotDTO aggregateSnapshot = new
ReplayLastEventSnapshotDTO();
+
+ // Submit the replay of the latest provenance event
+ try {
+ final ProvenanceEventDTO provenanceEventDto =
serviceFacade.submitReplayLastEvent(entity.getComponentId());
+
+ if (provenanceEventDto == null) {
+ aggregateSnapshot.setEventAvailable(false);
+ } else {
+ aggregateSnapshot.setEventAvailable(true);
+
aggregateSnapshot.setEventsReplayed(Collections.singleton(provenanceEventDto.getEventId()));
+ }
+ } catch (final AccessDeniedException ade) {
+ logger.error("Failed to replay latest Provenance Event",
ade);
+ aggregateSnapshot.setFailureExplanation("Access Denied");
+ } catch (final Exception e) {
+ logger.error("Failed to replay latest Provenance Event",
e);
+ aggregateSnapshot.setFailureExplanation(e.getMessage());
+ }
+
+ final ReplayLastEventResponseEntity responseEntity = new
ReplayLastEventResponseEntity();
+ responseEntity.setComponentId(entity.getComponentId());
+ responseEntity.setNodes(entity.getNodes());
+ responseEntity.setAggregateSnapshot(aggregateSnapshot);
+
+ return generateOkResponse(responseEntity).build();
+ });
+ }
+
+
/**
* Creates a new replay request for the content associated with the
specified provenance event id.
*
@@ -393,4 +504,8 @@ public class ProvenanceEventResource extends
ApplicationResource {
this.serviceFacade = serviceFacade;
}
+ public void setAuthorizer(Authorizer authorizer) {
+ this.authorizer = authorizer;
+ }
+
}
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
index a762d8a8a1..f04a9cf45e 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
@@ -132,6 +132,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.SortedSet;
import java.util.TimeZone;
@@ -1359,6 +1360,39 @@ public class ControllerFacade implements Authorizable {
}
}
+ /**
+ * Submits for replay the latest provenance event that is cached for the
component with the given ID
+ * @param componentId the ID of the component
+ * @return the ProvenanceEventDTO representing the event that was
replayed, or <code>null</code> if no event was available
+ * @throws AccessDeniedException if an event is available but the current
user is not permitted to replay the event
+ */
+ public ProvenanceEventDTO submitReplayLastEvent(String componentId) {
+ try {
+ final NiFiUser user = NiFiUserUtils.getNiFiUser();
+ if (user == null) {
+ throw new WebApplicationException(new Throwable("Unable to
access details for current user."));
+ }
+
+ // lookup the original event
+ final Optional<ProvenanceEventRecord> optionalEvent =
flowController.getProvenanceRepository().getLatestCachedEvent(componentId);
+ if (!optionalEvent.isPresent()) {
+ return null;
+ }
+
+ // Authorize the replay
+ final ProvenanceEventRecord event = optionalEvent.get();
+ authorizeReplay(event);
+
+ // Replay the FlowFile
+ flowController.replayFlowFile(event, user);
+
+ // convert the event record
+ return createProvenanceEventDto(event, false);
+ } catch (final IOException ioe) {
+ throw new NiFiCoreException("An error occurred while getting the
specified event.", ioe);
+ }
+ }
+
/**
* Authorizes access to replay a specified provenance event. Whether to
check read data permission can be specified. The context this
* method is invoked may have already verified these permissions. Using a
flag here as it forces the caller to acknowledge this fact
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/resources/nifi-web-api-context.xml
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/resources/nifi-web-api-context.xml
index 7c4b65359f..839e6af554 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/resources/nifi-web-api-context.xml
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/resources/nifi-web-api-context.xml
@@ -560,6 +560,7 @@
<property name="clusterCoordinator" ref="clusterCoordinator"/>
<property name="requestReplicator" ref="requestReplicator" />
<property name="flowController" ref="flowController" />
+ <property name="authorizer" ref="authorizer" />
</bean>
<bean id="countersResource"
class="org.apache.nifi.web.api.CountersResource" scope="singleton">
<property name="serviceFacade" ref="serviceFacade"/>
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-actions.js
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-actions.js
index d256a9c615..7cfbd2a1fa 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-actions.js
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-actions.js
@@ -676,6 +676,96 @@
}
},
+ replayLastProvenanceEvent: function(selection, nodes) {
+ if (selection.size() === 1) {
+ var selectionData = selection.datum();
+
+ // submit replay event
+ var entity = {
+ 'componentId': selectionData.id,
+ 'nodes': nodes
+ };
+
+ $.ajax({
+ type: 'POST',
+ url: config.urls.api + '/provenance-events/latest/replays',
+ data: JSON.stringify(entity),
+ dataType: 'json',
+ contentType: 'application/json'
+ }).fail(function (xhr, status, error) {
+ nfDialog.showOkDialog({
+ headerText: 'Failed to Replay Event',
+ dialogContent: nfCommon.escapeHtml(xhr.responseText)
+ });
+ }).done(function (response) {
+ if
(nfCommon.isDefinedAndNotNull(response.aggregateSnapshot.failureExplanation)) {
+ nfDialog.showOkDialog({
+ headerText: 'Replay Event Failure',
+ dialogContent:
response.aggregateSnapshot.failureExplanation
+ });
+ } else if (response.aggregateSnapshot.eventAvailable !==
true) {
+ nfDialog.showOkDialog({
+ headerText: 'No Event Available',
+ dialogContent: 'There was no recent event
available to be replayed.'
+ });
+ } else if
(nfCommon.isDefinedAndNotNull(response.nodeSnapshots)) {
+ var replayedCount = 0;
+ var unavailableCount = 0;
+
+ for (var i = 0; i < response.nodeSnapshots.length;
i++) {
+ var nodeResponse = response.nodeSnapshots[i];
+ if (nodeResponse.snapshot.eventAvailable) {
+ replayedCount++;
+ } else {
+ unavailableCount++;
+ }
+ }
+
+ var messageText;
+ if (unavailableCount === 0) {
+ messageText = 'All nodes successfully replayed the
latest event.';
+ } else {
+ messageText = replayedCount + ' nodes successfully
replayed the latest event but ' + unavailableCount + ' had no recent event
avaialble to be replayed.';
+ }
+
+ nfDialog.showOkDialog({
+ headerText: 'Events Replayed',
+ dialogContent: messageText
+ });
+ } else {
+ nfDialog.showOkDialog({
+ headerText: 'Event Replayed',
+ dialogContent: 'Successfully replayed the latest
event.'
+ });
+ }
+
+ var componentConnections =
nfConnection.getComponentConnections(selectionData.id);
+ for (var i=0; i < componentConnections.length; i++) {
+ var connection = componentConnections[i];
+ nfConnection.reload(connection.id);
+ }
+ });
+ }
+ },
+
+ /**
+ * Submits a request to replay the last provenance event on all nodes
in the cluster.
+ *
+ * @argument {selection} selection The selection
+ */
+ replayLastAllNodes: function (selection) {
+ this.replayLastProvenanceEvent(selection, 'ALL');
+ },
+
+ /**
+ * Submits a request to replay the last provenance event on the
primary node in the cluster.
+ *
+ * @argument {selection} selection The selection
+ */
+ replayLastPrimaryNode: function (selection) {
+ this.replayLastProvenanceEvent(selection, 'PRIMARY');
+ },
+
/**
* Starts the components in the specified selection.
*
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-context-menu.js
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-context-menu.js
index 6673fbc062..cb09577fc9 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-context-menu.js
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-context-menu.js
@@ -637,6 +637,20 @@
&& !nfCanvasUtils.isRemoteProcessGroup(selection) &&
nfCommon.canAccessProvenance();
};
+ /**
+ * Determines whether the current selection should provide ability to
replay latest provenance event.
+ *
+ * @param {selection} selection
+ */
+ var canReplayProvenance = function (selection) {
+ // ensure the correct number of components are selected
+ if (selection.size() !== 1) {
+ return false;
+ }
+
+ return nfCanvasUtils.isProcessor(selection) &&
nfCommon.canAccessProvenance();
+ }
+
/**
* Determines whether the current selection is a remote process group.
*
@@ -831,6 +845,11 @@
{id: 'disable-all-controller-services-menu-item-noselection',
condition: emptySelection, menuItem: {clazz: 'icon icon-enable-false', text:
'Disable all controller services', action: 'disableAllControllerServices'}},
{separator: true},
{id: 'data-provenance-menu-item', condition: canAccessProvenance,
menuItem: {clazz: 'icon icon-provenance', imgStyle: 'context-menu-provenance',
text: 'View data provenance', action: 'openProvenance'}},
+ {id: 'data-provenance-replay-last-menu-item', condition:
canReplayProvenance, groupMenuItem: {clazz: 'fa fa-repeat', text: 'Replay last
event'}, menuItems: [
+ {id: 'replay-last-all-nodes-menu-item', condition:
canReplayProvenance, menuItem: {text: 'All nodes', action:
'replayLastAllNodes'}},
+ {id: 'replay-last-primary-node-menu-item', condition:
canReplayProvenance, menuItem: {text: 'Primary node', action:
'replayLastPrimaryNode'}}
+ ]},
+ {separator: true},
{id: 'show-stats-menu-item', condition: supportsStats, menuItem:
{clazz: 'fa fa-area-chart', text: 'View status history', action: 'showStats'}},
{id: 'view-state-menu-item', condition: isStatefulProcessor, menuItem:
{clazz: 'fa fa-tasks', text: 'View state', action: 'viewState'}},
{id: 'list-queue-menu-item', condition: canListQueue, menuItem:
{clazz: 'fa fa-list', text: 'List queue', action: 'listQueue'}},
diff --git
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
index 72d593dbb2..24f95f67f1 100644
---
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
+++
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
@@ -99,6 +99,7 @@ import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
@@ -2061,6 +2062,11 @@ public class PersistentProvenanceRepository implements
ProvenanceRepository {
return result;
}
+ @Override
+ public Optional<ProvenanceEventRecord> getLatestCachedEvent(final String
componentId) throws IOException {
+ return Optional.empty();
+ }
+
/**
* This is for testing only and not actually used other than in debugging
*
diff --git
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/WriteAheadProvenanceRepository.java
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/WriteAheadProvenanceRepository.java
index 07eab61c9b..08cfc3f017 100644
---
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/WriteAheadProvenanceRepository.java
+++
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/WriteAheadProvenanceRepository.java
@@ -57,6 +57,7 @@ import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
@@ -257,6 +258,11 @@ public class WriteAheadProvenanceRepository implements
ProvenanceRepository {
return eventIndex.submitQuery(query, createEventAuthorizer(user), user
== null ? null : user.getIdentity());
}
+ @Override
+ public Optional<ProvenanceEventRecord> getLatestCachedEvent(final String
componentId) throws IOException {
+ return eventIndex.getLatestCachedEvent(componentId);
+ }
+
@Override
public QuerySubmission retrieveQuerySubmission(final String
queryIdentifier, final NiFiUser user) {
return eventIndex.retrieveQuerySubmission(queryIdentifier, user);
diff --git
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/EventIndex.java
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/EventIndex.java
index 2f13742439..a8addb6a03 100644
---
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/EventIndex.java
+++
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/EventIndex.java
@@ -29,6 +29,7 @@ import org.apache.nifi.provenance.store.EventStore;
import java.io.Closeable;
import java.io.IOException;
import java.util.Map;
+import java.util.Optional;
/**
* An Event Index is responsible for indexing Provenance Events in such a way
that the index can be quickly
@@ -81,6 +82,15 @@ public interface EventIndex extends Closeable {
*/
QuerySubmission submitQuery(Query query, EventAuthorizer authorizer,
String userId);
+ /**
+ * Retrieves the most recent Provenance Event that is cached for the given
component
+ * @param componentId the ID of the component
+ *
+ * @return an Optional containing the event, or an empty optional if no
event is cached for the component or the cached event is no longer
available
+ * @throws IOException if unable to read from the repository
+ */
+ Optional<ProvenanceEventRecord> getLatestCachedEvent(String componentId)
throws IOException;
+
/**
* Asynchronously computes the lineage for the FlowFile that is identified
by the Provenance Event with the given ID.
*
diff --git
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/LatestEventsPerProcessorQuery.java
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/LatestEventsPerProcessorQuery.java
index 1c09d5775e..36bfec85d1 100644
---
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/LatestEventsPerProcessorQuery.java
+++
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/LatestEventsPerProcessorQuery.java
@@ -17,11 +17,6 @@
package org.apache.nifi.provenance.index.lucene;
-import java.util.List;
-import java.util.Optional;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.SearchableFields;
import org.apache.nifi.provenance.search.Query;
@@ -29,6 +24,12 @@ import org.apache.nifi.provenance.search.SearchTerm;
import org.apache.nifi.provenance.serialization.StorageSummary;
import org.apache.nifi.util.RingBuffer;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
public class LatestEventsPerProcessorQuery implements CachedQuery {
private static final String COMPONENT_ID_FIELD_NAME =
SearchableFields.ComponentID.getSearchableFieldName();
private final ConcurrentMap<String, RingBuffer<Long>> latestRecords = new
ConcurrentHashMap<>();
@@ -40,6 +41,15 @@ public class LatestEventsPerProcessorQuery implements
CachedQuery {
ringBuffer.add(storageSummary.getEventId());
}
+ public List<Long> getLatestEventIds(final String componentId) {
+ final RingBuffer<Long> ringBuffer = latestRecords.get(componentId);
+ if (ringBuffer == null) {
+ return Collections.emptyList();
+ }
+
+ return ringBuffer.asList();
+ }
+
@Override
public Optional<List<Long>> evaluate(final Query query) {
if (query.getMaxResults() > 1000) {
diff --git
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/LuceneEventIndex.java
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/LuceneEventIndex.java
index f8cf7092d3..d47ecde77c 100644
---
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/LuceneEventIndex.java
+++
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/LuceneEventIndex.java
@@ -107,6 +107,7 @@ public class LuceneEventIndex implements EventIndex {
private final EventReporter eventReporter;
private final List<CachedQuery> cachedQueries = new ArrayList<>();
+ private LatestEventsPerProcessorQuery latestEventsPerProcessorQuery; //
effectively final
private ScheduledExecutorService maintenanceExecutor; // effectively final
private ScheduledExecutorService cacheWarmerExecutor;
@@ -163,7 +164,8 @@ public class LuceneEventIndex implements EventIndex {
maintenanceExecutor.scheduleWithFixedDelay(this::purgeObsoleteQueries,
30, 30, TimeUnit.SECONDS);
cachedQueries.add(new LatestEventsQuery());
- cachedQueries.add(new LatestEventsPerProcessorQuery());
+ latestEventsPerProcessorQuery = new LatestEventsPerProcessorQuery();
+ cachedQueries.add(latestEventsPerProcessorQuery);
triggerReindexOfDefunctIndices();
triggerCacheWarming();
@@ -667,6 +669,24 @@ public class LuceneEventIndex implements EventIndex {
return submission;
}
+ @Override
+ public Optional<ProvenanceEventRecord> getLatestCachedEvent(final String
componentId) throws IOException {
+ final List<Long> eventIds =
latestEventsPerProcessorQuery.getLatestEventIds(componentId);
+ if (eventIds.isEmpty()) {
+ logger.info("There are no recent Provenance Events cached for
Component with ID {}", componentId);
+ return Optional.empty();
+ }
+
+ final Long latestEventId = eventIds.get(eventIds.size() - 1);
+ final Optional<ProvenanceEventRecord> latestEvent =
eventStore.getEvent(latestEventId);
+ if (latestEvent.isPresent()) {
+ logger.info("Returning {} as the most recent Provenance Events
cached for Component with ID {}", latestEvent.get(), componentId);
+ } else {
+ logger.info("There are no recent Provenance Events cached for
Component with ID {}", componentId);
+ }
+
+ return latestEvent;
+ }
@Override
public ComputeLineageSubmission submitLineageComputation(final String
flowFileUuid, final NiFiUser user, final EventAuthorizer eventAuthorizer) {
diff --git
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/src/main/java/org/apache/nifi/provenance/VolatileProvenanceRepository.java
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/src/main/java/org/apache/nifi/provenance/VolatileProvenanceRepository.java
index 3ac9232e4a..123efb2289 100644
---
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/src/main/java/org/apache/nifi/provenance/VolatileProvenanceRepository.java
+++
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/src/main/java/org/apache/nifi/provenance/VolatileProvenanceRepository.java
@@ -50,6 +50,7 @@ import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -490,6 +491,17 @@ public class VolatileProvenanceRepository implements
ProvenanceRepository {
return result;
}
+    /**
+     * Selects every event in the ring buffer whose component ID equals the given ID and returns the last
+     * of the selected events.
+     *
+     * @param componentId the ID of the component whose latest event is wanted
+     * @return the last matching event, or an empty Optional when no buffered event matches
+     */
+    @Override
+    public Optional<ProvenanceEventRecord> getLatestCachedEvent(final String componentId) throws IOException {
+        final List<ProvenanceEventRecord> componentEvents = ringBuffer.getSelectedElements(event -> componentId.equals(event.getComponentId()));
+        final int eventCount = componentEvents.size();
+
+        // The final selected element is treated as the latest one
+        return (eventCount == 0) ? Optional.empty() : Optional.of(componentEvents.get(eventCount - 1));
+    }
+
@Override
public QuerySubmission retrieveQuerySubmission(final String
queryIdentifier, final NiFiUser user) {
final QuerySubmission submission =
querySubmissionMap.get(queryIdentifier);
diff --git
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/repository/StatelessProvenanceRepository.java
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/repository/StatelessProvenanceRepository.java
index 4f8914c3aa..fcc2e797d5 100644
---
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/repository/StatelessProvenanceRepository.java
+++
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/repository/StatelessProvenanceRepository.java
@@ -38,6 +38,7 @@ import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
@@ -152,6 +153,11 @@ public class StatelessProvenanceRepository implements
ProvenanceRepository {
throw new UnsupportedOperationException();
}
+    /**
+     * Always reports that no cached event is available, whatever the component ID.
+     *
+     * @param componentId ignored by this implementation
+     * @return an empty Optional
+     */
+    @Override
+    public Optional<ProvenanceEventRecord> getLatestCachedEvent(final String componentId) throws IOException {
+        return Optional.empty();
+    }
+
@Override
public QuerySubmission retrieveQuerySubmission(final String
queryIdentifier, final NiFiUser user) {
throw new UnsupportedOperationException();
diff --git
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java
index 7276a53ace..c33e403875 100644
---
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java
+++
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java
@@ -1399,5 +1399,4 @@ public class NiFiClientUtil {
return
nifiClient.getReportingTasksClient().updateReportingTask(entity);
}
-
}
diff --git
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/provenance/ClusteredReplayProvenanceIT.java
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/provenance/ClusteredReplayProvenanceIT.java
new file mode 100644
index 0000000000..5407830f3a
--- /dev/null
+++
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/provenance/ClusteredReplayProvenanceIT.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.tests.system.provenance;
+
+import org.apache.nifi.tests.system.NiFiInstanceFactory;
+
+/**
+ * Re-runs the tests inherited from {@link ReplayProvenanceIT} using the instance factory produced by
+ * {@code createTwoNodeInstanceFactory()}.
+ */
+public class ClusteredReplayProvenanceIT extends ReplayProvenanceIT {
+
+    /**
+     * @return the two-node instance factory used for every inherited test
+     */
+    @Override
+    public NiFiInstanceFactory getInstanceFactory() {
+        return createTwoNodeInstanceFactory();
+    }
+}
diff --git
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/provenance/ReplayProvenanceIT.java
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/provenance/ReplayProvenanceIT.java
new file mode 100644
index 0000000000..a163493d10
--- /dev/null
+++
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/provenance/ReplayProvenanceIT.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.tests.system.provenance;
+
+import org.apache.nifi.tests.system.NiFiSystemIT;
+import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException;
+import
org.apache.nifi.toolkit.cli.impl.client.nifi.ProvenanceClient.ReplayEventNodes;
+import org.apache.nifi.web.api.entity.ConnectionEntity;
+import org.apache.nifi.web.api.entity.ProcessorEntity;
+import org.apache.nifi.web.api.entity.ReplayLastEventResponseEntity;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+
+import java.io.IOException;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * System test for the "replay last event" operation exposed through the provenance client.
+ */
+public class ReplayProvenanceIT extends NiFiSystemIT {
+
+    /**
+     * Builds a GenerateFlowFile -> TerminateFlowFile flow, runs each processor once, then replays the last
+     * event of the terminating processor and checks that data is queued again. Finally verifies that
+     * replaying the last event of the generating processor yields a failure explanation.
+     *
+     * @param nodes which nodes the replay request targets; the test runs once per enum constant
+     */
+    @ParameterizedTest
+    @EnumSource(ReplayEventNodes.class)
+    public void testReplayLastEvent(final ReplayEventNodes nodes) throws NiFiClientException, IOException, InterruptedException {
+        final ProcessorEntity generateProcessor = getClientUtil().createProcessor("GenerateFlowFile");
+        final ProcessorEntity terminateProcessor = getClientUtil().createProcessor("TerminateFlowFile");
+        final ConnectionEntity successConnection = getClientUtil().createConnection(generateProcessor, terminateProcessor, "success");
+        final String connectionId = successConnection.getId();
+
+        // Let the generator run until one FlowFile per node is queued, then stop it
+        getClientUtil().startProcessor(generateProcessor);
+        waitForQueueCount(connectionId, getNumberOfNodes());
+        getClientUtil().stopProcessor(generateProcessor);
+
+        // Let the terminator drain the queue, then stop it
+        getClientUtil().startProcessor(terminateProcessor);
+        waitForQueueCount(connectionId, 0);
+        getClientUtil().stopProcessor(terminateProcessor);
+
+        // Replaying the terminator's last event must succeed and put data back on the queue
+        final ReplayLastEventResponseEntity terminateReplayResponse = getNifiClient().getProvenanceClient().replayLastEvent(terminateProcessor.getId(), nodes);
+        assertNull(terminateReplayResponse.getAggregateSnapshot().getFailureExplanation());
+        assertEquals(Boolean.TRUE, terminateReplayResponse.getAggregateSnapshot().getEventAvailable());
+
+        final int expectedEventsPlayed;
+        if (nodes == ReplayEventNodes.PRIMARY) {
+            expectedEventsPlayed = 1;
+        } else {
+            expectedEventsPlayed = getNumberOfNodes();
+        }
+        assertEquals(expectedEventsPlayed, terminateReplayResponse.getAggregateSnapshot().getEventsReplayed().size());
+
+        waitForQueueCount(connectionId, expectedEventsPlayed);
+
+        // The generator is a source processor whose event cannot be replayed, so an explanation is expected
+        final ReplayLastEventResponseEntity generateReplayResponse = getNifiClient().getProvenanceClient().replayLastEvent(generateProcessor.getId(), nodes);
+        final String failureExplanation = generateReplayResponse.getAggregateSnapshot().getFailureExplanation();
+        assertNotNull(failureExplanation);
+
+        // When several nodes fail, the detailed text is not provided because it gets too unwieldy to understand
+        final boolean multipleNodesFailed = nodes == ReplayEventNodes.ALL && getNumberOfNodes() > 1;
+        final String expectedFailureText = multipleNodesFailed ? "See logs for more details" : "Source FlowFile Queue";
+        assertTrue(failureExplanation.contains(expectedFailureText), failureExplanation);
+    }
+
+}
diff --git
a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/ProvenanceClient.java
b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/ProvenanceClient.java
index 60774c9361..8f9c6808ef 100644
---
a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/ProvenanceClient.java
+++
b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/ProvenanceClient.java
@@ -18,6 +18,7 @@ package org.apache.nifi.toolkit.cli.impl.client.nifi;
import org.apache.nifi.web.api.entity.LineageEntity;
import org.apache.nifi.web.api.entity.ProvenanceEntity;
+import org.apache.nifi.web.api.entity.ReplayLastEventResponseEntity;
import java.io.IOException;
@@ -33,4 +34,11 @@ public interface ProvenanceClient {
LineageEntity getLineageRequest(String lineageRequestId) throws
NiFiClientException, IOException;
LineageEntity deleteLineageRequest(String lineageRequestId) throws
NiFiClientException, IOException;
+
+    /**
+     * Requests a replay of the last Provenance Event for the given processor.
+     *
+     * @param processorId the ID of the processor whose last event should be replayed
+     * @param replayEventNodes which nodes the replay request targets
+     * @return the entity describing the outcome of the replay request
+     * @throws NiFiClientException declared for client-side failures
+     * @throws IOException declared for communication failures
+     */
+    ReplayLastEventResponseEntity replayLastEvent(String processorId, ReplayEventNodes replayEventNodes) throws NiFiClientException, IOException;
+
+    /**
+     * Selects which nodes a replay-last-event request targets.
+     */
+    enum ReplayEventNodes {
+        // NOTE(review): presumably restricts the replay to the Primary Node - confirm against the REST endpoint
+        PRIMARY,
+
+        // NOTE(review): presumably asks every node to replay its own last event - confirm against the REST endpoint
+        ALL
+    }
}
diff --git
a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/impl/JerseyProvenanceClient.java
b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/impl/JerseyProvenanceClient.java
index ea83b34fac..830fa8213a 100644
---
a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/impl/JerseyProvenanceClient.java
+++
b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/impl/JerseyProvenanceClient.java
@@ -22,14 +22,18 @@ import
org.apache.nifi.toolkit.cli.impl.client.nifi.ProvenanceClient;
import org.apache.nifi.toolkit.cli.impl.client.nifi.RequestConfig;
import org.apache.nifi.web.api.entity.LineageEntity;
import org.apache.nifi.web.api.entity.ProvenanceEntity;
+import org.apache.nifi.web.api.entity.ReplayLastEventRequestEntity;
+import org.apache.nifi.web.api.entity.ReplayLastEventResponseEntity;
import javax.ws.rs.client.Entity;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.MediaType;
import java.io.IOException;
+import java.util.Objects;
public class JerseyProvenanceClient extends AbstractJerseyClient implements
ProvenanceClient {
private final WebTarget provenanceTarget;
+ private final WebTarget provenanceEventsTarget;
public JerseyProvenanceClient(final WebTarget baseTarget) {
this(baseTarget, null);
@@ -38,6 +42,7 @@ public class JerseyProvenanceClient extends
AbstractJerseyClient implements Prov
public JerseyProvenanceClient(final WebTarget baseTarget, final
RequestConfig requestConfig) {
super(requestConfig);
this.provenanceTarget = baseTarget.path("/provenance");
+ this.provenanceEventsTarget = baseTarget.path("/provenance-events");
}
@Override
@@ -116,4 +121,21 @@ public class JerseyProvenanceClient extends
AbstractJerseyClient implements Prov
});
}
+
+    /**
+     * Requests a replay of a processor's last Provenance Event by POSTing a JSON
+     * {@link ReplayLastEventRequestEntity} to the {@code /latest/replays} path beneath the
+     * {@code /provenance-events} target.
+     *
+     * @param processorId the ID of the processor, sent as the request's component ID; must not be null
+     * @param nodes which nodes to target, sent by enum name; must not be null
+     * @return the response entity read from the POST response
+     * @throws NullPointerException if either argument is null
+     */
+    @Override
+    public ReplayLastEventResponseEntity replayLastEvent(final String processorId, final ReplayEventNodes nodes) throws NiFiClientException, IOException {
+        Objects.requireNonNull(processorId, "Processor ID required");
+        Objects.requireNonNull(nodes, "Nodes must be specified");
+
+        final ReplayLastEventRequestEntity replayRequest = new ReplayLastEventRequestEntity();
+        replayRequest.setComponentId(processorId);
+        replayRequest.setNodes(nodes.name());
+
+        final String errorDescription = "Error replaying last event for Processor " + processorId;
+        return executeAction(errorDescription, () -> {
+            final WebTarget replayTarget = provenanceEventsTarget.path("/latest/replays");
+            return getRequestBuilder(replayTarget).post(Entity.entity(replayRequest, MediaType.APPLICATION_JSON_TYPE), ReplayLastEventResponseEntity.class);
+        });
+    }
}