Re: [Spark Streaming] How to clear old data from Stream State?

2015-11-25 Thread Ted Yu
trackStateByKey API is in branch-1.6

FYI

On Wed, Nov 25, 2015 at 6:03 AM, Todd Nist wrote:

> Perhaps the new trackStateByKey, targeted for version 1.6, may help you here.
> I'm not sure whether it will be part of 1.6, as the jira does not
> specify a fix version. The jira describing it is here:
> https://issues.apache.org/jira/browse/SPARK-2629, and the design doc that
> discusses the API changes is here:
>
>
> https://docs.google.com/document/d/1NoALLyd83zGs1hNGMm0Pc5YOVgiPpMHugGMk6COqxxE/edit#
>
> Look for the timeout function:
>
> /**
>   * Set the duration of inactivity (i.e. no new data) after which a state
>   * can be terminated by the system. After this idle period, the system
>   * will mark the idle state as being timed out, and call the tracking
>   * function with State[S].isTimingOut() = true.
>   */
>  def timeout(duration: Duration): this.type
>
> -Todd
>
> On Wed, Nov 25, 2015 at 8:00 AM, diplomatic Guru wrote:
>
>> Hello,
>>
>> I know how I could clear old state depending on the input value: if some
>> condition indicates that the state is old, returning null invalidates the
>> record. But this is only feasible if a new record arrives that matches the
>> old key. If no new data ever arrives for the old key, how can I invalidate
>> it?
>>
>> e.g.
>>
>> A key/value pair arrives like this:
>>
>> Key 12-11-2015:10:00: Value:test,1,2,12-11-2015:10:00
>>
>> The above key will be written to the state.
>>
>> Every time there is a value for this '12-11-2015:10:00' key, it will be
>> aggregated and the state updated. If the job runs 24/7, this state will be
>> kept forever until we restart the job. I could add a validation within the
>> updateStateByKey function to check and delete the record if value[3] <
>> SYSTIME-1, but this is only effective if a new record arrives on a later
>> day that matches 12-11-2015:10:00. What if no new values are received for
>> the key 12-11-2015:10:00? I assume it will remain in the state, am I
>> correct? If so, how do I clear the state?
>>
>> Thank you.
>>
>>
>>
>


[Spark Streaming] How to clear old data from Stream State?

2015-11-25 Thread diplomatic Guru
Hello,

I know how I could clear old state depending on the input value: if some
condition indicates that the state is old, returning null invalidates the
record. But this is only feasible if a new record arrives that matches the
old key. If no new data ever arrives for the old key, how can I invalidate
it?

e.g.

A key/value pair arrives like this:

Key 12-11-2015:10:00: Value:test,1,2,12-11-2015:10:00

The above key will be written to the state.

Every time there is a value for this '12-11-2015:10:00' key, it will be
aggregated and the state updated. If the job runs 24/7, this state will be
kept forever until we restart the job. I could add a validation within the
updateStateByKey function to check and delete the record if value[3] <
SYSTIME-1, but this is only effective if a new record arrives on a later
day that matches 12-11-2015:10:00. What if no new values are received for
the key 12-11-2015:10:00? I assume it will remain in the state, am I
correct? If so, how do I clear the state?
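
As a rough sketch of the approach described above (and relying on the fact that updateStateByKey invokes the update function for every existing key each batch, with an empty value list when no new data arrived), an update function could drop stale keys itself. The (count, lastSeenMillis) state layout and the name updateFn are assumptions made for illustration, not anything from a real job:

```scala
// Sketch of an updateStateByKey update function. State is assumed to be
// (count, lastSeenMillis); a value[3]-style timestamp check is modelled
// by carrying lastSeenMillis explicitly. Returning None removes the key
// from the state.
def updateFn(cutoffMillis: Long)(
    newValues: Seq[(Int, Long)],
    state: Option[(Int, Long)]): Option[(Int, Long)] = {
  val (count, lastSeen) = state.getOrElse((0, 0L))
  if (newValues.isEmpty) {
    // updateStateByKey also calls us for keys with no new data this
    // batch, so stale state can be dropped even when the key is quiet.
    if (lastSeen < cutoffMillis) None else state
  } else {
    val newCount = count + newValues.map(_._1).sum
    val newLast  = (lastSeen +: newValues.map(_._2)).max
    Some((newCount, newLast))
  }
}
```

Note this runs the function over every key in the state on every batch, which is exactly the overhead trackStateByKey was proposed to avoid.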

Thank you.


Re: [Spark Streaming] How to clear old data from Stream State?

2015-11-25 Thread Todd Nist
Perhaps the new trackStateByKey, targeted for version 1.6, may help you here.
I'm not sure whether it will be part of 1.6, as the jira does not
specify a fix version. The jira describing it is here:
https://issues.apache.org/jira/browse/SPARK-2629, and the design doc that
discusses the API changes is here:

https://docs.google.com/document/d/1NoALLyd83zGs1hNGMm0Pc5YOVgiPpMHugGMk6COqxxE/edit#

Look for the timeout function:

/**
  * Set the duration of inactivity (i.e. no new data) after which a state
  * can be terminated by the system. After this idle period, the system
  * will mark the idle state as being timed out, and call the tracking
  * function with State[S].isTimingOut() = true.
  */
 def timeout(duration: Duration): this.type
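
To illustrate the intended semantics, here is a toy model of what the timeout does, not the actual Spark API: TrackedState and expireIdle are made-up names, and a real tracking function receives a State[S] object rather than this case class. States idle longer than the timeout get one last callback with isTimingOut = true and are then removed:

```scala
// Toy model of trackStateByKey's timeout semantics (not Spark code).
final case class TrackedState[S](value: S,
                                 lastUpdatedMillis: Long,
                                 isTimingOut: Boolean = false)

def expireIdle[K, S](
    states: Map[K, TrackedState[S]],
    nowMillis: Long,
    timeoutMillis: Long)(
    onTimeout: (K, TrackedState[S]) => Unit): Map[K, TrackedState[S]] = {
  // Split the state map into timed-out and still-live entries.
  val (idle, live) = states.partition { case (_, s) =>
    nowMillis - s.lastUpdatedMillis > timeoutMillis
  }
  // Give the tracking function a final look at each timed-out state,
  // flagged with isTimingOut = true, before it is dropped.
  idle.foreach { case (k, s) => onTimeout(k, s.copy(isTimingOut = true)) }
  live
}
```

The point is that the system, not the arrival of new records, drives the cleanup, which is what the updateStateByKey approach cannot do on its own.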

-Todd

On Wed, Nov 25, 2015 at 8:00 AM, diplomatic Guru wrote:

> Hello,
>
> I know how I could clear old state depending on the input value: if some
> condition indicates that the state is old, returning null invalidates the
> record. But this is only feasible if a new record arrives that matches the
> old key. If no new data ever arrives for the old key, how can I invalidate
> it?
>
> e.g.
>
> A key/value pair arrives like this:
>
> Key 12-11-2015:10:00: Value:test,1,2,12-11-2015:10:00
>
> The above key will be written to the state.
>
> Every time there is a value for this '12-11-2015:10:00' key, it will be
> aggregated and the state updated. If the job runs 24/7, this state will be
> kept forever until we restart the job. I could add a validation within the
> updateStateByKey function to check and delete the record if value[3] <
> SYSTIME-1, but this is only effective if a new record arrives on a later
> day that matches 12-11-2015:10:00. What if no new values are received for
> the key 12-11-2015:10:00? I assume it will remain in the state, am I
> correct? If so, how do I clear the state?
>
> Thank you.
>
>
>