[
https://issues.apache.org/jira/browse/NIFI-7403?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
ZhangCheng updated NIFI-7403:
-----------------------------
Description:
PutSQL processor support <Support Fragmented Transactions>,if we set this
property true, I think it means The PutSQL processor will excute these sqls of
one transaction Transactionally!!
But we find that when we set the <Rollback On Failure> false, those sqls of one
transaction do not excute transactionally,some sucess and some failure. I think
it's wrong.
I think, if we set <Support Fragmented Transactions> true, it should be
executed Transactionally, no matter <Rollback On Failure> is true or false.
I see the code, only PutSQL has the <Support Fragmented Transactions>, it
maybe improve this feature at a small cost.
modify code design:
step1: Maybe other Processors would support the <Support Fragmented
Transactions>(such as PutDatabaseRecord), we should move the <Support
Fragmented Transactions> from PutSQL.java to Put.java( I think it's a rational
design that `Put.java` define the <Support Fragmented Transactions> property )
{code:java}
public static final PropertyDescriptor SUPPORT_TRANSACTIONS = new
PropertyDescriptor.Builder()
.name("Support Fragmented Transactions")
...
{code}
step2: in Put.java `onTrigger` method, after the `putFlowFiles` and before the
`onCompleted.apply`, we try to get the value of <Rollback On Failure>, if true
, check the `transferredFlowFiles` , if there are flowfiles don't route to
`Success`, we should reroute these `transferredFlowFiles`(retry > failure),and
do `onFailed`(if it's not null)
{code:java}
try {
putFlowFiles(context, session, functionContext, connection,
flowFiles, result);
} catch (DiscontinuedException e) {
// Whether it was an error or semi normal is depends on the
implementation and reason why it wanted to discontinue.
// So, no logging is needed here.
}
...
if(context.getProperty(SUPPORT_TRANSACTIONS).asBoolean()){
//TODO do sth
}
// OnCompleted processing.
if (onCompleted != null) {
onCompleted.apply(context, session, functionContext,
connection);
}
// Transfer FlowFiles.
transferFlowFiles.apply(context, session, functionContext,
result);
{code}
step3 Additionally, I think the Put.java can extract the RelationShips of the
processors those use the Put.java(PutSQL PutDatabaseRecord, PutHiveQL...We can
see that these processors who use the Put.java have the same Relationships, I
this this is the `Put`'s common feature)
{code:java}
static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("A FlowFile is routed to this relationship after the
database is successfully updated")
.build();
static final Relationship REL_RETRY = new Relationship.Builder()
.name("retry")
.description("A FlowFile is routed to this relationship if the
database cannot be updated but attempting the operation again may succeed")
.build();
static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("A FlowFile is routed to this relationship if the
database cannot be updated and retrying the operation will also fail, "
+ "such as an invalid query or an integrity constraint
violation")
.build();
{code}
was:
PutSQL processor support <Support Fragmented Transactions>,if we set this
property true, I think it means The PutSQL processor will excute these sqls of
one transaction Transactionally!!
But we find that when we set the <Rollback On Failure> false, those sqls of one
transaction do not excute transactionally,some sucess and some failure. I think
it's wrong.
I think, if we set <Support Fragmented Transactions> true, it should be
executed Transactionally, no matter <Rollback On Failure> is true or false.
I see the code, only PutSQL has the <Support Fragmented Transactions>, it
maybe improve this feature at a small cost.
modify code design:
step1: at account of that maybe Other Processors support the <Support
Fragmented Transactions>(such as PutDatabaseRecord), we should move the
<Support Fragmented Transactions> from PutSQL.java to Put.java
{code:java}
public static final PropertyDescriptor SUPPORT_TRANSACTIONS = new
PropertyDescriptor.Builder()
.name("Support Fragmented Transactions")
...
{code}
step2: in Put.java onTrigger, after the `putFlowFiles` and before the
`onCompleted.apply`, we try to get the value of <Rollback On Failure>, if true
, check the `transferredFlowFiles` , if there are flowfiles don't route to
`Success`, we should reroute these `transferredFlowFiles`(retry > failure),and
do `onFailed`(if it's not null)
{code:java}
try {
putFlowFiles(context, session, functionContext, connection,
flowFiles, result);
} catch (DiscontinuedException e) {
// Whether it was an error or semi normal is depends on the
implementation and reason why it wanted to discontinue.
// So, no logging is needed here.
}
...
if(context.getProperty(SUPPORT_TRANSACTIONS).asBoolean()){
//TODO do sth
}
// OnCompleted processing.
if (onCompleted != null) {
onCompleted.apply(context, session, functionContext,
connection);
}
// Transfer FlowFiles.
transferFlowFiles.apply(context, session, functionContext,
result);
{code}
step3 Additionally, I think the Put.java can extract the RelationShips of the
processors those use the Put.java(PutSQL PutDatabaseRecord, PutHiveQL...We can
see that these processors who use the Put.java have the same Relationships, I
this this is the `Put`'s common feature)
{code:java}
static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("A FlowFile is routed to this relationship after the
database is successfully updated")
.build();
static final Relationship REL_RETRY = new Relationship.Builder()
.name("retry")
.description("A FlowFile is routed to this relationship if the
database cannot be updated but attempting the operation again may succeed")
.build();
static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("A FlowFile is routed to this relationship if the
database cannot be updated and retrying the operation will also fail, "
+ "such as an invalid query or an integrity constraint
violation")
.build();
{code}
> Put.java improvement(PutSQL's transactions support)
> ---------------------------------------------------
>
> Key: NIFI-7403
> URL: https://issues.apache.org/jira/browse/NIFI-7403
> Project: Apache NiFi
> Issue Type: Improvement
> Components: Extensions
> Affects Versions: 1.11.4
> Reporter: ZhangCheng
> Priority: Major
>
> PutSQL processor support <Support Fragmented Transactions>,if we set this
> property true, I think it means The PutSQL processor will excute these sqls
> of one transaction Transactionally!!
> But we find that when we set the <Rollback On Failure> false, those sqls of
> one transaction do not excute transactionally,some sucess and some failure. I
> think it's wrong.
> I think, if we set <Support Fragmented Transactions> true, it should be
> executed Transactionally, no matter <Rollback On Failure> is true or false.
> I see the code, only PutSQL has the <Support Fragmented Transactions>, it
> maybe improve this feature at a small cost.
> modify code design:
> step1: Maybe other Processors would support the <Support Fragmented
> Transactions>(such as PutDatabaseRecord), we should move the <Support
> Fragmented Transactions> from PutSQL.java to Put.java( I think it's a
> rational design that `Put.java` define the <Support Fragmented Transactions>
> property )
> {code:java}
> public static final PropertyDescriptor SUPPORT_TRANSACTIONS = new
> PropertyDescriptor.Builder()
> .name("Support Fragmented Transactions")
> ...
> {code}
> step2: in Put.java `onTrigger` method, after the `putFlowFiles` and before
> the `onCompleted.apply`, we try to get the value of <Rollback On Failure>, if
> true , check the `transferredFlowFiles` , if there are flowfiles don't route
> to `Success`, we should reroute these `transferredFlowFiles`(retry >
> failure),and do `onFailed`(if it's not null)
> {code:java}
> try {
> putFlowFiles(context, session, functionContext,
> connection, flowFiles, result);
> } catch (DiscontinuedException e) {
> // Whether it was an error or semi normal is depends on
> the implementation and reason why it wanted to discontinue.
> // So, no logging is needed here.
> }
> ...
>
> if(context.getProperty(SUPPORT_TRANSACTIONS).asBoolean()){
> //TODO do sth
> }
> // OnCompleted processing.
> if (onCompleted != null) {
> onCompleted.apply(context, session, functionContext,
> connection);
> }
> // Transfer FlowFiles.
> transferFlowFiles.apply(context, session, functionContext,
> result);
> {code}
> step3 Additionally, I think the Put.java can extract the RelationShips of the
> processors those use the Put.java(PutSQL PutDatabaseRecord, PutHiveQL...We
> can see that these processors who use the Put.java have the same
> Relationships, I this this is the `Put`'s common feature)
> {code:java}
> static final Relationship REL_SUCCESS = new Relationship.Builder()
> .name("success")
> .description("A FlowFile is routed to this relationship after the
> database is successfully updated")
> .build();
> static final Relationship REL_RETRY = new Relationship.Builder()
> .name("retry")
> .description("A FlowFile is routed to this relationship if the
> database cannot be updated but attempting the operation again may succeed")
> .build();
> static final Relationship REL_FAILURE = new Relationship.Builder()
> .name("failure")
> .description("A FlowFile is routed to this relationship if the
> database cannot be updated and retrying the operation will also fail, "
> + "such as an invalid query or an integrity constraint
> violation")
> .build();
> {code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)