Repository: incubator-systemml
Updated Branches:
  refs/heads/master 132a43d38 -> c360304eb


[SYSTEMML-1281] Fix memory efficiency csv/dataset-binary rdd converters

This patch makes the following two memory-efficiency improvements to
our csv-binaryblock and dataset-binaryblock matrix RDD converters:

(1) Shallow block copy on createCombiner for combineByKey, to reduce the
temporary memory consumption per task, especially for scenarios with
almost no local aggregation. The shallow copy is safe because the inputs
are temporary partial blocks that are not accessible to any other
operation; a toy sketch of the idea follows below.
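
A minimal, self-contained sketch of the createCombiner trade-off with
Spark's combineByKey; the toy ArrayList payload and all names are
placeholders, not the actual SystemML MatrixBlock/mergeByKey code (which
is shown in the diff below):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.api.java.function.Function2;
    import scala.Tuple2;

    public class ShallowCombinerSketch {
      public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext("local[2]", "sketch");
        //toy stand-in for (index, partial block) pairs
        List<Tuple2<Integer, ArrayList<Double>>> data = Arrays.asList(
          new Tuple2<>(1, new ArrayList<>(Arrays.asList(1.0))),
          new Tuple2<>(1, new ArrayList<>(Arrays.asList(2.0))),
          new Tuple2<>(2, new ArrayList<>(Arrays.asList(3.0))));
        JavaPairRDD<Integer, ArrayList<Double>> partials = sc.parallelizePairs(data);

        //shallow createCombiner: safe only because the partials are temporary
        //and not referenced by any other operation
        boolean deepCopyCombiner = false;
        Function<ArrayList<Double>, ArrayList<Double>> createCombiner =
          v -> deepCopyCombiner ? new ArrayList<>(v) : v;   //pass-through, no block copy
        Function2<ArrayList<Double>, ArrayList<Double>, ArrayList<Double>> merge =
          (c, v) -> { c.addAll(v); return c; };             //merge in-place into the combiner

        JavaPairRDD<Integer, ArrayList<Double>> merged =
          partials.combineByKey(createCombiner, merge, merge, 2); //explicit #output partitions

        System.out.println(merged.collectAsMap());
        sc.stop();
      }
    }

With almost no local aggregation, most keys see a single value per task,
so a deep copy in createCombiner would duplicate nearly every input
block; the pass-through combiner avoids that temporary memory entirely.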

(2) Explicitly controlled number of output partitions according to the
output's size in binary-block representation. So far we simply used the
number of input partitions. For compressed dataset inputs, this led to
unnecessarily large output partitions, and thus created memory pressure
for subsequent tasks. For csv inputs, this led to unnecessarily small
output partitions that were later coalesced to the preferred number of
partitions; however, since coalesce only balances the number of merged
partitions, this could lead to load imbalance. Both problems are now
systematically solved at their root cause; a small sketch of the sizing
rule follows below.
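
A small sketch of the sizing rule, assuming known dimensions and
sparsity; the method and parameter names here are illustrative, the
actual implementation is SparkUtils.getNumPreferredPartitions in the
diff below:

    //one output partition per HDFS block worth of binary-block data, at least one;
    //estBinaryBlockBytes stands for the size estimate under exact sparsity
    static int numPreferredPartitions(long estBinaryBlockBytes, long hdfsBlockBytes,
        int numInputPartitions, boolean dimsKnown)
    {
      if( !dimsKnown )
        return numInputPartitions; //fall back to the input partitioning
      return (int) Math.max(Math.ceil((double)estBinaryBlockBytes / hdfsBlockBytes), 1);
    }

For example, a csv input read in 80 splits that amounts to only 2 GB in
binary-block format would, with a 128 MB HDFS block size, be merged into
ceil(2048/128) = 16 balanced output partitions instead of 80 small ones;
conversely, a compressed dataset read in a few large partitions gets
spread over correspondingly more output partitions.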

Finally, this patch also includes a number of minor cleanups with regard
to probing the number of partitions, missing tests, etc.


Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/c360304e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/c360304e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/c360304e

Branch: refs/heads/master
Commit: c360304ebbf2d05d25067bbded49a3fbdb1edaac
Parents: 132a43d
Author: Matthias Boehm <[email protected]>
Authored: Fri Feb 17 23:41:55 2017 -0800
Committer: Matthias Boehm <[email protected]>
Committed: Fri Feb 17 23:56:12 2017 -0800

----------------------------------------------------------------------
 .../context/SparkExecutionContext.java          |  5 ++-
 .../instructions/gpu/context/JCudaContext.java  |  2 +-
 .../spark/CheckpointSPInstruction.java          | 18 ++---------
 .../instructions/spark/MapmmSPInstruction.java  |  4 +--
 .../spark/utils/RDDAggregateUtils.java          | 33 +++++++++++++++++---
 .../spark/utils/RDDConverterUtils.java          | 20 ++++++------
 .../instructions/spark/utils/SparkUtils.java    | 22 +++++++------
 .../functions/io/csv/ReadCSVTest.java           | 29 ++++++++++++++---
 8 files changed, 83 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/c360304e/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
index 77bcc8d..b7fe6e8 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
@@ -49,7 +49,6 @@ import 
org.apache.sysml.runtime.controlprogram.caching.FrameObject;
 import org.apache.sysml.runtime.controlprogram.caching.MatrixObject;
 import 
org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
 import org.apache.sysml.runtime.instructions.cp.Data;
-import org.apache.sysml.runtime.instructions.spark.CheckpointSPInstruction;
 import org.apache.sysml.runtime.instructions.spark.SPInstruction;
 import org.apache.sysml.runtime.instructions.spark.data.BroadcastObject;
 import org.apache.sysml.runtime.instructions.spark.data.LineageObject;
@@ -1181,8 +1180,8 @@ public class SparkExecutionContext extends ExecutionContext
 					((RDDObject)mo.getRDDHandle().getLineageChilds().get(0)).getRDD();
 			
 			//investigate issue of unnecessarily large number of partitions
-			int numPartitions = CheckpointSPInstruction.getNumCoalescePartitions(mcIn, in);
-			if( numPartitions < in.partitions().size() )
+			int numPartitions = SparkUtils.getNumPreferredPartitions(mcIn, in);
+			if( numPartitions < in.getNumPartitions() )
 				in = in.coalesce( numPartitions );
                }
                

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/c360304e/src/main/java/org/apache/sysml/runtime/instructions/gpu/context/JCudaContext.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/gpu/context/JCudaContext.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/gpu/context/JCudaContext.java
index af4cfbd..38f4e4c 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/gpu/context/JCudaContext.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/gpu/context/JCudaContext.java
@@ -133,7 +133,7 @@ public class JCudaContext extends GPUContext {
                        long free[] = {0};
                        long total[] = {0};
                        if (cudaMemGetInfo(free, total) == cudaSuccess) {
-                               long totalNumBytes = total[0];
+                               //long totalNumBytes = total[0];
                                deviceMemBytes.set(free[0]);
                        } else {
 				throw new RuntimeException("ERROR: Unable to get memory information of the GPU.");

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/c360304e/src/main/java/org/apache/sysml/runtime/instructions/spark/CheckpointSPInstruction.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/CheckpointSPInstruction.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/CheckpointSPInstruction.java
index 952e43c..1fa30b6 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/CheckpointSPInstruction.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/CheckpointSPInstruction.java
@@ -27,7 +27,6 @@ import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.controlprogram.caching.CacheableData;
 import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
 import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
-import 
org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
 import org.apache.sysml.runtime.instructions.InstructionUtils;
 import org.apache.sysml.runtime.instructions.cp.BooleanObject;
 import org.apache.sysml.runtime.instructions.cp.CPOperand;
@@ -103,8 +102,8 @@ public class CheckpointSPInstruction extends UnarySPInstruction
 		{
 			//(trigger coalesce if intended number of partitions exceeded by 20%
 			//and not hash partitioned to avoid losing the existing partitioner)
-			int numPartitions = getNumCoalescePartitions(mcIn, in);
-			boolean coalesce = ( 1.2*numPartitions < in.partitions().size() 
+			int numPartitions = SparkUtils.getNumPreferredPartitions(mcIn, in);
+			boolean coalesce = ( 1.2*numPartitions < in.getNumPartitions() 
 					&& !SparkUtils.isHashPartitioned(in) );
                        
                        //checkpoint pre-processing rdd operations
@@ -157,17 +156,4 @@ public class CheckpointSPInstruction extends UnarySPInstruction
 		}
 		sec.setVariable( output.getName(), cd);
 	}
-
-	public static int getNumCoalescePartitions(MatrixCharacteristics mc, JavaPairRDD<?,?> in)
-	{
-		if( mc.dimsKnown(true) ) {
-			double hdfsBlockSize = InfrastructureAnalyzer.getHDFSBlockSize();
-			double matrixPSize = OptimizerUtils.estimatePartitionedSizeExactSparsity(mc);
-			return (int) Math.max(Math.ceil(matrixPSize/hdfsBlockSize), 1);
-		}
-		else {
-			return in.partitions().size();
-		}
-	}
 }
-

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/c360304e/src/main/java/org/apache/sysml/runtime/instructions/spark/MapmmSPInstruction.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/MapmmSPInstruction.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/MapmmSPInstruction.java
index 310664f..33eacbe 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/MapmmSPInstruction.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/MapmmSPInstruction.java
@@ -135,8 +135,8 @@ public class MapmmSPInstruction extends BinarySPInstruction
 		{
 			JavaPairRDD<MatrixIndexes,MatrixBlock> out = null;
 			if( requiresFlatMapFunction(_type, mcBc) ) {
-				if( requiresRepartitioning(_type, mcRdd, mcBc, in1.partitions().size()) )
-					in1 = in1.repartition(getNumRepartitioning(_type, mcRdd, mcBc, in1.partitions().size()));
+				if( requiresRepartitioning(_type, mcRdd, mcBc, in1.getNumPartitions()) )
+					in1 = in1.repartition(getNumRepartitioning(_type, mcRdd, mcBc, in1.getNumPartitions()));
 				out = in1.flatMapToPair( new RDDFlatMapMMFunction(_type, in2) );
                        }
                        else if( preservesPartitioning(mcRdd, _type) )

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/c360304e/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDAggregateUtils.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDAggregateUtils.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDAggregateUtils.java
index 8038157..61c950a 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDAggregateUtils.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDAggregateUtils.java
@@ -155,13 +155,30 @@ public class RDDAggregateUtils
 	 * @param in matrix as {@code JavaPairRDD<MatrixIndexes, MatrixBlock>}
 	 * @return matrix as {@code JavaPairRDD<MatrixIndexes, MatrixBlock>}
 	 */
-	public static JavaPairRDD<MatrixIndexes, MatrixBlock> mergeByKey( JavaPairRDD<MatrixIndexes, MatrixBlock> in )
+	public static JavaPairRDD<MatrixIndexes, MatrixBlock> mergeByKey( JavaPairRDD<MatrixIndexes, MatrixBlock> in ) {
+		return mergeByKey(in, in.getNumPartitions(), true);
+	}
+	
+	/**
+	 * Merges disjoint data of all blocks per key.
+	 * 
+	 * Note: The behavior of this method is undefined for both sparse and dense data if the 
+	 * assumption of disjoint data is violated.
+	 * 
+	 * @param in matrix as {@code JavaPairRDD<MatrixIndexes, MatrixBlock>}
+	 * @param numPartitions number of output partitions
+	 * @param deepCopyCombiner indicator if the createCombiner functions needs to deep copy the input block
+	 * @return matrix as {@code JavaPairRDD<MatrixIndexes, MatrixBlock>}
+	 */
+	public static JavaPairRDD<MatrixIndexes, MatrixBlock> mergeByKey( JavaPairRDD<MatrixIndexes, MatrixBlock> in, 
+			int numPartitions, boolean deepCopyCombiner )
 	{
 		//use combine by key to avoid unnecessary deep block copies, i.e.
 		//create combiner block once and merge remaining blocks in-place.
-		return in.combineByKey( new CreateBlockCombinerFunction(), 
+		return in.combineByKey( 
+				new CreateBlockCombinerFunction(deepCopyCombiner), 
 			    new MergeBlocksFunction(false), 
-			    new MergeBlocksFunction(false) );
+			    new MergeBlocksFunction(false), numPartitions );
        }
        
        /**
@@ -251,13 +268,19 @@ public class RDDAggregateUtils
 	private static class CreateBlockCombinerFunction implements Function<MatrixBlock, MatrixBlock> 
 	{
 		private static final long serialVersionUID = 1987501624176848292L;
-
+               
+               private final boolean _deep;
+               
+               public CreateBlockCombinerFunction(boolean deep) {
+                       _deep = deep;
+               }
+               
                @Override
                public MatrixBlock call(MatrixBlock arg0) 
                        throws Exception 
                {
                        //create deep copy of given block
-                       return new MatrixBlock(arg0);
+                       return _deep ? new MatrixBlock(arg0) : arg0;
                }       
        }
 

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/c360304e/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
index e847471..d1e6793 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
@@ -185,10 +185,11 @@ public class RDDConverterUtils
 				prepinput.mapPartitionsToPair(new CSVToBinaryBlockFunction(
 						mc, sparse, hasHeader, delim, fill, fillValue));
 		
-		//aggregate partial matrix blocks
-		out = RDDAggregateUtils.mergeByKey( out ); 
-		
-		return out;
+		//aggregate partial matrix blocks (w/ preferred number of output 
+		//partitions as the data is likely smaller in binary block format,
+		//but also to bound the size of partitions for compressed inputs)
+		int parts = SparkUtils.getNumPreferredPartitions(mc, out);
+		return RDDAggregateUtils.mergeByKey(out, parts, false); 
        }
        
        /**
@@ -256,10 +257,11 @@ public class RDDConverterUtils
 				prepinput.mapPartitionsToPair(
 					new DataFrameToBinaryBlockFunction(mc, sparse, containsID, isVector));
 		
-		//aggregate partial matrix blocks
-		out = RDDAggregateUtils.mergeByKey( out ); 
-		
-		return out;
+		//aggregate partial matrix blocks (w/ preferred number of output 
+		//partitions as the data is likely smaller in binary block format,
+		//but also to bound the size of partitions for compressed inputs)
+		int parts = SparkUtils.getNumPreferredPartitions(mc, out);
+		return RDDAggregateUtils.mergeByKey(out, parts, false); 
        }
 
        public static Dataset<Row> binaryBlockToDataFrame(SparkSession 
sparkSession,
@@ -312,7 +314,7 @@ public class RDDConverterUtils
 		double datasize = OptimizerUtils.estimatePartitionedSizeExactSparsity(mc);
 		double rowsize = OptimizerUtils.estimatePartitionedSizeExactSparsity(1, mc.getCols(),
 				mc.getNumRowBlocks(), mc.getColsPerBlock(), Math.ceil((double)mc.getNonZeros()/mc.getRows()));
-		double partsize = Math.ceil(datasize/in.partitions().size());
+		double partsize = Math.ceil(datasize/in.getNumPartitions());
                double blksz = Math.min(mc.getRows(), mc.getRowsPerBlock());
                return partsize/rowsize/blksz < MatrixBlock.SPARSITY_TURN_POINT;
        }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/c360304e/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/SparkUtils.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/SparkUtils.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/SparkUtils.java
index d27e37a..2fe3981 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/SparkUtils.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/SparkUtils.java
@@ -29,8 +29,10 @@ import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.Function;
 import org.apache.spark.api.java.function.Function2;
 import org.apache.spark.storage.StorageLevel;
+import org.apache.sysml.hops.OptimizerUtils;
 import org.apache.sysml.lops.Checkpoint;
 import org.apache.sysml.runtime.DMLRuntimeException;
+import 
org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
 import 
org.apache.sysml.runtime.instructions.spark.functions.CopyBinaryCellFunction;
 import org.apache.sysml.runtime.instructions.spark.functions.CopyBlockFunction;
 import 
org.apache.sysml.runtime.instructions.spark.functions.CopyBlockPairFunction;
@@ -62,12 +64,10 @@ public class SparkUtils
                return new Tuple2<MatrixIndexes,MatrixBlock>(in.getIndexes(), 
(MatrixBlock)in.getValue());
        }
 
-       public static ArrayList<Tuple2<MatrixIndexes,MatrixBlock>> 
fromIndexedMatrixBlock( ArrayList<IndexedMatrixValue> in )
-       {
+       public static ArrayList<Tuple2<MatrixIndexes,MatrixBlock>> 
fromIndexedMatrixBlock( ArrayList<IndexedMatrixValue> in ) {
                ArrayList<Tuple2<MatrixIndexes,MatrixBlock>> ret = new 
ArrayList<Tuple2<MatrixIndexes,MatrixBlock>>();
                for( IndexedMatrixValue imv : in )
                        ret.add(fromIndexedMatrixBlock(imv));
-               
                return ret;
        }
 
@@ -75,12 +75,10 @@ public class SparkUtils
                return new Pair<MatrixIndexes,MatrixBlock>(in.getIndexes(), 
(MatrixBlock)in.getValue());
        }
 
-       public static ArrayList<Pair<MatrixIndexes,MatrixBlock>> 
fromIndexedMatrixBlockToPair( ArrayList<IndexedMatrixValue> in )
-       {
+       public static ArrayList<Pair<MatrixIndexes,MatrixBlock>> 
fromIndexedMatrixBlockToPair( ArrayList<IndexedMatrixValue> in ) {
                ArrayList<Pair<MatrixIndexes,MatrixBlock>> ret = new 
ArrayList<Pair<MatrixIndexes,MatrixBlock>>();
                for( IndexedMatrixValue imv : in )
                        ret.add(fromIndexedMatrixBlockToPair(imv));
-               
                return ret;
        }
 
@@ -88,12 +86,10 @@ public class SparkUtils
                return new Tuple2<Long, FrameBlock>(in.getKey(), in.getValue());
        }
 
-       public static ArrayList<Tuple2<Long,FrameBlock>> fromIndexedFrameBlock( 
ArrayList<Pair<Long, FrameBlock>> in )
-       {
+       public static ArrayList<Tuple2<Long,FrameBlock>> fromIndexedFrameBlock( 
ArrayList<Pair<Long, FrameBlock>> in ) {
                ArrayList<Tuple2<Long, FrameBlock>> ret = new 
ArrayList<Tuple2<Long, FrameBlock>>();
                for( Pair<Long, FrameBlock> ifv : in )
                        ret.add(fromIndexedFrameBlock(ifv));
-               
                return ret;
        }
 
@@ -120,6 +116,14 @@ public class SparkUtils
 			&& in.rdd().partitioner().get() instanceof HashPartitioner;
 	}
 	
+	public static int getNumPreferredPartitions(MatrixCharacteristics mc, JavaPairRDD<?,?> in) {
+		if( !mc.dimsKnown(true) )
+			return in.getNumPartitions();
+		double hdfsBlockSize = InfrastructureAnalyzer.getHDFSBlockSize();
+		double matrixPSize = OptimizerUtils.estimatePartitionedSizeExactSparsity(mc);
+		return (int) Math.max(Math.ceil(matrixPSize/hdfsBlockSize), 1);
+	}
+       
        /**
         * Creates a partitioning-preserving deep copy of the input matrix RDD, 
where 
         * the indexes and values are copied.

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/c360304e/src/test/java/org/apache/sysml/test/integration/functions/io/csv/ReadCSVTest.java
----------------------------------------------------------------------
diff --git 
a/src/test/java/org/apache/sysml/test/integration/functions/io/csv/ReadCSVTest.java
 
b/src/test/java/org/apache/sysml/test/integration/functions/io/csv/ReadCSVTest.java
index 65fbfa0..12d07ef 100644
--- 
a/src/test/java/org/apache/sysml/test/integration/functions/io/csv/ReadCSVTest.java
+++ 
b/src/test/java/org/apache/sysml/test/integration/functions/io/csv/ReadCSVTest.java
@@ -20,6 +20,7 @@
 package org.apache.sysml.test.integration.functions.io.csv;
 
 import org.junit.Test;
+import org.apache.sysml.api.DMLScript;
 import org.apache.sysml.api.DMLScript.RUNTIME_PLATFORM;
 import org.apache.sysml.conf.CompilerConfig;
 import org.apache.sysml.test.integration.AutomatedTestBase;
@@ -37,7 +38,6 @@ import org.apache.sysml.test.utils.TestUtils;
 
 public class ReadCSVTest extends AutomatedTestBase 
 {
-       
        private final static String TEST_NAME = "ReadCSVTest";
        private final static String TEST_DIR = "functions/io/csv/";
 	private final static String TEST_CLASS_DIR = TEST_DIR + ReadCSVTest.class.getSimpleName() + "/";
@@ -78,6 +78,11 @@ public class ReadCSVTest extends AutomatedTestBase
        }
        
        @Test
+       public void testCSV1_SP() {
+               runCSVTest(1, RUNTIME_PLATFORM.SPARK, true);
+       }
+       
+       @Test
        public void testCSV2_Sequential_CP1() {
                runCSVTest(2, RUNTIME_PLATFORM.SINGLE_NODE, false);
        }
@@ -101,6 +106,11 @@ public class ReadCSVTest extends AutomatedTestBase
        public void testCSV2_MR() {
                runCSVTest(2, RUNTIME_PLATFORM.HADOOP, true);
        }
+       
+       @Test
+       public void testCSV2_SP() {
+               runCSVTest(2, RUNTIME_PLATFORM.SPARK, true);
+       }
 
        @Test
        public void testCSV3_Sequential_CP1() {
@@ -127,6 +137,11 @@ public class ReadCSVTest extends AutomatedTestBase
                runCSVTest(3, RUNTIME_PLATFORM.HADOOP, false);
        }
        
+       @Test
+       public void testCSV3_SP() {
+               runCSVTest(3, RUNTIME_PLATFORM.SPARK, false);
+       }
+       
        /**
         * 
         * @param testNumber
@@ -135,13 +150,17 @@ public class ReadCSVTest extends AutomatedTestBase
         */
 	private void runCSVTest(int testNumber, RUNTIME_PLATFORM platform, boolean parallel) 
        {
-               
                RUNTIME_PLATFORM oldPlatform = rtplatform;
+               rtplatform = platform;
+               
+               boolean sparkConfigOld = DMLScript.USE_LOCAL_SPARK_CONFIG;
+               if( rtplatform == RUNTIME_PLATFORM.SPARK )
+                       DMLScript.USE_LOCAL_SPARK_CONFIG = true;
+               
                boolean oldpar = CompilerConfig.FLAG_PARREADWRITE_TEXT;
                
                try
                {
-                       rtplatform = platform;
                        CompilerConfig.FLAG_PARREADWRITE_TEXT = parallel;
                        
 			TestConfiguration config = getTestConfiguration(TEST_NAME);
@@ -167,10 +186,10 @@ public class ReadCSVTest extends AutomatedTestBase
                        
                        TestUtils.compareScalars(dmlScalar, rScalar, eps);
                }
-               finally
-               {
+               finally {
                        rtplatform = oldPlatform;
                        CompilerConfig.FLAG_PARREADWRITE_TEXT = oldpar;         
+                       DMLScript.USE_LOCAL_SPARK_CONFIG = sparkConfigOld;
                }
        }
        
