Repository: spark
Updated Branches:
  refs/heads/master b5ccf0d39 -> 90da7dc24


[SPARK-24452][SQL][CORE] Avoid possible overflow in int add or multiply

## What changes were proposed in this pull request?

This PR fixes possible overflow in int addition and multiplication. In 
particular, the overflowing multiplications were detected by 
[Spotbugs](https://spotbugs.github.io/).

The following assignments may overflow on the right-hand side: the expression 
is evaluated in 32-bit int arithmetic before the result is widened to long, so 
the assigned value can wrap around and become negative.
```
long = int * int
long = int + int
```

To avoid this problem, this PR casts one int operand to long on the right-hand 
side, so the whole expression is evaluated in 64-bit arithmetic.
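
As a minimal, self-contained illustration (not taken from the patch itself; the 
class and variable names below are hypothetical), the following Java snippet 
shows the wrap-around and the cast-based fix pattern applied throughout this 
change:
```
public class IntOverflowDemo {
  public static void main(String[] args) {
    int numElements = 300_000_000;  // hypothetical element count
    int elementSize = 8;            // e.g. bytes per element

    // The right-hand side is computed in 32-bit int arithmetic and wraps
    // around before the assignment widens the result to long.
    long broken = numElements * elementSize;        // -1894967296

    // Widening one operand first forces 64-bit arithmetic, matching the fix.
    long fixed = (long) numElements * elementSize;  // 2400000000

    System.out.println("broken = " + broken + ", fixed = " + fixed);
  }
}
```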

## How was this patch tested?

Existing UTs.

Author: Kazuaki Ishizaki <ishiz...@jp.ibm.com>

Closes #21481 from kiszk/SPARK-24452.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/90da7dc2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/90da7dc2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/90da7dc2

Branch: refs/heads/master
Commit: 90da7dc241f8eec2348c0434312c97c116330bc4
Parents: b5ccf0d
Author: Kazuaki Ishizaki <ishiz...@jp.ibm.com>
Authored: Fri Jun 15 13:47:48 2018 -0700
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Fri Jun 15 13:47:48 2018 -0700

----------------------------------------------------------------------
 .../spark/unsafe/map/BytesToBytesMap.java       |   2 +-
 .../spark/deploy/worker/DriverRunner.scala      |   2 +-
 .../org/apache/spark/rdd/AsyncRDDActions.scala  |   2 +-
 .../org/apache/spark/storage/BlockManager.scala |   2 +-
 .../catalyst/expressions/UnsafeArrayData.java   |  14 +--
 .../VariableLengthRowBasedKeyValueBatch.java    |   2 +-
 .../vectorized/OffHeapColumnVector.java         | 106 +++++++++----------
 .../vectorized/OnHeapColumnVector.java          |  10 +-
 .../sources/RateStreamMicroBatchReader.scala    |   2 +-
 .../apache/spark/sql/hive/client/HiveShim.scala |   2 +-
 .../streaming/util/FileBasedWriteAheadLog.scala |   2 +-
 11 files changed, 73 insertions(+), 73 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/90da7dc2/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java 
b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
index 5f00455..9a767dd 100644
--- a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
+++ b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
@@ -703,7 +703,7 @@ public final class BytesToBytesMap extends MemoryConsumer {
       // must be stored in the same memory page.
       // (8 byte key length) (key) (value) (8 byte pointer to next value)
       int uaoSize = UnsafeAlignedOffset.getUaoSize();
-      final long recordLength = (2 * uaoSize) + klen + vlen + 8;
+      final long recordLength = (2L * uaoSize) + klen + vlen + 8;
      if (currentPage == null || currentPage.size() - pageCursor < recordLength) {
         if (!acquireNewPage(recordLength + uaoSize)) {
           return false;

http://git-wip-us.apache.org/repos/asf/spark/blob/90da7dc2/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala 
b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
index 58a1811..a6d13d1 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
@@ -225,7 +225,7 @@ private[deploy] class DriverRunner(
       // check if attempting another run
       keepTrying = supervise && exitCode != 0 && !killed
       if (keepTrying) {
-        if (clock.getTimeMillis() - processStart > successfulRunDuration * 1000) {
+        if (clock.getTimeMillis() - processStart > successfulRunDuration * 1000L) {
           waitSeconds = 1
         }
         logInfo(s"Command exited with status $exitCode, re-launching after $waitSeconds s.")

http://git-wip-us.apache.org/repos/asf/spark/blob/90da7dc2/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala 
b/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
index 13db498..ba9dae4 100644
--- a/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
@@ -95,7 +95,7 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends 
Serializable with Loggi
             // the left side of max is >=1 whenever partsScanned >= 2
             numPartsToTry = Math.max(1,
               (1.5 * num * partsScanned / results.size).toInt - partsScanned)
-            numPartsToTry = Math.min(numPartsToTry, partsScanned * 4)
+            numPartsToTry = Math.min(numPartsToTry, partsScanned * 4L)
           }
         }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/90da7dc2/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index e0276a4..df1a4be 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -291,7 +291,7 @@ private[spark] class BlockManager(
         case e: Exception if i < MAX_ATTEMPTS =>
          logError(s"Failed to connect to external shuffle server, will retry ${MAX_ATTEMPTS - i}"
             + s" more times after waiting $SLEEP_TIME_SECS seconds...", e)
-          Thread.sleep(SLEEP_TIME_SECS * 1000)
+          Thread.sleep(SLEEP_TIME_SECS * 1000L)
         case NonFatal(e) =>
          throw new SparkException("Unable to register with external shuffle server due to : " +
             e.getMessage, e)

http://git-wip-us.apache.org/repos/asf/spark/blob/90da7dc2/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeArrayData.java
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeArrayData.java
 
b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeArrayData.java
index d5d934b..4dd2b73 100644
--- 
a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeArrayData.java
+++ 
b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeArrayData.java
@@ -83,7 +83,7 @@ public final class UnsafeArrayData extends ArrayData {
   private long elementOffset;
 
   private long getElementOffset(int ordinal, int elementSize) {
-    return elementOffset + ordinal * elementSize;
+    return elementOffset + ordinal * (long)elementSize;
   }
 
   public Object getBaseObject() { return baseObject; }
@@ -414,7 +414,7 @@ public final class UnsafeArrayData extends ArrayData {
   public short[] toShortArray() {
     short[] values = new short[numElements];
     Platform.copyMemory(
-      baseObject, elementOffset, values, Platform.SHORT_ARRAY_OFFSET, numElements * 2);
+      baseObject, elementOffset, values, Platform.SHORT_ARRAY_OFFSET, numElements * 2L);
     return values;
   }
 
@@ -422,7 +422,7 @@ public final class UnsafeArrayData extends ArrayData {
   public int[] toIntArray() {
     int[] values = new int[numElements];
     Platform.copyMemory(
-      baseObject, elementOffset, values, Platform.INT_ARRAY_OFFSET, numElements * 4);
+      baseObject, elementOffset, values, Platform.INT_ARRAY_OFFSET, numElements * 4L);
     return values;
   }
 
@@ -430,7 +430,7 @@ public final class UnsafeArrayData extends ArrayData {
   public long[] toLongArray() {
     long[] values = new long[numElements];
     Platform.copyMemory(
-      baseObject, elementOffset, values, Platform.LONG_ARRAY_OFFSET, numElements * 8);
+      baseObject, elementOffset, values, Platform.LONG_ARRAY_OFFSET, numElements * 8L);
     return values;
   }
 
@@ -438,7 +438,7 @@ public final class UnsafeArrayData extends ArrayData {
   public float[] toFloatArray() {
     float[] values = new float[numElements];
     Platform.copyMemory(
-      baseObject, elementOffset, values, Platform.FLOAT_ARRAY_OFFSET, numElements * 4);
+      baseObject, elementOffset, values, Platform.FLOAT_ARRAY_OFFSET, numElements * 4L);
     return values;
   }
 
@@ -446,14 +446,14 @@ public final class UnsafeArrayData extends ArrayData {
   public double[] toDoubleArray() {
     double[] values = new double[numElements];
     Platform.copyMemory(
-      baseObject, elementOffset, values, Platform.DOUBLE_ARRAY_OFFSET, numElements * 8);
+      baseObject, elementOffset, values, Platform.DOUBLE_ARRAY_OFFSET, numElements * 8L);
     return values;
   }
 
   private static UnsafeArrayData fromPrimitiveArray(
        Object arr, int offset, int length, int elementSize) {
     final long headerInBytes = calculateHeaderPortionInBytes(length);
-    final long valueRegionInBytes = elementSize * length;
+    final long valueRegionInBytes = (long)elementSize * length;
     final long totalSizeInLongs = (headerInBytes + valueRegionInBytes + 7) / 8;
     if (totalSizeInLongs > Integer.MAX_VALUE / 8) {
       throw new UnsupportedOperationException("Cannot convert this array to unsafe format as " +

http://git-wip-us.apache.org/repos/asf/spark/blob/90da7dc2/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/VariableLengthRowBasedKeyValueBatch.java
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/VariableLengthRowBasedKeyValueBatch.java
 
b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/VariableLengthRowBasedKeyValueBatch.java
index 905e682..c823de4 100644
--- 
a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/VariableLengthRowBasedKeyValueBatch.java
+++ 
b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/VariableLengthRowBasedKeyValueBatch.java
@@ -41,7 +41,7 @@ public final class VariableLengthRowBasedKeyValueBatch 
extends RowBasedKeyValueB
   @Override
   public UnsafeRow appendRow(Object kbase, long koff, int klen,
                              Object vbase, long voff, int vlen) {
-    final long recordLength = 8 + klen + vlen + 8;
+    final long recordLength = 8L + klen + vlen + 8;
     // if run out of max supported rows or page size, return null
     if (numRows >= capacity || page == null || page.size() - pageCursor < recordLength) {
       return null;

http://git-wip-us.apache.org/repos/asf/spark/blob/90da7dc2/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
 
b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
index 4733f36..6fdadde 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
@@ -216,12 +216,12 @@ public final class OffHeapColumnVector extends 
WritableColumnVector {
 
   @Override
   public void putShort(int rowId, short value) {
-    Platform.putShort(null, data + 2 * rowId, value);
+    Platform.putShort(null, data + 2L * rowId, value);
   }
 
   @Override
   public void putShorts(int rowId, int count, short value) {
-    long offset = data + 2 * rowId;
+    long offset = data + 2L * rowId;
     for (int i = 0; i < count; ++i, offset += 2) {
       Platform.putShort(null, offset, value);
     }
@@ -229,20 +229,20 @@ public final class OffHeapColumnVector extends 
WritableColumnVector {
 
   @Override
   public void putShorts(int rowId, int count, short[] src, int srcIndex) {
-    Platform.copyMemory(src, Platform.SHORT_ARRAY_OFFSET + srcIndex * 2,
-        null, data + 2 * rowId, count * 2);
+    Platform.copyMemory(src, Platform.SHORT_ARRAY_OFFSET + srcIndex * 2L,
+        null, data + 2L * rowId, count * 2L);
   }
 
   @Override
   public void putShorts(int rowId, int count, byte[] src, int srcIndex) {
     Platform.copyMemory(src, Platform.BYTE_ARRAY_OFFSET + srcIndex,
-      null, data + rowId * 2, count * 2);
+      null, data + rowId * 2L, count * 2L);
   }
 
   @Override
   public short getShort(int rowId) {
     if (dictionary == null) {
-      return Platform.getShort(null, data + 2 * rowId);
+      return Platform.getShort(null, data + 2L * rowId);
     } else {
       return (short) dictionary.decodeToInt(dictionaryIds.getDictId(rowId));
     }
@@ -252,7 +252,7 @@ public final class OffHeapColumnVector extends 
WritableColumnVector {
   public short[] getShorts(int rowId, int count) {
     assert(dictionary == null);
     short[] array = new short[count];
-    Platform.copyMemory(null, data + rowId * 2, array, Platform.SHORT_ARRAY_OFFSET, count * 2);
+    Platform.copyMemory(null, data + rowId * 2L, array, Platform.SHORT_ARRAY_OFFSET, count * 2L);
     return array;
   }
 
@@ -262,12 +262,12 @@ public final class OffHeapColumnVector extends 
WritableColumnVector {
 
   @Override
   public void putInt(int rowId, int value) {
-    Platform.putInt(null, data + 4 * rowId, value);
+    Platform.putInt(null, data + 4L * rowId, value);
   }
 
   @Override
   public void putInts(int rowId, int count, int value) {
-    long offset = data + 4 * rowId;
+    long offset = data + 4L * rowId;
     for (int i = 0; i < count; ++i, offset += 4) {
       Platform.putInt(null, offset, value);
     }
@@ -275,24 +275,24 @@ public final class OffHeapColumnVector extends 
WritableColumnVector {
 
   @Override
   public void putInts(int rowId, int count, int[] src, int srcIndex) {
-    Platform.copyMemory(src, Platform.INT_ARRAY_OFFSET + srcIndex * 4,
-        null, data + 4 * rowId, count * 4);
+    Platform.copyMemory(src, Platform.INT_ARRAY_OFFSET + srcIndex * 4L,
+        null, data + 4L * rowId, count * 4L);
   }
 
   @Override
   public void putInts(int rowId, int count, byte[] src, int srcIndex) {
     Platform.copyMemory(src, Platform.BYTE_ARRAY_OFFSET + srcIndex,
-      null, data + rowId * 4, count * 4);
+      null, data + rowId * 4L, count * 4L);
   }
 
   @Override
   public void putIntsLittleEndian(int rowId, int count, byte[] src, int srcIndex) {
     if (!bigEndianPlatform) {
       Platform.copyMemory(src, srcIndex + Platform.BYTE_ARRAY_OFFSET,
-          null, data + 4 * rowId, count * 4);
+          null, data + 4L * rowId, count * 4L);
     } else {
       int srcOffset = srcIndex + Platform.BYTE_ARRAY_OFFSET;
-      long offset = data + 4 * rowId;
+      long offset = data + 4L * rowId;
       for (int i = 0; i < count; ++i, offset += 4, srcOffset += 4) {
         Platform.putInt(null, offset,
             java.lang.Integer.reverseBytes(Platform.getInt(src, srcOffset)));
@@ -303,7 +303,7 @@ public final class OffHeapColumnVector extends 
WritableColumnVector {
   @Override
   public int getInt(int rowId) {
     if (dictionary == null) {
-      return Platform.getInt(null, data + 4 * rowId);
+      return Platform.getInt(null, data + 4L * rowId);
     } else {
       return dictionary.decodeToInt(dictionaryIds.getDictId(rowId));
     }
@@ -313,7 +313,7 @@ public final class OffHeapColumnVector extends 
WritableColumnVector {
   public int[] getInts(int rowId, int count) {
     assert(dictionary == null);
     int[] array = new int[count];
-    Platform.copyMemory(null, data + rowId * 4, array, Platform.INT_ARRAY_OFFSET, count * 4);
+    Platform.copyMemory(null, data + rowId * 4L, array, Platform.INT_ARRAY_OFFSET, count * 4L);
     return array;
   }
 
@@ -325,7 +325,7 @@ public final class OffHeapColumnVector extends 
WritableColumnVector {
   public int getDictId(int rowId) {
     assert(dictionary == null)
             : "A ColumnVector dictionary should not have a dictionary for itself.";
-    return Platform.getInt(null, data + 4 * rowId);
+    return Platform.getInt(null, data + 4L * rowId);
   }
 
   //
@@ -334,12 +334,12 @@ public final class OffHeapColumnVector extends 
WritableColumnVector {
 
   @Override
   public void putLong(int rowId, long value) {
-    Platform.putLong(null, data + 8 * rowId, value);
+    Platform.putLong(null, data + 8L * rowId, value);
   }
 
   @Override
   public void putLongs(int rowId, int count, long value) {
-    long offset = data + 8 * rowId;
+    long offset = data + 8L * rowId;
     for (int i = 0; i < count; ++i, offset += 8) {
       Platform.putLong(null, offset, value);
     }
@@ -347,24 +347,24 @@ public final class OffHeapColumnVector extends 
WritableColumnVector {
 
   @Override
   public void putLongs(int rowId, int count, long[] src, int srcIndex) {
-    Platform.copyMemory(src, Platform.LONG_ARRAY_OFFSET + srcIndex * 8,
-        null, data + 8 * rowId, count * 8);
+    Platform.copyMemory(src, Platform.LONG_ARRAY_OFFSET + srcIndex * 8L,
+        null, data + 8L * rowId, count * 8L);
   }
 
   @Override
   public void putLongs(int rowId, int count, byte[] src, int srcIndex) {
     Platform.copyMemory(src, Platform.BYTE_ARRAY_OFFSET + srcIndex,
-      null, data + rowId * 8, count * 8);
+      null, data + rowId * 8L, count * 8L);
   }
 
   @Override
   public void putLongsLittleEndian(int rowId, int count, byte[] src, int srcIndex) {
     if (!bigEndianPlatform) {
       Platform.copyMemory(src, srcIndex + Platform.BYTE_ARRAY_OFFSET,
-          null, data + 8 * rowId, count * 8);
+          null, data + 8L * rowId, count * 8L);
     } else {
       int srcOffset = srcIndex + Platform.BYTE_ARRAY_OFFSET;
-      long offset = data + 8 * rowId;
+      long offset = data + 8L * rowId;
       for (int i = 0; i < count; ++i, offset += 8, srcOffset += 8) {
         Platform.putLong(null, offset,
             java.lang.Long.reverseBytes(Platform.getLong(src, srcOffset)));
@@ -375,7 +375,7 @@ public final class OffHeapColumnVector extends 
WritableColumnVector {
   @Override
   public long getLong(int rowId) {
     if (dictionary == null) {
-      return Platform.getLong(null, data + 8 * rowId);
+      return Platform.getLong(null, data + 8L * rowId);
     } else {
       return dictionary.decodeToLong(dictionaryIds.getDictId(rowId));
     }
@@ -385,7 +385,7 @@ public final class OffHeapColumnVector extends 
WritableColumnVector {
   public long[] getLongs(int rowId, int count) {
     assert(dictionary == null);
     long[] array = new long[count];
-    Platform.copyMemory(null, data + rowId * 8, array, Platform.LONG_ARRAY_OFFSET, count * 8);
+    Platform.copyMemory(null, data + rowId * 8L, array, Platform.LONG_ARRAY_OFFSET, count * 8L);
     return array;
   }
 
@@ -395,12 +395,12 @@ public final class OffHeapColumnVector extends 
WritableColumnVector {
 
   @Override
   public void putFloat(int rowId, float value) {
-    Platform.putFloat(null, data + rowId * 4, value);
+    Platform.putFloat(null, data + rowId * 4L, value);
   }
 
   @Override
   public void putFloats(int rowId, int count, float value) {
-    long offset = data + 4 * rowId;
+    long offset = data + 4L * rowId;
     for (int i = 0; i < count; ++i, offset += 4) {
       Platform.putFloat(null, offset, value);
     }
@@ -408,18 +408,18 @@ public final class OffHeapColumnVector extends 
WritableColumnVector {
 
   @Override
   public void putFloats(int rowId, int count, float[] src, int srcIndex) {
-    Platform.copyMemory(src, Platform.FLOAT_ARRAY_OFFSET + srcIndex * 4,
-        null, data + 4 * rowId, count * 4);
+    Platform.copyMemory(src, Platform.FLOAT_ARRAY_OFFSET + srcIndex * 4L,
+        null, data + 4L * rowId, count * 4L);
   }
 
   @Override
   public void putFloats(int rowId, int count, byte[] src, int srcIndex) {
     if (!bigEndianPlatform) {
       Platform.copyMemory(src, Platform.BYTE_ARRAY_OFFSET + srcIndex,
-          null, data + rowId * 4, count * 4);
+          null, data + rowId * 4L, count * 4L);
     } else {
       ByteBuffer bb = ByteBuffer.wrap(src).order(ByteOrder.LITTLE_ENDIAN);
-      long offset = data + 4 * rowId;
+      long offset = data + 4L * rowId;
       for (int i = 0; i < count; ++i, offset += 4) {
         Platform.putFloat(null, offset, bb.getFloat(srcIndex + (4 * i)));
       }
@@ -429,7 +429,7 @@ public final class OffHeapColumnVector extends 
WritableColumnVector {
   @Override
   public float getFloat(int rowId) {
     if (dictionary == null) {
-      return Platform.getFloat(null, data + rowId * 4);
+      return Platform.getFloat(null, data + rowId * 4L);
     } else {
       return dictionary.decodeToFloat(dictionaryIds.getDictId(rowId));
     }
@@ -439,7 +439,7 @@ public final class OffHeapColumnVector extends 
WritableColumnVector {
   public float[] getFloats(int rowId, int count) {
     assert(dictionary == null);
     float[] array = new float[count];
-    Platform.copyMemory(null, data + rowId * 4, array, Platform.FLOAT_ARRAY_OFFSET, count * 4);
+    Platform.copyMemory(null, data + rowId * 4L, array, Platform.FLOAT_ARRAY_OFFSET, count * 4L);
     return array;
   }
 
@@ -450,12 +450,12 @@ public final class OffHeapColumnVector extends 
WritableColumnVector {
 
   @Override
   public void putDouble(int rowId, double value) {
-    Platform.putDouble(null, data + rowId * 8, value);
+    Platform.putDouble(null, data + rowId * 8L, value);
   }
 
   @Override
   public void putDoubles(int rowId, int count, double value) {
-    long offset = data + 8 * rowId;
+    long offset = data + 8L * rowId;
     for (int i = 0; i < count; ++i, offset += 8) {
       Platform.putDouble(null, offset, value);
     }
@@ -463,18 +463,18 @@ public final class OffHeapColumnVector extends 
WritableColumnVector {
 
   @Override
   public void putDoubles(int rowId, int count, double[] src, int srcIndex) {
-    Platform.copyMemory(src, Platform.DOUBLE_ARRAY_OFFSET + srcIndex * 8,
-      null, data + 8 * rowId, count * 8);
+    Platform.copyMemory(src, Platform.DOUBLE_ARRAY_OFFSET + srcIndex * 8L,
+      null, data + 8L * rowId, count * 8L);
   }
 
   @Override
   public void putDoubles(int rowId, int count, byte[] src, int srcIndex) {
     if (!bigEndianPlatform) {
       Platform.copyMemory(src, Platform.BYTE_ARRAY_OFFSET + srcIndex,
-        null, data + rowId * 8, count * 8);
+        null, data + rowId * 8L, count * 8L);
     } else {
       ByteBuffer bb = ByteBuffer.wrap(src).order(ByteOrder.LITTLE_ENDIAN);
-      long offset = data + 8 * rowId;
+      long offset = data + 8L * rowId;
       for (int i = 0; i < count; ++i, offset += 8) {
         Platform.putDouble(null, offset, bb.getDouble(srcIndex + (8 * i)));
       }
@@ -484,7 +484,7 @@ public final class OffHeapColumnVector extends 
WritableColumnVector {
   @Override
   public double getDouble(int rowId) {
     if (dictionary == null) {
-      return Platform.getDouble(null, data + rowId * 8);
+      return Platform.getDouble(null, data + rowId * 8L);
     } else {
       return dictionary.decodeToDouble(dictionaryIds.getDictId(rowId));
     }
@@ -494,7 +494,7 @@ public final class OffHeapColumnVector extends 
WritableColumnVector {
   public double[] getDoubles(int rowId, int count) {
     assert(dictionary == null);
     double[] array = new double[count];
-    Platform.copyMemory(null, data + rowId * 8, array, Platform.DOUBLE_ARRAY_OFFSET, count * 8);
+    Platform.copyMemory(null, data + rowId * 8L, array, Platform.DOUBLE_ARRAY_OFFSET, count * 8L);
     return array;
   }
 
@@ -504,26 +504,26 @@ public final class OffHeapColumnVector extends 
WritableColumnVector {
   @Override
   public void putArray(int rowId, int offset, int length) {
     assert(offset >= 0 && offset + length <= childColumns[0].capacity);
-    Platform.putInt(null, lengthData + 4 * rowId, length);
-    Platform.putInt(null, offsetData + 4 * rowId, offset);
+    Platform.putInt(null, lengthData + 4L * rowId, length);
+    Platform.putInt(null, offsetData + 4L * rowId, offset);
   }
 
   @Override
   public int getArrayLength(int rowId) {
-    return Platform.getInt(null, lengthData + 4 * rowId);
+    return Platform.getInt(null, lengthData + 4L * rowId);
   }
 
   @Override
   public int getArrayOffset(int rowId) {
-    return Platform.getInt(null, offsetData + 4 * rowId);
+    return Platform.getInt(null, offsetData + 4L * rowId);
   }
 
   // APIs dealing with ByteArrays
   @Override
   public int putByteArray(int rowId, byte[] value, int offset, int length) {
     int result = arrayData().appendBytes(length, value, offset);
-    Platform.putInt(null, lengthData + 4 * rowId, length);
-    Platform.putInt(null, offsetData + 4 * rowId, result);
+    Platform.putInt(null, lengthData + 4L * rowId, length);
+    Platform.putInt(null, offsetData + 4L * rowId, result);
     return result;
   }
 
@@ -533,19 +533,19 @@ public final class OffHeapColumnVector extends 
WritableColumnVector {
     int oldCapacity = (nulls == 0L) ? 0 : capacity;
     if (isArray() || type instanceof MapType) {
       this.lengthData =
-          Platform.reallocateMemory(lengthData, oldCapacity * 4, newCapacity * 4);
+          Platform.reallocateMemory(lengthData, oldCapacity * 4L, newCapacity * 4L);
       this.offsetData =
-          Platform.reallocateMemory(offsetData, oldCapacity * 4, newCapacity * 4);
+          Platform.reallocateMemory(offsetData, oldCapacity * 4L, newCapacity * 4L);
     } else if (type instanceof ByteType || type instanceof BooleanType) {
       this.data = Platform.reallocateMemory(data, oldCapacity, newCapacity);
     } else if (type instanceof ShortType) {
-      this.data = Platform.reallocateMemory(data, oldCapacity * 2, newCapacity * 2);
+      this.data = Platform.reallocateMemory(data, oldCapacity * 2L, newCapacity * 2L);
     } else if (type instanceof IntegerType || type instanceof FloatType ||
         type instanceof DateType || DecimalType.is32BitDecimalType(type)) {
-      this.data = Platform.reallocateMemory(data, oldCapacity * 4, newCapacity * 4);
+      this.data = Platform.reallocateMemory(data, oldCapacity * 4L, newCapacity * 4L);
     } else if (type instanceof LongType || type instanceof DoubleType ||
        DecimalType.is64BitDecimalType(type) || type instanceof TimestampType) {
-      this.data = Platform.reallocateMemory(data, oldCapacity * 8, newCapacity * 8);
+      this.data = Platform.reallocateMemory(data, oldCapacity * 8L, newCapacity * 8L);
     } else if (childColumns != null) {
       // Nothing to store.
     } else {

http://git-wip-us.apache.org/repos/asf/spark/blob/90da7dc2/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
 
b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
index 23dcc10..577eab6 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
@@ -231,7 +231,7 @@ public final class OnHeapColumnVector extends 
WritableColumnVector {
   @Override
   public void putShorts(int rowId, int count, byte[] src, int srcIndex) {
     Platform.copyMemory(src, Platform.BYTE_ARRAY_OFFSET + srcIndex, shortData,
-      Platform.SHORT_ARRAY_OFFSET + rowId * 2, count * 2);
+      Platform.SHORT_ARRAY_OFFSET + rowId * 2L, count * 2L);
   }
 
   @Override
@@ -276,7 +276,7 @@ public final class OnHeapColumnVector extends 
WritableColumnVector {
   @Override
   public void putInts(int rowId, int count, byte[] src, int srcIndex) {
     Platform.copyMemory(src, Platform.BYTE_ARRAY_OFFSET + srcIndex, intData,
-      Platform.INT_ARRAY_OFFSET + rowId * 4, count * 4);
+      Platform.INT_ARRAY_OFFSET + rowId * 4L, count * 4L);
   }
 
   @Override
@@ -342,7 +342,7 @@ public final class OnHeapColumnVector extends 
WritableColumnVector {
   @Override
   public void putLongs(int rowId, int count, byte[] src, int srcIndex) {
     Platform.copyMemory(src, Platform.BYTE_ARRAY_OFFSET + srcIndex, longData,
-      Platform.LONG_ARRAY_OFFSET + rowId * 8, count * 8);
+      Platform.LONG_ARRAY_OFFSET + rowId * 8L, count * 8L);
   }
 
   @Override
@@ -394,7 +394,7 @@ public final class OnHeapColumnVector extends 
WritableColumnVector {
   public void putFloats(int rowId, int count, byte[] src, int srcIndex) {
     if (!bigEndianPlatform) {
       Platform.copyMemory(src, Platform.BYTE_ARRAY_OFFSET + srcIndex, floatData,
-          Platform.DOUBLE_ARRAY_OFFSET + rowId * 4, count * 4);
+          Platform.DOUBLE_ARRAY_OFFSET + rowId * 4L, count * 4L);
     } else {
       ByteBuffer bb = ByteBuffer.wrap(src).order(ByteOrder.LITTLE_ENDIAN);
       for (int i = 0; i < count; ++i) {
@@ -443,7 +443,7 @@ public final class OnHeapColumnVector extends 
WritableColumnVector {
   public void putDoubles(int rowId, int count, byte[] src, int srcIndex) {
     if (!bigEndianPlatform) {
       Platform.copyMemory(src, Platform.BYTE_ARRAY_OFFSET + srcIndex, doubleData,
-          Platform.DOUBLE_ARRAY_OFFSET + rowId * 8, count * 8);
+          Platform.DOUBLE_ARRAY_OFFSET + rowId * 8L, count * 8L);
     } else {
       ByteBuffer bb = ByteBuffer.wrap(src).order(ByteOrder.LITTLE_ENDIAN);
       for (int i = 0; i < count; ++i) {

http://git-wip-us.apache.org/repos/asf/spark/blob/90da7dc2/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamMicroBatchReader.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamMicroBatchReader.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamMicroBatchReader.scala
index fbff8db..b393c48 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamMicroBatchReader.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamMicroBatchReader.scala
@@ -202,7 +202,7 @@ class RateStreamMicroBatchInputPartitionReader(
     rangeEnd: Long,
     localStartTimeMs: Long,
     relativeMsPerValue: Double) extends InputPartitionReader[Row] {
-  private var count = 0
+  private var count: Long = 0
 
   override def next(): Boolean = {
     rangeStart + partitionId + numPartitions * count < rangeEnd

http://git-wip-us.apache.org/repos/asf/spark/blob/90da7dc2/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
index 130e258..8620f3f 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
@@ -342,7 +342,7 @@ private[client] class Shim_v0_12 extends Shim with Logging {
   }
 
   override def getMetastoreClientConnectRetryDelayMillis(conf: HiveConf): Long = {
-    conf.getIntVar(HiveConf.ConfVars.METASTORE_CLIENT_CONNECT_RETRY_DELAY) * 1000
+    conf.getIntVar(HiveConf.ConfVars.METASTORE_CLIENT_CONNECT_RETRY_DELAY) * 1000L
   }
 
   override def loadPartition(

http://git-wip-us.apache.org/repos/asf/spark/blob/90da7dc2/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
 
b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
index ab7c855..2e85990 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
@@ -222,7 +222,7 @@ private[streaming] class FileBasedWriteAheadLog(
        pastLogs += LogInfo(currentLogWriterStartTime, currentLogWriterStopTime, _)
       }
       currentLogWriterStartTime = currentTime
-      currentLogWriterStopTime = currentTime + (rollingIntervalSecs * 1000)
+      currentLogWriterStopTime = currentTime + (rollingIntervalSecs * 1000L)
       val newLogPath = new Path(logDirectory,
         timeToLogFile(currentLogWriterStartTime, currentLogWriterStopTime))
       currentLogPath = Some(newLogPath.toString)

