This is an automated email from the ASF dual-hosted git repository.
yangjie01 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 7fc4aa3f5ff [SPARK-45533][CORE] Use j.l.r.Cleaner instead of finalize
for RocksDBIterator/LevelDBIterator
7fc4aa3f5ff is described below
commit 7fc4aa3f5ff98f871725f7cab027067b900e6706
Author: zhaomin <[email protected]>
AuthorDate: Tue Nov 21 22:48:03 2023 +0800
[SPARK-45533][CORE] Use j.l.r.Cleaner instead of finalize for
RocksDBIterator/LevelDBIterator
### What changes were proposed in this pull request?
Use `java.lang.ref.Cleaner` instead of `finalize()` for RocksDBIterator and LevelDBIterator.
### Why are the changes needed?
The finalize() method has been deprecated since Java 9 and will be
removed in a future release; java.lang.ref.Cleaner is the recommended
replacement.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Passes GitHub Actions, plus the new `testResourceCleaner` tests in LevelDBSuite and RocksDBSuite.
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #43502 from zhaomin1423/45533_2.
Lead-authored-by: zhaomin <[email protected]>
Co-authored-by: Min Zhao <[email protected]>
Co-authored-by: yanbei.zm <[email protected]>
Signed-off-by: yangjie01 <[email protected]>
---
common/kvstore/pom.xml | 9 ++-
.../org/apache/spark/util/kvstore/LevelDB.java | 10 ++-
.../apache/spark/util/kvstore/LevelDBIterator.java | 77 ++++++++++++++++++----
.../org/apache/spark/util/kvstore/RocksDB.java | 9 ++-
.../apache/spark/util/kvstore/RocksDBIterator.java | 65 +++++++++++++++---
.../apache/spark/util/kvstore/LevelDBSuite.java | 39 +++++++++++
.../apache/spark/util/kvstore/RocksDBSuite.java | 39 +++++++++++
7 files changed, 216 insertions(+), 32 deletions(-)
diff --git a/common/kvstore/pom.xml b/common/kvstore/pom.xml
index 54b7f401cc4..a9b5a463471 100644
--- a/common/kvstore/pom.xml
+++ b/common/kvstore/pom.xml
@@ -60,6 +60,10 @@
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </dependency>
<dependency>
<groupId>commons-io</groupId>
@@ -82,11 +86,6 @@
<artifactId>log4j-1.2-api</artifactId>
<scope>test</scope>
</dependency>
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-api</artifactId>
- <scope>test</scope>
- </dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j2-impl</artifactId>
diff --git
a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDB.java
b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDB.java
index b50906e2cba..13a9d89f470 100644
--- a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDB.java
+++ b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDB.java
@@ -34,6 +34,7 @@ import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import org.fusesource.leveldbjni.JniDBFactory;
import org.iq80.leveldb.DB;
+import org.iq80.leveldb.DBIterator;
import org.iq80.leveldb.Options;
import org.iq80.leveldb.WriteBatch;
@@ -326,7 +327,7 @@ public class LevelDB implements KVStore {
* Closes the given iterator if the DB is still open. Trying to close a JNI
LevelDB handle
* with a closed DB can cause JVM crashes, so this ensures that situation
does not happen.
*/
- void closeIterator(LevelDBIterator<?> it) throws IOException {
+ void closeIterator(DBIterator it) throws IOException {
notifyIteratorClosed(it);
synchronized (this._db) {
DB _db = this._db.get();
@@ -340,8 +341,11 @@ public class LevelDB implements KVStore {
* Remove iterator from iterator tracker. `LevelDBIterator` calls it to
notify
* iterator is closed.
*/
- void notifyIteratorClosed(LevelDBIterator<?> it) {
- iteratorTracker.removeIf(ref -> it.equals(ref.get()));
+ void notifyIteratorClosed(DBIterator dbIterator) {
+ iteratorTracker.removeIf(ref -> {
+ LevelDBIterator<?> it = ref.get();
+ return it != null && dbIterator.equals(it.internalIterator());
+ });
}
/** Returns metadata about indices for the given type. */
diff --git
a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDBIterator.java
b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDBIterator.java
index 35d0c6065fb..b830e6afc61 100644
---
a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDBIterator.java
+++
b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDBIterator.java
@@ -18,18 +18,24 @@
package org.apache.spark.util.kvstore;
import java.io.IOException;
+import java.lang.ref.Cleaner;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
+import java.util.concurrent.atomic.AtomicBoolean;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import org.iq80.leveldb.DBIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
class LevelDBIterator<T> implements KVStoreIterator<T> {
+ private static final Cleaner CLEANER = Cleaner.create();
+
private final LevelDB db;
private final boolean ascending;
private final DBIterator it;
@@ -40,6 +46,9 @@ class LevelDBIterator<T> implements KVStoreIterator<T> {
private final byte[] end;
private final long max;
+ private final ResourceCleaner resourceCleaner;
+ private final Cleaner.Cleanable cleanable;
+
private boolean checkedNext;
private byte[] next;
private boolean closed;
@@ -53,6 +62,8 @@ class LevelDBIterator<T> implements KVStoreIterator<T> {
this.ti = db.getTypeInfo(type);
this.index = ti.index(params.index);
this.max = params.max;
+ this.resourceCleaner = new ResourceCleaner(it, db);
+ this.cleanable = CLEANER.register(this, this.resourceCleaner);
Preconditions.checkArgument(!index.isChild() || params.parent != null,
"Cannot iterate over child index %s without parent value.",
params.index);
@@ -182,23 +193,33 @@ class LevelDBIterator<T> implements KVStoreIterator<T> {
@Override
public synchronized void close() throws IOException {
- db.notifyIteratorClosed(this);
+ db.notifyIteratorClosed(it);
if (!closed) {
- it.close();
- closed = true;
- next = null;
+ try {
+ it.close();
+ } finally {
+ closed = true;
+ next = null;
+ cancelResourceClean();
+ }
}
}
/**
- * Because it's tricky to expose closeable iterators through many internal
APIs, especially
- * when Scala wrappers are used, this makes sure that, hopefully, the JNI
resources held by
- * the iterator will eventually be released.
+ * Prevent ResourceCleaner from trying to release resources after close.
*/
- @SuppressWarnings("deprecation")
- @Override
- protected void finalize() throws Throwable {
- db.closeIterator(this);
+ private void cancelResourceClean() {
+ this.resourceCleaner.setStartedToFalse();
+ this.cleanable.clean();
+ }
+
+ DBIterator internalIterator() {
+ return it;
+ }
+
+ @VisibleForTesting
+ ResourceCleaner getResourceCleaner() {
+ return resourceCleaner;
}
private byte[] loadNext() {
@@ -280,4 +301,38 @@ class LevelDBIterator<T> implements KVStoreIterator<T> {
return a.length - b.length;
}
+ static class ResourceCleaner implements Runnable {
+ private static final Logger LOG =
LoggerFactory.getLogger(ResourceCleaner.class);
+
+ private final DBIterator dbIterator;
+
+ private final LevelDB levelDB;
+
+ private final AtomicBoolean started = new AtomicBoolean(true);
+
+ ResourceCleaner(DBIterator dbIterator, LevelDB levelDB) {
+ this.dbIterator = dbIterator;
+ this.levelDB = levelDB;
+ }
+
+ @Override
+ public void run() {
+ if (started.compareAndSet(true, false)) {
+ try {
+ levelDB.closeIterator(dbIterator);
+ } catch (IOException e) {
+ LOG.warn("Failed to close iterator", e);
+ }
+ }
+ }
+
+ void setStartedToFalse() {
+ started.set(false);
+ }
+
+ @VisibleForTesting
+ boolean isCompleted() {
+ return !started.get();
+ }
+ }
}
diff --git
a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/RocksDB.java
b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/RocksDB.java
index d328e5c79d3..dc7ad0be5c0 100644
--- a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/RocksDB.java
+++ b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/RocksDB.java
@@ -359,7 +359,7 @@ public class RocksDB implements KVStore {
* Closes the given iterator if the DB is still open. Trying to close a JNI
RocksDB handle
* with a closed DB can cause JVM crashes, so this ensures that situation
does not happen.
*/
- void closeIterator(RocksDBIterator<?> it) throws IOException {
+ void closeIterator(RocksIterator it) {
notifyIteratorClosed(it);
synchronized (this._db) {
org.rocksdb.RocksDB _db = this._db.get();
@@ -373,8 +373,11 @@ public class RocksDB implements KVStore {
* Remove iterator from iterator tracker. `RocksDBIterator` calls it to
notify
* iterator is closed.
*/
- void notifyIteratorClosed(RocksDBIterator<?> it) {
- iteratorTracker.removeIf(ref -> it.equals(ref.get()));
+ void notifyIteratorClosed(RocksIterator rocksIterator) {
+ iteratorTracker.removeIf(ref -> {
+ RocksDBIterator<?> rocksDBIterator = ref.get();
+ return rocksDBIterator != null &&
rocksIterator.equals(rocksDBIterator.internalIterator());
+ });
}
/** Returns metadata about indices for the given type. */
diff --git
a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/RocksDBIterator.java
b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/RocksDBIterator.java
index ba7b8d8b813..a98b0482e35 100644
---
a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/RocksDBIterator.java
+++
b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/RocksDBIterator.java
@@ -18,7 +18,9 @@
package org.apache.spark.util.kvstore;
import java.io.IOException;
+import java.lang.ref.Cleaner;
import java.util.*;
+import java.util.concurrent.atomic.AtomicBoolean;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
@@ -27,6 +29,7 @@ import org.rocksdb.RocksIterator;
class RocksDBIterator<T> implements KVStoreIterator<T> {
+ private static final Cleaner CLEANER = Cleaner.create();
private final RocksDB db;
private final boolean ascending;
private final RocksIterator it;
@@ -36,6 +39,8 @@ class RocksDBIterator<T> implements KVStoreIterator<T> {
private final byte[] indexKeyPrefix;
private final byte[] end;
private final long max;
+ private final Cleaner.Cleanable cleanable;
+ private final RocksDBIterator.ResourceCleaner resourceCleaner;
private boolean checkedNext;
private byte[] next;
@@ -50,6 +55,8 @@ class RocksDBIterator<T> implements KVStoreIterator<T> {
this.ti = db.getTypeInfo(type);
this.index = ti.index(params.index);
this.max = params.max;
+ this.resourceCleaner = new RocksDBIterator.ResourceCleaner(it, db);
+ this.cleanable = CLEANER.register(this, resourceCleaner);
Preconditions.checkArgument(!index.isChild() || params.parent != null,
"Cannot iterate over child index %s without parent value.",
params.index);
@@ -176,22 +183,33 @@ class RocksDBIterator<T> implements KVStoreIterator<T> {
@Override
public synchronized void close() throws IOException {
- db.notifyIteratorClosed(this);
+ db.notifyIteratorClosed(it);
if (!closed) {
- it.close();
- closed = true;
- next = null;
+ try {
+ it.close();
+ } finally {
+ closed = true;
+ next = null;
+ cancelResourceClean();
+ }
}
}
/**
- * Because it's tricky to expose closeable iterators through many internal
APIs, especially
- * when Scala wrappers are used, this makes sure that, hopefully, the JNI
resources held by
- * the iterator will eventually be released.
+ * Prevent ResourceCleaner from trying to release resources after the iterator is closed.
*/
- @Override
- protected void finalize() throws Throwable {
- db.closeIterator(this);
+ private void cancelResourceClean() {
+ this.resourceCleaner.setStartedToFalse();
+ this.cleanable.clean();
+ }
+
+ @VisibleForTesting
+ ResourceCleaner getResourceCleaner() {
+ return resourceCleaner;
+ }
+
+ RocksIterator internalIterator() {
+ return it;
}
private byte[] loadNext() {
@@ -272,4 +290,31 @@ class RocksDBIterator<T> implements KVStoreIterator<T> {
return a.length - b.length;
}
+ static class ResourceCleaner implements Runnable {
+
+ private final RocksIterator rocksIterator;
+ private final RocksDB rocksDB;
+ private final AtomicBoolean started = new AtomicBoolean(true);
+
+ ResourceCleaner(RocksIterator rocksIterator, RocksDB rocksDB) {
+ this.rocksIterator = rocksIterator;
+ this.rocksDB = rocksDB;
+ }
+
+ @Override
+ public void run() {
+ if (started.compareAndSet(true, false)) {
+ rocksDB.closeIterator(rocksIterator);
+ }
+ }
+
+ void setStartedToFalse() {
+ started.set(false);
+ }
+
+ @VisibleForTesting
+ boolean isCompleted() {
+ return !started.get();
+ }
+ }
}
diff --git
a/common/kvstore/src/test/java/org/apache/spark/util/kvstore/LevelDBSuite.java
b/common/kvstore/src/test/java/org/apache/spark/util/kvstore/LevelDBSuite.java
index ec1c8101737..c22aea821af 100644
---
a/common/kvstore/src/test/java/org/apache/spark/util/kvstore/LevelDBSuite.java
+++
b/common/kvstore/src/test/java/org/apache/spark/util/kvstore/LevelDBSuite.java
@@ -18,6 +18,8 @@
package org.apache.spark.util.kvstore;
import java.io.File;
+import java.lang.ref.Reference;
+import java.lang.ref.WeakReference;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
@@ -33,6 +35,7 @@ import org.iq80.leveldb.DBIterator;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+
import static org.junit.jupiter.api.Assertions.*;
import static org.junit.jupiter.api.Assumptions.assumeFalse;
@@ -383,6 +386,42 @@ public class LevelDBSuite {
assertFalse(iter.skip(1));
}
+ @Test
+ public void testResourceCleaner() throws Exception {
+ File dbPathForCleanerTest = File.createTempFile(
+ "test_db_cleaner.", ".rdb");
+ dbPathForCleanerTest.delete();
+
+ LevelDB dbForCleanerTest = new LevelDB(dbPathForCleanerTest);
+ try {
+ for (int i = 0; i < 8192; i++) {
+ dbForCleanerTest.write(createCustomType1(i));
+ }
+ LevelDBIterator<CustomType1> levelDBIterator =
+ (LevelDBIterator<CustomType1>)
dbForCleanerTest.view(CustomType1.class).iterator();
+ Reference<LevelDBIterator<?>> reference = new
WeakReference<>(levelDBIterator);
+ assertNotNull(reference);
+ LevelDBIterator.ResourceCleaner resourceCleaner =
levelDBIterator.getResourceCleaner();
+ assertFalse(resourceCleaner.isCompleted());
+ // Manually set levelDBIterator to null, to be GC.
+ levelDBIterator = null;
+ // 100 times gc, the levelDBIterator should be GCed.
+ int count = 0;
+ while (count < 100 && !reference.refersTo(null)) {
+ System.gc();
+ count++;
+ Thread.sleep(100);
+ }
+ // check levelDBIterator should be GCed
+ assertTrue(reference.refersTo(null));
+ // Verify that the Cleaner will be executed after a period of time,
isCompleted is true.
+ assertTrue(resourceCleaner.isCompleted());
+ } finally {
+ dbForCleanerTest.close();
+ FileUtils.deleteQuietly(dbPathForCleanerTest);
+ }
+ }
+
private CustomType1 createCustomType1(int i) {
CustomType1 t = new CustomType1();
t.key = "key" + i;
diff --git
a/common/kvstore/src/test/java/org/apache/spark/util/kvstore/RocksDBSuite.java
b/common/kvstore/src/test/java/org/apache/spark/util/kvstore/RocksDBSuite.java
index b61a7afcd07..61f18a9a26d 100644
---
a/common/kvstore/src/test/java/org/apache/spark/util/kvstore/RocksDBSuite.java
+++
b/common/kvstore/src/test/java/org/apache/spark/util/kvstore/RocksDBSuite.java
@@ -18,6 +18,8 @@
package org.apache.spark.util.kvstore;
import java.io.File;
+import java.lang.ref.Reference;
+import java.lang.ref.WeakReference;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
@@ -381,6 +383,43 @@ public class RocksDBSuite {
assertFalse(iter.skip(1));
}
+ @Test
+ public void testResourceCleaner() throws Exception {
+ File dbPathForCleanerTest = File.createTempFile(
+ "test_db_cleaner.", ".rdb");
+ dbPathForCleanerTest.delete();
+
+ RocksDB dbForCleanerTest = new RocksDB(dbPathForCleanerTest);
+ try {
+ for (int i = 0; i < 8192; i++) {
+ dbForCleanerTest.write(createCustomType1(i));
+ }
+ RocksDBIterator<CustomType1> rocksDBIterator =
+ (RocksDBIterator<CustomType1>)
dbForCleanerTest.view(CustomType1.class).iterator();
+ Reference<RocksDBIterator<?>> reference = new
WeakReference<>(rocksDBIterator);
+ assertNotNull(reference);
+ RocksDBIterator.ResourceCleaner resourceCleaner =
rocksDBIterator.getResourceCleaner();
+ assertFalse(resourceCleaner.isCompleted());
+ // Manually set rocksDBIterator to null, to be GC.
+ rocksDBIterator = null;
+ // 100 times gc, the rocksDBIterator should be GCed.
+ int count = 0;
+ while (count < 100 && !reference.refersTo(null)) {
+ System.gc();
+ count++;
+ Thread.sleep(100);
+ }
+ // check rocksDBIterator should be GCed
+ assertTrue(reference.refersTo(null));
+ // Verify that the Cleaner will be executed after a period of time,
+ // and status will become false.
+ assertTrue(resourceCleaner.isCompleted());
+ } finally {
+ dbForCleanerTest.close();
+ FileUtils.deleteQuietly(dbPathForCleanerTest);
+ }
+ }
+
private CustomType1 createCustomType1(int i) {
CustomType1 t = new CustomType1();
t.key = "key" + i;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]