This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 2591ccd Add KeyValueSchema to satisfy
org.apache.kafka.connect.connector.ConnectRecord (#2885)
2591ccd is described below
commit 2591ccd22faba2c149ccc08b34e84d1d4c78ae96
Author: Jia Zhai <[email protected]>
AuthorDate: Wed Oct 31 09:04:02 2018 +0800
Add KeyValueSchema to satisfy
org.apache.kafka.connect.connector.ConnectRecord (#2885)
Kafka's org.apache.kafka.connect.connector.ConnectRecord carries both a key
schema and a value schema.
We currently need a KeyValueSchema to represent this and to support Debezium.
A follow-up PR will add the conversion between KeyValueSchema and
org.apache.kafka.connect.data.Schema.
---
.../java/org/apache/pulsar/client/api/Schema.java | 32 ++++++
.../pulsar/client/impl/schema/KeyValueSchema.java | 88 +++++++++++++++
.../pulsar/client/schema/KeyValueSchemaTest.java | 121 +++++++++++++++++++++
.../org/apache/pulsar/common/schema/KeyValue.java | 40 +++++++
.../apache/pulsar/common/schema/SchemaType.java | 7 +-
.../io/kafka/connect/KafkaConnectSourceTest.java | 14 ---
6 files changed, 287 insertions(+), 15 deletions(-)
diff --git
a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/api/Schema.java
b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/api/Schema.java
index 5872db0..e7f32b9 100644
---
a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/api/Schema.java
+++
b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/api/Schema.java
@@ -18,6 +18,8 @@
*/
package org.apache.pulsar.client.api;
+import static com.google.common.base.Preconditions.checkArgument;
+
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
import org.apache.pulsar.client.impl.schema.AutoProduceBytesSchema;
@@ -28,12 +30,15 @@ import org.apache.pulsar.client.impl.schema.DoubleSchema;
import org.apache.pulsar.client.impl.schema.FloatSchema;
import org.apache.pulsar.client.impl.schema.IntSchema;
import org.apache.pulsar.client.impl.schema.JSONSchema;
+import org.apache.pulsar.client.impl.schema.KeyValueSchema;
import org.apache.pulsar.client.impl.schema.LongSchema;
import org.apache.pulsar.client.impl.schema.ProtobufSchema;
import org.apache.pulsar.client.impl.schema.ShortSchema;
import org.apache.pulsar.client.impl.schema.StringSchema;
import org.apache.pulsar.client.impl.schema.generic.GenericSchema;
+import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.schema.SchemaType;
/**
* Message schema definition
@@ -104,6 +109,33 @@ public interface Schema<T> {
return JSONSchema.of(clazz);
}
+ /**
+ * Key Value Schema using passed in schema type, support JSON and AVRO
currently.
+ */
+ static <K, V> Schema<KeyValue<K, V>> KeyValue(Class<K> key, Class<V>
value, SchemaType type) {
+ checkArgument(SchemaType.JSON == type || SchemaType.AVRO == type);
+ if (SchemaType.JSON == type) {
+ return new KeyValueSchema(JSONSchema.of(key),
JSONSchema.of(value));
+ } else {
+ // AVRO
+ return new KeyValueSchema(AvroSchema.of(key),
AvroSchema.of(value));
+ }
+ }
+
+ /**
+ * Key Value Schema whose underneath key and value schemas are JSONSchema.
+ */
+ static <K, V> Schema<KeyValue<K, V>> KeyValue(Class<K> key, Class<V>
value) {
+ return new KeyValueSchema(JSONSchema.of(key), JSONSchema.of(value));
+ }
+
+ /**
+ * Key Value Schema using passed in key and value schemas.
+ */
+ static <K, V> Schema<KeyValue<K, V>> KeyValue(Schema<K> key, Schema<V>
value) {
+ return new KeyValueSchema(key, value);
+ }
+
@Deprecated
static Schema<GenericRecord> AUTO() {
return AUTO_CONSUME();
diff --git
a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchema.java
b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchema.java
new file mode 100644
index 0000000..d0cd796
--- /dev/null
+++
b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchema.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.schema;
+
+import java.nio.ByteBuffer;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.schema.KeyValue;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.schema.SchemaType;
+
+/**
+ * [Key, Value] pair schema definition
+ */
+@Slf4j
+public class KeyValueSchema<K, V> implements Schema<KeyValue<K, V>> {
+ @Getter
+ private final Schema<K> keySchema;
+ @Getter
+ private final Schema<V> valueSchema;
+
+ // schemaInfo combined by KeySchemaInfo and ValueSchemaInfo:
+ // [keyInfo.length][keyInfo][valueInfo.length][ValueInfo]
+ private SchemaInfo schemaInfo;
+
+ public KeyValueSchema(Schema<K> keySchema,
+ Schema<V> valueSchema) {
+ this.keySchema = keySchema;
+ this.valueSchema = valueSchema;
+
+ // set schemaInfo
+ this.schemaInfo = new SchemaInfo()
+ .setName("KeyValue")
+ .setType(SchemaType.KEY_VALUE);
+
+ byte[] keySchemaInfo = keySchema.getSchemaInfo().getSchema();
+ byte[] valueSchemaInfo = valueSchema.getSchemaInfo().getSchema();
+
+ ByteBuffer byteBuffer = ByteBuffer.allocate(4 + keySchemaInfo.length +
4 + valueSchemaInfo.length);
+ byteBuffer.putInt(keySchemaInfo.length).put(keySchemaInfo)
+ .putInt(valueSchemaInfo.length).put(valueSchemaInfo);
+ this.schemaInfo.setSchema(byteBuffer.array());
+ }
+
+ // encode as bytes: [key.length][key.bytes][value.length][value.bytes]
+ public byte[] encode(KeyValue<K, V> message) {
+ byte[] keyBytes = keySchema.encode(message.getKey());
+ byte[] valueBytes = valueSchema.encode(message.getValue());
+
+ ByteBuffer byteBuffer = ByteBuffer.allocate(4 + keyBytes.length + 4 +
valueBytes.length);
+
byteBuffer.putInt(keyBytes.length).put(keyBytes).putInt(valueBytes.length).put(valueBytes);
+ return byteBuffer.array();
+ }
+
+ public KeyValue<K, V> decode(byte[] bytes) {
+ ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
+ int keyLength = byteBuffer.getInt();
+ byte[] keyBytes = new byte[keyLength];
+ byteBuffer.get(keyBytes);
+
+ int valueLength = byteBuffer.getInt();
+ byte[] valueBytes = new byte[valueLength];
+ byteBuffer.get(valueBytes);
+
+ return new KeyValue<>(keySchema.decode(keyBytes),
valueSchema.decode(valueBytes));
+ }
+
+ public SchemaInfo getSchemaInfo() {
+ return this.schemaInfo;
+ }
+}
diff --git
a/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/KeyValueSchemaTest.java
b/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/KeyValueSchemaTest.java
new file mode 100644
index 0000000..04d0986
--- /dev/null
+++
b/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/KeyValueSchemaTest.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.schema;
+
+import static org.testng.Assert.assertEquals;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.schema.AvroSchema;
+import org.apache.pulsar.client.impl.schema.JSONSchema;
+import org.apache.pulsar.client.impl.schema.KeyValueSchema;
+import org.apache.pulsar.client.schema.SchemaTestUtils.Bar;
+import org.apache.pulsar.client.schema.SchemaTestUtils.Color;
+import org.apache.pulsar.client.schema.SchemaTestUtils.Foo;
+import org.apache.pulsar.common.schema.KeyValue;
+import org.apache.pulsar.common.schema.SchemaType;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+@Slf4j
+public class KeyValueSchemaTest {
+
+ @Test
+ public void testAvroSchemaCreate() {
+ AvroSchema<Foo> fooSchema = AvroSchema.of(Foo.class);
+ AvroSchema<Bar> barSchema = AvroSchema.of(Bar.class);
+
+ Schema<KeyValue<Foo, Bar>> keyValueSchema1 =
Schema.KeyValue(fooSchema, barSchema);
+ Schema<KeyValue<Foo, Bar>> keyValueSchema2 =
Schema.KeyValue(Foo.class, Bar.class, SchemaType.AVRO);
+
+ assertEquals(keyValueSchema1.getSchemaInfo().getType(),
SchemaType.KEY_VALUE);
+ assertEquals(keyValueSchema2.getSchemaInfo().getType(),
SchemaType.KEY_VALUE);
+
+ assertEquals(((KeyValueSchema<Foo,
Bar>)keyValueSchema1).getKeySchema().getSchemaInfo().getType(),
+ SchemaType.AVRO);
+ assertEquals(((KeyValueSchema<Foo,
Bar>)keyValueSchema1).getValueSchema().getSchemaInfo().getType(),
+ SchemaType.AVRO);
+ assertEquals(((KeyValueSchema<Foo,
Bar>)keyValueSchema2).getKeySchema().getSchemaInfo().getType(),
+ SchemaType.AVRO);
+ assertEquals(((KeyValueSchema<Foo,
Bar>)keyValueSchema2).getValueSchema().getSchemaInfo().getType(),
+ SchemaType.AVRO);
+
+ String schemaInfo1 = new
String(keyValueSchema1.getSchemaInfo().getSchema());
+ String schemaInfo2 = new
String(keyValueSchema2.getSchemaInfo().getSchema());
+ assertEquals(schemaInfo1, schemaInfo2);
+ }
+
+ @Test
+ public void testJsonSchemaCreate() {
+ JSONSchema<Foo> fooSchema = JSONSchema.of(Foo.class);
+ JSONSchema<Bar> barSchema = JSONSchema.of(Bar.class);
+
+ Schema<KeyValue<Foo, Bar>> keyValueSchema1 =
Schema.KeyValue(fooSchema, barSchema);
+ Schema<KeyValue<Foo, Bar>> keyValueSchema2 =
Schema.KeyValue(Foo.class, Bar.class, SchemaType.JSON);
+ Schema<KeyValue<Foo, Bar>> keyValueSchema3 =
Schema.KeyValue(Foo.class, Bar.class);
+
+ assertEquals(keyValueSchema1.getSchemaInfo().getType(),
SchemaType.KEY_VALUE);
+ assertEquals(keyValueSchema2.getSchemaInfo().getType(),
SchemaType.KEY_VALUE);
+ assertEquals(keyValueSchema3.getSchemaInfo().getType(),
SchemaType.KEY_VALUE);
+
+ assertEquals(((KeyValueSchema<Foo,
Bar>)keyValueSchema1).getKeySchema().getSchemaInfo().getType(),
+ SchemaType.JSON);
+ assertEquals(((KeyValueSchema<Foo,
Bar>)keyValueSchema1).getValueSchema().getSchemaInfo().getType(),
+ SchemaType.JSON);
+ assertEquals(((KeyValueSchema<Foo,
Bar>)keyValueSchema2).getKeySchema().getSchemaInfo().getType(),
+ SchemaType.JSON);
+ assertEquals(((KeyValueSchema<Foo,
Bar>)keyValueSchema2).getValueSchema().getSchemaInfo().getType(),
+ SchemaType.JSON);
+ assertEquals(((KeyValueSchema<Foo,
Bar>)keyValueSchema3).getKeySchema().getSchemaInfo().getType(),
+ SchemaType.JSON);
+ assertEquals(((KeyValueSchema<Foo,
Bar>)keyValueSchema3).getValueSchema().getSchemaInfo().getType(),
+ SchemaType.JSON);
+
+ String schemaInfo1 = new
String(keyValueSchema1.getSchemaInfo().getSchema());
+ String schemaInfo2 = new
String(keyValueSchema2.getSchemaInfo().getSchema());
+ String schemaInfo3 = new
String(keyValueSchema3.getSchemaInfo().getSchema());
+ assertEquals(schemaInfo1, schemaInfo2);
+ assertEquals(schemaInfo1, schemaInfo3);
+ }
+
+ @Test
+ public void testSchemaEncodeAndDecode() {
+ Schema keyValueSchema = Schema.KeyValue(Foo.class, Bar.class);
+
+ Bar bar = new Bar();
+ bar.setField1(true);
+
+ Foo foo = new Foo();
+ foo.setField1("field1");
+ foo.setField2("field2");
+ foo.setField3(3);
+ foo.setField4(bar);
+ foo.setColor(Color.RED);
+
+ byte[] encodeBytes = keyValueSchema.encode(new KeyValue(foo, bar));
+ Assert.assertTrue(encodeBytes.length > 0);
+
+ KeyValue<Foo, Bar> keyValue = (KeyValue<Foo,
Bar>)keyValueSchema.decode(encodeBytes);
+ Foo fooBack = keyValue.getKey();
+ Bar barBack = keyValue.getValue();
+
+ assertEquals(foo, fooBack);
+ assertEquals(bar, barBack);
+ }
+}
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/schema/KeyValue.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/KeyValue.java
new file mode 100644
index 0000000..f95fe7d
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/KeyValue.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.schema;
+
+/**
+ * A simple KeyValue class
+ */
+public class KeyValue<K, V> {
+ private final K key;
+ private final V value;
+
+ public KeyValue(K key, V value) {
+ this.key = key;
+ this.value = value;
+ }
+
+ public K getKey() {
+ return key;
+ }
+
+ public V getValue() {
+ return value;
+ }
+}
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaType.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaType.java
index 4b4e576..6073ca5 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaType.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaType.java
@@ -96,5 +96,10 @@ public enum SchemaType {
/**
* Auto Publish Type.
*/
- AUTO_PUBLISH
+ AUTO_PUBLISH,
+
+ /**
+ * A Schema that contains Key Schema and Value Schema.
+ */
+ KEY_VALUE
}
diff --git
a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSourceTest.java
b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSourceTest.java
index d0f0a63..e852c4c 100644
---
a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSourceTest.java
+++
b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSourceTest.java
@@ -18,33 +18,19 @@
*/
package org.apache.pulsar.io.kafka.connect;
-import static java.nio.charset.StandardCharsets.UTF_8;
import static
org.apache.kafka.connect.file.FileStreamSourceTask.FILENAME_FIELD;
import static
org.apache.kafka.connect.file.FileStreamSourceTask.POSITION_FIELD;
-import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
-import io.netty.buffer.ByteBufUtil;
-import io.netty.buffer.Unpooled;
import java.io.File;
import java.io.OutputStream;
-import java.nio.ByteBuffer;
import java.nio.file.Files;
-import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
-import java.util.TreeMap;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.atomic.AtomicInteger;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.connect.file.FileStreamSourceConnector;
-import org.apache.kafka.connect.file.FileStreamSourceTask;
import org.apache.kafka.connect.runtime.TaskConfig;
-import org.apache.kafka.connect.source.SourceRecord;
-import org.apache.kafka.connect.util.Callback;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.functions.api.Record;
import org.testng.annotations.AfterMethod;