[
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16105212#comment-16105212
]
ASF GitHub Bot commented on FLINK-7206:
---------------------------------------
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/4355#discussion_r130086377
--- Diff:
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/MapViewSerializer.scala
---
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataview
+
+import org.apache.flink.annotation.Internal
+import org.apache.flink.api.common.typeutils._
+import org.apache.flink.api.common.typeutils.base.{MapSerializer,
MapSerializerConfigSnapshot}
+import org.apache.flink.core.memory.{DataInputView, DataOutputView}
+import org.apache.flink.table.api.dataview.MapView
+
+/**
+ * A serializer for [[HeapMapView]]. The serializer relies on a key serializer and a value
+ * serializer for the serialization of the map's key-value pairs.
+ *
+ * <p>The serialization format for the map is as follows: four bytes for the length of the map,
+ * followed by the serialized representation of each key-value pair. To allow null values,
+ * each value is prefixed by a null marker.
+ *
+ * @param mapSerializer Map serializer.
+ * @tparam K The type of the keys in the map.
+ * @tparam V The type of the values in the map.
+ */
+@Internal
+class MapViewSerializer[K, V](mapSerializer: MapSerializer[K, V])
+ extends TypeSerializer[MapView[K, V]] {
+
+ /** Reports the immutability flag of the wrapped map serializer. */
+ override def isImmutableType: Boolean = {
+   mapSerializer.isImmutableType
+ }
+
+ /**
+  * Creates a new serializer that wraps a duplicate of the underlying map serializer.
+  *
+  * @return A new [[MapViewSerializer]] built around the duplicated map serializer.
+  */
+ override def duplicate(): TypeSerializer[MapView[K, V]] = {
+   val duplicatedMapSerializer =
+     mapSerializer.duplicate().asInstanceOf[MapSerializer[K, V]]
+   new MapViewSerializer[K, V](duplicatedMapSerializer)
+ }
+
+ /**
+  * Creates a new [[HeapMapView]] backed by a map instance obtained from the
+  * wrapped map serializer.
+  */
+ override def createInstance(): MapView[K, V] =
+   new HeapMapView[K, V](mapSerializer.createInstance())
+
+ /**
+  * Copies a map view by copying its backing map with the wrapped map serializer.
+  *
+  * @param from The view to copy; it is cast to [[HeapMapView]].
+  * @return A new [[HeapMapView]] holding the copied map.
+  */
+ override def copy(from: MapView[K, V]): MapView[K, V] = {
+   val heapView = from.asInstanceOf[HeapMapView[K, V]]
+   val copiedMap = mapSerializer.copy(heapView.map)
+   new HeapMapView[K, V](copiedMap)
+ }
+
+ /**
+  * Copies a map view. The reuse instance is not used; the call is forwarded to
+  * the single-argument copy.
+  *
+  * @param from  The view to copy.
+  * @param reuse Ignored.
+  * @return The copy produced by `copy(from)`.
+  */
+ override def copy(from: MapView[K, V], reuse: MapView[K, V]): MapView[K, V] = {
+   copy(from)
+ }
+
+ /** Returns -1, marking the serialized form as variable length. */
+ override def getLength: Int = {
+   -1
+ }
+
+ /**
+  * Writes the backing map of the given view with the wrapped map serializer.
+  *
+  * @param record The view to serialize; it is cast to [[HeapMapView]].
+  * @param target The output view the map is written to.
+  */
+ override def serialize(record: MapView[K, V], target: DataOutputView): Unit = {
+   val heapView = record.asInstanceOf[HeapMapView[K, V]]
+   mapSerializer.serialize(heapView.map, target)
+ }
+
+ /**
+  * Reads a map with the wrapped map serializer and wraps it in a [[HeapMapView]].
+  *
+  * @param source The input view to read from.
+  * @return A new [[HeapMapView]] holding the deserialized map.
+  */
+ override def deserialize(source: DataInputView): MapView[K, V] = {
+   val restoredMap = mapSerializer.deserialize(source)
+   new HeapMapView[K, V](restoredMap)
+ }
+
+ /**
+  * Reads a map view. The reuse instance is not used; the call is forwarded to
+  * the single-argument deserialize.
+  *
+  * @param reuse  Ignored.
+  * @param source The input view to read from.
+  * @return The view produced by `deserialize(source)`.
+  */
+ override def deserialize(reuse: MapView[K, V], source: DataInputView): MapView[K, V] = {
+   deserialize(source)
+ }
+
+ /**
+  * Copies one serialized record from source to target by delegating to the
+  * wrapped map serializer.
+  *
+  * @param source The input view to read from.
+  * @param target The output view to write to.
+  */
+ override def copy(source: DataInputView, target: DataOutputView): Unit = {
+   mapSerializer.copy(source, target)
+ }
+
+ /**
+  * Checks whether the given object may be compared with this serializer.
+  *
+  * @param obj The candidate object; may be null.
+  * @return true only if obj is non-null and has exactly this serializer's class.
+  */
+ override def canEqual(obj: Any): Boolean = {
+   if (obj == null) false else obj.getClass == getClass
+ }
+
+ /** Returns the hash code of the wrapped map serializer. */
+ override def hashCode(): Int = {
+   mapSerializer.hashCode()
+ }
+
+ /**
+  * Exposes the wrapped map serializer to other instances of this class.
+  *
+  * The constructor parameter is only visible inside its own instance, so
+  * `equals` reads the counterpart's serializer through this class-private accessor.
+  */
+ private def wrappedMapSerializer: MapSerializer[K, V] = mapSerializer
+
+ /**
+  * Two map view serializers are equal when they have the same class and wrap
+  * equal map serializers.
+  *
+  * @param obj The object to compare with; may be null or of any type.
+  * @return true if obj is a serializer of the same class wrapping an equal map
+  *         serializer, false otherwise.
+  */
+ override def equals(obj: Any): Boolean = obj match {
+   // A type pattern never matches null, so null falls through to false.
+   case other: MapViewSerializer[_, _] =>
+     // Compare serializer with serializer; casting obj to MapSerializer was wrong.
+     other.canEqual(this) && mapSerializer.equals(other.wrappedMapSerializer)
+   case _ => false
+ }
--- End diff --
should be `obj.asInstanceOf[MapViewSerializer[_, _]].map`
> Implementation of DataView to support state access for UDAGG
> ------------------------------------------------------------
>
> Key: FLINK-7206
> URL: https://issues.apache.org/jira/browse/FLINK-7206
> Project: Flink
> Issue Type: Sub-task
> Components: Table API & SQL
> Reporter: Kaibo Zhou
> Assignee: Kaibo Zhou
>
> Implementation of MapView and ListView to support state access for UDAGG.
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)