This is an automated email from the ASF dual-hosted git repository.

bogong pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.9 by this push:
     new 15245f3f2ac Revert "fix: org.apache.kafka.connect.errors.DataException: Invalid Java object for schema with type... (#15598)"
15245f3f2ac is described below

commit 15245f3f2ac4224bcf28d3495551f565335a58fd
Author: congbobo184 <[email protected]>
AuthorDate: Mon Nov 14 19:42:32 2022 +0800

    Revert "fix: org.apache.kafka.connect.errors.DataException: Invalid Java object for schema with type... (#15598)"
    
    This reverts commit 417042b35d487fcffb76bcfa9b154ab9a0d11fdd.
---
 .../io/kafka/connect/KafkaConnectSource.java       |   1 +
 .../io/kafka/connect/schema/KafkaConnectData.java  |  77 +----------
 .../io/kafka/connect/KafkaConnectSinkTest.java     | 141 ++++-----------------
 .../connect/PulsarSchemaToKafkaSchemaTest.java     |  33 -----
 4 files changed, 34 insertions(+), 218 deletions(-)

diff --git 
a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java
 
b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java
index 8ca6daad9b0..5d30e95acef 100644
--- 
a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java
+++ 
b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java
@@ -32,6 +32,7 @@ import 
org.apache.pulsar.io.kafka.connect.schema.KafkaSchemaWrappedSchema;
 import org.apache.pulsar.kafka.shade.io.confluent.connect.avro.AvroData;
 
 import java.util.Base64;
+import java.util.Collections;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.TimeUnit;
diff --git 
a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaConnectData.java
 
b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaConnectData.java
index ce179dab297..b649a9b0468 100644
--- 
a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaConnectData.java
+++ 
b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaConnectData.java
@@ -33,15 +33,17 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import lombok.extern.slf4j.Slf4j;
 
-@Slf4j
 public class KafkaConnectData {
     public static Object getKafkaConnectData(Object nativeObject, Schema 
kafkaSchema) {
         if (kafkaSchema == null) {
             return nativeObject;
         }
 
+        if (nativeObject == null) {
+            return defaultOrThrow(kafkaSchema);
+        }
+
         if (nativeObject instanceof JsonNode) {
             JsonNode node = (JsonNode) nativeObject;
             return jsonAsConnectData(node, kafkaSchema);
@@ -50,73 +52,6 @@ public class KafkaConnectData {
             return avroAsConnectData(avroRecord, kafkaSchema);
         }
 
-        return castToKafkaSchema(nativeObject, kafkaSchema);
-    }
-
-    public static Object castToKafkaSchema(Object nativeObject, Schema 
kafkaSchema) {
-        if (nativeObject == null) {
-            return defaultOrThrow(kafkaSchema);
-        }
-
-        if (nativeObject instanceof Number) {
-            // This is needed in case
-            // jackson decided to fit value into some other type internally
-            // (e.g. Double instead of Float).
-            // Kafka's ConnectSchema expects exact type
-            // 
https://github.com/apache/kafka/blob/trunk/connect/api/src/main/java/org/apache/kafka/connect/data/ConnectSchema.java#L47-L71
-            Number num = (Number) nativeObject;
-            switch (kafkaSchema.type()) {
-                case INT8:
-                    if (!(nativeObject instanceof Byte)) {
-                        if (log.isDebugEnabled()) {
-                            log.debug("nativeObject of type {} converted to 
Byte", nativeObject.getClass());
-                        }
-                        return num.byteValue();
-                    }
-                    break;
-                case INT16:
-                    if (!(nativeObject instanceof Short)) {
-                        if (log.isDebugEnabled()) {
-                            log.debug("nativeObject of type {} converted to 
Short", nativeObject.getClass());
-                        }
-                        return num.shortValue();
-                    }
-                    break;
-                case INT32:
-                    if (!(nativeObject instanceof Integer)) {
-                        if (log.isDebugEnabled()) {
-                            log.debug("nativeObject of type {} converted to 
Integer", nativeObject.getClass());
-                        }
-                        return num.intValue();
-                    }
-                    break;
-                case INT64:
-                    if (!(nativeObject instanceof Long)) {
-                        if (log.isDebugEnabled()) {
-                            log.debug("nativeObject of type {} converted to 
Long", nativeObject.getClass());
-                        }
-                        return num.longValue();
-                    }
-                    break;
-                case FLOAT32:
-                    if (!(nativeObject instanceof Float)) {
-                        if (log.isDebugEnabled()) {
-                            log.debug("nativeObject of type {} converted to 
Float", nativeObject.getClass());
-                        }
-                        return num.floatValue();
-                    }
-                    break;
-                case FLOAT64:
-                    if (!(nativeObject instanceof Double)) {
-                        if (log.isDebugEnabled()) {
-                            log.debug("nativeObject of type {} converted to 
Double", nativeObject.getClass());
-                        }
-                        return num.doubleValue();
-                    }
-                    break;
-            }
-        }
-
         return nativeObject;
     }
 
@@ -152,9 +87,9 @@ public class KafkaConnectData {
                 case BOOLEAN:
                     return jsonNode.booleanValue();
                 case NUMBER:
-                    return jsonNode.doubleValue();
+                    jsonNode.doubleValue();
                 case STRING:
-                    return jsonNode.textValue();
+                    jsonNode.textValue();
                 default:
                     throw new DataException("Don't know how to convert " + 
jsonNode +
                             " to Connect data (schema is null).");
diff --git 
a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
 
b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
index aad24155a31..b561e4b11c1 100644
--- 
a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
+++ 
b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
@@ -281,28 +281,8 @@ public class KafkaConnectSinkTest extends 
ProducerConsumerBase  {
         assertEquals(result.get("keySchema"), expectedKeySchema);
         assertEquals(result.get("valueSchema"), expectedSchema);
 
-        if (schema.getSchemaInfo().getType().isPrimitive()) {
-            // to test cast of primitive values
-            Message msgOut = mock(MessageImpl.class);
-            
when(msgOut.getValue()).thenReturn(getGenericRecord(result.get("value"), 
schema));
-            when(msgOut.getKey()).thenReturn(result.get("key").toString());
-            when(msgOut.hasKey()).thenReturn(true);
-            when(msgOut.getMessageId()).thenReturn(new MessageIdImpl(1, 0, 0));
-
-            Record<GenericObject> recordOut = PulsarRecord.<String>builder()
-                    .topicName("fake-topic")
-                    .message(msgOut)
-                    .schema(schema)
-                    .ackFunction(status::incrementAndGet)
-                    .failFunction(status::decrementAndGet)
-                    .build();
-
-            SinkRecord sinkRecord = sink.toSinkRecord(recordOut);
-            return sinkRecord;
-        } else {
-            SinkRecord sinkRecord = sink.toSinkRecord(record);
-            return sinkRecord;
-        }
+        SinkRecord sinkRecord = sink.toSinkRecord(record);
+        return sinkRecord;
     }
 
     private GenericRecord getGenericRecord(Object value, Schema schema) {
@@ -318,135 +298,71 @@ public class KafkaConnectSinkTest extends 
ProducerConsumerBase  {
         return rec;
     }
 
-
-    @Test
-    public void genericRecordCastTest() throws Exception {
-        props.put("kafkaConnectorSinkClass", 
SchemaedFileStreamSinkConnector.class.getCanonicalName());
-
-        KafkaConnectSink sink = new KafkaConnectSink();
-        sink.open(props, context);
-
-        AvroSchema<PulsarSchemaToKafkaSchemaTest.StructWithAnnotations> 
pulsarAvroSchema
-                = 
AvroSchema.of(PulsarSchemaToKafkaSchemaTest.StructWithAnnotations.class);
-
-        final GenericData.Record obj = new 
GenericData.Record(pulsarAvroSchema.getAvroSchema());
-        // schema type INT32
-        obj.put("field1", (byte)10);
-        // schema type STRING
-        obj.put("field2", "test");
-        // schema type INT64
-        obj.put("field3", (short)100);
-
-        final GenericRecord rec = getGenericRecord(obj, pulsarAvroSchema);
-        Message msg = mock(MessageImpl.class);
-        when(msg.getValue()).thenReturn(rec);
-        when(msg.getKey()).thenReturn("key");
-        when(msg.hasKey()).thenReturn(true);
-        when(msg.getMessageId()).thenReturn(new MessageIdImpl(1, 0, 0));
-
-        final AtomicInteger status = new AtomicInteger(0);
-        Record<GenericObject> record = PulsarRecord.<String>builder()
-                .topicName("fake-topic")
-                .message(msg)
-                .schema(pulsarAvroSchema)
-                .ackFunction(status::incrementAndGet)
-                .failFunction(status::decrementAndGet)
-                .build();
-
-        SinkRecord sinkRecord = sink.toSinkRecord(record);
-
-        Struct out = (Struct) sinkRecord.value();
-        Assert.assertEquals(out.get("field1").getClass(), Integer.class);
-        Assert.assertEquals(out.get("field2").getClass(), String.class);
-        Assert.assertEquals(out.get("field3").getClass(), Long.class);
-
-        Assert.assertEquals(out.get("field1"), 10);
-        Assert.assertEquals(out.get("field2"), "test");
-        Assert.assertEquals(out.get("field3"), 100L);
-
-        sink.close();
-    }
-
     @Test
     public void bytesRecordSchemaTest() throws Exception {
         byte[] in = "val".getBytes(StandardCharsets.US_ASCII);
         SinkRecord sinkRecord = recordSchemaTest(in, Schema.BYTES, "val", 
"BYTES");
-        // test/mock writes it as string
-        Assert.assertEquals(sinkRecord.value(), "val");
+        byte[] out = (byte[]) sinkRecord.value();
+        Assert.assertEquals(out, in);
     }
 
     @Test
     public void stringRecordSchemaTest() throws Exception {
         SinkRecord sinkRecord = recordSchemaTest("val", Schema.STRING, "val", 
"STRING");
-        Assert.assertEquals(sinkRecord.value().getClass(), String.class);
-        Assert.assertEquals(sinkRecord.value(), "val");
+        String out = (String) sinkRecord.value();
+        Assert.assertEquals(out, "val");
     }
 
     @Test
     public void booleanRecordSchemaTest() throws Exception {
         SinkRecord sinkRecord = recordSchemaTest(true, Schema.BOOL, true, 
"BOOLEAN");
-        Assert.assertEquals(sinkRecord.value().getClass(), Boolean.class);
-        Assert.assertEquals(sinkRecord.value(), true);
+        boolean out = (boolean) sinkRecord.value();
+        Assert.assertEquals(out, true);
     }
 
     @Test
     public void byteRecordSchemaTest() throws Exception {
         // int 1 is coming back from ObjectMapper
         SinkRecord sinkRecord = recordSchemaTest((byte)1, Schema.INT8, 1, 
"INT8");
-        Assert.assertEquals(sinkRecord.value().getClass(), Byte.class);
-        Assert.assertEquals(sinkRecord.value(), (byte)1);
+        byte out = (byte) sinkRecord.value();
+        Assert.assertEquals(out, 1);
     }
 
     @Test
     public void shortRecordSchemaTest() throws Exception {
         // int 1 is coming back from ObjectMapper
         SinkRecord sinkRecord = recordSchemaTest((short)1, Schema.INT16, 1, 
"INT16");
-        Assert.assertEquals(sinkRecord.value().getClass(), Short.class);
-        Assert.assertEquals(sinkRecord.value(), (short)1);
+        short out = (short) sinkRecord.value();
+        Assert.assertEquals(out, 1);
     }
 
     @Test
     public void integerRecordSchemaTest() throws Exception {
         SinkRecord sinkRecord = recordSchemaTest(Integer.MAX_VALUE, 
Schema.INT32, Integer.MAX_VALUE, "INT32");
-        Assert.assertEquals(sinkRecord.value().getClass(), Integer.class);
-        Assert.assertEquals(sinkRecord.value(), Integer.MAX_VALUE);
+        int out = (int) sinkRecord.value();
+        Assert.assertEquals(out, Integer.MAX_VALUE);
     }
 
     @Test
     public void longRecordSchemaTest() throws Exception {
         SinkRecord sinkRecord = recordSchemaTest(Long.MAX_VALUE, Schema.INT64, 
Long.MAX_VALUE, "INT64");
-        Assert.assertEquals(sinkRecord.value().getClass(), Long.class);
-        Assert.assertEquals(sinkRecord.value(), Long.MAX_VALUE);
-    }
-
-    @Test
-    public void longRecordSchemaTestCast() throws Exception {
-        // int 1 is coming from ObjectMapper, expect Long (as in schema) from 
sinkRecord
-        SinkRecord sinkRecord = recordSchemaTest(1L, Schema.INT64, 1, "INT64");
-        Assert.assertEquals(sinkRecord.value().getClass(), Long.class);
-        Assert.assertEquals(sinkRecord.value(), 1L);
+        long out = (long) sinkRecord.value();
+        Assert.assertEquals(out, Long.MAX_VALUE);
     }
 
     @Test
     public void floatRecordSchemaTest() throws Exception {
-        // 1.0d is coming back from ObjectMapper, expect Float (as in schema) 
from sinkRecord
+        // 1.0d is coming back from ObjectMapper
         SinkRecord sinkRecord = recordSchemaTest(1.0f, Schema.FLOAT, 1.0d, 
"FLOAT32");
-        Assert.assertEquals(sinkRecord.value().getClass(), Float.class);
-        Assert.assertEquals(sinkRecord.value(), 1.0f);
+        float out = (float) sinkRecord.value();
+        Assert.assertEquals(out, 1.0d);
     }
 
     @Test
     public void doubleRecordSchemaTest() throws Exception {
         SinkRecord sinkRecord = recordSchemaTest(Double.MAX_VALUE, 
Schema.DOUBLE, Double.MAX_VALUE, "FLOAT64");
-        Assert.assertEquals(sinkRecord.value().getClass(), Double.class);
-        Assert.assertEquals(sinkRecord.value(), Double.MAX_VALUE);
-    }
-
-    @Test
-    public void doubleRecordSchemaTestCast() throws Exception {
-        SinkRecord sinkRecord = recordSchemaTest(1.0d, Schema.DOUBLE, 1.0d, 
"FLOAT64");
-        Assert.assertEquals(sinkRecord.value().getClass(), Double.class);
-        Assert.assertEquals(sinkRecord.value(), 1.0d);
+        double out = (double) sinkRecord.value();
+        Assert.assertEquals(out, Double.MAX_VALUE);
     }
 
     @Test
@@ -473,12 +389,9 @@ public class KafkaConnectSinkTest extends 
ProducerConsumerBase  {
         SinkRecord sinkRecord = recordSchemaTest(jsonNode, jsonSchema, 
expected, "STRUCT");
 
         Struct out = (Struct) sinkRecord.value();
-        Assert.assertEquals(out.get("field1").getClass(), Integer.class);
-        Assert.assertEquals(out.get("field1"), 10);
-        Assert.assertEquals(out.get("field2").getClass(), String.class);
-        Assert.assertEquals(out.get("field2"), "test");
-        Assert.assertEquals(out.get("field3").getClass(), Long.class);
-        Assert.assertEquals(out.get("field3"), 100L);
+        Assert.assertEquals((int)out.get("field1"), 10);
+        Assert.assertEquals((String)out.get("field2"), "test");
+        Assert.assertEquals((long)out.get("field3"), 100L);
     }
 
     @Test
@@ -500,9 +413,9 @@ public class KafkaConnectSinkTest extends 
ProducerConsumerBase  {
         SinkRecord sinkRecord = recordSchemaTest(obj, pulsarAvroSchema, 
expected, "STRUCT");
 
         Struct out = (Struct) sinkRecord.value();
-        Assert.assertEquals(out.get("field1"), 10);
-        Assert.assertEquals(out.get("field2"), "test");
-        Assert.assertEquals(out.get("field3"), 100L);
+        Assert.assertEquals((int)out.get("field1"), 10);
+        Assert.assertEquals((String)out.get("field2"), "test");
+        Assert.assertEquals((long)out.get("field3"), 100L);
     }
 
     @Test
diff --git 
a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/PulsarSchemaToKafkaSchemaTest.java
 
b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/PulsarSchemaToKafkaSchemaTest.java
index 60caa2bbe81..9075dd9c3d3 100644
--- 
a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/PulsarSchemaToKafkaSchemaTest.java
+++ 
b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/PulsarSchemaToKafkaSchemaTest.java
@@ -29,11 +29,9 @@ import org.apache.pulsar.client.api.schema.SchemaDefinition;
 import org.apache.pulsar.client.impl.schema.AvroSchema;
 import org.apache.pulsar.client.impl.schema.JSONSchema;
 import org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl;
-import org.apache.pulsar.io.kafka.connect.schema.KafkaConnectData;
 import org.apache.pulsar.io.kafka.connect.schema.PulsarSchemaToKafkaSchema;
 import org.testng.annotations.Test;
 
-import java.math.BigInteger;
 import java.util.List;
 
 import static org.testng.Assert.assertEquals;
@@ -169,37 +167,6 @@ public class PulsarSchemaToKafkaSchemaTest {
         }
     }
 
-    @Test
-    public void castToKafkaSchemaTest() {
-        assertEquals(Byte.class,
-                KafkaConnectData.castToKafkaSchema(100L,
-                        
org.apache.kafka.connect.data.Schema.INT8_SCHEMA).getClass());
-
-        assertEquals(Short.class,
-                KafkaConnectData.castToKafkaSchema(100.0d,
-                        
org.apache.kafka.connect.data.Schema.INT16_SCHEMA).getClass());
-
-        assertEquals(Integer.class,
-                KafkaConnectData.castToKafkaSchema((byte)5,
-                        
org.apache.kafka.connect.data.Schema.INT32_SCHEMA).getClass());
-
-        assertEquals(Long.class,
-                KafkaConnectData.castToKafkaSchema((short)5,
-                        
org.apache.kafka.connect.data.Schema.INT64_SCHEMA).getClass());
-
-        assertEquals(Float.class,
-                KafkaConnectData.castToKafkaSchema(1.0d,
-                        
org.apache.kafka.connect.data.Schema.FLOAT32_SCHEMA).getClass());
-
-        assertEquals(Double.class,
-                KafkaConnectData.castToKafkaSchema(1.5f,
-                        
org.apache.kafka.connect.data.Schema.FLOAT64_SCHEMA).getClass());
-
-        assertEquals(Double.class,
-                KafkaConnectData.castToKafkaSchema(new BigInteger("100"),
-                        
org.apache.kafka.connect.data.Schema.FLOAT64_SCHEMA).getClass());
-    }
-
     @Test
     public void dateSchemaTest() {
         org.apache.kafka.connect.data.Schema kafkaSchema =

Reply via email to