This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 6c894f3ae1 NIFI-14130 Replaced more anonymous classes with lambdas (#9609)
6c894f3ae1 is described below
commit 6c894f3ae143a7550e37ca9a41fea582aa347150
Author: dan-s1 <[email protected]>
AuthorDate: Tue Jan 7 16:32:24 2025 -0500
NIFI-14130 Replaced more anonymous classes with lambdas (#9609)
Signed-off-by: David Handermann <[email protected]>
---
.../aws/cloudwatch/PutCloudWatchMetric.java | 46 +++++++---------
.../aws/ml/textract/GetAwsTextractJobStatus.java | 18 +++----
.../org/apache/nifi/processors/aws/s3/ListS3.java | 47 +++++++----------
.../schemaregistry/ConfluentSchemaRegistry.java | 19 +++----
.../org/apache/nifi/processors/hadoop/GetHDFS.java | 37 ++++++-------
.../apache/nifi/processors/hadoop/MoveHDFS.java | 33 +++++-------
.../jms/cf/JndiJmsConnectionFactoryHandler.java | 20 +++----
.../jms/processors/JMSPublisherConsumerIT.java | 7 +--
.../apache/nifi/processors/standard/PutFile.java | 28 +++++-----
.../provenance/store/WriteAheadStorePartition.java | 61 ++++++++++------------
.../clustered/partition/RemoteQueuePartition.java | 15 +++---
.../repository/WriteAheadFlowFileRepository.java | 35 +++++--------
.../scheduling/StandardProcessScheduler.java | 47 ++++++++---------
.../clustered/TestSwappablePriorityQueue.java | 45 ++++++----------
.../TestWriteAheadFlowFileRepository.java | 41 +++++++--------
.../java/org/apache/nifi/nar/NarClassLoader.java | 9 ++--
.../apache/nifi/web/api/DataTransferResource.java | 32 +++++-------
17 files changed, 228 insertions(+), 312 deletions(-)
diff --git a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/cloudwatch/PutCloudWatchMetric.java b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/cloudwatch/PutCloudWatchMetric.java
index 3f99b7a513..90b6aa5b8c 100644
--- a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/cloudwatch/PutCloudWatchMetric.java
+++ b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/cloudwatch/PutCloudWatchMetric.java
@@ -74,38 +74,32 @@ public class PutCloudWatchMetric extends
AbstractAwsSyncProcessor<CloudWatchClie
public static final Set<String> units =
Arrays.stream(StandardUnit.values())
.map(StandardUnit::toString).collect(Collectors.toSet());
- private static final Validator UNIT_VALIDATOR = new Validator() {
- @Override
- public ValidationResult validate(String subject, String input,
ValidationContext context) {
- if (context.isExpressionLanguageSupported(subject) &&
context.isExpressionLanguagePresent(input)) {
- return (new
ValidationResult.Builder()).subject(subject).input(input).explanation("Expression
Language Present").valid(true).build();
- } else {
- String reason = null;
-
- if (!units.contains(input)) {
- reason = "not a valid Unit";
- }
- return (new
ValidationResult.Builder()).subject(subject).input(input).explanation(reason).valid(reason
== null).build();
+ private static final Validator UNIT_VALIDATOR = (subject, input, context)
-> {
+ if (context.isExpressionLanguageSupported(subject) &&
context.isExpressionLanguagePresent(input)) {
+ return (new
ValidationResult.Builder()).subject(subject).input(input).explanation("Expression
Language Present").valid(true).build();
+ } else {
+ String reason = null;
+
+ if (!units.contains(input)) {
+ reason = "not a valid Unit";
}
+ return (new
ValidationResult.Builder()).subject(subject).input(input).explanation(reason).valid(reason
== null).build();
}
};
- private static final Validator DOUBLE_VALIDATOR = new Validator() {
- @Override
- public ValidationResult validate(String subject, String input,
ValidationContext context) {
- if (context.isExpressionLanguageSupported(subject) &&
context.isExpressionLanguagePresent(input)) {
- return (new
ValidationResult.Builder()).subject(subject).input(input).explanation("Expression
Language Present").valid(true).build();
- } else {
- String reason = null;
+ private static final Validator DOUBLE_VALIDATOR = (subject, input,
context) -> {
+ if (context.isExpressionLanguageSupported(subject) &&
context.isExpressionLanguagePresent(input)) {
+ return (new
ValidationResult.Builder()).subject(subject).input(input).explanation("Expression
Language Present").valid(true).build();
+ } else {
+ String reason = null;
- try {
- Double.parseDouble(input);
- } catch (NumberFormatException e) {
- reason = "not a valid Double";
- }
-
- return (new
ValidationResult.Builder()).subject(subject).input(input).explanation(reason).valid(reason
== null).build();
+ try {
+ Double.parseDouble(input);
+ } catch (NumberFormatException e) {
+ reason = "not a valid Double";
}
+
+ return (new
ValidationResult.Builder()).subject(subject).input(input).explanation(reason).valid(reason
== null).build();
}
};
diff --git a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/textract/GetAwsTextractJobStatus.java b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/textract/GetAwsTextractJobStatus.java
index c7d8939f82..b810c36af9 100644
--- a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/textract/GetAwsTextractJobStatus.java
+++ b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/textract/GetAwsTextractJobStatus.java
@@ -21,7 +21,6 @@ import
org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.expression.ExpressionLanguageScope;
@@ -50,16 +49,13 @@ import static
org.apache.nifi.processors.aws.ml.textract.StartAwsTextractJob.TEX
@SeeAlso({StartAwsTextractJob.class})
public class GetAwsTextractJobStatus extends
AbstractAwsMachineLearningJobStatusProcessor<TextractClient,
TextractClientBuilder> {
- public static final Validator TEXTRACT_TYPE_VALIDATOR = new Validator() {
- @Override
- public ValidationResult validate(final String subject, final String
value, final ValidationContext context) {
- if (context.isExpressionLanguageSupported(subject) &&
context.isExpressionLanguagePresent(value)) {
- return new
ValidationResult.Builder().subject(subject).input(value).explanation("Expression
Language Present").valid(true).build();
- } else if (TextractType.TEXTRACT_TYPES.contains(value)) {
- return new
ValidationResult.Builder().subject(subject).input(value).explanation("Supported
Value.").valid(true).build();
- } else {
- return new
ValidationResult.Builder().subject(subject).input(value).explanation("Not a
supported value, flow file attribute or context
parameter.").valid(false).build();
- }
+ public static final Validator TEXTRACT_TYPE_VALIDATOR = (subject, value,
context) -> {
+ if (context.isExpressionLanguageSupported(subject) &&
context.isExpressionLanguagePresent(value)) {
+ return new
ValidationResult.Builder().subject(subject).input(value).explanation("Expression
Language Present").valid(true).build();
+ } else if (TextractType.TEXTRACT_TYPES.contains(value)) {
+ return new
ValidationResult.Builder().subject(subject).input(value).explanation("Supported
Value.").valid(true).build();
+ } else {
+ return new
ValidationResult.Builder().subject(subject).input(value).explanation("Not a
supported value, flow file attribute or context
parameter.").valid(false).build();
}
};
diff --git a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java
index 0b6d392b4c..ea744c34b6 100644
--- a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java
+++ b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java
@@ -52,7 +52,6 @@ import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.components.ConfigVerificationResult.Outcome;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyDescriptor.Builder;
-import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.components.state.Scope;
@@ -394,35 +393,29 @@ public class ListS3 extends AbstractS3Processor
implements VerifiableProcessor {
}
private static Validator createRequesterPaysValidator() {
- return new Validator() {
- @Override
- public ValidationResult validate(final String subject, final
String input, final ValidationContext context) {
- boolean requesterPays = Boolean.parseBoolean(input);
- boolean useVersions =
context.getProperty(USE_VERSIONS).asBoolean();
- boolean valid = !requesterPays || !useVersions;
- return new ValidationResult.Builder()
- .input(input)
- .subject(subject)
- .valid(valid)
- .explanation(valid ? null : "'Requester Pays' cannot
be used when listing object versions.")
- .build();
- }
+ return (subject, input, context) -> {
+ boolean requesterPays = Boolean.parseBoolean(input);
+ boolean useVersions =
context.getProperty(USE_VERSIONS).asBoolean();
+ boolean valid = !requesterPays || !useVersions;
+ return new ValidationResult.Builder()
+ .input(input)
+ .subject(subject)
+ .valid(valid)
+ .explanation(valid ? null : "'Requester Pays' cannot be
used when listing object versions.")
+ .build();
};
}
private static Validator createMaxAgeValidator() {
- return new Validator() {
- @Override
- public ValidationResult validate(final String subject, final
String input, final ValidationContext context) {
- Double maxAge = input != null ?
FormatUtils.getPreciseTimeDuration(input, TimeUnit.MILLISECONDS) : null;
- long minAge =
context.getProperty(MIN_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
- boolean valid = input != null && maxAge > minAge;
- return new ValidationResult.Builder()
- .input(input)
- .subject(subject)
- .valid(valid)
- .explanation(valid ? null : "'Maximum Age' must be
greater than 'Minimum Age' ")
- .build();
- }
+ return (subject, input, context) -> {
+ Double maxAge = input != null ?
FormatUtils.getPreciseTimeDuration(input, TimeUnit.MILLISECONDS) : null;
+ long minAge =
context.getProperty(MIN_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
+ boolean valid = input != null && maxAge > minAge;
+ return new ValidationResult.Builder()
+ .input(input)
+ .subject(subject)
+ .valid(valid)
+ .explanation(valid ? null : "'Maximum Age' must be greater
than 'Minimum Age' ")
+ .build();
};
}
diff --git a/nifi-extension-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/ConfluentSchemaRegistry.java b/nifi-extension-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/ConfluentSchemaRegistry.java
index 7277395fb8..b0d2578ae3 100644
--- a/nifi-extension-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/ConfluentSchemaRegistry.java
+++ b/nifi-extension-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/ConfluentSchemaRegistry.java
@@ -165,18 +165,13 @@ public class ConfluentSchemaRegistry extends
AbstractControllerService implement
return properties;
}
- private static final Validator REQUEST_HEADER_VALIDATOR = new Validator() {
- @Override
- public ValidationResult validate(final String subject, final String
value, final ValidationContext context) {
- return new ValidationResult.Builder()
- .subject(subject)
- .input(value)
- .valid(subject.startsWith(REQUEST_HEADER_PREFIX)
- && subject.length() >
REQUEST_HEADER_PREFIX.length())
- .explanation("Dynamic property names must be of format
'request.header.*'")
- .build();
- }
- };
+ private static final Validator REQUEST_HEADER_VALIDATOR = (subject, value,
context) -> new ValidationResult.Builder()
+ .subject(subject)
+ .input(value)
+ .valid(subject.startsWith(REQUEST_HEADER_PREFIX)
+ && subject.length() > REQUEST_HEADER_PREFIX.length())
+ .explanation("Dynamic property names must be of format
'request.header.*'")
+ .build();
@Override
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(String
propertyDescriptionName) {
diff --git a/nifi-extension-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java b/nifi-extension-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java
index a87398d83e..cac5b60138 100644
--- a/nifi-extension-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java
+++ b/nifi-extension-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java
@@ -532,32 +532,27 @@ public class GetHDFS extends AbstractHadoopProcessor {
}
protected PathFilter getPathFilter(final Path dir) {
- return new PathFilter() {
-
- @Override
- public boolean accept(Path path) {
- if (ignoreDottedFiles && path.getName().startsWith(".")) {
- return false;
- }
- final String pathToCompare;
- if (filterMatchBasenameOnly) {
+ return path -> {
+ if (ignoreDottedFiles && path.getName().startsWith(".")) {
+ return false;
+ }
+ final String pathToCompare;
+ if (filterMatchBasenameOnly) {
+ pathToCompare = path.getName();
+ } else {
+ // figure out portion of path that does not include the
provided root dir.
+ String relativePath = getPathDifference(dir, path);
+ if (relativePath.length() == 0) {
pathToCompare = path.getName();
} else {
- // figure out portion of path that does not include
the provided root dir.
- String relativePath = getPathDifference(dir, path);
- if (relativePath.length() == 0) {
- pathToCompare = path.getName();
- } else {
- pathToCompare = relativePath + Path.SEPARATOR +
path.getName();
- }
- }
-
- if (fileFilterPattern != null &&
!fileFilterPattern.matcher(pathToCompare).matches()) {
- return false;
+ pathToCompare = relativePath + Path.SEPARATOR +
path.getName();
}
- return true;
}
+ if (fileFilterPattern != null &&
!fileFilterPattern.matcher(pathToCompare).matches()) {
+ return false;
+ }
+ return true;
};
}
}
diff --git a/nifi-extension-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/MoveHDFS.java b/nifi-extension-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/MoveHDFS.java
index 578d0cc586..df30d777cd 100644
--- a/nifi-extension-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/MoveHDFS.java
+++ b/nifi-extension-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/MoveHDFS.java
@@ -548,27 +548,22 @@ public class MoveHDFS extends AbstractHadoopProcessor {
}
protected PathFilter getPathFilter(final Path dir) {
- return new PathFilter() {
-
- @Override
- public boolean accept(Path path) {
- if (ignoreDottedFiles && path.getName().startsWith(".")) {
- return false;
- }
- final String pathToCompare;
- String relativePath = getPathDifference(dir, path);
- if (relativePath.length() == 0) {
- pathToCompare = path.getName();
- } else {
- pathToCompare = relativePath + Path.SEPARATOR +
path.getName();
- }
-
- if (fileFilterPattern != null &&
!fileFilterPattern.matcher(pathToCompare).matches()) {
- return false;
- }
- return true;
+ return path -> {
+ if (ignoreDottedFiles && path.getName().startsWith(".")) {
+ return false;
+ }
+ final String pathToCompare;
+ String relativePath = getPathDifference(dir, path);
+ if (relativePath.length() == 0) {
+ pathToCompare = path.getName();
+ } else {
+ pathToCompare = relativePath + Path.SEPARATOR +
path.getName();
}
+ if (fileFilterPattern != null &&
!fileFilterPattern.matcher(pathToCompare).matches()) {
+ return false;
+ }
+ return true;
};
}
}
diff --git a/nifi-extension-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/cf/JndiJmsConnectionFactoryHandler.java b/nifi-extension-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/cf/JndiJmsConnectionFactoryHandler.java
index e00d3059e1..b4ba56595d 100644
--- a/nifi-extension-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/cf/JndiJmsConnectionFactoryHandler.java
+++ b/nifi-extension-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/cf/JndiJmsConnectionFactoryHandler.java
@@ -28,7 +28,6 @@ import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import java.lang.reflect.InvocationHandler;
-import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.Hashtable;
import java.util.Set;
@@ -121,17 +120,14 @@ public class JndiJmsConnectionFactoryHandler extends
CachedJMSConnectionFactoryH
}
private static Object instrumentWithClassLoader(final Object obj, final
ClassLoader classLoader, final Class<?>... interfaces) {
- final InvocationHandler invocationHandler = new InvocationHandler() {
- @Override
- public Object invoke(final Object proxy, final Method method,
final Object[] args) throws Throwable {
- final Thread thread = Thread.currentThread();
- final ClassLoader currentClassLoader =
thread.getContextClassLoader();
- try {
- thread.setContextClassLoader(classLoader);
- return method.invoke(obj, args);
- } finally {
- thread.setContextClassLoader(currentClassLoader);
- }
+ final InvocationHandler invocationHandler = (proxy, method, args) -> {
+ final Thread thread = Thread.currentThread();
+ final ClassLoader currentClassLoader =
thread.getContextClassLoader();
+ try {
+ thread.setContextClassLoader(classLoader);
+ return method.invoke(obj, args);
+ } finally {
+ thread.setContextClassLoader(currentClassLoader);
}
};
diff --git a/nifi-extension-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/JMSPublisherConsumerIT.java b/nifi-extension-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/JMSPublisherConsumerIT.java
index 650eba37ab..c42a739f6e 100644
--- a/nifi-extension-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/JMSPublisherConsumerIT.java
+++ b/nifi-extension-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/JMSPublisherConsumerIT.java
@@ -317,12 +317,7 @@ public class JMSPublisherConsumerIT {
JmsTemplate jmsTemplate =
CommonTest.buildJmsTemplateForDestination(false);
try {
- jmsTemplate.send(destinationName, new MessageCreator() {
- @Override
- public Message createMessage(Session session) throws
JMSException {
- return session.createObjectMessage();
- }
- });
+ jmsTemplate.send(destinationName, Session::createObjectMessage);
JMSConsumer consumer = new JMSConsumer((CachingConnectionFactory)
jmsTemplate.getConnectionFactory(), jmsTemplate, mock(ComponentLog.class));
consumer.consumeMessageSet(destinationName, null, false, false,
null, null, "UTF-8", 1, responses -> {
diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutFile.java b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutFile.java
index 99d47e5f1f..906d88956e 100644
--- a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutFile.java
+++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutFile.java
@@ -27,7 +27,6 @@ import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.RequiredPermission;
-import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.expression.ExpressionLanguageScope;
@@ -83,23 +82,20 @@ public class PutFile extends AbstractProcessor {
public static final Pattern RWX_PATTERN =
Pattern.compile("^([r-][w-])([x-])([r-][w-])([x-])([r-][w-])([x-])$");
public static final Pattern NUM_PATTERN = Pattern.compile("^[0-7]{3}$");
- private static final Validator PERMISSIONS_VALIDATOR = new Validator() {
- @Override
- public ValidationResult validate(String subject, String input,
ValidationContext context) {
- ValidationResult.Builder vr = new ValidationResult.Builder();
- if (context.isExpressionLanguagePresent(input)) {
- return new
ValidationResult.Builder().subject(subject).input(input).explanation("Expression
Language Present").valid(true).build();
- }
+ private static final Validator PERMISSIONS_VALIDATOR = (subject, input,
context) -> {
+ ValidationResult.Builder vr = new ValidationResult.Builder();
+ if (context.isExpressionLanguagePresent(input)) {
+ return new
ValidationResult.Builder().subject(subject).input(input).explanation("Expression
Language Present").valid(true).build();
+ }
- if (RWX_PATTERN.matcher(input).matches() ||
NUM_PATTERN.matcher(input).matches()) {
- return vr.valid(true).build();
- }
- return vr.valid(false)
- .subject(subject)
- .input(input)
- .explanation("This must be expressed in rwxr-x--- form or
octal triplet form.")
- .build();
+ if (RWX_PATTERN.matcher(input).matches() ||
NUM_PATTERN.matcher(input).matches()) {
+ return vr.valid(true).build();
}
+ return vr.valid(false)
+ .subject(subject)
+ .input(input)
+ .explanation("This must be expressed in rwxr-x--- form or
octal triplet form.")
+ .build();
};
public static final PropertyDescriptor DIRECTORY = new
PropertyDescriptor.Builder()
diff --git a/nifi-framework-bundle/nifi-framework-extensions/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/WriteAheadStorePartition.java b/nifi-framework-bundle/nifi-framework-extensions/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/WriteAheadStorePartition.java
index 4c5795a5d7..255a6991bf 100644
--- a/nifi-framework-bundle/nifi-framework-extensions/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/WriteAheadStorePartition.java
+++ b/nifi-framework-bundle/nifi-framework-extensions/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/WriteAheadStorePartition.java
@@ -623,46 +623,43 @@ public class WriteAheadStorePartition implements
EventStorePartition {
skipToEvent = false;
}
- final Runnable reindexTask = new Runnable() {
- @Override
- public void run() {
- final Map<ProvenanceEventRecord, StorageSummary>
storageMap = new HashMap<>(1000);
-
- try (final RecordReader recordReader =
recordReaderFactory.newRecordReader(eventFile, Collections.emptyList(),
Integer.MAX_VALUE)) {
- if (skipToEvent) {
- final Optional<ProvenanceEventRecord> eventOption
= recordReader.skipToEvent(minEventIdToReindex);
- if (!eventOption.isPresent()) {
- return;
- }
+ final Runnable reindexTask = () -> {
+ final Map<ProvenanceEventRecord, StorageSummary> storageMap =
new HashMap<>(1000);
+
+ try (final RecordReader recordReader =
recordReaderFactory.newRecordReader(eventFile, Collections.emptyList(),
Integer.MAX_VALUE)) {
+ if (skipToEvent) {
+ final Optional<ProvenanceEventRecord> eventOption =
recordReader.skipToEvent(minEventIdToReindex);
+ if (!eventOption.isPresent()) {
+ return;
}
+ }
- StandardProvenanceEventRecord event;
- while (true) {
- final long startBytesConsumed =
recordReader.getBytesConsumed();
-
- event = recordReader.nextRecord();
- if (event == null) {
+ StandardProvenanceEventRecord event;
+ while (true) {
+ final long startBytesConsumed =
recordReader.getBytesConsumed();
+
+ event = recordReader.nextRecord();
+ if (event == null) {
+ eventIndex.reindexEvents(storageMap);
+ reindexedCount.addAndGet(storageMap.size());
+ storageMap.clear();
+ break; // stop reading from this file
+ } else {
+ final long eventSize =
recordReader.getBytesConsumed() - startBytesConsumed;
+ storageMap.put(event, new
StorageSummary(event.getEventId(), eventFile.getName(), partitionName,
recordReader.getBlockIndex(), eventSize, 0L));
+
+ if (storageMap.size() == 1000) {
eventIndex.reindexEvents(storageMap);
reindexedCount.addAndGet(storageMap.size());
storageMap.clear();
- break; // stop reading from this file
- } else {
- final long eventSize =
recordReader.getBytesConsumed() - startBytesConsumed;
- storageMap.put(event, new
StorageSummary(event.getEventId(), eventFile.getName(), partitionName,
recordReader.getBlockIndex(), eventSize, 0L));
-
- if (storageMap.size() == 1000) {
- eventIndex.reindexEvents(storageMap);
- reindexedCount.addAndGet(storageMap.size());
- storageMap.clear();
- }
}
}
- } catch (final EOFException | FileNotFoundException eof) {
- // Ran out of data. Continue on.
- logger.warn("Failed to find event with ID {} in Event
File {}", minEventIdToReindex, eventFile, eof);
- } catch (final Exception e) {
- logger.error("Failed to index Provenance Events found
in {}", eventFile, e);
}
+ } catch (final EOFException | FileNotFoundException eof) {
+ // Ran out of data. Continue on.
+ logger.warn("Failed to find event with ID {} in Event File
{}", minEventIdToReindex, eventFile, eof);
+ } catch (final Exception e) {
+ logger.error("Failed to index Provenance Events found in
{}", eventFile, e);
}
};
diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/RemoteQueuePartition.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/RemoteQueuePartition.java
index 385e4d1d52..60f74374ab 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/RemoteQueuePartition.java
+++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/RemoteQueuePartition.java
@@ -216,15 +216,12 @@ public class RemoteQueuePartition implements
QueuePartition {
}
};
- final TransactionCompleteCallback successCallback = new
TransactionCompleteCallback() {
- @Override
- public void onTransactionComplete(final List<FlowFileRecord>
flowFilesSent, final NodeIdentifier nodeIdentifier) {
- // We've now completed the transaction. We must now update the
repositories and "keep the books", acknowledging the FlowFiles
- // with the queue so that its size remains accurate.
- priorityQueue.acknowledge(flowFilesSent);
- flowFileQueue.onTransfer(flowFilesSent);
- updateRepositories(flowFilesSent, Collections.emptyList(),
nodeIdentifier);
- }
+ final TransactionCompleteCallback successCallback = (flowFilesSent,
nodeIdentifier) -> {
+ // We've now completed the transaction. We must now update the
repositories and "keep the books", acknowledging the FlowFiles
+ // with the queue so that its size remains accurate.
+ priorityQueue.acknowledge(flowFilesSent);
+ flowFileQueue.onTransfer(flowFilesSent);
+ updateRepositories(flowFilesSent, Collections.emptyList(),
nodeIdentifier);
};
final BooleanSupplier emptySupplier = priorityQueue::isEmpty;
diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
index 317a146675..59f58baa99 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
+++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
@@ -52,7 +52,6 @@ import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
@@ -172,13 +171,10 @@ public class WriteAheadFlowFileRepository implements
FlowFileRepository, SyncLis
checkpointDelayMillis =
FormatUtils.getTimeDuration(nifiProperties.getFlowFileRepositoryCheckpointInterval(),
TimeUnit.MILLISECONDS);
- checkpointExecutor = Executors.newSingleThreadScheduledExecutor(new
ThreadFactory() {
- @Override
- public Thread newThread(final Runnable r) {
- final Thread t = Executors.defaultThreadFactory().newThread(r);
- t.setName("Checkpoint FlowFile Repository");
- return t;
- }
+ checkpointExecutor = Executors.newSingleThreadScheduledExecutor(r -> {
+ final Thread t = Executors.defaultThreadFactory().newThread(r);
+ t.setName("Checkpoint FlowFile Repository");
+ return t;
});
}
@@ -830,19 +826,16 @@ public class WriteAheadFlowFileRepository implements
FlowFileRepository, SyncLis
logger.info("Successfully updated FlowFile Repository with {} Drop
Records due to missing queues in {} milliseconds", dropRecords.size(),
updateMillis);
}
- final Runnable checkpointRunnable = new Runnable() {
- @Override
- public void run() {
- try {
- logger.info("Initiating checkpoint of FlowFile
Repository");
- final long start = System.nanoTime();
- final int numRecordsCheckpointed = checkpoint();
- final long end = System.nanoTime();
- final long millis = TimeUnit.MILLISECONDS.convert(end -
start, TimeUnit.NANOSECONDS);
- logger.info("Successfully checkpointed FlowFile Repository
with {} records in {} milliseconds", numRecordsCheckpointed, millis);
- } catch (final Throwable t) {
- logger.error("Unable to checkpoint FlowFile Repository",
t);
- }
+ final Runnable checkpointRunnable = () -> {
+ try {
+ logger.info("Initiating checkpoint of FlowFile Repository");
+ final long start = System.nanoTime();
+ final int numRecordsCheckpointed = checkpoint();
+ final long end = System.nanoTime();
+ final long millis = TimeUnit.MILLISECONDS.convert(end - start,
TimeUnit.NANOSECONDS);
+ logger.info("Successfully checkpointed FlowFile Repository
with {} records in {} milliseconds", numRecordsCheckpointed, millis);
+ } catch (final Throwable t) {
+ logger.error("Unable to checkpoint FlowFile Repository", t);
}
};
diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
index a1323deece..4104176f7f 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
+++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
@@ -332,37 +332,34 @@ public final class StandardProcessScheduler implements
ProcessScheduler {
final CompletableFuture<Void> future = new CompletableFuture<>();
- final Runnable unscheduleReportingTaskRunnable = new Runnable() {
- @Override
- public void run() {
- final ConfigurationContext configurationContext =
taskNode.getConfigurationContext();
+ final Runnable unscheduleReportingTaskRunnable = () -> {
+ final ConfigurationContext configurationContext =
taskNode.getConfigurationContext();
- synchronized (lifecycleState) {
- lifecycleState.setScheduled(false);
+ synchronized (lifecycleState) {
+ lifecycleState.setScheduled(false);
- try (final NarCloseable x =
NarCloseable.withComponentNarLoader(flowController.getExtensionManager(),
reportingTask.getClass(), reportingTask.getIdentifier())) {
-
ReflectionUtils.invokeMethodsWithAnnotation(OnUnscheduled.class, reportingTask,
configurationContext);
- } catch (final Exception e) {
- final Throwable cause = e instanceof
InvocationTargetException ? e.getCause() : e;
- final ComponentLog componentLog = new
SimpleProcessLogger(reportingTask.getIdentifier(), reportingTask, new
StandardLoggingContext(null));
- componentLog.error("Failed to invoke @OnUnscheduled
method due to {}", cause);
-
- LOG.error("Failed to invoke the @OnUnscheduled methods
of {} due to {}; administratively yielding this ReportingTask and will attempt
to schedule it again after {}",
- reportingTask, cause.toString(),
administrativeYieldDuration);
- LOG.error("", cause);
- }
+ try (final NarCloseable x =
NarCloseable.withComponentNarLoader(flowController.getExtensionManager(),
reportingTask.getClass(), reportingTask.getIdentifier())) {
+
ReflectionUtils.invokeMethodsWithAnnotation(OnUnscheduled.class, reportingTask,
configurationContext);
+ } catch (final Exception e) {
+ final Throwable cause = e instanceof
InvocationTargetException ? e.getCause() : e;
+ final ComponentLog componentLog = new
SimpleProcessLogger(reportingTask.getIdentifier(), reportingTask, new
StandardLoggingContext(null));
+ componentLog.error("Failed to invoke @OnUnscheduled method
due to {}", cause);
- agent.unschedule(taskNode, lifecycleState);
+ LOG.error("Failed to invoke the @OnUnscheduled methods of
{} due to {}; administratively yielding this ReportingTask and will attempt to
schedule it again after {}",
+ reportingTask, cause.toString(),
administrativeYieldDuration);
+ LOG.error("", cause);
+ }
- // If active thread count == 1, that indicates that all
execution threads have completed. We use 1 here instead of 0 because
- // when the Reporting Task is unscheduled, we immediately
increment the thread count to 1 as an indicator that we've not completely
finished.
- if (lifecycleState.getActiveThreadCount() == 1 &&
lifecycleState.mustCallOnStoppedMethods()) {
-
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class,
reportingTask, configurationContext);
- future.complete(null);
- }
+ agent.unschedule(taskNode, lifecycleState);
- lifecycleState.decrementActiveThreadCount();
+ // If active thread count == 1, that indicates that all
execution threads have completed. We use 1 here instead of 0 because
+ // when the Reporting Task is unscheduled, we immediately
increment the thread count to 1 as an indicator that we've not completely
finished.
+ if (lifecycleState.getActiveThreadCount() == 1 &&
lifecycleState.mustCallOnStoppedMethods()) {
+
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class,
reportingTask, configurationContext);
+ future.complete(null);
}
+
+ lifecycleState.decrementActiveThreadCount();
}
};
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestSwappablePriorityQueue.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestSwappablePriorityQueue.java
index 471e1dcd59..d52dc05bf0 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestSwappablePriorityQueue.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestSwappablePriorityQueue.java
@@ -27,7 +27,6 @@ import org.apache.nifi.controller.queue.QueueSize;
import org.apache.nifi.controller.queue.SwappablePriorityQueue;
import org.apache.nifi.controller.repository.FlowFileRecord;
import org.apache.nifi.events.EventReporter;
-import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.FlowFilePrioritizer;
import org.apache.nifi.util.StringUtils;
import org.junit.jupiter.api.BeforeEach;
@@ -82,13 +81,10 @@ public class TestSwappablePriorityQueue {
@Test
public void testPrioritizersBigQueue() {
- final FlowFilePrioritizer iAttributePrioritizer = new
FlowFilePrioritizer() {
- @Override
- public int compare(final FlowFile o1, final FlowFile o2) {
- final int i1 = Integer.parseInt(o1.getAttribute("i"));
- final int i2 = Integer.parseInt(o2.getAttribute("i"));
- return Integer.compare(i1, i2);
- }
+ final FlowFilePrioritizer iAttributePrioritizer = (o1, o2) -> {
+ final int i1 = Integer.parseInt(o1.getAttribute("i"));
+ final int i2 = Integer.parseInt(o2.getAttribute("i"));
+ return Integer.compare(i1, i2);
};
queue.setPriorities(Collections.singletonList(iAttributePrioritizer));
@@ -127,13 +123,10 @@ public class TestSwappablePriorityQueue {
@Test
public void testOrderingWithCornerCases() {
- final FlowFilePrioritizer iAttributePrioritizer = new
FlowFilePrioritizer() {
- @Override
- public int compare(final FlowFile o1, final FlowFile o2) {
- final int i1 = Integer.parseInt(o1.getAttribute("i"));
- final int i2 = Integer.parseInt(o2.getAttribute("i"));
- return Integer.compare(i1, i2);
- }
+ final FlowFilePrioritizer iAttributePrioritizer = (o1, o2) -> {
+ final int i1 = Integer.parseInt(o1.getAttribute("i"));
+ final int i2 = Integer.parseInt(o2.getAttribute("i"));
+ return Integer.compare(i1, i2);
};
queue.setPriorities(Collections.singletonList(iAttributePrioritizer));
@@ -157,13 +150,10 @@ public class TestSwappablePriorityQueue {
@Test
public void testPrioritizerWhenOutOfOrderDataEntersSwapQueue() {
- final FlowFilePrioritizer iAttributePrioritizer = new
FlowFilePrioritizer() {
- @Override
- public int compare(final FlowFile o1, final FlowFile o2) {
- final int i1 = Integer.parseInt(o1.getAttribute("i"));
- final int i2 = Integer.parseInt(o2.getAttribute("i"));
- return Integer.compare(i1, i2);
- }
+ final FlowFilePrioritizer iAttributePrioritizer = (o1, o2) -> {
+ final int i1 = Integer.parseInt(o1.getAttribute("i"));
+ final int i2 = Integer.parseInt(o2.getAttribute("i"));
+ return Integer.compare(i1, i2);
};
queue.setPriorities(Collections.singletonList(iAttributePrioritizer));
@@ -195,13 +185,10 @@ public class TestSwappablePriorityQueue {
@Test
public void testPrioritizersDataAddedAfterSwapOccurs() {
- final FlowFilePrioritizer iAttributePrioritizer = new
FlowFilePrioritizer() {
- @Override
- public int compare(final FlowFile o1, final FlowFile o2) {
- final int i1 = Integer.parseInt(o1.getAttribute("i"));
- final int i2 = Integer.parseInt(o2.getAttribute("i"));
- return Integer.compare(i1, i2);
- }
+ final FlowFilePrioritizer iAttributePrioritizer = (o1, o2) -> {
+ final int i1 = Integer.parseInt(o1.getAttribute("i"));
+ final int i2 = Integer.parseInt(o2.getAttribute("i"));
+ return Integer.compare(i1, i2);
};
queue.setPriorities(Collections.singletonList(iAttributePrioritizer));
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java
index 85640c8cdb..94544e284a 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java
@@ -370,29 +370,26 @@ public class TestWriteAheadFlowFileRepository {
final Thread[] threads = new Thread[numThreads];
for (int j = 0; j < 2; j++) {
for (int i = 0; i < numThreads; i++) {
- final Thread t = new Thread(new Runnable() {
- @Override
- public void run() {
- final List<SerializedRepositoryRecord> records = new
ArrayList<>();
- final int numBatches = updateCountPerThread /
batchSize;
- final MockFlowFile baseFlowFile = new MockFlowFile(0L);
-
- for (int i = 0; i < numBatches; i++) {
- records.clear();
- for (int k = 0; k < batchSize; k++) {
- final MockFlowFileRecord flowFile = new
MockFlowFileRecord(baseFlowFile.getAttributes(), baseFlowFile.getSize());
- final String uuid =
flowFile.getAttribute("uuid");
-
- final StandardRepositoryRecord record = new
StandardRepositoryRecord(null, flowFile);
- record.setDestination(queue);
- final Map<String, String> updatedAttrs =
Collections.singletonMap("uuid", uuid);
- record.setWorking(flowFile, updatedAttrs,
false);
-
- records.add(new
LiveSerializedRepositoryRecord(record));
- }
-
- assertThrows(IOException.class, () ->
repo.update(records, false));
+ final Thread t = new Thread(() -> {
+ final List<SerializedRepositoryRecord> records = new
ArrayList<>();
+ final int numBatches = updateCountPerThread / batchSize;
+ final MockFlowFile baseFlowFile = new MockFlowFile(0L);
+
+ for (int i1 = 0; i1 < numBatches; i1++) {
+ records.clear();
+ for (int k = 0; k < batchSize; k++) {
+ final MockFlowFileRecord flowFile = new
MockFlowFileRecord(baseFlowFile.getAttributes(), baseFlowFile.getSize());
+ final String uuid = flowFile.getAttribute("uuid");
+
+ final StandardRepositoryRecord record = new
StandardRepositoryRecord(null, flowFile);
+ record.setDestination(queue);
+ final Map<String, String> updatedAttrs =
Collections.singletonMap("uuid", uuid);
+ record.setWorking(flowFile, updatedAttrs, false);
+
+ records.add(new
LiveSerializedRepositoryRecord(record));
}
+
+ assertThrows(IOException.class, () ->
repo.update(records, false));
}
});
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-nar-utils/src/main/java/org/apache/nifi/nar/NarClassLoader.java
b/nifi-framework-bundle/nifi-framework/nifi-nar-utils/src/main/java/org/apache/nifi/nar/NarClassLoader.java
index f3f9b677f0..2c6cf55d0f 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-nar-utils/src/main/java/org/apache/nifi/nar/NarClassLoader.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-nar-utils/src/main/java/org/apache/nifi/nar/NarClassLoader.java
@@ -126,12 +126,9 @@ public class NarClassLoader extends
AbstractNativeLibHandlingClassLoader {
private static final Logger LOGGER =
LoggerFactory.getLogger(NarClassLoader.class);
- private static final FileFilter JAR_FILTER = new FileFilter() {
- @Override
- public boolean accept(File pathname) {
- final String nameToTest = pathname.getName().toLowerCase();
- return nameToTest.endsWith(".jar") && pathname.isFile();
- }
+ private static final FileFilter JAR_FILTER = pathname -> {
+ final String nameToTest = pathname.getName().toLowerCase();
+ return nameToTest.endsWith(".jar") && pathname.isFile();
};
/**
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/DataTransferResource.java
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/DataTransferResource.java
index 609e0e6b5c..f8f2fe1fea 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/DataTransferResource.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/DataTransferResource.java
@@ -656,26 +656,22 @@ public class DataTransferResource extends
ApplicationResource {
try {
final HttpFlowFileServerProtocol serverProtocol =
initiateServerProtocol(req, peer, transportProtocolVersion);
- StreamingOutput flowFileContent = new StreamingOutput() {
- @Override
- public void write(OutputStream outputStream) throws
IOException, WebApplicationException {
-
- HttpOutput output = (HttpOutput)
peer.getCommunicationsSession().getOutput();
- output.setOutputStream(outputStream);
-
- try {
- int numOfFlowFiles =
serverProtocol.getPort().transferFlowFiles(peer, serverProtocol);
- logger.debug("finished transferring flow files,
numOfFlowFiles={}", numOfFlowFiles);
- if (numOfFlowFiles < 1) {
- // There was no flow file to transfer. Throw this
exception to stop responding with SEE OTHER.
- throw new
WebApplicationException(Response.Status.OK);
- }
- } catch (NotAuthorizedException | BadRequestException |
RequestExpiredException e) {
- // Handshake is done outside of write() method, so
these exception wouldn't be thrown.
- throw new IOException("Failed to process the
request.", e);
+ StreamingOutput flowFileContent = outputStream -> {
+
+ HttpOutput output = (HttpOutput)
peer.getCommunicationsSession().getOutput();
+ output.setOutputStream(outputStream);
+
+ try {
+ int numOfFlowFiles =
serverProtocol.getPort().transferFlowFiles(peer, serverProtocol);
+ logger.debug("finished transferring flow files,
numOfFlowFiles={}", numOfFlowFiles);
+ if (numOfFlowFiles < 1) {
+ // There was no flow file to transfer. Throw this
exception to stop responding with SEE OTHER.
+ throw new WebApplicationException(Response.Status.OK);
}
+ } catch (NotAuthorizedException | BadRequestException |
RequestExpiredException e) {
+ // Handshake is done outside of write() method, so these
exception wouldn't be thrown.
+ throw new IOException("Failed to process the request.", e);
}
-
};
return responseCreator.acceptedResponse(transactionManager,
flowFileContent, transportProtocolVersion);