[
https://issues.apache.org/jira/browse/FLINK-35343?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17865452#comment-17865452
]
Shekhar Prasad Rajak commented on FLINK-35343:
----------------------------------------------
I would like to work on this issue.
> NullPointerException in SourceReaderBase
> ----------------------------------------
>
> Key: FLINK-35343
> URL: https://issues.apache.org/jira/browse/FLINK-35343
> Project: Flink
> Issue Type: Bug
> Components: API / Core
> Affects Versions: 1.17.2
> Environment: * flink(1.17.2), local mode and deploy on k8s
> * doris-flink-connector-1.17(1.6.0)
> * Doris(2.1)
> h2.
> Reporter: zyh
> Priority: Blocker
> Attachments: flinktask.png, servicelog.png
>
>
> h2. operation
> I used flink batch to read data from Doris and write to Doris.
> The flink job include two source task, one table join task and one sink task,
> which like:
> source: Table A
> source: Table B
> hashjoin: c= a join b
> sink: c
> h2.
> h2. table properties
> h3. source
> properties.put("connector", "doris");
> properties.put("fenodes", inputDataSource.getHttpUrl());
> properties.put("table.identifier", inputDataSource.getDatabase() + "." +
> sourceTable.getName());
> properties.put("username", inputDataSource.getUsername());
> properties.put("password", inputDataSource.getPassword());
> h3. sink
> properties.put("connector", "doris");
> properties.put("fenodes", dataExplore.getOutputDataSource().getHttpUrl());
> properties.put("table.identifier",
> dataExplore.getOutputDataSource().getDatabase() + "." + tableName);
> properties.put("username", dataExplore.getOutputDataSource().getUsername());
> properties.put("password", dataExplore.getOutputDataSource().getPassword());
> properties.put("sink.properties.format", "csv");
> //列分隔符
> properties.put("sink.properties.column_separator", "#cs_");
> //行分隔符
> properties.put("sink.properties.line_delimiter", "#ld_");
> properties.put("sink.label-prefix", "doris_label" + UUID.randomUUID());
> properties.put("sink.parallelism", "2");
>
>
>
> h2. exception stack
> {code:java}
> Caused by: java.lang.NullPointerException at
> org.apache.flink.connector.base.source.reader.SourceReaderBase.finishCurrentFetch(SourceReaderBase.java:194)
> at
> org.apache.flink.connector.base.source.reader.SourceReaderBase.moveToNextSplit(SourceReaderBase.java:208)
> at
> org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:173)
> at
> org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:131)
> at
> org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:419)
> at
> org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
> at
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:550)
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788)
> at
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
> at
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745) at
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) at
> java.lang.Thread.run(Thread.java:748) {code}
>
> h2. other
> The problem only occur in flink local mode and deploy on k8s.
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)