Hi Seth,

yes, that helped! :-)

What I was looking for is essentially `org.apache.flink.connectors.savepoint.output.SavepointOutputFormat`. It would be great if this could be written in a way that enables reusing it in a different engine (as I mentioned, Apache Spark). There seem to be some issues, though. It uses the Savepoint interface, which in turn uses several other objects and interfaces from Flink's runtime. Maybe some convenience API would help. Apache Beam handles operator naming, so that part should definitely be portable between systems, but I'm not sure how to construct an OperatorID from this name. Do you think it is possible to come up with something that could be used in this portable way?

I understand there are some more conditions that need to be satisfied (grouping, aggregating, ...), which would of course have to be handled by the target system. But Apache Beam can help with that. My idea is that there could be a runner-specified PTransform that takes a PCollection of tuples such as `(operator name, key, state name, value1), (operator name, key, state name, value2)`, and it would be the runner's responsibility to group/aggregate these so that they can be written by the runner's provided writer (output format).
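
To make the tuple idea a bit more concrete, here is a minimal sketch in plain Java of the grouping step only. It is not Flink or Beam code: `StateTupleGrouping`, `StateEntry` and `group` are hypothetical names invented for illustration, values are plain strings, and a real runner would do this grouping in a distributed fashion rather than in memory.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Illustrative only: none of these names come from Flink or Beam.
class StateTupleGrouping {

    /** Hypothetical tuple: (operator name, key, state name, value). */
    static final class StateEntry {
        final String operatorName;
        final String key;
        final String stateName;
        final String value;

        StateEntry(String operatorName, String key, String stateName, String value) {
            this.operatorName = operatorName;
            this.key = key;
            this.stateName = stateName;
            this.value = value;
        }
    }

    /** Groups entries as operator name -> key -> state name -> values (in input order). */
    static Map<String, Map<String, Map<String, List<String>>>> group(List<StateEntry> entries) {
        Map<String, Map<String, Map<String, List<String>>>> grouped = new TreeMap<>();
        for (StateEntry e : entries) {
            grouped.computeIfAbsent(e.operatorName, op -> new TreeMap<>())
                   .computeIfAbsent(e.key, k -> new TreeMap<>())
                   .computeIfAbsent(e.stateName, s -> new ArrayList<>())
                   .add(e.value);
        }
        return grouped;
    }

    public static void main(String[] args) {
        List<StateEntry> entries = new ArrayList<>();
        entries.add(new StateEntry("counter", "user-1", "count", "value1"));
        entries.add(new StateEntry("counter", "user-1", "count", "value2"));
        entries.add(new StateEntry("counter", "user-2", "count", "value3"));
        // prints {counter={user-1={count=[value1, value2]}, user-2={count=[value3]}}}
        System.out.println(group(entries));
    }
}
```

The grouped structure is roughly what a runner-provided writer would need as input; how it maps an operator name to an OperatorID is left open here, as in the text above.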

All of this would need a lot more design; these are just ideas of "what could be possible". I was just wondering whether this FLIP could take some first steps towards this.

Many thanks for the comments,

Jan

On 5/31/19 8:12 PM, Seth Wiesman wrote:
@Jan Gotcha,

So in reusing components, it explicitly is not a writer. This is not a savepoint 
output format in the way we have a Parquet output format. The reason for the 
Transform API is to hide the underlying details; it does not simply append an 
output writer to the end of a dataset. This gets into the implementation 
details, but at a high level, the dataset is:

1) partitioned using key groups
2) data is run through a standard stream operator that takes a snapshot of its 
state after processing all records and outputs metadata handles for each subtask
3) those metadata handles are aggregated down to a single savepoint handle
4) that handle is written out as a final metadata file
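
As a rough illustration of the four steps above, here is a toy model in plain Java. It is a sketch under stated assumptions, not the prototype's code: `SavepointWriteSketch`, `SubtaskHandle`, `keyGroupFor`, `subtaskFor` and `write` are hypothetical names, the key-group hash is a stand-in (`hashCode` modulo max parallelism) rather than Flink's internal hash, and the "metadata file" is just a string.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy model only: names and hash function are assumptions, not Flink internals.
class SavepointWriteSketch {

    /** Hypothetical per-subtask metadata handle: subtask index and number of keys snapshotted. */
    static final class SubtaskHandle {
        final int subtask;
        final int keyCount;

        SubtaskHandle(int subtask, int keyCount) {
            this.subtask = subtask;
            this.keyCount = keyCount;
        }
    }

    /** Stand-in key-group assignment; always in [0, maxParallelism). */
    static int keyGroupFor(String key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    /** Maps a key group to a subtask so that each subtask owns a contiguous range of key groups. */
    static int subtaskFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    static String write(Map<String, String> state, int maxParallelism, int parallelism) {
        // Step 1: partition the data by key group, and thereby by subtask.
        Map<Integer, Map<String, String>> perSubtask = new TreeMap<>();
        for (Map.Entry<String, String> e : state.entrySet()) {
            int subtask = subtaskFor(keyGroupFor(e.getKey(), maxParallelism), maxParallelism, parallelism);
            perSubtask.computeIfAbsent(subtask, s -> new TreeMap<>()).put(e.getKey(), e.getValue());
        }
        // Step 2: each subtask consumes all of its records, then "snapshots" and emits a handle.
        List<SubtaskHandle> handles = new ArrayList<>();
        for (Map.Entry<Integer, Map<String, String>> e : perSubtask.entrySet()) {
            handles.add(new SubtaskHandle(e.getKey(), e.getValue().size()));
        }
        // Step 3: aggregate the per-subtask handles down to a single savepoint-level summary.
        int totalKeys = 0;
        for (SubtaskHandle h : handles) {
            totalKeys += h.keyCount;
        }
        // Step 4: emit the final metadata (a string here, a metadata file in the real thing).
        return "savepoint{subtasks=" + handles.size() + ", keys=" + totalKeys + "}";
    }

    public static void main(String[] args) {
        Map<String, String> state = new TreeMap<>();
        state.put("user-1", "42");
        state.put("user-2", "7");
        System.out.println(write(state, 128, 2));
    }
}
```

Even in this toy form, the final metadata only exists after a partition step and an aggregation step have run, which is why the write path is tied to a concrete data flow rather than being a standalone writer.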

What’s important here is that the API actually depends on the data flow 
collection, and state is written out as a side effect of taking a savepoint. The 
FLIP describes a loose coupling to the DataSet API for eventual migration to 
BoundedStream, that is true. However, the API does require knowing what 
concrete data flow is being used to perform these re-partitionings and 
post-aggregations.

I’m linking to my prototype implementation, particularly what actually happens 
when you call write and run the transformations. Let me know if that helps 
clarify.

Seth

https://github.com/sjwiesman/flink/blob/savepoint-connector/flink-connectors/flink-connector-savepoint/src/main/java/org/apache/flink/connectors/savepoint/api/WritableSavepoint.java#L63



On May 31, 2019, at 7:46 AM, Jan Lukavský <je...@seznam.cz> wrote:

Hi Seth,

that sounds reasonable. What I was asking for was not to reverse engineer the binary format, 
but to make the savepoint write API a little more reusable, so that it could be wrapped 
into some other technology. I don't know the details well enough to propose a solution, but it 
seems to me that it could be possible to use something like a Writer instead of a Transform. 
Or maybe the Transform can use the Writer internally. The goal is just to make it possible to 
create the savepoint from "outside" of Flink (with some library, of course).

Jan

On 5/31/19 1:17 PM, Seth Wiesman wrote:
@Konstantin agreed, that was a large impetus for this feature. Also I am 
happy to change the name to something that better describes the feature set.

@Jan

Savepoints depend heavily on a number of Flink-internal components that may 
change between versions: state backend internals, type serializers, the 
specific hash function used to turn a UID into an OperatorID, etc. I consider 
it a feature of this proposal that the library depends on those internal 
components instead of reverse engineering the binary format. This way, as those 
internals change or new state features are added (think of the recent addition of 
TTL), we will get support automatically. I do not believe anything else is 
maintainable.

Seth

On May 31, 2019, at 5:56 AM, Jan Lukavský <je...@seznam.cz> wrote:

Hi,

this is an awesome and really useful feature. If I might ask for one thing to 
consider - would it be possible to make the Savepoint manipulation API (at 
least writing the Savepoint) less dependent on other parts of Flink internals 
(e.g. `KeyedStateBootstrapFunction`) and provide something more general (e.g. 
some generic Writer)? The reason I'm asking is that I can totally imagine a 
situation where users might want to create bootstrapped state with some other 
runner (e.g. Apache Spark), and then run Apache Flink after the state has been 
created. This makes even more sense in the context of Apache Beam, which provides 
all the necessary work to make this happen. The question is - would it be 
possible to design this feature so that writing the savepoint from a different 
runner would be possible?

Cheers,

  Jan

On 5/30/19 1:14 AM, Seth Wiesman wrote:
Hey Everyone!

Gordon and I have been discussing adding a savepoint connector to flink for 
reading, writing and modifying savepoints.

This is useful for:

     Analyzing state for interesting patterns
     Troubleshooting or auditing jobs by checking for discrepancies in state
     Bootstrapping state for new applications
     Modifying savepoints such as:
         Changing max parallelism
         Making breaking schema changes
         Correcting invalid state

We are looking forward to your feedback!

This is the FLIP:

https://cwiki.apache.org/confluence/display/FLINK/FLIP-43%3A+Savepoint+Connector

Seth


