[ https://issues.apache.org/jira/browse/FLINK-3713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15320670#comment-15320670 ]

ASF GitHub Bot commented on FLINK-3713:
---------------------------------------

GitHub user uce opened a pull request:

    https://github.com/apache/flink/pull/2083

    [FLINK-3713] [clients, runtime] Use user code class loader when disposing savepoints

    Disposing savepoints via the JobManager fails for state handles or descriptors that contain user classes (for example, custom folding state or RocksDB handles).
    
    With this change, disposing a savepoint requires the user to provide either the job ID of a running job, so that the JobManager can use that job's user code class loader, or the job JARs.
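The class loader choice described above could be sketched in Scala roughly as follows. All names here (`DisposalContext`, `RunningJob`, `JobJars`, `DisposalClassLoader.resolve` and the `lookupJobLoader` callback) are hypothetical and not Flink's actual API; the sketch only illustrates picking either a running job's user code class loader or a `URLClassLoader` built from the job JARs.

```scala
import java.io.File
import java.net.{URL, URLClassLoader}

// Hypothetical model of the two ways a user can supply user code for disposal.
sealed trait DisposalContext
final case class RunningJob(jobId: String) extends DisposalContext
final case class JobJars(jars: Seq[File]) extends DisposalContext

object DisposalClassLoader {
  /** Picks the class loader used to deserialize and discard savepoint state.
    * `lookupJobLoader` is a hypothetical stand-in for however the JobManager
    * finds the user code class loader of a running job (None if unknown). */
  def resolve(
      ctx: DisposalContext,
      lookupJobLoader: String => Option[ClassLoader],
      parent: ClassLoader): Either[String, ClassLoader] = ctx match {
    case RunningJob(id) =>
      // Reuse the loader of the running job, if the job is known.
      lookupJobLoader(id).toRight(s"No running job with ID $id")
    case JobJars(jars) if jars.nonEmpty =>
      // Build a fresh loader over the job JARs, delegating to `parent`.
      val urls: Array[URL] = jars.map(_.toURI.toURL).toArray
      Right(new URLClassLoader(urls, parent))
    case JobJars(_) =>
      Left("At least one job JAR is required")
  }
}
```

A loader created from JARs is owned by the caller and should be closed with `URLClassLoader.close()` once the savepoint state has been discarded; a running job's loader must not be closed, since the job still uses it.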
    
    This version breaks the API, as the CLI now requires either a JobID or a JAR. I think this is reasonable, because the current approach only works for a subset of the available state variants.
    
    We can backport this to 1.0.4 and make the JobID or JAR arguments optional there. What do you think?
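A hypothetical Scala sketch of the argument check, contrasting the strict behavior of this PR with the optional arguments proposed for a 1.0.4 backport. `DisposeArgs`, its cases and the `strict` switch are illustrative, not the Flink CLI's actual options, and rejecting a JobID and a JAR given together is an assumption.

```scala
// Hypothetical sketch: not the real Flink CLI parser.
object DisposeArgs {
  sealed trait Source
  case object SystemLoader extends Source              // lenient fallback only
  final case class ByJobId(jobId: String) extends Source
  final case class ByJar(jarPath: String) extends Source

  /** strict = true models this PR (a JobID or a JAR is mandatory);
    * strict = false models the proposed backport, where both are optional
    * and disposal falls back to the system class loader. */
  def parse(jobId: Option[String], jar: Option[String], strict: Boolean): Either[String, Source] =
    (jobId, jar) match {
      // Assumption: giving both is treated as an error.
      case (Some(_), Some(_))     => Left("Specify either a JobID or a JAR, not both")
      case (Some(id), None)       => Right(ByJobId(id))
      case (None, Some(path))     => Right(ByJar(path))
      case (None, None) if strict => Left("Disposing a savepoint requires a JobID or a JAR")
      case (None, None)           => Right(SystemLoader)
    }
}
```

Keeping both modes behind one function makes the backport a one-flag difference, while the lenient mode still only works for state without user classes.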
    
    I've tested this with a job using RocksDB state, both while the job was running and after it had terminated. This did not work with the current 1.0.3 release.
    
    Ideally, we will get rid of the whole disposal business when we make savepoints properly self-contained. I'm going to open a JIRA issue with a proposal to do so soon.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/uce/flink 3713-dispose_savepoint

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2083.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2083
    
----
commit 1cdfe9b4df3584b3c3c48168cd3f17100dbebf4c
Author: Ufuk Celebi
Date:   2016-06-08T08:59:24Z

    [FLINK-3713] [clients, runtime] Use user code class loader when disposing savepoint
    
    Disposing savepoints via the JobManager fails for state handles or descriptors,
    which contain user classes (for example custom folding state or RocksDB handles).
    
    With this change, the user has to provide the job ID of a running job when disposing
    a savepoint in order to use the user code class loader of that job or provide the
    job JARs.
    
    This version breaks the API as the CLI now requires either a JobID or a JAR. I think
    this is reasonable, because the current approach only works for a subset of the
    available state variants.

----


> DisposeSavepoint message uses system classloader to discard state
> -----------------------------------------------------------------
>
>                 Key: FLINK-3713
>                 URL: https://issues.apache.org/jira/browse/FLINK-3713
>             Project: Flink
>          Issue Type: Bug
>          Components: JobManager
>            Reporter: Robert Metzger
>            Assignee: Ufuk Celebi
>
> The {{DisposeSavepoint}} message in the JobManager is using the system classloader to discard the state:
> {code}
> val savepoint = savepointStore.getState(savepointPath)
> log.debug(s"$savepoint")
> // Discard the associated checkpoint
> savepoint.discard(getClass.getClassLoader)
> // Dispose the savepoint
> savepointStore.disposeState(savepointPath)
> {code}
> This leads to failures when the state contains user classes:
> {code}
> 2016-04-07 03:02:12,225 INFO  org.apache.flink.yarn.YarnJobManager                          - Disposing savepoint at 'hdfs:/bigdata/flink/savepoints/savepoint-7610540d089f'.
> 2016-04-07 03:02:12,233 WARN  org.apache.flink.runtime.checkpoint.StateForTask              - Failed to discard checkpoint state: StateForTask eed5cc5a12dc2e0672848ba81bd8fa6d-0 : SerializedValue
> java.lang.ClassNotFoundException: <some_package>.MetricsProcessor$CombinedKeysFoldFunction
>       at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>       at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>       at java.security.AccessController.doPrivileged(Native Method)
>       at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
>       at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
>       at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
>       at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
>       at java.lang.Class.forName0(Native Method)
>       at java.lang.Class.forName(Class.java:270)
>       at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:64)
>       at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
>       at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
>       at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
>       at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>       at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>       at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>       at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>       at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>       at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>       at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>       at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>       at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>       at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>       at java.util.HashMap.readObject(HashMap.java:1184)
>       at sun.reflect.GeneratedMethodAccessor12.invoke(Unknown Source)
>       at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:606)
>       at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
>       at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
>       at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>       at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>       at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>       at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>       at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>       at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>       at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1706)
>       at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1344)
>       at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>       at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>       at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>       at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>       at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>       at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:290)
>       at org.apache.flink.util.SerializedValue.deserializeValue(SerializedValue.java:58)
>       at org.apache.flink.runtime.checkpoint.StateForTask.discard(StateForTask.java:109)
>       at org.apache.flink.runtime.checkpoint.CompletedCheckpoint.discard(CompletedCheckpoint.java:85)
>       at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:635)
>       at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:627)
>       at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:627)
>       at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>       at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>       at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
>       at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
>       at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>       at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
>       at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
>       at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>       at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> {code}
> The issue was reported by [~knaufk] and analyzed by [~aljoscha].
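The trace fails inside `ClassLoaderObjectInputStream.resolveClass`: deserialization resolves each class through the loader it was handed, so a loader that cannot see the user's JAR throws `ClassNotFoundException`. The Scala sketch below is not Flink's `InstantiationUtil`; it reproduces the effect with a plain `ObjectInputStream` whose `resolveClass` is overridden. `LoaderObjectInputStream` and `Serde` are illustrative names, and `scala.Some` stands in for a user class because a loader that delegates only to the bootstrap loader cannot see the Scala library.

```scala
import java.io._
import java.net.{URL, URLClassLoader}

// Resolves every class through an explicitly supplied loader instead of the
// default lookup. Simplification: primitive class descriptors (e.g. a
// serialized `int.class`) are not handled.
class LoaderObjectInputStream(in: InputStream, loader: ClassLoader)
    extends ObjectInputStream(in) {
  override protected def resolveClass(desc: ObjectStreamClass): Class[_] =
    Class.forName(desc.getName, false, loader)
}

object Serde {
  def serialize(obj: AnyRef): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(obj) finally out.close() // close() flushes the stream
    bytes.toByteArray
  }

  def deserialize(data: Array[Byte], loader: ClassLoader): AnyRef = {
    val in = new LoaderObjectInputStream(new ByteArrayInputStream(data), loader)
    try in.readObject() finally in.close()
  }
}
```

For example, deserializing `Serde.serialize(Some(42))` with `new URLClassLoader(Array.empty[URL], null)` throws `ClassNotFoundException`, just like the JobManager's loader in the trace, while passing `classOf[Some[_]].getClassLoader` returns `Some(42)`.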



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
