Hi Martin, I'm using the default but I also tried with the other options and nothing changed. In any case I reported the issue and it seems that we found the root cause.
https://issues.apache.org/jira/browse/KAFKA-19758

Regards,
Mario.

On Tue, Oct 7, 2025 at 2:36 PM Martin Andersson <[email protected]> wrote:

> Hello Mario.
> I didn't look into this in detail, but are you using the new plugin
> discovery mechanism?
> https://kafka.apache.org/documentation/#connectconfigs_plugin.discovery
>
> Regards,
> ________________________________
> From: Mario Fiore Vitale <[email protected]>
> Sent: Friday, October 3, 2025 17:14
> To: [email protected] <[email protected]>
> Subject: Weird behavior on Kafka Connect class loading
>
> Hi all,
>
> I have the DebeziumOpenLineageEmitter[1] class in the
> debezium-openlineage-api that internally has a static map to maintain the
> registered emitters; the key of this map is "connectorLogicalName-taskid".
> Then there is the OpenLineage[2] SMT, which is part of the Debezium core.
> In this SMT, I simply pass the same context to obtain the same emitter
> that the connector instantiated.
>
> Now I'm running the following image
>
> FROM quay.io/debezium/connect:3.3.0.Final
>
> ENV MAVEN_REPO="https://repo1.maven.org/maven2"
> ENV GROUP_ID="io/debezium"
> ENV DEBEZIUM_VERSION="3.3.0.Final"
> ENV ARTIFACT_ID="debezium-openlineage-core"
> ENV CLASSIFIER="-libs"
>
> COPY log4j.properties /kafka/config/log4j.properties
>
> # Add OpenLineage
> RUN mkdir -p /tmp/openlineage-libs && \
>     curl "$MAVEN_REPO/$GROUP_ID/$ARTIFACT_ID/$DEBEZIUM_VERSION/$ARTIFACT_ID-${DEBEZIUM_VERSION}${CLASSIFIER}.tar.gz" -o /tmp/debezium-openlineage-core-libs.tar.gz && \
>     tar -xzvf /tmp/debezium-openlineage-core-libs.tar.gz -C /tmp/openlineage-libs --strip-components=1
>
> RUN cp -r /tmp/openlineage-libs/* /kafka/connect/debezium-connector-postgres/
> RUN cp -r /tmp/openlineage-libs/* /kafka/connect/debezium-connector-mongodb/
> ADD openlineage.yml /kafka/
>
> So it is practically the Debezium Connect image with just the OpenLineage
> jars copied into the postgres and mongodb connector folders.
>
> When I register the PostgreSQL connector
>
> {
>   "name": "inventory-connector-postgres",
>   "config": {
>     "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
>     "tasks.max": "1",
>     "database.hostname": "postgres",
>     "database.port": "5432",
>     "database.user": "postgres",
>     "database.password": "postgres",
>     "database.server.id": "184054",
>     "database.dbname": "postgres",
>     "topic.prefix": "inventory",
>     "snapshot.mode": "initial",
>     "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
>     "schema.history.internal.kafka.topic": "schema-changes.inventory",
>     "slot.name": "postgres",
>     "openlineage.integration.enabled": "true",
>     "openlineage.integration.config.file.path": "/kafka/openlineage.yml",
>     "openlineage.integration.job.description": "This connector does cdc for products",
>     "openlineage.integration.tags": "env=prod,team=cdc",
>     "openlineage.integration.owners": "Mario=maintainer,John Doe=Data scientist,IronMan=superero",
>     "transforms": "openlineage",
>     "transforms.openlineage.type": "io.debezium.transforms.openlineage.OpenLineage"
>   }
> }
>
> I get the following error
>
> 2025-10-03T14:22:09,761 ERROR || WorkerSourceTask{id=inventory-connector-postgres-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted [org.apache.kafka.connect.runtime.WorkerTask]
> org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
>     at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:260) ~[connect-runtime-4.1.0.jar:?]
>     at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:180) ~[connect-runtime-4.1.0.jar:?]
>     at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:58) ~[connect-runtime-4.1.0.jar:?]
>     at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.sendRecords(AbstractWorkerSourceTask.java:415) ~[connect-runtime-4.1.0.jar:?]
>     at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.execute(AbstractWorkerSourceTask.java:376) ~[connect-runtime-4.1.0.jar:?]
>     at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:243) ~[connect-runtime-4.1.0.jar:?]
>     at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:298) ~[connect-runtime-4.1.0.jar:?]
>     at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:83) ~[connect-runtime-4.1.0.jar:?]
>     at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:254) ~[connect-runtime-4.1.0.jar:?]
>     at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572) ~[?:?]
>     at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317) ~[?:?]
>     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) ~[?:?]
>     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) ~[?:?]
>     at java.base/java.lang.Thread.run(Thread.java:1583) [?:?]
> Caused by: java.lang.IllegalStateException: DebeziumOpenLineageEmitter not initialized for connector ConnectorContext[connectorLogicalName=inventory, connectorName=postgresql, taskId=0, version=null, config=null]. Call init() first.
>     at io.debezium.openlineage.DebeziumOpenLineageEmitter.getEmitter(DebeziumOpenLineageEmitter.java:176) ~[debezium-openlineage-api-3.3.0.Final.jar:3.3.0.Final]
>     at io.debezium.openlineage.DebeziumOpenLineageEmitter.emit(DebeziumOpenLineageEmitter.java:153) ~[debezium-openlineage-api-3.3.0.Final.jar:3.3.0.Final]
>     at io.debezium.transforms.openlineage.OpenLineage.apply(OpenLineage.java:74) ~[debezium-core-3.3.0.Final.jar:3.3.0.Final]
>     at org.apache.kafka.connect.runtime.TransformationStage.apply(TransformationStage.java:95) ~[connect-runtime-4.1.0.jar:?]
>     at org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:58) ~[connect-runtime-4.1.0.jar:?]
>     at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:208) ~[connect-runtime-4.1.0.jar:?]
>     at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:244) ~[connect-runtime-4.1.0.jar:?]
>     ... 13 more
>
> Full log attached
>
> This is evidence that the emitters map is not shared between the connector
> and the SMT.
>
> The situation becomes weirder if I remove all connectors from the image
> except PostgreSQL and MongoDB.
> In that case, the PostgreSQL connector works perfectly.
>
> The plugins are in the folder /kafka/connect (the only folder configured
> in plugin.path), each under a dedicated folder with its dependencies.
>
> I then started to add more connectors, and it continued to work until I
> added the SQL Server connector.
> To summarize, the problem arises when I add any or all of [sqlserver,
> spanner, vitess].
>
> Am I correct that Kafka Connect guarantees that each connector is loaded
> with an isolated class loader together with its dependencies, so that the
> static emitters map should be shared between the connector and the SMT?
>
> One more detail: if I run the 3.2.0.Final image (so Kafka 4.0.0) with all
> connectors, it works fine.
>
> Any help is much appreciated.
>
> [1] https://github.com/debezium/debezium/blob/main/debezium-openlineage/debezium-openlineage-api/src/main/java/io/debezium/openlineage/DebeziumOpenLineageEmitter.java
> [2] https://github.com/debezium/debezium/blob/main/debezium-core/src/main/java/io/debezium/transforms/openlineage/OpenLineage.java

--
Mario Fiore Vitale
Senior Software Engineer
Red Hat <https://www.redhat.com/>
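
P.S. For anyone following the thread: the symptom in the stack trace (the connector calls init(), yet the SMT finds no emitter) is what one would expect if the connector and the SMT resolve DebeziumOpenLineageEmitter through two different class loaders, because every loader that defines a class gets its own copy of that class's static fields. Below is a minimal sketch of that general JVM behaviour only. Registry and IsolatingLoader are hypothetical stand-ins, not Debezium or Kafka Connect code, and the sketch assumes the compiled class bytes are reachable as a resource. It says nothing about the actual root cause tracked in KAFKA-19758.

```java
import java.io.IOException;
import java.io.InputStream;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class ClassLoaderIsolationDemo {

    /** Hypothetical stand-in for a class that keeps emitters in a static map. */
    public static class Registry {
        private static final Map<String, String> EMITTERS = new ConcurrentHashMap<>();

        public static void init(String key) {
            EMITTERS.put(key, "emitter");
        }

        public static boolean isInitialized(String key) {
            return EMITTERS.containsKey(key);
        }
    }

    /** Defines one named class itself and delegates every other class to its parent. */
    static final class IsolatingLoader extends ClassLoader {
        private final String isolatedName;

        IsolatingLoader(ClassLoader parent, String isolatedName) {
            super(parent);
            this.isolatedName = isolatedName;
        }

        @Override
        protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
            if (!name.equals(isolatedName)) {
                return super.loadClass(name, resolve);
            }
            synchronized (getClassLoadingLock(name)) {
                Class<?> loaded = findLoadedClass(name);
                if (loaded == null) {
                    // Read the class bytes via the parent, but define the class in this loader.
                    String resource = name.replace('.', '/') + ".class";
                    try (InputStream in = getParent().getResourceAsStream(resource)) {
                        if (in == null) {
                            throw new ClassNotFoundException(name);
                        }
                        byte[] bytes = in.readAllBytes();
                        loaded = defineClass(name, bytes, 0, bytes.length);
                    } catch (IOException e) {
                        throw new ClassNotFoundException(name, e);
                    }
                }
                if (resolve) {
                    resolveClass(loaded);
                }
                return loaded;
            }
        }
    }

    /** Registers a key through one loader's Registry and looks it up through another's. */
    public static boolean sharedAcrossLoaders(String key) {
        ClassLoader parent = ClassLoaderIsolationDemo.class.getClassLoader();
        String name = Registry.class.getName();
        try {
            Class<?> connectorView = new IsolatingLoader(parent, name).loadClass(name);
            Class<?> smtView = new IsolatingLoader(parent, name).loadClass(name);
            connectorView.getMethod("init", String.class).invoke(null, key);
            return (Boolean) smtView.getMethod("isInitialized", String.class).invoke(null, key);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("shared across loaders: " + sharedAcrossLoaders("inventory-0"));
    }
}
```

In this sketch the two loaders each define their own Registry class, so the key registered through the first is not visible through the second. If both lookups went through the same loader, the map would be shared, which is the behaviour the original question expects from a single plugin class loader.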
