Thanks a lot Siva! Tested and it works as expected.

-----Original Message-----
From: Sivabalan <[email protected]> 
Sent: Sunday, May 24, 2020 11:10 PM
To: [email protected]
Subject: RE: [EXTERNAL] Hudi Global Bloom Index Issue

CAUTION: This email originated from outside of the organization. Do not click 
links or open attachments unless you can confirm the sender and know the 
content is safe.



Hi Raghu,
   Hudi has a property named "hoodie.bloom.index.update.partition.path".
You might want to try setting this to true if you need the behavior you are 
expecting. Here is the docs docs for this config. Default value is false for 
this config param.

/**
 * Only applies if index type is GLOBAL_BLOOM.
 * <p>
 * When set to true, an update to a record with a different partition from its 
existing one
 * will insert the record to the new partition and delete it from the old 
partition.
 * <p>
 * When set to false, a record will be updated to the old partition.
 */

Let me know if this makes sense and is what you are expecting.



On Sun, May 24, 2020 at 10:14 AM Dubey, Raghu <[email protected]>
wrote:

> Hi Team,
>
> I used DeltaStreamer to ingest data and performed a test where the 
> partition column changes. When the partition column in my dataset got 
> updated, my hive query on Hudi dataset returned 2 rows for the same 
> recordKey. This was expected and I got the explanation in this issue.
> https://github.com/apache/incubator-hudi/issues/423
>
> However, as per the link :
> https://cwiki.apache.org/confluence/display/HUDI/FAQ#FAQ-HowdoestheHud
> iindexingwork&whatareitsbenefits, I tried setting 
> hoodie.index.type=GLOBAL_BLOOM to circumvent this issue.
> Once I do this, I do not see 2 records anymore in the Hive query.
>
> But I was expecting that it should move a file from one partition to 
> another and update the partition column in Hudi (in my case store_id). 
> I only have 2 store IDs (1 and 2). But if you see the results of the 
> queries before and after the update statement, it seems to have just 
> stopped updating the partition path altogether.
>
> Am I doing something wrong here?
> I want to have a global uniqueness of data on recordKey irrespective 
> of partition path.
> Please help.
>
> Below is the updated incremental source data that was used (store_id 
> is updated to 1 instead of 2).
> {
>     "Op": "U",
>     "src_received_ts": "2020-05-22 15:03:51.000000",
>     "tran_id": 18,
>     "tran_date": "2019-03-16",
>     "store_id": 1,
>     "store_city": "CHICAGO",
>     "store_state": "IL",
>     "item_code": "XXXXXX",
>     "quantity": 126
> }
>
> Before:##########
>
> hive> select * from  XXXXX  where tran_id=18;
> OK
> _hoodie_commit_time _hoodie_commit_seqno        _hoodie_record_key
> _hoodie_partition_path     _hoodie_file_name   op  src_received_ts
>  tran_id    tran_date   store_city  store_state item_code  quantity
> total       store_id
> 20200519123838  20200519123838_1_1      18      2
>  4b683050-4151-4d76-a22a-355f73f61b15-0_1-23-21_20200519123838.parquet   I
>      2020-05-19 12:26:03.000000     18      2019-03-16      CHICAGO IL
> XXXXXX  126     22.0    2
>
> After:##########
>
> select * from XXXXX where tran_id=18;
> OK
> _hoodie_commit_time _hoodie_commit_seqno        _hoodie_record_key
> _hoodie_partition_path     _hoodie_file_name   op  src_received_ts
>  tran_id    tran_date   store_city  store_state item_code  quantity
> total       store_id
> 20200522150752  20200522150752_0_1      18      2
>  4b683050-4151-4d76-a22a-355f73f61b15-0_0-23-20_20200522150752.parquet   U
>      2020-05-22 15:03:51.000000     18      2019-03-16      CHICAGO IL
> XXXXXX  126     1.19E-43        2
> Time taken: 0.952 seconds, Fetched: 1 row(s)
>
> Hudi Command:
>
> spark-submit --class
> org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer  \ 
> --packages
> org.apache.hudi:hudi-utilities-bundle_2.11:0.5.2-incubating,org.apache
> .spark:spark-avro_2.11:2.4.5
> \
> --master yarn \
> --deploy-mode cluster \
> /usr/lib/hudi/hudi-utilities-bundle_2.11-0.5.2-incubating.jar \ 
> --table-type COPY_ON_WRITE \ --source-ordering-field src_received_ts \ 
> --source-class org.apache.hudi.utilities.sources.ParquetDFSSource \ 
> --target-base-path s3://xxxx/xxxxxxx \ --target-table xxxxxxx \ 
> --props s3://xxxx/xxx/hudi_base.properties \ --schemaprovider-class 
> org.apache.hudi.utilities.schema.FilebasedSchemaProvider \ 
> --transformer-class 
> org.apache.hudi.utilities.transform.SqlQueryBasedTransformer \ 
> --enable-hive-sync
>
> Here is the hudi_base.properties file:
>
> hoodie.cleaner.commits.retained=2
> hoodie.upsert.shuffle.parallelism=1
> hoodie.insert.shuffle.parallelism=1
> hoodie.bulkinsert.shuffle.parallelism=1
> hoodie.datasource.write.recordkey.field=tran_id
> hoodie.datasource.hive_sync.partition_fields=store_id
> hoodie.datasource.write.partitionpath.field=store_id
> hoodie.datasource.hive_sync.database=default
> hoodie.datasource.hive_sync.table=XXXX
> hoodie.datasource.hive_sync.assume_date_partitioning=false
>
> hoodie.deltastreamer.schemaprovider.target.schema.file=s3://XXXX/XXXX/
> trgt.avsc
>
> hoodie.deltastreamer.schemaprovider.source.schema.file=s3://XXXX/XXX/s
> rc.avsc
>
> hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.
> hive.MultiPartKeysValueExtractor 
> hoodie.deltastreamer.transformer.sql=select
> Op,src_received_ts,tran_id,tran_date,store_id,store_city,store_state,i
> tem_code,quantity
> from <SRC>
> hoodie.datasource.hive_sync.jdbcurl=jdbc:hive2://XXX:XXXX
> hoodie.deltastreamer.source.dfs.root=s3://XXXX/ XXX 
> hoodie.index.type=GLOBAL_BLOOM
>
>
> Thanks,
> Raghu
>
>

--
Regards,
-Sivabalan

Reply via email to