I believe the docs are out of date regarding distinct. The behavior should be as follows:
- Distinct should be applied across triggers
- In order to prevent the state from growing indefinitely, you need to add a watermark
- If you don't have a watermark, but your key space is small, that's also fine
- If a record arrives and is not in the state, it will be outputted
- If a record arrives and is in the state, it will be ignored
- Once the watermark passes for a key, it will be dropped from state
- If a record arrives late, i.e. after the watermark, it will be ignored

HTH!
Burak

On Mon, Mar 19, 2018 at 12:04 PM, Geoff Von Allmen <ge...@ibleducation.com> wrote:

> I see in the documentation that the distinct operation is not supported
> <https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#unsupported-operations>
> in Structured Streaming. That being said, I have noticed that you are able
> to successfully call distinct() on a data frame and it seems to perform
> the desired operation and doesn’t fail with the AnalysisException as
> expected. If I call it with a column name specified, then it will fail with
> AnalysisException.
>
> I am using Structured Streaming to read from a Kafka stream and my
> question (and concern) is that:
>
>    - The distinct operation is properly applied across the *current*
>    batch as read from Kafka, however, the distinct operation would not
>    apply across batches.
>
> I have tried the following:
>
>    - Started the streaming job to see my baseline data and left the job
>    streaming
>    - Created events in kafka that would increment my counts if distinct
>    was not performing as expected
>    - Results:
>       - Distinct still seems to be working over the entire data set even
>       as I add new data.
>       - As I add new data, I see spark process the data (I’m doing output
>       mode = update) but there are no new results indicating the distinct
>       function is in fact still working across batches as spark pulls in the
>       new data from kafka.
>
> Does anyone know more about the intended behavior of distinct in
> Structured Streaming?
>
> If this is working as intended, does this mean I could have a dataset that
> is growing without bound being held in memory/disk or something to that
> effect (so it has some way to make that distinct operation against previous
> data)?
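
To make the state rules listed in the reply concrete, here is a minimal pure-Python sketch of deduplication with a watermark. It is a toy model and not Spark's implementation. The class `DedupState`, the `allowed_lateness` parameter, and the integer timestamps are all hypothetical names chosen for illustration. It also assumes the watermark is held fixed while a batch is processed and only advances afterwards.

```python
from dataclasses import dataclass, field


@dataclass
class DedupState:
    """Toy model of streaming distinct with a watermark (not Spark's code)."""

    allowed_lateness: int          # hypothetical lateness threshold, same units as event time
    max_event_time: int = 0        # newest event time seen so far (assumes timestamps >= 0)
    seen: dict = field(default_factory=dict)  # key -> event time of its first occurrence

    @property
    def watermark(self) -> int:
        return self.max_event_time - self.allowed_lateness

    def process_batch(self, records):
        """Take (key, event_time) pairs for one trigger; return the pairs emitted."""
        records = list(records)
        watermark = self.watermark  # fixed for the duration of this batch
        emitted = []
        for key, event_time in records:
            if event_time < watermark:
                continue            # late record: ignored
            if key in self.seen:
                continue            # already in state: ignored
            self.seen[key] = event_time
            emitted.append((key, event_time))  # new key: output

        # Advance the watermark, then drop keys it has passed.
        if records:
            newest = max(t for _, t in records)
            self.max_event_time = max(self.max_event_time, newest)
        watermark = self.watermark
        self.seen = {k: t for k, t in self.seen.items() if t >= watermark}
        return emitted


state = DedupState(allowed_lateness=10)
print(state.process_batch([("a", 1), ("b", 2), ("a", 3)]))  # [('a', 1), ('b', 2)]
print(state.process_batch([("a", 5), ("c", 30)]))           # [('c', 30)]
print(state.process_batch([("d", 15), ("c", 31)]))          # []
print(state.process_batch([("a", 40)]))                     # [('a', 40)]
```

Note the last call: once the watermark has passed key "a" and it has been dropped from state, a later record with the same key is output again. Without a watermark, `seen` would never shrink, which is the unbounded state growth asked about in the original question.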