w4rdy opened a new issue, #1506:
URL: https://github.com/apache/camel-kafka-connector/issues/1506

   Hi @oscerd & @orpiske,
   
   I'm trying to insert a `timestamp` value into one of my Cassandra columns as 
an epoch millisecond Long. This works when `prepareStatements` is set to false, 
however I'm receiving a codec not found error when it's set to true. Here is 
the error log:
   ```
   2023-02-01 17:42:31,259 ERROR [my-topic-sink|task-0] Error encountered in task my-topic-sink-0. Executing stage 'TASK_PUT' with class 'org.apache.kafka.connect.sink.SinkTask'. (org.apache.kafka.connect.runtime.errors.LogReporter) [task-thread-my-topic-sink-0]
   com.datastax.oss.driver.api.core.type.codec.CodecNotFoundException: Codec not found for requested operation: [TIMESTAMP <-> java.lang.Long]
           at com.datastax.oss.driver.internal.core.type.codec.registry.CachingCodecRegistry.createCodec(CachingCodecRegistry.java:609)
           at com.datastax.oss.driver.internal.core.type.codec.registry.DefaultCodecRegistry$1.load(DefaultCodecRegistry.java:95)
           at com.datastax.oss.driver.internal.core.type.codec.registry.DefaultCodecRegistry$1.load(DefaultCodecRegistry.java:92)
           at com.datastax.oss.driver.shaded.guava.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3527)
           at com.datastax.oss.driver.shaded.guava.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2276)
           at com.datastax.oss.driver.shaded.guava.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2154)
           at com.datastax.oss.driver.shaded.guava.common.cache.LocalCache$Segment.get(LocalCache.java:2044)
           at com.datastax.oss.driver.shaded.guava.common.cache.LocalCache.get(LocalCache.java:3951)
           at com.datastax.oss.driver.shaded.guava.common.cache.LocalCache.getOrLoad(LocalCache.java:3973)
           at com.datastax.oss.driver.shaded.guava.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4957)
           at com.datastax.oss.driver.shaded.guava.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4963)
           at com.datastax.oss.driver.internal.core.type.codec.registry.DefaultCodecRegistry.getCachedCodec(DefaultCodecRegistry.java:117)
           at com.datastax.oss.driver.internal.core.type.codec.registry.CachingCodecRegistry.codecFor(CachingCodecRegistry.java:258)
           at com.datastax.oss.driver.internal.core.data.ValuesHelper.encodePreparedValues(ValuesHelper.java:112)
           at com.datastax.oss.driver.internal.core.cql.DefaultPreparedStatement.bind(DefaultPreparedStatement.java:159)
           at org.apache.camel.component.cassandra.CassandraProducer.executePreparedStatement(CassandraProducer.java:132)
           at org.apache.camel.component.cassandra.CassandraProducer.execute(CassandraProducer.java:104)
           at org.apache.camel.component.cassandra.CassandraProducer.process(CassandraProducer.java:172)
           at org.apache.camel.support.AsyncProcessorConverterHelper$ProcessorToAsyncProcessorBridge.process(AsyncProcessorConverterHelper.java:66)
           at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:172)
           at org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$SimpleTask.run(RedeliveryErrorHandler.java:477)
           at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:189)
           at org.apache.camel.impl.engine.DefaultReactiveExecutor.scheduleMain(DefaultReactiveExecutor.java:61)
           at org.apache.camel.processor.Pipeline.process(Pipeline.java:182)
           at org.apache.camel.impl.engine.CamelInternalProcessor.process(CamelInternalProcessor.java:399)
           at org.apache.camel.component.direct.DirectProducer.process(DirectProducer.java:96)
           at org.apache.camel.impl.engine.SharedCamelInternalProcessor.process(SharedCamelInternalProcessor.java:214)
           at org.apache.camel.impl.engine.SharedCamelInternalProcessor$1.process(SharedCamelInternalProcessor.java:111)
           at org.apache.camel.impl.engine.DefaultAsyncProcessorAwaitManager.process(DefaultAsyncProcessorAwaitManager.java:83)
           at org.apache.camel.impl.engine.SharedCamelInternalProcessor.process(SharedCamelInternalProcessor.java:108)
           at org.apache.camel.support.cache.DefaultProducerCache.send(DefaultProducerCache.java:199)
           at org.apache.camel.impl.engine.DefaultProducerTemplate.send(DefaultProducerTemplate.java:176)
           at org.apache.camel.impl.engine.DefaultProducerTemplate.send(DefaultProducerTemplate.java:148)
           at org.apache.camel.kafkaconnector.CamelSinkTask.put(CamelSinkTask.java:205)
           at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
           at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
           at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
           at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
           at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
           at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:244)
           at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
           at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
           at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
           at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
           at java.base/java.lang.Thread.run(Thread.java:833)
   ```
   After some digging, I assume I need to register the `TIMESTAMP_MILLIS_UTC` codec, found within [ExtraTypeCodecs](https://docs.datastax.com/en/drivers/java/4.9/com/datastax/oss/driver/api/core/type/codec/ExtraTypeCodecs.html).
   How can this be done, or is there an alternative way to insert timestamps via the connector? I've also tried using the built-in `TimestampConverter` SMT to convert the epoch millis to a timestamp string and inserting that instead, but this returns a similar error: `Codec not found for requested operation: [TIMESTAMP <-> java.lang.String]`. It seems a codec is required for this as well? Any hints would be greatly appreciated.
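
For context on the type mismatch: as far as I understand, the 4.x Java driver maps CQL `TIMESTAMP` to `java.time.Instant` by default, which would explain why neither `Long` nor `String` finds a codec when the statement is prepared. Below is a minimal, stdlib-only sketch of the conversion that would be needed somewhere before the value is bound. The class and method names are hypothetical, and where such a step could be plugged into the connector is exactly what I'm unsure about.

```java
import java.time.Instant;

public class EpochMillisToInstant {

    // Convert epoch milliseconds (UTC) into a java.time.Instant, the Java type
    // the driver's built-in TIMESTAMP codec is assumed to accept.
    public static Instant toInstant(long epochMillis) {
        return Instant.ofEpochMilli(epochMillis);
    }

    public static void main(String[] args) {
        // Third element of the sample Kafka message ["1","John",1670428382089]
        long createdAt = 1670428382089L;
        System.out.println(toInstant(createdAt));
    }
}
```

This only illustrates the type the bound value would have to be; it does not register a codec or touch the connector configuration.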
   
   ---
   For reference, here is the table I'm trying to insert to:
   ```cql
   USE connect;
   CREATE TABLE person (
       id          TEXT PRIMARY KEY, 
       name        TEXT, 
       created_at  TIMESTAMP
   );
   ```
   The connector properties:
   ```properties
   name=my-topic-sink
   topics=my-topic
   tasks.max=1
   
   connector.class=org.apache.camel.kafkaconnector.cassandrasink.CamelCassandrasinkSinkConnector
   value.converter=org.apache.kafka.connect.storage.StringConverter
   
   camel.kamelet.cassandra-sink.connectionHost=cassandra
   camel.kamelet.cassandra-sink.connectionPort=9042
   camel.kamelet.cassandra-sink.keyspace=connect
   camel.kamelet.cassandra-sink.query=insert into person (id, name, created_at) values (?, ?, ?)
   camel.kamelet.cassandra-sink.prepareStatements=true
   ```
   And the message that is being consumed from Kafka:
   ```
   ["1","John",1670428382089]
   ```
   

