[
https://issues.apache.org/jira/browse/FLINK-2808?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14958538#comment-14958538
]
ASF GitHub Bot commented on FLINK-2808:
---------------------------------------
Github user gyfora commented on a diff in the pull request:
https://github.com/apache/flink/pull/1239#discussion_r42098258
--- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/AbstractHeapKvState.java ---
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.state;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Base class for key/value state implementations that are backed by a regular heap hash map. The
+ * concrete implementations define how the state is checkpointed.
+ *
+ * @param <K> The type of the key.
+ * @param <V> The type of the value.
+ * @param <Backend> The type of the backend that snapshots this key/value state.
+ */
+public abstract class AbstractHeapKvState<K, V, Backend extends StateBackend<Backend>> implements KvState<K, V, Backend> {
+
+ /** Map containing the actual key/value pairs */
+ private final HashMap<K, V> state;
+
+ /** The serializer for the keys */
+ private final TypeSerializer<K> keySerializer;
+
+ /** The serializer for the values */
+ private final TypeSerializer<V> valueSerializer;
+
+ /** The value that is returned when no other value has been associated with a key, yet */
+ private final V defaultValue;
+
+ /** The current key, which the next value methods will refer to */
+ private K currentKey;
+
+ /**
+ * Creates a new empty key/value state.
+ *
+ * @param keySerializer The serializer for the keys.
+ * @param valueSerializer The serializer for the values.
+ * @param defaultValue The value that is returned when no other value has been associated with a key, yet.
+ */
+ protected AbstractHeapKvState(TypeSerializer<K> keySerializer,
+                               TypeSerializer<V> valueSerializer,
+                               V defaultValue) {
+ this(keySerializer, valueSerializer, defaultValue, new HashMap<K, V>());
+ }
+
+ /**
+ * Creates a new key/value state for the given hash map of key/value pairs.
+ *
+ * @param keySerializer The serializer for the keys.
+ * @param valueSerializer The serializer for the values.
+ * @param defaultValue The value that is returned when no other value has been associated with a key, yet.
+ * @param state The state map to use in this key/value state. May contain initial state.
+ */
+ protected AbstractHeapKvState(TypeSerializer<K> keySerializer,
+                               TypeSerializer<V> valueSerializer,
+                               V defaultValue,
+                               HashMap<K, V> state) {
+ this.state = requireNonNull(state);
+ this.keySerializer = requireNonNull(keySerializer);
+ this.valueSerializer = requireNonNull(valueSerializer);
+ this.defaultValue = defaultValue;
+ }
+
+ // ------------------------------------------------------------------------
+
+ @Override
+ public V value() {
+ V value = state.get(currentKey);
+ return value != null ? value : defaultValue;
--- End diff --
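I think you should return a copy of the default value here. Otherwise, for
non-primitive (mutable) types, every key that has no value yet ends up with
the same default object, so a change made through one key shows up for all
the others.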
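
To illustrate the concern, here is a minimal, self-contained sketch. It is not
the Flink class: HeapState and its copier argument are hypothetical stand-ins,
and the copier only plays the role that a call such as
valueSerializer.copy(defaultValue) would presumably play in the actual code.
It compares returning the default object directly with returning a copy.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.UnaryOperator;

/** Illustration only; HeapState is a hypothetical stand-in, not AbstractHeapKvState. */
public class DefaultValueCopyDemo {

    /** Tiny heap-backed key/value state with a configurable copy function. */
    public static final class HeapState<K, V> {
        private final Map<K, V> state = new HashMap<>();
        private final V defaultValue;
        // Assumption: stands in for a serializer-based copy of the value.
        private final UnaryOperator<V> copier;
        private K currentKey;

        public HeapState(V defaultValue, UnaryOperator<V> copier) {
            this.defaultValue = defaultValue;
            this.copier = copier;
        }

        public void setCurrentKey(K key) {
            this.currentKey = key;
        }

        public V value() {
            V value = state.get(currentKey);
            if (value != null) {
                return value;
            }
            // A null default is passed through; otherwise hand out whatever the copier returns.
            return defaultValue == null ? null : copier.apply(defaultValue);
        }

        public void update(V value) {
            state.put(currentKey, value);
        }
    }

    /**
     * Mutates the default obtained for key "a" (without calling update), then
     * returns what key "b" sees as its default.
     */
    public static List<Integer> readAfterMutation(UnaryOperator<List<Integer>> copier) {
        HeapState<String, List<Integer>> kv =
                new HeapState<String, List<Integer>>(new ArrayList<Integer>(), copier);
        kv.setCurrentKey("a");
        kv.value().add(1);
        kv.setCurrentKey("b");
        return kv.value();
    }

    public static void main(String[] args) {
        // Identity "copy": both keys share one list, so the change leaks to "b".
        System.out.println("shared default: " + readAfterMutation(v -> v));
        // Real copy: each call gets a fresh list, so "b" still sees an empty default.
        System.out.println("copied default: " + readAfterMutation(v -> new ArrayList<>(v)));
    }
}
```

With the identity function the second key observes the element added through
the first key; with a copying function it does not. The cost is one copy per
read of a missing key, which is probably acceptable for a default value.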
> Rework / Extend the StateHandleProvider
> ---------------------------------------
>
> Key: FLINK-2808
> URL: https://issues.apache.org/jira/browse/FLINK-2808
> Project: Flink
> Issue Type: Improvement
> Components: Streaming
> Affects Versions: 0.10
> Reporter: Stephan Ewen
> Assignee: Stephan Ewen
> Fix For: 0.10
>
>
> I would like to make some changes (mostly additions) to the
> {{StateHandleProvider}}. Ideally for the upcoming release, as it is somewhat
> part of the public API.
> The rationale behind this is to handle, in a clean and extensible way, the
> creation of key/value state backed by various implementations (FS,
> distributed KV store, local KV store with FS backup, ...) and various
> checkpointing strategies (full dump, append, incremental keys, ...).
> The changes would concretely be:
> 1. There should be a default {{StateHandleProvider}} set on the execution
> environment. Functions can later specify the {{StateHandleProvider}} when
> grabbing the {{StreamOperatorState}} from the runtime context (plus
> optionally a {{Checkpointer}})
> 2. The {{StreamOperatorState}} is created from the {{StateHandleProvider}}.
> That way, a KeyValueStore state backend can create a {{StreamOperatorState}}
> that directly updates data in the KV store on every access, if that is
> desired (and filter accesses by timestamps to only show committed data)
> 3. The StateHandleProvider should have methods to get an output stream that
> writes to the state checkpoint directly (and returns a StateHandle upon
> closing). That way we can convert and dump large state into the checkpoint
> without creating a full copy in memory first.
> Lastly, I would like to change some names:
> - {{StateHandleProvider}} to either {{StateBackend}}, {{StateStore}}, or
> {{StateProvider}} (simpler name).
> - {{StreamOperatorState}} to either {{State}} or {{KVState}}.