This is an automated email from the ASF dual-hosted git repository.

wcarlson pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 71ca8ef4ecd KAFKA-14722: Make BooleanSerde public (#13382)
71ca8ef4ecd is described below

commit 71ca8ef4ecd8522ac6dbf86daf4a3a9540482950
Author: Spacrocket <[email protected]>
AuthorDate: Fri Mar 24 16:41:51 2023 +0100

    KAFKA-14722: Make BooleanSerde public (#13382)
    
    KAFKA-14722: Make BooleanSerde public (#13328)
    
    Addition of boolean serde
    
https://cwiki.apache.org/confluence/display/KAFKA/KIP-907%3A+Add+Boolean+Serde+to+public+interface
    
    During the task KAFKA-14491, Victoria added the BooleanSerde class. It will be
useful to have such a class in a public package.
    
    Reviewers: Walker Carlson <[email protected]>, Lucas Brutschy 
<[email protected]>, Divij Vaidya <[email protected]>
---
 .../common/serialization/BooleanDeserializer.java  | 43 ++++++++++++++++++++++
 .../common/serialization/BooleanSerializer.java    | 33 +++++++++++++++++
 .../apache/kafka/common/serialization/Serdes.java  | 19 +++++++++-
 .../common/serialization/SerializationTest.java    | 29 +++++++++++++++
 docs/streams/developer-guide/datatypes.html        |  3 ++
 gradle/spotbugs-exclude.xml                        |  2 +-
 .../NullableValueAndTimestampDeserializer.java     |  2 +-
 .../internals/NullableValueAndTimestampSerde.java  | 41 ---------------------
 .../NullableValueAndTimestampSerializer.java       |  2 +-
 9 files changed, 129 insertions(+), 45 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/common/serialization/BooleanDeserializer.java
 
b/clients/src/main/java/org/apache/kafka/common/serialization/BooleanDeserializer.java
new file mode 100644
index 00000000000..2384fc905f5
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/common/serialization/BooleanDeserializer.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.serialization;
+
+import org.apache.kafka.common.errors.SerializationException;
+
+public class BooleanDeserializer implements Deserializer<Boolean> {
+    private static final byte TRUE = 0x01;
+    private static final byte FALSE = 0x00;
+
+    @Override
+    public Boolean deserialize(final String topic, final byte[] data) {
+        if (data == null) {
+            return null;
+        }
+
+        if (data.length != 1) {
+            throw new SerializationException("Size of data received by 
BooleanDeserializer is not 1");
+        }
+
+        if (data[0] == TRUE) {
+            return true;
+        } else if (data[0] == FALSE) {
+            return false;
+        } else {
+            throw new SerializationException("Unexpected byte received by 
BooleanDeserializer: " + data[0]);
+        }
+    }
+}
diff --git 
a/clients/src/main/java/org/apache/kafka/common/serialization/BooleanSerializer.java
 
b/clients/src/main/java/org/apache/kafka/common/serialization/BooleanSerializer.java
new file mode 100644
index 00000000000..ad4722a8d80
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/common/serialization/BooleanSerializer.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.serialization;
+
+public class BooleanSerializer implements Serializer<Boolean> {
+
+    private static final byte TRUE = 0x01;
+    private static final byte FALSE = 0x00;
+    @Override
+    public byte[] serialize(final String topic, final Boolean data) {
+        if (data == null) {
+            return null;
+        }
+
+        return new byte[] {
+            data ? TRUE : FALSE
+        };
+    }
+}
diff --git 
a/clients/src/main/java/org/apache/kafka/common/serialization/Serdes.java 
b/clients/src/main/java/org/apache/kafka/common/serialization/Serdes.java
index 4a150e0c022..cae8bd1187a 100644
--- a/clients/src/main/java/org/apache/kafka/common/serialization/Serdes.java
+++ b/clients/src/main/java/org/apache/kafka/common/serialization/Serdes.java
@@ -126,6 +126,12 @@ public class Serdes {
         }
     }
 
+    static public final class BooleanSerde extends WrapperSerde<Boolean> {
+        public BooleanSerde() {
+            super(new BooleanSerializer(), new BooleanDeserializer());
+        }
+    }
+
     static public final class ListSerde<Inner> extends 
WrapperSerde<List<Inner>> {
 
         final static int NULL_ENTRY_VALUE = -1;
@@ -189,9 +195,13 @@ public class Serdes {
             return (Serde<T>) UUID();
         }
 
+        if (Boolean.class.isAssignableFrom(type)) {
+            return (Serde<T>) Boolean();
+        }
+
         // TODO: we can also serializes objects of type T using generic Java 
serialization by default
         throw new IllegalArgumentException("Unknown class for built-in 
serializer. Supported types are: " +
-            "String, Short, Integer, Long, Float, Double, ByteArray, 
ByteBuffer, Bytes, UUID");
+            "String, Short, Integer, Long, Float, Double, ByteArray, 
ByteBuffer, Bytes, UUID, Boolean");
     }
 
     /**
@@ -274,6 +284,13 @@ public class Serdes {
         return new UUIDSerde();
     }
 
+    /**
+     * A serde for nullable {@code Boolean} type.
+     */
+    static public Serde<Boolean> Boolean() {
+        return new BooleanSerde();
+    }
+
     /**
      * A serde for nullable {@code byte[]} type.
      */
diff --git 
a/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java
 
b/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java
index 844ce500667..6607678dedf 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java
@@ -19,6 +19,8 @@ package org.apache.kafka.common.serialization;
 import org.apache.kafka.common.errors.SerializationException;
 import org.apache.kafka.common.utils.Bytes;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
@@ -389,4 +391,31 @@ public class SerializationTest {
             assertArrayEquals(bytes, serializer.serialize(topic, 
directBuffer1));
         }
     }
+
+    @ParameterizedTest
+    @ValueSource(booleans = { true, false })
+    public void testBooleanSerializer(Boolean dataToSerialize) {
+        byte[] testData = new byte[1];
+        testData[0] = (byte) (dataToSerialize ? 1 : 0);
+
+        Serde<Boolean> booleanSerde = Serdes.Boolean();
+        assertArrayEquals(testData, booleanSerde.serializer().serialize(topic, 
dataToSerialize));
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = { true, false })
+    public void testBooleanDeserializer(Boolean dataToDeserialize) {
+        byte[] testData = new byte[1];
+        testData[0] = (byte) (dataToDeserialize ? 1 : 0);
+
+        Serde<Boolean> booleanSerde = Serdes.Boolean();
+        assertEquals(dataToDeserialize, 
booleanSerde.deserializer().deserialize(topic, testData));
+    }
+
+    @Test
+    public void booleanDeserializerShouldThrowOnEmptyInput() {
+        try (Serde<Boolean> serde = Serdes.Boolean()) {
+            assertThrows(SerializationException.class, () -> 
serde.deserializer().deserialize(topic, new byte[0]));
+        }
+    }
 }
diff --git a/docs/streams/developer-guide/datatypes.html 
b/docs/streams/developer-guide/datatypes.html
index 458b47b149c..607d133f3b2 100644
--- a/docs/streams/developer-guide/datatypes.html
+++ b/docs/streams/developer-guide/datatypes.html
@@ -144,6 +144,9 @@ userCountByRegion.to(&quot;RegionCountsTopic&quot;, 
Produced.valueSerde(Serdes.L
           <tr class="row-even"><td>List</td>
             <td><code class="docutils literal"><span 
class="pre">Serdes.ListSerde()</span></code></td>
           </tr>
+          <tr class="row-even"><td>Boolean</td>
+              <td><code class="docutils literal"><span 
class="pre">Serdes.Boolean()</span></code></td>
+          </tr>
           </tbody>
         </table>
         <div class="admonition tip">
diff --git a/gradle/spotbugs-exclude.xml b/gradle/spotbugs-exclude.xml
index e040e54d306..e519909b1d0 100644
--- a/gradle/spotbugs-exclude.xml
+++ b/gradle/spotbugs-exclude.xml
@@ -490,7 +490,7 @@ For a detailed description of spotbugs bug categories, see 
https://spotbugs.read
 
     <Match>
         <!-- Boolean deserializer intentionally returns null on null input. -->
-        <Class 
name="org.apache.kafka.streams.state.internals.NullableValueAndTimestampSerde$BooleanSerde$BooleanDeserializer"/>
+        <Class 
name="org.apache.kafka.common.serialization.BooleanDeserializer"/>
         <Method name="deserialize"/>
         <Bug pattern="NP_BOOLEAN_RETURN_NULL"/>
     </Match>
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/NullableValueAndTimestampDeserializer.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/NullableValueAndTimestampDeserializer.java
index 7d70708ff12..2fc4c5c7d97 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/NullableValueAndTimestampDeserializer.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/NullableValueAndTimestampDeserializer.java
@@ -28,7 +28,7 @@ import org.apache.kafka.common.serialization.LongDeserializer;
 import org.apache.kafka.streams.kstream.internals.WrappingNullableDeserializer;
 import org.apache.kafka.streams.processor.internals.SerdeGetter;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
-import 
org.apache.kafka.streams.state.internals.NullableValueAndTimestampSerde.BooleanSerde.BooleanDeserializer;
+import org.apache.kafka.common.serialization.BooleanDeserializer;
 
 /**
  * See {@link NullableValueAndTimestampSerde}.
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/NullableValueAndTimestampSerde.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/NullableValueAndTimestampSerde.java
index 090a4daa35c..72aebee2dd0 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/NullableValueAndTimestampSerde.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/NullableValueAndTimestampSerde.java
@@ -18,10 +18,7 @@ package org.apache.kafka.streams.state.internals;
 
 import static java.util.Objects.requireNonNull;
 
-import org.apache.kafka.common.errors.SerializationException;
-import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.streams.kstream.internals.WrappingNullableSerde;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
 
@@ -48,42 +45,4 @@ public class NullableValueAndTimestampSerde<V> extends 
WrappingNullableSerde<Val
         );
     }
 
-    static final class BooleanSerde {
-        private static final byte TRUE = 0x01;
-        private static final byte FALSE = 0x00;
-
-        static class BooleanSerializer implements Serializer<Boolean> {
-            @Override
-            public byte[] serialize(final String topic, final Boolean data) {
-                if (data == null) {
-                    return null;
-                }
-
-                return new byte[] {
-                    data ? TRUE : FALSE
-                };
-            }
-        }
-
-        static class BooleanDeserializer implements Deserializer<Boolean> {
-            @Override
-            public Boolean deserialize(final String topic, final byte[] data) {
-                if (data == null) {
-                    return null;
-                }
-
-                if (data.length != 1) {
-                    throw new SerializationException("Size of data received by 
BooleanDeserializer is not 1");
-                }
-
-                if (data[0] == TRUE) {
-                    return true;
-                } else if (data[0] == FALSE) {
-                    return false;
-                } else {
-                    throw new SerializationException("Unexpected byte received 
by BooleanDeserializer: " + data[0]);
-                }
-            }
-        }
-    }
 }
\ No newline at end of file
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/NullableValueAndTimestampSerializer.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/NullableValueAndTimestampSerializer.java
index 93eaca0ff21..d1d9bf2a8d7 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/NullableValueAndTimestampSerializer.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/NullableValueAndTimestampSerializer.java
@@ -29,7 +29,7 @@ import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.streams.kstream.internals.WrappingNullableSerializer;
 import org.apache.kafka.streams.processor.internals.SerdeGetter;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
-import 
org.apache.kafka.streams.state.internals.NullableValueAndTimestampSerde.BooleanSerde.BooleanSerializer;
+import org.apache.kafka.common.serialization.BooleanSerializer;
 
 /**
  * See {@link NullableValueAndTimestampSerde}.

Reply via email to