This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new e36458c  add java8 date and time type to pulsar's primitive schemas 
(#7874)
e36458c is described below

commit e36458c3f43db933ce86127f5e55f8ddca855c5b
Author: jianyun <[email protected]>
AuthorDate: Fri Aug 28 21:50:32 2020 +0800

    add java8 date and time type to pulsar's primitive schemas (#7874)
    
    ### Motivation
    
    *Compatibility with Flink 1.11 requires support for the Java 8 date/time 
API in Pulsar's primitive schemas.*
    
    ### Modifications
    
    *Add Instant, LocalDate, LocalTime, and LocalDateTime to Pulsar's primitive 
schemas.*
    
    ### Verifying this change
    
    Added the Instant, LocalDate, LocalTime, and LocalDateTime types to the 
schema type tests.
---
 .github/workflows/ci-integration-schema.yaml       |  7 +-
 .../schema/validator/SchemaDataValidator.java      |  4 +
 .../src/main/proto/SchemaRegistryFormat.proto      |  4 +
 .../pulsar/broker/admin/AdminApiSchemaTest.java    |  4 +
 .../schema/validator/SchemaDataValidatorTest.java  |  4 +
 .../java/org/apache/pulsar/client/api/Schema.java  | 21 ++++++
 .../client/internal/DefaultImplementation.java     | 28 +++++++
 .../apache/pulsar/common/schema/SchemaType.java    | 28 +++++++
 .../client/impl/schema/AutoConsumeSchema.java      |  8 ++
 .../pulsar/client/impl/schema/InstantSchema.java   | 84 +++++++++++++++++++++
 .../pulsar/client/impl/schema/LocalDateSchema.java | 80 ++++++++++++++++++++
 .../client/impl/schema/LocalDateTimeSchema.java    | 88 ++++++++++++++++++++++
 .../pulsar/client/impl/schema/LocalTimeSchema.java | 80 ++++++++++++++++++++
 .../client/impl/schema/InstantSchemaTest.java      | 83 ++++++++++++++++++++
 .../client/impl/schema/LocalDateSchemaTest.java    | 88 ++++++++++++++++++++++
 .../impl/schema/LocalDateTimeSchemaTest.java       | 83 ++++++++++++++++++++
 .../client/impl/schema/LocalTimeSchemaTest.java    | 88 ++++++++++++++++++++++
 .../client/impl/schema/PrimitiveSchemaTest.java    | 16 ++++
 .../apache/pulsar/common/schema/KeyValueTest.java  | 12 +++
 .../apache/pulsar/common/api/proto/PulsarApi.java  | 30 ++++++++
 pulsar-common/src/main/proto/PulsarApi.proto       |  4 +
 .../tests/integration/schema/SchemaTest.java       |  4 +
 22 files changed, 847 insertions(+), 1 deletion(-)

diff --git a/.github/workflows/ci-integration-schema.yaml 
b/.github/workflows/ci-integration-schema.yaml
index 944c74f..e608af7 100644
--- a/.github/workflows/ci-integration-schema.yaml
+++ b/.github/workflows/ci-integration-schema.yaml
@@ -72,7 +72,12 @@ jobs:
       - name: run install by skip tests
         if: steps.docs.outputs.changed_only == 'no'
         run: mvn -q -B -ntp clean install -DskipTests
-
+      - name: build pulsar image
+        if: steps.docs.outputs.changed_only == 'no'
+        run: mvn -B -f docker/pulsar/pom.xml install -am -Pdocker -DskipTests 
-Ddocker.nocache=true
+      - name: build pulsar-all image
+        if: steps.docs.outputs.changed_only == 'no'
+        run: mvn -B -f docker/pulsar-all/pom.xml install -am -Pdocker 
-DskipTests -Ddocker.nocache=true
       - name: build artifacts and docker image
         if: steps.docs.outputs.changed_only == 'no'
         run: mvn -B -f tests/docker-images/pom.xml install -am -Pdocker 
-DskipTests
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/SchemaDataValidator.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/SchemaDataValidator.java
index 0c44612..5e5f846 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/SchemaDataValidator.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/SchemaDataValidator.java
@@ -54,6 +54,10 @@ public interface SchemaDataValidator {
             case DATE:
             case TIME:
             case TIMESTAMP:
+            case INSTANT:
+            case LOCAL_DATE:
+            case LOCAL_TIME:
+            case LOCAL_DATE_TIME:
                 PrimitiveSchemaDataValidator.of().validate(schemaData);
                 break;
             case NONE:
diff --git a/pulsar-broker/src/main/proto/SchemaRegistryFormat.proto 
b/pulsar-broker/src/main/proto/SchemaRegistryFormat.proto
index 9d2c756..32790b2 100644
--- a/pulsar-broker/src/main/proto/SchemaRegistryFormat.proto
+++ b/pulsar-broker/src/main/proto/SchemaRegistryFormat.proto
@@ -40,6 +40,10 @@ message SchemaInfo {
                TIME = 14;
                TIMESTAMP = 15;
                KEYVALUE = 16;
+               INSTANT = 17;
+               LOCALDATE = 18;
+               LOCALTIME = 19;
+               LOCALDATETIME = 20;
     }
     message KeyValuePair {
         required string key = 1;
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaTest.java
index 9cc8105..fa09b7d 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaTest.java
@@ -98,6 +98,10 @@ public class AdminApiSchemaTest extends 
MockedPulsarServiceBaseTest {
             { Schema.DATE },
             { Schema.TIME },
             { Schema.TIMESTAMP },
+            { Schema.INSTANT },
+            { Schema.LOCAL_DATE},
+            { Schema.LOCAL_TIME},
+            { Schema.LOCAL_DATE_TIME},
             { Schema.AVRO(
                 SchemaDefinition.builder()
                     .withPojo(Foo.class)
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/validator/SchemaDataValidatorTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/validator/SchemaDataValidatorTest.java
index 998d363..1f85655 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/validator/SchemaDataValidatorTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/validator/SchemaDataValidatorTest.java
@@ -50,6 +50,10 @@ public class SchemaDataValidatorTest {
             { SchemaType.DATE },
             { SchemaType.TIME },
             { SchemaType.TIMESTAMP },
+            { SchemaType.INSTANT },
+            { SchemaType.LOCAL_DATE },
+            { SchemaType.LOCAL_TIME },
+            { SchemaType.LOCAL_DATE_TIME },
         };
     }
 
diff --git 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java
index 0c871b3..e7e5bbe 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java
@@ -21,6 +21,10 @@ package org.apache.pulsar.client.api;
 import java.nio.ByteBuffer;
 import java.sql.Time;
 import java.sql.Timestamp;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
 import java.util.Date;
 import org.apache.pulsar.client.api.schema.GenericRecord;
 import org.apache.pulsar.client.api.schema.GenericSchema;
@@ -208,6 +212,23 @@ public interface Schema<T> extends Cloneable{
      */
     Schema<Timestamp> TIMESTAMP = DefaultImplementation.newTimestampSchema();
 
+    /**
+     * Instant Schema.
+     */
+    Schema<Instant> INSTANT = DefaultImplementation.newInstantSchema();
+    /**
+     * LocalDate Schema.
+     */
+    Schema<LocalDate> LOCAL_DATE = DefaultImplementation.newLocalDateSchema();
+    /**
+     * LocalTime Schema.
+     */
+    Schema<LocalTime> LOCAL_TIME = DefaultImplementation.newLocalTimeSchema();
+    /**
+     * LocalDateTime Schema.
+     */
+    Schema<LocalDateTime> LOCAL_DATE_TIME = 
DefaultImplementation.newLocalDateTimeSchema();
+
     // CHECKSTYLE.OFF: MethodName
 
     /**
diff --git 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/DefaultImplementation.java
 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/DefaultImplementation.java
index 0171be5..589ac69 100644
--- 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/DefaultImplementation.java
+++ 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/DefaultImplementation.java
@@ -29,6 +29,10 @@ import java.nio.ByteBuffer;
 import java.nio.charset.Charset;
 import java.sql.Time;
 import java.sql.Timestamp;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
 import java.util.Date;
 import java.util.Map;
 import java.util.function.Supplier;
@@ -219,6 +223,30 @@ public class DefaultImplementation {
                   "org.apache.pulsar.client.impl.schema.TimestampSchema", 
"of", null)
                     .invoke(null, null));
     }
+    public static Schema<Instant> newInstantSchema() {
+        return catchExceptions(
+              () -> (Schema<Instant>) getStaticMethod(
+                  "org.apache.pulsar.client.impl.schema.InstantSchema", "of", 
null)
+                    .invoke(null, null));
+    }
+    public static Schema<LocalDate> newLocalDateSchema() {
+        return catchExceptions(
+              () -> (Schema<LocalDate>) getStaticMethod(
+                  "org.apache.pulsar.client.impl.schema.LocalDateSchema", 
"of", null)
+                    .invoke(null, null));
+    }
+    public static Schema<LocalTime> newLocalTimeSchema() {
+        return catchExceptions(
+              () -> (Schema<LocalTime>) getStaticMethod(
+                  "org.apache.pulsar.client.impl.schema.LocalTimeSchema", 
"of", null)
+                    .invoke(null, null));
+    }
+    public static Schema<LocalDateTime> newLocalDateTimeSchema() {
+        return catchExceptions(
+              () -> (Schema<LocalDateTime>) getStaticMethod(
+                  "org.apache.pulsar.client.impl.schema.LocalDateTimeSchema", 
"of", null)
+                    .invoke(null, null));
+    }
 
     public static <T> Schema<T> newAvroSchema(SchemaDefinition 
schemaDefinition) {
         return catchExceptions(
diff --git 
a/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/SchemaType.java
 
b/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/SchemaType.java
index dd8fc4a..3b16e73 100644
--- 
a/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/SchemaType.java
+++ 
b/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/SchemaType.java
@@ -114,6 +114,26 @@ public enum SchemaType {
      */
     KEY_VALUE(15),
 
+    /**
+     * Instant.
+     */
+    INSTANT(16),
+
+    /**
+     * LocalDate.
+     */
+    LOCAL_DATE(17),
+
+    /**
+     * LocalTime.
+     */
+    LOCAL_TIME(18),
+
+    /**
+     * LocalDateTime.
+     */
+    LOCAL_DATE_TIME(19),
+
     //
     // Schemas that don't have schema info. the value should be negative.
     //
@@ -167,6 +187,10 @@ public enum SchemaType {
           case 13: return TIME;
           case 14: return TIMESTAMP;
           case 15: return KEY_VALUE;
+          case 16: return INSTANT;
+          case 17: return LOCAL_DATE;
+          case 18: return LOCAL_TIME;
+          case 19: return LOCAL_DATE_TIME;
           case -1: return BYTES;
           case -2: return AUTO;
           case -3: return AUTO_CONSUME;
@@ -198,6 +222,10 @@ public enum SchemaType {
             case TIME:
             case TIMESTAMP:
             case BYTES:
+            case INSTANT:
+            case LOCAL_DATE:
+            case LOCAL_TIME:
+            case LOCAL_DATE_TIME:
             case NONE:
                 return true;
             default:
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java
index 049b0f5..fb3ac59 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java
@@ -182,6 +182,14 @@ public class AutoConsumeSchema implements 
Schema<GenericRecord> {
                 return TimeSchema.of();
             case TIMESTAMP:
                 return TimestampSchema.of();
+            case INSTANT:
+                return InstantSchema.of();
+            case LOCAL_DATE:
+                return LocalDateSchema.of();
+            case LOCAL_TIME:
+                return LocalTimeSchema.of();
+            case LOCAL_DATE_TIME:
+                return LocalDateTimeSchema.of();
             case JSON:
             case AVRO:
                 return GenericSchemaImpl.of(schemaInfo);
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/InstantSchema.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/InstantSchema.java
new file mode 100644
index 0000000..5830cea
--- /dev/null
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/InstantSchema.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.schema;
+
+import io.netty.buffer.ByteBuf;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.schema.SchemaType;
+import java.nio.ByteBuffer;
+import java.time.Instant;
+
+/**
+ * A schema for `java.time.Instant`.
+ */
+public class InstantSchema extends AbstractSchema<Instant> {
+
+   private static final InstantSchema INSTANCE;
+   private static final SchemaInfo SCHEMA_INFO;
+
+   static {
+       SCHEMA_INFO = new SchemaInfo()
+             .setName("Instant")
+             .setType(SchemaType.INSTANT)
+             .setSchema(new byte[0]);
+       INSTANCE = new InstantSchema();
+   }
+
+   public static InstantSchema of() {
+      return INSTANCE;
+   }
+
+   @Override
+   public byte[] encode(Instant message) {
+      if (null == message) {
+         return null;
+      }
+      // Instant is accurate to nanoseconds and requires two value storage.
+      ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES + Integer.BYTES);
+      buffer.putLong(message.getEpochSecond());
+      buffer.putInt(message.getNano());
+      return buffer.array();
+   }
+
+   @Override
+   public Instant decode(byte[] bytes) {
+      if (null == bytes) {
+         return null;
+      }
+      ByteBuffer buffer = ByteBuffer.wrap(bytes);
+      long epochSecond = buffer.getLong();
+      int nanos = buffer.getInt();
+      return Instant.ofEpochSecond(epochSecond, nanos);
+   }
+
+   @Override
+   public Instant decode(ByteBuf byteBuf) {
+      if (null == byteBuf) {
+         return null;
+      }
+      long epochSecond = byteBuf.readLong();
+      int nanos = byteBuf.readInt();
+      return Instant.ofEpochSecond(epochSecond, nanos);
+   }
+
+   @Override
+   public SchemaInfo getSchemaInfo() {
+      return SCHEMA_INFO;
+   }
+}
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LocalDateSchema.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LocalDateSchema.java
new file mode 100644
index 0000000..add6fd2
--- /dev/null
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LocalDateSchema.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.schema;
+
+import io.netty.buffer.ByteBuf;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.schema.SchemaType;
+import java.time.LocalDate;
+
+/**
+ * A schema for `java.time.LocalDate`.
+ */
+public class LocalDateSchema extends AbstractSchema<LocalDate> {
+
+   private static final LocalDateSchema INSTANCE;
+   private static final SchemaInfo SCHEMA_INFO;
+
+   static {
+       SCHEMA_INFO = new SchemaInfo()
+             .setName("LocalDate")
+             .setType(SchemaType.LOCAL_DATE)
+             .setSchema(new byte[0]);
+       INSTANCE = new LocalDateSchema();
+   }
+
+   public static LocalDateSchema of() {
+      return INSTANCE;
+   }
+
+   @Override
+   public byte[] encode(LocalDate message) {
+      if (null == message) {
+         return null;
+      }
+
+      Long epochDay = message.toEpochDay();
+      return LongSchema.of().encode(epochDay);
+   }
+
+   @Override
+   public LocalDate decode(byte[] bytes) {
+      if (null == bytes) {
+         return null;
+      }
+
+      Long decode = LongSchema.of().decode(bytes);
+      return LocalDate.ofEpochDay(decode);
+   }
+
+   @Override
+   public LocalDate decode(ByteBuf byteBuf) {
+      if (null == byteBuf) {
+         return null;
+      }
+
+      Long decode = LongSchema.of().decode(byteBuf);
+      return LocalDate.ofEpochDay(decode);
+   }
+
+   @Override
+   public SchemaInfo getSchemaInfo() {
+      return SCHEMA_INFO;
+   }
+}
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LocalDateTimeSchema.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LocalDateTimeSchema.java
new file mode 100644
index 0000000..8a6c4fb
--- /dev/null
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LocalDateTimeSchema.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.schema;
+
+import io.netty.buffer.ByteBuf;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.schema.SchemaType;
+
+import java.nio.ByteBuffer;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+
+/**
+ * A schema for `java.time.LocalDateTime`.
+ */
+public class LocalDateTimeSchema extends AbstractSchema<LocalDateTime> {
+
+   private static final LocalDateTimeSchema INSTANCE;
+   private static final SchemaInfo SCHEMA_INFO;
+   public static final String DELIMITER = ":";
+
+   static {
+       SCHEMA_INFO = new SchemaInfo()
+             .setName("LocalDateTime")
+             .setType(SchemaType.LOCAL_DATE_TIME)
+             .setSchema(new byte[0]);
+       INSTANCE = new LocalDateTimeSchema();
+   }
+
+   public static LocalDateTimeSchema of() {
+      return INSTANCE;
+   }
+
+   @Override
+   public byte[] encode(LocalDateTime message) {
+      if (null == message) {
+         return null;
+      }
+      //LocalDateTime is accurate to nanoseconds and requires two value 
storage.
+      ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES * 2);
+      buffer.putLong(message.toLocalDate().toEpochDay());
+      buffer.putLong(message.toLocalTime().toNanoOfDay());
+      return buffer.array();
+   }
+
+   @Override
+   public LocalDateTime decode(byte[] bytes) {
+      if (null == bytes) {
+         return null;
+      }
+      ByteBuffer buffer = ByteBuffer.wrap(bytes);
+      long epochDay = buffer.getLong();
+      long nanoOfDay = buffer.getLong();
+      return LocalDateTime.of(LocalDate.ofEpochDay(epochDay), 
LocalTime.ofNanoOfDay(nanoOfDay));
+   }
+
+   @Override
+   public LocalDateTime decode(ByteBuf byteBuf) {
+      if (null == byteBuf) {
+         return null;
+      }
+      long epochDay = byteBuf.readLong();
+      long nanoOfDay = byteBuf.readLong();
+      return LocalDateTime.of(LocalDate.ofEpochDay(epochDay), 
LocalTime.ofNanoOfDay(nanoOfDay));
+   }
+
+   @Override
+   public SchemaInfo getSchemaInfo() {
+      return SCHEMA_INFO;
+   }
+}
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LocalTimeSchema.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LocalTimeSchema.java
new file mode 100644
index 0000000..6e2bf62
--- /dev/null
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LocalTimeSchema.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.schema;
+
+import io.netty.buffer.ByteBuf;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.schema.SchemaType;
+import java.time.LocalTime;
+
+/**
+ * A schema for `java.time.LocalTime`.
+ */
+public class LocalTimeSchema extends AbstractSchema<LocalTime> {
+
+   private static final LocalTimeSchema INSTANCE;
+   private static final SchemaInfo SCHEMA_INFO;
+
+   static {
+       SCHEMA_INFO = new SchemaInfo()
+             .setName("LocalTime")
+             .setType(SchemaType.LOCAL_TIME)
+             .setSchema(new byte[0]);
+       INSTANCE = new LocalTimeSchema();
+   }
+
+   public static LocalTimeSchema of() {
+      return INSTANCE;
+   }
+
+   @Override
+   public byte[] encode(LocalTime message) {
+      if (null == message) {
+         return null;
+      }
+
+      Long nanoOfDay = message.toNanoOfDay();
+      return LongSchema.of().encode(nanoOfDay);
+   }
+
+   @Override
+   public LocalTime decode(byte[] bytes) {
+      if (null == bytes) {
+         return null;
+      }
+
+      Long decode = LongSchema.of().decode(bytes);
+      return LocalTime.ofNanoOfDay(decode);
+   }
+
+   @Override
+   public LocalTime decode(ByteBuf byteBuf) {
+      if (null == byteBuf) {
+         return null;
+      }
+
+      Long decode = LongSchema.of().decode(byteBuf);
+      return LocalTime.ofNanoOfDay(decode);
+   }
+
+   @Override
+   public SchemaInfo getSchemaInfo() {
+      return SCHEMA_INFO;
+   }
+}
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/InstantSchemaTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/InstantSchemaTest.java
new file mode 100644
index 0000000..ca7856a
--- /dev/null
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/InstantSchemaTest.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.schema;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import java.nio.ByteBuffer;
+import java.time.Instant;
+
+public class InstantSchemaTest {
+
+    @Test
+    public void testSchemaEncode() {
+        InstantSchema schema = InstantSchema.of();
+        Instant instant = Instant.now();
+        ByteBuffer byteBuffer = ByteBuffer.allocate(Long.BYTES + 
Integer.BYTES);
+        byteBuffer.putLong(instant.getEpochSecond());
+        byteBuffer.putInt(instant.getNano());
+        byte[] expected = byteBuffer.array();
+        Assert.assertEquals(expected, schema.encode(instant));
+    }
+
+    @Test
+    public void testSchemaEncodeDecodeFidelity() {
+        InstantSchema schema = InstantSchema.of();
+        Instant instant = Instant.now();
+        ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer(Long.BYTES + 
Integer.BYTES);
+        byte[] bytes = schema.encode(instant);
+        byteBuf.writeBytes(bytes);
+        Assert.assertEquals(instant, schema.decode(bytes));
+        Assert.assertEquals(instant, schema.decode(byteBuf));
+    }
+
+    @Test
+    public void testSchemaDecode() {
+        Instant instant = Instant.now();
+        ByteBuffer byteBuffer = ByteBuffer.allocate(Long.BYTES + 
Integer.BYTES);
+        byteBuffer.putLong(instant.getEpochSecond());
+        byteBuffer.putInt(instant.getNano());
+        byte[] byteData = byteBuffer.array();
+        long epochSecond = instant.getEpochSecond();
+        long nano = instant.getNano();
+
+        InstantSchema schema = InstantSchema.of();
+        ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer(Long.BYTES + 
Integer.BYTES);
+        byteBuf.writeBytes(byteData);
+        Instant decode = schema.decode(byteData);
+        Assert.assertEquals(epochSecond, decode.getEpochSecond());
+        Assert.assertEquals(nano, decode.getNano());
+        decode = schema.decode(byteBuf);
+        Assert.assertEquals(epochSecond, decode.getEpochSecond());
+        Assert.assertEquals(nano, decode.getNano());
+    }
+
+    @Test
+    public void testNullEncodeDecode() {
+        ByteBuf byteBuf = null;
+        byte[] bytes = null;
+
+        Assert.assertNull(InstantSchema.of().encode(null));
+        Assert.assertNull(InstantSchema.of().decode(byteBuf));
+        Assert.assertNull(InstantSchema.of().decode(bytes));
+    }
+
+}
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/LocalDateSchemaTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/LocalDateSchemaTest.java
new file mode 100644
index 0000000..31b48b2
--- /dev/null
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/LocalDateSchemaTest.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.schema;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import java.time.LocalDate;
+
+public class LocalDateSchemaTest {
+
+    @Test
+    public void testSchemaEncode() {
+        LocalDateSchema schema = LocalDateSchema.of();
+        LocalDate localDate = LocalDate.now();
+        byte[] expected = new byte[] {
+                (byte) (localDate.toEpochDay() >>> 56),
+                (byte) (localDate.toEpochDay() >>> 48),
+                (byte) (localDate.toEpochDay() >>> 40),
+                (byte) (localDate.toEpochDay() >>> 32),
+                (byte) (localDate.toEpochDay() >>> 24),
+                (byte) (localDate.toEpochDay() >>> 16),
+                (byte) (localDate.toEpochDay() >>> 8),
+                ((Long)localDate.toEpochDay()).byteValue()
+        };
+        Assert.assertEquals(expected, schema.encode(localDate));
+    }
+
+    @Test
+    public void testSchemaEncodeDecodeFidelity() {
+        LocalDateSchema schema = LocalDateSchema.of();
+        LocalDate localDate = LocalDate.now();
+        ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer(8);
+        byte[] bytes = schema.encode(localDate);
+        byteBuf.writeBytes(bytes);
+        Assert.assertEquals(localDate, schema.decode(bytes));
+        Assert.assertEquals(localDate, schema.decode(byteBuf));
+    }
+
+    @Test
+    public void testSchemaDecode() {
+        byte[] byteData = new byte[] {
+               0,
+               0,
+               0,
+               0,
+               0,
+               10,
+               24,
+               42
+        };
+        long expected = 10*65536 + 24*256 + 42;
+
+        LocalDateSchema schema = LocalDateSchema.of();
+        ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer(8);
+        byteBuf.writeBytes(byteData);
+        Assert.assertEquals(expected, schema.decode(byteData).toEpochDay());
+        Assert.assertEquals(expected, schema.decode(byteBuf).toEpochDay());
+    }
+
+    @Test
+    public void testNullEncodeDecode() {
+        ByteBuf byteBuf = null;
+        byte[] bytes = null;
+
+        Assert.assertNull(LocalDateSchema.of().encode(null));
+        Assert.assertNull(LocalDateSchema.of().decode(byteBuf));
+        Assert.assertNull(LocalDateSchema.of().decode(bytes));
+    }
+
+}
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/LocalDateTimeSchemaTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/LocalDateTimeSchemaTest.java
new file mode 100644
index 0000000..1214560
--- /dev/null
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/LocalDateTimeSchemaTest.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.schema;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import java.nio.ByteBuffer;
+import java.time.LocalDateTime;
+
+public class LocalDateTimeSchemaTest {
+
+    @Test
+    public void testSchemaEncode() { // expected bytes: epoch day, then nano-of-day, each written as a long
+        LocalDateTime now = LocalDateTime.now();
+        byte[] expected = ByteBuffer.allocate(Long.BYTES * 2)
+                .putLong(now.toLocalDate().toEpochDay())
+                .putLong(now.toLocalTime().toNanoOfDay())
+                .array();
+        LocalDateTimeSchema codec = LocalDateTimeSchema.of();
+        Assert.assertEquals(expected, codec.encode(now));
+    }
+
+    @Test
+    public void testSchemaEncodeDecodeFidelity() { // a date-time must survive encode followed by either decode overload
+        LocalDateTime now = LocalDateTime.now();
+        LocalDateTimeSchema codec = LocalDateTimeSchema.of();
+        byte[] encoded = codec.encode(now);
+        ByteBuf wrapped = ByteBufAllocator.DEFAULT.buffer(Long.BYTES * 2);
+        wrapped.writeBytes(encoded); // same payload, presented as a ByteBuf
+        Assert.assertEquals(now, codec.decode(encoded));
+        Assert.assertEquals(now, codec.decode(wrapped));
+    }
+
+    @Test
+    public void testSchemaDecode() { // decodes a hand-built payload for 2020-08-22T02:00 through both overloads
+        LocalDateTime localDateTime = LocalDateTime.of(2020, 8, 22, 2, 0, 0);
+        ByteBuffer byteBuffer = ByteBuffer.allocate(Long.BYTES * 2);
+        byteBuffer.putLong(localDateTime.toLocalDate().toEpochDay()); // first long: epoch day
+        byteBuffer.putLong(localDateTime.toLocalTime().toNanoOfDay()); // second long: nano-of-day
+        byte[] byteData = byteBuffer.array();
+        long expectedEpochDay = localDateTime.toLocalDate().toEpochDay();
+        long expectedNanoOfDay = localDateTime.toLocalTime().toNanoOfDay();
+        // Decode the same payload from a byte[] first, then from a ByteBuf; both must give back the two parts.
+        LocalDateTimeSchema schema = LocalDateTimeSchema.of();
+        ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer(Long.BYTES * 2);
+        byteBuf.writeBytes(byteData);
+        LocalDateTime decode = schema.decode(byteData);
+        Assert.assertEquals(expectedEpochDay, decode.toLocalDate().toEpochDay());
+        Assert.assertEquals(expectedNanoOfDay, decode.toLocalTime().toNanoOfDay());
+        decode = schema.decode(byteBuf);
+        Assert.assertEquals(expectedEpochDay, decode.toLocalDate().toEpochDay());
+        Assert.assertEquals(expectedNanoOfDay, decode.toLocalTime().toNanoOfDay());
+    }
+
+    @Test
+    public void testNullEncodeDecode() { // null in, null out: for encode and for both decode overloads
+        ByteBuf byteBuf = null;
+        byte[] bytes = null;
+        // Exercise LocalDateTimeSchema, the schema this class tests; typed nulls pick each decode overload.
+        Assert.assertNull(LocalDateTimeSchema.of().encode(null));
+        Assert.assertNull(LocalDateTimeSchema.of().decode(byteBuf));
+        Assert.assertNull(LocalDateTimeSchema.of().decode(bytes));
+    }
+
+}
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/LocalTimeSchemaTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/LocalTimeSchemaTest.java
new file mode 100644
index 0000000..ed729de
--- /dev/null
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/LocalTimeSchemaTest.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.schema;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import java.time.LocalTime;
+
+public class LocalTimeSchemaTest {
+
+    @Test
+    public void testSchemaEncode() { // expected bytes: nano-of-day as eight bytes, most significant first
+        LocalTime localTime = LocalTime.now();
+        long nanos = localTime.toNanoOfDay();
+        byte[] expected = new byte[] {
+                (byte) (nanos >>> 56),
+                (byte) (nanos >>> 48),
+                (byte) (nanos >>> 40),
+                (byte) (nanos >>> 32),
+                (byte) (nanos >>> 24),
+                (byte) (nanos >>> 16),
+                (byte) (nanos >>> 8),
+                (byte) nanos
+        };
+        Assert.assertEquals(expected, LocalTimeSchema.of().encode(localTime));
+    }
+
+    @Test
+    public void testSchemaEncodeDecodeFidelity() { // a time must survive encode followed by either decode overload
+        LocalTime now = LocalTime.now();
+        LocalTimeSchema codec = LocalTimeSchema.of();
+        byte[] encoded = codec.encode(now);
+        ByteBuf wrapped = ByteBufAllocator.DEFAULT.buffer(8);
+        wrapped.writeBytes(encoded); // same payload, presented as a ByteBuf
+        Assert.assertEquals(now, codec.decode(encoded));
+        Assert.assertEquals(now, codec.decode(wrapped));
+    }
+
+    @Test
+    public void testSchemaDecode() { // decodes a fixed 8-byte payload through both decode overloads
+        byte[] byteData = new byte[] {
+               0,
+               0,
+               0,
+               0,
+               0,
+               10,
+               24,
+               42
+        };
+        long expected = 10*65536 + 24*256 + 42; // the payload read as one long, most significant byte first
+        // The decoded time's nano-of-day must equal that value for the byte[] and the ByteBuf input alike.
+        LocalTimeSchema schema = LocalTimeSchema.of();
+        ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer(8);
+        byteBuf.writeBytes(byteData);
+        Assert.assertEquals(expected, schema.decode(byteData).toNanoOfDay());
+        Assert.assertEquals(expected, schema.decode(byteBuf).toNanoOfDay());
+    }
+
+    @Test
+    public void testNullEncodeDecode() { // null in, null out: for encode and for both decode overloads
+        byte[] noBytes = null;
+        ByteBuf noBuffer = null;
+        // Typed null locals keep the overloaded decode calls unambiguous.
+        Assert.assertNull(LocalTimeSchema.of().encode(null));
+        Assert.assertNull(LocalTimeSchema.of().decode(noBuffer));
+        Assert.assertNull(LocalTimeSchema.of().decode(noBytes));
+    }
+
+}
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/PrimitiveSchemaTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/PrimitiveSchemaTest.java
index dceebbd..a2efb25 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/PrimitiveSchemaTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/PrimitiveSchemaTest.java
@@ -28,6 +28,10 @@ import java.nio.ByteBuffer;
 import java.sql.Date;
 import java.sql.Time;
 import java.sql.Timestamp;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
@@ -60,6 +64,10 @@ public class PrimitiveSchemaTest {
             put(DateSchema.of(), Arrays.asList(new Date(new 
java.util.Date().getTime() - 10000), new Date(new java.util.Date().getTime())));
             put(TimeSchema.of(), Arrays.asList(new Time(new 
java.util.Date().getTime() - 10000), new Time(new java.util.Date().getTime())));
             put(TimestampSchema.of(), Arrays.asList(new Timestamp(new 
java.util.Date().getTime()), new Timestamp(new java.util.Date().getTime())));
+            put(InstantSchema.of(), Arrays.asList(Instant.now(), 
Instant.now().minusSeconds(60*23L)));
+            put(LocalDateSchema.of(), Arrays.asList(LocalDate.now(), 
LocalDate.now().minusDays(2)));
+            put(LocalTimeSchema.of(), Arrays.asList(LocalTime.now(), 
LocalTime.now().minusHours(2)));
+            put(LocalDateTimeSchema.of(), Arrays.asList(LocalDateTime.now(), 
LocalDateTime.now().minusDays(2), LocalDateTime.now().minusWeeks(10)));
         }
     };
 
@@ -78,6 +86,10 @@ public class PrimitiveSchemaTest {
             put(Schema.DATE, Arrays.asList(new Date(new 
java.util.Date().getTime() - 10000), new Date(new java.util.Date().getTime())));
             put(Schema.TIME, Arrays.asList(new Time(new 
java.util.Date().getTime() - 10000), new Time(new java.util.Date().getTime())));
             put(Schema.TIMESTAMP, Arrays.asList(new Timestamp(new 
java.util.Date().getTime() - 10000), new Timestamp(new 
java.util.Date().getTime())));
+            put(Schema.INSTANT, Arrays.asList(Instant.now(), 
Instant.now().minusSeconds(60*23L)));
+            put(Schema.LOCAL_DATE, Arrays.asList(LocalDate.now(), 
LocalDate.now().minusDays(2)));
+            put(Schema.LOCAL_TIME, Arrays.asList(LocalTime.now(), 
LocalTime.now().minusHours(2)));
+            put(Schema.LOCAL_DATE_TIME, Arrays.asList(LocalDateTime.now(), 
LocalDateTime.now().minusDays(2), LocalDateTime.now().minusWeeks(10)));
         }
     };
 
@@ -139,6 +151,10 @@ public class PrimitiveSchemaTest {
         assertEquals(SchemaType.DATE, 
DateSchema.of().getSchemaInfo().getType());
         assertEquals(SchemaType.TIME, 
TimeSchema.of().getSchemaInfo().getType());
         assertEquals(SchemaType.TIMESTAMP, 
TimestampSchema.of().getSchemaInfo().getType());
+        assertEquals(SchemaType.INSTANT, 
InstantSchema.of().getSchemaInfo().getType());
+        assertEquals(SchemaType.LOCAL_DATE, 
LocalDateSchema.of().getSchemaInfo().getType());
+        assertEquals(SchemaType.LOCAL_TIME, 
LocalTimeSchema.of().getSchemaInfo().getType());
+        assertEquals(SchemaType.LOCAL_DATE_TIME, 
LocalDateTimeSchema.of().getSchemaInfo().getType());
     }
 
 
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/common/schema/KeyValueTest.java 
b/pulsar-client/src/test/java/org/apache/pulsar/common/schema/KeyValueTest.java
index 7ee270b..1011e0e 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/common/schema/KeyValueTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/common/schema/KeyValueTest.java
@@ -25,6 +25,10 @@ import io.netty.buffer.Unpooled;
 import java.nio.ByteBuffer;
 import java.sql.Time;
 import java.sql.Timestamp;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
 import java.util.Arrays;
 import java.util.Date;
 import java.util.HashMap;
@@ -39,7 +43,11 @@ import org.apache.pulsar.client.impl.schema.BytesSchema;
 import org.apache.pulsar.client.impl.schema.DateSchema;
 import org.apache.pulsar.client.impl.schema.DoubleSchema;
 import org.apache.pulsar.client.impl.schema.FloatSchema;
+import org.apache.pulsar.client.impl.schema.InstantSchema;
 import org.apache.pulsar.client.impl.schema.IntSchema;
+import org.apache.pulsar.client.impl.schema.LocalDateSchema;
+import org.apache.pulsar.client.impl.schema.LocalDateTimeSchema;
+import org.apache.pulsar.client.impl.schema.LocalTimeSchema;
 import org.apache.pulsar.client.impl.schema.LongSchema;
 import org.apache.pulsar.client.impl.schema.ShortSchema;
 import org.apache.pulsar.client.impl.schema.StringSchema;
@@ -69,6 +77,10 @@ public class KeyValueTest {
             put(DateSchema.of(), Arrays.asList(new Date(new 
java.util.Date().getTime() - 10000), new Date(new java.util.Date().getTime())));
             put(TimeSchema.of(), Arrays.asList(new Time(new 
java.util.Date().getTime() - 10000), new Time(new java.util.Date().getTime())));
             put(TimestampSchema.of(), Arrays.asList(new Timestamp(new 
java.util.Date().getTime()), new Timestamp(new java.util.Date().getTime())));
+            put(InstantSchema.of(), Arrays.asList(Instant.now(), 
Instant.now().minusSeconds(60*23L)));
+            put(LocalDateSchema.of(), Arrays.asList(LocalDate.now(), 
LocalDate.now().minusDays(2)));
+            put(LocalTimeSchema.of(), Arrays.asList(LocalTime.now(), 
LocalTime.now().minusHours(2)));
+            put(LocalDateTimeSchema.of(), Arrays.asList(LocalDateTime.now(), 
LocalDateTime.now().minusDays(2), LocalDateTime.now().minusWeeks(10)));
         }
     };
 
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
index 4c3b885..65c4a93 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
@@ -444,6 +444,10 @@ public final class PulsarApi {
       Time(13, 13),
       Timestamp(14, 14),
       KeyValue(15, 15),
+      Instant(16, 16),
+      LocalDate(17, 17),
+      LocalTime(18, 18),
+      LocalDateTime(19, 19),
       ;
       
       public static final int None_VALUE = 0;
@@ -462,6 +466,10 @@ public final class PulsarApi {
       public static final int Time_VALUE = 13;
       public static final int Timestamp_VALUE = 14;
       public static final int KeyValue_VALUE = 15;
+      public static final int Instant_VALUE = 16;
+      public static final int LocalDate_VALUE = 17;
+      public static final int LocalTime_VALUE = 18;
+      public static final int LocalDateTime_VALUE = 19;
       
       
       public final int getNumber() { return value; }
@@ -484,6 +492,10 @@ public final class PulsarApi {
           case 13: return Time;
           case 14: return Timestamp;
           case 15: return KeyValue;
+          case 16: return Instant;
+          case 17: return LocalDate;
+          case 18: return LocalTime;
+          case 19: return LocalDateTime;
           default: return null;
         }
       }
@@ -1578,6 +1590,15 @@ public final class PulsarApi {
               ackSet_.add(input.readInt64());
               break;
             }
+            case 42: {
+              int length = input.readRawVarint32();
+              int limit = input.pushLimit(length);
+              while (input.getBytesUntilLimit() > 0) {
+                addAckSet(input.readInt64());
+              }
+              input.popLimit(limit);
+              break;
+            }
           }
         }
       }
@@ -18860,6 +18881,15 @@ public final class PulsarApi {
               ackSet_.add(input.readInt64());
               break;
             }
+            case 34: {
+              int length = input.readRawVarint32();
+              int limit = input.pushLimit(length);
+              while (input.getBytesUntilLimit() > 0) {
+                addAckSet(input.readInt64());
+              }
+              input.popLimit(limit);
+              break;
+            }
           }
         }
       }
diff --git a/pulsar-common/src/main/proto/PulsarApi.proto 
b/pulsar-common/src/main/proto/PulsarApi.proto
index 63ff6ee..9c4b913 100644
--- a/pulsar-common/src/main/proto/PulsarApi.proto
+++ b/pulsar-common/src/main/proto/PulsarApi.proto
@@ -40,6 +40,10 @@ message Schema {
         Time = 13;
         Timestamp = 14;
         KeyValue = 15;
+        Instant = 16;
+        LocalDate = 17;
+        LocalTime = 18;
+        LocalDateTime = 19;
     }
 
     required string name = 1;
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/schema/SchemaTest.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/schema/SchemaTest.java
index 4b0083a..6194f95 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/schema/SchemaTest.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/schema/SchemaTest.java
@@ -273,6 +273,10 @@ public class SchemaTest extends PulsarTestSuite {
         schemas.add(Schema.DATE);
         schemas.add(Schema.TIME);
         schemas.add(Schema.TIMESTAMP);
+        schemas.add(Schema.INSTANT);
+        schemas.add(Schema.LOCAL_DATE);
+        schemas.add(Schema.LOCAL_TIME);
+        schemas.add(Schema.LOCAL_DATE_TIME);
 
         schemas.forEach(schemaProducer -> {
             schemas.forEach(schemaConsumer -> {

Reply via email to