This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 2591ccd  Add KeyValueSchema to satisfy 
org.apache.kafka.connect.connector.ConnectRecord (#2885)
2591ccd is described below

commit 2591ccd22faba2c149ccc08b34e84d1d4c78ae96
Author: Jia Zhai <[email protected]>
AuthorDate: Wed Oct 31 09:04:02 2018 +0800

    Add KeyValueSchema to satisfy 
org.apache.kafka.connect.connector.ConnectRecord (#2885)
    
    Kafka org.apache.kafka.connect.connector.ConnectRecord has both key and 
value schema.
    Currently we need a KeyValueSchema to achieve it and support debezium.
    There will be another PR to do the convert between KeyValueSchema and 
org.apache.kafka.connect.data.Schema.
---
 .../java/org/apache/pulsar/client/api/Schema.java  |  32 ++++++
 .../pulsar/client/impl/schema/KeyValueSchema.java  |  88 +++++++++++++++
 .../pulsar/client/schema/KeyValueSchemaTest.java   | 121 +++++++++++++++++++++
 .../org/apache/pulsar/common/schema/KeyValue.java  |  40 +++++++
 .../apache/pulsar/common/schema/SchemaType.java    |   7 +-
 .../io/kafka/connect/KafkaConnectSourceTest.java   |  14 ---
 6 files changed, 287 insertions(+), 15 deletions(-)

diff --git 
a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/api/Schema.java 
b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/api/Schema.java
index 5872db0..e7f32b9 100644
--- 
a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/api/Schema.java
+++ 
b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/api/Schema.java
@@ -18,6 +18,8 @@
  */
 package org.apache.pulsar.client.api;
 
+import static com.google.common.base.Preconditions.checkArgument;
+
 import org.apache.pulsar.client.api.schema.GenericRecord;
 import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
 import org.apache.pulsar.client.impl.schema.AutoProduceBytesSchema;
@@ -28,12 +30,15 @@ import org.apache.pulsar.client.impl.schema.DoubleSchema;
 import org.apache.pulsar.client.impl.schema.FloatSchema;
 import org.apache.pulsar.client.impl.schema.IntSchema;
 import org.apache.pulsar.client.impl.schema.JSONSchema;
+import org.apache.pulsar.client.impl.schema.KeyValueSchema;
 import org.apache.pulsar.client.impl.schema.LongSchema;
 import org.apache.pulsar.client.impl.schema.ProtobufSchema;
 import org.apache.pulsar.client.impl.schema.ShortSchema;
 import org.apache.pulsar.client.impl.schema.StringSchema;
 import org.apache.pulsar.client.impl.schema.generic.GenericSchema;
+import org.apache.pulsar.common.schema.KeyValue;
 import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.schema.SchemaType;
 
 /**
  * Message schema definition
@@ -104,6 +109,33 @@ public interface Schema<T> {
         return JSONSchema.of(clazz);
     }
 
+    /**
+     * Key Value Schema using passed in schema type, support JSON and AVRO 
currently.
+     */
+    static <K, V> Schema<KeyValue<K, V>> KeyValue(Class<K> key, Class<V> 
value, SchemaType type) {
+        checkArgument(SchemaType.JSON == type || SchemaType.AVRO == type);
+        if (SchemaType.JSON == type) {
+            return new KeyValueSchema(JSONSchema.of(key), 
JSONSchema.of(value));
+        } else {
+            // AVRO
+            return new KeyValueSchema(AvroSchema.of(key), 
AvroSchema.of(value));
+        }
+    }
+
+    /**
+     * Key Value Schema whose underneath key and value schemas are JSONSchema.
+     */
+    static <K, V> Schema<KeyValue<K, V>> KeyValue(Class<K> key, Class<V> 
value) {
+        return new KeyValueSchema(JSONSchema.of(key), JSONSchema.of(value));
+    }
+
+    /**
+     * Key Value Schema using passed in key and value schemas.
+     */
+    static <K, V> Schema<KeyValue<K, V>> KeyValue(Schema<K> key, Schema<V> 
value) {
+        return new KeyValueSchema(key, value);
+    }
+
     @Deprecated
     static Schema<GenericRecord> AUTO() {
         return AUTO_CONSUME();
diff --git 
a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchema.java
 
b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchema.java
new file mode 100644
index 0000000..d0cd796
--- /dev/null
+++ 
b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchema.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.schema;
+
+import java.nio.ByteBuffer;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.schema.KeyValue;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.schema.SchemaType;
+
+/**
+ * [Key, Value] pair schema definition
+ */
+@Slf4j
+public class KeyValueSchema<K, V> implements Schema<KeyValue<K, V>> {
+    @Getter
+    private final Schema<K> keySchema;
+    @Getter
+    private final Schema<V> valueSchema;
+
+    // schemaInfo combined by KeySchemaInfo and ValueSchemaInfo:
+    //   [keyInfo.length][keyInfo][valueInfo.length][ValueInfo]
+    private SchemaInfo schemaInfo;
+
+    public KeyValueSchema(Schema<K> keySchema,
+                          Schema<V> valueSchema) {
+        this.keySchema = keySchema;
+        this.valueSchema = valueSchema;
+
+        // set schemaInfo
+        this.schemaInfo = new SchemaInfo()
+            .setName("KeyValue")
+            .setType(SchemaType.KEY_VALUE);
+
+        byte[] keySchemaInfo = keySchema.getSchemaInfo().getSchema();
+        byte[] valueSchemaInfo = valueSchema.getSchemaInfo().getSchema();
+
+        ByteBuffer byteBuffer = ByteBuffer.allocate(4 + keySchemaInfo.length + 
4 + valueSchemaInfo.length);
+        byteBuffer.putInt(keySchemaInfo.length).put(keySchemaInfo)
+            .putInt(valueSchemaInfo.length).put(valueSchemaInfo);
+        this.schemaInfo.setSchema(byteBuffer.array());
+    }
+
+    // encode as bytes: [key.length][key.bytes][value.length][value.bytes]
+    public byte[] encode(KeyValue<K, V> message) {
+        byte[] keyBytes = keySchema.encode(message.getKey());
+        byte[] valueBytes = valueSchema.encode(message.getValue());
+
+        ByteBuffer byteBuffer = ByteBuffer.allocate(4 + keyBytes.length + 4 + 
valueBytes.length);
+        
byteBuffer.putInt(keyBytes.length).put(keyBytes).putInt(valueBytes.length).put(valueBytes);
+        return byteBuffer.array();
+    }
+
+    public KeyValue<K, V> decode(byte[] bytes) {
+        ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
+        int keyLength = byteBuffer.getInt();
+        byte[] keyBytes = new byte[keyLength];
+        byteBuffer.get(keyBytes);
+
+        int valueLength = byteBuffer.getInt();
+        byte[] valueBytes = new byte[valueLength];
+        byteBuffer.get(valueBytes);
+
+        return new KeyValue<>(keySchema.decode(keyBytes), 
valueSchema.decode(valueBytes));
+    }
+
+    public SchemaInfo getSchemaInfo() {
+        return this.schemaInfo;
+    }
+}
diff --git 
a/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/KeyValueSchemaTest.java
 
b/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/KeyValueSchemaTest.java
new file mode 100644
index 0000000..04d0986
--- /dev/null
+++ 
b/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/KeyValueSchemaTest.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.schema;
+
+import static org.testng.Assert.assertEquals;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.schema.AvroSchema;
+import org.apache.pulsar.client.impl.schema.JSONSchema;
+import org.apache.pulsar.client.impl.schema.KeyValueSchema;
+import org.apache.pulsar.client.schema.SchemaTestUtils.Bar;
+import org.apache.pulsar.client.schema.SchemaTestUtils.Color;
+import org.apache.pulsar.client.schema.SchemaTestUtils.Foo;
+import org.apache.pulsar.common.schema.KeyValue;
+import org.apache.pulsar.common.schema.SchemaType;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+@Slf4j
+public class KeyValueSchemaTest {
+
+    @Test
+    public void testAvroSchemaCreate() {
+        AvroSchema<Foo> fooSchema = AvroSchema.of(Foo.class);
+        AvroSchema<Bar> barSchema = AvroSchema.of(Bar.class);
+
+        Schema<KeyValue<Foo, Bar>> keyValueSchema1 = 
Schema.KeyValue(fooSchema, barSchema);
+        Schema<KeyValue<Foo, Bar>> keyValueSchema2 = 
Schema.KeyValue(Foo.class, Bar.class, SchemaType.AVRO);
+
+        assertEquals(keyValueSchema1.getSchemaInfo().getType(), 
SchemaType.KEY_VALUE);
+        assertEquals(keyValueSchema2.getSchemaInfo().getType(), 
SchemaType.KEY_VALUE);
+
+        assertEquals(((KeyValueSchema<Foo, 
Bar>)keyValueSchema1).getKeySchema().getSchemaInfo().getType(),
+            SchemaType.AVRO);
+        assertEquals(((KeyValueSchema<Foo, 
Bar>)keyValueSchema1).getValueSchema().getSchemaInfo().getType(),
+            SchemaType.AVRO);
+        assertEquals(((KeyValueSchema<Foo, 
Bar>)keyValueSchema2).getKeySchema().getSchemaInfo().getType(),
+            SchemaType.AVRO);
+        assertEquals(((KeyValueSchema<Foo, 
Bar>)keyValueSchema2).getValueSchema().getSchemaInfo().getType(),
+            SchemaType.AVRO);
+
+        String schemaInfo1 = new 
String(keyValueSchema1.getSchemaInfo().getSchema());
+        String schemaInfo2 = new 
String(keyValueSchema2.getSchemaInfo().getSchema());
+        assertEquals(schemaInfo1, schemaInfo2);
+    }
+
+    @Test
+    public void testJsonSchemaCreate() {
+        JSONSchema<Foo> fooSchema = JSONSchema.of(Foo.class);
+        JSONSchema<Bar> barSchema = JSONSchema.of(Bar.class);
+
+        Schema<KeyValue<Foo, Bar>> keyValueSchema1 = 
Schema.KeyValue(fooSchema, barSchema);
+        Schema<KeyValue<Foo, Bar>> keyValueSchema2 = 
Schema.KeyValue(Foo.class, Bar.class, SchemaType.JSON);
+        Schema<KeyValue<Foo, Bar>> keyValueSchema3 = 
Schema.KeyValue(Foo.class, Bar.class);
+
+        assertEquals(keyValueSchema1.getSchemaInfo().getType(), 
SchemaType.KEY_VALUE);
+        assertEquals(keyValueSchema2.getSchemaInfo().getType(), 
SchemaType.KEY_VALUE);
+        assertEquals(keyValueSchema3.getSchemaInfo().getType(), 
SchemaType.KEY_VALUE);
+
+        assertEquals(((KeyValueSchema<Foo, 
Bar>)keyValueSchema1).getKeySchema().getSchemaInfo().getType(),
+            SchemaType.JSON);
+        assertEquals(((KeyValueSchema<Foo, 
Bar>)keyValueSchema1).getValueSchema().getSchemaInfo().getType(),
+            SchemaType.JSON);
+        assertEquals(((KeyValueSchema<Foo, 
Bar>)keyValueSchema2).getKeySchema().getSchemaInfo().getType(),
+            SchemaType.JSON);
+        assertEquals(((KeyValueSchema<Foo, 
Bar>)keyValueSchema2).getValueSchema().getSchemaInfo().getType(),
+            SchemaType.JSON);
+        assertEquals(((KeyValueSchema<Foo, 
Bar>)keyValueSchema3).getKeySchema().getSchemaInfo().getType(),
+            SchemaType.JSON);
+        assertEquals(((KeyValueSchema<Foo, 
Bar>)keyValueSchema3).getValueSchema().getSchemaInfo().getType(),
+            SchemaType.JSON);
+
+        String schemaInfo1 = new 
String(keyValueSchema1.getSchemaInfo().getSchema());
+        String schemaInfo2 = new 
String(keyValueSchema2.getSchemaInfo().getSchema());
+        String schemaInfo3 = new 
String(keyValueSchema3.getSchemaInfo().getSchema());
+        assertEquals(schemaInfo1, schemaInfo2);
+        assertEquals(schemaInfo1, schemaInfo3);
+    }
+
+    @Test
+    public void testSchemaEncodeAndDecode() {
+        Schema keyValueSchema = Schema.KeyValue(Foo.class, Bar.class);
+
+        Bar bar = new Bar();
+        bar.setField1(true);
+
+        Foo foo = new Foo();
+        foo.setField1("field1");
+        foo.setField2("field2");
+        foo.setField3(3);
+        foo.setField4(bar);
+        foo.setColor(Color.RED);
+
+        byte[] encodeBytes = keyValueSchema.encode(new KeyValue(foo, bar));
+        Assert.assertTrue(encodeBytes.length > 0);
+
+        KeyValue<Foo, Bar> keyValue = (KeyValue<Foo, 
Bar>)keyValueSchema.decode(encodeBytes);
+        Foo fooBack = keyValue.getKey();
+        Bar barBack = keyValue.getValue();
+
+        assertEquals(foo, fooBack);
+        assertEquals(bar, barBack);
+    }
+}
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/schema/KeyValue.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/KeyValue.java
new file mode 100644
index 0000000..f95fe7d
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/KeyValue.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.schema;
+
+/**
+ * A simple KeyValue class
+ */
+public class KeyValue<K, V> {
+    private final K key;
+    private final V value;
+
+    public KeyValue(K key, V value) {
+        this.key = key;
+        this.value = value;
+    }
+
+    public K getKey() {
+        return key;
+    }
+
+    public V getValue() {
+        return value;
+    }
+}
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaType.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaType.java
index 4b4e576..6073ca5 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaType.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaType.java
@@ -96,5 +96,10 @@ public enum SchemaType {
     /**
      * Auto Publish Type.
      */
-    AUTO_PUBLISH
+    AUTO_PUBLISH,
+
+    /**
+     * A Schema that contains Key Schema and Value Schema.
+     */
+    KEY_VALUE
 }
diff --git 
a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSourceTest.java
 
b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSourceTest.java
index d0f0a63..e852c4c 100644
--- 
a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSourceTest.java
+++ 
b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSourceTest.java
@@ -18,33 +18,19 @@
  */
 package org.apache.pulsar.io.kafka.connect;
 
-import static java.nio.charset.StandardCharsets.UTF_8;
 import static 
org.apache.kafka.connect.file.FileStreamSourceTask.FILENAME_FIELD;
 import static 
org.apache.kafka.connect.file.FileStreamSourceTask.POSITION_FIELD;
-import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
 
-import io.netty.buffer.ByteBufUtil;
-import io.netty.buffer.Unpooled;
 import java.io.File;
 import java.io.OutputStream;
-import java.nio.ByteBuffer;
 import java.nio.file.Files;
-import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
-import java.util.TreeMap;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.atomic.AtomicInteger;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.connect.file.FileStreamSourceConnector;
-import org.apache.kafka.connect.file.FileStreamSourceTask;
 import org.apache.kafka.connect.runtime.TaskConfig;
-import org.apache.kafka.connect.source.SourceRecord;
-import org.apache.kafka.connect.util.Callback;
 import org.apache.pulsar.client.api.ProducerConsumerBase;
 import org.apache.pulsar.functions.api.Record;
 import org.testng.annotations.AfterMethod;

Reply via email to