Why not just close the existing stream and start a new one if there is a redefinition? Just trying to understand the difference in the redefinition case.
On Thu, Oct 26, 2017 at 7:13 AM, Brian Hulette <[email protected]> wrote:
> My initial thinking was just appending to the dictionary, but it could be
> useful to have the ability to redefine it as Wes suggested.
>
> Redefining does add some extra burden on stream consumers though, since a
> dictionary batch would no longer apply globally - consumers would have to
> determine the appropriate dictionary batch(es) to apply to a given record
> batch when looking back at data earlier in the stream.
>
> That's not that difficult to implement, but it's a complication worth
> considering.
>
> Brian
>
>
> On 10/25/2017 09:25 PM, Wes McKinney wrote:
>> What I'd proposed was to add metadata to indicate either an append
>> (DELTA) or a replacement (NEW)
>>
>> On Wed, Oct 25, 2017 at 9:23 PM, Jacques Nadeau <[email protected]> wrote:
>>> Is the proposal to only append to the dictionary or to redefine it?
>>>
>>> On Wed, Oct 25, 2017 at 7:16 AM, Wes McKinney <[email protected]> wrote:
>>>> Opened https://issues.apache.org/jira/browse/ARROW-1727
>>>>
>>>> On Tue, Oct 24, 2017 at 6:16 PM, Wes McKinney <[email protected]> wrote:
>>>>> hi Brian,
>>>>>
>>>>> Thanks for bringing this up. I'm +1 on having a mechanism to enable
>>>>> dictionaries to grow or change mid-stream. I figured that this would
>>>>> eventually come up, and the current design for the stream does not
>>>>> preclude having dictionaries show up mid-stream. As an example, a
>>>>> service streaming data from Parquet files might send
>>>>> dictionary-encoded versions of some columns, and it would not be
>>>>> practical to have to scan all of the Parquet files of interest to find
>>>>> the global dictionary. The Apache CarbonData format built some
>>>>> Spark-based infrastructure around this exact problem, but we cannot
>>>>> assume that it will be cheap or practical to find the global
>>>>> dictionary up front.
>>>>>
>>>>> I think having dictionary messages occur after the first record
>>>>> batches is a reasonable strategy. I would suggest we add a "type"
>>>>> field to the DictionaryBatch message type ([1]) so that we can either
>>>>> indicate that the message is a NEW dictionary (i.e. the existing one
>>>>> should be dropped) or a DELTA (additions) to an existing dictionary. I
>>>>> don't think it will be difficult to accommodate this in the C++
>>>>> implementation, for example (though we will need to finally implement
>>>>> "concatenate" for all supported types to make it work).
>>>>>
>>>>> Thanks,
>>>>> Wes
>>>>>
>>>>> [1]: https://github.com/apache/arrow/blob/master/format/Message.fbs#L86
>>>>>
>>>>> On Tue, Oct 24, 2017 at 3:44 PM, Brian Hulette <[email protected]> wrote:
>>>>>> One issue we've struggled with when adding an Arrow interface to Geomesa
>>>>>> is the requirement to send all dictionary batches before record batches
>>>>>> in the IPC formats. Sometimes we have pre-computed "top-k" stats that we
>>>>>> can use to assemble a dictionary beforehand, but those don't always
>>>>>> exist, and even when they do they aren't complete by definition, so we
>>>>>> could end up hiding valuable data in an "Other" category. So in practice
>>>>>> we often have to wait to collect all the data before we can start
>>>>>> streaming anything.
>>>>>>
>>>>>> I'd like to propose a couple of modifications to the Arrow IPC formats
>>>>>> that could help alleviate this problem:
>>>>>> 1) Allow multiple dictionary batches to use the same id. The vectors in
>>>>>> all dictionary batches with the same id can be concatenated together to
>>>>>> represent the full dictionary with that id.
>>>>>> 2) Allow dictionary batches and record batches to be interleaved. For
>>>>>> the streaming format, there could be an additional requirement that any
>>>>>> dictionary key used in a record batch must have been defined in a
>>>>>> previously sent dictionary batch.
>>>>>>
>>>>>> These changes would allow producers to send "delta" dictionary batches
>>>>>> in an Arrow stream to define new keys that will be used in future record
>>>>>> batches. Here's an example stream with one column of city names, to help
>>>>>> illustrate the idea:
>>>>>>
>>>>>> <SCHEMA>
>>>>>> <DICTIONARY id=0>
>>>>>> (0) "New York"
>>>>>> (1) "Seattle"
>>>>>> (2) "Washington, DC"
>>>>>>
>>>>>> <RECORD BATCH 0>
>>>>>> 0
>>>>>> 1
>>>>>> 2
>>>>>> 1
>>>>>>
>>>>>> <DICTIONARY id=0>
>>>>>> (3) "Chicago"
>>>>>> (4) "San Francisco"
>>>>>>
>>>>>> <RECORD BATCH 1>
>>>>>> 3
>>>>>> 2
>>>>>> 4
>>>>>> 0
>>>>>> EOS
>>>>>>
>>>>>>
>>>>>> Decoded Data:
>>>>>> -------------
>>>>>> New York
>>>>>> Seattle
>>>>>> Washington, DC
>>>>>> Seattle
>>>>>> Chicago
>>>>>> Washington, DC
>>>>>> San Francisco
>>>>>> New York
>>>>>>
>>>>>>
>>>>>> I also think it can be valuable if the requirement mentioned in #2
>>>>>> applies only to the streaming format, so that the random-access format
>>>>>> would support dictionary batches following record batches. That way
>>>>>> producers creating random-access files could start writing record
>>>>>> batches before all the data for the dictionaries has been assembled.
>>>>>>
>>>>>> I need to give Paul Taylor credit for this idea - he actually already
>>>>>> wrote the JS arrow reader to combine dictionaries with the same id
>>>>>> (https://github.com/apache/arrow/blob/master/js/src/reader/arrow.ts#L59),
>>>>>> and it occurred to me that that could be a solution for us.
>>>>>>
>>>>>> Thanks
>>>>>> Brian
>>>>>>
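
[Editor's note: a minimal sketch of the consumer-side behavior that proposals
#1 and #2 above imply, replaying the city-names example. The message shapes
and names are hypothetical stand-ins for illustration only, not the actual
arrow-js reader API: dictionary batches are accumulated by id, and each record
batch is decoded against the values accumulated so far.]

// Hypothetical message shapes (not the real arrow-js types), used only to
// illustrate the delta-dictionary behavior discussed in the thread.
type DictionaryBatch = { id: number; values: string[] };
type RecordBatch = { dictionaryId: number; keys: number[] }; // one encoded column
type StreamMessage =
  | { kind: "dictionary"; batch: DictionaryBatch }
  | { kind: "record"; batch: RecordBatch };

function* decodeStream(messages: Iterable<StreamMessage>): Generator<string[]> {
  // id -> accumulated dictionary values; a later batch with the same id
  // appends to the end, so previously issued keys keep their meaning.
  const dictionaries = new Map<number, string[]>();

  for (const msg of messages) {
    if (msg.kind === "dictionary") {
      const existing = dictionaries.get(msg.batch.id) ?? [];
      dictionaries.set(msg.batch.id, existing.concat(msg.batch.values));
    } else {
      // Streaming-format requirement from proposal #2: every key was defined
      // by an earlier dictionary batch, so a simple lookup suffices.
      const dict = dictionaries.get(msg.batch.dictionaryId) ?? [];
      yield msg.batch.keys.map((k) => dict[k]);
    }
  }
}

// The example stream from the thread, one column of city names.
const stream: StreamMessage[] = [
  { kind: "dictionary", batch: { id: 0, values: ["New York", "Seattle", "Washington, DC"] } },
  { kind: "record", batch: { dictionaryId: 0, keys: [0, 1, 2, 1] } },
  { kind: "dictionary", batch: { id: 0, values: ["Chicago", "San Francisco"] } },
  { kind: "record", batch: { dictionaryId: 0, keys: [3, 2, 4, 0] } },
];

for (const decoded of decodeStream(stream)) {
  console.log(decoded);
  // ["New York", "Seattle", "Washington, DC", "Seattle"]
  // ["Chicago", "Washington, DC", "San Francisco", "New York"]
}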
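
[Editor's note: a similarly hedged sketch of the extra bookkeeping Brian
describes if redefinition is allowed. The "type" field with NEW/DELTA values
follows Wes's suggestion above, but its exact shape here is an assumption for
illustration; the real metadata would live in format/Message.fbs. The point is
that once a NEW batch can replace earlier values, a consumer that may look
back at earlier record batches has to remember which dictionary state applied
to each one.]

// Hypothetical dictionary message carrying Wes's proposed type field:
// NEW replaces the accumulated values for an id, DELTA appends to them.
type DictionaryMessage = { id: number; type: "NEW" | "DELTA"; values: string[] };

class DictionaryTracker {
  private current = new Map<number, string[]>();

  apply(msg: DictionaryMessage): void {
    const base = msg.type === "NEW" ? [] : (this.current.get(msg.id) ?? []);
    this.current.set(msg.id, base.concat(msg.values));
  }

  // Because a later NEW message can redefine keys, a consumer that may revisit
  // earlier record batches snapshots the dictionary state that was current
  // when each record batch arrived.
  snapshot(id: number): string[] {
    return [...(this.current.get(id) ?? [])];
  }
}

A consumer would call apply() for every dictionary message and take a
snapshot() when a record batch arrives, keeping the snapshot alongside that
batch so it can still be decoded after a later NEW message; with DELTA-only
streams the snapshots are unnecessary, which is the simplification Brian notes.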
