Please open a JIRA then!

On Fri, Apr 27, 2018 at 3:59 AM Hemant Bhanawat <hemant9...@gmail.com> wrote:
> I see.
>
> monotonically_increasing_id on streaming DataFrames would be really helpful
> to me and, I believe, to many more users. Adding this functionality to Spark
> would be more efficient in terms of performance than implementing it
> inside the applications.
>
> Hemant
>
> On Thu, Apr 26, 2018 at 11:59 PM, Michael Armbrust <mich...@databricks.com>
> wrote:
>
>> The basic tenet of structured streaming is that a query should return the
>> same answer in streaming or batch mode. We support sorting in complete mode
>> because we have all the data and can sort it correctly and return the full
>> answer. In update or append mode, sorting would only return a correct
>> answer if we could promise that records that sort lower are going to arrive
>> later (and we can't). Therefore, it is disallowed.
>>
>> If you are just looking for a unique, stable id and you are already using
>> Kafka as the source, you could just combine the partition id and the
>> offset. The structured streaming connector to Kafka
>> <https://spark.apache.org/docs/2.2.0/structured-streaming-kafka-integration.html>
>> exposes both of these in the schema of the streaming DataFrame. (Similarly,
>> for Kinesis you can use the shard id and sequence number.)
>>
>> If you need the IDs to be contiguous, then this is a somewhat
>> fundamentally hard problem. I think the best we could do is add support
>> for monotonically_increasing_id() in streaming DataFrames.
>>
>> On Tue, Apr 24, 2018 at 1:38 PM, Chayapan Khannabha <chaya...@gmail.com>
>> wrote:
>>
>>> Perhaps your use case fits Apache Kafka better.
>>>
>>> More info at:
>>> https://kafka.apache.org/documentation/streams/
>>>
>>> Everything really comes down to the architecture design and algorithm
>>> spec.
>>> However, from my experience with Spark, there are many good reasons
>>> why this requirement is not supported ;)
>>>
>>> Best,
>>>
>>> Chayapan (A)
>>>
>>>
>>> On Apr 24, 2018, at 2:18 PM, Hemant Bhanawat <hemant9...@gmail.com>
>>> wrote:
>>>
>>> Thanks Chris. There are many ways in which I can solve this problem, but
>>> they are cumbersome. The easiest way would have been to sort the streaming
>>> dataframe. I asked this question because I could not find a reason why
>>> sorting on a streaming dataframe is disallowed.
>>>
>>> Hemant
>>>
>>> On Mon, Apr 16, 2018 at 6:09 PM, Bowden, Chris <chris.bow...@microfocus.com>
>>> wrote:
>>>
>>>> You can happily sort the underlying RDD of InternalRow(s) inside a
>>>> sink, assuming you are willing to implement and maintain your own sink(s).
>>>> That is, just grabbing the parquet sink, etc. isn’t going to work out of
>>>> the box. Alternatively, map/flatMapGroupsWithState is probably sufficient
>>>> and requires less working knowledge to make effective reuse of internals.
>>>> Just group by foo and then sort accordingly and assign ids. The id counter
>>>> can be stateful per group. Sometimes this problem may not need to be solved
>>>> at all. For example, if you are using Kafka, a proper partitioning scheme
>>>> and message offsets may be “good enough”.
>>>> ------------------------------
>>>> *From:* Hemant Bhanawat <hemant9...@gmail.com>
>>>> *Sent:* Thursday, April 12, 2018 11:42:59 PM
>>>> *To:* Reynold Xin
>>>> *Cc:* dev
>>>> *Subject:* Re: Sorting on a streaming dataframe
>>>>
>>>> Well, we want to assign snapshot ids (incrementing counters) to the
>>>> incoming records. For that, we are zipping the streaming RDDs with that
>>>> counter using a modified version of ZippedWithIndexRDD. We are ok if the
>>>> records in the streaming dataframe get counters in random order, but the
>>>> counter should always be incrementing.
>>>>
>>>> This is working fine until we have a failure.
>>>> When we have a failure, we re-assign the records to snapshot ids, and
>>>> this time the same snapshot id can get assigned to a different record.
>>>> This is a problem because the primary key in our storage engine is
>>>> <recordid, snapshotid>. So we want to sort the dataframe so that the
>>>> records always get the same snapshot id.
>>>>
>>>> On Fri, Apr 13, 2018 at 11:43 AM, Reynold Xin <r...@databricks.com>
>>>> wrote:
>>>>
>>>> Can you describe your use case more?
>>>>
>>>> On Thu, Apr 12, 2018 at 11:12 PM Hemant Bhanawat <hemant9...@gmail.com>
>>>> wrote:
>>>>
>>>> Hi Guys,
>>>>
>>>> Why is sorting on streaming dataframes not supported (unless it is
>>>> complete mode)? My downstream needs me to sort the streaming dataframe.
>>>>
>>>> Hemant
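
For anyone reading this thread later, here is a minimal sketch of the "combine the partition id and the offset" suggestion made above. It is plain Python rather than Spark code, and it only illustrates why such an id survives a replay after failure while an arrival-order counter does not. The names Record, stable_id, counter_ids and OFFSET_BITS, and the 48/15 bit split, are assumptions made up for this example; they are not part of any Spark or Kafka API.

```python
from typing import Dict, Iterable, NamedTuple

# Assumption for this sketch: offsets fit in 48 bits, which leaves 15 bits
# for the partition id inside a non-negative signed 64-bit integer.
OFFSET_BITS = 48
PARTITION_BITS = 63 - OFFSET_BITS


class Record(NamedTuple):
    """Hypothetical stand-in for one row read from a partitioned log."""
    partition: int
    offset: int
    value: str


def stable_id(partition: int, offset: int) -> int:
    """Pack (partition, offset) into a single integer id.

    The id depends only on where the record sits in the source, not on
    the order in which records happen to be processed.
    """
    if not 0 <= offset < (1 << OFFSET_BITS):
        raise ValueError("offset out of range for this packing")
    if not 0 <= partition < (1 << PARTITION_BITS):
        raise ValueError("partition out of range for this packing")
    return (partition << OFFSET_BITS) | offset


def stable_ids(records: Iterable[Record]) -> Dict[str, int]:
    """Map each record value to its source-derived id."""
    return {r.value: stable_id(r.partition, r.offset) for r in records}


def counter_ids(records: Iterable[Record]) -> Dict[str, int]:
    """Map each record value to an arrival-order counter (zipWithIndex style)."""
    return {r.value: i for i, r in enumerate(records)}


# The same three records, seen in a different order after a simulated retry.
first_attempt = [Record(0, 0, "a"), Record(1, 0, "b"), Record(0, 1, "c")]
replay = list(reversed(first_attempt))

same_after_replay = stable_ids(first_attempt) == stable_ids(replay)
counter_changed = counter_ids(first_attempt) != counter_ids(replay)
```

Ids built this way are unique and repeatable, and they increase with the offset inside each partition, but they are neither contiguous nor globally ordered across partitions, which matches the caveat above about contiguous ids being the hard part.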