+1 I think is is a very valuable new additional and we should try and not get 
stuck on trying to design the perfect solution for everything

> On 4. Jun 2019, at 13:24, Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:
> 
> +1 to renaming it as State Processing API and adding it under the
> flink-libraries module.
> 
> I also think we can start with the development of the feature. From the
> feedback so far, it seems like we're in a good spot to add in at least the
> initial version of this API, hopefully making it ready for 1.9.0.
> 
> Cheers,
> Gordon
> 
> On Tue, Jun 4, 2019 at 7:14 PM Seth Wiesman <sjwies...@gmail.com> wrote:
> 
>> It seems like a recurring piece of feedback was a different name. I’d like
>> to propose moving the functionality to the libraries module and naming this
>> the State Processing API.
>> 
>> Seth
>> 
>>> On May 31, 2019, at 3:47 PM, Seth Wiesman <sjwies...@gmail.com> wrote:
>>> 
>>> The SavepointOutputFormat only writes out the savepoint metadata file
>> and should be mostly ignored.
>>> 
>>> The actual state is written out by stream operators and tied into the
>> flink runtime[1, 2, 3].
>>> 
>>> This is the most important part and the piece that I don’t think can be
>> reasonably extracted.
>>> 
>>> Seth
>>> 
>>> [1]
>> https://github.com/sjwiesman/flink/blob/savepoint-connector/flink-connectors/flink-connector-savepoint/src/main/java/org/apache/flink/connectors/savepoint/operators/KeyedStateBootstrapOperator.java#L84
>>> 
>>> [2]
>> https://github.com/sjwiesman/flink/blob/savepoint-connector/flink-connectors/flink-connector-savepoint/src/main/java/org/apache/flink/connectors/savepoint/output/SnapshotUtils.java
>>> 
>>> [3]
>> https://github.com/sjwiesman/flink/blob/savepoint-connector/flink-connectors/flink-connector-savepoint/src/main/java/org/apache/flink/connectors/savepoint/output/BoundedOneInputStreamTaskRunner.java
>>> 
>>>> On May 31, 2019, at 3:08 PM, Jan Lukavský <je...@seznam.cz> wrote:
>>>> 
>>>> Hi Seth,
>>>> 
>>>> yes, that helped! :-)
>>>> 
>>>> What I was looking for is essentially
>> `org.apache.flink.connectors.savepoint.output.SavepointOutputFormat`. It
>> would be great if this could be written in a way, that would enable reusing
>> it in different engine (as I mentioned - Apache Spark). There seem to be
>> some issues though. It uses interface Savepoint, which uses several other
>> objects and interfaces from Flink's runtime. Maybe some convenience API
>> might help - Apache Beam, handles operator naming, so that definitely
>> should be transitionable between systems, but I'm not sure, how to
>> construct OperatorID from this name. Would you think, that it is possible
>> to come up with something that could be used in this portable way?
>>>> 
>>>> I understand, there are some more conditions, that need to be satisfied
>> (grouping, aggregating, ...), which would of course have to be handled by
>> the target system. But Apache Beam can help leverage that. My idea would
>> be, that there can be runner specified PTransform, that takes PCollection
>> of some tuples of `(operator name, key, state name, value1), (operator
>> name, key, state name, value2)`, and Runner's responsibility would be to
>> group/aggregate this so that it can be written by runner's provided writer
>> (output format).
>>>> 
>>>> All of this would need a lot more design, these are just ideas of "what
>> could be possible", I was just wondering if this FLIP can make some first
>> steps towards this.
>>>> 
>>>> Many thanks for comments,
>>>> 
>>>> Jan
>>>> 
>>>>> On 5/31/19 8:12 PM, Seth Wiesman wrote:
>>>>> @Jan Gotcha,
>>>>> 
>>>>> So in reusing components it explicitly is not a writer. This is not a
>> savepoint output format in the way we have a parquet output format. The
>> reason for the Transform api is to hide the underlying details, it does not
>> simply append a output writer to the end of a dataset. This gets into the
>> implementation details but at a high level, the dataset is:
>>>>> 
>>>>> 1) partitioned using key groups
>>>>> 2) data is run through a standard stream operator that takes a
>> snapshot of its state after processing all records and outputs metadata
>> handles for each subtask
>>>>> 3) those metadata handles are aggregated down to a single savepoint
>> handle
>>>>> 4) that handle is written out as a final metadata file
>>>>> 
>>>>> What’s important here is that the api actually depends on the data
>> flow collection and state is written out as a side effect of taking a
>> savepoint. The FLIP describes a lose coupling to the dataset api for
>> eventual migration to BoundedStream, that is true. However, the api does
>> require knowing what concrete data flow is being used to perform these
>> re-partitionings  and post aggregations.
>>>>> 
>>>>> I’m linking to my prototype implementation, particularly what actually
>> happens when you call write and run the transformations. Let me know if
>> that helps clarify.
>>>>> 
>>>>> Seth
>>>>> 
>>>>> 
>> https://github.com/sjwiesman/flink/blob/savepoint-connector/flink-connectors/flink-connector-savepoint/src/main/java/org/apache/flink/connectors/savepoint/api/WritableSavepoint.java#L63
>>>>> 
>>>>> 
>>>>> 
>>>>>> On May 31, 2019, at 7:46 AM, Jan Lukavský <je...@seznam.cz> wrote:
>>>>>> 
>>>>>> Hi Seth,
>>>>>> 
>>>>>> that sounds reasonable. What I was asking for was not to reverse
>> engineer binary format, but to make the savepoint write API a little more
>> reusable, so that it could be wrapped into some other technology. I don't
>> know the details enough to propose a solution, but it seems to me, that it
>> could be possible to use something like Writer instead of Transform. Or
>> maybe the Transform can use the Writer internally, the goal is just to
>> enable to create the savepoint from "'outside" of Flink (with some library,
>> of course).
>>>>>> 
>>>>>> Jan
>>>>>> 
>>>>>>> On 5/31/19 1:17 PM, Seth Wiesman wrote:
>>>>>>> @Konstantin agreed, that was a large impotence for this feature.
>> Also I am happy to change the name to something that better  describes the
>> feature set.
>>>>>>> 
>>>>>>> @Lan
>>>>>>> 
>>>>>>> Savepoints depend heavily on a number of flink internal components
>> that may change between versions: state backends internals, type
>> serializers, the specific hash function used to turn a UID into an
>> OperatorID, etc. I consider it a feature of this proposal that the library
>> depends on those internal components instead of reverse engineering the
>> binary format. This way as those internals change, or new state features
>> are added (think the recent addition of TTL) we will get support
>> automatically. I do not believe anything else is maintainable.
>>>>>>> 
>>>>>>> Seth
>>>>>>> 
>>>>>>>> On May 31, 2019, at 5:56 AM, Jan Lukavský <je...@seznam.cz> wrote:
>>>>>>>> 
>>>>>>>> Hi,
>>>>>>>> 
>>>>>>>> this is awesome, and really useful feature. If I might ask for one
>> thing to consider - would it be possible to make the Savepoint manipulation
>> API (at least writing the Savepoint) less dependent on other parts of Flink
>> internals (e.g. |KeyedStateBootstrapFunction|) and provide something more
>> general (e.g. some generic Writer)? Why I'm asking for that - I can totally
>> imagine situation, where users might want to create bootstrapped state by
>> some other runner (e.g. Apache Spark), and then run Apache Flink after the
>> state has been created. This makes even more sense in context of Apache
>> Beam, which provides all the necessary work to make this happen. The
>> question is - would it be possible to design this feature so that writing
>> the savepoint from different runner would be possible?
>>>>>>>> 
>>>>>>>> Cheers,
>>>>>>>> 
>>>>>>>> Jan
>>>>>>>> 
>>>>>>>>> On 5/30/19 1:14 AM, Seth Wiesman wrote:
>>>>>>>>> Hey Everyone!
>>>>>>>>> ​
>>>>>>>>> Gordon and I have been discussing adding a savepoint connector to
>> flink for reading, writing and modifying savepoints.
>>>>>>>>> ​
>>>>>>>>> This is useful for:
>>>>>>>>> ​
>>>>>>>>>   Analyzing state for interesting patterns
>>>>>>>>>   Troubleshooting or auditing jobs by checking for discrepancies
>> in state
>>>>>>>>>   Bootstrapping state for new applications
>>>>>>>>>   Modifying savepoints such as:
>>>>>>>>>       Changing max parallelism
>>>>>>>>>       Making breaking schema changes
>>>>>>>>>       Correcting invalid state
>>>>>>>>> ​
>>>>>>>>> We are looking forward to your feedback!
>>>>>>>>> ​
>>>>>>>>> This is the FLIP:
>>>>>>>>> ​
>>>>>>>>> 
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-43%3A+Savepoint+Connector
>>>>>>>>> 
>>>>>>>>> Seth
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> 
>> 

Reply via email to