[ 
https://issues.apache.org/jira/browse/FLINK-10289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16642939#comment-16642939
 ] 

ASF GitHub Bot commented on FLINK-10289:
----------------------------------------

asfgit closed pull request #6739: [FLINK-10289] [JobManager] Classify 
Exceptions to different category for apply different failover strategy
URL: https://github.com/apache/flink/pull/6739
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/SuppressRestartsException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/SuppressRestartsException.java
index 61a9064ccbb..45ef760d25e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/SuppressRestartsException.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/SuppressRestartsException.java
@@ -18,6 +18,9 @@
 
 package org.apache.flink.runtime.execution;
 
+import org.apache.flink.runtime.throwable.ThrowableAnnotation;
+import org.apache.flink.runtime.throwable.ThrowableType;
+
 /**
  * Exception thrown in order to suppress job restarts.
  *
@@ -25,6 +28,7 @@
  * job restarts. The JobManager will <strong>not</strong> restart a job, which
  * fails with this Exception.
  */
+@ThrowableAnnotation(ThrowableType.NonRecoverableError)
 public class SuppressRestartsException extends RuntimeException {
 
        private static final long serialVersionUID = 221873676920848349L;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/NoResourceAvailableException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/NoResourceAvailableException.java
index e1c1657af44..bea7b8a06bc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/NoResourceAvailableException.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/NoResourceAvailableException.java
@@ -19,7 +19,10 @@
 package org.apache.flink.runtime.jobmanager.scheduler;
 
 import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.throwable.ThrowableAnnotation;
+import org.apache.flink.runtime.throwable.ThrowableType;
 
+@ThrowableAnnotation(ThrowableType.NonRecoverableError)
 public class NoResourceAvailableException extends JobException {
 
        private static final long serialVersionUID = -2249953165298717803L;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/throwable/ThrowableAnnotation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/throwable/ThrowableAnnotation.java
new file mode 100644
index 00000000000..86733501fbc
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/throwable/ThrowableAnnotation.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.throwable;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Inherited;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * for annotate the type of an {@link Throwable}.
+ */
+@Inherited
+@Target(ElementType.TYPE)
+@Retention(RetentionPolicy.RUNTIME)
+public @interface ThrowableAnnotation {
+
+       /** get ThrowableType.*/
+       ThrowableType value() default ThrowableType.RecoverableError;
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/throwable/ThrowableClassifier.java b/flink-runtime/src/main/java/org/apache/flink/runtime/throwable/ThrowableClassifier.java
new file mode 100644
index 00000000000..087d6d4e034
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/throwable/ThrowableClassifier.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.throwable;
+
+import java.lang.annotation.Annotation;
+
+/**
+ * Helper class, given a exception do the classification.
+ */
+public class ThrowableClassifier {
+
+       /**
+        * classify the exceptions by extract the {@link ThrowableAnnotation} of it, that will be handled different failover logic.
+        * @param cause
+        * @return ThrowableType.RecoverableError if there is no such annotation
+        */
+       public static ThrowableType getThrowableType(Throwable cause) {
+               final Annotation annotation = cause.getClass().getAnnotation(ThrowableAnnotation.class);
+               return annotation == null ? ThrowableType.RecoverableError : ((ThrowableAnnotation) annotation).value();
+       }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/throwable/ThrowableType.java b/flink-runtime/src/main/java/org/apache/flink/runtime/throwable/ThrowableType.java
new file mode 100644
index 00000000000..5fb9a8efdad
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/throwable/ThrowableType.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.throwable;
+
+/**
+ * */
+public enum ThrowableType {
+
+       /**
+        * this indicates error that would not succeed even with retry, such as DivideZeroExeception.
+        * No failover should be attempted with such error. Instead, the job should fail immediately.
+        */
+       NonRecoverableError,
+
+       /**
+        * data consumption error, which indicates that we should revoke the producer.
+        * */
+       PartitionDataMissingError,
+
+       /**
+        * this indicates error related to running environment, such as hardware error, service issue, in which case we should consider blacklist the machine.
+        * */
+       EnvironmentError,
+
+       /**
+        * this indicates other errors that is recoverable.
+        * */
+       RecoverableError
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ThrowableClassifierTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ThrowableClassifierTest.java
new file mode 100644
index 00000000000..57b330e3997
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ThrowableClassifierTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.runtime.execution.SuppressRestartsException;
+import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
+import org.apache.flink.runtime.throwable.ThrowableAnnotation;
+import org.apache.flink.runtime.throwable.ThrowableClassifier;
+import org.apache.flink.runtime.throwable.ThrowableType;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Test throwable classifier
+ * */
+public class ThrowableClassifierTest extends TestLogger {
+
+       @Test
+       public void testThrowableType_NonRecoverable() {
+               assertEquals(ThrowableType.NonRecoverableError,
+                       ThrowableClassifier.getThrowableType(new SuppressRestartsException(new Exception(""))));
+
+               assertEquals(ThrowableType.NonRecoverableError,
+                       ThrowableClassifier.getThrowableType(new NoResourceAvailableException()));
+       }
+
+       @Test
+       public void testThrowableType_Recoverable() {
+               assertEquals(ThrowableType.RecoverableError,
+                       ThrowableClassifier.getThrowableType(new Exception("")));
+               assertEquals(ThrowableType.RecoverableError,
+                       ThrowableClassifier.getThrowableType(new ThrowableType_RecoverableFailure_Exception()));
+       }
+
+       @Test
+       public void testThrowableType_EnvironmentError() {
+               assertEquals(ThrowableType.EnvironmentError,
+                       ThrowableClassifier.getThrowableType(new ThrowableType_EnvironmentError_Exception()));
+       }
+
+       @Test
+       public void testThrowableType_PartitionDataMissingError() {
+               assertEquals(ThrowableType.PartitionDataMissingError,
+                       ThrowableClassifier.getThrowableType(new ThrowableType_PartitionDataMissingError_Exception()));
+       }
+
+       @Test
+       public void testThrowableType_InheritError() {
+               assertEquals(ThrowableType.PartitionDataMissingError,
+                       ThrowableClassifier.getThrowableType(new Sub_ThrowableType_PartitionDataMissingError_Exception()));
+       }
+
+       @ThrowableAnnotation(ThrowableType.PartitionDataMissingError)
+       private class ThrowableType_PartitionDataMissingError_Exception extends Exception {
+       }
+
+       @ThrowableAnnotation(ThrowableType.EnvironmentError)
+       private class ThrowableType_EnvironmentError_Exception extends Exception {
+       }
+
+       @ThrowableAnnotation(ThrowableType.RecoverableError)
+       private class ThrowableType_RecoverableFailure_Exception extends Exception {
+       }
+
+       private class Sub_ThrowableType_PartitionDataMissingError_Exception extends ThrowableType_PartitionDataMissingError_Exception {
+       }
+}
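
For readers who want to try the mechanism outside of Flink, the annotation lookup in the diff above can be reproduced in a few lines. The following is only a minimal standalone sketch, not the merged code: the names ClassifierSketch, Kind, Classified, FatalException and FatalSubException are made up for illustration. It shows two properties that follow from the diff: @Inherited lets a subclass pick up its parent's classification, and only the class of the throwable passed in is inspected, so a wrapping exception without an annotation falls back to the recoverable default.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// Standalone sketch; all names here are hypothetical stand-ins for the Flink types.
class ClassifierSketch {

    enum Kind { NON_RECOVERABLE, RECOVERABLE }

    // RUNTIME retention is required for the reflective lookup below;
    // @Inherited makes subclasses see the annotation of their superclass.
    @Inherited
    @Target(ElementType.TYPE)
    @Retention(RetentionPolicy.RUNTIME)
    @interface Classified {
        Kind value() default Kind.RECOVERABLE;
    }

    @Classified(Kind.NON_RECOVERABLE)
    static class FatalException extends Exception {}

    // Carries no annotation of its own; inherits the one on FatalException.
    static class FatalSubException extends FatalException {}

    // Looks only at the class of the given throwable, not at its cause chain.
    static Kind classify(Throwable t) {
        Classified c = t.getClass().getAnnotation(Classified.class);
        return c == null ? Kind.RECOVERABLE : c.value();
    }

    public static void main(String[] args) {
        System.out.println(classify(new FatalException()));    // NON_RECOVERABLE
        System.out.println(classify(new FatalSubException())); // NON_RECOVERABLE
        System.out.println(classify(new Exception("plain")));  // RECOVERABLE
        // The wrapper itself is not annotated, so the cause is ignored.
        System.out.println(classify(new RuntimeException(new FatalException()))); // RECOVERABLE
    }
}
```

Whether a caller should unwrap the cause chain before classifying is a design decision this sketch leaves open.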


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


> Classify Exceptions to different category for apply different failover 
> strategy
> -------------------------------------------------------------------------------
>
>                 Key: FLINK-10289
>                 URL: https://issues.apache.org/jira/browse/FLINK-10289
>             Project: Flink
>          Issue Type: Sub-task
>          Components: JobManager
>            Reporter: JIN SUN
>            Assignee: JIN SUN
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.7.0
>
>
> We need to classify exceptions and handle them with different strategies. To 
> do this, we propose to introduce the following throwable types and the 
> corresponding exceptions:
>  * NonRecoverable
>  ** We shouldn’t retry if an exception is classified as NonRecoverable.
>  ** For example, NoResourceAvailableException is a NonRecoverable exception.
>  ** Introduce a new exception, UserCodeException, to wrap all exceptions 
> thrown from user code.
>  * PartitionDataMissingError
>  ** In certain scenarios, producer data is transferred in blocking mode or 
> saved in a persistent store. If the partition is missing, we need to 
> revoke/rerun the producer task to regenerate the data.
>  ** Introduce a new exception, PartitionDataMissingException, to wrap all 
> issues of this kind.
>  * EnvironmentError
>  ** These errors are caused by hardware or software issues tied to a 
> specific environment. The assumption is that a task will succeed if we run 
> it in a different environment, and that other tasks run in the bad 
> environment will very likely fail. If multiple tasks fail on the same 
> machine due to EnvironmentError, we should consider adding the bad machine 
> to a blacklist and avoid scheduling tasks on it.
>  ** Introduce a new exception, EnvironmentException, to wrap all issues of 
> this kind.
>  * Recoverable
>  ** We assume all other issues are recoverable.
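
The four categories in the description map naturally onto a per-type decision. The sketch below is one possible reading of that description, not code from the pull request, which only adds the classification itself. The names FailoverSketch, Action and decide, the per-host failure counter, and the threshold of 3 are assumptions made up for illustration; the enum constants mirror ThrowableType from the diff.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of a failover decision driven by the throwable type.
final class FailoverSketch {

    // Mirrors the enum added in the diff.
    enum ThrowableType { NonRecoverableError, PartitionDataMissingError, EnvironmentError, RecoverableError }

    // Made-up set of actions a scheduler could take.
    enum Action { FAIL_JOB, RERUN_PRODUCER, RESTART_ELSEWHERE, BLACKLIST_HOST, RESTART_TASK }

    // Assumed value: environment failures on one host before it is blacklisted.
    static final int BLACKLIST_THRESHOLD = 3;

    private final Map<String, Integer> envFailuresByHost = new HashMap<>();

    Action decide(ThrowableType type, String host) {
        switch (type) {
            case NonRecoverableError:
                // Retrying would not help, so fail the job immediately.
                return Action.FAIL_JOB;
            case PartitionDataMissingError:
                // The consumed partition is gone; regenerate it upstream.
                return Action.RERUN_PRODUCER;
            case EnvironmentError:
                // Count failures per host and blacklist once the threshold is reached.
                int failures = envFailuresByHost.merge(host, 1, Integer::sum);
                return failures >= BLACKLIST_THRESHOLD ? Action.BLACKLIST_HOST : Action.RESTART_ELSEWHERE;
            default:
                // Everything else is assumed to be recoverable.
                return Action.RESTART_TASK;
        }
    }
}
```

A caller would first obtain the type via ThrowableClassifier.getThrowableType(cause) and then pass it, together with the host of the failed task, to decide.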



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
