linyu003 commented on issue #4247:
URL: https://github.com/apache/incubator-seatunnel/issues/4247#issuecomment-1492074387
I found more problems while trying to solve this issue.
# Reproduce the Problems
SeaTunnel branch: dev
Revision: b8c6bbd1e6dea6462d419d6c7adf20f04d7a2430

Add the following dependency to seatunnel-examples/seatunnel-flink-connector-v2-example/pom.xml:
```xml
<dependency>
    <groupId>org.apache.seatunnel</groupId>
    <artifactId>connector-kafka</artifactId>
    <version>${project.version}</version>
</dependency>
```
Create the file seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/fake_to_kafka.conf:
```hocon
env {
  execution.parallelism = 1
  job.mode = "STREAMING"
  execution.checkpoint.interval = 10000
}

source {
  FakeSource {
    parallelism = 1
    result_table_name = "fake"
    split.read-interval = 1000
    # split.num = 60
    row.num = 2
    schema = {
      fields {
        name = "string"
        age = "int"
      }
    }
  }
}

transform {}

sink {
  kafka {
    topic = "test01"
    bootstrap.servers = "localhost:9092"
    semantics = EXACTLY_ONCE
  }
}
```
Build the seatunnel-examples module in IDEA and run
seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/java/org/apache/seatunnel/example/flink/v2/SeaTunnelApiExample.java.main()
with the first argument /examples/fake_to_kafka.conf.
Wait less than one minute and you will see the problem.
A sample log from such a run:
[issue4247.log](https://github.com/apache/incubator-seatunnel/files/11122843/issue4247.log)
# Problems
The discussion below concerns KafkaTransactionSender, not KafkaNoTransactionSender.
## Problem 1
Line 913 in the log:
```log
2023-03-31 20:02:25,834 INFO org.apache.kafka.clients.producer.internals.TransactionManager - [Producer clientId=producer-SeaTunnel5627-1, transactionalId=SeaTunnel5627-1] Transiting to fatal error state due to org.apache.kafka.common.errors.InvalidTxnStateException: The producer attempted a transactional operation in an invalid state.
```
### reason
When a checkpoint is triggered, KafkaSinkWriter#snapshotState is called, which calls producer.close() before creating a new producer.
(Note: the producer.close() call is newly added code in KafkaTransactionSender#getTransactionProducer.)
The Producer.close() implementation tries to close gracefully, which aborts the current transaction. When the committer later tries to commit that transaction, an error occurs because it has already been aborted.
### solution
Call kafkaProducer.close(Duration.ZERO) instead of kafkaProducer.close(). The former closes immediately without aborting the current transaction.
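The behavioral difference can be sketched with a small simulation. This is a hypothetical model, not the real Kafka client: it only encodes the property described above, namely that a graceful close() aborts an in-flight transaction while close(Duration.ZERO) does not.

```java
import java.time.Duration;

// Hypothetical simulation (not the real KafkaProducer): models the difference
// between close() and close(Duration.ZERO) with respect to an open transaction.
public class CloseSemanticsDemo {

    /** Minimal stand-in for a transactional producer. */
    static class FakeTransactionalProducer {
        boolean transactionOpen = true;
        boolean transactionAborted = false;

        // No-timeout close tears down gracefully, which aborts the in-flight txn.
        void close() {
            close(Duration.ofMillis(Long.MAX_VALUE));
        }

        void close(Duration timeout) {
            if (!timeout.isZero() && transactionOpen) {
                transactionAborted = true; // graceful close aborts the transaction
            }
            transactionOpen = false;
        }
    }

    public static void main(String[] args) {
        FakeTransactionalProducer graceful = new FakeTransactionalProducer();
        graceful.close();
        System.out.println("graceful close aborted txn: " + graceful.transactionAborted);

        FakeTransactionalProducer immediate = new FakeTransactionalProducer();
        immediate.close(Duration.ZERO);
        System.out.println("close(ZERO) aborted txn: " + immediate.transactionAborted);
    }
}
```

With the zero timeout the transaction stays committable, which is why the committer no longer sees an already-aborted transaction.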
## Problem 2
At the end of the log file, you can see
```log
2023-03-31 20:02:57,291 DEBUG org.apache.seatunnel.connectors.seatunnel.kafka.sink.KafkaTransactionSender - Abort kafka transaction: SeaTunnel5627-2
2023-03-31 20:02:57,295 DEBUG org.apache.seatunnel.connectors.seatunnel.kafka.sink.KafkaTransactionSender - Abort kafka transaction: SeaTunnel5627-3
2023-03-31 20:02:57,296 DEBUG org.apache.seatunnel.connectors.seatunnel.kafka.sink.KafkaTransactionSender - Abort kafka transaction: SeaTunnel5627-4
2023-03-31 20:02:57,298 DEBUG org.apache.seatunnel.connectors.seatunnel.kafka.sink.KafkaTransactionSender - Abort kafka transaction: SeaTunnel5627-5
2023-03-31 20:02:57,299 DEBUG org.apache.seatunnel.connectors.seatunnel.kafka.sink.KafkaTransactionSender - Abort kafka transaction: SeaTunnel5627-6
2023-03-31 20:02:57,301 DEBUG org.apache.seatunnel.connectors.seatunnel.kafka.sink.KafkaTransactionSender - Abort kafka transaction: SeaTunnel5627-7
```
It is an infinite loop: the writer keeps aborting transactions and never stops.
### reason
In KafkaTransactionSender#abortTransaction(long), the writer tries to abort every transaction whose id is >= checkpointId, and it does not stop until the epoch is 0. However, the epoch held by the transaction manager is not updated even when the transactionalId is set to another value, so the loop never terminates when the initial epoch is nonzero.
### solution
Since beginning a transaction fences all earlier transactions, aborting every one of them is unnecessary. Moreover, KafkaTransactionSender#abortTransaction(long) currently just calls producer.flush() without ever calling producer.abortTransaction().
I removed the for-loop and call producer.abortTransaction() instead of producer.flush().
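The non-termination can be sketched with a bounded simulation. All names here are illustrative, not the real SeaTunnel code; the safety bound exists only so the demo terminates where the real loop would not.

```java
// Hypothetical simulation of the abort loop described above. The loop keys its
// exit on the epoch reaching 0, but switching transactionalId never updates the
// epoch held by the transaction manager, so a nonzero initial epoch loops forever.
public class AbortLoopDemo {

    static class FakeTransactionManager {
        final short epoch; // never changes when the transactionalId changes
        FakeTransactionManager(short epoch) { this.epoch = epoch; }
        short epochFor(String transactionalId) { return epoch; }
    }

    // Old behavior: loop "until epoch == 0", bounded here so the demo can finish.
    static int abortAllTransactions(FakeTransactionManager tm, long checkpointId, int maxIterations) {
        int iterations = 0;
        long id = checkpointId;
        while (tm.epochFor("SeaTunnel-" + id) != 0 && iterations < maxIterations) {
            iterations++; // would abort/flush here, then move to the next id
            id++;
        }
        return iterations;
    }

    public static void main(String[] args) {
        FakeTransactionManager tm = new FakeTransactionManager((short) 5);
        // The loop hits the safety bound instead of exiting: an infinite loop in practice.
        System.out.println("iterations before bailout: " + abortAllTransactions(tm, 1, 1000));
    }
}
```

Dropping the loop and aborting only the current transaction sidesteps the epoch check entirely, which matches the fix described above.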
## Problem 3
After fixing problems 1 and 2, rerun the example application and you will hit the problem originally reported in this issue by @lightzhao.
As I said before:
> It seems that KafkaSinkCommitter does not work well if the transaction is empty (meaning that the transaction has no records to commit).
> An empty transaction is a special case, so I want to just commit the transaction in KafkaSinkWriter.snapshotState, instead of working hard to make KafkaSinkCommitter compatible with it.
I will open my PR later.