This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 757222d [Schema] Provide a generic record interface for representing
a typed message (#2452)
757222d is described below
commit 757222d9d232dbe03293df890609aa85ff63556c
Author: Sijie Guo <[email protected]>
AuthorDate: Mon Aug 27 22:21:27 2018 -0700
[Schema] Provide a generic record interface for representing a typed
message (#2452)
* [Schema] Provide a generic record interface for representing a typed
message
### Motivation
In some use cases, publishers and consumers don't know the type or
schema of the messages ahead of time.
For example, when a Pulsar IO connector connects a topic to a JDBC
table, the connector doesn't know
the type of the messages ahead of time; it can only fetch the schema
info from the schema registry, and
that is the only information the connector has. Without a generic
representation, it is impossible to map the messages to a relational
database table.
So we need a way to represent a generic `Struct` record with fields.
### Changes
Introduce `Field` and `GenericRecord` to represent `Struct` records
deserialized with a schema.
### Not Covered
This change only introduces the interfaces. It doesn't integrate them with
the producer and consumer workflow.
That will be done in subsequent changes once we agree on the interfaces.
---
.../org/apache/pulsar/client/api/schema/Field.java | 43 +++++++++
.../pulsar/client/api/schema/GenericRecord.java | 51 +++++++++++
.../pulsar/client/impl/schema/AvroSchema.java | 17 ++--
.../client/impl/schema/GenericAvroRecord.java | 79 ++++++++++++++++
.../client/impl/schema/GenericAvroSchema.java | 102 +++++++++++++++++++++
.../pulsar/client/schema/AvroSchemaTest.java | 47 +++++++++-
6 files changed, 329 insertions(+), 10 deletions(-)
diff --git a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/api/schema/Field.java b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/api/schema/Field.java
new file mode 100644
index 0000000..653b5d6
--- /dev/null
+++ b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/api/schema/Field.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.schema;
+
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+
+/**
+ * A field in a {@link GenericRecord}, identified by its name and by its
+ * position within the record's schema.
+ *
+ * <p>Lombok generates the accessors ({@code getName()}, {@code getIndex()}),
+ * {@code equals}/{@code hashCode}/{@code toString}, and a constructor taking
+ * the final fields in declaration order, i.e. {@code new Field(name, index)}.
+ */
+@Data
+@EqualsAndHashCode
+@ToString
+public class Field {
+
+    /**
+     * The field name.
+     */
+    private final String name;
+
+    /**
+     * The position of the field within the record's schema.
+     */
+    private final int index;
+
+}
diff --git a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/api/schema/GenericRecord.java b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/api/schema/GenericRecord.java
new file mode 100644
index 0000000..0a4fce4
--- /dev/null
+++ b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/api/schema/GenericRecord.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.schema;
+
+import java.util.List;
+
+/**
+ * A message deserialized with a schema that is not known at compile time.
+ *
+ * <p>Field values are accessed generically, either through a {@link Field}
+ * obtained from {@link #getFields()} or directly by field name.
+ */
+public interface GenericRecord {
+
+    /**
+     * Returns the list of fields associated with the record.
+     *
+     * @return the list of fields associated with the record
+     */
+    List<Field> getFields();
+
+    /**
+     * Retrieves the value of the provided {@code field}.
+     *
+     * @param field the field whose value to retrieve
+     * @return the value object
+     */
+    Object getField(Field field);
+
+    /**
+     * Retrieves the value of the field named {@code fieldName}.
+     *
+     * @param fieldName the name of the field whose value to retrieve
+     * @return the value object
+     */
+    Object getField(String fieldName);
+
+}
diff --git a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java
index 7d90d2b..6867fdc 100644
--- a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java
+++ b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java
@@ -45,8 +45,9 @@ public class AvroSchema<T> implements Schema<T> {
private BinaryEncoder encoder;
private ByteArrayOutputStream byteArrayOutputStream;
- private AvroSchema(Class<T> pojo, Map<String, String> properties) {
- this.schema = ReflectData.AllowNull.get().getSchema(pojo);
+ private AvroSchema(org.apache.avro.Schema schema,
+ Map<String, String> properties) {
+ this.schema = schema;
this.schemaInfo = new SchemaInfo();
this.schemaInfo.setName("");
@@ -61,8 +62,7 @@ public class AvroSchema<T> implements Schema<T> {
}
@Override
- public byte[] encode(T message) {
-
+ public synchronized byte[] encode(T message) {
try {
datumWriter.write(message, this.encoder);
this.encoder.flush();
@@ -88,11 +88,16 @@ public class AvroSchema<T> implements Schema<T> {
return this.schemaInfo;
}
+ /**
+  * Derives the Avro schema for {@code pojo} by reflection, using Avro's
+  * {@code ReflectData.AllowNull} variant (which, per the Avro docs, permits
+  * null field values).
+  */
+ private static <T> org.apache.avro.Schema createAvroSchema(Class<T> pojo) {
+ return ReflectData.AllowNull.get().getSchema(pojo);
+ }
+
+ /**
+  * Creates an Avro-backed schema for {@code pojo} with an empty properties map.
+  */
public static <T> AvroSchema<T> of(Class<T> pojo) {
- return new AvroSchema<>(pojo, Collections.emptyMap());
+ return new AvroSchema<>(createAvroSchema(pojo),
Collections.emptyMap());
}
+ /**
+  * Creates an Avro-backed schema for {@code pojo}, passing {@code properties}
+  * through to the schema constructor unchanged.
+  */
public static <T> AvroSchema<T> of(Class<T> pojo, Map<String, String>
properties) {
- return new AvroSchema<>(pojo, properties);
+ return new AvroSchema<>(createAvroSchema(pojo), properties);
}
+
}
diff --git a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/GenericAvroRecord.java b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/GenericAvroRecord.java
new file mode 100644
index 0000000..fb65c7a
--- /dev/null
+++ b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/GenericAvroRecord.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.schema;
+
+import java.util.List;
+import java.util.stream.Collectors;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.avro.util.Utf8;
+import org.apache.pulsar.client.api.schema.Field;
+import org.apache.pulsar.client.api.schema.GenericRecord;
+
+/**
+ * A {@link GenericRecord} backed by an Avro
+ * {@link org.apache.avro.generic.GenericRecord}.
+ *
+ * <p>When a value is read, Avro {@link Utf8} strings are converted to
+ * {@link String}, and nested Avro records are wrapped in a new
+ * {@link GenericAvroRecord} built from the nested record's own schema.
+ */
+@Slf4j
+class GenericAvroRecord implements GenericRecord {
+
+    private final org.apache.avro.Schema schema;
+    private final List<Field> fields;
+    private final org.apache.avro.generic.GenericRecord record;
+
+    /**
+     * @param schema the Avro schema describing {@code record}
+     * @param fields the fields of {@code schema}, returned as-is by {@link #getFields()}
+     * @param record the underlying Avro record
+     */
+    GenericAvroRecord(org.apache.avro.Schema schema,
+                      List<Field> fields,
+                      org.apache.avro.generic.GenericRecord record) {
+        this.schema = schema;
+        this.fields = fields;
+        this.record = record;
+    }
+
+    @Override
+    public List<Field> getFields() {
+        return fields;
+    }
+
+    /**
+     * Looks the value up by {@code field.getName()}; the index is not used.
+     */
+    @Override
+    public Object getField(Field field) {
+        return getField(field.getName());
+    }
+
+    /**
+     * Returns the value stored under {@code fieldName} in the underlying Avro
+     * record. Lookup is delegated to Avro's {@code GenericRecord#get(String)},
+     * so the result for an unknown name is whatever that method yields.
+     *
+     * @param fieldName the field name
+     * @return a {@link String} for Avro {@link Utf8} values, a
+     *         {@link GenericAvroRecord} for nested Avro records, otherwise the
+     *         raw Avro value
+     */
+    @Override
+    public Object getField(String fieldName) {
+        Object value = record.get(fieldName);
+        if (value instanceof Utf8) {
+            return value.toString();
+        } else if (value instanceof org.apache.avro.generic.GenericRecord) {
+            org.apache.avro.generic.GenericRecord avroRecord =
+                (org.apache.avro.generic.GenericRecord) value;
+            // A nested record must carry its OWN schema, not the enclosing one.
+            org.apache.avro.Schema recordSchema = avroRecord.getSchema();
+            List<Field> recordFields = java.util.Collections.unmodifiableList(
+                recordSchema.getFields()
+                    .stream()
+                    .map(f -> new Field(f.name(), f.pos()))
+                    .collect(Collectors.toList()));
+            return new GenericAvroRecord(recordSchema, recordFields, avroRecord);
+        } else {
+            return value;
+        }
+    }
+
+    /**
+     * Returns the underlying Avro record, e.g. for re-encoding by
+     * {@link GenericAvroSchema}.
+     */
+    org.apache.avro.generic.GenericRecord getAvroRecord() {
+        return record;
+    }
+
+}
diff --git a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/GenericAvroSchema.java b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/GenericAvroSchema.java
new file mode 100644
index 0000000..4ccfe55
--- /dev/null
+++ b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/GenericAvroSchema.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.schema;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SchemaSerializationException;
+import org.apache.pulsar.client.api.schema.Field;
+import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.common.schema.SchemaInfo;
+
+/**
+ * A {@link Schema} that encodes and decodes Avro binary data as
+ * {@link GenericRecord}s, without requiring a POJO class.
+ *
+ * <p>The Avro schema is parsed from the UTF-8 schema definition carried in
+ * {@link SchemaInfo#getSchema()}. {@link #encode} is synchronized because it
+ * reuses a single encoder and output buffer; {@link #decode} creates a fresh
+ * decoder on every call.
+ */
+public class GenericAvroSchema implements Schema<GenericRecord> {
+
+    private final org.apache.avro.Schema schema;
+    private final List<Field> fields;
+    private final SchemaInfo schemaInfo;
+    private final GenericDatumWriter<org.apache.avro.generic.GenericRecord> datumWriter;
+    // Not final: replaced after a failed write so stale buffered bytes are discarded.
+    private BinaryEncoder encoder;
+    private final ByteArrayOutputStream byteArrayOutputStream;
+    private final GenericDatumReader<org.apache.avro.generic.GenericRecord> datumReader;
+
+    /**
+     * @param schemaInfo schema info whose schema bytes hold a UTF-8 encoded
+     *                   Avro schema definition
+     */
+    public GenericAvroSchema(SchemaInfo schemaInfo) {
+        this.schemaInfo = schemaInfo;
+        this.schema = new org.apache.avro.Schema.Parser().parse(
+            new String(schemaInfo.getSchema(), UTF_8)
+        );
+        // Shared by every decoded record, so expose it read-only.
+        this.fields = java.util.Collections.unmodifiableList(
+            schema.getFields()
+                .stream()
+                .map(f -> new Field(f.name(), f.pos()))
+                .collect(Collectors.toList()));
+        this.byteArrayOutputStream = new ByteArrayOutputStream();
+        this.encoder = newEncoder();
+        this.datumWriter = new GenericDatumWriter<>(schema);
+        this.datumReader = new GenericDatumReader<>(schema);
+    }
+
+    /**
+     * Creates a fresh (non-reused) binary encoder writing into the shared buffer.
+     */
+    private BinaryEncoder newEncoder() {
+        return EncoderFactory.get().binaryEncoder(this.byteArrayOutputStream, null);
+    }
+
+    /**
+     * Returns the parsed Avro schema.
+     */
+    public org.apache.avro.Schema getAvroSchema() {
+        return schema;
+    }
+
+    /**
+     * Serializes {@code message} with this schema.
+     *
+     * @param message the record to encode; must be a {@link GenericAvroRecord}
+     * @return the Avro binary encoding of the record
+     * @throws IllegalArgumentException if {@code message} is not a {@link GenericAvroRecord}
+     * @throws SchemaSerializationException if writing the record fails
+     */
+    @Override
+    public synchronized byte[] encode(GenericRecord message) {
+        checkArgument(message instanceof GenericAvroRecord,
+            "GenericAvroSchema can only encode GenericAvroRecord instances, but got %s",
+            message == null ? null : message.getClass().getName());
+        GenericAvroRecord gar = (GenericAvroRecord) message;
+        try {
+            datumWriter.write(gar.getAvroRecord(), this.encoder);
+            this.encoder.flush();
+            return this.byteArrayOutputStream.toByteArray();
+        } catch (Exception e) {
+            // The encoder buffers internally: a write that failed mid-record may
+            // leave partial bytes that the next flush would prepend to the next
+            // message. Drop this encoder and start over with a clean one.
+            this.encoder = newEncoder();
+            throw new SchemaSerializationException(e);
+        } finally {
+            this.byteArrayOutputStream.reset();
+        }
+    }
+
+    /**
+     * Deserializes Avro binary data written with this schema.
+     *
+     * @param bytes the Avro binary encoding of a record
+     * @return the decoded record
+     * @throws SchemaSerializationException if the bytes cannot be read
+     */
+    @Override
+    public GenericRecord decode(byte[] bytes) {
+        try {
+            org.apache.avro.generic.GenericRecord avroRecord = datumReader.read(
+                null,
+                DecoderFactory.get().binaryDecoder(bytes, null));
+            return new GenericAvroRecord(schema, fields, avroRecord);
+        } catch (IOException e) {
+            throw new SchemaSerializationException(e);
+        }
+    }
+
+    @Override
+    public SchemaInfo getSchemaInfo() {
+        return schemaInfo;
+    }
+}
diff --git a/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/AvroSchemaTest.java b/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/AvroSchemaTest.java
index 84c060e..cfe9cb7 100644
--- a/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/AvroSchemaTest.java
+++ b/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/AvroSchemaTest.java
@@ -18,12 +18,17 @@
*/
package org.apache.pulsar.client.schema;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
import org.apache.avro.Schema;
+import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.impl.schema.AvroSchema;
+import org.apache.pulsar.client.impl.schema.GenericAvroSchema;
import org.apache.pulsar.common.schema.SchemaType;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -65,10 +70,10 @@ public class AvroSchemaTest {
@Test
public void testSchema() {
AvroSchema<Foo> avroSchema = AvroSchema.of(Foo.class);
- Assert.assertEquals(avroSchema.getSchemaInfo().getType(),
SchemaType.AVRO);
+ assertEquals(avroSchema.getSchemaInfo().getType(), SchemaType.AVRO);
Schema.Parser parser = new Schema.Parser();
String schemaJson = new String(avroSchema.getSchemaInfo().getSchema());
- Assert.assertEquals(schemaJson, SCHEMA_JSON);
+ assertEquals(schemaJson, SCHEMA_JSON);
Schema schema = parser.parse(schemaJson);
for (String fieldName : FOO_FIELDS) {
@@ -103,7 +108,41 @@ public class AvroSchemaTest {
Foo object1 = avroSchema.decode(bytes1);
Foo object2 = avroSchema.decode(bytes2);
- Assert.assertEquals(object1, foo1);
- Assert.assertEquals(object2, foo2);
+ assertEquals(object1, foo1);
+ assertEquals(object2, foo2);
+ }
+
+    /**
+     * Encodes {@code Foo} POJOs with {@link AvroSchema} and decodes them with a
+     * {@link GenericAvroSchema} built from the same schema info, checking that
+     * top-level and nested field values survive the round trip.
+     */
+    @Test
+    public void testEncodeAndDecodeGenericRecord() {
+        AvroSchema<Foo> avroSchema = AvroSchema.of(Foo.class, null);
+
+        GenericAvroSchema genericAvroSchema = new GenericAvroSchema(avroSchema.getSchemaInfo());
+
+        log.info("Avro Schema : {}", genericAvroSchema.getAvroSchema());
+
+        int numRecords = 10;
+        for (int i = 0; i < numRecords; i++) {
+            Foo foo = new Foo();
+            foo.setField1("field-1-" + i);
+            foo.setField2("field-2-" + i);
+            foo.setField3(i);
+            Bar bar = new Bar();
+            bar.setField1(i % 2 == 0);
+            foo.setField4(bar);
+
+            byte[] data = avroSchema.encode(foo);
+
+            GenericRecord record = genericAvroSchema.decode(data);
+            // TestNG's assertEquals is (actual, expected, message). The messages
+            // report the runtime type without dereferencing a possibly-null value.
+            Object field1 = record.getField("field1");
+            assertEquals(field1, "field-1-" + i,
+                "Field 1 is " + (field1 == null ? null : field1.getClass()));
+            Object field2 = record.getField("field2");
+            assertEquals(field2, "field-2-" + i,
+                "Field 2 is " + (field2 == null ? null : field2.getClass()));
+            Object field3 = record.getField("field3");
+            assertEquals(field3, i,
+                "Field 3 is " + (field3 == null ? null : field3.getClass()));
+            Object field4 = record.getField("field4");
+            assertTrue(field4 instanceof GenericRecord);
+            GenericRecord field4Record = (GenericRecord) field4;
+            assertEquals(field4Record.getField("field1"), i % 2 == 0);
+        }
+    }
}