Leo Timofeyev created SPARK-47859:
-------------------------------------
Summary: Why does this lead to the memory leak?
Key: SPARK-47859
URL: https://issues.apache.org/jira/browse/SPARK-47859
Project: Spark
Issue Type: IT Help
Components: Spark Core
Affects Versions: 3.5.0, 3.3.2
Reporter: Leo Timofeyev
Attachments: Screenshot 2024-04-15 at 20.43.22.png
Hello Spark community. I have a Java Spark Structured Streaming application, shown below.
The JedisCluster is closed in a finally block, but there is still a memory leak.
{code:java}
FlatMapFunction<Iterator<Row>, Row> myFunction = new MyFunction(jedisConfiguration);
StructType structSchema = getSchema();

// Lambda parameters renamed so they no longer clash with the local `dataset` below.
VoidFunction2<Dataset<Row>, Long> forEachFunc = (batchDataset, batchId) -> {
    Dataset<Row> dataset = getDataset();
    dataset.persist();
    JavaRDD<Row> processedRDD = dataset.javaRDD().mapPartitions(myFunction);
    Dataset<Row> processedDS = sparkSession().createDataFrame(processedRDD, structSchema);
    parquetWriter.write(processedDS);
    dataset.unpersist();
};

DataStreamWriter<Row> dataStream = dataset
    .writeStream()
    .foreachBatch(forEachFunc)
    .outputMode(outputMode)
    .option("checkpointLocation", checkpointLocation);
....<stream dataStream> {code}
And function
{code:java}
public class MyFunction implements FlatMapFunction<Iterator<Row>, Row> {
    <constructor with jedisConfiguration parameter>...

    @Override
    public Iterator<Row> call(Iterator<Row> rowIterator) throws Exception {
        List<Row> output;
        JedisCluster redis = new JedisCluster(jedisConfiguration);
        try {
            output = new ArrayList<>();
            while (rowIterator.hasNext()) {
                Row row = rowIterator.next();
                Long var1 = row.getAs("var1");
                Long var2 = row.getAs("var2");
                var redisKey = "some_key";
                var result = redis.hgetAll(redisKey);
                if (!result.isEmpty()) {
                    output.add(RowFactory.create(
                        var1,
                        var2,
                        result.getOrDefault("some_id", null)));
                }
            }
        } finally {
            if (redis != null) {
                try {
                    redis.close();
                } catch (Exception e) {
                    throw new RuntimeException("Failed to close Redis connection: " + e);
                }
            }
        }
        return output.iterator();
    }
} {code}
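For reference, the per-partition cleanup used above can be reproduced without Redis or Spark. The following is only a sketch: `PartitionCloseSketch` and `StubClient` are hypothetical stand-ins, not the real classes, and it assumes the client implements `AutoCloseable` so that try-with-resources can replace the explicit finally block.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

class PartitionCloseSketch {

    /** Hypothetical stand-in for a closeable client such as JedisCluster. */
    static class StubClient implements AutoCloseable {
        boolean closed = false;

        String lookup(String key) {
            return "value-for-" + key;
        }

        @Override
        public void close() {
            closed = true;
        }
    }

    /**
     * Processes one partition. The client is closed when the try block exits,
     * whether the loop finishes normally or throws.
     */
    static Iterator<String> call(Iterator<String> rows, StubClient client) {
        List<String> output = new ArrayList<>();
        try (StubClient c = client) {
            while (rows.hasNext()) {
                output.add(c.lookup(rows.next()));
            }
        }
        return output.iterator();
    }

    public static void main(String[] args) {
        StubClient client = new StubClient();
        Iterator<String> result = call(Arrays.asList("a", "b").iterator(), client);
        while (result.hasNext()) {
            System.out.println(result.next());
        }
        System.out.println("closed=" + client.closed);
    }
}
```

In this pattern the client lives only for the duration of one partition on the executor, and the whole partition is buffered in a list before the iterator is returned.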
It runs for a couple of days and then dies. I can't figure out what causes the
memory leak in the driver.
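One way to narrow this down would be to log the driver's heap usage once per micro-batch and check whether it grows steadily. The following is a minimal sketch using only the JDK. `DriverHeapLogger` and its method names are hypothetical, and the idea is that `describe(batchId)` would be called from inside the `foreachBatch` function, which runs on the driver.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryUsage;

class DriverHeapLogger {

    /** Returns the heap currently used by this JVM, in mebibytes. */
    static long usedHeapMiB() {
        MemoryUsage heap = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
        return heap.getUsed() / (1024L * 1024L);
    }

    /** Builds one log line for a micro-batch; batchId is the value foreachBatch passes in. */
    static String describe(long batchId) {
        return "batch=" + batchId + " usedHeapMiB=" + usedHeapMiB();
    }

    public static void main(String[] args) {
        // Stand-in for a few micro-batches; in the application this call
        // would sit at the end of the foreachBatch function instead.
        for (long batchId = 0; batchId < 3; batchId++) {
            System.out.println(describe(batchId));
        }
    }
}
```

A single reading says little because of garbage collection. The trend over many batches is what would indicate a leak, and a heap dump taken when usage is high would show which objects are being retained.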