ccchenhe opened a new issue, #6034:
URL: https://github.com/apache/hudi/issues/6034

   **Describe the problem you faced**
   
   While testing the Hudi bucket index, I submitted two Flink applications that consume the same Kafka topic with different `group.id`s:
   
   * Application A uses Hudi 0.10.0 with `index.type` (not `hoodie.index.type`) = `'BLOOM'`.
   * Application B uses Hudi 0.11.1 with `index.type` (not `hoodie.index.type`) = `'BUCKET'`.
   
   For example, given these two binlog events for the same row:
   |id|DML type|Statement|
   |--|--|--|
   |1|insert|`insert into xx (id, update_time, status) values (1, 1656860400, 0)`|
   |1|update|`update xx set update_time = 1656860460, status = 1 where id = 1`|
   
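   we expect the final state of the row with `id = 1` to be `status = 1`, because the update carries the larger `update_time` (the precombine field). The actual results: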
   |Application|Hudi version|id|Final record|Correct?|
   |--|--|--|--|--|
   |Application A|0.10.0|1|status = 1, update_time = 1656860460|Yes|
   |Application B|0.11.1|1|status = 0, update_time = 1656860400|No|
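   The expected result follows from the "latest ordering value wins" rule: with `DefaultHoodieRecordPayload` and `update_time` as the ordering field, the version with the larger `update_time` should be kept. A minimal Flink SQL sketch of that rule, e.g. in batch mode, using an inline `VALUES` table that mirrors the two binlog events (the alias `binlog` is hypothetical, not part of the original jobs):

   ```sql
   -- Keep only the newest version of each key, ordered by update_time
   SELECT id, update_time, status
   FROM (
     SELECT id, update_time, status,
            -- rank the versions of each id, highest update_time first
            ROW_NUMBER() OVER (PARTITION BY id ORDER BY update_time DESC) AS rn
     -- hypothetical inline copy of the two binlog events above
     FROM (VALUES (1, 1656860400, 0), (1, 1656860460, 1)) AS binlog(id, update_time, status)
   )
   WHERE rn = 1;
   -- returns one row: id = 1, update_time = 1656860460, status = 1
   ```

   This matches what application A wrote and differs from application B's result.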
   
   **To Reproduce**
   
   Steps to reproduce the behavior:
   
   1. Create the Kafka source table:
   ```sql
   create table kafka_source_table(
   `database` string
   ,`table` string
   ,`type` string
   ,`ts` bigint
   ,`maxwell_ts` bigint
   ,`session_id` string
   ,`sequence_id` int
   ,`xid` bigint
   ,`xoffset` int
   ,`primary_key` Array<Int>
   ,`primary_key_columns` Array<String>
   ,`data` Map<String, String>
   ,`old` Map<String, String>
   ,`event_time` AS TO_TIMESTAMP(FROM_UNIXTIME(`ts`, 'yyyy-MM-dd HH:mm:ss'))  -- event-time attribute derived from the BIGINT `ts` above
   ,WATERMARK FOR `event_time` AS `event_time` - INTERVAL '10' SECOND
   )with (
        'connector' = 'kafka'
       ,'properties.security.protocol' = 'SASL_PLAINTEXT'
       ,'properties.sasl.mechanism' = 'PLAIN'
       ,'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username="xxxx" password="xxxx";'
       ,'scan.startup.mode' = 'earliest-offset' 
       ,'format' = 'json'
       ,'json.ignore-parse-errors' = 'true'
       ,'properties.fetch.message.max.bytes' = '10485760'
       ,'properties.socket.receive.buffer.bytes' = '1048576'
       ,'properties.request.timeout.ms' = '60000'
       ,'topic'='xxxxxxxx'
       ,'properties.group.id' = 'xxx'
       ,'properties.bootstrap.servers'='xxxx'
   ); 
   ```
   2. Create the Hudi sink table using the BUCKET index:
   ```sql 
   create table sink_hudi (accounting_statement_id STRING ,
   amount BIGINT ,
   channel_detail_id STRING ,
   channel_id BIGINT ,
   clearing_agreement_item_id DECIMAL(20, 0) ,
   clearing_date DATE ,
   client_id STRING ,
   complete_time BIGINT ,
   create_time BIGINT ,
   currency STRING ,
   detail_status INT ,
   discrepancy_statement_id STRING ,
   entity_id DECIMAL(20, 0) ,
   extra_info STRING ,
   fund_direction INT ,
   id STRING ,
   identity_id STRING ,
   institution_no STRING ,
   mid STRING ,
   payment_id STRING ,
   reference_id STRING ,
   sub_channel_code STRING ,
   system_code STRING ,
   transaction_id STRING ,
   transaction_time BIGINT ,
   update_time BIGINT ,
   version BIGINT ,
   writeoff_statement_id STRING ,
   `database` STRING, `table` STRING, 
   `_event` Row<`ts` bigint,`database` string,`table` string,`type` string,`maxwell_ts` BIGINT>
   , grass_date STRING  ) 
   WITH(
        'compaction.schedule.enabled' = 'true'
        ,'compaction.async.enabled' = 'false'
        ,'compaction.tasks' = '8'
        ,'compaction.delta_commits' = '15'
        ,'hoodie.table.type' = 'COPY_ON_WRITE' 
        ,'hoodie.parquet.max.file.size' = '268435456'
        ,'hoodie.datasource.write.recordkey.field' = 'database,table,id'
        ,'hoodie.datasource.write.precombine.field' = 'update_time'
        ,'hoodie.parquet.small.file.limit' = '104857600'
        ,'hoodie.parquet.compression.codec'= 'snappy'
        ,'connector' = 'hudi'
        ,'path' = '$hdfsPath'
        ,'index.bootstrap.enabled' = 'true'
        ,'index.state.ttl' = '0'
        ,'index.type' = 'BUCKET'
        ,'hoodie.index.type' = 'BLOOM'
        ,'hoodie.index.class' = 'org.apache.hudi.index.bucket.HoodieBucketIndex'
        ,'hive_sync.partition_fields' = 'grass_date'
        ,'hive_sync.metastore.uris' = '$thrift://xxx'
        ,'hive_sync.db' = '$hiveDatabaseName'
        ,'hive_sync.table' = '$hiveTableName'
        ,'hive_sync.enable' = 'true'
        ,'hive_sync.use_jdbc' = 'false'
        ,'hive_sync.mode' = 'hms'
        ,'hoodie.datasource.write.hive_style_partitioning'= 'true'
        ,'write.operation' = 'upsert'
        ,'write.tasks'='4'
        ,'write.index_bootstrap.tasks'='32'
        ,'write.bucket_assign.tasks'='16'
        ,'write.rate.limit' = '64000'
        ,'write.precombine.field' = 'update_time'
        ,'hoodie.payload.ordering.field' = 'update_time'
        ,'write.payload.class' = 'org.apache.hudi.common.model.DefaultHoodieRecordPayload'
        ,'hoodie.datasource.write.partitionpath.field' = 'grass_date'
        ,'hoodie.datasource.write.keygenerator.class' = 'org.apache.hudi.keygen.ComplexAvroKeyGenerator'
        ,'hive_sync.partition_extractor_class' = 'org.apache.hudi.hive.MultiPartKeysValueExtractor'
        ,'hoodie.bucket.index.num.buckets' = '20'
        ,'hoodie.bucket.index.hash.field' = 'database,table,id'
   )
   ```
   3. Create the Hudi sink table using the Flink state index (same schema, different index options):
   ```sql
   -- ...
   -- same columns as in step 2
   -- ...
   WITH(
        'index.type' = 'BLOOM'
       -- ,'hoodie.index.class' = 'org.apache.hudi.index.bucket.HoodieBucketIndex'
       -- ,'hoodie.bucket.index.num.buckets' = '20'
       -- all other options are the same as in step 2
   )
   ```
   4. Run two jobs that consume the same topic with different group IDs, each writing to a different Hudi table: one uses the Flink state index, the other the bucket index.
   ```sql
   insert into sink_hudi  -- in the second job, insert into the state-index table from step 3 instead
   SELECT CAST(JSON_VALUE(`data`, '$.accounting_statement_id') as STRING),
   CAST(JSON_VALUE(`data`, '$.amount') as BIGINT), 
   CAST(JSON_VALUE(`data`, '$.channel_detail_id') as STRING), 
   CAST(JSON_VALUE(`data`, '$.channel_id') as BIGINT), 
   CAST(JSON_VALUE(`data`, '$.clearing_agreement_item_id') as DECIMAL(20, 0)), 
   CAST(JSON_VALUE(`data`, '$.clearing_date') as DATE), 
   CAST(JSON_VALUE(`data`, '$.client_id') as STRING), 
   CAST(JSON_VALUE(`data`, '$.complete_time') as BIGINT), 
   CAST(JSON_VALUE(`data`, '$.create_time') as BIGINT), 
   CAST(JSON_VALUE(`data`, '$.currency') as STRING), 
   CAST(JSON_VALUE(`data`, '$.detail_status') as INT), 
   CAST(JSON_VALUE(`data`, '$.discrepancy_statement_id') as STRING), 
   CAST(JSON_VALUE(`data`, '$.entity_id') as DECIMAL(20, 0)), 
   CAST(JSON_VALUE(`data`, '$.extra_info') as STRING), 
   CAST(JSON_VALUE(`data`, '$.fund_direction') as INT), 
   CAST(JSON_VALUE(`data`, '$.id') as STRING), 
   CAST(JSON_VALUE(`data`, '$.identity_id') as STRING), 
   CAST(JSON_VALUE(`data`, '$.institution_no') as STRING), 
   CAST(JSON_VALUE(`data`, '$.mid') as STRING), 
   CAST(JSON_VALUE(`data`, '$.payment_id') as STRING), 
   CAST(JSON_VALUE(`data`, '$.reference_id') as STRING), 
   CAST(JSON_VALUE(`data`, '$.sub_channel_code') as STRING), 
   CAST(JSON_VALUE(`data`, '$.system_code') as STRING), 
   CAST(JSON_VALUE(`data`, '$.transaction_id') as STRING), 
   CAST(JSON_VALUE(`data`, '$.transaction_time') as BIGINT), 
   CAST(IFNULL(JSON_VALUE(`data`, '$.update_time'), '0') as BIGINT), 
   CAST(JSON_VALUE(`data`, '$.version') as BIGINT), 
   CAST(JSON_VALUE(`data`, '$.writeoff_statement_id') as STRING), 
   `database`, 
   `table`,
   Row(`ts`, `database`, `table`, `type`, `maxwell_ts`), 
   DATE_FORMAT(TO_TIMESTAMP(FROM_UNIXTIME(CAST(JSON_VALUE(`data`, '$.create_time') as bigint), 'yyyy-MM-dd HH:mm:ss')), 'yyyy-MM-dd')
   from kafka_source_table
   
   ```
   5. Compare the two Hudi tables:
   ```sql
   select *
   from stateTable t1
   left join bucketTable t2
   on t1.id = t2.id
   and t1.`database` = t2.`database`
   and t1.`table` = t2.`table`
   where t1.update_time > t2.update_time
   and t1.grass_date >= '2022-06-30'
   ```
   
   **Expected behavior**
   
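   Both tables should end up with the latest version of every record. For `id = 1`, the bucket-index table (application B) should contain `status = 1, update_time = 1656860460`, as the table written by application A does.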
   
   **Environment Description**
   * Flink version: 1.13.14

   * Hudi version: 0.11.1

   * Spark version: not used

   * Hive version: 3.1.2

   * Hadoop version: 3.2.0

   * Storage (HDFS/S3/GCS..): HDFS

   * Running on Docker? (yes/no): no
   
   
   
   