This is an automated email from the ASF dual-hosted git repository.
wcarlson pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 71ca8ef4ecd KAFKA-14722: Make BooleanSerde public (#13382)
71ca8ef4ecd is described below
commit 71ca8ef4ecd8522ac6dbf86daf4a3a9540482950
Author: Spacrocket <[email protected]>
AuthorDate: Fri Mar 24 16:41:51 2023 +0100
KAFKA-14722: Make BooleanSerde public (#13382)
KAFKA-14722: Make BooleanSerde public (#13328)
Addition of boolean serde
https://cwiki.apache.org/confluence/display/KAFKA/KIP-907%3A+Add+Boolean+Serde+to+public+interface
During the task KAFKA-14491 Victoria added BooleanSerde class, It will be
useful to have such class in public package.
Reviewers: Walker Carlson <[email protected]>, Lucas Brutschy
<[email protected]>, Divij Vaidya <[email protected]>
---
.../common/serialization/BooleanDeserializer.java | 43 ++++++++++++++++++++++
.../common/serialization/BooleanSerializer.java | 33 +++++++++++++++++
.../apache/kafka/common/serialization/Serdes.java | 19 +++++++++-
.../common/serialization/SerializationTest.java | 29 +++++++++++++++
docs/streams/developer-guide/datatypes.html | 3 ++
gradle/spotbugs-exclude.xml | 2 +-
.../NullableValueAndTimestampDeserializer.java | 2 +-
.../internals/NullableValueAndTimestampSerde.java | 41 ---------------------
.../NullableValueAndTimestampSerializer.java | 2 +-
9 files changed, 129 insertions(+), 45 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/common/serialization/BooleanDeserializer.java
b/clients/src/main/java/org/apache/kafka/common/serialization/BooleanDeserializer.java
new file mode 100644
index 00000000000..2384fc905f5
--- /dev/null
+++
b/clients/src/main/java/org/apache/kafka/common/serialization/BooleanDeserializer.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.serialization;
+
+import org.apache.kafka.common.errors.SerializationException;
+
+public class BooleanDeserializer implements Deserializer<Boolean> {
+ private static final byte TRUE = 0x01;
+ private static final byte FALSE = 0x00;
+
+ @Override
+ public Boolean deserialize(final String topic, final byte[] data) {
+ if (data == null) {
+ return null;
+ }
+
+ if (data.length != 1) {
+ throw new SerializationException("Size of data received by
BooleanDeserializer is not 1");
+ }
+
+ if (data[0] == TRUE) {
+ return true;
+ } else if (data[0] == FALSE) {
+ return false;
+ } else {
+ throw new SerializationException("Unexpected byte received by
BooleanDeserializer: " + data[0]);
+ }
+ }
+}
diff --git
a/clients/src/main/java/org/apache/kafka/common/serialization/BooleanSerializer.java
b/clients/src/main/java/org/apache/kafka/common/serialization/BooleanSerializer.java
new file mode 100644
index 00000000000..ad4722a8d80
--- /dev/null
+++
b/clients/src/main/java/org/apache/kafka/common/serialization/BooleanSerializer.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.serialization;
+
+public class BooleanSerializer implements Serializer<Boolean> {
+
+ private static final byte TRUE = 0x01;
+ private static final byte FALSE = 0x00;
+ @Override
+ public byte[] serialize(final String topic, final Boolean data) {
+ if (data == null) {
+ return null;
+ }
+
+ return new byte[] {
+ data ? TRUE : FALSE
+ };
+ }
+}
diff --git
a/clients/src/main/java/org/apache/kafka/common/serialization/Serdes.java
b/clients/src/main/java/org/apache/kafka/common/serialization/Serdes.java
index 4a150e0c022..cae8bd1187a 100644
--- a/clients/src/main/java/org/apache/kafka/common/serialization/Serdes.java
+++ b/clients/src/main/java/org/apache/kafka/common/serialization/Serdes.java
@@ -126,6 +126,12 @@ public class Serdes {
}
}
+ static public final class BooleanSerde extends WrapperSerde<Boolean> {
+ public BooleanSerde() {
+ super(new BooleanSerializer(), new BooleanDeserializer());
+ }
+ }
+
static public final class ListSerde<Inner> extends
WrapperSerde<List<Inner>> {
final static int NULL_ENTRY_VALUE = -1;
@@ -189,9 +195,13 @@ public class Serdes {
return (Serde<T>) UUID();
}
+ if (Boolean.class.isAssignableFrom(type)) {
+ return (Serde<T>) Boolean();
+ }
+
// TODO: we can also serializes objects of type T using generic Java
serialization by default
throw new IllegalArgumentException("Unknown class for built-in
serializer. Supported types are: " +
- "String, Short, Integer, Long, Float, Double, ByteArray,
ByteBuffer, Bytes, UUID");
+ "String, Short, Integer, Long, Float, Double, ByteArray,
ByteBuffer, Bytes, UUID, Boolean");
}
/**
@@ -274,6 +284,13 @@ public class Serdes {
return new UUIDSerde();
}
+ /**
+ * A serde for nullable {@code Boolean} type.
+ */
+ static public Serde<Boolean> Boolean() {
+ return new BooleanSerde();
+ }
+
/**
* A serde for nullable {@code byte[]} type.
*/
diff --git
a/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java
b/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java
index 844ce500667..6607678dedf 100644
---
a/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java
@@ -19,6 +19,8 @@ package org.apache.kafka.common.serialization;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.utils.Bytes;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
@@ -389,4 +391,31 @@ public class SerializationTest {
assertArrayEquals(bytes, serializer.serialize(topic,
directBuffer1));
}
}
+
+ @ParameterizedTest
+ @ValueSource(booleans = { true, false })
+ public void testBooleanSerializer(Boolean dataToSerialize) {
+ byte[] testData = new byte[1];
+ testData[0] = (byte) (dataToSerialize ? 1 : 0);
+
+ Serde<Boolean> booleanSerde = Serdes.Boolean();
+ assertArrayEquals(testData, booleanSerde.serializer().serialize(topic,
dataToSerialize));
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = { true, false })
+ public void testBooleanDeserializer(Boolean dataToDeserialize) {
+ byte[] testData = new byte[1];
+ testData[0] = (byte) (dataToDeserialize ? 1 : 0);
+
+ Serde<Boolean> booleanSerde = Serdes.Boolean();
+ assertEquals(dataToDeserialize,
booleanSerde.deserializer().deserialize(topic, testData));
+ }
+
+ @Test
+ public void booleanDeserializerShouldThrowOnEmptyInput() {
+ try (Serde<Boolean> serde = Serdes.Boolean()) {
+ assertThrows(SerializationException.class, () ->
serde.deserializer().deserialize(topic, new byte[0]));
+ }
+ }
}
diff --git a/docs/streams/developer-guide/datatypes.html
b/docs/streams/developer-guide/datatypes.html
index 458b47b149c..607d133f3b2 100644
--- a/docs/streams/developer-guide/datatypes.html
+++ b/docs/streams/developer-guide/datatypes.html
@@ -144,6 +144,9 @@ userCountByRegion.to("RegionCountsTopic",
Produced.valueSerde(Serdes.L
<tr class="row-even"><td>List</td>
<td><code class="docutils literal"><span
class="pre">Serdes.ListSerde()</span></code></td>
</tr>
+ <tr class="row-even"><td>Boolean</td>
+ <td><code class="docutils literal"><span
class="pre">Serdes.Boolean()</span></code></td>
+ </tr>
</tbody>
</table>
<div class="admonition tip">
diff --git a/gradle/spotbugs-exclude.xml b/gradle/spotbugs-exclude.xml
index e040e54d306..e519909b1d0 100644
--- a/gradle/spotbugs-exclude.xml
+++ b/gradle/spotbugs-exclude.xml
@@ -490,7 +490,7 @@ For a detailed description of spotbugs bug categories, see
https://spotbugs.read
<Match>
<!-- Boolean deserializer intentionally returns null on null input. -->
- <Class
name="org.apache.kafka.streams.state.internals.NullableValueAndTimestampSerde$BooleanSerde$BooleanDeserializer"/>
+ <Class
name="org.apache.kafka.common.serialization.BooleanDeserializer"/>
<Method name="deserialize"/>
<Bug pattern="NP_BOOLEAN_RETURN_NULL"/>
</Match>
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/NullableValueAndTimestampDeserializer.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/NullableValueAndTimestampDeserializer.java
index 7d70708ff12..2fc4c5c7d97 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/NullableValueAndTimestampDeserializer.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/NullableValueAndTimestampDeserializer.java
@@ -28,7 +28,7 @@ import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.streams.kstream.internals.WrappingNullableDeserializer;
import org.apache.kafka.streams.processor.internals.SerdeGetter;
import org.apache.kafka.streams.state.ValueAndTimestamp;
-import
org.apache.kafka.streams.state.internals.NullableValueAndTimestampSerde.BooleanSerde.BooleanDeserializer;
+import org.apache.kafka.common.serialization.BooleanDeserializer;
/**
* See {@link NullableValueAndTimestampSerde}.
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/NullableValueAndTimestampSerde.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/NullableValueAndTimestampSerde.java
index 090a4daa35c..72aebee2dd0 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/NullableValueAndTimestampSerde.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/NullableValueAndTimestampSerde.java
@@ -18,10 +18,7 @@ package org.apache.kafka.streams.state.internals;
import static java.util.Objects.requireNonNull;
-import org.apache.kafka.common.errors.SerializationException;
-import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.kstream.internals.WrappingNullableSerde;
import org.apache.kafka.streams.state.ValueAndTimestamp;
@@ -48,42 +45,4 @@ public class NullableValueAndTimestampSerde<V> extends
WrappingNullableSerde<Val
);
}
- static final class BooleanSerde {
- private static final byte TRUE = 0x01;
- private static final byte FALSE = 0x00;
-
- static class BooleanSerializer implements Serializer<Boolean> {
- @Override
- public byte[] serialize(final String topic, final Boolean data) {
- if (data == null) {
- return null;
- }
-
- return new byte[] {
- data ? TRUE : FALSE
- };
- }
- }
-
- static class BooleanDeserializer implements Deserializer<Boolean> {
- @Override
- public Boolean deserialize(final String topic, final byte[] data) {
- if (data == null) {
- return null;
- }
-
- if (data.length != 1) {
- throw new SerializationException("Size of data received by
BooleanDeserializer is not 1");
- }
-
- if (data[0] == TRUE) {
- return true;
- } else if (data[0] == FALSE) {
- return false;
- } else {
- throw new SerializationException("Unexpected byte received
by BooleanDeserializer: " + data[0]);
- }
- }
- }
- }
}
\ No newline at end of file
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/NullableValueAndTimestampSerializer.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/NullableValueAndTimestampSerializer.java
index 93eaca0ff21..d1d9bf2a8d7 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/NullableValueAndTimestampSerializer.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/NullableValueAndTimestampSerializer.java
@@ -29,7 +29,7 @@ import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.kstream.internals.WrappingNullableSerializer;
import org.apache.kafka.streams.processor.internals.SerdeGetter;
import org.apache.kafka.streams.state.ValueAndTimestamp;
-import
org.apache.kafka.streams.state.internals.NullableValueAndTimestampSerde.BooleanSerde.BooleanSerializer;
+import org.apache.kafka.common.serialization.BooleanSerializer;
/**
* See {@link NullableValueAndTimestampSerde}.