[ 
https://issues.apache.org/jira/browse/HUDI-3559?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

konwu updated HUDI-3559:
------------------------
    Description: 
*Environment:*

```sql

CREATE TABLE test_source (
 userid int, ts TIMESTAMP(3)
) WITH (
 'connector' = 'datagen',
 'rows-per-second'='1',
 'fields.userid.kind'='random',
 'fields.userid.min'='1',
 'fields.userid.max'='100'
);

CREATE TABLE hudi_hdfs_test(
  userid int,
  ts TIMESTAMP(3),
  PRIMARY KEY (userid) NOT ENFORCED
)WITH (
  'connector' = 'hudi',
  'path' = '/tmp/hudi_011_2',
  'table.type' = 'COPY_ON_WRITE',
  'write.insert.drop.duplicates' = 'true',
  'write.precombine.field' = 'ts',

 'index.type'='BUCKET',

'hive_sync.enable' = 'true'
 );

insert into hudi_hdfs_test SELECT * from test_source

```

*Exception*

```java

Caused by: java.util.NoSuchElementException: No value present in Option at 
org.apache.hudi.common.util.Option.get(Option.java:88) 
~[hudi-flink-bundle_2.11-0.11.0-SNAPSHOT.jar:0.11.0-SNAPSHOT] at 
org.apache.hudi.io.HoodieMergeHandle.<init>(HoodieMergeHandle.java:116) 
~[hudi-flink-bundle_2.11-0.11.0-SNAPSHOT.jar:0.11.0-SNAPSHOT] at 
org.apache.hudi.io.FlinkMergeHandle.<init>(FlinkMergeHandle.java:70) 
~[hudi-flink-bundle_2.11-0.11.0-SNAPSHOT.jar:0.11.0-SNAPSHOT] at 
org.apache.hudi.client.HoodieFlinkWriteClient.getOrCreateWriteHandle(HoodieFlinkWriteClient.java:485)
 ~[hudi-flink-bundle_2.11-0.11.0-SNAPSHOT.jar:0.11.0-SNAPSHOT] at 
org.apache.hudi.client.HoodieFlinkWriteClient.upsert(HoodieFlinkWriteClient.java:142)
 ~[hudi-flink-bundle_2.11-0.11.0-SNAPSHOT.jar:0.11.0-SNAPSHOT]

```

*Root cause*

processElement in 
hudi-flink/src/main/java/org/apache/hudi/sink/BucketStreamWriteFunction.java

```java

    if (bucketToFileIDMap.containsKey(partitionBucketId)) {
      location = new HoodieRecordLocation("U", bucketToFileIDMap.get(partitionBucketId));
    } else {
      String newFileId = BucketIdentifier.newBucketFileIdPrefix(bucketNum);
      location = new HoodieRecordLocation("I", newFileId);
      bucketToFileIDMap.put(partitionBucketId, newFileId);
    }

```

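For the BUCKET index type, a COW table deduplicates records before merging, but the `deduplicateRecords` method in `FlinkWriteHelper` may emit records out of order. As a result, the first record for a newly created file group can carry the "U" tag instead of "I", so a merge handle is created for a file group that has no base file yet, which produces the exception above.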

 

  was:
*Environment:*

```sql

CREATE TABLE test_source (
 userid int, ts TIMESTAMP(3)
) WITH (
 'connector' = 'datagen',
 'rows-per-second'='1',
 'fields.userid.kind'='random',
 'fields.userid.min'='1',
 'fields.userid.max'='100'
);

CREATE TABLE hudi_hdfs_test(
  userid int,
  ts TIMESTAMP(3),
  PRIMARY KEY (userid) NOT ENFORCED
)WITH (
  'connector' = 'hudi',
  'path' = '/tmp/hudi_011_2',
  'table.type' = 'COPY_ON_WRITE',
  'write.insert.drop.duplicates' = 'true',
  'write.precombine.field' = 'ts',

 'index.type'='BUCKET',

'hive_sync.enable' = 'true'
 );

insert into hudi_hdfs_test SELECT * from test_source

```

*Exception*

```java

Caused by: java.util.NoSuchElementException: No value present in Option at 
org.apache.hudi.common.util.Option.get(Option.java:88) 
~[hudi-flink-bundle_2.11-0.11.0-SNAPSHOT.jar:0.11.0-SNAPSHOT] at 
org.apache.hudi.io.HoodieMergeHandle.<init>(HoodieMergeHandle.java:116) 
~[hudi-flink-bundle_2.11-0.11.0-SNAPSHOT.jar:0.11.0-SNAPSHOT] at 
org.apache.hudi.io.FlinkMergeHandle.<init>(FlinkMergeHandle.java:70) 
~[hudi-flink-bundle_2.11-0.11.0-SNAPSHOT.jar:0.11.0-SNAPSHOT] at 
org.apache.hudi.client.HoodieFlinkWriteClient.getOrCreateWriteHandle(HoodieFlinkWriteClient.java:485)
 ~[hudi-flink-bundle_2.11-0.11.0-SNAPSHOT.jar:0.11.0-SNAPSHOT] at 
org.apache.hudi.client.HoodieFlinkWriteClient.upsert(HoodieFlinkWriteClient.java:142)
 ~[hudi-flink-bundle_2.11-0.11.0-SNAPSHOT.jar:0.11.0-SNAPSHOT]

```

*Root cause*

processElement in 
hudi-flink/src/main/java/org/apache/hudi/sink/BucketStreamWriteFunction.java

```java

    if (bucketToFileIDMap.containsKey(partitionBucketId)) {
      location = new HoodieRecordLocation("U", 
bucketToFileIDMap.get(partitionBucketId));
    } else {
      String newFileId = BucketIdentifier.newBucketFileIdPrefix(bucketNum);
      location = new HoodieRecordLocation("I", newFileId);
      bucketToFileIDMap.put(partitionBucketId, newFileId);
    }

```

With this logic, if the `newFileId` in `bucketToFileIDMap` was created just before the first commit, the writer uses a MergeHandle and then throws this exception, because there is no base file to merge.

 


> NoSuchElementException when use BUCKET index in flink cow table
> ---------------------------------------------------------------
>
>                 Key: HUDI-3559
>                 URL: https://issues.apache.org/jira/browse/HUDI-3559
>             Project: Apache Hudi
>          Issue Type: Bug
>          Components: flink
>            Reporter: konwu
>            Priority: Major
>             Fix For: 0.11.0
>
>
> *Environment:*
> ```sql
> CREATE TABLE test_source (
>  userid int, ts TIMESTAMP(3)
> ) WITH (
>  'connector' = 'datagen',
>  'rows-per-second'='1',
>  'fields.userid.kind'='random',
>  'fields.userid.min'='1',
>  'fields.userid.max'='100'
> );
> CREATE TABLE hudi_hdfs_test(
>   userid int,
>   ts TIMESTAMP(3),
>   PRIMARY KEY (userid) NOT ENFORCED
> )WITH (
>   'connector' = 'hudi',
>   'path' = '/tmp/hudi_011_2',
>   'table.type' = 'COPY_ON_WRITE',
>   'write.insert.drop.duplicates' = 'true',
>   'write.precombine.field' = 'ts',
>  'index.type'='BUCKET',
> 'hive_sync.enable' = 'true'
>  );
> insert into hudi_hdfs_test SELECT * from test_source
> ```
> *Exception*
> ```java
> Caused by: java.util.NoSuchElementException: No value present in Option at 
> org.apache.hudi.common.util.Option.get(Option.java:88) 
> ~[hudi-flink-bundle_2.11-0.11.0-SNAPSHOT.jar:0.11.0-SNAPSHOT] at 
> org.apache.hudi.io.HoodieMergeHandle.<init>(HoodieMergeHandle.java:116) 
> ~[hudi-flink-bundle_2.11-0.11.0-SNAPSHOT.jar:0.11.0-SNAPSHOT] at 
> org.apache.hudi.io.FlinkMergeHandle.<init>(FlinkMergeHandle.java:70) 
> ~[hudi-flink-bundle_2.11-0.11.0-SNAPSHOT.jar:0.11.0-SNAPSHOT] at 
> org.apache.hudi.client.HoodieFlinkWriteClient.getOrCreateWriteHandle(HoodieFlinkWriteClient.java:485)
>  ~[hudi-flink-bundle_2.11-0.11.0-SNAPSHOT.jar:0.11.0-SNAPSHOT] at 
> org.apache.hudi.client.HoodieFlinkWriteClient.upsert(HoodieFlinkWriteClient.java:142)
>  ~[hudi-flink-bundle_2.11-0.11.0-SNAPSHOT.jar:0.11.0-SNAPSHOT]
> ```
> *Root cause*
> processElement in 
> hudi-flink/src/main/java/org/apache/hudi/sink/BucketStreamWriteFunction.java
> ```java
>     if (bucketToFileIDMap.containsKey(partitionBucketId)) {
>       location = new HoodieRecordLocation("U", bucketToFileIDMap.get(partitionBucketId));
>     } else {
>       String newFileId = BucketIdentifier.newBucketFileIdPrefix(bucketNum);
>       location = new HoodieRecordLocation("I", newFileId);
>       bucketToFileIDMap.put(partitionBucketId, newFileId);
>     }
> ```
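> For the BUCKET index type, a COW table deduplicates records before merging, but 
> the `deduplicateRecords` method in `FlinkWriteHelper` may emit records out of order. 
> As a result, the first record for a newly created file group can carry the "U" tag 
> instead of "I", so a merge handle is created for a file group that has no base file yet.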
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to