This is an automated email from the ASF dual-hosted git repository.
jgresock pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new c453d2b NIFI-9723: When we add controller-level Controller Services
on restart of NiFi, ensure that all Controller Services are updated to include
their property values, etc. Also ensure that for these services and reporting
tasks we decrypt the property values.
c453d2b is described below
commit c453d2ba8f1748dd9d26b218705c3372c081110b
Author: Mark Payne <[email protected]>
AuthorDate: Thu Feb 24 11:05:25 2022 -0500
NIFI-9723: When we add controller-level Controller Services on restart of
NiFi, ensure that all Controller Services are updated to include their property
values, etc. Also ensure that for these services and reporting tasks we decrypt
the property values.
Signed-off-by: Joe Gresock <[email protected]>
This closes #5797.
---
.../serialization/VersionedFlowSynchronizer.java | 39 +++++++++--
.../nifi/cs/tests/system/StandardCountService.java | 21 +++++-
.../nifi/reporting/WriteToFileReportingTask.java | 53 +++++++++++++--
.../apache/nifi/tests/system/NiFiClientUtil.java | 67 +++++++++++++++---
.../org/apache/nifi/tests/system/NiFiSystemIT.java | 4 +-
.../SpawnedStandaloneNiFiInstanceFactory.java | 6 +-
.../system/reportingtask/ReportingTaskIT.java | 79 ++++++++++++++++++++++
7 files changed, 242 insertions(+), 27 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedFlowSynchronizer.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedFlowSynchronizer.java
index 80a8e9e..cd2fef0 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedFlowSynchronizer.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedFlowSynchronizer.java
@@ -47,6 +47,7 @@ import
org.apache.nifi.controller.inheritance.FlowInheritability;
import org.apache.nifi.controller.inheritance.FlowInheritabilityCheck;
import
org.apache.nifi.controller.reporting.ReportingTaskInstantiationException;
import org.apache.nifi.controller.service.ControllerServiceNode;
+import org.apache.nifi.encrypt.EncryptionException;
import org.apache.nifi.encrypt.PropertyEncryptor;
import org.apache.nifi.flow.Bundle;
import org.apache.nifi.flow.ScheduledState;
@@ -484,7 +485,9 @@ public class VersionedFlowSynchronizer implements
FlowSynchronizer {
taskNode.setSchedulingStrategy(SchedulingStrategy.valueOf(reportingTask.getSchedulingStrategy()));
taskNode.setAnnotationData(reportingTask.getAnnotationData());
- taskNode.setProperties(reportingTask.getProperties());
+
+ final Map<String, String> decryptedProperties =
decryptProperties(reportingTask.getProperties());
+ taskNode.setProperties(decryptedProperties);
// enable/disable/start according to the ScheduledState
switch (reportingTask.getScheduledState()) {
@@ -652,16 +655,18 @@ public class VersionedFlowSynchronizer implements
FlowSynchronizer {
// Service B's references won't be updated. To avoid this, we create
them all first, and then configure/update
// them so that when AbstractComponentNode#setProperty is called, it
properly establishes that reference.
final List<VersionedControllerService> controllerServices =
dataflow.getControllerServices();
+ final Set<ControllerServiceNode> controllerServicesAdded = new
HashSet<>();
for (final VersionedControllerService versionedControllerService :
controllerServices) {
final ControllerServiceNode serviceNode =
flowManager.getRootControllerService(versionedControllerService.getInstanceIdentifier());
if (serviceNode == null) {
- addRootControllerService(controller,
versionedControllerService);
+ final ControllerServiceNode added =
addRootControllerService(controller, versionedControllerService);
+ controllerServicesAdded.add(added);
}
}
for (final VersionedControllerService versionedControllerService :
controllerServices) {
final ControllerServiceNode serviceNode =
flowManager.getRootControllerService(versionedControllerService.getInstanceIdentifier());
- if
(affectedComponentSet.isControllerServiceAffected(serviceNode.getIdentifier()))
{
+ if (controllerServicesAdded.contains(serviceNode) ||
affectedComponentSet.isControllerServiceAffected(serviceNode.getIdentifier())) {
updateRootControllerService(serviceNode,
versionedControllerService);
}
}
@@ -687,12 +692,13 @@ public class VersionedFlowSynchronizer implements
FlowSynchronizer {
}
}
- private void addRootControllerService(final FlowController controller,
final VersionedControllerService versionedControllerService) {
+ private ControllerServiceNode addRootControllerService(final
FlowController controller, final VersionedControllerService
versionedControllerService) {
final BundleCoordinate bundleCoordinate =
createBundleCoordinate(versionedControllerService.getBundle(),
versionedControllerService.getType());
final ControllerServiceNode serviceNode =
controller.getFlowManager().createControllerService(versionedControllerService.getType(),
versionedControllerService.getInstanceIdentifier(),
bundleCoordinate,Collections.emptySet(), true, true, null);
controller.getFlowManager().addRootControllerService(serviceNode);
+ return serviceNode;
}
private void updateRootControllerService(final ControllerServiceNode
serviceNode, final VersionedControllerService versionedControllerService) {
@@ -701,12 +707,35 @@ public class VersionedFlowSynchronizer implements
FlowSynchronizer {
serviceNode.setName(versionedControllerService.getName());
serviceNode.setAnnotationData(versionedControllerService.getAnnotationData());
serviceNode.setComments(versionedControllerService.getComments());
-
serviceNode.setProperties(versionedControllerService.getProperties());
+
+ final Map<String, String> decryptedProperties =
decryptProperties(versionedControllerService.getProperties());
+ serviceNode.setProperties(decryptedProperties);
} finally {
serviceNode.resumeValidationTrigger();
}
}
+ private Map<String, String> decryptProperties(final Map<String, String>
encrypted) {
+ final Map<String, String> decrypted = new HashMap<>(encrypted.size());
+ encrypted.forEach((key, value) -> decrypted.put(key, decrypt(value)));
+ return decrypted;
+ }
+
+ private String decrypt(final String value) {
+ if (value != null && value.startsWith(FlowSerializer.ENC_PREFIX) &&
value.endsWith(FlowSerializer.ENC_SUFFIX)) {
+ try {
+ return
encryptor.decrypt(value.substring(FlowSerializer.ENC_PREFIX.length(),
value.length() - FlowSerializer.ENC_SUFFIX.length()));
+ } catch (EncryptionException e) {
+ final String moreDescriptiveMessage = "There was a problem
decrypting a sensitive flow configuration value. " +
+ "Check that the nifi.sensitive.props.key value in
nifi.properties matches the value used to encrypt the flow.json.gz file";
+ logger.error(moreDescriptiveMessage, e);
+ throw new EncryptionException(moreDescriptiveMessage, e);
+ }
+ } else {
+ return value;
+ }
+ }
+
private BundleCoordinate createBundleCoordinate(final Bundle bundle, final
String componentType) {
BundleCoordinate coordinate;
try {
diff --git
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/cs/tests/system/StandardCountService.java
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/cs/tests/system/StandardCountService.java
index aa84df0..1a8c855 100644
---
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/cs/tests/system/StandardCountService.java
+++
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/cs/tests/system/StandardCountService.java
@@ -17,10 +17,13 @@
package org.apache.nifi.cs.tests.system;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.processor.util.StandardValidators;
-import java.util.Collections;
+import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
@@ -32,12 +35,26 @@ public class StandardCountService extends
AbstractControllerService implements C
.required(false)
.identifiesControllerService(CountService.class)
.build();
+ static final PropertyDescriptor START_VALUE = new
PropertyDescriptor.Builder()
+ .name("Start Value")
+ .displayName("Start Value")
+ .description("The value to start counting from")
+ .required(true)
+ .addValidator(StandardValidators.LONG_VALIDATOR)
+ .defaultValue("0")
+ .build();
private final AtomicLong counter = new AtomicLong(0L);
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
- return Collections.singletonList(COUNT_SERVICE);
+ return Arrays.asList(COUNT_SERVICE, START_VALUE);
+ }
+
+ @OnEnabled
+ public void onEnabled(final ConfigurationContext context) {
+ final long startValue =
Long.parseLong(context.getProperty(START_VALUE).getValue());
+ counter.set(startValue);
}
@Override
diff --git
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/reporting/WriteToFileReportingTask.java
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/reporting/WriteToFileReportingTask.java
index b1424dd..7b5d332 100644
---
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/reporting/WriteToFileReportingTask.java
+++
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/reporting/WriteToFileReportingTask.java
@@ -19,18 +19,23 @@ package org.apache.nifi.reporting;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyDescriptor.Builder;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.cs.tests.system.CountService;
import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
import java.util.List;
import static org.apache.nifi.components.Validator.VALID;
import static
org.apache.nifi.expression.ExpressionLanguageScope.VARIABLE_REGISTRY;
-import static
org.apache.nifi.processor.util.StandardValidators.FILE_EXISTS_VALIDATOR;
public class WriteToFileReportingTask extends AbstractReportingTask {
@@ -39,27 +44,65 @@ public class WriteToFileReportingTask extends
AbstractReportingTask {
.displayName("Filename")
.description("The File to write to")
.required(true)
- .addValidator(FILE_EXISTS_VALIDATOR)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(VARIABLE_REGISTRY)
.build();
static final PropertyDescriptor TEXT = new Builder()
.name("Text")
.displayName("Text")
.description("The Text to Write")
- .required(true)
+ .required(false)
.addValidator(VALID)
.expressionLanguageSupported(VARIABLE_REGISTRY)
.build();
+ static final PropertyDescriptor COUNT_SERVICE = new Builder()
+ .name("Count Service")
+ .displayName("Count Service")
+ .description("The Count Service to Use")
+ .required(false)
+ .identifiesControllerService(CountService.class)
+ .build();
+
+ @Override
+ protected Collection<ValidationResult> customValidate(final
ValidationContext validationContext) {
+ if (validationContext.getProperty(COUNT_SERVICE).isSet() &&
validationContext.getProperty(TEXT).isSet()) {
+ return Collections.singleton(new ValidationResult.Builder()
+ .subject("Count Service and Text")
+ .valid(false)
+ .explanation("Cannot set both the Text property and the Count
Service property")
+ .build());
+ }
+ if (!validationContext.getProperty(COUNT_SERVICE).isSet() &&
!validationContext.getProperty(TEXT).isSet()) {
+ return Collections.singleton(new ValidationResult.Builder()
+ .subject("Count Service and Text")
+ .valid(false)
+ .explanation("Either the Text property or the Count Service
property must be set")
+ .build());
+ }
+
+ return Collections.emptyList();
+ }
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
- return Arrays.asList(FILENAME, TEXT);
+ return Arrays.asList(FILENAME, TEXT, COUNT_SERVICE);
}
@Override
public void onTrigger(final ReportingContext context) {
final File outFile = new
File(context.getProperty(FILENAME).evaluateAttributeExpressions().getValue());
- final String text =
context.getProperty(TEXT).evaluateAttributeExpressions().getValue();
+ final File parentFile = outFile.getParentFile();
+ if (!parentFile.exists() && !parentFile.mkdirs()) {
+ getLogger().error("Could not create directory {}",
parentFile.getAbsolutePath());
+ return;
+ }
+
+ String text =
context.getProperty(TEXT).evaluateAttributeExpressions().getValue();
+ if (text == null) {
+ final CountService countService =
context.getProperty(COUNT_SERVICE).asControllerService(CountService.class);
+ final long count = countService.count();
+ text = String.valueOf(count);
+ }
try (final FileOutputStream fos = new FileOutputStream(outFile)) {
fos.write(text.getBytes(StandardCharsets.UTF_8));
diff --git
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java
index 610112a..435ccea 100644
---
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java
+++
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java
@@ -99,7 +99,6 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -175,6 +174,10 @@ public class NiFiClientUtil {
return createControllerService(simpleTypeName, "root");
}
+ public ControllerServiceEntity createRootLevelControllerService(final
String simpleTypeName) throws NiFiClientException, IOException {
+ return createControllerService(NiFiSystemIT.TEST_CS_PACKAGE + "." +
simpleTypeName, null, NiFiSystemIT.NIFI_GROUP_ID,
NiFiSystemIT.TEST_EXTENSIONS_ARTIFACT_ID, nifiVersion);
+ }
+
public ControllerServiceEntity createControllerService(final String
simpleTypeName, final String groupId) throws NiFiClientException, IOException {
return createControllerService(NiFiSystemIT.TEST_CS_PACKAGE + "." +
simpleTypeName, groupId, NiFiSystemIT.NIFI_GROUP_ID,
NiFiSystemIT.TEST_EXTENSIONS_ARTIFACT_ID, nifiVersion);
}
@@ -242,11 +245,12 @@ public class NiFiClientUtil {
return activateReportingTask(reportingTask, "RUNNING");
}
- public ReportingTaskEntity stopReportingTask(final ReportingTaskEntity
reportingTask) throws NiFiClientException, IOException {
- return activateReportingTask(reportingTask, "STOPPED");
+ public ReportingTaskEntity stopReportingTask(final ReportingTaskEntity
reportingTask) throws NiFiClientException, IOException, InterruptedException {
+ final ReportingTaskEntity entity =
activateReportingTask(reportingTask, "STOPPED");
+ return waitForReportingTaskState(entity.getId(), "STOPPED");
}
- public void stopReportingTasks() throws NiFiClientException, IOException {
+ public void stopReportingTasks() throws NiFiClientException, IOException,
InterruptedException {
final ReportingTasksEntity tasksEntity =
nifiClient.getFlowClient().getReportingTasks();
for (final ReportingTaskEntity taskEntity :
tasksEntity.getReportingTasks()) {
stopReportingTask(taskEntity);
@@ -447,18 +451,18 @@ public class NiFiClientUtil {
}
public void waitForValidProcessor(String id) throws InterruptedException,
IOException, NiFiClientException {
- waitForValidStatus(id, ProcessorDTO.VALID);
+ waitForValidationStatus(id, ProcessorDTO.VALID);
}
public void waitForInvalidProcessor(String id) throws NiFiClientException,
IOException, InterruptedException {
- waitForValidStatus(id, ProcessorDTO.INVALID);
+ waitForValidationStatus(id, ProcessorDTO.INVALID);
}
- public void waitForValidStatus(final String processorId, final String
expectedStatus) throws NiFiClientException, IOException, InterruptedException {
+ public void waitForValidationStatus(final String processorId, final String
expectedStatus) throws NiFiClientException, IOException, InterruptedException {
while (true) {
final ProcessorEntity entity =
getProcessorClient().getProcessor(processorId);
final String validationStatus =
entity.getComponent().getValidationStatus();
- if (expectedStatus.equals(validationStatus)) {
+ if (expectedStatus.equalsIgnoreCase(validationStatus)) {
return;
}
@@ -503,6 +507,51 @@ public class NiFiClientUtil {
}
}
+ public ReportingTaskEntity waitForReportingTaskState(final String
reportingTaskId, final String expectedState) throws NiFiClientException,
IOException, InterruptedException {
+ while (true) {
+ final ReportingTaskEntity entity =
nifiClient.getReportingTasksClient().getReportingTask(reportingTaskId);
+ final String state = entity.getComponent().getState();
+
+ // We've reached the desired state if the state equals the expected
state, OR if we expect stopped and the state is disabled (because disabled
implies stopped)
+ final boolean desiredStateReached = expectedState.equals(state) ||
("STOPPED".equalsIgnoreCase(expectedState) &&
"DISABLED".equalsIgnoreCase(state));
+
+ if (!desiredStateReached) {
+ Thread.sleep(10L);
+ continue;
+ }
+
+ if ("RUNNING".equals(expectedState)) {
+ return entity;
+ }
+
+ if (entity.getStatus().getActiveThreadCount() == 0) {
+ return entity;
+ }
+
+ Thread.sleep(10L);
+ }
+ }
+
+ public void waitForReportingTaskValid(final String reportingTaskId) throws
NiFiClientException, IOException, InterruptedException {
+ waitForReportingTaskValidationStatus(reportingTaskId, "Valid");
+ }
+
+ public void waitForReportingTaskValidationStatus(final String
reportingTaskId, final String expectedStatus) throws NiFiClientException,
IOException, InterruptedException {
+ while (true) {
+ final ReportingTaskEntity entity =
nifiClient.getReportingTasksClient().getReportingTask(reportingTaskId);
+ final String validationStatus =
entity.getComponent().getValidationStatus();
+ if (expectedStatus.equalsIgnoreCase(validationStatus)) {
+ return;
+ }
+
+ if ("Invalid".equalsIgnoreCase(validationStatus)) {
+ logger.info("Reporting Task with ID {} is currently invalid
due to: {}", reportingTaskId, entity.getComponent().getValidationErrors());
+ }
+
+ Thread.sleep(100L);
+ }
+ }
+
public ControllerServiceEntity updateControllerService(final
ControllerServiceEntity currentEntity, final Map<String, String> properties)
throws NiFiClientException, IOException {
final ControllerServiceDTO dto = new ControllerServiceDTO();
dto.setProperties(properties);
@@ -653,8 +702,6 @@ public class NiFiClientUtil {
public void disableControllerLevelServices() throws NiFiClientException,
IOException {
final ControllerServicesEntity services =
nifiClient.getFlowClient().getControllerServices();
- final Map<String, RevisionDTO> revisions = new HashMap<>();
-
for (final ControllerServiceEntity service :
services.getControllerServices()) {
final ControllerServiceRunStatusEntity runStatusEntity = new
ControllerServiceRunStatusEntity();
runStatusEntity.setDisconnectedNodeAcknowledged(true);
diff --git
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java
index 3e6248c..64d00b6 100644
---
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java
+++
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java
@@ -165,11 +165,11 @@ public abstract class NiFiSystemIT {
return false;
}
- protected void destroyFlow() throws NiFiClientException, IOException {
+ protected void destroyFlow() throws NiFiClientException, IOException,
InterruptedException {
getClientUtil().stopProcessGroupComponents("root");
getClientUtil().disableControllerServices("root", true);
- getClientUtil().disableControllerLevelServices();
getClientUtil().stopReportingTasks();
+ getClientUtil().disableControllerLevelServices();
getClientUtil().stopTransmitting("root");
getClientUtil().deleteAll("root");
getClientUtil().deleteControllerLevelServices();
diff --git
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/SpawnedStandaloneNiFiInstanceFactory.java
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/SpawnedStandaloneNiFiInstanceFactory.java
index f4d0df4..6612667 100644
---
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/SpawnedStandaloneNiFiInstanceFactory.java
+++
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/SpawnedStandaloneNiFiInstanceFactory.java
@@ -197,7 +197,7 @@ public class SpawnedStandaloneNiFiInstanceFactory
implements NiFiInstanceFactory
while (true) {
try {
client.getFlowClient().getRootGroupId();
- logger.info("Startup Completed NiFi [{}]",
instanceDirectory.getName());
+ logger.info("NiFi Startup Completed [{}]",
instanceDirectory.getName());
return;
} catch (final Exception e) {
try {
@@ -216,11 +216,11 @@ public class SpawnedStandaloneNiFiInstanceFactory
implements NiFiInstanceFactory
return;
}
- logger.info("Shutdown Started NiFi [{}]",
instanceDirectory.getName());
+ logger.info("NiFi Shutdown Started [{}]",
instanceDirectory.getName());
try {
runNiFi.stop();
- logger.info("Shutdown Completed NiFi [{}]",
instanceDirectory.getName());
+ logger.info("NiFi Shutdown Completed [{}]",
instanceDirectory.getName());
} catch (IOException e) {
throw new RuntimeException("Failed to stop NiFi", e);
} finally {
diff --git
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/reportingtask/ReportingTaskIT.java
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/reportingtask/ReportingTaskIT.java
new file mode 100644
index 0000000..5b1c75e
--- /dev/null
+++
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/reportingtask/ReportingTaskIT.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.tests.system.reportingtask;
+
+import org.apache.nifi.tests.system.NiFiSystemIT;
+import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException;
+import org.apache.nifi.web.api.entity.ControllerServiceEntity;
+import org.apache.nifi.web.api.entity.ReportingTaskEntity;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.util.HashMap;
+import java.util.Map;
+
+public class ReportingTaskIT extends NiFiSystemIT {
+
+ @Test
+ public void testReportingTaskDependingOnControllerService() throws
NiFiClientException, IOException, InterruptedException {
+ // Create a Count controller service and configure a property on the
service.
+ final ControllerServiceEntity controllerService =
getClientUtil().createRootLevelControllerService("StandardCountService");
+ final Map<String, String> controllerServiceProperties = new
HashMap<>();
+ controllerServiceProperties.put("Start Value", "1000000");
+ getClientUtil().updateControllerServiceProperties(controllerService,
controllerServiceProperties);
+ getClientUtil().enableControllerService(controllerService);
+
+ // Create a Reporting Task that references the Controller Service
+ ReportingTaskEntity reportingTask =
getClientUtil().createReportingTask("WriteToFileReportingTask");
+ final Map<String, String> reportingTaskProperties = new HashMap<>();
+ reportingTaskProperties.put("Count Service",
controllerService.getId());
+ final File reportingTaskFile = new
File("target/reporting-task-count.txt");
+ Files.deleteIfExists(reportingTaskFile.toPath());
+ reportingTaskProperties.put("Filename",
reportingTaskFile.getAbsolutePath());
+ reportingTask =
getClientUtil().updateReportingTaskProperties(reportingTask,
reportingTaskProperties);
+
+ // Schedule reporting task to run every 10 milliseconds.
+ reportingTask.getComponent().setSchedulingPeriod("10 millis");
+
getNifiClient().getReportingTasksClient().updateReportingTask(reportingTask);
+ getClientUtil().waitForReportingTaskValid(reportingTask.getId());
+
+ // Start reporting Task
+ getClientUtil().startReportingTask(reportingTask);
+
+ // Wait for the reporting task to run at least 5 times.
+ waitFor(() -> getCount(reportingTaskFile) >= 1_000_005);
+
+ // Stop the instance, delete the file, and restart.
+ getNiFiInstance().stop();
+ waitFor(() -> reportingTaskFile.delete() ||
!reportingTaskFile.exists());
+ getNiFiInstance().start(true);
+
+ // Upon restart, we should start counting again at 1,000,000.
+ // Wait until the reporting task runs enough times to create the file
again and write a value of at least 1,000,005.
+ waitFor(() -> getCount(reportingTaskFile) >= 1_000_005);
+ }
+
+ private long getCount(final File file) throws IOException {
+ final byte[] bytes = Files.readAllBytes(file.toPath());
+ final String contents = new String(bytes, StandardCharsets.UTF_8);
+ return Long.parseLong(contents);
+ }
+}