[
https://issues.apache.org/jira/browse/SPARK-35722?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
yikf updated SPARK-35722:
-------------------------
Description:
If nothing has been added to the queue, should wait until something does get
queued instead of loop after timeout, It prevents an ineffective loop.
Currently, We will cotinue loop after timeout of the 100ms if the queue is
empty, It prevents an ineffective cycle.
eg:
1. referenceQueue.remove(timeout), if the queue is empty
2. remove will return directly after wait timeout
3. continue the loop.
reference
# Spark ContextCleaner
{code:java}
while (!stopped) {
try {
val reference =
Option(referenceQueue.remove(ContextCleaner.REF_QUEUE_POLL_TIMEOUT))
.map(_.asInstanceOf[CleanupTaskWeakReference])
...
}
...
}{code}
2. JDK ReferenceQueue
{code:java}
public Reference<? extends T> remove(long timeout)
throws IllegalArgumentException, InterruptedException
{
if (timeout < 0) {
throw new IllegalArgumentException("Negative timeout value");
}
synchronized (lock) {
Reference<? extends T> r = reallyPoll();
if (r != null) return r;
long start = (timeout == 0) ? 0 : System.nanoTime();
for (;;) {
lock.wait(timeout);
r = reallyPoll();
if (r != null) return r;
if (timeout != 0) {
long end = System.nanoTime();
timeout -= (end - start) / 1000_000;
if (timeout <= 0) return null;
start = end;
}
}
}
}
{code}
If we have not timeout, Thread will wait until Reference.enqueue be called.
i.e. something that need to be cleanUp be add to the queue.
was:
If nothing has been added to the queue, should wait until something does get
queued instead of loop after timeout, It prevents an ineffective loop.
Currently, We will cotinue loop after timeout of the 100ms if the queue is
empty, It prevents an ineffective cycle.
eg:
1. referenceQueue.remove(timeout), if the queue is empty
2. remove will return directly after wait timeout
3. continue the loop.
reference
# Spark ContextCleaner
{code:java}
while (!stopped) {
try {
val reference =
Option(referenceQueue.remove(ContextCleaner.REF_QUEUE_POLL_TIMEOUT))
.map(_.asInstanceOf[CleanupTaskWeakReference])
...
}
...
}{code}
2.
{code:java}
public Reference<? extends T> remove(long timeout)
throws IllegalArgumentException, InterruptedException
{
if (timeout < 0) {
throw new IllegalArgumentException("Negative timeout value");
}
synchronized (lock) {
Reference<? extends T> r = reallyPoll();
if (r != null) return r;
long start = (timeout == 0) ? 0 : System.nanoTime();
for (;;) {
lock.wait(timeout);
r = reallyPoll();
if (r != null) return r;
if (timeout != 0) {
long end = System.nanoTime();
timeout -= (end - start) / 1000_000;
if (timeout <= 0) return null;
start = end;
}
}
}
}
{code}
If we have not timeout, Thread will wait until Reference.enqueue be called.
i.e. something that need to be cleanUp be add to the queue.
> wait until something does get queued
> ------------------------------------
>
> Key: SPARK-35722
> URL: https://issues.apache.org/jira/browse/SPARK-35722
> Project: Spark
> Issue Type: Improvement
> Components: Spark Core
> Affects Versions: 3.0.0
> Reporter: yikf
> Priority: Minor
> Fix For: 3.2.0
>
>
> If nothing has been added to the queue, should wait until something does get
> queued instead of loop after timeout, It prevents an ineffective loop.
>
> Currently, We will cotinue loop after timeout of the 100ms if the queue is
> empty, It prevents an ineffective cycle.
> eg:
> 1. referenceQueue.remove(timeout), if the queue is empty
> 2. remove will return directly after wait timeout
> 3. continue the loop.
> reference
> # Spark ContextCleaner
> {code:java}
> while (!stopped) {
> try {
> val reference =
> Option(referenceQueue.remove(ContextCleaner.REF_QUEUE_POLL_TIMEOUT))
> .map(_.asInstanceOf[CleanupTaskWeakReference])
> ...
> }
> ...
> }{code}
> 2. JDK ReferenceQueue
> {code:java}
> public Reference<? extends T> remove(long timeout)
> throws IllegalArgumentException, InterruptedException
> {
> if (timeout < 0) {
> throw new IllegalArgumentException("Negative timeout value");
> }
> synchronized (lock) {
> Reference<? extends T> r = reallyPoll();
> if (r != null) return r;
> long start = (timeout == 0) ? 0 : System.nanoTime();
> for (;;) {
> lock.wait(timeout);
> r = reallyPoll();
> if (r != null) return r;
> if (timeout != 0) {
> long end = System.nanoTime();
> timeout -= (end - start) / 1000_000;
> if (timeout <= 0) return null;
> start = end;
> }
> }
> }
> }
> {code}
> If we have not timeout, Thread will wait until Reference.enqueue be called.
> i.e. something that need to be cleanUp be add to the queue.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]