This is an automated email from the ASF dual-hosted git repository.

valdar pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git


The following commit(s) were added to refs/heads/main by this push:
     new 9323f1e  fix #1332 : CamelHeader value is wrongly interpreted as 
BigDecimal and causes ClassCastException.
9323f1e is described below

commit 9323f1ece25dd23d557c6b3b12b9c193a6e46f07
Author: Andrea Tarocchi <andrea.taroc...@gmail.com>
AuthorDate: Mon Jan 31 13:52:51 2022 +0100

    fix #1332 : CamelHeader value is wrongly interpreted as BigDecimal and 
causes ClassCastException.
---
 .../org/apache/camel/kafkaconnector/CamelSinkTask.java     |  4 +++-
 .../org/apache/camel/kafkaconnector/CamelSourceTask.java   |  6 +++---
 .../org/apache/camel/kafkaconnector/CamelSinkTaskTest.java | 14 ++++++++++++--
 .../apache/camel/kafkaconnector/CamelSourceTaskTest.java   |  2 +-
 4 files changed, 19 insertions(+), 7 deletions(-)

diff --git 
a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java 
b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
index f5d942e..a53f298 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
@@ -240,7 +240,9 @@ public class CamelSinkTask extends SinkTask {
         final String key = StringHelper.after(header.key(), prefix, 
header.key());
         final Schema schema = header.schema();
 
-        if (schema.type().equals(Schema.BYTES_SCHEMA.type()) && 
Objects.equals(schema.name(), Decimal.LOGICAL_NAME)) {
+        if (schema.type().equals(Schema.BYTES_SCHEMA.type())
+                && Objects.equals(schema.name(), Decimal.LOGICAL_NAME)
+                && header.value() instanceof byte[]) {
             destination.put(key, Decimal.toLogical(schema, (byte[]) 
header.value()));
         } else {
             destination.put(key, header.value());
diff --git 
a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java 
b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
index 05a0d96..8b993a2 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
@@ -36,7 +36,6 @@ import org.apache.camel.kafkaconnector.utils.SchemaHelper;
 import org.apache.camel.kafkaconnector.utils.TaskHelper;
 import org.apache.camel.support.UnitOfWorkHelper;
 import org.apache.kafka.clients.producer.RecordMetadata;
-import org.apache.kafka.connect.data.Decimal;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.source.SourceRecord;
@@ -357,8 +356,9 @@ public class CamelSourceTask extends SourceTask {
             } else if (value instanceof Date) {
                 record.headers().addTimestamp(keyCamelHeader, (Date)value);
             } else if (value instanceof BigDecimal) {
-                Schema schema = Decimal.schema(((BigDecimal)value).scale());
-                record.headers().add(keyCamelHeader, 
Decimal.fromLogical(schema, (BigDecimal)value), schema);
+                //XXX: kafka connect configured header converter takes care of 
the encoding,
+                //default: 
org.apache.kafka.connect.storage.SimpleHeaderConverter
+                record.headers().addDecimal(keyCamelHeader, (BigDecimal)value);
             } else if (value instanceof Double) {
                 record.headers().addDouble(keyCamelHeader, (double)value);
             } else if (value instanceof Float) {
diff --git 
a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java 
b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
index bab0a5d..c08a4f7 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
@@ -33,6 +33,7 @@ import org.apache.kafka.connect.data.SchemaAndValue;
 import org.apache.kafka.connect.data.SchemaBuilder;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.kafka.connect.storage.SimpleHeaderConverter;
 import org.junit.jupiter.api.Test;
 
 import static org.assertj.core.api.Assertions.assertThat;
@@ -118,7 +119,13 @@ public class CamelSinkTaskTest {
         int myInteger = 100;
         Long myLong = new Long("100");
         BigDecimal myBigDecimal = new BigDecimal(1234567890);
-        Schema schema = Decimal.schema(myBigDecimal.scale());
+        Schema myBigDecimalSchema = Decimal.schema(myBigDecimal.scale());
+        //reproducing bigDecimal encoding by kafka connect
+        BigDecimal kafkaBigDecimal = new BigDecimal("6.9203120E+787");
+        Schema kafkaBigDecimalSchema = Decimal.schema(kafkaBigDecimal.scale());
+        SimpleHeaderConverter shc = new SimpleHeaderConverter();
+        byte[] persistedBytes = shc.fromConnectHeader("", "MyBigDecimal", 
kafkaBigDecimalSchema, kafkaBigDecimal);
+        SchemaAndValue sav = shc.toConnectHeader("", "MyBigDecimal", 
persistedBytes);
 
         List<SinkRecord> records = new ArrayList<SinkRecord>();
         SinkRecord record = new SinkRecord(TOPIC_NAME, 1, null, "test", null, 
"camel", 42);
@@ -129,7 +136,9 @@ public class CamelSinkTaskTest {
         record.headers().addDouble(CamelSinkTask.HEADER_CAMEL_PREFIX + 
"MyDouble", myDouble);
         record.headers().addInt(CamelSinkTask.HEADER_CAMEL_PREFIX + 
"MyInteger", myInteger);
         record.headers().addLong(CamelSinkTask.HEADER_CAMEL_PREFIX + "MyLong", 
myLong);
-        record.headers().add(CamelSinkTask.HEADER_CAMEL_PREFIX + 
"MyBigDecimal", Decimal.fromLogical(schema, myBigDecimal), schema);
+        record.headers().add(CamelSinkTask.HEADER_CAMEL_PREFIX + 
"MyBigDecimal", Decimal.fromLogical(myBigDecimalSchema, myBigDecimal), 
myBigDecimalSchema);
+        record.headers().add(CamelSinkTask.HEADER_CAMEL_PREFIX + 
"KafkaBigDecimal", sav.value(), sav.schema());
+
         records.add(record);
         sinkTask.put(records);
 
@@ -145,6 +154,7 @@ public class CamelSinkTaskTest {
         assertEquals(myInteger, exchange.getIn().getHeader("MyInteger"));
         assertEquals(myLong, exchange.getIn().getHeader("MyLong", Long.class));
         assertEquals(myBigDecimal, exchange.getIn().getHeader("MyBigDecimal", 
BigDecimal.class));
+        assertEquals(kafkaBigDecimal, 
exchange.getIn().getHeader("KafkaBigDecimal", BigDecimal.class));
 
         sinkTask.stop();
     }
diff --git 
a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java 
b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
index 36ae9e2..8542a4f 100644
--- 
a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
+++ 
b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
@@ -312,7 +312,7 @@ public class CamelSourceTaskTest {
         List<SourceRecord> results = sourceTask.poll();
         assertEquals(1, results.size());
         Header bigDecimalHeader = 
results.get(0).headers().allWithName(CamelSourceTask.HEADER_CAMEL_PREFIX + 
"bigdecimal").next();
-        assertEquals("[B", bigDecimalHeader.value().getClass().getName());
+        assertTrue(bigDecimalHeader.value() instanceof BigDecimal);
         assertEquals(Decimal.class.getName(), 
bigDecimalHeader.schema().name());
         assertEquals(Schema.Type.BYTES, bigDecimalHeader.schema().type());
 

Reply via email to