[ https://issues.apache.org/jira/browse/FLINK-16262?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17045510#comment-17045510 ]

Jürgen Kreileder commented on FLINK-16262:
------------------------------------------

Hi [~maguowei],

1. We're using the universal connector, i.e. flink-connector-kafka_2.12

2. 
{code:java}
flink@2194d2cf5949:~$ ls -al lib usrlib
lib:
total 105236
drwxr-xr-x 2 flink flink      4096 Feb 24 13:47 .
drwxr-xr-x 1 flink flink      4096 Feb 25 12:35 ..
-rw-r--r-- 1 flink flink 101157478 Feb 12 22:39 flink-dist_2.12-1.10.0-empolis.jar
-rw-r--r-- 1 flink flink   1051320 Feb 12 22:36 flink-metrics-influxdb-1.10.0-empolis.jar
-rw-r--r-- 1 flink flink    489884 Feb 12 21:52 log4j-1.2.17.jar
-rw-r--r-- 1 flink flink   4210517 Feb  9 10:45 ojdbc8-19.3.0.0.jar
-rw-r--r-- 1 flink flink    825943 Feb  9 10:45 postgresql-42.2.5.jar
-rw-r--r-- 1 flink flink      9931 Feb 12 21:52 slf4j-log4j12-1.7.15.jar


usrlib:
total 40176
drwxr-xr-x 2 flink flink     4096 Feb 25 12:35 .
drwxr-xr-x 1 flink flink     4096 Feb 25 12:35 ..
-rw-r--r-- 1 flink flink 41129331 Feb 25 12:35 iana-stack.jar{code}
flink-1.10.0-empolis is a Java 11/Scala 2.12 build.
iana-stack.jar is an uber JAR that includes flink-connector-kafka_2.12 and kafka-clients:2.2.0.
{code:java}
[info]   | +-org.apache.flink:flink-connector-kafka-base_2.12:1.10.0-empolis
[info]   | | +-com.google.code.findbugs:jsr305:1.3.9
[info]   | | +-org.apache.flink:force-shading:1.10.0-empolis
[info]   | | +-org.slf4j:slf4j-api:1.7.15 (evicted by: 1.7.25)
[info]   | | +-org.slf4j:slf4j-api:1.7.25
[info]   | | 
[info]   | +-org.apache.flink:force-shading:1.10.0-empolis
[info]   | +-org.apache.kafka:kafka-clients:2.2.0
[info]   | | +-com.github.luben:zstd-jni:1.3.8-1
[info]   | | +-org.lz4:lz4-java:1.5.0
[info]   | | +-org.slf4j:slf4j-api:1.7.25
[info]   | | +-org.xerial.snappy:snappy-java:1.1.7.2
{code}
3. Yes, we're using a customized docker entrypoint based on [https://github.com/docker-flink/docker-flink/blob/master/docker-entrypoint.sh]. The only changes are env-variable-based modifications of conf/log4j-console.properties and the generation of savepoint options.
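For what it's worth, the stack trace in the issue ends in ForkJoinPool worker threads (FlinkKafkaProducer.abortTransactions runs a parallel stream), and helper threads don't necessarily carry the task's context class loader. A minimal plain-Java sketch of that mechanism (no Flink or Kafka involved; the loader here is just a stand-in for a user-code class loader such as the one used for usrlib JARs):
{code:java}
import java.net.URL;
import java.net.URLClassLoader;

public class ContextLoaderDemo {
    public static void main(String[] args) throws Exception {
        // Stand-in for a per-job user-code class loader (e.g. for usrlib JARs).
        ClassLoader userLoader =
                new URLClassLoader(new URL[0], ClassLoader.getSystemClassLoader());

        // A worker thread created *before* the task thread installs its loader
        // captures the creating thread's context class loader at construction time.
        ClassLoader[] seenByWorker = new ClassLoader[1];
        Thread worker = new Thread(
                () -> seenByWorker[0] = Thread.currentThread().getContextClassLoader());

        // Simulate the task thread switching to the user-code class loader.
        Thread.currentThread().setContextClassLoader(userLoader);

        worker.start();
        worker.join();

        // The worker still sees the old loader, so class lookups driven by the
        // context class loader there cannot find classes only userLoader knows.
        System.out.println("worker uses userLoader: " + (seenByWorker[0] == userLoader));
    }
}
{code}
Running this prints "worker uses userLoader: false", which would be consistent with classes from usrlib resolving on the task thread (AT_LEAST_ONCE path) but not on pool threads (EXACTLY_ONCE transaction-abort path).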


> Class loader problem with FlinkKafkaProducer.Semantic.EXACTLY_ONCE and usrlib 
> directory
> ---------------------------------------------------------------------------------------
>
>                 Key: FLINK-16262
>                 URL: https://issues.apache.org/jira/browse/FLINK-16262
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka, Deployment / Docker
>    Affects Versions: 1.10.0
>         Environment: openjdk:11-jre with a slightly modified Flink 1.10.0 
> build (nothing changed regarding Kafka and/or class loading).
>            Reporter: Jürgen Kreileder
>            Priority: Blocker
>             Fix For: 1.10.1, 1.11.0
>
>
> We're using Docker images modeled after [https://github.com/apache/flink/blob/master/flink-container/docker/Dockerfile] (using Java 11).
> When I try to switch a Kafka producer from AT_LEAST_ONCE to EXACTLY_ONCE, the taskmanager startup fails with:
> {code:java}
> 2020-02-24 18:25:16.389 INFO  o.a.f.r.t.Task  Create Case Fixer -> Sink: Findings local-krei04-kba-digitalweb-uc1 (1/1) (72f7764c6f6c614e5355562ed3d27209) switched from RUNNING to FAILED.
> org.apache.kafka.common.config.ConfigException: Invalid value org.apache.kafka.common.serialization.ByteArraySerializer for configuration key.serializer: Class org.apache.kafka.common.serialization.ByteArraySerializer could not be found.
>  at org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:718)
>  at org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:471)
>  at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:464)
>  at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:62)
>  at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:75)
>  at org.apache.kafka.clients.producer.ProducerConfig.<init>(ProducerConfig.java:396)
>  at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:326)
>  at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:298)
>  at org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaInternalProducer.<init>(FlinkKafkaInternalProducer.java:76)
>  at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.lambda$abortTransactions$2(FlinkKafkaProducer.java:1107)
>  at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(Unknown Source)
>  at java.base/java.util.HashMap$KeySpliterator.forEachRemaining(Unknown Source)
>  at java.base/java.util.stream.AbstractPipeline.copyInto(Unknown Source)
>  at java.base/java.util.stream.ForEachOps$ForEachTask.compute(Unknown Source)
>  at java.base/java.util.concurrent.CountedCompleter.exec(Unknown Source)
>  at java.base/java.util.concurrent.ForkJoinTask.doExec(Unknown Source)
>  at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(Unknown Source)
>  at java.base/java.util.concurrent.ForkJoinPool.scan(Unknown Source)
>  at java.base/java.util.concurrent.ForkJoinPool.runWorker(Unknown Source)
>  at java.base/java.util.concurrent.ForkJoinWorkerThread.run(Unknown Source){code}
> This looks like a class loading issue: If I copy our JAR to FLINK_LIB_DIR instead of FLINK_USR_LIB_DIR, everything works fine.
> (AT_LEAST_ONCE producers work fine with the JAR in FLINK_USR_LIB_DIR.)
>  
--
This message was sent by Atlassian Jira
(v8.3.4#803005)