This is an automated email from the ASF dual-hosted git repository.
nicholasjiang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-flink.git
The following commit(s) were added to refs/heads/main by this push:
new 3af5c0d [ISSUE #67] Bugfix for consume blocking
3af5c0d is described below
commit 3af5c0df8d577494100edfa96df198ff93fb9868
Author: 高思伟 <[email protected]>
AuthorDate: Fri Oct 28 10:47:41 2022 +0800
[ISSUE #67] Bugfix for consume blocking
Bugfix for consume blocking
Co-authored-by: 高思伟 <[email protected]>
---
.../flink/legacy/RocketMQSourceFunction.java | 3 +-
.../flink/legacy/common/util/RetryUtil.java | 10 ++++
.../flink/legacy/common/util/RetryUtilTest.java | 66 ++++++++++++++++++++++
3 files changed, 78 insertions(+), 1 deletion(-)
diff --git a/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSourceFunction.java b/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSourceFunction.java
index 29272d8..97037a1 100644
--- a/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSourceFunction.java
+++ b/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSourceFunction.java
@@ -353,7 +353,8 @@ public class RocketMQSourceFunction<OUT> extends RichParallelSourceFunction<OUT>
}
return true;
},
- "RuntimeException"));
+ "RuntimeException",
+ runningChecker));
}
awaitTermination();
diff --git a/src/main/java/org/apache/rocketmq/flink/legacy/common/util/RetryUtil.java b/src/main/java/org/apache/rocketmq/flink/legacy/common/util/RetryUtil.java
index e53caf1..aae2148 100644
--- a/src/main/java/org/apache/rocketmq/flink/legacy/common/util/RetryUtil.java
+++ b/src/main/java/org/apache/rocketmq/flink/legacy/common/util/RetryUtil.java
@@ -16,6 +16,8 @@
*/
package org.apache.rocketmq.flink.legacy.common.util;
+import org.apache.rocketmq.flink.legacy.RunningChecker;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,6 +41,11 @@ public class RetryUtil {
}
public static <T> T call(Callable<T> callable, String errorMsg) throws
RuntimeException {
+ return call(callable, errorMsg, null); // delegates to the three-argument overload, passing null as the RunningChecker
+ }
+
+ public static <T> T call(Callable<T> callable, String errorMsg,
RunningChecker runningChecker)
+ throws RuntimeException {
long backoff = INITIAL_BACKOFF;
int retries = 0;
do {
@@ -46,6 +53,9 @@ public class RetryUtil {
return callable.call();
} catch (Exception ex) {
if (retries >= MAX_ATTEMPTS) {
+ if (null != runningChecker) {
+ runningChecker.setRunning(false);
+ }
throw new RuntimeException(ex);
}
log.error("{}, retry {}/{}", errorMsg, retries, MAX_ATTEMPTS,
ex);
diff --git a/src/test/java/org/apache/rocketmq/flink/legacy/common/util/RetryUtilTest.java b/src/test/java/org/apache/rocketmq/flink/legacy/common/util/RetryUtilTest.java
new file mode 100644
index 0000000..0b7178b
--- /dev/null
+++ b/src/test/java/org/apache/rocketmq/flink/legacy/common/util/RetryUtilTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.flink.legacy.common.util;
+
+import org.apache.rocketmq.flink.legacy.RunningChecker;
+
+import junit.framework.TestCase;
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/** Tests for {@link RetryUtil}: runs a callable that always throws and checks that the supplied {@link RunningChecker} ends up not running. */
+@Slf4j
+public class RetryUtilTest extends TestCase {
+
+ public void testCall() { // submits an always-failing callable to RetryUtil.call on a worker thread, then inspects the state it left behind
+ try {
+ User user = new User();
+ RunningChecker runningChecker = new RunningChecker();
+ runningChecker.setRunning(true); // start as "running" so the final assertion can observe the switch to false
+ ExecutorService executorService = Executors.newCachedThreadPool();
+ executorService.execute( // execute() returns no Future, so an exception thrown by call() is not visible to this test thread
+ () ->
+ RetryUtil.call(
+ () -> {
+ user.setName("test"); // runs on every attempt, before the failing statement below
+ user.setAge(Integer.parseInt("12e")); // "12e" is not a valid int: parseInt throws NumberFormatException, so setAge is never invoked
+ return true; // never reached at runtime because the previous statement always throws
+ },
+ "Something is error",
+ runningChecker));
+ Thread.sleep(10000); // fixed 10 s wait for the worker; NOTE(review): assumes RetryUtil's total retry backoff is shorter than this - confirm against its constants
+ executorService.shutdown(); // stops accepting new tasks; does not itself wait for a task that is still running
+ log.info("Thread has finished");
+ assertEquals(0, user.age); // age keeps the int default because setAge was never reached
+ assertEquals("test", user.name);
+ assertEquals(false, runningChecker.isRunning());
+ } catch (Exception e) { // NOTE(review): an Exception (e.g. InterruptedException from sleep) is only logged, so the test still passes; presumably JUnit assertion failures are Errors and are not caught here - confirm
+ log.warn("Exception has been caught");
+ }
+ }
+
+ @Data
+ public class User { // mutable holder captured by the callable; non-static inner class; the setName/setAge used above are presumably generated by Lombok's @Data - confirm
+ String name;
+ int age;
+ }
+}