[ https://issues.apache.org/jira/browse/SPARK-47859?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Leo Timofeyev updated SPARK-47859:
----------------------------------
Attachment: Screenshot 2024-04-15 at 20.43.22.png
> Why does this lead to the memory leak?
> --------------------------------------
>
> Key: SPARK-47859
> URL: https://issues.apache.org/jira/browse/SPARK-47859
> Project: Spark
> Issue Type: IT Help
> Components: Spark Core
> Affects Versions: 3.3.2, 3.5.0
> Reporter: Leo Timofeyev
> Priority: Major
> Attachments: Screenshot 2024-04-15 at 20.43.22.png
>
>
> Hello Spark community. I have a Java Spark Structured Streaming application.
> The JedisCluster is closed in a finally block, but memory still leaks:
> {code:java}
> FlatMapFunction<Iterator<Row>, Row> myFunction = new MyFunction(jedisConfiguration);
> StructType structSchema = getSchema();
> VoidFunction2<Dataset<Row>, Long> forEachFunc = (dataset, aLong) -> {
>     Dataset<Row> dataset = getDataset();
>     dataset.persist();
>     JavaRDD<Row> processedRDD = dataset.javaRDD().mapPartitions(myFunction);
>     Dataset<Row> processedDS = sparkSession().createDataFrame(processedRDD, structSchema);
>     parquetWriter.write(processedDS);
>     dataset.unpersist();
> };
> DataStreamWriter<Row> dataStream = dataset
>     .writeStream()
>     .foreachBatch(forEachFunc)
>     .outputMode(outputMode)
>     .option("checkpointLocation", checkpointLocation);
> ....<stream dataStream> {code}
> And the function:
> {code:java}
> public class MyFunction implements FlatMapFunction<Iterator<Row>, Row> {
>     <constructor with jedisConfiguration parameter>...
>     @Override
>     public Iterator<Row> call(Iterator<Row> rowIterator) throws Exception {
>         List<Row> output;
>         JedisCluster redis = new JedisCluster(jedisConfiguration);
>         try {
>             output = new ArrayList<>();
>             while (rowIterator.hasNext()) {
>                 Row row = rowIterator.next();
>                 Long var1 = row.getAs("var1");
>                 Long var2 = row.getAs("var2");
>                 var redisKey = "some_key";
>                 var result = redis.hgetAll(redisKey);
>                 if (!result.isEmpty()) {
>                     output.add(RowFactory.create(
>                         var1,
>                         var2,
>                         result.getOrDefault("some_id", null)));
>                 }
>             }
>         } finally {
>             if (redis != null) {
>                 try {
>                     redis.close();
>                 } catch (Exception e) {
>                     throw new RuntimeException("Failed to close Redis connection: " + e);
>                 }
>             }
>         }
>         return output.iterator();
>     }
> } {code}
> It works for a couple of days and then dies. I can't figure out what causes
> the memory leak in the driver.
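
The close-in-finally pattern in the function above can also be written with try-with-resources. The following is only a minimal sketch of that pattern and makes no claim about what causes the driver-side leak. It assumes nothing about Jedis or Spark: `PartitionLookup` and `FakeClient` are hypothetical stand-ins invented for illustration, with `FakeClient` playing the role of the per-partition client and exposing an `hgetAll`-like lookup over an in-memory map.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

public class PartitionLookup {

    /** Hypothetical stand-in for a per-partition client; this is NOT the Jedis API. */
    public static class FakeClient implements AutoCloseable {
        public boolean closed = false;
        private final Map<String, Map<String, String>> data;

        public FakeClient(Map<String, Map<String, String>> data) {
            this.data = data;
        }

        /** Returns the hash stored under the key, or an empty map if absent. */
        public Map<String, String> hgetAll(String key) {
            return data.getOrDefault(key, Map.of());
        }

        @Override
        public void close() {
            closed = true;
        }
    }

    /**
     * Processes one partition's worth of keys. The client is closed when the
     * try block exits, whether normally or because of an exception.
     */
    public static List<String> process(Iterator<String> keys, FakeClient client) {
        List<String> output = new ArrayList<>();
        try (FakeClient c = client) {
            while (keys.hasNext()) {
                String key = keys.next();
                Map<String, String> result = c.hgetAll(key);
                // Skip keys with no stored hash, as in the original function.
                if (!result.isEmpty()) {
                    output.add(key + ":" + result.get("some_id"));
                }
            }
        }
        return output;
    }
}
```

As in the original function, the whole partition is buffered in a list before it is returned, so the sketch changes only how the client is closed, not how much memory each task holds.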