[ 
https://issues.apache.org/jira/browse/SPARK-55131?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jungtaek Lim updated SPARK-55131:
---------------------------------
    Description: 
The default delimit of StringAppendOperator (merge operator for RocksDB) 
conflicts when merge is used with non-existence value.

When there is an existing value, applying merge would be following:

<size of the encoded value (4 bytes, big endian)><encoded 
value><delimiter><...>{color:#ff0000}<delimiter><size of the encoded 
value><encoded value>{color}

*red color refers to the newly added content

Reading the value would be straightforward, read the size, read the encoded 
value based on the size, skip one byte (for delimiter), loop.

When there is no existing value, applying merge would be following:

{color:#ff0000}<delimiter><size of the encoded value><encoded value>{color}

Reading the value would not be the same with the case in above - we need to 
skip reading delimiter at the start to apply the same read logic.

-That said, we need to ensure delimiter to be an "invalid value" of the first 
byte of the size.-

-The value of default delimit is "," (44, 0x2C) which does not satisfy it. 
Since we do not allow negative value for the size, any value making the size (4 
bytes) to be negative can be used as a delimiter.-

^ The above is dependent on the endianness and we tend to use sun's Unsafe 
module for putting some value into byte array & UnsafeRow which follows 
system's default endianness. The above is only safe when the system uses big 
endian, which is guaranteed by JVM but that's abstracted within JVM and Unsafe 
module breaks the abstraction.

Alternative is to ensure the delimiter to be available in the first byte of the 
value, regardless of whether the value is written from put vs merge.

  was:
The default delimit of StringAppendOperator (merge operator for RocksDB) 
conflicts when merge is used with non-existence value.

When there is an existing value, applying merge would be following:

<size of the encoded value (4 bytes, big endian)><encoded 
value><delimiter><...>{color:#FF0000}<delimiter><size of the encoded 
value><encoded value>{color}

*red color refers to the newly added content

Reading the value would be straightforward, read the size, read the encoded 
value based on the size, skip one byte (for delimiter), loop.

When there is no existing value, applying merge would be following:

{color:#FF0000}<delimiter><size of the encoded value><encoded value>{color}

Reading the value would not be the same with the case in above - we need to 
skip reading delimiter at the start to apply the same read logic.

That said, we need to ensure delimiter to be an "invalid value" of the first 
byte of the size.

The value of default delimit is "," (44, 0x2C) which does not satisfy it. Since 
we do not allow negative value for the size, any value making the size (4 
bytes) to be negative can be used as a delimiter.


> The default delimiter of StringAppendOperator (merge operator for RocksDB) 
> conflicts when merge is used with non-existence value
> --------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-55131
>                 URL: https://issues.apache.org/jira/browse/SPARK-55131
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 4.2.0
>            Reporter: Jungtaek Lim
>            Priority: Major
>
> The default delimit of StringAppendOperator (merge operator for RocksDB) 
> conflicts when merge is used with non-existence value.
> When there is an existing value, applying merge would be following:
> <size of the encoded value (4 bytes, big endian)><encoded 
> value><delimiter><...>{color:#ff0000}<delimiter><size of the encoded 
> value><encoded value>{color}
> *red color refers to the newly added content
> Reading the value would be straightforward, read the size, read the encoded 
> value based on the size, skip one byte (for delimiter), loop.
> When there is no existing value, applying merge would be following:
> {color:#ff0000}<delimiter><size of the encoded value><encoded value>{color}
> Reading the value would not be the same with the case in above - we need to 
> skip reading delimiter at the start to apply the same read logic.
> -That said, we need to ensure delimiter to be an "invalid value" of the first 
> byte of the size.-
> -The value of default delimit is "," (44, 0x2C) which does not satisfy it. 
> Since we do not allow negative value for the size, any value making the size 
> (4 bytes) to be negative can be used as a delimiter.-
> ^ The above is dependent on the endianness and we tend to use sun's Unsafe 
> module for putting some value into byte array & UnsafeRow which follows 
> system's default endianness. The above is only safe when the system uses big 
> endian, which is guaranteed by JVM but that's abstracted within JVM and 
> Unsafe module breaks the abstraction.
> Alternative is to ensure the delimiter to be available in the first byte of 
> the value, regardless of whether the value is written from put vs merge.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to