Hi all,

I have the DebeziumOpenLineageEmitter [1] class in debezium-openlineage-api.
Internally it keeps a static map of the registered emitters, keyed by
"connectorLogicalName-taskId".
Then there is the OpenLineage SMT [2], which is part of Debezium core. In
this SMT I simply pass the same context that the connector uses, so that the
SMT obtains the same emitter instance the connector registered.
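The sketch below makes the lookup pattern concrete with a simplified model of
such a registry. It is hypothetical and is not the actual
DebeziumOpenLineageEmitter code. The class EmitterRegistry, the Emitter record
and the init()/get() signatures are illustrative assumptions. The sketch
mirrors only two properties that matter here:

- a static map keyed by "connectorLogicalName-taskId";
- a lookup that throws an IllegalStateException when init() was never called
  on that particular map.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified model of a static emitter registry.
// NOT the real DebeziumOpenLineageEmitter API; names are illustrative only.
public final class EmitterRegistry {

    // Placeholder for whatever per-task object the real registry stores.
    public record Emitter(String connectorLogicalName, String taskId) {}

    // Static state: one map per runtime copy of this class.
    private static final Map<String, Emitter> EMITTERS = new ConcurrentHashMap<>();

    private EmitterRegistry() {}

    // Key format mirrors the "connectorLogicalName-taskId" scheme.
    public static String key(String connectorLogicalName, String taskId) {
        return connectorLogicalName + "-" + taskId;
    }

    // Connector-task side: register (or reuse) the emitter for this task.
    public static Emitter init(String connectorLogicalName, String taskId) {
        return EMITTERS.computeIfAbsent(key(connectorLogicalName, taskId),
                k -> new Emitter(connectorLogicalName, taskId));
    }

    // SMT side: look up the emitter using the same logical name and task id.
    public static Emitter get(String connectorLogicalName, String taskId) {
        Emitter emitter = EMITTERS.get(key(connectorLogicalName, taskId));
        if (emitter == null) {
            throw new IllegalStateException("Emitter not initialized for "
                    + key(connectorLogicalName, taskId) + ". Call init() first.");
        }
        return emitter;
    }
}
```

Because EMITTERS is a static field, the connector and the SMT see the same map
only if both resolve EmitterRegistry to the same runtime class.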

I'm running the following image:

    FROM quay.io/debezium/connect:3.3.0.Final

    ENV MAVEN_REPO="https://repo1.maven.org/maven2"
    ENV GROUP_ID="io/debezium"
    ENV DEBEZIUM_VERSION="3.3.0.Final"
    ENV ARTIFACT_ID="debezium-openlineage-core"
    ENV CLASSIFIER="-libs"

    COPY log4j.properties /kafka/config/log4j.properties

    # Add OpenLineage
    RUN mkdir -p /tmp/openlineage-libs && \
        curl "$MAVEN_REPO/$GROUP_ID/$ARTIFACT_ID/$DEBEZIUM_VERSION/$ARTIFACT_ID-${DEBEZIUM_VERSION}${CLASSIFIER}.tar.gz" -o /tmp/debezium-openlineage-core-libs.tar.gz && \
        tar -xzvf /tmp/debezium-openlineage-core-libs.tar.gz -C /tmp/openlineage-libs --strip-components=1

    RUN cp -r /tmp/openlineage-libs/* /kafka/connect/debezium-connector-postgres/
    RUN cp -r /tmp/openlineage-libs/* /kafka/connect/debezium-connector-mongodb/
    ADD openlineage.yml /kafka/


So it is essentially the stock Debezium Connect image, with the OpenLineage
JARs copied into the PostgreSQL and MongoDB connector folders.

When I register the PostgreSQL connector with this configuration:

    {
      "name": "inventory-connector-postgres",
      "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "tasks.max": "1",
        "database.hostname": "postgres",
        "database.port": "5432",
        "database.user": "postgres",
        "database.password": "postgres",
        "database.server.id": "184054",
        "database.dbname": "postgres",
        "topic.prefix": "inventory",
        "snapshot.mode": "initial",
        "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
        "schema.history.internal.kafka.topic": "schema-changes.inventory",
        "slot.name": "postgres",
        "openlineage.integration.enabled": "true",
        "openlineage.integration.config.file.path": "/kafka/openlineage.yml",
        "openlineage.integration.job.description": "This connector does cdc for products",
        "openlineage.integration.tags": "env=prod,team=cdc",
        "openlineage.integration.owners": "Mario=maintainer,John Doe=Data scientist,IronMan=superero",
        "transforms": "openlineage",
        "transforms.openlineage.type": "io.debezium.transforms.openlineage.OpenLineage"
      }
    }
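The registration step can be scripted with the JDK's java.net.http client.
This is a sketch built on a few assumptions:

- the Kafka Connect REST API is reachable at http://localhost:8083, the
  default listener port;
- the JSON above is passed in as a string;
- the class and method names (RegisterConnector, buildRequest, register) are
  hypothetical.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;

// Hypothetical helper for POSTing a connector definition to Kafka Connect.
public final class RegisterConnector {

    private RegisterConnector() {}

    // Builds POST {baseUrl}/connectors with the connector JSON as the body.
    public static HttpRequest buildRequest(String connectBaseUrl, String connectorJson) {
        return HttpRequest.newBuilder(URI.create(connectBaseUrl + "/connectors"))
                .timeout(Duration.ofSeconds(30))
                .header("Content-Type", "application/json")
                .header("Accept", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(connectorJson))
                .build();
    }

    // Sends the request; callers should treat any non-2xx status as a failure.
    public static HttpResponse<String> register(String connectBaseUrl, String connectorJson)
            throws IOException, InterruptedException {
        return HttpClient.newHttpClient()
                .send(buildRequest(connectBaseUrl, connectorJson), HttpResponse.BodyHandlers.ofString());
    }
}
```

For example, register("http://localhost:8083", json) returns the REST
response. Its status code and body show whether Connect accepted the
configuration.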


I get the following error:

    2025-10-03T14:22:09,761 ERROR  ||  WorkerSourceTask{id=inventory-connector-postgres-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted   [org.apache.kafka.connect.runtime.WorkerTask]
    org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:260) ~[connect-runtime-4.1.0.jar:?]
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:180) ~[connect-runtime-4.1.0.jar:?]
        at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:58) ~[connect-runtime-4.1.0.jar:?]
        at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.sendRecords(AbstractWorkerSourceTask.java:415) ~[connect-runtime-4.1.0.jar:?]
        at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.execute(AbstractWorkerSourceTask.java:376) ~[connect-runtime-4.1.0.jar:?]
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:243) ~[connect-runtime-4.1.0.jar:?]
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:298) ~[connect-runtime-4.1.0.jar:?]
        at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:83) ~[connect-runtime-4.1.0.jar:?]
        at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:254) ~[connect-runtime-4.1.0.jar:?]
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572) ~[?:?]
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317) ~[?:?]
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) ~[?:?]
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) ~[?:?]
        at java.base/java.lang.Thread.run(Thread.java:1583) [?:?]
    Caused by: java.lang.IllegalStateException: DebeziumOpenLineageEmitter not initialized for connector ConnectorContext[connectorLogicalName=inventory, connectorName=postgresql, taskId=0, version=null, config=null]. Call init() first.
        at io.debezium.openlineage.DebeziumOpenLineageEmitter.getEmitter(DebeziumOpenLineageEmitter.java:176) ~[debezium-openlineage-api-3.3.0.Final.jar:3.3.0.Final]
        at io.debezium.openlineage.DebeziumOpenLineageEmitter.emit(DebeziumOpenLineageEmitter.java:153) ~[debezium-openlineage-api-3.3.0.Final.jar:3.3.0.Final]
        at io.debezium.transforms.openlineage.OpenLineage.apply(OpenLineage.java:74) ~[debezium-core-3.3.0.Final.jar:3.3.0.Final]
        at org.apache.kafka.connect.runtime.TransformationStage.apply(TransformationStage.java:95) ~[connect-runtime-4.1.0.jar:?]
        at org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:58) ~[connect-runtime-4.1.0.jar:?]
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:208) ~[connect-runtime-4.1.0.jar:?]
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:244) ~[connect-runtime-4.1.0.jar:?]
        ... 13 more


The full log is attached.

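This is evidence that the static emitters map is not shared between the
connector and the SMT. Presumably the two are seeing different runtime copies
of the DebeziumOpenLineageEmitter class.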
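One way to check this directly is to log which class loader defined the
emitter class on each side. The helper below is a hypothetical diagnostic and
is not part of Debezium or Kafka Connect.

The idea would be to log LoaderFingerprint.of(DebeziumOpenLineageEmitter.class)
twice, for example from a temporarily patched build:

- once from the connector task;
- once from the SMT.

If the same class name shows two different fingerprints, there are two
distinct runtime classes, and each has its own static map.

```java
// Hypothetical diagnostic helper: describes the loader that defined a class.
public final class LoaderFingerprint {

    private LoaderFingerprint() {}

    // Returns "<class name> @ <loader type>#<identity hash> (<loader name>)",
    // or "<class name> @ bootstrap" for classes defined by the bootstrap loader.
    public static String of(Class<?> type) {
        ClassLoader loader = type.getClassLoader();
        if (loader == null) {
            return type.getName() + " @ bootstrap";
        }
        String name = loader.getName(); // may be null for unnamed loaders
        return type.getName() + " @ " + loader.getClass().getName()
                + "#" + Integer.toHexString(System.identityHashCode(loader))
                + (name != null ? " (" + name + ")" : "");
    }
}
```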

It gets stranger if I remove all connectors from the image except PostgreSQL
and MongoDB: in that case, the PostgreSQL connector works perfectly.

The plugins live in /kafka/connect (the only directory configured in
plugin.path), each in its own subdirectory together with its dependencies.

I then added the other connectors back one at a time, and everything kept
working until I added the SQL Server connector.
In summary, the problem appears as soon as any one (or all) of the sqlserver,
spanner, and vitess connectors is present.
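Since the failure depends on which plugin directories are present, it may
help to list which of them bundle a given artifact. The class below is a
hypothetical helper; its name and signature are assumptions. It does the
following:

- walks each top-level directory under the plugin path;
- reports the JAR files whose names start with a given prefix, such as
  "debezium-core" or "debezium-openlineage-api".

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Stream;

// Hypothetical helper: which plugin directories contain JARs with a given prefix?
public final class PluginJarScanner {

    private PluginJarScanner() {}

    // Maps each top-level plugin directory under pluginPath to the sorted JAR
    // file names (found at any depth inside it) that start with jarPrefix.
    // Directories without a match are omitted.
    public static Map<String, List<String>> find(Path pluginPath, String jarPrefix) throws IOException {
        Map<String, List<String>> result = new TreeMap<>();
        try (Stream<Path> entries = Files.list(pluginPath)) {
            for (Path plugin : entries.filter(Files::isDirectory).sorted().toList()) {
                try (Stream<Path> files = Files.walk(plugin)) {
                    List<String> matches = files
                            .filter(Files::isRegularFile)
                            .map(p -> p.getFileName().toString())
                            .filter(n -> n.endsWith(".jar") && n.startsWith(jarPrefix))
                            .sorted()
                            .toList();
                    if (!matches.isEmpty()) {
                        result.put(plugin.getFileName().toString(), matches);
                    }
                }
            }
        }
        return result;
    }
}
```

For example, PluginJarScanner.find(Path.of("/kafka/connect"),
"debezium-openlineage-api") would show which plugin directories ship their own
copy of that JAR.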

Am I correct that Kafka Connect loads each connector, together with its
dependencies, in its own isolated class loader, so that the Connector and the
SMT should share the same static emitters map?
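Underneath the class-loader question is a general JVM rule: a class's runtime
identity is its name plus its defining class loader, and static fields belong
to that runtime class. The sketch below illustrates only that rule. It is not
Kafka Connect's plugin class loader, and Registry and IsolatedLoader are
hypothetical stand-ins.

Two loaders that each define their own copy of Registry from the same bytes
end up with two independent EMITTERS maps. That is the same kind of symptom as
the IllegalStateException above. The sketch assumes the compiled classes are
readable as resources, for example when run from a classes directory or a JAR.

```java
import java.io.IOException;
import java.io.InputStream;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public final class StaticIsolationDemo {

    // Hypothetical stand-in for a class that holds a static registry.
    public static final class Registry {
        public static final Map<String, String> EMITTERS = new ConcurrentHashMap<>();
    }

    // Child-first for ONE class name: defines its own copy of that class from
    // the parent's .class bytes and delegates everything else to the parent.
    static final class IsolatedLoader extends ClassLoader {
        private final String isolatedClass;

        IsolatedLoader(ClassLoader parent, String isolatedClass) {
            super(parent);
            this.isolatedClass = isolatedClass;
        }

        @Override
        protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
            if (!name.equals(isolatedClass)) {
                return super.loadClass(name, resolve);
            }
            synchronized (getClassLoadingLock(name)) {
                Class<?> c = findLoadedClass(name);
                if (c == null) {
                    String resource = name.replace('.', '/') + ".class";
                    try (InputStream in = getParent().getResourceAsStream(resource)) {
                        if (in == null) {
                            throw new ClassNotFoundException(name);
                        }
                        byte[] bytes = in.readAllBytes();
                        c = defineClass(name, bytes, 0, bytes.length);
                    } catch (IOException e) {
                        throw new ClassNotFoundException(name, e);
                    }
                }
                if (resolve) {
                    resolveClass(c);
                }
                return c;
            }
        }
    }

    @SuppressWarnings("unchecked")
    static Map<String, String> emittersOf(Class<?> registryClass) throws ReflectiveOperationException {
        return (Map<String, String>) registryClass.getField("EMITTERS").get(null);
    }

    // true if an entry written via one loader's copy is visible via another loader's copy.
    public static boolean sharedAcrossLoaders() throws ReflectiveOperationException {
        ClassLoader parent = StaticIsolationDemo.class.getClassLoader();
        String name = Registry.class.getName();
        Class<?> a = new IsolatedLoader(parent, name).loadClass(name);
        Class<?> b = new IsolatedLoader(parent, name).loadClass(name);
        emittersOf(a).put("inventory-0", "emitter");
        return emittersOf(b).containsKey("inventory-0");
    }

    // true if two lookups through the SAME loader see the same static map.
    public static boolean sharedWithinLoader() throws ReflectiveOperationException {
        ClassLoader parent = StaticIsolationDemo.class.getClassLoader();
        String name = Registry.class.getName();
        IsolatedLoader loader = new IsolatedLoader(parent, name);
        emittersOf(loader.loadClass(name)).put("inventory-0", "emitter");
        return emittersOf(loader.loadClass(name)).containsKey("inventory-0");
    }
}
```

The sketch does not model how Connect's plugin isolation assigns the
connector task and the OpenLineage SMT to class loaders. That assignment
decides whether both sides resolve DebeziumOpenLineageEmitter through the same
loader.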

Moreover, if I run the 3.2.0.Final image (i.e. Kafka 4.0.0) with all
connectors, everything works fine.

Any help would be greatly appreciated.

[1] https://github.com/debezium/debezium/blob/main/debezium-openlineage/debezium-openlineage-api/src/main/java/io/debezium/openlineage/DebeziumOpenLineageEmitter.java
[2] https://github.com/debezium/debezium/blob/main/debezium-core/src/main/java/io/debezium/transforms/openlineage/OpenLineage.java
