Obi Tetsuya created FLINK-10478:
-----------------------------------

             Summary: Kafka Producer wrongly formats "%" for transaction ID
                 Key: FLINK-10478
                 URL: https://issues.apache.org/jira/browse/FLINK-10478
             Project: Flink
          Issue Type: Bug
          Components: Kafka Connector
    Affects Versions: 1.4.2
         Environment: Flink 1.4.2
Scala 2.11.12
jdk1.8.0_162
Running on a local embedded Flink mini cluster (this also happened on a standalone cluster with different code)
            Reporter: Obi Tetsuya


The Kafka producer with exactly-once semantics uses its task name as part of the transactional ID. Because the producer uses the name directly as a format string, the job fails when the name contains "%".

Code to reproduce:
{code:scala}
// Imports assume the Flink 1.4.x package locations.
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.Semantic
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper

object ExampleRunner {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
    env.enableCheckpointing(1000)
    env.getConfig.disableSysoutLogging()
    env.setRestartStrategy(RestartStrategies.noRestart)

    // Kafka producer configuration
    val p = new java.util.Properties
    Map("bootstrap.servers" -> "192.168.1.100:9092",
        "transaction.timeout.ms" -> (10 * 60 * 1000).toString).foreach {
      case (k, v) => p.setProperty(k, v)
    }

    env
      .fromCollection(100 to 200)
      .map(_.toString)
      .addSink(new FlinkKafkaProducer011(
        "test",
        new KeyedSerializationSchemaWrapper(new SimpleStringSchema),
        p,
        Semantic.EXACTLY_ONCE))
      .name("100%") // the "%" in the sink name triggers the failure

    env.execute()
  }
}
{code}

Raised exception:
{code}
2018-10-02 17:00:12.918 [Map -> Sink: 100% (1/8)] INFO  o.a.flink.runtime.taskmanager.Task  - Map -> Sink: 100% (1/8) (25190aeccdce738afdd00e9320903d7b) switched from RUNNING to FAILED.
java.util.MissingFormatWidthException: %-%
	at java.util.Formatter$FormatSpecifier.checkText(Formatter.java:3040)
	at java.util.Formatter$FormatSpecifier.<init>(Formatter.java:2733)
	at java.util.Formatter.parse(Formatter.java:2560)
	at java.util.Formatter.format(Formatter.java:2501)
	at java.util.Formatter.format(Formatter.java:2455)
	at java.lang.String.format(String.java:2940)
	at org.apache.flink.streaming.connectors.kafka.internal.TransactionalIdsGenerator.generateTransactionalId(TransactionalIdsGenerator.java:91)
	at org.apache.flink.streaming.connectors.kafka.internal.TransactionalIdsGenerator.generateIdsToUse(TransactionalIdsGenerator.java:72)
	at org.apache.flink.streaming.connectors.kafka.internal.TransactionalIdsGenerator.generateIdsToAbort(TransactionalIdsGenerator.java:85)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initializeState(FlinkKafkaProducer011.java:850)
	at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
	at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:258)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:694)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:682)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
	at java.lang.Thread.run(Thread.java:748)
{code}

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
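
The failure reported here can probably be reproduced without Flink or Kafka. The sketch below assumes that the ID generator builds its format string by concatenating the task name with "-%d". That assumption is consistent with the "%-%" in the exception message, but it is not copied from the Flink source. The class and method names (TransactionalIdFormatDemo, unsafeId, safeId) are hypothetical. One possible way to avoid the problem is to pass the name as a format argument instead of embedding it in the format string.

```java
import java.util.IllegalFormatException;

// Standalone illustration; none of these names exist in Flink.
final class TransactionalIdFormatDemo {

    // Assumed shape of the failing call: the prefix becomes part of the
    // format string, so any '%' in it is parsed as a format specifier.
    static String unsafeId(String prefix, long id) {
        return String.format(prefix + "-%d", id);
    }

    // The prefix is passed as an argument, so its content is never parsed
    // as a format specifier.
    static String safeId(String prefix, long id) {
        return String.format("%s-%d", prefix, id);
    }

    public static void main(String[] args) {
        String taskName = "Map -> Sink: 100%";
        try {
            unsafeId(taskName, 3L);
        } catch (IllegalFormatException e) {
            // The report shows MissingFormatWidthException, a subclass of
            // IllegalFormatException, on jdk1.8.0_162.
            System.out.println("unsafe variant failed: " + e);
        }
        System.out.println("safe variant: " + safeId(taskName, 3L));
    }
}
```

Plain concatenation (prefix + "-" + id) would work just as well as the "%s" form. Escaping "%" as "%%" in the prefix is another option, but it is easier to get wrong.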