asfgit closed pull request #6739: [FLINK-10289] [JobManager] Classify 
Exceptions to different category for apply different failover strategy
URL: https://github.com/apache/flink/pull/6739
 
 
   

This is a PR merged from a forked repository.
Because GitHub does not display the original diff of a pull request from a
fork once it has been merged, the diff is reproduced below for the sake of
provenance. Note that some long lines in the diff below are wrapped onto a
following line, so the patch cannot be applied as-is.

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/SuppressRestartsException.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/SuppressRestartsException.java
index 61a9064ccbb..45ef760d25e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/SuppressRestartsException.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/SuppressRestartsException.java
@@ -18,6 +18,9 @@
 
 package org.apache.flink.runtime.execution;
 
+import org.apache.flink.runtime.throwable.ThrowableAnnotation;
+import org.apache.flink.runtime.throwable.ThrowableType;
+
 /**
  * Exception thrown in order to suppress job restarts.
  *
@@ -25,6 +28,7 @@
  * job restarts. The JobManager will <strong>not</strong> restart a job, which
  * fails with this Exception.
  */
+@ThrowableAnnotation(ThrowableType.NonRecoverableError)
 public class SuppressRestartsException extends RuntimeException {
 
        private static final long serialVersionUID = 221873676920848349L;
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/NoResourceAvailableException.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/NoResourceAvailableException.java
index e1c1657af44..bea7b8a06bc 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/NoResourceAvailableException.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/NoResourceAvailableException.java
@@ -19,7 +19,10 @@
 package org.apache.flink.runtime.jobmanager.scheduler;
 
 import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.throwable.ThrowableAnnotation;
+import org.apache.flink.runtime.throwable.ThrowableType;
 
+@ThrowableAnnotation(ThrowableType.NonRecoverableError)
 public class NoResourceAvailableException extends JobException {
 
        private static final long serialVersionUID = -2249953165298717803L;
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/throwable/ThrowableAnnotation.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/throwable/ThrowableAnnotation.java
new file mode 100644
index 00000000000..86733501fbc
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/throwable/ThrowableAnnotation.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.throwable;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Inherited;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * for annotate the type of an {@link Throwable}.
+ */
+@Inherited
+@Target(ElementType.TYPE)
+@Retention(RetentionPolicy.RUNTIME)
+public @interface ThrowableAnnotation {
+
+       /** get ThrowableType.*/
+       ThrowableType value() default ThrowableType.RecoverableError;
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/throwable/ThrowableClassifier.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/throwable/ThrowableClassifier.java
new file mode 100644
index 00000000000..087d6d4e034
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/throwable/ThrowableClassifier.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.throwable;
+
+import java.lang.annotation.Annotation;
+
+/**
+ * Helper class, given a exception do the classification.
+ */
+public class ThrowableClassifier {
+
+       /**
+        * classify the exceptions by extract the {@link ThrowableAnnotation} 
of it, that will be handled different failover logic.
+        * @param cause
+        * @return ThrowableType.RecoverableError if there is no such annotation
+        */
+       public static ThrowableType getThrowableType(Throwable cause) {
+               final Annotation annotation = 
cause.getClass().getAnnotation(ThrowableAnnotation.class);
+               return annotation == null ? ThrowableType.RecoverableError : 
((ThrowableAnnotation) annotation).value();
+       }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/throwable/ThrowableType.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/throwable/ThrowableType.java
new file mode 100644
index 00000000000..5fb9a8efdad
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/throwable/ThrowableType.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.throwable;
+
+/**
+ * */
+public enum ThrowableType {
+
+       /**
+        * this indicates error that would not succeed even with retry, such as 
DivideZeroExeception.
+        * No failover should be attempted with such error. Instead, the job 
should fail immediately.
+        */
+       NonRecoverableError,
+
+       /**
+        * data consumption error, which indicates that we should revoke the 
producer.
+        * */
+       PartitionDataMissingError,
+
+       /**
+        * this indicates error related to running environment, such as 
hardware error, service issue, in which case we should consider blacklist the 
machine.
+        * */
+       EnvironmentError,
+
+       /**
+        * this indicates other errors that is recoverable.
+        * */
+       RecoverableError
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ThrowableClassifierTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ThrowableClassifierTest.java
new file mode 100644
index 00000000000..57b330e3997
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ThrowableClassifierTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.runtime.execution.SuppressRestartsException;
+import 
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
+import org.apache.flink.runtime.throwable.ThrowableAnnotation;
+import org.apache.flink.runtime.throwable.ThrowableClassifier;
+import org.apache.flink.runtime.throwable.ThrowableType;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Test throwable classifier
+ * */
+public class ThrowableClassifierTest extends TestLogger {
+
+       @Test
+       public void testThrowableType_NonRecoverable() {
+               assertEquals(ThrowableType.NonRecoverableError,
+                       ThrowableClassifier.getThrowableType(new 
SuppressRestartsException(new Exception(""))));
+
+               assertEquals(ThrowableType.NonRecoverableError,
+                       ThrowableClassifier.getThrowableType(new 
NoResourceAvailableException()));
+       }
+
+       @Test
+       public void testThrowableType_Recoverable() {
+               assertEquals(ThrowableType.RecoverableError,
+                       ThrowableClassifier.getThrowableType(new 
Exception("")));
+               assertEquals(ThrowableType.RecoverableError,
+                       ThrowableClassifier.getThrowableType(new 
ThrowableType_RecoverableFailure_Exception()));
+       }
+
+       @Test
+       public void testThrowableType_EnvironmentError() {
+               assertEquals(ThrowableType.EnvironmentError,
+                       ThrowableClassifier.getThrowableType(new 
ThrowableType_EnvironmentError_Exception()));
+       }
+
+       @Test
+       public void testThrowableType_PartitionDataMissingError() {
+               assertEquals(ThrowableType.PartitionDataMissingError,
+                       ThrowableClassifier.getThrowableType(new 
ThrowableType_PartitionDataMissingError_Exception()));
+       }
+
+       @Test
+       public void testThrowableType_InheritError() {
+               assertEquals(ThrowableType.PartitionDataMissingError,
+                       ThrowableClassifier.getThrowableType(new 
Sub_ThrowableType_PartitionDataMissingError_Exception()));
+       }
+
+       @ThrowableAnnotation(ThrowableType.PartitionDataMissingError)
+       private class ThrowableType_PartitionDataMissingError_Exception extends 
Exception {
+       }
+
+       @ThrowableAnnotation(ThrowableType.EnvironmentError)
+       private class ThrowableType_EnvironmentError_Exception extends 
Exception {
+       }
+
+       @ThrowableAnnotation(ThrowableType.RecoverableError)
+       private class ThrowableType_RecoverableFailure_Exception extends 
Exception {
+       }
+
+       private class Sub_ThrowableType_PartitionDataMissingError_Exception 
extends ThrowableType_PartitionDataMissingError_Exception {
+       }
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log in to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to