This is an automated email from the ASF dual-hosted git repository.
rfellows pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 2fde6eea411 NIFI-15677: Restore special treatment of trigger serially
in the edit… (#10976)
2fde6eea411 is described below
commit 2fde6eea41132d8a5448a1277d1bdb4d6ee41d63
Author: Matt Gilman <[email protected]>
AuthorDate: Fri Mar 6 12:27:38 2026 -0500
NIFI-15677: Restore special treatment of trigger serially in the edit…
(#10976)
* NIFI-15677: Restore special treatment of trigger serially in the edit
processor dialog.
* NIFI-15677: Addressing review feedback.
---
.../edit-processor/edit-processor.component.html | 2 +-
.../edit-processor.component.spec.ts | 1371 ++++++++++----------
.../edit-processor/edit-processor.component.ts | 48 +-
3 files changed, 752 insertions(+), 669 deletions(-)
diff --git
a/nifi-frontend/src/main/frontend/apps/nifi/src/app/pages/flow-designer/ui/canvas/items/processor/edit-processor/edit-processor.component.html
b/nifi-frontend/src/main/frontend/apps/nifi/src/app/pages/flow-designer/ui/canvas/items/processor/edit-processor/edit-processor.component.html
index 43d211e94bf..3439493853b 100644
---
a/nifi-frontend/src/main/frontend/apps/nifi/src/app/pages/flow-designer/ui/canvas/items/processor/edit-processor/edit-processor.component.html
+++
b/nifi-frontend/src/main/frontend/apps/nifi/src/app/pages/flow-designer/ui/canvas/items/processor/edit-processor/edit-processor.component.html
@@ -171,7 +171,7 @@
class="fa fa-info-circle"
nifiTooltip
[tooltipComponentType]="TextTip"
- tooltipInputData="The number of
tasks that should be concurrently scheduled for this processor. Must be an
integer greater than 0."></i>
+
[tooltipInputData]="concurrentTasksTooltip()"></i>
</mat-label>
<input
matInput
diff --git
a/nifi-frontend/src/main/frontend/apps/nifi/src/app/pages/flow-designer/ui/canvas/items/processor/edit-processor/edit-processor.component.spec.ts
b/nifi-frontend/src/main/frontend/apps/nifi/src/app/pages/flow-designer/ui/canvas/items/processor/edit-processor/edit-processor.component.spec.ts
index 0c971a39e71..96e212c052d 100644
---
a/nifi-frontend/src/main/frontend/apps/nifi/src/app/pages/flow-designer/ui/canvas/items/processor/edit-processor/edit-processor.component.spec.ts
+++
b/nifi-frontend/src/main/frontend/apps/nifi/src/app/pages/flow-designer/ui/canvas/items/processor/edit-processor/edit-processor.component.spec.ts
@@ -28,701 +28,699 @@ import { MockComponent } from 'ng-mocks';
import { CanvasUtils } from '../../../../../service/canvas-utils.service';
import { ContextErrorBanner } from
'../../../../../../../ui/common/context-error-banner/context-error-banner.component';
-describe('EditProcessor', () => {
- let component: EditProcessor;
- let fixture: ComponentFixture<EditProcessor>;
-
- const data: EditComponentDialogRequest = {
- type: ComponentType.Processor,
+const data: EditComponentDialogRequest = {
+ type: ComponentType.Processor,
+ uri:
'https://localhost:4200/nifi-api/processors/d90ac264-018b-1000-1827-a86c8156fd9e',
+ entity: {
+ revision: {
+ clientId: 'd8e8a955-018b-1000-915e-a59d0e7933ef',
+ version: 1
+ },
+ id: 'd90ac264-018b-1000-1827-a86c8156fd9e',
uri:
'https://localhost:4200/nifi-api/processors/d90ac264-018b-1000-1827-a86c8156fd9e',
- entity: {
- revision: {
- clientId: 'd8e8a955-018b-1000-915e-a59d0e7933ef',
- version: 1
- },
+ position: {
+ x: 554.8456153681711,
+ y: -690.0400701011749
+ },
+ permissions: {
+ canRead: true,
+ canWrite: true
+ },
+ bulletins: [],
+ component: {
id: 'd90ac264-018b-1000-1827-a86c8156fd9e',
- uri:
'https://localhost:4200/nifi-api/processors/d90ac264-018b-1000-1827-a86c8156fd9e',
+ parentGroupId: '95a4b210-018b-1000-772a-5a9ebfa03287',
position: {
x: 554.8456153681711,
y: -690.0400701011749
},
- permissions: {
- canRead: true,
- canWrite: true
+ name: 'ConsumeKafka_2_6',
+ type: 'org.apache.nifi.processors.kafka.pubsub.ConsumeKafka_2_6',
+ bundle: {
+ group: 'org.apache.nifi',
+ artifact: 'nifi-kafka-2-6-nar',
+ version: '2.0.0-SNAPSHOT'
},
- bulletins: [],
- component: {
- id: 'd90ac264-018b-1000-1827-a86c8156fd9e',
- parentGroupId: '95a4b210-018b-1000-772a-5a9ebfa03287',
- position: {
- x: 554.8456153681711,
- y: -690.0400701011749
- },
- name: 'ConsumeKafka_2_6',
- type:
'org.apache.nifi.processors.kafka.pubsub.ConsumeKafka_2_6',
- bundle: {
- group: 'org.apache.nifi',
- artifact: 'nifi-kafka-2-6-nar',
- version: '2.0.0-SNAPSHOT'
+ state: 'STOPPED',
+ style: {},
+ relationships: [
+ {
+ name: 'success',
+ description:
+ 'FlowFiles received from Kafka. Depending on
demarcation strategy it is a flow file per message or a bundle of messages
grouped by topic and partition.',
+ autoTerminate: false,
+ retry: false
+ }
+ ],
+ supportsParallelProcessing: true,
+ supportsBatching: false,
+ supportsSensitiveDynamicProperties: false,
+ persistsState: false,
+ restricted: false,
+ deprecated: false,
+ executionNodeRestricted: false,
+ multipleVersionsAvailable: false,
+ inputRequirement: 'INPUT_FORBIDDEN',
+ config: {
+ properties: {
+ 'bootstrap.servers': 'localhost:9092',
+ topic: null,
+ topic_type: 'names',
+ 'group.id': null,
+ 'Commit Offsets': 'true',
+ 'max-uncommit-offset-wait': '1 secs',
+ 'honor-transactions': 'true',
+ 'message-demarcator': null,
+ 'separate-by-key': 'false',
+ 'security.protocol': 'PLAINTEXT',
+ 'sasl.mechanism': 'GSSAPI',
+ 'kerberos-user-service': null,
+ 'sasl.kerberos.service.name': null,
+ 'sasl.username': null,
+ 'sasl.password': null,
+ 'sasl.token.auth': 'false',
+ 'aws.profile.name': null,
+ 'ssl.context.service': null,
+ 'key-attribute-encoding': 'utf-8',
+ 'auto.offset.reset': 'latest',
+ 'message-header-encoding': 'UTF-8',
+ 'header-name-regex': null,
+ 'max.poll.records': '10000',
+ 'Communications Timeout': '60 secs'
},
- state: 'STOPPED',
- style: {},
- relationships: [
- {
- name: 'success',
+ descriptors: {
+ 'bootstrap.servers': {
+ name: 'bootstrap.servers',
+ displayName: 'Kafka Brokers',
+ description: 'Comma-separated list of Kafka Brokers in
the format host:port',
+ defaultValue: 'localhost:9092',
+ required: true,
+ sensitive: false,
+ dynamic: false,
+ supportsEl: true,
+ expressionLanguageScope: 'Environment variables
defined at JVM level and system properties',
+ dependencies: []
+ },
+ topic: {
+ name: 'topic',
+ displayName: 'Topic Name(s)',
description:
- 'FlowFiles received from Kafka. Depending on
demarcation strategy it is a flow file per message or a bundle of messages
grouped by topic and partition.',
- autoTerminate: false,
- retry: false
- }
- ],
- supportsParallelProcessing: true,
- supportsBatching: false,
- supportsSensitiveDynamicProperties: false,
- persistsState: false,
- restricted: false,
- deprecated: false,
- executionNodeRestricted: false,
- multipleVersionsAvailable: false,
- inputRequirement: 'INPUT_FORBIDDEN',
- config: {
- properties: {
- 'bootstrap.servers': 'localhost:9092',
- topic: null,
- topic_type: 'names',
- 'group.id': null,
- 'Commit Offsets': 'true',
- 'max-uncommit-offset-wait': '1 secs',
- 'honor-transactions': 'true',
- 'message-demarcator': null,
- 'separate-by-key': 'false',
- 'security.protocol': 'PLAINTEXT',
- 'sasl.mechanism': 'GSSAPI',
- 'kerberos-user-service': null,
- 'sasl.kerberos.service.name': null,
- 'sasl.username': null,
- 'sasl.password': null,
- 'sasl.token.auth': 'false',
- 'aws.profile.name': null,
- 'ssl.context.service': null,
- 'key-attribute-encoding': 'utf-8',
- 'auto.offset.reset': 'latest',
- 'message-header-encoding': 'UTF-8',
- 'header-name-regex': null,
- 'max.poll.records': '10000',
- 'Communications Timeout': '60 secs'
+ 'The name of the Kafka Topic(s) to pull from. More
than one can be supplied if comma separated.',
+ required: true,
+ sensitive: false,
+ dynamic: false,
+ supportsEl: true,
+ expressionLanguageScope: 'Environment variables
defined at JVM level and system properties',
+ dependencies: []
},
- descriptors: {
- 'bootstrap.servers': {
- name: 'bootstrap.servers',
- displayName: 'Kafka Brokers',
- description: 'Comma-separated list of Kafka
Brokers in the format host:port',
- defaultValue: 'localhost:9092',
- required: true,
- sensitive: false,
- dynamic: false,
- supportsEl: true,
- expressionLanguageScope: 'Environment variables
defined at JVM level and system properties',
- dependencies: []
- },
- topic: {
- name: 'topic',
- displayName: 'Topic Name(s)',
- description:
- 'The name of the Kafka Topic(s) to pull from.
More than one can be supplied if comma separated.',
- required: true,
- sensitive: false,
- dynamic: false,
- supportsEl: true,
- expressionLanguageScope: 'Environment variables
defined at JVM level and system properties',
- dependencies: []
- },
- topic_type: {
- name: 'topic_type',
- displayName: 'Topic Name Format',
- description:
- 'Specifies whether the Topic(s) provided are a
comma separated list of names or a single regular expression',
- defaultValue: 'names',
- allowableValues: [
- {
- allowableValue: {
- displayName: 'names',
- value: 'names',
- description: 'Topic is a full topic
name or comma separated list of names'
- },
- canRead: true
+ topic_type: {
+ name: 'topic_type',
+ displayName: 'Topic Name Format',
+ description:
+ 'Specifies whether the Topic(s) provided are a
comma separated list of names or a single regular expression',
+ defaultValue: 'names',
+ allowableValues: [
+ {
+ allowableValue: {
+ displayName: 'names',
+ value: 'names',
+ description: 'Topic is a full topic name
or comma separated list of names'
},
- {
- allowableValue: {
- displayName: 'pattern',
- value: 'pattern',
- description: 'Topic is a regex using
the Java Pattern syntax'
- },
- canRead: true
- }
- ],
- required: true,
- sensitive: false,
- dynamic: false,
- supportsEl: false,
- expressionLanguageScope: 'Not Supported',
- dependencies: []
- },
- 'group.id': {
- name: 'group.id',
- displayName: 'Group ID',
- description:
- "A Group ID is used to identify consumers that
are within the same consumer group. Corresponds to Kafka's 'group.id'
property.",
- required: true,
- sensitive: false,
- dynamic: false,
- supportsEl: true,
- expressionLanguageScope: 'Environment variables
defined at JVM level and system properties',
- dependencies: []
- },
- 'Commit Offsets': {
- name: 'Commit Offsets',
- displayName: 'Commit Offsets',
- description:
- "Specifies whether or not this Processor
should commit the offsets to Kafka after receiving messages. Typically, we want
this value set to true so that messages that are received are not duplicated.
However, in certain scenarios, we may want to avoid committing the offsets,
that the data can be processed and later acknowledged by PublishKafkaRecord in
order to provide Exactly Once semantics. See Processor's Usage / Additional
Details for more information.",
- defaultValue: 'true',
- allowableValues: [
- {
- allowableValue: {
- displayName: 'true',
- value: 'true'
- },
- canRead: true
+ canRead: true
+ },
+ {
+ allowableValue: {
+ displayName: 'pattern',
+ value: 'pattern',
+ description: 'Topic is a regex using the
Java Pattern syntax'
},
- {
- allowableValue: {
- displayName: 'false',
- value: 'false'
- },
- canRead: true
- }
- ],
- required: false,
- sensitive: false,
- dynamic: false,
- supportsEl: false,
- expressionLanguageScope: 'Not Supported',
- dependencies: []
- },
- 'max-uncommit-offset-wait': {
- name: 'max-uncommit-offset-wait',
- displayName: 'Max Uncommitted Time',
- description:
- "Specifies the maximum amount of time allowed
to pass before offsets must be committed. This value impacts how often offsets
will be committed. Committing offsets less often increases throughput but also
increases the window of potential data duplication in the event of a rebalance
or JVM restart between commits. This value is also related to maximum poll
records and the use of a message demarcator. When using a message demarcator
we can have far more u [...]
- defaultValue: '1 secs',
- required: false,
- sensitive: false,
- dynamic: false,
- supportsEl: false,
- expressionLanguageScope: 'Not Supported',
- dependencies: [
- {
- propertyName: 'Commit Offsets',
- dependentValues: ['true']
- }
- ]
- },
- 'honor-transactions': {
- name: 'honor-transactions',
- displayName: 'Honor Transactions',
- description:
- 'Specifies whether or not NiFi should honor
transactional guarantees when communicating with Kafka. If false, the Processor
will use an "isolation level" of read_uncomitted. This means that messages will
be received as soon as they are written to Kafka but will be pulled, even if
the producer cancels the transactions. If this value is true, NiFi will not
receive any messages for which the producer\'s transaction was canceled, but
this can result in some la [...]
- defaultValue: 'true',
- allowableValues: [
- {
- allowableValue: {
- displayName: 'true',
- value: 'true'
- },
- canRead: true
+ canRead: true
+ }
+ ],
+ required: true,
+ sensitive: false,
+ dynamic: false,
+ supportsEl: false,
+ expressionLanguageScope: 'Not Supported',
+ dependencies: []
+ },
+ 'group.id': {
+ name: 'group.id',
+ displayName: 'Group ID',
+ description:
+ "A Group ID is used to identify consumers that are
within the same consumer group. Corresponds to Kafka's 'group.id' property.",
+ required: true,
+ sensitive: false,
+ dynamic: false,
+ supportsEl: true,
+ expressionLanguageScope: 'Environment variables
defined at JVM level and system properties',
+ dependencies: []
+ },
+ 'Commit Offsets': {
+ name: 'Commit Offsets',
+ displayName: 'Commit Offsets',
+ description:
+ "Specifies whether or not this Processor should
commit the offsets to Kafka after receiving messages. Typically, we want this
value set to true so that messages that are received are not duplicated.
However, in certain scenarios, we may want to avoid committing the offsets,
that the data can be processed and later acknowledged by PublishKafkaRecord in
order to provide Exactly Once semantics. See Processor's Usage / Additional
Details for more information.",
+ defaultValue: 'true',
+ allowableValues: [
+ {
+ allowableValue: {
+ displayName: 'true',
+ value: 'true'
},
- {
- allowableValue: {
- displayName: 'false',
- value: 'false'
- },
- canRead: true
- }
- ],
- required: true,
- sensitive: false,
- dynamic: false,
- supportsEl: false,
- expressionLanguageScope: 'Not Supported',
- dependencies: []
- },
- 'message-demarcator': {
- name: 'message-demarcator',
- displayName: 'Message Demarcator',
- description:
- "Since KafkaConsumer receives messages in
batches, you have an option to output FlowFiles which contains all Kafka
messages in a single batch for a given topic and partition and this property
allows you to provide a string (interpreted as UTF-8) to use for demarcating
apart multiple Kafka messages. This is an optional property and if not provided
each Kafka message received will result in a single FlowFile which time it is
triggered. To enter special char [...]
- required: false,
- sensitive: false,
- dynamic: false,
- supportsEl: true,
- expressionLanguageScope: 'Environment variables
defined at JVM level and system properties',
- dependencies: []
- },
- 'separate-by-key': {
- name: 'separate-by-key',
- displayName: 'Separate By Key',
- description:
- 'If true, and the <Message Demarcator>
property is set, two messages will only be added to the same FlowFile if both
of the Kafka Messages have identical keys.',
- defaultValue: 'false',
- allowableValues: [
- {
- allowableValue: {
- displayName: 'true',
- value: 'true'
- },
- canRead: true
+ canRead: true
+ },
+ {
+ allowableValue: {
+ displayName: 'false',
+ value: 'false'
},
- {
- allowableValue: {
- displayName: 'false',
- value: 'false'
- },
- canRead: true
- }
- ],
- required: false,
- sensitive: false,
- dynamic: false,
- supportsEl: false,
- expressionLanguageScope: 'Not Supported',
- dependencies: []
- },
- 'security.protocol': {
- name: 'security.protocol',
- displayName: 'Security Protocol',
- description:
- 'Security protocol used to communicate with
brokers. Corresponds to Kafka Client security.protocol property',
- defaultValue: 'PLAINTEXT',
- allowableValues: [
- {
- allowableValue: {
- displayName: 'PLAINTEXT',
- value: 'PLAINTEXT'
- },
- canRead: true
+ canRead: true
+ }
+ ],
+ required: false,
+ sensitive: false,
+ dynamic: false,
+ supportsEl: false,
+ expressionLanguageScope: 'Not Supported',
+ dependencies: []
+ },
+ 'max-uncommit-offset-wait': {
+ name: 'max-uncommit-offset-wait',
+ displayName: 'Max Uncommitted Time',
+ description:
+ "Specifies the maximum amount of time allowed to
pass before offsets must be committed. This value impacts how often offsets
will be committed. Committing offsets less often increases throughput but also
increases the window of potential data duplication in the event of a rebalance
or JVM restart between commits. This value is also related to maximum poll
records and the use of a message demarcator. When using a message demarcator
we can have far more uncom [...]
+ defaultValue: '1 secs',
+ required: false,
+ sensitive: false,
+ dynamic: false,
+ supportsEl: false,
+ expressionLanguageScope: 'Not Supported',
+ dependencies: [
+ {
+ propertyName: 'Commit Offsets',
+ dependentValues: ['true']
+ }
+ ]
+ },
+ 'honor-transactions': {
+ name: 'honor-transactions',
+ displayName: 'Honor Transactions',
+ description:
+ 'Specifies whether or not NiFi should honor
transactional guarantees when communicating with Kafka. If false, the Processor
will use an "isolation level" of read_uncomitted. This means that messages will
be received as soon as they are written to Kafka but will be pulled, even if
the producer cancels the transactions. If this value is true, NiFi will not
receive any messages for which the producer\'s transaction was canceled, but
this can result in some latenc [...]
+ defaultValue: 'true',
+ allowableValues: [
+ {
+ allowableValue: {
+ displayName: 'true',
+ value: 'true'
},
- {
- allowableValue: {
- displayName: 'SSL',
- value: 'SSL'
- },
- canRead: true
+ canRead: true
+ },
+ {
+ allowableValue: {
+ displayName: 'false',
+ value: 'false'
},
- {
- allowableValue: {
- displayName: 'SASL_PLAINTEXT',
- value: 'SASL_PLAINTEXT'
- },
- canRead: true
+ canRead: true
+ }
+ ],
+ required: true,
+ sensitive: false,
+ dynamic: false,
+ supportsEl: false,
+ expressionLanguageScope: 'Not Supported',
+ dependencies: []
+ },
+ 'message-demarcator': {
+ name: 'message-demarcator',
+ displayName: 'Message Demarcator',
+ description:
+ "Since KafkaConsumer receives messages in batches,
you have an option to output FlowFiles which contains all Kafka messages in a
single batch for a given topic and partition and this property allows you to
provide a string (interpreted as UTF-8) to use for demarcating apart multiple
Kafka messages. This is an optional property and if not provided each Kafka
message received will result in a single FlowFile which time it is triggered.
To enter special characte [...]
+ required: false,
+ sensitive: false,
+ dynamic: false,
+ supportsEl: true,
+ expressionLanguageScope: 'Environment variables
defined at JVM level and system properties',
+ dependencies: []
+ },
+ 'separate-by-key': {
+ name: 'separate-by-key',
+ displayName: 'Separate By Key',
+ description:
+ 'If true, and the <Message Demarcator> property is
set, two messages will only be added to the same FlowFile if both of the Kafka
Messages have identical keys.',
+ defaultValue: 'false',
+ allowableValues: [
+ {
+ allowableValue: {
+ displayName: 'true',
+ value: 'true'
},
- {
- allowableValue: {
- displayName: 'SASL_SSL',
- value: 'SASL_SSL'
- },
- canRead: true
- }
- ],
- required: true,
- sensitive: false,
- dynamic: false,
- supportsEl: false,
- expressionLanguageScope: 'Not Supported',
- dependencies: []
- },
- 'sasl.mechanism': {
- name: 'sasl.mechanism',
- displayName: 'SASL Mechanism',
- description:
- 'SASL mechanism used for authentication.
Corresponds to Kafka Client sasl.mechanism property',
- defaultValue: 'GSSAPI',
- allowableValues: [
- {
- allowableValue: {
- displayName: 'GSSAPI',
- value: 'GSSAPI',
- description: 'General Security
Services API for Kerberos authentication'
- },
- canRead: true
+ canRead: true
+ },
+ {
+ allowableValue: {
+ displayName: 'false',
+ value: 'false'
},
- {
- allowableValue: {
- displayName: 'PLAIN',
- value: 'PLAIN',
- description: 'Plain username and
password authentication'
- },
- canRead: true
+ canRead: true
+ }
+ ],
+ required: false,
+ sensitive: false,
+ dynamic: false,
+ supportsEl: false,
+ expressionLanguageScope: 'Not Supported',
+ dependencies: []
+ },
+ 'security.protocol': {
+ name: 'security.protocol',
+ displayName: 'Security Protocol',
+ description:
+ 'Security protocol used to communicate with
brokers. Corresponds to Kafka Client security.protocol property',
+ defaultValue: 'PLAINTEXT',
+ allowableValues: [
+ {
+ allowableValue: {
+ displayName: 'PLAINTEXT',
+ value: 'PLAINTEXT'
},
- {
- allowableValue: {
- displayName: 'SCRAM-SHA-256',
- value: 'SCRAM-SHA-256',
- description:
- 'Salted Challenge Response
Authentication Mechanism using SHA-512 with username and password'
- },
- canRead: true
+ canRead: true
+ },
+ {
+ allowableValue: {
+ displayName: 'SSL',
+ value: 'SSL'
},
- {
- allowableValue: {
- displayName: 'SCRAM-SHA-512',
- value: 'SCRAM-SHA-512',
- description:
- 'Salted Challenge Response
Authentication Mechanism using SHA-256 with username and password'
- },
- canRead: true
- }
- ],
- required: true,
- sensitive: false,
- dynamic: false,
- supportsEl: false,
- expressionLanguageScope: 'Not Supported',
- dependencies: []
- },
- 'kerberos-user-service': {
- name: 'kerberos-user-service',
- displayName: 'Kerberos User Service',
- description: 'Service supporting user
authentication with Kerberos',
- allowableValues: [],
- required: false,
- sensitive: false,
- dynamic: false,
- supportsEl: false,
- expressionLanguageScope: 'Not Supported',
- identifiesControllerService:
'org.apache.nifi.kerberos.SelfContainedKerberosUserService',
- identifiesControllerServiceBundle: {
- group: 'org.apache.nifi',
- artifact: 'nifi-standard-services-api-nar',
- version: '2.0.0-SNAPSHOT'
+ canRead: true
},
- dependencies: []
- },
- 'sasl.kerberos.service.name': {
- name: 'sasl.kerberos.service.name',
- displayName: 'Kerberos Service Name',
- description:
- 'The service name that matches the primary
name of the Kafka server configured in the broker JAAS configuration',
- required: false,
- sensitive: false,
- dynamic: false,
- supportsEl: true,
- expressionLanguageScope: 'Environment variables
defined at JVM level and system properties',
- dependencies: []
- },
- 'sasl.username': {
- name: 'sasl.username',
- displayName: 'Username',
- description:
- 'Username provided with configured password
when using PLAIN or SCRAM SASL Mechanisms',
- required: false,
- sensitive: false,
- dynamic: false,
- supportsEl: true,
- expressionLanguageScope: 'Environment variables
defined at JVM level and system properties',
- dependencies: [
- {
- propertyName: 'sasl.mechanism',
- dependentValues: ['PLAIN',
'SCRAM-SHA-512', 'SCRAM-SHA-256']
- }
- ]
- },
- 'sasl.password': {
- name: 'sasl.password',
- displayName: 'Password',
- description:
- 'Password provided with configured username
when using PLAIN or SCRAM SASL Mechanisms',
- required: false,
- sensitive: true,
- dynamic: false,
- supportsEl: true,
- expressionLanguageScope: 'Environment variables
defined at JVM level and system properties',
- dependencies: [
- {
- propertyName: 'sasl.mechanism',
- dependentValues: ['PLAIN',
'SCRAM-SHA-512', 'SCRAM-SHA-256']
- }
- ]
- },
- 'sasl.token.auth': {
- name: 'sasl.token.auth',
- displayName: 'Token Authentication',
- description: 'Enables or disables Token
authentication when using SCRAM SASL Mechanisms',
- defaultValue: 'false',
- allowableValues: [
- {
- allowableValue: {
- displayName: 'true',
- value: 'true'
- },
- canRead: true
+ {
+ allowableValue: {
+ displayName: 'SASL_PLAINTEXT',
+ value: 'SASL_PLAINTEXT'
},
- {
- allowableValue: {
- displayName: 'false',
- value: 'false'
- },
- canRead: true
- }
- ],
- required: false,
- sensitive: false,
- dynamic: false,
- supportsEl: false,
- expressionLanguageScope: 'Not Supported',
- dependencies: [
- {
- propertyName: 'sasl.mechanism',
- dependentValues: ['SCRAM-SHA-512',
'SCRAM-SHA-256']
- }
- ]
- },
- 'aws.profile.name': {
- name: 'aws.profile.name',
- displayName: 'AWS Profile Name',
- description:
- 'The Amazon Web Services Profile to select
when multiple profiles are available.',
- required: false,
- sensitive: false,
- dynamic: false,
- supportsEl: true,
- expressionLanguageScope: 'Environment variables
and FlowFile Attributes',
- dependencies: [
- {
- propertyName: 'sasl.mechanism',
- dependentValues: ['AWS_MSK_IAM']
- }
- ]
- },
- 'ssl.context.service': {
- name: 'ssl.context.service',
- displayName: 'SSL Context Service',
- description: 'Service supporting SSL communication
with Kafka brokers',
- allowableValues: [],
- required: false,
- sensitive: false,
- dynamic: false,
- supportsEl: false,
- expressionLanguageScope: 'Not Supported',
- identifiesControllerService:
'org.apache.nifi.ssl.SSLContextService',
- identifiesControllerServiceBundle: {
- group: 'org.apache.nifi',
- artifact: 'nifi-standard-services-api-nar',
- version: '2.0.0-SNAPSHOT'
+ canRead: true
},
- dependencies: []
- },
- 'key-attribute-encoding': {
- name: 'key-attribute-encoding',
- displayName: 'Key Attribute Encoding',
- description:
- "FlowFiles that are emitted have an attribute
named 'kafka.key'. This property dictates how the value of the attribute should
be encoded.",
- defaultValue: 'utf-8',
- allowableValues: [
- {
- allowableValue: {
- displayName: 'UTF-8 Encoded',
- value: 'utf-8',
- description: 'The key is interpreted
as a UTF-8 Encoded string.'
- },
- canRead: true
+ {
+ allowableValue: {
+ displayName: 'SASL_SSL',
+ value: 'SASL_SSL'
},
- {
- allowableValue: {
- displayName: 'Hex Encoded',
- value: 'hex',
- description:
- 'The key is interpreted as
arbitrary binary data and is encoded using hexadecimal characters with
uppercase letters'
- },
- canRead: true
+ canRead: true
+ }
+ ],
+ required: true,
+ sensitive: false,
+ dynamic: false,
+ supportsEl: false,
+ expressionLanguageScope: 'Not Supported',
+ dependencies: []
+ },
+ 'sasl.mechanism': {
+ name: 'sasl.mechanism',
+ displayName: 'SASL Mechanism',
+ description:
+ 'SASL mechanism used for authentication.
Corresponds to Kafka Client sasl.mechanism property',
+ defaultValue: 'GSSAPI',
+ allowableValues: [
+ {
+ allowableValue: {
+ displayName: 'GSSAPI',
+ value: 'GSSAPI',
+ description: 'General Security Services
API for Kerberos authentication'
},
- {
- allowableValue: {
- displayName: 'Do Not Add Key as
Attribute',
- value: 'do-not-add',
- description: 'The key will not be
added as an Attribute'
- },
- canRead: true
- }
- ],
- required: true,
- sensitive: false,
- dynamic: false,
- supportsEl: false,
- expressionLanguageScope: 'Not Supported',
- dependencies: []
- },
- 'auto.offset.reset': {
- name: 'auto.offset.reset',
- displayName: 'Offset Reset',
- description:
- "Allows you to manage the condition when there
is no initial offset in Kafka or if the current offset does not exist any more
on the server (e.g. because that data has been deleted). Corresponds to Kafka's
'auto.offset.reset' property.",
- defaultValue: 'latest',
- allowableValues: [
- {
- allowableValue: {
- displayName: 'earliest',
- value: 'earliest',
- description: 'Automatically reset the
offset to the earliest offset'
- },
- canRead: true
+ canRead: true
+ },
+ {
+ allowableValue: {
+ displayName: 'PLAIN',
+ value: 'PLAIN',
+ description: 'Plain username and password
authentication'
},
- {
- allowableValue: {
- displayName: 'latest',
- value: 'latest',
- description: 'Automatically reset the
offset to the latest offset'
- },
- canRead: true
+ canRead: true
+ },
+ {
+ allowableValue: {
+ displayName: 'SCRAM-SHA-256',
+ value: 'SCRAM-SHA-256',
+ description:
+ 'Salted Challenge Response
Authentication Mechanism using SHA-512 with username and password'
},
- {
- allowableValue: {
- displayName: 'none',
- value: 'none',
- description:
- "Throw exception to the consumer
if no previous offset is found for the consumer's group"
- },
- canRead: true
- }
- ],
- required: true,
- sensitive: false,
- dynamic: false,
- supportsEl: false,
- expressionLanguageScope: 'Not Supported',
- dependencies: []
- },
- 'message-header-encoding': {
- name: 'message-header-encoding',
- displayName: 'Message Header Encoding',
- description:
- 'Any message header that is found on a Kafka
message will be added to the outbound FlowFile as an attribute. This property
indicates the Character Encoding to use for deserializing the headers.',
- defaultValue: 'UTF-8',
- required: false,
- sensitive: false,
- dynamic: false,
- supportsEl: false,
- expressionLanguageScope: 'Not Supported',
- dependencies: []
- },
- 'header-name-regex': {
- name: 'header-name-regex',
- displayName: 'Headers to Add as Attributes
(Regex)',
- description:
- 'A Regular Expression that is matched against
all message headers. Any message header whose name matches the regex will be
added to the FlowFile as an Attribute. If not specified, no Header values will
be added as FlowFile attributes. If two messages have a different value for the
same header and that header is selected by the provided regex, then those two
messages must be added to different FlowFiles. As a result, users should be
cautious about using a r [...]
- required: false,
- sensitive: false,
- dynamic: false,
- supportsEl: false,
- expressionLanguageScope: 'Not Supported',
- dependencies: []
+ canRead: true
+ },
+ {
+ allowableValue: {
+ displayName: 'SCRAM-SHA-512',
+ value: 'SCRAM-SHA-512',
+ description:
+ 'Salted Challenge Response
Authentication Mechanism using SHA-256 with username and password'
+ },
+ canRead: true
+ }
+ ],
+ required: true,
+ sensitive: false,
+ dynamic: false,
+ supportsEl: false,
+ expressionLanguageScope: 'Not Supported',
+ dependencies: []
+ },
+ 'kerberos-user-service': {
+ name: 'kerberos-user-service',
+ displayName: 'Kerberos User Service',
+ description: 'Service supporting user authentication
with Kerberos',
+ allowableValues: [],
+ required: false,
+ sensitive: false,
+ dynamic: false,
+ supportsEl: false,
+ expressionLanguageScope: 'Not Supported',
+ identifiesControllerService:
'org.apache.nifi.kerberos.SelfContainedKerberosUserService',
+ identifiesControllerServiceBundle: {
+ group: 'org.apache.nifi',
+ artifact: 'nifi-standard-services-api-nar',
+ version: '2.0.0-SNAPSHOT'
},
- 'max.poll.records': {
- name: 'max.poll.records',
- displayName: 'Max Poll Records',
- description:
- 'Specifies the maximum number of records Kafka
should return in a single poll.',
- defaultValue: '10000',
- required: false,
- sensitive: false,
- dynamic: false,
- supportsEl: false,
- expressionLanguageScope: 'Not Supported',
- dependencies: []
+ dependencies: []
+ },
+ 'sasl.kerberos.service.name': {
+ name: 'sasl.kerberos.service.name',
+ displayName: 'Kerberos Service Name',
+ description:
+ 'The service name that matches the primary name of
the Kafka server configured in the broker JAAS configuration',
+ required: false,
+ sensitive: false,
+ dynamic: false,
+ supportsEl: true,
+ expressionLanguageScope: 'Environment variables
defined at JVM level and system properties',
+ dependencies: []
+ },
+ 'sasl.username': {
+ name: 'sasl.username',
+ displayName: 'Username',
+ description:
+ 'Username provided with configured password when
using PLAIN or SCRAM SASL Mechanisms',
+ required: false,
+ sensitive: false,
+ dynamic: false,
+ supportsEl: true,
+ expressionLanguageScope: 'Environment variables
defined at JVM level and system properties',
+ dependencies: [
+ {
+ propertyName: 'sasl.mechanism',
+ dependentValues: ['PLAIN', 'SCRAM-SHA-512',
'SCRAM-SHA-256']
+ }
+ ]
+ },
+ 'sasl.password': {
+ name: 'sasl.password',
+ displayName: 'Password',
+ description:
+ 'Password provided with configured username when
using PLAIN or SCRAM SASL Mechanisms',
+ required: false,
+ sensitive: true,
+ dynamic: false,
+ supportsEl: true,
+ expressionLanguageScope: 'Environment variables
defined at JVM level and system properties',
+ dependencies: [
+ {
+ propertyName: 'sasl.mechanism',
+ dependentValues: ['PLAIN', 'SCRAM-SHA-512',
'SCRAM-SHA-256']
+ }
+ ]
+ },
+ 'sasl.token.auth': {
+ name: 'sasl.token.auth',
+ displayName: 'Token Authentication',
+ description: 'Enables or disables Token authentication
when using SCRAM SASL Mechanisms',
+ defaultValue: 'false',
+ allowableValues: [
+ {
+ allowableValue: {
+ displayName: 'true',
+ value: 'true'
+ },
+ canRead: true
+ },
+ {
+ allowableValue: {
+ displayName: 'false',
+ value: 'false'
+ },
+ canRead: true
+ }
+ ],
+ required: false,
+ sensitive: false,
+ dynamic: false,
+ supportsEl: false,
+ expressionLanguageScope: 'Not Supported',
+ dependencies: [
+ {
+ propertyName: 'sasl.mechanism',
+ dependentValues: ['SCRAM-SHA-512',
'SCRAM-SHA-256']
+ }
+ ]
+ },
+ 'aws.profile.name': {
+ name: 'aws.profile.name',
+ displayName: 'AWS Profile Name',
+ description: 'The Amazon Web Services Profile to
select when multiple profiles are available.',
+ required: false,
+ sensitive: false,
+ dynamic: false,
+ supportsEl: true,
+ expressionLanguageScope: 'Environment variables and
FlowFile Attributes',
+ dependencies: [
+ {
+ propertyName: 'sasl.mechanism',
+ dependentValues: ['AWS_MSK_IAM']
+ }
+ ]
+ },
+ 'ssl.context.service': {
+ name: 'ssl.context.service',
+ displayName: 'SSL Context Service',
+ description: 'Service supporting SSL communication
with Kafka brokers',
+ allowableValues: [],
+ required: false,
+ sensitive: false,
+ dynamic: false,
+ supportsEl: false,
+ expressionLanguageScope: 'Not Supported',
+ identifiesControllerService:
'org.apache.nifi.ssl.SSLContextService',
+ identifiesControllerServiceBundle: {
+ group: 'org.apache.nifi',
+ artifact: 'nifi-standard-services-api-nar',
+ version: '2.0.0-SNAPSHOT'
},
- 'Communications Timeout': {
- name: 'Communications Timeout',
- displayName: 'Communications Timeout',
- description:
- 'Specifies the timeout that the consumer
should use when communicating with the Kafka Broker',
- defaultValue: '60 secs',
- required: true,
- sensitive: false,
- dynamic: false,
- supportsEl: false,
- expressionLanguageScope: 'Not Supported',
- dependencies: []
- }
+ dependencies: []
},
- schedulingPeriod: '0 sec',
- schedulingStrategy: 'TIMER_DRIVEN',
- executionNode: 'ALL',
- penaltyDuration: '30 sec',
- yieldDuration: '1 sec',
- bulletinLevel: 'WARN',
- runDurationMillis: 0,
- concurrentlySchedulableTaskCount: 1,
- autoTerminatedRelationships: [],
- comments: '',
- lossTolerant: false,
- defaultConcurrentTasks: {
- TIMER_DRIVEN: '1',
- CRON_DRIVEN: '1'
+ 'key-attribute-encoding': {
+ name: 'key-attribute-encoding',
+ displayName: 'Key Attribute Encoding',
+ description:
+ "FlowFiles that are emitted have an attribute
named 'kafka.key'. This property dictates how the value of the attribute should
be encoded.",
+ defaultValue: 'utf-8',
+ allowableValues: [
+ {
+ allowableValue: {
+ displayName: 'UTF-8 Encoded',
+ value: 'utf-8',
+ description: 'The key is interpreted as a
UTF-8 Encoded string.'
+ },
+ canRead: true
+ },
+ {
+ allowableValue: {
+ displayName: 'Hex Encoded',
+ value: 'hex',
+ description:
+ 'The key is interpreted as arbitrary
binary data and is encoded using hexadecimal characters with uppercase letters'
+ },
+ canRead: true
+ },
+ {
+ allowableValue: {
+ displayName: 'Do Not Add Key as Attribute',
+ value: 'do-not-add',
+ description: 'The key will not be added as
an Attribute'
+ },
+ canRead: true
+ }
+ ],
+ required: true,
+ sensitive: false,
+ dynamic: false,
+ supportsEl: false,
+ expressionLanguageScope: 'Not Supported',
+ dependencies: []
+ },
+ 'auto.offset.reset': {
+ name: 'auto.offset.reset',
+ displayName: 'Offset Reset',
+ description:
+ "Allows you to manage the condition when there is
no initial offset in Kafka or if the current offset does not exist any more on
the server (e.g. because that data has been deleted). Corresponds to Kafka's
'auto.offset.reset' property.",
+ defaultValue: 'latest',
+ allowableValues: [
+ {
+ allowableValue: {
+ displayName: 'earliest',
+ value: 'earliest',
+ description: 'Automatically reset the
offset to the earliest offset'
+ },
+ canRead: true
+ },
+ {
+ allowableValue: {
+ displayName: 'latest',
+ value: 'latest',
+ description: 'Automatically reset the
offset to the latest offset'
+ },
+ canRead: true
+ },
+ {
+ allowableValue: {
+ displayName: 'none',
+ value: 'none',
+ description:
+ "Throw exception to the consumer if no
previous offset is found for the consumer's group"
+ },
+ canRead: true
+ }
+ ],
+ required: true,
+ sensitive: false,
+ dynamic: false,
+ supportsEl: false,
+ expressionLanguageScope: 'Not Supported',
+ dependencies: []
},
- defaultSchedulingPeriod: {
- TIMER_DRIVEN: '0 sec',
- CRON_DRIVEN: '* * * * * ?'
+ 'message-header-encoding': {
+ name: 'message-header-encoding',
+ displayName: 'Message Header Encoding',
+ description:
+ 'Any message header that is found on a Kafka
message will be added to the outbound FlowFile as an attribute. This property
indicates the Character Encoding to use for deserializing the headers.',
+ defaultValue: 'UTF-8',
+ required: false,
+ sensitive: false,
+ dynamic: false,
+ supportsEl: false,
+ expressionLanguageScope: 'Not Supported',
+ dependencies: []
},
- retryCount: 10,
- retriedRelationships: [],
- backoffMechanism: 'PENALIZE_FLOWFILE',
- maxBackoffPeriod: '10 mins'
+ 'header-name-regex': {
+ name: 'header-name-regex',
+ displayName: 'Headers to Add as Attributes (Regex)',
+ description:
+ 'A Regular Expression that is matched against all
message headers. Any message header whose name matches the regex will be added
to the FlowFile as an Attribute. If not specified, no Header values will be
added as FlowFile attributes. If two messages have a different value for the
same header and that header is selected by the provided regex, then those two
messages must be added to different FlowFiles. As a result, users should be
cautious about using a regex [...]
+ required: false,
+ sensitive: false,
+ dynamic: false,
+ supportsEl: false,
+ expressionLanguageScope: 'Not Supported',
+ dependencies: []
+ },
+ 'max.poll.records': {
+ name: 'max.poll.records',
+ displayName: 'Max Poll Records',
+ description: 'Specifies the maximum number of records
Kafka should return in a single poll.',
+ defaultValue: '10000',
+ required: false,
+ sensitive: false,
+ dynamic: false,
+ supportsEl: false,
+ expressionLanguageScope: 'Not Supported',
+ dependencies: []
+ },
+ 'Communications Timeout': {
+ name: 'Communications Timeout',
+ displayName: 'Communications Timeout',
+ description:
+ 'Specifies the timeout that the consumer should
use when communicating with the Kafka Broker',
+ defaultValue: '60 secs',
+ required: true,
+ sensitive: false,
+ dynamic: false,
+ supportsEl: false,
+ expressionLanguageScope: 'Not Supported',
+ dependencies: []
+ }
+ },
+ schedulingPeriod: '0 sec',
+ schedulingStrategy: 'TIMER_DRIVEN',
+ executionNode: 'ALL',
+ penaltyDuration: '30 sec',
+ yieldDuration: '1 sec',
+ bulletinLevel: 'WARN',
+ runDurationMillis: 0,
+ concurrentlySchedulableTaskCount: 1,
+ autoTerminatedRelationships: [],
+ comments: '',
+ lossTolerant: false,
+ defaultConcurrentTasks: {
+ TIMER_DRIVEN: '1',
+ CRON_DRIVEN: '1'
+ },
+ defaultSchedulingPeriod: {
+ TIMER_DRIVEN: '0 sec',
+ CRON_DRIVEN: '* * * * * ?'
},
- validationErrors: [
- "'Topic Name(s)' is invalid because Topic Name(s) is
required",
- "'Group ID' is invalid because Group ID is required",
- "'Relationship success' is invalid because Relationship
'success' is not connected to any component and is not auto-terminated"
- ],
- validationStatus: 'INVALID',
- extensionMissing: false
+ retryCount: 10,
+ retriedRelationships: [],
+ backoffMechanism: 'PENALIZE_FLOWFILE',
+ maxBackoffPeriod: '10 mins'
},
- inputRequirement: 'INPUT_FORBIDDEN',
- status: {
- groupId: '95a4b210-018b-1000-772a-5a9ebfa03287',
+ validationErrors: [
+ "'Topic Name(s)' is invalid because Topic Name(s) is required",
+ "'Group ID' is invalid because Group ID is required",
+ "'Relationship success' is invalid because Relationship
'success' is not connected to any component and is not auto-terminated"
+ ],
+ validationStatus: 'INVALID',
+ extensionMissing: false
+ },
+ inputRequirement: 'INPUT_FORBIDDEN',
+ status: {
+ groupId: '95a4b210-018b-1000-772a-5a9ebfa03287',
+ id: 'd90ac264-018b-1000-1827-a86c8156fd9e',
+ name: 'ConsumeKafka_2_6',
+ runStatus: 'Invalid',
+ statsLastRefreshed: '14:54:21 EST',
+ aggregateSnapshot: {
id: 'd90ac264-018b-1000-1827-a86c8156fd9e',
+ groupId: '95a4b210-018b-1000-772a-5a9ebfa03287',
name: 'ConsumeKafka_2_6',
+ type: 'ConsumeKafka_2_6',
runStatus: 'Invalid',
- statsLastRefreshed: '14:54:21 EST',
- aggregateSnapshot: {
- id: 'd90ac264-018b-1000-1827-a86c8156fd9e',
- groupId: '95a4b210-018b-1000-772a-5a9ebfa03287',
- name: 'ConsumeKafka_2_6',
- type: 'ConsumeKafka_2_6',
- runStatus: 'Invalid',
- executionNode: 'ALL',
- bytesRead: 0,
- bytesWritten: 0,
- read: '0 bytes',
- written: '0 bytes',
- flowFilesIn: 0,
- bytesIn: 0,
- input: '0 (0 bytes)',
- flowFilesOut: 0,
- bytesOut: 0,
- output: '0 (0 bytes)',
- taskCount: 0,
- tasksDurationNanos: 0,
- tasks: '0',
- tasksDuration: '00:00:00.000',
- activeThreadCount: 0,
- terminatedThreadCount: 0
- }
- },
- operatePermissions: {
- canRead: false,
- canWrite: false
+ executionNode: 'ALL',
+ bytesRead: 0,
+ bytesWritten: 0,
+ read: '0 bytes',
+ written: '0 bytes',
+ flowFilesIn: 0,
+ bytesIn: 0,
+ input: '0 (0 bytes)',
+ flowFilesOut: 0,
+ bytesOut: 0,
+ output: '0 (0 bytes)',
+ taskCount: 0,
+ tasksDurationNanos: 0,
+ tasks: '0',
+ tasksDuration: '00:00:00.000',
+ activeThreadCount: 0,
+ terminatedThreadCount: 0
}
+ },
+ operatePermissions: {
+ canRead: false,
+ canWrite: false
}
- };
+ }
+};
+
+describe('EditProcessor', () => {
+ let component: EditProcessor;
+ let fixture: ComponentFixture<EditProcessor>;
beforeEach(() => {
TestBed.configureTestingModule({
@@ -752,4 +750,65 @@ describe('EditProcessor', () => {
it('should create', () => {
expect(component).toBeTruthy();
});
+
+ it('should enable concurrent tasks when supportsParallelProcessing is
true', () => {
+ const concurrentTasks =
component.editProcessorForm.get('concurrentTasks');
+ expect(concurrentTasks?.enabled).toBe(true);
+ });
+
+ it('should return standard tooltip when supportsParallelProcessing is
true', () => {
+ expect(component.concurrentTasksTooltip()).toBe(
+ 'The number of tasks that should be concurrently scheduled for
this processor. Must be an integer greater than 0.'
+ );
+ });
+});
+
+describe('EditProcessor with TriggerSerially', () => {
+ let component: EditProcessor;
+ let fixture: ComponentFixture<EditProcessor>;
+
+ const serialData: EditComponentDialogRequest =
JSON.parse(JSON.stringify(data));
+ (serialData.entity as any).component.supportsParallelProcessing = false;
+
+ beforeEach(() => {
+ TestBed.configureTestingModule({
+ imports: [EditProcessor, MockComponent(ContextErrorBanner),
NoopAnimationsModule],
+ providers: [
+ { provide: MAT_DIALOG_DATA, useValue: serialData },
+ {
+ provide: ClusterConnectionService,
+ useValue: {
+ isDisconnectionAcknowledged: jest.fn()
+ }
+ },
+ {
+ provide: CanvasUtils,
+ useValue: {
+ runnableSupportsModification: jest.fn()
+ }
+ },
+ { provide: MatDialogRef, useValue: null }
+ ]
+ });
+ fixture = TestBed.createComponent(EditProcessor);
+ component = fixture.componentInstance;
+ fixture.detectChanges();
+ });
+
+ it('should disable concurrent tasks when supportsParallelProcessing is
false', () => {
+ const concurrentTasks =
component.editProcessorForm.get('concurrentTasks');
+ expect(concurrentTasks?.disabled).toBe(true);
+ });
+
+ it('should return tooltip explaining lack of parallel processing support',
() => {
+ expect(component.concurrentTasksTooltip()).toBe('This processor does
not support parallel processing.');
+ });
+
+ it('should keep concurrent tasks disabled after processRunStateUpdates',
() => {
+ component.processorUpdates = serialData.entity;
+ fixture.detectChanges();
+
+ const concurrentTasks =
component.editProcessorForm.get('concurrentTasks');
+ expect(concurrentTasks?.disabled).toBe(true);
+ });
});
diff --git
a/nifi-frontend/src/main/frontend/apps/nifi/src/app/pages/flow-designer/ui/canvas/items/processor/edit-processor/edit-processor.component.ts
b/nifi-frontend/src/main/frontend/apps/nifi/src/app/pages/flow-designer/ui/canvas/items/processor/edit-processor/edit-processor.component.ts
index bc915597072..4331df213bd 100644
---
a/nifi-frontend/src/main/frontend/apps/nifi/src/app/pages/flow-designer/ui/canvas/items/processor/edit-processor/edit-processor.component.ts
+++
b/nifi-frontend/src/main/frontend/apps/nifi/src/app/pages/flow-designer/ui/canvas/items/processor/edit-processor/edit-processor.component.ts
@@ -281,6 +281,10 @@ export class EditProcessor extends TabbedDialog {
comments: new FormControl(request.entity.component.config.comments)
});
+ if (!this.supportsParallelProcessing()) {
+ this.editProcessorForm.get('concurrentTasks')?.disable();
+ }
+
if (this.supportsBatching()) {
this.editProcessorForm.addControl(
'runDuration',
@@ -313,6 +317,10 @@ export class EditProcessor extends TabbedDialog {
this.editProcessorForm.get('runDuration')?.enable();
}
}
+
+ if (!this.supportsParallelProcessing()) {
+ this.editProcessorForm.get('concurrentTasks')?.disable();
+ }
}
private relationshipConfigurationValidator(): ValidatorFn {
@@ -348,6 +356,17 @@ export class EditProcessor extends TabbedDialog {
return this.request.entity.component.supportsBatching == true;
}
+ supportsParallelProcessing(): boolean {
+ return this.request.entity.component.supportsParallelProcessing ===
true;
+ }
+
+ concurrentTasksTooltip(): string {
+ if (this.supportsParallelProcessing()) {
+ return 'The number of tasks that should be concurrently scheduled
for this processor. Must be an integer greater than 0.';
+ }
+ return 'This processor does not support parallel processing.';
+ }
+
formatType(): string {
return this.nifiCommon.formatType(this.request.entity.component);
}
@@ -410,6 +429,22 @@ export class EditProcessor extends TabbedDialog {
.filter((relationship) => relationship.retry)
.map((relationship) => relationship.name);
+ const config: any = {
+ penaltyDuration:
this.editProcessorForm.get('penaltyDuration')?.value,
+ yieldDuration: this.editProcessorForm.get('yieldDuration')?.value,
+ bulletinLevel: this.editProcessorForm.get('bulletinLevel')?.value,
+ schedulingStrategy:
this.editProcessorForm.get('schedulingStrategy')?.value,
+ schedulingPeriod:
this.editProcessorForm.get('schedulingPeriod')?.value,
+ executionNode: this.editProcessorForm.get('executionNode')?.value,
+ autoTerminatedRelationships: autoTerminated,
+ retriedRelationships: retried,
+ comments: this.editProcessorForm.get('comments')?.value
+ };
+
+ if (this.supportsParallelProcessing()) {
+ config.concurrentlySchedulableTaskCount =
this.editProcessorForm.get('concurrentTasks')?.value;
+ }
+
const payload: any = {
revision: this.client.getRevision({
...this.request.entity,
@@ -419,18 +454,7 @@ export class EditProcessor extends TabbedDialog {
component: {
id: this.request.entity.id,
name: this.editProcessorForm.get('name')?.value,
- config: {
- penaltyDuration:
this.editProcessorForm.get('penaltyDuration')?.value,
- yieldDuration:
this.editProcessorForm.get('yieldDuration')?.value,
- bulletinLevel:
this.editProcessorForm.get('bulletinLevel')?.value,
- schedulingStrategy:
this.editProcessorForm.get('schedulingStrategy')?.value,
- concurrentlySchedulableTaskCount:
this.editProcessorForm.get('concurrentTasks')?.value,
- schedulingPeriod:
this.editProcessorForm.get('schedulingPeriod')?.value,
- executionNode:
this.editProcessorForm.get('executionNode')?.value,
- autoTerminatedRelationships: autoTerminated,
- retriedRelationships: retried,
- comments: this.editProcessorForm.get('comments')?.value
- }
+ config
}
};