[
https://issues.apache.org/jira/browse/FLINK-5790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15866032#comment-15866032
]
ASF GitHub Bot commented on FLINK-5790:
---------------------------------------
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/3305#discussion_r101070139
--- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializer.java ---
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.ArrayList;
+
+@SuppressWarnings("ForLoopReplaceableByForEach")
+public class ListSerializer<T> extends TypeSerializer<List<T>> {
+
+ private static final long serialVersionUID = 1119562170939152304L;
+
+ private final TypeSerializer<T> elementSerializer;
+
+ public ListSerializer(TypeSerializer<T> elementSerializer) {
+ this.elementSerializer = elementSerializer;
+ }
+
+ public TypeSerializer<T> getElementSerializer() {
+ return elementSerializer;
+ }
+
+ @Override
+ public boolean isImmutableType() {
+ return false;
+ }
+
+ @Override
+ public TypeSerializer<List<T>> duplicate() {
+ TypeSerializer<T> duplicateElement = elementSerializer.duplicate();
+ return duplicateElement == elementSerializer ? this : new ListSerializer<T>(duplicateElement);
+ }
+
+ @Override
+ public List<T> createInstance() {
+ return new ArrayList<>();
+ }
+
+ @Override
+ public List<T> copy(List<T> from) {
+ List<T> newList = new ArrayList<>(from.size());
+ for (int i = 0; i < from.size(); i++) {
+ newList.add(elementSerializer.copy(from.get(i)));
+ }
+ return newList;
+ }
+
+ @Override
+ public List<T> copy(List<T> from, List<T> reuse) {
+ return copy(from);
+ }
+
+ @Override
+ public int getLength() {
+ return -1; // var length
+ }
+
+ @Override
+ public void serialize(List<T> list, DataOutputView target) throws IOException {
+ final int size = list.size();
+ target.writeInt(size);
+ for (int i = 0; i < size; i++) {
+ elementSerializer.serialize(list.get(i), target);
+ }
+ }
+
+ @Override
+ public List<T> deserialize(DataInputView source) throws IOException {
+ final int size = source.readInt();
+ final List<T> list = new ArrayList<>(size);
+ for (int i = 0; i < size; i++) {
+ list.add(elementSerializer.deserialize(source));
+ }
+ return list;
+ }
+
+ @Override
+ public List<T> deserialize(List<T> reuse, DataInputView source) throws IOException {
+ return deserialize(source);
+ }
+
+ @Override
+ public void copy(DataInputView source, DataOutputView target) throws IOException {
+ // copy number of elements
+ final int num = source.readInt();
+ target.writeInt(num);
+ for (int i = 0; i < num; i++) {
+ elementSerializer.copy(source, target);
+ }
+ }
+
+ // --------------------------------------------------------------------
+
+ @Override
+ public boolean equals(Object obj) {
+ return obj == this ||
+ (obj != null && obj.getClass() == getClass() &&
+ elementSerializer.equals(((ListSerializer<?>) obj).elementSerializer));
+ }
+
+ @Override
+ public boolean canEqual(Object obj) {
+ return true;
+ }
--- End diff --
This is not the contract of `canEqual`. Since this class is not final, the
method should define which objects can be equal to instances of this type. In
this case it would be `obj instanceof ListSerializer`. Furthermore, the check
has to be incorporated into the `equals` method, where you check
`((ListSerializer) obj).canEqual(this)`. Otherwise the symmetry of `equals` is
violated.
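A minimal sketch of the `canEqual` pattern described above, not the actual
Flink code. `ElementSer`, `ListSer`, and `BoundedListSer` are hypothetical
stand-ins invented for illustration; the real class extends `TypeSerializer`.
The subclass is only there to show why the check matters for a non-final class.

```java
// Hypothetical stand-in for an element serializer; only equality matters here.
class ElementSer {
    private final String name;

    ElementSer(String name) {
        this.name = name;
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof ElementSer && ((ElementSer) o).name.equals(name);
    }

    @Override
    public int hashCode() {
        return name.hashCode();
    }
}

// Hypothetical, simplified analogue of ListSerializer (non-final).
class ListSer {
    protected final ElementSer elementSerializer;

    ListSer(ElementSer elementSerializer) {
        this.elementSerializer = elementSerializer;
    }

    // Defines which objects may be equal to instances of this type.
    public boolean canEqual(Object obj) {
        return obj instanceof ListSer;
    }

    @Override
    public boolean equals(Object obj) {
        if (obj == this) {
            return true;
        }
        if (obj instanceof ListSer) {
            ListSer other = (ListSer) obj;
            // Ask the other object whether it accepts this one as an equal.
            return other.canEqual(this)
                && elementSerializer.equals(other.elementSerializer);
        }
        return false;
    }

    @Override
    public int hashCode() {
        return elementSerializer.hashCode();
    }
}

// Hypothetical subclass that adds state and therefore narrows canEqual.
class BoundedListSer extends ListSer {
    private final int maxSize;

    BoundedListSer(ElementSer elementSerializer, int maxSize) {
        super(elementSerializer);
        this.maxSize = maxSize;
    }

    @Override
    public boolean canEqual(Object obj) {
        return obj instanceof BoundedListSer;
    }

    @Override
    public boolean equals(Object obj) {
        if (obj == this) {
            return true;
        }
        if (obj instanceof BoundedListSer) {
            BoundedListSer other = (BoundedListSer) obj;
            return other.canEqual(this)
                && super.equals(other)
                && maxSize == other.maxSize;
        }
        return false;
    }

    @Override
    public int hashCode() {
        return 31 * super.hashCode() + maxSize;
    }
}

class CanEqualDemo {
    public static void main(String[] args) {
        ListSer base = new ListSer(new ElementSer("int"));
        ListSer bounded = new BoundedListSer(new ElementSer("int"), 10);
        System.out.println(base.equals(bounded));
        System.out.println(bounded.equals(base));
        System.out.println(bounded.equals(new BoundedListSer(new ElementSer("int"), 10)));
    }
}
```

In this sketch a plain `ListSer` and a `BoundedListSer` compare unequal in both
directions, because `ListSer.equals` defers to the other object's `canEqual`.
Without that call, `base.equals(bounded)` would return true while
`bounded.equals(base)` returns false.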
> Use list types when ListStateDescriptor extends StateDescriptor
> ---------------------------------------------------------------
>
> Key: FLINK-5790
> URL: https://issues.apache.org/jira/browse/FLINK-5790
> Project: Flink
> Issue Type: Improvement
> Reporter: Xiaogang Shi
> Assignee: Xiaogang Shi
>
> Flink keeps the state serializer in {{StateDescriptor}}, but it's the
> serializer of list elements that is put in {{ListStateDescriptor}}. The
> implementation is a little confusing. Some backends need to construct the
> state serializer with the element serializer by themselves.
> We should use an {{ArrayListSerializer}}, which is composed of the serializer
> of the element, in the {{ListStateDescriptor}}. It helps the backend to avoid
> constructing the state serializer.
> If a backend needs customized serialization of the state (e.g.
> {{RocksDBStateBackend}}), it can still obtain the element serializer from the
> {{ArrayListSerializer}}.