Repository: systemml
Updated Branches:
  refs/heads/master 20f97e0b5 -> 5b9c12df6


[SYSTEMML-2027] Fix codegen race conditions in spark executors

For large-scale perftest runs, we've seen intermittent task failures due
to various non-reproducible Janino class compilation issues. The root
cause seems to be the concurrent compilation and loading of the same class
by multiple executor threads. We now simply synchronize this code path,
which ensures that only the first thread compiles the class, while all
other threads (including those of other tasks) reuse the compiled class
from the static class cache as before (distinct classes have unique
names). The parfor code path already used synchronized access, whereas
the data-parallel operations (row, cell, magg, outer) did not.


Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/73394999
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/73394999
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/73394999

Branch: refs/heads/master
Commit: 73394999640969294c94a9b0c7db7cc2d3a1e81f
Parents: 20f97e0
Author: Matthias Boehm <[email protected]>
Authored: Tue Nov 21 19:19:41 2017 -0800
Committer: Matthias Boehm <[email protected]>
Committed: Tue Nov 21 19:31:26 2017 -0800

----------------------------------------------------------------------
 .../apache/sysml/runtime/codegen/CodegenUtils.java    | 14 +++++++++++++-
 .../parfor/RemoteDPParForSparkWorker.java             | 10 ++++------
 .../parfor/RemoteParForSparkWorker.java               |  8 +++-----
 .../instructions/spark/SpoofSPInstruction.java        |  8 ++++----
 4 files changed, 24 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/systemml/blob/73394999/src/main/java/org/apache/sysml/runtime/codegen/CodegenUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/codegen/CodegenUtils.java 
b/src/main/java/org/apache/sysml/runtime/codegen/CodegenUtils.java
index 8b3ea5f..726e267 100644
--- a/src/main/java/org/apache/sysml/runtime/codegen/CodegenUtils.java
+++ b/src/main/java/org/apache/sysml/runtime/codegen/CodegenUtils.java
@@ -68,7 +68,7 @@ public class CodegenUtils
        private static String _workingDir = null;
        
        public static Class<?> compileClass(String name, String src) 
-                       throws DMLRuntimeException
+               throws DMLRuntimeException
        {
                //reuse existing compiled class
                Class<?> ret = _cache.get(name);
@@ -98,6 +98,18 @@ public class CodegenUtils
                return getClass(name, null);
        }
        
+       public synchronized static Class<?> getClassSync(String name, byte[] 
classBytes)
+               throws DMLRuntimeException 
+       {
+               //In order to avoid anomalies of concurrently compiling and 
loading the same
+               //class with the same name multiple times in spark executors, 
this indirection
+               //synchronizes the class compilation. This synchronization 
leads to the first
+               //thread compiling the common class and all other threads 
simply reusing the
+               //cached class instance, which also ensures that the same class 
is not loaded
+               //multiple times which causes unnecessary JIT compilation 
overhead.
+               return getClass(name, classBytes);
+       }
+       
        public static Class<?> getClass(String name, byte[] classBytes) 
                throws DMLRuntimeException 
        {

http://git-wip-us.apache.org/repos/asf/systemml/blob/73394999/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java
index 9f2658c..06be2b1 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java
@@ -147,15 +147,13 @@ public class RemoteDPParForSparkWorker extends ParWorker 
implements PairFlatMapF
                _workerID = ID;
                
                //initialize codegen class cache (before program parsing)
-               synchronized( CodegenUtils.class ) {
-                       for( Entry<String, byte[]> e : _clsMap.entrySet() )
-                               CodegenUtils.getClass(e.getKey(), e.getValue());
-               }
-               
+               for( Entry<String, byte[]> e : _clsMap.entrySet() )
+                       CodegenUtils.getClassSync(e.getKey(), e.getValue());
+       
                //parse and setup parfor body program
                ParForBody body = ProgramConverter.parseParForBody(_prog, 
(int)_workerID);
                _childBlocks = body.getChildBlocks();
-               _ec          = body.getEc();                            
+               _ec          = body.getEc();
                _resultVars  = body.getResultVarNames();
                _numTasks    = 0;
                _numIters    = 0;

http://git-wip-us.apache.org/repos/asf/systemml/blob/73394999/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSparkWorker.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSparkWorker.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSparkWorker.java
index 614f946..9753635 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSparkWorker.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSparkWorker.java
@@ -100,11 +100,9 @@ public class RemoteParForSparkWorker extends ParWorker 
implements PairFlatMapFun
                _workerID = taskID;
                
                //initialize codegen class cache (before program parsing)
-               synchronized( CodegenUtils.class ) {
-                       for( Entry<String, byte[]> e : _clsMap.entrySet() )
-                               CodegenUtils.getClass(e.getKey(), e.getValue());
-               }
-               
+               for( Entry<String, byte[]> e : _clsMap.entrySet() )
+                       CodegenUtils.getClassSync(e.getKey(), e.getValue());
+       
                //parse and setup parfor body program
                ParForBody body = ProgramConverter.parseParForBody(_prog, 
(int)_workerID);
                _childBlocks = body.getChildBlocks();

http://git-wip-us.apache.org/repos/asf/systemml/blob/73394999/src/main/java/org/apache/sysml/runtime/instructions/spark/SpoofSPInstruction.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/SpoofSPInstruction.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/SpoofSPInstruction.java
index cb3ad14..f5b1576 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/SpoofSPInstruction.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/SpoofSPInstruction.java
@@ -456,7 +456,7 @@ public class SpoofSPInstruction extends SPInstruction {
                {
                        //lazy load of shipped class
                        if( _op == null ) {
-                               Class<?> loadedClass = 
CodegenUtils.getClass(_className, _classBytes);
+                               Class<?> loadedClass = 
CodegenUtils.getClassSync(_className, _classBytes);
                                _op = (SpoofRowwise) 
CodegenUtils.createInstance(loadedClass); 
                        }
                        
@@ -513,7 +513,7 @@ public class SpoofSPInstruction extends SPInstruction {
                {
                        //lazy load of shipped class
                        if( _op == null ) {
-                               Class<?> loadedClass = 
CodegenUtils.getClass(_className, _classBytes);
+                               Class<?> loadedClass = 
CodegenUtils.getClassSync(_className, _classBytes);
                                _op = (SpoofOperator) 
CodegenUtils.createInstance(loadedClass); 
                        }
                        
@@ -565,7 +565,7 @@ public class SpoofSPInstruction extends SPInstruction {
                {
                        //lazy load of shipped class
                        if( _op == null ) {
-                               Class<?> loadedClass = 
CodegenUtils.getClass(_className, _classBytes);
+                               Class<?> loadedClass = 
CodegenUtils.getClassSync(_className, _classBytes);
                                _op = (SpoofOperator) 
CodegenUtils.createInstance(loadedClass); 
                        }
                                
@@ -627,7 +627,7 @@ public class SpoofSPInstruction extends SPInstruction {
                {
                        //lazy load of shipped class
                        if( _op == null ) {
-                               Class<?> loadedClass = 
CodegenUtils.getClass(_className, _classBytes);
+                               Class<?> loadedClass = 
CodegenUtils.getClassSync(_className, _classBytes);
                                _op = (SpoofOperator) 
CodegenUtils.createInstance(loadedClass); 
                        }
                        

Reply via email to