[ https://issues.apache.org/jira/browse/FLINK-2543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14709341#comment-14709341 ]
ASF GitHub Bot commented on FLINK-2543: --------------------------------------- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1048#discussion_r37754281 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/util/SerializedThrowable.java --- @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.util; + +import com.google.common.base.Preconditions; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.InstantiationUtil; + +import java.io.Serializable; +import java.util.Arrays; + +/** + * Utility class for dealing with serialized Throwables. + * Needed to send around user-specific exception classes with Akka. + */ +public class SerializedThrowable implements Serializable { + private final byte[] serializedError; + + // The exception must not be (de)serialized with the class, as its + // class may not be part of the system class loader. + private transient Throwable cachedError; + + + /** + * Create a new SerializedThrowable. + * @param error The exception to serialize. + */ + public SerializedThrowable(Throwable error) { + Preconditions.checkNotNull(error, "The exception to serialize has to be set"); + this.cachedError = error; + byte[] serializedError; + try { + serializedError = InstantiationUtil.serializeObject(error); + } + catch (Throwable t) { + // could not serialize exception. send the stringified version instead + try { + this.cachedError = new Exception(ExceptionUtils.stringifyException(error)); + serializedError = InstantiationUtil.serializeObject(this.cachedError); + } + catch (Throwable tt) { + // seems like we cannot do much to report the actual exception + // report a placeholder instead + try { + this.cachedError = new Exception("Cause is a '" + error.getClass().getName() + + "' (failed to serialize or stringify)"); + serializedError = InstantiationUtil.serializeObject(this.cachedError); + } + catch (Throwable ttt) { + // this should never happen unless the JVM is fubar. + // we just report the state without the error + this.cachedError = null; + serializedError = null; + } + } + } + this.serializedError = serializedError; + } + + public Throwable getError(ClassLoader usercodeClassloader) { --- End diff -- What kind of camel case is `usercodeClassloader`? > State handling does not support deserializing classes through the > UserCodeClassloader > ------------------------------------------------------------------------------------- > > Key: FLINK-2543 > URL: https://issues.apache.org/jira/browse/FLINK-2543 > Project: Flink > Issue Type: Bug > Components: Streaming > Affects Versions: 0.9, 0.10 > Reporter: Robert Metzger > Assignee: Robert Metzger > Priority: Blocker > > The current implementation of the state checkpointing does not support custom > classes, because the UserCodeClassLoader is not used to deserialize the state. > {code} > Error: java.lang.RuntimeException: Failed to deserialize state handle and > setup initial operator state. > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:544) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.lang.ClassNotFoundException: > com.ottogroup.bi.searchlab.searchsessionizer.OperatorState > at java.net.URLClassLoader.findClass(URLClassLoader.java:381) > at java.lang.ClassLoader.loadClass(ClassLoader.java:424) > at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331) > at java.lang.ClassLoader.loadClass(ClassLoader.java:357) > at java.lang.Class.forName0(Native Method) > at java.lang.Class.forName(Class.java:348) > at java.io.ObjectInputStream.resolveClass(ObjectInputStream.java:626) > at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613) > at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518) > at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774) > at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) > at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371) > at > org.apache.flink.runtime.state.ByteStreamStateHandle.getState(ByteStreamStateHandle.java:63) > at > org.apache.flink.runtime.state.ByteStreamStateHandle.getState(ByteStreamStateHandle.java:33) > at > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.restoreInitialState(AbstractUdfStreamOperator.java:83) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.setInitialState(StreamTask.java:276) > at > org.apache.flink.runtime.state.StateUtils.setOperatorState(StateUtils.java:51) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:541) > {code} > The issue has been reported by a user: > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Custom-Class-for-state-checkpointing-td2415.html -- This message was sent by Atlassian JIRA (v6.3.4#6332)