w4rdy opened a new issue, #1506:
URL: https://github.com/apache/camel-kafka-connector/issues/1506
Hi @oscerd & @orpiske,
I'm trying to insert a `timestamp` value into one of my Cassandra columns as
an epoch-millisecond `Long`. This works when `prepareStatements` is set to
false, but I get a "codec not found" error when it is set to true. Here is
the error log:
```
2023-02-01 17:42:31,259 ERROR [my-topic-sink|task-0] Error encountered in task my-topic-sink-0. Executing stage 'TASK_PUT' with class 'org.apache.kafka.connect.sink.SinkTask'. (org.apache.kafka.connect.runtime.errors.LogReporter) [task-thread-my-topic-sink-0]
com.datastax.oss.driver.api.core.type.codec.CodecNotFoundException: Codec not found for requested operation: [TIMESTAMP <-> java.lang.Long]
    at com.datastax.oss.driver.internal.core.type.codec.registry.CachingCodecRegistry.createCodec(CachingCodecRegistry.java:609)
    at com.datastax.oss.driver.internal.core.type.codec.registry.DefaultCodecRegistry$1.load(DefaultCodecRegistry.java:95)
    at com.datastax.oss.driver.internal.core.type.codec.registry.DefaultCodecRegistry$1.load(DefaultCodecRegistry.java:92)
    at com.datastax.oss.driver.shaded.guava.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3527)
    at com.datastax.oss.driver.shaded.guava.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2276)
    at com.datastax.oss.driver.shaded.guava.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2154)
    at com.datastax.oss.driver.shaded.guava.common.cache.LocalCache$Segment.get(LocalCache.java:2044)
    at com.datastax.oss.driver.shaded.guava.common.cache.LocalCache.get(LocalCache.java:3951)
    at com.datastax.oss.driver.shaded.guava.common.cache.LocalCache.getOrLoad(LocalCache.java:3973)
    at com.datastax.oss.driver.shaded.guava.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4957)
    at com.datastax.oss.driver.shaded.guava.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4963)
    at com.datastax.oss.driver.internal.core.type.codec.registry.DefaultCodecRegistry.getCachedCodec(DefaultCodecRegistry.java:117)
    at com.datastax.oss.driver.internal.core.type.codec.registry.CachingCodecRegistry.codecFor(CachingCodecRegistry.java:258)
    at com.datastax.oss.driver.internal.core.data.ValuesHelper.encodePreparedValues(ValuesHelper.java:112)
    at com.datastax.oss.driver.internal.core.cql.DefaultPreparedStatement.bind(DefaultPreparedStatement.java:159)
    at org.apache.camel.component.cassandra.CassandraProducer.executePreparedStatement(CassandraProducer.java:132)
    at org.apache.camel.component.cassandra.CassandraProducer.execute(CassandraProducer.java:104)
    at org.apache.camel.component.cassandra.CassandraProducer.process(CassandraProducer.java:172)
    at org.apache.camel.support.AsyncProcessorConverterHelper$ProcessorToAsyncProcessorBridge.process(AsyncProcessorConverterHelper.java:66)
    at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:172)
    at org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$SimpleTask.run(RedeliveryErrorHandler.java:477)
    at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:189)
    at org.apache.camel.impl.engine.DefaultReactiveExecutor.scheduleMain(DefaultReactiveExecutor.java:61)
    at org.apache.camel.processor.Pipeline.process(Pipeline.java:182)
    at org.apache.camel.impl.engine.CamelInternalProcessor.process(CamelInternalProcessor.java:399)
    at org.apache.camel.component.direct.DirectProducer.process(DirectProducer.java:96)
    at org.apache.camel.impl.engine.SharedCamelInternalProcessor.process(SharedCamelInternalProcessor.java:214)
    at org.apache.camel.impl.engine.SharedCamelInternalProcessor$1.process(SharedCamelInternalProcessor.java:111)
    at org.apache.camel.impl.engine.DefaultAsyncProcessorAwaitManager.process(DefaultAsyncProcessorAwaitManager.java:83)
    at org.apache.camel.impl.engine.SharedCamelInternalProcessor.process(SharedCamelInternalProcessor.java:108)
    at org.apache.camel.support.cache.DefaultProducerCache.send(DefaultProducerCache.java:199)
    at org.apache.camel.impl.engine.DefaultProducerTemplate.send(DefaultProducerTemplate.java:176)
    at org.apache.camel.impl.engine.DefaultProducerTemplate.send(DefaultProducerTemplate.java:148)
    at org.apache.camel.kafkaconnector.CamelSinkTask.put(CamelSinkTask.java:205)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:244)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:833)
```
After some digging, I assume I somehow need to register the
`TIMESTAMP_MILLIS_UTC` codec, found within
[ExtraTypeCodecs](https://docs.datastax.com/en/drivers/java/4.9/com/datastax/oss/driver/api/core/type/codec/ExtraTypeCodecs.html).
How can this be done, or is there an alternative way to insert timestamps via
the connector? I've also tried using the built-in `TimestampConverter` SMT to
convert the epoch millis to a timestamp string and parse it that way;
however, this returns a similar error:
`Codec not found for requested operation: [TIMESTAMP <-> java.lang.String]`.
It seems that a codec is also required for this? Any hints would be greatly
appreciated.
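
For context on why both attempts fail: the DataStax Java driver 4.x maps the CQL `timestamp` type to `java.time.Instant` by default, so a bound `Long` or `String` has no matching codec unless an extra one is registered. The sketch below only illustrates converting the epoch-millisecond value to an `Instant` before it is bound. The class and method names are hypothetical, and where such a conversion could be hooked into the connector (for example a custom SMT or Camel processor) is an assumption, not something the connector is confirmed to provide.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of camel-kafka-connector or the DataStax driver.
public class TimestampBinding {

    // Convert an epoch-millisecond Long to java.time.Instant when the value
    // targets a timestamp column; every other value is passed through unchanged.
    public static Object toBindable(Object value, boolean isTimestampColumn) {
        if (isTimestampColumn && value instanceof Long) {
            return Instant.ofEpochMilli((Long) value);
        }
        return value;
    }

    // Apply the conversion to one row of positional parameters, where
    // timestampIndex is the position of the timestamp column in the query.
    public static List<Object> toBindableRow(List<Object> row, int timestampIndex) {
        List<Object> out = new ArrayList<>(row.size());
        for (int i = 0; i < row.size(); i++) {
            out.add(toBindable(row.get(i), i == timestampIndex));
        }
        return out;
    }

    public static void main(String[] args) {
        // Same values as the Kafka message in this issue: id, name, created_at.
        List<Object> row = List.of("1", "John", 1670428382089L);
        System.out.println(toBindableRow(row, 2));
    }
}
```

With the third element converted to an `Instant`, the driver's default `timestamp` codec would apply when the prepared statement is bound. Registering `TIMESTAMP_MILLIS_UTC` instead would keep the value as a `Long`.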
---
For reference, here is the table I'm trying to insert to:
```cql
USE connect;
CREATE TABLE person (
id TEXT PRIMARY KEY,
name TEXT,
created_at TIMESTAMP
);
```
The connector properties:
```properties
name=my-topic-sink
topics=my-topic
tasks.max=1
connector.class=org.apache.camel.kafkaconnector.cassandrasink.CamelCassandrasinkSinkConnector
value.converter=org.apache.kafka.connect.storage.StringConverter
camel.kamelet.cassandra-sink.connectionHost=cassandra
camel.kamelet.cassandra-sink.connectionPort=9042
camel.kamelet.cassandra-sink.keyspace=connect
camel.kamelet.cassandra-sink.query=insert into person (id, name, created_at) values (?, ?, ?)
camel.kamelet.cassandra-sink.prepareStatements=true
```
And the message that is being consumed from Kafka:
```
["1","John",1670428382089]
```