Mario Fiore Vitale created KAFKA-19758:
------------------------------------------

             Summary: Weird behavior on Kafka Connect 4.1 class loading
                 Key: KAFKA-19758
                 URL: https://issues.apache.org/jira/browse/KAFKA-19758
             Project: Kafka
          Issue Type: Bug
          Components: connect
    Affects Versions: 4.1.0
            Reporter: Mario Fiore Vitale
         Attachments: connect-service.log

I have the [DebeziumOpenLineageEmitter|https://github.com/debezium/debezium/blob/main/debezium-openlineage/debezium-openlineage-api/src/main/java/io/debezium/openlineage/DebeziumOpenLineageEmitter.java] class in *debezium-openlineage-api*, which internally keeps a static map of the registered emitters. The key of this map is "connectorLogicalName-taskId".
Then there is the [OpenLineage SMT|https://github.com/debezium/debezium/blob/main/debezium-core/src/main/java/io/debezium/transforms/openlineage/OpenLineage.java], which is part of *debezium-core*. In this SMT, I simply pass the same context to retrieve the same emitter that the connector instantiated.
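
To make the failure mode concrete, here is a minimal sketch of a static registry keyed by "connectorLogicalName-taskId". The class name {{EmitterRegistry}}, its methods, and the string standing in for an emitter are all hypothetical; this is not Debezium's actual code, only an illustration of the pattern described above.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical stand-in for a static emitter registry; not Debezium's code. */
final class EmitterRegistry {
    // Static state belongs to one loaded copy of this class. If two class
    // loaders each define this class, each copy gets its own EMITTERS map.
    private static final Map<String, String> EMITTERS = new ConcurrentHashMap<>();

    private EmitterRegistry() {
    }

    /** Builds the registry key in the "connectorLogicalName-taskId" form. */
    static String key(String connectorLogicalName, String taskId) {
        return connectorLogicalName + "-" + taskId;
    }

    /** Registers an emitter (a plain string here) unless one already exists. */
    static void init(String connectorLogicalName, String taskId) {
        String key = key(connectorLogicalName, taskId);
        EMITTERS.putIfAbsent(key, "emitter for " + key);
    }

    /** Returns the registered emitter, or fails if init() was never called. */
    static String getEmitter(String connectorLogicalName, String taskId) {
        String key = key(connectorLogicalName, taskId);
        String emitter = EMITTERS.get(key);
        if (emitter == null) {
            throw new IllegalStateException(
                    "Emitter not initialized for " + key + ". Call init() first.");
        }
        return emitter;
    }
}
```

In this sketch, a lookup succeeds only if {{init}} and {{getEmitter}} run against the same loaded copy of the class, which is the assumption the connector and the SMT rely on.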

Now I'm running the following image


{quote}FROM quay.io/debezium/connect:3.3.0.Final

ENV MAVEN_REPO="https://repo1.maven.org/maven2"
ENV GROUP_ID="io/debezium"
ENV DEBEZIUM_VERSION="3.3.0.Final"
ENV ARTIFACT_ID="debezium-openlineage-core"
ENV CLASSIFIER="-libs"

COPY log4j.properties /kafka/config/log4j.properties

# Add OpenLineage
RUN mkdir -p /tmp/openlineage-libs && \
    curl "$MAVEN_REPO/$GROUP_ID/$ARTIFACT_ID/$DEBEZIUM_VERSION/$ARTIFACT_ID-${DEBEZIUM_VERSION}${CLASSIFIER}.tar.gz" -o /tmp/debezium-openlineage-core-libs.tar.gz && \
    tar -xzvf /tmp/debezium-openlineage-core-libs.tar.gz -C /tmp/openlineage-libs --strip-components=1

RUN cp -r /tmp/openlineage-libs/* /kafka/connect/debezium-connector-postgres/
RUN cp -r /tmp/openlineage-libs/* /kafka/connect/debezium-connector-mongodb/
ADD openlineage.yml /kafka/{quote}

So it is essentially the Debezium Connect image with just the OpenLineage jars copied into the PostgreSQL and MongoDB connector folders.

When I register the PostgreSQL connector


{quote}{
  "name": "inventory-connector-postgres",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.server.id": "184054",
    "database.dbname": "postgres",
    "topic.prefix": "inventory",
    "snapshot.mode": "initial",
    "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
    "schema.history.internal.kafka.topic": "schema-changes.inventory",
    "slot.name": "postgres",
    "openlineage.integration.enabled": "true",
    "openlineage.integration.config.file.path": "/kafka/openlineage.yml",
    "openlineage.integration.job.description": "This connector does cdc for products",
    "openlineage.integration.tags": "env=prod,team=cdc",
    "openlineage.integration.owners": "Mario=maintainer,John Doe=Data scientist,IronMan=superero",
    "transforms": "openlineage",
    "transforms.openlineage.type": "io.debezium.transforms.openlineage.OpenLineage"
  }
}{quote}
I get the following error


{quote}2025-10-03T14:22:09,761 ERROR  ||  WorkerSourceTask\{id=inventory-connector-postgres-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted   [org.apache.kafka.connect.runtime.WorkerTask]
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:260) ~[connect-runtime-4.1.0.jar:?]
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:180) ~[connect-runtime-4.1.0.jar:?]
    at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:58) ~[connect-runtime-4.1.0.jar:?]
    at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.sendRecords(AbstractWorkerSourceTask.java:415) ~[connect-runtime-4.1.0.jar:?]
    at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.execute(AbstractWorkerSourceTask.java:376) ~[connect-runtime-4.1.0.jar:?]
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:243) ~[connect-runtime-4.1.0.jar:?]
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:298) ~[connect-runtime-4.1.0.jar:?]
    at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:83) ~[connect-runtime-4.1.0.jar:?]
    at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:254) ~[connect-runtime-4.1.0.jar:?]
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572) ~[?:?]
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317) ~[?:?]
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) ~[?:?]
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) ~[?:?]
    at java.base/java.lang.Thread.run(Thread.java:1583) [?:?]
Caused by: java.lang.IllegalStateException: DebeziumOpenLineageEmitter not initialized for connector ConnectorContext[connectorLogicalName=inventory, connectorName=postgresql, taskId=0, version=null, config=null]. Call init() first.
    at io.debezium.openlineage.DebeziumOpenLineageEmitter.getEmitter(DebeziumOpenLineageEmitter.java:176) ~[debezium-openlineage-api-3.3.0.Final.jar:3.3.0.Final]
    at io.debezium.openlineage.DebeziumOpenLineageEmitter.emit(DebeziumOpenLineageEmitter.java:153) ~[debezium-openlineage-api-3.3.0.Final.jar:3.3.0.Final]
    at io.debezium.transforms.openlineage.OpenLineage.apply(OpenLineage.java:74) ~[debezium-core-3.3.0.Final.jar:3.3.0.Final]
    at org.apache.kafka.connect.runtime.TransformationStage.apply(TransformationStage.java:95) ~[connect-runtime-4.1.0.jar:?]
    at org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:58) ~[connect-runtime-4.1.0.jar:?]
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:208) ~[connect-runtime-4.1.0.jar:?]
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:244) ~[connect-runtime-4.1.0.jar:?]
    ... 13 more{quote}
 
Full logs [^connect-service.log]


This is evidence that the emitters map is not shared between the connector and 
the SMT.

The situation becomes weirder if I remove all connectors from the image except 
PostgreSQL and MongoDB.
In that case, the PostgreSQL connector works perfectly.

The plugins are in the folder */kafka/connect* (the only folder configured in `plugin.path`), each in a dedicated subfolder together with its dependencies.

I then started to add more connectors, and it continued to work until I added 
the SQL Server connector.
To summarize, the problem arises when I add one or more of [sqlserver, spanner, vitess].
 
What these connectors seem to have in common is that they support multiple tasks. The others don't.

Am I correct that Kafka Connect guarantees that each connector is loaded with 
an isolated class loader with its dependencies so that the static emitters 
should be shared between the Connector and the SMT?
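
One way to check this directly is to log which class loader defined the emitter class on each side. The following is a minimal diagnostic sketch; {{ClassLoaderProbe}} is a hypothetical helper, not part of Kafka Connect or Debezium. It uses only standard JDK calls ({{Class.getClassLoader}}, {{ProtectionDomain.getCodeSource}}). The idea would be to log {{describe(DebeziumOpenLineageEmitter.class)}} once from the connector task and once from the SMT's {{apply}} and compare the two lines.

```java
import java.security.CodeSource;

/** Hypothetical diagnostic helper; not part of Kafka Connect or Debezium. */
final class ClassLoaderProbe {

    private ClassLoaderProbe() {
    }

    /**
     * Describes which class loader defined the given class and where the
     * class was loaded from. Two call sites that print different loader
     * identities are looking at two distinct copies of the class, each
     * with its own static fields.
     */
    static String describe(Class<?> type) {
        ClassLoader loader = type.getClassLoader();
        // A null loader means the class was defined by the bootstrap loader.
        String loaderText = loader == null
                ? "bootstrap"
                : loader.getClass().getName() + "@"
                        + Integer.toHexString(System.identityHashCode(loader));

        CodeSource source = type.getProtectionDomain().getCodeSource();
        String sourceText = source == null || source.getLocation() == null
                ? "unknown"
                : source.getLocation().toString();

        return type.getName() + " loaded by " + loaderText + " from " + sourceText;
    }

    public static void main(String[] args) {
        System.out.println(describe(String.class));
        System.out.println(describe(ClassLoaderProbe.class));
    }
}
```

If the two log lines show different loader identities or different jar locations, the connector and the SMT are each using their own copy of the class and therefore their own static map.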

In addition, if I run the 3.2.0.Final image (which uses Kafka 4.0.0) with all connectors, it works fine.


I ran further tests, and the results are even stranger. All tests were done with 
*{{plugin.path=/kafka/connect}}* and *KC 4.1*

My original tests were with this directory structure

 
{code:java}
/kafka/connect
|___ debezium-connector-postgres
|___ debezium-connector-mongodb
|___ debezium-connector-sqlserver{code}
 

In this case, each connector should be isolated from the others (each having a 
dedicated class loader). With this layout, the sharing between the connector and 
the SMT does not work on KC 4.1.

Then I tried with

 
{code:java}
/kafka/connect
|___ debezium-connectors
     |___ debezium-connector-postgres
     |___ debezium-connector-mongodb
     |___ debezium-connector-sqlserver{code}
 

So none of the connectors are isolated; they all share the same class loader. In 
this case there is no issue, which I would say is expected.

Then I tried with

 
{code:java}
/kafka/connect
|___ debezium-connectors
|    |___ debezium-connector-postgres
|    |___ debezium-connector-mongodb
|___ debezium-connector-sqlserver{code}
 

where *{{postgres}}* and *{{mongodb}}* are not isolated (same classloader) and 
*{{sqlserver}}* is isolated (different classloader), and in this case, it still 
works. I expected this to fail as with the first setup.
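
The three layouts differ only in what sits directly under `plugin.path`. As documented for Kafka Connect, each immediate child of a `plugin.path` entry is the unit that gets its own isolated class loader. The sketch below lists those immediate children so the layouts can be compared on a running image. {{PluginLocations}} is a hypothetical helper, and it models only the top-level listing, not Connect's actual plugin scanning.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical helper; a simplified model, not Kafka Connect's scanner. */
final class PluginLocations {

    private PluginLocations() {
    }

    /**
     * Returns the sorted names of the immediate children of pluginPath.
     * Under the assumption stated above, each returned entry corresponds
     * to one isolated plugin location.
     */
    static List<String> topLevelEntries(Path pluginPath) throws IOException {
        try (Stream<Path> children = Files.list(pluginPath)) {
            return children
                    .map(child -> child.getFileName().toString())
                    .sorted()
                    .collect(Collectors.toList());
        }
    }

    public static void main(String[] args) throws IOException {
        // Defaults to the plugin.path used in this report.
        Path pluginPath = Path.of(args.length > 0 ? args[0] : "/kafka/connect");
        topLevelEntries(pluginPath).forEach(System.out::println);
    }
}
```

For the third layout this would list {{debezium-connector-sqlserver}} and {{debezium-connectors}}, that is, two plugin locations rather than three.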

The SMT is in the *debezium-core* jar, and each connector has its own copy.
So in each connector folder, there are:


{code:java}
debezium-api-3.3.0.Final.jar
debezium-common-3.3.0.Final.jar
debezium-connector-[connectorName]-3.3.0.Final.jar
debezium-core-3.3.0.Final.jar
debezium-openlineage-api-3.3.0.Final.jar{code}



