Github user gyfora commented on a diff in the pull request:
https://github.com/apache/flink/pull/1305#discussion_r45377370
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java ---
@@ -64,33 +64,35 @@
	/**
	 * Closes the state backend, releasing all internal resources, but does not delete any persistent
	 * checkpoint data.
-	 * 
+	 *
	 * @throws Exception Exceptions can be forwarded and will be logged by the system
	 */
	public abstract void close() throws Exception;
-	
+
	// ------------------------------------------------------------------------
	//  key/value state
	// ------------------------------------------------------------------------

	/**
	 * Creates a key/value state backed by this state backend.
-	 * 
+	 *
+	 * @param operatorId Unique id for the operator creating the state
+	 * @param stateName Name of the created state
	 * @param keySerializer The serializer for the key.
	 * @param valueSerializer The serializer for the value.
	 * @param defaultValue The value that is returned when no other value has been associated with a key, yet.
	 * @param <K> The type of the key.
	 * @param <V> The type of the value.
-	 * 
+	 *
	 * @return A new key/value state backed by this backend.
-	 * 
+	 *
	 * @throws Exception Exceptions may occur during initialization of the state and should be forwarded.
	 */
-	public abstract <K, V> KvState<K, V, Backend> createKvState(
+	public abstract <K, V> KvState<K, V, Backend> createKvState(int operatorId, String stateName,
--- End diff --
Let me first describe how sharding works, then I will give a concrete example.
Key-value pairs are sharded by key, not by subtask. This means that each parallel subtask maintains a connection to all the shards and partitions the states before writing them to the appropriate shards, according to the user-defined partitioner (in the backend config). This is much better than sharding by subtask, because we can later change the parallelism of the job without affecting the state, and it also lets us define a more elaborate sharding strategy through the partitioner.
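To make the mechanics concrete, here is a minimal sketch of key-based shard selection. The `ShardPartitioner` interface and the hash-modulo strategy are hypothetical stand-ins for the user-defined partitioner from the backend config, not the actual Flink API:

```java
// Hypothetical interface standing in for the user-defined partitioner
// configured on the backend; names are illustrative only.
interface ShardPartitioner<K> {
	int selectShard(K key, int numShards);
}

// A simple default strategy: route by key hash. Every parallel subtask
// applies the same function, so a given key always lands on the same
// shard no matter which subtask writes it, and changing the job
// parallelism does not move any state between shards.
class HashShardPartitioner<K> implements ShardPartitioner<K> {
	@Override
	public int selectShard(K key, int numShards) {
		return Math.abs(key.hashCode() % numShards);
	}
}
```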
This means that when a kv state is created, we create a table for that kv state in each shard. If we did it according to your suggestion, we would need to create numShards tables for each parallel instance (a total of p*ns) for each kv state. Furthermore, this would make the fancy sharding useless because we could no longer change the job parallelism. So we need to make sure that parallel subtasks of a given operator write to the same state tables (so we only have ns tables regardless of the parallelism).
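Concretely, with parallelism p = 2 and numShards = 3 (as in the example below), per-subtask tables would mean 2 * 3 = 6 tables for a single kv state, while shared tables keep it at 3 no matter how the parallelism is changed later.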
In order to do this, we need something that uniquely identifies a given state in the streaming program (and all parallel instances must share the same id). The information required to create such a unique state id is an identifier for the operator that owns the state, plus the name of the state. (The information obtained from the environment is not enough, because chained operators share the same environment; if they have conflicting state names, the id is not unique.) The only thing that identifies an operator in the logical streaming program is the operator id assigned by the jobgraphbuilder (that's the whole point of having it).
An example job with p = 2 and numShards = 3:
A chained map -> filter, where both the mapper and the filter have a state named "count"; let's assume the mapper has operator id 1 and the filter id 2.
In this case the mapper would create 3 db tables (1 on each shard), all with the same name, kvstate_count_1_jobId. The filter would also create 3 tables, named kvstate_count_2_jobId.
All mapper instances would write to all three database shards, and the same goes for all the filter instances.
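Putting the id construction into code, here is a sketch of the naming scheme implied by the example above; the helper and the exact format string are my assumptions, not the actual implementation:

```java
// Sketch of the unique table-name construction from operator id + state name
// + job id, following the naming in the example above. Illustrative only.
class StateTableNames {
	static String stateTableName(String stateName, int operatorId, String jobId) {
		// e.g. "kvstate_count_1_<jobId>" for the mapper,
		//      "kvstate_count_2_<jobId>" for the filter;
		// the same name is used on every shard, so all parallel subtasks
		// of one operator write to the same ns tables.
		return "kvstate_" + stateName + "_" + operatorId + "_" + jobId;
	}
}
```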
I hope you get what I am trying to say.