This is an automated email from the ASF dual-hosted git repository. jqin pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 6ec59929bc6c3afce301d3e5a7bedb9dd93ce36c Author: Jiangjie (Becket) Qin <[email protected]> AuthorDate: Sat Oct 10 00:38:31 2020 +0800 [hotfix][connector/common] Add a new util class to help with serde in the Source. --- .../connector/base/source/utils/SerdeUtils.java | 131 +++++++++++++++++++++ 1 file changed, 131 insertions(+) diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/utils/SerdeUtils.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/utils/SerdeUtils.java new file mode 100644 index 0000000..cd43ccf --- /dev/null +++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/utils/SerdeUtils.java @@ -0,0 +1,131 @@ +/* + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + */ + +package org.apache.flink.connector.base.source.utils; + +import org.apache.flink.api.connector.source.SourceSplit; +import org.apache.flink.core.io.SimpleVersionedSerializer; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.function.Function; + +/** + * A util class with some helper method for serde in the sources. + */ +public class SerdeUtils { + + /** Private constructor for util class. */ + private SerdeUtils() { } + + /** + * Serialize a mapping from subtask ids to lists of assigned splits. + * The serialized format is following: + * <pre> + * 4 bytes - number of subtasks + * 4 bytes - split serializer version + * N bytes - [assignment_for_subtask] + * 4 bytes - subtask id + * 4 bytes - number of assigned splits + * N bytes - [assigned_splits] + * 4 bytes - serialized split length + * N bytes - serialized splits + * </pre> + * + * @param splitAssignments a mapping from subtask ids to lists of assigned splits. + * @param splitSerializer the serializer of the split. + * @param <SplitT> the type of the splits. + * @param <C> the type of the collection to hold the assigned splits for a subtask. + * @return the serialized bytes of the given subtask to splits assignment mapping. + * @throws IOException when serialization failed. + */ + public static <SplitT extends SourceSplit, C extends Collection<SplitT>> byte[] serializeSplitAssignments( + Map<Integer, C> splitAssignments, + SimpleVersionedSerializer<SplitT> splitSerializer) throws IOException { + try ( + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputStream out = new DataOutputStream(baos)) { + + out.writeInt(splitAssignments.size()); + // Split serializer version. + out.writeInt(splitSerializer.getVersion()); + // Write assignments for subtasks. + for (Map.Entry<Integer, C> entry : splitAssignments.entrySet()) { + // Subtask ID + int subtaskId = entry.getKey(); + Collection<SplitT> splitsForSubtask = entry.getValue(); + // Number of the splits. + out.writeInt(subtaskId); + out.writeInt(splitsForSubtask.size()); + for (SplitT split : splitsForSubtask) { + byte[] serializedSplit = splitSerializer.serialize(split); + out.writeInt(serializedSplit.length); + out.write(serializedSplit); + } + } + return baos.toByteArray(); + } + } + + /** + * Deserialize the given bytes returned by {@link #serializeSplitAssignments(Map, SimpleVersionedSerializer)}. + * + * @param serialized the serialized bytes returned by + * {@link #serializeSplitAssignments(Map, SimpleVersionedSerializer)}. + * @param splitSerializer the split serializer for the splits. + * @param collectionSupplier the supplier for the {@link Collection} instance to hold the assigned splits for a + * subtask. + * @param <SplitT> the type of the splits. + * @param <C> the type of the collection to hold the assigned splits for a subtask. + * @return A mapping from subtask id to its assigned splits. + * @throws IOException when deserialization failed. + */ + public static <SplitT extends SourceSplit, C extends Collection<SplitT>> Map<Integer, C> deserializeSplitAssignments( + byte[] serialized, + SimpleVersionedSerializer<SplitT> splitSerializer, + Function<Integer, C> collectionSupplier) throws IOException { + try ( + ByteArrayInputStream bais = new ByteArrayInputStream(serialized); + DataInputStream in = new DataInputStream(bais)) { + + int numSubtasks = in.readInt(); + Map<Integer, C> splitsAssignments = new HashMap<>(numSubtasks); + int serializerVersion = in.readInt(); + for (int i = 0; i < numSubtasks; i++) { + int subtaskId = in.readInt(); + int numAssignedSplits = in.readInt(); + C assignedSplits = collectionSupplier.apply(numAssignedSplits); + for (int j = 0; j < numAssignedSplits; j++) { + int serializedSplitSize = in.readInt(); + byte[] serializedSplit = new byte[serializedSplitSize]; + in.readFully(serializedSplit); + SplitT split = splitSerializer.deserialize(serializerVersion, serializedSplit); + assignedSplits.add(split); + } + splitsAssignments.put(subtaskId, assignedSplits); + } + return splitsAssignments; + } + } +}
