yashmayya commented on code in PR #14093:
URL: https://github.com/apache/kafka/pull/14093#discussion_r1291268068


##########
connect/runtime/src/test/java/org/apache/kafka/connect/converters/BooleanConverterTest.java:
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.converters;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Collections;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.errors.DataException;
+import org.junit.Before;
+import org.junit.Test;
+
+public class BooleanConverterTest {
+    private static final String TOPIC = "topic";
+    private static final byte[] TRUE = new byte[] {0x01};
+    private static final byte[] FALSE = new byte[] {0x00};
+    private final BooleanConverter converter = new BooleanConverter();
+
+    @Before
+    public void setUp() {
+        converter.configure(Collections.<String, String>emptyMap(), false);
+    }
+
+    @Test
+    public void testFromConnect() {
+        assertArrayEquals(
+                TRUE,
+                converter.fromConnectData(TOPIC, Schema.BOOLEAN_SCHEMA, 
Boolean.TRUE)
+        );
+        assertArrayEquals(
+            FALSE,
+            converter.fromConnectData(TOPIC, Schema.BOOLEAN_SCHEMA, 
Boolean.FALSE)
+        );
+    }
+
+
+    @Test
+    public void testFromConnectBadSchema() {
+        assertThrows(DataException.class,
+            () -> converter.fromConnectData(TOPIC, Schema.INT32_SCHEMA, 
Boolean.FALSE));
+    }
+
+    @Test
+    public void testFromConnectInvalidValue() {
+        assertThrows(DataException.class,
+            () -> converter.fromConnectData(TOPIC, Schema.BOOLEAN_SCHEMA, 
"true"));
+    }
+
+    @Test
+    public void testFromConnectNull() {
+        assertNull(converter.fromConnectData(TOPIC, Schema.BOOLEAN_SCHEMA, 
null));
+    }
+
+    @Test
+    public void testToConnect() {

Review Comment:
   Let's add a small test to cover the case where bad data is being passed to 
the `toConnectData` method as well?



##########
connect/runtime/src/main/java/org/apache/kafka/connect/converters/BooleanConverter.java:
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.converters;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.serialization.BooleanDeserializer;
+import org.apache.kafka.common.serialization.BooleanSerializer;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Schema.Type;
+import org.apache.kafka.connect.data.SchemaAndValue;
+import org.apache.kafka.connect.errors.DataException;
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.ConverterConfig;
+import org.apache.kafka.connect.storage.ConverterType;
+import org.apache.kafka.connect.storage.HeaderConverter;
+
+/**
+ * {@link Converter} and {@link HeaderConverter} implementation that supports 
serializing to and
+ * deserializing from Boolean values.
+ * <p>
+ * When converting from bytes to Kafka Connect format, the converter will 
always return an optional
+ * BOOLEAN schema.
+ */
+public class BooleanConverter implements Converter, HeaderConverter {
+
+    private static final ConfigDef CONFIG_DEF = ConverterConfig.newConfigDef();
+
+    private final BooleanSerializer serializer = new BooleanSerializer();
+    private final BooleanDeserializer deserializer = new BooleanDeserializer();
+
+    @Override
+    public ConfigDef config() {
+        return CONFIG_DEF;
+    }
+
+    @Override
+    public void configure(Map<String, ?> configs) {
+        BooleanConverterConfig conf = new BooleanConverterConfig(configs);
+        boolean isKey = conf.type() == ConverterType.KEY;
+        serializer.configure(configs, isKey);
+        deserializer.configure(configs, isKey);
+    }
+
+    @Override
+    public void configure(Map<String, ?> configs, boolean isKey) {
+        Map<String, Object> conf = new HashMap<>(configs);
+        conf.put(ConverterConfig.TYPE_CONFIG,
+            isKey ? ConverterType.KEY.getName() : 
ConverterType.VALUE.getName());
+        configure(conf);
+    }
+
+    @Override
+    public byte[] fromConnectData(String topic, Schema schema, Object value) {
+        if (schema != null && schema.type() != Type.BOOLEAN)
+            throw new DataException("Invalid schema type for BooleanConverter: 
" + schema.type().toString());
+
+        try {
+            return serializer.serialize(topic, (Boolean) value);
+        } catch (ClassCastException e) {
+            throw new DataException("BooleanConverter is not compatible with 
objects of type " + value.getClass());
+        }
+    }
+
+    @Override
+    public SchemaAndValue toConnectData(String topic, byte[] value) {
+        return new SchemaAndValue(Schema.OPTIONAL_BOOLEAN_SCHEMA,
+            deserializer.deserialize(topic, value));

Review Comment:
   Let's catch the `SerializationException` that can be thrown here and wrap it 
in a `DataException` for consistency with other converters?



##########
connect/runtime/src/main/java/org/apache/kafka/connect/converters/BooleanConverterConfig.java:
##########
@@ -25,13 +25,13 @@
  */
 public class BooleanConverterConfig extends ConverterConfig {

Review Comment:
   I'd note that the `BooleanSerializer` and `BooleanDeserializer` do not 
actually implement the default no-op configure method from the `Serializer` / 
`Deserializer` interface and hence don't really care about whether the field 
being (de)serialized is the key or the value of a message. However, I think 
that configuring them appropriately still makes sense since the implementation 
could change in the future to use the key / value info (even though it's 
unlikely, it's better to adhere to the expectations of the interface).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to