[
https://issues.apache.org/jira/browse/FLINK-17691?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17253451#comment-17253451
]
Nazar Volynets commented on FLINK-17691:
----------------------------------------
Hi [~freezhan] & [~jmathews3773],
It looks like the issue still exists, or I am missing something. Details below.
Basically, I have run into two issues:
* the first is directly related to this one
* the second is indirectly related
*Issue #1*
+Regarding+
_>> Do md5 on the transactional.id prefix_
+Details+
Flink version:
// build.gradle
{code:java}
ext {
...
flinkVersion = '1.12.0'
scalaBinaryVersion = '2.11'
...
}
dependencies {
...
implementation "org.apache.flink:flink-streaming-java_${scalaBinaryVersion}:${flinkVersion}"
...
}{code}
// App
{code:java}
public static void main(String[] args) {
...
env.enableCheckpointing(10000);
env.setStateBackend(new RocksDBStateBackend("file:///.../config/checkpoints/rocksdb", true));
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
...
FlinkKafkaConsumer<Record> consumer = createConsumer(conf);
FlinkKafkaProducer<Record> producer = createProducer(conf);
env
.addSource(consumer)
.uid("kafka-consumer")
.addSink(producer)
.uid("kafka-producer")
;
env.execute();
}
public static FlinkKafkaProducer<Record> createProducer(Configuration conf) {
...
FlinkKafkaProducer<Record> producer = new FlinkKafkaProducer<>(topicDefault,
new RecordKafkaSerSchema(true), props,
FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
...
return producer;
}{code}
// Logs (of App executed/submitted locally from IDE)
{code:java}
2020-12-22 13:52:08 [ForkJoinPool.commonPool-worker-9] INFO ProducerConfig:347 - ProducerConfig values:
...
transactional.id = Source: Custom Source -> Sink: Unnamed-e2b2f358d45860e6d949c8f7417842d6-24
...{code}
+Summary+
As we can see, the transactional.id is not an md5 hash as stated above (or I am
missing something). It looks like this issue should be reopened, as the fix was
expected in 1.12.0.
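For reference, here is a minimal sketch (not Flink's code) of what an md5-hashed prefix would look like; the `prefix` string is copied from the log above, and `md5Hex` is an illustrative helper name:

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

public class TransactionalIdCheck {

    /** Hex-encoded MD5 digest; always 32 characters regardless of input length. */
    static String md5Hex(String s) throws NoSuchAlgorithmException {
        byte[] digest = MessageDigest.getInstance("MD5")
                .digest(s.getBytes(StandardCharsets.UTF_8));
        StringBuilder sb = new StringBuilder(32);
        for (byte b : digest) {
            sb.append(String.format("%02x", b));
        }
        return sb.toString();
    }

    public static void main(String[] args) throws NoSuchAlgorithmException {
        // Prefix observed in the log above (taskName + "-" + operatorUniqueID).
        String prefix =
                "Source: Custom Source -> Sink: Unnamed-e2b2f358d45860e6d949c8f7417842d6";
        // An md5-hashed prefix would be a fixed 32 hex chars; the logged
        // transactional.id is clearly the raw, un-hashed prefix instead.
        System.out.println(md5Hex(prefix).length());
    }
}
```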
*Issue #2*
+Regarding+
> 1. use the {color:#ff0000}taskName + "-" + operatorUniqueID{color} as
>transactional.id prefix (may be too long)
In reality, the `uid` specified after `source` & `sink` is ignored, even though
specifying it is highly recommended in the official Flink documentation:
[https://ci.apache.org/projects/flink/flink-docs-release-1.12/ops/state/savepoints.html#assigning-operator-ids]
Moreover, as a workaround it is possible to specify the `source` name, and that
is NOT ignored.
But the Flink Java API provides NO way to specify the `sink` name.
+Details+
// build.gradle
{code:java}
ext {
...
flinkVersion = '1.12.0'
scalaBinaryVersion = '2.11'
...
}
dependencies {
...
implementation "org.apache.flink:flink-streaming-java_${scalaBinaryVersion}:${flinkVersion}"
...
}{code}
// App - `uid` are ignored
{code:java}
public static void main(String[] args) {
...
env.enableCheckpointing(10000);
env.setStateBackend(new RocksDBStateBackend("file:///.../config/checkpoints/rocksdb", true));
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
...
FlinkKafkaConsumer<Record> consumer = createConsumer(conf);
FlinkKafkaProducer<Record> producer = createProducer(conf);
env
.addSource(consumer)
.uid("kafka-consumer") // is ignored
.addSink(producer)
.uid("kafka-producer") // is ignored
;
env.execute();
}
public static FlinkKafkaProducer<Record> createProducer(Configuration conf) {
...
FlinkKafkaProducer<Record> producer = new FlinkKafkaProducer<>(topicDefault,
new RecordKafkaSerSchema(true), props,
FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
...
return producer;
}{code}
// Logs (of App executed/submitted locally from IDE) - `uid` is ignored
{code:java}
2020-12-22 13:52:08 [Source: Custom Source -> Sink: Unnamed (1/1)#0] INFO ProducerConfig:347 - ProducerConfig values:
...
transactional.id = Source: Custom Source -> Sink: Unnamed-e2b2f358d45860e6d949c8f7417842d6-20
...{code}
// App - specify `source`/`sink` names
{code:java}
public static void main(String[] args) {
...
env.enableCheckpointing(10000);
env.setStateBackend(new RocksDBStateBackend("file:///.../config/checkpoints/rocksdb", true));
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
...
FlinkKafkaConsumer<Record> consumer = createConsumer(conf);
FlinkKafkaProducer<Record> producer = createProducer(conf);
env
.addSource(consumer, "kafka-consumer")
.addSink(producer) // NO way to specify name
;
env.execute();
}
public static FlinkKafkaProducer<Record> createProducer(Configuration conf) {
...
FlinkKafkaProducer<Record> producer = new FlinkKafkaProducer<>(topicDefault,
new RecordKafkaSerSchema(true), props,
FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
...
return producer;
}{code}
// Logs (of App executed/submitted locally from IDE) - `source` name specified
{code:java}
2020-12-22 13:52:08 [Source: kafka-consumer -> Sink: Unnamed (1/1)#0] INFO ProducerConfig:347 - ProducerConfig values:
...
transactional.id = Source: kafka-consumer -> Sink: Unnamed-e2b2f358d45860e6d949c8f7417842d6-20
...{code}
+Summary+
As we can see, the `operatorUniqueID` specified after `source` and/or `sink` is
ignored.
Moreover, we can specify the `source` name and it is taken into account, but
there is no way to do the same for the `sink` via the Flink Java API.
Should I create a new/separate Jira issue for this use case, or is it expected
behaviour?
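As an aside, the fix suggested in the issue below (hash the prefix before building the real transactional.id) could be sketched roughly as follows; `safePrefix` and `MAX_PREFIX_BYTES` are illustrative names, not Flink's actual implementation:

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

public class TransactionalIdPrefix {

    // Kafka rejects transactional.id values whose byte length exceeds
    // Short.MAX_VALUE (32767); leave headroom for the "-<counter>" suffix.
    private static final int MAX_PREFIX_BYTES = Short.MAX_VALUE - 64;

    /**
     * Returns the prefix unchanged when it is short enough; otherwise its
     * 32-char MD5 hex digest, so the final transactional.id always fits.
     */
    static String safePrefix(String taskName, String operatorUniqueID) {
        String prefix = taskName + "-" + operatorUniqueID;
        if (prefix.getBytes(StandardCharsets.UTF_8).length <= MAX_PREFIX_BYTES) {
            return prefix;
        }
        return md5Hex(prefix);
    }

    static String md5Hex(String s) {
        try {
            byte[] digest = MessageDigest.getInstance("MD5")
                    .digest(s.getBytes(StandardCharsets.UTF_8));
            StringBuilder sb = new StringBuilder(32);
            for (byte b : digest) {
                sb.append(String.format("%02x", b));
            }
            return sb.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 not available", e);
        }
    }

    public static void main(String[] args) {
        // A pathologically long task name, e.g. generated from a huge SQL statement.
        String longTaskName = "a".repeat(40_000);
        String id = safePrefix(longTaskName, "e2b2f358d45860e6d949c8f7417842d6") + "-20";
        System.out.println(id.length()); // well under Kafka's 32767-byte limit
    }
}
```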
> FlinkKafkaProducer transactional.id too long when using Semantic.EXACTLY_ONCE
> -----------------------------------------------------------------------------
>
> Key: FLINK-17691
> URL: https://issues.apache.org/jira/browse/FLINK-17691
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kafka
> Affects Versions: 1.10.0, 1.11.0
> Reporter: freezhan
> Assignee: John Mathews
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.12.0
>
> Attachments: image-2020-05-14-20-43-57-414.png,
> image-2020-05-14-20-45-24-030.png, image-2020-05-14-20-45-59-878.png,
> image-2020-05-14-21-09-01-906.png, image-2020-05-14-21-16-43-810.png,
> image-2020-05-14-21-17-09-784.png
>
>
> When sinking to Kafka using the {color:#FF0000}Semantic.EXACTLY_ONCE{color} mode,
> the Flink Kafka Connector Producer will automatically set the
> {color:#FF0000}transactional.id{color}, and the user-defined value is ignored.
>
> When the job operator name is too long, sends will fail because the
> transactional.id exceeds the Kafka {color:#FF0000}coordinator_key{color}
> limit
> !image-2020-05-14-21-09-01-906.png!
>
> *The Flink Kafka Connector's policy for automatically generating the
> transactional.id is as follows:*
>
> 1. use the {color:#FF0000}taskName + "-" + operatorUniqueID{color} as
> transactional.id prefix (may be too long)
> getRuntimeContext().getTaskName() + "-" + ((StreamingRuntimeContext)
> getRuntimeContext()).getOperatorUniqueID()
> 2. Range of available transactional ids
> [nextFreeTransactionalId, nextFreeTransactionalId + parallelism *
> kafkaProducersPoolSize)
> !image-2020-05-14-20-43-57-414.png!
> !image-2020-05-14-20-45-24-030.png!
> !image-2020-05-14-20-45-59-878.png!
>
> *The Kafka transaction.id check policy as follows:*
>
> {color:#FF0000}string bytes.length can't larger than Short.MAX_VALUE
> (32767){color}
> !image-2020-05-14-21-16-43-810.png!
> !image-2020-05-14-21-17-09-784.png!
>
> *To reproduce this bug, the following conditions must be met:*
>
> # send messages to Kafka in exactly-once mode
> # the taskName's length + operatorUniqueID's length is larger than 32767 (a
> very long SQL statement or window expression can cause this)
> *I suggest a solution:*
>
> 1. Allow users to customize the transactional.id prefix
> or
> 2. Do md5 on the prefix before returning the real transactional.id
>
>
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)