[
https://issues.apache.org/jira/browse/KAFKA-6161?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16499756#comment-16499756
]
ASF GitHub Bot commented on KAFKA-6161:
---------------------------------------
guozhangwang closed pull request #4175: KAFKA-6161 add base classes for
(De)Serializers with empty conf methods
URL: https://github.com/apache/kafka/pull/4175
This is a PR merged from a forked repository.
Because GitHub hides the original diff of a foreign pull request (one opened
from a fork) once it has been merged, the diff is reproduced below for the
sake of provenance:
diff --git
a/clients/src/main/java/org/apache/kafka/common/serialization/ByteArrayDeserializer.java
b/clients/src/main/java/org/apache/kafka/common/serialization/ByteArrayDeserializer.java
index 267211576b6..1c16b0bfc42 100644
---
a/clients/src/main/java/org/apache/kafka/common/serialization/ByteArrayDeserializer.java
+++
b/clients/src/main/java/org/apache/kafka/common/serialization/ByteArrayDeserializer.java
@@ -16,22 +16,10 @@
*/
package org.apache.kafka.common.serialization;
-import java.util.Map;
-
-public class ByteArrayDeserializer implements Deserializer<byte[]> {
-
- @Override
- public void configure(Map<String, ?> configs, boolean isKey) {
- // nothing to do
- }
+public class ByteArrayDeserializer extends NoConfDeserializer<byte[]> {
@Override
public byte[] deserialize(String topic, byte[] data) {
return data;
}
-
- @Override
- public void close() {
- // nothing to do
- }
}
diff --git
a/clients/src/main/java/org/apache/kafka/common/serialization/ByteArraySerializer.java
b/clients/src/main/java/org/apache/kafka/common/serialization/ByteArraySerializer.java
index d069e9495e6..f5057b362a3 100644
---
a/clients/src/main/java/org/apache/kafka/common/serialization/ByteArraySerializer.java
+++
b/clients/src/main/java/org/apache/kafka/common/serialization/ByteArraySerializer.java
@@ -16,22 +16,10 @@
*/
package org.apache.kafka.common.serialization;
-import java.util.Map;
-
-public class ByteArraySerializer implements Serializer<byte[]> {
-
- @Override
- public void configure(Map<String, ?> configs, boolean isKey) {
- // nothing to do
- }
+public class ByteArraySerializer extends NoConfSerializer<byte[]> {
@Override
public byte[] serialize(String topic, byte[] data) {
return data;
}
-
- @Override
- public void close() {
- // nothing to do
- }
}
diff --git
a/clients/src/main/java/org/apache/kafka/common/serialization/ByteBufferDeserializer.java
b/clients/src/main/java/org/apache/kafka/common/serialization/ByteBufferDeserializer.java
index d41f03c6675..ad22796c07f 100644
---
a/clients/src/main/java/org/apache/kafka/common/serialization/ByteBufferDeserializer.java
+++
b/clients/src/main/java/org/apache/kafka/common/serialization/ByteBufferDeserializer.java
@@ -17,13 +17,8 @@
package org.apache.kafka.common.serialization;
import java.nio.ByteBuffer;
-import java.util.Map;
-public class ByteBufferDeserializer implements Deserializer<ByteBuffer> {
-
- public void configure(Map<String, ?> configs, boolean isKey) {
- // nothing to do
- }
+public class ByteBufferDeserializer extends NoConfDeserializer<ByteBuffer> {
public ByteBuffer deserialize(String topic, byte[] data) {
if (data == null)
@@ -31,8 +26,4 @@ public ByteBuffer deserialize(String topic, byte[] data) {
return ByteBuffer.wrap(data);
}
-
- public void close() {
- // nothing to do
- }
}
diff --git
a/clients/src/main/java/org/apache/kafka/common/serialization/ByteBufferSerializer.java
b/clients/src/main/java/org/apache/kafka/common/serialization/ByteBufferSerializer.java
index c8c369272dd..ee3689a8a88 100644
---
a/clients/src/main/java/org/apache/kafka/common/serialization/ByteBufferSerializer.java
+++
b/clients/src/main/java/org/apache/kafka/common/serialization/ByteBufferSerializer.java
@@ -17,13 +17,8 @@
package org.apache.kafka.common.serialization;
import java.nio.ByteBuffer;
-import java.util.Map;
-public class ByteBufferSerializer implements Serializer<ByteBuffer> {
-
- public void configure(Map<String, ?> configs, boolean isKey) {
- // nothing to do
- }
+public class ByteBufferSerializer extends NoConfSerializer<ByteBuffer> {
public byte[] serialize(String topic, ByteBuffer data) {
if (data == null)
@@ -43,8 +38,4 @@ public void configure(Map<String, ?> configs, boolean isKey) {
data.rewind();
return ret;
}
-
- public void close() {
- // nothing to do
- }
}
diff --git
a/clients/src/main/java/org/apache/kafka/common/serialization/BytesDeserializer.java
b/clients/src/main/java/org/apache/kafka/common/serialization/BytesDeserializer.java
index 66b07eb5841..b912ef5b40c 100644
---
a/clients/src/main/java/org/apache/kafka/common/serialization/BytesDeserializer.java
+++
b/clients/src/main/java/org/apache/kafka/common/serialization/BytesDeserializer.java
@@ -18,13 +18,7 @@
import org.apache.kafka.common.utils.Bytes;
-import java.util.Map;
-
-public class BytesDeserializer implements Deserializer<Bytes> {
-
- public void configure(Map<String, ?> configs, boolean isKey) {
- // nothing to do
- }
+public class BytesDeserializer extends NoConfDeserializer<Bytes> {
public Bytes deserialize(String topic, byte[] data) {
if (data == null)
@@ -32,8 +26,4 @@ public Bytes deserialize(String topic, byte[] data) {
return new Bytes(data);
}
-
- public void close() {
- // nothing to do
- }
}
diff --git
a/clients/src/main/java/org/apache/kafka/common/serialization/BytesSerializer.java
b/clients/src/main/java/org/apache/kafka/common/serialization/BytesSerializer.java
index 0dc4476d46d..02e2da8bd4e 100644
---
a/clients/src/main/java/org/apache/kafka/common/serialization/BytesSerializer.java
+++
b/clients/src/main/java/org/apache/kafka/common/serialization/BytesSerializer.java
@@ -18,13 +18,7 @@
import org.apache.kafka.common.utils.Bytes;
-import java.util.Map;
-
-public class BytesSerializer implements Serializer<Bytes> {
-
- public void configure(Map<String, ?> configs, boolean isKey) {
- // nothing to do
- }
+public class BytesSerializer extends NoConfSerializer<Bytes> {
public byte[] serialize(String topic, Bytes data) {
if (data == null)
@@ -32,9 +26,5 @@ public void configure(Map<String, ?> configs, boolean isKey) {
return data.get();
}
-
- public void close() {
- // nothing to do
- }
}
diff --git
a/clients/src/main/java/org/apache/kafka/common/serialization/DoubleDeserializer.java
b/clients/src/main/java/org/apache/kafka/common/serialization/DoubleDeserializer.java
index 24f6007cb35..2ff3382feb3 100644
---
a/clients/src/main/java/org/apache/kafka/common/serialization/DoubleDeserializer.java
+++
b/clients/src/main/java/org/apache/kafka/common/serialization/DoubleDeserializer.java
@@ -18,14 +18,7 @@
import org.apache.kafka.common.errors.SerializationException;
-import java.util.Map;
-
-public class DoubleDeserializer implements Deserializer<Double> {
-
- @Override
- public void configure(Map<String, ?> configs, boolean isKey) {
- // nothing to do
- }
+public class DoubleDeserializer extends NoConfDeserializer<Double> {
@Override
public Double deserialize(String topic, byte[] data) {
@@ -42,9 +35,4 @@ public Double deserialize(String topic, byte[] data) {
}
return Double.longBitsToDouble(value);
}
-
- @Override
- public void close() {
- // nothing to do
- }
}
diff --git
a/clients/src/main/java/org/apache/kafka/common/serialization/DoubleSerializer.java
b/clients/src/main/java/org/apache/kafka/common/serialization/DoubleSerializer.java
index 7dd4edc3b62..1370e6e3e14 100644
---
a/clients/src/main/java/org/apache/kafka/common/serialization/DoubleSerializer.java
+++
b/clients/src/main/java/org/apache/kafka/common/serialization/DoubleSerializer.java
@@ -16,14 +16,7 @@
*/
package org.apache.kafka.common.serialization;
-import java.util.Map;
-
-public class DoubleSerializer implements Serializer<Double> {
-
- @Override
- public void configure(Map<String, ?> configs, boolean isKey) {
- // nothing to do
- }
+public class DoubleSerializer extends NoConfSerializer<Double> {
@Override
public byte[] serialize(String topic, Double data) {
@@ -42,9 +35,4 @@ public void configure(Map<String, ?> configs, boolean isKey) {
(byte) bits
};
}
-
- @Override
- public void close() {
- // nothing to do
- }
}
\ No newline at end of file
diff --git
a/clients/src/main/java/org/apache/kafka/common/serialization/ExtendedNoConfDeserializer.java
b/clients/src/main/java/org/apache/kafka/common/serialization/ExtendedNoConfDeserializer.java
new file mode 100644
index 00000000000..1136a2f4701
--- /dev/null
+++
b/clients/src/main/java/org/apache/kafka/common/serialization/ExtendedNoConfDeserializer.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.serialization;
+
+/**
+ * An extended Deserializer with empty {@code configure()} and {@code close()} methods.
+ *
+ * Prefer {@link ExtendedDeserializer} if both the {@code configure()} and {@code close()}
+ * methods need to be non-empty.
+ *
+ * Once Kafka drops support for Java 7, the {@code configure()} and
+ * {@code close()} methods will be implemented as default empty methods in
+ * {@link Deserializer} so that backwards compatibility is maintained. This class
+ * may be deprecated once that happens.
+ */
+public abstract class ExtendedNoConfDeserializer<T> extends
NoConfDeserializer<T> implements ExtendedDeserializer<T> { }
diff --git
a/clients/src/main/java/org/apache/kafka/common/serialization/ExtendedNoConfSerializer.java
b/clients/src/main/java/org/apache/kafka/common/serialization/ExtendedNoConfSerializer.java
new file mode 100644
index 00000000000..85cdb1e9c86
--- /dev/null
+++
b/clients/src/main/java/org/apache/kafka/common/serialization/ExtendedNoConfSerializer.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.serialization;
+
+/**
+ * An extended Serializer with empty {@code configure()} and {@code close()} methods.
+ *
+ * Prefer {@link ExtendedSerializer} if both the {@code configure()} and {@code close()}
+ * methods need to be non-empty.
+ *
+ * Once Kafka drops support for Java 7, the {@code configure()} and
+ * {@code close()} methods will be implemented as default empty methods in
+ * {@link Serializer} so that backwards compatibility is maintained. This class
+ * may be deprecated once that happens.
+ */
+public abstract class ExtendedNoConfSerializer<T> extends NoConfSerializer<T>
implements ExtendedSerializer<T> { }
diff --git
a/clients/src/main/java/org/apache/kafka/common/serialization/FloatDeserializer.java
b/clients/src/main/java/org/apache/kafka/common/serialization/FloatDeserializer.java
index 3834ce20b07..57fe0d24f2d 100644
---
a/clients/src/main/java/org/apache/kafka/common/serialization/FloatDeserializer.java
+++
b/clients/src/main/java/org/apache/kafka/common/serialization/FloatDeserializer.java
@@ -18,14 +18,7 @@
import org.apache.kafka.common.errors.SerializationException;
-import java.util.Map;
-
-public class FloatDeserializer implements Deserializer<Float> {
-
- @Override
- public void configure(final Map<String, ?> configs, final boolean isKey) {
- // nothing to do
- }
+public class FloatDeserializer extends NoConfDeserializer<Float> {
@Override
public Float deserialize(final String topic, final byte[] data) {
@@ -42,10 +35,4 @@ public Float deserialize(final String topic, final byte[]
data) {
}
return Float.intBitsToFloat(value);
}
-
- @Override
- public void close() {
- // nothing to do
- }
-
}
\ No newline at end of file
diff --git
a/clients/src/main/java/org/apache/kafka/common/serialization/FloatSerializer.java
b/clients/src/main/java/org/apache/kafka/common/serialization/FloatSerializer.java
index 6eb766dcd42..8ea701ab99b 100644
---
a/clients/src/main/java/org/apache/kafka/common/serialization/FloatSerializer.java
+++
b/clients/src/main/java/org/apache/kafka/common/serialization/FloatSerializer.java
@@ -16,14 +16,7 @@
*/
package org.apache.kafka.common.serialization;
-import java.util.Map;
-
-public class FloatSerializer implements Serializer<Float> {
-
- @Override
- public void configure(final Map<String, ?> configs, final boolean isKey) {
- // nothing to do
- }
+public class FloatSerializer extends NoConfSerializer<Float> {
@Override
public byte[] serialize(final String topic, final Float data) {
@@ -38,9 +31,4 @@ public void configure(final Map<String, ?> configs, final
boolean isKey) {
(byte) bits
};
}
-
- @Override
- public void close() {
- // nothing to do
- }
}
\ No newline at end of file
diff --git
a/clients/src/main/java/org/apache/kafka/common/serialization/IntegerDeserializer.java
b/clients/src/main/java/org/apache/kafka/common/serialization/IntegerDeserializer.java
index 45f8cf18fd4..7cac468a716 100644
---
a/clients/src/main/java/org/apache/kafka/common/serialization/IntegerDeserializer.java
+++
b/clients/src/main/java/org/apache/kafka/common/serialization/IntegerDeserializer.java
@@ -18,14 +18,7 @@
import org.apache.kafka.common.errors.SerializationException;
-import java.util.Map;
-
-public class IntegerDeserializer implements Deserializer<Integer> {
-
- public void configure(Map<String, ?> configs, boolean isKey) {
- // nothing to do
- }
-
+public class IntegerDeserializer extends NoConfDeserializer<Integer> {
public Integer deserialize(String topic, byte[] data) {
if (data == null)
return null;
@@ -40,8 +33,4 @@ public Integer deserialize(String topic, byte[] data) {
}
return value;
}
-
- public void close() {
- // nothing to do
- }
}
diff --git
a/clients/src/main/java/org/apache/kafka/common/serialization/IntegerSerializer.java
b/clients/src/main/java/org/apache/kafka/common/serialization/IntegerSerializer.java
index f2144ceee70..3a4dd40464d 100644
---
a/clients/src/main/java/org/apache/kafka/common/serialization/IntegerSerializer.java
+++
b/clients/src/main/java/org/apache/kafka/common/serialization/IntegerSerializer.java
@@ -16,14 +16,7 @@
*/
package org.apache.kafka.common.serialization;
-import java.util.Map;
-
-public class IntegerSerializer implements Serializer<Integer> {
-
- public void configure(Map<String, ?> configs, boolean isKey) {
- // nothing to do
- }
-
+public class IntegerSerializer extends NoConfSerializer<Integer> {
public byte[] serialize(String topic, Integer data) {
if (data == null)
return null;
@@ -35,8 +28,4 @@ public void configure(Map<String, ?> configs, boolean isKey) {
data.byteValue()
};
}
-
- public void close() {
- // nothing to do
- }
}
\ No newline at end of file
diff --git
a/clients/src/main/java/org/apache/kafka/common/serialization/LongDeserializer.java
b/clients/src/main/java/org/apache/kafka/common/serialization/LongDeserializer.java
index a58b1d38cf3..7ef36ec59ae 100644
---
a/clients/src/main/java/org/apache/kafka/common/serialization/LongDeserializer.java
+++
b/clients/src/main/java/org/apache/kafka/common/serialization/LongDeserializer.java
@@ -18,14 +18,7 @@
import org.apache.kafka.common.errors.SerializationException;
-import java.util.Map;
-
-public class LongDeserializer implements Deserializer<Long> {
-
- public void configure(Map<String, ?> configs, boolean isKey) {
- // nothing to do
- }
-
+public class LongDeserializer extends NoConfDeserializer<Long> {
public Long deserialize(String topic, byte[] data) {
if (data == null)
return null;
@@ -40,8 +33,4 @@ public Long deserialize(String topic, byte[] data) {
}
return value;
}
-
- public void close() {
- // nothing to do
- }
}
diff --git
a/clients/src/main/java/org/apache/kafka/common/serialization/LongSerializer.java
b/clients/src/main/java/org/apache/kafka/common/serialization/LongSerializer.java
index d37842c3914..1c1a674eebb 100644
---
a/clients/src/main/java/org/apache/kafka/common/serialization/LongSerializer.java
+++
b/clients/src/main/java/org/apache/kafka/common/serialization/LongSerializer.java
@@ -16,14 +16,7 @@
*/
package org.apache.kafka.common.serialization;
-import java.util.Map;
-
-public class LongSerializer implements Serializer<Long> {
-
- public void configure(Map<String, ?> configs, boolean isKey) {
- // nothing to do
- }
-
+public class LongSerializer extends NoConfSerializer<Long> {
public byte[] serialize(String topic, Long data) {
if (data == null)
return null;
@@ -39,8 +32,4 @@ public void configure(Map<String, ?> configs, boolean isKey) {
data.byteValue()
};
}
-
- public void close() {
- // nothing to do
- }
}
\ No newline at end of file
diff --git
a/clients/src/main/java/org/apache/kafka/common/serialization/NoConfDeserializer.java
b/clients/src/main/java/org/apache/kafka/common/serialization/NoConfDeserializer.java
new file mode 100644
index 00000000000..3d8d97ee225
--- /dev/null
+++
b/clients/src/main/java/org/apache/kafka/common/serialization/NoConfDeserializer.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.serialization;
+
+import java.util.Map;
+
+/**
+ * A Deserializer with empty {@code configure()} and {@code close()} methods.
+ *
+ * Prefer {@link Deserializer} if both the {@code configure()} and {@code close()}
+ * methods need to be non-empty.
+ *
+ * Once Kafka drops support for Java 7, the {@code configure()} and
+ * {@code close()} methods will be implemented as default empty methods in
+ * {@link Deserializer} so that backwards compatibility is maintained. This class
+ * may be deprecated once that happens.
+ */
+public abstract class NoConfDeserializer<T> implements Deserializer<T> {
+
+ @Override
+ public void configure(Map<String, ?> configs, boolean isKey) {
+ // nothing to do
+ }
+
+ @Override
+ public void close() {
+ // nothing to do
+ }
+}
diff --git
a/clients/src/main/java/org/apache/kafka/common/serialization/NoConfSerializer.java
b/clients/src/main/java/org/apache/kafka/common/serialization/NoConfSerializer.java
new file mode 100644
index 00000000000..ca8aeed8aa6
--- /dev/null
+++
b/clients/src/main/java/org/apache/kafka/common/serialization/NoConfSerializer.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.serialization;
+
+import java.util.Map;
+
+/**
+ * A Serializer with empty {@code configure()} and {@code close()} methods.
+ *
+ * Prefer {@link Serializer} if both the {@code configure()} and {@code close()}
+ * methods need to be non-empty.
+ *
+ * Once Kafka drops support for Java 7, the {@code configure()} and
+ * {@code close()} methods will be implemented as default empty methods in
+ * {@link Serializer} so that backwards compatibility is maintained. This class
+ * may be deprecated once that happens.
+ */
+public abstract class NoConfSerializer<T> implements Serializer<T> {
+
+ @Override
+ public void configure(Map<String, ?> configs, boolean isKey) {
+ // nothing to do
+ }
+
+ @Override
+ public void close() {
+ // nothing to do
+ }
+}
diff --git
a/clients/src/main/java/org/apache/kafka/common/serialization/ShortDeserializer.java
b/clients/src/main/java/org/apache/kafka/common/serialization/ShortDeserializer.java
index 45aa8ae7ae3..81276bb099a 100644
---
a/clients/src/main/java/org/apache/kafka/common/serialization/ShortDeserializer.java
+++
b/clients/src/main/java/org/apache/kafka/common/serialization/ShortDeserializer.java
@@ -18,14 +18,7 @@
import org.apache.kafka.common.errors.SerializationException;
-import java.util.Map;
-
-public class ShortDeserializer implements Deserializer<Short> {
-
- public void configure(Map<String, ?> configs, boolean isKey) {
- // nothing to do
- }
-
+public class ShortDeserializer extends NoConfDeserializer<Short> {
public Short deserialize(String topic, byte[] data) {
if (data == null)
return null;
@@ -40,8 +33,4 @@ public Short deserialize(String topic, byte[] data) {
}
return value;
}
-
- public void close() {
- // nothing to do
- }
}
diff --git
a/clients/src/main/java/org/apache/kafka/common/serialization/ShortSerializer.java
b/clients/src/main/java/org/apache/kafka/common/serialization/ShortSerializer.java
index a66aaa09685..f27bad3ba74 100644
---
a/clients/src/main/java/org/apache/kafka/common/serialization/ShortSerializer.java
+++
b/clients/src/main/java/org/apache/kafka/common/serialization/ShortSerializer.java
@@ -16,14 +16,7 @@
*/
package org.apache.kafka.common.serialization;
-import java.util.Map;
-
-public class ShortSerializer implements Serializer<Short> {
-
- public void configure(Map<String, ?> configs, boolean isKey) {
- // nothing to do
- }
-
+public class ShortSerializer extends NoConfSerializer<Short> {
public byte[] serialize(String topic, Short data) {
if (data == null)
return null;
@@ -33,8 +26,4 @@ public void configure(Map<String, ?> configs, boolean isKey) {
data.byteValue()
};
}
-
- public void close() {
- // nothing to do
- }
}
\ No newline at end of file
diff --git
a/clients/src/main/java/org/apache/kafka/common/serialization/StringDeserializer.java
b/clients/src/main/java/org/apache/kafka/common/serialization/StringDeserializer.java
index c647c9b7332..3d2a7eb8ead 100644
---
a/clients/src/main/java/org/apache/kafka/common/serialization/StringDeserializer.java
+++
b/clients/src/main/java/org/apache/kafka/common/serialization/StringDeserializer.java
@@ -25,7 +25,7 @@
* String encoding defaults to UTF8 and can be customized by setting the
property key.deserializer.encoding,
* value.deserializer.encoding or deserializer.encoding. The first two take
precedence over the last.
*/
-public class StringDeserializer implements Deserializer<String> {
+public class StringDeserializer extends NoConfDeserializer<String> {
private String encoding = "UTF8";
@Override
@@ -49,9 +49,4 @@ public String deserialize(String topic, byte[] data) {
throw new SerializationException("Error when deserializing byte[]
to string due to unsupported encoding " + encoding);
}
}
-
- @Override
- public void close() {
- // nothing to do
- }
}
diff --git
a/clients/src/main/java/org/apache/kafka/common/serialization/StringSerializer.java
b/clients/src/main/java/org/apache/kafka/common/serialization/StringSerializer.java
index c2862ddcb4e..7e50c1a5e9c 100644
---
a/clients/src/main/java/org/apache/kafka/common/serialization/StringSerializer.java
+++
b/clients/src/main/java/org/apache/kafka/common/serialization/StringSerializer.java
@@ -25,7 +25,7 @@
* String encoding defaults to UTF8 and can be customized by setting the
property key.serializer.encoding,
* value.serializer.encoding or serializer.encoding. The first two take
precedence over the last.
*/
-public class StringSerializer implements Serializer<String> {
+public class StringSerializer extends NoConfSerializer<String> {
private String encoding = "UTF8";
@Override
@@ -49,9 +49,4 @@ public void configure(Map<String, ?> configs, boolean isKey) {
throw new SerializationException("Error when serializing string to
byte[] due to unsupported encoding " + encoding);
}
}
-
- @Override
- public void close() {
- // nothing to do
- }
}
\ No newline at end of file
diff --git a/clients/src/test/java/org/apache/kafka/test/MockDeserializer.java
b/clients/src/test/java/org/apache/kafka/test/MockDeserializer.java
index 99551f718a9..22c1ac7417b 100644
--- a/clients/src/test/java/org/apache/kafka/test/MockDeserializer.java
+++ b/clients/src/test/java/org/apache/kafka/test/MockDeserializer.java
@@ -18,13 +18,12 @@
import org.apache.kafka.common.ClusterResource;
import org.apache.kafka.common.ClusterResourceListener;
-import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.NoConfDeserializer;
-import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
-public class MockDeserializer implements ClusterResourceListener,
Deserializer<byte[]> {
+public class MockDeserializer extends NoConfDeserializer<byte[]> implements
ClusterResourceListener {
public static AtomicInteger initCount = new AtomicInteger(0);
public static AtomicInteger closeCount = new AtomicInteger(0);
public static AtomicReference<ClusterResource> clusterMeta = new
AtomicReference<>();
@@ -42,10 +41,6 @@ public MockDeserializer() {
initCount.incrementAndGet();
}
- @Override
- public void configure(Map<String, ?> configs, boolean isKey) {
- }
-
@Override
public byte[] deserialize(String topic, byte[] data) {
// This will ensure that we get the cluster metadata when deserialize
is called for the first time
diff --git a/clients/src/test/java/org/apache/kafka/test/MockSerializer.java
b/clients/src/test/java/org/apache/kafka/test/MockSerializer.java
index 0c597c8b72a..79dae984150 100644
--- a/clients/src/test/java/org/apache/kafka/test/MockSerializer.java
+++ b/clients/src/test/java/org/apache/kafka/test/MockSerializer.java
@@ -18,13 +18,12 @@
import org.apache.kafka.common.ClusterResourceListener;
import org.apache.kafka.common.ClusterResource;
-import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.NoConfSerializer;
-import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
-public class MockSerializer implements ClusterResourceListener,
Serializer<byte[]> {
+public class MockSerializer extends NoConfSerializer<byte[]> implements
ClusterResourceListener {
public static final AtomicInteger INIT_COUNT = new AtomicInteger(0);
public static final AtomicInteger CLOSE_COUNT = new AtomicInteger(0);
public static final AtomicReference<ClusterResource> CLUSTER_META = new
AtomicReference<>();
@@ -35,10 +34,6 @@ public MockSerializer() {
INIT_COUNT.incrementAndGet();
}
- @Override
- public void configure(Map<String, ?> configs, boolean isKey) {
- }
-
@Override
public byte[] serialize(String topic, byte[] data) {
// This will ensure that we get the cluster metadata when serialize is
called for the first time
diff --git
a/connect/json/src/main/java/org/apache/kafka/connect/json/JsonDeserializer.java
b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonDeserializer.java
index 8f2171bc4bc..4a003c2e464 100644
---
a/connect/json/src/main/java/org/apache/kafka/connect/json/JsonDeserializer.java
+++
b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonDeserializer.java
@@ -19,15 +19,13 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.errors.SerializationException;
-import org.apache.kafka.common.serialization.Deserializer;
-
-import java.util.Map;
+import org.apache.kafka.common.serialization.NoConfDeserializer;
/**
* JSON deserializer for Jackson's JsonNode tree model. Using the tree model
allows it to work with arbitrarily
* structured data without having associated Java classes. This deserializer
also supports Connect schemas.
*/
-public class JsonDeserializer implements Deserializer<JsonNode> {
+public class JsonDeserializer extends NoConfDeserializer<JsonNode> {
private ObjectMapper objectMapper = new ObjectMapper();
/**
@@ -36,10 +34,6 @@
public JsonDeserializer() {
}
- @Override
- public void configure(Map<String, ?> props, boolean isKey) {
- }
-
@Override
public JsonNode deserialize(String topic, byte[] bytes) {
if (bytes == null)
@@ -54,9 +48,4 @@ public JsonNode deserialize(String topic, byte[] bytes) {
return data;
}
-
- @Override
- public void close() {
-
- }
}
diff --git
a/connect/json/src/main/java/org/apache/kafka/connect/json/JsonSerializer.java
b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonSerializer.java
index 438daa17e6b..c0416e33108 100644
---
a/connect/json/src/main/java/org/apache/kafka/connect/json/JsonSerializer.java
+++
b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonSerializer.java
@@ -19,15 +19,13 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.errors.SerializationException;
-import org.apache.kafka.common.serialization.Serializer;
-
-import java.util.Map;
+import org.apache.kafka.common.serialization.NoConfSerializer;
/**
* Serialize Jackson JsonNode tree model objects to UTF-8 JSON. Using the tree
model allows handling arbitrarily
* structured data without corresponding Java classes. This serializer also
supports Connect schemas.
*/
-public class JsonSerializer implements Serializer<JsonNode> {
+public class JsonSerializer extends NoConfSerializer<JsonNode> {
private final ObjectMapper objectMapper = new ObjectMapper();
/**
@@ -37,10 +35,6 @@ public JsonSerializer() {
}
- @Override
- public void configure(Map<String, ?> config, boolean isKey) {
- }
-
@Override
public byte[] serialize(String topic, JsonNode data) {
if (data == null)
@@ -53,8 +47,4 @@ public void configure(Map<String, ?> config, boolean isKey) {
}
}
- @Override
- public void close() {
- }
-
}
diff --git
a/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala
b/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala
index 2d672b6487c..10ea6943e21 100644
---
a/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala
+++
b/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala
@@ -33,7 +33,7 @@ import org.apache.kafka.clients.consumer.{ConsumerConfig,
KafkaConsumer}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig,
ProducerRecord}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.record.RecordBatch
-import org.apache.kafka.common.serialization.Deserializer
+import org.apache.kafka.common.serialization.NoConfDeserializer
import org.junit.Assert.{assertEquals, assertTrue}
import org.junit.{After, Before, Test}
@@ -399,11 +399,7 @@ class EpochDrivenReplicationProtocolAcceptanceTest extends
ZooKeeperTestHarness
createServer(fromProps(config))
}
- private class StubDeserializer extends Deserializer[Array[Byte]] {
- override def configure(configs: java.util.Map[String, _], isKey: Boolean):
Unit = {}
-
+ private class StubDeserializer extends NoConfDeserializer[Array[Byte]] {
override def deserialize(topic: String, data: Array[Byte]): Array[Byte] =
{ data }
-
- override def close(): Unit = {}
}
}
diff --git
a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonPOJODeserializer.java
b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonPOJODeserializer.java
index b2ef6dcad19..aaa4650f3b0 100644
---
a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonPOJODeserializer.java
+++
b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonPOJODeserializer.java
@@ -18,11 +18,11 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.errors.SerializationException;
-import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.NoConfDeserializer;
import java.util.Map;
-public class JsonPOJODeserializer<T> implements Deserializer<T> {
+public class JsonPOJODeserializer<T> extends NoConfDeserializer<T> {
private ObjectMapper objectMapper = new ObjectMapper();
private Class<T> tClass;
@@ -53,9 +53,4 @@ public T deserialize(String topic, byte[] bytes) {
return data;
}
-
- @Override
- public void close() {
-
- }
}
diff --git
a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonPOJOSerializer.java
b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonPOJOSerializer.java
index 625bda9e1c7..283dae22d2e 100644
---
a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonPOJOSerializer.java
+++
b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonPOJOSerializer.java
@@ -19,11 +19,9 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.errors.SerializationException;
-import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.NoConfSerializer;
-import java.util.Map;
-
-public class JsonPOJOSerializer<T> implements Serializer<T> {
+public class JsonPOJOSerializer<T> extends NoConfSerializer<T> {
private final ObjectMapper objectMapper = new ObjectMapper();
/**
@@ -32,10 +30,6 @@
public JsonPOJOSerializer() {
}
- @Override
- public void configure(Map<String, ?> props, boolean isKey) {
- }
-
@Override
public byte[] serialize(String topic, T data) {
if (data == null)
@@ -48,8 +42,4 @@ public void configure(Map<String, ?> props, boolean isKey) {
}
}
- @Override
- public void close() {
- }
-
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedDeserializer.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedDeserializer.java
index 1363a0bc923..f3144c8d83e 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedDeserializer.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedDeserializer.java
@@ -19,13 +19,13 @@
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.ExtendedDeserializer;
+import org.apache.kafka.common.serialization.ExtendedNoConfDeserializer;
import java.nio.ByteBuffer;
-import java.util.Map;
import static
org.apache.kafka.common.serialization.ExtendedDeserializer.Wrapper.ensureExtended;
-public class ChangedDeserializer<T> implements ExtendedDeserializer<Change<T>>
{
+public class ChangedDeserializer<T> extends
ExtendedNoConfDeserializer<Change<T>> {
private static final int NEWFLAG_SIZE = 1;
@@ -43,11 +43,6 @@ public void setInner(Deserializer<T> inner) {
this.inner = ensureExtended(inner);
}
- @Override
- public void configure(Map<String, ?> configs, boolean isKey) {
- // do nothing
- }
-
@Override
public Change<T> deserialize(String topic, Headers headers, byte[] data) {
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java
index fa261cb7e26..0ce594e9bf1 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java
@@ -17,16 +17,16 @@
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.serialization.ExtendedNoConfSerializer;
import org.apache.kafka.common.serialization.ExtendedSerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.errors.StreamsException;
import java.nio.ByteBuffer;
-import java.util.Map;
import static
org.apache.kafka.common.serialization.ExtendedSerializer.Wrapper.ensureExtended;
-public class ChangedSerializer<T> implements ExtendedSerializer<Change<T>> {
+public class ChangedSerializer<T> extends ExtendedNoConfSerializer<Change<T>> {
private static final int NEWFLAG_SIZE = 1;
@@ -44,11 +44,6 @@ public void setInner(Serializer<T> inner) {
this.inner = ensureExtended(inner);
}
- @Override
- public void configure(Map<String, ?> configs, boolean isKey) {
- // do nothing
- }
-
/**
* @throws StreamsException if both old and new values of data are null,
or if
* both values are not null
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionKeySerde.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionKeySerde.java
index 2910561eead..ae591773725 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionKeySerde.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionKeySerde.java
@@ -17,6 +17,7 @@
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.NoConfDeserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Bytes;
@@ -85,17 +86,13 @@ public void close() {
}
}
- private class SessionKeyDeserializer implements Deserializer<Windowed<K>> {
+ private class SessionKeyDeserializer extends
NoConfDeserializer<Windowed<K>> {
private final Deserializer<K> deserializer;
SessionKeyDeserializer(final Deserializer<K> deserializer) {
this.deserializer = deserializer;
}
- @Override
- public void configure(final Map<String, ?> configs, final boolean
isKey) {
- }
-
@Override
public Windowed<K> deserialize(final String topic, final byte[] data) {
if (data == null || data.length == 0) {
@@ -103,12 +100,6 @@ public void configure(final Map<String, ?> configs, final
boolean isKey) {
}
return from(data, deserializer, topic);
}
-
-
- @Override
- public void close() {
-
- }
}
public static long extractEnd(final byte[] binaryKey) {
diff --git
a/streams/src/test/java/org/apache/kafka/streams/perf/YahooBenchmark.java
b/streams/src/test/java/org/apache/kafka/streams/perf/YahooBenchmark.java
index d98fd7face7..69fc93ab12d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/perf/YahooBenchmark.java
+++ b/streams/src/test/java/org/apache/kafka/streams/perf/YahooBenchmark.java
@@ -24,6 +24,7 @@
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.NoConfDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.KafkaStreams;
@@ -233,7 +234,7 @@ public void close() {
}
// Note: these are also in the streams example package, eventuall use 1
file
- private class JsonPOJODeserializer<T> implements Deserializer<T> {
+ private class JsonPOJODeserializer<T> extends NoConfDeserializer<T> {
private ObjectMapper objectMapper = new ObjectMapper();
private Class<T> tClass;
@@ -264,11 +265,6 @@ public T deserialize(String topic, byte[] bytes) {
return data;
}
-
- @Override
- public void close() {
-
- }
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeTest.java
index 4c7c3a6fa14..44695921784 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeTest.java
@@ -18,12 +18,11 @@
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
-import org.apache.kafka.common.serialization.ExtendedDeserializer;
+import org.apache.kafka.common.serialization.ExtendedNoConfDeserializer;
import org.apache.kafka.test.MockSourceNode;
import org.junit.Test;
import java.nio.charset.StandardCharsets;
-import java.util.Map;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -45,21 +44,15 @@ public void
shouldProvideTopicHeadersAndDataToValueDeserializer() {
assertThat(deserializedValue, is("topic" + headers + "data"));
}
- public static class TheExtendedDeserializer implements
ExtendedDeserializer<String> {
+ public static class TheExtendedDeserializer extends
ExtendedNoConfDeserializer<String> {
@Override
public String deserialize(final String topic, final Headers headers,
final byte[] data) {
return topic + headers + new String(data, StandardCharsets.UTF_8);
}
- @Override
- public void configure(final Map<String, ?> configs, final boolean
isKey) { }
-
@Override
public String deserialize(final String topic, final byte[] data) {
return deserialize(topic, null, data);
}
-
- @Override
- public void close() { }
}
}
\ No newline at end of file
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
> Introduce new serdes interfaces with empty configure() and close()
> ------------------------------------------------------------------
>
> Key: KAFKA-6161
> URL: https://issues.apache.org/jira/browse/KAFKA-6161
> Project: Kafka
> Issue Type: Improvement
> Components: clients, streams
> Reporter: Evgeny Veretennikov
> Assignee: Evgeny Veretennikov
> Priority: Major
>
> {{Serializer}}, {{Deserializer}} and {{Serde}} interfaces have methods
> {{configure()}} and {{close()}}. Pretty often one wants to leave these methods
> empty. For example, a lot of serializers inside
> {{org.apache.kafka.common.serialization}} package have these methods empty:
> {code}
> @Override
> public void configure(Map<String, ?> configs, boolean isKey) {
> // nothing to do
> }
> @Override
> public void close() {
> // nothing to do
> }
> {code}
> To avoid such boilerplate, we may create new interfaces (like
> {{UnconfiguredSerializer}}) in which these methods are defined with empty bodies.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)