aljoscha commented on a change in pull request #14144:
URL: https://github.com/apache/flink/pull/14144#discussion_r528805396
##########
File path:
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
##########
@@ -145,6 +145,12 @@
private static final long serialVersionUID = 1L;
+ /**
+ * Number of characters to truncate the taskName to for the Kafka
transactionalId.
+ * The maximum this can possibly be set to is 32,767 - (length of
operatorUniqueId).
+ */
+ private static final short maxTaskNameSize = 10_000;
Review comment:
Yes, I think even a string length of 10k is excessive. Let's maybe limit
it to 1k?
##########
File path:
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
##########
@@ -1084,8 +1090,14 @@ public void
initializeState(FunctionInitializationContext context) throws Except
migrateNextTransactionalIdHindState(context);
}
+ String taskName = getRuntimeContext().getTaskName();
+ // Kafka transactional IDs are limited in length to be less
than the max value of a short,
+ // so we truncate here if necessary to a more reasonable length
string.
+ if (taskName.length() > maxTaskNameSize) {
+ taskName = taskName.substring(0, maxTaskNameSize);
Review comment:
```suggestion
taskName = taskName.substring(0, maxTaskNameSize);
LOG.warn("Truncated task name for Kafka TransactionalId
from {} to {}.", getRuntimeContext().getTaskName(), taskName);
```
I hope this compiles, just typed it out here in Github...
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]