This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new 6029082 [Flink][Bug] Fix potential NPE when cancel
DorisSourceFunction (#6838)
6029082 is described below
commit 6029082c2a23009a7aaabd2d4d9762e8691e5a2c
Author: Yun Tang <[email protected]>
AuthorDate: Sat Oct 23 16:45:24 2021 +0800
[Flink][Bug] Fix potential NPE when cancel DorisSourceFunction (#6838)
Fix potential NPE of `scalaValueReader` when cancelling DorisSourceFunction.
---
.../doris/flink/datastream/DorisSourceFunction.java | 19 +++++++++++++------
.../doris/flink/datastream/ScalaValueReader.scala | 3 +--
2 files changed, 14 insertions(+), 8 deletions(-)
diff --git
a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/datastream/DorisSourceFunction.java
b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/datastream/DorisSourceFunction.java
index 08ec5b0..85f5f6b 100644
---
a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/datastream/DorisSourceFunction.java
+++
b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/datastream/DorisSourceFunction.java
@@ -43,8 +43,8 @@ public class DorisSourceFunction extends
RichSourceFunction<List<?>> implements
private final DorisDeserializationSchema<List<?>> deserializer;
private final DorisOptions options;
private final DorisReadOptions readOptions;
+ private transient volatile boolean isRunning;
private List<PartitionDefinition> dorisPartitions;
- private ScalaValueReader scalaValueReader;
public DorisSourceFunction(DorisStreamOptions streamOptions,
DorisDeserializationSchema<List<?>> deserializer) {
this.deserializer = deserializer;
@@ -55,25 +55,32 @@ public class DorisSourceFunction extends
RichSourceFunction<List<?>> implements
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
+ this.isRunning = true;
this.dorisPartitions = RestService.findPartitions(options,
readOptions, logger);
}
@Override
public void run(SourceContext<List<?>> sourceContext) {
for (PartitionDefinition partitions : dorisPartitions) {
- scalaValueReader = new ScalaValueReader(partitions, options,
readOptions);
- while (scalaValueReader.hasNext()) {
- List<?> next = scalaValueReader.next();
- sourceContext.collect(next);
+ try (ScalaValueReader scalaValueReader = new
ScalaValueReader(partitions, options, readOptions)) {
+ while (isRunning && scalaValueReader.hasNext()) {
+ List<?> next = scalaValueReader.next();
+ sourceContext.collect(next);
+ }
}
}
}
@Override
public void cancel() {
- scalaValueReader.close();
+ isRunning = false;
}
+ @Override
+ public void close() throws Exception {
+ super.close();
+ isRunning = false;
+ }
@Override
public TypeInformation<List<?>> getProducedType() {
diff --git
a/extension/flink-doris-connector/src/main/scala/org/apache/doris/flink/datastream/ScalaValueReader.scala
b/extension/flink-doris-connector/src/main/scala/org/apache/doris/flink/datastream/ScalaValueReader.scala
index 093390d..06df2ef 100644
---
a/extension/flink-doris-connector/src/main/scala/org/apache/doris/flink/datastream/ScalaValueReader.scala
+++
b/extension/flink-doris-connector/src/main/scala/org/apache/doris/flink/datastream/ScalaValueReader.scala
@@ -19,7 +19,6 @@ package org.apache.doris.flink.datastream
import java.util.concurrent._
import java.util.concurrent.atomic.AtomicBoolean
-
import org.apache.doris.flink.backend.BackendClient
import org.apache.doris.flink.cfg.ConfigurationOptions._
import org.apache.doris.flink.cfg.{DorisOptions, DorisReadOptions}
@@ -41,7 +40,7 @@ import scala.util.control.Breaks
* @param partition Doris RDD partition
* @param options request configuration
*/
-class ScalaValueReader(partition: PartitionDefinition, options: DorisOptions,
readOptions: DorisReadOptions) {
+class ScalaValueReader(partition: PartitionDefinition, options: DorisOptions,
readOptions: DorisReadOptions) extends AutoCloseable {
protected val logger = Logger.getLogger(classOf[ScalaValueReader])
protected val client = new BackendClient(new
Routing(partition.getBeAddress), readOptions)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]