[
https://issues.apache.org/jira/browse/HUDI-3444?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
qian heng updated HUDI-3444:
----------------------------
Description:
*Describe the problem you faced*
An NPE occurred when ingesting data into a MOR table with Flink.
From the stack trace, the NPE is thrown when Hudi attempts to close the
HoodieAppendHandle and write all remaining data in the HoodieAppendHandle to a
log file. (The line numbers may differ because some logs were added.)
{code:java}
org.apache.hudi.exception.HoodieUpsertException: Failed to upsert for commit time 20220216172134086
    at org.apache.hudi.table.action.commit.FlinkWriteHelper.write(FlinkWriteHelper.java:79)
    at org.apache.hudi.table.action.commit.delta.FlinkUpsertDeltaCommitActionExecutor.execute(FlinkUpsertDeltaCommitActionExecutor.java:49)
    at org.apache.hudi.table.HoodieFlinkMergeOnReadTable.upsert(HoodieFlinkMergeOnReadTable.java:72)
    at org.apache.hudi.client.HoodieFlinkWriteClient.upsert(HoodieFlinkWriteClient.java:167)
    at org.apache.hudi.sink.StreamWriteFunction.lambda$initWriteFunction$6(StreamWriteFunction.java:548)
    at org.apache.hudi.sink.StreamWriteFunction.flushBucket(StreamWriteFunction.java:836)
    at org.apache.hudi.sink.StreamWriteFunction.bufferRecord(StreamWriteFunction.java:788)
    at org.apache.hudi.sink.StreamWriteFunction.processElement(StreamWriteFunction.java:298)
    at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.processRecord(OneInputStreamTask.java:264)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:257)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:426)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:688)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:643)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:654)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:627)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:831)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:612)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.RuntimeException: org.apache.hudi.exception.HoodieException: org.apache.hudi.exception.HoodieException: java.util.concurrent.ExecutionException: java.lang.NullPointerException: Cannot invoke "org.apache.hudi.common.table.log.HoodieLogFormat$Writer.getLogFile()" because "this.writer" is null
    at org.apache.hudi.client.utils.LazyIterableIterator.next(LazyIterableIterator.java:121)
    at java.base/java.util.Iterator.forEachRemaining(Iterator.java:133)
    at org.apache.hudi.table.action.commit.BaseFlinkCommitActionExecutor.execute(BaseFlinkCommitActionExecutor.java:114)
    at org.apache.hudi.table.action.commit.BaseFlinkCommitActionExecutor.execute(BaseFlinkCommitActionExecutor.java:70)
    at org.apache.hudi.table.action.commit.FlinkWriteHelper.write(FlinkWriteHelper.java:72)
    ... 22 more
Caused by: org.apache.hudi.exception.HoodieException: org.apache.hudi.exception.HoodieException: java.util.concurrent.ExecutionException: java.lang.NullPointerException: Cannot invoke "org.apache.hudi.common.table.log.HoodieLogFormat$Writer.getLogFile()" because "this.writer" is null
    at org.apache.hudi.execution.FlinkLazyInsertIterable.computeNext(FlinkLazyInsertIterable.java:106)
    at org.apache.hudi.execution.FlinkLazyInsertIterable.computeNext(FlinkLazyInsertIterable.java:43)
    at org.apache.hudi.client.utils.LazyIterableIterator.next(LazyIterableIterator.java:119)
    ... 26 more
Caused by: org.apache.hudi.exception.HoodieException: java.util.concurrent.ExecutionException: java.lang.NullPointerException: Cannot invoke "org.apache.hudi.common.table.log.HoodieLogFormat$Writer.getLogFile()" because "this.writer" is null
    at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.execute(BoundedInMemoryExecutor.java:188)
    at org.apache.hudi.execution.FlinkLazyInsertIterable.computeNext(FlinkLazyInsertIterable.java:102)
    ... 28 more
Caused by: java.util.concurrent.ExecutionException: java.lang.NullPointerException: Cannot invoke "org.apache.hudi.common.table.log.HoodieLogFormat$Writer.getLogFile()" because "this.writer" is null
    at java.base/java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.base/java.util.concurrent.FutureTask.get(FutureTask.java:191)
    at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.execute(BoundedInMemoryExecutor.java:182)
    ... 29 more
Caused by: java.lang.NullPointerException
    at org.apache.hudi.io.HoodieAppendHandle.appendDataAndDeleteBlocks(HoodieAppendHandle.java:387)
    at org.apache.hudi.io.HoodieAppendHandle.close(HoodieAppendHandle.java:416)
    at org.apache.hudi.io.FlinkAppendHandle.close(FlinkAppendHandle.java:94)
    at org.apache.hudi.execution.CopyOnWriteInsertHandler.consumeOneRecord(CopyOnWriteInsertHandler.java:115)
    at org.apache.hudi.execution.CopyOnWriteInsertHandler.consume(CopyOnWriteInsertHandler.java:80)
    at org.apache.hudi.execution.CopyOnWriteInsertHandler.consume(CopyOnWriteInsertHandler.java:41)
    at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$2(BoundedInMemoryExecutor.java:162)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    ... 1 more
{code}
Looking at the code, I think the NPE is caused by this method in
BaseFlinkDeltaCommitActionExecutor.java:
{code:java}
@Override
public Iterator<List<WriteStatus>> handleInsert(String idPfx,
    Iterator<HoodieRecord<T>> recordItr) {
  return new FlinkLazyInsertIterable<>(recordItr, true, config, instantTime,
      table, idPfx, taskContextSupplier,
      new ExplicitWriteHandleFactory(writeHandle));
}
{code}
BaseFlinkDeltaCommitActionExecutor uses ExplicitWriteHandleFactory as the
writeHandleFactory, which leads to the same HoodieAppendHandle being reused in
CopyOnWriteInsertHandler:
{code:java}
if (!handle.canWrite(payload.record)) {
  // Handle is full. Close the handle and add the WriteStatus
  statuses.addAll(handle.close());
  // Open new handle
  handle = writeHandleFactory.create(config, instantTime, hoodieTable,
      insertPayload.getPartitionPath(), idPrefix, taskContextSupplier);
  handles.put(partitionPath, handle);
}
handle.write(insertPayload, payload.insertValue, payload.exception);
}
{code}
Thus, when a log file is full and its handle is closed, the writer in the
HoodieAppendHandle is also cleared. Because the factory hands back the same
HoodieAppendHandle, the next use of that handle hits an NPE on the null writer.
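The reuse pattern described above can be modelled with a minimal, self-contained sketch. This is not Hudi code: `HandleReuseSketch`, `LogWriter`, `AppendHandle`, and `consume` are hypothetical stand-ins, and the sketch only assumes what the report states, namely that closing the handle clears its writer and that the factory returns the same handle again.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Supplier;

public class HandleReuseSketch {

  /** Hypothetical stand-in for the log writer held by the append handle. */
  static class LogWriter {
    String getLogFile() {
      return "sketch.log";
    }
  }

  /** Hypothetical stand-in for the append handle: close() uses the writer, then clears it. */
  static class AppendHandle {
    private LogWriter writer = new LogWriter();
    private final int capacity;
    private int buffered = 0;

    AppendHandle(int capacity) {
      this.capacity = capacity;
    }

    boolean canWrite() {
      return buffered < capacity;
    }

    void write(String record) {
      buffered++;
    }

    List<String> close() {
      // Dereferences the writer: throws NullPointerException if close() already ran.
      String status = "flushed " + buffered + " record(s) to " + writer.getLogFile();
      writer = null;
      return Collections.singletonList(status);
    }
  }

  /** Mirrors the shape of the consume loop quoted above: close a full handle, then ask the factory. */
  static List<String> consume(List<String> records, Supplier<AppendHandle> factory) {
    List<String> statuses = new ArrayList<>();
    AppendHandle handle = factory.get();
    for (String record : records) {
      if (!handle.canWrite()) {
        statuses.addAll(handle.close());
        handle = factory.get();
      }
      handle.write(record);
    }
    statuses.addAll(handle.close());
    return statuses;
  }

  public static void main(String[] args) {
    List<String> records = Arrays.asList("r1", "r2", "r3", "r4");

    // A factory that creates a fresh handle: every close() sees a live writer.
    System.out.println(consume(records, () -> new AppendHandle(1)));

    // A factory that always returns the same handle: the second close() fails.
    AppendHandle shared = new AppendHandle(1);
    try {
      consume(records, () -> shared);
    } catch (NullPointerException e) {
      System.out.println("NPE on reused handle");
    }
  }
}
```

In this sketch the shared handle is still full after its first close, so the next record triggers a second close on the cleared writer, which matches the `close` frame at the bottom of the stack trace.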
*To Reproduce*
Steps to reproduce the behavior:
1. Use Flink to ingest data into a MOR table.
2. Set hoodie.parquet.max.file.size to a small value, so that
{{handle.canWrite(payload.record)}} becomes false easily and the handle is
closed.
3. When the next record is consumed, the NPE occurs.
*Expected behavior*
A clear and concise description of what you expected to happen.
*Environment Description*
* Hudi version : 0.10.0
* Flink version : 0.13.1
* Storage (HDFS/S3/GCS..) : hdfs
* Running on Docker? (yes/no) : no
> NPE occurred when ingesting data to MOR table by flink
> ------------------------------------------------------
>
> Key: HUDI-3444
> URL: https://issues.apache.org/jira/browse/HUDI-3444
> Project: Apache Hudi
> Issue Type: Bug
> Components: flink
> Affects Versions: 0.10.0
> Reporter: qian heng
> Priority: Major
>
--
This message was sent by Atlassian Jira
(v8.20.1#820001)