This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 8c21c005b53 [improve][kinesis-sink] Add unit test for Kinesis sink JsonConverter (#15848)
8c21c005b53 is described below

commit 8c21c005b5369fa0dbfd7232898b6a47fdddd177
Author: Christophe Bornet <[email protected]>
AuthorDate: Fri Jun 3 10:57:00 2022 +0200

    [improve][kinesis-sink] Add unit test for Kinesis sink JsonConverter (#15848)
---
 .../pulsar/io/kinesis/json/JsonConverterTests.java | 162 +++++++++++++++++++++
 1 file changed, 162 insertions(+)

diff --git a/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/json/JsonConverterTests.java b/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/json/JsonConverterTests.java
new file mode 100644
index 00000000000..cce2ea8e7d5
--- /dev/null
+++ b/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/json/JsonConverterTests.java
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.kinesis.json;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.NullNode;
+import com.google.common.collect.ImmutableMap;
+import org.apache.avro.LogicalTypes;
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.*;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.specific.SpecificDatumWriter;
+import org.testng.annotations.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Calendar;
+import java.util.GregorianCalendar;
+import java.util.TimeZone;
+import java.util.UUID;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+/**
+ * Unit tests for {@link JsonConverter}: each test builds an Avro {@link GenericRecord},
+ * converts it with {@code JsonConverter.toJson} and checks the resulting {@link JsonNode} tree.
+ */
+public class JsonConverterTests {
+
+    /**
+     * Converts a record holding one field per primitive, fixed, enum, array and map type
+     * and checks the JSON value produced for every field.
+     *
+     * @throws IOException if reading a binary JSON node fails
+     */
+    @Test
+    public void testAvroToJson() throws IOException {
+        Schema avroArraySchema = SchemaBuilder.array().items(SchemaBuilder.builder().stringType());
+        Schema schema = SchemaBuilder.record("record").fields()
+                .name("n").type().longType().longDefault(10)
+                .name("l").type().longType().longDefault(10)
+                .name("i").type().intType().intDefault(10)
+                .name("b").type().booleanType().booleanDefault(true)
+                .name("bb").type().bytesType().bytesDefault("10")
+                .name("d").type().doubleType().doubleDefault(10.0)
+                .name("f").type().floatType().floatDefault(10.0f)
+                .name("s").type().stringType().stringDefault("titi")
+                .name("fi").type().fixed("fi").size(3).fixedDefault(new byte[]{1, 2, 3})
+                .name("en").type().enumeration("en").symbols("a", "b", "c").enumDefault("b")
+                .name("array").type().optional().array().items(SchemaBuilder.builder().stringType())
+                .name("arrayavro").type().optional().array().items(SchemaBuilder.builder().stringType())
+                .name("map").type().optional().map().values(SchemaBuilder.builder().intType())
+                .name("maputf8").type().optional().map().values(SchemaBuilder.builder().intType())
+                .endRecord();
+
+        GenericRecord genericRecord = new GenericData.Record(schema);
+        // "n" is declared as a plain long but deliberately left null: the converter is
+        // expected to emit a JSON null node for it (asserted below).
+        genericRecord.put("n", null);
+        genericRecord.put("l", 1L);
+        genericRecord.put("i", 1);
+        genericRecord.put("b", true);
+        genericRecord.put("bb", "10".getBytes(StandardCharsets.UTF_8));
+        genericRecord.put("d", 10.0);
+        genericRecord.put("f", 10.0f);
+        genericRecord.put("s", "toto");
+        genericRecord.put("fi",
+                GenericData.get().createFixed(null, new byte[]{'a', 'b', 'c'}, schema.getField("fi").schema()));
+        genericRecord.put("en", GenericData.get().createEnum("b", schema.getField("en").schema()));
+        // The same array schema is fed once with a plain Java array and once with an Avro array.
+        genericRecord.put("array", new String[] {"toto"});
+        genericRecord.put("arrayavro", new GenericData.Array<>(avroArraySchema, Arrays.asList("toto")));
+        // The same map schema is fed once with String keys and once with Avro Utf8 keys.
+        genericRecord.put("map", ImmutableMap.of("a", 10));
+        genericRecord.put("maputf8", ImmutableMap.of(new org.apache.avro.util.Utf8("a"), 10));
+
+        JsonNode jsonNode = JsonConverter.toJson(genericRecord);
+
+        assertEquals(jsonNode.get("n"), NullNode.getInstance());
+        assertEquals(jsonNode.get("l").asLong(), 1L);
+        assertEquals(jsonNode.get("i").asInt(), 1);
+        assertTrue(jsonNode.get("b").asBoolean());
+        assertEquals(jsonNode.get("bb").binaryValue(), "10".getBytes(StandardCharsets.UTF_8));
+        assertEquals(jsonNode.get("fi").binaryValue(), "abc".getBytes(StandardCharsets.UTF_8));
+        assertEquals(jsonNode.get("en").textValue(), "b");
+        assertEquals(jsonNode.get("d").asDouble(), 10.0);
+        assertEquals(jsonNode.get("f").numberValue(), 10.0f);
+        assertEquals(jsonNode.get("s").asText(), "toto");
+        assertTrue(jsonNode.get("array").isArray());
+        assertEquals(jsonNode.get("array").iterator().next().asText(), "toto");
+        assertTrue(jsonNode.get("arrayavro").isArray());
+        assertEquals(jsonNode.get("arrayavro").iterator().next().asText(), "toto");
+        assertTrue(jsonNode.get("map").isObject());
+        assertEquals(jsonNode.get("map").elements().next().asText(), "10");
+        assertEquals(jsonNode.get("map").get("a").numberValue(), 10);
+        assertTrue(jsonNode.get("maputf8").isObject());
+        assertEquals(jsonNode.get("maputf8").elements().next().asText(), "10");
+        assertEquals(jsonNode.get("maputf8").get("a").numberValue(), 10);
+    }
+
+    /**
+     * Converts a record whose fields carry the date, timestamp, time and uuid logical types
+     * and checks that the JSON values match the values that were stored.
+     *
+     * @throws IOException if the Avro binary round trip fails
+     */
+    @Test
+    public void testLogicalTypesToJson() throws IOException {
+        Schema dateType = LogicalTypes.date().addToSchema(Schema.create(Schema.Type.INT));
+        Schema timestampMillisType = LogicalTypes.timestampMillis().addToSchema(Schema.create(Schema.Type.LONG));
+        Schema timestampMicrosType = LogicalTypes.timestampMicros().addToSchema(Schema.create(Schema.Type.LONG));
+        Schema timeMillisType = LogicalTypes.timeMillis().addToSchema(Schema.create(Schema.Type.INT));
+        Schema timeMicrosType = LogicalTypes.timeMicros().addToSchema(Schema.create(Schema.Type.LONG));
+        Schema uuidType = LogicalTypes.uuid().addToSchema(Schema.create(Schema.Type.STRING));
+        Schema schema = SchemaBuilder.record("record")
+                .fields()
+                .name("mydate").type(dateType).noDefault()
+                .name("tsmillis").type(timestampMillisType).noDefault()
+                .name("tsmicros").type(timestampMicrosType).noDefault()
+                .name("timemillis").type(timeMillisType).noDefault()
+                .name("timemicros").type(timeMicrosType).noDefault()
+                .name("myuuid").type(uuidType).noDefault()
+                .endRecord();
+
+        final long millisPerDay = 24 * 60 * 60 * 1000L;
+        UUID myUuid = UUID.randomUUID();
+        Calendar calendar = new GregorianCalendar(TimeZone.getTimeZone("Europe/Copenhagen"));
+
+        // Read the instant once so stored and expected values come from a single source.
+        long nowMillis = calendar.getTimeInMillis();
+        // The Avro date logical type is a count of days since the Unix epoch, so store days
+        // rather than epoch seconds (which would also overflow an int in 2038).
+        int daysSinceEpoch = Math.toIntExact(nowMillis / millisPerDay);
+        long nowMicros = nowMillis * 1000;
+        int millisOfDay = (int) (nowMillis % millisPerDay);
+        long microsOfDay = (nowMillis % millisPerDay) * 1000;
+
+        GenericRecord genericRecord = new GenericData.Record(schema);
+        genericRecord.put("mydate", daysSinceEpoch);
+        genericRecord.put("tsmillis", nowMillis);
+        genericRecord.put("tsmicros", nowMicros);
+        genericRecord.put("timemillis", millisOfDay);
+        genericRecord.put("timemicros", microsOfDay);
+        genericRecord.put("myuuid", myUuid.toString());
+
+        // Convert a record that went through a binary encode/decode round trip rather than
+        // the hand-built instance.
+        GenericRecord genericRecord2 = deserialize(serialize(genericRecord, schema), schema);
+        JsonNode jsonNode = JsonConverter.toJson(genericRecord2);
+
+        assertEquals(jsonNode.get("mydate").asInt(), daysSinceEpoch);
+        // Compare as long: narrowing both sides to int would hide errors in the high bits.
+        assertEquals(jsonNode.get("tsmillis").asLong(), nowMillis);
+        assertEquals(jsonNode.get("tsmicros").asLong(), nowMicros);
+        assertEquals(jsonNode.get("timemillis").asInt(), millisOfDay);
+        assertEquals(jsonNode.get("timemicros").asLong(), microsOfDay);
+        assertEquals(UUID.fromString(jsonNode.get("myuuid").asText()), myUuid);
+    }
+
+    /**
+     * Encodes a record with the Avro binary encoding.
+     *
+     * @param record the record to encode
+     * @param schema the schema used to write the record
+     * @return the encoded bytes
+     * @throws IOException if writing or flushing the encoder fails
+     */
+    public static byte[] serialize(GenericRecord record, Schema schema) throws IOException {
+        SpecificDatumWriter<GenericRecord> datumWriter = new SpecificDatumWriter<>(schema);
+        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+        BinaryEncoder binaryEncoder = EncoderFactory.get().binaryEncoder(byteArrayOutputStream, null);
+        datumWriter.write(record, binaryEncoder);
+        // Flush explicitly before reading the bytes back from the stream.
+        binaryEncoder.flush();
+        return byteArrayOutputStream.toByteArray();
+    }
+
+    /**
+     * Decodes a record from Avro binary-encoded bytes.
+     *
+     * @param recordBytes the encoded record
+     * @param schema the schema used to read the record
+     * @return the decoded record
+     * @throws IOException if decoding fails
+     */
+    public static GenericRecord deserialize(byte[] recordBytes, Schema schema) throws IOException {
+        DatumReader<GenericRecord> datumReader = new SpecificDatumReader<>(schema);
+        ByteArrayInputStream stream = new ByteArrayInputStream(recordBytes);
+        BinaryDecoder binaryDecoder = DecoderFactory.get().binaryDecoder(stream, null);
+        return datumReader.read(null, binaryDecoder);
+    }
+}
\ No newline at end of file

Reply via email to