[
https://issues.apache.org/jira/browse/SPARK-4392?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Sean Owen resolved SPARK-4392.
------------------------------
Resolution: Won't Fix
WontFix, per PR
> Event proration based on event timestamps
> -----------------------------------------
>
> Key: SPARK-4392
> URL: https://issues.apache.org/jira/browse/SPARK-4392
> Project: Spark
> Issue Type: New Feature
> Components: Streaming
> Reporter: Bijay Singh Bisht
>
> Topic: Spark Streaming using event timestamps
> Spark Streaming creates a batch (RDD) of events every T duration. The batch
> is based on a schedule, and the timestamp associated with the batch is the
> time at which it was scheduled by Spark. This Spark-applied timestamp may be
> less relevant than the timestamp the event was originally meant to carry.
> The fundamental reasons for the event timestamp to differ from the Spark
> timestamp are the delay in generating the event in the upstream system and
> the delay in transporting the event to Spark after it has been generated.
> The problem is compounded for events having a start and an end, with both
> timestamps packed into a single record that is generated after the event
> ends, as illustrated in the following diagram.
> (upstream) -------s--------e----g---------------->
> (spark ) ------------------------r------------->
> The horizontal axis is time. The event starts at s and ends at e; the event
> record is generated at g and is then received by Spark at r.
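> To make the discussion concrete, a received tuple might carry its own
> timestamps roughly as follows (a minimal sketch; the field names and the use
> of epoch milliseconds are assumptions, not part of the proposal):
>     case class Event(
>       start: Long,     // s: when the event started (epoch ms)
>       end: Long,       // e: when the event ended (epoch ms)
>       payload: String) // g and r are not carried; r is when Spark receives it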
> So there is a need to create batches which contain only the relevant events,
> or the relevant proportion of each event, according to the original timestamp
> passed to Spark as part of the received tuples. Let's refer to a batch which
> has only the events occurring in the time window it represents as a bin. So
> a bin from T1 to T2 will 'only have events' which occurred in the period T1
> to T2. The definition of the bin could be extended to include 'all the
> events' which occurred in a given period; however, this second constraint is
> harder to satisfy in practice, because events can be arbitrarily delayed.
> For the rest of the discussion, the definitions of the batch and the bin are
> as per the previous paragraph.
> The bin size determines the granularity of the resulting time series and is
> an independent consideration in itself, i.e. independent of the batch, the
> events, and the delay.
> Let's say the batch size is T, the bin size is n*T, and an event is delayed
> (in reception) by at most d*T. Then, in order to generate a bin, n + d
> batches of size T are required.
> Conversely, every batch contributes to bins from the current one back up to
> the last of ceiling((n + d) / n) bins.
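> As a worked example (values illustrative only): with n = 3 and d = 2, a bin
> needs n + d = 5 batches before it is final, and each batch contributes to at
> most ceiling(5 / 3) = 2 bins:
>     val n = 3                                     // bin size in batches
>     val d = 2                                     // max delay in batches
>     val batchesToFinalizeBin = n + d              // 5
>     val m = math.ceil((n + d).toDouble / n).toInt // 2 bins touched per batch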
> For a batch @ t, the contents can be seen as T1 @ t (where the notation T1
> @ t means the events corresponding to bin T1 from the batch at t), T1 - 1 @ t,
> T1 - 2 @ t, ..., T1 - m @ t (where T1 - 1 represents the bin previous to T1
> and m = ceiling((n + d) / n)).
> We can then de-multiplex the contributions from the batch @ t into bins T1,
> T1 - 1, T1 - 2, T1 - 3, ..., resulting in streams which represent partial
> bins relative to the batch stream. So stream i represents the partial bin
> T1 - i received at t. This way the Spark application can deliver incremental
> bins to the downstream system as close to real time as possible. Depending
> on how the downstream application can handle partial bins, the definition
> and the generation of the streams need to be controlled.
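> A minimal sketch of the de-multiplexing step, assuming the current bin T1
> ends exactly at the current batch's end and using a single event time for
> simplicity (the names and the alignment assumption are mine, not part of
> the proposal):
>     // Partial-bin index for an event in the batch ending at batchEnd:
>     // 0 means the current bin T1, 1 means T1 - 1, and so on.
>     def partialBinIndex(eventTime: Long, batchEnd: Long, binSize: Long): Int =
>       ((batchEnd - 1 - eventTime) / binSize).toInt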
> Cases:
> Case 1: The downstream application can handle incremental updates to the
> bin (i.e. a partial bin = current partial bin + latest partial bin). For
> this, what is required is m streams which send updates every T interval,
> where
> Stream 1: T1 @ t
> Stream 2: T1 - 1 @ t
> ...
> Stream m: T1 - m @ t
> Case 2: The downstream application can only handle full updates to the bin
> (i.e. partial bin = latest partial bin). For this, what is required is m
> streams which send updates every T interval, where
> Stream 1: T1 @ t
> Stream 2: T1 - 1 @ t + T1 - 1 @ t - 1
> ...
> Stream m: T1 - m @ t + ... + T1 - m @ t - m
> i.e. a bin is updated every T until the bin is final. The first stream
> represents the most current bin with the latest cumulative update, the next
> stream represents the previous bin with the latest cumulative update, and so
> on, until the last stream, which represents a final bin.
> Case 3: The downstream application cannot handle updates to a bin at all.
> This is basically the last stream from case 2, with the exception that it
> slides by n*T and not T. Note that the next bin after T1 @ t is T1 + 1 @
> t + n*T, because the size of the bin is n*T.
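> The relationship between the cases can be sketched over per-batch counts for
> a single bin (plain Scala; all values illustrative): case 1 emits the
> per-batch increments, case 2 emits their running sum, and case 3 emits only
> the final value.
>     val increments = Seq(4L, 2L, 1L)                     // case 1 updates
>     val cumulative = increments.scanLeft(0L)(_ + _).tail // case 2: 4, 6, 7
>     val finalBin = cumulative.last                       // case 3: 7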
> Typically each stream needs to be treated similarly, because each represents
> the same kind of content; however, there can be use cases where a stream may
> need to be treated differently. This is a consideration for the API.
> Implementation:
> In order to transform a batch stream into a partial-bin stream, we can
> filter the events and put the prorated events into bin streams representing
> T1 @ t, T1 - 1 @ t, and so on.
> For this we can define a new DStream which is generated by prorating the
> data from each batch into the bin corresponding to the stream.
> For use case 2, which requires progressively accumulating all the events
> for a bin, a new DStream is required which generates a pulsating window
> that grows from (s*n + 1) to (s*n + n) batches, where s is the
> partial-stream index. A stream index of 0 implies the most current
> partial-bin stream.
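> For example (values illustrative), with n = 3 the window for the stream with
> index s = 1 pulses through lengths 4, 5, 6 batches as the bin fills, then
> snaps back to 4 for the next bin:
>     // Window length after k batches (1 <= k <= n) have arrived for the bin
>     def pulsatingWindowLength(s: Int, n: Int, k: Int): Int = s * n + k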
> APIs
> BinStreamer[T]
> This will return a BinStreamer object.
> The BinStreamer object can be used to generate incremental bin streams
> (case 1), a final bin stream (case 3), or updated bin streams (case 2)
> using the following APIs:
> BinStreamer.incrementalStreams(sizeInNumBatches: Int, delayIndex: Int,
> numStreams: Int): Seq[BinStream[(T, percentage)]]
> BinStreamer.finalStream(sizeInNumBatches: Int, delayIndex: Int):
> BinStream[(T, percentage)]
> BinStreamer.updatedStreams(sizeInNumBatches: Int, delayIndex: Int,
> numStreams: Int): Seq[BinStream[(T, percentage)]]
> DStream[T]: the batch stream.
> start: a closure to get the start time from a record.
> end: a closure to get the end time from a record.
> sizeInNumBatches: the size of a bin as a multiple of the batch size (n).
> delayIndex: the maximum delay, in batches, between the time an event is
> relevant and the time it is received (d).
> numStreams: the number of bin streams. Even though it is determined by the
> batch size, the bin size, and the delayIndex, this is an optional parameter
> to control the number of output streams; it does so by delaying the most
> current bin.
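> A hypothetical usage of the proposed API (the constructor shape and the
> Event type are assumptions based on the parameter list above):
>     val events: DStream[Event] = ??? // the batch stream
>     val streamer = new BinStreamer[Event](events, _.start, _.end)
>     // bins of 3 batches, events up to 2 batches late, 2 output streams
>     val incremental = streamer.incrementalStreams(3, 2, 2) // case 1
>     val updated = streamer.updatedStreams(3, 2, 2)         // case 2
>     val finalBins = streamer.finalStream(3, 2)             // case 3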
> Each BinStream will wrap a DStream.
>     def prorate(binStart: Time, binEnd: Time)(x: T) = {
>       val sx = startFunc(x)
>       val ex = endFunc(x)
>       // Even though binStart is not inclusive, binStart here implies the
>       // limit as x approaches binStart+
>       val s = if (sx > binStart) sx else binStart
>       val e = if (ex < binEnd) ex else binEnd
>       if (ex == sx) {
>         (x, 1.0)
>       } else {
>         (x, (e - s) / (ex - sx))
>       }
>     }
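> As a worked example: with a bin covering (10, 20] and an event spanning
> sx = 5 to ex = 15, the overlap is (10, 15], so the event is prorated at
> (15 - 10) / (15 - 5) = 0.5, i.e. half of the event is attributed to this
> bin.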
>     def filter(binStart: Time, binEnd: Time)(x: T) = {
>       // The flow is starting in the subsequent bin
>       if (startFunc(x) > binEnd) false
>       // The flow ended in the prior bin
>       else if (endFunc(x) <= binStart) false
>       // start(x) approaches from binEnd+
>       else if (startFunc(x) == binEnd && endFunc(x) > binEnd) false
>       else true
>     }
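> Putting the two together, each partial-bin stream could then be derived
> from a batch roughly like this (a sketch; prorateBatch is a hypothetical
> helper, and batch stands for one RDD of the batch stream):
>     def prorateBatch(binStart: Time, binEnd: Time)(batch: RDD[T]) =
>       batch.filter(filter(binStart, binEnd)).map(prorate(binStart, binEnd))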