Does Kafka garbage collect this eventually in the case where you crash and
start again with a different transaction identifier?

Old transactions eventually time out. I believe there is a Kafka configuration setting for that timeout, and it can be problematic with long checkpoints.
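
To make the timeout concern concrete, here is a minimal sketch of producer settings sized against the checkpoint duration. The keys `transactional.id`, `enable.idempotence` and `transaction.timeout.ms` are Kafka producer configuration names; the helper `producerProps`, the id `beam-sink-shard-0` and the "twice the longest checkpoint" rule of thumb are assumptions made up for illustration. As far as I know the broker also caps the requested value via `transaction.max.timeout.ms`, so that may need raising as well.

```java
import java.time.Duration;
import java.util.Properties;

final class TransactionTimeoutSketch {
    // Hypothetical helper: builds producer settings whose transaction timeout
    // leaves headroom over the longest checkpoint we expect to see.
    static Properties producerProps(String transactionalId, Duration maxCheckpointDuration) {
        Properties props = new Properties();
        props.setProperty("transactional.id", transactionalId);
        props.setProperty("enable.idempotence", "true");
        // Assumed rule of thumb: allow twice the longest expected checkpoint.
        long timeoutMs = maxCheckpointDuration.multipliedBy(2).toMillis();
        props.setProperty("transaction.timeout.ms", Long.toString(timeoutMs));
        return props;
    }

    public static void main(String[] args) {
        Properties props = producerProps("beam-sink-shard-0", Duration.ofMinutes(5));
        System.out.println(props.getProperty("transaction.timeout.ms")); // prints 600000
    }
}
```

If a checkpoint takes longer than this timeout, the open transaction would be aborted before the commit step runs, which is the problematic case mentioned above.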

For Kafka, can you get the 2PC behavior like this: Upstream step: open a 
transaction, write a bunch of stuff to it (let Kafka do the buffering) and emit 
a transaction identifier. Downstream @RequiresStableInput step: close 
transaction. Again, I may be totally missing something, but I think that this 
has identical characteristics:

Nice, that should work. It offloads the buffering to Kafka similar to how the native Flink KafkaProducer works.
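
A rough sketch of that two-step flow, using an in-memory stand-in rather than the real Kafka client. `FakeLog`, `upstream`, `downstream` and `expire` are hypothetical names for illustration only; the real steps would be DoFns, and the timeout would be driven by the broker rather than by an explicit call.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class TwoStepCommitSketch {
    /** In-memory stand-in for a transactional log. This is NOT the Kafka client API. */
    static final class FakeLog {
        private final Map<String, List<String>> open = new HashMap<>();
        private final List<String> visible = new ArrayList<>();

        void begin(String txnId) { open.put(txnId, new ArrayList<>()); }

        void write(String txnId, String record) { open.get(txnId).add(record); }

        /** Simplification: committing an unknown, expired or already committed id is a no-op. */
        void commit(String txnId) {
            List<String> records = open.remove(txnId);
            if (records != null) {
                visible.addAll(records);
            }
        }

        /** Models the timeout of a transaction abandoned before checkpoint finalization. */
        void expire(String txnId) { open.remove(txnId); }

        List<String> visible() { return new ArrayList<>(visible); }
    }

    /** Upstream step: the log does the buffering, only the transaction id is emitted. */
    static String upstream(FakeLog log, String txnId, List<String> records) {
        log.begin(txnId);
        for (String r : records) {
            log.write(txnId, r);
        }
        return txnId;
    }

    /** Downstream step: assumed to see stable input, i.e. to run after checkpoint finalization. */
    static void downstream(FakeLog log, String stableTxnId) {
        log.commit(stableTxnId);
    }

    public static void main(String[] args) {
        FakeLog log = new FakeLog();

        // Failure before finalization: the old transaction just sits there and times out.
        String abandoned = upstream(log, "txn-1", Arrays.asList("a", "b"));
        log.expire(abandoned);

        // Restart with a different transaction identifier.
        String txn = upstream(log, "txn-2", Arrays.asList("a", "b"));
        downstream(log, txn);
        // Failure after finalization: retry with the same identifier, no duplicates.
        downstream(log, txn);

        System.out.println(log.visible()); // prints [a, b]
    }
}
```

The point of the sketch is only that nothing becomes visible until the downstream step runs, and that a retry with the same identifier does not duplicate output.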

-Max

On 04.03.19 18:31, Kenneth Knowles wrote:


On Mon, Mar 4, 2019 at 9:18 AM Reuven Lax wrote:



    On Mon, Mar 4, 2019 at 9:04 AM Kenneth Knowles wrote:



        On Mon, Mar 4, 2019 at 7:16 AM Maximilian Michels wrote:

             > If you randomly generate shard ids, buffer those until
            finalization, finalize a checkpoint so that you never need
            to re-run that generation, isn't the result stable from that
            point onwards?

            Yes, you're right :) For @RequiresStableInput we will always
            have to
            buffer and emit only after a finalized checkpoint.

            2PC is the better model for Flink, at least in the case of
            Kafka because
            it can offload the buffering to Kafka via its transactions.
            RequiresStableInput is a more general solution and it is
            feasible to
            support it in the Flink Runner. However, we have to make
            sure that
            checkpoints are taken frequently to avoid too much memory
            pressure.

            It would be nice to also support 2PC in Beam, i.e. the
            Runner could
            choose to either buffer/materialize input or do a 2PC, but
            it would also
            break the purity of the existing model.


        Still digging in to details. I think the "generate random shard
        ids & buffer" is a tradition but more specific to BigQueryIO or
        FileIO styles. It doesn't have to be done that way if the target
        system has special support like Kafka does.

        For Kafka, can you get the 2PC behavior like this: Upstream
        step: open a transaction, write a bunch of stuff to it (let
        Kafka do the buffering) and emit a transaction identifier.
        Downstream @RequiresStableInput step: close transaction. Again,
        I may be totally missing something, but I think that this has
        identical characteristics:


    Does Kafka garbage collect this eventually in the case where you
    crash and start again  with a different transaction identifier?


I believe that is what I read on the page about Flink's Kafka 2PC, though I cannot find it any more. What would the alternative be for Kafka? You always have to be ready for a client that goes away.

Kenn


          - Kafka does the buffering
          - checkpoint finalization is the driver of latency
          - failure before checkpoint finalization means the old
        transaction sits around and times out eventually
          - failure after checkpoint finalization causes retry with the
        same transaction identifier

        Kenn


            On 01.03.19 19:42, Kenneth Knowles wrote:
             > I think I am fundamentally misunderstanding checkpointing
            in Flink.
             >
             > If you randomly generate shard ids, buffer those until
            finalization,
             > finalize a checkpoint so that you never need to re-run
            that generation,
             > isn't the result stable from that point onwards?
             >
             > Kenn
             >
             > On Fri, Mar 1, 2019 at 10:30 AM Maximilian Michels wrote:
             >
             >     Fully agree. I think we can improve the situation
            drastically. For
             >     KafkaIO EOS with Flink we need to make these two changes:
             >
             >     1) Introduce buffering while the checkpoint is being
            taken
             >     2) Replace the random shard id assignment with
            something deterministic
             >
             >     However, we won't be able to provide full
            compatibility with
             >     RequiresStableInput because Flink only guarantees
            stable input after a
             >     checkpoint. RequiresStableInput requires input at any
            point in time to
             >     be stable. IMHO the only way to achieve that is
            materializing output
             >     which Flink does not currently support.
             >
             >     KafkaIO does not need all the power of
            RequiresStableInput to achieve
             >     EOS with Flink, but for the general case I don't see
            a good solution at
             >     the moment.
             >
             >     -Max
             >
             >     On 01.03.19 16:45, Reuven Lax wrote:
             >      > Yeah, the person who was working on it originally
            stopped working on
             >      > Beam, and nobody else ever finished it. I think it
            is important to
             >      > finish though. Many of the existing Sinks are only
            fully correct for
             >      > Dataflow today, because they generate either
            Reshuffle or
             >     GroupByKey to
             >      > ensure input stability before outputting (in many
            cases this code
             >     was
             >      > inherited from before Beam existed). On Flink
            today, these sinks
             >     might
             >      > occasionally produce duplicate output in the case
            of failures.
             >      >
             >      > Reuven
             >      >
             >      > On Fri, Mar 1, 2019 at 7:18 AM Maximilian Michels wrote:
             >      >
             >      >     Circling back to the RequiresStableInput
            annotation[1]. I've
             >     done some
             >      >     prototyping to see how this could be integrated
            into Flink. I'm
             >      >     currently
             >      >     writing a test based on RequiresStableInput.
             >      >
             >      >     I found out there are already checks in place
            at the Runners to
             >      >     throw in
             >      >     case transforms use RequiresStableInput and
            it's not
             >     supported. However,
             >      >     not a single transform actually uses the
            annotation.
             >      >
             >      >     It seems that the effort stopped at some
            point? Would it make
             >     sense to
             >      >     start annotating KafkaExactlyOnceSink with
             >     @RequiresStableInput? We
             >      >     could then get rid of the whitelist.
             >      >
             >      >     -Max
             >      >
             >      >     [1]
             >      >
             >
            
https://docs.google.com/document/d/117yRKbbcEdm3eIKB_26BHOJGmHSZl1YNoF0RqWGtqAM
             >      >
             >      >
             >      >
             >      >     On 01.03.19 14:28, Maximilian Michels wrote:
             >      >      > Just realized that transactions do not
            spawn multiple
             >     elements in
             >      >      > KafkaExactlyOnceSink. So the proposed
            solution to stop
             >     processing
             >      >      > elements while a snapshot is pending would
            work.
             >      >      >
             >      >      > It is certainly not optimal in terms of
            performance for
             >     Flink and
             >      >     poses
             >      >      > problems when checkpoints take long to
            complete, but it
             >     would be
             >      >      > worthwhile to implement this to make use of
            the EOS feature.
             >      >      >
             >      >      > Thanks,
             >      >      > Max
             >      >      >
             >      >      > On 01.03.19 12:23, Maximilian Michels wrote:
             >      >      >> Thank you for the prompt replies. It's
            great to see that
             >     there is
             >      >      >> good understanding of how EOS in Flink works.
             >      >      >>
             >      >      >>> This is exactly what RequiresStableInput
            is supposed to
             >     do. On the
             >      >      >>> Flink runner, this would be implemented
            by delaying
             >     processing
             >      >     until
             >      >      >>> the current checkpoint is done.
             >      >      >>
             >      >      >> I don't think that works because we have
            no control over
             >     the Kafka
             >      >      >> transactions. Imagine:
             >      >      >>
             >      >      >> 1) ExactlyOnceWriter writes records to
            Kafka and commits,
             >     then
             >      >     starts
             >      >      >> a new transaction.
             >      >      >> 2) Flink checkpoints, delaying the
            processing of
             >     elements, the
             >      >      >> checkpoint fails.
             >      >      >> 3) We restore from an old checkpoint and
            will start writing
             >      >     duplicate
             >      >      >> data to Kafka. The de-duplication that the
            sink performs
             >     does not
             >      >      >> help, especially because the random shards
            ids might be
             >     assigned
             >      >      >> differently.
             >      >      >>
             >      >      >> IMHO we have to have control over commit
            to be able to
             >     provide EOS.
             >      >      >>
             >      >      >>> When we discussed this in Aug 2017, the
            understanding
             >     was that 2
             >      >      >>> Phase commit utility in Flink used to
            implement Flink's
             >     Kafka EOS
             >      >      >>> could not be implemented in Beam's context.
             >      >      >>
             >      >      >> That's also my understanding, unless we
            change the interface.
             >      >      >>
             >      >      >>> I don't see how SDF solves this problem..
             >      >      >>
             >      >      >> SDF has a checkpoint method which the
            Runner can call,
             >     but I think
             >      >      >> that you are right, that the above problem
            would be the same.
             >      >      >>
             >      >      >>> Absolutely. I would love to support EOS
            in KafkaIO for
             >     Flink. I
             >      >     think
             >      >      >>> that will help many future exactly-once
            sinks.. and address
             >      >      >>> fundamental incompatibility between Beam
            model and Flink's
             >      >     horizontal
             >      >      >>> checkpointing for such applications.
             >      >      >>
             >      >      >> Great :)
             >      >      >>
             >      >      >>> The FlinkRunner would need to insert the
            "wait until
             >     checkpoint
             >      >      >>> finalization" logic wherever it sees
            @RequiresStableInput,
             >      >     which is
             >      >      >>> already what it would have to do.
             >      >      >>
             >      >      >> I don't think that fixes the problem. See
            above example.
             >      >      >>
             >      >      >> Thanks,
             >      >      >> Max
             >      >      >>
             >      >      >> On 01.03.19 00:04, Raghu Angadi wrote:
             >      >      >>>
             >      >      >>>
             >      >      >>> On Thu, Feb 28, 2019 at 2:42 PM Raghu Angadi wrote:
             >      >      >>>
             >      >      >>>     On Thu, Feb 28, 2019 at 2:34 PM Kenneth Knowles wrote:
             >      >      >>>
             >      >      >>>         I'm not sure what a hard fail is.
            I probably
             >     have a shallow
             >      >      >>>         understanding, but doesn't
            @RequiresStableInput work
             >      >     for 2PC?
             >      >      >>>         The preCommit() phase should
            establish the
             >     transaction and
             >      >      >>>         commit() is not called until
            after checkpoint
             >      >     finalization. Can
             >      >      >>>         you describe the way that it does
            not work a
             >     little bit
             >      >     more?
             >      >      >>>
             >      >      >>>
             >      >      >>>     - preCommit() is called before
            checkpoint. Kafka EOS in
             >      >     Flink starts
             >      >      >>>     the transaction before this and makes
            sure it
             >     flushes all
             >      >     records in
             >      >      >>>     preCommit(). So far good.
             >      >      >>>     - commit is called after checkpoint
            is persisted.
             >     Now, imagine
             >      >      >>>     commit() fails for some reason. There
            is no option
             >     to rerun
             >      >     the 1st
             >      >      >>>     phase to write the records again in a new
             >     transaction. This
             >      >     is a
             >      >      >>>     hard failure for the job. In
            practice Flink might
             >      >     attempt to
             >      >      >>>     commit again (not sure how many
            times), which is
             >     likely to
             >      >     fail and
             >      >      >>>     eventually results in job failure.
             >      >      >>>
             >      >      >>>
             >      >      >>> In Apache Beam, the records could be
            stored in state,
             >     and can be
             >      >      >>> written inside commit() to work around
            this issue. It
             >     could have
             >      >      >>> scalability issues if checkpoints are not
            frequent
             >     enough in Flink
             >      >      >>> runner.
             >      >      >>>
             >      >      >>> Raghu.
             >      >      >>>
             >      >      >>>
             >      >      >>>
             >      >      >>>         Kenn
             >      >      >>>
             >      >      >>>         On Thu, Feb 28, 2019 at 1:25 PM Raghu Angadi wrote:
             >      >      >>>
             >      >      >>>             On Thu, Feb 28, 2019 at 11:01 AM Kenneth Knowles wrote:
             >      >      >>>
             >      >      >>>                 I believe the way you
            would implement
             >     the logic
             >      >     behind
             >      >      >>>                 Flink's KafkaProducer
            would be to have
             >     two steps:
             >      >      >>>
             >      >      >>>                 1. Start transaction
             >      >      >>>                 2. @RequiresStableInput
            Close transaction
             >      >      >>>
             >      >      >>>
             >      >      >>>             I see.  What happens if
            closing the transaction
             >      >     fails in
             >      >      >>>             (2)? Flink's 2PC requires
            that commit() should
             >      >     never hard
             >      >      >>>             fail once preCommit()
            succeeds. I think that is
             >      >     the cost of not
             >      >      >>>             having an extra shuffle. It
            is alright since
             >     this
             >      >     policy has
             >      >      >>>             worked well for Flink so far.
             >      >      >>>
             >      >      >>>             Overall, it will be great to have
             >     @RequiresStableInput
             >      >      >>>             support in Flink runner.
             >      >      >>>
             >      >      >>>             Raghu.
             >      >      >>>
             >      >      >>>                 The FlinkRunner would
            need to insert the
             >     "wait
             >      >     until
             >      >      >>>                 checkpoint finalization"
            logic wherever it
             >      >      >>>                 sees @RequiresStableInput, which is
             >     already what it
             >      >      >>>                 would have to do.
             >      >      >>>
             >      >      >>>                 This matches the
            KafkaProducer's logic -
             >     delay
             >      >     closing
             >      >      >>>                 the transaction until
            checkpoint
             >     finalization. This
             >      >      >>>                 answers my main question,
            which is "is
             >      >      >>>                 @RequiresStableInput
            expressive enough
             >     to allow
             >      >      >>>                 Beam-on-Flink to have
            exactly once behavior
             >      >     with the
             >      >      >>>                 same performance
            characteristics as
             >     native Flink
             >      >      >>>                 checkpoint finalization?"
             >      >      >>>
             >      >      >>>                 Kenn
             >      >      >>>
             >      >      >>>                 [1]
            https://github.com/apache/beam/pull/7955
             >      >      >>>
             >      >      >>>                 On Thu, Feb 28, 2019 at 10:43 AM Reuven Lax wrote:
             >      >      >>>
             >      >      >>>
             >      >      >>>
             >      >      >>>                     On Thu, Feb 28, 2019 at 10:41 AM Raghu Angadi wrote:
             >      >      >>>
             >      >      >>>
             >      >      >>>                             Now why does
            the Flink
             >     Runner not
             >      >     support
             >      >      >>>                             KafkaIO EOS?
            Flink's native
             >      >      >>>                             KafkaProducer
            supports
             >     exactly-once. It
             >      >      >>>                             simply
            commits the pending
             >      >      >>>                             transaction
            once it has
             >     completed a
             >      >      >>>                             checkpoint.
             >      >      >>>
             >      >      >>>
             >      >      >>>
             >      >      >>>                         On Thu, Feb 28, 2019 at 9:59 AM Maximilian Michels wrote:
             >      >      >>>
             >      >      >>>                             Hi,
             >      >      >>>
             >      >      >>>                             I came across
            KafkaIO's Runner
             >      >     whitelist [1]
             >      >      >>>                             for enabling
            exactly-once
             >      >      >>>                             semantics
            (EOS). I think it is
             >      >     questionable
             >      >      >>>                             to exclude
            Runners from
             >      >      >>>                             inside a
            transform, but I
             >     see that the
             >      >      >>>                             intention was
            to save users from
             >      >      >>>                             surprises.
             >      >      >>>
             >      >      >>>                             Now why does
            the Flink
             >     Runner not
             >      >     support
             >      >      >>>                             KafkaIO EOS?
            Flink's native
             >      >      >>>                             KafkaProducer
            supports
             >     exactly-once. It
             >      >      >>>                             simply
            commits the pending
             >      >      >>>                             transaction
            once it has
             >     completed a
             >      >      >>>                             checkpoint.
             >      >      >>>
             >      >      >>>
             >      >      >>>
             >      >      >>>                         When we discussed
            this in Aug
             >     2017, the
             >      >      >>>                         understanding was
            that 2 Phase
             >     commit
             >      >     utility in
             >      >      >>>                         Flink used to
            implement Flink's
             >     Kafka
             >      >     EOS could
             >      >      >>>                         not be
            implemented in Beam's
             >     context.
             >      >      >>>                         See this message
             >      >      >>>
             >      >      >>>                         <https://www.mail-archive.com/[email protected]/msg02664.html> in
             >      >      >>>                         that dev thread.
            Has anything
             >     changed
             >      >     in this
             >      >      >>>                         regard? The whole
            thread is
             >     relevant to
             >      >     this
             >      >      >>>                         topic and worth
            going through.
             >      >      >>>
             >      >      >>>                     I think that
            TwoPhaseCommit utility
             >     class
             >      >     wouldn't
             >      >      >>>                     work. The Flink
            runner would
             >     probably want to
             >      >      >>>                     directly use
            notifySnapshotComplete
             >     in order to
             >      >      >>>                     implement @RequiresStableInput.
             >      >      >>>
             >      >      >>>
             >      >      >>>                             A checkpoint
            is realized by
             >     sending
             >      >     barriers
             >      >      >>>                             through all
            channels
             >      >      >>>                             starting from
            the source until
             >      >     reaching all
             >      >      >>>                             sinks. Every
            operator
             >      >      >>>                             persists its
            state once it has
             >      >     received a
             >      >      >>>                             barrier on
            all its input
             >      >      >>>                             channels, it
            then forwards
             >     it to the
             >      >      >>>                             downstream
            operators.
             >      >      >>>
             >      >      >>>                             The
            architecture of Beam's
             >      >      >>>                             KafkaExactlyOnceSink is as
             >     follows[2]:
             >      >      >>>
             >      >      >>>                             Input ->
            AssignRandomShardIds ->
             >      >     GroupByKey
             >      >      >>>                             ->
            AssignSequenceIds ->
             >      >      >>>                             GroupByKey ->
            ExactlyOnceWriter
             >      >      >>>
             >      >      >>>                             As I
            understood, Spark or
             >     Dataflow
             >      >     use the
             >      >      >>>                             GroupByKey
            stages to persist
             >      >      >>>                             the input.
            That is not
             >     required in
             >      >     Flink to
             >      >      >>>                             be able to
            take a consistent
             >      >      >>>                             snapshot of
            the pipeline.
             >      >      >>>
             >      >      >>>                             Basically,
            for Flink we
             >     don't need
             >      >     any of
             >      >      >>>                             that magic
            that KafkaIO does.
             >      >      >>>                             What we would
            need to
             >     support EOS
             >      >     is a way
             >      >      >>>                             to tell the
            ExactlyOnceWriter
             >      >      >>>                             (a DoFn) to
            commit once a
             >      >     checkpoint has
             >      >      >>>                             completed.
             >      >      >>>
             >      >      >>>                             I know that
            the new version
             >     of SDF
             >      >     supports
             >      >      >>>                             checkpointing
            which should
             >      >      >>>                             solve this
            issue. But there is
             >      >     still a lot
             >      >      >>>                             of work to do
            to make this
             >      >      >>>                             reality.
             >      >      >>>
             >      >      >>>
             >      >      >>>                         I don't see how SDF solves this
             >      >      >>>                         problem. Maybe pseudocode would make
             >      >      >>>                         it clearer. But if it helps, that is great!
             >      >      >>>
             >      >      >>>                             So I think it
            would make
             >     sense to think
             >      >      >>>                             about a way
            to make KafkaIO's
             >      >      >>>                             EOS more
            accessible to Runners
             >      >     which support
             >      >      >>>                             a different
            way of
             >      >      >>>                             checkpointing.
             >      >      >>>
             >      >      >>>
             >      >      >>>                         Absolutely. I
            would love to
             >     support EOS in
             >      >      >>>                         KafkaIO for Flink.
            I think that will
             >      >     help many
             >      >      >>>                         future
            exactly-once sinks.. and
             >     address
             >      >      >>>                         fundamental
            incompatibility between
             >      >     Beam model
             >      >      >>>                         and Flink's
            horizontal checkpointing
             >      >     for such
             >      >      >>>                         applications.
             >      >      >>>
             >      >      >>>                         Raghu.
             >      >      >>>
             >      >      >>>                             Cheers,
             >      >      >>>                             Max
             >      >      >>>
             >      >      >>>                             PS: I found
            this document about
             >      >      >>>                             RequiresStableInput [3], but
             >     IMHO
             >      >      >>>                             defining an
            annotation only
             >      >     manifests the
             >      >      >>>                             conceptual
            difference between
             >      >      >>>                             the Runners.
             >      >      >>>
             >      >      >>>
             >      >      >>>                             [1]
             >      >      >>>
             >      >
             >
            
https://github.com/apache/beam/blob/988a22f01bb133dd65b3cc75e05978d695aed76c/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L1144
             >      >
             >      >      >>>
             >      >      >>>                             [2]
             >      >      >>>
             >      >
             >
            
https://github.com/apache/beam/blob/988a22f01bb133dd65b3cc75e05978d695aed76c/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java#L166
             >      >
             >      >      >>>
             >      >      >>>                             [3]
             >      >      >>>
             >      >
             >
            
https://docs.google.com/document/d/117yRKbbcEdm3eIKB_26BHOJGmHSZl1YNoF0RqWGtqAM
             >      >
             >      >      >>>
             >      >      >>>
             >      >
             >
