[ https://issues.apache.org/jira/browse/FLINK-19864?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17230315#comment-17230315 ]
Kezhu Wang commented on FLINK-19864:
------------------------------------
I think this is probably caused by the misuse of {{Thread.getState}} as a
synchronization tool in {{StreamTaskTestHarness.waitForInputProcessing}}:
{code:java}
public void waitForInputProcessing() throws Exception {
    while (true) {
        checkForErrorInTaskThread();
        if (allInputConsumed()) {
            break;
        }
    }

    // then wait for the Task Thread to be in a blocked state
    // Check whether the state is blocked, this should be the case if it cannot
    // notifyNonEmpty more input, i.e. all currently available input has been processed.
    while (true) {
        Thread.State state = taskThread.getState();
        if (state == Thread.State.BLOCKED || state == Thread.State.TERMINATED ||
                state == Thread.State.WAITING || state == Thread.State.TIMED_WAITING) {
            break;
        }

        try {
            Thread.sleep(1);
        } catch (InterruptedException ignored) {}
    }
}
{code}
Here is what the Javadoc says about {{Thread.getState}}:
{quote}
Returns the state of this thread. This method is designed for use in monitoring of the system state, not for synchronization control.
{quote}
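To make the Javadoc warning concrete, here is a minimal sketch (not Flink code; the class {{StateIsNotProgress}} and the latch used as a blocking point are hypothetical) that reuses the polling idea of {{waitForInputProcessing}}: it waits until a worker thread is no longer {{NEW}} or {{RUNNABLE}}, then checks whether the worker has actually finished its work. It has not, because a non-RUNNABLE state only says the thread is parked somewhere, not that it has nothing left to do.
{code:java}
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

public class StateIsNotProgress {

    /** Returns whether the worker's work was done at the moment its state stopped being RUNNABLE. */
    public static boolean doneWhenNotRunnable() throws InterruptedException {
        CountDownLatch pause = new CountDownLatch(1); // hypothetical stand-in for any blocking point
        AtomicBoolean workDone = new AtomicBoolean(false);

        Thread worker = new Thread(() -> {
            try {
                pause.await();      // the worker parks here while its work is still pending
                workDone.set(true); // the "real" work only happens after the pause
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.start();

        // Same idea as waitForInputProcessing: spin until the thread no longer looks busy.
        Thread.State state = worker.getState();
        while (state == Thread.State.NEW || state == Thread.State.RUNNABLE) {
            Thread.sleep(1);
            state = worker.getState();
        }
        boolean doneAtThatPoint = workDone.get();

        pause.countDown(); // release the worker and wait until it really finishes
        worker.join();
        return doneAtThatPoint;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("work done when state looked idle: " + doneWhenNotRunnable());
    }
}
{code}
Running {{main}} prints {{work done when state looked idle: false}} on every run, since the work can only happen after the latch is released, which in turn happens only after the state check.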
Even though {{Thread.threadStatus}} is declared volatile in the JDK, the JVM side does not write it with volatile semantics:
{code:c++}
// Write the thread status value to threadStatus field in java.lang.Thread java class.
void java_lang_Thread::set_thread_status(oop java_thread,
                                         java_lang_Thread::ThreadStatus status)
{
  // The threadStatus is only present starting in 1.5
  if (_thread_status_offset > 0) {
    // Plain int store: no volatile (release) semantics on this write.
    java_thread->int_field_put(_thread_status_offset, status);
  }
}
{code}
I can't give a reliable example that demonstrates this JVM-side behavior without the help of an additional
synchronization tool; as far as I know, it is a chicken-and-egg problem. It is also not the case we
encounter here, as this test case already has explicit synchronization tools:
{{ConcurrentLinkedQueue.size}} and {{ConcurrentLinkedQueue.poll}}.
Also, I didn't find any explicit blocking statement after
{{ConcurrentLinkedQueue.poll}} and before
{{inputWatermarkGauge.setCurrentWatermark}}. But *there are implicit blocking
entry points: concurrent class loading.* I wrote the following code to verify this:
{code:java}
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;

public class Main {
    private static final List<String> unloadedClassNames = Arrays.asList(
            "java.sql.DriverManager",
            "java.io.Console",
            "java.io.FileInputStream",
            "java.io.FilePermission"
    );

    public static void main(String[] args) throws Exception {
        final CountDownLatch readyLatch = new CountDownLatch(1);
        final CountDownLatch classLoadingLatch = new CountDownLatch(1);
        final CountDownLatch doneLatch = new CountDownLatch(1);

        // This thread only spins or loads classes; it never blocks explicitly.
        Thread pollingThread = new Thread(() -> {
            try {
                readyLatch.countDown();
                // Spin (without blocking) until the main thread starts loading classes.
                while (classLoadingLatch.getCount() != 0) {
                    Thread.yield();
                }
                // Load the same classes concurrently with the main thread.
                unloadedClassNames.forEach(className -> {
                    try {
                        Class.forName(className);
                        Thread.yield();
                    } catch (Exception ex) {
                        ex.printStackTrace();
                        System.exit(2);
                    }
                });
                // Keep spinning until the main thread has checked our state.
                while (doneLatch.getCount() != 0) {
                    Thread.yield();
                }
            } catch (Exception ex) {
                ex.printStackTrace();
                System.exit(2);
            }
        });
        pollingThread.start();

        readyLatch.await();
        classLoadingLatch.countDown();
        unloadedClassNames.forEach(className -> {
            try {
                Class.forName(className);
            } catch (Exception ex) {
                ex.printStackTrace();
                System.exit(2);
            }
        });

        // All four classes are initialized at this point, yet the polling thread
        // can still be reported as not RUNNABLE.
        Thread.State pollingThreadState = pollingThread.getState();
        if (pollingThreadState != Thread.State.RUNNABLE) {
            System.err.format("polling thread state: %s\n", pollingThreadState);
            System.exit(1);
        }
        doneLatch.countDown();
        pollingThread.join();
    }
}
{code}
Here, I chose four classes, each of which has a static initialization block. The
above code fails (that is, it reports a non-RUNNABLE polling thread and exits with status 1)
quite often, at a rate of roughly 10% in my local environment. This is probably why the JDK
documentation makes that statement: *class loading is everywhere in Java*.
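The value in the failed assertion, -9223372036854775808, is {{Long.MIN_VALUE}}. Below is a minimal, deterministic model of the early return described above, assuming (hypothetically, not checked against Flink's watermark gauge) that the gauge starts at {{Long.MIN_VALUE}}. The class {{EarlyReturnModel}}, the {{AtomicLong}} gauge and {{IMPLICIT_LOCK}} are illustrative only: the lock stands in for something like a class initialization lock, and letting the test thread hold it is a simplification that makes the race reproducible on every run.
{code:java}
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;

public class EarlyReturnModel {
    // Hypothetical stand-in for a lock the task thread may contend on (e.g. class initialization).
    private static final Object IMPLICIT_LOCK = new Object();

    /** Returns the gauge value the "test" reads right after both harness-style waits. */
    public static long gaugeSeenByTest() throws InterruptedException {
        ConcurrentLinkedQueue<Long> input = new ConcurrentLinkedQueue<>();
        AtomicLong gauge = new AtomicLong(Long.MIN_VALUE); // assumed initial gauge value

        Thread task = new Thread(() -> {
            Long watermark;
            while ((watermark = input.poll()) == null) {
                Thread.yield();
            }
            // Implicit blocking point between poll() and the gauge update.
            synchronized (IMPLICIT_LOCK) {
                gauge.set(watermark);
            }
        });

        long seen;
        synchronized (IMPLICIT_LOCK) { // the lock is held while the harness-style waits run
            task.start();
            input.offer(1L);

            // Step 1 of waitForInputProcessing: wait until all input is consumed.
            while (input.size() > 0) {
                Thread.sleep(1);
            }
            // Step 2: wait until the task thread no longer looks RUNNABLE.
            Thread.State state = task.getState();
            while (state == Thread.State.NEW || state == Thread.State.RUNNABLE) {
                Thread.sleep(1);
                state = task.getState();
            }
            seen = gauge.get(); // the test's assertEquals(1, ...) would run here
        }
        task.join();
        return seen;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("gauge seen by test: " + gaugeSeenByTest());
    }
}
{code}
Running {{main}} prints {{gauge seen by test: -9223372036854775808}}: both waits return while the task thread is still blocked between {{poll}} and the gauge update, which is the same shape as the reported "expected:<1> but was:<-9223372036854775808>".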
> TwoInputStreamTaskTest.testWatermarkMetrics failed with "expected:<1> but was:<-9223372036854775808>"
> -----------------------------------------------------------------------------------------------------
>
> Key: FLINK-19864
> URL: https://issues.apache.org/jira/browse/FLINK-19864
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Metrics, Runtime / Task
> Affects Versions: 1.12.0
> Reporter: Dian Fu
> Priority: Critical
> Labels: test-stability
> Fix For: 1.12.0
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=8541&view=logs&j=77a9d8e1-d610-59b3-fc2a-4766541e0e33&t=7c61167f-30b3-5893-cc38-a9e3d057e392
> {code}
> 2020-10-28T22:40:44.2528420Z [ERROR] testWatermarkMetrics(org.apache.flink.streaming.runtime.tasks.TwoInputStreamTaskTest) Time elapsed: 1.528 s <<< FAILURE!
> 2020-10-28T22:40:44.2529225Z java.lang.AssertionError: expected:<1> but was:<-9223372036854775808>
> 2020-10-28T22:40:44.2541228Z at org.junit.Assert.fail(Assert.java:88)
> 2020-10-28T22:40:44.2542157Z at org.junit.Assert.failNotEquals(Assert.java:834)
> 2020-10-28T22:40:44.2542954Z at org.junit.Assert.assertEquals(Assert.java:645)
> 2020-10-28T22:40:44.2543456Z at org.junit.Assert.assertEquals(Assert.java:631)
> 2020-10-28T22:40:44.2544002Z at org.apache.flink.streaming.runtime.tasks.TwoInputStreamTaskTest.testWatermarkMetrics(TwoInputStreamTaskTest.java:540)
> {code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)