Hello Prashant, thanks for your time.

> With non unique keys how would tagging of records (for updates /
> deletes) work?

Currently both GLOBAL_SIMPLE and GLOBAL_BLOOM work out of the box in
this context; see the pyspark script and results below. As for the
implementation, tagLocationBacktoRecords returns an RDD of
HoodieRecord with (key/partition/location), and it can contain
duplicate keys (that is, multiple records for the same key).

```
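# Run from a pyspark shell with the Hudi bundle on the classpath;
# `spark` is the active SparkSession.
tableName = "test_global_bloom"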
basePath = f"/tmp/{tableName}"

hudi_options = {
    "hoodie.table.name": tableName,
    "hoodie.datasource.write.recordkey.field": "event_id",
    "hoodie.datasource.write.partitionpath.field": "part",
    "hoodie.datasource.write.table.name": tableName,
    "hoodie.datasource.write.precombine.field": "ts",
    "hoodie.datasource.write.hive_style_partitioning": "true",
    "hoodie.datasource.hive_sync.enable": "false",
    "hoodie.metadata.enable": "true",
    "hoodie.index.type": "GLOBAL_BLOOM", # GLOBAL_SIMPLE works as well
}

# Generate duplicate keys
mode = "overwrite"
df = spark.sql("""select '1' as event_id, '2' as ts, '2' as part UNION
                  select '1' as event_id, '3' as ts, '3' as part UNION
                  select '1' as event_id, '2' as ts, '3' as part UNION
                  select '2' as event_id, '2' as ts, '3' as part""")
(df.write.format("hudi")
    .options(**hudi_options)
    .option("hoodie.datasource.write.operation", "BULK_INSERT")
    .mode(mode)
    .save(basePath))
spark.read.format("hudi").load(basePath).select("event_id", "ts", "part").show()
# +--------+---+----+
# |event_id| ts|part|
# +--------+---+----+
# |       1|  3|   3|
# |       1|  2|   3|
# |       2|  2|   3|
# |       1|  2|   2|
# +--------+---+----+

# UPDATE
mode = "append"
(spark.sql("select '1' as event_id, '20' as ts, '4' as part")
    .write.format("hudi")
    .options(**hudi_options)
    .option("hoodie.datasource.write.operation", "UPSERT")
    .mode(mode)
    .save(basePath))
spark.read.format("hudi").load(basePath).select("event_id", "ts", "part").show()
# +--------+---+----+
# |event_id| ts|part|
# +--------+---+----+
# |       1| 20|   4|
# |       1| 20|   4|
# |       1| 20|   4|
# |       2|  2|   3|
# +--------+---+----+

# DELETE
mode = "append"
(spark.sql("select 1 as event_id")
    .write.format("hudi")
    .options(**hudi_options)
    .option("hoodie.datasource.write.operation", "DELETE")
    .mode(mode)
    .save(basePath))
spark.read.format("hudi").load(basePath).select("event_id", "ts", "part").show()
# +--------+---+----+
# |event_id| ts|part|
# +--------+---+----+
# |       2|  2|   3|
# +--------+---+----+
```
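
To make the tagging behaviour easier to reason about without Spark, here
is a minimal sketch in plain Python. It is not Hudi's implementation:
`tag_locations` is a hypothetical helper, and the partition value stands
in for the full record location. It only shows how one incoming key fans
out to one tagged record per existing location when keys are not unique.

```python
from collections import defaultdict

# Existing (record key, partition) pairs, copied from the table shown
# after the bulk insert. The partition stands in for the full location.
existing = [("1", "3"), ("1", "3"), ("2", "3"), ("1", "2")]


def tag_locations(incoming_keys, existing_locations):
    """Hypothetical model of tagging: pair each incoming key with every
    location that already holds it. Unknown keys get a None location."""
    by_key = defaultdict(list)
    for key, partition in existing_locations:
        by_key[key].append(partition)

    tagged = []
    for key in incoming_keys:
        locations = by_key.get(key)
        if not locations:
            tagged.append((key, None))  # new record, nothing to tag
        else:
            tagged.extend((key, partition) for partition in locations)
    return tagged


# Key "1" exists three times, key "9" does not exist yet.
tagged = tag_locations(["1", "9"], existing)
print(tagged)
```

In this model the update or delete is then applied once per tagged
record, which is consistent with all copies of key 1 being affected in
the results above.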


> How would record Index know which mapping of the array to
> return for a given record key?

As with GLOBAL_SIMPLE/BLOOM, for a given record key the RLI would
return a list of mappings. The operation (update, delete, FCOW, ...)
would then apply to each location.

To illustrate, we could get something like this in the MDT:

|event_id:1|[
             {part=2, -5811947225812876253, -6812062179961430298, 0, 1689147210233},
             {part=3, -711947225812876253, -8812062179961430298, 1, 1689147210233},
             {part=3, -1811947225812876253, -2812062179961430298, 0, 1689147210233}
           ]|
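
As a rough sketch of what I have in mind (plain Python, not the actual
MDT code), the index entry could be modelled as a list of locations per
key. `RecordLocation` and `MultiValueRecordIndex` are hypothetical names;
the fields mirror the recordIndexMetadata struct quoted further down, and
the values are the illustrative ones above.

```python
from dataclasses import dataclass
from typing import Dict, List


@dataclass(frozen=True)
class RecordLocation:
    # Mirrors partition, fileIdHighBits, fileIdLowBits, fileIndex, instantTime.
    partition: str
    file_id_high_bits: int
    file_id_low_bits: int
    file_index: int
    instant_time: int


class MultiValueRecordIndex:
    """Hypothetical index that keeps a list of locations per record key."""

    def __init__(self) -> None:
        self._entries: Dict[str, List[RecordLocation]] = {}

    def add(self, key: str, location: RecordLocation) -> None:
        # Append instead of overwriting, so duplicate keys are preserved.
        self._entries.setdefault(key, []).append(location)

    def lookup(self, key: str) -> List[RecordLocation]:
        # Return every known location; an unknown key yields an empty list.
        return list(self._entries.get(key, []))

    def remove(self, key: str) -> List[RecordLocation]:
        # A delete targets all locations of the key at once.
        return self._entries.pop(key, [])


index = MultiValueRecordIndex()
for loc in [
    RecordLocation("part=2", -5811947225812876253, -6812062179961430298, 0, 1689147210233),
    RecordLocation("part=3", -711947225812876253, -8812062179961430298, 1, 1689147210233),
    RecordLocation("part=3", -1811947225812876253, -2812062179961430298, 0, 1689147210233),
]:
    index.add("event_id:1", loc)

locations = index.lookup("event_id:1")
partitions = [loc.partition for loc in locations]
deleted = index.remove("event_id:1")
```

The caller never has to pick one mapping: it receives the whole list and
applies the operation to each entry.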


On Thu, 2023-07-13 at 10:17 -0700, Prashant Wason wrote:
> Hi Nicolas,
> 
> The RI feature is designed for max performance as it is at a
> record-count scale. Hence, the schema is simplified and minimized.
> 
> With non unique keys how would tagging of records (for updates /
> deletes) work? How would record Index know which mapping of the array
> to return for a given record key?
> 
> Thanks
> Prashant
> 
> 
> 
> On Wed, Jul 12, 2023 at 2:02 AM nicolas paris <nicolas.pa...@riseup.net> wrote:
> 
> > hi there,
> > 
> > Just tested preview of RLI (rfc-08), amazing feature. Soon the fast
> > COW (rfc-68) will be based on RLI to get the parquet offsets and
> > allow targeting parquet row groups.
> > 
> > RLI is a global index, therefore it assumes the hudi key is present
> > in at most one parquet file. As a result in the MDT, the RLI is of
> > type struct, and there is a 1:1 mapping w/ a given file.
> > 
> > Type:
> >    |-- recordIndexMetadata: struct (nullable = true)
> >    |    |-- partition: string (nullable = false)
> >    |    |-- fileIdHighBits: long (nullable = false)
> >    |    |-- fileIdLowBits: long (nullable = false)
> >    |    |-- fileIndex: integer (nullable = false)
> >    |    |-- instantTime: long (nullable = false)
> > 
> > Content:
> >    |event_id:1        |{part=3, -6811947225812876253, -7812062179961430298, 0, 1689147210233}|
> > 
> > We would love to use both RLI and FCOW features, but I'm afraid our
> > keys are not unique in our kafka archives. Same key might be present
> > in multiple partitions, and even in multiple slices within
> > partitions.
> > 
> > I wonder if, in the future, RLI could support multiple parquet
> > files (by storing an array of struct, for example). This would make
> > it possible to leverage RLI in more contexts.
> > 
> > Thx
> > 
> > 
> > 
> > 
> > 
