This is an automated email from the ASF dual-hosted git repository.
himanshug pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 31740b3 Fix : Druid throws
java.util.concurrent.RejectedExecutionException when ingest task is stopping.
(#10555)
31740b3 is described below
commit 31740b3b29cac6d81d3cc19374dbf04836e41436
Author: zhangyue19921010 <[email protected]>
AuthorDate: Tue Nov 24 06:52:03 2020 +0800
Fix : Druid throws java.util.concurrent.RejectedExecutionException when
ingest task is stopping. (#10555)
* check exec status before returning Signal
* add more log
* change log level to debug and add UT
* change log level to warn and merge master
Co-authored-by: yuezhang <[email protected]>
---
.../util/common/concurrent/ScheduledExecutors.java | 7 +++-
.../common/concurrent/ScheduledExecutorsTest.java | 46 ++++++++++++++++++++++
2 files changed, 52 insertions(+), 1 deletion(-)
diff --git
a/core/src/main/java/org/apache/druid/java/util/common/concurrent/ScheduledExecutors.java
b/core/src/main/java/org/apache/druid/java/util/common/concurrent/ScheduledExecutors.java
index 2850c50..97f43f6 100644
---
a/core/src/main/java/org/apache/druid/java/util/common/concurrent/ScheduledExecutors.java
+++
b/core/src/main/java/org/apache/druid/java/util/common/concurrent/ScheduledExecutors.java
@@ -53,7 +53,12 @@ public class ScheduledExecutors
public Signal call()
{
runnable.run(); // (Exceptions are handled for us)
- return Signal.REPEAT;
+ if (exec.isShutdown()) {
+ log.warn("ScheduledExecutorService is ShutDown. Return
'Signal.STOP' and stopped rescheduling %s (delay %s)", this, delay);
+ return Signal.STOP;
+ } else {
+ return Signal.REPEAT;
+ }
}
}
);
diff --git
a/core/src/test/java/org/apache/druid/java/util/common/concurrent/ScheduledExecutorsTest.java
b/core/src/test/java/org/apache/druid/java/util/common/concurrent/ScheduledExecutorsTest.java
new file mode 100644
index 0000000..9bc67ea
--- /dev/null
+++
b/core/src/test/java/org/apache/druid/java/util/common/concurrent/ScheduledExecutorsTest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.java.util.common.concurrent;
+
+import org.joda.time.Duration;
+import org.junit.Test;
+
+import java.util.concurrent.ScheduledExecutorService;
+
+public class ScheduledExecutorsTest
+{
+ @Test
+ public void testscheduleWithFixedDelay() throws InterruptedException
+ {
+ Duration initialDelay = new Duration(1000);
+ Duration delay = new Duration(1000);
+ ScheduledExecutorService exec =
Execs.scheduledSingleThreaded("BasicAuthenticatorCacheManager-Exec--%d");
+ ScheduledExecutors.scheduleWithFixedDelay(
+ exec,
+ initialDelay,
+ delay,
+ () -> {
+ System.out.println("TEST!");
+ }
+ );
+ Thread.sleep(5 * 1000);
+ exec.shutdown();
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]