This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 960498f76b NIFI-12291: Added additional Use Case Docs
960498f76b is described below
commit 960498f76bf1f9b3dbe1d344d9672c22a89a0c89
Author: Mark Payne <[email protected]>
AuthorDate: Mon Oct 30 14:15:16 2023 -0400
NIFI-12291: Added additional Use Case Docs
This closes #7954
Signed-off-by: David Handermann <[email protected]>
---
.../apache/nifi/processors/slack/ListenSlack.java | 29 ++++++----
.../nifi/processors/standard/MergeRecord.java | 66 ++++++++++++++++++++++
.../nifi/processors/standard/PackageFlowFile.java | 4 +-
.../standard/QueryDatabaseTableRecord.java | 21 ++++++-
.../processors/attributes/UpdateAttribute.java | 38 +++++++++++++
5 files changed, 143 insertions(+), 15 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/java/org/apache/nifi/processors/slack/ListenSlack.java
b/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/java/org/apache/nifi/processors/slack/ListenSlack.java
index e97f511d98..30864c8e40 100644
---
a/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/java/org/apache/nifi/processors/slack/ListenSlack.java
+++
b/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/java/org/apache/nifi/processors/slack/ListenSlack.java
@@ -27,19 +27,9 @@ import
com.slack.api.bolt.context.builtin.SlashCommandContext;
import com.slack.api.bolt.request.builtin.SlashCommandRequest;
import com.slack.api.bolt.response.Response;
import com.slack.api.bolt.socket_mode.SocketModeApp;
+import com.slack.api.model.event.FileSharedEvent;
import com.slack.api.model.event.MessageEvent;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.LinkedTransferQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TransferQueue;
-import java.util.function.Supplier;
-import java.util.regex.Pattern;
+import com.slack.api.model.event.MessageFileShareEvent;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
import org.apache.nifi.annotation.behavior.WritesAttribute;
@@ -61,6 +51,19 @@ import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedTransferQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TransferQueue;
+import java.util.function.Supplier;
+import java.util.regex.Pattern;
+
@PrimaryNodeOnly
@DefaultSettings(yieldDuration = "250 millis")
@@ -149,6 +152,8 @@ public class ListenSlack extends AbstractProcessor {
// Register the event types that make sense
if
(context.getProperty(EVENT_TYPE).getValue().equals(RECEIVE_MESSAGE_EVENTS.getValue()))
{
slackApp.event(MessageEvent.class, this::handleEvent);
+ slackApp.event(MessageFileShareEvent.class, this::handleEvent);
+ slackApp.event(FileSharedEvent.class, this::handleEvent);
} else {
slackApp.command(Pattern.compile(".*"), this::handleCommand);
}
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeRecord.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeRecord.java
index f9a1fc58e4..de1e910758 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeRecord.java
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeRecord.java
@@ -26,8 +26,11 @@ import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.MultiProcessorUseCase;
+import org.apache.nifi.annotation.documentation.ProcessorConfiguration;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.documentation.UseCase;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.avro.AvroTypeUtil;
import org.apache.nifi.components.AllowableValue;
@@ -97,6 +100,69 @@ import java.util.stream.Collectors;
@WritesAttribute(attribute = "<Attributes from Record Writer>",
description = "Any Attribute that the configured Record Writer returns will be
added to the FlowFile.")
})
@SeeAlso({MergeContent.class, SplitRecord.class, PartitionRecord.class})
+@UseCase(
+ description = "Combine together many arbitrary Records in order to create
a single, larger file",
+ configuration = """
+ Configure the "Record Reader" to specify a Record Reader that is
appropriate for the incoming data type.
+ Configure the "Record Writer" to specify a Record Writer that is
appropriate for the desired output data type.
+ Set "Merge Strategy" to `Bin-Packing Algorithm`.
+ Set the "Minimum Bin Size" to desired file size of the merged output
file. For example, a value of `1 MB` will result in not merging data until at
least
+ 1 MB of data is available (unless the Max Bin Age is reached
first). If there is no desired minimum file size, leave the default value of `0
B`.
+ Set the "Minimum Number of Records" property to the minimum number of
Records that should be included in the merged output file. For example, setting
the value
+ to `10000` ensures that the output file will have at least 10,000
Records in it (unless the Max Bin Age is reached first).
+ Set the "Max Bin Age" to specify the maximum amount of time to hold
data before merging. This can be thought of as a "timeout" at which time the
Processor will
merge whatever data it has, even if the "Minimum Bin Size" and
"Minimum Number of Records" has not been reached. It is always recommended to
set the value.
+ A reasonable default might be `10 mins` if there is no other
latency requirement.
+
+ Connect the 'merged' Relationship to the next component in the flow.
Auto-terminate the 'original' Relationship.
+ """
+)
+@MultiProcessorUseCase(
+ description = "Combine together many Records that have the same value for
a particular field in the data, in order to create a single, larger file",
+ keywords = {"merge", "combine", "aggregate", "like records", "similar
data"},
+ configurations = {
+ @ProcessorConfiguration(
+ processorClass = PartitionRecord.class,
+ configuration = """
+ Configure the "Record Reader" to specify a Record Reader that
is appropriate for the incoming data type.
+ Configure the "Record Writer" to specify a Record Writer that
is appropriate for the desired output data type.
+
+ Add a single additional property. The name of the property
should describe the field on which the data is being merged together.
+ The property's value should be a RecordPath that specifies
which output FlowFile the Record belongs to.
+
+ For example, to merge together data that has the same value
for the "productSku" field, add a property named `productSku` with a value of
`/productSku`.
+
+ Connect the "success" Relationship to MergeRecord.
+ Auto-terminate the "original" Relationship.
+ """
+ ),
+ @ProcessorConfiguration(
+ processorClass = MergeRecord.class,
+ configuration = """
+ Configure the "Record Reader" to specify a Record Reader that
is appropriate for the incoming data type.
+ Configure the "Record Writer" to specify a Record Writer that
is appropriate for the desired output data type.
+ Set "Merge Strategy" to `Bin-Packing Algorithm`.
+ Set the "Minimum Bin Size" to desired file size of the merged
output file. For example, a value of `1 MB` will result in not merging data
until at least
+ 1 MB of data is available (unless the Max Bin Age is
reached first). If there is no desired minimum file size, leave the default
value of `0 B`.
+ Set the "Minimum Number of Records" property to the minimum
number of Records that should be included in the merged output file. For
example, setting the value
+ to `10000` ensures that the output file will have at least
10,000 Records in it (unless the Max Bin Age is reached first).
+ Set the "Maximum Number of Records" property to a value at
least as large as the "Minimum Number of Records." If there is no need to limit
the maximum number of
+ records per file, this number can be set to a value that
will never be reached such as `1000000000`.
+ Set the "Max Bin Age" to specify the maximum amount of time to
hold data before merging. This can be thought of as a "timeout" at which time
the Processor will
merge whatever data it has, even if the "Minimum Bin Size"
and "Minimum Number of Records" has not been reached. It is always recommended
to set the value.
+ A reasonable default might be `10 mins` if there is no
other latency requirement.
+ Set the value of the "Correlation Attribute Name" property to
the name of the property that you added in the PartitionRecord Processor. For
example, if merging data
+ based on the "productSku" field, the property in
PartitionRecord was named `productSku` so the value of the "Correlation
Attribute Name" property should
+ be `productSku`.
+ Set the "Maximum Number of Bins" property to a value that is
at least as large as the different number of values that will be present for
the Correlation Attribute.
+ For example, if you expect 1,000 different SKUs, set this
value to at least `1001`. It is not advisable, though, to set the value above
10,000.
+
+ Connect the 'merged' Relationship to the next component in the
flow.
+ Auto-terminate the 'original' Relationship.
+ """
+ )
+ }
+)
public class MergeRecord extends AbstractSessionFactoryProcessor {
// attributes for defragmentation
public static final String FRAGMENT_ID_ATTRIBUTE =
FragmentAttributes.FRAGMENT_ID.key();
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PackageFlowFile.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PackageFlowFile.java
index 45fc010232..7b00924b86 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PackageFlowFile.java
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PackageFlowFile.java
@@ -89,7 +89,7 @@ import java.util.Set;
"Listening Port" = a unique port number.
ListenHTTP automatically unpacks files that have attribute
mime.type=application/flowfile-v3.
- If PackageFlowFile batches 99 FlowFiles into 1 file that
InvokeHTTP sends, then the original 99 FlowFiles will be output by ListenHTTP!
+ If PackageFlowFile batches 99 FlowFiles into 1 file that
InvokeHTTP sends, then the original 99 FlowFiles will be output by ListenHTTP.
"""
)
}
@@ -112,7 +112,7 @@ import java.util.Set;
"Packaging Format" = "application/flowfile-v3".
Connect the output of a NiFi ingress processor that reads
files stored offline to the input of UnpackContent.
- If PackageFlowFile batches 99 FlowFiles into 1 file that
is read from storage, then the original 99 FlowFiles will be output by
UnpackContent!
+ If PackageFlowFile batches 99 FlowFiles into 1 file that
is read from storage, then the original 99 FlowFiles will be output by
UnpackContent.
"""
)
}
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTableRecord.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTableRecord.java
index 9ed978c716..0d68e7e7de 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTableRecord.java
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTableRecord.java
@@ -28,6 +28,7 @@ import
org.apache.nifi.annotation.configuration.DefaultSchedule;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.documentation.UseCase;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.expression.ExpressionLanguageScope;
@@ -46,9 +47,9 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import static org.apache.nifi.util.db.JdbcProperties.USE_AVRO_LOGICAL_TYPES;
import static
org.apache.nifi.util.db.JdbcProperties.VARIABLE_REGISTRY_ONLY_DEFAULT_PRECISION;
import static
org.apache.nifi.util.db.JdbcProperties.VARIABLE_REGISTRY_ONLY_DEFAULT_SCALE;
-import static org.apache.nifi.util.db.JdbcProperties.USE_AVRO_LOGICAL_TYPES;
@TriggerSerially
@@ -91,6 +92,24 @@ import static
org.apache.nifi.util.db.JdbcProperties.USE_AVRO_LOGICAL_TYPES;
+ "be added in the format `initial.maxvalue.<max_value_column>`. This
value is only used the first time the table is accessed (when a Maximum Value
Column is specified).")
@PrimaryNodeOnly
@DefaultSchedule(strategy = SchedulingStrategy.TIMER_DRIVEN, period = "1 min")
+@UseCase(
+ description = "Retrieve all rows from a database table.",
+ keywords = {"jdbc", "rdbms", "cdc", "database", "table", "stream"},
+ configuration = """
+ Configure the "Database Connection Pooling Service" to specify a
Connection Pooling Service so that the Processor knows how to connect to the
database.
+ Set the "Database Type" property to the type of database to query, or
"Generic" if the database vendor is not listed.
+ Set the "Table Name" property to the name of the table to retrieve
records from.
+ Configure the "Record Writer" to specify a Record Writer that is
appropriate for the desired output format.
+ Set the "Maximum-value Columns" property to a comma-separated list of
columns whose values can be used to determine which values are new. For
example, this might be set to
+ an `id` column that is a one-up number, or a `last_modified`
column that is a timestamp of when the row was last modified.
+ Set the "Initial Load Strategy" property to "Start at Beginning".
+ Set the "Fetch Size" to a number that avoids loading too much data
into memory on the NiFi side. For example, a value of `1000` will load up to
1,000 rows of data.
+ Set the "Max Rows Per Flow File" to a value that allows efficient
processing, such as `1000` or `10000`.
+ Set the "Output Batch Size" property to a value greater than `0`. A
smaller value, such as `1` or even `20` will result in lower latency but also
slightly lower throughput.
+ A larger value such as `1000` will result in higher throughput but
also higher latency. It is not recommended to set the value larger than `1000`
as it can cause significant
+ memory utilization.
+ """
+)
public class QueryDatabaseTableRecord extends AbstractQueryDatabaseTable {
public static final PropertyDescriptor RECORD_WRITER_FACTORY = new
PropertyDescriptor.Builder()
diff --git
a/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/java/org/apache/nifi/processors/attributes/UpdateAttribute.java
b/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/java/org/apache/nifi/processors/attributes/UpdateAttribute.java
index 22de5ae9e3..1e3fcfc226 100644
---
a/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/java/org/apache/nifi/processors/attributes/UpdateAttribute.java
+++
b/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/java/org/apache/nifi/processors/attributes/UpdateAttribute.java
@@ -29,6 +29,7 @@ import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.documentation.UseCase;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
@@ -85,6 +86,43 @@ import java.util.regex.Pattern;
description = "Updates a FlowFile attribute specified by the Dynamic
Property's key with the value specified by the Dynamic Property's value")
@WritesAttribute(attribute = "See additional details", description = "This
processor may write or remove zero or more attributes as described in
additional details")
@Stateful(scopes = {Scope.LOCAL}, description = "Gives the option to store
values not only on the FlowFile but as stateful variables to be referenced in a
recursive manner.")
+@UseCase(
+ description = "Add a new FlowFile attribute",
+ configuration = """
+ Leave "Delete Attributes Expression" and "Stateful Variables Initial
Value" unset.
+ Set "Store State" to "Do not store state".
+
+ Add a new property. The name of the property will become the name of
the newly added attribute.
+ The value of the property will become the value of the newly added
attribute. The value may use the NiFi Expression Language in order to reference
other
+ attributes or call Expression Language functions.
+ """
+)
+@UseCase(
+ description = "Overwrite a FlowFile attribute with a new value",
+ configuration = """
+ Leave "Delete Attributes Expression" and "Stateful Variables Initial
Value" unset.
+ Set "Store State" to "Do not store state".
+
+ Add a new property. The name of the property will become the name of
the attribute whose value will be overwritten.
+ The value of the property will become the new value of the attribute.
The value may use the NiFi Expression Language in order to reference other
+ attributes or call Expression Language functions.
+
+ For example, to change the `txId` attribute to the uppercase version
of its current value, add a property named `txId` with a value of
`${txId:toUpper()}`
+ """
+)
+@UseCase(
+ description = "Rename a file",
+ configuration = """
+ Leave "Delete Attributes Expression" and "Stateful Variables Initial
Value" unset.
+ Set "Store State" to "Do not store state".
+
+ Add a new property whose name is `filename` and whose value is the
desired filename.
+
+ For example, to set the filename to `abc.txt`, add a property named
`filename` with a value of `abc.txt`.
+        To add the `txId` attribute as a prefix to the filename, add a
property named `filename` with a value of `${txId}${filename}`.
+        Or, to make the filename more readable, separate the txId from the
rest of the filename with a hyphen by using a value of `${txId}-${filename}`.
+ """
+)
public class UpdateAttribute extends AbstractProcessor implements Searchable {