This is an automated email from the ASF dual-hosted git repository.
bogong pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.9 by this push:
new 15245f3f2ac Revert "fix:
org.apache.kafka.connect.errors.DataException: Invalid Java object for schema
with type... (#15598)"
15245f3f2ac is described below
commit 15245f3f2ac4224bcf28d3495551f565335a58fd
Author: congbobo184 <[email protected]>
AuthorDate: Mon Nov 14 19:42:32 2022 +0800
Revert "fix: org.apache.kafka.connect.errors.DataException: Invalid Java
object for schema with type... (#15598)"
This reverts commit 417042b35d487fcffb76bcfa9b154ab9a0d11fdd.
---
.../io/kafka/connect/KafkaConnectSource.java | 1 +
.../io/kafka/connect/schema/KafkaConnectData.java | 77 +----------
.../io/kafka/connect/KafkaConnectSinkTest.java | 141 ++++-----------------
.../connect/PulsarSchemaToKafkaSchemaTest.java | 33 -----
4 files changed, 34 insertions(+), 218 deletions(-)
diff --git
a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java
b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java
index 8ca6daad9b0..5d30e95acef 100644
---
a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java
+++
b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java
@@ -32,6 +32,7 @@ import
org.apache.pulsar.io.kafka.connect.schema.KafkaSchemaWrappedSchema;
import org.apache.pulsar.kafka.shade.io.confluent.connect.avro.AvroData;
import java.util.Base64;
+import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
diff --git
a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaConnectData.java
b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaConnectData.java
index ce179dab297..b649a9b0468 100644
---
a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaConnectData.java
+++
b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaConnectData.java
@@ -33,15 +33,17 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import lombok.extern.slf4j.Slf4j;
-@Slf4j
public class KafkaConnectData {
public static Object getKafkaConnectData(Object nativeObject, Schema
kafkaSchema) {
if (kafkaSchema == null) {
return nativeObject;
}
+ if (nativeObject == null) {
+ return defaultOrThrow(kafkaSchema);
+ }
+
if (nativeObject instanceof JsonNode) {
JsonNode node = (JsonNode) nativeObject;
return jsonAsConnectData(node, kafkaSchema);
@@ -50,73 +52,6 @@ public class KafkaConnectData {
return avroAsConnectData(avroRecord, kafkaSchema);
}
- return castToKafkaSchema(nativeObject, kafkaSchema);
- }
-
- public static Object castToKafkaSchema(Object nativeObject, Schema
kafkaSchema) {
- if (nativeObject == null) {
- return defaultOrThrow(kafkaSchema);
- }
-
- if (nativeObject instanceof Number) {
- // This is needed in case
- // jackson decided to fit value into some other type internally
- // (e.g. Double instead of Float).
- // Kafka's ConnectSchema expects exact type
- //
https://github.com/apache/kafka/blob/trunk/connect/api/src/main/java/org/apache/kafka/connect/data/ConnectSchema.java#L47-L71
- Number num = (Number) nativeObject;
- switch (kafkaSchema.type()) {
- case INT8:
- if (!(nativeObject instanceof Byte)) {
- if (log.isDebugEnabled()) {
- log.debug("nativeObject of type {} converted to
Byte", nativeObject.getClass());
- }
- return num.byteValue();
- }
- break;
- case INT16:
- if (!(nativeObject instanceof Short)) {
- if (log.isDebugEnabled()) {
- log.debug("nativeObject of type {} converted to
Short", nativeObject.getClass());
- }
- return num.shortValue();
- }
- break;
- case INT32:
- if (!(nativeObject instanceof Integer)) {
- if (log.isDebugEnabled()) {
- log.debug("nativeObject of type {} converted to
Integer", nativeObject.getClass());
- }
- return num.intValue();
- }
- break;
- case INT64:
- if (!(nativeObject instanceof Long)) {
- if (log.isDebugEnabled()) {
- log.debug("nativeObject of type {} converted to
Long", nativeObject.getClass());
- }
- return num.longValue();
- }
- break;
- case FLOAT32:
- if (!(nativeObject instanceof Float)) {
- if (log.isDebugEnabled()) {
- log.debug("nativeObject of type {} converted to
Float", nativeObject.getClass());
- }
- return num.floatValue();
- }
- break;
- case FLOAT64:
- if (!(nativeObject instanceof Double)) {
- if (log.isDebugEnabled()) {
- log.debug("nativeObject of type {} converted to
Double", nativeObject.getClass());
- }
- return num.doubleValue();
- }
- break;
- }
- }
-
return nativeObject;
}
@@ -152,9 +87,9 @@ public class KafkaConnectData {
case BOOLEAN:
return jsonNode.booleanValue();
case NUMBER:
- return jsonNode.doubleValue();
+ jsonNode.doubleValue();
case STRING:
- return jsonNode.textValue();
+ jsonNode.textValue();
default:
throw new DataException("Don't know how to convert " +
jsonNode +
" to Connect data (schema is null).");
diff --git
a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
index aad24155a31..b561e4b11c1 100644
---
a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
+++
b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
@@ -281,28 +281,8 @@ public class KafkaConnectSinkTest extends
ProducerConsumerBase {
assertEquals(result.get("keySchema"), expectedKeySchema);
assertEquals(result.get("valueSchema"), expectedSchema);
- if (schema.getSchemaInfo().getType().isPrimitive()) {
- // to test cast of primitive values
- Message msgOut = mock(MessageImpl.class);
-
when(msgOut.getValue()).thenReturn(getGenericRecord(result.get("value"),
schema));
- when(msgOut.getKey()).thenReturn(result.get("key").toString());
- when(msgOut.hasKey()).thenReturn(true);
- when(msgOut.getMessageId()).thenReturn(new MessageIdImpl(1, 0, 0));
-
- Record<GenericObject> recordOut = PulsarRecord.<String>builder()
- .topicName("fake-topic")
- .message(msgOut)
- .schema(schema)
- .ackFunction(status::incrementAndGet)
- .failFunction(status::decrementAndGet)
- .build();
-
- SinkRecord sinkRecord = sink.toSinkRecord(recordOut);
- return sinkRecord;
- } else {
- SinkRecord sinkRecord = sink.toSinkRecord(record);
- return sinkRecord;
- }
+ SinkRecord sinkRecord = sink.toSinkRecord(record);
+ return sinkRecord;
}
private GenericRecord getGenericRecord(Object value, Schema schema) {
@@ -318,135 +298,71 @@ public class KafkaConnectSinkTest extends
ProducerConsumerBase {
return rec;
}
-
- @Test
- public void genericRecordCastTest() throws Exception {
- props.put("kafkaConnectorSinkClass",
SchemaedFileStreamSinkConnector.class.getCanonicalName());
-
- KafkaConnectSink sink = new KafkaConnectSink();
- sink.open(props, context);
-
- AvroSchema<PulsarSchemaToKafkaSchemaTest.StructWithAnnotations>
pulsarAvroSchema
- =
AvroSchema.of(PulsarSchemaToKafkaSchemaTest.StructWithAnnotations.class);
-
- final GenericData.Record obj = new
GenericData.Record(pulsarAvroSchema.getAvroSchema());
- // schema type INT32
- obj.put("field1", (byte)10);
- // schema type STRING
- obj.put("field2", "test");
- // schema type INT64
- obj.put("field3", (short)100);
-
- final GenericRecord rec = getGenericRecord(obj, pulsarAvroSchema);
- Message msg = mock(MessageImpl.class);
- when(msg.getValue()).thenReturn(rec);
- when(msg.getKey()).thenReturn("key");
- when(msg.hasKey()).thenReturn(true);
- when(msg.getMessageId()).thenReturn(new MessageIdImpl(1, 0, 0));
-
- final AtomicInteger status = new AtomicInteger(0);
- Record<GenericObject> record = PulsarRecord.<String>builder()
- .topicName("fake-topic")
- .message(msg)
- .schema(pulsarAvroSchema)
- .ackFunction(status::incrementAndGet)
- .failFunction(status::decrementAndGet)
- .build();
-
- SinkRecord sinkRecord = sink.toSinkRecord(record);
-
- Struct out = (Struct) sinkRecord.value();
- Assert.assertEquals(out.get("field1").getClass(), Integer.class);
- Assert.assertEquals(out.get("field2").getClass(), String.class);
- Assert.assertEquals(out.get("field3").getClass(), Long.class);
-
- Assert.assertEquals(out.get("field1"), 10);
- Assert.assertEquals(out.get("field2"), "test");
- Assert.assertEquals(out.get("field3"), 100L);
-
- sink.close();
- }
-
@Test
public void bytesRecordSchemaTest() throws Exception {
byte[] in = "val".getBytes(StandardCharsets.US_ASCII);
SinkRecord sinkRecord = recordSchemaTest(in, Schema.BYTES, "val",
"BYTES");
- // test/mock writes it as string
- Assert.assertEquals(sinkRecord.value(), "val");
+ byte[] out = (byte[]) sinkRecord.value();
+ Assert.assertEquals(out, in);
}
@Test
public void stringRecordSchemaTest() throws Exception {
SinkRecord sinkRecord = recordSchemaTest("val", Schema.STRING, "val",
"STRING");
- Assert.assertEquals(sinkRecord.value().getClass(), String.class);
- Assert.assertEquals(sinkRecord.value(), "val");
+ String out = (String) sinkRecord.value();
+ Assert.assertEquals(out, "val");
}
@Test
public void booleanRecordSchemaTest() throws Exception {
SinkRecord sinkRecord = recordSchemaTest(true, Schema.BOOL, true,
"BOOLEAN");
- Assert.assertEquals(sinkRecord.value().getClass(), Boolean.class);
- Assert.assertEquals(sinkRecord.value(), true);
+ boolean out = (boolean) sinkRecord.value();
+ Assert.assertEquals(out, true);
}
@Test
public void byteRecordSchemaTest() throws Exception {
// int 1 is coming back from ObjectMapper
SinkRecord sinkRecord = recordSchemaTest((byte)1, Schema.INT8, 1,
"INT8");
- Assert.assertEquals(sinkRecord.value().getClass(), Byte.class);
- Assert.assertEquals(sinkRecord.value(), (byte)1);
+ byte out = (byte) sinkRecord.value();
+ Assert.assertEquals(out, 1);
}
@Test
public void shortRecordSchemaTest() throws Exception {
// int 1 is coming back from ObjectMapper
SinkRecord sinkRecord = recordSchemaTest((short)1, Schema.INT16, 1,
"INT16");
- Assert.assertEquals(sinkRecord.value().getClass(), Short.class);
- Assert.assertEquals(sinkRecord.value(), (short)1);
+ short out = (short) sinkRecord.value();
+ Assert.assertEquals(out, 1);
}
@Test
public void integerRecordSchemaTest() throws Exception {
SinkRecord sinkRecord = recordSchemaTest(Integer.MAX_VALUE,
Schema.INT32, Integer.MAX_VALUE, "INT32");
- Assert.assertEquals(sinkRecord.value().getClass(), Integer.class);
- Assert.assertEquals(sinkRecord.value(), Integer.MAX_VALUE);
+ int out = (int) sinkRecord.value();
+ Assert.assertEquals(out, Integer.MAX_VALUE);
}
@Test
public void longRecordSchemaTest() throws Exception {
SinkRecord sinkRecord = recordSchemaTest(Long.MAX_VALUE, Schema.INT64,
Long.MAX_VALUE, "INT64");
- Assert.assertEquals(sinkRecord.value().getClass(), Long.class);
- Assert.assertEquals(sinkRecord.value(), Long.MAX_VALUE);
- }
-
- @Test
- public void longRecordSchemaTestCast() throws Exception {
- // int 1 is coming from ObjectMapper, expect Long (as in schema) from
sinkRecord
- SinkRecord sinkRecord = recordSchemaTest(1L, Schema.INT64, 1, "INT64");
- Assert.assertEquals(sinkRecord.value().getClass(), Long.class);
- Assert.assertEquals(sinkRecord.value(), 1L);
+ long out = (long) sinkRecord.value();
+ Assert.assertEquals(out, Long.MAX_VALUE);
}
@Test
public void floatRecordSchemaTest() throws Exception {
- // 1.0d is coming back from ObjectMapper, expect Float (as in schema)
from sinkRecord
+ // 1.0d is coming back from ObjectMapper
SinkRecord sinkRecord = recordSchemaTest(1.0f, Schema.FLOAT, 1.0d,
"FLOAT32");
- Assert.assertEquals(sinkRecord.value().getClass(), Float.class);
- Assert.assertEquals(sinkRecord.value(), 1.0f);
+ float out = (float) sinkRecord.value();
+ Assert.assertEquals(out, 1.0d);
}
@Test
public void doubleRecordSchemaTest() throws Exception {
SinkRecord sinkRecord = recordSchemaTest(Double.MAX_VALUE,
Schema.DOUBLE, Double.MAX_VALUE, "FLOAT64");
- Assert.assertEquals(sinkRecord.value().getClass(), Double.class);
- Assert.assertEquals(sinkRecord.value(), Double.MAX_VALUE);
- }
-
- @Test
- public void doubleRecordSchemaTestCast() throws Exception {
- SinkRecord sinkRecord = recordSchemaTest(1.0d, Schema.DOUBLE, 1.0d,
"FLOAT64");
- Assert.assertEquals(sinkRecord.value().getClass(), Double.class);
- Assert.assertEquals(sinkRecord.value(), 1.0d);
+ double out = (double) sinkRecord.value();
+ Assert.assertEquals(out, Double.MAX_VALUE);
}
@Test
@@ -473,12 +389,9 @@ public class KafkaConnectSinkTest extends
ProducerConsumerBase {
SinkRecord sinkRecord = recordSchemaTest(jsonNode, jsonSchema,
expected, "STRUCT");
Struct out = (Struct) sinkRecord.value();
- Assert.assertEquals(out.get("field1").getClass(), Integer.class);
- Assert.assertEquals(out.get("field1"), 10);
- Assert.assertEquals(out.get("field2").getClass(), String.class);
- Assert.assertEquals(out.get("field2"), "test");
- Assert.assertEquals(out.get("field3").getClass(), Long.class);
- Assert.assertEquals(out.get("field3"), 100L);
+ Assert.assertEquals((int)out.get("field1"), 10);
+ Assert.assertEquals((String)out.get("field2"), "test");
+ Assert.assertEquals((long)out.get("field3"), 100L);
}
@Test
@@ -500,9 +413,9 @@ public class KafkaConnectSinkTest extends
ProducerConsumerBase {
SinkRecord sinkRecord = recordSchemaTest(obj, pulsarAvroSchema,
expected, "STRUCT");
Struct out = (Struct) sinkRecord.value();
- Assert.assertEquals(out.get("field1"), 10);
- Assert.assertEquals(out.get("field2"), "test");
- Assert.assertEquals(out.get("field3"), 100L);
+ Assert.assertEquals((int)out.get("field1"), 10);
+ Assert.assertEquals((String)out.get("field2"), "test");
+ Assert.assertEquals((long)out.get("field3"), 100L);
}
@Test
diff --git
a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/PulsarSchemaToKafkaSchemaTest.java
b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/PulsarSchemaToKafkaSchemaTest.java
index 60caa2bbe81..9075dd9c3d3 100644
---
a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/PulsarSchemaToKafkaSchemaTest.java
+++
b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/PulsarSchemaToKafkaSchemaTest.java
@@ -29,11 +29,9 @@ import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.impl.schema.AvroSchema;
import org.apache.pulsar.client.impl.schema.JSONSchema;
import org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl;
-import org.apache.pulsar.io.kafka.connect.schema.KafkaConnectData;
import org.apache.pulsar.io.kafka.connect.schema.PulsarSchemaToKafkaSchema;
import org.testng.annotations.Test;
-import java.math.BigInteger;
import java.util.List;
import static org.testng.Assert.assertEquals;
@@ -169,37 +167,6 @@ public class PulsarSchemaToKafkaSchemaTest {
}
}
- @Test
- public void castToKafkaSchemaTest() {
- assertEquals(Byte.class,
- KafkaConnectData.castToKafkaSchema(100L,
-
org.apache.kafka.connect.data.Schema.INT8_SCHEMA).getClass());
-
- assertEquals(Short.class,
- KafkaConnectData.castToKafkaSchema(100.0d,
-
org.apache.kafka.connect.data.Schema.INT16_SCHEMA).getClass());
-
- assertEquals(Integer.class,
- KafkaConnectData.castToKafkaSchema((byte)5,
-
org.apache.kafka.connect.data.Schema.INT32_SCHEMA).getClass());
-
- assertEquals(Long.class,
- KafkaConnectData.castToKafkaSchema((short)5,
-
org.apache.kafka.connect.data.Schema.INT64_SCHEMA).getClass());
-
- assertEquals(Float.class,
- KafkaConnectData.castToKafkaSchema(1.0d,
-
org.apache.kafka.connect.data.Schema.FLOAT32_SCHEMA).getClass());
-
- assertEquals(Double.class,
- KafkaConnectData.castToKafkaSchema(1.5f,
-
org.apache.kafka.connect.data.Schema.FLOAT64_SCHEMA).getClass());
-
- assertEquals(Double.class,
- KafkaConnectData.castToKafkaSchema(new BigInteger("100"),
-
org.apache.kafka.connect.data.Schema.FLOAT64_SCHEMA).getClass());
- }
-
@Test
public void dateSchemaTest() {
org.apache.kafka.connect.data.Schema kafkaSchema =