Hi Martin,
I'm using the default but I also tried with the other options and nothing
changed. In any case I reported the issue and it seems that we found the
root cause.

https://issues.apache.org/jira/browse/KAFKA-19758

Regards,
Mario.


On Tue, Oct 7, 2025 at 2:36 PM Martin Andersson <[email protected]>
wrote:

> Hello Mario.
> I didn't look into this in detail, but are you using the new plugin
> discovery mechanism?
> https://kafka.apache.org/documentation/#connectconfigs_plugin.discovery
>
> Regards,
> ________________________________
> From: Mario Fiore Vitale <[email protected]>
> Sent: Friday, October 3, 2025 17:14
> To: [email protected] <[email protected]>
> Subject: Weird behavior on Kafka Connect class loading
>
> Hi all,
>
> I have the DebeziumOpenLineageEmitter[1] class in the
> debezium-openlineage-api module. Internally it keeps a static map of the
> registered emitters, keyed by "connectorLogicalName-taskId".
> Then there is the OpenLineage[2] SMT, which is part of the Debezium core.
> In this SMT, I simply pass the same context to get the same emitter that
> was instantiated via the connector.
>
> Now I'm running the following image
>
> FROM quay.io/debezium/connect:3.3.0.Final
>
> ENV MAVEN_REPO="https://repo1.maven.org/maven2"
> ENV GROUP_ID="io/debezium"
> ENV DEBEZIUM_VERSION="3.3.0.Final"
> ENV ARTIFACT_ID="debezium-openlineage-core"
> ENV CLASSIFIER="-libs"
>
> COPY log4j.properties /kafka/config/log4j.properties
>
> # Add OpenLineage
> RUN mkdir -p /tmp/openlineage-libs && \
>     curl "$MAVEN_REPO/$GROUP_ID/$ARTIFACT_ID/$DEBEZIUM_VERSION/$ARTIFACT_ID-${DEBEZIUM_VERSION}${CLASSIFIER}.tar.gz" \
>         -o /tmp/debezium-openlineage-core-libs.tar.gz && \
>     tar -xzvf /tmp/debezium-openlineage-core-libs.tar.gz \
>         -C /tmp/openlineage-libs --strip-components=1
>
> RUN cp -r /tmp/openlineage-libs/* /kafka/connect/debezium-connector-postgres/
> RUN cp -r /tmp/openlineage-libs/* /kafka/connect/debezium-connector-mongodb/
> ADD openlineage.yml /kafka/
>
> So it is practically the Debezium Connect image with just the OpenLineage
> jars copied into the PostgreSQL and MongoDB connector folders.
>
> When I register the PostgreSQL connector
>
> {
>   "name": "inventory-connector-postgres",
>   "config": {
>     "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
>     "tasks.max": "1",
>     "database.hostname": "postgres",
>     "database.port": "5432",
>     "database.user": "postgres",
>     "database.password": "postgres",
>     "database.server.id": "184054",
>     "database.dbname": "postgres",
>     "topic.prefix": "inventory",
>     "snapshot.mode": "initial",
>     "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
>     "schema.history.internal.kafka.topic": "schema-changes.inventory",
>     "slot.name": "postgres",
>     "openlineage.integration.enabled": "true",
>     "openlineage.integration.config.file.path": "/kafka/openlineage.yml",
>     "openlineage.integration.job.description": "This connector does cdc for products",
>     "openlineage.integration.tags": "env=prod,team=cdc",
>     "openlineage.integration.owners": "Mario=maintainer,John Doe=Data scientist,IronMan=superero",
>     "transforms": "openlineage",
>     "transforms.openlineage.type": "io.debezium.transforms.openlineage.OpenLineage"
>   }
> }
>
> I get the following error
>
> 2025-10-03T14:22:09,761 ERROR  || WorkerSourceTask{id=inventory-connector-postgres-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted   [org.apache.kafka.connect.runtime.WorkerTask]
> org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
>     at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:260) ~[connect-runtime-4.1.0.jar:?]
>     at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:180) ~[connect-runtime-4.1.0.jar:?]
>     at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:58) ~[connect-runtime-4.1.0.jar:?]
>     at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.sendRecords(AbstractWorkerSourceTask.java:415) ~[connect-runtime-4.1.0.jar:?]
>     at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.execute(AbstractWorkerSourceTask.java:376) ~[connect-runtime-4.1.0.jar:?]
>     at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:243) ~[connect-runtime-4.1.0.jar:?]
>     at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:298) ~[connect-runtime-4.1.0.jar:?]
>     at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:83) ~[connect-runtime-4.1.0.jar:?]
>     at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:254) ~[connect-runtime-4.1.0.jar:?]
>     at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572) ~[?:?]
>     at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317) ~[?:?]
>     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) ~[?:?]
>     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) ~[?:?]
>     at java.base/java.lang.Thread.run(Thread.java:1583) [?:?]
> Caused by: java.lang.IllegalStateException: DebeziumOpenLineageEmitter not initialized for connector ConnectorContext[connectorLogicalName=inventory, connectorName=postgresql, taskId=0, version=null, config=null]. Call init() first.
>     at io.debezium.openlineage.DebeziumOpenLineageEmitter.getEmitter(DebeziumOpenLineageEmitter.java:176) ~[debezium-openlineage-api-3.3.0.Final.jar:3.3.0.Final]
>     at io.debezium.openlineage.DebeziumOpenLineageEmitter.emit(DebeziumOpenLineageEmitter.java:153) ~[debezium-openlineage-api-3.3.0.Final.jar:3.3.0.Final]
>     at io.debezium.transforms.openlineage.OpenLineage.apply(OpenLineage.java:74) ~[debezium-core-3.3.0.Final.jar:3.3.0.Final]
>     at org.apache.kafka.connect.runtime.TransformationStage.apply(TransformationStage.java:95) ~[connect-runtime-4.1.0.jar:?]
>     at org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:58) ~[connect-runtime-4.1.0.jar:?]
>     at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:208) ~[connect-runtime-4.1.0.jar:?]
>     at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:244) ~[connect-runtime-4.1.0.jar:?]
>     ... 13 more
>
> Full log attached
>
> This is evidence that the emitters map is not shared between the connector
> and the SMT.
>
> The situation becomes weirder if I remove all connectors from the image
> except PostgreSQL and MongoDB.
> In that case, the PostgreSQL connector works perfectly.
>
> The plugins are in /kafka/connect (the only folder configured in
> plugin.path), each in a dedicated subfolder together with its
> dependencies.
>
> I then started to add more connectors, and it continued to work until I
> added the SQL Server connector.
> To summarize, the problem arises when I add any or all of [sqlserver,
> spanner, vitess].
>
> Am I correct that Kafka Connect guarantees that each connector is loaded
> with an isolated class loader with its dependencies so that the static
> emitters should be shared between the Connector and the SMT?
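To make the symptom concrete, here is a minimal sketch (not Kafka Connect or Debezium code) of how a static map stops being shared once the same class is defined by two different class loaders. The `Registry` class, its `init`/`isInitialized` methods, and the key "inventory-0" are hypothetical stand-ins chosen for illustration; the demo assumes a JDK (not a JRE-only install) because it compiles `Registry` at run time.

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Files;
import java.nio.file.Path;
import javax.tools.JavaCompiler;
import javax.tools.ToolProvider;

public class StaticIsolationDemo {

    // Hypothetical stand-in for a class that keeps a static registry.
    private static final String SOURCE =
        "import java.util.Map;\n"
      + "import java.util.concurrent.ConcurrentHashMap;\n"
      + "public class Registry {\n"
      + "    private static final Map<String, Object> EMITTERS = new ConcurrentHashMap<>();\n"
      + "    public static void init(String key) { EMITTERS.put(key, new Object()); }\n"
      + "    public static boolean isInitialized(String key) { return EMITTERS.containsKey(key); }\n"
      + "}\n";

    /** Compiles Registry into a fresh temp directory and returns that directory. */
    public static Path compileRegistry() throws Exception {
        Path dir = Files.createTempDirectory("registry-demo");
        Path src = dir.resolve("Registry.java");
        Files.writeString(src, SOURCE);
        JavaCompiler compiler = ToolProvider.getSystemJavaCompiler(); // null without a JDK
        if (compiler == null) {
            throw new IllegalStateException("A JDK is required to run this demo");
        }
        int rc = compiler.run(null, null, null, "-d", dir.toString(), src.toString());
        if (rc != 0) {
            throw new IllegalStateException("Compilation of Registry failed");
        }
        return dir;
    }

    /** Calls Registry.init(key) through one loader, then Registry.isInitialized(key) through another. */
    public static boolean initThenLookup(ClassLoader initLoader, ClassLoader lookupLoader, String key)
            throws Exception {
        Class<?> writer = initLoader.loadClass("Registry");
        writer.getMethod("init", String.class).invoke(null, key);
        Class<?> reader = lookupLoader.loadClass("Registry");
        return (Boolean) reader.getMethod("isInitialized", String.class).invoke(null, key);
    }

    public static void main(String[] args) throws Exception {
        URL[] urls = { compileRegistry().toUri().toURL() };
        // A null parent means only the bootstrap loader is consulted first,
        // so each URLClassLoader defines its own copy of Registry.
        try (URLClassLoader first = new URLClassLoader(urls, null);
             URLClassLoader second = new URLClassLoader(urls, null)) {
            System.out.println("same loader:       " + initThenLookup(first, first, "inventory-0"));
            System.out.println("different loaders: " + initThenLookup(first, second, "inventory-0"));
        }
    }
}
```

With the same loader on both sides the lookup succeeds; with two loaders it fails, because each loader holds its own `Registry` class and therefore its own static map. That matches the "not initialized ... Call init() first" shape of the error above, under the assumption that the connector and the SMT resolved the emitter class through different loaders.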
>
> To add more, if I run the image from 3.2.0.Final (so Kafka 4.0.0) with all
> connectors, it works fine.
>
> Any help is much appreciated.
>
> [1]
> https://github.com/debezium/debezium/blob/main/debezium-openlineage/debezium-openlineage-api/src/main/java/io/debezium/openlineage/DebeziumOpenLineageEmitter.java
> [2]
> https://github.com/debezium/debezium/blob/main/debezium-core/src/main/java/io/debezium/transforms/openlineage/OpenLineage.java
>


-- 

Mario Fiore Vitale

Senior Software Engineer

Red Hat <https://www.redhat.com/>
