[ https://issues.apache.org/jira/browse/FLINK-5790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15866032#comment-15866032 ]
ASF GitHub Bot commented on FLINK-5790: --------------------------------------- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/3305#discussion_r101070139 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializer.java --- @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.common.typeutils.base; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; + +import java.io.IOException; +import java.util.List; +import java.util.ArrayList; + +@SuppressWarnings("ForLoopReplaceableByForEach") +public class ListSerializer<T> extends TypeSerializer<List<T>> { + + private static final long serialVersionUID = 1119562170939152304L; + + private final TypeSerializer<T> elementSerializer; + + public ListSerializer(TypeSerializer<T> elementSerializer) { + this.elementSerializer = elementSerializer; + } + + public TypeSerializer<T> getElementSerializer() { + return elementSerializer; + } + + @Override + public boolean isImmutableType() { + return false; + } + + @Override + public TypeSerializer<List<T>> duplicate() { + TypeSerializer<T> duplicateElement = elementSerializer.duplicate(); + return duplicateElement == elementSerializer ? 
this : new ListSerializer<T>(duplicateElement); + } + + @Override + public List<T> createInstance() { + return new ArrayList<>(); + } + + @Override + public List<T> copy(List<T> from) { + List<T> newList = new ArrayList<>(from.size()); + for (int i = 0; i < from.size(); i++) { + newList.add(elementSerializer.copy(from.get(i))); + } + return newList; + } + + @Override + public List<T> copy(List<T> from, List<T> reuse) { + return copy(from); + } + + @Override + public int getLength() { + return -1; // var length + } + + @Override + public void serialize(List<T> list, DataOutputView target) throws IOException { + final int size = list.size(); + target.writeInt(size); + for (int i = 0; i < size; i++) { + elementSerializer.serialize(list.get(i), target); + } + } + + @Override + public List<T> deserialize(DataInputView source) throws IOException { + final int size = source.readInt(); + final List<T> list = new ArrayList<>(size); + for (int i = 0; i < size; i++) { + list.add(elementSerializer.deserialize(source)); + } + return list; + } + + @Override + public List<T> deserialize(List<T> reuse, DataInputView source) throws IOException { + return deserialize(source); + } + + @Override + public void copy(DataInputView source, DataOutputView target) throws IOException { + // copy number of elements + final int num = source.readInt(); + target.writeInt(num); + for (int i = 0; i < num; i++) { + elementSerializer.copy(source, target); + } + } + + // -------------------------------------------------------------------- + + @Override + public boolean equals(Object obj) { + return obj == this || + (obj != null && obj.getClass() == getClass() && + elementSerializer.equals(((ListSerializer<?>) obj).elementSerializer)); + } + + @Override + public boolean canEqual(Object obj) { + return true; + } --- End diff -- This is not the contract of `canEqual`. Given that this class is not final this method should define which objects can be equal to this type. 
So in this case it would be `obj instanceof ListSerializer`. Furthermore, the check has to be incorporated into the `equals` method where you check `((ListSerializer) obj).canEqual(this)`. Otherwise, the symmetry of equals is violated. > Use list types when ListStateDescriptor extends StateDescriptor > --------------------------------------------------------------- > > Key: FLINK-5790 > URL: https://issues.apache.org/jira/browse/FLINK-5790 > Project: Flink > Issue Type: Improvement > Reporter: Xiaogang Shi > Assignee: Xiaogang Shi > > Flink keeps the state serializer in {{StateDescriptor}}, but it's the > serializer of list elements that is put in {{ListStateDescriptor}}. The > implementation is a little confusing. Some backends need to construct the > state serializer with the element serializer by themselves. > We should use an {{ArrayListSerializer}}, which is composed of the serializer > of the element, in the {{ListStateDescriptor}}. It helps the backend to avoid > constructing the state serializer. > If a backend needs customized serialization of the state (e.g. > {{RocksDBStateBackend}}), it still can obtain the element serializer from the > {{ArrayListSerializer}}. -- This message was sent by Atlassian JIRA (v6.3.15#6346)