KuKuDeCheng opened a new pull request #4239:
URL: https://github.com/apache/nifi/pull/4239
NIFI-7403
Put.java improvement(PutSQL's transactions support)
#### Description of PR
The PutSQL processor has a `<Support Fragmented Transactions>` property. If this
property is set to true, I would expect PutSQL to execute all the SQL statements
of one transaction transactionally.
However, when `<Rollback On Failure>` is set to false, the statements of one
transaction are not executed transactionally: some succeed and some fail.
I think this is wrong.
I think that if `<Support Fragmented Transactions>` is true, the statements
should be executed transactionally, regardless of whether `<Rollback On Failure>`
is true or false.
Looking at the code, only PutSQL has the `<Support Fragmented Transactions>`
property, so this feature can probably be improved at a small cost.
Proposed code changes:
Step 1: Other processors (such as PutDatabaseRecord) may later support
`<Support Fragmented Transactions>`, so the property should move from
PutSQL.java to Put.java. I think it is a reasonable design for `Put.java` to
define the `<Support Fragmented Transactions>` property.
```java
public static final PropertyDescriptor SUPPORT_TRANSACTIONS = new PropertyDescriptor.Builder()
        .name("Support Fragmented Transactions")
        ...
```
Step 2: Additionally, I think Put.java can hold the Relationships of the
processors that use it (PutSQL, PutDatabaseRecord, PutHiveQL, ...). These
processors all define the same Relationships, so I think this is a common
feature of `Put`.
```java
static final Relationship REL_SUCCESS = new Relationship.Builder()
        .name("success")
        .description("A FlowFile is routed to this relationship after the database is successfully updated")
        .build();
static final Relationship REL_RETRY = new Relationship.Builder()
        .name("retry")
        .description("A FlowFile is routed to this relationship if the database cannot be updated "
                + "but attempting the operation again may succeed")
        .build();
static final Relationship REL_FAILURE = new Relationship.Builder()
        .name("failure")
        .description("A FlowFile is routed to this relationship if the database cannot be updated "
                + "and retrying the operation will also fail, "
                + "such as an invalid query or an integrity constraint violation")
        .build();
```
Step 3: In the `onTrigger` method of Put.java, after `putFlowFiles` and before
`onCompleted.apply`, we read the value of `<Rollback On Failure>`. If it is
true, we check the `transferredFlowFiles`; if any FlowFiles were not routed to
`Success`, we reroute these `transferredFlowFiles` (retry > failure) and call
the `onFailed` function (if it is not null).
```java
try {
    putFlowFiles(context, session, functionContext, connection, flowFiles, result);
} catch (DiscontinuedException e) {
}
...
if (context.getProperties().containsKey(SUPPORT_TRANSACTIONS)
        && context.getProperty(SUPPORT_TRANSACTIONS).asBoolean()) {
    //TODO do sth
}
if (onCompleted != null) {
    onCompleted.apply(context, session, functionContext, connection);
}
```
```
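The rerouting rule in step 3 could look roughly like the sketch below. This is only an illustration, not the code in this PR: it does not use the NiFi API, and the class `FragmentedTransactionReroute`, the enum `Rel`, and the method `rerouteAsOneTransaction` are hypothetical names. It also assumes that "retry > failure" means `retry` takes precedence over `failure` when both occur. The database rollback itself and the `onFailed` call are not shown.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class FragmentedTransactionReroute {

    // Hypothetical stand-in for the success/retry/failure Relationships.
    enum Rel { SUCCESS, RETRY, FAILURE }

    /**
     * Returns a copy of the routing in which the FlowFiles of one transaction
     * share a single outcome. If every FlowFile went to SUCCESS the routing is
     * unchanged. Otherwise all FlowFiles go to RETRY if at least one was routed
     * to RETRY, and to FAILURE if not (assumed reading of "retry > failure").
     */
    static Map<String, Rel> rerouteAsOneTransaction(Map<String, Rel> routed) {
        boolean allSucceeded = true;
        boolean anyRetry = false;
        for (Rel rel : routed.values()) {
            if (rel != Rel.SUCCESS) {
                allSucceeded = false;
            }
            if (rel == Rel.RETRY) {
                anyRetry = true;
            }
        }

        // Work on a copy so the caller's map is left untouched.
        Map<String, Rel> result = new LinkedHashMap<>(routed);
        if (allSucceeded) {
            return result;
        }

        Rel target = anyRetry ? Rel.RETRY : Rel.FAILURE;
        for (Map.Entry<String, Rel> entry : result.entrySet()) {
            entry.setValue(target);
        }
        return result;
    }

    public static void main(String[] args) {
        // Keys are illustrative FlowFile identifiers for one fragmented transaction.
        Map<String, Rel> routed = new LinkedHashMap<>();
        routed.put("fragment-0", Rel.SUCCESS);
        routed.put("fragment-1", Rel.FAILURE);
        routed.put("fragment-2", Rel.SUCCESS);

        // One fragment failed, so the whole group is rerouted together.
        System.out.println(rerouteAsOneTransaction(routed));
    }
}
```

In Put.java this check would sit where the `//TODO do sth` placeholder is, working on the `transferredFlowFiles` instead of a plain map.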
In order to streamline the review of the contribution we ask you
to ensure the following steps have been taken:
### For all changes:
- [ √] Is there a JIRA ticket associated with this PR? Is it referenced
in the commit message?
- [ √] Does your PR title start with **NIFI-XXXX** where XXXX is the JIRA
number you are trying to resolve? Pay particular attention to the hyphen "-"
character.
- [ √] Has your PR been rebased against the latest commit within the target
branch (typically `master`)?
- [√ ] Is your initial contribution a single, squashed commit? _Additional
commits in response to PR reviewer feedback should be made on this branch and
pushed to allow change tracking. Do not `squash` or use `--force` when pushing
to allow for clean monitoring of changes._
### For code changes:
- [ √] Have you ensured that the full suite of tests is executed via `mvn
-Pcontrib-check clean install` at the root `nifi` folder?
- [ ] Have you written or updated unit tests to verify your changes?
- [ √] Have you verified that the full build is successful on both JDK 8 and
JDK 11?
- [ ] If adding new dependencies to the code, are these dependencies
licensed in a way that is compatible for inclusion under [ASF
2.0](http://www.apache.org/legal/resolved.html#category-a)?
- [ ] If applicable, have you updated the `LICENSE` file, including the main
`LICENSE` file under `nifi-assembly`?
- [ ] If applicable, have you updated the `NOTICE` file, including the main
`NOTICE` file found under `nifi-assembly`?
- [ ] If adding new Properties, have you added `.displayName` in addition to
.name (programmatic access) for each of the new properties?
### For documentation related changes:
- [ ] Have you ensured that format looks appropriate for the output in which
it is rendered?
### Note:
Please ensure that once the PR is submitted, you check travis-ci for build
issues and submit an update to your PR as soon as possible.