Repository: spark
Updated Branches:
  refs/heads/branch-2.0 b6420db9e -> 201d5e8db


[SPARK-16129][CORE][SQL] Eliminate direct use of commons-lang classes in favor of commons-lang3

## What changes were proposed in this pull request?

Replace uses of `commons-lang` with `commons-lang3` and forbid the former via scalastyle; remove `NotImplementedException` from `commons-lang` in favor of the JDK's `UnsupportedOperationException`.
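
A minimal sketch of the migration pattern, in Scala (the `cloneProps` and `readColumn` helpers below are illustrative, not code from this commit):

```scala
import java.util.Properties
import org.apache.commons.lang3.SerializationUtils

// lang3's SerializationUtils.clone is generic (<T extends Serializable> T clone(T)),
// so the asInstanceOf[Properties] cast needed with lang 2's Object-returning
// clone can simply be dropped.
def cloneProps(parent: Properties): Properties = SerializationUtils.clone(parent)

// The library-specific NotImplementedException gives way to the JDK's
// UnsupportedOperationException, removing one more commons-lang 2 usage.
def readColumn(unsupportedType: String): Nothing =
  throw new UnsupportedOperationException(s"Unimplemented type: $unsupportedType")
```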

## How was this patch tested?

Jenkins tests

Author: Sean Owen <so...@cloudera.com>

Closes #13843 from srowen/SPARK-16129.

(cherry picked from commit 158af162eac7348464c6751c8acd48fc6c117688)
Signed-off-by: Sean Owen <so...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/201d5e8d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/201d5e8d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/201d5e8d

Branch: refs/heads/branch-2.0
Commit: 201d5e8db3fd29898a6cd69e015ca491e5721b08
Parents: b6420db
Author: Sean Owen <so...@cloudera.com>
Authored: Fri Jun 24 10:35:54 2016 +0100
Committer: Sean Owen <so...@cloudera.com>
Committed: Fri Jun 24 10:36:04 2016 +0100

----------------------------------------------------------------------
 .../scala/org/apache/spark/SparkContext.scala   |  5 ++--
 scalastyle-config.xml                           |  6 +++++
 .../sql/catalyst/expressions/TimeWindow.scala   |  2 +-
 .../spark/sql/catalyst/trees/TreeNode.scala     |  2 +-
 .../parquet/VectorizedColumnReader.java         | 25 ++++++++++----------
 .../sql/execution/vectorized/ColumnVector.java  | 17 +++++++------
 .../execution/vectorized/ColumnVectorUtils.java |  6 ++---
 .../sql/execution/vectorized/ColumnarBatch.java | 12 ++++------
 .../spark/sql/execution/ExistingRDD.scala       |  2 +-
 .../execution/columnar/InMemoryRelation.scala   |  2 +-
 .../service/cli/session/HiveSessionImpl.java    |  2 +-
 .../spark/streaming/StreamingContext.scala      |  5 ++--
 .../streaming/scheduler/JobScheduler.scala      |  6 ++---
 13 files changed, 44 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/201d5e8d/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index d870181..fe15052 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -24,7 +24,6 @@ import java.util.{Arrays, Locale, Properties, ServiceLoader, UUID}
 import java.util.concurrent.ConcurrentMap
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger, AtomicReference}
 
-import scala.annotation.tailrec
 import scala.collection.JavaConverters._
 import scala.collection.Map
 import scala.collection.generic.Growable
@@ -34,7 +33,7 @@ import scala.reflect.{classTag, ClassTag}
 import scala.util.control.NonFatal
 
 import com.google.common.collect.MapMaker
-import org.apache.commons.lang.SerializationUtils
+import org.apache.commons.lang3.SerializationUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.io.{ArrayWritable, BooleanWritable, BytesWritable, DoubleWritable,
@@ -334,7 +333,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
     override protected def childValue(parent: Properties): Properties = {
      // Note: make a clone such that changes in the parent properties aren't reflected in
      // those of the children threads, which has confusing semantics (SPARK-10563).
-      SerializationUtils.clone(parent).asInstanceOf[Properties]
+      SerializationUtils.clone(parent)
     }
     override protected def initialValue(): Properties = new Properties()
   }
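
The cast disappears above because the two libraries give `clone` different static types. A sketch with hypothetical stand-in wrappers (not the real library code) shows why:

```scala
import java.io.Serializable
import java.util.Properties
import org.apache.commons.lang3.SerializationUtils

// Stand-in for commons-lang 2: clone erases the result type to Object/AnyRef.
def lang2Clone(obj: Serializable): AnyRef = SerializationUtils.clone(obj)
// Stand-in for commons-lang3: a type parameter preserves the input type.
def lang3Clone[T <: Serializable](obj: T): T = SerializationUtils.clone(obj)

val parent = new Properties()
val p1: Properties = lang2Clone(parent).asInstanceOf[Properties] // cast required
val p2: Properties = lang3Clone(parent)                          // no cast needed
```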

http://git-wip-us.apache.org/repos/asf/spark/blob/201d5e8d/scalastyle-config.xml
----------------------------------------------------------------------
diff --git a/scalastyle-config.xml b/scalastyle-config.xml
index 270104f..9a35183 100644
--- a/scalastyle-config.xml
+++ b/scalastyle-config.xml
@@ -210,6 +210,12 @@ This file is divided into 3 sections:
    scala.collection.JavaConverters._ and use .asScala / .asJava methods</customMessage>
   </check>
 
+  <check customId="commonslang2" level="error" class="org.scalastyle.scalariform.TokenChecker" enabled="true">
+    <parameters><parameter name="regex">org\.apache\.commons\.lang\.</parameter></parameters>
+    <customMessage>Use Commons Lang 3 classes (package org.apache.commons.lang3.*) instead
+    of Commons Lang 2 (package org.apache.commons.lang.*)</customMessage>
+  </check>
+
  <check level="error" class="org.scalastyle.scalariform.ImportOrderChecker" enabled="true">
     <parameters>
       <parameter name="groups">java,scala,3rdParty,spark</parameter>
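
With this check in place, any source token matching `org\.apache\.commons\.lang\.` fails the build. A hedged sketch of what it catches, assuming scalastyle's standard comment-switch syntax works with the check's customId for the rare case an exception is needed:

```scala
// import org.apache.commons.lang.StringUtils   // would now fail scalastyle
import org.apache.commons.lang3.StringUtils     // passes: lang3 package

val blank = StringUtils.isBlank("   ")          // lang3 API is a drop-in here

// If lang 2 ever had to be referenced, the rule could be scoped off and on:
// scalastyle:off commonslang2
// ... exceptional code ...
// scalastyle:on commonslang2
```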

http://git-wip-us.apache.org/repos/asf/spark/blob/201d5e8d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimeWindow.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimeWindow.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimeWindow.scala
index 83fa447..66c4bf2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimeWindow.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimeWindow.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.catalyst.expressions
 
-import org.apache.commons.lang.StringUtils
+import org.apache.commons.lang3.StringUtils
 
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.analysis.TypeCheckResult

http://git-wip-us.apache.org/repos/asf/spark/blob/201d5e8d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
index 3cc7a1a..072445a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
@@ -23,7 +23,7 @@ import scala.collection.Map
 import scala.collection.mutable.Stack
 import scala.reflect.ClassTag
 
-import org.apache.commons.lang.ClassUtils
+import org.apache.commons.lang3.ClassUtils
 import org.json4s.JsonAST._
 import org.json4s.JsonDSL._
 import org.json4s.jackson.JsonMethods._

http://git-wip-us.apache.org/repos/asf/spark/blob/201d5e8d/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
index 662a03d..a18b881 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
@@ -19,7 +19,6 @@ package org.apache.spark.sql.execution.datasources.parquet;
 
 import java.io.IOException;
 
-import org.apache.commons.lang.NotImplementedException;
 import org.apache.parquet.bytes.BytesUtils;
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.column.Dictionary;
@@ -228,7 +227,7 @@ public class VectorizedColumnReader {
            column.putShort(i, (short) dictionary.decodeToInt(dictionaryIds.getInt(i)));
           }
         } else {
-          throw new NotImplementedException("Unimplemented type: " + column.dataType());
+          throw new UnsupportedOperationException("Unimplemented type: " + column.dataType());
         }
         break;
 
@@ -239,7 +238,7 @@ public class VectorizedColumnReader {
            column.putLong(i, dictionary.decodeToLong(dictionaryIds.getInt(i)));
           }
         } else {
-          throw new NotImplementedException("Unimplemented type: " + column.dataType());
+          throw new UnsupportedOperationException("Unimplemented type: " + column.dataType());
         }
         break;
 
@@ -262,7 +261,7 @@ public class VectorizedColumnReader {
             column.putLong(i, ParquetRowConverter.binaryToSQLTimestamp(v));
           }
         } else {
-          throw new NotImplementedException();
+          throw new UnsupportedOperationException();
         }
         break;
       case BINARY:
@@ -293,12 +292,12 @@ public class VectorizedColumnReader {
             column.putByteArray(i, v.getBytes());
           }
         } else {
-          throw new NotImplementedException();
+          throw new UnsupportedOperationException();
         }
         break;
 
       default:
-        throw new NotImplementedException("Unsupported type: " + descriptor.getType());
+        throw new UnsupportedOperationException("Unsupported type: " + descriptor.getType());
     }
   }
 
@@ -327,7 +326,7 @@ public class VectorizedColumnReader {
       defColumn.readShorts(
          num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
     } else {
-      throw new NotImplementedException("Unimplemented type: " + column.dataType());
+      throw new UnsupportedOperationException("Unimplemented type: " + column.dataType());
     }
   }
 
@@ -360,7 +359,7 @@ public class VectorizedColumnReader {
       defColumn.readDoubles(
          num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
     } else {
-      throw new NotImplementedException("Unimplemented type: " + column.dataType());
+      throw new UnsupportedOperationException("Unimplemented type: " + column.dataType());
     }
   }
 
@@ -381,7 +380,7 @@ public class VectorizedColumnReader {
         }
       }
     } else {
-      throw new NotImplementedException("Unimplemented type: " + column.dataType());
+      throw new UnsupportedOperationException("Unimplemented type: " + column.dataType());
     }
   }
 
@@ -417,7 +416,7 @@ public class VectorizedColumnReader {
         }
       }
     } else {
-      throw new NotImplementedException("Unimplemented type: " + column.dataType());
+      throw new UnsupportedOperationException("Unimplemented type: " + column.dataType());
     }
   }
 
@@ -459,13 +458,13 @@ public class VectorizedColumnReader {
       @SuppressWarnings("deprecation")
      Encoding plainDict = Encoding.PLAIN_DICTIONARY; // var to allow warning suppression
      if (dataEncoding != plainDict && dataEncoding != Encoding.RLE_DICTIONARY) {
-        throw new NotImplementedException("Unsupported encoding: " + dataEncoding);
+        throw new UnsupportedOperationException("Unsupported encoding: " + dataEncoding);
       }
       this.dataColumn = new VectorizedRleValuesReader();
       this.useDictionary = true;
     } else {
       if (dataEncoding != Encoding.PLAIN) {
-        throw new NotImplementedException("Unsupported encoding: " + dataEncoding);
+        throw new UnsupportedOperationException("Unsupported encoding: " + dataEncoding);
       }
       this.dataColumn = new VectorizedPlainValuesReader();
       this.useDictionary = false;
@@ -485,7 +484,7 @@ public class VectorizedColumnReader {
 
     // Initialize the decoders.
    if (page.getDlEncoding() != Encoding.RLE && descriptor.getMaxDefinitionLevel() != 0) {
-      throw new NotImplementedException("Unsupported encoding: " + page.getDlEncoding());
+      throw new UnsupportedOperationException("Unsupported encoding: " + page.getDlEncoding());
     }
    int bitWidth = BytesUtils.getWidthFromMaxInt(descriptor.getMaxDefinitionLevel());
     this.defColumn = new VectorizedRleValuesReader(bitWidth);
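
One compatibility note on the exception swap throughout this file: in commons-lang 2, `NotImplementedException` already extended `UnsupportedOperationException` (true of lang 2.1+, as far as the javadoc indicates), so handlers written against the JDK type keep working. A small Scala sketch (the `safely` helper is illustrative, not from this commit):

```scala
// Any caller that caught the JDK supertype is unaffected by the swap,
// since both the old lang 2 exception and the new one match this handler.
def safely[T](body: => T): Option[T] =
  try Some(body)
  catch { case _: UnsupportedOperationException => None }

// Returns None, exactly as it would have before the migration.
val result = safely { throw new UnsupportedOperationException("Unimplemented type: MAP") }
```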

http://git-wip-us.apache.org/repos/asf/spark/blob/201d5e8d/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
index 80c84b1..bbbb796 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
@@ -20,7 +20,6 @@ import java.math.BigDecimal;
 import java.math.BigInteger;
 
 import com.google.common.annotations.VisibleForTesting;
-import org.apache.commons.lang.NotImplementedException;
 import org.apache.parquet.column.Dictionary;
 import org.apache.parquet.io.api.Binary;
 
@@ -100,7 +99,7 @@ public abstract class ColumnVector implements AutoCloseable {
 
     @Override
     public ArrayData copy() {
-      throw new NotImplementedException();
+      throw new UnsupportedOperationException();
     }
 
     // TODO: this is extremely expensive.
@@ -171,7 +170,7 @@ public abstract class ColumnVector implements AutoCloseable {
           }
         }
       } else {
-        throw new NotImplementedException("Type " + dt);
+        throw new UnsupportedOperationException("Type " + dt);
       }
       return list;
     }
@@ -181,7 +180,7 @@ public abstract class ColumnVector implements AutoCloseable {
 
     @Override
     public boolean getBoolean(int ordinal) {
-      throw new NotImplementedException();
+      throw new UnsupportedOperationException();
     }
 
     @Override
@@ -189,7 +188,7 @@ public abstract class ColumnVector implements AutoCloseable {
 
     @Override
     public short getShort(int ordinal) {
-      throw new NotImplementedException();
+      throw new UnsupportedOperationException();
     }
 
     @Override
@@ -200,7 +199,7 @@ public abstract class ColumnVector implements AutoCloseable {
 
     @Override
     public float getFloat(int ordinal) {
-      throw new NotImplementedException();
+      throw new UnsupportedOperationException();
     }
 
     @Override
@@ -240,12 +239,12 @@ public abstract class ColumnVector implements AutoCloseable {
 
     @Override
     public MapData getMap(int ordinal) {
-      throw new NotImplementedException();
+      throw new UnsupportedOperationException();
     }
 
     @Override
     public Object get(int ordinal, DataType dataType) {
-      throw new NotImplementedException();
+      throw new UnsupportedOperationException();
     }
   }
 
@@ -562,7 +561,7 @@ public abstract class ColumnVector implements AutoCloseable {
    * Returns the value for rowId.
    */
   public MapData getMap(int ordinal) {
-    throw new NotImplementedException();
+    throw new UnsupportedOperationException();
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/201d5e8d/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java
index f50c35f..2fa476b 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java
@@ -23,8 +23,6 @@ import java.sql.Date;
 import java.util.Iterator;
 import java.util.List;
 
-import org.apache.commons.lang.NotImplementedException;
-
 import org.apache.spark.memory.MemoryMode;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.catalyst.InternalRow;
@@ -112,7 +110,7 @@ public class ColumnVectorUtils {
       }
       return result;
     } else {
-      throw new NotImplementedException();
+      throw new UnsupportedOperationException();
     }
   }
 
@@ -161,7 +159,7 @@ public class ColumnVectorUtils {
       } else if (t instanceof DateType) {
         dst.appendInt(DateTimeUtils.fromJavaDate((Date)o));
       } else {
-        throw new NotImplementedException("Type " + t);
+        throw new UnsupportedOperationException("Type " + t);
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/201d5e8d/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java
index 8cece73..f3afa8f 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java
@@ -19,8 +19,6 @@ package org.apache.spark.sql.execution.vectorized;
 import java.math.BigDecimal;
 import java.util.*;
 
-import org.apache.commons.lang.NotImplementedException;
-
 import org.apache.spark.memory.MemoryMode;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.catalyst.expressions.GenericMutableRow;
@@ -166,7 +164,7 @@ public final class ColumnarBatch {
 
     @Override
     public boolean anyNull() {
-      throw new NotImplementedException();
+      throw new UnsupportedOperationException();
     }
 
     @Override
@@ -227,12 +225,12 @@ public final class ColumnarBatch {
 
     @Override
     public MapData getMap(int ordinal) {
-      throw new NotImplementedException();
+      throw new UnsupportedOperationException();
     }
 
     @Override
     public Object get(int ordinal, DataType dataType) {
-      throw new NotImplementedException();
+      throw new UnsupportedOperationException();
     }
 
     @Override
@@ -258,7 +256,7 @@ public final class ColumnarBatch {
          setDecimal(ordinal, Decimal.apply((BigDecimal) value, t.precision(), t.scale()),
               t.precision());
         } else {
-          throw new NotImplementedException("Datatype not supported " + dt);
+          throw new UnsupportedOperationException("Datatype not supported " + dt);
         }
       }
     }
@@ -430,7 +428,7 @@ public final class ColumnarBatch {
    */
   public void setColumn(int ordinal, ColumnVector column) {
     if (column instanceof OffHeapColumnVector) {
-      throw new NotImplementedException("Need to ref count columns.");
+      throw new UnsupportedOperationException("Need to ref count columns.");
     }
     columns[ordinal] = column;
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/201d5e8d/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index e2c23a4..09203e6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.execution
 
-import org.apache.commons.lang.StringUtils
+import org.apache.commons.lang3.StringUtils
 
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession, SQLContext}

http://git-wip-us.apache.org/repos/asf/spark/blob/201d5e8d/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
index 02866c7..079e122 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.columnar
 
 import scala.collection.JavaConverters._
 
-import org.apache.commons.lang.StringUtils
+import org.apache.commons.lang3.StringUtils
 
 import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.rdd.RDD

http://git-wip-us.apache.org/repos/asf/spark/blob/201d5e8d/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
----------------------------------------------------------------------
diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
index cc3e807..47bfaa8 100644
--- a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
+++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
@@ -29,7 +29,7 @@ import java.util.Map;
 import java.util.Set;
 
 import org.apache.commons.io.FileUtils;
-import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.common.cli.HiveFileProcessor;

http://git-wip-us.apache.org/repos/asf/spark/blob/201d5e8d/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index b524af9..6046426 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -26,7 +26,7 @@ import scala.collection.mutable.Queue
 import scala.reflect.ClassTag
 import scala.util.control.NonFatal
 
-import org.apache.commons.lang.SerializationUtils
+import org.apache.commons.lang3.SerializationUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.io.{BytesWritable, LongWritable, Text}
@@ -579,8 +579,7 @@ class StreamingContext private[streaming] (
               sparkContext.setCallSite(startSite.get)
              sparkContext.clearJobGroup()
              sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false")
-              savedProperties.set(SerializationUtils.clone(
-                sparkContext.localProperties.get()).asInstanceOf[Properties])
+              savedProperties.set(SerializationUtils.clone(sparkContext.localProperties.get()))
               scheduler.start()
             }
             state = StreamingContextState.ACTIVE

http://git-wip-us.apache.org/repos/asf/spark/blob/201d5e8d/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
index ac18f73..79d6254 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
@@ -17,13 +17,12 @@
 
 package org.apache.spark.streaming.scheduler
 
-import java.util.Properties
 import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
 
 import scala.collection.JavaConverters._
 import scala.util.Failure
 
-import org.apache.commons.lang.SerializationUtils
+import org.apache.commons.lang3.SerializationUtils
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.{PairRDDFunctions, RDD}
@@ -219,8 +218,7 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
     def run() {
       val oldProps = ssc.sparkContext.getLocalProperties
       try {
-        ssc.sparkContext.setLocalProperties(
-          SerializationUtils.clone(ssc.savedProperties.get()).asInstanceOf[Properties])
+        ssc.sparkContext.setLocalProperties(SerializationUtils.clone(ssc.savedProperties.get()))
         val formattedTime = UIUtils.formatBatchTime(
          job.time.milliseconds, ssc.graph.batchDuration.milliseconds, showYYYYMMSS = false)
         val batchUrl = s"/streaming/batch/?id=${job.time.milliseconds}"
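
Both streaming call sites follow the same save/clone/restore discipline around thread-local properties; a hedged Scala sketch with a hypothetical `runIsolated` helper (not in this commit):

```scala
import java.util.Properties
import org.apache.commons.lang3.SerializationUtils

// Snapshot the caller's properties, run the body against a deep clone so
// mutations on this thread never leak, then restore the originals.
def runIsolated[T](current: () => Properties, install: Properties => Unit)(body: => T): T = {
  val saved = current()
  try {
    install(SerializationUtils.clone(saved)) // typed clone: no cast, per lang3
    body
  } finally {
    install(saved)
  }
}
```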

