[
https://issues.apache.org/jira/browse/SPARK-47859?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Leo Timofeyev updated SPARK-47859:
----------------------------------
Description:
Hello Spark community. I have a Java Spark Structured Streaming application.
The JedisCluster is closed in a finally block, but there is still a memory leak:
{code:java}
FlatMapFunction<Iterator<Row>, Row> myFunction = new MyFunction(jedisConfiguration);
StructType structSchema = getSchema();

VoidFunction2<Dataset<Row>, Long> forEachFunc = (dataset, aLong) -> {
    Dataset<Row> dataset = getDataset();
    dataset.persist();
    JavaRDD<Row> processedRDD = dataset.javaRDD().mapPartitions(myFunction);
    Dataset<Row> processedDS = sparkSession().createDataFrame(processedRDD, structSchema);
    parquetWriter.write(processedDS);
    dataset.unpersist();
};

DataStreamWriter<Row> dataStream = dataset
    .writeStream()
    .foreachBatch(forEachFunc)
    .outputMode(outputMode)
    .option("checkpointLocation", checkpointLocation);
....<stream dataStream> {code}
And the function:
{code:java}
public class MyFunction implements FlatMapFunction<Iterator<Row>, Row> {
    <constructor with jedisConfiguration parameter>...

    @Override
    public Iterator<Row> call(Iterator<Row> rowIterator) throws Exception {
        List<Row> output;
        JedisCluster redis = new JedisCluster(jedisConfiguration);
        try {
            output = new ArrayList<>();
            while (rowIterator.hasNext()) {
                Row row = rowIterator.next();
                Long var1 = row.getAs("var1");
                Long var2 = row.getAs("var2");
                var redisKey = "some_key";
                var result = redis.hgetAll(redisKey);
                if (!result.isEmpty()) {
                    output.add(RowFactory.create(
                        var1,
                        var2,
                        result.getOrDefault("some_id", null)));
                }
            }
        } finally {
            if (redis != null) {
                try {
                    redis.close();
                } catch (Exception e) {
                    throw new RuntimeException("Failed to close Redis connection: " + e);
                }
            }
        }
        return output.iterator();
    }
} {code}
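For comparison, the close-in-finally pattern used in MyFunction can also be written with try-with-resources. The sketch below is only an illustration of that pattern: KeyValueClient is a hypothetical stand-in for the Redis client (it models only hgetAll and close), and the rows are reduced to plain string keys. It is not meant to suggest that the way the client is closed is what causes the growth on the driver.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

public class PartitionLookupSketch {

    /** Hypothetical stand-in for the Redis client; only the calls used above are modelled. */
    interface KeyValueClient extends AutoCloseable {
        Map<String, String> hgetAll(String key);

        @Override
        void close(); // narrowed so callers need no checked-exception handling
    }

    /** Collects the "some_id" field for every key whose hash is non-empty. */
    static List<String> lookupAll(Iterator<String> keys, KeyValueClient client) {
        List<String> output = new ArrayList<>();
        // try-with-resources closes the client on normal and exceptional exit.
        try (KeyValueClient c = client) {
            while (keys.hasNext()) {
                Map<String, String> result = c.hgetAll(keys.next());
                if (!result.isEmpty()) {
                    output.add(result.getOrDefault("some_id", null));
                }
            }
        }
        return output;
    }
}
```

As in the original function, the whole partition is still buffered in a list before the iterator is returned, so the client is closed before any row is consumed downstream.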
It works for a couple of days and then dies. I can't figure out what causes the
memory leak in the driver.
Grafana board of the driver's memory pool:
!Screenshot 2024-04-15 at 20.43.22.png|width=875,height=169!
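To narrow down which pool on the board is growing, per-pool usage could also be logged from inside the driver JVM with the standard java.lang.management API. This is a minimal sketch: the class name MemoryPoolSnapshot is made up for illustration, and where and how often it would be called (for example once per micro-batch) is an assumption, not part of the application above.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryPoolMXBean;
import java.lang.management.MemoryUsage;
import java.util.LinkedHashMap;
import java.util.Map;

public class MemoryPoolSnapshot {

    /** Returns the used bytes of each JVM memory pool, keyed by pool name. */
    static Map<String, Long> usedBytesByPool() {
        Map<String, Long> used = new LinkedHashMap<>();
        for (MemoryPoolMXBean pool : ManagementFactory.getMemoryPoolMXBeans()) {
            MemoryUsage usage = pool.getUsage();
            if (usage != null) { // getUsage() returns null for a pool that is no longer valid
                used.put(pool.getName(), usage.getUsed());
            }
        }
        return used;
    }

    public static void main(String[] args) {
        // Print one line per pool; pool names depend on the JVM and garbage collector.
        usedBytesByPool().forEach((name, bytes) ->
                System.out.printf("%-35s %,d bytes%n", name, bytes));
    }
}
```

Comparing snapshots taken a few hours apart would show whether the growth is in a heap pool or in a non-heap pool such as Metaspace.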
> Why does this lead to the memory leak?
> --------------------------------------
>
> Key: SPARK-47859
> URL: https://issues.apache.org/jira/browse/SPARK-47859
> Project: Spark
> Issue Type: IT Help
> Components: Spark Core
> Affects Versions: 3.3.2, 3.5.0
> Reporter: Leo Timofeyev
> Priority: Major
> Attachments: Screenshot 2024-04-15 at 20.43.22.png