This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new e36458c add java8 date and time type to pulsar's primitive schemas
(#7874)
e36458c is described below
commit e36458c3f43db933ce86127f5e55f8ddca855c5b
Author: jianyun <[email protected]>
AuthorDate: Fri Aug 28 21:50:32 2020 +0800
add java8 date and time type to pulsar's primitive schemas (#7874)
### Motivation
*Compatibility with Flink 1.11 requires Pulsar's primitive schemas to support
the Java 8 date and time API (`java.time`).*
### Modifications
*Add Instant, LocalDate, LocalTime, LocalDateTime to pulsar's primitive
schemas*
### Verifying this change
Added the Instant, LocalDate, LocalTime, and LocalDateTime types to the schema
type tests.
---
.github/workflows/ci-integration-schema.yaml | 7 +-
.../schema/validator/SchemaDataValidator.java | 4 +
.../src/main/proto/SchemaRegistryFormat.proto | 4 +
.../pulsar/broker/admin/AdminApiSchemaTest.java | 4 +
.../schema/validator/SchemaDataValidatorTest.java | 4 +
.../java/org/apache/pulsar/client/api/Schema.java | 21 ++++++
.../client/internal/DefaultImplementation.java | 28 +++++++
.../apache/pulsar/common/schema/SchemaType.java | 28 +++++++
.../client/impl/schema/AutoConsumeSchema.java | 8 ++
.../pulsar/client/impl/schema/InstantSchema.java | 84 +++++++++++++++++++++
.../pulsar/client/impl/schema/LocalDateSchema.java | 80 ++++++++++++++++++++
.../client/impl/schema/LocalDateTimeSchema.java | 88 ++++++++++++++++++++++
.../pulsar/client/impl/schema/LocalTimeSchema.java | 80 ++++++++++++++++++++
.../client/impl/schema/InstantSchemaTest.java | 83 ++++++++++++++++++++
.../client/impl/schema/LocalDateSchemaTest.java | 88 ++++++++++++++++++++++
.../impl/schema/LocalDateTimeSchemaTest.java | 83 ++++++++++++++++++++
.../client/impl/schema/LocalTimeSchemaTest.java | 88 ++++++++++++++++++++++
.../client/impl/schema/PrimitiveSchemaTest.java | 16 ++++
.../apache/pulsar/common/schema/KeyValueTest.java | 12 +++
.../apache/pulsar/common/api/proto/PulsarApi.java | 30 ++++++++
pulsar-common/src/main/proto/PulsarApi.proto | 4 +
.../tests/integration/schema/SchemaTest.java | 4 +
22 files changed, 847 insertions(+), 1 deletion(-)
diff --git a/.github/workflows/ci-integration-schema.yaml
b/.github/workflows/ci-integration-schema.yaml
index 944c74f..e608af7 100644
--- a/.github/workflows/ci-integration-schema.yaml
+++ b/.github/workflows/ci-integration-schema.yaml
@@ -72,7 +72,12 @@ jobs:
- name: run install by skip tests
if: steps.docs.outputs.changed_only == 'no'
run: mvn -q -B -ntp clean install -DskipTests
-
+ - name: build pulsar image
+ if: steps.docs.outputs.changed_only == 'no'
+ run: mvn -B -f docker/pulsar/pom.xml install -am -Pdocker -DskipTests
-Ddocker.nocache=true
+ - name: build pulsar-all image
+ if: steps.docs.outputs.changed_only == 'no'
+ run: mvn -B -f docker/pulsar-all/pom.xml install -am -Pdocker
-DskipTests -Ddocker.nocache=true
- name: build artifacts and docker image
if: steps.docs.outputs.changed_only == 'no'
run: mvn -B -f tests/docker-images/pom.xml install -am -Pdocker
-DskipTests
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/SchemaDataValidator.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/SchemaDataValidator.java
index 0c44612..5e5f846 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/SchemaDataValidator.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/SchemaDataValidator.java
@@ -54,6 +54,10 @@ public interface SchemaDataValidator {
case DATE:
case TIME:
case TIMESTAMP:
+ case INSTANT:
+ case LOCAL_DATE:
+ case LOCAL_TIME:
+ case LOCAL_DATE_TIME:
PrimitiveSchemaDataValidator.of().validate(schemaData);
break;
case NONE:
diff --git a/pulsar-broker/src/main/proto/SchemaRegistryFormat.proto
b/pulsar-broker/src/main/proto/SchemaRegistryFormat.proto
index 9d2c756..32790b2 100644
--- a/pulsar-broker/src/main/proto/SchemaRegistryFormat.proto
+++ b/pulsar-broker/src/main/proto/SchemaRegistryFormat.proto
@@ -40,6 +40,10 @@ message SchemaInfo {
TIME = 14;
TIMESTAMP = 15;
KEYVALUE = 16;
+ INSTANT = 17;
+ LOCALDATE = 18;
+ LOCALTIME = 19;
+ LOCALDATETIME = 20;
}
message KeyValuePair {
required string key = 1;
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaTest.java
index 9cc8105..fa09b7d 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaTest.java
@@ -98,6 +98,10 @@ public class AdminApiSchemaTest extends
MockedPulsarServiceBaseTest {
{ Schema.DATE },
{ Schema.TIME },
{ Schema.TIMESTAMP },
+ { Schema.INSTANT },
+ { Schema.LOCAL_DATE},
+ { Schema.LOCAL_TIME},
+ { Schema.LOCAL_DATE_TIME},
{ Schema.AVRO(
SchemaDefinition.builder()
.withPojo(Foo.class)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/validator/SchemaDataValidatorTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/validator/SchemaDataValidatorTest.java
index 998d363..1f85655 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/validator/SchemaDataValidatorTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/validator/SchemaDataValidatorTest.java
@@ -50,6 +50,10 @@ public class SchemaDataValidatorTest {
{ SchemaType.DATE },
{ SchemaType.TIME },
{ SchemaType.TIMESTAMP },
+ { SchemaType.INSTANT },
+ { SchemaType.LOCAL_DATE },
+ { SchemaType.LOCAL_TIME },
+ { SchemaType.LOCAL_DATE_TIME },
};
}
diff --git
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java
index 0c871b3..e7e5bbe 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java
@@ -21,6 +21,10 @@ package org.apache.pulsar.client.api;
import java.nio.ByteBuffer;
import java.sql.Time;
import java.sql.Timestamp;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
import java.util.Date;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.api.schema.GenericSchema;
@@ -208,6 +212,23 @@ public interface Schema<T> extends Cloneable{
*/
Schema<Timestamp> TIMESTAMP = DefaultImplementation.newTimestampSchema();
+ /**
+ * Instant Schema.
+ */
+ Schema<Instant> INSTANT = DefaultImplementation.newInstantSchema();
+ /**
+ * LocalDate Schema.
+ */
+ Schema<LocalDate> LOCAL_DATE = DefaultImplementation.newLocalDateSchema();
+ /**
+ * LocalTime Schema.
+ */
+ Schema<LocalTime> LOCAL_TIME = DefaultImplementation.newLocalTimeSchema();
+ /**
+ * LocalDateTime Schema.
+ */
+ Schema<LocalDateTime> LOCAL_DATE_TIME =
DefaultImplementation.newLocalDateTimeSchema();
+
// CHECKSTYLE.OFF: MethodName
/**
diff --git
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/DefaultImplementation.java
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/DefaultImplementation.java
index 0171be5..589ac69 100644
---
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/DefaultImplementation.java
+++
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/DefaultImplementation.java
@@ -29,6 +29,10 @@ import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.sql.Time;
import java.sql.Timestamp;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
import java.util.Date;
import java.util.Map;
import java.util.function.Supplier;
@@ -219,6 +223,30 @@ public class DefaultImplementation {
"org.apache.pulsar.client.impl.schema.TimestampSchema",
"of", null)
.invoke(null, null));
}
+ /**
+ * Creates the {@code java.time.Instant} schema by invoking the static {@code of} method of
+ * {@code org.apache.pulsar.client.impl.schema.InstantSchema}. The implementation class is
+ * referenced by name and resolved through {@code getStaticMethod}, and the call runs inside
+ * {@code catchExceptions}.
+ *
+ * @return the object returned by the {@code of} method, cast to {@code Schema<Instant>}
+ */
+ public static Schema<Instant> newInstantSchema() {
+ return catchExceptions(
+ () -> (Schema<Instant>) getStaticMethod(
+ "org.apache.pulsar.client.impl.schema.InstantSchema", "of",
null)
+ .invoke(null, null));
+ }
+ /**
+ * Creates the {@code java.time.LocalDate} schema by invoking the static {@code of} method of
+ * {@code org.apache.pulsar.client.impl.schema.LocalDateSchema}. The implementation class is
+ * referenced by name and resolved through {@code getStaticMethod}, and the call runs inside
+ * {@code catchExceptions}.
+ *
+ * @return the object returned by the {@code of} method, cast to {@code Schema<LocalDate>}
+ */
+ public static Schema<LocalDate> newLocalDateSchema() {
+ return catchExceptions(
+ () -> (Schema<LocalDate>) getStaticMethod(
+ "org.apache.pulsar.client.impl.schema.LocalDateSchema",
"of", null)
+ .invoke(null, null));
+ }
+ /**
+ * Creates the {@code java.time.LocalTime} schema by invoking the static {@code of} method of
+ * {@code org.apache.pulsar.client.impl.schema.LocalTimeSchema}. The implementation class is
+ * referenced by name and resolved through {@code getStaticMethod}, and the call runs inside
+ * {@code catchExceptions}.
+ *
+ * @return the object returned by the {@code of} method, cast to {@code Schema<LocalTime>}
+ */
+ public static Schema<LocalTime> newLocalTimeSchema() {
+ return catchExceptions(
+ () -> (Schema<LocalTime>) getStaticMethod(
+ "org.apache.pulsar.client.impl.schema.LocalTimeSchema",
"of", null)
+ .invoke(null, null));
+ }
+ /**
+ * Creates the {@code java.time.LocalDateTime} schema by invoking the static {@code of} method
+ * of {@code org.apache.pulsar.client.impl.schema.LocalDateTimeSchema}. The implementation
+ * class is referenced by name and resolved through {@code getStaticMethod}, and the call runs
+ * inside {@code catchExceptions}.
+ *
+ * @return the object returned by the {@code of} method, cast to {@code Schema<LocalDateTime>}
+ */
+ public static Schema<LocalDateTime> newLocalDateTimeSchema() {
+ return catchExceptions(
+ () -> (Schema<LocalDateTime>) getStaticMethod(
+ "org.apache.pulsar.client.impl.schema.LocalDateTimeSchema",
"of", null)
+ .invoke(null, null));
+ }
public static <T> Schema<T> newAvroSchema(SchemaDefinition
schemaDefinition) {
return catchExceptions(
diff --git
a/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/SchemaType.java
b/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/SchemaType.java
index dd8fc4a..3b16e73 100644
---
a/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/SchemaType.java
+++
b/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/SchemaType.java
@@ -114,6 +114,26 @@ public enum SchemaType {
*/
KEY_VALUE(15),
+ /**
+ * Instant.
+ */
+ INSTANT(16),
+
+ /**
+ * LocalDate.
+ */
+ LOCAL_DATE(17),
+
+ /**
+ * LocalTime.
+ */
+ LOCAL_TIME(18),
+
+ /**
+ * LocalDateTime.
+ */
+ LOCAL_DATE_TIME(19),
+
//
// Schemas that don't have schema info. the value should be negative.
//
@@ -167,6 +187,10 @@ public enum SchemaType {
case 13: return TIME;
case 14: return TIMESTAMP;
case 15: return KEY_VALUE;
+ case 16: return INSTANT;
+ case 17: return LOCAL_DATE;
+ case 18: return LOCAL_TIME;
+ case 19: return LOCAL_DATE_TIME;
case -1: return BYTES;
case -2: return AUTO;
case -3: return AUTO_CONSUME;
@@ -198,6 +222,10 @@ public enum SchemaType {
case TIME:
case TIMESTAMP:
case BYTES:
+ case INSTANT:
+ case LOCAL_DATE:
+ case LOCAL_TIME:
+ case LOCAL_DATE_TIME:
case NONE:
return true;
default:
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java
index 049b0f5..fb3ac59 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java
@@ -182,6 +182,14 @@ public class AutoConsumeSchema implements
Schema<GenericRecord> {
return TimeSchema.of();
case TIMESTAMP:
return TimestampSchema.of();
+ case INSTANT:
+ return InstantSchema.of();
+ case LOCAL_DATE:
+ return LocalDateSchema.of();
+ case LOCAL_TIME:
+ return LocalTimeSchema.of();
+ case LOCAL_DATE_TIME:
+ return LocalDateTimeSchema.of();
case JSON:
case AVRO:
return GenericSchemaImpl.of(schemaInfo);
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/InstantSchema.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/InstantSchema.java
new file mode 100644
index 0000000..5830cea
--- /dev/null
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/InstantSchema.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.schema;
+
+import io.netty.buffer.ByteBuf;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.schema.SchemaType;
+import java.nio.ByteBuffer;
+import java.time.Instant;
+
+/**
+ * A schema for `java.time.Instant`.
+ */
+public class InstantSchema extends AbstractSchema<Instant> {
+
+ private static final InstantSchema INSTANCE;
+ private static final SchemaInfo SCHEMA_INFO;
+
+ static {
+ SCHEMA_INFO = new SchemaInfo()
+ .setName("Instant")
+ .setType(SchemaType.INSTANT)
+ .setSchema(new byte[0]);
+ INSTANCE = new InstantSchema();
+ }
+
+ public static InstantSchema of() {
+ return INSTANCE;
+ }
+
+ @Override
+ public byte[] encode(Instant message) {
+ if (null == message) {
+ return null;
+ }
+ // Instant is accurate to nanoseconds and requires two value storage.
+ ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES + Integer.BYTES);
+ buffer.putLong(message.getEpochSecond());
+ buffer.putInt(message.getNano());
+ return buffer.array();
+ }
+
+ @Override
+ public Instant decode(byte[] bytes) {
+ if (null == bytes) {
+ return null;
+ }
+ ByteBuffer buffer = ByteBuffer.wrap(bytes);
+ long epochSecond = buffer.getLong();
+ int nanos = buffer.getInt();
+ return Instant.ofEpochSecond(epochSecond, nanos);
+ }
+
+ @Override
+ public Instant decode(ByteBuf byteBuf) {
+ if (null == byteBuf) {
+ return null;
+ }
+ long epochSecond = byteBuf.readLong();
+ int nanos = byteBuf.readInt();
+ return Instant.ofEpochSecond(epochSecond, nanos);
+ }
+
+ @Override
+ public SchemaInfo getSchemaInfo() {
+ return SCHEMA_INFO;
+ }
+}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LocalDateSchema.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LocalDateSchema.java
new file mode 100644
index 0000000..add6fd2
--- /dev/null
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LocalDateSchema.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.schema;
+
+import io.netty.buffer.ByteBuf;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.schema.SchemaType;
+import java.time.LocalDate;
+
+/**
+ * A schema for `java.time.LocalDate`.
+ */
+public class LocalDateSchema extends AbstractSchema<LocalDate> {
+
+ private static final LocalDateSchema INSTANCE;
+ private static final SchemaInfo SCHEMA_INFO;
+
+ static {
+ SCHEMA_INFO = new SchemaInfo()
+ .setName("LocalDate")
+ .setType(SchemaType.LOCAL_DATE)
+ .setSchema(new byte[0]);
+ INSTANCE = new LocalDateSchema();
+ }
+
+ public static LocalDateSchema of() {
+ return INSTANCE;
+ }
+
+ @Override
+ public byte[] encode(LocalDate message) {
+ if (null == message) {
+ return null;
+ }
+
+ Long epochDay = message.toEpochDay();
+ return LongSchema.of().encode(epochDay);
+ }
+
+ @Override
+ public LocalDate decode(byte[] bytes) {
+ if (null == bytes) {
+ return null;
+ }
+
+ Long decode = LongSchema.of().decode(bytes);
+ return LocalDate.ofEpochDay(decode);
+ }
+
+ @Override
+ public LocalDate decode(ByteBuf byteBuf) {
+ if (null == byteBuf) {
+ return null;
+ }
+
+ Long decode = LongSchema.of().decode(byteBuf);
+ return LocalDate.ofEpochDay(decode);
+ }
+
+ @Override
+ public SchemaInfo getSchemaInfo() {
+ return SCHEMA_INFO;
+ }
+}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LocalDateTimeSchema.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LocalDateTimeSchema.java
new file mode 100644
index 0000000..8a6c4fb
--- /dev/null
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LocalDateTimeSchema.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.schema;
+
+import io.netty.buffer.ByteBuf;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.schema.SchemaType;
+
+import java.nio.ByteBuffer;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+
+/**
+ * A schema for `java.time.LocalDateTime`.
+ */
+public class LocalDateTimeSchema extends AbstractSchema<LocalDateTime> {
+
+ private static final LocalDateTimeSchema INSTANCE;
+ private static final SchemaInfo SCHEMA_INFO;
+ public static final String DELIMITER = ":";
+
+ static {
+ SCHEMA_INFO = new SchemaInfo()
+ .setName("LocalDateTime")
+ .setType(SchemaType.LOCAL_DATE_TIME)
+ .setSchema(new byte[0]);
+ INSTANCE = new LocalDateTimeSchema();
+ }
+
+ public static LocalDateTimeSchema of() {
+ return INSTANCE;
+ }
+
+ @Override
+ public byte[] encode(LocalDateTime message) {
+ if (null == message) {
+ return null;
+ }
+ //LocalDateTime is accurate to nanoseconds and requires two value
storage.
+ ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES * 2);
+ buffer.putLong(message.toLocalDate().toEpochDay());
+ buffer.putLong(message.toLocalTime().toNanoOfDay());
+ return buffer.array();
+ }
+
+ @Override
+ public LocalDateTime decode(byte[] bytes) {
+ if (null == bytes) {
+ return null;
+ }
+ ByteBuffer buffer = ByteBuffer.wrap(bytes);
+ long epochDay = buffer.getLong();
+ long nanoOfDay = buffer.getLong();
+ return LocalDateTime.of(LocalDate.ofEpochDay(epochDay),
LocalTime.ofNanoOfDay(nanoOfDay));
+ }
+
+ @Override
+ public LocalDateTime decode(ByteBuf byteBuf) {
+ if (null == byteBuf) {
+ return null;
+ }
+ long epochDay = byteBuf.readLong();
+ long nanoOfDay = byteBuf.readLong();
+ return LocalDateTime.of(LocalDate.ofEpochDay(epochDay),
LocalTime.ofNanoOfDay(nanoOfDay));
+ }
+
+ @Override
+ public SchemaInfo getSchemaInfo() {
+ return SCHEMA_INFO;
+ }
+}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LocalTimeSchema.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LocalTimeSchema.java
new file mode 100644
index 0000000..6e2bf62
--- /dev/null
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LocalTimeSchema.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.schema;
+
+import io.netty.buffer.ByteBuf;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.schema.SchemaType;
+import java.time.LocalTime;
+
+/**
+ * A schema for `java.time.LocalTime`.
+ */
+public class LocalTimeSchema extends AbstractSchema<LocalTime> {
+
+ private static final LocalTimeSchema INSTANCE;
+ private static final SchemaInfo SCHEMA_INFO;
+
+ static {
+ SCHEMA_INFO = new SchemaInfo()
+ .setName("LocalTime")
+ .setType(SchemaType.LOCAL_TIME)
+ .setSchema(new byte[0]);
+ INSTANCE = new LocalTimeSchema();
+ }
+
+ public static LocalTimeSchema of() {
+ return INSTANCE;
+ }
+
+ @Override
+ public byte[] encode(LocalTime message) {
+ if (null == message) {
+ return null;
+ }
+
+ Long nanoOfDay = message.toNanoOfDay();
+ return LongSchema.of().encode(nanoOfDay);
+ }
+
+ @Override
+ public LocalTime decode(byte[] bytes) {
+ if (null == bytes) {
+ return null;
+ }
+
+ Long decode = LongSchema.of().decode(bytes);
+ return LocalTime.ofNanoOfDay(decode);
+ }
+
+ @Override
+ public LocalTime decode(ByteBuf byteBuf) {
+ if (null == byteBuf) {
+ return null;
+ }
+
+ Long decode = LongSchema.of().decode(byteBuf);
+ return LocalTime.ofNanoOfDay(decode);
+ }
+
+ @Override
+ public SchemaInfo getSchemaInfo() {
+ return SCHEMA_INFO;
+ }
+}
diff --git
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/InstantSchemaTest.java
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/InstantSchemaTest.java
new file mode 100644
index 0000000..ca7856a
--- /dev/null
+++
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/InstantSchemaTest.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.schema;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import java.nio.ByteBuffer;
+import java.time.Instant;
+
+/**
+ * Unit tests for {@link InstantSchema}: byte layout, round-trip fidelity and null handling.
+ */
+public class InstantSchemaTest {
+
+    /** Serialized size: 8-byte epoch second plus 4-byte nanosecond adjustment. */
+    private static final int ENCODED_SIZE = Long.BYTES + Integer.BYTES;
+
+    /** Builds the reference encoding of an instant independently of the schema under test. */
+    private static byte[] referenceBytes(Instant instant) {
+        ByteBuffer byteBuffer = ByteBuffer.allocate(ENCODED_SIZE);
+        byteBuffer.putLong(instant.getEpochSecond());
+        byteBuffer.putInt(instant.getNano());
+        return byteBuffer.array();
+    }
+
+    @Test
+    public void testSchemaEncode() {
+        InstantSchema schema = InstantSchema.of();
+        Instant instant = Instant.now();
+        byte[] expected = referenceBytes(instant);
+        // TestNG's assertEquals takes (actual, expected).
+        Assert.assertEquals(schema.encode(instant), expected);
+    }
+
+    @Test
+    public void testSchemaEncodeDecodeFidelity() {
+        InstantSchema schema = InstantSchema.of();
+        Instant instant = Instant.now();
+        byte[] bytes = schema.encode(instant);
+        ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer(ENCODED_SIZE);
+        try {
+            byteBuf.writeBytes(bytes);
+            Assert.assertEquals(schema.decode(bytes), instant);
+            Assert.assertEquals(schema.decode(byteBuf), instant);
+        } finally {
+            // Buffers obtained from the allocator are reference counted and must be released.
+            byteBuf.release();
+        }
+    }
+
+    @Test
+    public void testSchemaDecode() {
+        Instant instant = Instant.now();
+        byte[] byteData = referenceBytes(instant);
+        long epochSecond = instant.getEpochSecond();
+        int nano = instant.getNano();
+
+        InstantSchema schema = InstantSchema.of();
+        ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer(ENCODED_SIZE);
+        try {
+            byteBuf.writeBytes(byteData);
+            Instant decode = schema.decode(byteData);
+            Assert.assertEquals(decode.getEpochSecond(), epochSecond);
+            Assert.assertEquals(decode.getNano(), nano);
+            decode = schema.decode(byteBuf);
+            Assert.assertEquals(decode.getEpochSecond(), epochSecond);
+            Assert.assertEquals(decode.getNano(), nano);
+        } finally {
+            byteBuf.release();
+        }
+    }
+
+    @Test
+    public void testNullEncodeDecode() {
+        // Typed null variables select the intended decode overload.
+        ByteBuf byteBuf = null;
+        byte[] bytes = null;
+
+        Assert.assertNull(InstantSchema.of().encode(null));
+        Assert.assertNull(InstantSchema.of().decode(byteBuf));
+        Assert.assertNull(InstantSchema.of().decode(bytes));
+    }
+
+}
diff --git
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/LocalDateSchemaTest.java
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/LocalDateSchemaTest.java
new file mode 100644
index 0000000..31b48b2
--- /dev/null
+++
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/LocalDateSchemaTest.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.schema;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import java.time.LocalDate;
+
+/**
+ * Unit tests for {@link LocalDateSchema}: byte layout, round-trip fidelity and null handling.
+ */
+public class LocalDateSchemaTest {
+
+    @Test
+    public void testSchemaEncode() {
+        LocalDateSchema schema = LocalDateSchema.of();
+        LocalDate localDate = LocalDate.now();
+        long epochDay = localDate.toEpochDay();
+        // Expected layout: the epoch day as 8 bytes, most significant byte first.
+        byte[] expected = new byte[] {
+            (byte) (epochDay >>> 56),
+            (byte) (epochDay >>> 48),
+            (byte) (epochDay >>> 40),
+            (byte) (epochDay >>> 32),
+            (byte) (epochDay >>> 24),
+            (byte) (epochDay >>> 16),
+            (byte) (epochDay >>> 8),
+            (byte) epochDay
+        };
+        // TestNG's assertEquals takes (actual, expected).
+        Assert.assertEquals(schema.encode(localDate), expected);
+    }
+
+    @Test
+    public void testSchemaEncodeDecodeFidelity() {
+        LocalDateSchema schema = LocalDateSchema.of();
+        LocalDate localDate = LocalDate.now();
+        byte[] bytes = schema.encode(localDate);
+        ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer(8);
+        try {
+            byteBuf.writeBytes(bytes);
+            Assert.assertEquals(schema.decode(bytes), localDate);
+            Assert.assertEquals(schema.decode(byteBuf), localDate);
+        } finally {
+            // Buffers obtained from the allocator are reference counted and must be released.
+            byteBuf.release();
+        }
+    }
+
+    @Test
+    public void testSchemaDecode() {
+        byte[] byteData = new byte[] {
+            0,
+            0,
+            0,
+            0,
+            0,
+            10,
+            24,
+            42
+        };
+        long expected = 10*65536 + 24*256 + 42;
+
+        LocalDateSchema schema = LocalDateSchema.of();
+        ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer(8);
+        try {
+            byteBuf.writeBytes(byteData);
+            Assert.assertEquals(schema.decode(byteData).toEpochDay(), expected);
+            Assert.assertEquals(schema.decode(byteBuf).toEpochDay(), expected);
+        } finally {
+            byteBuf.release();
+        }
+    }
+
+    @Test
+    public void testNullEncodeDecode() {
+        // Typed null variables select the intended decode overload.
+        ByteBuf byteBuf = null;
+        byte[] bytes = null;
+
+        Assert.assertNull(LocalDateSchema.of().encode(null));
+        Assert.assertNull(LocalDateSchema.of().decode(byteBuf));
+        Assert.assertNull(LocalDateSchema.of().decode(bytes));
+    }
+
+}
diff --git
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/LocalDateTimeSchemaTest.java
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/LocalDateTimeSchemaTest.java
new file mode 100644
index 0000000..1214560
--- /dev/null
+++
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/LocalDateTimeSchemaTest.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.schema;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import java.nio.ByteBuffer;
+import java.time.LocalDateTime;
+
+/**
+ * Unit tests for {@link LocalDateTimeSchema}: byte layout, round-trip fidelity and null handling.
+ */
+public class LocalDateTimeSchemaTest {
+
+    /** Serialized size: 8-byte epoch day plus 8-byte nano-of-day. */
+    private static final int ENCODED_SIZE = Long.BYTES * 2;
+
+    /** Builds the reference encoding of a date-time independently of the schema under test. */
+    private static byte[] referenceBytes(LocalDateTime localDateTime) {
+        ByteBuffer byteBuffer = ByteBuffer.allocate(ENCODED_SIZE);
+        byteBuffer.putLong(localDateTime.toLocalDate().toEpochDay());
+        byteBuffer.putLong(localDateTime.toLocalTime().toNanoOfDay());
+        return byteBuffer.array();
+    }
+
+    @Test
+    public void testSchemaEncode() {
+        LocalDateTimeSchema schema = LocalDateTimeSchema.of();
+        LocalDateTime localDateTime = LocalDateTime.now();
+        byte[] expected = referenceBytes(localDateTime);
+        // TestNG's assertEquals takes (actual, expected).
+        Assert.assertEquals(schema.encode(localDateTime), expected);
+    }
+
+    @Test
+    public void testSchemaEncodeDecodeFidelity() {
+        LocalDateTimeSchema schema = LocalDateTimeSchema.of();
+        LocalDateTime localDateTime = LocalDateTime.now();
+        byte[] bytes = schema.encode(localDateTime);
+        ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer(ENCODED_SIZE);
+        try {
+            byteBuf.writeBytes(bytes);
+            Assert.assertEquals(schema.decode(bytes), localDateTime);
+            Assert.assertEquals(schema.decode(byteBuf), localDateTime);
+        } finally {
+            // Buffers obtained from the allocator are reference counted and must be released.
+            byteBuf.release();
+        }
+    }
+
+    @Test
+    public void testSchemaDecode() {
+        LocalDateTime localDateTime = LocalDateTime.of(2020, 8, 22, 2, 0, 0);
+        byte[] byteData = referenceBytes(localDateTime);
+        long expectedEpochDay = localDateTime.toLocalDate().toEpochDay();
+        long expectedNanoOfDay = localDateTime.toLocalTime().toNanoOfDay();
+
+        LocalDateTimeSchema schema = LocalDateTimeSchema.of();
+        ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer(ENCODED_SIZE);
+        try {
+            byteBuf.writeBytes(byteData);
+            LocalDateTime decode = schema.decode(byteData);
+            Assert.assertEquals(decode.toLocalDate().toEpochDay(), expectedEpochDay);
+            Assert.assertEquals(decode.toLocalTime().toNanoOfDay(), expectedNanoOfDay);
+            decode = schema.decode(byteBuf);
+            Assert.assertEquals(decode.toLocalDate().toEpochDay(), expectedEpochDay);
+            Assert.assertEquals(decode.toLocalTime().toNanoOfDay(), expectedNanoOfDay);
+        } finally {
+            byteBuf.release();
+        }
+    }
+
+    @Test
+    public void testNullEncodeDecode() {
+        // Typed null variables select the intended decode overload.
+        ByteBuf byteBuf = null;
+        byte[] bytes = null;
+
+        // Exercise LocalDateTimeSchema itself; the original checked LocalDateSchema by mistake.
+        Assert.assertNull(LocalDateTimeSchema.of().encode(null));
+        Assert.assertNull(LocalDateTimeSchema.of().decode(byteBuf));
+        Assert.assertNull(LocalDateTimeSchema.of().decode(bytes));
+    }
+
+}
diff --git
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/LocalTimeSchemaTest.java
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/LocalTimeSchemaTest.java
new file mode 100644
index 0000000..ed729de
--- /dev/null
+++
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/LocalTimeSchemaTest.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.schema;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import java.time.LocalTime;
+
+/**
+ * Unit tests for {@link LocalTimeSchema}: encoding, decoding from byte arrays and
+ * ByteBufs, round-trip fidelity, and null handling.
+ */
+public class LocalTimeSchemaTest {
+
+    /**
+     * Checks that a {@link LocalTime} is encoded as its nano-of-day value laid out
+     * in eight bytes, most significant byte first.
+     */
+    @Test
+    public void testSchemaEncode() {
+        LocalTimeSchema schema = LocalTimeSchema.of();
+        LocalTime localTime = LocalTime.now();
+        long nanoOfDay = localTime.toNanoOfDay();
+        byte[] expected = new byte[] {
+            (byte) (nanoOfDay >>> 56),
+            (byte) (nanoOfDay >>> 48),
+            (byte) (nanoOfDay >>> 40),
+            (byte) (nanoOfDay >>> 32),
+            (byte) (nanoOfDay >>> 24),
+            (byte) (nanoOfDay >>> 16),
+            (byte) (nanoOfDay >>> 8),
+            (byte) nanoOfDay
+        };
+        // TestNG's assertEquals takes (actual, expected).
+        Assert.assertEquals(schema.encode(localTime), expected);
+    }
+
+    /**
+     * Checks that decoding the encoded form, from either a byte array or a ByteBuf,
+     * yields a value equal to the original.
+     */
+    @Test
+    public void testSchemaEncodeDecodeFidelity() {
+        LocalTimeSchema schema = LocalTimeSchema.of();
+        LocalTime localTime = LocalTime.now();
+        byte[] bytes = schema.encode(localTime);
+        Assert.assertEquals(schema.decode(bytes), localTime);
+
+        ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer(8);
+        try {
+            byteBuf.writeBytes(bytes);
+            Assert.assertEquals(schema.decode(byteBuf), localTime);
+        } finally {
+            // Allocator-provided buffers are reference-counted; release to avoid a leak.
+            byteBuf.release();
+        }
+    }
+
+    /**
+     * Checks that a fixed eight-byte payload decodes to the nano-of-day value it
+     * represents when read most significant byte first.
+     */
+    @Test
+    public void testSchemaDecode() {
+        byte[] byteData = new byte[] {
+            0,
+            0,
+            0,
+            0,
+            0,
+            10,
+            24,
+            42
+        };
+        // The three low-order bytes 10, 24, 42 combined as one number.
+        long expected = 10 * 65536 + 24 * 256 + 42;
+
+        LocalTimeSchema schema = LocalTimeSchema.of();
+        Assert.assertEquals(schema.decode(byteData).toNanoOfDay(), expected);
+
+        ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer(8);
+        try {
+            byteBuf.writeBytes(byteData);
+            Assert.assertEquals(schema.decode(byteBuf).toNanoOfDay(), expected);
+        } finally {
+            // Release even when the assertion fails so the buffer is not leaked.
+            byteBuf.release();
+        }
+    }
+
+    /**
+     * Checks that a null input yields a null result for encode and for both decode
+     * overloads.
+     */
+    @Test
+    public void testNullEncodeDecode() {
+        // Typed null locals select the intended decode overload.
+        ByteBuf byteBuf = null;
+        byte[] bytes = null;
+
+        Assert.assertNull(LocalTimeSchema.of().encode(null));
+        Assert.assertNull(LocalTimeSchema.of().decode(byteBuf));
+        Assert.assertNull(LocalTimeSchema.of().decode(bytes));
+    }
+
+}
diff --git
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/PrimitiveSchemaTest.java
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/PrimitiveSchemaTest.java
index dceebbd..a2efb25 100644
---
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/PrimitiveSchemaTest.java
+++
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/PrimitiveSchemaTest.java
@@ -28,6 +28,10 @@ import java.nio.ByteBuffer;
import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
@@ -60,6 +64,10 @@ public class PrimitiveSchemaTest {
put(DateSchema.of(), Arrays.asList(new Date(new
java.util.Date().getTime() - 10000), new Date(new java.util.Date().getTime())));
put(TimeSchema.of(), Arrays.asList(new Time(new
java.util.Date().getTime() - 10000), new Time(new java.util.Date().getTime())));
put(TimestampSchema.of(), Arrays.asList(new Timestamp(new
java.util.Date().getTime()), new Timestamp(new java.util.Date().getTime())));
+ put(InstantSchema.of(), Arrays.asList(Instant.now(),
Instant.now().minusSeconds(60*23L)));
+ put(LocalDateSchema.of(), Arrays.asList(LocalDate.now(),
LocalDate.now().minusDays(2)));
+ put(LocalTimeSchema.of(), Arrays.asList(LocalTime.now(),
LocalTime.now().minusHours(2)));
+ put(LocalDateTimeSchema.of(), Arrays.asList(LocalDateTime.now(),
LocalDateTime.now().minusDays(2), LocalDateTime.now().minusWeeks(10)));
}
};
@@ -78,6 +86,10 @@ public class PrimitiveSchemaTest {
put(Schema.DATE, Arrays.asList(new Date(new
java.util.Date().getTime() - 10000), new Date(new java.util.Date().getTime())));
put(Schema.TIME, Arrays.asList(new Time(new
java.util.Date().getTime() - 10000), new Time(new java.util.Date().getTime())));
put(Schema.TIMESTAMP, Arrays.asList(new Timestamp(new
java.util.Date().getTime() - 10000), new Timestamp(new
java.util.Date().getTime())));
+ put(Schema.INSTANT, Arrays.asList(Instant.now(),
Instant.now().minusSeconds(60*23L)));
+ put(Schema.LOCAL_DATE, Arrays.asList(LocalDate.now(),
LocalDate.now().minusDays(2)));
+ put(Schema.LOCAL_TIME, Arrays.asList(LocalTime.now(),
LocalTime.now().minusHours(2)));
+ put(Schema.LOCAL_DATE_TIME, Arrays.asList(LocalDateTime.now(),
LocalDateTime.now().minusDays(2), LocalDateTime.now().minusWeeks(10)));
}
};
@@ -139,6 +151,10 @@ public class PrimitiveSchemaTest {
assertEquals(SchemaType.DATE,
DateSchema.of().getSchemaInfo().getType());
assertEquals(SchemaType.TIME,
TimeSchema.of().getSchemaInfo().getType());
assertEquals(SchemaType.TIMESTAMP,
TimestampSchema.of().getSchemaInfo().getType());
+ assertEquals(SchemaType.INSTANT,
InstantSchema.of().getSchemaInfo().getType());
+ assertEquals(SchemaType.LOCAL_DATE,
LocalDateSchema.of().getSchemaInfo().getType());
+ assertEquals(SchemaType.LOCAL_TIME,
LocalTimeSchema.of().getSchemaInfo().getType());
+ assertEquals(SchemaType.LOCAL_DATE_TIME,
LocalDateTimeSchema.of().getSchemaInfo().getType());
}
diff --git
a/pulsar-client/src/test/java/org/apache/pulsar/common/schema/KeyValueTest.java
b/pulsar-client/src/test/java/org/apache/pulsar/common/schema/KeyValueTest.java
index 7ee270b..1011e0e 100644
---
a/pulsar-client/src/test/java/org/apache/pulsar/common/schema/KeyValueTest.java
+++
b/pulsar-client/src/test/java/org/apache/pulsar/common/schema/KeyValueTest.java
@@ -25,6 +25,10 @@ import io.netty.buffer.Unpooled;
import java.nio.ByteBuffer;
import java.sql.Time;
import java.sql.Timestamp;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
@@ -39,7 +43,11 @@ import org.apache.pulsar.client.impl.schema.BytesSchema;
import org.apache.pulsar.client.impl.schema.DateSchema;
import org.apache.pulsar.client.impl.schema.DoubleSchema;
import org.apache.pulsar.client.impl.schema.FloatSchema;
+import org.apache.pulsar.client.impl.schema.InstantSchema;
import org.apache.pulsar.client.impl.schema.IntSchema;
+import org.apache.pulsar.client.impl.schema.LocalDateSchema;
+import org.apache.pulsar.client.impl.schema.LocalDateTimeSchema;
+import org.apache.pulsar.client.impl.schema.LocalTimeSchema;
import org.apache.pulsar.client.impl.schema.LongSchema;
import org.apache.pulsar.client.impl.schema.ShortSchema;
import org.apache.pulsar.client.impl.schema.StringSchema;
@@ -69,6 +77,10 @@ public class KeyValueTest {
put(DateSchema.of(), Arrays.asList(new Date(new
java.util.Date().getTime() - 10000), new Date(new java.util.Date().getTime())));
put(TimeSchema.of(), Arrays.asList(new Time(new
java.util.Date().getTime() - 10000), new Time(new java.util.Date().getTime())));
put(TimestampSchema.of(), Arrays.asList(new Timestamp(new
java.util.Date().getTime()), new Timestamp(new java.util.Date().getTime())));
+ put(InstantSchema.of(), Arrays.asList(Instant.now(),
Instant.now().minusSeconds(60*23L)));
+ put(LocalDateSchema.of(), Arrays.asList(LocalDate.now(),
LocalDate.now().minusDays(2)));
+ put(LocalTimeSchema.of(), Arrays.asList(LocalTime.now(),
LocalTime.now().minusHours(2)));
+ put(LocalDateTimeSchema.of(), Arrays.asList(LocalDateTime.now(),
LocalDateTime.now().minusDays(2), LocalDateTime.now().minusWeeks(10)));
}
};
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
index 4c3b885..65c4a93 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
@@ -444,6 +444,10 @@ public final class PulsarApi {
Time(13, 13),
Timestamp(14, 14),
KeyValue(15, 15),
+ Instant(16, 16),
+ LocalDate(17, 17),
+ LocalTime(18, 18),
+ LocalDateTime(19, 19),
;
public static final int None_VALUE = 0;
@@ -462,6 +466,10 @@ public final class PulsarApi {
public static final int Time_VALUE = 13;
public static final int Timestamp_VALUE = 14;
public static final int KeyValue_VALUE = 15;
+ public static final int Instant_VALUE = 16;
+ public static final int LocalDate_VALUE = 17;
+ public static final int LocalTime_VALUE = 18;
+ public static final int LocalDateTime_VALUE = 19;
public final int getNumber() { return value; }
@@ -484,6 +492,10 @@ public final class PulsarApi {
case 13: return Time;
case 14: return Timestamp;
case 15: return KeyValue;
+ case 16: return Instant;
+ case 17: return LocalDate;
+ case 18: return LocalTime;
+ case 19: return LocalDateTime;
default: return null;
}
}
@@ -1578,6 +1590,15 @@ public final class PulsarApi {
ackSet_.add(input.readInt64());
break;
}
+ case 42: {
+ int length = input.readRawVarint32();
+ int limit = input.pushLimit(length);
+ while (input.getBytesUntilLimit() > 0) {
+ addAckSet(input.readInt64());
+ }
+ input.popLimit(limit);
+ break;
+ }
}
}
}
@@ -18860,6 +18881,15 @@ public final class PulsarApi {
ackSet_.add(input.readInt64());
break;
}
+ case 34: {
+ int length = input.readRawVarint32();
+ int limit = input.pushLimit(length);
+ while (input.getBytesUntilLimit() > 0) {
+ addAckSet(input.readInt64());
+ }
+ input.popLimit(limit);
+ break;
+ }
}
}
}
diff --git a/pulsar-common/src/main/proto/PulsarApi.proto
b/pulsar-common/src/main/proto/PulsarApi.proto
index 63ff6ee..9c4b913 100644
--- a/pulsar-common/src/main/proto/PulsarApi.proto
+++ b/pulsar-common/src/main/proto/PulsarApi.proto
@@ -40,6 +40,10 @@ message Schema {
Time = 13;
Timestamp = 14;
KeyValue = 15;
+ Instant = 16;
+ LocalDate = 17;
+ LocalTime = 18;
+ LocalDateTime = 19;
}
required string name = 1;
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/schema/SchemaTest.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/schema/SchemaTest.java
index 4b0083a..6194f95 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/schema/SchemaTest.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/schema/SchemaTest.java
@@ -273,6 +273,10 @@ public class SchemaTest extends PulsarTestSuite {
schemas.add(Schema.DATE);
schemas.add(Schema.TIME);
schemas.add(Schema.TIMESTAMP);
+ schemas.add(Schema.INSTANT);
+ schemas.add(Schema.LOCAL_DATE);
+ schemas.add(Schema.LOCAL_TIME);
+ schemas.add(Schema.LOCAL_DATE_TIME);
schemas.forEach(schemaProducer -> {
schemas.forEach(schemaConsumer -> {